RocketMQ 事务消息的「半消息(Half Message)」是实现「消息投递与本地事务一致性」的核心机制,本质是Broker 接收到但标记为「暂不可投递」的特殊消息,其实现依赖 RocketMQ 对消息存储、状态标记、事务回查的特殊设计。以下从「底层实现原理、核心流程、关键机制」三方面拆解半消息的实现逻辑:
半消息是事务消息的「中间态消息」:
- 生产者发送半消息后,Broker 会持久化该消息,但不会立即投递到消费者(标记为「暂不可投递」);
- 仅当生产者提交事务(返回
COMMIT)后,Broker 才将半消息标记为「可投递」,推送给消费者;
- 若生产者回滚事务(返回
ROLLBACK),Broker 会直接删除半消息,消费者永远无法收到。
RocketMQ 的 Broker 存储消息时,会在「消息存储单元(CommitLog)」中为半消息添加事务状态标记(TransactionStatus),核心字段包括:
关键区别:普通消息的 transactionStatus 为 NONE,Broker 接收后直接标记为「可投递」;半消息初始状态为 PREPARED,需等待生产者指令更新状态。
RocketMQ 为半消息设计了独立的存储逻辑:
- 半消息先写入 Broker 的 CommitLog(与普通消息共用存储),但在「消费队列(ConsumeQueue)」中暂不生成索引(消费者通过 ConsumeQueue 拉取消息,无索引则无法消费);
- 仅当半消息状态更新为
COMMITTED 后,Broker 才会为其生成 ConsumeQueue 索引,消费者才能拉取到该消息;
- 若状态为
ROLLBACKED,Broker 会删除 CommitLog 中的半消息(或标记为已删除),并清理相关索引。
半消息的实现分为「发送、暂存、状态更新、投递 / 删除」四个阶段,全程由 Broker 的「事务消息处理器(TransactionMessageService)」管控:
- 生产者调用
sendMessageInTransaction 发送事务消息;
- RocketMQ 客户端先将消息标记为「半消息」(设置
transactionStatus=PREPARED),发送到 Broker;
- Broker 接收后,将半消息写入 CommitLog,但不生成 ConsumeQueue 索引,并返回「发送成功」给生产者。
- 生产者收到 Broker 的半消息确认后,触发
executeLocalTransaction 执行本地事务(如扣减库存、更新订单状态);
- 本地事务执行完成后,生产者向 Broker 发送「事务状态指令」(
COMMIT/ROLLBACK/UNKNOWN)。
- 情况 1:收到
COMMIT 指令
Broker 将半消息的 transactionStatus 改为 COMMITTED,并为其生成 ConsumeQueue 索引 → 消费者可拉取并消费该消息。
- 情况 2:收到
ROLLBACK 指令
Broker 将半消息的 transactionStatus 改为 ROLLBACKED,删除 CommitLog 中的半消息 → 消费者永远收不到。
- 情况 3:收到
UNKNOWN 或未收到指令
Broker 将半消息标记为「待回查」,启动「事务回查定时任务」。
- Broker 的「事务回查线程池」定时扫描状态为
PREPARED 且超过回查阈值的半消息;
- 根据半消息中的
producerGroup 定位到对应的生产者,发送「事务回查请求」;
- 生产者执行
checkLocalTransaction 回查本地事务状态,返回 COMMIT/ROLLBACK/UNKNOWN;
- Broker 根据回查结果更新半消息状态(重复阶段 3 的逻辑),最多回查 15 次(可配置),仍为
UNKNOWN 则标记为 ROLLBACKED。
Broker 会将半消息的事务状态(PREPARED/COMMITTED/ROLLBACKED)持久化到「事务状态表(TransactionalMessageTable)」,即使 Broker 宕机重启,也能恢复半消息的状态,保证流程不中断。
Broker 触发事务回查的核心条件:
- 生产者发送半消息后,未在指定时间内(默认 60 秒)发送事务状态指令;
- 生产者返回
UNKNOWN 状态(本地事务执行异常);
- 生产者宕机,无法主动发送状态指令。
以下是 Broker 处理半消息的核心逻辑伪代码,帮助理解底层实现:
public void processHalfMessage(Message msg) {
msg.setTransactionStatus(TransactionStatus.PREPARED);
commitLog.putMessage(msg);
transactionTable.put(msg.getTransactionId(), msg);
return SendResult.SUCCESS;
}
public void processTransactionStatus(String transactionId, TransactionStatus status) {
Message msg = transactionTable.get(transactionId);
if (status == TransactionStatus.COMMIT) {
consumeQueue.createIndex(msg);
msg.setTransactionStatus(TransactionStatus.COMMITTED);
} else if (status == TransactionStatus.ROLLBACK) {
commitLog.deleteMessage(msg);
transactionTable.remove(transactionId);
} else {
msg.setNeedCheck(true);
}
}
public void transactionCheckTask() {
List<Message> needCheckMsgs = transactionTable.scanNeedCheckMessages();
for (Message msg : needCheckMsgs) {
TransactionStatus status = producer.checkLocalTransaction(msg.getTransactionId());
processTransactionStatus(msg.getTransactionId(), status);
}
}
半消息的实现核心是「状态标记 + 存储隔离 + 回查兜底」:
- 状态标记:通过
transactionStatus 区分半消息的中间态,实现「暂不可投递」;
- 存储隔离:半消息写入 CommitLog 但不生成消费索引,仅提交后才对消费者可见;
- 回查兜底:Broker 主动回查生产者,解决「生产者宕机 / 网络异常导致状态丢失」的问题。
半消息是 RocketMQ 事务消息的「灵魂」,通过这种设计,实现了「本地事务执行」与「消息投递」的解耦,最终保证分布式事务的最终一致性。