Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例

以下是 Spring Boot + RocketMQ 事务消息 完整可运行的代码示例,包含生产者(事务消息发送)、消费者(消息接收)、核心配置,基于 RocketMQ 4.9.x + Spring Boot 2.7.x 实现,可直接复制到项目中运行。

一、前置依赖(pom.xml)

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>rocketmq-tx-demo</artifactId>
    <version>1.0.0</version>

    <dependencies>
        <!-- Spring Boot 核心 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- RocketMQ Spring Boot 整合 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>

        <!-- JSON 序列化 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- 测试依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

二、核心配置(application.yml)

yaml
spring:
  application:
    name: rocketmq-tx-demo

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876  # 本地NameServer地址(集群用;分隔)
  producer:
    group: tx-producer-group    # 事务生产者组(必须唯一)
    send-message-timeout: 3000  # 发送超时时间
    retry-times-when-send-failed: 3  # 发送失败重试次数
  consumer:
    group: tx-consumer-group    # 消费者组(必须唯一)
    consume-thread-max: 20      # 最大消费线程数
    retry-times-when-consume-failed: 3  # 消费失败重试次数

三、消息体 DTO(OrderPayDTO.java)

java
运行
package com.example.rocketmqtxdemo.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;

/**
 * 订单支付消息体
 */
@Data
@AllArgsConstructor
public class OrderPayDTO {
    private String orderId;      // 订单ID(业务唯一标识)
    private String userId;       // 用户ID
    private BigDecimal payAmount;// 支付金额
}

四、事务消息生产者(TxMessageProducer.java)

java
运行
package com.example.rocketmqtxdemo.producer;

import com.alibaba.fastjson.JSON;
import com.example.rocketmqtxdemo.dto.OrderPayDTO;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.UUID;

/**
 * 事务消息生产者(发送半消息 + 执行本地事务 + 事务回查)
 */
@Service
public class TxMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务半消息
     */
    public void sendTxPayMessage(String orderId, String userId, BigDecimal payAmount) {
        // 1. 构建业务消息体
        OrderPayDTO dto = new OrderPayDTO(orderId, userId, payAmount);
        String msgBody = JSON.toJSONString(dto);

        // 2. 构建事务消息(必须设置唯一事务ID)
        Message<String> txMessage = MessageBuilder
                .withPayload(msgBody)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                .setHeader("orderId", orderId)
                .build();

        // 3. 发送半消息 + 绑定本地事务监听器
        rocketMQTemplate.sendMessageInTransaction(
                "order_pay_topic:tx_tag",  // 主题+标签
                txMessage,                 // 消息体
                userId                     // 附加参数(传递到本地事务方法)
        );
        System.out.println("【生产者】半消息发送成功,订单ID:" + orderId);
    }

    /**
     * 本地事务监听器(核心:执行本地事务 + 事务回查)
     */
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
    public class OrderPayTxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        /**
         * 执行本地事务(核心业务逻辑:扣减余额、更新订单状态)
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                // 解析消息体
                String msgBody = (String) msg.getPayload();
                OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
                String orderId = dto.getOrderId();
                String userId = (String) arg;

                // ========== 模拟本地事务:扣减用户余额 + 更新订单状态 ==========
                boolean deductSuccess = deductUserBalance(userId, dto.getPayAmount());
                boolean updateOrderSuccess = updateOrderStatus(orderId, "PAID");

                if (deductSuccess && updateOrderSuccess) {
                    System.out.println("【生产者】本地事务执行成功,订单ID:" + orderId);
                    // 提交事务:Broker投递消息给消费者
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    System.out.println("【生产者】本地事务执行失败,订单ID:" + orderId);
                    // 回滚事务:Broker删除半消息
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                System.err.println("【生产者】本地事务执行异常:" + e.getMessage());
                // 返回UNKNOWN,触发Broker回查
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        /**
         * 事务回查(解决:本地事务成功但COMMIT指令未送达Broker)
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            try {
                String msgBody = (String) msg.getPayload();
                OrderPayDTO dto = JSON.parseObject(msgBody, OrderPayDTO.class);
                String orderId = dto.getOrderId();

                // ========== 模拟回查本地事务状态(查数据库) ==========
                String orderStatus = getOrderStatusFromDB(orderId);
                if ("PAID".equals(orderStatus)) {
                    System.out.println("【生产者】事务回查成功,提交消息,订单ID:" + orderId);
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    System.out.println("【生产者】事务回查失败,回滚消息,订单ID:" + orderId);
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                System.err.println("【生产者】事务回查异常:" + e.getMessage());
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        // ------------------- 模拟数据库操作 -------------------
        private boolean deductUserBalance(String userId, BigDecimal amount) {
            // 实际逻辑:UPDATE t_user SET balance = balance - ? WHERE user_id = ?
            return true; // 模拟成功,改为false可测试回滚
        }

        private boolean updateOrderStatus(String orderId, String status) {
            // 实际逻辑:UPDATE t_order SET status = ? WHERE order_id = ?
            return true;
        }

        private String getOrderStatusFromDB(String orderId) {
            // 实际逻辑:SELECT status FROM t_order WHERE order_id = ?
            return "PAID";
        }
    }
}

五、事务消息消费者(TxMessageConsumer.java)

java
运行
package com.example.rocketmqtxdemo.consumer;

import com.alibaba.fastjson.JSON;
import com.example.rocketmqtxdemo.dto.OrderPayDTO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * 事务消息消费者(仅接收生产者提交的消息)
 */
@Service
@RocketMQMessageListener(
        topic = "order_pay_topic",        // 与生产者主题一致
        consumerGroup = "tx-consumer-group", // 与配置文件消费者组一致
        selectorExpression = "tx_tag"     // 过滤标签
)
public class TxMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        try {
            // 1. 解析消息体
            OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
            String orderId = dto.getOrderId();
            System.out.println("【消费者】收到事务消息,订单ID:" + orderId);

            // 2. 幂等校验(核心!避免重复消费)
            if (checkConsumed(orderId)) {
                System.out.println("【消费者】消息已消费,跳过处理,订单ID:" + orderId);
                return;
            }

            // 3. 模拟消费逻辑:创建物流单 + 发送短信
            boolean createLogistics = createLogistics(orderId);
            boolean sendSms = sendPaySuccessSms(dto.getUserId(), orderId);

            if (createLogistics && sendSms) {
                // 4. 记录消费状态(落地到数据库)
                recordConsumeStatus(orderId);
                System.out.println("【消费者】消息消费成功,订单ID:" + orderId);
            } else {
                System.err.println("【消费者】消息消费失败,订单ID:" + orderId);
                throw new RuntimeException("消费失败,触发重试");
            }
        } catch (Exception e) {
            System.err.println("【消费者】消费异常:" + e.getMessage());
            throw new RuntimeException(e); // 抛出异常触发重试
        }
    }

    // ------------------- 模拟消费逻辑 -------------------
    // 幂等校验:查询是否已消费
    private boolean checkConsumed(String orderId) {
        // 实际逻辑:SELECT COUNT(1) FROM t_consume_log WHERE order_id = ?
        return false;
    }

    // 记录消费状态
    private void recordConsumeStatus(String orderId) {
        // 实际逻辑:INSERT INTO t_consume_log (order_id, consume_time) VALUES (?, NOW())
    }

    // 模拟创建物流单
    private boolean createLogistics(String orderId) {
        return true;
    }

    // 模拟发送支付成功短信
    private boolean sendPaySuccessSms(String userId, String orderId) {
        return true;
    }
}

六、启动类(RocketmqTxDemoApplication.java)

java
运行
package com.example.rocketmqtxdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketmqTxDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketmqTxDemoApplication.class, args);
    }
}

七、测试类(RocketMQTxTest.java)

java
运行
package com.example.rocketmqtxdemo;

import com.example.rocketmqtxdemo.producer.TxMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;

@SpringBootTest
public class RocketMQTxTest {

    @Autowired
    private TxMessageProducer txProducer;

    @Test
    public void testTxMessageSendAndConsume() throws InterruptedException {
        // 模拟发送订单支付事务消息
        txProducer.sendTxPayMessage("ORDER_20250101", "USER_001", new BigDecimal(100));
        
        // 等待事务执行和消费完成(测试用,生产环境无需休眠)
        Thread.sleep(10000);
    }
}

八、运行说明

  1. 启动 RocketMQ:先启动 NameServer 和 Broker(本地部署参考命令):
    bash
    运行
    # 启动NameServer
    nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv &
    # 启动Broker
    nohup sh ${ROCKETMQ_HOME}/bin/mqbroker -n 127.0.0.1:9876 &
    
  2. 运行测试类:执行 RocketMQTxTest 的 testTxMessageSendAndConsume 方法,控制台输出如下:
    plaintext
    【生产者】半消息发送成功,订单ID:ORDER_20250101
    【生产者】本地事务执行成功,订单ID:ORDER_20250101
    【消费者】收到事务消息,订单ID:ORDER_20250101
    【消费者】消息消费成功,订单ID:ORDER_20250101
    

九、核心关键点

  1. 事务监听器绑定@RocketMQTransactionListener 的 txProducerGroup 必须和配置文件中 rocketmq.producer.group 一致;
  2. 幂等性:消费者必须通过「订单 ID / 消息 ID」做幂等校验,避免重复消费;
  3. 事务回查:本地事务状态需落地到数据库,回查时从数据库读取,不能依赖内存;
  4. 异常处理:消费失败抛出异常触发重试,重试耗尽后消息进入死信队列(%DLQ%tx-consumer-group)。
该示例覆盖了事务消息的完整流程,可直接适配电商订单、资金转账等「最终一致性」业务场景。
阅读剩余
THE END