保证 RocketMQ 事务消息的最终一致性(即「本地事务执行结果」与「消息投递 / 消费结果」最终一致),核心是构建「生产端一致性 + 消费端一致性 + 全局兜底补偿」的闭环体系,依托 RocketMQ 事务消息的原生机制,结合业务层的可靠性设计。以下是可落地的全流程方案,覆盖核心原理、关键措施和实操细节:
RocketMQ 事务消息的最终一致性,底层依赖「半消息 + 本地事务 + 事务回查」的原生机制:
- 半消息:Broker 暂存消息但不投递,保证「消息能成功写入 Broker」;
- 本地事务绑定:消息投递与否完全依赖本地事务执行结果;
- 事务回查:Broker 主动校验本地事务状态,解决「生产者宕机 / 网络异常导致状态丢失」的问题。
在此基础上,需通过业务层设计弥补原生机制的不足,形成完整闭环。
生产端是一致性的核心入口,需确保「本地事务成功则消息必投递,本地事务失败则消息必回滚」。
问题:若本地事务状态仅存于内存,生产者宕机后,Broker 回查时无法获取真实状态,导致一致性破坏。解决方案:将本地事务状态写入数据库 / 事务日志表,回查时从持久化存储读取(而非内存)。
private boolean executeLocalTx(String orderId) {
boolean bizSuccess = deductStock(orderId);
saveTxLog(orderId, bizSuccess ? "SUCCESS" : "FAIL");
return bizSuccess;
}
private boolean checkLocalTx(String orderId) {
TxLog txLog = txLogMapper.selectByOrderId(orderId);
return txLog != null && "SUCCESS".equals(txLog.getStatus());
}
问题:Broker 回查可能多次触发本地事务校验,非幂等操作会导致数据错误(如重复扣减库存)。解决方案:
- 本地事务操作基于「业务唯一 ID(如订单 ID)」做幂等;
- 避免包含「一次性操作」(如新增无主键记录、调用无幂等的第三方接口);
- 用「乐观锁 / 状态机」控制重复执行:
private boolean deductStock(String orderId) {
int affected = jdbcTemplate.update(
"UPDATE t_stock SET num = num - 1 WHERE order_id = ? AND num > 0 AND status = 'UNDO'",
orderId
);
return affected > 0;
}
Broker 的回查是生产端的最后保障,需合理配置确保回查有效:
- 半消息发送失败:开启生产者重试(
retry-times-when-send-failed=3),确保半消息能写入 Broker;
- 本地事务执行异常:返回
UNKNOWN 触发回查(而非直接回滚),避免临时异常误判;
- 提交 / 回滚指令发送失败:无需处理,依赖 Broker 回查兜底。
消费端需确保「消息一旦投递,业务逻辑必执行成功」,避免「消息收到但业务未处理」的一致性问题。
问题:Broker 重试、网络异常会导致消息重复投递,重复消费会破坏数据(如重复创建物流单)。解决方案:基于「业务唯一 ID(如订单 ID)」或「RocketMQ 消息 ID」做幂等校验,步骤:
- 消费前校验是否已处理;
- 处理成功后记录消费日志;
- 已处理则直接返回,不执行业务逻辑。
@Override
public void onMessage(String msg) {
OrderDTO dto = JSON.parseObject(msg, OrderDTO.class);
String orderId = dto.getOrderId();
if (consumeLogMapper.existsByOrderId(orderId)) {
System.out.println("消息已消费,跳过:" + orderId);
return;
}
createLogistics(orderId);
consumeLogMapper.insert(new ConsumeLog(orderId, "SUCCESS"));
}
- 默认机制:RocketMQ 消费失败后自动重试(默认 16 次),重试间隔逐渐延长;
- 优化配置:根据业务调整重试次数(如
retry-times-when-consume-failed=3),避免过多重试导致堆积;
- 重试耗尽处理:消息进入「死信队列(DLQ)」,需监控死信队列并人工介入(避免消息丢失)。
- 消费逻辑避免依赖「一次性资源」(如临时文件、第三方接口单次调用);
- 调用第三方接口失败时,优先重试(而非直接抛异常),或通过「定时任务补偿」。
针对 Broker 宕机、网络分区等极端场景,需补充「监控 + 补偿 + 对账」机制,确保最终一致。
- 监控 Broker 「待回查半消息数量」:数量激增说明生产端异常,触发告警;
- 监控死信队列:有消息进入则立即告警,人工介入;
- 监控消费堆积:消费进度落后超过阈值,扩容消费线程或排查逻辑。
开发定时任务,扫描「生产端成功但消费端未处理」的异常数据,主动补偿:
@Scheduled(fixedRate = 300000)
public void compensateTxMsg() {
List<String> unConsumedOrderIds = txLogMapper.selectSuccessButUnConsumed();
for (String orderId : unConsumedOrderIds) {
createLogistics(orderId);
consumeLogMapper.insert(new ConsumeLog(orderId, "SUCCESS"));
}
}
定期(如每天凌晨)执行「生产端事务日志」与「消费端消费日志」的对账:
- 若「生产端 SUCCESS」但「消费端无记录」:触发补偿;
- 若「生产端 FAIL」但「消费端有记录」:执行回滚(如恢复库存、撤销物流单);
- 对账结果输出异常报表,人工复核。
最终一致性 = 生产端(半消息 + 事务日志 + 回查)
+ 消费端(幂等 + 失败重试 + 消费日志)
+ 全局兜底(监控 + 定时补偿 + 对账)
这套方案能覆盖 99.99% 的业务场景,实现「本地事务执行」与「消息消费」的最终一致性,是电商订单、资金转账等核心业务的标准落地方式。核心原则:状态持久化、操作幂等化、异常可重试、全局有补偿。