Spring Boot 中 RocketMQ 事务消息的发送与接收(完整实操)

在 Spring Boot 中实现 RocketMQ 事务消息的「发送(生产者)+ 接收(消费者)」,核心是依托 RocketMQ 事务消息的「半消息 + 本地事务 + 事务回查」机制,结合 Spring Boot 封装的 rocketmq-spring-boot-starter 快速集成。以下是从零到一的实操步骤,包含完整代码、配置和测试验证。

一、环境准备

1. 基础依赖(pom.xml)

引入 Spring Boot 与 RocketMQ 整合依赖,版本需匹配(推荐 rocketmq-spring-boot-starter:2.2.3 适配 RocketMQ 4.9.x):
xml
<dependencies>
    <!-- Spring Boot 核心 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.7.15</version>
    </dependency>

    <!-- RocketMQ Spring Boot 整合包 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>

    <!-- JSON 序列化(解析消息体) -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

    <!-- 测试依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. RocketMQ 配置(application.yml)

配置 NameServer 地址、生产者 / 消费者核心参数(事务消息需指定唯一生产者组):
yaml
spring:
  application:
    name: rocketmq-tx-demo

# RocketMQ 核心配置
rocketmq:
  name-server: 127.0.0.1:9876  # NameServer 地址(集群用;分隔)
  producer:
    group: tx-producer-group    # 事务生产者组(必须唯一,监听器需绑定)
    send-message-timeout: 3000  # 消息发送超时时间
    retry-times-when-send-failed: 3  # 发送失败重试次数
  consumer:
    group: tx-consumer-group    # 事务消息消费者组(必须唯一)
    consume-thread-max: 20      # 最大消费线程数
    retry-times-when-consume-failed: 3  # 消费失败重试次数(默认16次,建议3-5次)

3. RocketMQ 服务部署

确保本地 / 服务器已启动 RocketMQ 的 NameServer 和 Broker(参考命令):
bash
运行
# 启动 NameServer
nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv &

# 启动 Broker(指定 NameServer 地址)
nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -n 127.0.0.1:9876 &

二、步骤 1:实现事务消息发送(生产者)

事务消息生产者需完成「发送半消息 + 执行本地事务 + 事务回查」三个核心动作,核心是通过 RocketMQLocalTransactionListener 绑定本地事务逻辑。

1. 定义业务 DTO(以订单支付为例)

java
运行
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;

// 订单支付消息体
@Data
@AllArgsConstructor
public class OrderPayDTO {
    private String orderId;      // 订单ID(业务唯一标识)
    private String userId;       // 用户ID
    private BigDecimal payAmount;// 支付金额
}

2. 事务消息生产者核心代码

java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class TxMessageProducer {
    // 注入 Spring Boot 封装的 RocketMQ 模板
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 步骤1:发送事务半消息(触发本地事务)
     * @param orderId 订单ID
     * @param userId 用户ID
     * @param payAmount 支付金额
     */
    public void sendTxPayMessage(String orderId, String userId, BigDecimal payAmount) {
        // 1. 构建业务消息体
        OrderPayDTO dto = new OrderPayDTO(orderId, userId, payAmount);
        String msgBody = JSON.toJSONString(dto);

        // 2. 构建事务消息(必须设置唯一事务ID,避免回查冲突)
        Message<String> txMessage = MessageBuilder
                .withPayload(msgBody)
                // 事务唯一标识(RocketMQ 用于回查)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                // 自定义头(便于回查时快速获取关键信息)
                .setHeader("orderId", orderId)
                .build();

        // 3. 发送半消息 + 绑定本地事务监听器
        // 参数说明:
        // - "order_pay_topic:tx_tag":主题+标签(标签用于消息过滤)
        // - txMessage:事务消息体
        // - userId:附加参数(传递到本地事务方法)
        rocketMQTemplate.sendMessageInTransaction(
                "order_pay_topic:tx_tag",
                txMessage,
                userId
        );
        System.out.println("半消息发送成功,订单ID:" + orderId);
    }

    /**
     * 步骤2:本地事务监听器(核心:执行本地事务 + 事务回查)
     * 注解说明:txProducerGroup 必须与 application.yml 中生产者组一致!
     */
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
    public class OrderPayTxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        /**
         * 执行本地事务(核心业务逻辑:如扣减余额、更新订单状态)
         * @param msg 半消息内容
         * @param arg 发送半消息时传递的附加参数(此处为 userId)
         * @return 事务状态:COMMIT(提交)/ROLLBACK(回滚)/UNKNOWN(触发回查)
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                // 解析消息体
                String msgBody = (String) msg.getPayload();
                OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
                String orderId = dto.getOrderId();
                String userId = (String) arg;

                // ========== 核心:执行本地事务(实际场景操作数据库)==========
                // 示例逻辑:1. 扣减用户余额  2. 更新订单为已支付
                boolean deductSuccess = deductUserBalance(userId, dto.getPayAmount());
                boolean updateOrderSuccess = updateOrderStatus(orderId, "PAID");

                if (deductSuccess && updateOrderSuccess) {
                    System.out.println("本地事务执行成功,订单ID:" + orderId);
                    // 提交事务:Broker 会将半消息转为可消费状态
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    System.out.println("本地事务执行失败,订单ID:" + orderId);
                    // 回滚事务:Broker 会删除半消息,消费者不会收到
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                System.err.println("本地事务执行异常:" + e.getMessage());
                // 返回 UNKNOWN:Broker 会定时触发回查(默认60秒一次)
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        /**
         * 事务回查(解决:本地事务成功,但 COMMIT 指令未送达 Broker 的场景)
         * Broker 定时调用此方法,确认本地事务最终状态
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            try {
                // 解析消息体,获取订单ID
                String msgBody = (String) msg.getPayload();
                OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
                String orderId = dto.getOrderId();

                // ========== 核心:回查本地事务状态(必须查持久化存储,如数据库)==========
                String orderStatus = getOrderStatusFromDB(orderId);
                if ("PAID".equals(orderStatus)) {
                    System.out.println("事务回查成功,订单已支付,提交消息:" + orderId);
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else if ("UNPAID".equals(orderStatus)) {
                    System.out.println("事务回查失败,订单未支付,回滚消息:" + orderId);
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                } else {
                    // 状态未知,继续回查(最多回查15次,可配置)
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
                }
            } catch (Exception e) {
                System.err.println("事务回查异常:" + e.getMessage());
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        // ------------------- 模拟数据库操作 -------------------
        // 模拟扣减用户余额
        private boolean deductUserBalance(String userId, BigDecimal amount) {
            // 实际逻辑:UPDATE t_user SET balance = balance - ? WHERE user_id = ?
            return true; // 模拟成功,可改为 false 测试回滚
        }

        // 模拟更新订单状态
        private boolean updateOrderStatus(String orderId, String status) {
            // 实际逻辑:UPDATE t_order SET status = ? WHERE order_id = ?
            return true;
        }

        // 模拟查询订单状态(用于事务回查)
        private String getOrderStatusFromDB(String orderId) {
            // 实际逻辑:SELECT status FROM t_order WHERE order_id = ?
            return "PAID";
        }
    }
}

三、步骤 2:实现事务消息接收(消费者)

消费者仅能收到「生产者提交事务」后的消息,需保证消费幂等性(避免重复消费导致数据错误)。
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
// 监听配置:主题+消费者组+标签过滤
@RocketMQMessageListener(
        topic = "order_pay_topic",        // 与生产者发送的主题一致
        consumerGroup = "tx-consumer-group", // 与 application.yml 中消费者组一致
        selectorExpression = "tx_tag",    // 只消费 tx_tag 标签的消息
        consumeMode = org.apache.rocketmq.common.protocol.heartbeat.ConsumeMode.CONCURRENTLY, // 并发消费
        messageModel = org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING // 集群模式
)
public class TxMessageConsumer implements RocketMQListener<String> {

    /**
     * 消费事务消息(仅当生产者提交事务后,才会触发此方法)
     * @param msg 消息体(与生产者发送的内容一致)
     */
    @Override
    public void onMessage(String msg) {
        try {
            // 1. 解析消息体
            OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
            String orderId = dto.getOrderId();
            System.out.println("收到事务消息,开始消费:" + orderId);

            // 2. 幂等校验(核心!避免重复消费)
            if (checkConsumed(orderId)) {
                System.out.println("消息已消费,跳过处理:" + orderId);
                return;
            }

            // 3. 执行业务逻辑(如创建物流、发送短信)
            boolean createLogistics = createLogistics(orderId);
            boolean sendSms = sendPaySuccessSms(dto.getUserId(), orderId);

            if (createLogistics && sendSms) {
                // 4. 记录消费状态(用于幂等校验)
                recordConsumeStatus(orderId);
                System.out.println("消息消费成功:" + orderId);
            } else {
                System.err.println("消息消费失败:" + orderId);
                // 抛出异常触发重试(默认重试3次,可配置)
                throw new RuntimeException("消费失败,触发重试");
            }
        } catch (Exception e) {
            System.err.println("消费异常:" + e.getMessage());
            throw new RuntimeException(e); // 必须抛出异常,否则 RocketMQ 认为消费成功
        }
    }

    // ------------------- 模拟消费逻辑 -------------------
    // 幂等校验:查询是否已消费过该订单
    private boolean checkConsumed(String orderId) {
        // 实际逻辑:SELECT COUNT(1) FROM t_consume_log WHERE order_id = ?
        return false; // 模拟未消费
    }

    // 记录消费状态(落地到数据库)
    private void recordConsumeStatus(String orderId) {
        // 实际逻辑:INSERT INTO t_consume_log (order_id, consume_time) VALUES (?, NOW())
    }

    // 模拟创建物流单
    private boolean createLogistics(String orderId) {
        // 实际逻辑:INSERT INTO t_logistics (order_id, status) VALUES (?, 'CREATE')
        return true;
    }

    // 模拟发送支付成功短信
    private boolean sendPaySuccessSms(String userId, String orderId) {
        // 实际逻辑:调用短信接口发送通知
        return true;
    }
}

四、步骤 3:测试事务消息的发送与接收

编写测试类,验证完整流程:
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;

@SpringBootTest
public class RocketMQTxTest {
    @Autowired
    private TxMessageProducer txProducer;

    @Test
    public void testTxMessageSendAndConsume() throws InterruptedException {
        // 模拟订单支付:订单ID=ORDER_20250101,用户ID=USER_001,支付金额=100元
        txProducer.sendTxPayMessage("ORDER_20250101", "USER_001", new BigDecimal(100));
        
        // 等待事务执行和消费完成(实际生产无需休眠,此处为测试)
        Thread.sleep(10000);
    }
}

五、核心流程验证与日志说明

1. 正常流程日志(本地事务成功)

plaintext
半消息发送成功,订单ID:ORDER_20250101
本地事务执行成功,订单ID:ORDER_20250101
收到事务消息,开始消费:ORDER_20250101
消息消费成功:ORDER_20250101

2. 本地事务失败日志

plaintext
半消息发送成功,订单ID:ORDER_20250101
本地事务执行失败,订单ID:ORDER_20250101
# 消费者无日志(Broker 删除半消息,未投递)

3. 事务回查日志(网络异常导致 COMMIT 未送达)

plaintext
半消息发送成功,订单ID:ORDER_20250101
本地事务执行成功,订单ID:ORDER_20250101
事务回查成功,订单已支付,提交消息:ORDER_20250101
收到事务消息,开始消费:ORDER_20250101
消息消费成功:ORDER_20250101

六、关键避坑点

  1. 生产者组与监听器绑定不一致@RocketMQTransactionListener 的 txProducerGroup 必须和 application.yml 中 rocketmq.producer.group 完全一致,否则监听器不生效;
  2. 本地事务状态未落地:事务回查时必须从数据库 / 日志中获取状态,不能仅依赖内存,否则宕机后无法确认状态;
  3. 消费端未做幂等:RocketMQ 重试机制会导致重复消费,必须基于订单 ID / 消息 ID 实现幂等;
  4. 回查次数过多:默认回查 15 次,可通过 Broker 配置 transactionCheckMax=10 减少,避免无效回查。

七、核心总结

Spring Boot 中 RocketMQ 事务消息的发送与接收核心逻辑:
  1. 发送端:通过 sendMessageInTransaction 发送半消息,绑定 RocketMQLocalTransactionListener 执行本地事务和回查,确保「本地事务成功则消息投递」;
  2. 接收端:监听指定主题,消费前做幂等校验,消费失败抛出异常触发重试,最终失败进入死信队列;
  3. 可靠性:依赖「半消息 + 回查」保证消息与本地事务一致性,消费端幂等保证数据不重复处理。
该方案适用于电商订单、资金转账、物流同步等需要「最终一致性」的分布式场景,是 RocketMQ 事务消息的标准落地方式。
阅读剩余
THE END