Java多线程
Java并发
JUC锁

Java多线程 21 - AbstractQueuedSynchronizer详解(2)

简介:AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。

1. Condition条件队列模型

上一篇文章中,我们以AQS在ReentrantLock独占锁在公平模式下的使用方法为导向,介绍了AQS控制同步状态的获取和释放的基本思想和流程,并详细分析了源码逻辑;但上一篇文章规避了两个重要的概念:Condition和共享模式。因此在这篇为文章将继续探讨Condition在AQS中的实现以及其源码分析。

需要注意的是,在阅读本文之前你应该已经完全读懂上一篇文章中介绍的AQS所涉及的所有概念,同时你还需要明白Condition的使用场景。关于Condition的使用,可以关注前面的文章Java多线程 18 - Condition

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait()notify()notifyAll()方法是和同步锁(synchronized关键字)捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。

常见的情况下(其实也是目前仅有的情况),Condition会和ReentrantLock配合使用,这是由于,Condition作为一个接口,它在标准JDK中唯一的两个实现分别位于AbstractQueuedSynchronizer和AbstractQueuedLongSynchronizer两个抽象类中,命名都为ConditionObject;同时,对AbstractQueuedSynchronizer中ConditionObject的直接使用,位于ReentrantLock和ReentrantReadWriteLock中,本文依旧以ReentrantLock作为导向作介绍;查看它所使用的ConditionObject代码:

  • final ConditionObject newCondition() {
  • return new ConditionObject();
  • }

注:AbstractQueuedLongSynchronizer与AbstractQueuedSynchronizer功能基本相同,唯一的区别在于AbstractQueuedLongSynchronizer中state变量的类型为long。

先查看AQS中ConditionObject内部类的变量,如下:

  • public class ConditionObject implements Condition, java.io.Serializable {
  • private static final long serialVersionUID = 1173984872572414699L;
  • /** 条件队列的头节点,transient修饰,不进行序列化 */
  • private transient Node firstWaiter;
  • /** 条件队列的尾节点,transient修饰,不进行序列化 */
  • private transient Node lastWaiter;
  • /**
  • * Mode meaning to reinterrupt on exit from wait
  • * 异常标识,表示在退出等待时需要重新中断线程
  • */
  • private static final int REINTERRUPT = 1;
  • /**
  • * Mode meaning to throw InterruptedException on exit from wait
  • * 异常标识,表示在退出等待时需要抛出InterruptedException异常
  • */
  • private static final int THROW_IE = -1;
  • ...

这里,我们需要引入条件队列的概念。在ConditionObject中其实也维护了一个队列,我们称之为条件队列。这个队列中的节点也是由AQS内部类Node实现的,节点记录了所有调用await()操作的线程;在上面列出的ConditionObject的变量中,firstWaiter指向了条件队列的头节点,lastWaiter指向了条件队列的尾节点。需要注意的是,每一个ConditionObject的实例,都将单独维护一个条件队列。条件队列的大致结构示意图如下:

1.Condition条件队列模型.png

条件队列与之前涉及的等待队列非常类似,不同点在于:

  1. 等待队列由AQS直接维护;条件队列由ConditionObject维护;
  2. 等待队列头节点仅仅用作标记作用,未包装线程;条件队列的每个节点都包装了对应的线程。
  3. 等待队列是双向链表;条件队列是单向链表,它的节点的prev属性没有被使用。

对条件队列有了基础的印象,接下来就开始介绍ConditionObject对条件队列的各类维护操作。

2. await()操作

在使用Condition时,我们只需要在某个线程的代码内调用Condition实例的await()操作就可以使当前线程进入等待状态,Condition提供了多种await()方法:

  • 普通await()方法,会抛出InterruptedException异常。
  • 带有超时机制的awaitNanos(long nanosTimeout)await(long time, TimeUnit unit)awaitUntil(Date deadline)方法,会抛出InterruptedException异常。
  • 不抛出InterruptedException异常的awaitUninterruptibly()方法。

这里我们先分析普通的await()方法,其他两种将在后面的部分进行讲解。await()的源码如下:

  • public final void await() throws InterruptedException {
  • if (Thread.interrupted())
  • // 由于该方法是可中断的,如果线程被中断过,则直接抛出中断异常,不用往下执行了
  • throw new InterruptedException();
  • // 将当前线程添加到条件队列中,返回的添加的节点
  • Node node = addConditionWaiter();
  • // 释放锁,返回值是拥有锁时的state的值;由于该线程已经要进入等待了,自然应该将其获取的锁释放掉
  • int savedState = fullyRelease(node);
  • int interruptMode = 0;
  • /**
  • * 当节点不在同步队列中时会不断循环,而循环体中LockSupport.park(this)则会将线程挂起;
  • * 如果节点被移入了同步队列,并被成功唤醒,就会从LockSupport.park(this)处继续往下执行;
  • * 此时会判断是否发生过中断,如果发生过则直接跳出while循环,否则还会继续判断一次while循环条件,
  • * 在这一次判断中,因为节点已从条件队列转移到同步队列中了,isOnSyncQueue返回true,就会跳出循环。
  • */
  • while (!isOnSyncQueue(node)) {
  • // 挂起当前线程,当前线程会阻塞在这里等待被唤醒,不再往下执行
  • LockSupport.park(this);
  • /**
  • * 需要注意这里,checkInterruptWhileWaiting(node)返回值为true时也会结束循环
  • * 这个操作后面会详细讲
  • */
  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  • break;
  • }
  • /**
  • * 代码运行到这里,说明节点被唤醒了
  • * 当acquireQueued(node, savedState)返回true时,表示线程已经获取到锁了
  • * 且线程在等待期间发生了中断
  • * 此时会判断interruptMode是否为THROW_IE,如果不是,将interruptMode置为REINTERRUPT
  • */
  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  • interruptMode = REINTERRUPT;
  • if (node.nextWaiter != null) // clean up if cancelled
  • // 清理取消等待的线程节点,后面会详细分析
  • unlinkCancelledWaiters();
  • if (interruptMode != 0)
  • // 如果interruptMode不为0,需要处理产生的中断异常,后面会详细分析
  • reportInterruptAfterWait(interruptMode);
  • }

接下来分析await()中涉及的部分方法,首先是addConditionWaiter(),这个方法负责将调用await()方法的线程包装为节点添加到条件队列中,源码如下:

  • private Node addConditionWaiter() {
  • // 尾节点
  • Node t = lastWaiter;
  • // If lastWaiter is cancelled, clean out.
  • // 如果尾节点的waitStatus状态不为CONDITION,将进行清理操作
  • if (t != null && t.waitStatus != Node.CONDITION) {
  • unlinkCancelledWaiters();
  • t = lastWaiter;
  • }
  • // 构造新节点,记录了当前线程,waitStatus值为CONDITION
  • Node node = new Node(Thread.currentThread(), Node.CONDITION);
  • // 将新节点添加到条件队列
  • if (t == null)
  • firstWaiter = node;
  • else
  • t.nextWaiter = node;
  • lastWaiter = node;
  • // 返回新添加的节点
  • return node;
  • }

addConditionWaiter()中存在一个unlinkCancelledWaiters(),用于清理条件队列中取消等待的节点,该方法在await()中也有调用,下面是该方法的源码:

  • // 清理条件队列中取消等待的节点
  • private void unlinkCancelledWaiters() {
  • // 游标节点,从第一个节点开始遍历
  • Node t = firstWaiter;
  • Node trail = null;
  • while (t != null) {
  • // 获取游标节点的后继节点
  • Node next = t.nextWaiter;
  • if (t.waitStatus != Node.CONDITION) {
  • // 游标节点的waitStatus不为CONDITION,表示其取消了等待
  • // 先断开与后继节点的连接
  • t.nextWaiter = null;
  • if (trail == null)
  • // 运行到这里,说明游标节点前面没有节点
  • // 因此直接将firstWaiter指向游标节点的下一个节点即可
  • firstWaiter = next;
  • else
  • // 运行到这里,表示游标节点前面有节点,需要跳过当前游标节点
  • // trail不为null时,就是指向的游标节点的前一个节点
  • trail.nextWaiter = next;
  • if (next == null)
  • // next为空表示已经遍历到队列尾了
  • lastWaiter = trail;
  • } else
  • // 游标节点waitStatus为CONDITION,因此它还在等待,记录这一次的游标节点
  • trail = t;
  • // 游标后移
  • t = next;
  • }
  • }

这个方法的实现比较简单,就是用于清理条件队列中取消等待的节点;注意,只要节点的状态不是CONDITION,就表示取消了等待,这与之前讲解的同步队列是不一样的。

await()在将当前线程添加到条件队列后,会调用fullyRelease(node)释放当前线程获取的锁,也即是将当前线程之前修改的state重新置为0,源码如下:

  • final int fullyRelease(Node node) {
  • boolean failed = true;
  • try {
  • int savedState = getState();
  • // 释放node节点包装的线程获取的锁,释放成功后,state会置为0
  • // 这里release()的参数是savedState,是为了重置所有重入操作修改state的累计值
  • if (release(savedState)) {
  • failed = false;
  • return savedState;
  • } else {
  • throw new IllegalMonitorStateException();
  • }
  • } finally {
  • // 如果失败,就直接将node节点的waitStatus置为CANCELLED,取消等待状态
  • // 这样在unlinkCancelledWaiters()方法中该节点就会被清理掉
  • if (failed)
  • node.waitStatus = Node.CANCELLED;
  • }
  • }

从源码中可知,如果当前线程在释放锁时失败,会抛出IllegalMonitorStateException异常,同时包装当前线程的节点的waitStatus会被置为CANCELLED,以便在unlinkCancelledWaiters()方法中将该节点清理掉。

在将当前线程持有的锁释放后,await()方法会进入while (!isOnSyncQueue(node)) { ... }循环进行自旋,这里的isOnSyncQueue(node)属于AQS 类,用于判断node节点是否已经转移到同步队列中了,源码如下:

  • /**
  • * Returns true if a node, always one that was initially placed on
  • * a condition queue, is now waiting to reacquire on sync queue.
  • * 用于判断节点是否被移入了同步队列
  • *
  • * @param node the node
  • * @return true if is reacquiring
  • */
  • final boolean isOnSyncQueue(Node node) {
  • if (node.waitStatus == Node.CONDITION || node.prev == null)
  • /**
  • * 需要注意的是,节点在被移入同步队列时,waitStatus会被置为0
  • * 如果node的waitStatus为CONDITION,说明该节点还在条件队列
  • * 如果node的waitStatus不为CONDITION,但它的前驱节点为空,说明它现在是等待队列的头节点
  • * 等待队列的头节点是获取了锁的节点,不处于同步队列中
  • */
  • return false;
  • if (node.next != null) // If has successor, it must be on queue
  • // 如果节点有后继节点,一定是位于同步队列
  • return true;
  • /*
  • * node.prev can be non-null, but not yet on queue because
  • * the CAS to place it on queue can fail. So we have to
  • * traverse from tail to make sure it actually made it. It
  • * will always be near the tail in calls to this method, and
  • * unless the CAS failed (which is unlikely), it will be
  • * there, so we hardly ever traverse much.
  • * 需要在同步队列中从后往前遍历查找,以判断node是否处于同步队列中
  • * 条件队列是单向链表,节点prev不会有值;而同步队列是双向链表,节点的prev是有值的
  • * 因此是否可以通过判断prev是否为null来判断节点是不是在同步队列?
  • * 不能!由于AQS中在节点入队时,首先设置node.prev为tail,
  • * 然后以CAS方式将自己设置为新的tail,但是这个CAS操作可能是失败的,此时节点并没有成功入队
  • */
  • return findNodeFromTail(node);
  • }

isOnSyncQueue(Node)方法的最后,使用AQS类的findNodeFromTail(node)在等待队列中从后往前遍历查找是否有当前节点,代码如下:

  • /**
  • * Returns true if node is on sync queue by searching backwards from tail.
  • * Called only when needed by isOnSyncQueue.
  • * 在同步队列中从尾节点开始向前遍历
  • *
  • * @return true if present
  • */
  • private boolean findNodeFromTail(Node node) {
  • Node t = tail;
  • for (; ; ) {
  • if (t == node)
  • return true;
  • if (t == null)
  • return false;
  • t = t.prev;
  • }
  • }

回到await()方法的while (!isOnSyncQueue(node)) { ... }循环,如果node节点不在同步队列中,while会执行循环体以挂起当前线程等待被唤醒。

while循环结束的条件有两个:

  1. 节点被移入同步队列,也即是对Condition实例执行了signal()signal()操作,并成功获取到了锁,此时说明线程可以解除挂起了。
  2. checkInterruptWhileWaiting(node)返回值不为0,即当前线程挂起期间发生了中断操作。

我们先考虑第一种情况,即线程被唤醒。

3. signal()操作

通过Condition实例的signal()方法可以唤醒挂起的线程,它的源码如下:

  • public final void signal() {
  • // 执行唤醒的线程必须拥有独占锁
  • if (!isHeldExclusively())
  • throw new IllegalMonitorStateException();
  • // 唤醒头节点
  • Node first = firstWaiter;
  • if (first != null)
  • // 执行doSignal()唤醒
  • doSignal(first);
  • }

doSignal()方法的源码:

  • private void doSignal(Node first) {
  • do {
  • /**
  • * 先将firstWaiter后移,如果为null表示条件队列已经没有节点了
  • * 因此也将lastWaiter置为null
  • */
  • if ((firstWaiter = first.nextWaiter) == null)
  • lastWaiter = null;
  • // 因为first指向的节点即将转移到同步队列了,因此断开它与条件队列的连接
  • first.nextWaiter = null;
  • /**
  • * while条件中,使用transferForSignal(first)将待唤醒的节点转移到同步队列
  • * 如果转移失败的话,就选择下一个节点尝试转移,直到队列为空
  • */
  • } while (!transferForSignal(first) && (first = firstWaiter) != null);
  • }

doSignal(Node)方法表明,尝试唤醒的节点是从条件队列头开始选取的,如果唤醒失败,会往后依次选择节点尝试,直到遍历到条件队列末尾。

AQS类的transferForSignal(Node)方法的源码如下:

  • final boolean transferForSignal(Node node) {
  • /*
  • * If cannot change waitStatus, the node has been cancelled.
  • * 尝试CAS方式修改node节点的waitStatus为0,
  • * 如果修改失败,说明node节点的waitStatus不为CONDITION,
  • * 将直接返回false,表示唤醒失败
  • */
  • if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  • return false;
  • /*
  • * Splice onto queue and try to set waitStatus of predecessor to
  • * indicate that thread is (probably) waiting. If cancelled or
  • * attempt to set waitStatus fails, wake up to resync (in which
  • * case the waitStatus can be transiently and harmlessly wrong).
  • * 调用enq(node)进行自旋方式入队操作,将节点添加到同步队列
  • * 这个方法返回的p是添加成功后node节点在同步队列中前驱节点
  • */
  • Node p = enq(node);
  • int ws = p.waitStatus;
  • /**
  • * 如果p节点的waitStatus大于0,表示node的前驱节点已经取消了排队,直接唤醒node节点上的线程;
  • * 如果p节点的waitStatus小于或等于0,尝试修改p节点的waitStatus为SIGNAL,这是同步队列入队的常规操作,之前有过介绍
  • * 如果修改失败,也直接唤醒node节点上的线程
  • */
  • if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  • LockSupport.unpark(node.thread);
  • return true;
  • }

当待唤醒的线程所在的节点被转移到同步队列,且顺利将该节点的前驱节点的waitStatus修改为SIGNAL后,就会等待同步队列的排队调度,直到自己包装的线程被唤醒。

这里简单地对signalAll()做一下补充解释,其实signalAll()的原理与signal()一样,只不过它对条件队列所有的节点都进行了唤醒,相关源码如下:

  • public final void signalAll() {
  • if (!isHeldExclusively())
  • throw new IllegalMonitorStateException();
  • Node first = firstWaiter;
  • if (first != null)
  • // 唤醒条件队列所有的节点
  • doSignalAll(first);
  • }
  • private void doSignalAll(Node first) {
  • lastWaiter = firstWaiter = null;
  • // 遍历整个条件队列,对每个节点都执行transferForSignal()操作进行唤醒
  • do {
  • Node next = first.nextWaiter;
  • first.nextWaiter = null;
  • transferForSignal(first);
  • first = next;
  • } while (first != null);
  • }

4. 线程唤醒后

回到await()方法中,我们知道当线程调用Condition实例的await()方法,会被挂起在await()中的while循环中,回顾这部分的代码:

  • public final void await() throws InterruptedException {
  • ...
  • while (!isOnSyncQueue(node)) {
  • // 挂起当前线程
  • LockSupport.park(this);
  • /**
  • * 需要注意这里,checkInterruptWhileWaiting(node)返回值为true时也会结束循环
  • */
  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  • break;
  • }
  • ...
  • }

从源码可知,当线程被唤醒后会继续执行while循环体剩下的代码。

4.1. 中断检查

唤醒后线程所做的第一件事,是对中断的检查;这里我们需要明白为什么在LockSupport.park(this)挂起线程后面会紧跟着检查线程中断状态的操作。在这里使用LockSupport的park()方法挂起的线程,会有以下的几种途径被唤醒:

  1. 正常途径;使用signal()signalAll()进行唤醒,此时线程所在的节点会被转移到同步队列,当该节点获取了锁,就会被LockSupport的unpark()方法唤醒。
  2. 转移操作中的特殊情况;在上面介绍过,使用signal()signalAll()进行唤醒时,如果线程所在的节点转移到同步队列后,发现其前驱节点取消了排队,或者尝试CAS方式修改其前驱节点的waitStatus为SIGNAL失败时,会直接使用LockSupport的unpark()方法唤醒。
  3. 线程中断;当线程被LockSupport的park()方法挂起期间,如果对该线程执行了中断操作(可能是在另一个线程中对该线程执行的中断操作),该线程也会被唤醒。
  4. 假唤醒;类似于Object.wait(),也存在假唤醒问题。

因此,在前面的while循环中,线程被唤醒的第一件事,是调用checkInterruptWhileWaiting(Node)方法检查异常状态。checkInterruptWhileWaiting(Node)方法属于ConditionObject,它的源码如下:

  • private int checkInterruptWhileWaiting(Node node) {
  • return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
  • }

Thread.interrupted()用于获取当前线程的中断标识状态,并且重置中断标识为false,如果该方法返回为true,说明当前线程发生过中断操作;因此,如果当前线程没有发生中断操作,checkInterruptWhileWaiting(node)会一直返回0。如果当前线程发生过中断操作,checkInterruptWhileWaiting(Node)方法会调用AQS的transferAfterCancelledWait(node)方法来判断中断操作是发生在signal唤醒之前还是之后:

  • final boolean transferAfterCancelledWait(Node node) {
  • /**
  • * 尝试CAS方式修改node的waitStatus为0,
  • * 如果修改成功,表示是在signal操作之前发生的中断,
  • * 这是因为,条件队列中的节点的waitStatus都为CONDITION,
  • * 而同步队列中的节点的waitStatus不会为CONDITION,只会是0、SIGNAL、CANCELLED或PROPAGATE
  • */
  • if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  • // 如果是在signal操作之前发生的中断,则还需要将节点转移到同步队列
  • enq(node);
  • return true;
  • }
  • /*
  • * If we lost out to a signal(), then we can't proceed
  • * until it finishes its enq(). Cancelling during an
  • * incomplete transfer is both rare and transient, so just
  • * spin.
  • * 否则是在signal操作之后发生的中断,此时判断节点是否在同步队列中
  • * 如果不在,说明此时节点虽然被signal尝试唤醒了,修改了waitStatus为0,但enq(node)操作还没有完成
  • * 即节点还没有顺利转移到同步队列,此时使当前线程(即被唤醒的线程)执行让步操作
  • */
  • while (!isOnSyncQueue(node))
  • Thread.yield();
  • return false;
  • }

由于在之前介绍的AQS类的transferForSignal(Node)方法中,在准备转移节点到同步队列之前,会首先CAS方式修改节点的waitStatus为0,然后进行enq(node)将节点添加到同步队列中;因此可以通过尝试CAS修改节点的waitStatus值从CONDITION为0,如果修改成功,说明中断操作是在signal操作执行之后,enq(node)入队操作之前执行发生的(添加到同步队列的节点的waitStatus值都被置为了0),因此需要再次调用enq(node)进行入队。否则判断节点是否已经存在于同步队列,如果不在,说明虽然节点的状态已被修改为了0,但enq(node)入队操作还未完成,因此执行让步操作尝试让节点顺利入队。

同时,根据transferAfterCancelledWait(node)方法,这里讲解一下interruptMode的取值:

  • THROW_IE:表示在await()返回的时候,需要抛出InterruptedException异常。在signal操作之前发生中断时,会被设置为这个值。
  • REINTERRUPT:表示在await()返回的时候,需要重新设置中断状态。在signal操作之后发生中断时,会被设置为这个值。
  • 0 :表示在await()等待期间没有发生中断。

另外,从上面的源码我们还可以得知,即使线程发送了中断操作,最后也会被转移到同步队列中等待唤醒。

4.2. 获取锁

当线程被唤醒且线程所在的节点正处于同步队列中时,await()方法就会结束while循环的自旋,开始执行接下来的代码。需要注意的是,走到这一步代表线程所在的节点一定是处于同步队列中了,否则不会跳出while循环(检查中断状态的操作中也保证了节点的入队)。我们关注await()方法剩余的代码:

  • public final void await() throws InterruptedException {
  • ...
  • /**
  • * 代码运行到这里,说明节点已经在同步队列中了,且被唤醒了
  • * 因此可以尝试获取一下锁
  • * 当acquireQueued(node, savedState)返回true时,表示线程已经获取到锁了
  • * 且线程在等待期间发生了中断
  • * 此时会判断interruptMode是否为THROW_IE,如果不是,将interruptMode置为REINTERRUPT
  • */
  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  • interruptMode = REINTERRUPT;
  • if (node.nextWaiter != null) // clean up if cancelled
  • // 清理取消等待的线程节点,后面会详细分析
  • unlinkCancelledWaiters();
  • ...
  • }

while循环结束后,会首先调用acquireQueued(node, savedState)进行线程尝试获取锁操作,这个方法在前一篇文章中已经详细介绍过了,属于同步队列中的抢锁操作。传入的savedState即是线程在之前释放锁时调用release()传入的值,在这里重新传入是为了保证线程在进入等待和被唤醒前后所持有的重入次数保持一致。acquireQueued(node, savedState)方法如果成功返回,当它的返回值为true时代表线程在等待期间发生过中断,会判断interruptMode != THROW_IE,如果该条件成立,说明在signal操作之前就发生中断,就将interruptMode置为REINTERRUPT,用于待会重新中断。

对于if (node.nextWaiter != null)分支的判断,其实是为了处理某些特殊情况。在上面分析的doSignal()方法中,signal操作发生后,会执行first.nextWaiter = null断开待唤醒节点与条件队列的连接,然后将待唤醒节点转移到同步队列。但如果在signal操作之前发生了中断,虽然节点也会被转移到同步队列,但此时并没有执行将该节点的后继节点置为null(可以查看上面的transferAfterCancelledWait(Node)方法),也就是说此时该节点可能并没有与条件队列断开连接,因此就会执行unlinkCancelledWaiters()方法尝试清理掉这个处于特殊情况的节点。

4.3. 处理中断状态

await()方法最后一个if分支,会在interruptMode不为0时,处理产生的异常中断:

  • public final void await() throws InterruptedException {
  • ...
  • if (interruptMode != 0)
  • // 如果interruptMode不为0,需要处理产生的中断异常
  • reportInterruptAfterWait(interruptMode);
  • }

其中reportInterruptAfterWait()方法的源码如下,代码比较简单就不再赘述:

  • private void reportInterruptAfterWait(int interruptMode)
  • throws InterruptedException {
  • if (interruptMode == THROW_IE)
  • throw new InterruptedException();
  • else if (interruptMode == REINTERRUPT)
  • selfInterrupt();
  • }

5. await的超时机制

Condition还提供了超时机制的await()方法,有以下几个:

  • public interface Condition {
  • ...
  • long awaitNanos(long nanosTimeout) throws InterruptedException;
  • boolean await(long time, TimeUnit unit) throws InterruptedException;
  • boolean awaitUntil(Date deadline) throws InterruptedException;
  • ...
  • }

它们的实现都类似,这里给出ConditionObject类中的实现源码及注释:

  • public final long awaitNanos(long nanosTimeout)
  • throws InterruptedException {
  • // 处理中断异常
  • if (Thread.interrupted())
  • throw new InterruptedException();
  • Node node = addConditionWaiter();
  • // 释放锁,返回值是拥有锁时的state的值
  • int savedState = fullyRelease(node);
  • // 记录当前系统纳秒时间
  • long lastTime = System.nanoTime();
  • int interruptMode = 0;
  • /**
  • * 当节点不在同步队列中时会不断循环
  • * 如果isOnSyncQueue返回true,表示节点已从条件队列转移到同步队列中了
  • */
  • while (!isOnSyncQueue(node)) {
  • // 判断是否已经超过了超时时间
  • if (nanosTimeout <= 0L) {
  • // 如果超时时间限制小于或等于0,直接取消等待,保证其转移到同步队列
  • transferAfterCancelledWait(node);
  • break;
  • }
  • // 使用LockSupport进行线程挂起
  • LockSupport.parkNanos(this, nanosTimeout);
  • // 中断检查
  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  • break;
  • // 当前系统纳秒时间
  • long now = System.nanoTime();
  • // 更新超时时间,减去已经消耗的时间
  • nanosTimeout -= now - lastTime;
  • // 更新系统纳秒时间的记录
  • lastTime = now;
  • }
  • /**
  • * 代码运行到这里,说明节点被唤醒了
  • * 当acquireQueued(node, savedState)返回true时,表示线程已经获取到锁了
  • * 且线程在等待期间发生了中断
  • * 此时会判断interruptMode是否为THROW_IE,如果不是,将interruptMode置为REINTERRUPT
  • */
  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  • interruptMode = REINTERRUPT;
  • if (node.nextWaiter != null)
  • // 清理取消等待的线程节点
  • unlinkCancelledWaiters();
  • if (interruptMode != 0)
  • // 如果interruptMode不为0,需要处理产生的中断异常
  • reportInterruptAfterWait(interruptMode);
  • // 返回的值是未等待的时间长度
  • return nanosTimeout - (System.nanoTime() - lastTime);
  • }
  • public final boolean await(long time, TimeUnit unit)
  • throws InterruptedException {
  • // 检查deadline参数
  • if (unit == null)
  • throw new NullPointerException();
  • // 转换超时时间为纳秒
  • long nanosTimeout = unit.toNanos(time);
  • // 处理中断异常
  • if (Thread.interrupted())
  • throw new InterruptedException();
  • // 将当前线程添加到条件队列中,返回的添加的节点
  • Node node = addConditionWaiter();
  • // 释放锁,返回值是拥有锁时的state的值
  • int savedState = fullyRelease(node);
  • // 记录当前系统纳秒时间
  • long lastTime = System.nanoTime();
  • // 记录是否超时
  • boolean timedout = false;
  • int interruptMode = 0;
  • /**
  • * 当节点不在同步队列中时会不断循环
  • * 如果isOnSyncQueue返回true,表示节点已从条件队列转移到同步队列中了
  • */
  • while (!isOnSyncQueue(node)) {
  • // 判断是否已经超过了超时时间
  • if (nanosTimeout <= 0L) {
  • /**
  • * 即将取消等待
  • * 如果返回true,表明此时signal没有发生,存在超时,且在transferAfterCancelledWait(node)内已成功将节点转移到同步队列
  • * 如果返回false,表明signal操作发生了,节点已被转移到同步队列,即没有超时
  • */
  • timedout = transferAfterCancelledWait(node);
  • break;
  • }
  • /**
  • * spinForTimeoutThreshold的值为1000,即1000纳秒
  • * 如果剩余超时时间不足1毫秒,直接自旋而非使用parkNanos,以提高性能
  • */
  • if (nanosTimeout >= spinForTimeoutThreshold)
  • LockSupport.parkNanos(this, nanosTimeout);
  • // 中断检查
  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  • break;
  • // 当前系统纳秒时间
  • long now = System.nanoTime();
  • // 更新超时时间,减去已经消耗的时间
  • nanosTimeout -= now - lastTime;
  • // 更新系统纳秒时间的记录
  • lastTime = now;
  • }
  • /**
  • * 代码运行到这里,说明节点被唤醒了
  • * 当acquireQueued(node, savedState)返回true时,表示线程已经获取到锁了
  • * 且线程在等待期间发生了中断
  • * 此时会判断interruptMode是否为THROW_IE,如果不是,将interruptMode置为REINTERRUPT
  • */
  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  • interruptMode = REINTERRUPT;
  • // 清理取消等待的线程节点
  • if (node.nextWaiter != null)
  • unlinkCancelledWaiters();
  • if (interruptMode != 0)
  • // 如果interruptMode不为0,需要处理产生的中断异常
  • reportInterruptAfterWait(interruptMode);
  • // 返回值表示是否超时
  • return !timedout;
  • }
  • public final boolean awaitUntil(Date deadline)
  • throws InterruptedException {
  • // 检查deadline参数
  • if (deadline == null)
  • throw new NullPointerException();
  • // 转换deadline为时间戳
  • long abstime = deadline.getTime();
  • // 处理中断异常
  • if (Thread.interrupted())
  • throw new InterruptedException();
  • // 将当前线程添加到条件队列中,返回的添加的节点
  • Node node = addConditionWaiter();
  • // 释放锁,返回值是拥有锁时的state的值
  • int savedState = fullyRelease(node);
  • // 记录是否超时
  • boolean timedout = false;
  • int interruptMode = 0;
  • /**
  • * 当节点不在同步队列中时会不断循环
  • * 如果isOnSyncQueue返回true,表示节点已从条件队列转移到同步队列中了
  • */
  • while (!isOnSyncQueue(node)) {
  • // 判断当前系统时间是否大于超时时间
  • if (System.currentTimeMillis() > abstime) {
  • /**
  • * 即将取消等待
  • * 如果返回true,表明此时signal没有发生,存在超时,且在transferAfterCancelledWait(node)内已成功将节点转移到同步队列
  • * 如果返回false,表明signal操作发生了,节点已被转移到同步队列,即没有超时
  • */
  • timedout = transferAfterCancelledWait(node);
  • break;
  • }
  • // 挂起线程
  • LockSupport.parkUntil(this, abstime);
  • // 中断检查
  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  • break;
  • }
  • /**
  • * 代码运行到这里,说明节点被唤醒了
  • * 当acquireQueued(node, savedState)返回true时,表示线程已经获取到锁了
  • * 且线程在等待期间发生了中断
  • * 此时会判断interruptMode是否为THROW_IE,如果不是,将interruptMode置为REINTERRUPT
  • */
  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  • interruptMode = REINTERRUPT;
  • if (node.nextWaiter != null)
  • // 清理取消等待的线程节点
  • unlinkCancelledWaiters();
  • if (interruptMode != 0)
  • // 如果interruptMode不为0,需要处理产生的中断异常
  • reportInterruptAfterWait(interruptMode);
  • // 返回值表示是否超时
  • return !timedout;
  • }

6. awaitUninterruptibly()

Condition提供了awaitUninterruptibly(),也可以用于使线程进入等待,与await()不同的是,该方法不会抛出InterruptedException异常;它的源码如下:

  • public final void awaitUninterruptibly() {
  • Node node = addConditionWaiter();
  • int savedState = fullyRelease(node);
  • boolean interrupted = false;
  • while (!isOnSyncQueue(node)) {
  • LockSupport.park(this);
  • if (Thread.interrupted())
  • interrupted = true;
  • }
  • if (acquireQueued(node, savedState) || interrupted)
  • selfInterrupt();
  • }

7. AQS的取消排队

在上一篇讲解AQS文章中,当节点添加到同步队列后,会调用acquireQueued(Node, int)方法使节点包装的线程进入自旋和挂起状态,回顾一下源码:

  • /**
  • * 在独占模式下,挂起刚刚被添加到等待队列的节点
  • *
  • * @param node 添加到等待队列的节点
  • * @param arg acquire操作的数值
  • * @return 如果在等待期间线程发生了中断,返回值将为true,否则为false
  • */
  • final boolean acquireQueued(final Node node, int arg) {
  • boolean failed = true;
  • try {
  • // 记录当前线程在自旋等待期间是否存在被中断操作
  • boolean interrupted = false;
  • // 这里会让节点进入自旋状态
  • for (; ; ) {
  • /**
  • * 获取自己的前驱节点
  • * 如果前驱节点为head节点,就可以尝试获取锁
  • * 由于所有等待的节点都会调用该方法进入自旋
  • * 而只有前驱节点为head节点的等待节点才能尝试获取锁
  • * 这样就能保证永远是队首的节点获取锁
  • */
  • final Node p = node.predecessor();
  • if (p == head && tryAcquire(arg)) {
  • // 获取锁成功
  • setHead(node);
  • p.next = null; // help GC
  • failed = false;
  • return interrupted;
  • }
  • // 执行到此处,说明上面尝试获取锁失败了,因此可以尝试将当前线程挂起
  • if (shouldParkAfterFailedAcquire(p, node) &&
  • parkAndCheckInterrupt())
  • interrupted = true;
  • }
  • } finally {
  • if (failed)
  • cancelAcquire(node);
  • }
  • }

这个方法已经详细分析过了,但有一处没有详细说,即try ... finally块中finally块里,当failed为true时,会调用cancelAcquire(node)的情况。但其实在acquireQueued(Node, int)方法中,并不会出现异常抛出的情况,其中tryAcquire(arg)shouldParkAfterFailedAcquire(p, node)的实现都是不会抛出异常的,而parkAndCheckInterrupt()也仅仅是返回Thread.interrupted()的值,并没有抛出异常。也就是说,在acquireQueued(Node, int)的实现里,是不会因为线程发生中断而影响到线程挂起、获取锁等操作的,它只会在线程发生中断操作后进行记录,然后将记录的结果返回给上一层调用自己的方法处理,如:

  • public final void acquire(int arg) {
  • if (!tryAcquire(arg) &&
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  • selfInterrupt();
  • }

其实,在acquireQueued(Node, int)方法里,finally块里的内容是永远不会执行的,真正会发生线程调用cancelAcquire(node)取消等待操作的情况,是存在于doAcquireInterruptibly(int arg)方法中的。默认情况下,ReentrantLock的lock()方法不会有异常抛出,但它还提供了另一个lockInterruptibly()也可以用于获取锁操作,该方法会抛出InterruptedException异常:

  • public void lockInterruptibly() throws InterruptedException {
  • sync.acquireInterruptibly(1);
  • }

我们顺着这个方法的源码往后面查看:

  • // AQS的方法
  • public final void acquireInterruptibly(int arg)
  • throws InterruptedException {
  • if (Thread.interrupted())
  • throw new InterruptedException();
  • if (!tryAcquire(arg))
  • doAcquireInterruptibly(arg);
  • }
  • // AQS的方法
  • private void doAcquireInterruptibly(int arg) throws InterruptedException {
  • final Node node = addWaiter(Node.EXCLUSIVE);
  • boolean failed = true;
  • try {
  • for (; ; ) {
  • final Node p = node.predecessor();
  • if (p == head && tryAcquire(arg)) {
  • setHead(node);
  • p.next = null; // help GC
  • failed = false;
  • return;
  • }
  • if (shouldParkAfterFailedAcquire(p, node) &&
  • parkAndCheckInterrupt())
  • // 这里会抛出InterruptedException异常
  • throw new InterruptedException();
  • }
  • } finally {
  • if (failed)
  • cancelAcquire(node);
  • }
  • }

doAcquireInterruptibly(int)的实现与acquireQueued(Node, int)方法基本相同,唯一不同的地方在于,doAcquireInterruptibly(int)if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())成立的分支中,抛出了InterruptedException异常,此时一旦异常抛出,虽然没有进行捕获,但finally块是会执行的,且此时failed值为true,因此就会执行cancelAcquire(node),查看cancelAcquire(Node)方法的源码:

  • private void cancelAcquire(Node node) {
  • // 如果node节点为空,直接放弃
  • if (node == null)
  • return;
  • // 将node所包装的线程置为null
  • node.thread = null;
  • // 跳过取消的节点
  • Node pred = node.prev;
  • while (pred.waitStatus > 0)
  • node.prev = pred = pred.prev;
  • // predNext is the apparent node to unsplice. CASes below will
  • // fail if not, in which case, we lost race vs another cancel
  • // or signal, so no further action is necessary.
  • Node predNext = pred.next;
  • // Can use unconditional write instead of CAS here.
  • // After this atomic step, other Nodes can skip past us.
  • // Before, we are free of interference from other threads.
  • // 将node的waitStatus设置为CANCELLED,代表节点取消排队
  • node.waitStatus = Node.CANCELLED;
  • // If we are the tail, remove ourselves.
  • // 如果node节点时尾节点,将node自己移除
  • if (node == tail && compareAndSetTail(node, pred)) {
  • // 修改成功后将前驱节点指向旧的尾节点的连接断掉
  • compareAndSetNext(pred, predNext, null);
  • } else {
  • // If successor needs signal, try to set pred's next-link
  • // so it will get one. Otherwise wake it up to propagate.
  • // 如果node的后继节点需要被唤醒,则需要将node的前驱节点的后继连接到这个后继节点
  • int ws;
  • if (pred != head &&
  • ((ws = pred.waitStatus) == Node.SIGNAL ||
  • (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  • pred.thread != null) {
  • // node的后继节点
  • Node next = node.next;
  • if (next != null && next.waitStatus <= 0)
  • // 将node的前驱节点的后继连接到这个后继节点
  • compareAndSetNext(pred, predNext, next);
  • } else {
  • // 唤醒node节点
  • unparkSuccessor(node);
  • }
  • node.next = node; // help GC
  • }
  • }

cancelAcquire(Node)方法的作用,其实就是将取消等待的节点从同步队列中移除,并且唤醒该节点包装的线程。从上面分析也可以得出一个结论:如果我们要取消一个线程的排队,就需要在另外一个线程中对其进行中断;前提是使用了会抛出异常的抢锁方法。

至此,其实AQS相关的独占模式相关内容已经讲解完了,也即是,对于Node类的waitStatus变量,其中0(初始状态)、SIGNAL(值为-1)、CONDITION(值为-2)和CANCELLED(值为1)这几种值涉及的情况都讲解到了,还剩余PROPAGATE(值为-3)的情况没有讲解,这种值出现在共享模式情况下,会在下一篇文章中讲解。