Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例
以下是 Spring Boot + RocketMQ 事务消息 完整可运行的代码示例,包含生产者(事务消息发送)、消费者(消息接收)、核心配置,基于 RocketMQ 4.9.x + Spring Boot 2.7.x 实现,可直接复制到项目中运行。
一、前置依赖(pom.xml)
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>rocketmq-tx-demo</artifactId>
<version>1.0.0</version>
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ Spring Boot 整合 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- JSON 序列化 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
二、核心配置(application.yml)
yaml
spring:
application:
name: rocketmq-tx-demo
# RocketMQ 配置
rocketmq:
name-server: 127.0.0.1:9876 # 本地NameServer地址(集群用;分隔)
producer:
group: tx-producer-group # 事务生产者组(必须唯一)
send-message-timeout: 3000 # 发送超时时间
retry-times-when-send-failed: 3 # 发送失败重试次数
consumer:
group: tx-consumer-group # 消费者组(必须唯一)
consume-thread-max: 20 # 最大消费线程数
retry-times-when-consume-failed: 3 # 消费失败重试次数
三、消息体 DTO(OrderPayDTO.java)
java
运行
package com.example.rocketmqtxdemo.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;
/**
* 订单支付消息体
*/
@Data
@AllArgsConstructor
public class OrderPayDTO {
private String orderId; // 订单ID(业务唯一标识)
private String userId; // 用户ID
private BigDecimal payAmount;// 支付金额
}
四、事务消息生产者(TxMessageProducer.java)
java
运行
package com.example.rocketmqtxdemo.producer;
import com.alibaba.fastjson.JSON;
import com.example.rocketmqtxdemo.dto.OrderPayDTO;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* 事务消息生产者(发送半消息 + 执行本地事务 + 事务回查)
*/
@Service
public class TxMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务半消息
*/
public void sendTxPayMessage(String orderId, String userId, BigDecimal payAmount) {
// 1. 构建业务消息体
OrderPayDTO dto = new OrderPayDTO(orderId, userId, payAmount);
String msgBody = JSON.toJSONString(dto);
// 2. 构建事务消息(必须设置唯一事务ID)
Message<String> txMessage = MessageBuilder
.withPayload(msgBody)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.setHeader("orderId", orderId)
.build();
// 3. 发送半消息 + 绑定本地事务监听器
rocketMQTemplate.sendMessageInTransaction(
"order_pay_topic:tx_tag", // 主题+标签
txMessage, // 消息体
userId // 附加参数(传递到本地事务方法)
);
System.out.println("【生产者】半消息发送成功,订单ID:" + orderId);
}
/**
* 本地事务监听器(核心:执行本地事务 + 事务回查)
*/
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class OrderPayTxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
/**
* 执行本地事务(核心业务逻辑:扣减余额、更新订单状态)
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 解析消息体
String msgBody = (String) msg.getPayload();
OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
String orderId = dto.getOrderId();
String userId = (String) arg;
// ========== 模拟本地事务:扣减用户余额 + 更新订单状态 ==========
boolean deductSuccess = deductUserBalance(userId, dto.getPayAmount());
boolean updateOrderSuccess = updateOrderStatus(orderId, "PAID");
if (deductSuccess && updateOrderSuccess) {
System.out.println("【生产者】本地事务执行成功,订单ID:" + orderId);
// 提交事务:Broker投递消息给消费者
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("【生产者】本地事务执行失败,订单ID:" + orderId);
// 回滚事务:Broker删除半消息
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
System.err.println("【生产者】本地事务执行异常:" + e.getMessage());
// 返回UNKNOWN,触发Broker回查
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 事务回查(解决:本地事务成功但COMMIT指令未送达Broker)
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
String msgBody = (String) msg.getPayload();
OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
String orderId = dto.getOrderId();
// ========== 模拟回查本地事务状态(查数据库) ==========
String orderStatus = getOrderStatusFromDB(orderId);
if ("PAID".equals(orderStatus)) {
System.out.println("【生产者】事务回查成功,提交消息,订单ID:" + orderId);
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("【生产者】事务回查失败,回滚消息,订单ID:" + orderId);
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
System.err.println("【生产者】事务回查异常:" + e.getMessage());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
// ------------------- 模拟数据库操作 -------------------
private boolean deductUserBalance(String userId, BigDecimal amount) {
// 实际逻辑:UPDATE t_user SET balance = balance - ? WHERE user_id = ?
return true; // 模拟成功,改为false可测试回滚
}
private boolean updateOrderStatus(String orderId, String status) {
// 实际逻辑:UPDATE t_order SET status = ? WHERE order_id = ?
return true;
}
private String getOrderStatusFromDB(String orderId) {
// 实际逻辑:SELECT status FROM t_order WHERE order_id = ?
return "PAID";
}
}
}
五、事务消息消费者(TxMessageConsumer.java)
java
运行
package com.example.rocketmqtxdemo.consumer;
import com.alibaba.fastjson.JSON;
import com.example.rocketmqtxdemo.dto.OrderPayDTO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* 事务消息消费者(仅接收生产者提交的消息)
*/
@Service
@RocketMQMessageListener(
topic = "order_pay_topic", // 与生产者主题一致
consumerGroup = "tx-consumer-group", // 与配置文件消费者组一致
selectorExpression = "tx_tag" // 过滤标签
)
public class TxMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
try {
// 1. 解析消息体
OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
String orderId = dto.getOrderId();
System.out.println("【消费者】收到事务消息,订单ID:" + orderId);
// 2. 幂等校验(核心!避免重复消费)
if (checkConsumed(orderId)) {
System.out.println("【消费者】消息已消费,跳过处理,订单ID:" + orderId);
return;
}
// 3. 模拟消费逻辑:创建物流单 + 发送短信
boolean createLogistics = createLogistics(orderId);
boolean sendSms = sendPaySuccessSms(dto.getUserId(), orderId);
if (createLogistics && sendSms) {
// 4. 记录消费状态(落地到数据库)
recordConsumeStatus(orderId);
System.out.println("【消费者】消息消费成功,订单ID:" + orderId);
} else {
System.err.println("【消费者】消息消费失败,订单ID:" + orderId);
throw new RuntimeException("消费失败,触发重试");
}
} catch (Exception e) {
System.err.println("【消费者】消费异常:" + e.getMessage());
throw new RuntimeException(e); // 抛出异常触发重试
}
}
// ------------------- 模拟消费逻辑 -------------------
// 幂等校验:查询是否已消费
private boolean checkConsumed(String orderId) {
// 实际逻辑:SELECT COUNT(1) FROM t_consume_log WHERE order_id = ?
return false;
}
// 记录消费状态
private void recordConsumeStatus(String orderId) {
// 实际逻辑:INSERT INTO t_consume_log (order_id, consume_time) VALUES (?, NOW())
}
// 模拟创建物流单
private boolean createLogistics(String orderId) {
return true;
}
// 模拟发送支付成功短信
private boolean sendPaySuccessSms(String userId, String orderId) {
return true;
}
}
六、启动类(RocketmqTxDemoApplication.java)
java
运行
package com.example.rocketmqtxdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketmqTxDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqTxDemoApplication.class, args);
}
}
七、测试类(RocketMQTxTest.java)
java
运行
package com.example.rocketmqtxdemo;
import com.example.rocketmqtxdemo.producer.TxMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
@SpringBootTest
public class RocketMQTxTest {
@Autowired
private TxMessageProducer txProducer;
@Test
public void testTxMessageSendAndConsume() throws InterruptedException {
// 模拟发送订单支付事务消息
txProducer.sendTxPayMessage("ORDER_20250101", "USER_001", new BigDecimal(100));
// 等待事务执行和消费完成(测试用,生产环境无需休眠)
Thread.sleep(10000);
}
}
八、运行说明
- 启动 RocketMQ:先启动 NameServer 和 Broker(本地部署参考命令):
bash运行
# 启动NameServer nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv & # 启动Broker nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -n 127.0.0.1:9876 & - 运行测试类:执行
RocketMQTxTest的testTxMessageSendAndConsume方法,控制台输出如下:plaintext【生产者】半消息发送成功,订单ID:ORDER_20250101 【生产者】本地事务执行成功,订单ID:ORDER_20250101 【消费者】收到事务消息,订单ID:ORDER_20250101 【消费者】消息消费成功,订单ID:ORDER_20250101
九、核心关键点
- 事务监听器绑定:
@RocketMQTransactionListener的txProducerGroup必须和配置文件中rocketmq.producer.group一致; - 幂等性:消费者必须通过「订单 ID / 消息 ID」做幂等校验,避免重复消费;
- 事务回查:本地事务状态需落地到数据库,回查时从数据库读取,不能依赖内存;
- 异常处理:消费失败抛出异常触发重试,重试耗尽后消息进入死信队列(
%DLQ%tx-consumer-group)。
该示例覆盖了事务消息的完整流程,可直接适配电商订单、资金转账等「最终一致性」业务场景。
阅读剩余
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9920.html
文章版权归作者所有,未经允许请勿转载。
THE END
相关推荐