MQ(Message Queue,消息队列)是分布式系统中实现异步通信、解耦、削峰填谷的核心中间件,主流产品包括 RabbitMQ、Kafka、RocketMQ、ActiveMQ 等,不同产品适配不同场景,但核心应用逻辑一致。以下是 MQ 最核心的应用场景、落地实操及选型建议:
传统同步调用中,A 系统调用 B 系统后需等待 B 响应才能继续,若 B 耗时 / 故障,会导致 A 阻塞。通过 MQ 实现异步通信:A 发送消息到 MQ 后立即返回,B 从 MQ 消费消息异步处理,彻底解耦上下游。
- 用户注册:注册成功后,同步返回结果,异步触发「发送验证码、创建积分账户、推送欢迎消息」等操作;
- 订单创建:订单提交后,异步触发「库存扣减、物流创建、短信通知」。
@Configuration
public class RabbitConfig {
@Bean
public Queue orderQueue() {
return new Queue("order.create.queue", true);
}
}
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public String createOrder(Order order) {
orderMapper.insert(order);
rabbitTemplate.convertAndSend("order.create.queue", order.getId());
return "订单创建成功,订单号:" + order.getId();
}
}
@Service
public class StockConsumer {
@RabbitListener(queues = "order.create.queue")
public void deductStock(String orderId) {
OrderItem item = orderItemMapper.selectByOrderId(orderId);
stockMapper.deduct(item.getProductId(), item.getNum());
log.info("订单{}库存扣减完成", orderId);
}
}
- 解耦:上下游系统无需感知对方存在,只需约定消息格式;
- 提升响应速度:核心流程(如订单创建)无需等待非核心流程(如短信)完成;
- 容错:消费端故障时,消息暂存 MQ,恢复后自动消费,不丢失数据。
秒杀、大促等场景下,请求量瞬间激增(如 10 万 QPS),直接打向后端会导致数据库 / 应用崩溃。通过 MQ 承接突发流量,后端按自身处理能力(如 2 万 QPS)消费消息,实现「削峰填谷」。
- 电商秒杀:用户下单请求先写入 MQ,库存系统匀速消费;
- 日志采集:应用产生的海量日志先写入 MQ,分析系统按需消费;
- 接口限流:第三方接口(如支付)有 QPS 限制,通过 MQ 控制调用频率。
Kafka 因高吞吐特性,是削峰场景的首选,核心配置需控制消费速率:
@Service
public class SeckillProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public String seckill(String userId, String productId) {
if (seckillMapper.checkUser(userId, productId)) {
return "您已参与秒杀";
}
String message = userId + "," + productId;
kafkaTemplate.send("seckill.topic", message);
return "秒杀请求已提交,请等待结果";
}
}
@Service
public class SeckillConsumer {
@Value("${kafka.consumer.threads:5}")
private int consumerThreads;
@Value("${kafka.consumer.rate:100}")
private int consumeRate;
@KafkaListener(topics = "seckill.topic", concurrency = "${kafka.consumer.threads}")
public void consume(String message) throws InterruptedException {
Thread.sleep(1000 / consumeRate);
String[] params = message.split(",");
seckillService.process(params[0], params[1]);
}
}
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=/usr/local/kafka/logs
num.partitions=8 # 分区数决定并行消费能力(越多吞吐越高)
log.retention.hours=24 # 消息保留 24 小时(应对消费延迟)
- 抗突发流量:MQ 承接峰值请求,避免后端服务被压垮;
- 资源利用最大化:后端按最优性能消费,避免资源闲置 / 过载;
- 流量可控:可动态调整消费速率,适配后端处理能力。
分布式系统中,多系统(如 Java、Python、PHP 异构系统)需通信,直接调用易导致「牵一发而动全身」。通过 MQ 作为统一消息通道,各系统只需按格式生产 / 消费消息,无需耦合对方接口。
- 电商中台:订单系统(Java)、物流系统(Python)、财务系统(PHP)通过 MQ 同步数据;
- 微服务通信:替代 RESTful 同步调用,实现微服务解耦;
- 跨机房数据同步:异地机房通过 MQ 异步同步核心数据。
{
"msgId": "20251229001",
"msgType": "ORDER_PAY",
"data": {
"orderId": "O20251229001",
"payAmount": 99.00,
"payTime": "2025-12-29 10:00:00"
},
"timestamp": 1735567200000
}
@Service
public class PayProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendPayMessage(PayDTO payDTO) {
Map<String, Object> msg = new HashMap<>();
msg.put("msgId", UUID.randomUUID().toString());
msg.put("msgType", "ORDER_PAY");
msg.put("data", payDTO);
msg.put("timestamp", System.currentTimeMillis());
rabbitTemplate.convertAndSend("order.exchange", "order.pay", msg);
}
}
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.100'))
channel = connection.channel()
channel.exchange_declare(exchange='order.exchange', exchange_type='direct')
channel.queue_declare(queue='finance.queue', durable=True)
channel.queue_bind(exchange='order.exchange', queue='finance.queue', routing_key='order.pay')
def callback(ch, method, properties, body):
msg = json.loads(body)
order_id = msg['data']['orderId']
pay_amount = msg['data']['payAmount']
print(f"财务系统:订单{order_id}支付{pay_amount}元,生成账单")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='finance.queue', on_message_callback=callback)
channel.start_consuming()
- 异构系统兼容:MQ 支持多语言客户端(Java/Python/PHP/Go),无需适配不同接口;
- 扩展性强:新增系统只需消费对应消息,无需修改原有系统;
- 降低耦合:上游系统无需关注下游有哪些系统,只需发送消息到 MQ。
分布式系统中,跨库 / 跨服务事务无法通过数据库事务保证一致性,通过 MQ 实现「最终一致性」:核心操作成功后,发送消息触发其他操作,失败则重试,最终所有操作达成一致。
- 订单支付:支付成功后,异步更新订单状态、扣减库存、生成物流单;
- 资金转账:A 账户扣款成功后,异步给 B 账户加款;
- 数据同步:主库数据变更后,异步同步到从库 / ES/Redis。
RocketMQ 支持「事务消息」,解决「消息发送成功但业务回滚」或「业务成功但消息发送失败」的问题:
@Service
public class TransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void transfer(String fromAccount, String toAccount, BigDecimal amount) {
Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(new TransferDTO(fromAccount, toAccount, amount)))
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
rocketMQTemplate.sendMessageInTransaction("transfer.topic", message, null);
}
@RocketMQTransactionListener
class TransferTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
try {
accountMapper.deduct(dto.getFromAccount(), dto.getAmount());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
TransferDTO dto = JSON.parseObject(msg.getPayload().toString(), TransferDTO.class);
boolean success = accountMapper.checkDeduct(dto.getFromAccount(), dto.getAmount());
return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
}
@Service
public class TransferConsumer {
@RocketMQMessageListener(topic = "transfer.topic", consumerGroup = "transfer-group")
public class TransferListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
TransferDTO dto = JSON.parseObject(msg, TransferDTO.class);
int retryCount = 3;
while (retryCount > 0) {
try {
accountMapper.add(dto.getToAccount(), dto.getAmount());
log.info("B账户{}加款{}元成功", dto.getToAccount(), dto.getAmount());
return;
} catch (Exception e) {
retryCount--;
log.error("加款失败,剩余重试次数{}", retryCount);
try { Thread.sleep(1000); } catch (InterruptedException ex) {}
}
}
log.error("B账户{}加款{}元失败,进入死信队列", dto.getToAccount(), dto.getAmount());
}
}
}
- 保证最终一致性:即使中间步骤失败,通过重试最终达成数据一致;
- 避免分布式事务阻塞:无需锁定多个资源,异步执行;
- 容错性强:某一步骤故障时,消息暂存,恢复后继续执行。
不同 MQ 产品特性差异大,需根据业务场景选择:
- 生产者:开启消息确认(如 RabbitMQ 的
publisher-confirms),确保消息发送到 MQ;
- 消费者:开启手动 Ack,消费成功后再确认,避免消息丢失;
- MQ 端:开启队列 / 消息持久化,避免 MQ 宕机丢失消息。
消费端需保证「重复消费消息不影响结果」,常用方案:
- 基于唯一消息 ID 去重;
- 业务逻辑加幂等校验(如订单状态已更新则跳过)。
消费失败的消息(如重试多次仍失败)进入死信队列,避免阻塞正常消息,后续人工介入处理。
- 监控指标:消息堆积数、消费速率、生产速率、延迟时间;
- 告警阈值:堆积数超过 1 万、消费延迟超过 5 分钟触发告警。
MQ 的核心价值是 **「异步、解耦、削峰、最终一致」**,掌握以下核心原则即可覆盖绝大多数业务场景:
- 同步改异步:非核心流程通过 MQ 异步执行,提升响应速度;
- 峰值削平:突发流量通过 MQ 缓冲,后端匀速消费;
- 系统解耦:异构系统 / 微服务通过 MQ 通信,降低耦合;
- 最终一致:跨服务事务通过 MQ 重试机制保证最终数据一致。
不同 MQ 产品只需适配语法,核心应用逻辑通用,选型时优先匹配业务场景(如高吞吐选 Kafka,事务消息选 RocketMQ,轻量场景选 RabbitMQ)。