知识模块
☕ Java 知识模块
九、分布式系统
RabbitMQ 消息队列

RabbitMQ 消息队列

什么是消息队列?

消息队列(MQ)是一种异步通信机制,用于应用解耦、流量削峰、异步处理。

生产者 → 消息队列 → 消费者

优点:
1. 解耦:生产者和消费者不需要直接交互
2. 异步:生产者发送消息后立即返回
3. 削峰:高峰期消息堆积,消费者按能力处理

RabbitMQ 核心概念

概念说明
Producer消息生产者
Consumer消息消费者
Exchange交换机,接收消息并路由到队列
Queue队列,存储消息
Binding绑定,Exchange 和 Queue 的关系
Routing Key路由键,消息路由规则
Virtual Host虚拟主机,隔离不同环境

消息流转

Producer

Exchange(交换机)
    ↓ 根据路由规则
Queue(队列)

Consumer(消费者)

交换机类型

类型说明路由规则
Direct直连Routing Key 精确匹配
Topic主题Routing Key 模式匹配
Fanout扇出广播到所有绑定队列
Headers头部根据消息头匹配

Direct Exchange

// 生产者
rabbitTemplate.convertAndSend(
    "direct.exchange",  // 交换机
    "order.created",    // routing key
    message             // 消息
);
 
// 消费者
@RabbitListener(bindings = @QueueBinding(
    value = @Queue("order.queue"),
    exchange = @Exchange(value = "direct.exchange", type = "direct"),
    key = "order.created"
))
public void handleOrder(String message) {
    System.out.println("收到消息: " + message);
}

Topic Exchange

// 生产者
rabbitTemplate.convertAndSend("topic.exchange", "order.created", message);
rabbitTemplate.convertAndSend("topic.exchange", "order.updated", message);
rabbitTemplate.convertAndSend("topic.exchange", "user.created", message);
 
// 消费者1:监听所有 order 事件
@RabbitListener(bindings = @QueueBinding(
    value = @Queue("order.queue"),
    exchange = @Exchange(value = "topic.exchange", type = "topic"),
    key = "order.*"
))
public void handleOrder(String message) { }
 
// 消费者2:监听所有事件
@RabbitListener(bindings = @QueueBinding(
    value = @Queue("all.queue"),
    exchange = @Exchange(value = "topic.exchange", type = "topic"),
    key = "#"
))
public void handleAll(String message) { }

Fanout Exchange

// 生产者:广播到所有队列
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
 
// 消费者1
@RabbitListener(bindings = @QueueBinding(
    value = @Queue("queue1"),
    exchange = @Exchange(value = "fanout.exchange", type = "fanout")
))
public void handle1(String message) { }
 
// 消费者2
@RabbitListener(bindings = @QueueBinding(
    value = @Queue("queue2"),
    exchange = @Exchange(value = "fanout.exchange", type = "fanout")
))
public void handle2(String message) { }

消息可靠性

1. 生产者确认

@Configuration
public class RabbitConfig {
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        // 消息到达 Exchange 的回调
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息到达 Exchange");
            } else {
                System.out.println("消息未到达 Exchange: " + cause);
            }
        });
        
        // 消息从 Exchange 到 Queue 失败的回调
        template.setReturnsCallback(returned -> {
            System.out.println("消息路由失败: " + returned.getMessage());
        });
        
        return template;
    }
}
 
// 配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

2. 消费者确认

// 手动确认
@RabbitListener(queues = "order.queue")
public void handleOrder(String message, Channel channel, 
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        // 处理业务
        processOrder(message);
        
        // 确认消息
        channel.basicAck(tag, false);
    } catch (Exception e) {
        // 拒绝消息,重新入队
        channel.basicNack(tag, false, true);
    }
}
 
// 配置
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认

3. 消息持久化

// 队列持久化
@Bean
public Queue durableQueue() {
    return QueueBuilder.durable("order.queue").build();
}
 
// Exchange 持久化
@Bean
public Exchange durableExchange() {
    return ExchangeBuilder.directExchange("order.exchange").durable(true).build();
}
 
// 消息持久化
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return msg;
});

死信队列

// 定义死信队列
@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("dlq.order").build();
}
 
// 定义业务队列,绑定死信队列
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable("order.queue")
        .deadLetterExchange("dlq.exchange")    // 死信交换机
        .deadLetterRoutingKey("dlq.order")     // 死信路由键
        .ttl(30000)                            // 过期时间 30秒
        .build();
}
 
// 消费死信队列
@RabbitListener(queues = "dlq.order")
public void handleDeadLetter(String message) {
    System.out.println("处理死信: " + message);
}

死信触发条件

  1. 消息被拒绝(basicNack/basicReject)且 requeue=false
  2. 消息过期(TTL)
  3. 队列达到最大长度

延迟队列

// 使用 RabbitMQ 延迟插件
rabbitTemplate.convertAndSend(
    "delay.exchange",
    "delay.routing.key",
    message,
    msg -> {
        msg.getMessageProperties().setDelay(5000); // 延迟 5 秒
        return msg;
    }
);

消息幂等性

@Service
public class OrderConsumer {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @RabbitListener(queues = "order.queue")
    public void handleOrder(String message, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // 解析消息
        OrderMessage msg = JSON.parseObject(message, OrderMessage.class);
        
        // 幂等性检查
        String key = "order:processed:" + msg.getOrderId();
        Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", 1, TimeUnit.DAYS);
        
        if (!Boolean.TRUE.equals(success)) {
            // 已处理过,直接确认
            channel.basicAck(tag, false);
            return;
        }
        
        try {
            // 处理业务
            processOrder(msg);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 删除幂等标记
            redisTemplate.delete(key);
            channel.basicNack(tag, false, true);
        }
    }
}

面试高频问题

Q1: RabbitMQ 如何保证消息不丢失?

  1. 生产者确认:Confirm Callback
  2. Exchange/Queue 持久化:durable=true
  3. 消息持久化:deliveryMode=PERSISTENT
  4. 消费者手动确认:basicAck

Q2: 如何保证消息顺序性?

单个队列 + 单个消费者

Q3: RabbitMQ 和 Kafka 的区别?

对比RabbitMQKafka
定位消息代理分布式流平台
吞吐量
延迟
消息存储内存/磁盘磁盘
适用场景业务系统大数据、日志

总结

RabbitMQ 核心要点:
1. Exchange 类型:Direct、Topic、Fanout、Headers
2. 可靠性:生产者确认、消费者确认、持久化
3. 死信队列:处理失败消息
4. 幂等性:Redis 记录已处理消息