RocketMQ + Java 完整实操示例(Spring Boot 集成)

以下是基于 Spring Boot 2.x + RocketMQ 4.9.x 的完整实操,覆盖「消息生产、消费、事务消息、延迟消息、死信队列」核心场景,可直接落地运行。

一、环境准备

1. 安装 RocketMQ

(Linux 环境为例,Windows 可参考官方文档)
bash
运行
# 1. 下载并解压
wget https://archive.apache.org/dist/rocketmq/4.9.7/rocketmq-all-4.9.7-bin-release.zip
unzip rocketmq-all-4.9.7-bin-release.zip -d /usr/local/rocketmq

# 2. 配置环境变量
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
echo "export PATH=\$PATH:\$ROCKETMQ_HOME/bin" >> /etc/profile
source /etc/profile

# 3. 调整 JVM 内存(避免内存不足)
sed -i 's/-Xms4g -Xmx4g/-Xms512m -Xmx512m/g' /usr/local/rocketmq/bin/runserver.sh
sed -i 's/-Xms4g -Xmx4g/-Xms512m -Xmx512m/g' /usr/local/rocketmq/bin/runbroker.sh

# 4. 启动 NameServer 和 Broker
nohup sh mqnamesrv &  # NameServer 默认端口 9876
nohup sh mqbroker -n 127.0.0.1:9876 &  # 连接 NameServer 启动 Broker

2. 依赖引入(Spring Boot 项目 pom.xml)

xml
<dependencies>
    <!-- Spring Boot 核心 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- RocketMQ 整合 Spring Boot -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    <!-- JSON 工具 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- 测试 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3. 配置文件(application.yml)

yaml
spring:
  application:
    name: rocketmq-demo

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876  # NameServer 地址
  producer:
    group: demo-producer-group  # 生产者组(必须唯一)
    send-message-timeout: 3000  # 发送超时时间
    retry-times-when-send-failed: 2  # 发送失败重试次数
  consumer:
    group: demo-consumer-group  # 消费者组(必须唯一)
    consume-thread-max: 20      # 最大消费线程数

二、基础场景:普通消息生产与消费

1. 生产者(发送普通消息)

java
运行
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class NormalMsgProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通同步消息(等待 Broker 确认,最常用)
     * @param topic 主题(需提前创建,RocketMQ 支持自动创建)
     * @param msg 消息内容
     */
    public void sendSyncMsg(String topic, String msg) {
        // 格式:"topic:tag"(tag 用于消息过滤,可选)
        rocketMQTemplate.syncSend(topic + ":normal", msg);
        System.out.println("同步消息发送成功:" + msg);
    }

    /**
     * 发送异步消息(不阻塞,通过回调获取结果)
     */
    public void sendAsyncMsg(String topic, String msg) {
        rocketMQTemplate.asyncSend(topic + ":async", msg,
                // 发送成功回调
                sendResult -> System.out.println("异步消息发送成功:" + sendResult),
                // 发送失败回调
                throwable -> System.err.println("异步消息发送失败:" + throwable.getMessage()));
    }

    /**
     * 发送单向消息(无需确认,适用于日志、埋点等非核心场景)
     */
    public void sendOneWayMsg(String topic, String msg) {
        rocketMQTemplate.sendOneWay(topic + ":oneway", msg);
        System.out.println("单向消息发送完成(无需确认):" + msg);
    }
}

2. 消费者(消费普通消息)

java
运行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * 消费者监听指定主题+标签的消息
 * topic:监听的主题
 * consumerGroup:消费者组(必须与配置文件一致)
 * selectorExpression:标签过滤(* 表示所有标签)
 */
@Service
@RocketMQMessageListener(
        topic = "demo_topic",
        consumerGroup = "demo-consumer-group",
        selectorExpression = "normal || async"  // 只消费 normal 和 async 标签的消息
)
public class NormalMsgConsumer implements RocketMQListener<String> {

    /**
     * 消息消费逻辑(默认自动 Ack,消费异常会重试)
     */
    @Override
    public void onMessage(String msg) {
        try {
            // 业务处理:如解析消息、更新数据库等
            System.out.println("消费普通消息:" + msg);
            // 模拟业务处理
            Thread.sleep(100);
        } catch (Exception e) {
            System.err.println("消费消息失败:" + e.getMessage());
            // 消费失败会自动重试(默认 16 次,可配置)
            throw new RuntimeException("消费失败,触发重试");
        }
    }
}

3. 测试类(验证生产消费)

java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RocketMQDemoTest {
    @Autowired
    private NormalMsgProducer producer;

    @Test
    public void testNormalMsg() throws InterruptedException {
        // 发送同步消息
        producer.sendSyncMsg("demo_topic", "Hello RocketMQ - Sync");
        // 发送异步消息
        producer.sendAsyncMsg("demo_topic", "Hello RocketMQ - Async");
        // 发送单向消息
        producer.sendOneWayMsg("demo_topic", "Hello RocketMQ - OneWay");
        
        // 等待消费完成
        Thread.sleep(5000);
    }
}

三、进阶场景 1:事务消息(保证最终一致性)

适用于「本地事务执行成功后,消息才允许消费」的场景(如订单支付、资金转账)。

1. 事务消息生产者

java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.UUID;

// 模拟转账 DTO
class TransferDTO {
    private String fromAccount;
    private String toAccount;
    private BigDecimal amount;

    // 构造器、getter/setter 省略
    public TransferDTO(String fromAccount, String toAccount, BigDecimal amount) {
        this.fromAccount = fromAccount;
        this.toAccount = toAccount;
        this.amount = amount;
    }
}

@Service
public class TransactionMsgProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息(半消息)
     */
    public void sendTransactionMsg(String fromAccount, String toAccount, BigDecimal amount) {
        TransferDTO dto = new TransferDTO(fromAccount, toAccount, amount);
        // 构建消息体
        Message<String> message = MessageBuilder
                .withPayload(JSON.toJSONString(dto))
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                .build();

        // 发送半消息,触发本地事务
        rocketMQTemplate.sendMessageInTransaction(
                "transfer_topic:transfer",  // 主题+标签
                message,                     // 消息内容
                null                         // 附加参数(可传本地事务需要的参数)
        );
    }

    /**
     * 本地事务监听器(核心:执行本地事务 + 事务回查)
     */
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "demo-producer-group")
    public class TransferTransactionListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        /**
         * 执行本地事务(扣减转出账户余额)
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
            try {
                // 模拟本地事务:扣减 fromAccount 余额(实际场景需操作数据库)
                System.out.println("本地事务执行:扣减账户 " + dto.getFromAccount() + " 余额 " + dto.getAmount());
                // 模拟异常:注释掉下面一行,测试回滚逻辑
                // throw new RuntimeException("扣减余额失败");
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT; // 提交消息
            } catch (Exception e) {
                System.err.println("本地事务执行失败:" + e.getMessage());
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK; // 回滚消息
            }
        }

        /**
         * 事务回查(解决生产者宕机导致的事务状态未知)
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
            // 模拟回查:检查本地事务是否执行成功(实际场景查数据库)
            System.out.println("事务回查:检查账户 " + dto.getFromAccount() + " 扣减是否成功");
            return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
        }
    }
}

2. 事务消息消费者

java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = "transfer_topic",
        consumerGroup = "transfer-consumer-group",
        selectorExpression = "transfer"
)
public class TransferMsgConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        TransferDTO dto = JSON.parseObject(msg, TransferDTO.class);
        // 消费逻辑:给转入账户加款(失败重试保证最终一致性)
        int retryCount = 3; // 重试次数
        while (retryCount > 0) {
            try {
                System.out.println("消费事务消息:给账户 " + dto.getToAccount() + " 加款 " + dto.getAmount());
                // 模拟业务处理
                Thread.sleep(200);
                return; // 消费成功,退出重试
            } catch (Exception e) {
                retryCount--;
                System.err.println("加款失败,剩余重试次数:" + retryCount);
                try { Thread.sleep(1000); } catch (InterruptedException ex) {}
            }
        }
        // 重试失败,记录死信队列(人工介入)
        System.err.println("账户 " + dto.getToAccount() + " 加款失败,进入死信队列");
    }
}

3. 测试事务消息

java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.math.BigDecimal;

@SpringBootTest
public class TransactionMsgTest {
    @Autowired
    private TransactionMsgProducer producer;

    @Test
    public void testTransfer() throws InterruptedException {
        // 模拟转账:账户 A 转 100 元到账户 B
        producer.sendTransactionMsg("A", "B", new BigDecimal(100));
        Thread.sleep(10000); // 等待事务执行和消费
    }
}

四、进阶场景 2:延迟消息(定时任务)

RocketMQ 支持固定级别的延迟消息(不支持自定义时间),适用于「订单超时关闭、定时提醒」等场景。

1. 延迟消息生产者

java
运行
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class DelayMsgProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送延迟消息
     * RocketMQ 延迟级别:1=1s, 2=5s, 3=10s, 4=30s, 5=1m, 6=2m, 7=3m, 8=4m, 9=5m, 10=6m, 11=7m, 12=8m, 13=9m, 14=10m, 15=20m, 16=30m, 17=1h, 18=2h
     */
    public void sendDelayMsg(String orderId) {
        String msg = "订单超时关闭:" + orderId;
        // 构建延迟消息(延迟级别 5 = 1 分钟)
        rocketMQTemplate.syncSend(
                "delay_topic:order",
                MessageBuilder.withPayload(msg).build(),
                3000,  // 发送超时
                5      // 延迟级别
        );
        System.out.println("延迟消息发送成功(1分钟后消费):" + msg);
    }
}

2. 延迟消息消费者

java
运行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = "delay_topic",
        consumerGroup = "delay-consumer-group",
        selectorExpression = "order"
)
public class DelayMsgConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("消费延迟消息:" + msg);
        // 业务逻辑:关闭超时未支付的订单
    }
}

3. 测试延迟消息

java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class DelayMsgTest {
    @Autowired
    private DelayMsgProducer producer;

    @Test
    public void testDelayMsg() throws InterruptedException {
        producer.sendDelayMsg("ORDER_20251229001");
        // 等待延迟消息消费
        Thread.sleep(70000); // 等待 70 秒
    }
}

五、核心运维操作

1. RocketMQ 常用命令

bash
运行
# 查看主题列表
mqadmin topicList -n 127.0.0.1:9876

# 创建主题
mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t demo_topic

# 查看消息消费进度
mqadmin consumerProgress -n 127.0.0.1:9876 -g demo-consumer-group

# 查看死信队列
mqadmin queryDLQ -n 127.0.0.1:9876 -g demo-consumer-group -t demo_topic

# 停止 RocketMQ
mqshutdown broker
mqshutdown namesrv

2. 关键配置优化

场景 优化配置 作用
消息可靠性 producer.retry-times-when-send-failed=3 发送失败重试,提升送达率
消费幂等 消费端基于 msgId / 业务唯一 ID 去重 避免重复消费导致数据不一致
堆积处理 consumer.consume-thread-max=50 增加消费线程,加快堆积消息处理
死信队列 配置死信主题 + 重试次数 消费失败消息进入死信,避免阻塞

六、核心总结

RocketMQ + Java 的核心应用场景及关键要点:
  1. 普通消息:同步 / 异步 / 单向消息适配不同可靠性需求,标签过滤实现消息分类;
  2. 事务消息:解决分布式事务最终一致性,核心是「半消息 + 本地事务 + 回查」;
  3. 延迟消息:利用固定延迟级别实现定时任务,替代传统定时任务框架;
  4. 可靠性保障:生产端重试、消费端手动 Ack、消息持久化是核心;
  5. 运维关键:监控消息堆积、消费进度,配置死信队列处理异常消息。
以上示例可直接在 Spring Boot 项目中运行,只需替换 RocketMQ 地址为实际部署地址,即可适配电商、金融、物流等绝大多数业务场景。
阅读剩余
THE END