在Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例(核心配置)

以下是 Spring Boot 中使用 RocketMQ 事务消息的核心配置 + 极简可运行代码示例,聚焦关键配置和核心逻辑,剔除冗余代码,便于快速集成到项目中。

一、核心依赖(仅保留关键)

xml
<!-- pom.xml 核心依赖 -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- RocketMQ 核心整合包 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>

二、核心配置文件(application.yml)

yaml
# 仅保留 RocketMQ 事务消息核心配置
rocketmq:
  name-server: 127.0.0.1:9876  # NameServer 地址
  producer:
    group: tx-producer-group    # 事务生产者组(必须唯一,监听器需绑定)
    send-message-timeout: 3000  # 发送超时
  consumer:
    group: tx-consumer-group    # 消费者组(必须唯一)
    retry-times-when-consume-failed: 3  # 消费失败重试次数

三、核心代码:事务消息生产者(发送 + 本地事务)

java
运行
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class TxProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 1. 发送事务半消息(核心方法)
    public void sendTxMsg(String bizId, String content) {
        // 构建事务消息(必须设置事务ID)
        Message<String> txMsg = MessageBuilder
                .withPayload(content)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                .build();

        // 发送半消息 + 绑定本地事务监听器
        rocketMQTemplate.sendMessageInTransaction(
                "tx_topic:tx_tag",  // 主题+标签
                txMsg,              // 消息体
                bizId               // 附加参数(传递到本地事务)
        );
    }

    // 2. 本地事务监听器(事务消息核心配置)
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
    public class TxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        // 核心1:执行本地事务(业务逻辑)
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                String bizId = (String) arg;
                String content = (String) msg.getPayload();
                
                // ========== 本地事务核心逻辑(如操作数据库)==========
                boolean isSuccess = doLocalBiz(bizId, content);

                if (isSuccess) {
                    // 提交:消息可被消费
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    // 回滚:消息被删除
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                // 未知:触发Broker回查
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        // 核心2:事务回查(解决状态丢失)
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String content = (String) msg.getPayload();
            // 回查本地事务状态(查DB/日志)
            boolean isSuccess = checkLocalBiz(content);
            return isSuccess ? 
                org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT :
                org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
        }

        // 模拟本地事务
        private boolean doLocalBiz(String bizId, String content) {
            return true; // 业务成功返回true,失败返回false
        }

        // 模拟事务回查
        private boolean checkLocalBiz(String content) {
            return true;
        }
    }
}

四、核心代码:事务消息消费者(接收 + 幂等)

java
运行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
// 核心配置:监听主题+消费者组+标签
@RocketMQMessageListener(
        topic = "tx_topic",
        consumerGroup = "tx-consumer-group",
        selectorExpression = "tx_tag"
)
public class TxConsumer implements RocketMQListener<String> {

    // 消费核心方法(仅接收生产者提交的消息)
    @Override
    public void onMessage(String msg) {
        try {
            // 核心:幂等校验(必加!避免重复消费)
            if (checkIdempotent(msg)) {
                System.out.println("消息已消费,跳过:" + msg);
                return;
            }

            // ========== 消费核心逻辑 ==========
            doConsumeBiz(msg);

            // 记录消费状态(幂等依据)
            recordConsumeStatus(msg);
            System.out.println("消费成功:" + msg);
        } catch (Exception e) {
            // 抛出异常触发重试
            throw new RuntimeException("消费失败,重试", e);
        }
    }

    // 幂等校验(核心)
    private boolean checkIdempotent(String msg) {
        return false; // 未消费返回false,已消费返回true
    }

    // 记录消费状态(落地DB)
    private void recordConsumeStatus(String msg) {}

    // 消费业务逻辑
    private void doConsumeBiz(String msg) {}
}

五、启动类(最简)

java
运行
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketMQTxApp {
    public static void main(String[] args) {
        SpringApplication.run(RocketMQTxApp.class, args);
    }
}

六、测试代码(验证核心流程)

java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TxTest {
    @Autowired
    private TxProducer txProducer;

    @Test
    public void testTxMsg() throws InterruptedException {
        // 发送事务消息
        txProducer.sendTxMsg("BIZ_001", "测试事务消息");
        // 等待消费完成
        Thread.sleep(5000);
    }
}

核心配置关键点总结

配置项 作用
rocketmq.producer.group 事务生产者组,必须与 @RocketMQTransactionListener 的 txProducerGroup 一致
RocketMQHeaders.TRANSACTION_ID 事务唯一 ID,避免回查冲突
executeLocalTransaction 本地事务执行逻辑,返回 COMMIT/ROLLBACK/UNKNOWN
checkLocalTransaction 事务回查逻辑,解决「本地事务成功但 COMMIT 指令丢失」问题
消费者幂等校验 必须实现(如基于 msg / 业务 ID),避免重复消费

运行前提

  1. 本地 / 服务器已启动 RocketMQ NameServer(9876 端口)和 Broker;
  2. 核心配置中 name-server 地址需与实际部署一致。
此示例保留了 RocketMQ 事务消息的核心配置和核心逻辑,可直接复制到项目中,替换 doLocalBiz/doConsumeBiz 为实际业务逻辑即可快速集成。
阅读剩余
THE END