RocketMQ + Java 完整实操示例(Spring Boot 集成)
以下是基于 Spring Boot 2.x + RocketMQ 4.9.x 的完整实操,覆盖「消息生产、消费、事务消息、延迟消息、死信队列」核心场景,可直接落地运行。
一、环境准备
1. 安装 RocketMQ
(Linux 环境为例,Windows 可参考官方文档)
bash
运行
# 1. 下载并解压
wget https://archive.apache.org/dist/rocketmq/4.9.7/rocketmq-all-4.9.7-bin-release.zip
unzip rocketmq-all-4.9.7-bin-release.zip -d /usr/local/rocketmq
# 2. 配置环境变量
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
echo "export PATH=\$PATH:\$ROCKETMQ_HOME/bin" >> /etc/profile
source /etc/profile
# 3. 调整 JVM 内存(避免内存不足)
sed -i 's/-Xms4g -Xmx4g/-Xms512m -Xmx512m/g' /usr/local/rocketmq/bin/runserver.sh
sed -i 's/-Xms4g -Xmx4g/-Xms512m -Xmx512m/g' /usr/local/rocketmq/bin/runbroker.sh
# 4. 启动 NameServer 和 Broker
nohup sh mqnamesrv & # NameServer 默认端口 9876
nohup sh mqbroker -n 127.0.0.1:9876 & # 连接 NameServer 启动 Broker
2. 依赖引入(Spring Boot 项目 pom.xml)
xml
<dependencies>
<!-- Spring Boot 核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ 整合 Spring Boot -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- JSON 工具 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3. 配置文件(application.yml)
yaml
spring:
application:
name: rocketmq-demo
# RocketMQ 配置
rocketmq:
name-server: 127.0.0.1:9876 # NameServer 地址
producer:
group: demo-producer-group # 生产者组(必须唯一)
send-message-timeout: 3000 # 发送超时时间
retry-times-when-send-failed: 2 # 发送失败重试次数
consumer:
group: demo-consumer-group # 消费者组(必须唯一)
consume-thread-max: 20 # 最大消费线程数
二、基础场景:普通消息生产与消费
1. 生产者(发送普通消息)
java
运行
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class NormalMsgProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通同步消息(等待 Broker 确认,最常用)
* @param topic 主题(需提前创建,RocketMQ 支持自动创建)
* @param msg 消息内容
*/
public void sendSyncMsg(String topic, String msg) {
// 格式:"topic:tag"(tag 用于消息过滤,可选)
rocketMQTemplate.syncSend(topic + ":normal", msg);
System.out.println("同步消息发送成功:" + msg);
}
/**
* 发送异步消息(不阻塞,通过回调获取结果)
*/
public void sendAsyncMsg(String topic, String msg) {
rocketMQTemplate.asyncSend(topic + ":async", msg,
// 发送成功回调
sendResult -> System.out.println("异步消息发送成功:" + sendResult),
// 发送失败回调
throwable -> System.err.println("异步消息发送失败:" + throwable.getMessage()));
}
/**
* 发送单向消息(无需确认,适用于日志、埋点等非核心场景)
*/
public void sendOneWayMsg(String topic, String msg) {
rocketMQTemplate.sendOneWay(topic + ":oneway", msg);
System.out.println("单向消息发送完成(无需确认):" + msg);
}
}
2. 消费者(消费普通消息)
java
运行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* 消费者监听指定主题+标签的消息
* topic:监听的主题
* consumerGroup:消费者组(必须与配置文件一致)
* selectorExpression:标签过滤(* 表示所有标签)
*/
@Service
@RocketMQMessageListener(
topic = "demo_topic",
consumerGroup = "demo-consumer-group",
selectorExpression = "normal || async" // 只消费 normal 和 async 标签的消息
)
public class NormalMsgConsumer implements RocketMQListener<String> {
/**
* 消息消费逻辑(默认自动 Ack,消费异常会重试)
*/
@Override
public void onMessage(String msg) {
try {
// 业务处理:如解析消息、更新数据库等
System.out.println("消费普通消息:" + msg);
// 模拟业务处理
Thread.sleep(100);
} catch (Exception e) {
System.err.println("消费消息失败:" + e.getMessage());
// 消费失败会自动重试(默认 16 次,可配置)
throw new RuntimeException("消费失败,触发重试");
}
}
}
3. 测试类(验证生产消费)
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RocketMQDemoTest {
@Autowired
private NormalMsgProducer producer;
@Test
public void testNormalMsg() throws InterruptedException {
// 发送同步消息
producer.sendSyncMsg("demo_topic", "Hello RocketMQ - Sync");
// 发送异步消息
producer.sendAsyncMsg("demo_topic", "Hello RocketMQ - Async");
// 发送单向消息
producer.sendOneWayMsg("demo_topic", "Hello RocketMQ - OneWay");
// 等待消费完成
Thread.sleep(5000);
}
}
三、进阶场景 1:事务消息(保证最终一致性)
适用于「本地事务执行成功后,消息才允许消费」的场景(如订单支付、资金转账)。
1. 事务消息生产者
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.UUID;
// 模拟转账 DTO
class TransferDTO {
private String fromAccount;
private String toAccount;
private BigDecimal amount;
// 构造器、getter/setter 省略
public TransferDTO(String fromAccount, String toAccount, BigDecimal amount) {
this.fromAccount = fromAccount;
this.toAccount = toAccount;
this.amount = amount;
}
}
@Service
public class TransactionMsgProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息(半消息)
*/
public void sendTransactionMsg(String fromAccount, String toAccount, BigDecimal amount) {
TransferDTO dto = new TransferDTO(fromAccount, toAccount, amount);
// 构建消息体
Message<String> message = MessageBuilder
.withPayload(JSON.toJSONString(dto))
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
// 发送半消息,触发本地事务
rocketMQTemplate.sendMessageInTransaction(
"transfer_topic:transfer", // 主题+标签
message, // 消息内容
null // 附加参数(可传本地事务需要的参数)
);
}
/**
* 本地事务监听器(核心:执行本地事务 + 事务回查)
*/
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "demo-producer-group")
public class TransferTransactionListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
/**
* 执行本地事务(扣减转出账户余额)
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
try {
// 模拟本地事务:扣减 fromAccount 余额(实际场景需操作数据库)
System.out.println("本地事务执行:扣减账户 " + dto.getFromAccount() + " 余额 " + dto.getAmount());
// 模拟异常:注释掉下面一行,测试回滚逻辑
// throw new RuntimeException("扣减余额失败");
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT; // 提交消息
} catch (Exception e) {
System.err.println("本地事务执行失败:" + e.getMessage());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK; // 回滚消息
}
}
/**
* 事务回查(解决生产者宕机导致的事务状态未知)
*/
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
// 模拟回查:检查本地事务是否执行成功(实际场景查数据库)
System.out.println("事务回查:检查账户 " + dto.getFromAccount() + " 扣减是否成功");
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
}
}
}
2. 事务消息消费者
java
运行
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "transfer_topic",
consumerGroup = "transfer-consumer-group",
selectorExpression = "transfer"
)
public class TransferMsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
TransferDTO dto = JSON.parseObject(msg, TransferDTO.class);
// 消费逻辑:给转入账户加款(失败重试保证最终一致性)
int retryCount = 3; // 重试次数
while (retryCount > 0) {
try {
System.out.println("消费事务消息:给账户 " + dto.getToAccount() + " 加款 " + dto.getAmount());
// 模拟业务处理
Thread.sleep(200);
return; // 消费成功,退出重试
} catch (Exception e) {
retryCount--;
System.err.println("加款失败,剩余重试次数:" + retryCount);
try { Thread.sleep(1000); } catch (InterruptedException ex) {}
}
}
// 重试失败,记录死信队列(人工介入)
System.err.println("账户 " + dto.getToAccount() + " 加款失败,进入死信队列");
}
}
3. 测试事务消息
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
@SpringBootTest
public class TransactionMsgTest {
@Autowired
private TransactionMsgProducer producer;
@Test
public void testTransfer() throws InterruptedException {
// 模拟转账:账户 A 转 100 元到账户 B
producer.sendTransactionMsg("A", "B", new BigDecimal(100));
Thread.sleep(10000); // 等待事务执行和消费
}
}
四、进阶场景 2:延迟消息(定时任务)
RocketMQ 支持固定级别的延迟消息(不支持自定义时间),适用于「订单超时关闭、定时提醒」等场景。
1. 延迟消息生产者
java
运行
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class DelayMsgProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送延迟消息
* RocketMQ 延迟级别:1=1s, 2=5s, 3=10s, 4=30s, 5=1m, 6=2m, 7=3m, 8=4m, 9=5m, 10=6m, 11=7m, 12=8m, 13=9m, 14=10m, 15=20m, 16=30m, 17=1h, 18=2h
*/
public void sendDelayMsg(String orderId) {
String msg = "订单超时关闭:" + orderId;
// 构建延迟消息(延迟级别 5 = 1 分钟)
rocketMQTemplate.syncSend(
"delay_topic:order",
MessageBuilder.withPayload(msg).build(),
3000, // 发送超时
5 // 延迟级别
);
System.out.println("延迟消息发送成功(1分钟后消费):" + msg);
}
}
2. 延迟消息消费者
java
运行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "delay_topic",
consumerGroup = "delay-consumer-group",
selectorExpression = "order"
)
public class DelayMsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消费延迟消息:" + msg);
// 业务逻辑:关闭超时未支付的订单
}
}
3. 测试延迟消息
java
运行
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DelayMsgTest {
@Autowired
private DelayMsgProducer producer;
@Test
public void testDelayMsg() throws InterruptedException {
producer.sendDelayMsg("ORDER_20251229001");
// 等待延迟消息消费
Thread.sleep(70000); // 等待 70 秒
}
}
五、核心运维操作
1. RocketMQ 常用命令
bash
运行
# 查看主题列表
mqadmin topicList -n 127.0.0.1:9876
# 创建主题
mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t demo_topic
# 查看消息消费进度
mqadmin consumerProgress -n 127.0.0.1:9876 -g demo-consumer-group
# 查看死信队列
mqadmin queryDLQ -n 127.0.0.1:9876 -g demo-consumer-group -t demo_topic
# 停止 RocketMQ
mqshutdown broker
mqshutdown namesrv
2. 关键配置优化
| 场景 | 优化配置 | 作用 |
|---|---|---|
| 消息可靠性 | producer.retry-times-when-send-failed=3 | 发送失败重试,提升送达率 |
| 消费幂等 | 消费端基于 msgId / 业务唯一 ID 去重 | 避免重复消费导致数据不一致 |
| 堆积处理 | consumer.consume-thread-max=50 | 增加消费线程,加快堆积消息处理 |
| 死信队列 | 配置死信主题 + 重试次数 | 消费失败消息进入死信,避免阻塞 |
六、核心总结
RocketMQ + Java 的核心应用场景及关键要点:
- 普通消息:同步 / 异步 / 单向消息适配不同可靠性需求,标签过滤实现消息分类;
- 事务消息:解决分布式事务最终一致性,核心是「半消息 + 本地事务 + 回查」;
- 延迟消息:利用固定延迟级别实现定时任务,替代传统定时任务框架;
- 可靠性保障:生产端重试、消费端手动 Ack、消息持久化是核心;
- 运维关键:监控消息堆积、消费进度,配置死信队列处理异常消息。
以上示例可直接在 Spring Boot 项目中运行,只需替换 RocketMQ 地址为实际部署地址,即可适配电商、金融、物流等绝大多数业务场景。
阅读剩余
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9916.html
文章版权归作者所有,未经允许请勿转载。
THE END
相关推荐