在Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例(极简可运行代码示例)

以下是 Spring Boot + RocketMQ 事务消息 的极简可运行代码示例,仅保留核心逻辑,剔除所有冗余代码,可直接复制运行。

一、环境准备

  1. 本地启动 RocketMQ(NameServer: 9876,Broker 正常运行);
  2. JDK 8+、Spring Boot 2.7.x。

二、核心依赖(pom.xml)

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>rocketmq-tx-mini</artifactId>
    <version>1.0</version>

    <dependencies>
        <!-- Spring Boot 核心 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- RocketMQ 整合 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

三、核心配置(application.yml)

yaml
rocketmq:
  name-server: 127.0.0.1:9876  # RocketMQ NameServer 地址
  producer:
    group: tx-group  # 事务生产者组(必须唯一)
  consumer:
    group: tx-consumer-group  # 消费者组(必须唯一)

四、核心代码(仅 3 个文件)

1. 启动类(TxApplication.java)

java
运行
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TxApplication {
    public static void main(String[] args) {
        SpringApplication.run(TxApplication.class, args);
    }
}

2. 事务消息生产者(TxProducer.java)

java
运行
package com.example;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class TxProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 发送事务消息
    public void sendTxMsg(String msg) {
        // 构建事务消息(必须加事务ID)
        Message<String> txMessage = MessageBuilder
                .withPayload(msg)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                .build();

        // 发送半消息 + 绑定本地事务
        rocketMQTemplate.sendMessageInTransaction("tx-topic", txMessage, null);
        System.out.println("半消息发送成功:" + msg);
    }

    // 本地事务监听器(核心)
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-group")
    public class TxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        // 执行本地事务
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String content = (String) msg.getPayload();
            try {
                // 模拟本地事务(成功则提交,失败则回滚)
                boolean success = true; // 改为false测试回滚
                if (success) {
                    System.out.println("本地事务执行成功,提交消息:" + content);
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    System.out.println("本地事务执行失败,回滚消息:" + content);
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        // 事务回查(兜底)
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // 模拟回查:默认提交
            return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
        }
    }
}

3. 事务消息消费者(TxConsumer.java)

java
运行
package com.example;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "tx-consumer-group")
public class TxConsumer implements RocketMQListener<String> {

    // 消费消息(仅接收提交的事务消息)
    @Override
    public void onMessage(String msg) {
        // 模拟消费逻辑(幂等校验可自行补充)
        System.out.println("消费事务消息:" + msg);
    }
}

五、测试代码(TxTest.java)

java
运行
package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TxTest {
    @Autowired
    private TxProducer txProducer;

    @Test
    public void testTxMsg() throws InterruptedException {
        // 发送事务消息
        txProducer.sendTxMsg("Hello RocketMQ TX!");
        // 等待消费完成
        Thread.sleep(3000);
    }
}

六、运行结果

正常流程(本地事务成功)

plaintext
半消息发送成功:Hello RocketMQ TX!
本地事务执行成功,提交消息:Hello RocketMQ TX!
消费事务消息:Hello RocketMQ TX!

本地事务失败(修改 success = false

plaintext
半消息发送成功:Hello RocketMQ TX!
本地事务执行失败,回滚消息:Hello RocketMQ TX!
# 消费者无输出(消息被回滚)

核心关键点

  1. 事务组绑定@RocketMQTransactionListener 的 txProducerGroup 必须和配置中 rocketmq.producer.group 一致;
  2. 事务 ID:消息必须设置 RocketMQHeaders.TRANSACTION_ID,否则回查异常;
  3. 本地事务状态COMMIT(投递消息)、ROLLBACK(删除消息)、UNKNOWN(触发回查);
  4. 运行前提:确保 RocketMQ NameServer 和 Broker 已启动(默认端口 9876)。
此示例是极简可运行版本,仅保留事务消息的核心逻辑,实际业务中只需替换「本地事务」和「消费逻辑」即可。
阅读剩余
THE END