Java多线程
Java并发
JUC锁

Java多线程 22 - AbstractQueuedSynchronizer详解(3)

简介:AQS是一个抽象类,继承自AbstractOwnableSynchronizer类,并实现了Serializable接口。虽然AQS是一个抽象类,但其内部并没有抽象方法,这是典型的模板设计模式的应用。AQS作为一个基础组件为继承它的实现类提供基础设施,如构建等待队列、控制同步状态等;其内部除了提供并发操作的核心方法以及等待队列操作外,还提供了一些模板方法让子类自己实现,AQS只关注内部公共方法实现,并不关心外部不同模式的实现。

在前两篇AQS文章中,以ReentrantLock为导向,讲解了AQS对独占模式下锁的各类操作;AQS还支持共享模式的锁操作,在JUC包中,ReentrantReadWriteLock类对共享模式有着经典的实现,本篇文章就以ReentrantReadWriteLock为导向,讲解AQS对共享模式的支持。

ReentrantReadWriteLock锁的简单使用在前面的文章中已经讲解过了,不了解的读者可以先阅读该文章Java多线程 17 - ReentrantReadWriteLock读写锁,建议先熟悉ReentrantReadWriteLock锁的使用,再阅读本文。

1. ReentrantReadWriteLock的基本架构

ReentrantReadWriteLock的实现非常经典,由于ReentrantReadWriteLock允许同时创建读锁和写锁,其中读锁是共享锁,而写锁是独占锁,因此它需要同时控制两种同步状态,且这两种同步状态是不同的。ReentrantReadWriteLock也是基于AQS类实现的,从前面的文章中我们知道,AQS使用了一个int类型的变量state来控制同步状态的获取、重入和释放,这种使用在ReentrantLock的实现中尤其明显;与ReentrantLock一样,ReentrantReadWriteLock类中也存在着同步器Sync的概念,且在ReentrantReadWriteLock中巧妙地使用Sync将state变量分成了两个部分分别用于控制两种同步状态,源码如下:

  • abstract static class Sync extends AbstractQueuedSynchronizer {
  • private static final long serialVersionUID = 6317671515068378041L;
  • /*
  • * Read vs write count extraction constants and functions.
  • * Lock state is logically divided into two unsigned shorts:
  • * The lower one representing the exclusive (writer) lock hold count,
  • * and the upper the shared (reader) hold count.
  • * AQS提供的state是int类型的变量,占32位
  • * 在ReentrantReadWriteLock中需要同时表示独占和共享两种模式的加锁次数
  • * 因此将32位state分为两部分,高16位表示共享锁加锁次数,低16位表示独占锁加锁次数
  • * 因此共享和独占最多加锁次数为2^16 - 1,即65535次
  • */
  • // 该值是位移位数标识
  • static final int SHARED_SHIFT = 16;
  • /**
  • * 共享模式的计数器,65536, 0001 0000 0000 0000 0000
  • * 由于共享锁加锁次数占据高16位,因此每次加锁,需要加上第17位为1,其余位全为0的值,才能正确完成计数
  • */
  • static final int SHARED_UNIT = (1 << SHARED_SHIFT);
  • // 加锁的最大次数,即16位二进制最大表示的大小,65535, 1111 1111 1111 1111
  • static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
  • // 计算独占模式的高位掩码,65535, 0001 0000 0000 0000 0000 - 1 -> 1111 1111 1111 1111
  • static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  • /**
  • * Returns the number of shared holds represented in count
  • * 获取共享模式下加锁的次数
  • */
  • static int sharedCount(int c) {
  • // 无符号右移,将低16位抛弃后,得到的就是高16位,即共享锁加锁次数
  • return c >>> SHARED_SHIFT;
  • }
  • /**
  • * Returns the number of exclusive holds represented in count
  • * 获取独占模式下加锁的次数
  • */
  • static int exclusiveCount(int c) {
  • // c和0000 0000 0000 0000 1111 1111 1111 1111相与后,高16位被抛弃,得到低16位,即独占锁加锁次数
  • return c & EXCLUSIVE_MASK;
  • }
  • ...
  • }

从源码可以得知,ReentrantReadWriteLock类的内部抽象类Sync继承自AQS类,state作为一个int类型的变量,在内存中存储的二进制长度为32位,因此Sync将state分为两部分,其中高16位代表共享锁的加锁次数,而低16位代表独占锁的加锁次数,通过一系列的运算实现,详细分析如下:

  • SHARED_SHIFT:这个常量的值恒定为16,代表在计算过程中需要用到的位移位数。
  • MAX_COUNT:代表加锁的最大次数;因为state分为两部分后,共享锁和独占锁的记录值最大只能是16位的二进制数,因此最大加锁次数为65535次(16位全为1,2^16 - 1),就是MAX_COUNT的值。
  • SHARED_UNIT:这个值是在更新共享锁时的计数单位;由于共享锁占据高16位,因此每次共享锁加锁次数加1,都需要加上第17位为1,其余位全为0的值,才能正确完成计数。
  • EXCLUSIVE_MASK:用于计算独占锁的高位掩码,通过将state值与该值相与,可以将state的高16位全部置为0,得到的就是低16位的值。

同时,Sync提供了两个方法用于获取共享锁和独占锁的重入次数。sharedCount(int c)用于获取共享锁的重入次数,它的实现是将传入的c(具体使用中会传入state)进行无符号右移16位操作,右移后c值的低16位会被抛弃,高16位会被补0,得到的就是共享锁的重入次数。exclusiveCount(int c)用于获取独占锁的重入次数,它的实现是将传入的c(具体使用中会传入state)和EXCLUSIVE_MASK做与计算操作,计算后c值的高16位会被置为0,得到的低16位就是独占锁的重入次数。

下图是对上述各类值和操作的示意图:

1.ReentrantReadWriteLock对state的划分.png

分析完ReentrantReadWriteLock对state的划分,我们考察一下ReentrantReadWriteLock对state的整体结构:

  • public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
  • private final ReentrantReadWriteLock.ReadLock readerLock;
  • /** Inner class providing writelock */
  • private final ReentrantReadWriteLock.WriteLock writerLock;
  • /** Performs all synchronization mechanics */
  • final Sync sync;
  • public ReentrantReadWriteLock(boolean fair) {
  • sync = fair ? new FairSync() : new NonfairSync();
  • readerLock = new ReadLock(this);
  • writerLock = new WriteLock(this);
  • }
  • public ReentrantReadWriteLock.WriteLock writeLock() {
  • return writerLock;
  • }
  • public ReentrantReadWriteLock.ReadLock readLock() {
  • return readerLock;
  • }
  • abstract static class Sync extends AbstractQueuedSynchronizer { ... }
  • static final class NonfairSync extends Sync { ... }
  • static final class FairSync extends Sync { ... }
  • public static class ReadLock implements Lock, java.io.Serializable { ... }
  • public static class WriteLock implements Lock, java.io.Serializable { ... }
  • }

可以看到ReentrantReadWriteLock在公平锁与非公平锁的实现上与ReentrantLock一样,有NonfairSync和FairSync两个都继承Sync的类,通过构造函数传入的布尔值决定要构造哪一种Sync实例。

ReentrantReadWriteLock比ReentrantLock多出了两个内部类:ReadLock和WriteLock, 用来定义读锁和写锁,然后在构造函数中,会构造一个读锁和一个写锁实例保存到成员变量readerLockwriterLockreadLock()writeLock()方法就是用于返回这两个成员变量保存的锁实例。

2. 写锁分析

我们先关注WriteLock类中的lock()unlock()方法:

  • public void lock() {
  • sync.acquire(1);
  • }
  • public void unlock() {
  • sync.release(1);
  • }

可以发现lock()unlock()方法分别调用了同步器的acquire(1)release(1)方法,这里依旧以公平模式下的FairSync同步器为例;与ReentrantLock一样,ReentrantReadWriteLock类中FairSync的acquire(int)release(int)方法也是来自父类Sync,而父类Sync的这两个方法来自于AQS:

  • public final void acquire(int arg) {
  • /**
  • * 这里的tryAcquire()会尝试获取同步状态
  • * 如果没有获取到,将会调用addWaiter()方法将当前线程包装为一个Node节点加入等待队列
  • * 然后对节点调用acquireQueued()方法使其进入自旋尝试获取同步的状态
  • * 加入成功后将中断当前线程
  • */
  • if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  • selfInterrupt();
  • }
  • public final boolean release(int arg) {
  • if (tryRelease(arg)) {
  • Node h = head;
  • if (h != null && h.waitStatus != 0)
  • // 唤醒等待状态的线程
  • unparkSuccessor(h);
  • return true;
  • }
  • return false;
  • }

这一步的实现其实与ReentrantLock是一模一样的,不同点在于此时acquire(int)方法中tryAcquire(int)release(int)方法中的tryRelease(int)会调用ReentrantReadWriteLock类的Sync类重写的方法;先关注tryAcquire(int)方法的源码:

  • protected final boolean tryAcquire(int acquires) {
  • /*
  • * Walkthrough:
  • * 1. If read count nonzero or write count nonzero
  • * and owner is a different thread, fail.
  • * 2. If count would saturate, fail. (This can only
  • * happen if count is already nonzero.)
  • * 3. Otherwise, this thread is eligible for lock if
  • * it is either a reentrant acquire or
  • * queue policy allows it. If so, update state
  • * and set owner.
  • */
  • Thread current = Thread.currentThread();
  • // state值
  • int c = getState();
  • // 获取独占锁的持有记录值
  • int w = exclusiveCount(c);
  • if (c != 0) {
  • /**
  • * (Note: if c != 0 and w == 0 then shared count != 0)
  • * 当c不为0时,表示独占锁和共享锁其中一个必然有被线程持有,分两种情况:
  • * 1. 当独占锁记录值为0,则此时共享锁必然被持有了,则获取独占锁失败,直接返回false
  • * 2. 当独占锁记录值不为0,即此时独占锁被持有了,就需要判断获取锁的线程是否就是当前拥有独占锁的线程,如果不是则获取独占锁失败,直接返回false
  • */
  • if (w == 0 || current != getExclusiveOwnerThread())
  • return false;
  • // 判断独占锁重入次数是否过多,导致记录值超过MAX_COUNT
  • if (w + exclusiveCount(acquires) > MAX_COUNT)
  • throw new Error("Maximum lock count exceeded");
  • /**
  • * Reentrant acquire
  • * 获取独占锁成功,更新state的独占锁记录值
  • */
  • setState(c + acquires);
  • return true;
  • }
  • /**
  • * 此时state值为0,表示没有线程获取锁
  • * writerShouldBlock()底层调用了hasQueuedPredecessors(),用于判断同步队列是否有线程等待了很久
  • * 如果没有等待的线程,就尝试CAS方式修改state值,
  • * 如果修改失败说明此时有其他线程并发抢锁,而当前线程没抢到,直接返回false
  • */
  • if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
  • return false;
  • // 抢锁成功,将独占锁线程设置为当前线程
  • setExclusiveOwnerThread(current);
  • return true;
  • }

上面的源码已经将tryAcquire(int)方法的流程讲解地非常清楚了,这里需要注意的是,如果有线程持有了共享锁(读锁),那么获取独占锁(写锁)是失败的,即当有线程在进行读操作时,无法进行写操作。同时写操作同一时刻只允许一个线程进行。

释放锁操作的tryRelease(int)方法比较简单,源码如下:

  • protected final boolean tryRelease(int releases) {
  • // 判断当前线程是否是拥有独占锁的线程,如果不是直接抛出异常
  • if (!isHeldExclusively())
  • throw new IllegalMonitorStateException();
  • // 计算释放锁后的state值
  • int nextc = getState() - releases;
  • // 判断独占锁记录值是否为0,如果为0表示可以释放独占锁了
  • boolean free = exclusiveCount(nextc) == 0;
  • if (free)
  • // 将拥有独占锁的线程记录变量设置为null,即释放独占锁
  • setExclusiveOwnerThread(null);
  • // 更新state值
  • setState(nextc);
  • // 返回值表示独占锁是否已被释放
  • return free;
  • }

ReentrantReadWriteLock类在写锁的获取和释放上与ReentrantLock相比,其实就只有上面更新state值的操作不一样,其他的类似获取锁不成功进入同步队列进行等待、被唤醒、重新抢锁等操作,其实就是AQS独占模式的应用,流程与ReentrantLock的流程是一模一样的,这里不再赘述。本文的重心在接下来的内容中。

3. 读锁分析

ReentrantReadWriteLock类的读锁,是典型的共享锁的应用,也是本文需要讲解的重心。类似于写锁,读锁的lock()unlock()的实现实际对应Sync的tryAcquireShared(int)tryReleaseShared(int)方法:

  • public void lock() {
  • sync.acquireShared(1);
  • }
  • public void unlock() {
  • sync.releaseShared(1);
  • }

acquireShared(int)releaseShared(int)都属于AQS的方法,内部又调用了tryAcquireShared(int)tryReleaseShared(int)

  • public final void acquireShared(int arg) {
  • if (tryAcquireShared(arg) < 0)
  • // 获取失败将调用doAcquireShared()
  • doAcquireShared(arg);
  • }
  • public final boolean releaseShared(int arg) {
  • if (tryReleaseShared(arg)) {
  • doReleaseShared();
  • return true;
  • }
  • return false;
  • }

tryAcquireShared(int)tryReleaseShared(int)自然又是由ReentrantReadWriteLock的内部类Sync进行了重写。

3.1. 获取读锁

我们先关注用于获取读锁(共享锁)的tryAcquireShared(int)方法的源码:

  • protected final int tryAcquireShared(int unused) {
  • /*
  • * Walkthrough:
  • * 1. If write lock held by another thread, fail.
  • * 2. Otherwise, this thread is eligible for
  • * lock wrt state, so ask if it should block
  • * because of queue policy. If not, try
  • * to grant by CASing state and updating count.
  • * Note that step does not check for reentrant
  • * acquires, which is postponed to full version
  • * to avoid having to check hold count in
  • * the more typical non-reentrant case.
  • * 3. If step 2 fails either because thread
  • * apparently not eligible or CAS fails or count
  • * saturated, chain to version with full retry loop.
  • */
  • Thread current = Thread.currentThread();
  • // 获取state值
  • int c = getState();
  • /**
  • * 如果已经有线程获取了独占锁,且当前线程并不是拥有独占锁的线程,则获取共享锁失败,直接返回-1
  • * 从这里我们可以得知,拥有写锁的线程其实是可以同时拥有读锁的,这也是锁降级的实现,后面会讲解
  • */
  • if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
  • return -1;
  • // 共享锁持有计数
  • int r = sharedCount(c);
  • /**
  • * 如果同步队列中没有线程已经等待了很久
  • * 且共享锁的计数小于最大阈值,则尝试修改state的值,即更新共享锁记录值
  • */
  • if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
  • // 能够进入if内部,表示更新共享锁记录值成功
  • if (r == 0) {
  • /**
  • * 如果r为0,则表示之前其实没有线程持有共享锁,
  • * 因此当前线程是第一个持有共享锁的线程,使用firstReader记录当前线程
  • * 同时设置firstReaderHoldCount为1
  • */
  • firstReader = current;
  • firstReaderHoldCount = 1;
  • } else if (firstReader == current) {
  • /**
  • * 如果r不为0,则判断当前线程是否是第一个持有共享锁的线程,
  • * 如果是,就将firstReaderHoldCount的值加1
  • */
  • firstReaderHoldCount++;
  • } else {
  • /**
  • * 走到这里,说明当前线程不是第一个获取共享锁的线程
  • * 需要在readHolds的本线程副本中记录当前线程重入数,readHolds是一个ThreadLocal类型的对象
  • * 这是为了实现JDK 1.6中加入的getReadHoldCount()方法的,
  • * 这个方法能获取当前线程重入共享锁的次数,
  • * 原理很简单:
  • * 如果当前只有一个线程的话,还不需要使用readHolds,直接更新firstReaderHoldCount来记录重入数,
  • * 当有第二个线程来的时候,就要使用readHolds,每个线程拥有自己的副本,用来保存自己的重入数。
  • */
  • HoldCounter rh = cachedHoldCounter;
  • if (rh == null || rh.tid != getThreadId(current))
  • cachedHoldCounter = rh = readHolds.get();
  • else if (rh.count == 0)
  • readHolds.set(rh);
  • rh.count++;
  • }
  • // 返回1,表示获取共享锁成功
  • return 1;
  • }
  • /**
  • * 走到这里,说明获取共享锁失败,会进入自旋抢锁;失败的情况有三种:
  • * 1. 当前同步队列已经有线程等待很久了;
  • * 2. 共享锁的计数太大,已经超过最大阈值;
  • * 3. CAS方式修改共享锁记录值失败
  • */
  • return fullTryAcquireShared(current);
  • }

上面的源码对tryAcquireShared(int)方法的讲解已经比较清楚了,这里唯一需要注意的是,读锁记录了每个线程重入的次数,在内部实现上,如果仅仅只有一个线程多次重入获取了读锁,其实只会更新成员变量firstReaderHoldCount进行记录;一旦有多个线程获取了读锁,就会使用ThreadLocal类型的变量readHolds分别记录每个线程的重入次数。

tryAcquireShared(int)方法的最后,当获取共享锁失败时,会调用fullTryAcquireShared(current)方法,该方法源码如下:

  • /**
  • * Full version of acquire for reads, that handles CAS misses
  • * and reentrant reads not dealt with in tryAcquireShared.
  • */
  • final int fullTryAcquireShared(Thread current) {
  • /*
  • * This code is in part redundant with that in
  • * tryAcquireShared but is simpler overall by not
  • * complicating tryAcquireShared with interactions between
  • * retries and lazily reading hold counts.
  • */
  • HoldCounter rh = null;
  • for (; ; ) {
  • // state值
  • int c = getState();
  • if (exclusiveCount(c) != 0) {
  • // 如果已有线程持有独占锁,并且持有独占锁的线程不是当前线程,则表示获取失败,直接返回-1
  • if (getExclusiveOwnerThread() != current)
  • return -1;
  • // else we hold the exclusive lock; blocking here
  • // would cause deadlock.
  • } else if (readerShouldBlock()) {
  • // 走到这里表示可能需要阻塞当前线程
  • // Make sure we're not acquiring read lock reentrantly
  • if (firstReader == current) {
  • // assert firstReaderHoldCount > 0;
  • // 当前线程是第一个获取共享锁的线程,则继续往下执行
  • } else {
  • // 否则取得当前线程的获取锁的次数,如果次数为0,则表示获取失败,直接返回-1
  • if (rh == null) {
  • rh = cachedHoldCounter;
  • if (rh == null || rh.tid != getThreadId(current)) {
  • rh = readHolds.get();
  • if (rh.count == 0)
  • readHolds.remove();
  • }
  • }
  • if (rh.count == 0)
  • return -1;
  • }
  • }
  • // 如果共享锁的计数等于最大阈值,抛出错误
  • if (sharedCount(c) == MAX_COUNT)
  • throw new Error("Maximum lock count exceeded");
  • // CAS方式修改共享锁的计数
  • if (compareAndSetState(c, c + SHARED_UNIT)) {
  • if (sharedCount(c) == 0) {
  • /**
  • * 如果sharedCount(c)为0,则表示之前其实没有线程持有共享锁,注意此时的c是修改前的共享锁的计数
  • * 因此当前线程是第一个持有共享锁的线程,使用firstReader记录当前线程
  • * 同时设置firstReaderHoldCount为1
  • */
  • firstReader = current;
  • firstReaderHoldCount = 1;
  • } else if (firstReader == current) {
  • /**
  • * 如果sharedCount(c)不为0,则判断当前线程是否是第一个持有共享锁的线程,
  • * 如果是,就将firstReaderHoldCount的值加1
  • */
  • firstReaderHoldCount++;
  • } else {
  • /**
  • * 走到这里,说明当前线程不是第一个获取共享锁的线程
  • * 需要在readHolds的本线程副本中记录当前线程重入数
  • */
  • if (rh == null)
  • rh = cachedHoldCounter;
  • if (rh == null || rh.tid != getThreadId(current))
  • rh = readHolds.get();
  • else if (rh.count == 0)
  • readHolds.set(rh);
  • rh.count++;
  • cachedHoldCounter = rh; // cache for release
  • }
  • // 返回1,表示获取共享锁成功
  • return 1;
  • }
  • }
  • }

从源码可以看出fullTryAcquireShared(Thread)方法与tryAcquireShared(int)主要功能基本相同,不同的是fullTryAcquireShared(Thread)方法会进入自旋,在CAS方式修改state失败后在下一次循环重新尝试修改。同时,如果当前有线程获取了写锁(即独占锁),或者需要进行阻塞等待时,会返回-1。

回到acquireShared(int)方法,当tryAcquireShared(int)在尝试修改state获取共享锁失败后会返回-1,因此会执行doAcquireShared(arg),该方法源码如下:

  • private void doAcquireShared(int arg) {
  • // 创建共享节点并添加到队列尾
  • final Node node = addWaiter(Node.SHARED);
  • boolean failed = true;
  • try {
  • boolean interrupted = false;
  • // 然后进行自旋,不断尝试获取同步状态
  • for (; ; ) {
  • final Node p = node.predecessor();
  • if (p == head) {
  • // 尝试获取共享锁
  • int r = tryAcquireShared(arg);
  • if (r >= 0) {
  • // 获取共享锁成功,向后传递
  • setHeadAndPropagate(node, r);
  • p.next = null; // help GC
  • if (interrupted)
  • // 如果在等待期间出现中断,则重现中断操作
  • selfInterrupt();
  • failed = false;
  • return;
  • }
  • }
  • /**
  • * 执行到此处,说明上面尝试获取锁失败了,因此可以尝试将当前线程挂起
  • * 这里的shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()在之前已经讲解过了
  • */
  • if (shouldParkAfterFailedAcquire(p, node) &&
  • parkAndCheckInterrupt())
  • interrupted = true;
  • }
  • } finally {
  • if (failed)
  • cancelAcquire(node);
  • }
  • }

doAcquireShared(int)与之前讲解的acquireQueued(final Node, int)方法非常类似,因此这里我们只关注在线程获取到锁之后执行的setHeadAndPropagate(node, r)方法,源码如下:

  • /**
  • * Sets head of queue, and checks if successor may be waiting
  • * in shared mode, if so propagating if either propagate > 0 or
  • * PROPAGATE status was set.
  • *
  • * @param node the node
  • * @param propagate the return value from a tryAcquireShared
  • */
  • private void setHeadAndPropagate(Node node, int propagate) {
  • // 记录旧的头节点,用于下面的各项检查
  • Node h = head; // Record old head for check below
  • // 当前节点的线程已获取到共享锁,因此将其该节点设置为头节点
  • setHead(node);
  • /*
  • * Try to signal next queued node if:
  • * Propagation was indicated by caller,
  • * or was recorded (as h.waitStatus) by a previous operation
  • * (note: this uses sign-check of waitStatus because
  • * PROPAGATE status may transition to SIGNAL.)
  • * and
  • * The next node is waiting in shared mode,
  • * or we don't know, because it appears null
  • *
  • * The conservatism in both of these checks may cause
  • * unnecessary wake-ups, but only when there are multiple
  • * racing acquires/releases, so most need signals now or soon
  • * anyway.
  • *
  • * propagate大于0时,表示当前线程已经获取到共享锁了,需要尝试唤醒下一个节点
  • * 如果旧的头节点为空,表示之前同步队列就是空的,没有线程在等待
  • * 如果旧的头节点不为空,但旧的头节点的waitStatus小于0(即为SIGNAL(-1)或者PROPAGATE(-3)),表示应该尝试唤醒后继节点
  • */
  • if (propagate > 0 || h == null || h.waitStatus < 0) {
  • /**
  • * 如果node的下一个节点为空,表示此时同步队列没有节点等待了,因此当前线程是最后一个获取共享锁的线程
  • * 或者node的下一个节点不为空,且是共享节点
  • * 就执行释放共享锁操作
  • */
  • Node s = node.next;
  • if (s == null || s.isShared())
  • doReleaseShared();
  • }
  • }

acquireQueued(final Node, int)方法中setHead()的实现不一样,setHeadAndPropagate(node, r)方法不仅将获取到共享锁的节点设置为头节点,并且进行了传播唤醒;查看doReleaseShared()的源码:

  • /**
  • * Release action for shared mode -- signal successor and ensure
  • * propagation. (Note: For exclusive mode, release just amounts
  • * to calling unparkSuccessor of head if it needs signal.)
  • * 共享模式的释放操作,将唤醒后继节点并保证传播性
  • * 与独占模式不同,独占模式只会尝试唤醒后继节点
  • */
  • private void doReleaseShared() {
  • /*
  • * Ensure that a release propagates, even if there are other
  • * in-progress acquires/releases. This proceeds in the usual
  • * way of trying to unparkSuccessor of head if it needs
  • * signal. But if it does not, status is set to PROPAGATE to
  • * ensure that upon release, propagation continues.
  • * Additionally, we must loop in case a new node is added
  • * while we are doing this. Also, unlike other uses of
  • * unparkSuccessor, we need to know if CAS to reset status
  • * fails, if so rechecking.
  • */
  • for (; ; ) {
  • Node h = head;
  • if (h != null && h != tail) {
  • int ws = h.waitStatus;
  • if (ws == Node.SIGNAL) {
  • // 如果当前节点的waitStatus为SIGNAL,则尝试改为0,修改成功就唤醒头节点的后继节点锁包装的线程,修改失败重新尝试修改
  • if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  • continue; // loop to recheck cases
  • // 这里会唤醒头节点的后继节点
  • unparkSuccessor(h);
  • } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  • // 如果头节点的waitStatus为0,则尝试修改为PROPAGATE,如果修改失败则重新尝试修改
  • continue; // loop on failed CAS
  • }
  • /**
  • * 由于头节点的后继节点所包装的线程被唤醒了,所以当被唤醒的线程获取到共享锁后,头节点可能会被修改;
  • * 因此当头节点被修改后,继续循环操作,开始下一轮的传播唤醒;
  • * 如果头节点没有被改变,说明此时虽然唤醒了后继节点,但后继节点并没有获取到共享锁,
  • * 因此直接break,将下一轮的传播唤醒操作交给后继节点
  • */
  • if (h == head) // loop if head changed
  • break;
  • }
  • }

doReleaseShared()整体是一个无限for循环,它会不断地尝试唤醒头节点的后继节点。这里我们着重分析一下该for循环的跳出条件h == headh == head成立即表示在一次循环后,头节点没有被修改,那么什么时候头节点会被修改呢?假设当前线程获取到共享锁了,因此该线程所在的节点就是头节点(注意,此时头节点只是代表该线程,但该节点的thread被置为null了),头节点尝试唤醒它的后继节点后,如果后继节点成功获取到共享锁就会将自己设置为头节点,此时头节点就被修改了,h == head条件不成立,还会进入下一轮for循环,继续唤醒新的头节点的后继节点;头节点没有被修改的情况主要是:头节点唤醒的后继节点并不是共享节点,此时该节点的线程想获取独占锁,是会被阻塞的。所以此时,头节点并不会被修改,因此直接跳出for循环,结束传播唤醒。

因此我们还可以得出一个结论,当同步队列中既存在读锁的竞争线程,也存在写锁的竞争线程时,如果某个线程已经获取到了读锁,会在同步队列中进行向后传播唤醒,试图唤醒后面正处于挂起状态的尝试获取读锁的线程,直到遇到第一个挂起的尝试获取写锁的线程为止(因为在有线程读的情况下无法同时写),且处于请求写锁的线程之后的所有挂起的请求读锁的线程,也只能继续等待,无法通过传播唤醒获取到读锁。这种设计既实现了读锁的共享获取,也保证了获取锁的顺序性。

同时,我们回顾这一段代码:

  • private void doReleaseShared() {
  • ...
  • for (; ; ) {
  • Node h = head;
  • if (h != null && h != tail) {
  • int ws = h.waitStatus;
  • if (ws == Node.SIGNAL) {
  • // 如果当前节点的waitStatus为SIGNAL,则尝试改为0,修改成功就唤醒头节点的后继节点锁包装的线程,修改失败重新尝试修改
  • if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  • continue; // loop to recheck cases
  • // 这里会唤醒头节点的后继节点
  • unparkSuccessor(h);
  • } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  • // 如果头节点的waitStatus为0,则尝试修改为PROPAGATE,如果修改失败则重新尝试修改
  • continue; // loop on failed CAS
  • }
  • }
  • ...
  • }

从上面的分析我们知道,当头节点唤醒了后继节点且后继节点成功地获取到共享锁之后,这个后继节点的线程也可能会运行到doReleaseShared()方法中的,而此时头节点的线程开始了第二次for循环,因此两个线程会在for循环内相遇,但只会有一个线程能够成功将头节点的waitStatusSIGNAL修改为0;其中的一个线程修改成功后,另一个线程可能会恰好判断到else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))分支,此时头节点的waitStatus正好为0,因此会被尝试修改为PROPAGATE。这两个线程的竞争,一直到h == head才会结束。

setHeadAndPropagate(Node, int)doReleaseShared()的设计看似存在相互冲突的地方,例如存在两个线程同时尝试唤醒后继节点,但个人觉得此处这样设计,虽然可能会使已经获取到共享锁的线程存在性能损耗,却能够有效地提高唤醒共享节点的性能和吞吐量。

3.2. 释放读锁

释放读锁由releaseShared(int)方法处理,该方法由AQS提供,并且调用了ReentrantReadWriteLock中Sync同步器重写的tryReleaseShared(int)方法:

  • public final boolean releaseShared(int arg) {
  • if (tryReleaseShared(arg)) {
  • doReleaseShared();
  • return true;
  • }
  • return false;
  • }

tryReleaseShared(int)方法的源码如下:

  • protected final boolean tryReleaseShared(int unused) {
  • Thread current = Thread.currentThread();
  • if (firstReader == current) {
  • /**
  • * 如果释放共享锁的线程就是firstReader,且firstReaderHoldCount为0,
  • * 说明此时firstReader只重入获取了一次共享锁,
  • * 此时释放了共享锁,就将firstReader置为null
  • */
  • // assert firstReaderHoldCount > 0;
  • if (firstReaderHoldCount == 1)
  • firstReader = null;
  • else
  • // 否则只是将firstReaderHoldCount减1
  • firstReaderHoldCount--;
  • } else {
  • // 否则需要从readHolds中取出当前线程重入的次数
  • HoldCounter rh = cachedHoldCounter;
  • if (rh == null || rh.tid != getThreadId(current))
  • rh = readHolds.get();
  • int count = rh.count;
  • if (count <= 1) {
  • // 此时表示当前线程释放了所有的重入次数
  • readHolds.remove();
  • if (count <= 0)
  • // 此时表示释放锁的次数超过了加锁的次数,直接抛出异常
  • throw unmatchedUnlockException();
  • }
  • // 更新值
  • --rh.count;
  • }
  • for (; ; ) {
  • // 自旋方式保证共享锁记录值能被成功更新
  • int c = getState();
  • int nextc = c - SHARED_UNIT;
  • if (compareAndSetState(c, nextc))
  • // Releasing the read lock has no effect on readers,
  • // but it may allow waiting writers to proceed if
  • // both read and write locks are now free.
  • // 如果共享锁的记录值更新后为0,则表示所有线程都已经释放了锁,返回true
  • return nextc == 0;
  • }
  • }

注释中已经详细解释了tryReleaseShared(int)方法的主要功能,tryReleaseShared(int)返回true的时候,表示当前已经没有线程拥有锁了(无论是共享锁还是独占锁),此时会调用doReleaseShared()唤醒等待的共享节点所包装的线程,这个方法在之前已经分析过了。

4. 公平模式和非公平模式

ReentrantReadWriteLock中的独占锁和共享锁同时存在公平和非公平两种模式,在上面的讲解中我们一直是以FairSync公平同步器讲解的公平模式下的独占锁和共享锁。在使用tryAcquire(int)方法获取独占锁,和使用tryAcquireShared(int)方法获取共享锁时,它们分别调用了writerShouldBlock()readerShouldBlock()来进行公平和非公平的区分:

  • // 获取独占锁的方法
  • protected final boolean tryAcquire(int acquires) {
  • ...
  • if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
  • return false;
  • ...
  • }
  • // 获取共享锁的方法
  • protected final int tryAcquireShared(int unused) {
  • ...
  • if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
  • ...
  • }
  • ...
  • }

这两个方法的声明都在Sync中:

  • abstract static class Sync extends AbstractQueuedSynchronizer {
  • ...
  • abstract boolean readerShouldBlock();
  • abstract boolean writerShouldBlock();
  • ...
  • }

但它们的具体实现在FairSync和NonfairSync中却各不同;下面是FairSync和NonfairSync中readerShouldBlock()各自的实现:

  • // FairSync中的readerShouldBlock()调用了hasQueuedPredecessors(),这个方法在上一篇文章中已经讲解过了
  • // 需要判断是否有前驱节点,如果有则返回false,否则返回true
  • final boolean readerShouldBlock() {
  • return hasQueuedPredecessors();
  • }
  • // NonfairSync中的readerShouldBlock()调用apparentlyFirstQueuedIsExclusive()方法
  • final boolean readerShouldBlock() {
  • return apparentlyFirstQueuedIsExclusive();
  • }
  • /**
  • * 当head节点不为null,且head节点的下一个节点s不为null,且s是独占模式(写线程),且s的线程不为null时,返回true。
  • * 即判断头节点的下一个节点是否是正在等待的独占锁(写锁),如果是,则需要等待。
  • * 目的是不应该让写锁始终等待,避免可能的写线程饥饿。
  • */
  • final boolean apparentlyFirstQueuedIsExclusive() {
  • Node h, s;
  • return (h = head) != null &&
  • (s = h.next) != null &&
  • !s.isShared() &&
  • s.thread != null;
  • }

可以发现,在竞争共享锁(即读锁)时,公平模式下竞争线程会首先查看同步队列中是否有线程已经等待了一段时间,如果有就表示自己让步;而在非公平模式下,只有在同步队列中第一个等待的线程竞争是独占锁(即写锁),才会进行让步,其他情况不会让步。非公平模式的这种设计是为了避免写线程因读线程过多而造成饥饿发生。

FairSync和NonfairSync中writerShouldBlock()各自的实现:

  • // FairSync中的writerShouldBlock()调用了hasQueuedPredecessors(),不再赘述
  • final boolean writerShouldBlock() {
  • return hasQueuedPredecessors();
  • }
  • // NonfairSync中的writerShouldBlock()直接返回false,即永远不让步
  • final boolean writerShouldBlock() {
  • return false; // writers can always barge
  • }

上面的源码表示,在竞争独占锁(即写锁)时,公平模式下竞争线程会首先查看同步队列中是否有线程已经等待了一段时间,如果有就表示自己让步;而在非公平模式下是永远不会让步的,这是由于竞争独占锁的是写线程,无需让步。

至此,关于ReentrantReadWriteLock中涉及的AQS相关的重要知识点已经讲解完毕了,类似于带有超时机制的获取锁操作、对异常的处理方式其实与之前讲解的ReentrantLock是类似的,这里就不再赘述。在后面的内容中,将探讨其他几类基于AQS实现的工具类,以便更深地理解AQS的实现和使用。