Spring Boot 中 RocketMQ 事务消息完整配置指南

在 Spring Boot 中配置 RocketMQ 事务消息,核心是「依赖引入 + 基础配置 + 事务生产者 / 监听器配置 + 消费端配置」,以下是分步实操,覆盖配置文件、核心代码、关键优化,可直接落地。

一、前置准备

  1. 环境要求
    • Spring Boot 2.x(推荐 2.3.x+);
    • RocketMQ 4.3.0+(事务消息最低支持版本,推荐 4.9.x);
    • JDK 8+。
  2. RocketMQ 部署:已启动 NameServer 和 Broker(参考之前的 RocketMQ 安装步骤)。

二、步骤 1:引入依赖(pom.xml)

核心引入 rocketmq-spring-boot-starter,版本需与 RocketMQ 服务端兼容(推荐 2.2.3,适配 RocketMQ 4.9.x):
xml
<dependencies>
    <!-- Spring Boot 核心 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- RocketMQ 整合 Spring Boot 核心依赖 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>

    <!-- JSON 序列化(可选,推荐) -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

    <!-- 测试依赖(可选) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

三、步骤 2:核心配置文件(application.yml/application.properties)

1. YAML 格式(推荐)

yaml
spring:
  application:
    name: rocketmq-tx-demo  # 应用名称

# RocketMQ 基础配置
rocketmq:
  name-server: 127.0.0.1:9876  # NameServer 地址(多个用;分隔,如 192.168.1.100:9876;192.168.1.101:9876)
  producer:
    group: tx-producer-group  # 事务生产者组(必须唯一,事务监听器需绑定此值)
    send-message-timeout: 3000  # 消息发送超时时间(ms)
    retry-times-when-send-failed: 2  # 同步发送失败重试次数
    retry-times-when-send-async-failed: 2  # 异步发送失败重试次数
    compress-message-body-threshold: 4096  # 消息体压缩阈值(超过4K自动压缩)
  consumer:
    group: tx-consumer-group  # 事务消息消费者组(必须唯一)
    consume-thread-min: 10    # 最小消费线程数
    consume-thread-max: 20    # 最大消费线程数
    consume-message-batch-max-size: 1  # 批量消费大小(事务消息建议单条消费)
    retry-times-when-consume-failed: 3  # 消费失败重试次数(默认16次,可自定义)

2. Properties 格式(兼容老项目)

properties
# 应用名称
spring.application.name=rocketmq-tx-demo

# RocketMQ NameServer 地址
rocketmq.name-server=127.0.0.1:9876

# 生产者配置
rocketmq.producer.group=tx-producer-group
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-times-when-send-async-failed=2
rocketmq.producer.compress-message-body-threshold=4096

# 消费者配置
rocketmq.consumer.group=tx-consumer-group
rocketmq.consumer.consume-thread-min=10
rocketmq.consumer.consume-thread-max=20
rocketmq.consumer.consume-message-batch-max-size=1
rocketmq.consumer.retry-times-when-consume-failed=3

四、步骤 3:事务消息生产者配置(核心)

需实现「发送半消息 + 本地事务监听器」,监听器是事务消息的核心,必须绑定生产者组:
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.UUID;

// 1. 业务DTO(以订单支付为例)
class OrderPayDTO {
    private String orderId;      // 订单ID
    private String userId;       // 用户ID
    private BigDecimal payAmount;// 支付金额

    // 构造器、getter/setter 省略
    public OrderPayDTO(String orderId, String userId, BigDecimal payAmount) {
        this.orderId = orderId;
        this.userId = userId;
        this.payAmount = payAmount;
    }
}

// 2. 事务消息生产者核心配置
@Component
public class RocketMQTxProducer {
    // 注入Spring Boot封装的RocketMQ模板
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务半消息(触发本地事务)
     */
    public void sendTxPayMessage(String orderId, String userId, BigDecimal payAmount) {
        // 1. 构建业务消息体
        OrderPayDTO dto = new OrderPayDTO(orderId, userId, payAmount);
        String msgBody = JSON.toJSONString(dto);

        // 2. 构建事务消息(必须设置唯一事务ID,避免回查冲突)
        Message<String> txMessage = MessageBuilder
                .withPayload(msgBody)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) // 事务唯一标识
                .setHeader("orderId", orderId) // 自定义头(可选,便于回查)
                .build();

        // 3. 发送半消息 + 绑定本地事务监听器
        // 参数1:topic:tag(tag可选,用于消息过滤)
        // 参数2:事务消息体
        // 参数3:附加参数(可传递本地事务上下文,如userId)
        rocketMQTemplate.sendMessageInTransaction(
                "order_pay_topic:tx_pay_tag",
                txMessage,
                userId // 附加参数,会传递到本地事务方法
        );
    }

    /**
     * 本地事务监听器(核心:执行本地事务 + 事务回查)
     * 注解说明:txProducerGroup 必须与配置文件中生产者组一致!
     */
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
    public class OrderPayTxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        /**
         * 第一步:执行本地事务(核心业务逻辑,如扣减库存、更新订单状态)
         * @param msg 半消息内容
         * @param arg 发送半消息时传递的附加参数(此处为userId)
         * @return 事务状态:COMMIT(提交)/ROLLBACK(回滚)/UNKNOWN(未知,触发回查)
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                // 解析消息体
                String msgBody = (String) msg.getPayload();
                OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
                String orderId = dto.getOrderId();
                String userId = (String) arg;

                // ========== 核心:执行本地事务(实际场景操作数据库)==========
                // 示例:1. 扣减用户余额  2. 更新订单为已支付
                boolean txSuccess = this.updateOrderStatus(orderId, "PAID") 
                        && this.deductUserBalance(userId, dto.getPayAmount());

                if (txSuccess) {
                    System.out.println("本地事务执行成功:订单" + orderId + "支付完成");
                    // 提交事务:Broker会投递消息给消费者
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    System.out.println("本地事务执行失败:订单" + orderId + "支付失败");
                    // 回滚事务:Broker会删除半消息,消费者不会收到
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                System.err.println("本地事务执行异常:" + e.getMessage());
                // 返回UNKNOWN:Broker会定时回查(默认60秒一次)
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        /**
         * 第二步:事务回查(解决生产者宕机/网络异常导致的状态丢失)
         * Broker会定时调用此方法,确认本地事务最终状态
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            try {
                // 解析消息体,获取订单ID
                String msgBody = (String) msg.getPayload();
                OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
                String orderId = dto.getOrderId();

                // ========== 核心:回查本地事务状态(查数据库/事务日志)==========
                String orderStatus = this.getOrderStatus(orderId);
                if ("PAID".equals(orderStatus)) {
                    // 本地事务已成功,提交消息
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else if ("UNPAID".equals(orderStatus)) {
                    // 本地事务未执行,回滚消息
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                } else {
                    // 状态未知,继续回查
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
                }
            } catch (Exception e) {
                System.err.println("事务回查异常:" + e.getMessage());
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        // 模拟:更新订单状态(实际场景操作数据库)
        private boolean updateOrderStatus(String orderId, String status) {
            // 业务逻辑:UPDATE t_order SET status = ? WHERE order_id = ?
            return true;
        }

        // 模拟:扣减用户余额(实际场景操作数据库)
        private boolean deductUserBalance(String userId, BigDecimal amount) {
            // 业务逻辑:UPDATE t_user SET balance = balance - ? WHERE user_id = ?
            return true;
        }

        // 模拟:查询订单状态(用于事务回查)
        private String getOrderStatus(String orderId) {
            // 业务逻辑:SELECT status FROM t_order WHERE order_id = ?
            return "PAID";
        }
    }
}

五、步骤 4:事务消息消费者配置

消费端需监听事务消息,且必须保证幂等性(避免重复消费):
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
// 监听事务消息的主题+标签,消费者组必须与配置文件一致
@RocketMQMessageListener(
        topic = "order_pay_topic",
        consumerGroup = "tx-consumer-group",
        selectorExpression = "tx_pay_tag", // 只消费tx_pay_tag标签的消息
        consumeMode = org.apache.rocketmq.common.protocol.heartbeat.ConsumeMode.CONCURRENTLY, // 并发消费
        messageModel = org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING // 集群模式
)
public class OrderPayTxConsumer implements RocketMQListener<String> {

    /**
     * 消费事务消息(仅当生产者提交事务后,才会收到消息)
     * 注意:消费失败会自动重试,需保证幂等性!
     */
    @Override
    public void onMessage(String msg) {
        try {
            OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
            String orderId = dto.getOrderId();

            // ========== 幂等校验(核心!避免重复消费)==========
            if (this.checkConsumed(orderId)) {
                System.out.println("消息已消费:订单" + orderId + ",跳过处理");
                return;
            }

            // ========== 消费逻辑(如触发物流创建、发送短信通知)==========
            boolean consumeSuccess = this.createLogistics(orderId) && this.sendPaySms(dto.getUserId(), orderId);

            if (consumeSuccess) {
                System.out.println("消费成功:订单" + orderId + ",创建物流+发送短信完成");
                // 记录消费状态(用于幂等校验)
                this.recordConsumeStatus(orderId, "SUCCESS");
            } else {
                System.err.println("消费失败:订单" + orderId);
                // 抛出异常触发重试
                throw new RuntimeException("消费失败,重试");
            }
        } catch (Exception e) {
            System.err.println("消费异常:" + e.getMessage());
            throw new RuntimeException(e); // 触发重试(默认最多16次)
        }
    }

    // 模拟:幂等校验(查消费日志)
    private boolean checkConsumed(String orderId) {
        // SELECT COUNT(1) FROM t_consume_log WHERE order_id = ?
        return false;
    }

    // 模拟:记录消费状态
    private void recordConsumeStatus(String orderId, String status) {
        // INSERT INTO t_consume_log (order_id, status) VALUES (?, ?)
    }

    // 模拟:创建物流单
    private boolean createLogistics(String orderId) {
        return true;
    }

    // 模拟:发送支付成功短信
    private boolean sendPaySms(String userId, String orderId) {
        return true;
    }
}

六、步骤 5:测试事务消息

编写测试类验证配置是否生效:
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.math.BigDecimal;

@SpringBootTest
public class RocketMQTxTest {
    @Autowired
    private RocketMQTxProducer txProducer;

    @Test
    public void testTxMessage() throws InterruptedException {
        // 模拟订单支付:订单ID=ORDER_20250101,用户ID=USER_001,支付金额=100元
        txProducer.sendTxPayMessage("ORDER_20250101", "USER_001", new BigDecimal(100));
        
        // 等待事务执行和消费完成
        Thread.sleep(10000);
    }
}

七、关键配置优化与避坑

1. 事务回查参数调整(Broker 端)

修改 RocketMQ 的 broker.conf,优化回查频率(默认 60 秒,可缩短):
properties
# 事务回查间隔(ms),建议10秒
transactionCheckInterval=10000
# 最大回查次数(默认15次,可改为10次)
transactionCheckMax=10
# 回查线程数(默认1,高并发场景增加)
transactionCheckThreadPoolNums=5

2. 消费端重试与死信队列

  • 消费失败重试次数:配置文件中 retry-times-when-consume-failed 建议设为 3~5 次,过多重试易导致消息堆积;
  • 死信队列:重试次数耗尽后,消息进入死信队列(命名规则:%DLQ%+消费者组),需监控并人工处理。

3. 核心避坑点

问题场景 原因 解决方案
事务监听器不生效 @RocketMQTransactionListener 的 txProducerGroup 与配置文件生产者组不一致 严格保证两者名称相同
回查时获取不到事务状态 本地事务状态未落地(如仅存内存) 必须将事务状态写入数据库 / 日志,回查时从持久化存储读取
消费端重复消费 未做幂等校验 基于订单 ID / 消息 ID 实现幂等,避免重复执行业务逻辑
半消息发送成功但本地事务未执行 生产者宕机 / 网络异常 依赖 Broker 的事务回查机制,确保回查逻辑准确

八、核心总结

Spring Boot 中配置 RocketMQ 事务消息的核心要点:
  1. 配置文件:重点配置生产者组、NameServer 地址,消费者组需唯一;
  2. 生产者:通过 sendMessageInTransaction 发送半消息,事务监听器必须绑定生产者组;
  3. 本地事务executeLocalTransaction 执行核心业务,checkLocalTransaction 处理回查;
  4. 消费端:必须实现幂等,避免重复消费导致数据不一致;
  5. 运维:监控事务回查次数、死信队列,及时处理异常消息。
该配置适配电商、金融等绝大多数「最终一致性」场景,是分布式事务的主流解决方案。
阅读剩余
THE END