Java
Java多线程

Java多线程 09 - 生产者和消费者模型

简介:生产者和消费者模型问题是个非常典型的多线程问题,涉及到的对象包括生产者、消费者、仓库和产品。

1. 生产者和消费者模型

生产者和消费者模型问题是个非常典型的多线程问题,涉及到的对象包括生产者、消费者、仓库和产品。他们之间的关系如下:

  1. 生产者仅仅在仓储未满时候生产,仓满则停止生产。
  2. 消费者仅仅在仓储有产品时候才能消费,仓空则等待。
  3. 当消费者发现仓储没产品可消费时候会通知生产者生产。
  4. 生产者在生产出可消费产品时候,应该通知等待的消费者去消费。

生产者和消费者实现

下面通过wait() / notify()方式实现该模型。源码如下:

  • /** 仓库 */
  • class Store {
  • // 已经存储的数量
  • public volatile int count = 0;
  • // 最大容量
  • private int MAX_COUNT = 5;
  • public synchronized void store() {
  • try {
  • Thread.sleep(new Random().nextInt(300));
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • // 当超过最大容量的时候,生成操作开始等待
  • while (count >= MAX_COUNT) {
  • try {
  • System.out.println(Thread.currentThread().getName() + " begin waiting, store count: " + count);
  • wait();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • // 否则进行生产
  • count++;
  • System.out.println(Thread.currentThread().getName() + " produced, store count: " + count);
  • // 叫醒所有的等待线程,主要是为了叫醒等待的消费者线程
  • notifyAll();
  • }
  • public synchronized void consum() {
  • try {
  • Thread.sleep(new Random().nextInt(150));
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • // 当仓库为空时,消费操作开始等待
  • while (count <= 0) {
  • try {
  • System.out.println(Thread.currentThread().getName() + " begin waiting, store count: " + count);
  • wait();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • // 否则进行消费
  • count--;
  • System.out.println(Thread.currentThread().getName() + " consumed, store count: " + count);
  • // 叫醒所有的等待线程,主要是为了叫醒等待的生产者线程
  • notifyAll();
  • }
  • }
  • /** 生产者 */
  • class Producer implements Runnable {
  • private Store store;
  • public Producer(Store store) {
  • this.store = store;
  • }
  • @Override
  • public void run() {
  • // 开始不断生成
  • while (true) {
  • store.store();
  • try {
  • Thread.sleep(300);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • /** 消费者 */
  • class Consumer implements Runnable {
  • private Store store;
  • public Consumer(Store store) {
  • this.store = store;
  • }
  • @Override
  • public void run() {
  • // 开始不断消费
  • while (true) {
  • store.consum();
  • try {
  • Thread.sleep(300);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • public class ProducerAndConsumerTest {
  • public static void main(String[] args) {
  • Store store = new Store();
  • // 开启两个生产线程
  • new Thread(new Producer(store), "生产者1").start();
  • new Thread(new Producer(store), "生产者2").start();
  • new Thread(new Producer(store), "生产者3").start();
  • new Thread(new Producer(store), "生产者4").start();
  • new Thread(new Producer(store), "生产者5").start();
  • // 开启三个消费线程
  • new Thread(new Consumer(store), "消费者1").start();
  • new Thread(new Consumer(store), "消费者2").start();
  • new Thread(new Consumer(store), "消费者3").start();
  • }
  • }

某一次运行结果片段如下:

  • 生产者1 produced, store left count: 1
  • 消费者3 consumed, store left count: 0
  • 消费者2 begin waiting, store left count: 0
  • 消费者1 begin waiting, store left count: 0
  • 生产者5 produced, store left count: 1
  • 生产者4 produced, store left count: 2
  • 生产者3 produced, store left count: 3
  • 生产者2 produced, store left count: 4
  • 生产者4 produced, store left count: 5
  • 生产者5 begin waiting, store left count: 5
  • 消费者3 consumed, store left count: 4
  • 消费者1 consumed, store left count: 3
  • 消费者2 consumed, store left count: 2
  • 生产者1 produced, store left count: 3
  • 生产者4 produced, store left count: 4
  • 生产者2 produced, store left count: 5
  • 生产者5 begin waiting, store left count: 5
  • 生产者3 begin waiting, store left count: 5
  • 生产者1 begin waiting, store left count: 5
  • 消费者2 consumed, store left count: 4
  • ...

在上面的代码中,有一个仓库类Store,它的store()consum()方法负责存入或消费商品,这两个方法都是加锁的,并且是以当前Store对象为锁。

store()方法中,首先判断了当前库存量是否大于最大库存量,如果大于就使当前线程开始等待,如果没有达到最大库存量,就开始增加库存,并且调用notifyAll()方法唤醒所有线程。

consum()方法中,首先判断了当前库存量是否小于等于0,如果满足就使当前线程开始等待,如果仓库还有库存,就开始消费商品,并且调用notifyAll()方法唤醒所有线程。

生产者和消费者分别对应Producer和Consumer两个类,这两个类的实现非常简单,就是循环调用仓库的store()consum()方法分别生成商品和消费商品。需要注意的是,在主要代码中,Producer和Consumer两个类的实例中的Store实例是同一个,也就是说生产者和消费者是针对同一个仓库进行生产和消费操作的。

在主线程中,分别开启了五个生产线程和三个消费线程。

当Producer生产线程在生产的产品达到最大库存量时,会停下来等待,而当Consumer消费线程在消费到仓库库存为0时也会停下来等待;而每次生产或消费的时候都会执行notifyAll()方法唤醒所有线程,这个唤醒操作会唤醒等待的生产或消费线程继续进行生产或消费。