Java多线程
Java并发
JUC集合

Java多线程 33——ConcurrentSkipListMap(JDK1.7)详解(二)

简介:ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景。

1. ConcurrentSkipListMap源码解析(一)

在接下来的内容中将以ConcurrentSkipListMap源码为基础详细解析其各类操作的实现,并从构造、增加、删除、修改、查询等过程演示源码的工作流程和原理。

1.1. ConcurrentSkipListMap的构造方法

ConcurrentSkipListMap有4个构造方法,声明如下:

  • // 构造一个新的空映射,该映射按照键的自然顺序进行排序
  • ConcurrentSkipListMap()
  • // 构造一个新的空映射,该映射按照指定的比较器进行排序
  • ConcurrentSkipListMap(Comparator<? super K> comparator)
  • // 构造一个新映射,该映射所包含的映射关系与给定映射包含的映射关系相同,并按照键的自然顺序进行排序
  • ConcurrentSkipListMap(Map<? extends K, ? extends V> m)
  • // 构造一个新映射,该映射所包含的映射关系与指定的有序映射包含的映射关系相同,使用的顺序也相同
  • ConcurrentSkipListMap(SortedMap<K, ? extends V> m)

由于ConcurrentSkipListMap是有序Map,因此在构造时可以指定比较器,如果没有指定比较器将使用默认的比较方法对键进行排序。除了默认的构造器外,可以传入一个Comparator参数用于指定构造器。另外ConcurrentSkipListMap也提供了从已有的Map构造,与ConcurrentHashMap不同的是,这里可以传入SortedMap对象,ConcurrentSkipListMap会直接沿用传入的SortedMap对象的比较器作为键比较器。这4个构造方法的源码很类似:

  • /**
  • * Constructs a new, empty map, sorted according to the
  • * {@linkplain Comparable natural ordering} of the keys.
  • * 构造一个新的ConcurrentSkipListMap实例
  • */
  • public ConcurrentSkipListMap() {
  • this.comparator = null;
  • // 初始化方法
  • initialize();
  • }
  • /**
  • * Constructs a new, empty map, sorted according to the specified
  • * comparator.
  • * <p>
  • * 指定比较器,构造一个新的ConcurrentSkipListMap实例
  • *
  • * @param comparator the comparator that will be used to order this map.
  • * If <tt>null</tt>, the {@linkplain Comparable natural
  • * ordering} of the keys will be used.
  • */
  • public ConcurrentSkipListMap(Comparator<? super K> comparator) {
  • this.comparator = comparator;
  • // 初始化方法
  • initialize();
  • }
  • /**
  • * Constructs a new map containing the same mappings as the given map,
  • * sorted according to the {@linkplain Comparable natural ordering} of
  • * the keys.
  • * <p>
  • * 构造一个新的ConcurrentSkipListMap实例,并将传入的Map中的元素添加到实例中
  • *
  • * @param m the map whose mappings are to be placed in this map
  • * @throws ClassCastException if the keys in <tt>m</tt> are not
  • * {@link Comparable}, or are not mutually comparable
  • * @throws NullPointerException if the specified map or any of its keys
  • * or values are null
  • */
  • public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
  • this.comparator = null;
  • initialize();
  • // 调用putAll()方法添加元素
  • putAll(m);
  • }
  • /**
  • * Constructs a new map containing the same mappings and using the
  • * same ordering as the specified sorted map.
  • * <p>
  • * 构造一个新的ConcurrentSkipListMap实例,并将传入的SortedMap中的元素添加到实例中
  • *
  • * @param m the sorted map whose mappings are to be placed in this
  • * map, and whose comparator is to be used to sort this map
  • * @throws NullPointerException if the specified sorted map or any of
  • * its keys or values are null
  • */
  • public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
  • // 将传入Map的比较器作为ConcurrentSkipListMap的比较器
  • this.comparator = m.comparator();
  • initialize();
  • // 调用buildFromSorted()方法添加元素
  • buildFromSorted(m);
  • }

可以发现,所有的构造方法都调用了initialize()方法进行初始化,该方法的源码如下:

  • final void initialize() {
  • keySet = null;
  • entrySet = null;
  • values = null;
  • descendingMap = null;
  • // 最小为4
  • randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
  • head = new HeadIndex<K, V>(new Node<K, V>(null, BASE_HEADER, null), null, null, 1);
  • }

initialize()方法重置了一些与ConcurrentSkipListMap息息相关的成员变量,同时生成了randomSeed,通过位运算保证了randomSeed最小为4。同时还初始化了head字段的指向,该字段是一个HeadIndex节点,level级别为1,rightdown都为null,同时其node字段指向了BASE_HEADER对象,这个对象在前面讲到过,就是一个简单的Object对象,仅用作标记使用。

以Map为参数的构造方法中,ConcurrentSkipListMap(Map<? extends K, ? extends V> m)使用了putAll(m)将传入的Map中的元素添加到新创建的ConcurrentSkipListMap中,putAll(Map<? extends K, ? extends V>)方法继承自AbstractMap类,源码如下:

  • public void putAll(Map<? extends K, ? extends V> m) {
  • for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
  • put(e.getKey(), e.getValue());
  • }

可以看出,它内部调用了put(K, V),具体细节在后面会讲解。

我们主要关注一下ConcurrentSkipListMap(SortedMap<K, ? extends V> m)方法内部调用的buildFromSorted(m),源码如下:

  • /**
  • * Streamlined bulk insertion to initialize from elements of
  • * given sorted map. Call only from constructor or clone
  • * method.
  • *
  • * 批量将有序集合中依次添加到新创建的ConcurrentSkipListMap中
  • * 该方法只会在构造方法和clone()方法中被调用
  • */
  • private void buildFromSorted(SortedMap<K, ? extends V> map) {
  • // 参数检查
  • if (map == null)
  • throw new NullPointerException();
  • // 头节点
  • HeadIndex<K, V> h = head;
  • // 头节点的node
  • Node<K, V> basepred = h.node;
  • // Track the current rightmost node at each level. Uses an
  • // ArrayList to avoid committing to initial or maximum level.
  • // 跟踪当前层最右边的节点
  • ArrayList<Index<K, V>> preds = new ArrayList<Index<K, V>>();
  • // initialize
  • /**
  • * 这一步的操作大致效果是这样的,假设当前跳表的结构如下:
  • * head -> n1 -> n2
  • * |
  • * index1 -> n3 -> n4 -> n5
  • * |
  • * index2 -> n6 -> n7
  • * 则会将head、index1、index2添加到preds中,顺序如下:
  • * (index2, index1, head)
  • */
  • // 根据头节点层级向preds添加null占位
  • for (int i = 0; i <= h.level; ++i)
  • preds.add(null);
  • // 反向往preds中添加头节点这一列的节点
  • Index<K, V> q = h;
  • for (int i = h.level; i > 0; --i) {
  • preds.set(i, q);
  • q = q.down;
  • }
  • // 迭代传入的map,将元素添加到新创建的ConcurrentSkipListMap中
  • Iterator<? extends Map.Entry<? extends K, ? extends V>> it = map.entrySet().iterator();
  • while (it.hasNext()) {
  • // 取出元素
  • Map.Entry<? extends K, ? extends V> e = it.next();
  • // 随机一个级别j
  • int j = randomLevel();
  • // j与head级别进行比较,如果大于,就置j为只比head级别大1
  • if (j > h.level) j = h.level + 1;
  • // 取出并检查键值对
  • K k = e.getKey();
  • V v = e.getValue();
  • if (k == null || v == null)
  • throw new NullPointerException();
  • // 根据键值对构造一个节点,后继节点为null
  • Node<K, V> z = new Node<K, V>(k, v, null);
  • // 将z作为head的node的后继节点
  • basepred.next = z;
  • // basepred后移,指向新添加的节点z
  • basepred = z;
  • if (j > 0) {
  • Index<K, V> idx = null;
  • // 从1遍历到j
  • for (int i = 1; i <= j; ++i) {
  • // 构建一个新的Index节点,其node指向上面新创建的节点z,后继节点为null
  • idx = new Index<K, V>(z, idx, null);
  • if (i > h.level)
  • /**
  • * 如果i比head节点的层级大,应该新构建一个head节点,补在idx前面
  • * 这一步的操作大致是这样的,假设当前跳表的结构如下:
  • * head(1)
  • * 此时head级别为1,此时插入一个级别为2的Index,级别比head大,因此需要新构建一个head,级别也为2
  • * 修改后结构如下:
  • * head(new, 2) -> idx(2)
  • * |
  • * head(old, 1)
  • */
  • h = new HeadIndex<K, V>(h.node, h, idx, i);
  • // 更新preds数组
  • if (i < preds.size()) {
  • preds.get(i).right = idx;
  • preds.set(i, idx);
  • } else
  • preds.add(idx);
  • }
  • }
  • }
  • // 更新head
  • head = h;
  • }

buildFromSorted(SortedMap<K, ? extends V>)方法接收一个Map参数,内部会通过迭代该Map将其所有元素添加到ConcurrentSkipListMap中;在添加过程中,通过randomLevel()方法随机获取一个级别,然后根据该级别调整跳表的结构。randomLevel()方法的源码如下:

  • /**
  • * Returns a random level for inserting a new node.
  • * Hardwired to k=1, p=0.5, max 31 (see above and
  • * Pugh's "Skip List Cookbook", sec 3.4).
  • * <p>
  • * This uses the simplest of the generators described in George
  • * Marsaglia's "Xorshift RNGs" paper. This is not a high-quality
  • * generator but is acceptable here.
  • *
  • * 为即将插入的新的Index节点生成一个随机级别,
  • * 这里使用的算法是Pugh's的论文《Skip List Cookbook》中3.4节提到的内容,level最大为31。
  • * 这使用了George Marsaglia的《Xorshift RNGs》论文中描述的最简单的生成器。
  • * 这个生成器并不完美,但在这里是可以接受的。
  • */
  • private int randomLevel() {
  • int x = randomSeed;
  • x ^= x << 13;
  • x ^= x >>> 17;
  • randomSeed = x ^= x << 5;
  • if ((x & 0x80000001) != 0) // test highest and lowest bits
  • return 0;
  • int level = 1;
  • while (((x >>>= 1) & 1) != 0) ++level;
  • return level;
  • }

关于级别的生成方法使用的算法我们不做讨论,这里只需要知道生成的级别最大为31。

这里我们模拟一下构造方法的过程,就以ConcurrentSkipListMap(SortedMap<K, ? extends V> m)方法为例,假设传入的SortedMap中有以下键值对:

  • <15, "Mary">
  • <20, "Jack">

在构造ConcurrentSkipListMap的过程中,记录了传入的SortedMap的比较器后,会先调用initialize()方法构建一个HeadIndex节点,HeadIndex节点的node变量指向了BASE_HEADER,而成员变量head指向了该HeadIndex节点;示意图如下:

1.ConcurrentSkipListMap的initialize()方法过程.png

接下来会调用buildFromSorted(SortedMap<K, ? extends V>)方法将传入的SortedMap中的键值对添加到跳表中,我们重点分析该方法。buildFromSorted(SortedMap<K, ? extends V>)方法内部使用列表preds来记录添加键值对过程中,最后产生的一列Index节点,用于维护跳表结构,在起始时刻preds按照从下往上的顺序存储了head这一列的Index节点,在本例中即只有head节点。示意图如下:

2.buildFromSorted()方法构建Index数组.png

接下来会迭代遍历传入的SortedMap的键值对,依次进行添加。在每次添加时,会随机生成一个级别,添加过程中会根据这个级别维护跳表结构。SortedMap的每一个键值对都会被构造为一个Node节点;以我们的示例数据为例,第一组键值对数据所对应的Node节点会作为head节点所包含的Node节点(即BASE_HEADER)的后继节点。

成功将新的Node节点添加到跳表中后,会根据随机生成的级别来维护跳表的级别结构。这里我们假设随机级别为2,大于当前head节点的级别1,且是符合条件的;因此buildFromSorted(SortedMap<K, ? extends V>)方法会创建两个Index节点,第二个Index节点的down指向第一个Index节点,且两个Index节点的node都指向上面新创建的Node节点。根据代码逻辑,还会创建一个level为2、down域指向head节点的HeadIndex节点;此时跳表中存在两个HeadIndex节点,需要将这两个HeadIndex节点的right域分别于两个Index节点相连接,并更新head成员变量,使其指向新创建的level为2的HeadIndex节点。最后会将新添加的两个Index节点记录在preds列表中。示意图如下:

3.buildFromSorted()方法添加第一组数据.png

添加第二组数据的流程与上面的类似,不过这里假设随机生成的级别为3,因此也需要维护跳表的级别;添加第二组数据的示意图如下:

4.buildFromSorted()方法添加第二组数据.png

从上面的分析过程和示意图可知,在buildFromSorted(SortedMap<K, ? extends V>)方法内向跳表添加数据的过程有以下要点:

  1. ConcurrentSkipListMap跳表结构中,第一列的索引节点都为HeadIndex节点,从上往下进行衔接;除此之外其他列的索引节点都为Index节点。
  2. 所有装载键值对的Node数据节点位于跳表结构的最底层,即每一列节点的最下面的节点为Node数据节点。
  3. 每一列的Index节点的node变量都引用着这一列最下面的Node数据节点。第一列所有的HeadIndex节点的node变量引用的Node节点的值是BASE_HEADER节点,该Node代表基础头节点,并不装载用户有效数据。
  4. 每层的节点(无论是Node数据节点,还是HeadIndex和Index索引节点)都构成了一个单向链表结构。

其实ConcurrentSkipListMap内部的跳表结构就是典型的跳表结构,根据以上的数据添加过程我们可以理清HeadIndex、Index、Node这三类节点在跳表中充当的角色,这也便于我们后面理解ConcurrentSkipListMap的各类操作。下面是buildFromSorted(SortedMap<K, ? extends V>)方法的完整过程示意图:

5.ConcurrentSkipListMap的buildFromSorted()方法过程.png

1.2. 查找辅助方法

在介绍ConcurrentSkipListMap的各项主要操作之前,我们先关注一个大类别的方法:查找辅助方法。由于ConcurrentSkipListMap中跳表结构的复杂性,ConcurrentSkipListMap提供了大量的查找辅助方法,声明如下:

  • // 查找第一个数据节点
  • Node<K, V> findFirst()
  • // 查找最后一个数据节点
  • Node<K, V> findLast()
  • // 根据key查找符合条件的前驱节点
  • private Node<K, V> findPredecessor(Comparable<? super K> key)
  • // 具体用处请参考该方法的完整注释的讲解
  • private Node<K, V> findPredecessorOfLast()
  • // 根据key查找数据节点
  • private Node<K, V> findNode(Comparable<? super K> key)
  • // 根据指定操作查找附近的数据节点
  • Node<K, V> findNear(K kkey, int rel)

这些方法都有很强大的用处,在这里我们先对这几个方法进行简单地介绍,大家如果对这部分的源码有疑问也没关系,只需要在脑海中有个印象即可,在后面的示例操作中会对这些方法做详细演示和解析。

  1. findFirst()方法

首先关注findFirst()方法,从方法声明就可以得知,它是用于查找第一个第一个有效的数据节点,源码如下:

  • /**
  • * Specialized variant of findNode to get first valid node.
  • * 找到第一个有效的节点
  • *
  • * @return first node or null if empty
  • */
  • Node<K, V> findFirst() {
  • // 无限循环
  • for (; ; ) {
  • // head头索引的Node
  • Node<K, V> b = head.node;
  • // 头节点的后继节点n
  • Node<K, V> n = b.next;
  • if (n == null)
  • // n为null则返回null
  • return null;
  • if (n.value != null)
  • // n的value不为null则返回该后继节点
  • return n;
  • // n的value为null,删除n节点
  • n.helpDelete(b, n.next);
  • }
  • }

findFirst()可以得知,第一个有效的数据节点就是基础头节点(即head指向的、值为BASE_HEADER的那个节点)的有效的后继节点。由于主体代码在一个无限for循环中,因此如果刚好遇到BASE_HEADER节点的后继节点n为被删除的节点,会使用n.helpDelete(b, n.next)协助删除n节点,然后进行新的循环再次查找。

  1. findLast()方法

findLast()方法用于查找最末尾的有效的数据节点,它的源码如下:

  • /**
  • * Specialized version of find to get last valid node.
  • *
  • * 查找最后一个有效的数据节点
  • *
  • * @return last node or null if empty
  • */
  • Node<K, V> findLast() {
  • /*
  • * findPredecessor can't be used to traverse index level
  • * because this doesn't use comparisons. So traversals of
  • * both levels are folded together.
  • */
  • // 头节点
  • Index<K, V> q = head;
  • // 无限循环
  • for (; ; ) {
  • Index<K, V> d, r;
  • // 头节点的right节点
  • if ((r = q.right) != null) {
  • // 如果r为被删除的节点,将其从索引链中断开
  • if (r.indexesDeletedNode()) {
  • q.unlink(r);
  • // 重新赋值q,然后进行下一次循环
  • q = head; // restart
  • } else
  • // 否则q进行右移操作,即指向r,然后进行下一次循环
  • q = r;
  • } else if ((d = q.down) != null) { // 如果q的right节点为null,则尝试下潜
  • // 走到这里表示q的down节点不为null,下潜成功,将进行下一次循环
  • q = d;
  • } else {
  • // 走到这里,表示q的right点为null,down点也为null,即q无法右移也无法下潜
  • // 获取q指向的Node节点b
  • Node<K, V> b = q.node;
  • // 获取b的后继
  • Node<K, V> n = b.next;
  • for (; ; ) {
  • if (n == null)
  • /**
  • * n为null的话,表示b已经是Node链中最后一个节点
  • * 如果此时b不是BASE_HEADER,那么b就是要找的最后一个节点
  • */
  • return b.isBaseHeader() ? null : b;
  • // 走到这里表示n不为null,获取n的后继为f
  • Node<K, V> f = n.next; // inconsistent read
  • // 检查n是否改变,如果改变的话跳出本次循环,重新进行外层循环重试
  • if (n != b.next)
  • break;
  • /**
  • * 检查n是否是被删除的节点
  • * 如果是,调用helpDelete将其从Node链中断开,
  • * 然后跳出本次循环,重新进行外层循环重试
  • */
  • Object v = n.value;
  • if (v == null) { // n is deleted
  • n.helpDelete(b, f);
  • break;
  • }
  • /**
  • * 走到这里说明n是有效的
  • * 这里用于判断b是否是被删除的节点
  • * 当v(n.value)== n时,表示n是删除标记节点
  • * 此时如果b的value也为null,则b也是被删除的节点
  • * 如果是,就跳出本次循环,重新进行外层循环重试
  • */
  • if (v == n || b.value == null) // b is deleted
  • break;
  • // 走到这里说明b和n都有效,后移,再次进行循环判断
  • b = n;
  • n = f;
  • }
  • q = head; // restart
  • }
  • }
  • }

源码比较复杂,这里将仔细梳理。findLast()方法内有两个无限for循环,第一个无限for循环中,首先引用head节点为q,然后有if…else if…else的三个条件分支,其实这三个分支各有用处;其中第一个if分支中的代码用于在跳表结构中对q进行右移操作,第二个else if分支中的代码用于在跳表结构中对q进行下潜操作,而第三个else分支中的代码则最为重要,当代码走到这个分支,说明q节点已经无法右移且无法下潜了,此时在该分支里的无限for循环中将从q指向的Node节点开始往右遍历,并进行各类检查,直到遍历到某个节点的后继为null时,即该节点已经没有后继节点了,判断该节点是否是BASE_HEADER,如果不是,该节点就是我们要找的最后一个有效的数据节点。

  1. findPredecessor(Comparable<? super K>)方法

由于ConcurrentSkipListMap中的键是具有可比较特性的(通过比较器或者key实现了Comparable接口),因此所有的键值对都是键有序的。findPredecessor(Comparable<? super K> key)方法用于寻找给定键所对应的符合条件的前驱节点,这里的符合条件是指有效且未被标记为删除,该方法的源码如下:

  • /**
  • * Returns a base-level node with key strictly less than given key,
  • * or the base-level header if there is no such node. Also
  • * unlinks indexes to deleted nodes found along the way. Callers
  • * rely on this side-effect of clearing indices to deleted nodes.
  • *
  • * 根据传入的key,查找符合条件的前驱节点
  • *
  • * @param key the key
  • * @return a predecessor of key
  • */
  • private Node<K, V> findPredecessor(Comparable<? super K> key) {
  • if (key == null)
  • throw new NullPointerException(); // don't postpone errors
  • for (; ; ) {
  • // 头节点
  • Index<K, V> q = head;
  • Index<K, V> r = q.right;
  • for (; ; ) {
  • if (r != null) { // 头节点的右Index节点不为null
  • // 查看当前Index包装的Node节点
  • Node<K, V> n = r.node;
  • K k = n.key;
  • // 这一步if会尝试将失效的Index节点移除
  • if (n.value == null) {
  • /**
  • * value为空,表示这个Node节点已经被删除了
  • * 就调用unlink()将其从链表中移除,unlink()操作会将q的右节点换成r的右节点
  • * 移除成功会跳出内层循环;否则继续往下执行
  • */
  • if (!q.unlink(r))
  • // 跳出内层循环
  • break; // restart
  • // 走到这里说明unlink()操作失败,这一步会将r指向q新的右节点
  • r = q.right; // reread r
  • // 继续
  • continue;
  • }
  • /**
  • * 走到这里说明value不为null,对比key
  • * 当传入的key比遍历到的节点的k大,就继续往后移
  • * 即将q指向r,将r指向r的右节点(即后继)
  • * 然后继续下一个循环
  • */
  • if (key.compareTo(k) > 0) {
  • q = r;
  • r = r.right;
  • continue;
  • }
  • }
  • /**
  • * 走到这里有两种情况:
  • * 1. r为null;此时表示head节点没有右节点,所以需要查找下一层
  • * 2. r不为null,但key小于或等于k
  • * 注:r为q的后继节点
  • */
  • // 获取head的下节点
  • Index<K, V> d = q.down;
  • if (d != null) {
  • /**
  • * 下节点不为null,将q指向该下节点,将r指向下节点的右节点
  • * 这个操作相当于往下移了一层,因为当前层已经没有可找的了
  • */
  • q = d;
  • r = d.right;
  • } else
  • /**
  • * 走到这里,说明d为null,即只有一层,因此直接将当前q的Node返回
  • * 此时应该有两种情况:
  • * 1. q的右节点r(即后继节点)不为null时,但r的key比传入的key大;
  • * 2. q的右节点r为null;
  • * 这两种情况下,q的Node就应该是要找的前驱节点
  • */
  • return q.node;
  • }
  • }
  • }

上面源码中的注释已经把findPredecessor(Comparable<? super K>)方法的原理讲解地比较清楚了,这里就补充几个注意点:

  • findPredecessor(Comparable<? super K>)方法由于需要遍历跳表数据,使用了两层for循环,其中第一个for循环用于查找失败时的重试操作,而第二个for循环则是用于主要的各类逻辑判断和移位操作。
  • findPredecessor(Comparable<? super K>)方法利用跳表的特性,从head节点开始,从上层往下层,从左往右进行查找。
  • findPredecessor(Comparable<? super K>)方法遍历的过程中,它会协助将失效的Index节点移除掉,当一个Index节点指向的Node节点的值为null,该Index节点就被视为无效,这一功能非常重要。
  • 从遍历查找的过程可知,ConcurrentSkipListMap中跳表数据的顺序是从小到大进行排列的。
  1. findPredecessorOfLast()方法

findPredecessorOfLast()方法的字面意思是“查找最后一个数据节点的前驱节点”,但其实从该方法的实现和对该方法的使用严格来讲,它返回的节点并不是最后一个数据节点的前驱。它会遍历到跳表底层靠后的位置,然后返回特定的Index对应的Node节点;这里的“特定”也存在两种情况。我们先观察它的源码:

  • /**
  • * Specialized variant of findPredecessor to get predecessor of last
  • * valid node. Needed when removing the last entry. It is possible
  • * that all successors of returned node will have been deleted upon
  • * return, in which case this method can be retried.
  • *
  • * 这个方法的字面意思是查找最后一个Node节点的前驱节点
  • * 其实分析这个方法的内容以及对它的使用,可以发现它并不是返回最后一个Node节点的前驱节点
  • * 它会遍历到跳表底层靠后的位置,然后返回特定的Index对应的Node节点
  • * 这时这个Node节点可能还有后继节点
  • *
  • * @return likely predecessor of last node
  • */
  • private Node<K, V> findPredecessorOfLast() {
  • // 无限循环
  • for (; ; ) {
  • // 获取head节点
  • Index<K, V> q = head;
  • // 无限循环
  • for (; ; ) {
  • Index<K, V> d, r;
  • // 头节点的right节点
  • if ((r = q.right) != null) {
  • // 如果r为被删除的节点,将其从索引链中断开
  • if (r.indexesDeletedNode()) {
  • // 重新赋值q,然后进行下一次循环
  • q.unlink(r);
  • break; // must restart
  • }
  • // proceed as far across as possible without overshooting
  • /**
  • * 否则查看r对应的Node节点的后继是否为null
  • * 如果不为null,则将q往后移动
  • */
  • if (r.node.next != null) {
  • // q往后移动
  • q = r;
  • continue;
  • }
  • }
  • /**
  • * 走到这里有两种情况:
  • * 1. q的right节点r为null
  • * 2. q的right节点r不为null,但r对应的Node节点的后继为null
  • * 尝试下潜操作
  • */
  • if ((d = q.down) != null)
  • // 走到这里表示q的down节点不为null,下潜成功,将进行下一次循环
  • q = d;
  • /**
  • * 走到这里表示q节点的down节点也为null,下潜失败
  • */
  • else
  • return q.node;
  • }
  • }
  • }

从源码可知,该方法最后确实遍历到了跳表底层靠后的位置,最后返回的Node节点q.node并不是Node链中最后一个节点的前驱,q.node节点之后还可能存在1个以上的Node节点,这些Node节点并没有被跳表的索引结构所引用。

大家对findPredecessorOfLast()方法如果有疑问也不用深究,该方法仅仅会在后面讲解的doRemoveLastEntry()方法中用到,到时候讲解这个方法的时候大家就能真正理解findPredecessorOfLast()方法的实际作用了。

  1. findNear(K, int)方法

在关注findNear(K, int)方法之前,我们先关注如下三个变量:

  • // Control values OR'ed as arguments to findNear
  • private static final int EQ = 1;
  • private static final int LT = 2;
  • private static final int GT = 0; // Actually checked as !LT

这三个变量在findNear(K, int)方法中使用频繁,分别表示Equal、Great Than和Less Than,即相等、大于和小于。注意它们的取值从大到小依次是0、1、2,转换为二进制即是0000、0010、0100,这样表示有助于在后面使用OR位运算时对它们进行合并。

再来查看findNear(K, int)方法的源码:

  • /**
  • * Utility for ceiling, floor, lower, higher methods.
  • *
  • * @param kkey the key
  • * @param rel the relation -- OR'ed combination of EQ, LT, GT;查找关系,值为EQ, LT, GT其中一个
  • * @return nearest node fitting relation, or null if no such
  • */
  • Node<K, V> findNear(K kkey, int rel) {
  • // 包装key
  • Comparable<? super K> key = comparable(kkey);
  • // 无限循环
  • for (; ; ) {
  • // 获取符合条件的前驱节点,标识为b
  • Node<K, V> b = findPredecessor(key);
  • // 获取b的后继为n
  • Node<K, V> n = b.next;
  • // 无限循环
  • for (; ; ) {
  • if (n == null)
  • /**
  • * n为null时,说明b已经是Node链中最后一个节点了
  • * rel & LT为0时,表示rel不为LT
  • * 此时表示,要查找的Node并不是小于传入的key的Node,即要查找的是键大于或等于key的Node
  • * 由于跳表中节点时按键从小到大的顺序排列的,因此b就是要查找的Node节点
  • */
  • return ((rel & LT) == 0 || b.isBaseHeader()) ? null : b;
  • // 走到这里说明b不为null,获取n的后继为f,此时有关系b -> n -> f
  • Node<K, V> f = n.next;
  • // 检查n是否被改变
  • if (n != b.next) // inconsistent read
  • break;
  • /**
  • * 如果n的value为null表示n是被删除的节点,使用helpDelete()协助将其从Node链移除
  • * 然后进行直接跳出内层循环
  • */
  • Object v = n.value;
  • if (v == null) { // n is deleted
  • n.helpDelete(b, f);
  • break;
  • }
  • /**
  • * 检查n是否是标记删除节点,
  • * 如果n是标记删除节点,则检查b是否是被删除节点,如果b是被删除的节点,
  • * 就跳出内层循环,重新在外层循环中使用findPredecessor(key)查找
  • */
  • if (v == n || b.value == null) // b is deleted
  • break;
  • // 走到这里表示b和n都是正常的Node节点,因此比较传入key与n的key
  • int c = key.compareTo(n.key);
  • /**
  • * 首先关注其中两个条件的意思:
  • * 1. (rel & EQ) != 0表示指定比较操作存在EQ操作,即可能是EQ(等于)、LT | EQ(小于等于)或GT | EQ(大于等于)
  • * 2. (rel & LT) == 0表示指定操作不存在LT操作,即可能是EQ(等于)、GT(大于)或GT | EQ(大于等于)
  • * 再来看if的判断过程
  • * 1. 如果传入key与n的key相等,且比较操作存在EQ操作(即查找等于、小于等于或大于等于key的节点),就返回n;
  • * 2. 如果传入key与n的key相等,但比较操作不存在EQ操作(即查找不等于key的节点),n不符合条件,什么都不做;
  • * 3. 如果传入key小于n的key,且不存在LT操作(即查找等于、大于或大于等于key的节点),就返回n;(我一直觉得此处有问题,当传入带有EQ的操作,此时也将n返回了)
  • * 4. 如果传入key小于n的key,但存在LT操作(即查找小于、小于等于key的节点),n不符合条件,什么都不做。
  • */
  • if ((c == 0 && (rel & EQ) != 0) || (c < 0 && (rel & LT) == 0))
  • return n;
  • /**
  • * 如果传入key小于等于n的key,且比较操作存在LT操作(即可能是小于、小于等于)
  • * 判断b是否是BASE_HEADER,如果不是就返回b
  • */
  • if (c <= 0 && (rel & LT) != 0)
  • return b.isBaseHeader() ? null : b;
  • // 走到这里说明传入的key大于n的key,因此需要进行后移操作
  • b = n;
  • n = f;
  • }
  • }
  • }

findNear(K, int)方法接收的第二个参数用于表示操作类型,是一个int值,这个值可以是LT、EQ和GT之间的任意组合。findNear(K, int)方法的实现过程其实比较简单,同样是使用两层嵌套的无限for循环,在第二个for循环中不断遍历Node链,对拿到的一系列Node节点进行了各类检查工作,然后根据传入的操作类型判断遍历到的节点是否符合条件,如果符合就返回,否则进行新的尝试。

  1. findNode(Comparable<? super K>)方法

findNode(key)方法用于寻找给定键所对应的有效的Node节点,它的源码如下:

  • private Node<K, V> findNode(Comparable<? super K> key) {
  • // 无限循环
  • for (; ; ) {
  • // 尝试找到符合条件的key的前驱节点
  • Node<K, V> b = findPredecessor(key);
  • // 获取b的后继节点
  • Node<K, V> n = b.next;
  • for (; ; ) {
  • // n为null,表示没找到,直接返回null
  • if (n == null)
  • return null;
  • // 得到n的后继节点
  • Node<K, V> f = n.next;
  • // 检查b的后继是否被改变,如果改变就跳出内层循环
  • if (n != b.next) // inconsistent read
  • break;
  • // 获取n的value
  • Object v = n.value;
  • // n的value为null表示n已经被删掉了
  • if (v == null) { // n is deleted
  • // 调用helpDelete()方法将其删除,并跳出内层循环
  • n.helpDelete(b, f);
  • break;
  • }
  • // 走到这里说明n的value不为null
  • if (v == n || b.value == null) // b is deleted
  • /**
  • * 两种情况:
  • * 1. v == n说明n是标记节点
  • * 2. v != n,但b.value为null,说明b是被删除的节点
  • * 跳出内层循环
  • */
  • break;
  • // 比较key和n的key的大小
  • int c = key.compareTo(n.key);
  • // key和n的key相同,表示找到
  • if (c == 0)
  • return n;
  • // 表示没有找到
  • if (c < 0)
  • return null;
  • // 往后移,跳出进行下一次循环,即往后遍历
  • b = n;
  • n = f;
  • }
  • }
  • }

findNode(Comparable<? super K> key)方法首先通过上面讲过的findPredecessor(Comparable<? super K>)方法找到给定键所对应的符合条件的前驱节点,然后从该前驱节点开始,对后面的节点一一进行比较,找到键与传入的key相同的节点返回即可,如果找不到就返回null。具体的操作源码中的注释已经讲解的非常清楚了。

注意在findNode(Comparable<? super K>)方法查找过程中,会通过helpDelete(Node<K, V> b, Node<K, V> f)方法将失效的n节点从Node链中移除,这在后面删除键值对相关的方法中会有体现,敬请期待。

这六个查找辅助方法在ConcurrentSkipListMap的各项功能实现中充当了很重要的责任,这里将它们一并进行讲解也是为后面的内容做铺垫。另外与这六个方法相关,有一些功能性的方法内部就是调用的它们,由于比较简单,这里只贴出源码如下:

  • /**
  • * @throws NoSuchElementException {@inheritDoc}
  • * 获取第一个数据节点的键
  • */
  • public K firstKey() {
  • Node<K, V> n = findFirst();
  • if (n == null)
  • throw new NoSuchElementException();
  • return n.key;
  • }
  • /**
  • * @throws NoSuchElementException {@inheritDoc}
  • * 获取最后一个数据节点的键
  • */
  • public K lastKey() {
  • Node<K, V> n = findLast();
  • if (n == null)
  • throw new NoSuchElementException();
  • return n.key;
  • }
  • /**
  • * Returns SimpleImmutableEntry for results of findNear.
  • *
  • * 根据指定操作查找附近的数据节点,包装为SimpleImmutableEntry实例返回
  • *
  • * @param key the key
  • * @param rel the relation -- OR'ed combination of EQ, LT, GT
  • * @return Entry fitting relation, or null if no such
  • */
  • AbstractMap.SimpleImmutableEntry<K, V> getNear(K key, int rel) {
  • for (; ; ) {
  • Node<K, V> n = findNear(key, rel);
  • if (n == null)
  • return null;
  • AbstractMap.SimpleImmutableEntry<K, V> e = n.createSnapshot();
  • if (e != null)
  • return e;
  • }
  • }
  • /**
  • * Returns a key-value mapping associated with the greatest key
  • * strictly less than the given key, or <tt>null</tt> if there is
  • * no such key. The returned entry does <em>not</em> support the
  • * <tt>Entry.setValue</tt> method.
  • *
  • * 获取比给定key小的最大键对应的键值对
  • *
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public Map.Entry<K, V> lowerEntry(K key) {
  • return getNear(key, LT);
  • }
  • /**
  • * 获取比给定key小的最大键对应的键值对的键
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public K lowerKey(K key) {
  • Node<K, V> n = findNear(key, LT);
  • return (n == null) ? null : n.key;
  • }
  • /**
  • * Returns a key-value mapping associated with the greatest key
  • * less than or equal to the given key, or <tt>null</tt> if there
  • * is no such key. The returned entry does <em>not</em> support
  • * the <tt>Entry.setValue</tt> method.
  • *
  • * 获取键小于或等于给定key的最大键值对
  • *
  • * @param key the key
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public Map.Entry<K, V> floorEntry(K key) {
  • return getNear(key, LT | EQ);
  • }
  • /**
  • * 获取键小于或等于给定key的最大键值对对应的键
  • * @param key the key
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public K floorKey(K key) {
  • Node<K, V> n = findNear(key, LT | EQ);
  • return (n == null) ? null : n.key;
  • }
  • /**
  • * Returns a key-value mapping associated with the least key
  • * greater than or equal to the given key, or <tt>null</tt> if
  • * there is no such entry. The returned entry does <em>not</em>
  • * support the <tt>Entry.setValue</tt> method.
  • *
  • * 获取键大于或等于给定key的最大键值对
  • *
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public Map.Entry<K, V> ceilingEntry(K key) {
  • return getNear(key, GT | EQ);
  • }
  • /**
  • * 获取键大于或等于给定key的最大键
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public K ceilingKey(K key) {
  • Node<K, V> n = findNear(key, GT | EQ);
  • return (n == null) ? null : n.key;
  • }
  • /**
  • * Returns a key-value mapping associated with the least key
  • * strictly greater than the given key, or <tt>null</tt> if there
  • * is no such key. The returned entry does <em>not</em> support
  • * the <tt>Entry.setValue</tt> method.
  • *
  • * 获取键大于给定key的最大键值对
  • *
  • * @param key the key
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public Map.Entry<K, V> higherEntry(K key) {
  • return getNear(key, GT);
  • }
  • /**
  • * 获取键大于给定key的最大键
  • * @param key the key
  • * @throws ClassCastException {@inheritDoc}
  • * @throws NullPointerException if the specified key is null
  • */
  • public K higherKey(K key) {
  • Node<K, V> n = findNear(key, GT);
  • return (n == null) ? null : n.key;
  • }
  • /**
  • * Returns a key-value mapping associated with the least
  • * key in this map, or <tt>null</tt> if the map is empty.
  • * The returned entry does <em>not</em> support
  • * the <tt>Entry.setValue</tt> method.
  • * 获取第一个数据节点的键值对,包装为SimpleImmutableEntry实例返回
  • */
  • public Map.Entry<K, V> firstEntry() {
  • for (; ; ) {
  • Node<K, V> n = findFirst();
  • if (n == null)
  • return null;
  • AbstractMap.SimpleImmutableEntry<K, V> e = n.createSnapshot();
  • if (e != null)
  • return e;
  • }
  • }
  • /**
  • * Returns a key-value mapping associated with the greatest
  • * key in this map, or <tt>null</tt> if the map is empty.
  • * The returned entry does <em>not</em> support
  • * the <tt>Entry.setValue</tt> method.
  • * 获取最后一个数据节点的键值对,包装为SimpleImmutableEntry实例返回
  • */
  • public Map.Entry<K, V> lastEntry() {
  • for (; ; ) {
  • Node<K, V> n = findLast();
  • if (n == null)
  • return null;
  • AbstractMap.SimpleImmutableEntry<K, V> e = n.createSnapshot();
  • if (e != null)
  • return e;
  • }
  • }

1.3. size()和isEmpty()方法

ConcurrentSkipListMap提供了size()方法用于获取键值对个数,实现是非常简单的,由于所有的键值对都存放在跳表最下面一层的由Node构成的单向链表中,因此只需要遍历这个单向链表就可以获取键值对的总个数,源码如下:

  • /**
  • * Returns the number of key-value mappings in this map. If this map
  • * contains more than <tt>Integer.MAX_VALUE</tt> elements, it
  • * returns <tt>Integer.MAX_VALUE</tt>.
  • *
  • * <p>Beware that, unlike in most collections, this method is
  • * <em>NOT</em> a constant-time operation. Because of the
  • * asynchronous nature of these maps, determining the current
  • * number of elements requires traversing them all to count them.
  • * Additionally, it is possible for the size to change during
  • * execution of this method, in which case the returned result
  • * will be inaccurate. Thus, this method is typically not very
  • * useful in concurrent applications.
  • *
  • * @return the number of elements in this map
  • */
  • public int size() {
  • long count = 0;
  • // 遍历计算
  • for (Node<K, V> n = findFirst(); n != null; n = n.next) {
  • if (n.getValidValue() != null)
  • ++count;
  • }
  • // 最大只能为Integer.MAX_VALUE
  • return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
  • }

注:需要注意的是,如果ConcurrentSkipListMap中键值对个数大于等于Integer.MAX_VALUE,则取Integer.MAX_VALUE,避免溢出。

其中findFirst()在前面已经介绍过了,用于查找第一个有效的Node,作为遍历操作的起始Node。从findFirst()可以得知,起始节点起始就是基础头节点(即head指向的、值为BASE_HEADER的那个节点)的后继节点。

isEmpty()方法则更简单了,只需要判断是否能找到符合条件的起始节点即可,源码如下:

  • /**
  • * Returns <tt>true</tt> if this map contains no key-value mappings.
  • *
  • * @return <tt>true</tt> if this map contains no key-value mappings
  • */
  • public boolean isEmpty() {
  • // 如果找到的第一个有效节点为null则表示Map为空
  • return findFirst() == null;
  • }

1.4. contains()相关方法

ConcurrentSkipListMap中contains()方法有两类:containsKey(Object key)containsValue(Object value),分别用于判断是否包含某个键和是否包含某个值。先考察containsValue(Object value)方法,源码如下:

  • /**
  • * Returns <tt>true</tt> if this map maps one or more keys to the
  • * specified value. This operation requires time linear in the
  • * map size. Additionally, it is possible for the map to change
  • * during execution of this method, in which case the returned
  • * result may be inaccurate.
  • *
  • * 判断是否包含某个值,需要遍历Node节点
  • * 返回的值可能不准确,因为存在其他线程并发修改的情况
  • *
  • * @param value value whose presence in this map is to be tested
  • * @return <tt>true</tt> if a mapping to <tt>value</tt> exists;
  • * <tt>false</tt> otherwise
  • * @throws NullPointerException if the specified value is null
  • */
  • public boolean containsValue(Object value) {
  • // 参数检查
  • if (value == null)
  • throw new NullPointerException();
  • // 遍历Node链
  • for (Node<K, V> n = findFirst(); n != null; n = n.next) {
  • V v = n.getValidValue();
  • // 判断值是否相等
  • if (v != null && value.equals(v))
  • return true;
  • }
  • return false;
  • }

containsValue(Object value)方法的原理是遍历Node链,对每个Node的值进行判断,如果存在某个Node的值与传入value相同,则返回true表示包含。需要注意的是,由于多线程并发的情况,在遍历判断的过程中可能存在节点数据被其他线程修改的情况,因此containsValue(Object value)方法的结果并不一定准确。

至于containsKey(Object key)的实现则是直接通过doGet(Object okey)方法直接根据key查找对应的值来实现的,关于`doGet(Object)方法会在后面讲解,这里先贴出containsKey(Object key)方法的源码:

  • /**
  • * Returns <tt>true</tt> if this map contains a mapping for the specified
  • * key.
  • *
  • * 判断是否存在某个键
  • *
  • * @param key key whose presence in this map is to be tested
  • * @return <tt>true</tt> if this map contains a mapping for the specified key
  • * @throws ClassCastException if the specified key cannot be compared
  • * with the keys currently in the map
  • * @throws NullPointerException if the specified key is null
  • */
  • public boolean containsKey(Object key) {
  • // 调用doGet()方法获取key对应的值,如果返回不为null则表示存在这个key
  • return doGet(key) != null;
  • }

1.5. put()相关方法

与ConcurrentHashMap非常类似,ConcurrentSkipListMap的put()相关的方法也有三类:put(K, V)putIfAbsent(K, V)putAll(Map<? extends K, ? extends V>)。其中putAll(Map<? extends K, ? extends V>)在某个构造方法中用到了,底层调用的是put(K, V)方法,而put(K, V)putIfAbsent(K, V)底层都调用了doPut(K kkey, V value, boolean onlyIfAbsent)方法,区别在于传入的第三个参数不一样,源码如下:

  • /**
  • * Associates the specified value with the specified key in this map.
  • * If the map previously contained a mapping for the key, the old
  • * value is replaced.
  • * <p>
  • * 添加键值对到Map中,键存在时会覆盖原来的值
  • *
  • * @param key key with which the specified value is to be associated
  • * @param value value to be associated with the specified key
  • * @return the previous value associated with the specified key, or
  • * <tt>null</tt> if there was no mapping for the key 当覆盖时会返回旧值,否则返回null
  • * @throws ClassCastException if the specified key cannot be compared
  • * with the keys currently in the map
  • * @throws NullPointerException if the specified key or value is null 键或值为空都会抛出异常
  • */
  • public V put(K key, V value) {
  • // value不能为null
  • if (value == null)
  • throw new NullPointerException();
  • // 调用doPut()方法添加元素,onlyIfAbsent传入false
  • return doPut(key, value, false);
  • }
  • /**
  • * {@inheritDoc}
  • *
  • * 添加键值对到Map中,只有在键不存在时才会添加
  • *
  • * @return the previous value associated with the specified key,
  • * or <tt>null</tt> if there was no mapping for the key
  • * @throws ClassCastException if the specified key cannot be compared
  • * with the keys currently in the map
  • * @throws NullPointerException if the specified key or value is null
  • */
  • public V putIfAbsent(K key, V value) {
  • // value不能为null
  • if (value == null)
  • throw new NullPointerException();
  • // 调用doPut()方法添加元素,onlyIfAbsent传入true
  • return doPut(key, value, true);
  • }

因此我们需要着重关注doPut(K, V, boolean)方法,它的源码如下:

  • /**
  • * Main insertion method. Adds element if not present, or
  • * replaces value if present and onlyIfAbsent is false.
  • * 主要的插入方法,当键不存在时会添加,当键存在时,如果onlyIfAbsent为false则会覆盖旧值
  • *
  • * @param kkey the key
  • * @param value the value that must be associated with key
  • * @param onlyIfAbsent if should not insert if already present 是否只在不存在时添加
  • * @return the old value, or null if newly inserted 旧值(当覆盖、或onlyIfAbsent为true但key已存在时),或者null(新添加)
  • */
  • private V doPut(K kkey, V value, boolean onlyIfAbsent) {
  • /**
  • * 在comparable()方法中,key为null会抛出NullPointerException。
  • * 这个地方得到的key其实就是一个Comparable类型实例,可能是ComparableUsingComparator对象
  • * ComparableUsingComparator将key和Comparator封装了一下
  • */
  • Comparable<? super K> key = comparable(kkey);
  • // 无限循环
  • for (; ; ) {
  • /**
  • * 在跳表中尝试找到符合条件的key的前驱节点
  • * 查找的规则在于:跳表中底层Node的key比上层的key要大,Node链中后面Node的key比前面的key要大
  • * 因此查找时是从上往下,从前往后找
  • */
  • Node<K, V> b = findPredecessor(key);
  • // 获取b的后继节点
  • Node<K, V> n = b.next;
  • // 无限循环
  • for (; ; ) {
  • if (n != null) {
  • // b的后继节点n不为null,得到n的后继节点
  • Node<K, V> f = n.next;
  • // 检查b的后继是否被改变,如果改变就跳出内层循环
  • if (n != b.next) // inconsistent read
  • break;
  • // 获取n的value
  • Object v = n.value;
  • // n的value为null表示n已经被删掉了
  • if (v == null) { // n is deleted
  • /**
  • * 调用helpDelete()方法将其删除,并跳出内层循环
  • * 此时可能返回的情况可能有两种:
  • * 修改前:b -> n -> f -> f.next(f.next可能有,也可能没有)
  • * 修改后:
  • * 1. b -> n -> marker -> f
  • * 2. b -> f.next
  • * 然后跳出内层循环
  • */
  • n.helpDelete(b, f);
  • break;
  • }
  • // 走到这里说明n的value不为null
  • if (v == n || b.value == null) // b is deleted
  • /**
  • * 两种情况:
  • * 1. v == n说明n是标记节点
  • * 2. v != n,但b.value为null,说明b是被删除的节点
  • * 跳出内层循环
  • */
  • break;
  • // 比较key和n的key的大小
  • int c = key.compareTo(n.key);
  • if (c > 0) {
  • /**
  • * 要比n的key大,就往后移,跳出进行下一次循环,即往后遍历:
  • * 之前:b -> n -> f
  • * 现在:(old b) -> b(old n) -> n(old f)
  • */
  • b = n;
  • n = f;
  • continue;
  • }
  • if (c == 0) {
  • /**
  • * key与n的key相等,则表示键已存在,此时如果onlyIfAbsent为false,
  • * 就修改n的value为新value,如果修改成功就返回旧value,
  • * 如果onlyIfAbsent为true,则直接返回n的value,不做任何处理,
  • * 否则进行下一次尝试
  • */
  • if (onlyIfAbsent || n.casValue(v, value))
  • return (V) v;
  • else
  • // 否则跳出内层循环
  • break; // restart if lost race to replace value
  • }
  • // else c < 0; fall through
  • }
  • /**
  • * 走到这里有两种情况:
  • * 1. n为null;此时b就是新节点的前驱
  • * 2. n不为null,且已经确认传入的key小于等于n的key,因此n就是新节点的后继,b就是新节点的前驱
  • */
  • // 构建一个新节点z,将n作为该节点的后继
  • Node<K, V> z = new Node<K, V>(kkey, value, n);
  • // CAS修改b的后继为新节点z,如果修改不成功就跳出内层循环
  • if (!b.casNext(n, z))
  • break; // restart if lost race to append to b
  • // 随机一个级别,这个方法后面讲解
  • int level = randomLevel();
  • if (level > 0)
  • // 插入一个级别
  • insertIndex(z, level);
  • return null;
  • }
  • }
  • }

下面将doPut(K, V, boolean)方法分为几个大的模块来一一讲解,同时在讲解过程中将配合一个插入案例进行演示,我们以前面讲解ConcurrentSkipListMap(SortedMap<K, ? extends V> m)方法时最终构建好的ConcurrentSkipListMap为例,插入一组新数据:17 => "Tom";当前ConcurrentSkipListMap中跳表的结构如下:

6.put操作前的跳表状态.png

1.5.1. 处理可比较键

doPut(K, V, boolean)方法的最开始,使用comparable(Object key)将传入的key转换成一个Comparable对象,comparable(Object key)方法源码如下:

  • /**
  • * If using comparator, return a ComparableUsingComparator, else
  • * cast key as Comparable, which may cause ClassCastException,
  • * which is propagated back to caller.
  • *
  • * 如果使用比较器,将返回一个ComparableUsingComparator
  • * 否则将key转换为Comparable对象,可能会抛出ClassCastException异常
  • */
  • private Comparable<? super K> comparable(Object key) throws ClassCastException {
  • if (key == null)
  • throw new NullPointerException();
  • if (comparator != null)
  • // 使用构造器,构造一个ComparableUsingComparator实例,并返回
  • return new ComparableUsingComparator<K>((K) key, comparator);
  • else
  • // 不使用构造器,将key转换为Comparable对象
  • return (Comparable<? super K>) key;
  • }

从源码可知,如果使用了比较器,则将key包装为一个ComparableUsingComparator对象,如果没有使用比较器,则将key强制转为Comparable对象,转换过程中可能会抛出ClassCastException异常。在我们的示例中,插入的键值对数据的类型是Integer => String,即键是Integer类型,该类型是实现了Comparable接口的。

1.5.2. 定位合适的前驱节点

在接下来的代码中,doPut(K, V, boolean)方法使用了两个无限for循环来实现主要的添加操作,其中第一个for循环用于添加失败时的重试操作,而第二个for循环则是用于对Node节点的遍历操作;与findPredecessor(Comparable<? super K>)方法相比,但后者的遍历是大范围真实的遍历,而前者的遍历只有一小部分,因为findPredecessor(Comparable<? super K>)方法已经帮其缩小了目标范围。另外从for循环中的代码逻辑可知,continue操作会结束本次内层循环并直接进行新的内层循环,而break操作会跳出内层循环并直接进行新的外层循环。

外层循环首先通过findPredecessor(key)在跳表中查找符合条件的、可以作为key的前驱的第一个节点;findPredecessor(key)方法在前面已经简单讲解过了,这里以我们的示例为例演示findPredecessor(key)的具体步骤,现在跳表中已经有两组数据:<15, "Mary"><20, "Jack">,现在要插入一条新数据<17, "Tom">,有以下的查找步骤:

  1. 首先从head这一层开始查找,最后会查找到<20, "Jack">,查找顺序如下:

7.定位合适的前驱节点第一次循环.png

由于这组数据的键比新数据的键大,因此需要下移一层重新查找。

  1. 下移一层开始查找,最后会查找到<15, "Mary">,查找顺序如下:

8.定位合适的前驱节点第二次循环.png

由于这组数据的键比新数据的键小,因此需要往后移动,重新查找。

  1. 往后移动后开始查找,最后又会查找到<20, "Jack">,查找顺序如下:

9.定位合适的前驱节点第三次循环.png

由于这组数据的键比新数据的键大,因此需要下移一层重新查找。

  1. 下移一层开始查找,最后还是会查找到<20, "Jack">,查找顺序如下:

10.定位合适的前驱节点第四次循环.png

虽然这组数据的键比新数据的键大,但此时已无法下潜(由于qdown为null),因此qnode,即数据<15, "Mary">所在的节点即是符合条件的前驱节点。

findPredecessor(Comparable<? super K>)方法的查找过程可知,其实该方法存在一定的查找效率,即跳表层级越高,需要循环的次数越多。下面贴出这四步循环查找的全图:

11.put操作——定位合适的前驱节点.png

1.5.3. 添加操作的情况分类

回到doPut(K, V, boolean)方法中,当找到了合适的前驱节点(标识为b)后,又获取了该前驱节点的后继节点(标识为n),然后在内层for循环首先对这两个节点进行了几类检查,删除一些无效的节点。在各类检查通过之后,就会根据键来寻找合适的位置了,分三类情况:

  1. 当传入的键比节点n的键要大时,就往后移动,主要表现代码如下:
  • // 比较key和n的key的大小
  • int c = key.compareTo(n.key);
  • if (c > 0) {
  • /**
  • * 要比n的key大,就往后移,跳出进行下一次循环,即往后遍历:
  • * 之前:b -> n -> f
  • * 现在:(old b) -> b(old n) -> n(old f)
  • */
  • b = n;
  • n = f;
  • continue;
  • }

注:这种情况是存在的,比如<15, "Mary"><20, "Jack">中间还存在其他的数据,例如<16, "Jerry">。我们的示例是一种简单的情况。

当一直往后移动到n被置为null时,第一类情况会在下一次内层循环转换为第三类情况。

  1. 当传入的键与节点n的键相同时,则根据onlyIfAbsent参数决定是否覆盖旧值,主要表现代码如下:
  • // 比较key和n的key的大小
  • int c = key.compareTo(n.key);
  • ...
  • if (c == 0) {
  • /**
  • * key与n的key相等,则表示键已存在,此时如果onlyIfAbsent为false,
  • * 就修改n的value为新value,如果修改成功就返回旧value,
  • * 如果onlyIfAbsent为true,则直接返回n的value,不做任何处理,
  • * 否则进行下一次尝试
  • */
  • if (onlyIfAbsent || n.casValue(v, value))
  • return (V) v;
  • else
  • // 否则跳出内层循环
  • break; // restart if lost race to replace value
  • }
  1. 当传入的键比节点n的键要小时,则表示此时bn之间就应该是新添加的键值对对应节点所在位置,需要根据添加的键值对新创建一个节点,插入到bn之间,很显然,我们的示例就是这种情况。主要表现代码如下:
  • /**
  • * 走到这里有两种情况:
  • * 1. n为null;此时b就是新节点的前驱
  • * 2. n不为null,且已经确认传入的key小于等于n的key,因此n就是新节点的后继,b就是新节点的前驱
  • */
  • // 构建一个新节点z,将n作为该节点的后继
  • Node<K, V> z = new Node<K, V>(kkey, value, n);
  • // CAS修改b的后继为新节点z,如果修改不成功就跳出内层循环
  • if (!b.casNext(n, z))
  • break; // restart if lost race to append to b

需要注意的是,第三类情况也会出现在当n节点为null的情况下,即此时找到的前驱节点b没有后继节点。

根据第三类情况,则我们添加的键值对数据所在的Node节点就应该处于bn之间,插入数据的示意图如下:

12.put操作——插入Node数据节点.png

1.5.4. 新增情况下的级别维护

在上面的第三类情况中,当插入一个新的Node节点成功后一定会执行后面插入级别的操作,即下面的代码:

  • // 随机一个级别,这个方法后面讲解
  • int level = randomLevel();
  • if (level > 0)
  • // 插入一个级别
  • insertIndex(z, level);
  • return null;

randomLevel()方法在前面已经讲解过了,用于随机生成一个级别。这里主要关注一下insertIndex(z, level)方法,它根据新创建的Node节点z和随机得到的级别向跳表中插入了对应的Index节点,方法insertIndex(Node<K, V>, int)源码如下:

  • /**
  • * Creates and adds index nodes for the given node.
  • *
  • * 根据Node节点和随机级别添加对应的Index节点
  • *
  • * @param z the node Node数据节点
  • * @param level the level of the index 随机级别
  • */
  • private void insertIndex(Node<K, V> z, int level) {
  • HeadIndex<K, V> h = head;
  • // 当前head节点的level
  • int max = h.level;
  • /**
  • * 如果插入的level小于等于max,则不用创建新层级,
  • * 只需要构建相应层级的Index列,添加到跳表中即可
  • */
  • if (level <= max) {
  • Index<K, V> idx = null;
  • /**
  • * 循环创建Index节点,
  • * 这种循环方式可以构建一列Index节点,
  • * 最终idx指向最顶层的那个Index节点,这一列Index节点的node都指向z
  • * 即假设level为2,则最终会有这样的结构:
  • * Index(idx)
  • * |
  • * Index
  • * |
  • * Node(z)
  • */
  • for (int i = 1; i <= level; ++i)
  • idx = new Index<K, V>(z, idx, null);
  • // 使用addIndex()方法将idx添加到跳表中,后面分析
  • addIndex(idx, h, level);
  • } else { // Add a new level
  • /*
  • * To reduce interference by other threads checking for
  • * empty levels in tryReduceLevel, new levels are added
  • * with initialized right pointers. Which in turn requires
  • * keeping levels in an array to access them while
  • * creating new head index nodes from the opposite
  • * direction.
  • *
  • * 走到这里,说明level比head的level还要大,
  • * 因此需要添加一个新的层级,即需要构建新的HeadIndex
  • */
  • // 指定新层级只比当前层级大1
  • level = max + 1;
  • // 将要创建的Index放在一个数组中
  • Index<K, V>[] idxs = (Index<K, V>[]) new Index[level + 1];
  • Index<K, V> idx = null;
  • // 循环创建Index节点,效果与上面的一样;并把创建的Index节点放在idxs数组中
  • for (int i = 1; i <= level; ++i)
  • idxs[i] = idx = new Index<K, V>(z, idx, null);
  • HeadIndex<K, V> oldh;
  • int k;
  • for (; ; ) {
  • // 旧的head
  • oldh = head;
  • // 旧的head的级别
  • int oldLevel = oldh.level;
  • if (level <= oldLevel) { // lost race to add level
  • /**
  • * 执行到这里,说明存在并发情况,
  • * 其他线程由于维护了级别,导致head节点的级别出现了增加,
  • * 因此直接记录level,跳出for循环,由addIndex()完成添加Index操作
  • */
  • k = level;
  • break;
  • }
  • //
  • HeadIndex<K, V> newh = oldh;
  • // 旧的HeadIndex指向的Node节点
  • Node<K, V> oldbase = oldh.node;
  • // 从旧的HeadIndex节点层级加1开始,一直到level级别,创建新的HeadIndex
  • for (int j = oldLevel + 1; j <= level; ++j)
  • /**
  • * 所有新的HeadIndex的层级为j,node都指向oldbase
  • * right指向对应层级的Index(从idxs中获取),down指向当前顶层的HeadIndex
  • * 具体效果如下:
  • * HeadIndex(level) -> Index(idxs[level])
  • * | |
  • * ...... ......
  • * | |
  • * HeadIndex(oldLevel + 1) -> Index(idxs[oldLevel + 1])
  • * | |
  • * HeadIndex(oldLevel) -> Index(idxs[level])
  • */
  • newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j);
  • // 修改head
  • if (casHead(oldh, newh)) {
  • // 如果修改成功,就记录旧的层级,并跳出for循环,由addIndex()完成添加Index操作
  • k = oldLevel;
  • break;
  • }
  • }
  • // 会将构建好的一列Index添加到跳表中
  • addIndex(idxs[k], oldh, k);
  • }
  • }

insertIndex(Node<K, V>, int)方法根据传入级别是否大于当前head节点的级别分了两种情况。

当传入级别小于等于当前head节点的级别时,不需要构造新的HeadIndex节点,只需构造好相应列的Index节点,Index列的构建方式源码注释已经讲解得很清楚了;然后通过addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)方法添加到跳表中即可,该方法会在后面讲解。

当传入级别大于当前head节点的级别时,则需要构建新的HeadIndex节点并添加到跳表中;在这个过程中需要注意并发情况的发生,当尝试添加HeadIndex节点过程中恰好有其他线程并发增加了HeadIndex的级别,则当前添加HeadIndex的操作可以直接结束,仅仅将Index列添加到跳表中即可。关于HeadIndex的添加源码注释解释的比较清楚,这里也不赘述。

我们主要需要关注addIndex(Index<K, V>, HeadIndex<K, V>, int)方法,它的源码如下:

  • /**
  • * Adds given index nodes from given level down to 1.
  • * 添加Index节点
  • *
  • * @param idx the topmost index node being inserted 最高层的将被插入的节点
  • * @param h the value of head to use to insert. This must be
  • * snapshotted by callers to provide correct insertion level
  • * @param indexLevel the level of the index 插入的级别
  • */
  • private void addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel) {
  • // Track next level to insert in case of retries
  • // 跟踪下一个待插入的级别,用于重试
  • int insertionLevel = indexLevel;
  • // 包装idx中node节点的key
  • Comparable<? super K> key = comparable(idx.node.key);
  • // 检查key
  • if (key == null) throw new NullPointerException();
  • // Similar to findPredecessor, but adding index nodes along path to key.
  • // 与findPredecessor()类似,但会根据遍历到相应key的路径添加Index节点
  • for (; ; ) {
  • // 头节点的级别
  • int j = h.level;
  • // 记录头节点
  • Index<K, V> q = h;
  • // 头节点的后继节点
  • Index<K, V> r = q.right;
  • // 待插入的Index节点
  • Index<K, V> t = idx;
  • for (; ; ) {
  • if (r != null) { // 头节点的后继节点不为null
  • // 取出该后继节点的Node
  • Node<K, V> n = r.node;
  • // compare before deletion check avoids needing recheck
  • // 与传入的待插入Index节点比较Node节点的key
  • int c = key.compareTo(n.key);
  • // 如果头节点的后继节点r中node的value为null,表明该节点被删除了
  • if (n.value == null) {
  • /**
  • * 调用unlink()将其从链中移除
  • * 移除之前:q -> r -> r.right
  • * 移除之后:q -> r.right
  • */
  • if (!q.unlink(r))
  • // 移除不成功就跳出内层循环重试
  • break;
  • // 此时表示移除r成功,将q新的后继节点赋值给r,进行下一次内层循环
  • r = q.right;
  • continue;
  • }
  • // 走到这里表示r不是被删除的节点
  • if (c > 0) {
  • // 如果传入的节点idx的key比r所代表节点的key大,则往后移,并继续查找
  • q = r;
  • r = r.right;
  • continue;
  • }
  • }
  • /**
  • * 走到这里,说明待插入Index节点的node的key小于或等于r节点的node的key
  • * 即已经找到了Index节点正确的待插入位置
  • * 需要判断当前待插入Index节点的级别与q与r的级别是否相同,
  • * 如果相同则可以插入,如果不同,应该在当前层的下面层进行尝试插入,
  • * 以保持插入节点后Index列的高度与HeadIndex列的高度保持低部对齐
  • */
  • if (j == insertionLevel) { // 待插入Index节点的级别等于头节点的级别
  • // Don't insert index if node already deleted
  • // 检查待插入的Index节点的Node节点是否是被删除的节点
  • if (t.indexesDeletedNode()) {
  • // 待插入的Index节点中的Node节点是一个被删除的节点
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key); // cleans up
  • // 返回,结束这次Index节点的插入
  • return;
  • }
  • /**
  • * 可以插入,将r作为t的后继,然后将t作为q的后继,即将t插在r的前面,q的后面
  • * 插入前:q -> r -> r.right
  • * 插入后:q -> t -> r -> r.right
  • */
  • if (!q.link(r, t))
  • // 插入不成功,跳出内层循环重试
  • break; // restart
  • // 插入成功,insertionLevel自减后为0时,就可以返回了
  • if (--insertionLevel == 0) {
  • // need final deletion check before return
  • // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
  • if (t.indexesDeletedNode())
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key);
  • // 返回
  • return;
  • }
  • }
  • /**
  • * 走到这里,有两种情况:
  • * 1. 尝试插入Index时,发现需要连接的前驱Index和后继Index与待插入的Index不在一个层级,因此需要下潜一层再尝试插入
  • * 2. Index插入成功了,但insertionLevel自减后还不为0,即表示插入的Index其实是一个多层的列,还剩余有层级没有进行连接处理
  • */
  • if (--j >= insertionLevel && j < indexLevel)
  • // 下潜到待插入Index列的下一层级
  • t = t.down;
  • // 前驱节点也下潜,然后进行下一次内层循环,处理该层级待插入的Index节点的连接
  • q = q.down;
  • r = q.right;
  • }
  • }
  • }

insertIndex(Node<K, V>, int)方法虽然完成了Index列的构建,但真正的插入工作还是交给了addIndex(Index<K, V>, HeadIndex<K, V>, int)方法。addIndex(Index<K, V>, HeadIndex<K, V>, int)会根据传入的Index节点idxindexLevel(注意,此时如果indexLevel大于1,表示idx其实是一个Index列的顶层Index节点),逐级维护跳表与待添加的Index节点间的层级关系。

要维护层级关系,首先需要在跳表中根据Index节点对应的Node节点的键(标识为key),从传入的HeadIndex节点h开始遍历查找符合条件的前驱Index节点;这种查找分为两种情况:

  • 当查找到的Index节点对应的Node节点的键小于key,则需要继续后移查找,表现代码如下:
  • Comparable<? super K> key = comparable(idx.node.key);
  • Index<K, V> q = h;
  • // 头节点的后继节点
  • Index<K, V> r = q.right;
  • // 取出该后继节点的Node
  • Node<K, V> n = r.node;
  • // compare before deletion check avoids needing recheck
  • // 与传入的待插入Index节点比较Node节点的key
  • int c = key.compareTo(n.key);
  • if (c > 0) {
  • // 如果传入的节点idx的key比r所代表节点的key大,则往后移,并继续查找
  • q = r;
  • r = r.right;
  • continue;
  • }
  • 当查找到的Index节点对应的Node节点的键大于等于key,则标识找到了Index节点正确的待插入位置,代码中表示为qr之间,此时需要将待插入的Index节点插到qr两个节点之间,表现代码如下:
  • /**
  • * 走到这里,说明待插入Index节点的node的key小于或等于r节点的node的key
  • * 即已经找到了Index节点正确的待插入位置
  • * 需要判断当前待插入Index节点的级别与q与r所在的层级是否相同,
  • * 如果相同则可以插入,如果不同,应该在当前层的下面层进行尝试插入,
  • * 以保持插入节点后Index列的高度与HeadIndex列的高度保持低部对齐
  • */
  • if (j == insertionLevel) { // 待插入Index节点的级别等于头节点的级别
  • // Don't insert index if node already deleted
  • // 检查待插入的Index节点的Node节点是否是被删除的节点
  • if (t.indexesDeletedNode()) {
  • // 待插入的Index节点中的Node节点是一个被删除的节点
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key); // cleans up
  • // 返回,结束这次Index节点的插入
  • return;
  • }
  • /**
  • * 可以插入,将r作为t的后继,然后将t作为q的后继,即将t插在r的前面,q的后面
  • * 插入前:q -> r -> r.right
  • * 插入后:q -> t -> r -> r.right
  • */
  • if (!q.link(r, t))
  • // 插入不成功,跳出内层循环重试
  • break; // restart
  • // 插入成功,insertionLevel自减后为0时,就可以返回了
  • if (--insertionLevel == 0) {
  • // need final deletion check before return
  • // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
  • if (t.indexesDeletedNode())
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key);
  • // 返回
  • return;
  • }
  • }

注意,这里在插入之前其实做了一次j == insertionLevel的判断,以保证当前插入的Index的层级与qr两个节点所在的层级相同的情况下才插入。大家可以回顾跳表的结构,跳表中所有的索引列其实是保持逻辑上底部对齐的,因此在插入Index时一定要保证插入过程完成后,Index列每一层的Index都应该处在正确的层级上。

j == insertionLevel条件不满足时,表示当前插入的Index的层级与qr两个节点所在的层级不同,此时应该下潜一级,然后重新进行前驱节点的查找,表现代码如下:

  • /**
  • * 走到这里,有两种情况:
  • * 1. 尝试插入Index时,发现需要连接的前驱Index和后继Index与待插入的Index不在一个层级,因此需要下潜一层再尝试插入
  • * 2. Index插入成功了,但insertionLevel自减后还不为0,即表示插入的Index其实是一个多层的列,还剩余有层级没有进行连接处理
  • */
  • if (--j >= insertionLevel && j < indexLevel)
  • // 下潜到待插入Index列的下一层级
  • t = t.down;
  • // 前驱节点也下潜,然后进行下一次内层循环,处理该层级待插入的Index节点的连接
  • q = q.down;
  • r = q.right;

不过我们需要注意,这部分代码不仅仅作用于上面这种情况。在上面j == insertionLevel满足的情况下,如果Index节点插到qr两个节点之间的工作顺利完成,还会执行如下代码:

  • // 插入成功,insertionLevel自减后为0时,就可以返回了
  • if (--insertionLevel == 0) {
  • // need final deletion check before return
  • // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
  • if (t.indexesDeletedNode())
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key);
  • // 返回
  • return;
  • }

这段代码的意思是,insertionLevel自减后如果为0,则表示待插入的Index列上所有的Index节点都已经连接到跳表中了,因此可以直接return返回结束了;但如果insertionLevel自减后如果不为0,则表示待插入的Index列上还有Index节点没有维护,因此也会执行下潜一层的操作,处理剩下的Index节点。注意下面的代码:

  • if (--j >= insertionLevel && j < indexLevel)
  • // 下潜到待插入Index列的下一层级
  • t = t.down;

这里的操作中,t是引用的传入的idx,表示最开始传入的idx是指向了一个Index列的顶层节点,t = t.down即是在这个Index列中进行下潜。

这部分对addIndex(Index<K, V>, HeadIndex<K, V>, int)方法的讲解非常抽象,比较难以理解,这里我们以示例来演示。在前面的操作中,虽然我们已经将数据<17, "Tom">添加到了跳表中,但并没有维护层级关系。此时head头节点的level是3,假设随机得到的级别是2,根据insertIndex(Node<K, V> z, int level)方法,将会构建2层的Index列(所有的Index节点的node都指向新添加的Node数据节点),示意图如下:

13.put操作——Index列的情况.png

此时Index列的所有Index节点只是与新添加的Node节点有关联关系,并没有与跳表中的索引结构进行关联。

1.5.4.1 查找符合条件的前驱Index节点

通过addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)可以将Index列添加到跳表的索引结构中。在添加之前,首先需要定位合适的添加位置,即遍历查找符合条件的前驱Index节点,查找流程图如下:

14.put操作——维护层级关系的(待插入位置查找顺序).png

从查找流程可知,在第一次查找中,虽然最后得到的Node节点的键比新添加的Node节点的键(通过传给addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)方法的idx.node.key获取)大,但由于此时q指向的节点的层级大于Index列的总层级(即传给addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)方法的indexLevel参数),因此并不满足条件,需要q节点进行下潜后再次查找。

而在第二次查找中,最后得到的Node节点的键比新添加的Node节点的键小,因此需要q节点向后移动,此时r节点也会在q节点移动后被更新,然后再次进行查找。

第三次查找中,最后得到的Node节点与第一次查找时相同,但是此时q节点的层级是等于Index列的总层级的,因此符合插入条件,即此时q节点就是符合条件的前驱Index节点。

1.5.4.2 插入Index列

找到了符合条件的前驱Index节点,就可以进行Index列的插入工作了;需要注意的是,传给addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)方法的idx虽然只是一个Index对象,但从insertIndex(Node<K, V> z, int level)方法可以看出,当level大于1时,最后构成的idx对象其实是一个Index列中最高层的Index对象,它们从上到下依次通过down指针进行连接,这个注意点已经强调过很多次了;回顾代码:

  • private void insertIndex(Node<K, V> z, int level) {
  • HeadIndex<K, V> h = head;
  • // 当前head节点的level
  • int max = h.level;
  • /**
  • * 如果插入的level小于等于max,则不用创建新层级,
  • * 只需要构建相应层级的Index列,添加到跳表中即可
  • */
  • if (level <= max) {
  • Index<K, V> idx = null;
  • /**
  • * 循环创建Index节点,
  • * 这种循环方式可以构建一列Index节点,
  • * 最终idx指向最顶层的那个Index节点,这一列Index节点的node都指向z
  • * 即假设level为2,则最终会有这样的结构:
  • * Index(idx)
  • * |
  • * Index
  • * |
  • * Node(z)
  • */
  • for (int i = 1; i <= level; ++i)
  • idx = new Index<K, V>(z, idx, null);
  • // 使用addIndex()方法将idx添加到跳表中,后面分析
  • addIndex(idx, h, level);
  • } else { // Add a new level
  • /*
  • *
  • * 走到这里,说明level比head的level还要大,
  • * 因此需要添加一个新的层级,即需要构建新的HeadIndex
  • */
  • // 指定新层级只比当前层级大1
  • level = max + 1;
  • // 将要创建的Index放在一个数组中
  • Index<K, V>[] idxs = (Index<K, V>[]) new Index[level + 1];
  • Index<K, V> idx = null;
  • // 循环创建Index节点,效果与上面的一样;并把创建的Index节点放在idxs数组中
  • for (int i = 1; i <= level; ++i)
  • idxs[i] = idx = new Index<K, V>(z, idx, null);
  • ...
  • // 会将构建好的一列Index添加到跳表中
  • addIndex(idxs[k], oldh, k);
  • }
  • }

因此,插入Index列其实涉及整个列中所有Index节点与跳表索引结构之间的维护工作;下面是本示例中Index列插入过程的示意图:

15.put操作——维护层级关系的(插入操作).png

在本例中Index列只有两层,因此插入Index列涉及两次连接操作。在插入上层Index节点时,会根据上面查找到的符合条件的前驱Index节点q以及q的后继节点r,把上层Index节点插在qr之间,这一步是通过addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)方法下面的片段实现的:

  • /**
  • * 走到这里,说明待插入Index节点的node的key小于或等于r节点的node的key
  • * 即已经找到了Index节点正确的待插入位置
  • * 需要判断当前待插入Index节点的级别与q与r的级别是否相同,
  • * 如果相同则可以插入,如果不同,应该在当前层的下面层进行尝试插入,
  • * 以保持插入节点后Index列的高度与HeadIndex列的高度保持低部对齐
  • */
  • if (j == insertionLevel) { // 待插入Index节点的级别等于头节点的级别
  • // Don't insert index if node already deleted
  • // 检查待插入的Index节点的Node节点是否是被删除的节点
  • if (t.indexesDeletedNode()) {
  • // 待插入的Index节点中的Node节点是一个被删除的节点
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key); // cleans up
  • // 返回,结束这次Index节点的插入
  • return;
  • }
  • /**
  • * 可以插入,将r作为t的后继,然后将t作为q的后继,即将t插在r的前面,q的后面
  • * 插入前:q -> r -> r.right
  • * 插入后:q -> t -> r -> r.right
  • */
  • if (!q.link(r, t))
  • // 插入不成功,跳出内层循环重试
  • break; // restart
  • // 插入成功,insertionLevel自减后为0时,就可以返回了
  • if (--insertionLevel == 0) {
  • // need final deletion check before return
  • // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
  • if (t.indexesDeletedNode())
  • // findNode(key)方法的代码会移除已标记为删除的节点
  • findNode(key);
  • // 返回
  • return;
  • }
  • }

q.link(r, t)就是最终的连接操作;插入成功后,会对insertionLevel进行自减,这一步操作相当于对已插入的层级进行计数,表示已经有一层插入成功了。如果自减后的insertionLevel为0,表示所有层级的Index节点都插入完成,因此可以return返回结束。

当自减后的insertionLevel不为0,则会继续往下执行下面的代码:

  • /**
  • * 走到这里,有两种情况:
  • * 1. 尝试插入Index时,发现需要连接的前驱Index和后继Index与待插入的Index不在一个层级,因此需要下潜一层再尝试插入
  • * 2. Index插入成功了,但insertionLevel自减后还不为0,即表示插入的Index其实是一个多层的列,还剩余有层级没有进行连接处理
  • */
  • if (--j >= insertionLevel && j < indexLevel)
  • // 下潜到待插入Index列的下一层级
  • t = t.down;
  • // 前驱节点也下潜,然后进行下一次内层循环,处理该层级待插入的Index节点的连接
  • q = q.down;
  • r = q.right;

这部分代码使t指针下潜了,同时q指针也下潜了,然后在下一次内层循环中,就可以在新的qr之间插入Index列中下层的Index节点了;下层的Index节点插入完成后insertionLevel再次自减就会变为0,表示所有层级的Index节点都插入完成,直接return返回结束插入工作。到此,添加键值对的操作也就全部宣告完成了。

从上面的分析可知,doPut(K kkey, V value, boolean onlyIfAbsent)方法的实现的确非常复杂,其实主要复杂的地方并不是新键值对的添加,而是对整个跳表结构的维护。这部分的解析大家如果看不明白也没关系,只需要明白以下的两个注意点即可:

  1. 在ConcurrentSkipListMap添加键值对时,会首先找到符合条件的前驱Node节点,然后将键值对包装为Node节点添加到这个前驱Node节点的后面。
  2. 每次添加之后会随机得到一个级别,如果级别大于0,就会构建相应数量的Index节点构成Index列,插入到跳表中。

能够读懂doPut(K, V, boolean)方法的实现,后面的各种操作就非常简单了。doPut(K, V, boolean)方法是所有操作方法里最复杂的一个。