MQ(消息队列)基础配置

MQ(消息队列)基础配置主要围绕连接、服务端、生产者、消费者、可靠性、性能五大核心维度展开,下面以RabbitMQ、Kafka、RocketMQ三大主流 MQ 为例,给出可直接落地的基础配置说明。

一、通用基础配置(三大 MQ 共性)

1. 连接配置(必配)

所有 MQ 客户端第一步都是配置服务地址、端口、认证信息,建立连接。
  • RabbitMQ(默认端口 5672)
    yaml
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtual-host: /  # 虚拟主机,隔离不同业务
    
  • Kafka(默认端口 9092)
    yaml
    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092  # 集群地址用逗号分隔
    
  • RocketMQ(默认端口 9876/10911)
    yaml
    rocketmq:
      name-server: 127.0.0.1:9876  # NameServer地址
    

2. 消息模型与核心概念配置

  • RabbitMQ:基于Exchange+Queue模型,需配置交换机类型、队列、绑定关系
    • 交换机类型:Direct(精准路由)、Topic(通配符)、Fanout(广播)、Headers(头匹配)。
    • 队列属性:持久化(durable)、自动删除(autoDelete)、排他(exclusive)。
  • Kafka:基于Topic+Partition模型,需配置Topic 名称、分区数、副本数
    • 分区数:建议 =CPU 核心数,提升并发消费能力。
    • 副本数:生产环境建议≥2,保障高可用。
  • RocketMQ:基于Topic+MessageQueue模型,需配置Topic、消费组、Broker 角色
    • 消费组:同一组内消费者负载均衡,不同组广播消费。
    • Broker 角色:ASYNC_MASTER(异步主)、SYNC_MASTER(同步主)、SLAVE(从)。

二、RabbitMQ 基础配置详解

1. 服务端核心配置(rabbitmq.conf)

ini
# 内存阈值(超过40%阻塞生产者)
vm_memory_high_watermark.relative = 0.4
# 磁盘阈值(低于500MB阻塞生产者)
disk_free_limit.relative = 0.5
# 消息持久化(队列+消息都持久化才生效)
queue.durable = true
# 预取数(消费者一次拉取消息数,避免堆积)
prefetch_count = 10

2. 生产者配置(Spring Boot)

yaml
spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启发布确认
    publisher-returns: true             # 开启消息返回(路由失败回调)
    template:
      mandatory: true                   # 路由失败返回生产者

3. 消费者配置(Spring Boot)

yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动ACK(保障消息不丢失)
        prefetch: 10              # 预取数,和服务端一致
        concurrency: 5            # 消费者并发数
        max-concurrency: 10       # 最大并发数

三、Kafka 基础配置详解

1. 服务端核心配置(server.properties)

ini
# 监听地址
listeners=PLAINTEXT://:9092
# 分区数(默认1,建议调大)
num.partitions=8
# 副本数(默认1,生产≥2)
default.replication.factor=2
# 日志保留时间(7天)
log.retention.hours=168
# 日志刷盘策略(异步刷盘提升性能)
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# 禁止不干净leader选举(避免数据丢失)
unclean.leader.election.enable=false

2. 生产者配置(Spring Boot)

yaml
spring:
  kafka:
    producer:
      acks: 1  # 0(不确认)、1(leader确认)、all(所有副本确认)
      retries: 3  # 重试次数
      batch-size: 16384  # 批量发送大小(16KB)
      buffer-memory: 33554432  # 缓冲区大小(32MB)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 消费者配置(Spring Boot)

yaml
spring:
  kafka:
    consumer:
      group-id: demo-group  # 消费组ID
      auto-offset-reset: earliest  # 从头消费(latest:最新)
      enable-auto-commit: false  # 关闭自动提交(手动提交更可靠)
      max-poll-records: 50  # 一次拉取最大消息数
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate  # 手动立即提交

四、RocketMQ 基础配置详解

1. 服务端核心配置(broker.conf)

ini
# NameServer地址
namesrvAddr=127.0.0.1:9876
# Broker角色(异步主)
brokerRole=ASYNC_MASTER
# 刷盘策略(异步刷盘,SYNC_FLUSH为同步)
flushDiskType=ASYNC_FLUSH
# 消息保留时间(48小时)
fileReservedTime=48
# 自动创建Topic(生产建议关闭)
autoCreateTopicEnable=false
# 默认队列数(4个)
defaultTopicQueueNums=4

2. 生产者配置(Spring Boot)

yaml
rocketmq:
  producer:
    group: demo-producer-group  # 生产者组
    send-message-timeout: 3000  # 发送超时(3秒)
    retry-times-when-send-failed: 3  # 重试次数
    retry-times-when-send-async-failed: 3  # 异步重试次数

3. 消费者配置(Spring Boot)

yaml
rocketmq:
  consumer:
    group: demo-consumer-group  # 消费组
    message-model: CLUSTERING  # 集群模式(广播:BROADCASTING)
    consume-from-where: CONSUME_FROM_LAST_OFFSET  # 从最新开始消费
    max-reconsume-times: 5  # 最大重试次数(超过进入死信队列)
    pull-batch-size: 32  # 批量拉取大小

五、可靠性与性能基础配置(通用)

1. 消息可靠性(不丢失、不重复)

  • 持久化:所有 MQ 都需配置队列 / Topic 持久化 + 消息持久化,避免服务重启丢失消息。
  • 确认机制
    • RabbitMQ:生产者publisher-confirm,消费者手动 ACK
    • Kafka:生产者acks=all,消费者手动提交 offset
    • RocketMQ:生产者sendMessageSync(同步发送),消费者手动 ACK
  • 死信队列:配置消息重试次数,失败消息进入死信队列,人工处理。

2. 性能优化(高吞吐、低延迟)

  • 批量操作:生产者批量发送,消费者批量拉取 / 批量 ACK。
  • 并发配置:增加消费者并发数、Kafka 分区数、RocketMQ 队列数。
  • 压缩消息:Kafka 开启compression.type=lz4/gzip,减少网络传输。
  • 异步发送:生产者使用异步发送,避免阻塞主线程。

六、配置对比速览

表格
配置项 RabbitMQ Kafka RocketMQ
默认端口 5672 9092 9876(NameServer)/10911(Broker)
核心模型 Exchange+Queue Topic+Partition Topic+MessageQueue
持久化 队列 + 消息持久化 日志文件持久化 CommitLog+ConsumeQueue 持久化
确认机制 发布确认 + 手动 ACK acks + 手动提交 offset 同步发送 + 手动 ACK
并发优化 prefetch + 多消费者 分区数 + 多消费者 队列数 + 多消费者
死信队列 需手动配置 无原生支持(需自定义) 默认开启

七、配置最佳实践

  1. 生产环境禁用自动创建 Topic/Queue,避免资源混乱。
  2. 消息大小:Kafka/RocketMQ 建议≤1MB,RabbitMQ≤64MB,大消息拆分或用文件存储 + 消息引用。
  3. 消费幂等:所有 MQ 都需实现消费幂等(如消息 ID 去重),避免重复消费。
  4. 监控配置:开启 MQ 监控(如 RabbitMQ Management、Kafka JMX、RocketMQ Dashboard),实时查看堆积、延迟、吞吐量。
上一篇 ICS是什么文件格式?. ICS文件怎么打开?
下一篇 AP双链路技术白皮书-6W100