Spring Boot 中 RocketMQ 事务消息的发送与接收(完整实操)
在 Spring Boot 中实现 RocketMQ 事务消息的「发送(生产者)+ 接收(消费者)」,核心是依托 RocketMQ 事务消息的「半消息 + 本地事务 + 事务回查」机制,结合 Spring Boot 封装的
rocketmq-spring-boot-starter 快速集成。以下是从零到一的实操步骤,包含完整代码、配置和测试验证。一、环境准备
1. 基础依赖(pom.xml)
引入 Spring Boot 与 RocketMQ 整合依赖,版本需匹配(推荐
rocketmq-spring-boot-starter:2.2.3 适配 RocketMQ 4.9.x):xml
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.7.15</version>
</dependency>
<!-- RocketMQ Spring Boot 整合包 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- JSON 序列化(解析消息体) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2. RocketMQ 配置(application.yml)
配置 NameServer 地址、生产者 / 消费者核心参数(事务消息需指定唯一生产者组):
yaml
spring:
application:
name: rocketmq-tx-demo
# RocketMQ 核心配置
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址(集群用;分隔)
producer:
group: tx-producer-group # 事务生产者组(必须唯一,监听器需绑定)
send-message-timeout: 3000 # 消息发送超时时间
retry-times-when-send-failed: 3 # 发送失败重试次数
consumer:
group: tx-consumer-group # 事务消息消费者组(必须唯一)
consume-thread-max: 20 # 最大消费线程数
retry-times-when-consume-failed: 3 # 消费失败重试次数(默认16次,建议3-5次)
3. RocketMQ 服务部署
确保本地 / 服务器已启动 RocketMQ 的 NameServer 和 Broker(参考命令):
bash
运行
# 启动 NameServer
nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv &
# 启动 Broker(指定 NameServer 地址)
nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -n 127.0.0.1:9876 &
二、步骤 1:实现事务消息发送(生产者)
事务消息生产者需完成「发送半消息 + 执行本地事务 + 事务回查」三个核心动作,核心是通过
RocketMQLocalTransactionListener 绑定本地事务逻辑。1. 定义业务 DTO(以订单支付为例)
java
运行
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;
// 订单支付消息体
@Data
@AllArgsConstructor
public class OrderPayDTO {
private String orderId; // 订单ID(业务唯一标识)
private String userId; // 用户ID
private BigDecimal payAmount;// 支付金额
}
2. 事务消息生产者核心代码
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class TxMessageProducer {
// 注入 Spring Boot 封装的 RocketMQ 模板
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 步骤1:发送事务半消息(触发本地事务)
* @param orderId 订单ID
* @param userId 用户ID
* @param payAmount 支付金额
*/
public void sendTxPayMessage(String orderId, String userId, BigDecimal payAmount) {
// 1. 构建业务消息体
OrderPayDTO dto = new OrderPayDTO(orderId, userId, payAmount);
String msgBody = JSON.toJSONString(dto);
// 2. 构建事务消息(必须设置唯一事务ID,避免回查冲突)
Message<String> txMessage = MessageBuilder
.withPayload(msgBody)
// 事务唯一标识(RocketMQ 用于回查)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
// 自定义头(便于回查时快速获取关键信息)
.setHeader("orderId", orderId)
.build();
// 3. 发送半消息 + 绑定本地事务监听器
// 参数说明:
// - "order_pay_topic:tx_tag":主题+标签(标签用于消息过滤)
// - txMessage:事务消息体
// - userId:附加参数(传递到本地事务方法)
rocketMQTemplate.sendMessageInTransaction(
"order_pay_topic:tx_tag",
txMessage,
userId
);
System.out.println("半消息发送成功,订单ID:" + orderId);
}
/**
* 步骤2:本地事务监听器(核心:执行本地事务 + 事务回查)
* 注解说明:txProducerGroup 必须与 application.yml 中生产者组一致!
*/
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class OrderPayTxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
/**
* 执行本地事务(核心业务逻辑:如扣减余额、更新订单状态)
* @param msg 半消息内容
* @param arg 发送半消息时传递的附加参数(此处为 userId)
* @return 事务状态:COMMIT(提交)/ROLLBACK(回滚)/UNKNOWN(触发回查)
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 解析消息体
String msgBody = (String) msg.getPayload();
OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
String orderId = dto.getOrderId();
String userId = (String) arg;
// ========== 核心:执行本地事务(实际场景操作数据库)==========
// 示例逻辑:1. 扣减用户余额 2. 更新订单为已支付
boolean deductSuccess = deductUserBalance(userId, dto.getPayAmount());
boolean updateOrderSuccess = updateOrderStatus(orderId, "PAID");
if (deductSuccess && updateOrderSuccess) {
System.out.println("本地事务执行成功,订单ID:" + orderId);
// 提交事务:Broker 会将半消息转为可消费状态
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("本地事务执行失败,订单ID:" + orderId);
// 回滚事务:Broker 会删除半消息,消费者不会收到
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
System.err.println("本地事务执行异常:" + e.getMessage());
// 返回 UNKNOWN:Broker 会定时触发回查(默认60秒一次)
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 事务回查(解决:本地事务成功,但 COMMIT 指令未送达 Broker 的场景)
* Broker 定时调用此方法,确认本地事务最终状态
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
// 解析消息体,获取订单ID
String msgBody = (String) msg.getPayload();
OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
String orderId = dto.getOrderId();
// ========== 核心:回查本地事务状态(必须查持久化存储,如数据库)==========
String orderStatus = getOrderStatusFromDB(orderId);
if ("PAID".equals(orderStatus)) {
System.out.println("事务回查成功,订单已支付,提交消息:" + orderId);
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else if ("UNPAID".equals(orderStatus)) {
System.out.println("事务回查失败,订单未支付,回滚消息:" + orderId);
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
} else {
// 状态未知,继续回查(最多回查15次,可配置)
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
System.err.println("事务回查异常:" + e.getMessage());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
// ------------------- 模拟数据库操作 -------------------
// 模拟扣减用户余额
private boolean deductUserBalance(String userId, BigDecimal amount) {
// 实际逻辑:UPDATE t_user SET balance = balance - ? WHERE user_id = ?
return true; // 模拟成功,可改为 false 测试回滚
}
// 模拟更新订单状态
private boolean updateOrderStatus(String orderId, String status) {
// 实际逻辑:UPDATE t_order SET status = ? WHERE order_id = ?
return true;
}
// 模拟查询订单状态(用于事务回查)
private String getOrderStatusFromDB(String orderId) {
// 实际逻辑:SELECT status FROM t_order WHERE order_id = ?
return "PAID";
}
}
}
三、步骤 2:实现事务消息接收(消费者)
消费者仅能收到「生产者提交事务」后的消息,需保证消费幂等性(避免重复消费导致数据错误)。
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
// 监听配置:主题+消费者组+标签过滤
@RocketMQMessageListener(
topic = "order_pay_topic", // 与生产者发送的主题一致
consumerGroup = "tx-consumer-group", // 与 application.yml 中消费者组一致
selectorExpression = "tx_tag", // 只消费 tx_tag 标签的消息
consumeMode = org.apache.rocketmq.common.protocol.heartbeat.ConsumeMode.CONCURRENTLY, // 并发消费
messageModel = org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING // 集群模式
)
public class TxMessageConsumer implements RocketMQListener<String> {
/**
* 消费事务消息(仅当生产者提交事务后,才会触发此方法)
* @param msg 消息体(与生产者发送的内容一致)
*/
@Override
public void onMessage(String msg) {
try {
// 1. 解析消息体
OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
String orderId = dto.getOrderId();
System.out.println("收到事务消息,开始消费:" + orderId);
// 2. 幂等校验(核心!避免重复消费)
if (checkConsumed(orderId)) {
System.out.println("消息已消费,跳过处理:" + orderId);
return;
}
// 3. 执行业务逻辑(如创建物流、发送短信)
boolean createLogistics = createLogistics(orderId);
boolean sendSms = sendPaySuccessSms(dto.getUserId(), orderId);
if (createLogistics && sendSms) {
// 4. 记录消费状态(用于幂等校验)
recordConsumeStatus(orderId);
System.out.println("消息消费成功:" + orderId);
} else {
System.err.println("消息消费失败:" + orderId);
// 抛出异常触发重试(默认重试3次,可配置)
throw new RuntimeException("消费失败,触发重试");
}
} catch (Exception e) {
System.err.println("消费异常:" + e.getMessage());
throw new RuntimeException(e); // 必须抛出异常,否则 RocketMQ 认为消费成功
}
}
// ------------------- 模拟消费逻辑 -------------------
// 幂等校验:查询是否已消费过该订单
private boolean checkConsumed(String orderId) {
// 实际逻辑:SELECT COUNT(1) FROM t_consume_log WHERE order_id = ?
return false; // 模拟未消费
}
// 记录消费状态(落地到数据库)
private void recordConsumeStatus(String orderId) {
// 实际逻辑:INSERT INTO t_consume_log (order_id, consume_time) VALUES (?, NOW())
}
// 模拟创建物流单
private boolean createLogistics(String orderId) {
// 实际逻辑:INSERT INTO t_logistics (order_id, status) VALUES (?, 'CREATE')
return true;
}
// 模拟发送支付成功短信
private boolean sendPaySuccessSms(String userId, String orderId) {
// 实际逻辑:调用短信接口发送通知
return true;
}
}
四、步骤 3:测试事务消息的发送与接收
编写测试类,验证完整流程:
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
@SpringBootTest
public class RocketMQTxTest {
@Autowired
private TxMessageProducer txProducer;
@Test
public void testTxMessageSendAndConsume() throws InterruptedException {
// 模拟订单支付:订单ID=ORDER_20250101,用户ID=USER_001,支付金额=100元
txProducer.sendTxPayMessage("ORDER_20250101", "USER_001", new BigDecimal(100));
// 等待事务执行和消费完成(实际生产无需休眠,此处为测试)
Thread.sleep(10000);
}
}
五、核心流程验证与日志说明
1. 正常流程日志(本地事务成功)
plaintext
半消息发送成功,订单ID:ORDER_20250101
本地事务执行成功,订单ID:ORDER_20250101
收到事务消息,开始消费:ORDER_20250101
消息消费成功:ORDER_20250101
2. 本地事务失败日志
plaintext
半消息发送成功,订单ID:ORDER_20250101
本地事务执行失败,订单ID:ORDER_20250101
# 消费者无日志(Broker 删除半消息,未投递)
3. 事务回查日志(网络异常导致 COMMIT 未送达)
plaintext
半消息发送成功,订单ID:ORDER_20250101
本地事务执行成功,订单ID:ORDER_20250101
事务回查成功,订单已支付,提交消息:ORDER_20250101
收到事务消息,开始消费:ORDER_20250101
消息消费成功:ORDER_20250101
六、关键避坑点
- 生产者组与监听器绑定不一致:
@RocketMQTransactionListener的txProducerGroup必须和application.yml中rocketmq.producer.group完全一致,否则监听器不生效; - 本地事务状态未落地:事务回查时必须从数据库 / 日志中获取状态,不能仅依赖内存,否则宕机后无法确认状态;
- 消费端未做幂等:RocketMQ 重试机制会导致重复消费,必须基于订单 ID / 消息 ID 实现幂等;
- 回查次数过多:默认回查 15 次,可通过 Broker 配置
transactionCheckMax=10减少,避免无效回查。
七、核心总结
Spring Boot 中 RocketMQ 事务消息的发送与接收核心逻辑:
- 发送端:通过
sendMessageInTransaction发送半消息,绑定RocketMQLocalTransactionListener执行本地事务和回查,确保「本地事务成功则消息投递」; - 接收端:监听指定主题,消费前做幂等校验,消费失败抛出异常触发重试,最终失败进入死信队列;
- 可靠性:依赖「半消息 + 回查」保证消息与本地事务一致性,消费端幂等保证数据不重复处理。
该方案适用于电商订单、资金转账、物流同步等需要「最终一致性」的分布式场景,是 RocketMQ 事务消息的标准落地方式。
阅读剩余
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9919.html
文章版权归作者所有,未经允许请勿转载。
THE END
相关推荐