知识模块
☕ Java 知识模块
三、Java 并发编程
生产者-消费者模式

生产者-消费者模式

面试高频考点:生产者-消费者模式是并发编程中最经典的设计模式之一,考察对线程通信、锁机制和并发工具的理解。

核心概念

什么是生产者-消费者模式

生产者-消费者模式是一种并发编程设计模式,用于解决生产数据和消费数据速度不匹配的问题:

  • 生产者(Producer):负责生产数据并放入缓冲区
  • 消费者(Consumer):负责从缓冲区取出数据并处理
  • 缓冲区(Buffer):用于存储生产者生产的数据,消费者从中取数据
┌──────────┐     ┌─────────────────┐     ┌──────────┐
│ 生产者 A  │ ──→ │                 │ ←── │ 消费者 X  │
├──────────┤     │     缓冲区       │     ├──────────┤
│ 生产者 B  │ ──→ │  [1][2][3][ ]   │ ←── │ 消费者 Y  │
├──────────┤     │                 │     ├──────────┤
│ 生产者 C  │ ──→ │                 │ ←── │ 消费者 Z  │
└──────────┘     └─────────────────┘     └──────────┘

模式优势

  1. 解耦:生产者和消费者不需要直接交互
  2. 缓冲:应对生产消费速度差异
  3. 异步处理:提高系统吞吐量
  4. 复用:生产和消费逻辑可以独立扩展

wait/notify 实现

基本实现

public class Buffer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity;
    
    public Buffer(int capacity) {
        this.capacity = capacity;
    }
    
    // 生产者调用
    public synchronized void produce(int value) throws InterruptedException {
        // 缓冲区满,等待消费者消费
        while (queue.size() == capacity) {
            System.out.println("缓冲区已满,生产者等待...");
            wait();
        }
        
        queue.add(value);
        System.out.println("生产: " + value + ", 缓冲区大小: " + queue.size());
        
        // 唤醒消费者
        notifyAll();
    }
    
    // 消费者调用
    public synchronized int consume() throws InterruptedException {
        // 缓冲区空,等待生产者生产
        while (queue.isEmpty()) {
            System.out.println("缓冲区为空,消费者等待...");
            wait();
        }
        
        int value = queue.poll();
        System.out.println("消费: " + value + ", 缓冲区大小: " + queue.size());
        
        // 唤醒生产者
        notifyAll();
        
        return value;
    }
}

完整示例

public class ProducerConsumerDemo {
    
    public static void main(String[] args) {
        Buffer buffer = new Buffer(5);
        
        // 创建3个生产者
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < 10; j++) {
                        buffer.produce(producerId * 100 + j);
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i).start();
        }
        
        // 创建2个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        buffer.consume();
                        Thread.sleep(150);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i).start();
        }
    }
}

为什么用 while 不用 if

// 错误写法 - 使用 if
public synchronized int consume() throws InterruptedException {
    if (queue.isEmpty()) {
        wait();  // 可能被虚假唤醒
    }
    return queue.poll();  // 可能抛出 NullPointerException
}
 
// 正确写法 - 使用 while
public synchronized int consume() throws InterruptedException {
    while (queue.isEmpty()) {
        wait();  // 即使被虚假唤醒,也会再次检查条件
    }
    return queue.poll();
}

原因

  1. 虚假唤醒:线程可能在条件不满足时被唤醒
  2. 多消费者:被唤醒时队列可能已被其他消费者清空
  3. 中断唤醒:线程中断时也会从 wait 返回

BlockingQueue 实现

使用 ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
public class BlockingQueueDemo {
    
    public static void main(String[] args) {
        // 创建有界阻塞队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        
        // 生产者
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put(i);  // 队列满时自动阻塞
                    System.out.println("生产: " + i);
                    Thread.sleep(100);
                }
                queue.put(-1);  // 结束标志
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者
        Thread consumer = new Thread(() -> {
            try {
                int value;
                while ((value = queue.take()) != -1) {  // 队列空时自动阻塞
                    System.out.println("消费: " + value);
                    Thread.sleep(150);
                }
                System.out.println("消费者结束");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
    }
}

BlockingQueue 核心方法

方法抛异常返回特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()--

各种 BlockingQueue 实现

// 1. ArrayBlockingQueue - 有界数组队列
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100);
 
// 2. LinkedBlockingQueue - 可选有界链表队列
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);
BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();  // 无界
 
// 3. PriorityBlockingQueue - 优先级队列
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();
 
// 4. SynchronousQueue - 无缓冲队列
BlockingQueue<String> syncQueue = new SynchronousQueue<>();
// 每个put必须等待take,直接传递
 
// 5. DelayQueue - 延迟队列
BlockingQueue<DelayedTask> delayQueue = new DelayQueue<>();
 
// 6. LinkedTransferQueue - 链表传输队列
BlockingQueue<String> transferQueue = new LinkedTransferQueue<>();

实际应用:任务队列

public class TaskQueue {
    private final BlockingQueue<Runnable> taskQueue;
    private final ExecutorService executor;
    
    public TaskQueue(int queueSize, int threadCount) {
        this.taskQueue = new ArrayBlockingQueue<>(queueSize);
        this.executor = Executors.newFixedThreadPool(threadCount);
        
        // 启动消费者线程
        for (int i = 0; i < threadCount; i++) {
            executor.submit(this::consumeTask);
        }
    }
    
    // 生产任务
    public void submit(Runnable task) throws InterruptedException {
        taskQueue.put(task);
    }
    
    // 消费任务
    private void consumeTask() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Runnable task = taskQueue.take();
                task.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    public void shutdown() {
        executor.shutdown();
    }
}

Condition 实现

基本用法

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class ConditionBuffer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // 缓冲区未满
    private final Condition notEmpty = lock.newCondition(); // 缓冲区非空
    
    public ConditionBuffer(int capacity) {
        this.capacity = capacity;
    }
    
    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                System.out.println("缓冲区已满,等待消费...");
                notFull.await();  // 等待缓冲区不满
            }
            
            queue.add(value);
            System.out.println("生产: " + value);
            
            notEmpty.signal();  // 通知消费者
        } finally {
            lock.unlock();
        }
    }
    
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("缓冲区为空,等待生产...");
                notEmpty.await();  // 等待缓冲区不空
            }
            
            int value = queue.poll();
            System.out.println("消费: " + value);
            
            notFull.signal();  // 通知生产者
            
            return value;
        } finally {
            lock.unlock();
        }
    }
}

Condition vs wait/notify

特性wait/notifyCondition
所属ObjectLock
条件数量单一可创建多个
唤醒方式notify/notifyAllsignal/signalAll
公平性不支持支持公平模式
灵活性

多条件变量示例

public class AdvancedBuffer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Condition sizeExceedsThreshold = lock.newCondition();
    private final int threshold;
    
    public AdvancedBuffer(int capacity, int threshold) {
        this.capacity = capacity;
        this.threshold = threshold;
    }
    
    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await();
            }
            
            queue.add(value);
            
            // 队列非空,唤醒消费者
            notEmpty.signal();
            
            // 队列超过阈值,触发特殊处理
            if (queue.size() > threshold) {
                sizeExceedsThreshold.signal();
            }
        } finally {
            lock.unlock();
        }
    }
    
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            
            int value = queue.poll();
            
            // 队列不满,唤醒生产者
            notFull.signal();
            
            return value;
        } finally {
            lock.unlock();
        }
    }
    
    // 等待队列超过阈值
    public void waitForThreshold() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() <= threshold) {
                sizeExceedsThreshold.await();
            }
            // 执行阈值触发逻辑
            System.out.println("队列超过阈值: " + queue.size());
        } finally {
            lock.unlock();
        }
    }
}

生产者-消费者变体

多生产者多消费者

public class MultiProducerConsumer {
    private final BlockingQueue<Task> queue;
    private final ExecutorService producerPool;
    private final ExecutorService consumerPool;
    
    public MultiProducerConsumer(int queueSize, int producerCount, int consumerCount) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.producerPool = Executors.newFixedThreadPool(producerCount);
        this.consumerPool = Executors.newFixedThreadPool(consumerCount);
    }
    
    public void start() {
        // 启动多个生产者
        for (int i = 0; i < producerCount; i++) {
            producerPool.submit(new Producer(queue, i));
        }
        
        // 启动多个消费者
        for (int i = 0; i < consumerCount; i++) {
            consumerPool.submit(new Consumer(queue, i));
        }
    }
    
    static class Producer implements Runnable {
        private final BlockingQueue<Task> queue;
        private final int id;
        
        public Producer(BlockingQueue<Task> queue, int id) {
            this.queue = queue;
            this.id = id;
        }
        
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Task task = createTask();
                    queue.put(task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        
        private Task createTask() {
            // 创建任务
            return new Task();
        }
    }
    
    static class Consumer implements Runnable {
        private final BlockingQueue<Task> queue;
        private final int id;
        
        public Consumer(BlockingQueue<Task> queue, int id) {
            this.queue = queue;
            this.id = id;
        }
        
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Task task = queue.take();
                    processTask(task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        
        private void processTask(Task task) {
            // 处理任务
        }
    }
}

优先级生产者-消费者

public class PriorityBuffer {
    private final PriorityBlockingQueue<PriorityTask> queue;
    
    public PriorityBuffer() {
        this.queue = new PriorityBlockingQueue<>(100, 
            Comparator.comparingInt(PriorityTask::getPriority).reversed());
    }
    
    public void produce(PriorityTask task) throws InterruptedException {
        queue.put(task);
    }
    
    public PriorityTask consume() throws InterruptedException {
        return queue.take();  // 总是返回优先级最高的任务
    }
}
 
class PriorityTask implements Comparable<PriorityTask> {
    private final int priority;
    private final Runnable task;
    
    public PriorityTask(int priority, Runnable task) {
        this.priority = priority;
        this.task = task;
    }
    
    public int getPriority() {
        return priority;
    }
    
    public void execute() {
        task.run();
    }
    
    @Override
    public int compareTo(PriorityTask other) {
        return Integer.compare(other.priority, this.priority);  // 高优先级在前
    }
}

延迟生产者-消费者

public class DelayedBuffer {
    private final DelayQueue<DelayedTask> queue = new DelayQueue<>();
    
    public void produce(Runnable task, long delay, TimeUnit unit) {
        long expireTime = System.nanoTime() + unit.toNanos(delay);
        queue.put(new DelayedTask(task, expireTime));
    }
    
    public Runnable consume() throws InterruptedException {
        DelayedTask delayedTask = queue.take();
        return delayedTask.getTask();
    }
    
    static class DelayedTask implements Delayed {
        private final Runnable task;
        private final long expireTime;
        
        public DelayedTask(Runnable task, long expireTime) {
            this.task = task;
            this.expireTime = expireTime;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(this.expireTime, ((DelayedTask) other).expireTime);
        }
        
        public Runnable getTask() {
            return task;
        }
    }
}

实际应用场景

日志系统

public class AsyncLogger {
    private final BlockingQueue<LogEntry> logQueue;
    private final ExecutorService consumer;
    
    public AsyncLogger(int queueSize) {
        this.logQueue = new LinkedBlockingQueue<>(queueSize);
        this.consumer = Executors.newSingleThreadExecutor();
        this.consumer.submit(this::consumeLogs);
    }
    
    // 生产者:业务线程调用
    public void log(String level, String message) {
        LogEntry entry = new LogEntry(level, message, System.currentTimeMillis());
        if (!logQueue.offer(entry)) {
            // 队列满,降级处理
            System.err.println("日志队列已满,丢弃日志: " + message);
        }
    }
    
    // 消费者:异步写入文件
    private void consumeLogs() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                LogEntry entry = logQueue.take();
                writeToFile(entry);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    private void writeToFile(LogEntry entry) {
        // 写入文件逻辑
    }
}

数据库连接池

public class ConnectionPool {
    private final BlockingQueue<Connection> pool;
    private final int maxSize;
    
    public ConnectionPool(int initialSize, int maxSize) {
        this.maxSize = maxSize;
        this.pool = new LinkedBlockingQueue<>(maxSize);
        
        for (int i = 0; i < initialSize; i++) {
            pool.offer(createConnection());
        }
    }
    
    // 消费者:获取连接
    public Connection getConnection(long timeout, TimeUnit unit) 
            throws InterruptedException, TimeoutException {
        Connection conn = pool.poll(timeout, unit);
        if (conn == null) {
            throw new TimeoutException("获取连接超时");
        }
        return conn;
    }
    
    // 生产者:归还连接
    public void releaseConnection(Connection conn) {
        if (conn != null && pool.size() < maxSize) {
            pool.offer(conn);
        }
    }
    
    private Connection createConnection() {
        // 创建数据库连接
        return null;
    }
}

面试要点

问题1

Q: 生产者-消费者模式的作用是什么? A:

  1. 解耦:生产者和消费者不需要直接交互,通过缓冲区解耦
  2. 缓冲:应对生产和消费速度差异,平衡负载
  3. 异步处理:提高系统吞吐量,生产者不必等待消费者
  4. 扩展性:可以独立扩展生产者和消费者数量

问题2

Q: wait/notify 为什么要在同步块中使用? A:

  1. 防止竞态条件:wait 前需要检查条件,notify 需要改变条件,都需要原子操作
  2. 避免信号丢失:如果在 wait 之前 notify 已执行,线程将永远等待
  3. IllegalMonitorStateException:不在同步块中调用会抛异常

问题3

Q: 为什么用 while 循环检查条件而不是 if? A:

  1. 虚假唤醒:线程可能在条件不满足时被唤醒
  2. 多线程竞争:被唤醒时资源可能已被其他线程获取
  3. 中断唤醒:线程中断时也会从 wait 返回

使用 while 可以确保每次唤醒后都重新检查条件。

问题4

Q: BlockingQueue 有哪些实现?各自特点是什么? A:

实现类特点适用场景
ArrayBlockingQueue有界数组,固定大小边界明确的生产消费
LinkedBlockingQueue可选有界链表高吞吐量场景
PriorityBlockingQueue优先级队列需要按优先级处理
SynchronousQueue无缓冲,直接传递一对一交接场景
DelayQueue延迟队列定时任务调度

问题5

Q: Condition 相比 wait/notify 有什么优势? A:

  1. 多条件变量:一个 Lock 可以创建多个 Condition,实现更精细的等待/唤醒控制
  2. 公平性支持:配合 ReentrantLock 的公平模式
  3. 更灵活:可以只唤醒特定条件的线程
  4. 可中断:支持响应中断的等待

问题6

Q: 如何处理生产速度远大于消费速度的情况? A:

  1. 有界队列:使用 ArrayBlockingQueue 设置上限
  2. 拒绝策略:队列满时使用 offer 返回 false 或抛异常
  3. 背压机制:使用 Semaphore 限制生产速度
  4. 增加消费者:动态扩容消费者线程
  5. 降级处理:丢弃部分数据或降低处理精度

总结

实现方式复杂度灵活性推荐场景
wait/notify学习原理、简单场景
BlockingQueue大多数实际应用
Condition需要精细控制

最佳实践

  1. 优先使用 BlockingQueue
  2. 队列大小要有上限
  3. 考虑优雅停机
  4. 处理好异常和中断
  5. 监控队列积压情况

掌握生产者-消费者模式是并发编程的基础能力!