Java
Java多线程
Java并发
JUC锁

Java多线程 18 - Condition

简介:Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

1. Condition介绍

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait()notify()notifyAll()方法是和同步锁(synchronized关键字)捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。

Condition函数列表如下:

  • // 使当前线程在接到信号或被中断之前一直处于等待状态
  • void await()
  • // 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
  • boolean await(long time, TimeUnit unit)
  • // 使当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
  • long awaitNanos(long nanosTimeout)
  • // 使当前线程在接到信号之前一直处于等待状态,不响应中断,但会记录中断操作
  • void awaitUninterruptibly()
  • // 使当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
  • boolean awaitUntil(Date deadline)
  • // 唤醒一个等待线程
  • void signal()
  • // 唤醒所有等待线程
  • void signalAll()

Condition强大的地方在于,它能够更加精细的控制多线程的等待与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition,指定特定的线程进入等待或被唤醒。但Object类中的wait()notify()notifyAll()实现的等待和唤醒,虽然可以指定特定的线程进入等待,但并不能通过notify()notifyAll()明确地指定被唤醒的线程。

2. Condition示例

为了演示Condition的用法,读者可以先参考前面介绍wait()signal()signalAll()用法的文章,然后再来与Condition的用法进行对比。Condition需要和互斥锁或共享锁捆绑使用,其实在之前讲解ReentrantLock时,最后使用Condition协助,实现了一个生产者消费者的模型,因此这里就不讲解Condition的简单用法了,读者可以自行查看ReentrantLock中的例子。

下面的例子中BoundedBuffer类来自于JUC包源码作者Doug Lea给出的例子,也是一个生产者消费者的场景:

  • package com.coderap.lock.condition;
  • import java.util.concurrent.locks.Lock;
  • import java.util.concurrent.locks.Condition;
  • import java.util.concurrent.locks.ReentrantLock;
  • class BoundedBuffer {
  • final Lock lock = new ReentrantLock();
  • final Condition notFull = lock.newCondition();
  • final Condition notEmpty = lock.newCondition();
  • final Object[] items = new Object[5];
  • int putptr, takeptr, count;
  • public void put(Object x) throws InterruptedException {
  • lock.lock(); // 获取锁
  • try {
  • // 如果缓冲已满,则等待;直到缓冲不是满的,才将x添加到缓冲中
  • while (count == items.length)
  • notFull.await();
  • // 将x添加到缓冲中
  • items[putptr] = x;
  • // 将put统计数putptr+1;如果缓冲已满,则设putptr为0
  • if (++putptr == items.length) putptr = 0;
  • // 将缓冲数量+1
  • ++count;
  • // 唤醒take线程,因为take线程通过notEmpty.await()等待
  • notEmpty.signal();
  • // 打印写入的数据
  • System.out.println(Thread.currentThread().getName() + " put " + (Integer) x);
  • } finally {
  • lock.unlock(); // 释放锁
  • }
  • }
  • public Object take() throws InterruptedException {
  • lock.lock(); // 获取锁
  • try {
  • // 如果缓冲为空,则等待;直到缓冲不为空,才将x从缓冲中取出
  • while (count == 0)
  • notEmpty.await();
  • // 将x从缓冲中取出
  • Object x = items[takeptr];
  • // 将take统计数takeptr+1;如果缓冲为空,则设takeptr为0
  • if (++takeptr == items.length) takeptr = 0;
  • // 将缓冲数量-1
  • --count;
  • // 唤醒put线程,因为put线程通过notFull.await()等待
  • notFull.signal();
  • // 打印取出的数据
  • System.out.println(Thread.currentThread().getName() + " take " + (Integer) x);
  • return x;
  • } finally {
  • lock.unlock(); // 释放锁
  • }
  • }
  • }
  • public class ConditionTest {
  • private static BoundedBuffer boundedBuffer = new BoundedBuffer();
  • public static void main(String[] args) {
  • for (int i = 0; i < 10; i++) {
  • // 启动10个写线程,向boundedBuffer中不断的写数据
  • new PutThread("PutThread - " + i, i).start();
  • // 启动10个读线程,从boundedBuffer中不断的读数据
  • new TakeThread("TakeThread - " + i).start();
  • }
  • }
  • static class PutThread extends Thread {
  • private int num;
  • public PutThread(String name, int num) {
  • super(name);
  • this.num = num;
  • }
  • public void run() {
  • try {
  • // 线程休眠1ms
  • Thread.sleep(1);
  • // 向BoundedBuffer中写入数据
  • boundedBuffer.put(num);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • static class TakeThread extends Thread {
  • public TakeThread(String name) {
  • super(name);
  • }
  • public void run() {
  • try {
  • // 线程休眠1ms
  • Thread.sleep(10);
  • // 从BoundedBuffer中取出数据
  • Integer num = (Integer) boundedBuffer.take();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }

运行时某一次的打印结果如下:

  • PutThread - 0 put 0
  • PutThread - 1 put 1
  • PutThread - 2 put 2
  • PutThread - 3 put 3
  • PutThread - 4 put 4
  • TakeThread - 2 take 0
  • TakeThread - 1 take 1
  • TakeThread - 0 take 2
  • PutThread - 5 put 5
  • PutThread - 6 put 6
  • PutThread - 7 put 7
  • TakeThread - 3 take 3
  • TakeThread - 9 take 4
  • PutThread - 8 put 8
  • TakeThread - 4 take 5
  • TakeThread - 6 take 6
  • TakeThread - 5 take 7
  • TakeThread - 8 take 8
  • PutThread - 9 put 9
  • TakeThread - 7 take 9

Doug Lea给出的例子其实与前面ReentrantLock中的例子非常相似。BoundedBuffer是容量为5的缓冲,缓冲中存储的是Object对象,支持多线程的读/写缓冲。多个线程操作一个BoundedBuffer对象时,它们通过互斥锁lock对缓冲区items进行互斥访问;而且同一个BoundedBuffer对象下的全部线程共用notFullnotEmpty这两个Condition。notFull用于控制写缓冲,notEmpty用于控制读缓冲。当缓冲已满的时候,调用put()方法的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count加1,最后调用notEmpty.signal()唤醒notEmpty上的等待线程,即调用notEmpty.await进入阻塞等待状态的线程。简言之,notFull控制缓冲区的写入,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程;notEmpty控制缓冲区的读取,当读取了缓冲区数据之后会唤醒notFull上的等待线程。在ConditionTest的main()方法中,启动10个写线程,向BoundedBuffer中不断的写数据;同时也启动10个读线程,从BoundedBuffer中不断的读数据;读写线程间歇调用,实现生产者的生产动作和消费者的消费动作。

由于Condition类在配合锁使用时,很多底层细节都与锁的机制有关,因此关于Condition的具体实现,会在后面讲解AbstractQueuedSynchronizer中详细阐述。