保证 RocketMQ 事务消息的最终一致性(即「本地事务执行」与「消息投递 / 消费」的最终一致),核心是依托 RocketMQ 事务消息的「半消息 + 本地事务 + 事务回查」机制,同时配合生产端可靠性、消费端幂等性、异常兜底策略形成闭环。以下是分维度的落地方案,覆盖核心原理、关键措施和避坑要点:
RocketMQ 事务消息的最终一致性依赖「两阶段提交 + 回查兜底」,核心流程保证:
- 第一阶段:生产者发送「半消息」(Broker 暂存,不可投递),确保消息能成功写入 Broker;
- 第二阶段:执行本地事务,根据结果向 Broker 发送「提交 / 回滚」指令:
- 提交:Broker 投递消息给消费者,保证「本地事务成功 → 消息必投递」;
- 回滚:Broker 删除半消息,保证「本地事务失败 → 消息不投递」;
- 兜底机制:若生产者宕机 / 网络异常导致 Broker 未收到指令,Broker 定时回查生产者,确认本地事务状态后补全操作,避免「状态丢失」。
生产端是最终一致性的核心,需确保「本地事务执行结果」与「消息提交 / 回滚」严格匹配:
- 问题:若本地事务状态仅存于内存,生产者宕机后,Broker 回查时无法获取真实状态,导致一致性破坏;
- 解决方案:将本地事务状态(成功 / 失败)写入数据库 / 事务日志,回查时从持久化存储读取,而非内存。
private boolean doLocalBiz(String orderId) {
boolean success = deductStock(orderId);
saveTxLog(orderId, success ? "SUCCESS" : "FAIL");
return success;
}
private boolean checkLocalBiz(String orderId) {
TxLog txLog = getTxLogByOrderId(orderId);
return "SUCCESS".equals(txLog.getStatus());
}
- 问题:Broker 回查可能多次触发本地事务状态校验,若本地事务非幂等,重复执行会导致数据错误;
- 解决方案:
- 本地事务操作(如扣减库存、更新订单)基于「业务唯一 ID(如订单 ID)」做幂等;
- 避免本地事务包含「非幂等操作」(如新增记录、调用第三方接口),可通过「状态机 + 乐观锁」控制。
private boolean deductStock(String orderId) {
int affected = jdbcTemplate.update(
"UPDATE t_stock SET num = num - 1 WHERE order_id = ? AND num > 0 AND status = 'UNDO'",
orderId
);
return affected > 0;
}
Broker 的回查机制是生产端的兜底,需合理配置确保回查有效:
- 本地事务执行异常:返回
UNKNOWN 触发回查,而非直接回滚(避免因临时异常误判);
- 半消息发送失败:开启生产者重试(
retry-times-when-send-failed=3),确保半消息能写入 Broker;
- 提交 / 回滚指令发送失败:无需处理,依赖 Broker 回查兜底。
消费端需确保「消息一旦投递,业务逻辑必执行成功」,避免「消息收到但业务未处理」的一致性问题:
- 问题:Broker 重试机制、网络异常会导致消息重复投递,重复消费会破坏数据一致性;
- 解决方案:基于「业务唯一 ID(如订单 ID)」或「RocketMQ 消息 ID」做幂等校验:
@Override
public void onMessage(String msg) {
OrderPayDTO dto = JSON.parseObject(msg, OrderPayDTO.class);
String orderId = dto.getOrderId();
if (checkConsumed(orderId)) {
return;
}
createLogistics(orderId);
saveConsumeLog(orderId);
}
- 默认机制:RocketMQ 消费失败后会自动重试(默认 16 次),重试间隔逐渐延长;
- 优化配置:根据业务场景调整重试次数(如
retry-times-when-consume-failed=3),避免过多重试导致消息堆积;
- 重试耗尽处理:消息进入「死信队列(DLQ)」,需监控死信队列并人工介入处理(避免消息丢失)。
- 消费逻辑避免依赖「一次性资源」(如临时文件、第三方接口单次调用);
- 若调用第三方接口失败,可通过「定时任务补偿」,确保业务最终执行成功。
针对极端异常场景(如 Broker 宕机、网络分区),需补充全局兜底策略:
- 监控 Broker 中「待回查的半消息数量」:若数量激增,说明生产端可能异常,及时告警;
- 监控死信队列:若有消息进入,立即触发告警,人工介入处理;
- 监控消费堆积:若消费进度落后,及时扩容消费线程或排查消费逻辑。
- 针对「本地事务成功但消息未投递」「消息投递但消费失败」的场景,开发定时补偿任务:
- 扫描「本地事务成功但无消费记录」的订单,主动触发消费逻辑;
- 扫描死信队列,重试消费失败的消息。
@Scheduled(fixedRate = 300000)
public void compensateTxMsg() {
List<String> unConsumedOrderIds = getUnConsumedOrderIds();
for (String orderId : unConsumedOrderIds) {
createLogistics(orderId);
saveConsumeLog(orderId);
}
}
定期(如每天凌晨)执行「生产端事务日志」与「消费端消费日志」的对账:
- 若「生产端事务成功」但「消费端无消费记录」,触发补偿;
- 若「生产端事务失败」但「消费端有消费记录」,执行回滚操作(如恢复库存、撤销物流单)。
RocketMQ 事务消息的最终一致性是「生产端 + 消费端 + 全局兜底」的三重保障:
- 生产端:半消息保证消息能写入 Broker,本地事务状态持久化 + 回查兜底保证「本地事务结果 ↔ 消息状态」一致;
- 消费端:幂等消费 + 失败重试保证「消息投递 → 业务必执行成功」;
- 全局兜底:监控告警 + 定时补偿 + 数据对账,解决极端异常场景的一致性问题。
这套方案能覆盖 99.99% 的业务场景,实现「本地事务执行」与「消息消费」的最终一致性,是电商、金融等核心业务的标准落地方式。