Java
Java多线程
Java并发
JUC锁

Java多线程 16 - ReentrantLock互斥锁

简介:ReentrantLock是一个可重入的互斥锁,又被称为独占锁。顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。

1. ReentrantLock介绍

ReentrantLock是一个可重入的互斥锁,又被称为独占锁。顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。

ReentrantLock分为公平锁和非公平锁。它们的区别体现在获取锁的机制上是否公平。锁是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到锁时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在公平锁的机制下,线程依次排队获取锁;而非公平锁在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

ReentrantLock函数列表如下:

  • // 创建一个 ReentrantLock ,默认是非公平锁。
  • ReentrantLock()
  • // 创建策略是fair的ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。
  • ReentrantLock(boolean fair)
  • // 查询当前线程保持此锁的次数。
  • int getHoldCount()
  • // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回null。
  • protected Thread getOwner()
  • // 返回一个collection,它包含可能正等待获取此锁的线程。
  • protected Collection<Thread> getQueuedThreads()
  • // 返回正等待获取此锁的线程估计数。
  • int getQueueLength()
  • // 返回一个collection,它包含可能正在等待与此锁相关给定条件的那些线程。
  • protected Collection<Thread> getWaitingThreads(Condition condition)
  • // 返回等待与此锁相关的给定条件的线程估计数。
  • int getWaitQueueLength(Condition condition)
  • // 查询给定线程是否正在等待获取此锁。
  • boolean hasQueuedThread(Thread thread)
  • // 查询是否有些线程正在等待获取此锁。
  • boolean hasQueuedThreads()
  • // 查询是否有些线程正在等待与此锁有关的给定条件。
  • boolean hasWaiters(Condition condition)
  • // 如果是公平锁返回true,否则返回false。
  • boolean isFair()
  • // 查询当前线程是否保持此锁。
  • boolean isHeldByCurrentThread()
  • // 查询此锁是否由任意线程保持。
  • boolean isLocked()
  • // 获取锁。
  • void lock()
  • // 如果当前线程未被中断,则获取锁。
  • void lockInterruptibly()
  • // 返回用来与此 Lock 实例一起使用的 Condition 实例。
  • Condition newCondition()
  • // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
  • boolean tryLock()
  • // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
  • boolean tryLock(long timeout, TimeUnit unit)
  • // 试图释放此锁。
  • void unlock()

ReentrantLock的类图如下:

1.ReentrantLock.png

从中可以看出:

  1. ReentrantLock实现了Lock接口。Lock接口提供了获取锁的lock()方法和释放锁的unlock()方法。
  2. ReentrantLock中包含Sync同步器,通过Sync同步器,ReentrantLock实现了锁的竞争、获取和释放。
  3. Sync是一个继承于AQS的抽象类,Sync包括公平同步器FairSync和非公平同步器NonfairSync。sync对象是FairSync和NonfairSync中的一个,默认是NonfairSync。

2. ReentrantLock示例

首先是一个简单的生产者和消费者示例,使用ReentrantLock锁实现同步:

  • import java.util.concurrent.locks.ReentrantLock;
  • import java.util.concurrent.locks.Lock;
  • class Store {
  • // 仓库最大库存量
  • private final static int MAX_COUNT = 10;
  • // 已存储的商品的数量
  • private volatile int storeCount = 0;
  • // 互斥锁
  • private Lock lock = new ReentrantLock();
  • // 生产方法
  • public void produce() {
  • // 加锁
  • lock.lock();
  • try {
  • // 当已存储量小于最大库存时,进行生产
  • if (storeCount < MAX_COUNT) {
  • storeCount++;
  • System.out.println(Thread.currentThread().getName() + ", Store produced, now left count is " + storeCount);
  • }
  • } finally {
  • // 释放锁
  • lock.unlock();
  • }
  • }
  • public void consum() {
  • // 加锁
  • lock.lock();
  • try {
  • // 当已存储量大于0时,进行消费
  • if (storeCount > 0) {
  • storeCount--;
  • System.out.println(Thread.currentThread().getName() + ", Store consumed, now left count is " + storeCount);
  • }
  • } finally {
  • // 释放锁
  • lock.unlock();
  • }
  • }
  • }
  • class Producer implements Runnable {
  • private Store store;
  • public Producer(Store store) {
  • this.store = store;
  • }
  • @Override
  • public void run() {
  • // 不断生产
  • while (true) {
  • this.store.produce();
  • try {
  • Thread.sleep(1500);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • class Consumer implements Runnable {
  • private Store store;
  • public Consumer(Store store) {
  • this.store = store;
  • }
  • @Override
  • public void run() {
  • // 不断消费
  • while (true) {
  • this.store.consum();
  • try {
  • Thread.sleep(1500);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • public class ProducerAndConsumerTest {
  • public static void main(String[] args) {
  • // 创建一个仓库
  • Store store = new Store();
  • // 启动5个生产线程
  • new Thread(new Producer(store), "Producer-1").start();
  • new Thread(new Producer(store), "Producer-2").start();
  • new Thread(new Producer(store), "Producer-3").start();
  • new Thread(new Producer(store), "Producer-4").start();
  • new Thread(new Producer(store), "Producer-5").start();
  • // 启动2个消费线程
  • new Thread(new Consumer(store), "Consumer-1").start();
  • new Thread(new Consumer(store), "Consumer-2").start();
  • }
  • }

某一次运行结果:

  • Producer-1, Store produced, now left count is 1
  • Producer-2, Store produced, now left count is 2
  • Producer-3, Store produced, now left count is 3
  • Producer-4, Store produced, now left count is 4
  • Producer-5, Store produced, now left count is 5
  • Consumer-1, Store consumed, now left count is 4
  • Consumer-2, Store consumed, now left count is 3
  • Producer-1, Store produced, now left count is 4
  • Producer-2, Store produced, now left count is 5
  • Producer-3, Store produced, now left count is 6
  • Producer-4, Store produced, now left count is 7
  • Producer-5, Store produced, now left count is 8
  • Consumer-1, Store consumed, now left count is 7
  • Consumer-2, Store consumed, now left count is 6
  • ...

结果分析:
1. Store是个仓库类,通过produce()能往仓库中生产商品,通过consume()能消费仓库中的商品。通过互斥锁lock实现对仓库的互斥访问:在生产或消费仓库中商品前,会先通过lock()锁住仓库,操作完之后再通过unlock()解锁。
2. Producer是生产者线程类,它会不断地向仓库中生产商品,为了方便查看日志,每次生产都会睡眠1.5秒。
3. Customer是消费者线程类,它会不断地从仓库中消费商品,为了方便查看日志,每次生产都会睡眠1.5秒。
4. 在主线程main中,创建了5个生产者线程生产商品,2个消费者线程消费商品。通过打印日志可知,生产线程和消费线程的协作是不存在问题的,数量的变化是正确无误的。

虽然上面的代码可以使一个生产者和消费者的模型正常运行,但存在一些缺陷,例如当仓库中商品数量已达最大库存时,虽然Store的生产方法会直接跳过生产过程并释放锁,但是这种情况下还是不能避免下次获取锁的线程依旧是生产线程,依旧无法生产,这就导致可能会有一段时间生产线程和和消费线程都处于空等状态,存在一定的性能问题。正常的情况应该是,当仓库的商品数量已达库存上限时,使生产线程等待,让消费线程进行消费,同样的,当仓库的商品数量为0时,应该使消费线程等待,让生产线程进行生产。想要达到这样的效果,就需要使用Condition类,通过Condition中的await()方法,能让线程阻塞,类似于wait();通过Condition的signal()方法,能让唤醒线程,类似于notify()

  • import java.util.concurrent.locks.Condition;
  • import java.util.concurrent.locks.Lock;
  • import java.util.concurrent.locks.ReentrantLock;
  • class Store {
  • // 仓库最大库存量
  • private final static int MAX_COUNT = 10;
  • // 已存储的商品的数量
  • private volatile int storeCount = 0;
  • // 互斥锁
  • private Lock lock = new ReentrantLock();
  • // 生产condition
  • private Condition producerCondition = lock.newCondition();
  • // 消费condition
  • private Condition consumerCondition = lock.newCondition();
  • public void store() {
  • try {
  • // 加锁
  • lock.lock();
  • while (storeCount >= MAX_COUNT) {
  • try {
  • // 让生产线程等待
  • System.out.println(Thread.currentThread().getName() + " Producers await");
  • producerCondition.await();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • storeCount++;
  • System.out.println(Thread.currentThread().getName() + " produced, store storeCount: " + storeCount);
  • // 让消费线程唤醒
  • consumerCondition.signal();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • public void consum() {
  • try {
  • // 加锁
  • lock.lock();
  • while (storeCount <= 0) {
  • try {
  • // 让消费线程等待
  • System.out.println(Thread.currentThread().getName() + " Consumer await");
  • consumerCondition.await();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • storeCount--;
  • System.out.println(Thread.currentThread().getName() + " consumed, store storeCount: " + storeCount);
  • // 让生产线程唤醒
  • producerCondition.signal();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • }
  • class Producer implements Runnable {
  • private Store store;
  • public Producer(Store store) {
  • this.store = store;
  • }
  • @Override
  • public void run() {
  • while (true) {
  • store.store();
  • try {
  • Thread.sleep(1500);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • class Consumer implements Runnable {
  • private Store store;
  • public Consumer(Store store) {
  • this.store = store;
  • }
  • @Override
  • public void run() {
  • while (true) {
  • store.consum();
  • try {
  • Thread.sleep(1500);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • public class ProducerAndConsumerTest {
  • public static void main(String[] args) {
  • // 创建一个仓库
  • Store store = new Store();
  • // 启动5个生产线程
  • new Thread(new Producer(store), "Producer-1").start();
  • new Thread(new Producer(store), "Producer-2").start();
  • new Thread(new Producer(store), "Producer-3").start();
  • new Thread(new Producer(store), "Producer-4").start();
  • new Thread(new Producer(store), "Producer-5").start();
  • // 启动2个消费线程
  • new Thread(new Consumer(store), "Consumer-1").start();
  • new Thread(new Consumer(store), "Consumer-2").start();
  • }
  • }

某一次运行结果如下:

  • Producer-1 produced, store storeCount: 1
  • Producer-2 produced, store storeCount: 2
  • Producer-3 produced, store storeCount: 3
  • Producer-4 produced, store storeCount: 4
  • Producer-5 produced, store storeCount: 5
  • Consumer-1 consumed, store storeCount: 4
  • Consumer-2 consumed, store storeCount: 3
  • Producer-1 produced, store storeCount: 4
  • Producer-2 produced, store storeCount: 5
  • Producer-4 produced, store storeCount: 6
  • Producer-3 produced, store storeCount: 7
  • Producer-5 produced, store storeCount: 8
  • Consumer-1 consumed, store storeCount: 7
  • Consumer-2 consumed, store storeCount: 6
  • Producer-1 produced, store storeCount: 7
  • Producer-3 produced, store storeCount: 8
  • Producer-2 produced, store storeCount: 9
  • Producer-4 produced, store storeCount: 10
  • Producer-5 Producers await
  • Consumer-1 consumed, store storeCount: 9
  • Consumer-2 consumed, store storeCount: 8
  • Producer-5 produced, store storeCount: 9
  • Producer-3 produced, store storeCount: 10
  • Producer-1 Producers await
  • Producer-2 Producers await
  • Producer-4 Producers await
  • Consumer-1 consumed, store storeCount: 9
  • Consumer-2 consumed, store storeCount: 8

注:ReentrantLock对应的源码分析,会在后面与AbstractQueuedSynchronizer一起介绍,这里读者只需要先明白ReentrantLock的基本用法即可。