RocketMQ 事务消息是解决分布式事务最终一致性的核心方案,核心原理是「半消息 + 本地事务执行 + 事务回查」,确保「本地事务执行成功则消息投递,执行失败则消息丢弃」。以下从原理、核心流程、代码实现、关键配置、避坑要点全维度讲解如何实现。
RocketMQ 事务消息本质是「两阶段提交」的变种,通过「半消息」机制解决「消息发送与本地事务执行不一致」的问题:
- 发送半消息:生产者向 Broker 发送半消息,Broker 持久化后返回确认;
- 执行本地事务:生产者接收到半消息确认后,执行本地核心业务逻辑;
- 提交 / 回滚事务:本地事务成功则发送
COMMIT 指令(Broker 投递消息给消费者),失败则发送 ROLLBACK 指令(Broker 删除半消息);
- 事务回查:若生产者宕机 / 网络异常导致 Broker 未收到状态指令,Broker 会定时回查生产者,确认本地事务状态后再处理消息。
- 已部署 RocketMQ(NameServer + Broker),版本 ≥4.3.0(事务消息最低支持版本);
- Spring Boot 项目引入 RocketMQ 依赖(推荐
rocketmq-spring-boot-starter:2.2.3);
- 配置 RocketMQ 地址(
application.yml):
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: tx-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
生产者需实现「发送半消息 + 执行本地事务 + 事务回查」三个核心逻辑:
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.UUID;
class TransferDTO {
private String fromAccount;
private String toAccount;
private BigDecimal amount;
public TransferDTO(String fromAccount, String toAccount, BigDecimal amount) {
this.fromAccount = fromAccount;
this.toAccount = toAccount;
this.amount = amount;
}
}
@Service
public class RocketMQTxProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTxMessage(String fromAccount, String toAccount, BigDecimal amount) {
TransferDTO dto = new TransferDTO(fromAccount, toAccount, amount);
String msgBody = JSON.toJSONString(dto);
Message<String> txMessage = MessageBuilder
.withPayload(msgBody)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
rocketMQTemplate.sendMessageInTransaction(
"transfer_topic:tx_tag",
txMessage,
null
);
}
@org.apache.rocketmq.spring.annotation.RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class TxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
boolean txSuccess = deductAccount(dto.getFromAccount(), dto.getAmount());
if (txSuccess) {
System.out.println("本地事务执行成功:扣减账户 " + dto.getFromAccount() + " 余额 " + dto.getAmount());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("本地事务执行失败:账户 " + dto.getFromAccount() + " 扣减失败");
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
System.err.println("本地事务执行异常:" + e.getMessage());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
boolean txSuccess = checkAccountDeduct(dto.getFromAccount(), dto.getAmount());
if (txSuccess) {
System.out.println("事务回查成功:账户 " + dto.getFromAccount() + " 扣减已确认");
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("事务回查失败:账户 " + dto.getFromAccount() + " 扣减未执行");
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
System.err.println("事务回查异常:" + e.getMessage());
return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
}
}
private boolean deductAccount(String account, BigDecimal amount) {
return true;
}
private boolean checkAccountDeduct(String account, BigDecimal amount) {
return true;
}
}
}
消费者只需监听指定主题,消费逻辑需保证「幂等性」(避免重复消费导致数据不一致):
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "transfer_topic",
consumerGroup = "tx-consumer-group",
selectorExpression = "tx_tag"
)
public class RocketMQTxConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
try {
TransferDTO dto = JSON.parseObject(msg, TransferDTO.class);
boolean consumeSuccess = addAccount(dto.getToAccount(), dto.getAmount());
if (consumeSuccess) {
System.out.println("消费成功:给账户 " + dto.getToAccount() + " 加款 " + dto.getAmount());
} else {
System.err.println("消费失败:账户 " + dto.getToAccount() + " 加款失败");
throw new RuntimeException("加款失败,触发重试");
}
} catch (Exception e) {
System.err.println("消费异常:" + e.getMessage());
throw new RuntimeException(e);
}
}
private boolean addAccount(String account, BigDecimal amount) {
return true;
}
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
@SpringBootTest
public class TxMessageTest {
@Autowired
private RocketMQTxProducer txProducer;
@Test
public void testTxMessage() throws InterruptedException {
txProducer.sendTxMessage("A", "B", new BigDecimal(100));
Thread.sleep(10000);
}
}
修改 broker.conf,调整回查频率和次数:
transactionCheckInterval=10000
transactionCheckMax=10
transactionCheckThreadPoolNums=5
rocketmq:
producer:
retry-times-when-send-failed: 3
retry-times-when-send-async-failed: 3
compress-message-body-threshold: 4096
消费端必须实现幂等,否则重复消费会导致数据错误,常用方案:
- 基于业务唯一 ID:如转账流水号,消费前先查是否已处理;
- 基于消息 ID:RocketMQ 消息 ID 全局唯一,可缓存已消费的 msgId;
- 数据库乐观锁:如
UPDATE account SET amount = amount + 100 WHERE id = 'B' AND version = 1。
- 现象:Broker 收到半消息,但本地事务执行失败,返回
ROLLBACK;
- 结果:Broker 删除半消息,消费者不会收到消息,数据一致;
- 避坑:本地事务必须包含「幂等校验」,避免重复执行。
- 现象:本地事务执行成功,但网络异常导致 Broker 未收到
COMMIT;
- 结果:Broker 触发事务回查,生产者回查确认后仍会提交消息;
- 避坑:回查逻辑必须能准确获取本地事务状态(需落地事务日志)。
- 现象:回查时本地事务状态无法确认,持续返回
UNKNOWN;
- 结果:超过最大回查次数后,消息进入「死信队列」;
- 避坑:死信队列需监控,人工介入处理异常消息。
- 原因:Broker 重试机制、网络异常导致重复投递;
- 解决方案:消费端严格实现幂等,避免重复执行业务逻辑。
RocketMQ 事务消息实现的核心是「半消息 + 本地事务 + 回查」,关键要点:
- 生产者:必须实现
RocketMQLocalTransactionListener,包含「本地事务执行」和「事务回查」逻辑;
- 可靠性:本地事务状态需落地(如数据库 / 日志),确保回查能准确获取状态;
- 消费端:必须幂等,避免重复消费导致数据不一致;
- 运维:监控事务回查次数、死信队列,及时处理异常消息。
该方案适用于「最终一致性」场景(如电商订单、资金转账、物流同步),无需强一致性的分布式锁 / 2PC,是分布式系统中最常用的事务解决方案。