Java多线程
Java并发
JUC锁

Java多线程 20 - AbstractQueuedSynchronizer详解(1)

简介:AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。

1. AQS的基础设计

注:在下面的内容中,统一将AbstractQueuedSynchronizer简称为AQS。

AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。

通过这种方式,AQS的实现类可以根据自身需求来完成共享模式或独占模式的同步功能;如实现独占模式的同步功能,只需要重写tryAcquire()方法和tryRelease()方法即可;而实现共享模式的同步功能,只需要重写tryAcquireShared()方法和tryReleaseShared()方法。无论是共享模式还是独占模式,基础实现都是基于AQS,不同的是对用于同步状态逻辑控制的模板方法的具体实现。AQS提供给独占模式和共享模式的模板方法如下:

  • // 独占模式下获取同步状态的方法
  • protected boolean tryAcquire(int arg) {
  • throw new UnsupportedOperationException();
  • }
  • // 独占模式下释放同步状态的方法
  • protected boolean tryRelease(int arg) {
  • throw new UnsupportedOperationException();
  • }
  • // 共享模式下获取同步状态的方法
  • protected int tryAcquireShared(int arg) {
  • throw new UnsupportedOperationException();
  • }
  • // 共享模式下释放同步状态的方法
  • protected boolean tryReleaseShared(int arg) {
  • throw new UnsupportedOperationException();
  • }
  • // 判断是否持有独占同步状态
  • protected boolean isHeldExclusively() {
  • throw new UnsupportedOperationException();
  • }

同时AQS内部还提供了一个int类型的state成员变量,用于控制同步状态;state变量由volatile内存语义修饰,同时compareAndSetState()方法还实现了对state变量的CAS原子性修改操作,源码如下:

  • // 同步器状态标识,存在volatile内存语义
  • private volatile int state;
  • // 获取当前同步器状态标识
  • protected final int getState() {
  • return state;
  • }
  • // 设置同步器状态标识
  • protected final void setState(int newState) {
  • state = newState;
  • }
  • // 原子性操作,当state的值为expect,将其更新为update,更新成功返回true
  • protected final boolean compareAndSetState(int expect, int update) {
  • return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  • }

state成员变量是供具体实现类来使用的,不同的模式具有不同的使用方法和目的。

上面提到的这些内容,读者只需要在脑海中有一个印象即可,在后面的讲解中会对这些内容的使用作详细解释。

2. 等待队列模型

在正式讲解AQS类之前,我们还需要了解在AQS中存在的重要的数据结构。

AQS类的内部维护了一个等待队列来存放所有的未获取到同步状态的线程,该队列实际上是以双向链表的数据结构实现的,链表节点的类型是AQS类的内部类Node,我们首先关注Node类的实现,这也是深入理解AQS类实现的首要条件。

注:这里的同步状态,如果不好理解,可以先将其等视为“锁”。

其实Node类的实现非常简单,它的源代码如下:

  • // 等待队列节点类
  • static final class Node {
  • /*********** 以下属性用于标识队列的类型:共享或者独占 ***********/
  • // 表明是共享模式节点
  • static final Node SHARED = new Node();
  • // 表明是独占(排他)模式节点
  • static final Node EXCLUSIVE = null;
  • /*********** 以下属性用于节点的等待状态,也即是成员变量waitStatus的值 ***********/
  • // 标识当前Node对应的线程已处于结束状态,即线程已经取消了获取同步状态的请求
  • static final int CANCELLED = 1;
  • // 该值用于标识当前节点的后继节点对应的线程需要被唤醒
  • static final int SIGNAL = -1;
  • // 标识条件状态;后面会详细介绍,可以先不关注
  • static final int CONDITION = -2;
  • // 在共享模式中使用时表示获得的同步状态会被传播;后面会详细介绍,可以先不关注
  • static final int PROPAGATE = -3;
  • /*********** 等待状态,可能为CANCELLED、SIGNAL、CONDITION、PROPAGATE ***********/
  • volatile int waitStatus;
  • /*********** 维护双向链表的前后指针 ***********/
  • // 指向前驱节点
  • volatile Node prev;
  • // 指向后继节点
  • volatile Node next;
  • /*********** 存放尝试获取同步状态的线程 ***********/
  • volatile Thread thread;
  • // 等待队列的后继节点,与Condition有关;后面会详细介绍,可以先不关注
  • Node nextWaiter;
  • // 是否为共享模式
  • final boolean isShared() {
  • return nextWaiter == SHARED;
  • }
  • // 获取前驱节点
  • final Node predecessor() throws NullPointerException {
  • Node p = prev;
  • if (p == null)
  • throw new NullPointerException();
  • else
  • return p;
  • }
  • // 无参构造
  • Node() {
  • }
  • // 有参构造
  • Node(Thread thread, Node mode) {
  • this.nextWaiter = mode;
  • this.thread = thread;
  • }
  • // 有参构造
  • Node(Thread thread, int waitStatus) {
  • this.waitStatus = waitStatus;
  • this.thread = thread;
  • }
  • }

每个Node类的实例代表了等待队列中一个节点,而每个节点中包装了一个线程(即thread成员变量)。AQS类设计为可以让多个线程同时并发地请求同步状态,因此它需要处理在竞争情况下没有获取到同步状态的线程,而处理方式正是使用Node实例来包装这些线程,然后装入等待队列,等到合适的时机再将其唤醒,至于何时才算时机合适,在后面会详细介绍。

AQS类的某些成员变量,也是为了实现等待队列而存在的:

  • // 指向队列头节点
  • private transient volatile Node head;
  • // 指向队列尾节点
  • private transient volatile Node tail;

从变量名就可以得知,head将指向等待队列的头节点,而tail则会指向等待队列的尾节点。有了上面的理解,我们就可以推断出等待队列的大致结构,示意图如下:

1.AQS等待队列模型.png

对等待队列有了一定的了解之后,下面继续分析同步状态的获取和释放过程。

3. 同步状态的竞争和获取

由于AQS设计为一个抽象类,我们需要从它的实现类来分析具体的流程实现,这里以公平模式下的独占锁ReentrantLock为例。作为日常使用次数最多的独占锁,ReentrantLock中的内部类Sync即为其使用的同步器,继承了AQS类:

  • public class ReentrantLock implements Lock, java.io.Serializable {
  • ...
  • private final Sync sync;
  • abstract static class Sync extends AbstractQueuedSynchronizer {
  • ...
  • }
  • ...
  • }

但Sync也是一个抽象类,真正的同步器是由Sync的两个子类实现的,以分别实现公平锁和非公平锁两种机制:

  • // 公平同步器
  • static final class FairSync extends Sync {
  • ...
  • }
  • // 非公平同步器
  • static final class NonfairSync extends Sync {
  • ...
  • }

默认情况下,ReentrantLock使用的是非公平同步器NonfairSync来控制同步操作,另外通过给构造方法传入true可以指定使用公平同步器:

  • public ReentrantLock() {
  • sync = new NonfairSync();
  • }
  • public ReentrantLock(boolean fair) {
  • sync = fair ? new FairSync() : new NonfairSync();
  • }

在之前使用ReentrantLock时,调用ReentrantLock实例的lock()方法可以实现上锁,其实是调用了同步器Sync的lock()方法:

  • public void lock() {
  • sync.lock();
  • }

这里我们以FairSync为研究对象:

  • static final class FairSync extends Sync {
  • private static final long serialVersionUID = -3000897897090466540L;
  • final void lock() {
  • acquire(1);
  • }
  • ...
  • }

此处我们先关注其lock()方法,发现其实绕了一圈,加锁操作的最后还是调用了父类AQS的acquire()方法,传入的参数为1;AQS类的acquire()方法源码如下:

  • public final void acquire(int arg) {
  • /**
  • * 这里的tryAcquire()会尝试获取同步状态
  • * 如果没有获取到,将会调用addWaiter()方法将当前线程包装为一个Node节点加入等待队列
  • * 然后对节点调用acquireQueued()方法使其进入自旋尝试获取同步的状态
  • * 加入成功后将中断当前线程
  • */
  • if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  • selfInterrupt();
  • }

从AQS的acquire()方法源码可以看出,这里的if的判断条件存在三种操作:tryAcquire()addWaiter()acquireQueued();当tryAcquire()的返回值为false时才会执行后面两个方法。

我们先考察tryAcquire();AQS的tryAcquire()是模板方法,直接抛出UnsupportedOperationException异常,需要子类重写,而在FairSync同步器中重写的tryAcquire()方法的源码如下:

  • // AQS的tryAcquire()方法
  • protected boolean tryAcquire(int arg) {
  • throw new UnsupportedOperationException();
  • }
  • // FairSync中重写的tryAcquire()方法
  • protected final boolean tryAcquire(int acquires) {
  • // 获取当前线程
  • final Thread current = Thread.currentThread();
  • // 获取state
  • int c = getState();
  • if (c == 0) {
  • /**
  • * 此时state为0,表示没有线程持有锁。
  • * 先调用hasQueuedPredecessors()查看是否有线程已经等待了很久,
  • * 如果没有,就是用CAS操作修改state为acquires的值,
  • * 修改成功后设置独占线程为当前线程,并返回true表示获取锁成功,
  • * 如果修改失败,说明可能正好被其他的某个线程抢先修改了。
  • */
  • if (!hasQueuedPredecessors() &&
  • compareAndSetState(0, acquires)) {
  • // 设置独占线程为当前线程
  • setExclusiveOwnerThread(current);
  • return true;
  • }
  • } else if (current == getExclusiveOwnerThread()) {
  • /**
  • * state不为0,且当前线程就为拥有锁的线程,
  • * 表示是以重入的方式在获取锁,
  • * 将state修改为state + acquires,并返回true表示获取锁成功。
  • */
  • int nextc = c + acquires;
  • if (nextc < 0)
  • throw new Error("Maximum lock count exceeded");
  • setState(nextc);
  • return true;
  • }
  • /**
  • * 返回false,代表获取锁失败;能走到这里存在两种情况:
  • - 虽然是第一个尝试获取锁的线程,但修改state失败,即可能正好被其他的某个线程抢先修改了;
  • - 不是第一个尝试获取锁的线程,且也不是重入的方式获取。
  • */
  • return false;
  • }

FairSync重写的方法体其实比较简单,其中hasQueuedPredecessors()方法用于查看是否有线程已经等待了很久,这是为了实现公平性需要做的工作,后面会介绍;tryAcquire()的返回值存在三种不同的结果。首先获取了AQS提供的同步状态变量state,并判断其是否为0,如果为0表示此时还没有线程获取过同步状态,因此将尝试使用CAS操作将state更新为传入的acquires的值,如果修改成功,就使用setExclusiveOwnerThread()方法将记录独占线程的变量exclusiveOwnerThread设置为当前线程,并返回true代表获取同步状态成功;如果修改state失败则会直接返回false代表获取同步状态失败。

其中setExclusiveOwnerThread()方法来自于AQS的父类AbstractOwnableSynchronizer,相关源码如下:

  • // 拥有独占模式同步状态的线程
  • private transient Thread exclusiveOwnerThread;
  • // 设置拥有独占模式同步状态的线程
  • protected final void setExclusiveOwnerThread(Thread t) {
  • exclusiveOwnerThread = t;
  • }
  • // 获取拥有独占模式同步状态的线程
  • protected final Thread getExclusiveOwnerThread() {
  • return exclusiveOwnerThread;
  • }

state不为0时,还会判断getExclusiveOwnerThread()得到的拥有独占模式同步状态的线程是否就是当前线程;这一步的判断是为了可重入而实现的,当同一个线程多次尝试获取同步状态,理应让其顺利获取;因此在if (getExclusiveOwnerThread() == currentThread)分支中,会设置state值为state + acquires,并返回true表示获取同步状态成功。

tryAcquire()方法的返回值为true时代表获取锁成功,因此整个过程就结束了,后续的释放锁操作会在后面讲解;但当tryAcquire()方法的返回值为false时则表示获取锁失败,这时才会调用acquireQueued(addWaiter(Node.EXCLUSIVE), 1);下面先分析addWaiter(Node.EXCLUSIVE),它的源码如下:

  • /**
  • * 以当前线程及给定的模式(共享或独占)创建一个入队节点
  • *
  • * @param mode 可以是Node.EXCLUSIVE(独占模式)或Node.SHARED(共享模式)
  • * @return 新节点
  • */
  • private Node addWaiter(Node mode) {
  • // 创建新节点
  • Node node = new Node(Thread.currentThread(), mode);
  • // Try the fast path of enq; backup to full enq on failure
  • // 尝试快速在尾部添加新节点
  • Node pred = tail;
  • if (pred != null) {
  • /**
  • * 队列不为空,队中已有节点。
  • * 先将node的前驱设置为当前尾节点(即tail变量指向的节点),
  • * 然后将tail变量指向node,此时node就变为尾节点了,
  • * 如果设置成功,就将原来尾节点的后继节点设置为node,如下图:
  • * 原始:head <-> node(1) <-> ... <-> node(n-1) <-> node(n) <-> tail
  • * 1. head <-> node(1) <-> ... <-> node(n) <-> tail(pred) <- node(new)
  • * 2. head <-> node(1) <-> ... <-> node(n) <-> node(pred) <-> tail(new)
  • */
  • node.prev = pred;
  • if (compareAndSetTail(pred, node)) {
  • pred.next = node;
  • // 返回新节点
  • return node;
  • }
  • }
  • /**
  • * 快速添加尾新节点失败,
  • * 说明队列为空,或者由于有并发线程在竞争入队,导致CAS修改tail失败,
  • * 那么进入enq,以自旋方式修改。
  • */
  • enq(node);
  • return node;
  • }

addWaiter(Node.EXCLUSIVE)调用后,会以当前线程为参数使用Node类的构造方法创建一个独占模式(由于mode参数传入的是Node.EXCLUSIVE)的Node节点;然后尝试在等待队列的尾部插入该节点。

addWaiter(Node)内部在调用enq(node)之前的代码,是在等待队列不为空时尝试的快速插入尾节点的操作,但在默认初始情况下,AQS的headtail成员变量都为空,也即是表示整个等待队列为空。此时上面addWaiter()代码中会直接调用enq(node)添加节点;使用enq()方法插入尾节点和快速插入尾节点的区别在于,enq()方法会使用无限循环多次尝试,两种方式插入尾节点的具体操作其实是相同的;enq(Node)方法源码如下:

  • /**
  • * Node节点入队
  • * 节点将通过自旋的方式插入到队列尾部
  • *
  • * @param node 需要入队的节点
  • * @return 节点的前驱
  • */
  • private Node enq(final Node node) {
  • for (; ; ) {
  • Node t = tail;
  • if (t == null) { // Must initialize
  • if (compareAndSetHead(new Node()))
  • /**
  • * 当tail为空,表示此时队列中没有节点,
  • * 就将一个空节点设置为头节点,
  • * 并且将尾节点也指向头节点。
  • */
  • tail = head;
  • } else {
  • /**
  • * 否则队列中已有节点
  • * 先将node的前驱设置为尾节点
  • * 然后将尾节点设置为node
  • * 如果设置成功,就将原来尾节点的后继节点设置为node,如下图:
  • * 原始:head <-> node(1) <-> ... <-> node(n-1) <-> node(n) <-> tail
  • * 1. head <-> node(1) <-> ... <-> node(n) <-> tail(t) <- node(new)
  • * 2. head <-> node(1) <-> ... <-> node(n) <-> node(t) <-> tail(new)
  • */
  • node.prev = t;
  • if (compareAndSetTail(t, node)) {
  • t.next = node;
  • // 返回原来的尾节点,即现在尾节点的前驱节点
  • return t;
  • }
  • }
  • }
  • }

enq()方法内部的for无限循环只有在存在返回值的时候才会结束。因此当由于等待队列为空的原因而进入了enq()方法后,先会执行if (t == null)为true的分支,即CAS方式设置等待队列头节点指针变量head为一个空节点,然后将tail也指向该节点。需要注意的是,由于Node的默认构造方法创建的节点的waitStatus变量值为0,因此此时head头节点的waitStatus即为0。

当这一次循环结束,由于此时headtail都不为空了,下次for循环就会进入else分支,先将传入的新节点的前驱指针指向当前tail指向的节点,然后以CAS方式修改tail指向新节点,将原来tail指向的节点的后继指针指向新节点,最后返回原来的尾节点。如果修改不成功,将在下次循环继续修改,自旋直到修改成功后返回。

对于enq()对节点操作的过程,有以下示意图:

2.enq()方法入队操作.png

enq()方法结束之后,回到addWaiter()中,addWaiter()会将刚刚添加到等待队列的节点返回然后结束,此时就到了acquireQueued(addWaiter(Node.EXCLUSIVE), 1)的过程了,acquireQueued()方法的源码如下:

  • /**
  • * 在独占模式下,挂起刚刚被添加到等待队列的节点
  • *
  • * @param node 添加到等待队列的节点
  • * @param arg acquire操作的数值
  • * @return 如果在等待期间线程发生了中断,返回值将为true,否则为false
  • */
  • final boolean acquireQueued(final Node node, int arg) {
  • boolean failed = true;
  • try {
  • // 记录当前线程在自旋等待期间是否存在被中断操作
  • boolean interrupted = false;
  • // 这里会让节点进入自旋状态
  • for (; ; ) {
  • /**
  • * 获取自己的前驱节点
  • * 如果前驱节点为head节点,就可以尝试获取锁
  • * 由于所有等待的节点都会调用该方法进入自旋
  • * 而只有前驱节点为head节点的等待节点才能尝试获取锁
  • * 这样就能保证永远是队首的节点获取锁
  • */
  • final Node p = node.predecessor();
  • if (p == head && tryAcquire(arg)) {
  • // 获取锁成功,更新头节点为当前节点
  • setHead(node);
  • p.next = null; // help GC
  • failed = false;
  • return interrupted;
  • }
  • // 执行到此处,说明上面尝试获取锁失败了,因此可以尝试将当前线程挂起
  • if (shouldParkAfterFailedAcquire(p, node) &&
  • parkAndCheckInterrupt())
  • interrupted = true;
  • }
  • } finally {
  • if (failed)
  • cancelAcquire(node);
  • }
  • }

acquireQueued()方法的主要作用是用于挂起线程,但在挂起线程之前,它额外做了两件事情。首先在挂起之前,如果当前节点的前驱节点是头节点,acquireQueued()方法会调用tryAcquire()重新尝试获取锁,这是因为,如果一个节点的前驱节点是头节点,说明它就是接下来将要被唤醒的节点,它所代表的线程自然可以尝试获取锁了,这一步其实是为了之后的唤醒操作而准备的;这里解释一个例外的情况,即是当前节点的前驱节点是头节点,但是它所代表的线程尝试获取锁却失败了,这可能是由于在非公平模式下,有其他线程抢先一步竞争得到了锁。

如果当前线程所代表的线程获取锁失败了,在acquireQueued()方法后面的代码中会根据情况挂起线程,一旦某个线程挂起,该线程就会阻塞在acquireQueued()方法内直到被唤醒;因此acquireQueued()将方法体的大部分代码都放置于无限循环体内,这样某个线程被唤醒后就会重复执行循环体前面获取锁的动作,获取成功就继续执行,获取失败会被再次挂起。

acquireQueued()方法在挂起操作前额外做的第二件事情,是检查前驱节点的waitStatus状态,这部分代码由shouldParkAfterFailedAcquire()方法体现,代码如下:

  • /**
  • * Checks and updates status for a node that failed to acquire.
  • * Returns true if thread should block. This is the main signal
  • * control in all acquire loops. Requires that pred == node.prev
  • *
  • * @param pred 当前节点的前驱节点
  • * @param node 当前节点
  • * @return 当线程需要阻塞时返回true
  • */
  • private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  • // 获取前驱节点的waitStatus
  • int ws = pred.waitStatus;
  • if (ws == Node.SIGNAL) // SINGAL为-1
  • /*
  • * This node has already set status asking a release to signal it, so it can safely park.
  • * 如果前驱节点的waitStatus为SIGNAL,表示当前节点已经设置了要求释放锁的线程将其唤醒的状态,可以安全阻塞
  • * 上面的解释非常绕,用通俗的话来讲,当一个节点的前驱节点的waitStatus为SIGNAL时,前驱节点在释放锁时会将其唤醒
  • */
  • return true;
  • if (ws > 0) {
  • /*
  • * Predecessor was cancelled. Skip over predecessors and indicate retry.
  • * 如果前驱节点的waitStatus大于0,表示前驱节点已经取消了排队等待,
  • * 尝试向前查找一个waitStatus不大于0的节点作为当前节点的前驱节点,并把该节点的后继节点设置为当前节点,
  • * 上面向前查找的过程即do ... while循环的作用,
  • * 这个不断向前查找的过程,可以将当前节点前面所有取消了排队的节点全部剔除略过,减小链表的长度
  • */
  • do {
  • node.prev = pred = pred.prev;
  • } while (pred.waitStatus > 0);
  • pred.next = node;
  • } else {
  • /*
  • * 当执行到这里,前驱节点的waitStatus只会为0,CONDITION(-2)、PROPAGATE(-3),
  • * 此时则尝试将前驱节点的waitStatus状态设置为SIGNAL,
  • * 做这一步操作,主要是用于在后面某个线程释放锁时,
  • * 如果该线程的waitStatus状态为SIGNAL,且其为等待队列的头节点,就会尝试唤醒其后继节点。
  • * waitStatus must be 0 or PROPAGATE. Indicate that we
  • * need a signal, but don't park yet. Caller will need to
  • * retry to make sure it cannot acquire before parking.
  • */
  • compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  • }
  • return false;
  • }

shouldParkAfterFailedAcquire()方法中,分为三种情况:

  1. 只有当前节点的前驱节点的waitStatus状态值为SINGAL(值为-1)的情况下才会返回true,此时当前节点包装的线程才会执行挂起操作,否则acquireQueued()内会不断循环调用shouldParkAfterFailedAcquire()方法进行尝试,直到返回true;
  2. 当前驱节点waitStatus状态值大于0时,表示此时前驱节点已经取消了排队,因此需要为当前节点往前寻找一个waitStatus状态正常的节点作前驱节点(这一步在后面唤醒会有用处,此时若无法理解可以不必关注);
  3. 如果当前节点的前驱节点的waitStatus状态值为CONDITION(值为-2)或PROPAGATE(值为-3),则使用CAS方式修改前驱节点的waitStatus状态值为SIGNAL

上面这些操作的作用,可以保证在挂起线程之前,该线程所在节点的前驱节点的状态一定是SIGNAL,这其实是保证被挂起的线程能在未来正确的被唤醒,因为在公平模式的独占锁中,被唤醒的线程永远是同步队列第一个节点的所包装的线程。

注:我们将除去head节点的等待队列成为同步队列,其中所有节点包装的线程都没有获取到锁;等待队列包含head节点,但head节点是没有包装任何线程的,这是因为,最开始初始化时head节点就是一个空节点,且一个线程如果在经过等待之后获取了锁,那么包装该线程的节点就会成为head节点,但设置之后head节点的线程其实被置为null了,可以观察代码:

  • private Node enq(final Node node) {
  • for (; ; ) {
  • Node t = tail;
  • if (t == null) { // Must initialize
  • // 头节点设置为了一个空节点
  • if (compareAndSetHead(new Node()))
  • tail = head;
  • } else {
  • node.prev = t;
  • if (compareAndSetTail(t, node)) {
  • t.next = node;
  • return t;
  • }
  • }
  • }
  • }
  • ...
  • private void setHead(Node node) {
  • head = node;
  • // 设置为头节点后,线程被设置为了null
  • node.thread = null;
  • node.prev = null;
  • }

shouldParkAfterFailedAcquire()返回值为true时,就代表此时可以安全地挂起线程了,因此会继续执行parkAndCheckInterrupt()方法,源码如下:

  • private final boolean parkAndCheckInterrupt() {
  • // 使用LockSupport挂起线程
  • // 执行到这里之后,线程会被挂起阻塞,等待被唤醒
  • LockSupport.park(this);
  • return Thread.interrupted();
  • }

parkAndCheckInterrupt()方法使用LockSupport.park()来挂起线程,使用的是Unsafe类相关的方法,源码如下:

  • public static void park(Object blocker) {
  • Thread t = Thread.currentThread();
  • setBlocker(t, blocker);
  • unsafe.park(false, 0L);
  • setBlocker(t, null);
  • }

parkAndCheckInterrupt()方法的返回值为Thread.interrupted(),用于告诉acquireQueued()方法线程是否发生过中断。我们需要注意的是,shouldParkAfterFailedAcquire()parkAndCheckInterrupt()这两个方法可能会重复执行,因此在每次重复挂起线程后,一旦线程发生过中断,就会立即更新在acquireQueued()方法中定义的用于记录线程是否发生过中断的局部变量interrupted,这个变量最后会被acquireQueued()方法作为返回值返回,而我们回顾到最开始的AQS类的acquire()方法:

  • public final void acquire(int arg) {
  • /**
  • * 这里的tryAcquire()会尝试获取同步状态
  • * 如果没有获取到,将会调用addWaiter()方法将当前线程包装为一个Node节点加入等待队列
  • * 然后对节点调用acquireQueued()方法使其进入自旋尝试获取同步的状态
  • * 加入成功后将中断当前线程
  • */
  • if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  • selfInterrupt();
  • }

会发现当acquireQueued()方法返回值为true,也即是当前线程已经成功获取到锁,但发生了中断,会执行selfInterrupt()对当前线程执行中断:

  • private static void selfInterrupt() {
  • Thread.currentThread().interrupt();
  • }

以上即是以ReentrantLock锁为例对同步状态的竞争和获取的详细解释,但只涉及到独占锁在公平模式下的情况。至于其他情况,会在后面的内容中讲解。下面我们先来了解同步状态的释放,依旧是以ReentrantLock为例。

4. 同步状态的释放

在使用ReentrantLock时,当需要控制同步的代码块结束后,会调用锁实例的unlock()方法来释放锁,ReentrantLock的unlock()方法如下:

  • public void unlock() {
  • sync.release(1);
  • }

可以得知,unlock()方法其实调用了同步器syncrelease()方法;其实释放锁与加锁操作非常类似,syncrelease()方法还是会调用AQS的release()方法:

  • public final boolean release(int arg) {
  • if (tryRelease(arg)) {
  • Node h = head;
  • if (h != null && h.waitStatus != 0)
  • unparkSuccessor(h);
  • return true;
  • }
  • return false;
  • }

在AQS的release()方法中,依旧会将判断是否成功释放锁的业务交给真正的实现类来处理,以FairSync为例,它的tryRelease()方法是从父类Sync中继承而来的,源码如下:

  • protected final boolean tryRelease(int releases) {
  • // 计算即将更新的state值
  • int c = getState() - releases;
  • if (Thread.currentThread() != getExclusiveOwnerThread())
  • // 如果不是当前拥有独占锁的线程尝试释放释放锁,就抛出异常
  • throw new IllegalMonitorStateException();
  • // 是否完全释放锁的标志,针对重入做的额外判断
  • boolean free = false;
  • if (c == 0) {
  • // 当更新后的state值为0时,可以释放锁
  • free = true;
  • // 设置拥有独占锁的线程为null
  • setExclusiveOwnerThread(null);
  • }
  • // 更新state值
  • setState(c);
  • return free;
  • }

从上面的注释可以得知,其实tryRelease()tryAcquire()操作正好是相反的,注释解释得已经比较清楚了,这里就不做多的赘述。

tryRelease()返回值为true时,表示释放锁成功,在前面release()方法中就会进入if (tryRelease(arg))分支,分支体内的代码的作用,就是唤醒头节点的后继节点;当头节点不为空且其waitStatus状态不为0时,会调用unparkSuccessor(h),唤醒头节点的后继节点,unparkSuccessor()位于AQS类中,代码如下:

  • /**
  • * Wakes up node's successor, if one exists.
  • * 如果node存在后继节点,则唤醒
  • * @param node the node
  • */
  • private void unparkSuccessor(Node node) {
  • /*
  • * If status is negative (i.e., possibly needing signal) try
  • * to clear in anticipation of signalling. It is OK if this
  • * fails or if status is changed by waiting thread.
  • */
  • int ws = node.waitStatus;
  • if (ws < 0)
  • // 将node的waitStatus状态设置为0(初始化状态)
  • compareAndSetWaitStatus(node, ws, 0);
  • /*
  • * Thread to unpark is held in successor, which is normally
  • * just the next node. But if cancelled or apparently null,
  • * traverse backwards from tail to find the actual
  • * non-cancelled successor.
  • * 拿到后继节点
  • */
  • Node s = node.next;
  • // 检查节点状态,当其waitStatus为CANCELLED将视为不符合条件
  • if (s == null || s.waitStatus > 0) {
  • s = null;
  • // 如果后继节点不符合条件,则从尾节点开始往前找,直到找到位于队列最前面的符合条件的节点
  • for (Node t = tail; t != null && t != node; t = t.prev)
  • if (t.waitStatus <= 0)
  • s = t;
  • }
  • if (s != null)
  • // 终止一个挂起的线程
  • LockSupport.unpark(s.thread);
  • }

我们这里传入的是等待队列的头节点,unparkSuccessor()方法首先判断了头节点waitStatus,如果小于0就使用CAS方式将其设置为0。接下来会拿到头节点的后继节点,如果后继节点为空,或者后继节点的waitStatus大于0(大于0表示取消了排队),就认为该后继节点不符合唤醒条件,此时会从尾节点开始从后往前查找位于等待队列最前面的(但不是头节点当前的后继节点),且waitStatus小于或等于0的节点作为待唤醒节点。当拿到合格的待唤醒节点后,使用LockSupport.unpark()方法将该节点所包装的线程唤醒,使用的是Unsafe类相关的方法:

  • public static void unpark(Thread thread) {
  • if (thread != null)
  • unsafe.unpark(thread);
  • }

当节点被唤醒,且获取到CPU的调度之后,就会继续从之前挂起的地方开始执行:

  • private final boolean parkAndCheckInterrupt() {
  • // 使用LockSupport挂起线程
  • // 执行到这里之后,线程会被挂起阻塞,等待被唤醒
  • // 被唤醒后会从这里开始执行
  • LockSupport.park(this);
  • return Thread.interrupted();
  • }

这里存在一个难以发现的疑问,虽然在unparkSuccessor()方法中出现从后向前遍历查找合格待唤醒节点的过程很少会出现,但一旦出现了,假设现在找到的合格的待唤醒节点处于等待队列中间位置,在唤醒这个节点包装的线程后,会回到acquireQueued()方法的for循环中,但由于此时该节点位于等待队列中间位置,它的前驱节点并不是头节点,所以不会尝试获取锁。那此时等待队列中所有的节点对应的线程都会陷入了阻塞状态?整个等待队列就瘫痪了?

我们先观察unparkSuccessor()方法从后向前遍历的循环:

  • // 如果后继节点不符合条件,则从尾节点开始往前找,直到找到位于队列最前面的符合条件的节点
  • for (Node t = tail; t != null && t != node; t = t.prev)
  • if (t.waitStatus <= 0)
  • s = t;

需要注意的是,从后向前查找的范围是除头节点和第二个节点之外的所有节点,最后找到的节点必然是处于等待队列第二个节点之后的某个节点,且从头节点到该节点之间的所有节点的状态必然全都大于0(即都取消了排队)。当找到待唤醒的节点后,该节点包装的线程从parkAndCheckInterrupt()唤醒后会重新尝试获取同步状态,失败后又会执行shouldParkAfterFailedAcquire()方法,这个方法设计十分巧妙,我们回顾它的代码:

  • private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  • // 获取前驱节点的waitStatus
  • int ws = pred.waitStatus;
  • if (ws == Node.SIGNAL) // SINGAL为-1
  • /*
  • * 如果前驱节点的waitStatus为SIGNAL,表示当前节点已经设置了要求释放锁的线程将其唤醒的状态,可以安全阻塞
  • * 上面的解释非常绕,用通俗的话来讲,当一个节点的前驱节点的waitStatus为SIGNAL时,前驱节点在释放锁时会将其唤醒
  • */
  • return true;
  • if (ws > 0) {
  • /*
  • * 如果前驱节点的waitStatus大于0,表示前驱节点已经取消了排队等待,
  • * 尝试向前查找一个waitStatus不大于0的节点作为当前节点的前驱节点,并把该节点的后继节点设置为当前节点,
  • * 上面向前查找的过程即do ... while循环的作用,
  • * 这个不断向前查找的过程,可以将当前节点前面所有取消了排队的节点全部剔除略过,减小链表的长度
  • */
  • do {
  • node.prev = pred = pred.prev;
  • } while (pred.waitStatus > 0);
  • pred.next = node;
  • } else {
  • /*
  • * 当执行到这里,前驱节点的waitStatus只会为0,CONDITION(-2)、PROPAGATE(-3),
  • * 此时则尝试将前驱节点的waitStatus状态设置为SIGNAL,
  • * 做这一步操作,主要是用于在后面某个线程释放锁时,
  • * 如果该线程的waitStatus状态为SIGNAL,且其为等待队列的头节点,就会尝试唤醒其后继节点。
  • */
  • compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  • }
  • return false;
  • }

因为此时从头节点到待唤醒节点的之间的所有节点的状态全部是大于0的,因此会触发if (ws > 0)分支里的do ... while操作将这些节点全部剔除,做完剔除工作之后,待唤醒节点就变成了头节点的后继节点了,且shouldParkAfterFailedAcquire()返回了false,进而会忽略运行parkAndCheckInterrupt()方法直接进行下一次循环,在下一次循环中待唤醒节点就能成功获取到锁了。如果不好理解,可以观察下图:

3.遍历获取待唤醒节点操作.png

至此,其实AQS同步状态相关的简单业务逻辑已经讲解完毕了,不过大部分代码的逻辑都是基于ReentrantLock的FairSync来讲解的,因此下面将补充讲解非公平锁的差别。

5. 非公平锁

非公平锁和公平锁在获取锁的方法上流程是一样的,它们的区别主要表现在尝试获取锁的机制不同。简单来说,公平锁在每次尝试获取锁时,会根据等待队列依次排序等待;而非公平锁在每次尝试获取锁时,将无视等待队列,直接尝试获取锁,如果锁是空闲的,即可获取状态,则获取锁。公平与否,是针对等待队列中处于等待状态的线程而言的。

ReentrantLock类中NonfairSync类的源码如下:

  • static final class NonfairSync extends Sync {
  • private static final long serialVersionUID = 7316153563782823691L;
  • final void lock() {
  • if (compareAndSetState(0, 1))
  • setExclusiveOwnerThread(Thread.currentThread());
  • else
  • acquire(1);
  • }
  • protected final boolean tryAcquire(int acquires) {
  • return nonfairTryAcquire(acquires);
  • }
  • }

从源码可以得知,NonfairSync的lock()方法与公平同步器FairSync相比,一开始就尝试修改同步状态,如果没有修改成功才会进入acquire(1)方法,acquire(1)会调用tryAcquire()方法,与FairSync不同的是,NonfairSync的tryAcquire()方法调用了父类Sync的nonfairTryAcquire(acquires)

  • final boolean nonfairTryAcquire(int acquires) {
  • final Thread current = Thread.currentThread();
  • int c = getState();
  • if (c == 0) {
  • if (compareAndSetState(0, acquires)) {
  • setExclusiveOwnerThread(current);
  • return true;
  • }
  • } else if (current == getExclusiveOwnerThread()) {
  • int nextc = c + acquires;
  • if (nextc < 0) // overflow
  • throw new Error("Maximum lock count exceeded");
  • setState(nextc);
  • return true;
  • }
  • return false;
  • }

与FairSync的tryAcquire()方法对比:

  • protected final boolean tryAcquire(int acquires) {
  • final Thread current = Thread.currentThread();
  • int c = getState();
  • if (c == 0) {
  • if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
  • setExclusiveOwnerThread(current);
  • return true;
  • }
  • } else if (current == getExclusiveOwnerThread()) {
  • int nextc = c + acquires;
  • if (nextc < 0)
  • throw new Error("Maximum lock count exceeded");
  • setState(nextc);
  • return true;
  • }
  • return false;
  • }

会发现nonfairTryAcquire()中少了一步操作:调用hasQueuedPredecessors()。在FairSync的tryAcquire()方法中,当state为0时,会首先调用hasQueuedPredecessors()方法,如果其返回值为false,才会尝试CAS方式修改statehasQueuedPredecessors()源码如下:

  • public final boolean hasQueuedPredecessors() {
  • // The correctness of this depends on head being initialized
  • // before tail and on head.next being accurate if the current
  • // thread is first in queue.
  • Node t = tail; // Read fields in reverse initialization order
  • Node h = head;
  • Node s;
  • // 等待队列为空时,返回false
  • // 等待队列不为空时,分三种情况:
  • // - 头节点的后继节点为空,返回true
  • // - 头节点的后继节点不为空,但该后继节点包装的线程是当前线程,返回false
  • // - 头节点的后继节点不为空,且该后继节点包装的线程并不是当前线程,返回true
  • return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
  • }

从源码可以得知,公平模式下,有两种情况会尝试获取同步状态:

  • 队列为空;此时当然可以直接尝试获取同步状态。
  • 队列不为空,但当前线程就是头节点的后继节点包装的线程;此时当前线程就是即将唤醒的线程,因此也可以直接尝试获取同步状态。

从上面的分析我们可以得出公平模式和非公平模式的差异:公平模式下,线程在尝试获取锁时,即使锁没有被任何线程锁持有,它也会判断自己是不是即将被唤醒的那个线程,如果是才尝试获取锁,如果不是就排队等待。而在非公平模式下不会有这些考虑,直接就尝试获取锁,但如果这两次尝试都不成功的话,就与公平锁一样,需要添加到等待队列尾部等待唤醒。

非公平锁会有更好的性能,但非公平锁让获取锁的时间变得不确定,可能会导致在同步队列中的线程长期处于饥饿状态。