在Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例(极简可运行代码示例)
以下是 Spring Boot + RocketMQ 事务消息 的极简可运行代码示例,仅保留核心逻辑,剔除所有冗余代码,可直接复制运行。
一、环境准备
- 本地启动 RocketMQ(NameServer: 9876,Broker 正常运行);
- JDK 8+、Spring Boot 2.7.x。
二、核心依赖(pom.xml)
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
</parent>
<groupId>com.example</groupId>
<artifactId>rocketmq-tx-mini</artifactId>
<version>1.0</version>
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ 整合 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、核心配置(application.yml)
yaml
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ NameServer 地址
producer:
group: tx-group # 事务生产者组(必须唯一)
consumer:
group: tx-consumer-group # 消费者组(必须唯一)
四、核心代码(仅 3 个文件)
1. 启动类(TxApplication.java)
java
运行
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TxApplication {
public static void main(String[] args) {
SpringApplication.run(TxApplication.class, args);
}
}
2. 事务消息生产者(TxProducer.java)
java
运行
package com.example;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class TxProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送事务消息
public void sendTxMsg(String msg) {
// 构建事务消息(必须加事务ID)
Message<String> txMessage = MessageBuilder
.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
// 发送半消息 + 绑定本地事务
rocketMQTemplate.sendMessageInTransaction("tx-topic", txMessage, null);
System.out.println("半消息发送成功:" + msg);
}
// 本地事务监听器(核心)
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-group")
public class TxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
// 执行本地事务
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String content = (String) msg.getPayload();
try {
// 模拟本地事务(成功则提交,失败则回滚)
boolean success = true; // 改为false测试回滚
if (success) {
System.out.println("本地事务执行成功,提交消息:" + content);
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("本地事务执行失败,回滚消息:" + content);
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
// 事务回查(兜底)
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 模拟回查:默认提交
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
}
}
}
3. 事务消息消费者(TxConsumer.java)
java
运行
package com.example;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "tx-consumer-group")
public class TxConsumer implements RocketMQListener<String> {
// 消费消息(仅接收提交的事务消息)
@Override
public void onMessage(String msg) {
// 模拟消费逻辑(幂等校验可自行补充)
System.out.println("消费事务消息:" + msg);
}
}
五、测试代码(TxTest.java)
java
运行
package com.example;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TxTest {
@Autowired
private TxProducer txProducer;
@Test
public void testTxMsg() throws InterruptedException {
// 发送事务消息
txProducer.sendTxMsg("Hello RocketMQ TX!");
// 等待消费完成
Thread.sleep(3000);
}
}
六、运行结果
正常流程(本地事务成功)
plaintext
半消息发送成功:Hello RocketMQ TX!
本地事务执行成功,提交消息:Hello RocketMQ TX!
消费事务消息:Hello RocketMQ TX!
本地事务失败(修改 success = false)
plaintext
半消息发送成功:Hello RocketMQ TX!
本地事务执行失败,回滚消息:Hello RocketMQ TX!
# 消费者无输出(消息被回滚)
核心关键点
- 事务组绑定:
@RocketMQTransactionListener的txProducerGroup必须和配置中rocketmq.producer.group一致; - 事务 ID:消息必须设置
RocketMQHeaders.TRANSACTION_ID,否则回查异常; - 本地事务状态:
COMMIT(投递消息)、ROLLBACK(删除消息)、UNKNOWN(触发回查); - 运行前提:确保 RocketMQ NameServer 和 Broker 已启动(默认端口 9876)。
此示例是极简可运行版本,仅保留事务消息的核心逻辑,实际业务中只需替换「本地事务」和「消费逻辑」即可。
阅读剩余
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9922.html
文章版权归作者所有,未经允许请勿转载。
THE END
相关推荐