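Sending and Receiving RocketMQ Transactional Messages in Spring Boot (Minimal Runnable Example)
Below is a minimal runnable example of Spring Boot + RocketMQ transactional messages. It keeps only the core logic and can be copied and run as-is.
Prerequisites:
- RocketMQ running locally (NameServer on port 9876, Broker up);
- JDK 8+ and Spring Boot 2.7.x.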
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rocketmq-tx-mini</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: tx-group
  consumer:
    group: tx-consumer-group
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TxApplication {
    public static void main(String[] args) {
        SpringApplication.run(TxApplication.class, args);
    }
}
package com.example;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class TxProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendTxMsg(String msg) {
        Message<String> txMessage = MessageBuilder
                .withPayload(msg)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                .build();
        // Synchronous call: sends the half message, runs executeLocalTransaction, then returns.
        rocketMQTemplate.sendMessageInTransaction("tx-topic", txMessage, null);
        System.out.println("Transactional send finished: " + msg);
    }

    // Since rocketmq-spring 2.1.0 the annotation has no txProducerGroup attribute;
    // by default the listener is attached to the "rocketMQTemplate" bean.
    // The nested class is static so that component scanning can register it as a bean.
    @RocketMQTransactionListener
    public static class TxListener implements RocketMQLocalTransactionListener {

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // The payload may arrive as the raw message body (byte[]), so decode it instead of casting to String.
            Object payload = msg.getPayload();
            String content = payload instanceof byte[]
                    ? new String((byte[]) payload, StandardCharsets.UTF_8)
                    : String.valueOf(payload);
            try {
                // Placeholder for the real local transaction; set to false to see the rollback path.
                boolean success = true;
                if (success) {
                    System.out.println("Local transaction succeeded, committing message: " + content);
                    return RocketMQLocalTransactionState.COMMIT;
                } else {
                    System.out.println("Local transaction failed, rolling back message: " + content);
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                // Outcome unclear: let the Broker check back later.
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // Demo only: a real check-back should look up the actual local transaction state.
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
}
package com.example;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "tx-consumer-group")
public class TxConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("Consumed transactional message: " + msg);
    }
}
package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TxTest {

    @Autowired
    private TxProducer txProducer;

    @Test
    public void testTxMsg() throws InterruptedException {
        txProducer.sendTxMsg("Hello RocketMQ TX!");
        // Give the consumer time to receive the message before the test context shuts down.
        Thread.sleep(3000);
    }
}
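The fixed `Thread.sleep(3000)` in the test is the simplest way to keep the JVM alive until the consumer prints. One possible alternative is to wait on a latch that the consumer releases. The sketch below uses only the JDK; `LatchSketch` and `awaitMessage` are hypothetical names, and a plain background thread stands in for the RocketMQ consumer, so this is an illustration of the waiting pattern rather than RocketMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchSketch {

    // Waits until the stand-in consumer has seen the payload, or until the timeout expires.
    // Returns the received payload, or null on timeout or interruption.
    static String awaitMessage(String payload, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>();

        // Hypothetical consumer: in the real test this would be TxConsumer.onMessage.
        Thread consumer = new Thread(() -> {
            seen.set(payload);
            received.countDown();
        });
        consumer.start();

        try {
            boolean arrived = received.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? seen.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessage("Hello RocketMQ TX!", 3000));
    }
}
```

Compared with a fixed sleep, the wait ends as soon as the message arrives and the timeout only matters on failure.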
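Output when the local transaction succeeds (sendMessageInTransaction returns only after the local transaction has run, so the producer line comes second; the consumer line may arrive slightly earlier or later):
Local transaction succeeded, committing message: Hello RocketMQ TX!
Transactional send finished: Hello RocketMQ TX!
Consumed transactional message: Hello RocketMQ TX!
Output when the local transaction fails (success set to false):
Local transaction failed, rolling back message: Hello RocketMQ TX!
Transactional send finished: Hello RocketMQ TX!
# No consumer output (the message was rolled back)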
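The two runs differ only in what the local transaction returns. As a rough mental model, the flow can be simulated without RocketMQ at all. The sketch below is a plain-JDK illustration under stated assumptions: `TxFlowSketch`, its `State` enum and the in-memory lists are hypothetical stand-ins for the Broker, not part of any RocketMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class TxFlowSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Hypothetical in-memory "broker": half messages stay hidden until the outcome is known.
    private final List<String> halfMessages = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    // Mirrors the order of a transactional send:
    // 1. park the half message, 2. run the local transaction, 3. commit or roll back.
    State send(String payload, Predicate<String> localTransaction) {
        halfMessages.add(payload);
        State state;
        try {
            state = localTransaction.test(payload) ? State.COMMIT : State.ROLLBACK;
        } catch (RuntimeException e) {
            state = State.UNKNOWN; // outcome unclear: the half message stays parked for a check-back
        }
        if (state == State.COMMIT) {
            halfMessages.remove(payload);
            delivered.add(payload);       // now visible to consumers
        } else if (state == State.ROLLBACK) {
            halfMessages.remove(payload); // discarded, consumers never see it
        }
        return state;
    }

    List<String> delivered() { return delivered; }

    List<String> pending() { return halfMessages; }

    public static void main(String[] args) {
        TxFlowSketch broker = new TxFlowSketch();
        System.out.println(broker.send("Hello RocketMQ TX!", p -> true));
        System.out.println(broker.send("will be rolled back", p -> false));
        System.out.println("delivered: " + broker.delivered());
    }
}
```

Only the committed payload ends up in `delivered`; a payload whose local transaction threw stays in `pending`, which corresponds to the UNKNOWN state that triggers a check-back.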
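- Binding the transaction listener: since rocketmq-spring 2.1.0, @RocketMQTransactionListener no longer has a txProducerGroup attribute. The listener is attached to the RocketMQTemplate bean named by rocketMQTemplateBeanName (default "rocketMQTemplate"), whose producer group comes from rocketmq.producer.group;
- Transaction ID: set RocketMQHeaders.TRANSACTION_ID on the message so that the check-back logic can identify which local transaction to look up;
- Local transaction states: COMMIT (deliver the message), ROLLBACK (discard the message), UNKNOWN (trigger a check-back);
- Before running: make sure the RocketMQ NameServer (default port 9876) and Broker are started.
This example is a minimal runnable version that keeps only the core logic of transactional messages. In real code, replace the "local transaction" and the "consumption logic" with your own.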
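The listener in this example always returns COMMIT from checkLocalTransaction, which is fine for a demo but does not reflect the real outcome. A check-back would normally look up the recorded result of the local transaction by its transaction ID. The following is a minimal sketch of that mapping using only the JDK; `CheckBackSketch`, its `State` enum and the in-memory map are hypothetical, and a real service would more likely query a database table written in the same local transaction.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CheckBackSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Hypothetical store: transaction ID -> whether the local transaction succeeded.
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    // Called at the end of the local transaction to record its result.
    void record(String txId, boolean succeeded) {
        outcomes.put(txId, succeeded);
    }

    // Called on check-back: maps the recorded result to one of the three states.
    State check(String txId) {
        if (txId == null) {
            return State.UNKNOWN; // no ID on the message, nothing to look up
        }
        Boolean succeeded = outcomes.get(txId);
        if (succeeded == null) {
            return State.UNKNOWN; // not recorded yet: ask again later
        }
        return succeeded ? State.COMMIT : State.ROLLBACK;
    }

    public static void main(String[] args) {
        CheckBackSketch store = new CheckBackSketch();
        store.record("tx-1", true);
        System.out.println(store.check("tx-1"));
        System.out.println(store.check("tx-missing"));
    }
}
```

Returning UNKNOWN for an unrecorded ID keeps the message parked rather than guessing, at the cost of another check-back later.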
Copyright notice:
Author: SE_Yang
Link: https://www.cnesa.cn/9922.html
Copyright belongs to the author. Do not reproduce without permission.