RabbitMQ 消息队列
什么是消息队列?
消息队列(MQ)是一种异步通信机制,用于应用解耦、流量削峰、异步处理。
生产者 → 消息队列 → 消费者
优点:
1. 解耦:生产者和消费者不需要直接交互
2. 异步:生产者发送消息后立即返回
3. 削峰:高峰期消息堆积,消费者按能力处理RabbitMQ 核心概念
| 概念 | 说明 |
|---|---|
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Exchange | 交换机,接收消息并路由到队列 |
| Queue | 队列,存储消息 |
| Binding | 绑定,Exchange 和 Queue 的关系 |
| Routing Key | 路由键,消息路由规则 |
| Virtual Host | 虚拟主机,隔离不同环境 |
消息流转
Producer
↓
Exchange(交换机)
↓ 根据路由规则
Queue(队列)
↓
Consumer(消费者)交换机类型
| 类型 | 说明 | 路由规则 |
|---|---|---|
| Direct | 直连 | Routing Key 精确匹配 |
| Topic | 主题 | Routing Key 模式匹配 |
| Fanout | 扇出 | 广播到所有绑定队列 |
| Headers | 头部 | 根据消息头匹配 |
Direct Exchange
// 生产者
rabbitTemplate.convertAndSend(
"direct.exchange", // 交换机
"order.created", // routing key
message // 消息
);
// 消费者
@RabbitListener(bindings = @QueueBinding(
value = @Queue("order.queue"),
exchange = @Exchange(value = "direct.exchange", type = "direct"),
key = "order.created"
))
public void handleOrder(String message) {
System.out.println("收到消息: " + message);
}Topic Exchange
// 生产者
rabbitTemplate.convertAndSend("topic.exchange", "order.created", message);
rabbitTemplate.convertAndSend("topic.exchange", "order.updated", message);
rabbitTemplate.convertAndSend("topic.exchange", "user.created", message);
// 消费者1:监听所有 order 事件
@RabbitListener(bindings = @QueueBinding(
value = @Queue("order.queue"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = "order.*"
))
public void handleOrder(String message) { }
// 消费者2:监听所有事件
@RabbitListener(bindings = @QueueBinding(
value = @Queue("all.queue"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = "#"
))
public void handleAll(String message) { }Fanout Exchange
// 生产者:广播到所有队列
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
// 消费者1
@RabbitListener(bindings = @QueueBinding(
value = @Queue("queue1"),
exchange = @Exchange(value = "fanout.exchange", type = "fanout")
))
public void handle1(String message) { }
// 消费者2
@RabbitListener(bindings = @QueueBinding(
value = @Queue("queue2"),
exchange = @Exchange(value = "fanout.exchange", type = "fanout")
))
public void handle2(String message) { }消息可靠性
1. 生产者确认
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 消息到达 Exchange 的回调
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息到达 Exchange");
} else {
System.out.println("消息未到达 Exchange: " + cause);
}
});
// 消息从 Exchange 到 Queue 失败的回调
template.setReturnsCallback(returned -> {
System.out.println("消息路由失败: " + returned.getMessage());
});
return template;
}
}
// 配置
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true2. 消费者确认
// 手动确认
@RabbitListener(queues = "order.queue")
public void handleOrder(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 处理业务
processOrder(message);
// 确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(tag, false, true);
}
}
// 配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认3. 消息持久化
// 队列持久化
@Bean
public Queue durableQueue() {
return QueueBuilder.durable("order.queue").build();
}
// Exchange 持久化
@Bean
public Exchange durableExchange() {
return ExchangeBuilder.directExchange("order.exchange").durable(true).build();
}
// 消息持久化
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});死信队列
// 定义死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dlq.order").build();
}
// 定义业务队列,绑定死信队列
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.deadLetterExchange("dlq.exchange") // 死信交换机
.deadLetterRoutingKey("dlq.order") // 死信路由键
.ttl(30000) // 过期时间 30秒
.build();
}
// 消费死信队列
@RabbitListener(queues = "dlq.order")
public void handleDeadLetter(String message) {
System.out.println("处理死信: " + message);
}死信触发条件
- 消息被拒绝(basicNack/basicReject)且 requeue=false
- 消息过期(TTL)
- 队列达到最大长度
延迟队列
// 使用 RabbitMQ 延迟插件
rabbitTemplate.convertAndSend(
"delay.exchange",
"delay.routing.key",
message,
msg -> {
msg.getMessageProperties().setDelay(5000); // 延迟 5 秒
return msg;
}
);消息幂等性
@Service
public class OrderConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@RabbitListener(queues = "order.queue")
public void handleOrder(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
// 解析消息
OrderMessage msg = JSON.parseObject(message, OrderMessage.class);
// 幂等性检查
String key = "order:processed:" + msg.getOrderId();
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", 1, TimeUnit.DAYS);
if (!Boolean.TRUE.equals(success)) {
// 已处理过,直接确认
channel.basicAck(tag, false);
return;
}
try {
// 处理业务
processOrder(msg);
channel.basicAck(tag, false);
} catch (Exception e) {
// 删除幂等标记
redisTemplate.delete(key);
channel.basicNack(tag, false, true);
}
}
}面试高频问题
Q1: RabbitMQ 如何保证消息不丢失?
- 生产者确认:Confirm Callback
- Exchange/Queue 持久化:durable=true
- 消息持久化:deliveryMode=PERSISTENT
- 消费者手动确认:basicAck
Q2: 如何保证消息顺序性?
单个队列 + 单个消费者
Q3: RabbitMQ 和 Kafka 的区别?
| 对比 | RabbitMQ | Kafka |
|---|---|---|
| 定位 | 消息代理 | 分布式流平台 |
| 吞吐量 | 中 | 高 |
| 延迟 | 低 | 中 |
| 消息存储 | 内存/磁盘 | 磁盘 |
| 适用场景 | 业务系统 | 大数据、日志 |
总结
RabbitMQ 核心要点:
1. Exchange 类型:Direct、Topic、Fanout、Headers
2. 可靠性:生产者确认、消费者确认、持久化
3. 死信队列:处理失败消息
4. 幂等性:Redis 记录已处理消息