Java多线程
Java并发
JUC锁

Java多线程 25——Semaphore详解

简介:Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()来释放它所持有的信号量许可。

1. Semaphore简介

Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()来释放它所持有的信号量许可。

Semaphore的函数列表如下:

  • // 创建具有给定的许可数和非公平的公平设置的Semaphore
  • Semaphore(int permits)
  • // 创建具有给定的许可数和给定的公平设置的Semaphore
  • Semaphore(int permits, boolean fair)
  • // 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断
  • void acquire()
  • // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断
  • void acquire(int permits)
  • // 从此信号量中获取许可,在有可用的许可前将其阻塞
  • void acquireUninterruptibly()
  • // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
  • void acquireUninterruptibly(int permits)
  • // 返回此信号量中当前可用的许可数
  • int availablePermits()
  • // 获取并返回立即可用的所有许可
  • int drainPermits()
  • // 返回一个collection,包含可能等待获取的线程
  • protected Collection<Thread> getQueuedThreads()
  • // 返回正在等待获取的线程的估计数目
  • int getQueueLength()
  • // 查询是否有线程正在等待获取
  • boolean hasQueuedThreads()
  • // 如果此信号量的公平设置为 true,则返回 true
  • boolean isFair()
  • // 根据指定的缩减量减小可用许可的数目
  • protected void reducePermits(int reduction)
  • // 释放一个许可,将其返回给信号量
  • void release()
  • // 释放给定数目的许可,将其返回到信号量
  • void release(int permits)
  • // 返回标识此信号量的字符串,以及信号量的状态
  • String toString()
  • // 仅在调用时此信号量存在一个可用许可,才从信号量获取许可
  • boolean tryAcquire()
  • // 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可
  • boolean tryAcquire(int permits)
  • // 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit)
  • // 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可
  • boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore的类图如下:

1.Semaphore.png

从图中可以看出:

  1. 和ReentrantLock一样,Semaphore也包含了Sync类型的sync对象,Sync是一个继承于AQS的抽象类。
  2. Sync包括两个子类:公平信号量FairSync和非公平信号量NonfairSync。sync是FairSync或NonfairSync的实例;默认情况下为是NonfairSync实例,即默认是非公平信号量。

2. Semaphore示例

下面是一个简单的示例,演示了三个线程对Semaphore的使用:

  • package com.coderap.juc.lock.tools;
  • import java.util.Random;
  • import java.util.concurrent.Semaphore;
  • class Task implements Runnable {
  • private final int acquire;
  • private Semaphore semaphore;
  • public Task(Semaphore semaphore, int acquire) {
  • this.semaphore = semaphore;
  • this.acquire = acquire;
  • }
  • @Override
  • public void run() {
  • try {
  • System.out.println(Thread.currentThread().getName() + " available permits: " + this.semaphore.availablePermits() + ", need: " + this.acquire);
  • // 获取许可
  • this.semaphore.acquire(this.acquire);
  • System.out.println(Thread.currentThread().getName() + " acquire permit: " + this.acquire);
  • System.out.println(Thread.currentThread().getName() + " executed");
  • // 睡眠
  • Thread.sleep(new Random().nextInt(3000));
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • } finally {
  • // 释放许可
  • this.semaphore.release(this.acquire);
  • System.out.println(Thread.currentThread().getName() + " release permit: " + this.acquire);
  • }
  • }
  • }
  • public class SemaphoreTest {
  • public static void main(String [] args) {
  • // 每一次只能由2个线程允许同时执行
  • Semaphore semaphore = new Semaphore(10);
  • // 创建3个线程执行任务,acquire参数表示每个线程将要获取的许可量
  • new Thread(new Task(semaphore, 4), "Task 0").start();
  • new Thread(new Task(semaphore, 3), "Task 1").start();
  • new Thread(new Task(semaphore, 8), "Task 2").start();
  • }
  • }

源码比较简单,定义了一个许可量总数为10的Semaphore对象semaphore,然后开启了三个线程分别使用该semaphore获取对应的许可量,其中Task 0线程获取许可量为4,Task 1线程获取许可量为3,Task 2线程获取许可量为8;因此,Task 0Task 1是可以成功获取到相应的许可量的,但Task 3由于获取的许可量是8,但此时semaphore剩余的许可量为3,所以需要等待Task 0Task 1释放掉获取的许可量后才能成功获取。上述代码某一次运行的结果如下:

  • Task 0 available permits: 10, need: 4
  • Task 0 acquire permit: 4
  • Task 0 executed
  • Task 1 available permits: 10, need: 3
  • Task 1 acquire permit: 3
  • Task 1 executed
  • Task 2 available permits: 6, need: 8
  • Task 1 release permit: 3
  • Task 0 release permit: 4
  • Task 2 acquire permit: 8
  • Task 2 executed
  • Task 2 release permit: 8

3. Semaphore源码分析

有了上面对Semaphore的使用,接下来从源码层面分析一下Semaphore的实现。

注:Semaphore底层其实也是基于AQS的共享模式实现的,如果读者对AQS的实现不了解,建议先阅读前面三篇讲解AQS的文章。

Semaphore的源码比较简单,下面先全部贴出来,基于JDK 1.7.0_07:

  • package java.util.concurrent;
  • import java.util.Collection;
  • import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  • public class Semaphore implements java.io.Serializable {
  • private static final long serialVersionUID = -3222578661600680210L;
  • /** 同步器 */
  • private final Sync sync;
  • abstract static class Sync extends AbstractQueuedSynchronizer {
  • private static final long serialVersionUID = 1192457210091910933L;
  • // Sync的构造方法
  • Sync(int permits) {
  • // 直接将state设置为permits
  • setState(permits);
  • }
  • // 获取permits
  • final int getPermits() {
  • // 直接获取state
  • return getState();
  • }
  • final int nonfairTryAcquireShared(int acquires) {
  • for (;;) {
  • // 设置可以获得的信号量的许可数
  • int available = getState();
  • // 设置获得acquires个信号量许可之后,剩余的信号量许可数
  • int remaining = available - acquires;
  • // 如果剩余的信号量许可数>=0,则设置可以获得的信号量许可数为remaining。
  • if (remaining < 0 ||
  • compareAndSetState(available, remaining))
  • return remaining;
  • }
  • }
  • protected final boolean tryReleaseShared(int releases) {
  • for (;;) {
  • // 获取当前state值
  • int current = getState();
  • // 计算添加释放的许可量后state的值
  • int next = current + releases;
  • // 溢出
  • if (next < current) // overflow
  • throw new Error("Maximum permit count exceeded");
  • if (compareAndSetState(current, next))
  • // 修改state为新值后返回true表示释放
  • return true;
  • }
  • }
  • final void reducePermits(int reductions) {
  • for (;;) {
  • int current = getState();
  • int next = current - reductions;
  • if (next > current) // underflow
  • throw new Error("Permit count underflow");
  • if (compareAndSetState(current, next))
  • return;
  • }
  • }
  • final int drainPermits() {
  • for (;;) {
  • int current = getState();
  • if (current == 0 || compareAndSetState(current, 0))
  • return current;
  • }
  • }
  • }
  • // 非公平同步器
  • static final class NonfairSync extends Sync {
  • private static final long serialVersionUID = -2694183684443567898L;
  • NonfairSync(int permits) {
  • super(permits);
  • }
  • protected int tryAcquireShared(int acquires) {
  • return nonfairTryAcquireShared(acquires);
  • }
  • }
  • // 公平同步器
  • static final class FairSync extends Sync {
  • private static final long serialVersionUID = 2014338818796000944L;
  • FairSync(int permits) {
  • super(permits);
  • }
  • protected int tryAcquireShared(int acquires) {
  • for (;;) {
  • if (hasQueuedPredecessors())
  • // 公平模式下需要先判断同步队列中是否有线程已经等待很久了
  • return -1;
  • int available = getState();
  • // 计算从state中减去获取的许可量后的值remaining
  • int remaining = available - acquires;
  • /**
  • * 如果remaining小于0,则直接返回remaining,注意此时返回的remaining是小于0的,表示获取失败
  • * 如果remaining大于等于0,则尝试CAS修改state为remaining,如果修改成功就返回remaining,表示获取成功
  • * 否则自旋进入下一次的尝试
  • */
  • if (remaining < 0 || compareAndSetState(available, remaining))
  • return remaining;
  • }
  • }
  • }
  • public Semaphore(int permits) {
  • sync = new NonfairSync(permits);
  • }
  • public Semaphore(int permits, boolean fair) {
  • // 根据传入的fair参数决定使用哪种同步器
  • sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  • }
  • // 获取1个许可量
  • public void acquire() throws InterruptedException {
  • sync.acquireSharedInterruptibly(1);
  • }
  • // 获取指定数量的许可量
  • public void acquire(int permits) throws InterruptedException {
  • if (permits < 0) throw new IllegalArgumentException();
  • sync.acquireSharedInterruptibly(permits);
  • }
  • public void acquireUninterruptibly() {
  • sync.acquireShared(1);
  • }
  • public boolean tryAcquire() {
  • return sync.nonfairTryAcquireShared(1) >= 0;
  • }
  • public boolean tryAcquire(long timeout, TimeUnit unit)
  • throws InterruptedException {
  • return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  • }
  • public void acquireUninterruptibly(int permits) {
  • if (permits < 0) throw new IllegalArgumentException();
  • sync.acquireShared(permits);
  • }
  • public boolean tryAcquire(int permits) {
  • if (permits < 0) throw new IllegalArgumentException();
  • return sync.nonfairTryAcquireShared(permits) >= 0;
  • }
  • public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
  • throws InterruptedException {
  • if (permits < 0) throw new IllegalArgumentException();
  • return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
  • }
  • // 释放1个许可量
  • public void release() {
  • sync.releaseShared(1);
  • }
  • // 释放指定数量的许可量
  • public void release(int permits) {
  • if (permits < 0) throw new IllegalArgumentException();
  • sync.releaseShared(permits);
  • }
  • // 获取可用的permit数量
  • public int availablePermits() {
  • return sync.getPermits();
  • }
  • // 获取并返回立即可用的所有许可
  • public int drainPermits() {
  • return sync.drainPermits();
  • }
  • // 根据指定的缩减量减小可用许可的数目
  • protected void reducePermits(int reduction) {
  • if (reduction < 0) throw new IllegalArgumentException();
  • sync.reducePermits(reduction);
  • }
  • public boolean isFair() {
  • return sync instanceof FairSync;
  • }
  • public final boolean hasQueuedThreads() {
  • return sync.hasQueuedThreads();
  • }
  • public final int getQueueLength() {
  • return sync.getQueueLength();
  • }
  • protected Collection<Thread> getQueuedThreads() {
  • return sync.getQueuedThreads();
  • }
  • public String toString() {
  • return super.toString() + "[Permits = " + sync.getPermits() + "]";
  • }
  • }

3.1. 基本结构

Semaphore的实现使用的是AQS的共享模式;Semaphore中Sync是一个抽象类,它实现了大部分的基础结构和方法。在创建Semaphore实例时,Sync的构造方法会被调用,并将传给Semaphore构造方法的参数premits传入:

  • // Sync的构造方法
  • Sync(int permits) {
  • // 直接将state设置为permits
  • setState(permits);
  • }

从源码可以得知,Sync对permits参数的处理是直接使用AQS提供的state来控制:

  • // 获取permits
  • final int getPermits() {
  • // 直接获取state
  • return getState();
  • }
  • // 返回立即可用的所有许可,并将state置为0
  • final int drainPermits() {
  • for (;;) {
  • int current = getState();
  • if (current == 0 || compareAndSetState(current, 0))
  • return current;
  • }
  • }

同时Semaphore提供了对permits进行各类快捷操作的方法:

  • // 获取可用的permit数量
  • public int availablePermits() {
  • return sync.getPermits();
  • }
  • // 获取并返回立即可用的所有许可
  • public int drainPermits() {
  • return sync.drainPermits();
  • }
  • // 根据指定的缩减量减小可用许可的数目
  • protected void reducePermits(int reduction) {
  • if (reduction < 0) throw new IllegalArgumentException();
  • sync.reducePermits(reduction);
  • }

Sync的继承有公平同步器FairSync和非公平同步器NonfairSync两种,这个与之前讲解的ReentrantLock和ReentrantReadWriteLock很相似:

  • // 默认情况下使用的是非公平同步器
  • public Semaphore(int permits) {
  • sync = new NonfairSync(permits);
  • }
  • public Semaphore(int permits, boolean fair) {
  • // 根据传入的fair参数决定使用哪种同步器
  • sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  • }

3.2. acquire操作

首先看Semaphore的acquire(int)方法,当线程调用Semaphore实例的acquire(int)方法时会传入要求的许可量,如果获取不到要求的许可量就会被阻塞,acquire(int)方法存在一个没有参数的重载acquire(),默认传入的是permits参数为1,它们的源码如下:

  • public void acquire() throws InterruptedException {
  • sync.acquireSharedInterruptibly(1);
  • }
  • public void acquire(int permits) throws InterruptedException {
  • if (permits < 0) throw new IllegalArgumentException();
  • sync.acquireSharedInterruptibly(permits);
  • }

会发现,acquire(int)方法底层调用的是Sync的acquireSharedInterruptibly(int)方法,这个方法继承自AQS:

  • public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  • if (Thread.interrupted())
  • throw new InterruptedException();
  • if (tryAcquireShared(arg) < 0)
  • doAcquireSharedInterruptibly(arg);
  • }

AQS的acquireSharedInterruptibly(int)方法内部先判断了发生中断的情况,然后调用了tryAcquireShared(arg),这个方法是被Semaphore的内部类Sync重写的,即以获取共享锁的方式来控制线程是否成功获取要求的许可量;以FairSync中重写的为例:

  • protected int tryAcquireShared(int acquires) {
  • for (;;) {
  • if (hasQueuedPredecessors())
  • // 公平模式下需要先判断同步队列中是否有线程已经等待很久了
  • return -1;
  • int available = getState();
  • // 计算从state中减去获取的许可量后的值remaining
  • int remaining = available - acquires;
  • /**
  • * 如果remaining小于0,则直接返回remaining,注意此时返回的remaining是小于0的,表示获取失败
  • * 如果remaining大于等于0,则尝试CAS修改state为remaining,如果修改成功就返回remaining,表示获取成功
  • * 否则自旋进入下一次的尝试
  • */
  • if (remaining < 0 || compareAndSetState(available, remaining))
  • return remaining;
  • }
  • }

在公平模式下,会首先判断同步队列是否有线程已经等待了很久,如果有会直接返回-1表示当前线程获取共享锁失败,也即是表示当前线程获取要求的许可量失败;如果没有线程已经在等待,就会计算剩余的许可量是否满足当前线程请求的许可量,计算出来的remaining结果值如果小于0,表示不满足,将直接返回remaining(值为负数),如果remaining大于等于0,表示满足,就会从剩余许可量中减去要求的许可量并将新值更新到state变量,然后返回remaining(值为非负数)。

上层的acquireSharedInterruptibly(int)方法会根据tryAcquireShared(int)的返回值决定是否调用doAcquireSharedInterruptibly(arg),该方法来自于AQS,是doAcquireShared(int)的可中断版本,功能基本与doAcquireShared(int)一样,它的功能主要用于将当前线程包装为共享节点,添加到AQS的同步队列进行等待;具体功能可以查看前面讲解AQS的文章8.AbstractQueuedSynchronizer详解(三),这里不再赘述。

3.3. release操作

release操作用于释放许可量,它在Semaphore中体现为两个重载方法:

  • // 释放1个许可量
  • public void release() {
  • sync.releaseShared(1);
  • }
  • // 释放指定数量的许可量
  • public void release(int permits) {
  • if (permits < 0) throw new IllegalArgumentException();
  • sync.releaseShared(permits);
  • }

与acquire操作非常类似,release操作调用了Sync的releaseShared(int)方法,而这个方法继承自AQS:

  • public final boolean releaseShared(int arg) {
  • if (tryReleaseShared(arg)) {
  • doReleaseShared();
  • return true;
  • }
  • return false;
  • }

方法体内调用的tryReleaseShared(arg)则被Sync重写:

  • protected final boolean tryReleaseShared(int releases) {
  • for (;;) {
  • // 获取当前state值
  • int current = getState();
  • // 计算添加释放的许可量后state的值
  • int next = current + releases;
  • // 溢出
  • if (next < current) // overflow
  • throw new Error("Maximum permit count exceeded");
  • if (compareAndSetState(current, next))
  • // 修改state为新值后返回true表示释放成功
  • return true;
  • }
  • }

tryReleaseShared(int)方法总是会返回true,然后releaseShared(int)方法会调用doReleaseShared(),这个方法在之前的8.AbstractQueuedSynchronizer详解(三)中也分析过了,用于传播唤醒所有的等待线程竞争许可量的获取,成功获取到许可量的线程会继续往下执行,而没获取到许可量的线程又会重新被挂起等待,这里只贴出代码,不再赘述:

  • /**
  • * Release action for shared mode -- signal successor and ensure
  • * propagation. (Note: For exclusive mode, release just amounts
  • * to calling unparkSuccessor of head if it needs signal.)
  • * 共享模式的释放操作,将唤醒后继节点并保证传播性
  • * 与独占模式不同,独占模式只会尝试唤醒后继节点
  • */
  • private void doReleaseShared() {
  • /*
  • * Ensure that a release propagates, even if there are other
  • * in-progress acquires/releases. This proceeds in the usual
  • * way of trying to unparkSuccessor of head if it needs
  • * signal. But if it does not, status is set to PROPAGATE to
  • * ensure that upon release, propagation continues.
  • * Additionally, we must loop in case a new node is added
  • * while we are doing this. Also, unlike other uses of
  • * unparkSuccessor, we need to know if CAS to reset status
  • * fails, if so rechecking.
  • */
  • for (; ; ) {
  • Node h = head;
  • if (h != null && h != tail) {
  • int ws = h.waitStatus;
  • if (ws == Node.SIGNAL) {
  • // 如果当前节点的waitStatus为SIGNAL,则尝试改为0,修改成功就唤醒头节点的后继节点锁包装的线程,修改失败重新尝试修改
  • if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  • continue; // loop to recheck cases
  • // 这里会唤醒头节点的后继节点
  • unparkSuccessor(h);
  • } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  • // 如果头节点的waitStatus为0,则尝试修改为PROPAGATE,如果修改失败则重新尝试修改
  • continue; // loop on failed CAS
  • }
  • /**
  • * 由于头节点的后继节点所包装的线程被唤醒了,所以当被唤醒的线程获取到共享锁后,头节点可能会被修改;
  • * 因此当头节点被修改后,继续循环操作,开始下一轮的传播唤醒;
  • * 如果头节点没有被改变,说明此时虽然唤醒了后继节点,但后继节点并没有获取到共享锁,
  • * 因此直接break,将下一轮的传播唤醒操作交给后继节点
  • */
  • if (h == head) // loop if head changed
  • break;
  • }
  • }

3.4. 非公平模式

Semaphore中的非公平同步器的实现是NonFairSync,非公平信号量许可的释放与公平信号量许可的释放是一样的,不同的是它们获取许可量的机制不同,非公平同步器的tryAcquireShared(int)调用了父类Sync中nonfairTryAcquireShared(int)

  • protected int tryAcquireShared(int acquires) {
  • return nonfairTryAcquireShared(acquires);
  • }

nonfairTryAcquireShared(int)方法的实现如下:

  • final int nonfairTryAcquireShared(int acquires) {
  • for (;;) {
  • // 设置可以获得的信号量的许可数
  • int available = getState();
  • // 设置获得acquires个信号量许可之后,剩余的信号量许可数
  • int remaining = available - acquires;
  • // 如果剩余的信号量许可数>=0,则设置可以获得的信号量许可数为remaining。
  • if (remaining < 0 ||
  • compareAndSetState(available, remaining))
  • return remaining;
  • }
  • }

nonfairTryAcquireShared(int)的for循环中,它都会直接判断当前剩余的许可量是否足够;足够的话,则直接设置可以获得的信号量许可数,进而再获取信号量。
而公平信号量的tryAcquireShared(int)中,在获取信号量之前会通过if (hasQueuedPredecessors())来判断当前同步队列是不是有线程在等待,如果有会直接返回-1表示当前线程获取许可量失败。