Java多线程 16 - ReentrantLock互斥锁
发布于 / 2017-12-02
简介:ReentrantLock是一个可重入的互斥锁,又被称为独占锁。顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
1. ReentrantLock介绍
ReentrantLock是一个可重入的互斥锁,又被称为独占锁。顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为公平锁和非公平锁。它们的区别体现在获取锁的机制上是否公平。锁是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到锁时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在公平锁的机制下,线程依次排队获取锁;而非公平锁在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
ReentrantLock函数列表如下:
- // 创建一个 ReentrantLock ,默认是非公平锁。
- ReentrantLock()
- // 创建策略是fair的ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。
- ReentrantLock(boolean fair)
- // 查询当前线程保持此锁的次数。
- int getHoldCount()
- // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回null。
- protected Thread getOwner()
- // 返回一个collection,它包含可能正等待获取此锁的线程。
- protected Collection<Thread> getQueuedThreads()
- // 返回正等待获取此锁的线程估计数。
- int getQueueLength()
- // 返回一个collection,它包含可能正在等待与此锁相关给定条件的那些线程。
- protected Collection<Thread> getWaitingThreads(Condition condition)
- // 返回等待与此锁相关的给定条件的线程估计数。
- int getWaitQueueLength(Condition condition)
- // 查询给定线程是否正在等待获取此锁。
- boolean hasQueuedThread(Thread thread)
- // 查询是否有些线程正在等待获取此锁。
- boolean hasQueuedThreads()
- // 查询是否有些线程正在等待与此锁有关的给定条件。
- boolean hasWaiters(Condition condition)
- // 如果是公平锁返回true,否则返回false。
- boolean isFair()
- // 查询当前线程是否保持此锁。
- boolean isHeldByCurrentThread()
- // 查询此锁是否由任意线程保持。
- boolean isLocked()
- // 获取锁。
- void lock()
- // 如果当前线程未被中断,则获取锁。
- void lockInterruptibly()
- // 返回用来与此 Lock 实例一起使用的 Condition 实例。
- Condition newCondition()
- // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
- boolean tryLock()
- // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
- boolean tryLock(long timeout, TimeUnit unit)
- // 试图释放此锁。
- void unlock()
ReentrantLock的类图如下:
从中可以看出:
- ReentrantLock实现了Lock接口。Lock接口提供了获取锁的
lock()
方法和释放锁的unlock()
方法。 - ReentrantLock中包含Sync同步器,通过Sync同步器,ReentrantLock实现了锁的竞争、获取和释放。
- Sync是一个继承于AQS的抽象类,Sync包括公平同步器FairSync和非公平同步器NonfairSync。
sync
对象是FairSync和NonfairSync中的一个,默认是NonfairSync。
2. ReentrantLock示例
首先是一个简单的生产者和消费者示例,使用ReentrantLock锁实现同步:
- import java.util.concurrent.locks.ReentrantLock;
- import java.util.concurrent.locks.Lock;
- class Store {
- // 仓库最大库存量
- private final static int MAX_COUNT = 10;
- // 已存储的商品的数量
- private volatile int storeCount = 0;
- // 互斥锁
- private Lock lock = new ReentrantLock();
- // 生产方法
- public void produce() {
- // 加锁
- lock.lock();
- try {
- // 当已存储量小于最大库存时,进行生产
- if (storeCount < MAX_COUNT) {
- storeCount++;
- System.out.println(Thread.currentThread().getName() + ", Store produced, now left count is " + storeCount);
- }
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
- public void consum() {
- // 加锁
- lock.lock();
- try {
- // 当已存储量大于0时,进行消费
- if (storeCount > 0) {
- storeCount--;
- System.out.println(Thread.currentThread().getName() + ", Store consumed, now left count is " + storeCount);
- }
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
- }
- class Producer implements Runnable {
- private Store store;
- public Producer(Store store) {
- this.store = store;
- }
- @Override
- public void run() {
- // 不断生产
- while (true) {
- this.store.produce();
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- class Consumer implements Runnable {
- private Store store;
- public Consumer(Store store) {
- this.store = store;
- }
- @Override
- public void run() {
- // 不断消费
- while (true) {
- this.store.consum();
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- public class ProducerAndConsumerTest {
- public static void main(String[] args) {
- // 创建一个仓库
- Store store = new Store();
- // 启动5个生产线程
- new Thread(new Producer(store), "Producer-1").start();
- new Thread(new Producer(store), "Producer-2").start();
- new Thread(new Producer(store), "Producer-3").start();
- new Thread(new Producer(store), "Producer-4").start();
- new Thread(new Producer(store), "Producer-5").start();
- // 启动2个消费线程
- new Thread(new Consumer(store), "Consumer-1").start();
- new Thread(new Consumer(store), "Consumer-2").start();
- }
- }
某一次运行结果:
- Producer-1, Store produced, now left count is 1
- Producer-2, Store produced, now left count is 2
- Producer-3, Store produced, now left count is 3
- Producer-4, Store produced, now left count is 4
- Producer-5, Store produced, now left count is 5
- Consumer-1, Store consumed, now left count is 4
- Consumer-2, Store consumed, now left count is 3
- Producer-1, Store produced, now left count is 4
- Producer-2, Store produced, now left count is 5
- Producer-3, Store produced, now left count is 6
- Producer-4, Store produced, now left count is 7
- Producer-5, Store produced, now left count is 8
- Consumer-1, Store consumed, now left count is 7
- Consumer-2, Store consumed, now left count is 6
- ...
结果分析:
1. Store是个仓库类,通过produce()
能往仓库中生产商品,通过consume()
能消费仓库中的商品。通过互斥锁lock实现对仓库的互斥访问:在生产或消费仓库中商品前,会先通过lock()
锁住仓库,操作完之后再通过unlock()
解锁。
2. Producer是生产者线程类,它会不断地向仓库中生产商品,为了方便查看日志,每次生产都会睡眠1.5秒。
3. Customer是消费者线程类,它会不断地从仓库中消费商品,为了方便查看日志,每次生产都会睡眠1.5秒。
4. 在主线程main中,创建了5个生产者线程生产商品,2个消费者线程消费商品。通过打印日志可知,生产线程和消费线程的协作是不存在问题的,数量的变化是正确无误的。
虽然上面的代码可以使一个生产者和消费者的模型正常运行,但存在一些缺陷,例如当仓库中商品数量已达最大库存时,虽然Store的生产方法会直接跳过生产过程并释放锁,但是这种情况下还是不能避免下次获取锁的线程依旧是生产线程,依旧无法生产,这就导致可能会有一段时间生产线程和和消费线程都处于空等状态,存在一定的性能问题。正常的情况应该是,当仓库的商品数量已达库存上限时,使生产线程等待,让消费线程进行消费,同样的,当仓库的商品数量为0时,应该使消费线程等待,让生产线程进行生产。想要达到这样的效果,就需要使用Condition类,通过Condition中的await()
方法,能让线程阻塞,类似于wait()
;通过Condition的signal()
方法,能让唤醒线程,类似于notify()
:
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- class Store {
- // 仓库最大库存量
- private final static int MAX_COUNT = 10;
- // 已存储的商品的数量
- private volatile int storeCount = 0;
- // 互斥锁
- private Lock lock = new ReentrantLock();
- // 生产condition
- private Condition producerCondition = lock.newCondition();
- // 消费condition
- private Condition consumerCondition = lock.newCondition();
- public void store() {
- try {
- // 加锁
- lock.lock();
- while (storeCount >= MAX_COUNT) {
- try {
- // 让生产线程等待
- System.out.println(Thread.currentThread().getName() + " Producers await");
- producerCondition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- storeCount++;
- System.out.println(Thread.currentThread().getName() + " produced, store storeCount: " + storeCount);
- // 让消费线程唤醒
- consumerCondition.signal();
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- public void consum() {
- try {
- // 加锁
- lock.lock();
- while (storeCount <= 0) {
- try {
- // 让消费线程等待
- System.out.println(Thread.currentThread().getName() + " Consumer await");
- consumerCondition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- storeCount--;
- System.out.println(Thread.currentThread().getName() + " consumed, store storeCount: " + storeCount);
- // 让生产线程唤醒
- producerCondition.signal();
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- }
- class Producer implements Runnable {
- private Store store;
- public Producer(Store store) {
- this.store = store;
- }
- @Override
- public void run() {
- while (true) {
- store.store();
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- class Consumer implements Runnable {
- private Store store;
- public Consumer(Store store) {
- this.store = store;
- }
- @Override
- public void run() {
- while (true) {
- store.consum();
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- public class ProducerAndConsumerTest {
- public static void main(String[] args) {
- // 创建一个仓库
- Store store = new Store();
- // 启动5个生产线程
- new Thread(new Producer(store), "Producer-1").start();
- new Thread(new Producer(store), "Producer-2").start();
- new Thread(new Producer(store), "Producer-3").start();
- new Thread(new Producer(store), "Producer-4").start();
- new Thread(new Producer(store), "Producer-5").start();
- // 启动2个消费线程
- new Thread(new Consumer(store), "Consumer-1").start();
- new Thread(new Consumer(store), "Consumer-2").start();
- }
- }
某一次运行结果如下:
- Producer-1 produced, store storeCount: 1
- Producer-2 produced, store storeCount: 2
- Producer-3 produced, store storeCount: 3
- Producer-4 produced, store storeCount: 4
- Producer-5 produced, store storeCount: 5
- Consumer-1 consumed, store storeCount: 4
- Consumer-2 consumed, store storeCount: 3
- Producer-1 produced, store storeCount: 4
- Producer-2 produced, store storeCount: 5
- Producer-4 produced, store storeCount: 6
- Producer-3 produced, store storeCount: 7
- Producer-5 produced, store storeCount: 8
- Consumer-1 consumed, store storeCount: 7
- Consumer-2 consumed, store storeCount: 6
- Producer-1 produced, store storeCount: 7
- Producer-3 produced, store storeCount: 8
- Producer-2 produced, store storeCount: 9
- Producer-4 produced, store storeCount: 10
- Producer-5 Producers await
- Consumer-1 consumed, store storeCount: 9
- Consumer-2 consumed, store storeCount: 8
- Producer-5 produced, store storeCount: 9
- Producer-3 produced, store storeCount: 10
- Producer-1 Producers await
- Producer-2 Producers await
- Producer-4 Producers await
- Consumer-1 consumed, store storeCount: 9
- Consumer-2 consumed, store storeCount: 8
注:ReentrantLock对应的源码分析,会在后面与AbstractQueuedSynchronizer一起介绍,这里读者只需要先明白ReentrantLock的基本用法即可。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...