Spring Boot 中 RocketMQ 事务消息完整配置指南
在 Spring Boot 中配置 RocketMQ 事务消息,核心是「依赖引入 + 基础配置 + 事务生产者 / 监听器配置 + 消费端配置」,以下是分步实操,覆盖配置文件、核心代码、关键优化,可直接落地。
一、前置准备
- 环境要求:
- Spring Boot 2.x(推荐 2.3.x+);
- RocketMQ 4.3.0+(事务消息最低支持版本,推荐 4.9.x);
- JDK 8+。
- RocketMQ 部署:已启动 NameServer 和 Broker(参考之前的 RocketMQ 安装步骤)。
二、步骤 1:引入依赖(pom.xml)
核心引入
rocketmq-spring-boot-starter,版本需与 RocketMQ 服务端兼容(推荐 2.2.3,适配 RocketMQ 4.9.x):xml
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ 整合 Spring Boot 核心依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- JSON 序列化(可选,推荐) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- 测试依赖(可选) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
三、步骤 2:核心配置文件(application.yml/application.properties)
1. YAML 格式(推荐)
yaml
spring:
application:
name: rocketmq-tx-demo # 应用名称
# RocketMQ 基础配置
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址(多个用;分隔,如 192.168.1.100:9876;192.168.1.101:9876)
producer:
group: tx-producer-group # 事务生产者组(必须唯一,事务监听器需绑定此值)
send-message-timeout: 3000 # 消息发送超时时间(ms)
retry-times-when-send-failed: 2 # 同步发送失败重试次数
retry-times-when-send-async-failed: 2 # 异步发送失败重试次数
compress-message-body-threshold: 4096 # 消息体压缩阈值(超过4K自动压缩)
consumer:
group: tx-consumer-group # 事务消息消费者组(必须唯一)
consume-thread-min: 10 # 最小消费线程数
consume-thread-max: 20 # 最大消费线程数
consume-message-batch-max-size: 1 # 批量消费大小(事务消息建议单条消费)
retry-times-when-consume-failed: 3 # 消费失败重试次数(默认16次,可自定义)
2. Properties 格式(兼容老项目)
properties
# 应用名称
spring.application.name=rocketmq-tx-demo
# RocketMQ NameServer 地址
rocketmq.name-server=127.0.0.1:9876
# 生产者配置
rocketmq.producer.group=tx-producer-group
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-times-when-send-async-failed=2
rocketmq.producer.compress-message-body-threshold=4096
# 消费者配置
rocketmq.consumer.group=tx-consumer-group
rocketmq.consumer.consume-thread-min=10
rocketmq.consumer.consume-thread-max=20
rocketmq.consumer.consume-message-batch-max-size=1
rocketmq.consumer.retry-times-when-consume-failed=3
四、步骤 3:事务消息生产者配置(核心)
需实现「发送半消息 + 本地事务监听器」,监听器是事务消息的核心,必须绑定生产者组:
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.UUID;
// 1. 业务DTO(以订单支付为例)
class OrderPayDTO {
private String orderId; // 订单ID
private String userId; // 用户ID
private BigDecimal payAmount;// 支付金额
// 构造器、getter/setter 省略
public OrderPayDTO(String orderId, String userId, BigDecimal payAmount) {
this.orderId = orderId;
this.userId = userId;
this.payAmount = payAmount;
}
}
// 2. 事务消息生产者核心配置
@Component
public class RocketMQTxProducer {
// 注入Spring Boot封装的RocketMQ模板
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务半消息(触发本地事务)
*/
public void sendTxPayMessage(String orderId, String userId, BigDecimal payAmount) {
// 1. 构建业务消息体
OrderPayDTO dto = new OrderPayDTO(orderId, userId, payAmount);
String msgBody = JSON.toJSONString(dto);
// 2. 构建事务消息(必须设置唯一事务ID,避免回查冲突)
Message<String> txMessage = MessageBuilder
.withPayload(msgBody)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) // 事务唯一标识
.setHeader("orderId", orderId) // 自定义头(可选,便于回查)
.build();
// 3. 发送半消息 + 绑定本地事务监听器
// 参数1:topic:tag(tag可选,用于消息过滤)
// 参数2:事务消息体
// 参数3:附加参数(可传递本地事务上下文,如userId)
rocketMQTemplate.sendMessageInTransaction(
"order_pay_topic:tx_pay_tag",
txMessage,
userId // 附加参数,会传递到本地事务方法
);
}
/**
* 本地事务监听器(核心:执行本地事务 + 事务回查)
* 注解说明:txProducerGroup 必须与配置文件中生产者组一致!
*/
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class OrderPayTxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
/**
* 第一步:执行本地事务(核心业务逻辑,如扣减库存、更新订单状态)
* @param msg 半消息内容
* @param arg 发送半消息时传递的附加参数(此处为userId)
* @return 事务状态:COMMIT(提交)/ROLLBACK(回滚)/UNKNOWN(未知,触发回查)
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 解析消息体
String msgBody = (String) msg.getPayload();
OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
String orderId = dto.getOrderId();
String userId = (String) arg;
// ========== 核心:执行本地事务(实际场景操作数据库)==========
// 示例:1. 扣减用户余额 2. 更新订单为已支付
boolean txSuccess = this.updateOrderStatus(orderId, "PAID")
&& this.deductUserBalance(userId, dto.getPayAmount());
if (txSuccess) {
System.out.println("本地事务执行成功:订单" + orderId + "支付完成");
// 提交事务:Broker会投递消息给消费者
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("本地事务执行失败:订单" + orderId + "支付失败");
// 回滚事务:Broker会删除半消息,消费者不会收到
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
System.err.println("本地事务执行异常:" + e.getMessage());
// 返回UNKNOWN:Broker会定时回查(默认60秒一次)
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 第二步:事务回查(解决生产者宕机/网络异常导致的状态丢失)
* Broker会定时调用此方法,确认本地事务最终状态
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
// 解析消息体,获取订单ID
String msgBody = (String) msg.getPayload();
OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
String orderId = dto.getOrderId();
// ========== 核心:回查本地事务状态(查数据库/事务日志)==========
String orderStatus = this.getOrderStatus(orderId);
if ("PAID".equals(orderStatus)) {
// 本地事务已成功,提交消息
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else if ("UNPAID".equals(orderStatus)) {
// 本地事务未执行,回滚消息
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
} else {
// 状态未知,继续回查
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
System.err.println("事务回查异常:" + e.getMessage());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
// 模拟:更新订单状态(实际场景操作数据库)
private boolean updateOrderStatus(String orderId, String status) {
// 业务逻辑:UPDATE t_order SET status = ? WHERE order_id = ?
return true;
}
// 模拟:扣减用户余额(实际场景操作数据库)
private boolean deductUserBalance(String userId, BigDecimal amount) {
// 业务逻辑:UPDATE t_user SET balance = balance - ? WHERE user_id = ?
return true;
}
// 模拟:查询订单状态(用于事务回查)
private String getOrderStatus(String orderId) {
// 业务逻辑:SELECT status FROM t_order WHERE order_id = ?
return "PAID";
}
}
}
五、步骤 4:事务消息消费者配置
消费端需监听事务消息,且必须保证幂等性(避免重复消费):
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
// 监听事务消息的主题+标签,消费者组必须与配置文件一致
@RocketMQMessageListener(
topic = "order_pay_topic",
consumerGroup = "tx-consumer-group",
selectorExpression = "tx_pay_tag", // 只消费tx_pay_tag标签的消息
consumeMode = org.apache.rocketmq.common.protocol.heartbeat.ConsumeMode.CONCURRENTLY, // 并发消费
messageModel = org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING // 集群模式
)
public class OrderPayTxConsumer implements RocketMQListener<String> {
/**
* 消费事务消息(仅当生产者提交事务后,才会收到消息)
* 注意:消费失败会自动重试,需保证幂等性!
*/
@Override
public void onMessage(String msg) {
try {
OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
String orderId = dto.getOrderId();
// ========== 幂等校验(核心!避免重复消费)==========
if (this.checkConsumed(orderId)) {
System.out.println("消息已消费:订单" + orderId + ",跳过处理");
return;
}
// ========== 消费逻辑(如触发物流创建、发送短信通知)==========
boolean consumeSuccess = this.createLogistics(orderId) && this.sendPaySms(dto.getUserId(), orderId);
if (consumeSuccess) {
System.out.println("消费成功:订单" + orderId + ",创建物流+发送短信完成");
// 记录消费状态(用于幂等校验)
this.recordConsumeStatus(orderId, "SUCCESS");
} else {
System.err.println("消费失败:订单" + orderId);
// 抛出异常触发重试
throw new RuntimeException("消费失败,重试");
}
} catch (Exception e) {
System.err.println("消费异常:" + e.getMessage());
throw new RuntimeException(e); // 触发重试(默认最多16次)
}
}
// 模拟:幂等校验(查消费日志)
private boolean checkConsumed(String orderId) {
// SELECT COUNT(1) FROM t_consume_log WHERE order_id = ?
return false;
}
// 模拟:记录消费状态
private void recordConsumeStatus(String orderId, String status) {
// INSERT INTO t_consume_log (order_id, status) VALUES (?, ?)
}
// 模拟:创建物流单
private boolean createLogistics(String orderId) {
return true;
}
// 模拟:发送支付成功短信
private boolean sendPaySms(String userId, String orderId) {
return true;
}
}
六、步骤 5:测试事务消息
编写测试类验证配置是否生效:
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
@SpringBootTest
public class RocketMQTxTest {
@Autowired
private RocketMQTxProducer txProducer;
@Test
public void testTxMessage() throws InterruptedException {
// 模拟订单支付:订单ID=ORDER_20250101,用户ID=USER_001,支付金额=100元
txProducer.sendTxPayMessage("ORDER_20250101", "USER_001", new BigDecimal(100));
// 等待事务执行和消费完成
Thread.sleep(10000);
}
}
七、关键配置优化与避坑
1. 事务回查参数调整(Broker 端)
修改 RocketMQ 的
broker.conf,优化回查频率(默认 60 秒,可缩短):properties
# 事务回查间隔(ms),建议10秒
transactionCheckInterval=10000
# 最大回查次数(默认15次,可改为10次)
transactionCheckMax=10
# 回查线程数(默认1,高并发场景增加)
transactionCheckThreadPoolNums=5
2. 消费端重试与死信队列
- 消费失败重试次数:配置文件中
retry-times-when-consume-failed建议设为 3~5 次,过多重试易导致消息堆积; - 死信队列:重试次数耗尽后,消息进入死信队列(命名规则:
%DLQ%+消费者组),需监控并人工处理。
3. 核心避坑点
| 问题场景 | 原因 | 解决方案 |
|---|---|---|
| 事务监听器不生效 | @RocketMQTransactionListener 的 txProducerGroup 与配置文件生产者组不一致 |
严格保证两者名称相同 |
| 回查时获取不到事务状态 | 本地事务状态未落地(如仅存内存) | 必须将事务状态写入数据库 / 日志,回查时从持久化存储读取 |
| 消费端重复消费 | 未做幂等校验 | 基于订单 ID / 消息 ID 实现幂等,避免重复执行业务逻辑 |
| 半消息发送成功但本地事务未执行 | 生产者宕机 / 网络异常 | 依赖 Broker 的事务回查机制,确保回查逻辑准确 |
八、核心总结
Spring Boot 中配置 RocketMQ 事务消息的核心要点:
- 配置文件:重点配置生产者组、NameServer 地址,消费者组需唯一;
- 生产者:通过
sendMessageInTransaction发送半消息,事务监听器必须绑定生产者组; - 本地事务:
executeLocalTransaction执行核心业务,checkLocalTransaction处理回查; - 消费端:必须实现幂等,避免重复消费导致数据不一致;
- 运维:监控事务回查次数、死信队列,及时处理异常消息。
该配置适配电商、金融等绝大多数「最终一致性」场景,是分布式事务的主流解决方案。
阅读剩余
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9918.html
文章版权归作者所有,未经允许请勿转载。
THE END
相关推荐