Java多线程
Java并发
JUC锁

Java多线程 23 - CountDownLatch详解

简介:CountDownLatch是一个同步辅助类,被称作”栅栏“,它能够实现在某些线程中执行的操作完成之前,让一个或多个线程一直等待。

1. CountDownLatch简介

CountDownLatch是一个同步辅助类,被称作”栅栏“,它能够实现在某些线程中执行的操作完成之前,让一个或多个线程一直等待。CountDownLatch和CyclicBarrier的区别如下:

  1. CountDownLatch的作用是允许多个线程等待其他线程完成执行;而CyclicBarrier则是允许多个线程相互等待。
  2. CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

关于CyclicBarrier的原理后面会讲解。

CountDownLatch函数列表如下:

  • // 构造一个用给定计数初始化的CountDownLatch
  • CountDownLatch(int count)
  • // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
  • void await()
  • // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间
  • boolean await(long timeout, TimeUnit unit)
  • // 递减锁存器的计数,如果计数到达零,则释放所有等待的线程
  • void countDown()
  • // 返回当前计数
  • long getCount()
  • // 返回标识此锁存器及其状态的字符串
  • String toString()

CountDownLatch的类图如下:

1.CountDownLatch.png

CountDownLatch的数据结构很简单,它是通过AQS共享锁实现的。它包含了Sync类型的sync对象,Sync继承于AQS。

2. CountDownLatch示例

要想了解CountDownLatch内部的原理,首先需要知道CountDownLatch如何使用;在下面的例子中,模拟了十个人的赛跑,赛跑的过程可以分为四个部分:选手准备、裁判发令起跑、选手开始赛跑及最后的结果统计。在这个过程中,裁判发令起跑需要在所有选手准备完成之后进行,且选手起跑则需要在裁判发令之后,最后的分数统计则需要在所有选手跑完之后才能进行,因此在本例中使用了三个CountDownLatch实例进行控制;首先看示例代码:

  • package com.coderap.juc.lock.tools;
  • import java.util.*;
  • import java.util.concurrent.CountDownLatch;
  • class Runner implements Runnable {
  • private CountDownLatch prepareLatch;
  • private CountDownLatch startLatch;
  • private CountDownLatch endLatch;
  • private TreeMap<String, Long> results;
  • public Runner(CountDownLatch prepareLatch, CountDownLatch startLatch, CountDownLatch endLatch, TreeMap<String, Long> results) {
  • this.prepareLatch = prepareLatch;
  • this.startLatch = startLatch;
  • this.endLatch = endLatch;
  • this.results = results;
  • }
  • @Override
  • public void run() {
  • // 开始准备
  • System.out.println(new Date().getTime() + ": " + Thread.currentThread().getName() + " begin prepare");
  • try {
  • // 模拟准备工作花费的时间
  • Thread.sleep(new Random().nextInt(1000));
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • // 准备工作完成,将prepareLatch进行countDown,告诉裁判
  • this.prepareLatch.countDown();
  • System.out.println(new Date().getTime() + ": " + Thread.currentThread().getName() + " prepared already, wait the action signal");
  • try {
  • // 等待裁判发命令起跑
  • this.startLatch.await();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • // 开始起跑
  • System.out.println(new Date().getTime() + ": " + Thread.currentThread().getName() + " begin run");
  • long startTime = System.currentTimeMillis();
  • try {
  • // 模拟跑步花费时间
  • Thread.sleep(new Random().nextInt(3000));
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • long endTime = System.currentTimeMillis();
  • // 计算花费时间,并存入results
  • System.out.println(new Date().getTime() + ": " + Thread.currentThread().getName() + " end run, use time: " + (endTime - startTime) + " ms");
  • this.results.put(Thread.currentThread().getName(), endTime - startTime);
  • // 跑完全程,将endLatch进行countDown,告诉裁判
  • this.endLatch.countDown();
  • }
  • }
  • public class CountDownLatchTest {
  • public static void main(String[] args) throws InterruptedException {
  • // 参赛人数
  • int runnerCount = 10;
  • // 用于准备就绪的栅栏
  • CountDownLatch prepareLatch = new CountDownLatch(runnerCount);
  • // 用于起跑的栅栏
  • CountDownLatch startLatch = new CountDownLatch(1);
  • // 用于结束后计算排名的栅栏
  • CountDownLatch endLatch = new CountDownLatch(runnerCount);
  • // 用于装载最后结果的Map
  • TreeMap<String, Long> results = new TreeMap<>();
  • for (int i = 0; i < runnerCount; i++) {
  • // for循环创建相应数量的线程
  • new Thread(new Runner(prepareLatch, startLatch, endLatch, results), "Runner " + i).start();
  • }
  • // 等待所有线程准备完毕
  • prepareLatch.await();
  • System.out.println("-------------------- All Runners Already --------------------");
  • // 发号起跑
  • startLatch.countDown();
  • System.out.println("-------------------- Begin Run, Wait Results --------------------");
  • // 等待所有线程结束
  • endLatch.await();
  • // 所有线程结束后,就可以计算排名了
  • System.out.println("-------------------- Results Rank --------------------");
  • System.out.println("All Participate Runners Count: " + results.size());
  • List<Map.Entry<String, Long>> resultsList = new ArrayList<>(results.entrySet());
  • Collections.sort(resultsList, new Comparator<Map.Entry<String, Long>>() {
  • public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
  • return (int) (o1.getValue() - o2.getValue());
  • }
  • });
  • for (Map.Entry<String, Long> result : resultsList) {
  • System.out.println(result.getValue() + " : " + result.getKey());
  • }
  • }
  • }

首先我们看CountDownLatchTest的main()方法,它首先定义了参赛的选手数量runnerCount为10,然后创建了三个CountDownLatch对象,其中prepareLatch用于控制所有选手的全部准备就绪的时刻,startLatch用于控制所有选手起跑的时刻,endLatch则用于控制所有选手跑完的时刻。results是一个TreeMap对象,用于装载最后的比赛结果,它的键是模拟选手的线程的名字,值为每个选手最终的耗时。

在for循环中,创建了相应数量的线程并直接启动,for循环之后调用了prepareLatch.await(),主线程在执行完这行代码后会阻塞在这里,由于创建prepareLatch时我们给CountDownLatch的构造方法传入的count参数是runnerCount,即10,因此主线程直到prepareLatch被执行10次countDown()操作才会解除阻塞。这一步相当于选手的准备工作。

prepareLatch.countDown()会在哪里执行呢?我们观察实现了Runnable接口的Runner线程类,在Runner类的run()方法中,首先模拟了准备工作和准备工作花费的时间,准备工作完成后就会调用this.prepareLatch.countDown(),之后因为调用了this.startLatch.await()而阻塞。所创建的10个模拟参赛选手的线程都会执行上述的代码,最后都会阻塞在startLatch.await()这行代码上。这一步相当于所有选手已经准备完毕,等待裁判发号令。

但所有线程都执行了this.prepareLatch.countDown(),即一共执行10次,主线程就会从prepareLatch.await()这行代码的地方唤醒,进而执行startLatch.countDown(),由于创建startLatch时我们给CountDownLatch的构造方法传入的count参数的值是1,因此只要执行一次startLatch.countDown(),所有阻塞在startLatch.await()这行代码的Runner线程都会被唤醒,进行后面的赛跑。这一步相当于裁判发出了起跑号令。

同时主线程会往下执行到endLatch.await(),又会进入阻塞状态,由于创建endLatch时我们给CountDownLatch的构造方法传入的count参数是runnerCount,即10,因此主线程直到endLatch被执行10次countDown()操作才会解除阻塞。这一步相当于裁判在等待所有选手跑完。

回到Runner类的run()方法,在从this.startLatch.await()这行代码唤醒后,会进入下面的模拟跑步,在跑完之后又会调用this.endLatch.countDown()run()方法就结束了。当所有线程都执行了this.startLatch.countDown(),即一共执行10次,主线程就会从endLatch.await()这行代码的地方唤醒,执行最后的统计比赛结果的过程。

运行上面的代码,某一次的结果如下:

  • 1540780225299: Runner 0 begin prepare
  • 1540780225299: Runner 5 begin prepare
  • 1540780225300: Runner 4 begin prepare
  • 1540780225300: Runner 3 begin prepare
  • 1540780225300: Runner 2 begin prepare
  • 1540780225300: Runner 7 begin prepare
  • 1540780225300: Runner 1 begin prepare
  • 1540780225300: Runner 6 begin prepare
  • 1540780225300: Runner 8 begin prepare
  • 1540780225300: Runner 9 begin prepare
  • 1540780225333: Runner 4 prepared already, wait the action signal
  • 1540780225358: Runner 7 prepared already, wait the action signal
  • 1540780225505: Runner 5 prepared already, wait the action signal
  • 1540780225602: Runner 6 prepared already, wait the action signal
  • 1540780225664: Runner 3 prepared already, wait the action signal
  • 1540780225692: Runner 8 prepared already, wait the action signal
  • 1540780225753: Runner 1 prepared already, wait the action signal
  • 1540780225867: Runner 2 prepared already, wait the action signal
  • 1540780225932: Runner 0 prepared already, wait the action signal
  • 1540780226109: Runner 9 prepared already, wait the action signal
  • -------------------- All Runners Already --------------------
  • -------------------- Begin Run, Wait Results --------------------
  • 1540780226109: Runner 4 begin run
  • 1540780226109: Runner 7 begin run
  • 1540780226109: Runner 5 begin run
  • 1540780226110: Runner 6 begin run
  • 1540780226110: Runner 3 begin run
  • 1540780226110: Runner 8 begin run
  • 1540780226110: Runner 1 begin run
  • 1540780226110: Runner 2 begin run
  • 1540780226110: Runner 9 begin run
  • 1540780226110: Runner 0 begin run
  • 1540780226217: Runner 7 end run, use time: 107 ms
  • 1540780226338: Runner 5 end run, use time: 228 ms
  • 1540780226638: Runner 6 end run, use time: 528 ms
  • 1540780226780: Runner 2 end run, use time: 670 ms
  • 1540780227023: Runner 0 end run, use time: 913 ms
  • 1540780227182: Runner 1 end run, use time: 1072 ms
  • 1540780227834: Runner 8 end run, use time: 1724 ms
  • 1540780227847: Runner 4 end run, use time: 1738 ms
  • 1540780227902: Runner 3 end run, use time: 1792 ms
  • 1540780228582: Runner 9 end run, use time: 2472 ms
  • -------------------- Results Rank --------------------
  • All Participate Runners Count: 10
  • 107 : Runner 7
  • 228 : Runner 5
  • 528 : Runner 6
  • 670 : Runner 2
  • 913 : Runner 0
  • 1072 : Runner 1
  • 1724 : Runner 8
  • 1738 : Runner 4
  • 1792 : Runner 3
  • 2472 : Runner 9

3. CountDownLatch源码分析

有了上面对CountDownLatch的使用,接下来从源码层面分析一下CountDownLatch的实现。

注:CountDownLatch底层其实就是基于AQS的共享模式实现的,如果读者对AQS的实现不了解,建议先阅读前面三篇讲解AQS的文章,理解了AQS的实现后再来看CountDownLatch的源码会发现其实现真的非常简单。

CountDownLatch的源码比较简单,也不多,下面先全部贴出来,基于JDK 1.7.0_07:

  • package java.util.concurrent;
  • import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  • public class CountDownLatch {
  • // 同步器类
  • private static final class Sync extends AbstractQueuedSynchronizer {
  • private static final long serialVersionUID = 4982264981922014374L;
  • // 构造方法
  • Sync(int count) {
  • // 直接将state设置为count
  • setState(count);
  • }
  • // 获取count,其实就是state的值
  • int getCount() {
  • return getState();
  • }
  • protected int tryAcquireShared(int acquires) {
  • // 当state为0时表示获取共享锁成功,否则失败
  • return (getState() == 0) ? 1 : -1;
  • }
  • protected boolean tryReleaseShared(int releases) {
  • // Decrement count; signal when transition to zero
  • // 自旋更新state,当state为0时表示可以唤醒阻塞的线程了
  • for (;;) {
  • int c = getState();
  • if (c == 0)
  • return false;
  • int nextc = c - 1;
  • if (compareAndSetState(c, nextc))
  • return nextc == 0;
  • }
  • }
  • }
  • // 同步器
  • private final Sync sync;
  • // 构造方法,会使用传入的count参数直接创建一个Sync对象
  • public CountDownLatch(int count) {
  • if (count < 0) throw new IllegalArgumentException("count < 0");
  • // 创建Sync对象
  • this.sync = new Sync(count);
  • }
  • public void await() throws InterruptedException {
  • // Sync的acquireSharedInterruptibly()方法来自于父类AQS
  • sync.acquireSharedInterruptibly(1);
  • }
  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
  • // Sync的tryAcquireSharedNanos()方法来自于父类AQS
  • return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  • }
  • public void countDown() {
  • // Sync的releaseShared()方法来自于父类AQS
  • sync.releaseShared(1);
  • }
  • public long getCount() {
  • return sync.getCount();
  • }
  • public String toString() {
  • return super.toString() + "[Count = " + sync.getCount() + "]";
  • }
  • }

3.1. await操作

首先关注await()方法,await()方法调用了sync对象的acquireSharedInterruptibly()方法,该方法继承自AQS:

  • public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  • if (Thread.interrupted())
  • throw new InterruptedException();
  • if (tryAcquireShared(arg) < 0)
  • doAcquireSharedInterruptibly(arg);
  • }

acquireSharedInterruptibly()方法会调用tryAcquireShared(arg),这个方法来自Sync的重写,如果state为0的时候就返回1,否则返回-1;state变量会在创建CountDownLatch的时候直接初始化为传入的count参数。因此只要创建CountDownLatch的时传入大于0的count,tryAcquireShared(arg)就会返回-1。

tryAcquireShared(arg)返回-1后if条件成立,就会调用doAcquireSharedInterruptibly(arg),该方法源码如下:

  • private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  • final Node node = addWaiter(Node.SHARED);
  • boolean failed = true;
  • try {
  • for (;;) {
  • final Node p = node.predecessor();
  • if (p == head) {
  • int r = tryAcquireShared(arg);
  • if (r >= 0) {
  • setHeadAndPropagate(node, r);
  • p.next = null; // help GC
  • failed = false;
  • return;
  • }
  • }
  • if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  • throw new InterruptedException();
  • }
  • } finally {
  • if (failed)
  • cancelAcquire(node);
  • }
  • }

doAcquireSharedInterruptibly(arg)其实和之前在AQS共享模式中讲解的doAcquireShared(int)实现基本一样,差别在于前者会直接抛出异常。CountDownLatch中使用抛出异常的方法是可以保证某些线程阻塞后可以通过中断操作取消排队的。

doAcquireSharedInterruptibly(arg)方法会在调用tryAcquireShared(arg)返回值小于0时挂起当前线程,这也就实现了线程的等待。多个线程就是通过这种方式实现挂起等待的。

3.2. countDown操作

countDown()方法调用了syncreleaseShared(1),该方法继承自AQS的,源码如下:

  • public final boolean releaseShared(int arg) {
  • if (tryReleaseShared(arg)) {
  • doReleaseShared();
  • return true;
  • }
  • return false;
  • }

releaseShared(int)方法中又会调用tryReleaseShared(int),这个方法被CountDownLatch的Sync重写了:

  • protected boolean tryReleaseShared(int releases) {
  • // Decrement count; signal when transition to zero
  • // 自旋更新state,当state为0时表示可以唤醒阻塞的线程了
  • for (;;) {
  • int c = getState();
  • if (c == 0)
  • return false;
  • int nextc = c - 1;
  • if (compareAndSetState(c, nextc))
  • return nextc == 0;
  • }
  • }

tryReleaseShared(int)源码也比较简单,就是更新state的值,由于传入的release参数为1,因此每调用一次countDown()方法,就会将state的值减1,且当state为0时,tryReleaseShared(int)将返回true,上层的releaseShared(int)就会调用doReleaseShared(),这个方法在之前的AQS中已经讲解过了,它会传播唤醒同步队列中等待的获取共享锁的线程。

通过上面的分析,CountDownLatch的实现其实非常简单,总结一下有以下几个重要点:

  1. CountDownLatch基于AQS共享锁实现,在创建时会根据传入的count参数将AQS提供的state的值直接置为count的值。
  2. 当线程调用CountDownLatch实例的await()方法,会执行抢锁操作,判断是否抢锁成功的依据是state是否为0,如果不为0则代表抢锁失败,直接将线程包装成节点装入同步队列进行等待。
  3. 每次调用CountDownLatch实例的countDown(),都会使state的值减1,如果减1之后state的值减为0时,就会唤醒同步队列中阻塞的线程,由于是共享模式,这些线程会依次被传播唤醒继续执行。