Java多线程 25 - Semaphore详解
发布于 / 2018-05-23
简介:Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()来释放它所持有的信号量许可。
1. Semaphore简介
Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()
来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()
来释放它所持有的信号量许可。
Semaphore的函数列表如下:
- // 创建具有给定的许可数和非公平的公平设置的Semaphore
- Semaphore(int permits)
- // 创建具有给定的许可数和给定的公平设置的Semaphore
- Semaphore(int permits, boolean fair)
- // 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断
- void acquire()
- // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断
- void acquire(int permits)
- // 从此信号量中获取许可,在有可用的许可前将其阻塞
- void acquireUninterruptibly()
- // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
- void acquireUninterruptibly(int permits)
- // 返回此信号量中当前可用的许可数
- int availablePermits()
- // 获取并返回立即可用的所有许可
- int drainPermits()
- // 返回一个collection,包含可能等待获取的线程
- protected Collection<Thread> getQueuedThreads()
- // 返回正在等待获取的线程的估计数目
- int getQueueLength()
- // 查询是否有线程正在等待获取
- boolean hasQueuedThreads()
- // 如果此信号量的公平设置为 true,则返回 true
- boolean isFair()
- // 根据指定的缩减量减小可用许可的数目
- protected void reducePermits(int reduction)
- // 释放一个许可,将其返回给信号量
- void release()
- // 释放给定数目的许可,将其返回到信号量
- void release(int permits)
- // 返回标识此信号量的字符串,以及信号量的状态
- String toString()
- // 仅在调用时此信号量存在一个可用许可,才从信号量获取许可
- boolean tryAcquire()
- // 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可
- boolean tryAcquire(int permits)
- // 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可
- boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- // 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可
- boolean tryAcquire(long timeout, TimeUnit unit)
Semaphore的类图如下:
从图中可以看出:
- 和ReentrantLock一样,Semaphore也包含了Sync类型的
sync
对象,Sync是一个继承于AQS的抽象类。 - Sync包括两个子类:公平信号量FairSync和非公平信号量NonfairSync。
sync
是FairSync或NonfairSync的实例;默认情况下为是NonfairSync实例,即默认是非公平信号量。
2. Semaphore示例
下面是一个简单的示例,演示了三个线程对Semaphore的使用:
- package com.coderap.juc.lock.tools;
- import java.util.Random;
- import java.util.concurrent.Semaphore;
- class Task implements Runnable {
- private final int acquire;
- private Semaphore semaphore;
- public Task(Semaphore semaphore, int acquire) {
- this.semaphore = semaphore;
- this.acquire = acquire;
- }
- @Override
- public void run() {
- try {
- System.out.println(Thread.currentThread().getName() + " available permits: " + this.semaphore.availablePermits() + ", need: " + this.acquire);
- // 获取许可
- this.semaphore.acquire(this.acquire);
- System.out.println(Thread.currentThread().getName() + " acquire permit: " + this.acquire);
- System.out.println(Thread.currentThread().getName() + " executed");
- // 睡眠
- Thread.sleep(new Random().nextInt(3000));
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- // 释放许可
- this.semaphore.release(this.acquire);
- System.out.println(Thread.currentThread().getName() + " release permit: " + this.acquire);
- }
- }
- }
- public class SemaphoreTest {
- public static void main(String [] args) {
- // 每一次只能由2个线程允许同时执行
- Semaphore semaphore = new Semaphore(10);
- // 创建3个线程执行任务,acquire参数表示每个线程将要获取的许可量
- new Thread(new Task(semaphore, 4), "Task 0").start();
- new Thread(new Task(semaphore, 3), "Task 1").start();
- new Thread(new Task(semaphore, 8), "Task 2").start();
- }
- }
源码比较简单,定义了一个许可量总数为10的Semaphore对象semaphore
,然后开启了三个线程分别使用该semaphore
获取对应的许可量,其中Task 0
线程获取许可量为4,Task 1
线程获取许可量为3,Task 2
线程获取许可量为8;因此,Task 0
和Task 1
是可以成功获取到相应的许可量的,但Task 3
由于获取的许可量是8,但此时semaphore
剩余的许可量为3,所以需要等待Task 0
和Task 1
释放掉获取的许可量后才能成功获取。上述代码某一次运行的结果如下:
- Task 0 available permits: 10, need: 4
- Task 0 acquire permit: 4
- Task 0 executed
- Task 1 available permits: 10, need: 3
- Task 1 acquire permit: 3
- Task 1 executed
- Task 2 available permits: 6, need: 8
- Task 1 release permit: 3
- Task 0 release permit: 4
- Task 2 acquire permit: 8
- Task 2 executed
- Task 2 release permit: 8
3. Semaphore源码分析
有了上面对Semaphore的使用,接下来从源码层面分析一下Semaphore的实现。
注:Semaphore底层其实也是基于AQS的共享模式实现的,如果读者对AQS的实现不了解,建议先阅读前面三篇讲解AQS的文章。
Semaphore的源码比较简单,下面先全部贴出来,基于JDK 1.7.0_07:
- package java.util.concurrent;
- import java.util.Collection;
- import java.util.concurrent.locks.AbstractQueuedSynchronizer;
- public class Semaphore implements java.io.Serializable {
- private static final long serialVersionUID = -3222578661600680210L;
- /** 同步器 */
- private final Sync sync;
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1192457210091910933L;
- // Sync的构造方法
- Sync(int permits) {
- // 直接将state设置为permits
- setState(permits);
- }
- // 获取permits
- final int getPermits() {
- // 直接获取state
- return getState();
- }
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 设置可以获得的信号量的许可数
- int available = getState();
- // 设置获得acquires个信号量许可之后,剩余的信号量许可数
- int remaining = available - acquires;
- // 如果剩余的信号量许可数>=0,则设置可以获得的信号量许可数为remaining。
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 获取当前state值
- int current = getState();
- // 计算添加释放的许可量后state的值
- int next = current + releases;
- // 溢出
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next))
- // 修改state为新值后返回true表示释放
- return true;
- }
- }
- final void reducePermits(int reductions) {
- for (;;) {
- int current = getState();
- int next = current - reductions;
- if (next > current) // underflow
- throw new Error("Permit count underflow");
- if (compareAndSetState(current, next))
- return;
- }
- }
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
- }
- // 非公平同步器
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -2694183684443567898L;
- NonfairSync(int permits) {
- super(permits);
- }
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- }
- // 公平同步器
- static final class FairSync extends Sync {
- private static final long serialVersionUID = 2014338818796000944L;
- FairSync(int permits) {
- super(permits);
- }
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors())
- // 公平模式下需要先判断同步队列中是否有线程已经等待很久了
- return -1;
- int available = getState();
- // 计算从state中减去获取的许可量后的值remaining
- int remaining = available - acquires;
- /**
- * 如果remaining小于0,则直接返回remaining,注意此时返回的remaining是小于0的,表示获取失败
- * 如果remaining大于等于0,则尝试CAS修改state为remaining,如果修改成功就返回remaining,表示获取成功
- * 否则自旋进入下一次的尝试
- */
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
- }
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- public Semaphore(int permits, boolean fair) {
- // 根据传入的fair参数决定使用哪种同步器
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
- // 获取1个许可量
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- // 获取指定数量的许可量
- public void acquire(int permits) throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
- public void acquireUninterruptibly() {
- sync.acquireShared(1);
- }
- public boolean tryAcquire() {
- return sync.nonfairTryAcquireShared(1) >= 0;
- }
- public boolean tryAcquire(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- public void acquireUninterruptibly(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireShared(permits);
- }
- public boolean tryAcquire(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- return sync.nonfairTryAcquireShared(permits) >= 0;
- }
- public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
- }
- // 释放1个许可量
- public void release() {
- sync.releaseShared(1);
- }
- // 释放指定数量的许可量
- public void release(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.releaseShared(permits);
- }
- // 获取可用的permit数量
- public int availablePermits() {
- return sync.getPermits();
- }
- // 获取并返回立即可用的所有许可
- public int drainPermits() {
- return sync.drainPermits();
- }
- // 根据指定的缩减量减小可用许可的数目
- protected void reducePermits(int reduction) {
- if (reduction < 0) throw new IllegalArgumentException();
- sync.reducePermits(reduction);
- }
- public boolean isFair() {
- return sync instanceof FairSync;
- }
- public final boolean hasQueuedThreads() {
- return sync.hasQueuedThreads();
- }
- public final int getQueueLength() {
- return sync.getQueueLength();
- }
- protected Collection<Thread> getQueuedThreads() {
- return sync.getQueuedThreads();
- }
- public String toString() {
- return super.toString() + "[Permits = " + sync.getPermits() + "]";
- }
- }
3.1. 基本结构
Semaphore的实现使用的是AQS的共享模式;Semaphore中Sync是一个抽象类,它实现了大部分的基础结构和方法。在创建Semaphore实例时,Sync的构造方法会被调用,并将传给Semaphore构造方法的参数premits
传入:
- // Sync的构造方法
- Sync(int permits) {
- // 直接将state设置为permits
- setState(permits);
- }
从源码可以得知,Sync对permits
参数的处理是直接使用AQS提供的state
来控制:
- // 获取permits
- final int getPermits() {
- // 直接获取state
- return getState();
- }
- // 返回立即可用的所有许可,并将state置为0
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
同时Semaphore提供了对permits
进行各类快捷操作的方法:
- // 获取可用的permit数量
- public int availablePermits() {
- return sync.getPermits();
- }
- // 获取并返回立即可用的所有许可
- public int drainPermits() {
- return sync.drainPermits();
- }
- // 根据指定的缩减量减小可用许可的数目
- protected void reducePermits(int reduction) {
- if (reduction < 0) throw new IllegalArgumentException();
- sync.reducePermits(reduction);
- }
Sync的继承有公平同步器FairSync和非公平同步器NonfairSync两种,这个与之前讲解的ReentrantLock和ReentrantReadWriteLock很相似:
- // 默认情况下使用的是非公平同步器
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- public Semaphore(int permits, boolean fair) {
- // 根据传入的fair参数决定使用哪种同步器
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
3.2. acquire操作
首先看Semaphore的acquire(int)
方法,当线程调用Semaphore实例的acquire(int)
方法时会传入要求的许可量,如果获取不到要求的许可量就会被阻塞,acquire(int)
方法存在一个没有参数的重载acquire()
,默认传入的是permits
参数为1,它们的源码如下:
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public void acquire(int permits) throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
会发现,acquire(int)
方法底层调用的是Sync的acquireSharedInterruptibly(int)
方法,这个方法继承自AQS:
- public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
AQS的acquireSharedInterruptibly(int)
方法内部先判断了发生中断的情况,然后调用了tryAcquireShared(arg)
,这个方法是被Semaphore的内部类Sync重写的,即以获取共享锁的方式来控制线程是否成功获取要求的许可量;以FairSync中重写的为例:
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors())
- // 公平模式下需要先判断同步队列中是否有线程已经等待很久了
- return -1;
- int available = getState();
- // 计算从state中减去获取的许可量后的值remaining
- int remaining = available - acquires;
- /**
- * 如果remaining小于0,则直接返回remaining,注意此时返回的remaining是小于0的,表示获取失败
- * 如果remaining大于等于0,则尝试CAS修改state为remaining,如果修改成功就返回remaining,表示获取成功
- * 否则自旋进入下一次的尝试
- */
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
在公平模式下,会首先判断同步队列是否有线程已经等待了很久,如果有会直接返回-1表示当前线程获取共享锁失败,也即是表示当前线程获取要求的许可量失败;如果没有线程已经在等待,就会计算剩余的许可量是否满足当前线程请求的许可量,计算出来的remaining
结果值如果小于0,表示不满足,将直接返回remaining
(值为负数),如果remaining
大于等于0,表示满足,就会从剩余许可量中减去要求的许可量并将新值更新到state
变量,然后返回remaining
(值为非负数)。
上层的acquireSharedInterruptibly(int)
方法会根据tryAcquireShared(int)
的返回值决定是否调用doAcquireSharedInterruptibly(arg)
,该方法来自于AQS,是doAcquireShared(int)
的可中断版本,功能基本与doAcquireShared(int)
一样,它的功能主要用于将当前线程包装为共享节点,添加到AQS的同步队列进行等待;具体功能可以查看前面讲解AQS的文章8.AbstractQueuedSynchronizer详解(三),这里不再赘述。
3.3. release操作
release操作用于释放许可量,它在Semaphore中体现为两个重载方法:
- // 释放1个许可量
- public void release() {
- sync.releaseShared(1);
- }
- // 释放指定数量的许可量
- public void release(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.releaseShared(permits);
- }
与acquire操作非常类似,release操作调用了Sync的releaseShared(int)
方法,而这个方法继承自AQS:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
方法体内调用的tryReleaseShared(arg)
则被Sync重写:
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 获取当前state值
- int current = getState();
- // 计算添加释放的许可量后state的值
- int next = current + releases;
- // 溢出
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next))
- // 修改state为新值后返回true表示释放成功
- return true;
- }
- }
tryReleaseShared(int)
方法总是会返回true,然后releaseShared(int)
方法会调用doReleaseShared()
,这个方法在之前的8.AbstractQueuedSynchronizer详解(三)中也分析过了,用于传播唤醒所有的等待线程竞争许可量的获取,成功获取到许可量的线程会继续往下执行,而没获取到许可量的线程又会重新被挂起等待,这里只贴出代码,不再赘述:
- /**
- * Release action for shared mode -- signal successor and ensure
- * propagation. (Note: For exclusive mode, release just amounts
- * to calling unparkSuccessor of head if it needs signal.)
- * 共享模式的释放操作,将唤醒后继节点并保证传播性
- * 与独占模式不同,独占模式只会尝试唤醒后继节点
- */
- private void doReleaseShared() {
- /*
- * Ensure that a release propagates, even if there are other
- * in-progress acquires/releases. This proceeds in the usual
- * way of trying to unparkSuccessor of head if it needs
- * signal. But if it does not, status is set to PROPAGATE to
- * ensure that upon release, propagation continues.
- * Additionally, we must loop in case a new node is added
- * while we are doing this. Also, unlike other uses of
- * unparkSuccessor, we need to know if CAS to reset status
- * fails, if so rechecking.
- */
- for (; ; ) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- // 如果当前节点的waitStatus为SIGNAL,则尝试改为0,修改成功就唤醒头节点的后继节点锁包装的线程,修改失败重新尝试修改
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- // 这里会唤醒头节点的后继节点
- unparkSuccessor(h);
- } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- // 如果头节点的waitStatus为0,则尝试修改为PROPAGATE,如果修改失败则重新尝试修改
- continue; // loop on failed CAS
- }
- /**
- * 由于头节点的后继节点所包装的线程被唤醒了,所以当被唤醒的线程获取到共享锁后,头节点可能会被修改;
- * 因此当头节点被修改后,继续循环操作,开始下一轮的传播唤醒;
- * 如果头节点没有被改变,说明此时虽然唤醒了后继节点,但后继节点并没有获取到共享锁,
- * 因此直接break,将下一轮的传播唤醒操作交给后继节点
- */
- if (h == head) // loop if head changed
- break;
- }
- }
3.4. 非公平模式
Semaphore中的非公平同步器的实现是NonFairSync,非公平信号量许可的释放与公平信号量许可的释放是一样的,不同的是它们获取许可量的机制不同,非公平同步器的tryAcquireShared(int)
调用了父类Sync中nonfairTryAcquireShared(int)
:
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
nonfairTryAcquireShared(int)
方法的实现如下:
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 设置可以获得的信号量的许可数
- int available = getState();
- // 设置获得acquires个信号量许可之后,剩余的信号量许可数
- int remaining = available - acquires;
- // 如果剩余的信号量许可数>=0,则设置可以获得的信号量许可数为remaining。
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
在nonfairTryAcquireShared(int)
的for循环中,它都会直接判断当前剩余的许可量是否足够;足够的话,则直接设置可以获得的信号量许可数,进而再获取信号量。
而公平信号量的tryAcquireShared(int)
中,在获取信号量之前会通过if (hasQueuedPredecessors())
来判断当前同步队列是不是有线程在等待,如果有会直接返回-1表示当前线程获取许可量失败。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...