生产者-消费者模式
面试高频考点:生产者-消费者模式是并发编程中最经典的设计模式之一,考察对线程通信、锁机制和并发工具的理解。
核心概念
什么是生产者-消费者模式
生产者-消费者模式是一种并发编程设计模式,用于解决生产数据和消费数据速度不匹配的问题:
- 生产者(Producer):负责生产数据并放入缓冲区
- 消费者(Consumer):负责从缓冲区取出数据并处理
- 缓冲区(Buffer):用于存储生产者生产的数据,消费者从中取数据
┌──────────┐ ┌─────────────────┐ ┌──────────┐
│ 生产者 A │ ──→ │ │ ←── │ 消费者 X │
├──────────┤ │ 缓冲区 │ ├──────────┤
│ 生产者 B │ ──→ │ [1][2][3][ ] │ ←── │ 消费者 Y │
├──────────┤ │ │ ├──────────┤
│ 生产者 C │ ──→ │ │ ←── │ 消费者 Z │
└──────────┘ └─────────────────┘ └──────────┘模式优势
- 解耦:生产者和消费者不需要直接交互
- 缓冲:应对生产消费速度差异
- 异步处理:提高系统吞吐量
- 复用:生产和消费逻辑可以独立扩展
wait/notify 实现
基本实现
public class Buffer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
public Buffer(int capacity) {
this.capacity = capacity;
}
// 生产者调用
public synchronized void produce(int value) throws InterruptedException {
// 缓冲区满,等待消费者消费
while (queue.size() == capacity) {
System.out.println("缓冲区已满,生产者等待...");
wait();
}
queue.add(value);
System.out.println("生产: " + value + ", 缓冲区大小: " + queue.size());
// 唤醒消费者
notifyAll();
}
// 消费者调用
public synchronized int consume() throws InterruptedException {
// 缓冲区空,等待生产者生产
while (queue.isEmpty()) {
System.out.println("缓冲区为空,消费者等待...");
wait();
}
int value = queue.poll();
System.out.println("消费: " + value + ", 缓冲区大小: " + queue.size());
// 唤醒生产者
notifyAll();
return value;
}
}完整示例
public class ProducerConsumerDemo {
public static void main(String[] args) {
Buffer buffer = new Buffer(5);
// 创建3个生产者
for (int i = 0; i < 3; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
buffer.produce(producerId * 100 + j);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-" + i).start();
}
// 创建2个消费者
for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
while (true) {
buffer.consume();
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-" + i).start();
}
}
}为什么用 while 不用 if
// 错误写法 - 使用 if
public synchronized int consume() throws InterruptedException {
if (queue.isEmpty()) {
wait(); // 可能被虚假唤醒
}
return queue.poll(); // 可能抛出 NullPointerException
}
// 正确写法 - 使用 while
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 即使被虚假唤醒,也会再次检查条件
}
return queue.poll();
}原因:
- 虚假唤醒:线程可能在条件不满足时被唤醒
- 多消费者:被唤醒时队列可能已被其他消费者清空
- 中断唤醒:线程中断时也会从 wait 返回
BlockingQueue 实现
使用 ArrayBlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) {
// 创建有界阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put(i); // 队列满时自动阻塞
System.out.println("生产: " + i);
Thread.sleep(100);
}
queue.put(-1); // 结束标志
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
int value;
while ((value = queue.take()) != -1) { // 队列空时自动阻塞
System.out.println("消费: " + value);
Thread.sleep(150);
}
System.out.println("消费者结束");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}BlockingQueue 核心方法
| 方法 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | - | - |
各种 BlockingQueue 实现
// 1. ArrayBlockingQueue - 有界数组队列
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100);
// 2. LinkedBlockingQueue - 可选有界链表队列
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);
BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>(); // 无界
// 3. PriorityBlockingQueue - 优先级队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
// 4. SynchronousQueue - 无缓冲队列
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 每个put必须等待take,直接传递
// 5. DelayQueue - 延迟队列
BlockingQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 6. LinkedTransferQueue - 链表传输队列
BlockingQueue<String> transferQueue = new LinkedTransferQueue<>();实际应用:任务队列
public class TaskQueue {
private final BlockingQueue<Runnable> taskQueue;
private final ExecutorService executor;
public TaskQueue(int queueSize, int threadCount) {
this.taskQueue = new ArrayBlockingQueue<>(queueSize);
this.executor = Executors.newFixedThreadPool(threadCount);
// 启动消费者线程
for (int i = 0; i < threadCount; i++) {
executor.submit(this::consumeTask);
}
}
// 生产任务
public void submit(Runnable task) throws InterruptedException {
taskQueue.put(task);
}
// 消费任务
private void consumeTask() {
while (!Thread.currentThread().isInterrupted()) {
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
public void shutdown() {
executor.shutdown();
}
}Condition 实现
基本用法
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBuffer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // 缓冲区未满
private final Condition notEmpty = lock.newCondition(); // 缓冲区非空
public ConditionBuffer(int capacity) {
this.capacity = capacity;
}
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println("缓冲区已满,等待消费...");
notFull.await(); // 等待缓冲区不满
}
queue.add(value);
System.out.println("生产: " + value);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("缓冲区为空,等待生产...");
notEmpty.await(); // 等待缓冲区不空
}
int value = queue.poll();
System.out.println("消费: " + value);
notFull.signal(); // 通知生产者
return value;
} finally {
lock.unlock();
}
}
}Condition vs wait/notify
| 特性 | wait/notify | Condition |
|---|---|---|
| 所属 | Object | Lock |
| 条件数量 | 单一 | 可创建多个 |
| 唤醒方式 | notify/notifyAll | signal/signalAll |
| 公平性 | 不支持 | 支持公平模式 |
| 灵活性 | 低 | 高 |
多条件变量示例
public class AdvancedBuffer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Condition sizeExceedsThreshold = lock.newCondition();
private final int threshold;
public AdvancedBuffer(int capacity, int threshold) {
this.capacity = capacity;
this.threshold = threshold;
}
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await();
}
queue.add(value);
// 队列非空,唤醒消费者
notEmpty.signal();
// 队列超过阈值,触发特殊处理
if (queue.size() > threshold) {
sizeExceedsThreshold.signal();
}
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
int value = queue.poll();
// 队列不满,唤醒生产者
notFull.signal();
return value;
} finally {
lock.unlock();
}
}
// 等待队列超过阈值
public void waitForThreshold() throws InterruptedException {
lock.lock();
try {
while (queue.size() <= threshold) {
sizeExceedsThreshold.await();
}
// 执行阈值触发逻辑
System.out.println("队列超过阈值: " + queue.size());
} finally {
lock.unlock();
}
}
}生产者-消费者变体
多生产者多消费者
public class MultiProducerConsumer {
private final BlockingQueue<Task> queue;
private final ExecutorService producerPool;
private final ExecutorService consumerPool;
public MultiProducerConsumer(int queueSize, int producerCount, int consumerCount) {
this.queue = new ArrayBlockingQueue<>(queueSize);
this.producerPool = Executors.newFixedThreadPool(producerCount);
this.consumerPool = Executors.newFixedThreadPool(consumerCount);
}
public void start() {
// 启动多个生产者
for (int i = 0; i < producerCount; i++) {
producerPool.submit(new Producer(queue, i));
}
// 启动多个消费者
for (int i = 0; i < consumerCount; i++) {
consumerPool.submit(new Consumer(queue, i));
}
}
static class Producer implements Runnable {
private final BlockingQueue<Task> queue;
private final int id;
public Producer(BlockingQueue<Task> queue, int id) {
this.queue = queue;
this.id = id;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Task task = createTask();
queue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private Task createTask() {
// 创建任务
return new Task();
}
}
static class Consumer implements Runnable {
private final BlockingQueue<Task> queue;
private final int id;
public Consumer(BlockingQueue<Task> queue, int id) {
this.queue = queue;
this.id = id;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Task task = queue.take();
processTask(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private void processTask(Task task) {
// 处理任务
}
}
}优先级生产者-消费者
public class PriorityBuffer {
private final PriorityBlockingQueue<PriorityTask> queue;
public PriorityBuffer() {
this.queue = new PriorityBlockingQueue<>(100,
Comparator.comparingInt(PriorityTask::getPriority).reversed());
}
public void produce(PriorityTask task) throws InterruptedException {
queue.put(task);
}
public PriorityTask consume() throws InterruptedException {
return queue.take(); // 总是返回优先级最高的任务
}
}
class PriorityTask implements Comparable<PriorityTask> {
private final int priority;
private final Runnable task;
public PriorityTask(int priority, Runnable task) {
this.priority = priority;
this.task = task;
}
public int getPriority() {
return priority;
}
public void execute() {
task.run();
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(other.priority, this.priority); // 高优先级在前
}
}延迟生产者-消费者
public class DelayedBuffer {
private final DelayQueue<DelayedTask> queue = new DelayQueue<>();
public void produce(Runnable task, long delay, TimeUnit unit) {
long expireTime = System.nanoTime() + unit.toNanos(delay);
queue.put(new DelayedTask(task, expireTime));
}
public Runnable consume() throws InterruptedException {
DelayedTask delayedTask = queue.take();
return delayedTask.getTask();
}
static class DelayedTask implements Delayed {
private final Runnable task;
private final long expireTime;
public DelayedTask(Runnable task, long expireTime) {
this.task = task;
this.expireTime = expireTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.expireTime, ((DelayedTask) other).expireTime);
}
public Runnable getTask() {
return task;
}
}
}实际应用场景
日志系统
public class AsyncLogger {
private final BlockingQueue<LogEntry> logQueue;
private final ExecutorService consumer;
public AsyncLogger(int queueSize) {
this.logQueue = new LinkedBlockingQueue<>(queueSize);
this.consumer = Executors.newSingleThreadExecutor();
this.consumer.submit(this::consumeLogs);
}
// 生产者:业务线程调用
public void log(String level, String message) {
LogEntry entry = new LogEntry(level, message, System.currentTimeMillis());
if (!logQueue.offer(entry)) {
// 队列满,降级处理
System.err.println("日志队列已满,丢弃日志: " + message);
}
}
// 消费者:异步写入文件
private void consumeLogs() {
while (!Thread.currentThread().isInterrupted()) {
try {
LogEntry entry = logQueue.take();
writeToFile(entry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private void writeToFile(LogEntry entry) {
// 写入文件逻辑
}
}数据库连接池
public class ConnectionPool {
private final BlockingQueue<Connection> pool;
private final int maxSize;
public ConnectionPool(int initialSize, int maxSize) {
this.maxSize = maxSize;
this.pool = new LinkedBlockingQueue<>(maxSize);
for (int i = 0; i < initialSize; i++) {
pool.offer(createConnection());
}
}
// 消费者:获取连接
public Connection getConnection(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Connection conn = pool.poll(timeout, unit);
if (conn == null) {
throw new TimeoutException("获取连接超时");
}
return conn;
}
// 生产者:归还连接
public void releaseConnection(Connection conn) {
if (conn != null && pool.size() < maxSize) {
pool.offer(conn);
}
}
private Connection createConnection() {
// 创建数据库连接
return null;
}
}面试要点
问题1
Q: 生产者-消费者模式的作用是什么? A:
- 解耦:生产者和消费者不需要直接交互,通过缓冲区解耦
- 缓冲:应对生产和消费速度差异,平衡负载
- 异步处理:提高系统吞吐量,生产者不必等待消费者
- 扩展性:可以独立扩展生产者和消费者数量
问题2
Q: wait/notify 为什么要在同步块中使用? A:
- 防止竞态条件:wait 前需要检查条件,notify 需要改变条件,都需要原子操作
- 避免信号丢失:如果在 wait 之前 notify 已执行,线程将永远等待
- IllegalMonitorStateException:不在同步块中调用会抛异常
问题3
Q: 为什么用 while 循环检查条件而不是 if? A:
- 虚假唤醒:线程可能在条件不满足时被唤醒
- 多线程竞争:被唤醒时资源可能已被其他线程获取
- 中断唤醒:线程中断时也会从 wait 返回
使用 while 可以确保每次唤醒后都重新检查条件。
问题4
Q: BlockingQueue 有哪些实现?各自特点是什么? A:
| 实现类 | 特点 | 适用场景 |
|---|---|---|
| ArrayBlockingQueue | 有界数组,固定大小 | 边界明确的生产消费 |
| LinkedBlockingQueue | 可选有界链表 | 高吞吐量场景 |
| PriorityBlockingQueue | 优先级队列 | 需要按优先级处理 |
| SynchronousQueue | 无缓冲,直接传递 | 一对一交接场景 |
| DelayQueue | 延迟队列 | 定时任务调度 |
问题5
Q: Condition 相比 wait/notify 有什么优势? A:
- 多条件变量:一个 Lock 可以创建多个 Condition,实现更精细的等待/唤醒控制
- 公平性支持:配合 ReentrantLock 的公平模式
- 更灵活:可以只唤醒特定条件的线程
- 可中断:支持响应中断的等待
问题6
Q: 如何处理生产速度远大于消费速度的情况? A:
- 有界队列:使用 ArrayBlockingQueue 设置上限
- 拒绝策略:队列满时使用 offer 返回 false 或抛异常
- 背压机制:使用 Semaphore 限制生产速度
- 增加消费者:动态扩容消费者线程
- 降级处理:丢弃部分数据或降低处理精度
总结
| 实现方式 | 复杂度 | 灵活性 | 推荐场景 |
|---|---|---|---|
| wait/notify | 高 | 低 | 学习原理、简单场景 |
| BlockingQueue | 低 | 中 | 大多数实际应用 |
| Condition | 中 | 高 | 需要精细控制 |
最佳实践:
- 优先使用 BlockingQueue
- 队列大小要有上限
- 考虑优雅停机
- 处理好异常和中断
- 监控队列积压情况
掌握生产者-消费者模式是并发编程的基础能力!