1. 生产者和消费者模型
生产者和消费者模型问题是个非常典型的多线程问题,涉及到的对象包括生产者、消费者、仓库和产品。他们之间的关系如下:
- 生产者仅仅在仓储未满时候生产,仓满则停止生产。
- 消费者仅仅在仓储有产品时候才能消费,仓空则等待。
- 当消费者发现仓储没产品可消费时候会通知生产者生产。
- 生产者在生产出可消费产品时候,应该通知等待的消费者去消费。
生产者和消费者实现
下面通过wait()
/ notify()
方式实现该模型。源码如下:
- /** 仓库 */
- class Store {
- // 已经存储的数量
- public volatile int count = 0;
- // 最大容量
- private int MAX_COUNT = 5;
- public synchronized void store() {
- try {
- Thread.sleep(new Random().nextInt(300));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 当超过最大容量的时候,生成操作开始等待
- while (count >= MAX_COUNT) {
- try {
- System.out.println(Thread.currentThread().getName() + " begin waiting, store count: " + count);
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 否则进行生产
- count++;
- System.out.println(Thread.currentThread().getName() + " produced, store count: " + count);
- // 叫醒所有的等待线程,主要是为了叫醒等待的消费者线程
- notifyAll();
- }
- public synchronized void consum() {
- try {
- Thread.sleep(new Random().nextInt(150));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 当仓库为空时,消费操作开始等待
- while (count <= 0) {
- try {
- System.out.println(Thread.currentThread().getName() + " begin waiting, store count: " + count);
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 否则进行消费
- count--;
- System.out.println(Thread.currentThread().getName() + " consumed, store count: " + count);
- // 叫醒所有的等待线程,主要是为了叫醒等待的生产者线程
- notifyAll();
- }
- }
- /** 生产者 */
- class Producer implements Runnable {
- private Store store;
- public Producer(Store store) {
- this.store = store;
- }
- @Override
- public void run() {
- // 开始不断生成
- while (true) {
- store.store();
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- /** 消费者 */
- class Consumer implements Runnable {
- private Store store;
- public Consumer(Store store) {
- this.store = store;
- }
- @Override
- public void run() {
- // 开始不断消费
- while (true) {
- store.consum();
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- public class ProducerAndConsumerTest {
- public static void main(String[] args) {
- Store store = new Store();
- // 开启两个生产线程
- new Thread(new Producer(store), "生产者1").start();
- new Thread(new Producer(store), "生产者2").start();
- new Thread(new Producer(store), "生产者3").start();
- new Thread(new Producer(store), "生产者4").start();
- new Thread(new Producer(store), "生产者5").start();
- // 开启三个消费线程
- new Thread(new Consumer(store), "消费者1").start();
- new Thread(new Consumer(store), "消费者2").start();
- new Thread(new Consumer(store), "消费者3").start();
- }
- }
某一次运行结果片段如下:
- 生产者1 produced, store left count: 1
- 消费者3 consumed, store left count: 0
- 消费者2 begin waiting, store left count: 0
- 消费者1 begin waiting, store left count: 0
- 生产者5 produced, store left count: 1
- 生产者4 produced, store left count: 2
- 生产者3 produced, store left count: 3
- 生产者2 produced, store left count: 4
- 生产者4 produced, store left count: 5
- 生产者5 begin waiting, store left count: 5
- 消费者3 consumed, store left count: 4
- 消费者1 consumed, store left count: 3
- 消费者2 consumed, store left count: 2
- 生产者1 produced, store left count: 3
- 生产者4 produced, store left count: 4
- 生产者2 produced, store left count: 5
- 生产者5 begin waiting, store left count: 5
- 生产者3 begin waiting, store left count: 5
- 生产者1 begin waiting, store left count: 5
- 消费者2 consumed, store left count: 4
- ...
在上面的代码中,有一个仓库类Store,它的store()
和consum()
方法负责存入或消费商品,这两个方法都是加锁的,并且是以当前Store对象为锁。
在store()
方法中,首先判断了当前库存量是否大于最大库存量,如果大于就使当前线程开始等待,如果没有达到最大库存量,就开始增加库存,并且调用notifyAll()
方法唤醒所有线程。
在consum()
方法中,首先判断了当前库存量是否小于等于0,如果满足就使当前线程开始等待,如果仓库还有库存,就开始消费商品,并且调用notifyAll()
方法唤醒所有线程。
生产者和消费者分别对应Producer和Consumer两个类,这两个类的实现非常简单,就是循环调用仓库的store()
和consum()
方法分别生成商品和消费商品。需要注意的是,在主要代码中,Producer和Consumer两个类的实例中的Store实例是同一个,也就是说生产者和消费者是针对同一个仓库进行生产和消费操作的。
在主线程中,分别开启了五个生产线程和三个消费线程。
当Producer生产线程在生产的产品达到最大库存量时,会停下来等待,而当Consumer消费线程在消费到仓库库存为0时也会停下来等待;而每次生产或消费的时候都会执行notifyAll()
方法唤醒所有线程,这个唤醒操作会唤醒等待的生产或消费线程继续进行生产或消费。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...