知识模块
☕ Java 知识模块
十、中间件
RabbitMQ 可靠性

RabbitMQ 消息可靠性

消息队列的可靠性是分布式系统的重要保障,RabbitMQ 提供了多种机制确保消息不丢失。

一、消息丢失场景分析

生产者 ──→ Exchange ──→ Queue ──→ 消费者
  │          │           │         │
  ├─ 发送失败 ├─ 路由失败 ├─ 持久化失败 ├─ 消费失败
  │          │           │         │
  └── 发送确认 └── 路由确认 └── 持久化  └── 手动确认

消息可能在以下环节丢失:

  1. 生产者到 Broker:网络问题、Broker 宕机
  2. Broker 内部:Exchange 路由失败、Queue 未持久化
  3. Broker 到消费者:消费者宕机、处理异常

二、生产者可靠性

1. 发送方确认(Publisher Confirm)

// 开启确认模式
channel.confirmSelect();
 
// 同步确认
channel.basicPublish(...);
channel.waitForConfirms();  // 阻塞等待确认
 
// 异步确认(推荐)
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        // 确认成功
    }
    
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        // 确认失败,重发
    }
});

2. 事务机制(不推荐)

try {
    channel.txSelect();  // 开启事务
    channel.basicPublish(...);
    channel.txCommit();  // 提交事务
} catch (Exception e) {
    channel.txRollback();  // 回滚事务
}

注意:事务会严重影响性能,生产环境推荐使用 Confirm 模式。

3. 批量确认

// 批量发送后确认
for (int i = 0; i < 100; i++) {
    channel.basicPublish(...);
}
channel.waitForConfirms();  // 批量确认

三、Broker 可靠性

1. 消息持久化

// 队列持久化
channel.queueDeclare(
    "queue-name",
    true,   // durable: 是否持久化
    false,  // exclusive: 是否独占
    false,  // autoDelete: 是否自动删除
    null
);
 
// 消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)  // 2 表示持久化
    .build();
channel.basicPublish("", "queue-name", props, message.getBytes());

2. Exchange 持久化

channel.exchangeDeclare(
    "exchange-name",
    "direct",
    true    // durable: 是否持久化
);

3. 集群高可用

        ┌──────────────┐
        │   HA Proxy   │  负载均衡
        └──────┬───────┘

    ┌──────────┼──────────┐
    ↓          ↓          ↓
┌────────┐ ┌────────┐ ┌────────┐
│ Node1  │ │ Node2  │ │ Node3  │
│ Master │ │ Slave  │ │ Slave  │
└────────┘ └────────┘ └────────┘
   同步镜像队列数据

镜像队列配置

# 管理界面策略配置
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

四、消费者可靠性

1. 手动确认(Manual Ack)

// 关闭自动确认
boolean autoAck = false;
channel.basicConsume("queue-name", autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) {
        try {
            // 处理消息
            doWork(body);
            
            // 手动确认
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败,拒绝消息
            // requeue = true: 重新入队
            // requeue = false: 丢弃或进入死信队列
            channel.basicNack(envelope.getDeliveryTag(), false, false);
        }
    }
});

2. 确认模式对比

模式说明可靠性性能
自动确认消息投递后立即删除
手动确认消费者处理后确认
批量确认批量处理批量确认较高

3. 预取数量(QoS)

// 限制未确认消息数量
channel.basicQos(10);  // 最多10条未确认消息

防止消费者预取过多消息导致内存溢出或处理不过来。

五、死信队列(DLX)

1. 死信产生条件

  • 消息被拒绝(basicNack/basicReject)且 requeue=false
  • 消息过期(TTL)
  • 队列达到最大长度

2. 配置死信队列

// 死信交换机
channel.exchangeDeclare("dlx.exchange", "direct");
 
// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.key");
 
channel.queueDeclare("business.queue", true, false, false, args);
 
// 死信队列
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");

3. 死信处理流程

业务队列

   ├─ 消息过期 ──→ 死信交换机 ──→ 死信队列
   ├─ 被拒绝   ──→
   └─ 队列满   ──→

                     后续处理
                     ├── 重试
                     ├── 告警
                     └── 人工处理

六、消息幂等性

1. 问题场景

消费者处理消息 ──→ 确认失败(网络断开)


              消息重新投递 ──→ 重复消费

2. 解决方案

方案一:唯一 ID + 数据库去重

public void consume(Message message) {
    String messageId = message.getMessageId();
    
    // 检查是否已处理
    if (processedMessageDao.exists(messageId)) {
        return;  // 已处理,直接确认
    }
    
    // 处理消息(事务)
    transactionTemplate.execute(status -> {
        doWork(message);
        processedMessageDao.save(messageId);
        return null;
    });
}

方案二:Redis 去重

public void consume(Message message) {
    String messageId = message.getMessageId();
    String key = "msg:processed:" + messageId;
    
    // SETNX 原子操作
    if (!redis.setNX(key, "1", 24 * 3600)) {
        return;  // 已处理
    }
    
    doWork(message);
}

方案三:业务幂等设计

// 订单状态机
public void updateOrderStatus(OrderMessage msg) {
    // UPDATE 只有当前状态符合条件才会更新
    int rows = orderMapper.updateStatus(
        msg.getOrderId(),
        "PAID",      // 目标状态
        "CREATED"    // 当前状态必须是 CREATED
    );
    
    if (rows == 0) {
        // 状态已更新,幂等返回
        return;
    }
}

七、消息顺序性

1. 问题场景

队列: [msg1, msg2, msg3]
消费者A: 处理 msg1 (慢)
消费者B: 处理 msg2 (快) ──→ msg2 先完成
结果: msg2 在 msg1 之前处理完成

2. 解决方案

方案一:单消费者

// 限制只有一个消费者
channel.basicQos(1);  // 每次只取一条

方案二:消息分组

同一订单的消息路由到同一队列

Exchange

   ├── 订单1 ──→ Queue-1 ──→ Consumer-1
   ├── 订单2 ──→ Queue-2 ──→ Consumer-2
   └── 订单3 ──→ Queue-3 ──→ Consumer-3

八、面试高频问题

Q1: 如何保证消息不丢失?

生产者 ──→ 发送确认(Confirm 模式)

Broker  ──→ 持久化(Exchange + Queue + Message)

消费者 ──→ 手动确认(basicAck)

Q2: RabbitMQ 如何实现延迟消息?

方案一:TTL + 死信队列

消息 ──→ 延迟队列(TTL)──→ 死信交换机 ──→ 业务队列

方案二:延迟插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Q3: 大量消息堆积怎么处理?

  1. 增加消费者数量
  2. 临时扩容处理
  3. 持久化后重启
  4. 监控和告警机制

Q4: Confirm 模式 vs 事务模式?

对比项Confirm 模式事务模式
性能低(同步阻塞)
可靠性
异步支持支持不支持
推荐场景生产环境特殊需求

Q5: 如何设计消息重试机制?

// 消息头记录重试次数
public void consume(Message message) {
    int retryCount = getRetryCount(message);
    
    try {
        doWork(message);
        channel.basicAck(...);
    } catch (Exception e) {
        if (retryCount < MAX_RETRY) {
            // 重试:重新入队
            channel.basicNack(..., true);
        } else {
            // 超过重试次数:进入死信队列
            channel.basicNack(..., false);
        }
    }
}

九、最佳实践

1. 生产者配置清单

// 1. 开启 Confirm 模式
channel.confirmSelect();
 
// 2. 异步确认
channel.addConfirmListener(...);
 
// 3. 消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)
    .messageId(UUID.randomUUID().toString())  // 幂等ID
    .build();
 
// 4. 发送超时和重试

2. 消费者配置清单

// 1. 手动确认
boolean autoAck = false;
 
// 2. 预取限制
channel.basicQos(10);
 
// 3. 幂等处理
// 4. 异常处理和重试
// 5. 死信队列兜底

3. 监控指标

  • 队列深度
  • 消费速率
  • 确认延迟
  • 死信队列消息数

更新时间:2026年3月16日