Java多线程
Java并发
JUC集合

Java多线程 38 - LinkedBlockingDeque详解

简介:LinkedBlockingDeque是双向链表实现的双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(添加或删除);并且该阻塞队列是支持线程安全。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。

1. LinkedBlockingDeque介绍

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(添加或删除);并且该阻塞队列是支持线程安全。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小为Integer.MAX_VALUE

LinkedBlockingDeque的类图结构如下:

1.LinkedBlockingDeque类图结构.png

  1. LinkedBlockingDeque继承于AbstractQueue,它本质上是一个支持FIFO和FILO的双向的队列。
  2. LinkedBlockingDeque实现了BlockingDeque接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
  3. LinkedBlockingDeque是通过双向链表实现的。
    • first是双向链表的表头。
    • last是双向链表的表尾。
    • count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数。
    • capacity是LinkedBlockingDeque的容量,它是在创建LinkedBlockingDeque时指定的。
    • lock是控制对LinkedBlockingDeque访问的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取CPU执行权。
    • notEmptynotFull分别是非空条件和未满条件。通过它们能够更加细腻进行并发控制。
      • 若线程A要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它线程B向队列中插入了数据之后,会调用notEmpty.signal()唤醒notEmpty上的等待线程。此时线程A会被唤醒从而得以继续运行。此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock
      • 若线程H要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它线程I取出数据之后,会调用notFull.signal()唤醒notFull上的等待线程。此时,线程H就会被唤醒从而得以继续运行。此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock

关于ReentrantLock以及Condition等更多的内容,可以参考:

  1. Java多线程 16——ReentrantLock互斥锁
  2. Java多线程 18——Condition
  3. Java多线程 20——AbstractQueuedSynchronizer详解(一)
  4. Java多线程 21——AbstractQueuedSynchronizer详解(二)
  5. Java多线程 22——AbstractQueuedSynchronizer详解(三)

LinkedBlockingQueue函数列表如下:

  • // 创建一个容量为Integer.MAX_VALUE的LinkedBlockingDeque
  • LinkedBlockingDeque()
  • // 创建一个容量为Integer.MAX_VALUE的LinkedBlockingDeque,最初包含给定collection的元素,以该collection迭代器的遍历顺序添加
  • LinkedBlockingDeque(Collection<? extends E> c)
  • // 创建一个具有给定(固定)容量的 LinkedBlockingDeque
  • LinkedBlockingDeque(int capacity)
  • // 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾
  • boolean add(E e)
  • // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出IllegalStateException
  • void addFirst(E e)
  • // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出IllegalStateException
  • void addLast(E e)
  • // 以原子方式(atomically)从此双端队列移除所有元素
  • void clear()
  • // 如果此双端队列包含指定的元素,则返回true
  • boolean contains(Object o)
  • // 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器
  • Iterator<E> descendingIterator()
  • // 移除此队列中所有可用的元素,并将它们添加到给定collection中
  • int drainTo(Collection<? super E> c)
  • // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection中
  • int drainTo(Collection<? super E> c, int maxElements)
  • // 获取但不移除此双端队列表示的队列的头部
  • E element()
  • // 获取,但不移除此双端队列的第一个元素
  • E getFirst()
  • // 获取,但不移除此双端队列的最后一个元素
  • E getLast()
  • // 返回在此双端队列元素上以恰当顺序进行迭代的迭代器
  • Iterator<E> iterator()
  • // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回true;如果当前没有空间可用,则返回false
  • boolean offer(E e)
  • // 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间
  • boolean offer(E e, long timeout, TimeUnit unit)
  • // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回true;如果当前没有空间可用,则返回false
  • boolean offerFirst(E e)
  • // 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间
  • boolean offerFirst(E e, long timeout, TimeUnit unit)
  • // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回true;如果当前没有空间可用,则返回false
  • boolean offerLast(E e)
  • // 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间
  • boolean offerLast(E e, long timeout, TimeUnit unit)
  • // 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回null
  • E peek()
  • // 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回null
  • E peekFirst()
  • // 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回null
  • E peekLast()
  • // 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回null
  • E poll()
  • // 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素
  • E poll(long timeout, TimeUnit unit)
  • // 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回null
  • E pollFirst()
  • // 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素
  • E pollFirst(long timeout, TimeUnit unit)
  • // 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回null
  • E pollLast()
  • // 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素
  • E pollLast(long timeout, TimeUnit unit)
  • // 从此双端队列所表示的堆栈中弹出一个元素
  • E pop()
  • // 将元素推入此双端队列表示的栈
  • void push(E e)
  • // 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间
  • void put(E e)
  • // 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间
  • void putFirst(E e)
  • // 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间
  • void putLast(E e)
  • // 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数
  • int remainingCapacity()
  • // 获取并移除此双端队列表示的队列的头部
  • E remove()
  • // 从此双端队列移除第一次出现的指定元素
  • boolean remove(Object o)
  • // 获取并移除此双端队列第一个元素
  • E removeFirst()
  • // 从此双端队列移除第一次出现的指定元素
  • boolean removeFirstOccurrence(Object o)
  • // 获取并移除此双端队列的最后一个元素
  • E removeLast()
  • // 从此双端队列移除最后一次出现的指定元素
  • boolean removeLastOccurrence(Object o)
  • // 返回此双端队列中的元素数
  • int size()
  • // 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素
  • E take()
  • // 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素
  • E takeFirst()
  • // 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素
  • E takeLast()
  • // 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组
  • Object[] toArray()
  • // 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型
  • <T> T[] toArray(T[] a)
  • // 返回此collection的字符串表示形式
  • String toString()

2. LinkedBlockingDeque源码解析

下面是LinkedBlockingDeque的源码解析,基于JDK 1.7.0_07:

  • package java.util.concurrent;
  • import java.util.AbstractQueue;
  • import java.util.Collection;
  • import java.util.Iterator;
  • import java.util.NoSuchElementException;
  • import java.util.concurrent.locks.Condition;
  • import java.util.concurrent.locks.ReentrantLock;
  • public class LinkedBlockingDeque<E>
  • extends AbstractQueue<E>
  • implements BlockingDeque<E>, java.io.Serializable {
  • private static final long serialVersionUID = -387911632671998426L;
  • /**
  • * Doubly-linked list node class
  • * 双向链表的节点
  • * */
  • static final class Node<E> {
  • /**
  • * The item, or null if this node has been removed.
  • * 保存数据的属性,当节点被移除时该值为null
  • */
  • E item;
  • /**
  • * One of:
  • * - the real predecessor Node
  • * - this Node, meaning the predecessor is tail
  • * - null, meaning there is no predecessor
  • *
  • * 三种情况之一:
  • * 1. 前驱节点
  • * 2. 值为自己时,表示该节点的前驱是尾节点
  • * 3. 值为null时,表示该节点没有前驱
  • */
  • Node<E> prev;
  • /**
  • * One of:
  • * - the real successor Node
  • * - this Node, meaning the successor is head
  • * - null, meaning there is no successor
  • *
  • * 三种情况之一:
  • * 1. 后继节点
  • * 2. 值为自己时,表示该节点的后继是头节点
  • * 3. 值为null时,表示该节点没有后继
  • */
  • Node<E> next;
  • // 构造方法,初始化item
  • Node(E x) {
  • item = x;
  • }
  • }
  • /**
  • * Pointer to first node.
  • * 指向第一个节点
  • * Invariant: (first == null && last == null) ||
  • * (first.prev == null && first.item != null)
  • */
  • transient Node<E> first;
  • /**
  • * Pointer to last node.
  • * 指向最后一个节点
  • * Invariant: (first == null && last == null) ||
  • * (last.next == null && last.item != null)
  • */
  • transient Node<E> last;
  • /**
  • * Number of items in the deque
  • * 链表已有的元素数量
  • * */
  • private transient int count;
  • /**
  • * Maximum number of items in the deque
  • * 链表的最大容量
  • * */
  • private final int capacity;
  • /**
  • * Main lock guarding all access
  • *
  • * 链表访问控制锁
  • * */
  • final ReentrantLock lock = new ReentrantLock();
  • /**
  • * Condition for waiting takes
  • * 获取操作等待条件
  • * */
  • private final Condition notEmpty = lock.newCondition();
  • /**
  • * Condition for waiting puts
  • * 添加操作等待条件
  • * */
  • private final Condition notFull = lock.newCondition();
  • /**
  • * Creates a {@code LinkedBlockingDeque} with a capacity of
  • * {@link Integer#MAX_VALUE}.
  • * 创建LinkedBlockingDeque,默认容量为Integer.MAX_VALUE
  • */
  • public LinkedBlockingDeque() {
  • this(Integer.MAX_VALUE);
  • }
  • /**
  • * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
  • *
  • * 根据给定容量创建一个LinkedBlockingDeque
  • *
  • * @param capacity the capacity of this deque
  • * @throws IllegalArgumentException if {@code capacity} is less than 1
  • */
  • public LinkedBlockingDeque(int capacity) {
  • // 容量不可小于等于0
  • if (capacity <= 0) throw new IllegalArgumentException();
  • this.capacity = capacity;
  • }
  • /**
  • * Creates a {@code LinkedBlockingDeque} with a capacity of
  • * {@link Integer#MAX_VALUE}, initially containing the elements of
  • * the given collection, added in traversal order of the
  • * collection's iterator.
  • *
  • * 创建一个LinkedBlockingDeque,将传入的Collection中的元素添加到该LinkedBlockingDeque中
  • *
  • * @param c the collection of elements to initially contain
  • * @throws NullPointerException if the specified collection or any
  • * of its elements are null
  • */
  • public LinkedBlockingDeque(Collection<? extends E> c) {
  • // 先创建一个默认大小的LinkedBlockingDeque
  • this(Integer.MAX_VALUE);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock(); // Never contended, but necessary for visibility
  • try {
  • // 遍历传入的Collection
  • for (E e : c) {
  • // 元素不能为null
  • if (e == null)
  • throw new NullPointerException();
  • // 将元素添加到LinkedBlockingDeque中
  • if (!linkLast(new Node<E>(e)))
  • throw new IllegalStateException("Deque full");
  • }
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // Basic linking and unlinking operations, called only while holding lock
  • /**
  • * Links node as first element, or returns false if full.
  • * 将传入的Node作为第一个元素添加到链表中,如果链表已满则返回false
  • * 1. head <-> node1 <-> ... <-> last
  • * 2. NewNode -> head <-> node1 <-> ... <-> last
  • * 3. NewNode(head) <-> head(old) <-> node1 <-> ... <-> last
  • */
  • private boolean linkFirst(Node<E> node) {
  • // assert lock.isHeldByCurrentThread();
  • // 判断容量是否已满,如果已满就返回false
  • if (count >= capacity)
  • return false;
  • // 获取first节点
  • Node<E> f = first;
  • // 将传入Node的后继设为first节点
  • node.next = f;
  • first = node;
  • // 如果last为null,说明之前只有first一个节点(first也可能为null,即链表为空),则也将last指向node
  • if (last == null)
  • last = node;
  • else
  • // 否则表示last也已指向了某个节点,将f的prev指向node即可
  • f.prev = node;
  • // 维护数量
  • ++count;
  • // 唤醒等待在notEmpty上的线程
  • notEmpty.signal();
  • return true;
  • }
  • /**
  • * Links node as last element, or returns false if full.
  • * 将传入的Node作为最后一个元素添加到链表中,如果链表已满则返回false
  • * 1. head <-> node1 <-> ... <-> last
  • * 2. head <-> node1 <-> ... <-> last <- NewNode
  • * 3. head <-> node1 <-> ... <-> last(old) <-> NewNode(last)
  • */
  • private boolean linkLast(Node<E> node) {
  • // assert lock.isHeldByCurrentThread();
  • // 判断容量是否已满,如果已满就返回false
  • if (count >= capacity)
  • return false;
  • // 获取last节点
  • Node<E> l = last;
  • // 将传入Node的前驱设为last节点
  • node.prev = l;
  • // 将last指向node
  • last = node;
  • // 如果first为null,说明之前只有last一个节点(last也可能为null,即链表为空),则也将first指向node
  • if (first == null)
  • first = node;
  • else
  • // 否则表示first也已指向了某个节点,将l的next指向node即可
  • l.next = node;
  • // 维护数量
  • ++count;
  • // 唤醒等待在notEmpty上的线程
  • notEmpty.signal();
  • return true;
  • }
  • /**
  • * Removes and returns first element, or null if empty.
  • * 将第一个元素(first)移除并返回,如果first为空则返回null
  • */
  • private E unlinkFirst() {
  • // assert lock.isHeldByCurrentThread();
  • // 引用first元素
  • Node<E> f = first;
  • // 如果first元素为null,直接返回null
  • if (f == null)
  • return null;
  • // 获取frist的next
  • Node<E> n = f.next;
  • // 获取first的item
  • E item = f.item;
  • // 将first的item置为null
  • f.item = null;
  • // 将f的next指向自己,协助GC
  • f.next = f; // help GC
  • // 将f之前的后继赋值给first,即first指向了新的节点
  • first = n;
  • // 如果n为null,表示此时链表为空,则将last也置为null
  • if (n == null)
  • last = null;
  • else
  • // 否则将n的前驱置为null,此时n就为first节点
  • n.prev = null;
  • // 维护数量
  • --count;
  • // 唤醒等待在notFull上的线程
  • notFull.signal();
  • return item;
  • }
  • /**
  • * Removes and returns last element, or null if empty.
  • * 将最后个元素(last)移除并返回,如果last为空则返回null
  • */
  • private E unlinkLast() {
  • // assert lock.isHeldByCurrentThread();
  • // 引用last元素
  • Node<E> l = last;
  • // 如果last元素为null,直接返回null
  • if (l == null)
  • return null;
  • // 获取last的prev
  • Node<E> p = l.prev;
  • // 获取last的item
  • E item = l.item;
  • // 将flast的item置为null
  • l.item = null;
  • // 将l的prev指向自己,协助GC
  • l.prev = l; // help GC
  • // 将l之前的前驱赋值给last,即last指向了新的节点
  • last = p;
  • // 如果p为null,表示此时链表为空,则将first也置为null
  • if (p == null)
  • first = null;
  • else
  • // 否则将p的后继置为null,此时p就为last节点
  • p.next = null;
  • // 维护数量
  • --count;
  • // 唤醒等待在notFull上的线程
  • notFull.signal();
  • return item;
  • }
  • /**
  • * Unlinks x.
  • *
  • * 将x节点从链表中断开
  • */
  • void unlink(Node<E> x) {
  • // assert lock.isHeldByCurrentThread();
  • // 获取x的前驱和后继
  • Node<E> p = x.prev;
  • Node<E> n = x.next;
  • /**
  • * 如果x的前驱为null,表示x是first节点,直接调用unlinkFirst()
  • * 如果x的后继为null,表示x是last节点,直接调用unlinkLast()
  • */
  • if (p == null) {
  • unlinkFirst();
  • } else if (n == null) {
  • unlinkLast();
  • } else {
  • // p的后继置为n,n的前驱置为p,即将p和n之间的x跳过了
  • p.next = n;
  • n.prev = p;
  • // 将x的item置为null,防止由于x还在迭代器中被引用,导致x.item无法正常GC
  • x.item = null;
  • // Don't mess with x's links. They may still be in use by
  • // an iterator.
  • // 维护数量
  • --count;
  • // 唤醒等待在notFull上的线程
  • notFull.signal();
  • }
  • }
  • // BlockingDeque methods
  • /**
  • * 添加元素到链表头,内部调用offerFirst(),会立即返回结果,
  • * 添加失败将抛出异常
  • *
  • * @throws IllegalStateException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public void addFirst(E e) {
  • if (!offerFirst(e))
  • throw new IllegalStateException("Deque full");
  • }
  • /**
  • * 添加元素到链表尾,内部调用offerLast(),会立即返回结果,
  • * 添加失败将抛出异常
  • *
  • * @throws IllegalStateException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public void addLast(E e) {
  • if (!offerLast(e))
  • throw new IllegalStateException("Deque full");
  • }
  • /**
  • * 从链表头取出元素,会立即返回结果,
  • * 添加成功返回true,添加失败返回false
  • *
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public boolean offerFirst(E e) {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 使用e构造一个Node节点
  • Node<E> node = new Node<E>(e);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 使用linkFirst()将node添加到链表头
  • return linkFirst(node);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 从链表尾取出元素,会立即返回结果,
  • * 添加成功返回true,添加失败返回false
  • *
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public boolean offerLast(E e) {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 使用e构造一个Node节点
  • Node<E> node = new Node<E>(e);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 使用linkLast()将node添加到链表尾
  • return linkLast(node);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 添加元素到链表头,如果添加失败会阻塞,一直到添加成功
  • * 阻塞等待可中断
  • *
  • * @throws NullPointerException {@inheritDoc}
  • * @throws InterruptedException {@inheritDoc}
  • */
  • public void putFirst(E e) throws InterruptedException {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 使用e构造一个Node节点
  • Node<E> node = new Node<E>(e);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // while循环,使用linkFirst()将node添加到链表尾
  • while (!linkFirst(node))
  • // 如果添加失败会进入阻塞等待
  • notFull.await();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 添加元素到链表尾,如果添加失败会阻塞,一直到添加成功
  • * 阻塞等待可中断
  • *
  • * @throws NullPointerException {@inheritDoc}
  • * @throws InterruptedException {@inheritDoc}
  • */
  • public void putLast(E e) throws InterruptedException {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 使用e构造一个Node节点
  • Node<E> node = new Node<E>(e);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // while循环,使用linkLast()将node添加到链表尾
  • while (!linkLast(node))
  • // 如果添加失败会进入阻塞等待
  • notFull.await();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 添加元素到链表头,带有超时机制,如果添加失败会阻塞,一直到添加成功
  • * 阻塞等待可中断
  • *
  • * @throws NullPointerException {@inheritDoc}
  • * @throws InterruptedException {@inheritDoc}
  • */
  • public boolean offerFirst(E e, long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 使用e构造一个Node节点
  • Node<E> node = new Node<E>(e);
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // while循环,使用linkFirst()将node添加到链表头
  • while (!linkFirst(node)) {
  • // 添加失败就计算超时时间,如果超时直接返回false,结束添加
  • if (nanos <= 0)
  • return false;
  • // 未超时就进入等待
  • nanos = notFull.awaitNanos(nanos);
  • }
  • return true;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 添加元素到链表尾,带有超时机制,如果添加失败会阻塞,一直到添加成功
  • * 阻塞等待可中断
  • *
  • * @throws NullPointerException {@inheritDoc}
  • * @throws InterruptedException {@inheritDoc}
  • */
  • public boolean offerLast(E e, long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // 检查参数
  • if (e == null) throw new NullPointerException();
  • // 使用e构造一个Node节点
  • Node<E> node = new Node<E>(e);
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // while循环,使用linkLast()将node添加到链表尾
  • while (!linkLast(node)) {
  • // 添加失败就计算超时时间,如果超时直接返回false,结束添加
  • if (nanos <= 0)
  • return false;
  • // 未超时就进入等待
  • nanos = notFull.awaitNanos(nanos);
  • }
  • return true;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 移除链表头元素,内部调用pollFirst()
  • *
  • * @throws NoSuchElementException {@inheritDoc}
  • */
  • public E removeFirst() {
  • E x = pollFirst();
  • if (x == null) throw new NoSuchElementException();
  • return x;
  • }
  • /**
  • * 移除链表头元素,内部调用pollLast()
  • *
  • * @throws NoSuchElementException {@inheritDoc}
  • */
  • public E removeLast() {
  • E x = pollLast();
  • if (x == null) throw new NoSuchElementException();
  • return x;
  • }
  • /**
  • * 获取链表头元素,会立即返回结果
  • * 如果没获取到,将直接返回null
  • * 内部调用unlinkFirst()
  • *
  • * @return
  • */
  • public E pollFirst() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • return unlinkFirst();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 获取链表尾元素,会立即返回结果
  • * 如果没获取到,将直接返回null
  • * 内部调用pollLast()
  • *
  • * @return
  • */
  • public E pollLast() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • return unlinkLast();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 获取链表头元素,如果没获取到,将进入阻塞等待状态
  • * 内部调用unlinkFirst()
  • *
  • * @return
  • */
  • public E takeFirst() throws InterruptedException {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 使用unlinkFirst()获取元素,如果为null则进入等待状态
  • E x;
  • while ( (x = unlinkFirst()) == null)
  • notEmpty.await();
  • // 获取到元素,直接返回
  • return x;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 获取链表尾元素,如果没获取到,将进入阻塞等待状态
  • * 内部调用unlinkLast()
  • *
  • * @return
  • */
  • public E takeLast() throws InterruptedException {
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 使用unlinkLast()获取元素,如果为null则进入等待状态
  • E x;
  • while ( (x = unlinkLast()) == null)
  • notEmpty.await();
  • return x;
  • } finally {
  • lock.unlock();
  • }
  • }
  • /**
  • * 获取链表头元素,带有超时机制,如果没获取到,将进入阻塞等待状态
  • * 内部调用unlinkFirst()
  • * 等待可中断
  • *
  • * @return
  • */
  • public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // while循环,使用unlinkFirst()获取链表头元素
  • E x;
  • while ( (x = unlinkFirst()) == null) {
  • // 获取失败就计算超时时间,如果超时直接返回null,结束添加
  • if (nanos <= 0)
  • return null;
  • // 未超时就进入等待状态
  • nanos = notEmpty.awaitNanos(nanos);
  • }
  • // 获取成功,直接返回
  • return x;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 获取链表尾元素,带有超时机制,如果没获取到,将进入阻塞等待状态
  • * 内部调用unlinkLast()
  • * 等待可中断
  • *
  • * @return
  • */
  • public E pollLast(long timeout, TimeUnit unit) throws InterruptedException {
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // while循环,使用unlinkLast()获取链表尾元素
  • E x;
  • while ( (x = unlinkLast()) == null) {
  • // 获取失败就计算超时时间,如果超时直接返回null,结束添加
  • if (nanos <= 0)
  • return null;
  • // 未超时就进入等待状态
  • nanos = notEmpty.awaitNanos(nanos);
  • }
  • // 获取成功,直接返回
  • return x;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 查看链表头元素,会立即返回,如果头元素为null会抛出异常
  • * 内部调用peekFirst()
  • *
  • * @throws NoSuchElementException {@inheritDoc}
  • */
  • public E getFirst() {
  • E x = peekFirst();
  • if (x == null) throw new NoSuchElementException();
  • return x;
  • }
  • /**
  • * 查看链表尾元素,会立即返回,如果尾元素为null会抛出异常
  • * 内部调用peekLast()
  • *
  • * @throws NoSuchElementException {@inheritDoc}
  • */
  • public E getLast() {
  • E x = peekLast();
  • if (x == null) throw new NoSuchElementException();
  • return x;
  • }
  • /**
  • * 查看链表头元素,会立即返回,如果头元素为null将直接返回null
  • * @return
  • */
  • public E peekFirst() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 取first
  • return (first == null) ? null : first.item;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 查看链表尾元素,会立即返回,如果尾元素为null将直接返回null
  • * @return
  • */
  • public E peekLast() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 取last
  • return (last == null) ? null : last.item;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 移除链表中出现的第一个指定元素
  • * @param o
  • * @return
  • */
  • public boolean removeFirstOccurrence(Object o) {
  • // 检查参数
  • if (o == null) return false;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 从前向后遍历链表
  • for (Node<E> p = first; p != null; p = p.next) {
  • // 如果遍历到的节点的元素与o相同
  • if (o.equals(p.item)) {
  • // 调用unlink()将该节点从链表中断开
  • unlink(p);
  • // 返回true
  • return true;
  • }
  • }
  • // 如果没有移除任何元素,返回false
  • return false;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 移除链表中出现的最后个指定元素
  • * @param o
  • * @return
  • */
  • public boolean removeLastOccurrence(Object o) {
  • // 检查参数
  • if (o == null) return false;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 从后向前遍历链表
  • for (Node<E> p = last; p != null; p = p.prev) {
  • // 如果遍历到的节点的元素与o相同
  • if (o.equals(p.item)) {
  • // 调用unlink()将该节点从链表中断开
  • unlink(p);
  • // 返回true
  • return true;
  • }
  • }
  • // 如果没有移除任何元素,返回false
  • return false;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // BlockingQueue methods
  • /**
  • * Inserts the specified element at the end of this deque unless it would
  • * violate capacity restrictions. When using a capacity-restricted deque,
  • * it is generally preferable to use method {@link #offer(Object) offer}.
  • *
  • * <p>This method is equivalent to {@link #addLast}.
  • *
  • * 添加操作,调用了addLast(),即默认添加到链表尾,链表已满时会抛出异常
  • *
  • * @throws IllegalStateException if the element cannot be added at this
  • * time due to capacity restrictions
  • * @throws NullPointerException if the specified element is null
  • */
  • public boolean add(E e) {
  • addLast(e);
  • return true;
  • }
  • /**
  • * 添加操作,调用了offerLast(),即默认添加到链表尾,链表已满时立即返回false
  • *
  • * @throws NullPointerException if the specified element is null
  • */
  • public boolean offer(E e) {
  • return offerLast(e);
  • }
  • /**
  • * 添加操作,调用了putLast(),即默认添加到链表尾,链表已满时会阻塞
  • *
  • * @throws NullPointerException {@inheritDoc}
  • * @throws InterruptedException {@inheritDoc}
  • */
  • public void put(E e) throws InterruptedException {
  • putLast(e);
  • }
  • /**
  • * 带有超时机制的添加操作,调用了offerLast(),即默认添加到链表尾,链表已满时进入带有超时机制的等待
  • * 超时后依旧无法添加将返回false
  • *
  • * @throws NullPointerException {@inheritDoc}
  • * @throws InterruptedException {@inheritDoc}
  • */
  • public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  • return offerLast(e, timeout, unit);
  • }
  • /**
  • * Retrieves and removes the head of the queue represented by this deque.
  • * This method differs from {@link #poll poll} only in that it throws an
  • * exception if this deque is empty.
  • *
  • * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
  • *
  • * 获取操作,默认移除头元素,链表为空时会抛出异常
  • * 内部调用removeFirst()
  • *
  • * @return the head of the queue represented by this deque
  • * @throws NoSuchElementException if this deque is empty
  • */
  • public E remove() {
  • return removeFirst();
  • }
  • /**
  • * 获取操作,默认移除头元素,链表为空时会立即返回null
  • * 内部调用pollFirst()
  • */
  • public E poll() {
  • return pollFirst();
  • }
  • /**
  • * 获取操作,默认移除头元素,链表为空时会阻塞
  • * 内部调用takeFirst()
  • */
  • public E take() throws InterruptedException {
  • return takeFirst();
  • }
  • /**
  • * 带有超时机制的获取操作,调用了pollFirst(),默认移除头元素,链表为空时进入带有超时机制的等待
  • * 超时后依旧无法获取将返回null
  • *
  • * @param timeout
  • * @param unit
  • * @return
  • * @throws InterruptedException
  • */
  • public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  • return pollFirst(timeout, unit);
  • }
  • /**
  • * Retrieves, but does not remove, the head of the queue represented by
  • * this deque. This method differs from {@link #peek peek} only in that
  • * it throws an exception if this deque is empty.
  • *
  • * <p>This method is equivalent to {@link #getFirst() getFirst}.
  • *
  • * 查看元素,默认查看链表头元素,会立即返回,链表为空时会抛出异常
  • * 内部调用getFirst()
  • *
  • * @return the head of the queue represented by this deque
  • * @throws NoSuchElementException if this deque is empty
  • */
  • public E element() {
  • return getFirst();
  • }
  • /**
  • * 查看元素,默认查看链表头元素,会立即返回,链表为空时返回null
  • * 内部调用peekFirst()
  • */
  • public E peek() {
  • return peekFirst();
  • }
  • /**
  • * Returns the number of additional elements that this deque can ideally
  • * (in the absence of memory or resource constraints) accept without
  • * blocking. This is always equal to the initial capacity of this deque
  • * less the current {@code size} of this deque.
  • *
  • * <p>Note that you <em>cannot</em> always tell if an attempt to insert
  • * an element will succeed by inspecting {@code remainingCapacity}
  • * because it may be the case that another thread is about to
  • * insert or remove an element.
  • *
  • * 查看剩余容量
  • */
  • public int remainingCapacity() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 计算剩余容量并返回
  • return capacity - count;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 将所有的元素从ArrayBlockingDeque中移除,并添加到给定的Collection中
  • *
  • * @throws UnsupportedOperationException {@inheritDoc}
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • * @throws IllegalArgumentException {@inheritDoc}
  • */
  • public int drainTo(Collection<? super E> c) {
  • // 调用重载的方法,传入maxElements参数为Integer.MAX_VALUE
  • return drainTo(c, Integer.MAX_VALUE);
  • }
  • /**
  • * 将所有的元素从ArrayBlockingDeque中移除,并添加到给定的Collection中
  • *
  • * @throws UnsupportedOperationException {@inheritDoc}
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • * @throws IllegalArgumentException {@inheritDoc}
  • */
  • public int drainTo(Collection<? super E> c, int maxElements) {
  • // 检查参数
  • if (c == null)
  • throw new NullPointerException();
  • if (c == this)
  • throw new IllegalArgumentException();
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 取maxElements和已有元素数量的最小值
  • int n = Math.min(maxElements, count);
  • // 从头开始遍历
  • for (int i = 0; i < n; i++) {
  • // 将first的item加入到c中
  • c.add(first.item); // In this order, in case add() throws.
  • // 然后将first从链表中断开连接,下次循环取到的first就是新的节点了
  • unlinkFirst();
  • }
  • // 返回操作的元素个数
  • return n;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // Stack methods
  • /**
  • * 压栈操作,默认压到链表头,链表已满时会抛出异常
  • * 内部调用addFirst()
  • * @throws IllegalStateException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public void push(E e) {
  • addFirst(e);
  • }
  • /**
  • * 出栈操作,默认获取链表头元素,链表为空时会抛出异常
  • * 内部调用removeFirst()
  • *
  • * @throws NoSuchElementException {@inheritDoc}
  • */
  • public E pop() {
  • return removeFirst();
  • }
  • // Collection methods
  • /**
  • * Removes the first occurrence of the specified element from this deque.
  • * If the deque does not contain the element, it is unchanged.
  • * More formally, removes the first element {@code e} such that
  • * {@code o.equals(e)} (if such an element exists).
  • * Returns {@code true} if this deque contained the specified element
  • * (or equivalently, if this deque changed as a result of the call).
  • *
  • * <p>This method is equivalent to
  • * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
  • *
  • * 移除指定元素,默认移除第一个出现的,内部调用removeFirstOccurrence()
  • *
  • * @param o element to be removed from this deque, if present
  • * @return {@code true} if this deque changed as a result of the call
  • */
  • public boolean remove(Object o) {
  • return removeFirstOccurrence(o);
  • }
  • /**
  • * Returns the number of elements in this deque.
  • *
  • * 返回已有元素数量
  • *
  • * @return the number of elements in this deque
  • */
  • public int size() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 返回count
  • return count;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns {@code true} if this deque contains the specified element.
  • * More formally, returns {@code true} if and only if this deque contains
  • * at least one element {@code e} such that {@code o.equals(e)}.
  • *
  • * 判断是否包含某个指定元素
  • *
  • * @param o object to be checked for containment in this deque
  • * @return {@code true} if this deque contains the specified element
  • */
  • public boolean contains(Object o) {
  • // 检查参数
  • if (o == null) return false;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 从前往后遍历
  • for (Node<E> p = first; p != null; p = p.next)
  • // 当遍历到的节点的元素与o相同,表示包含,直接返回true
  • if (o.equals(p.item))
  • return true;
  • // 走到这里,表示没有找到,返回false
  • return false;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /*
  • * TODO: Add support for more efficient bulk operations.
  • *
  • * We don't want to acquire the lock for every iteration, but we
  • * also want other threads a chance to interact with the
  • * collection, especially when count is close to capacity.
  • */
  • // /**
  • // * Adds all of the elements in the specified collection to this
  • // * queue. Attempts to addAll of a queue to itself result in
  • // * {@code IllegalArgumentException}. Further, the behavior of
  • // * this operation is undefined if the specified collection is
  • // * modified while the operation is in progress.
  • // *
  • // * @param c collection containing elements to be added to this queue
  • // * @return {@code true} if this queue changed as a result of the call
  • // * @throws ClassCastException {@inheritDoc}
  • // * @throws NullPointerException {@inheritDoc}
  • // * @throws IllegalArgumentException {@inheritDoc}
  • // * @throws IllegalStateException {@inheritDoc}
  • // * @see #add(Object)
  • // */
  • // public boolean addAll(Collection<? extends E> c) {
  • // if (c == null)
  • // throw new NullPointerException();
  • // if (c == this)
  • // throw new IllegalArgumentException();
  • // final ReentrantLock lock = this.lock;
  • // lock.lock();
  • // try {
  • // boolean modified = false;
  • // for (E e : c)
  • // if (linkLast(e))
  • // modified = true;
  • // return modified;
  • // } finally {
  • // lock.unlock();
  • // }
  • // }
  • /**
  • * Returns an array containing all of the elements in this deque, in
  • * proper sequence (from first to last element).
  • *
  • * <p>The returned array will be "safe" in that no references to it are
  • * maintained by this deque. (In other words, this method must allocate
  • * a new array). The caller is thus free to modify the returned array.
  • *
  • * <p>This method acts as bridge between array-based and collection-based
  • * APIs.
  • *
  • * 转换为Object数组
  • *
  • * @return an array containing all of the elements in this deque
  • */
  • @SuppressWarnings("unchecked")
  • public Object[] toArray() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 根据count创建对应大小的Object数组a
  • Object[] a = new Object[count];
  • int k = 0;
  • // 从前往后遍历链表,将元素添加到数组a中
  • for (Node<E> p = first; p != null; p = p.next)
  • a[k++] = p.item;
  • // 返回a
  • return a;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns an array containing all of the elements in this deque, in
  • * proper sequence; the runtime type of the returned array is that of
  • * the specified array. If the deque fits in the specified array, it
  • * is returned therein. Otherwise, a new array is allocated with the
  • * runtime type of the specified array and the size of this deque.
  • *
  • * <p>If this deque fits in the specified array with room to spare
  • * (i.e., the array has more elements than this deque), the element in
  • * the array immediately following the end of the deque is set to
  • * {@code null}.
  • *
  • * <p>Like the {@link #toArray()} method, this method acts as bridge between
  • * array-based and collection-based APIs. Further, this method allows
  • * precise control over the runtime type of the output array, and may,
  • * under certain circumstances, be used to save allocation costs.
  • *
  • * <p>Suppose {@code x} is a deque known to contain only strings.
  • * The following code can be used to dump the deque into a newly
  • * allocated array of {@code String}:
  • *
  • * <pre>
  • * String[] y = x.toArray(new String[0]);</pre>
  • *
  • * Note that {@code toArray(new Object[0])} is identical in function to
  • * {@code toArray()}.
  • *
  • * 转换为特定类型的数组
  • *
  • * @param a the array into which the elements of the deque are to
  • * be stored, if it is big enough; otherwise, a new array of the
  • * same runtime type is allocated for this purpose
  • * @return an array containing all of the elements in this deque
  • * @throws ArrayStoreException if the runtime type of the specified array
  • * is not a supertype of the runtime type of every element in
  • * this deque
  • * @throws NullPointerException if the specified array is null
  • */
  • @SuppressWarnings("unchecked")
  • public <T> T[] toArray(T[] a) {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 检查传入的a的容量是否小于元素个数
  • if (a.length < count)
  • // 如果小于,利用反射创建一个新的容量为count的数组a
  • a = (T[])java.lang.reflect.Array.newInstance
  • (a.getClass().getComponentType(), count);
  • // 从前往后遍历链表,将元素添加到数组a中
  • int k = 0;
  • for (Node<E> p = first; p != null; p = p.next)
  • a[k++] = (T)p.item;
  • // 如果a的容量大于元素个数,则将没用到的位置都置为null
  • if (a.length > k)
  • a[k] = null;
  • // 返回a
  • return a;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // toString方法重写
  • public String toString() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // first为空,直接返回"[]"
  • Node<E> p = first;
  • if (p == null)
  • return "[]";
  • // 拼接
  • StringBuilder sb = new StringBuilder();
  • sb.append('[');
  • for (;;) {
  • E e = p.item;
  • // 如果节点引用的是LinkedBlockingDeque自己,显示"(this Collection)"
  • sb.append(e == this ? "(this Collection)" : e);
  • p = p.next;
  • if (p == null)
  • return sb.append(']').toString();
  • sb.append(',').append(' ');
  • }
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Atomically removes all of the elements from this deque.
  • * The deque will be empty after this call returns.
  • */
  • public void clear() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 从前往后遍历,将每个元素的item、prev、next都置为null
  • for (Node<E> f = first; f != null; ) {
  • f.item = null;
  • Node<E> n = f.next;
  • f.prev = null;
  • f.next = null;
  • f = n;
  • }
  • // first、last置为null
  • first = last = null;
  • // 元素个数置为0
  • count = 0;
  • // 唤醒等待在notFull上的线程
  • notFull.signalAll();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns an iterator over the elements in this deque in proper sequence.
  • * The elements will be returned in order from first (head) to last (tail).
  • *
  • * <p>The returned iterator is a "weakly consistent" iterator that
  • * will never throw {@link java.util.ConcurrentModificationException
  • * ConcurrentModificationException}, and guarantees to traverse
  • * elements as they existed upon construction of the iterator, and
  • * may (but is not guaranteed to) reflect any modifications
  • * subsequent to construction.
  • *
  • * 获取迭代器
  • *
  • * @return an iterator over the elements in this deque in proper sequence
  • */
  • public Iterator<E> iterator() {
  • return new Itr();
  • }
  • /**
  • * Returns an iterator over the elements in this deque in reverse
  • * sequential order. The elements will be returned in order from
  • * last (tail) to first (head).
  • *
  • * <p>The returned iterator is a "weakly consistent" iterator that
  • * will never throw {@link java.util.ConcurrentModificationException
  • * ConcurrentModificationException}, and guarantees to traverse
  • * elements as they existed upon construction of the iterator, and
  • * may (but is not guaranteed to) reflect any modifications
  • * subsequent to construction.
  • *
  • * 获取逆序迭代器
  • *
  • * @return an iterator over the elements in this deque in reverse order
  • */
  • public Iterator<E> descendingIterator() {
  • return new DescendingItr();
  • }
  • /**
  • * Base class for Iterators for LinkedBlockingDeque
  • * 迭代器基类
  • */
  • private abstract class AbstractItr implements Iterator<E> {
  • /**
  • * The next node to return in next()
  • * 下一个节点
  • */
  • Node<E> next;
  • /**
  • * nextItem holds on to item fields because once we claim that
  • * an element exists in hasNext(), we must return item read
  • * under lock (in advance()) even if it was in the process of
  • * being removed when hasNext() was called.
  • * 下一个元素
  • */
  • E nextItem;
  • /**
  • * Node returned by most recent call to next. Needed by remove.
  • * Reset to null if this element is deleted by a call to remove.
  • *
  • * 上次返回的节点
  • */
  • private Node<E> lastRet;
  • abstract Node<E> firstNode();
  • abstract Node<E> nextNode(Node<E> n);
  • AbstractItr() {
  • // set to initial position
  • // 加锁
  • final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  • lock.lock();
  • try {
  • // next指向第一个节点
  • next = firstNode();
  • // nextItem为next的item
  • nextItem = (next == null) ? null : next.item;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Returns the successor node of the given non-null, but
  • * possibly previously deleted, node.
  • *
  • * 获取给定节点的后继节点
  • */
  • private Node<E> succ(Node<E> n) {
  • // Chains of deleted nodes ending in null or self-links
  • // are possible if multiple interior nodes are removed.
  • for (;;) {
  • // 获取n的下一个节点为s
  • Node<E> s = nextNode(n);
  • // 如果s为null,直接返回null
  • if (s == null)
  • return null;
  • // 如果s不为null,且s的item不为null,就返回s
  • else if (s.item != null)
  • return s;
  • // 如果s就是n,说明n被删除了,直接返回头节点
  • else if (s == n)
  • return firstNode();
  • else
  • // s不为null,s的item为null,s也不是n,向后移动
  • n = s;
  • }
  • }
  • /**
  • * Advances next.
  • * 移动到下一个元素
  • */
  • void advance() {
  • // 加锁
  • final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  • lock.lock();
  • try {
  • // 获取next的后继
  • // assert next != null;
  • next = succ(next);
  • // nextItem指向next的item
  • nextItem = (next == null) ? null : next.item;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 是否有下一个元素
  • public boolean hasNext() {
  • return next != null;
  • }
  • // 获取下一个元素
  • public E next() {
  • // next为null时直接抛出异常
  • if (next == null)
  • throw new NoSuchElementException();
  • // lastRet记录next
  • lastRet = next;
  • // x记录nextItem
  • E x = nextItem;
  • // 向前推进
  • advance();
  • // 返回x
  • return x;
  • }
  • // 移除元素
  • public void remove() {
  • // 获取上次返回的元素为n
  • Node<E> n = lastRet;
  • // 如果上次返回的元素为null,直接抛出异常
  • if (n == null)
  • throw new IllegalStateException();
  • // 将lastRet置为null
  • lastRet = null;
  • // 加锁
  • final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  • lock.lock();
  • try {
  • // 如果n的item不为null
  • if (n.item != null)
  • // 将其从链表中断开连接
  • unlink(n);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • }
  • /**
  • * Forward iterator
  • * 顺序迭代器
  • * */
  • private class Itr extends AbstractItr {
  • // 重写方法,第一个元素为first
  • Node<E> firstNode() { return first; }
  • // 重写方法,从前往后遍历
  • Node<E> nextNode(Node<E> n) { return n.next; }
  • }
  • /**
  • * Descending iterator
  • * 逆序迭代器
  • * */
  • private class DescendingItr extends AbstractItr {
  • // 重写方法,第一个元素为last
  • Node<E> firstNode() { return last; }
  • // 重写方法,从后往前遍历
  • Node<E> nextNode(Node<E> n) { return n.prev; }
  • }
  • /**
  • * Save the state of this deque to a stream (that is, serialize it).
  • *
  • * 序列化写
  • *
  • * @serialData The capacity (int), followed by elements (each an
  • * {@code Object}) in the proper order, followed by a null
  • * @param s the stream
  • */
  • private void writeObject(java.io.ObjectOutputStream s)
  • throws java.io.IOException {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // Write out capacity and any hidden stuff
  • // 调用默认序列化写
  • s.defaultWriteObject();
  • // Write out all elements in the proper order.
  • // 从前往后遍历链表,依次写出节点的item
  • for (Node<E> p = first; p != null; p = p.next)
  • s.writeObject(p.item);
  • // Use trailing null as sentinel
  • // 最后写出一个null
  • s.writeObject(null);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * Reconstitute this deque from a stream (that is,
  • * deserialize it).
  • *
  • * 序列化读
  • *
  • * @param s the stream
  • */
  • private void readObject(java.io.ObjectInputStream s)
  • throws java.io.IOException, ClassNotFoundException {
  • // 调用默认序列化读
  • s.defaultReadObject();
  • // count置为0
  • count = 0;
  • // first、last置为null
  • first = null;
  • last = null;
  • // Read in all elements and place in queue
  • // 无限循环,读入Object
  • for (;;) {
  • @SuppressWarnings("unchecked")
  • // 读到的元素不为null,使用add()方法将其添加到LinkedBlockingDeque中
  • E item = (E)s.readObject();
  • if (item == null)
  • break;
  • add(item);
  • }
  • }
  • }

从对LinkedBlockingDeque源码的分析可知,其实LinkedBlockingDeque内部维护了一个双向链表,数据可以在链表两边添加和取出;在每次对链表中的元素进行操作时都会加锁以保障数据在线程并发情况下的安全性。

具体的实现原理上面源码中的注释已经讲解得非常详细了,大家如有疑问可以仔细阅读,这里不再赘述了。

3. LinkedBlockingDeque示例

下面提供一个简单的LinkedBlockingDeque的示例:

  • package com.coderap.collection;
  • import java.util.Iterator;
  • import java.util.LinkedList;
  • import java.util.Queue;
  • import java.util.concurrent.LinkedBlockingQueue;
  • public class LinkedBlockingQueueTest {
  • // 循环次数
  • private final static int loopCount = 4;
  • // 操作的linkedList和LinkedBlockingDeque
  • private final static LinkedList<String> linkedList = new LinkedList<>();
  • private final static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(8);
  • public static void main(String[] args) {
  • new Thread(new LinkedBlockingQueueTest.OperateThread(linkedBlockingQueue, loopCount), "T-1").start();
  • new Thread(new LinkedBlockingQueueTest.OperateThread(linkedBlockingQueue, loopCount), "T-2").start();
  • //new Thread(new LinkedBlockingQueueTest.OperateThread(linkedList, loopCount), "T-3").start();
  • //new Thread(new LinkedBlockingQueueTest.OperateThread(linkedList, loopCount), "T-4").start();
  • }
  • // 遍历方法
  • private static void iterate(Queue queue) {
  • StringBuilder stringBuilder = new StringBuilder();
  • Iterator iterator = queue.iterator();
  • while (iterator.hasNext()) {
  • stringBuilder.append(iterator.next());
  • stringBuilder.append(", ");
  • }
  • // 删除最后多余的字符
  • stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length() - 1);
  • // 打印
  • System.out.println(Thread.currentThread().getName() + " iterate: " + stringBuilder.toString());
  • }
  • private static class OperateThread implements Runnable {
  • private Queue queue;
  • private int loopCount;
  • public OperateThread(Queue queue, int loopCount) {
  • this.queue = queue;
  • this.loopCount = loopCount;
  • }
  • @Override
  • public void run() {
  • // 循环添加并遍历打印
  • while (loopCount > 0) {
  • queue.add(Thread.currentThread().getName() + " - " + loopCount);
  • iterate(queue);
  • loopCount--;
  • }
  • }
  • }
  • }

某一次运行结果如下:

  • T-1 iterate: T-1 - 4, T-2 - 4
  • T-2 iterate: T-1 - 4, T-2 - 4
  • T-1 iterate: T-1 - 4, T-2 - 4, T-1 - 3
  • T-2 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3
  • T-1 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2
  • T-2 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2
  • T-1 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2, T-1 - 1
  • T-2 iterate: T-1 - 4, T-2 - 4, T-1 - 3, T-2 - 3, T-1 - 2, T-2 - 2, T-1 - 1, T-2 - 1