在Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例(核心配置)
以下是 Spring Boot 中使用 RocketMQ 事务消息的核心配置 + 极简可运行代码示例,聚焦关键配置和核心逻辑,剔除冗余代码,便于快速集成到项目中。
一、核心依赖(仅保留关键)
xml
<!-- pom.xml 核心依赖 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ 核心整合包 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
二、核心配置文件(application.yml)
yaml
# 仅保留 RocketMQ 事务消息核心配置
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址
producer:
group: tx-producer-group # 事务生产者组(必须唯一,监听器需绑定)
send-message-timeout: 3000 # 发送超时
consumer:
group: tx-consumer-group # 消费者组(必须唯一)
retry-times-when-consume-failed: 3 # 消费失败重试次数
三、核心代码:事务消息生产者(发送 + 本地事务)
java
运行
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class TxProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 1. 发送事务半消息(核心方法)
public void sendTxMsg(String bizId, String content) {
// 构建事务消息(必须设置事务ID)
Message<String> txMsg = MessageBuilder
.withPayload(content)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
// 发送半消息 + 绑定本地事务监听器
rocketMQTemplate.sendMessageInTransaction(
"tx_topic:tx_tag", // 主题+标签
txMsg, // 消息体
bizId // 附加参数(传递到本地事务)
);
}
// 2. 本地事务监听器(事务消息核心配置)
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class TxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
// 核心1:执行本地事务(业务逻辑)
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String bizId = (String) arg;
String content = (String) msg.getPayload();
// ========== 本地事务核心逻辑(如操作数据库)==========
boolean isSuccess = doLocalBiz(bizId, content);
if (isSuccess) {
// 提交:消息可被消费
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
// 回滚:消息被删除
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
// 未知:触发Broker回查
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
// 核心2:事务回查(解决状态丢失)
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String content = (String) msg.getPayload();
// 回查本地事务状态(查DB/日志)
boolean isSuccess = checkLocalBiz(content);
return isSuccess ?
org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT :
org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
// 模拟本地事务
private boolean doLocalBiz(String bizId, String content) {
return true; // 业务成功返回true,失败返回false
}
// 模拟事务回查
private boolean checkLocalBiz(String content) {
return true;
}
}
}
四、核心代码:事务消息消费者(接收 + 幂等)
java
运行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
// 核心配置:监听主题+消费者组+标签
@RocketMQMessageListener(
topic = "tx_topic",
consumerGroup = "tx-consumer-group",
selectorExpression = "tx_tag"
)
public class TxConsumer implements RocketMQListener<String> {
// 消费核心方法(仅接收生产者提交的消息)
@Override
public void onMessage(String msg) {
try {
// 核心:幂等校验(必加!避免重复消费)
if (checkIdempotent(msg)) {
System.out.println("消息已消费,跳过:" + msg);
return;
}
// ========== 消费核心逻辑 ==========
doConsumeBiz(msg);
// 记录消费状态(幂等依据)
recordConsumeStatus(msg);
System.out.println("消费成功:" + msg);
} catch (Exception e) {
// 抛出异常触发重试
throw new RuntimeException("消费失败,重试", e);
}
}
// 幂等校验(核心)
private boolean checkIdempotent(String msg) {
return false; // 未消费返回false,已消费返回true
}
// 记录消费状态(落地DB)
private void recordConsumeStatus(String msg) {}
// 消费业务逻辑
private void doConsumeBiz(String msg) {}
}
五、启动类(最简)
java
运行
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMQTxApp {
public static void main(String[] args) {
SpringApplication.run(RocketMQTxApp.class, args);
}
}
六、测试代码(验证核心流程)
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TxTest {
@Autowired
private TxProducer txProducer;
@Test
public void testTxMsg() throws InterruptedException {
// 发送事务消息
txProducer.sendTxMsg("BIZ_001", "测试事务消息");
// 等待消费完成
Thread.sleep(5000);
}
}
核心配置关键点总结
| 配置项 | 作用 |
|---|---|
rocketmq.producer.group |
事务生产者组,必须与 @RocketMQTransactionListener 的 txProducerGroup 一致 |
RocketMQHeaders.TRANSACTION_ID |
事务唯一 ID,避免回查冲突 |
executeLocalTransaction |
本地事务执行逻辑,返回 COMMIT/ROLLBACK/UNKNOWN |
checkLocalTransaction |
事务回查逻辑,解决「本地事务成功但 COMMIT 指令丢失」问题 |
| 消费者幂等校验 | 必须实现(如基于 msg / 业务 ID),避免重复消费 |
运行前提
- 本地 / 服务器已启动 RocketMQ NameServer(9876 端口)和 Broker;
- 核心配置中
name-server地址需与实际部署一致。
此示例保留了 RocketMQ 事务消息的核心配置和核心逻辑,可直接复制到项目中,替换
doLocalBiz/doConsumeBiz 为实际业务逻辑即可快速集成。阅读剩余
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9921.html
文章版权归作者所有,未经允许请勿转载。
THE END
相关推荐