RocketMQ 事务消息的「半消息(Half Message)」是实现「消息投递与本地事务一致性」的核心机制,本质是Broker 接收到但标记为「暂不可投递」的特殊消息,其实现依赖 RocketMQ 对消息存储、状态标记、事务回查的特殊设计。以下从「底层实现原理、核心流程、关键机制」三方面拆解半消息的实现逻辑:
一、半消息的核心定义
半消息是事务消息的「中间态消息」:
- 生产者发送半消息后,Broker 会持久化该消息,但不会立即投递到消费者(标记为「暂不可投递」);
- 仅当生产者提交事务(返回
COMMIT)后,Broker 才将半消息标记为「可投递」,推送给消费者; - 若生产者回滚事务(返回
ROLLBACK),Broker 会直接删除半消息,消费者永远无法收到。
二、半消息的底层实现原理
1. 消息存储结构的特殊标记
RocketMQ 的 Broker 存储消息时,会在「消息存储单元(CommitLog)」中为半消息添加事务状态标记(TransactionStatus),核心字段包括:
| 字段 | 作用 |
|---|---|
transactionId |
事务唯一 ID(生产者生成,用于事务回查时关联半消息与本地事务) |
transactionStatus |
半消息状态:PREPARED(暂存)/ COMMITTED(已提交)/ ROLLBACKED(已回滚) |
producerGroup |
事务生产者组(用于 Broker 回查生产者时定位对应的事务监听器) |
关键区别:普通消息的
transactionStatus 为 NONE,Broker 接收后直接标记为「可投递」;半消息初始状态为 PREPARED,需等待生产者指令更新状态。2. 半消息的存储隔离
RocketMQ 为半消息设计了独立的存储逻辑:
- 半消息先写入 Broker 的 CommitLog(与普通消息共用存储),但在「消费队列(ConsumeQueue)」中暂不生成索引(消费者通过 ConsumeQueue 拉取消息,无索引则无法消费);
- 仅当半消息状态更新为
COMMITTED后,Broker 才会为其生成 ConsumeQueue 索引,消费者才能拉取到该消息; - 若状态为
ROLLBACKED,Broker 会删除 CommitLog 中的半消息(或标记为已删除),并清理相关索引。
三、半消息的完整生命周期(核心流程)
半消息的实现分为「发送、暂存、状态更新、投递 / 删除」四个阶段,全程由 Broker 的「事务消息处理器(TransactionMessageService)」管控:
阶段 1:生产者发送半消息
- 生产者调用
sendMessageInTransaction发送事务消息; - RocketMQ 客户端先将消息标记为「半消息」(设置
transactionStatus=PREPARED),发送到 Broker; - Broker 接收后,将半消息写入 CommitLog,但不生成 ConsumeQueue 索引,并返回「发送成功」给生产者。
阶段 2:生产者执行本地事务
- 生产者收到 Broker 的半消息确认后,触发
executeLocalTransaction执行本地事务(如扣减库存、更新订单状态); - 本地事务执行完成后,生产者向 Broker 发送「事务状态指令」(
COMMIT/ROLLBACK/UNKNOWN)。
阶段 3:Broker 更新半消息状态
- 情况 1:收到
COMMIT指令Broker 将半消息的
transactionStatus改为COMMITTED,并为其生成 ConsumeQueue 索引 → 消费者可拉取并消费该消息。 - 情况 2:收到
ROLLBACK指令Broker 将半消息的
transactionStatus改为ROLLBACKED,删除 CommitLog 中的半消息 → 消费者永远收不到。 - 情况 3:收到
UNKNOWN或未收到指令Broker 将半消息标记为「待回查」,启动「事务回查定时任务」。
阶段 4:事务回查(兜底机制)
- Broker 的「事务回查线程池」定时扫描状态为
PREPARED且超过回查阈值的半消息; - 根据半消息中的
producerGroup定位到对应的生产者,发送「事务回查请求」; - 生产者执行
checkLocalTransaction回查本地事务状态,返回COMMIT/ROLLBACK/UNKNOWN; - Broker 根据回查结果更新半消息状态(重复阶段 3 的逻辑),最多回查 15 次(可配置),仍为
UNKNOWN则标记为ROLLBACKED。
四、半消息实现的关键机制
1. 事务状态的持久化
Broker 会将半消息的事务状态(
PREPARED/COMMITTED/ROLLBACKED)持久化到「事务状态表(TransactionalMessageTable)」,即使 Broker 宕机重启,也能恢复半消息的状态,保证流程不中断。2. 回查机制的触发条件
Broker 触发事务回查的核心条件:
- 生产者发送半消息后,未在指定时间内(默认 60 秒)发送事务状态指令;
- 生产者返回
UNKNOWN状态(本地事务执行异常); - 生产者宕机,无法主动发送状态指令。
3. 半消息与普通消息的存储区别
| 存储维度 | 普通消息 | 半消息 |
|---|---|---|
| CommitLog 写入 | 直接写入,无特殊标记 | 写入时标记 transactionStatus=PREPARED |
| ConsumeQueue 索引 | 写入后立即生成 | 仅提交后生成,回滚则删除 |
| 事务状态表 | 无记录 | 记录 transactionId 和状态 |
五、半消息实现的核心代码(Broker 侧简化逻辑)
以下是 Broker 处理半消息的核心逻辑伪代码,帮助理解底层实现:
java
运行
// Broker 接收半消息的处理逻辑
public void processHalfMessage(Message msg) {
// 1. 标记为半消息(PREPARED 状态)
msg.setTransactionStatus(TransactionStatus.PREPARED);
// 2. 写入 CommitLog(持久化)
commitLog.putMessage(msg);
// 3. 不生成 ConsumeQueue 索引(消费者无法拉取)
// 4. 记录到事务状态表
transactionTable.put(msg.getTransactionId(), msg);
// 5. 返回成功给生产者
return SendResult.SUCCESS;
}
// Broker 处理生产者的事务状态指令
public void processTransactionStatus(String transactionId, TransactionStatus status) {
Message msg = transactionTable.get(transactionId);
if (status == TransactionStatus.COMMIT) {
// 提交:生成 ConsumeQueue 索引,消费者可消费
consumeQueue.createIndex(msg);
msg.setTransactionStatus(TransactionStatus.COMMITTED);
} else if (status == TransactionStatus.ROLLBACK) {
// 回滚:删除 CommitLog 和事务表记录
commitLog.deleteMessage(msg);
transactionTable.remove(transactionId);
} else {
// UNKNOWN:标记为待回查
msg.setNeedCheck(true);
}
}
// 事务回查定时任务
public void transactionCheckTask() {
// 扫描待回查的半消息
List<Message> needCheckMsgs = transactionTable.scanNeedCheckMessages();
for (Message msg : needCheckMsgs) {
// 调用生产者的回查接口
TransactionStatus status = producer.checkLocalTransaction(msg.getTransactionId());
// 更新半消息状态
processTransactionStatus(msg.getTransactionId(), status);
}
}
六、核心总结
半消息的实现核心是「状态标记 + 存储隔离 + 回查兜底」:
- 状态标记:通过
transactionStatus区分半消息的中间态,实现「暂不可投递」; - 存储隔离:半消息写入 CommitLog 但不生成消费索引,仅提交后才对消费者可见;
- 回查兜底:Broker 主动回查生产者,解决「生产者宕机 / 网络异常导致状态丢失」的问题。
半消息是 RocketMQ 事务消息的「灵魂」,通过这种设计,实现了「本地事务执行」与「消息投递」的解耦,最终保证分布式事务的最终一致性。