事务消息的半消息阶段是如何实现的?

RocketMQ 事务消息的「半消息(Half Message)」是实现「消息投递与本地事务一致性」的核心机制,本质是Broker 接收到但标记为「暂不可投递」的特殊消息,其实现依赖 RocketMQ 对消息存储、状态标记、事务回查的特殊设计。以下从「底层实现原理、核心流程、关键机制」三方面拆解半消息的实现逻辑:

一、半消息的核心定义

半消息是事务消息的「中间态消息」:
  • 生产者发送半消息后,Broker 会持久化该消息,但不会立即投递到消费者(标记为「暂不可投递」);
  • 仅当生产者提交事务(返回 COMMIT)后,Broker 才将半消息标记为「可投递」,推送给消费者;
  • 若生产者回滚事务(返回 ROLLBACK),Broker 会直接删除半消息,消费者永远无法收到。

二、半消息的底层实现原理

1. 消息存储结构的特殊标记

RocketMQ 的 Broker 存储消息时,会在「消息存储单元(CommitLog)」中为半消息添加事务状态标记(TransactionStatus),核心字段包括:
字段 作用
transactionId 事务唯一 ID(生产者生成,用于事务回查时关联半消息与本地事务)
transactionStatus 半消息状态:PREPARED(暂存)/ COMMITTED(已提交)/ ROLLBACKED(已回滚)
producerGroup 事务生产者组(用于 Broker 回查生产者时定位对应的事务监听器)
关键区别:普通消息的 transactionStatus 为 NONE,Broker 接收后直接标记为「可投递」;半消息初始状态为 PREPARED,需等待生产者指令更新状态。

2. 半消息的存储隔离

RocketMQ 为半消息设计了独立的存储逻辑
  • 半消息先写入 Broker 的 CommitLog(与普通消息共用存储),但在「消费队列(ConsumeQueue)」中暂不生成索引(消费者通过 ConsumeQueue 拉取消息,无索引则无法消费);
  • 仅当半消息状态更新为 COMMITTED 后,Broker 才会为其生成 ConsumeQueue 索引,消费者才能拉取到该消息;
  • 若状态为 ROLLBACKED,Broker 会删除 CommitLog 中的半消息(或标记为已删除),并清理相关索引。

三、半消息的完整生命周期(核心流程)

半消息的实现分为「发送、暂存、状态更新、投递 / 删除」四个阶段,全程由 Broker 的「事务消息处理器(TransactionMessageService)」管控:

阶段 1:生产者发送半消息

  1. 生产者调用 sendMessageInTransaction 发送事务消息;
  2. RocketMQ 客户端先将消息标记为「半消息」(设置 transactionStatus=PREPARED),发送到 Broker;
  3. Broker 接收后,将半消息写入 CommitLog,但不生成 ConsumeQueue 索引,并返回「发送成功」给生产者。

阶段 2:生产者执行本地事务

  1. 生产者收到 Broker 的半消息确认后,触发 executeLocalTransaction 执行本地事务(如扣减库存、更新订单状态);
  2. 本地事务执行完成后,生产者向 Broker 发送「事务状态指令」(COMMIT/ROLLBACK/UNKNOWN)。

阶段 3:Broker 更新半消息状态

  • 情况 1:收到 COMMIT 指令

    Broker 将半消息的 transactionStatus 改为 COMMITTED,并为其生成 ConsumeQueue 索引 → 消费者可拉取并消费该消息。

  • 情况 2:收到 ROLLBACK 指令

    Broker 将半消息的 transactionStatus 改为 ROLLBACKED,删除 CommitLog 中的半消息 → 消费者永远收不到。

  • 情况 3:收到 UNKNOWN 或未收到指令

    Broker 将半消息标记为「待回查」,启动「事务回查定时任务」。

阶段 4:事务回查(兜底机制)

  1. Broker 的「事务回查线程池」定时扫描状态为 PREPARED 且超过回查阈值的半消息;
  2. 根据半消息中的 producerGroup 定位到对应的生产者,发送「事务回查请求」;
  3. 生产者执行 checkLocalTransaction 回查本地事务状态,返回 COMMIT/ROLLBACK/UNKNOWN
  4. Broker 根据回查结果更新半消息状态(重复阶段 3 的逻辑),最多回查 15 次(可配置),仍为 UNKNOWN 则标记为 ROLLBACKED

四、半消息实现的关键机制

1. 事务状态的持久化

Broker 会将半消息的事务状态(PREPARED/COMMITTED/ROLLBACKED)持久化到「事务状态表(TransactionalMessageTable)」,即使 Broker 宕机重启,也能恢复半消息的状态,保证流程不中断。

2. 回查机制的触发条件

Broker 触发事务回查的核心条件:
  • 生产者发送半消息后,未在指定时间内(默认 60 秒)发送事务状态指令;
  • 生产者返回 UNKNOWN 状态(本地事务执行异常);
  • 生产者宕机,无法主动发送状态指令。

3. 半消息与普通消息的存储区别

存储维度 普通消息 半消息
CommitLog 写入 直接写入,无特殊标记 写入时标记 transactionStatus=PREPARED
ConsumeQueue 索引 写入后立即生成 仅提交后生成,回滚则删除
事务状态表 无记录 记录 transactionId 和状态

五、半消息实现的核心代码(Broker 侧简化逻辑)

以下是 Broker 处理半消息的核心逻辑伪代码,帮助理解底层实现:
java
运行
// Broker 接收半消息的处理逻辑
public void processHalfMessage(Message msg) {
    // 1. 标记为半消息(PREPARED 状态)
    msg.setTransactionStatus(TransactionStatus.PREPARED);
    // 2. 写入 CommitLog(持久化)
    commitLog.putMessage(msg);
    // 3. 不生成 ConsumeQueue 索引(消费者无法拉取)
    // 4. 记录到事务状态表
    transactionTable.put(msg.getTransactionId(), msg);
    // 5. 返回成功给生产者
    return SendResult.SUCCESS;
}

// Broker 处理生产者的事务状态指令
public void processTransactionStatus(String transactionId, TransactionStatus status) {
    Message msg = transactionTable.get(transactionId);
    if (status == TransactionStatus.COMMIT) {
        // 提交:生成 ConsumeQueue 索引,消费者可消费
        consumeQueue.createIndex(msg);
        msg.setTransactionStatus(TransactionStatus.COMMITTED);
    } else if (status == TransactionStatus.ROLLBACK) {
        // 回滚:删除 CommitLog 和事务表记录
        commitLog.deleteMessage(msg);
        transactionTable.remove(transactionId);
    } else {
        // UNKNOWN:标记为待回查
        msg.setNeedCheck(true);
    }
}

// 事务回查定时任务
public void transactionCheckTask() {
    // 扫描待回查的半消息
    List<Message> needCheckMsgs = transactionTable.scanNeedCheckMessages();
    for (Message msg : needCheckMsgs) {
        // 调用生产者的回查接口
        TransactionStatus status = producer.checkLocalTransaction(msg.getTransactionId());
        // 更新半消息状态
        processTransactionStatus(msg.getTransactionId(), status);
    }
}

六、核心总结

半消息的实现核心是「状态标记 + 存储隔离 + 回查兜底」:
  1. 状态标记:通过 transactionStatus 区分半消息的中间态,实现「暂不可投递」;
  2. 存储隔离:半消息写入 CommitLog 但不生成消费索引,仅提交后才对消费者可见;
  3. 回查兜底:Broker 主动回查生产者,解决「生产者宕机 / 网络异常导致状态丢失」的问题。
半消息是 RocketMQ 事务消息的「灵魂」,通过这种设计,实现了「本地事务执行」与「消息投递」的解耦,最终保证分布式事务的最终一致性。
阅读剩余
THE END