Java多线程 24 - CyclicBarrier详解
发布于 / 2018-05-14
简介:CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
1. CyclicBarrier简介
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。注意比较CountDownLatch和CyclicBarrier:
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
CyclicBarrier函数列表如下:
- // 创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier时执行预定义的操作
- CyclicBarrier(int parties)
- // 创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程执行
- CyclicBarrier(int parties, Runnable barrierAction)
- // 在所有参与者都已经在此barrier上调用await方法之前,将一直等待
- int await()
- // 在所有参与者都已经在此屏障上调用await方法之前将一直等待,或者超出了指定的等待时间
- int await(long timeout, TimeUnit unit)
- // 返回当前在屏障处等待的参与者数目
- int getNumberWaiting()
- // 返回要求启动此 barrier 的参与者数目
- int getParties()
- // 查询此屏障是否处于损坏状态
- boolean isBroken()
- // 将屏障重置为其初始状态
- void reset()
CyclicBarrier的类图如下:
CyclicBarrier是包含了ReentrantLock对象lock
和Condition对象trip
,它是通过独占锁实现的。
2. CyclicBarrier示例
下面先以一个简单的示例演示CyclicBarrier怎么使用。示例要表达是多个员工等待开会的过程,当多名员工决定进行会议时,会陆续来到会议室,只有等到所有的员工都到达会议室后,才能开始会议进行各自的发言。代码如下:
- package com.coderap.juc.lock.tools;
- import java.util.Date;
- import java.util.Random;
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
- class Staff implements Runnable {
- private CyclicBarrier cyclicBarrier;
- public Staff(CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
- @Override
- public void run() {
- // 模拟耗时
- try {
- Thread.sleep(new Random().nextInt(3000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 到达会议室,开始等待其他线程
- System.out.println(new Date().getTime() + ": " + Thread.currentThread().getName() + " wait to meeting");
- try {
- // 告知cyclicBarrier已经开始等待
- this.cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- // 开始报告
- try {
- Thread.sleep(new Random().nextInt(3000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(new Date().getTime() + ": " + Thread.currentThread().getName() + " begin reporting");
- }
- }
- public class CyclicBarrierTest {
- public static void main(String[] args) {
- // 与会人数
- int meetCount = 10;
- CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
- @Override
- public void run() {
- // 当所有与会人到齐后开始会议
- System.out.println(new Date().getTime() + ": " + "All staff already, let's begin meeting");
- }
- });
- for (int i = 0; i < meetCount; i++) {
- // 循环创建相应的与会线程
- new Thread(new Staff(cyclicBarrier), "Staff " + i).start();
- }
- }
- }
在示例代码中,CyclicBarrierTest类的main()
方法中创建了CyclicBarrier对象cyclicBarrier
时传入了两个参数,其中parties
为10,barrierAction
为一个Runnable对象。接下来开启了10个Staff线程实例模拟10个员工进行会议,且将cyclicBarrier
对象传给了这10个Staff实例。在Staff类的run()
方法中首先等待了一段时间,然后调用this.cyclicBarrier.await()
告知cyclicBarrier
对象自己已经开始等待,然后就阻塞在这行代码处。当所有的Staff线程实例都调用了this.cyclicBarrier.await()
后,阻塞的Staff线程会被唤醒,此时在创建CyclicBarrier对象传入的Runnable线程实例会被启动(作为一个子线程),然后所有的Staff线程都会往下执行。
该示例的运行结果如下:
- 1571457111264: Staff 9 wait to meeting
- 1571457111614: Staff 6 wait to meeting
- 1571457111949: Staff 8 wait to meeting
- 1571457112454: Staff 1 wait to meeting
- 1571457113092: Staff 7 wait to meeting
- 1571457113363: Staff 3 wait to meeting
- 1571457113555: Staff 5 wait to meeting
- 1571457114048: Staff 4 wait to meeting
- 1571457115054: Staff 2 wait to meeting
- 1571457115974: Staff 0 wait to meeting
- 1571457115974: All staff already, let's begin meeting
- 1571457116021: Staff 0 begin reporting
- 1571457116155: Staff 4 begin reporting
- 1571457116493: Staff 2 begin reporting
- 1571457116663: Staff 6 begin reporting
- 1571457116745: Staff 9 begin reporting
- 1571457117233: Staff 1 begin reporting
- 1571457117238: Staff 3 begin reporting
- 1571457118565: Staff 7 begin reporting
- 1571457120413: Staff 8 begin reporting
- 1571457120638: Staff 5 begin reporting
3. CyclicBarrier源码分析
有了上面对CyclicBarrier的使用,接下来从源码层面分析一下CyclicBarrier的实现。
注:CyclicBarrier底层是基于AQS和Condition实现的,如果读者对AQS和Condition的实现不了解,建议先阅读前面三篇讲解AQS的文章,理解了AQS的实现后再来看CyclicBarrier的源码会发现其实现真的非常简单。
CyclicBarrier的源码比较简单,下面先全部贴出来,基于JDK 1.7.0_07:
- package java.util.concurrent;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- public class CyclicBarrier {
- // 构造方法
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
- // 构造方法
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- // 表示一代栅栏
- private static class Generation {
- // 栅栏是否被强制释放
- boolean broken = false;
- }
- /** 用于控制栅栏入口的锁 */
- private final ReentrantLock lock = new ReentrantLock();
- /** 用于控制线程等待的Condition对象 */
- private final Condition trip = lock.newCondition();
- /** 参与的线程总数 */
- private final int parties;
- /* 当栅栏被放开时执行的回调 */
- private final Runnable barrierCommand;
- /** 当前这一代栅栏 */
- private Generation generation = new Generation();
- // 用于记录当前已经处于等待状态的线程数
- private int count;
- // 开启下一代栅栏
- private void nextGeneration() {
- // 唤醒所有阻塞的线程
- trip.signalAll();
- // 重置count
- count = parties;
- // 创建一个新的Generation对象
- generation = new Generation();
- }
- // 强制释放栅栏
- private void breakBarrier() {
- // 更新栅栏是否是被强制释放的记录
- generation.broken = true;
- // 重置count
- count = parties;
- // 唤醒所有阻塞的线程
- trip.signalAll();
- }
- // 主要的await等待方法
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- // 拿到重入锁并上锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 拿到Generation对象
- final Generation g = generation;
- // 如果当前Generation对象的栅栏是否已经被释放
- if (g.broken)
- throw new BrokenBarrierException();
- // 如果出现异常,将强制释放栅栏,避免被阻塞的线程饿死,同时抛出异常
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // 计数减1
- int index = --count;
- if (index == 0) { // tripped
- // 如果计数变为0,表示栅栏可以释放了
- // 记录是否执行了释放栅栏的变量
- boolean ranAction = false;
- try {
- // 执行释放栅栏的回调线程
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- /**
- * 更新是否执行了释放栅栏的变量为true
- * 注意,如果ranAction更新为true,说明barrierCommand执行没有抛错
- * 如果如果ranAction没有被更新为true,则可能是barrierCommand执行抛错了,
- * 将不会执行后面的nextGeneration()代码唤醒阻塞的线程,
- * 因此需要在finally块中强制释放栅栏,避免阻塞线程饿死
- */
- ranAction = true;
- /**
- * 释放栅栏后开启下一个新的Generation
- * nextGeneration()里会调用trip.signalAll()唤醒所有阻塞线程
- */
- nextGeneration();
- return 0;
- } finally {
- /**
- * 检查是否执行了释放栅栏的回调线程,
- * 如果没执行说明可能是在执行释放栅栏的回调线程时抛错了
- * 因此就强制释放栅栏
- */
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- // 自旋循环,直到栅栏被放开、被强制释放栅栏、遇到中断操作或超时
- for (;;) {
- try {
- if (!timed)
- /**
- * 没有使用超时机制
- * 这里的await会将线程所在节点移入条件队列,
- * 然后释放state值,挂起线程
- */
- trip.await();
- else if (nanos > 0L)
- // 带有超时的挂起
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- // 运行到这里,说明在挂起期间被中断了
- if (g == generation && ! g.broken) {
- // 如果还是当前这一代的栅栏,但栅栏没有被释放,就强制释放栅栏
- breakBarrier();
- // 然后抛出异常
- throw ie;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- /**
- * 这种捕获了InterruptException之后调用Thread.currentThread().interrupt()是一种通用的方式。
- * 其实就是为了保存中断状态,从而让其他更高层次的代码注意到这个中断
- */
- Thread.currentThread().interrupt();
- }
- }
- /**
- * 当栅栏被释放时,抛出BrokenBarrierException异常
- * 比如某个线程在await期间被中断了,它会调用breakBarrier()
- * 而breakBarrier()会将g.broken设置为true,然后唤醒所有线程
- * 因此其他线程唤醒后运行到这里就会抛出BrokenBarrierException异常
- */
- if (g.broken)
- throw new BrokenBarrierException();
- // 已经重置为下一代了,直接返回index
- if (g != generation)
- return index;
- if (timed && nanos <= 0L) {
- // 超时了,强制释放栅栏,并抛出超时异常
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
- // 获取parties
- public int getParties() {
- return parties;
- }
- // 当前线程进入等待
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- // 带有超时机制的等待
- public int await(long timeout, TimeUnit unit)
- throws InterruptedException,
- BrokenBarrierException,
- TimeoutException {
- return dowait(true, unit.toNanos(timeout));
- }
- // 判断栅栏是否已被释放
- public boolean isBroken() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return generation.broken;
- } finally {
- lock.unlock();
- }
- }
- // 重置操作,这个操作会强制释放栅栏、唤醒所有线程并开启下一代
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- breakBarrier(); // 强制释放栅栏
- nextGeneration(); // 开启下一代Generation
- } finally {
- lock.unlock();
- }
- }
- // 获取等待的线程数量
- public int getNumberWaiting() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return parties - count;
- } finally {
- lock.unlock();
- }
- }
- }
3.1. 基本结构
我们先来看一下CyclicBarrier中基本的成员变量:
- /** 用于控制栅栏入口的锁 */
- private final ReentrantLock lock = new ReentrantLock();
- /** 用于控制线程等待的Condition对象 */
- private final Condition trip = lock.newCondition();
- /** 参与的线程总数 */
- private final int parties;
- /* 当栅栏被放开时执行的回调 */
- private final Runnable barrierCommand;
- /** 当前这一代栅栏 */
- private Generation generation = new Generation();
- // 用于记录当前已经处于等待状态的线程数
- private int count;
CyclicBarrier使用了ReentrantLock来保证多线程对栅栏的同步访问,并且使用了Condition方式来控制线程的等待和唤醒。
从之前的介绍可知,CyclicBarrier是可以重复使用的,而实现重复使用的方式就是在其内部维护了一个辅助类Generation,该类是一个静态内部类,只有一个成员变量broken
用于标识当前这一代的栅栏是否被强制释放:
- // 表示一代栅栏
- private static class Generation {
- // 栅栏是否被强制释放
- boolean broken = false;
- }
CyclicBarrier提供了快捷方法nextGeneration()
用于直接开启下一代栅栏:
- // 开启下一代栅栏
- private void nextGeneration() {
- // 唤醒所有阻塞的线程
- trip.signalAll();
- // 重置count
- count = parties;
- // 创建一个新的Generation对象
- generation = new Generation();
- }
方法体比较简单,就是唤醒阻塞在当前这一代栅栏上的所有线程,然后重置count
参数为原始的parties
,接着创建新一代Generation。
另外CyclicBarrier提供了reset()
重置方法,用于强制释放栅栏并开启下一代Generation:
- // 重置操作,这个操作会强制释放栅栏、唤醒所有线程并开启下一代
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- breakBarrier(); // 强制释放栅栏
- nextGeneration(); // 开启下一代Generation
- } finally {
- lock.unlock();
- }
- }
reset()
方法用到的breakBarrier()
方法如下:
- // 强制释放栅栏
- private void breakBarrier() {
- // 更新栅栏是否是被强制释放的记录
- generation.broken = true;
- // 重置count
- count = parties;
- // 唤醒所有阻塞的线程
- trip.signalAll();
- }
另外一些辅助方法都是用于获取某些内部状态:
- // 获取parties
- public int getParties() {
- return parties;
- }
- // 判断栅栏是否已被释放
- public boolean isBroken() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return generation.broken;
- } finally {
- lock.unlock();
- }
- }
- // 获取等待的线程数量
- public int getNumberWaiting() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return parties - count;
- } finally {
- lock.unlock();
- }
- }
3.2. await操作
当我们调用CyclicBarrier的await()
方法时就会让当前线程进入等待阻塞状态,直到指定数量的所有的线程都调用了await()
,这些进入等待阻塞状态的线程才会被唤醒。await()
方法另外有一个带有超时机制的重载方法,它们都调用了内部的dowait(false, 0L)
方法:
- // 主要的await等待方法
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- // 拿到重入锁并上锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 拿到Generation对象
- final Generation g = generation;
- // 如果当前Generation对象的栅栏是否已经被释放
- if (g.broken)
- throw new BrokenBarrierException();
- // 如果出现异常,将强制释放栅栏,避免被阻塞的线程饿死,同时抛出异常
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // 计数减1
- int index = --count;
- if (index == 0) { // tripped
- // 如果计数变为0,表示栅栏可以释放了
- // 记录是否执行了释放栅栏的变量
- boolean ranAction = false;
- try {
- // 执行释放栅栏的回调线程
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- /**
- * 更新是否执行了释放栅栏的变量为true
- * 注意,如果ranAction更新为true,说明barrierCommand执行没有抛错
- * 如果如果ranAction没有被更新为true,则可能是barrierCommand执行抛错了,
- * 将不会执行后面的nextGeneration()代码唤醒阻塞的线程,
- * 因此需要在finally块中强制释放栅栏,避免阻塞线程饿死
- */
- ranAction = true;
- /**
- * 释放栅栏后开启下一个新的Generation
- * nextGeneration()里会调用trip.signalAll()唤醒所有阻塞线程
- */
- nextGeneration();
- return 0;
- } finally {
- /**
- * 检查是否执行了释放栅栏的回调线程,
- * 如果没执行说明可能是在执行释放栅栏的回调线程时抛错了
- * 因此就强制释放栅栏
- */
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- // 自旋循环,直到栅栏被放开、被强制释放栅栏、遇到中断操作或超时
- for (;;) {
- try {
- if (!timed)
- /**
- * 这里的await会将线程所在节点移入条件队列,
- * 然后释放state值,挂起线程
- */
- trip.await();
- else if (nanos > 0L)
- // 带有超时的挂起
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- // 运行到这里,说明在挂起期间被中断了
- if (g == generation && ! g.broken) {
- // 如果还是当前这一代的栅栏,但栅栏没有被释放,就强制释放栅栏
- breakBarrier();
- // 然后抛出异常
- throw ie;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- /**
- * 这种捕获了InterruptException之后调用Thread.currentThread().interrupt()是一种通用的方式。
- * 其实就是为了保存中断状态,从而让其他更高层次的代码注意到这个中断
- */
- Thread.currentThread().interrupt();
- }
- }
- /**
- * 当栅栏被释放时,抛出BrokenBarrierException异常
- * 比如某个线程在await期间被中断了,它会调用breakBarrier()
- * 而breakBarrier()会将g.broken设置为true,然后唤醒所有线程
- * 因此其他线程唤醒后运行到这里就会抛出BrokenBarrierException异常
- */
- if (g.broken)
- throw new BrokenBarrierException();
- // 已经重置为下一代了,直接返回index
- if (g != generation)
- return index;
- if (timed && nanos <= 0L) {
- // 超时了,强制释放栅栏,并抛出超时异常
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
在dowait(boolean, long)
方法中,在做完一些必要的检查后工作,会对count
值进行减1操作,如果减1后的值为0,则代表指定数量的所有线程都执行了await()
方法,可以释放栅栏了。
我们先关注count
减1后不为0的情况,这时会跳过if (index == 0)
成立的分支体,进入下面的无限for循环中自旋。for循环中会根据是否使用了超时机制,调用Condition对象trip
相应的await()
方法来挂起当前线程;当线程在挂起过程中被中断了,会在catch块中处理InterruptedException中断异常,如果在被中断的时候还处于当前的这一代栅栏且栅栏没有被强制释放,那么就强制释放栅栏,然后将刚刚获取的InterruptedException异常抛给上一层。这里需要注意的是,正常情况下当执行到for循环的try块的时候,当前线程已经被挂起了,等待被唤醒。
3.3. 释放栅栏
当最后一个线程在调用了CyclicBarrier的await()
方法后,由于count
值减1为0会释放栅栏;释放的操作比较简单,首先由当前线程(也即是最后一个调用CyclicBarrier的await()
方法的线程)执行在创建CyclicBarrier时传入的回调barrierCommand
,注意这里的try ... finally
块,当执行回调barrierCommand
如果抛出错误时,try代码块就不会往下执行nextGeneration()
操作,但finally块还是会保证执行的;在上面分析过,nextGeneration()
操作肩负着唤醒所有正处于阻塞线程的任务,如果它没有被执行,那些阻塞的线程就无法被唤醒了;为了解决这个问题,在finally块中进行了判断,如果try块中抛出了错误导致nextGeneration()
没被调用,就会保证执行breakBarrier()
来强制释放栅栏。
nextGeneration()
和breakBarrier()
中都调用了trip.signalAll()
来唤醒所有因调用trip.await()
阻塞的线程,这些线程被唤醒后,会继续执行for循环剩余的代码,首先根据g.broken
是否为true判断栅栏是不是被强制释放的,如果是就抛出BrokenBarrierException异常,结束for循环,否则继续判断当前的CyclicBarrier的栅栏是不是已经被重置为下一代了(注意:一旦栅栏被正常释放,generation
一定被重置成新的Generation对象),如果是就返回index,如果不是则代表还处于当前这一代的栅栏上(这种情况可能是await()
超时或假醒造成的),因此需要判断await()
操作是否超时,如果超时就强制释放栅栏并抛出TimeoutException异常,如果没有超时将重新执行for循环进入下一轮挂起。
注:
index
返回值为当前线程执行await()
方法顺序的逆序索引,0表示当前线程是最后一个到达的线程。
这里总结一下解除阻塞线程的情况有以下几种:
- 最后一个线程调用
await()
,这种是正常情况。 - 当前线程被中断,此时会强制释放栅栏。
- 其他正在该CyclicBarrier上等待的线程被中断,此时会强制释放栅栏。
- 其他正在该CyclicBarrier上等待的线程超时,此时会强制释放栅栏。
- 其他某个线程调用该CyclicBarrier的
reset()
方法,此时会强制释放栅栏。
如果当前线程在进入此方法时已经设置了该线程的中断状态或者在等待时被中断,将抛出InterruptedException,并且清除当前线程的已中断状态。如果在线程处于等待状态时栅栏被reset()
或者在调用await()
时栅栏被强制释放,将抛出BrokenBarrierException异常。
如果任何线程在等待时被中断,则其他所有等待线程都将抛出BrokenBarrierException异常,并将栅栏的broken
置于为true。如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作(barrierAction
),那么在允许其他线程继续运行之前,由当前线程执行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将栅栏的broken
置于为true。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...