Java多线程
Java并发
JUC集合

Java多线程 36 - ArrayBlockingQueue详解

简介:ArrayBlockingQueue是数组实现的线程安全的、有界的阻塞队列。

1. ArrayBlockingQueue介绍

ArrayBlockingQueue是数组实现的线程安全的、有界的阻塞队列,其中:

  • 线程安全是指ArrayBlockingQueue内部通过互斥锁保护竞争资源,实现了多线程对竞争资源的互斥访问。
  • 有界是指ArrayBlockingQueue对应的数组是有界限的,当添加的元素个数超出了数组的大小,可能会抛出异常或进入等待,视调用的方法而不同。
  • 阻塞队列是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它的调用特定的方法操作该资源的线程需要阻塞等待;ArrayBlockingQueue内部以数组实现了一个循环队列,按FIFO(先进先出)原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。

注:这里的“调用特定的方法”意思是,ArrayBlockingQueue也提供了相应的非阻塞方法。

注:ArrayBlockingQueue不同于ConcurrentLinkedQueue,ArrayBlockingQueue是数组实现的,并且是有界限的;而ConcurrentLinkedQueue是链表实现的,是无界限的。

ArrayBlockingQueue的类图结构如下:

1.ArrayBlockingQueue类图结构.png

  1. ArrayBlockingQueue继承于AbstractQueue,并且它实现了BlockingQueue接口。
  2. ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小是在创建ArrayBlockingQueue时指定的。
  3. ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象lock。ArrayBlockingQueue就是根据ReentrantLock互斥锁实现多线程对竞争资源的互斥访问。另外,ReentrantLock分为公平锁和非公平锁,具体使用公平锁还是非公平锁可以在创建ArrayBlockingQueue时可以指定;ArrayBlockingQueue默认会使用非公平锁。
  4. ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象notEmptynotFull。Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问:
    • 若线程A要取数据时,数组正好为空,则该线程会执行notEmpty.await()进行等待;当其它线程B向数组中插入了数据之后,会调用notEmpty.signal()唤醒notEmpty上的等待线程。此时,线程A会被唤醒从而得以继续运行。
    • 若线程H要插入数据时,数组已满,则该线程会它执行notFull.await()进行等待;当其它线程I取出数据之后,会调用notFull.signal()唤醒notFull上的等待线程。此时,线程H就会被唤醒从而得以继续运行。

关于ReentrantLock以及Condition等更多的内容,可以参考:

  1. Java多线程 16 - ReentrantLock互斥锁
  2. Java多线程 18 - Condition
  3. Java多线程 20 - AbstractQueuedSynchronizer详解(1)
  4. Java多线程 20 - AbstractQueuedSynchronizer详解(2)
  5. Java多线程 20 - AbstractQueuedSynchronizer详解(3)

ArrayBlockingQueue函数列表如下:

  • // 创建一个带有给定的(固定)容量和默认访问策略的ArrayBlockingQueue
  • ArrayBlockingQueue(int capacity)
  • // 创建一个具有给定的(固定)容量和指定访问策略的ArrayBlockingQueue
  • ArrayBlockingQueue(int capacity, boolean fair)
  • // 创建一个具有给定的(固定)容量和指定访问策略的ArrayBlockingQueue,它最初包含给定collection的元素,并以collection迭代器的遍历顺序添加元素
  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
  • // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回true,如果此队列已满,则抛出IllegalStateException
  • boolean add(E e)
  • // 自动移除此队列中的所有元素
  • void clear()
  • // 如果此队列包含指定的元素,则返回true
  • boolean contains(Object o)
  • // 移除此队列中所有可用的元素,并将它们添加到给定collection中
  • int drainTo(Collection<? super E> c)
  • // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection中
  • int drainTo(Collection<? super E> c, int maxElements)
  • // 返回在此队列中的元素上按适当顺序进行迭代的迭代器
  • Iterator<E> iterator()
  • // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回true,如果此队列已满,则返回false
  • boolean offer(E e)
  • // 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间
  • boolean offer(E e, long timeout, TimeUnit unit)
  • // 获取但不移除此队列的头;如果此队列为空,则返回 null
  • E peek()
  • // 获取并移除此队列的头,如果此队列为空,则返回 null
  • E poll()
  • // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
  • E poll(long timeout, TimeUnit unit)
  • // 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间
  • void put(E e)
  • // 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量
  • int remainingCapacity()
  • // 从此队列中移除指定元素的单个实例(如果存在)
  • boolean remove(Object o)
  • // 返回此队列中元素的数量
  • int size()
  • // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)
  • E take()
  • // 返回一个按适当顺序包含此队列中所有元素的数组
  • Object[] toArray()
  • // 返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型
  • <T> T[] toArray(T[] a)
  • // 返回此collection的字符串表示形式
  • String toString()

2. BlockingQueue的四类操作场景

ArrayBlockingQueue实现了BlockingQueue接口,实际上,在后面文章中讲解的阻塞队列都实现了BlockingQueue接口。BlockingQueue接口的大多数方法与List接口类似,这里只对其提供的用于插入队尾元素、移除或检查队首元素等多个方法进行区别总结,它们的区别如下:

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, timeout, time_unit)
移除 remove() poll() take() poll(timeout, time_unit)
检查 element() peek()
  1. 抛出异常:当阻塞队列满时,再使用add(e)方法往队列中插入元素会抛出IllegalStateException: Queue full异常;当阻塞队列空时,再使用remove()方法移除元素,或使用element()检查队首元素时会抛出NoSuchElementException异常。
  2. 特殊值:使用offer(e)方法插入元素,会返回true表示成功或false表示插入失败;使用poll()方法移除元素,若移除元素成功就返回出队元素,当没有元素时则返回null;使用peek()方法检查队首元素时,若队首元素存在则返回该元素,当没有元素时则返回null。
  3. 阻塞:当阻塞队列满时,再使用put(e)方法往队列中插入元素会阻塞线程直到队列有空闲位置,或者被中断退出;当阻塞队列空时,再使用take()方法移除元素时会阻塞线程直到队列中有剩余元素,或者被中断退出。
  4. 超时:当阻塞队列满时,再使用offer(e, timeout, time_unit)方法往队列中插入元素会阻塞线程直到队列有空闲位置,或者超时退出,或者被中断退出;当阻塞队列空时,再使用poll(timeout, time_unit)方法移除元素时会阻塞线程直到队列中有剩余元素,或者超时退出,或者被中断退出。

3. ArrayBlockingQueue源码解析

下面是ArrayBlockingQueue的源码解析,基于JDK 1.7.0_07:

  • package java.util.concurrent;
  • import java.util.concurrent.locks.Condition;
  • import java.util.concurrent.locks.ReentrantLock;
  • import java.util.AbstractQueue;
  • import java.util.Collection;
  • import java.util.Iterator;
  • import java.util.NoSuchElementException;
  • import java.lang.ref.WeakReference;
  • import java.util.Spliterators;
  • import java.util.Spliterator;
  • public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  • implements BlockingQueue<E>, java.io.Serializable {
  • /**
  • * Serialization ID. This class relies on default serialization
  • * even for the items array, which is default-serialized, even if
  • * it is empty. Otherwise it could not be declared final, which is
  • * necessary here.
  • */
  • private static final long serialVersionUID = -817911632652898426L;
  • /**
  • * The queued items
  • * 装载数据的数组
  • * */
  • final Object[] items;
  • /**
  • * items index for next take, poll, peek or remove
  • * 下一个take,poll,peek,remove操作的元素索引
  • * */
  • int takeIndex;
  • /**
  • * items index for next put, offer, or add
  • * 下一个put,offer,add操作的元素索引
  • * */
  • int putIndex;
  • /**
  • * Number of elements in the queue
  • * 队列中的元素个数
  • * */
  • int count;
  • /*
  • * Concurrency control uses the classic two-condition algorithm found in any textbook.
  • * 使用经典的two-condition算法控制并发,其实就是生产者-消费者模型算法
  • */
  • /**
  • * Main lock guarding all access
  • * 保证访问主要的锁
  • * */
  • final ReentrantLock lock;
  • /**
  • * Condition for waiting takes
  • * 获取操作的等待条件
  • * */
  • private final Condition notEmpty;
  • /**
  • * Condition for waiting puts
  • * 添加操作的等待条件
  • * */
  • private final Condition notFull;
  • // Internal helper methods
  • /**
  • * Circularly increment i.
  • * 循环对i自增,传入i,对i进行加1操作
  • * 如果加1后等于items数组的长度,则返回0,否则返回i
  • * 防止自增越界
  • */
  • final int inc(int i) {
  • return (++i == items.length) ? 0 : i;
  • }
  • /**
  • * Circularly decrement i.
  • * 循环对i自减,传入i
  • * 如果i等于0,则返回items数组的长度减1,否则返回i-1
  • * 防止自减越界
  • */
  • final int dec(int i) {
  • return ((i == 0) ? items.length : i) - 1;
  • }
  • // 强转为E类型
  • @SuppressWarnings("unchecked")
  • static <E> E cast(Object item) {
  • return (E) item;
  • }
  • /**
  • * Returns item at index i.
  • * 返回items中位于i索引位置的元素
  • */
  • final E itemAt(int i) {
  • return this.<E>cast(items[i]);
  • }
  • /**
  • * Throws NullPointerException if argument is null.
  • *
  • * 检查v是否为null
  • * @param v the element
  • */
  • private static void checkNotNull(Object v) {
  • if (v == null)
  • throw new NullPointerException();
  • }
  • /**
  • * Inserts element at current put position, advances, and signals.
  • * Call only when holding lock.
  • * 在当前putIndex位置上插入元素,并对putIndex进行自增,
  • * count自增,使用notEmpty.signal()唤醒等待在notEmpty条件上的线程
  • * 仅在获取锁的情况下调用
  • */
  • private void insert(E x) {
  • // 将putIndex索引位置的元素置为x
  • items[putIndex] = x;
  • // putIndex自增
  • putIndex = inc(putIndex);
  • // count自增,因为添加了元素
  • ++count;
  • // 唤醒notEmpty上的等待线程
  • notEmpty.signal();
  • }
  • /**
  • * Extracts element at current take position, advances, and signals.
  • * Call only when holding lock.
  • * 在当前takeIndex上提取元素,并对takeIndex进行自增
  • * count自减,使用notFull.signal()唤醒等待在notFull条件上的线程
  • * 仅在获取锁的情况下调用
  • */
  • private E extract() {
  • // 引用items数组
  • final Object[] items = this.items;
  • // 获取items中takeIndex位置的元素并强转为E类型
  • E x = this.<E>cast(items[takeIndex]);
  • // 将items的takeIndex位置上置为null
  • items[takeIndex] = null;
  • // takeIndex自增
  • takeIndex = inc(takeIndex);
  • // count自减,因为提取了元素
  • --count;
  • // 唤醒notFull上的等待线程
  • notFull.signal();
  • // 返回x
  • return x;
  • }
  • /**
  • * Deletes item at position i.
  • * Utility for remove and iterator.remove.
  • * Call only when holding lock.
  • * 移除items中i位置上的元素
  • * remove操作和iterator.remove操作的工具方法
  • * 仅在获取锁的情况下调用
  • */
  • void removeAt(int i) {
  • // 引用items数组
  • final Object[] items = this.items;
  • // if removing front item, just advance
  • if (i == takeIndex) { // 如果移除的是takeIndex索引上的元素,只需要取出返回即可
  • // 将items的takeIndex位置上置为null
  • items[takeIndex] = null;
  • // takeIndex自增
  • takeIndex = inc(takeIndex);
  • } else { // 说明移除的不是takeIndex索引上的元素
  • // slide over all others up through putIndex.
  • /**
  • * 无限循环
  • * 这里的处理方式是,从i开始一直往后遍历,直到putIndex
  • * 将i后面的元素依次往前移动1位
  • */
  • for (;;) {
  • // 对i进行自增,nexti为i自增后的索引
  • int nexti = inc(i);
  • if (nexti != putIndex) {
  • // 如果nexti不为putIndex,则将nexti位置上的元素赋值给i
  • items[i] = items[nexti];
  • // 将i置为nexti,相当于后移
  • i = nexti;
  • } else {
  • // 如果nexti为putIndex,则将i位置上的元素置为null
  • items[i] = null;
  • // 将putIndex置为i
  • putIndex = i;
  • // 结束循环
  • break;
  • }
  • }
  • }
  • // count自减,因为移除了元素
  • --count;
  • // 唤醒notFull上的等待线程
  • notFull.signal();
  • }
  • /**
  • * Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • * capacity and default access policy.
  • *
  • * 仅传入容量参数时默认使用非公平锁
  • *
  • * @param capacity the capacity of this queue
  • * @throws IllegalArgumentException if {@code capacity < 1}
  • */
  • public ArrayBlockingQueue(int capacity) {
  • // 默认使用非公平锁
  • this(capacity, false);
  • }
  • /**
  • * Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • * capacity and the specified access policy.
  • *
  • * 通过容量、是否使用公平锁两个参数来创建ArrayBlockingQueue
  • *
  • * @param capacity the capacity of this queue
  • * @param fair if {@code true} then queue accesses for threads blocked
  • * on insertion or removal, are processed in FIFO order;
  • * if {@code false} the access order is unspecified.
  • * @throws IllegalArgumentException if {@code capacity < 1}
  • */
  • public ArrayBlockingQueue(int capacity, boolean fair) {
  • // 检查容量
  • if (capacity <= 0)
  • throw new IllegalArgumentException();
  • // 创建capacity大小的Object数组
  • this.items = new Object[capacity];
  • // 根据fair创建锁
  • lock = new ReentrantLock(fair);
  • // 创建两个Condition
  • notEmpty = lock.newCondition();
  • notFull = lock.newCondition();
  • }
  • /**
  • * Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • * capacity, the specified access policy and initially containing the
  • * elements of the given collection,
  • * added in traversal order of the collection's iterator.
  • *
  • * 根据容量、是否使用公平锁及一个Collection来创建ArrayBlockingQueue
  • * 需要将Collection中的元素添加到ArrayBlockingQueue中
  • *
  • * 需要注意的是,当Collection中元素个数大于capacity,会抛出IllegalArgumentException错误
  • *
  • * @param capacity the capacity of this queue
  • * @param fair if {@code true} then queue accesses for threads blocked
  • * on insertion or removal, are processed in FIFO order;
  • * if {@code false} the access order is unspecified.
  • * @param c the collection of elements to initially contain
  • * @throws IllegalArgumentException if {@code capacity} is less than
  • * {@code c.size()}, or less than 1.
  • * @throws NullPointerException if the specified collection or any
  • * of its elements are null
  • */
  • public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
  • // 调用重载构造方法创建ArrayBlockingQueue
  • this(capacity, fair);
  • // 加锁操作
  • final ReentrantLock lock = this.lock;
  • lock.lock(); // Lock only for visibility, not mutual exclusion
  • try {
  • int i = 0;
  • try {
  • // 循环将元素添加到items中
  • for (E e : c) {
  • // 检查添加的元素是否为null
  • checkNotNull(e);
  • items[i++] = e;
  • }
  • } catch (ArrayIndexOutOfBoundsException ex) {
  • /**
  • * 注意,当Collection中元素个数大于capacity,会出现ArrayIndexOutOfBoundsException异常
  • * 但此处封装了,会抛出IllegalArgumentException异常
  • */
  • throw new IllegalArgumentException();
  • }
  • // 维护count和putIndex
  • count = i;
  • putIndex = (i == capacity) ? 0 : i;
  • } finally {
  • // 解锁操作
  • lock.unlock();
  • }
  • }
  • /**
  • * Inserts the specified element at the tail of this queue if it is
  • * possible to do so immediately without exceeding the queue's capacity,
  • * returning {@code true} upon success and throwing an
  • * {@code IllegalStateException} if this queue is full.
  • *
  • * 添加到队尾操作,调用的AbstractQueue的add方法
  • * 底层调用的offer()方法
  • *
  • * 会立即返回结果,这个方法与put()方法有区别,注意甄别
  • * 往Queue尾添加元素,当没有超出容量是会立即返回true,
  • * items已满时抛出IllegalStateException异常
  • *
  • * @param e the element to add
  • * @return {@code true} (as specified by {@link Collection#add})
  • * @throws IllegalStateException if this queue is full
  • * @throws NullPointerException if the specified element is null
  • */
  • public boolean add(E e) {
  • return super.add(e);
  • }
  • /**
  • * Inserts the specified element at the tail of this queue if it is
  • * possible to do so immediately without exceeding the queue's capacity,
  • * returning {@code true} upon success and {@code false} if this queue
  • * is full. This method is generally preferable to method {@link #add},
  • * which can fail to insert an element only by throwing an exception.
  • *
  • * 添加到队尾方法,与add()相比,offer不会抛出异常,只会返回true/false
  • *
  • * @throws NullPointerException if the specified element is null
  • */
  • public boolean offer(E e) {
  • // 检查添加元素的合法性
  • checkNotNull(e);
  • // 获取锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 如果count与items容量相同,表示数组满了,返回false
  • if (count == items.length)
  • return false;
  • else {
  • // 否则可以添加,调用insert()插入
  • insert(e);
  • return true;
  • }
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Inserts the specified element at the tail of this queue, waiting
  • * for space to become available if the queue is full.
  • *
  • * put()方法也用做添加元素到队尾,但与add()不同的是:
  • * add()会立即返回结果,插入成功返回true,如果items满了抛出IllegalStateException异常
  • * put()当在items满了的时候不会立即返回,而会使线程进入阻塞等待,直到可以添加元素
  • *
  • * @throws InterruptedException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public void put(E e) throws InterruptedException {
  • // 检查元素
  • checkNotNull(e);
  • // 获取锁
  • final ReentrantLock lock = this.lock;
  • /**
  • * 这里获取的是可中断锁,会抛出InterruptedException异常
  • * 在等待期间,可以对阻塞线程进行中断操作
  • */
  • lock.lockInterruptibly();
  • try {
  • // 当items满了的时候,调用notFull.await()进行等待
  • while (count == items.length)
  • notFull.await();
  • // 等待结束,插入元素
  • insert(e);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Inserts the specified element at the tail of this queue, waiting
  • * up to the specified wait time for space to become available if
  • * the queue is full.
  • *
  • * 带超时功能的offer()操作
  • *
  • * @throws InterruptedException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  • // 检查元素
  • checkNotNull(e);
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 获取锁
  • final ReentrantLock lock = this.lock;
  • /**
  • * 这里获取的是可中断锁,会抛出InterruptedException异常
  • * 在等待期间,可以对阻塞线程进行中断操作
  • */
  • lock.lockInterruptibly();
  • try {
  • // 当items满了的时候,如果nanos小于0,表示已经超时了,直接返回false
  • while (count == items.length) {
  • if (nanos <= 0)
  • return false;
  • // 否则进行超时等待
  • nanos = notFull.awaitNanos(nanos);
  • }
  • // 等待结束,插入元素
  • insert(e);
  • return true;
  • } finally {
  • // 解锁操作
  • lock.unlock();
  • }
  • }
  • // 获取队首元素操作,会立即返回
  • public E poll() {
  • // 获取锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 如果items为空就返回null,否则使用extract()取出返回
  • return (count == 0) ? null : extract();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 获取队首元素操作,不会立即返回,如果items为空会进入阻塞等待
  • public E take() throws InterruptedException {
  • // 获取可中断锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // 如果items为空就进入等待
  • while (count == 0)
  • notEmpty.await();
  • // 等待结束,使用extract()取出返回
  • return extract();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 带有超时功能的poll()操作
  • public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 获取可中断锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // 如果items为空就进入等待
  • while (count == 0) {
  • // 如果超时了还没有等到,就返回null
  • if (nanos <= 0)
  • return null;
  • // 更新超时时间
  • nanos = notEmpty.awaitNanos(nanos);
  • }
  • // 等待结束,使用extract()取出返回
  • return extract();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 查看队首元素操作,与获取不同的是,该方法不会将元素从队中取出,只是获取该元素的引用
  • public E peek() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 如果items为空就返回null,否则使用itemAt()获取返回
  • return (count == 0) ? null : itemAt(takeIndex);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // this doc comment is overridden to remove the reference to collections
  • // greater in size than Integer.MAX_VALUE
  • /**
  • * Returns the number of elements in this queue.
  • *
  • * @return the number of elements in this queue
  • */
  • public int size() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 返回count
  • return count;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // this doc comment is a modified copy of the inherited doc comment,
  • // without the reference to unlimited queues.
  • /**
  • * Returns the number of additional elements that this queue can ideally
  • * (in the absence of memory or resource constraints) accept without
  • * blocking. This is always equal to the initial capacity of this queue
  • * less the current {@code size} of this queue.
  • *
  • * <p>Note that you <em>cannot</em> always tell if an attempt to insert
  • * an element will succeed by inspecting {@code remainingCapacity}
  • * because it may be the case that another thread is about to
  • * insert or remove an element.
  • *
  • * 返回items空闲空间大小
  • */
  • public int remainingCapacity() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 计算并返回空闲空间大小
  • return items.length - count;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Removes a single instance of the specified element from this queue,
  • * if it is present. More formally, removes an element {@code e} such
  • * that {@code o.equals(e)}, if this queue contains one or more such
  • * elements.
  • * Returns {@code true} if this queue contained the specified element
  • * (or equivalently, if this queue changed as a result of the call).
  • *
  • * <p>Removal of interior elements in circular array based queues
  • * is an intrinsically slow and disruptive operation, so should
  • * be undertaken only in exceptional circumstances, ideally
  • * only when the queue is known not to be accessible by other
  • * threads.
  • *
  • * 移除特定的元素,移除成功返回true,否则返回false(不存在时也会返回false)
  • *
  • * @param o element to be removed from this queue, if present
  • * @return {@code true} if this queue changed as a result of the call
  • */
  • public boolean remove(Object o) {
  • // 当传入的o为null时直接返回false
  • if (o == null) return false;
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 遍历items数组
  • for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
  • // 当遍历到的元素与o相同
  • if (o.equals(items[i])) {
  • // 移除o,并返回true
  • removeAt(i);
  • return true;
  • }
  • }
  • // 否则返回false
  • return false;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns {@code true} if this queue contains the specified element.
  • * More formally, returns {@code true} if and only if this queue contains
  • * at least one element {@code e} such that {@code o.equals(e)}.
  • *
  • * 判断是否包含某个元素
  • *
  • * @param o object to be checked for containment in this queue
  • * @return {@code true} if this queue contains the specified element
  • */
  • public boolean contains(Object o) {
  • // 如果o为null直接返回false
  • if (o == null) return false;
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 遍历items数组
  • for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
  • // 当遍历到的元素与o相同时直接返回true
  • if (o.equals(items[i]))
  • return true;
  • // 否则返回false
  • return false;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns an array containing all of the elements in this queue, in
  • * proper sequence.
  • *
  • * <p>The returned array will be "safe" in that no references to it are
  • * maintained by this queue. (In other words, this method must allocate
  • * a new array). The caller is thus free to modify the returned array.
  • *
  • * <p>This method acts as bridge between array-based and collection-based
  • * APIs.
  • *
  • * 转为一个Object数组
  • *
  • * @return an array containing all of the elements in this queue
  • */
  • public Object[] toArray() {
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 根据count创建一个Object数组
  • final int count = this.count;
  • Object[] a = new Object[count];
  • // 遍历items数组,把元素都装入新创建的Object数组中
  • for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
  • a[k] = items[i];
  • return a;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns an array containing all of the elements in this queue, in
  • * proper sequence; the runtime type of the returned array is that of
  • * the specified array. If the queue fits in the specified array, it
  • * is returned therein. Otherwise, a new array is allocated with the
  • * runtime type of the specified array and the size of this queue.
  • *
  • * <p>If this queue fits in the specified array with room to spare
  • * (i.e., the array has more elements than this queue), the element in
  • * the array immediately following the end of the queue is set to
  • * {@code null}.
  • *
  • * <p>Like the {@link #toArray()} method, this method acts as bridge between
  • * array-based and collection-based APIs. Further, this method allows
  • * precise control over the runtime type of the output array, and may,
  • * under certain circumstances, be used to save allocation costs.
  • *
  • * <p>Suppose {@code x} is a queue known to contain only strings.
  • * The following code can be used to dump the queue into a newly
  • * allocated array of {@code String}:
  • *
  • * <pre>
  • * String[] y = x.toArray(new String[0]);</pre>
  • *
  • * Note that {@code toArray(new Object[0])} is identical in function to
  • * {@code toArray()}.
  • *
  • * 将items中的元素装入传入的a,当a容量不足时,会使用反射技术创建一个新的a
  • *
  • * @param a the array into which the elements of the queue are to
  • * be stored, if it is big enough; otherwise, a new array of the
  • * same runtime type is allocated for this purpose
  • * @return an array containing all of the elements in this queue
  • * @throws ArrayStoreException if the runtime type of the specified array
  • * is not a supertype of the runtime type of every element in
  • * this queue
  • * @throws NullPointerException if the specified array is null
  • */
  • @SuppressWarnings("unchecked")
  • public <T> T[] toArray(T[] a) {
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 计算a容量是否足够
  • final int count = this.count;
  • final int len = a.length;
  • // 当a的大小小于count时,根据count创建一个新的a数组
  • if (len < count)
  • a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
  • // 遍历items数组,把元素都装入a数组中
  • for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
  • a[k] = (T) items[i];
  • // 当a容量大于count时,将a中count位置上的元素置为null
  • if (len > count)
  • a[count] = null;
  • return a;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • public String toString() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // count为0,直接返回[]
  • int k = count;
  • if (k == 0)
  • return "[]";
  • // 否则进行拼接
  • StringBuilder sb = new StringBuilder();
  • sb.append('[');
  • // 遍历items数组
  • for (int i = takeIndex; ; i = inc(i)) {
  • Object e = items[i];
  • // 当某个元素引用的是自己时,显示"(this Collection)"
  • sb.append(e == this ? "(this Collection)" : e);
  • if (--k == 0)
  • // 最后的位置添加]
  • return sb.append(']').toString();
  • sb.append(',').append(' ');
  • }
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Atomically removes all of the elements from this queue.
  • * The queue will be empty after this call returns.
  • *
  • * 清空ArrayBlockingQueue
  • */
  • public void clear() {
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 遍历items数组,将所有位置的元素置为null
  • for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
  • items[i] = null;
  • // count、putIndex、takeIndex都置为0
  • count = 0;
  • putIndex = 0;
  • takeIndex = 0;
  • // 唤醒在notFull上等待的线程
  • notFull.signalAll();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 将所有的元素从ArrayBlockingQueue中移除,并添加到给定的Collection中
  • *
  • * @throws UnsupportedOperationException {@inheritDoc}
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • * @throws IllegalArgumentException {@inheritDoc}
  • */
  • public int drainTo(Collection<? super E> c) {
  • // 检查参数
  • checkNotNull(c);
  • // c不能是自己,否则抛出异常
  • if (c == this)
  • throw new IllegalArgumentException();
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • int i = takeIndex;
  • int n = 0;
  • int max = count;
  • // 循环添加元素到c中
  • while (n < max) {
  • c.add(this.<E>cast(items[i]));
  • // 将items中i位置的元素都置为null
  • items[i] = null;
  • i = inc(i);
  • ++n;
  • }
  • // 如果n大于0,表示items被修改了
  • if (n > 0) {
  • // 将count、putIndex、takeIndex都置为0
  • count = 0;
  • putIndex = 0;
  • takeIndex = 0;
  • // 唤醒在notFull上等待的线程
  • notFull.signalAll();
  • }
  • // 返回移动元素的个数
  • return n;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * @throws UnsupportedOperationException {@inheritDoc}
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • * @throws IllegalArgumentException {@inheritDoc}
  • */
  • public int drainTo(Collection<? super E> c, int maxElements) {
  • // 检查参数
  • checkNotNull(c);
  • // c不能是自己,否则抛出异常
  • if (c == this)
  • throw new IllegalArgumentException();
  • // 检查参数
  • if (maxElements <= 0)
  • return 0;
  • // 引用items数组
  • final Object[] items = this.items;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • int i = takeIndex;
  • int n = 0;
  • // 根据maxElements和count的大小决定max的大小
  • int max = (maxElements < count) ? maxElements : count;
  • // 循环添加元素到c中
  • while (n < max) {
  • c.add(this.<E>cast(items[i]));
  • // 将items中i位置的元素都置为null
  • items[i] = null;
  • i = inc(i);
  • ++n;
  • }
  • // 如果n大于0,表示items被修改了
  • if (n > 0) {
  • // count减去移走的元素个数,takeIndex置为0
  • count -= n;
  • takeIndex = i;
  • // 唤醒在notFull上等待的线程
  • notFull.signalAll();
  • }
  • // 返回移动元素的个数
  • return n;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns an iterator over the elements in this queue in proper sequence.
  • * The elements will be returned in order from first (head) to last (tail).
  • *
  • * <p>The returned {@code Iterator} is a "weakly consistent" iterator that
  • * will never throw {@link java.util.ConcurrentModificationException
  • * ConcurrentModificationException},
  • * and guarantees to traverse elements as they existed upon
  • * construction of the iterator, and may (but is not guaranteed to)
  • * reflect any modifications subsequent to construction.
  • *
  • * 返回迭代器
  • *
  • * @return an iterator over the elements in this queue in proper sequence
  • */
  • public Iterator<E> iterator() {
  • return new Itr();
  • }
  • /**
  • * Iterator for ArrayBlockingQueue. To maintain weak consistency
  • * with respect to puts and takes, we (1) read ahead one slot, so
  • * as to not report hasNext true but then not have an element to
  • * return -- however we later recheck this slot to use the most
  • * current value; (2) ensure that each array slot is traversed at
  • * most once (by tracking "remaining" elements); (3) skip over
  • * null slots, which can occur if takes race ahead of iterators.
  • * However, for circular array-based queues, we cannot rely on any
  • * well established definition of what it means to be weakly
  • * consistent with respect to interior removes since these may
  • * require slot overwrites in the process of sliding elements to
  • * cover gaps. So we settle for resiliency, operating on
  • * established apparent nexts, which may miss some elements that
  • * have moved between calls to next.
  • *
  • * 在遍历过程中,如果其他线程的操作导致ArrayBlockingQueue中的元素个数发生改变
  • * 可能会抛出NoSuchElementException异常
  • */
  • private class Itr implements Iterator<E> {
  • // 剩余元素个数
  • private int remaining; // Number of elements yet to be returned
  • // 下一个应该返回的元素的索引
  • private int nextIndex; // Index of element to be returned by next
  • // 下一个应该返回的元素
  • private E nextItem; // Element to be returned by next call to next
  • // 上一次返回的元素
  • private E lastItem; // Element returned by last call to next
  • // 上一次返回的元素的索引,当没有时置为-1
  • private int lastRet; // Index of last element returned, or -1 if none
  • Itr() {
  • // 加锁
  • final ReentrantLock lock = ArrayBlockingQueue.this.lock;
  • lock.lock();
  • try {
  • // lastRet置为-1
  • lastRet = -1;
  • // 初始化remaining为count
  • if ((remaining = count) > 0)
  • /**
  • * 当items中有元素时,nextItem就为索引takeIndex上的元素,
  • * 并将takeIndex赋值给nextIndex
  • */
  • nextItem = itemAt(nextIndex = takeIndex);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 判断是否还有下一个元素
  • public boolean hasNext() {
  • // 当还有剩余时,说明还有下一个
  • return remaining > 0;
  • }
  • // 获取下一个元素
  • public E next() {
  • // 加锁
  • final ReentrantLock lock = ArrayBlockingQueue.this.lock;
  • lock.lock();
  • try {
  • // 如果没有剩余了,就抛出异常
  • if (remaining <= 0)
  • throw new NoSuchElementException();
  • // 先使用lastRet记录nextIndex
  • lastRet = nextIndex;
  • // 获得nextIndex索引位置的元素
  • E x = itemAt(nextIndex); // check for fresher value
  • /**
  • * 如果获得的元素为null,将使用旧的元素
  • * 这个旧的元素会在上次调用next()的时候被记录
  • */
  • if (x == null) {
  • x = nextItem; // we are forced to report old value
  • lastItem = null; // but ensure remove fails
  • }
  • else
  • // 否则返回获得的元素
  • lastItem = x;
  • // 当还有剩余的时候,记录下一个位置的元素
  • while (--remaining > 0 && // skip over nulls
  • (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
  • ;
  • return x;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • public void remove() {
  • // 加锁
  • final ReentrantLock lock = ArrayBlockingQueue.this.lock;
  • lock.lock();
  • try {
  • // 上一次返回的元素的索引,如果为-1,表示items已经为空,直接抛出异常
  • int i = lastRet;
  • if (i == -1)
  • throw new IllegalStateException();
  • // 将上一次返回的元素的索引置为-1
  • lastRet = -1;
  • // 记录上次返回的元素
  • E x = lastItem;
  • // 将上一次返回的元素置为null
  • lastItem = null;
  • // only remove if item still at index
  • // 只有当这个元素还存在于items的i位置上时,才会移除
  • if (x != null && x == items[i]) {
  • // 判断移除的是否是队首元素
  • boolean removingHead = (i == takeIndex);
  • // 移除元素
  • removeAt(i);
  • // 如果不是队首元素,需要维护nextIndex
  • if (!removingHead)
  • nextIndex = dec(nextIndex);
  • }
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • }
  • }

从对ArrayBlockingQueue源码的分析可知,其实ArrayBlockingQueue内部是以一个数组维护的循环队列,通过Condition条件队列构成了生产者-消费者的模型;在每次对数组中的元素进行操作时都会加锁以保障数据在线程并发情况下的安全性。

具体的实现原理上面源码中的注释已经讲解得非常详细了,大家如有疑问可以仔细阅读,这里不再赘述了。

4. ArrayBlockingQueue示例

下面提供一个简单的ArrayBlockingQueue的示例:

  • package com.coderap.collection;
  • import java.util.Iterator;
  • import java.util.LinkedList;
  • import java.util.Queue;
  • import java.util.concurrent.ArrayBlockingQueue;
  • public class ArrayBlockingQueueTest {
  • // 循环次数
  • private final static int loopCount = 4;
  • // 操作的linkedList和ArrayBlockingQueue
  • private final static LinkedList<String> linkedList = new LinkedList<>();
  • private final static ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue(8);
  • public static void main(String[] args) {
  • new Thread(new ArrayBlockingQueueTest.OperateThread(arrayBlockingQueue, loopCount), "T-1").start();
  • new Thread(new ArrayBlockingQueueTest.OperateThread(arrayBlockingQueue, loopCount), "T-2").start();
  • //new Thread(new ArrayBlockingQueueTest.OperateThread(linkedList, loopCount), "T-3").start();
  • //new Thread(new ArrayBlockingQueueTest.OperateThread(linkedList, loopCount), "T-4").start();
  • }
  • // 遍历方法
  • private static void iterate(Queue queue) {
  • StringBuilder stringBuilder = new StringBuilder();
  • Iterator iterator = queue.iterator();
  • while (iterator.hasNext()) {
  • stringBuilder.append(iterator.next());
  • stringBuilder.append(", ");
  • }
  • // 删除最后多余的字符
  • stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length() - 1);
  • // 打印
  • System.out.println(Thread.currentThread().getName() + " iterate: " + stringBuilder.toString());
  • }
  • private static class OperateThread implements Runnable {
  • private Queue queue;
  • private int loopCount;
  • public OperateThread(Queue queue, int loopCount) {
  • this.queue = queue;
  • this.loopCount = loopCount;
  • }
  • @Override
  • public void run() {
  • // 循环添加并遍历打印
  • while (loopCount > 0) {
  • queue.add(Thread.currentThread().getName() + " - " + loopCount);
  • iterate(queue);
  • loopCount--;
  • }
  • }
  • }
  • }

某一次运行结果如下:

  • T-1 iterate: T-1 - 4, T-2 - 4
  • T-2 iterate: T-1 - 4, T-2 - 4
  • T-1 iterate: T-1 - 4, T-2 - 4, T-1 - 3
  • T-2 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3
  • T-1 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2
  • T-2 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2, T-1 - 1
  • T-1 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2, T-1 - 1
  • T-2 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2, T-1 - 1, T-2 - 1