Java多线程
Java并发
JUC集合

Java多线程 37 - LinkedBlockingQueue详解

简介:LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

1. LinkedBlockingQueue介绍

LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

此外,LinkedBlockingQueue还是可选容量的(防止过度膨胀),即可以指定队列的容量;如果不指定,默认容量大小为Integer.MAX_VALUE

LinkedBlockingQueue的类图如下图所示:

1.LinkedBlockingQueue类图结构.png

  1. LinkedBlockingQueue继承于AbstractQueue,本质上是一个FIFO(先进先出)的队列。
  2. LinkedBlockingQueue实现了BlockingQueue接口,支持多线程并发;当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
  3. LinkedBlockingQueue是通过单链表实现的。
    • head是链表的表头。获取数据时,都是从表头head处取出。
    • last是链表的表尾。新增数据时,都是从表尾last处插入。
    • count是链表的实际大小,即当前链表中包含的节点个数。
    • capacity是列表的容量,它是在创建链表时指定的。
    • putLock是插入锁,takeLock是取出锁;notEmpty是非空条件,notFull是未满条件。通过它们对链表进行并发控制。
  4. LinkedBlockingQueue在实现多线程对竞争资源的互斥访问时,对于添加和取出操作分别使用了不同的锁。对于添加操作,通过添加锁putLock进行同步;对于取出操作,通过取出锁takeLock进行同步。此外,添加锁putLock和非满条件notFull相关联,取出锁takeLock和非空条件notEmpty相关联,通过notFullnotEmpty更细腻地控制锁。
    • 若线程A要取出数据时,队列正好为空,则线程A会执行notEmpty.await()进行等待;当其它线程B向队列中添加了数据之后,会调用notEmpty.signal()唤醒notEmpty上的等待线程;此时线程A会被唤醒从而得以继续运行。此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock
    • 若某线程H要插入数据时,队列已满,则线程H会执行notFull.await()进行等待;当其它线程I取出数据之后,会调用notFull.signal()唤醒notFull上的等待线程。此时线程H就会被唤醒从而得以继续运行。此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock

关于ReentrantLock以及Condition等更多的内容,可以参考:

  1. Java多线程 16——ReentrantLock互斥锁
  2. Java多线程 20——AbstractQueuedSynchronizer详解(一)
  3. Java多线程 21——AbstractQueuedSynchronizer详解(二)
  4. Java多线程 22——AbstractQueuedSynchronizer详解(三)
  5. Java多线程 18——Condition

LinkedBlockingQueue函数列表如下:

  • // 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue
  • LinkedBlockingQueue()
  • // 创建一个容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含给定collection的元素,元素按该collection迭代器的遍历顺序添加
  • LinkedBlockingQueue(Collection<? extends E> c)
  • // 创建一个具有给定(固定)容量的LinkedBlockingQueue
  • LinkedBlockingQueue(int capacity)
  • // 从队列彻底移除所有元素
  • void clear()
  • // 移除此队列中所有可用的元素,并将它们添加到给定collection中
  • int drainTo(Collection<? super E> c)
  • // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection中
  • int drainTo(Collection<? super E> c, int maxElements)
  • // 返回在队列中的元素上按适当顺序进行迭代的迭代器
  • Iterator<E> iterator()
  • // 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回true,如果此队列已满,则返回false
  • boolean offer(E e)
  • // 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用
  • boolean offer(E e, long timeout, TimeUnit unit)
  • // 获取但不移除此队列的头;如果此队列为空,则返回null
  • E peek()
  • // 获取并移除此队列的头,如果此队列为空,则返回null
  • E poll()
  • // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
  • E poll(long timeout, TimeUnit unit)
  • // 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用
  • void put(E e)
  • // 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量
  • int remainingCapacity()
  • // 从此队列移除指定元素的单个实例(如果存在)
  • boolean remove(Object o)
  • // 返回队列中的元素个数
  • int size()
  • // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)
  • E take()
  • // 返回按适当顺序包含此队列中所有元素的数组
  • Object[] toArray()
  • // 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型
  • <T> T[] toArray(T[] a)
  • // 返回此collection的字符串表示形式
  • String toString()

2. LinkedBlockingQueue源码解析

下面是LinkedBlockingQueue的源码解析,基于JDK 1.7.0_07:

  • package java.util.concurrent;
  • import java.util.concurrent.atomic.AtomicInteger;
  • import java.util.concurrent.locks.Condition;
  • import java.util.concurrent.locks.ReentrantLock;
  • import java.util.AbstractQueue;
  • import java.util.Collection;
  • import java.util.Iterator;
  • import java.util.NoSuchElementException;
  • public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  • implements BlockingQueue<E>, java.io.Serializable {
  • private static final long serialVersionUID = -6903933977591709194L;
  • /**
  • * Linked list node class
  • * 链表的节点
  • */
  • static class Node<E> {
  • // 保存数据的属性
  • E item;
  • /**
  • * One of:
  • * - the real successor Node
  • * - this Node, meaning the successor is head.next
  • * - null, meaning there is no successor (this is the last node)
  • * 三种情况之一:
  • * - 表示后继节点;
  • * - 如果值是自己,表示后继节点是head.next
  • * - null,表示没有后继,即该节点是尾节点
  • */
  • Node<E> next;
  • // 构造方法,传入x赋值
  • Node(E x) { item = x; }
  • }
  • /**
  • * The capacity bound, or Integer.MAX_VALUE if none
  • * 容量限制,如果没有设置就为Integer.MAX_VALUE
  • * */
  • private final int capacity;
  • /**
  • * Current number of elements
  • * 当前元素的总个数
  • * */
  • private final AtomicInteger count = new AtomicInteger(0);
  • /**
  • * Head of linked list.
  • * Invariant: head.item == null
  • * 链表头节点
  • */
  • private transient Node<E> head;
  • /**
  • * Tail of linked list.
  • * Invariant: last.next == null
  • * 链表尾节点
  • */
  • private transient Node<E> last;
  • /**
  • * Lock held by take, poll, etc
  • * 被take,poll等操作持有的锁
  • * */
  • private final ReentrantLock takeLock = new ReentrantLock();
  • /**
  • * Wait queue for waiting takes
  • * 获取操作的等待条件
  • * */
  • private final Condition notEmpty = takeLock.newCondition();
  • /**
  • * Lock held by put, offer, etc
  • * 被put,offer等操作持有的锁
  • * */
  • private final ReentrantLock putLock = new ReentrantLock();
  • /**
  • * Wait queue for waiting puts
  • * 添加操作的等待条件
  • * */
  • private final Condition notFull = putLock.newCondition();
  • /**
  • * Signals a waiting take. Called only from put/offer (which do not
  • * otherwise ordinarily lock takeLock.)
  • *
  • * 唤醒等待在notEmpty条件上的线程
  • */
  • private void signalNotEmpty() {
  • // 获取takeLock锁
  • final ReentrantLock takeLock = this.takeLock;
  • takeLock.lock();
  • try {
  • // 唤醒
  • notEmpty.signal();
  • } finally {
  • // 解开takeLock锁
  • takeLock.unlock();
  • }
  • }
  • /**
  • * Signals a waiting put. Called only from take/poll.
  • *
  • * 唤醒等待在notFull条件上的线程
  • */
  • private void signalNotFull() {
  • // 获取putLock锁
  • final ReentrantLock putLock = this.putLock;
  • putLock.lock();
  • try {
  • // 唤醒
  • notFull.signal();
  • } finally {
  • // 解开putLock锁
  • putLock.unlock();
  • }
  • }
  • /**
  • * Links node at end of queue.
  • *
  • * 向队列中添加一个元素到队尾
  • *
  • * @param node the node
  • */
  • private void enqueue(Node<E> node) {
  • // assert putLock.isHeldByCurrentThread();
  • // assert last.next == null;
  • // 将last的next置为新元素,并将last也置为新元素
  • last = last.next = node;
  • }
  • /**
  • * Removes a node from head of queue.
  • *
  • * 从队首获取并移除一个元素
  • *
  • * 从取出元素的方式可知,链表的结构如下
  • *
  • * head(item=null) -> first -> second -> ... -> last(next=null)
  • *
  • * 头节点的item为null,尾节点的next为null
  • *
  • * @return the node
  • */
  • private E dequeue() {
  • // assert takeLock.isHeldByCurrentThread();
  • // assert head.item == null;
  • // 获取头
  • Node<E> h = head;
  • // 获取头的后继
  • Node<E> first = h.next;
  • // 将头的后继置为自己,协助GC
  • h.next = h; // help GC
  • // 将head指向first
  • head = first;
  • // 将first的item取出
  • E x = first.item;
  • // 将first的item置为null
  • first.item = null;
  • // 返回之前取出的first的item
  • return x;
  • }
  • /**
  • * Lock to prevent both puts and takes.
  • * 将putLock和takeLock都上锁,阻止添加或获取
  • */
  • void fullyLock() {
  • putLock.lock();
  • takeLock.lock();
  • }
  • /**
  • * Unlock to allow both puts and takes.
  • * 将putLock和takeLock都解锁,允许添加或获取
  • */
  • void fullyUnlock() {
  • takeLock.unlock();
  • putLock.unlock();
  • }
  • // /**
  • // * Tells whether both locks are held by current thread.
  • // */
  • // boolean isFullyLocked() {
  • // return (putLock.isHeldByCurrentThread() &&
  • // takeLock.isHeldByCurrentThread());
  • // }
  • /**
  • * Creates a {@code LinkedBlockingQueue} with a capacity of
  • * {@link Integer#MAX_VALUE}.
  • * 默认容量为Integer.MAX_VALUE
  • */
  • public LinkedBlockingQueue() {
  • this(Integer.MAX_VALUE);
  • }
  • /**
  • * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
  • *
  • * 给定容量创建LinkedBlockingQueue
  • *
  • * @param capacity the capacity of this queue
  • * @throws IllegalArgumentException if {@code capacity} is not greater
  • * than zero
  • */
  • public LinkedBlockingQueue(int capacity) {
  • // 检查参数
  • if (capacity <= 0) throw new IllegalArgumentException();
  • // 记录容量
  • this.capacity = capacity;
  • // 初始化的链表head和last都指向了一个item值为null的Node节点
  • last = head = new Node<E>(null);
  • }
  • /**
  • * Creates a {@code LinkedBlockingQueue} with a capacity of
  • * {@link Integer#MAX_VALUE}, initially containing the elements of the
  • * given collection,
  • * added in traversal order of the collection's iterator.
  • *
  • * 给定一个Collection,创建一个默认容量的LinkedBlockingQueue
  • * 将Collection中的元素添加到该LinkedBlockingQueue
  • *
  • * @param c the collection of elements to initially contain
  • * @throws NullPointerException if the specified collection or any
  • * of its elements are null
  • */
  • public LinkedBlockingQueue(Collection<? extends E> c) {
  • // 按默认容量创建LinkedBlockingQueue
  • this(Integer.MAX_VALUE);
  • // 加putLock锁
  • final ReentrantLock putLock = this.putLock;
  • putLock.lock(); // Never contended, but necessary for visibility
  • try {
  • // 记录已经添加了多少个元素
  • int n = 0;
  • // 循环遍历Collection中的元素
  • for (E e : c) {
  • // 检查元素
  • if (e == null)
  • throw new NullPointerException();
  • // 如果Collection的元素超过Integer.MAX_VALUE,会抛出异常
  • if (n == capacity)
  • throw new IllegalStateException("Queue full");
  • // 元素入队
  • enqueue(new Node<E>(e));
  • // 技计数
  • ++n;
  • }
  • // 修改count值
  • count.set(n);
  • } finally {
  • // 解锁
  • putLock.unlock();
  • }
  • }
  • // this doc comment is overridden to remove the reference to collections
  • // greater in size than Integer.MAX_VALUE
  • /**
  • * Returns the number of elements in this queue.
  • *
  • * 获取队列中元素的个数
  • *
  • * @return the number of elements in this queue
  • */
  • public int size() {
  • // count是AutomaticInteger值
  • return count.get();
  • }
  • // this doc comment is a modified copy of the inherited doc comment,
  • // without the reference to unlimited queues.
  • /**
  • * Returns the number of additional elements that this queue can ideally
  • * (in the absence of memory or resource constraints) accept without
  • * blocking. This is always equal to the initial capacity of this queue
  • * less the current {@code size} of this queue.
  • *
  • * <p>Note that you <em>cannot</em> always tell if an attempt to insert
  • * an element will succeed by inspecting {@code remainingCapacity}
  • * because it may be the case that another thread is about to
  • * insert or remove an element.
  • *
  • * 查看剩余容量
  • */
  • public int remainingCapacity() {
  • return capacity - count.get();
  • }
  • /**
  • * Inserts the specified element at the tail of this queue, waiting if
  • * necessary for space to become available.
  • *
  • * 添加元素到队尾,该方法会在队列已满的情况下阻塞,可中断
  • *
  • * @throws InterruptedException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public void put(E e) throws InterruptedException {
  • // 检查元素
  • if (e == null) throw new NullPointerException();
  • // Note: convention in all put/take/etc is to preset local var
  • // holding count negative to indicate failure unless set.
  • int c = -1;
  • // 构建新节点
  • Node<E> node = new Node(e);
  • // 引用putLock锁
  • final ReentrantLock putLock = this.putLock;
  • // 获取数量
  • final AtomicInteger count = this.count;
  • // 上锁,可中断
  • putLock.lockInterruptibly();
  • try {
  • /*
  • * Note that count is used in wait guard even though it is
  • * not protected by lock. This works because count can
  • * only decrease at this point (all other puts are shut
  • * out by lock), and we (or some other waiting put) are
  • * signalled if it ever changes from capacity. Similarly
  • * for all other uses of count in other wait guards.
  • */
  • // 当容量已满时,使用notFull条件进入等待
  • while (count.get() == capacity) {
  • notFull.await();
  • }
  • // 到此处说明容量还有空闲
  • enqueue(node);
  • /**
  • * 对已有数量进行自增,
  • * 并检查自增前已有元素数量+1是否达到容量上限,如果没有就唤醒等待在notFull上的线程
  • */
  • c = count.getAndIncrement();
  • if (c + 1 < capacity)
  • notFull.signal();
  • } finally {
  • // 解锁
  • putLock.unlock();
  • }
  • // 如果自增前已有元素数量为0,则唤醒等待在notEmpty上的线程
  • if (c == 0)
  • signalNotEmpty();
  • }
  • /**
  • * Inserts the specified element at the tail of this queue, waiting if
  • * necessary up to the specified wait time for space to become available.
  • *
  • * 添加元素到队尾,该方法带有超时机制
  • *
  • * @return {@code true} if successful, or {@code false} if
  • * the specified waiting time elapses before space is available.
  • * @throws InterruptedException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public boolean offer(E e, long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • int c = -1;
  • // 引用putLock锁
  • final ReentrantLock putLock = this.putLock;
  • // 获取数量
  • final AtomicInteger count = this.count;
  • // 上锁,可中断
  • putLock.lockInterruptibly();
  • try {
  • // 当容量已满时
  • while (count.get() == capacity) {
  • // 超时时间已过,直接返回false
  • if (nanos <= 0)
  • return false;
  • // 否则使用notFull条件进入等待
  • nanos = notFull.awaitNanos(nanos);
  • }
  • // 到此处说明容量还有空闲
  • enqueue(new Node<E>(e));
  • /**
  • * 对已有数量进行自增,
  • * 并检查自增前已有元素数量+1是否达到容量上限,如果没有就唤醒等待在notFull上的线程
  • */
  • c = count.getAndIncrement();
  • if (c + 1 < capacity)
  • notFull.signal();
  • } finally {
  • // 解锁
  • putLock.unlock();
  • }
  • // 如果自增前已有元素数量为0,则唤醒等待在notEmpty上的线程
  • if (c == 0)
  • signalNotEmpty();
  • // 到这里说明添加成功,返回true
  • return true;
  • }
  • /**
  • * Inserts the specified element at the tail of this queue if it is
  • * possible to do so immediately without exceeding the queue's capacity,
  • * returning {@code true} upon success and {@code false} if this queue
  • * is full.
  • * When using a capacity-restricted queue, this method is generally
  • * preferable to method {@link BlockingQueue#add add}, which can fail to
  • * insert an element only by throwing an exception.
  • *
  • * 添加元素到队尾,该方法会立即返回添加的结果
  • *
  • * @throws NullPointerException if the specified element is null
  • */
  • public boolean offer(E e) {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 获取数量
  • final AtomicInteger count = this.count;
  • // 如果容量已满,则立即返回false,添加失败
  • if (count.get() == capacity)
  • return false;
  • int c = -1;
  • Node<E> node = new Node(e);
  • // 引用putLock锁
  • final ReentrantLock putLock = this.putLock;
  • // 上锁
  • putLock.lock();
  • try {
  • // 容量未满
  • if (count.get() < capacity) {
  • // 添加元素
  • enqueue(node);
  • /**
  • * 对已有数量进行自增,
  • * 并检查自增前已有元素数量+1是否达到容量上限,如果没有就唤醒等待在notFull上的线程
  • */
  • c = count.getAndIncrement();
  • if (c + 1 < capacity)
  • notFull.signal();
  • }
  • } finally {
  • // 解锁
  • putLock.unlock();
  • }
  • // 如果自增前已有元素数量为0,则唤醒等待在notEmpty上的线程
  • if (c == 0)
  • signalNotEmpty();
  • // 到这里c大于0说明添加成功,返回true
  • return c >= 0;
  • }
  • // 取出队首元素,该方法在链表为空时会进入等待状态,可中断
  • public E take() throws InterruptedException {
  • E x;
  • int c = -1;
  • // 获取count
  • final AtomicInteger count = this.count;
  • // 加锁,可中断
  • final ReentrantLock takeLock = this.takeLock;
  • takeLock.lockInterruptibly();
  • try {
  • // 当count为0时,表示链表中没有数据,使用notEmpty进入等待
  • while (count.get() == 0) {
  • notEmpty.await();
  • }
  • // 走到这里表示链表中有数据,出队
  • x = dequeue();
  • /**
  • * 对已有数量进行自减,
  • * 并检查自减前已有元素数量是否大于1,如果是就唤醒等待在notEmpty上的线程
  • */
  • c = count.getAndDecrement();
  • if (c > 1)
  • notEmpty.signal();
  • } finally {
  • // 解锁
  • takeLock.unlock();
  • }
  • // 当自减前已有元素数量等于容量,唤醒等待在notFull上的线程
  • if (c == capacity)
  • signalNotFull();
  • // 返回取到的元素
  • return x;
  • }
  • // 取出队首元素,该方法带有超时机制
  • public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  • E x = null;
  • int c = -1;
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 获取count
  • final AtomicInteger count = this.count;
  • // 加锁,可中断
  • final ReentrantLock takeLock = this.takeLock;
  • takeLock.lockInterruptibly();
  • try {
  • // 当count为0时,表示链表中没有数据
  • while (count.get() == 0) {
  • // 检查超时时间,如果已超时,直接返回null
  • if (nanos <= 0)
  • return null;
  • // 未超时,使用notEmpty进入等待
  • nanos = notEmpty.awaitNanos(nanos);
  • }
  • // 走到这里表示链表中有数据,出队
  • x = dequeue();
  • /**
  • * 对已有数量进行自减,
  • * 并检查自减前已有元素数量是否大于1,如果是就唤醒等待在notEmpty上的线程
  • */
  • c = count.getAndDecrement();
  • if (c > 1)
  • notEmpty.signal();
  • } finally {
  • // 解锁
  • takeLock.unlock();
  • }
  • // 当自减前已有元素数量等于容量,唤醒等待在notFull上的线程
  • if (c == capacity)
  • signalNotFull();
  • // 返回取到的元素
  • return x;
  • }
  • // 取出队首元素,会立即返回结果
  • public E poll() {
  • // 获取count
  • final AtomicInteger count = this.count;
  • // 当count为0时链表为空,直接返回null
  • if (count.get() == 0)
  • return null;
  • // 走到此处说明链表不为空
  • E x = null;
  • int c = -1;
  • // 加锁
  • final ReentrantLock takeLock = this.takeLock;
  • takeLock.lock();
  • try {
  • // 当已有元素数量大于0时
  • if (count.get() > 0) {
  • // 出队
  • x = dequeue();
  • /**
  • * 对已有数量进行自减,
  • * 并检查自减前已有元素数量是否大于1,如果是就唤醒等待在notEmpty上的线程
  • */
  • c = count.getAndDecrement();
  • if (c > 1)
  • notEmpty.signal();
  • }
  • } finally {
  • // 解锁
  • takeLock.unlock();
  • }
  • // 当自减前已有元素数量等于容量,唤醒等待在notFull上的线程
  • if (c == capacity)
  • signalNotFull();
  • // 返回取到的元素
  • return x;
  • }
  • // 查看队首元素
  • public E peek() {
  • // 当count为0时链表为空,直接返回null
  • if (count.get() == 0)
  • return null;
  • // 加锁
  • final ReentrantLock takeLock = this.takeLock;
  • takeLock.lock();
  • try {
  • // 获取head的后继为first
  • Node<E> first = head.next;
  • // 如果first为null,直接返回null
  • if (first == null)
  • return null;
  • else
  • // 否则返回first的item
  • return first.item;
  • } finally {
  • // 解锁
  • takeLock.unlock();
  • }
  • }
  • /**
  • * Unlinks interior Node p with predecessor trail.
  • * 将p从trail上断开连接
  • */
  • void unlink(Node<E> p, Node<E> trail) {
  • // assert isFullyLocked();
  • // p.next is not changed, to allow iterators that are
  • // traversing p to maintain their weak-consistency guarantee.
  • // 将p的item置为null
  • p.item = null;
  • // 将trail的后继指向p的后继
  • trail.next = p.next;
  • // 如果p之前是尾节点,就将last置为新的尾节点trail
  • if (last == p)
  • last = trail;
  • /**
  • * 对已有数量进行自减,
  • * 并检查自减前已有元素数量是否等于容量,如果是就唤醒等待在notFull上的线程
  • */
  • if (count.getAndDecrement() == capacity)
  • notFull.signal();
  • }
  • /**
  • * Removes a single instance of the specified element from this queue,
  • * if it is present. More formally, removes an element {@code e} such
  • * that {@code o.equals(e)}, if this queue contains one or more such
  • * elements.
  • * Returns {@code true} if this queue contained the specified element
  • * (or equivalently, if this queue changed as a result of the call).
  • *
  • * 移除指定元素
  • *
  • * @param o element to be removed from this queue, if present
  • * @return {@code true} if this queue changed as a result of the call
  • */
  • public boolean remove(Object o) {
  • // 检查参数
  • if (o == null) return false;
  • // putLock和takeLock都加锁
  • fullyLock();
  • try {
  • // 从头开始向后遍历链表
  • for (Node<E> trail = head, p = trail.next;
  • p != null;
  • trail = p, p = p.next) {
  • /**
  • * 其中,p是trail的后继
  • * 如果遍历到的节点的item与哦相同,就将p从trail后继上断开
  • * 然后返回true
  • */
  • if (o.equals(p.item)) {
  • unlink(p, trail);
  • return true;
  • }
  • }
  • // 走到这里说明没有找到与o相同的元素,返回false
  • return false;
  • } finally {
  • // putLock和takeLock都解锁
  • fullyUnlock();
  • }
  • }
  • /**
  • * Returns {@code true} if this queue contains the specified element.
  • * More formally, returns {@code true} if and only if this queue contains
  • * at least one element {@code e} such that {@code o.equals(e)}.
  • *
  • * 判断指定元素是否存在于链表中
  • *
  • * @param o object to be checked for containment in this queue
  • * @return {@code true} if this queue contains the specified element
  • */
  • public boolean contains(Object o) {
  • // 检查参数
  • if (o == null) return false;
  • // putLock和takeLock都加锁
  • fullyLock();
  • try {
  • // 从头开始向后遍历链表,如果存在与o相同的元素,就返回true
  • for (Node<E> p = head.next; p != null; p = p.next)
  • if (o.equals(p.item))
  • return true;
  • // 否则返回false
  • return false;
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • /**
  • * Returns an array containing all of the elements in this queue, in
  • * proper sequence.
  • *
  • * <p>The returned array will be "safe" in that no references to it are
  • * maintained by this queue. (In other words, this method must allocate
  • * a new array). The caller is thus free to modify the returned array.
  • *
  • * <p>This method acts as bridge between array-based and collection-based
  • * APIs.
  • *
  • * 转换为Object数组
  • *
  • * @return an array containing all of the elements in this queue
  • */
  • public Object[] toArray() {
  • // putLock和takeLock都加锁
  • fullyLock();
  • try {
  • // 通过count值创建一个相应大小的Object数组a
  • int size = count.get();
  • Object[] a = new Object[size];
  • // 遍历链表,将元素依次添加到Object数组a中
  • int k = 0;
  • for (Node<E> p = head.next; p != null; p = p.next)
  • a[k++] = p.item;
  • // 返回a
  • return a;
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • /**
  • * Returns an array containing all of the elements in this queue, in
  • * proper sequence; the runtime type of the returned array is that of
  • * the specified array. If the queue fits in the specified array, it
  • * is returned therein. Otherwise, a new array is allocated with the
  • * runtime type of the specified array and the size of this queue.
  • *
  • * <p>If this queue fits in the specified array with room to spare
  • * (i.e., the array has more elements than this queue), the element in
  • * the array immediately following the end of the queue is set to
  • * {@code null}.
  • *
  • * <p>Like the {@link #toArray()} method, this method acts as bridge between
  • * array-based and collection-based APIs. Further, this method allows
  • * precise control over the runtime type of the output array, and may,
  • * under certain circumstances, be used to save allocation costs.
  • *
  • * <p>Suppose {@code x} is a queue known to contain only strings.
  • * The following code can be used to dump the queue into a newly
  • * allocated array of {@code String}:
  • *
  • * <pre>
  • * String[] y = x.toArray(new String[0]);</pre>
  • *
  • * Note that {@code toArray(new Object[0])} is identical in function to
  • * {@code toArray()}.
  • *
  • * 转换为一个指定类型的数组
  • *
  • * @param a the array into which the elements of the queue are to
  • * be stored, if it is big enough; otherwise, a new array of the
  • * same runtime type is allocated for this purpose
  • * @return an array containing all of the elements in this queue
  • * @throws ArrayStoreException if the runtime type of the specified array
  • * is not a supertype of the runtime type of every element in
  • * this queue
  • * @throws NullPointerException if the specified array is null
  • */
  • @SuppressWarnings("unchecked")
  • public <T> T[] toArray(T[] a) {
  • // putLock和takeLock都加锁
  • fullyLock();
  • try {
  • // 获取count值为size
  • int size = count.get();
  • // 检查a的大小是否小于已有元素数量
  • if (a.length < size)
  • // 如果小于,使用反射创建一个大小为size的新的数组a
  • a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);
  • // 遍历链表,将所有元素添加到a中
  • int k = 0;
  • for (Node<E> p = head.next; p != null; p = p.next)
  • a[k++] = (T)p.item;
  • // 添加完所有的元素后,如果a还有空闲空间,全部置为null
  • if (a.length > k)
  • a[k] = null;
  • // 返回a
  • return a;
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • public String toString() {
  • // putLock和takeLock都加锁
  • fullyLock();
  • try {
  • // 获取头节点的后继节点
  • Node<E> p = head.next;
  • // 如果p为null,表示链表中没有数据,直接返回"[]"
  • if (p == null)
  • return "[]";
  • StringBuilder sb = new StringBuilder();
  • sb.append('[');
  • // 遍历链表,并拼接节点元素
  • for (;;) {
  • E e = p.item;
  • // 如果元素引用的是LinkedBlockingQueue自己,就显示"(this Collection)"
  • sb.append(e == this ? "(this Collection)" : e);
  • p = p.next;
  • // 最后一个节点,拼接"]"
  • if (p == null)
  • return sb.append(']').toString();
  • sb.append(',').append(' ');
  • }
  • } finally {
  • fullyUnlock();
  • }
  • }
  • /**
  • * Atomically removes all of the elements from this queue.
  • * The queue will be empty after this call returns.
  • *
  • * 清除链表
  • */
  • public void clear() {
  • // putLock和takeLock都加锁
  • fullyLock();
  • try {
  • // 遍历链表,将所有节点的next置为自己,协助GC,并将item置为null
  • for (Node<E> p, h = head; (p = h.next) != null; h = p) {
  • h.next = h;
  • p.item = null;
  • }
  • // head也指向last
  • head = last;
  • // assert head.item == null && head.next == null;
  • // 将count设置为0,如果设置之前的值为capacity,则唤醒等待在notFull上的值
  • if (count.getAndSet(0) == capacity)
  • notFull.signal();
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • /**
  • * 将LinkedBlockingQueue中的元素全部转移到给的的Collection中
  • *
  • * @throws UnsupportedOperationException {@inheritDoc}
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • * @throws IllegalArgumentException {@inheritDoc}
  • */
  • public int drainTo(Collection<? super E> c) {
  • // 调用重载的方法,传入maxElements参数为Integer.MAX_VALUE
  • return drainTo(c, Integer.MAX_VALUE);
  • }
  • /**
  • * 将LinkedBlockingQueue中的元素全部转移到给的的Collection中
  • *
  • * @throws UnsupportedOperationException {@inheritDoc}
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • * @throws IllegalArgumentException {@inheritDoc}
  • */
  • public int drainTo(Collection<? super E> c, int maxElements) {
  • // 检查参数,c不能为null,不能为当前LinkedBlockingQueue自己
  • if (c == null)
  • throw new NullPointerException();
  • if (c == this)
  • throw new IllegalArgumentException();
  • //
  • boolean signalNotFull = false;
  • // 加锁
  • final ReentrantLock takeLock = this.takeLock;
  • takeLock.lock();
  • try {
  • // 取maxElements和已有元素数量的最小值
  • int n = Math.min(maxElements, count.get());
  • // count.get provides visibility to first n Nodes
  • Node<E> h = head;
  • int i = 0;
  • try {
  • // 遍历链表
  • while (i < n) {
  • // p是h的后继
  • Node<E> p = h.next;
  • // 将节点p的item添加到c中
  • c.add(p.item);
  • // 将p的item置为null
  • p.item = null;
  • // h节点的next置为自己,协助GC
  • h.next = h;
  • // h向后移动
  • h = p;
  • ++i;
  • }
  • // 返回移动的个数
  • return n;
  • } finally {
  • // Restore invariants even if c.add() threw
  • // 当i大于0时,表示有元素被移动了
  • if (i > 0) {
  • // assert h.item == null;
  • // 将head指向h,即新的头节点
  • head = h;
  • // 将count减去i,在减去i之前如果count为capacity,就将signalNotFull置为true
  • signalNotFull = (count.getAndAdd(-i) == capacity);
  • }
  • }
  • } finally {
  • // 解锁
  • takeLock.unlock();
  • if (signalNotFull)
  • // 唤醒等待在notFull上的线程
  • signalNotFull();
  • }
  • }
  • /**
  • * Returns an iterator over the elements in this queue in proper sequence.
  • * The elements will be returned in order from first (head) to last (tail).
  • *
  • * <p>The returned iterator is a "weakly consistent" iterator that
  • * will never throw {@link java.util.ConcurrentModificationException
  • * ConcurrentModificationException}, and guarantees to traverse
  • * elements as they existed upon construction of the iterator, and
  • * may (but is not guaranteed to) reflect any modifications
  • * subsequent to construction.
  • *
  • * 获取迭代器
  • *
  • * @return an iterator over the elements in this queue in proper sequence
  • */
  • public Iterator<E> iterator() {
  • return new Itr();
  • }
  • private class Itr implements Iterator<E> {
  • /*
  • * Basic weakly-consistent iterator. At all times hold the next
  • * item to hand out so that if hasNext() reports true, we will
  • * still have it to return even if lost race with a take etc.
  • */
  • // 当前节点
  • private Node<E> current;
  • // 上次返回的节点
  • private Node<E> lastRet;
  • // 当前元素
  • private E currentElement;
  • Itr() {
  • // 加锁
  • fullyLock();
  • try {
  • // 将current指向第一个有效节点,即head的后继节点
  • current = head.next;
  • // 如果current不为null,将current.item赋值给currentElement
  • if (current != null)
  • currentElement = current.item;
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • // 判断是否还有下一个元素
  • public boolean hasNext() {
  • return current != null;
  • }
  • /**
  • * Returns the next live successor of p, or null if no such.
  • *
  • * Unlike other traversal methods, iterators need to handle both:
  • * - dequeued nodes (p.next == p)
  • * - (possibly multiple) interior removed nodes (p.item == null)
  • *
  • * 获取下一个节点,用于在返回元素之后更新current
  • */
  • private Node<E> nextNode(Node<E> p) {
  • for (;;) {
  • // s是p的后继
  • Node<E> s = p.next;
  • /**
  • * 当s等于p时,表示p其实是已出队的节点
  • * 由于链表是从头部出队,因此此时只需要返回head.next即可
  • */
  • if (s == p)
  • return head.next;
  • /**
  • * 当s为null,表示已经当末尾了,此时返回s(即null)即可
  • * 当s不为null,且s的item不为null,此时s是有效的后继节点,返回s即可
  • */
  • if (s == null || s.item != null)
  • return s;
  • /**
  • * 走到这里表示,p.next != p,p.next != null,但p.next.item == null
  • * 则继续往后遍历
  • */
  • p = s;
  • }
  • }
  • // 获取下一个元素
  • public E next() {
  • // 加锁
  • fullyLock();
  • try {
  • // 如果current为null,表示没有下一个节点了,抛出异常
  • if (current == null)
  • throw new NoSuchElementException();
  • // 取得currentElement
  • E x = currentElement;
  • // 将current赋值给lastRet
  • lastRet = current;
  • // 更新current和currentElement
  • current = nextNode(current);
  • currentElement = (current == null) ? null : current.item;
  • // 返回旧的currentElement
  • return x;
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • // 移除元素
  • public void remove() {
  • // 如果上一次返回元素的节点为null,则直接抛出错误
  • if (lastRet == null)
  • throw new IllegalStateException();
  • // 加锁
  • fullyLock();
  • try {
  • // 获取上一次返回元素的节点
  • Node<E> node = lastRet;
  • // 将lastRet置为null
  • lastRet = null;
  • // 从head开始遍历链表
  • for (Node<E> trail = head, p = trail.next;
  • p != null;
  • trail = p, p = p.next) {
  • // 如果p就是node,将p从自己的前驱trail上断开连接,并跳出循环
  • if (p == node) {
  • unlink(p, trail);
  • break;
  • }
  • }
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • }
  • /**
  • * Save the state to a stream (that is, serialize it).
  • *
  • * 序列化写
  • *
  • * @serialData The capacity is emitted (int), followed by all of
  • * its elements (each an {@code Object}) in the proper order,
  • * followed by a null
  • * @param s the stream
  • */
  • private void writeObject(java.io.ObjectOutputStream s)
  • throws java.io.IOException {
  • // 加锁
  • fullyLock();
  • try {
  • // Write out any hidden stuff, plus capacity
  • // 调用默认序列化写
  • s.defaultWriteObject();
  • // Write out all elements in the proper order.
  • // 从第一个有效节点开始遍历,写出每个节点的item
  • for (Node<E> p = head.next; p != null; p = p.next)
  • s.writeObject(p.item);
  • // Use trailing null as sentinel
  • // 写出一个null
  • s.writeObject(null);
  • } finally {
  • // 解锁
  • fullyUnlock();
  • }
  • }
  • /**
  • * Reconstitute this queue instance from a stream (that is,
  • * deserialize it).
  • *
  • * 序列化读
  • *
  • * @param s the stream
  • */
  • private void readObject(java.io.ObjectInputStream s)
  • throws java.io.IOException, ClassNotFoundException {
  • // Read in capacity, and any hidden stuff
  • // 调用默认序列化读
  • s.defaultReadObject();
  • // 将count置为0
  • count.set(0);
  • // head和last都置为item为null的Node
  • last = head = new Node<E>(null);
  • // Read in all elements and place in queue
  • // 循环调用readObject(),将读入的对象通过add()方法添加
  • for (;;) {
  • @SuppressWarnings("unchecked")
  • E item = (E)s.readObject();
  • if (item == null)
  • break;
  • // add()方法将读到的数据添加到LinkedBlockingQueue中
  • add(item);
  • }
  • }
  • }

从对LinkedBlockingQueue源码的分析可知,其实LinkedBlockingQueue内部维护了一个单向链表,数据在链表尾添加,在链表头取出;在每次对链表中的元素进行操作时都会加锁以保障数据在线程并发情况下的安全性。

具体的实现原理上面源码中的注释已经讲解得非常详细了,大家如有疑问可以仔细阅读,这里不再赘述了。

3. LinkedBlockingQueue示例

下面提供一个简单的LinkedBlockingQueue的示例:

  • package com.coderap.collection;
  • import java.util.Iterator;
  • import java.util.LinkedList;
  • import java.util.Queue;
  • import java.util.concurrent.LinkedBlockingQueue;
  • public class LinkedBlockingQueueTest {
  • // 循环次数
  • private final static int loopCount = 4;
  • // 操作的linkedList和LinkedBlockingDeque
  • private final static LinkedList<String> linkedList = new LinkedList<>();
  • private final static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(8);
  • public static void main(String[] args) {
  • new Thread(new LinkedBlockingQueueTest.OperateThread(linkedBlockingQueue, loopCount), "T-1").start();
  • new Thread(new LinkedBlockingQueueTest.OperateThread(linkedBlockingQueue, loopCount), "T-2").start();
  • //new Thread(new LinkedBlockingQueueTest.OperateThread(linkedList, loopCount), "T-3").start();
  • //new Thread(new LinkedBlockingQueueTest.OperateThread(linkedList, loopCount), "T-4").start();
  • }
  • // 遍历方法
  • private static void iterate(Queue queue) {
  • StringBuilder stringBuilder = new StringBuilder();
  • Iterator iterator = queue.iterator();
  • while (iterator.hasNext()) {
  • stringBuilder.append(iterator.next());
  • stringBuilder.append(", ");
  • }
  • // 删除最后多余的字符
  • stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length() - 1);
  • // 打印
  • System.out.println(Thread.currentThread().getName() + " iterate: " + stringBuilder.toString());
  • }
  • private static class OperateThread implements Runnable {
  • private Queue queue;
  • private int loopCount;
  • public OperateThread(Queue queue, int loopCount) {
  • this.queue = queue;
  • this.loopCount = loopCount;
  • }
  • @Override
  • public void run() {
  • // 循环添加并遍历打印
  • while (loopCount > 0) {
  • queue.add(Thread.currentThread().getName() + " - " + loopCount);
  • iterate(queue);
  • loopCount--;
  • }
  • }
  • }
  • }

某一次运行结果如下:

  • T-2 iterate: T-1 - 4, T-2 - 4
  • T-1 iterate: T-1 - 4, T-2 - 4
  • T-2 iterate: T-1 - 4, T-2 - 4, T-2 - 3
  • T-1 iterate: T-1 - 4, T-2 - 4, T-2 - 3, T-1 - 3
  • T-2 iterate: T-1 - 4, T-2 - 4, T-2 - 3, T-1 - 3, T-2 - 2
  • T-1 iterate: T-1 - 4, T-2 - 4, T-2 - 3, T-1 - 3, T-2 - 2, T-1 - 2
  • T-2 iterate: T-1 - 4, T-2 - 4, T-2 - 3, T-1 - 3, T-2 - 2, T-1 - 2, T-2 - 1, T-1 - 1
  • T-1 iterate: T-1 - 4, T-2 - 4, T-2 - 3, T-1 - 3, T-2 - 2, T-1 - 2, T-2 - 1, T-1 - 1