Skip to content

消息队列 Message Queue

概念

消息队列(Message Queue)是分布式系统中实现服务解耦、异步处理和削峰填谷的核心中间件。

为什么需要消息队列?

1. 服务解耦 (Decoupling)

没有消息队列时,上游服务需要知道所有下游服务的接口并逐一调用:

订单服务 ──► 库存服务

   ├──► 积分服务

   ├──► 通知服务

   └──► 物流服务     ← 新增一个下游就要修改订单服务代码

引入消息队列后,上游只需发送一条消息,下游按需订阅:

订单服务 ──► MQ ──► 库存服务
              ├──► 积分服务
              ├──► 通知服务
              └──► 物流服务     ← 新增下游无需修改订单服务

2. 异步处理 (Async Processing)

同步调用:用户下单 → 扣库存(50ms)→ 发积分(50ms)→ 发短信(100ms) = 200ms

异步消息:用户下单 → 扣库存(50ms)→ 发消息到 MQ(5ms) = 55ms 积分、短信等由消费者异步处理,用户无需等待。

3. 削峰填谷 (Peak Shaving)

请求量

  │    ┌──────┐
  │    │ 峰值  │
  │    │10000 │        消息队列作为缓冲
  │    │ QPS  │        ┌─────────────────────
  │    │      │        │ 消费者以固定速率处理
  │────┘      └────────│ 如 2000 QPS
  │                    └─────────────────────
  └───────────────────────────────────────► 时间

秒杀场景下,瞬时请求写入 MQ,消费者以稳定速率处理,避免数据库被瞬时流量压垮。

核心概念

概念说明
Producer(生产者)消息的发送方,负责将消息发送到 Broker
Consumer(消费者)消息的接收方,负责从 Broker 拉取或接收消息并处理
Broker(代理/服务端)消息队列服务器,负责存储和转发消息
Topic(主题)消息的逻辑分类,Producer 将消息发送到指定 Topic
Partition(分区)Topic 的物理分片,用于并行处理和水平扩展
Consumer Group(消费者组)一组消费者共同消费一个 Topic,每条消息只被组内一个消费者处理
Offset(偏移量)消息在分区中的位置标识,消费者通过 Offset 追踪消费进度

消息模型

点对点模型 (Point-to-Point)

每条消息只能被一个消费者消费,消费后即从队列中移除。

Producer ──► Queue ──► Consumer A(消费消息 1)
                  ──► Consumer B(消费消息 2)
                  ──► Consumer A(消费消息 3)

适用场景:任务分发、工单处理(每个任务只需处理一次)。

发布/订阅模型 (Pub/Sub)

每条消息可以被多个订阅者消费,每个订阅者独立接收完整的消息副本。

Producer ──► Topic ──► Consumer Group A(订单处理)
                  ├──► Consumer Group B(日志记录)
                  └──► Consumer Group C(数据分析)

Kafka 采用的就是 Pub/Sub 模型,通过 Consumer Group 实现灵活的消费模式:

  • 不同 Group 订阅同一 Topic → 每个 Group 都能收到全量消息(广播)
  • 同一 Group 内的多个 Consumer → 分担消费(负载均衡)

核心原理

消息投递语义

语义说明可能的问题实现难度
At-most-once消息最多投递一次,可能丢失消息丢失
At-least-once消息至少投递一次,不会丢失但可能重复消息重复
Exactly-once消息恰好投递一次,不丢不重

At-most-once(最多一次)

Producer 发送消息后不等待确认,Consumer 收到消息后先提交 Offset 再处理。如果处理失败,消息不会重新投递。

Producer ──发送──► Broker(不等 ACK)
Consumer ──► 提交 Offset ──► 处理消息(失败则消息丢失)

适用场景:日志采集、监控指标上报(允许少量数据丢失)。

At-least-once(至少一次)

Producer 发送消息后等待 Broker 的 ACK 确认,失败则重试。Consumer 先处理消息再提交 Offset,如果提交 Offset 前崩溃,重启后会重新消费。

Producer ──发送──► Broker ──ACK──► Producer(确认收到)
Consumer ──► 处理消息 ──► 提交 Offset(处理完才提交)

适用场景:大多数业务场景(订单、支付),配合幂等性保证不重复处理。

Exactly-once(恰好一次)

最难实现的语义,通常通过幂等 Producer + 事务性消费来实现。Kafka 0.11+ 支持幂等 Producer 和事务(Transactional API),但有性能开销。

实际工程中,大多数系统采用 At-least-once + 业务幂等 的方案,兼顾可靠性和性能。

Kafka 架构详解

整体架构

┌─────────────────────────────────────────────────────────┐
│                     Kafka Cluster                        │
│                                                          │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐             │
│  │ Broker 0  │   │ Broker 1  │   │ Broker 2  │             │
│  │           │   │           │   │           │             │
│  │ Topic-A   │   │ Topic-A   │   │ Topic-A   │             │
│  │ P0(Leader)│   │ P1(Leader)│   │ P2(Leader)│             │
│  │ P1(Replica│   │ P2(Replica│   │ P0(Replica│             │
│  └──────────┘   └──────────┘   └──────────┘             │
│                                                          │
│  ┌─────────────────────────────────────────┐             │
│  │             ZooKeeper / KRaft            │             │
│  │   (集群元数据管理、Leader 选举)           │             │
│  └─────────────────────────────────────────┘             │
└─────────────────────────────────────────────────────────┘

Producer ──►  Broker(根据 Partition 策略路由)

Consumer Group A:
  Consumer 1 ← P0
  Consumer 2 ← P1
  Consumer 3 ← P2

Partition 与 Offset

每个 Partition 是一个有序的、不可变的消息序列,新消息追加到末尾。Offset 是消息在 Partition 中的唯一编号。

Partition 0:
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │  ← Offset
└───┴───┴───┴───┴───┴───┴───┴───┘

                     Consumer 当前消费位置

Partition 1:
┌───┬───┬───┬───┬───┐
│ 0 │ 1 │ 2 │ 3 │ 4 │  ← Offset
└───┴───┴───┴───┴───┘

         Consumer 当前消费位置

ISR(In-Sync Replicas)机制

ISR 是与 Leader 保持完全同步的副本集合。只有 ISR 中的副本才有资格参与 ACK 确认和 Leader 选举。

Leader (Broker 0)          Follower (Broker 1)        Follower (Broker 2)
      │                           │                           │
      │◄── 持续同步(已追上)────────│                           │
      │                           │◄── 落后太多(滞后)─────────│
      │                           │                           │
ISR = { Broker 0, Broker 1 }    ← Broker 2 已被踢出 ISR

acks 配置对可靠性和性能的影响:

acks 值含义丢消息风险吞吐量
acks=0Producer 不等待任何确认,即发即忘最高
acks=1Leader 写入本地日志即返回成功中(Leader 宕机未同步则丢失)
acks=all(或 -1等待所有 ISR 副本确认写入最低最低

min.insync.replicas 配置:指定 ISR 中至少需要多少个副本确认,写入才算成功。与 acks=all 配合使用:

# 典型高可靠配置
replication.factor=3         # 3 个副本
min.insync.replicas=2        # 至少 2 个 ISR 副本确认
acks=all                     # 等待所有 ISR

上述配置意味着:即使 1 个副本宕机,写入仍然成功;若 2 个副本宕机,写入会失败(抛出 NotEnoughReplicasException),从而防止数据丢失。

ISR 收缩时发生什么:

当 Follower 副本的消息同步滞后超过 replica.lag.time.max.ms(默认 30s),该副本会被踢出 ISR。此时:

  • 若 ISR 大小降至 min.insync.replicas 以下,且 acks=all,新的写请求会直接报错
  • Leader 会继续工作,等待落后副本追上后重新加入 ISR

unclean.leader.election.enable(默认 false):

  • false:只允许 ISR 中的副本成为新 Leader,避免数据丢失,但若 ISR 为空则分区不可用
  • true:允许落后的副本(不在 ISR 中)成为 Leader,牺牲一致性换取可用性,生产环境强烈建议保持 false

Consumer Group Rebalance

当 Consumer Group 中的成员数量发生变化时(消费者加入或离开),Kafka 会触发 Rebalance(重平衡),重新分配 Partition 与 Consumer 的绑定关系。

Rebalance 前:                    Rebalance 后(Consumer 3 离开):
┌──────────┐                     ┌──────────┐
│Consumer 1│← P0                 │Consumer 1│← P0, P2
│Consumer 2│← P1                 │Consumer 2│← P1
│Consumer 3│← P2  (下线)         └──────────┘
└──────────┘

Rebalance 的缺点:

  • 重平衡期间整个 Consumer Group 暂停消费(Stop The World)
  • 频繁 Rebalance 会影响吞吐量
  • 可能导致消息重复消费(Offset 还未提交就发生了 Rebalance)

优化方案:

  • 增大 session.timeout.msheartbeat.interval.ms,减少误判消费者离线
  • 使用 Kafka 2.3+ 的 Cooperative Sticky Assignor,支持增量 Rebalance,减少影响范围

消息顺序性

Kafka 保证同一 Partition 内的消息是有序的。要保证某类消息的顺序,需要将它们路由到同一个 Partition。

java
// 通过指定 key 保证同一订单的消息进入同一 Partition
producer.send(new ProducerRecord<>("order-topic", orderId, message));
// Kafka 默认按 key 的 hash 值 % partition 数来选择分区

全局有序要求 Topic 只设 1 个 Partition,但这意味着只能有 1 个 Consumer,完全丧失了并行消费的能力,吞吐量极低

实际工程中,大多数场景只需要业务维度的有序(如同一用户的操作有序),通过 Partition Key 即可实现,不需要全局有序。

死信队列与重试机制

当消息消费失败且达到最大重试次数后,将消息转移到死信队列,避免阻塞正常消息的消费。

正常 Topic ──► Consumer ──处理失败──► 重试队列(最多 3 次)

                                    重试仍失败

                                    死信队列 (DLQ)

                                    人工排查 / 告警
java
@KafkaListener(topics = "order-topic")
public void consume(ConsumerRecord<String, String> record) {
    int maxRetries = 3;
    int retryCount = getRetryCount(record); // 从 Header 获取重试次数

    try {
        processOrder(record.value());
    } catch (Exception e) {
        if (retryCount < maxRetries) {
            // 发送到重试 Topic,携带重试次数
            sendToRetryTopic(record, retryCount + 1);
        } else {
            // 超过最大重试次数,发送到死信队列
            sendToDeadLetterQueue(record);
            log.error("消息处理失败,已发送至 DLQ: {}", record.value());
        }
    }
}

重试策略建议:

  • 使用指数退避(Exponential Backoff):第 1 次重试等 1s,第 2 次等 4s,第 3 次等 16s
  • 设置合理的最大重试次数(通常 3~5 次)
  • 死信队列需要配套监控告警和人工处理机制

消息幂等性

由于 At-least-once 语义下消息可能重复投递,消费者必须保证幂等处理(处理多次和处理一次的结果相同)。

方案一:数据库唯一约束

java
// 利用数据库唯一索引防止重复插入
try {
    orderMapper.insert(order); // orderId 设置唯一索引
} catch (DuplicateKeyException e) {
    log.info("订单已存在,忽略重复消息: {}", order.getOrderId());
}

方案二:Redis 去重

java
public boolean processMessage(String messageId, String payload) {
    // SETNX: 如果 key 不存在则设置,返回 true;已存在则返回 false
    Boolean isNew = redis.opsForValue()
        .setIfAbsent("msg:dedup:" + messageId, "1", 24, TimeUnit.HOURS);

    if (Boolean.FALSE.equals(isNew)) {
        log.info("重复消息,跳过处理: {}", messageId);
        return false;
    }

    // 首次处理
    doProcess(payload);
    return true;
}

方案三:幂等性 Key(Idempotency Key)

Producer 为每条消息生成全局唯一的幂等 Key,Consumer 处理前检查是否已处理过。

java
// Producer 端
String idempotencyKey = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, value);
record.headers().add("idempotency-key", idempotencyKey.getBytes());

// Consumer 端
String idempotencyKey = new String(record.headers().lastHeader("idempotency-key").value());
if (idempotencyStore.exists(idempotencyKey)) {
    return; // 已处理,跳过
}
idempotencyStore.save(idempotencyKey);
processMessage(record.value());

RocketMQ 事务消息

RocketMQ 通过半消息(Half Message)机制实现分布式事务,保证本地事务与消息发送的原子性。

半消息流程:

Producer                    Broker                     Consumer
   │                           │                           │
   │──① 发送半消息─────────────►│                           │
   │                           │(存入内部 half topic,     │
   │                           │  消费者不可见)            │
   │◄─② 返回发送结果────────────│                           │
   │                           │                           │
   │──③ 执行本地事务            │                           │
   │   (本地 DB 操作)         │                           │
   │                           │                           │
   │──④ 发送 Commit/Rollback──►│                           │
   │   (成功→Commit,失败→     │                           │
   │     Rollback)            │                           │
   │                           │──⑤ Commit: 消息可见──────►│
   │                           │   Rollback: 消息删除      │
   │                           │                           │
   │         (若 Producer 崩溃,Broker 发起回查)          │
   │◄─⑥ Broker 回查本地事务状态─│                           │
   │──⑦ 返回事务状态────────────►│                           │

Java 代码示例(TransactionListener 实现):

java
// 1. 定义事务监听器
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderService orderService;

    // 执行本地事务(半消息发送成功后调用)
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String orderId = new String(msg.getBody());
            // 执行本地数据库操作
            orderService.createOrder(orderId);
            return LocalTransactionState.COMMIT_MESSAGE; // 本地事务成功,提交消息
        } catch (Exception e) {
            log.error("本地事务执行失败", e);
            return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务失败,回滚消息
        }
    }

    // Broker 回查时调用(用于 Producer 崩溃恢复)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = new String(msg.getBody());
        // 查询本地事务是否已完成
        boolean exists = orderService.orderExists(orderId);
        return exists
            ? LocalTransactionState.COMMIT_MESSAGE
            : LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

// 2. 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");
producer.setTransactionListener(new OrderTransactionListener());
producer.start();

Message msg = new Message("order-topic", orderId.getBytes());
// sendMessageInTransaction 会自动走半消息流程
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

技术选型与对比

特性KafkaRabbitMQRocketMQ
开发语言Scala/JavaErlangJava
单机吞吐量百万级 TPS万级 TPS十万级 TPS
消息延迟ms 级us 级(微秒)ms 级
消息可靠性高(副本机制)高(镜像队列)高(同步刷盘)
消息顺序Partition 级别有序不保证Queue 级别有序
事务消息支持(Kafka Transactions)不支持原生事务支持(半消息机制)
延迟消息不原生支持支持(TTL + DLX)原生支持
消息回溯支持(按 Offset 回溯)不支持支持(按时间回溯)
社区生态大数据生态丰富插件丰富阿里巴巴开源
典型场景日志、大数据流处理、事件溯源企业级消息集成、复杂路由电商、金融交易

选型建议:

  • 大数据/日志流处理 → Kafka(高吞吐、持久化、与 Flink/Spark 生态集成好)
  • 企业应用/复杂路由 → RabbitMQ(灵活的路由机制、低延迟、协议标准化)
  • 电商/金融 → RocketMQ(事务消息、延迟消息、阿里巴巴大规模验证)

实战案例:延迟消息实现订单超时取消

场景: 用户下单后 30 分钟未付款,自动取消订单。

方案一:RocketMQ 延迟消息

RocketMQ 原生支持延迟消息,通过预设的延迟级别实现:

延迟级别(delayLevel):
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

                                              级别 16 = 30 分钟
java
Message msg = new Message("order-cancel-topic", orderId.getBytes());
msg.setDelayTimeLevel(16); // 第 16 级 = 30 分钟后投递
producer.send(msg);

// Consumer 收到消息时订单已经过了 30 分钟
@RocketMQMessageListener(topic = "order-cancel-topic", consumerGroup = "cancel-group")
public class OrderCancelConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String orderId) {
        orderService.cancelIfUnpaid(orderId); // 检查未付款则取消
    }
}

方案二:Redis ZSET

以超时时间戳作为 score,轮询工作线程定期扫描到期任务:

java
// 下单时将 orderId 放入 ZSET,score = 当前时间 + 30分钟
long expireAt = System.currentTimeMillis() + 30 * 60 * 1000;
redis.opsForZSet().add("order:timeout", orderId, expireAt);

// 定时任务(每隔 5s 扫描一次)
@Scheduled(fixedDelay = 5000)
public void scanTimeoutOrders() {
    long now = System.currentTimeMillis();
    // 取出 score <= now 的所有订单(已到期)
    Set<String> timeoutOrders = redis.opsForZSet()
        .rangeByScore("order:timeout", 0, now);
    for (String orderId : timeoutOrders) {
        orderService.cancelIfUnpaid(orderId);
        redis.opsForZSet().remove("order:timeout", orderId);
    }
}

方案三:Kafka + TTL

Kafka 不原生支持延迟消息,通常通过专门的延迟 Topic + 时间轮调度实现:

Producer ──► delay-30m-topic(带时间戳)

            延迟消费服务(检查消息时间戳,未到期则 sleep/重放)

            到期 ──► order-cancel-topic ──► Consumer 处理

三种方案对比:

对比维度RocketMQ 延迟消息Redis ZSETKafka + 延迟服务
实现复杂度低(原生支持)
精度固定级别(30m 等)秒级(取决于轮询间隔)自定义
可靠性高(持久化)中(依赖 Redis 持久化)高(Kafka 持久化)
扩展性受限于预设级别好(任意时间)好(任意时间)
适用场景电商标准延迟场景中小规模、已有 Redis 基础设施大数据量、已有 Kafka 基础设施

实际工程中,RocketMQ 延迟消息是最简单可靠的方案;若技术栈已有 Redis 且规模不大,Redis ZSET 也是轻量选择。

面试常问 & 怎么答

Q1: 如何保证消息不丢失?

从三个环节分别保障:

Producer 端:

  • 使用同步发送(producer.send().get())或带回调的异步发送
  • 设置 acks=all(等待所有 ISR 副本确认),确保消息写入多个副本
  • 配置合理的重试次数(retries)和重试间隔

Broker 端:

  • 设置 replication.factor >= 3(至少 3 个副本)
  • 设置 min.insync.replicas >= 2(至少 2 个副本确认写入)
  • 配合 acks=all,确保 ISR 中有足够副本同步后才返回成功
  • unclean.leader.election.enable=false,防止落后的副本成为 Leader 导致数据丢失

Consumer 端:

  • 关闭自动提交 Offset(enable.auto.commit=false
  • 消息处理成功后再手动提交 Offset
  • 配合幂等消费,防止重复处理

Q2: Kafka 如何保证消息顺序性?

Kafka 保证的是 Partition 级别的有序性,而非全局有序。

实现方式: 将需要保序的消息发送到同一 Partition,通过指定同一个 key(如 orderId、userId):

java
producer.send(new ProducerRecord<>("topic", orderId, message));

Kafka 默认使用 key 的 hash 值对 Partition 数取模来选择分区,同一 key 的消息一定进入同一 Partition。

注意事项:

  • 如果 Partition 数量发生变化(扩容),hash 映射会改变,历史消息的顺序关系可能被打破
  • 同一 Partition 只能被同一 Consumer Group 中的一个 Consumer 消费,天然保证顺序
  • 如果需要全局有序,只能设置 1 个 Partition,但吞吐量会大幅下降

Q3: 如何实现幂等消费?

由于网络抖动、Consumer Rebalance 等原因,消息可能被重复投递。消费者需要保证幂等性 —— 处理多次和处理一次的结果相同。

常用方案:

方案原理适用场景
数据库唯一约束利用唯一索引防止重复插入订单创建、用户注册
Redis 去重SETNX 记录已处理的 messageId通用场景,高性能
幂等 KeyProducer 生成唯一 Key,Consumer 处理前检查跨系统消息去重
状态机业务状态只能单向流转(如待支付→已支付),重复消费不影响订单状态变更

最佳实践: 采用 At-least-once 投递语义 + 业务侧幂等处理,是兼顾可靠性和性能的主流方案。

Q4: Kafka 为什么吞吐量高?

Kafka 的高吞吐量来自多个层面的设计:

技术点说明
顺序磁盘 I/O消息追加写入,顺序读写比随机读写快 2~3 个数量级
零拷贝(Zero Copy)使用 sendfile 系统调用,数据从磁盘直接到网卡,跳过用户态拷贝
批量发送与压缩Producer 将多条消息打包为一个批次(batch)发送,减少网络往返次数
Partition 并行多个 Partition 可并行读写,线性扩展吞吐量
Page Cache写入先到操作系统 Page Cache(内存),由 OS 异步刷盘,读取优先命中 Cache

面试回答时,把"顺序写 + 零拷贝 + 批量 + 分区并行 + Page Cache"五点串联起来,完整展示对 Kafka 底层的理解。

看到什么就先想到这类

关键词 / 场景第一反应
异步解耦、削峰填谷引入消息队列
高吞吐日志、事件流、大数据Kafka
事务消息、电商订单、金融RocketMQ 半消息机制
复杂路由、灵活协议、企业集成RabbitMQ
消息丢失问题acks=all + min.insync.replicas + 手动提交 Offset
消息重复问题ACK + 幂等消费(数据库唯一键 / Redis SETNX)
延迟任务、订单超时取消RocketMQ 延迟消息 / Redis ZSET
ISR 收缩、脑裂、数据一致性unclean.leader.election.enable=false

延伸阅读