Java多线程 33 - ConcurrentSkipListMap详解(2)
发布于 / 2018-08-30
简介:ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景。
1. ConcurrentSkipListMap源码解析(一)
在接下来的内容中将以ConcurrentSkipListMap源码为基础详细解析其各类操作的实现,并从构造、增加、删除、修改、查询等过程演示源码的工作流程和原理。
1.1. ConcurrentSkipListMap的构造方法
ConcurrentSkipListMap有4个构造方法,声明如下:
- // 构造一个新的空映射,该映射按照键的自然顺序进行排序
- ConcurrentSkipListMap()
- // 构造一个新的空映射,该映射按照指定的比较器进行排序
- ConcurrentSkipListMap(Comparator<? super K> comparator)
- // 构造一个新映射,该映射所包含的映射关系与给定映射包含的映射关系相同,并按照键的自然顺序进行排序
- ConcurrentSkipListMap(Map<? extends K, ? extends V> m)
- // 构造一个新映射,该映射所包含的映射关系与指定的有序映射包含的映射关系相同,使用的顺序也相同
- ConcurrentSkipListMap(SortedMap<K, ? extends V> m)
由于ConcurrentSkipListMap是有序Map,因此在构造时可以指定比较器,如果没有指定比较器将使用默认的比较方法对键进行排序。除了默认的构造器外,可以传入一个Comparator参数用于指定构造器。另外ConcurrentSkipListMap也提供了从已有的Map构造,与ConcurrentHashMap不同的是,这里可以传入SortedMap对象,ConcurrentSkipListMap会直接沿用传入的SortedMap对象的比较器作为键比较器。这4个构造方法的源码很类似:
- /**
- * Constructs a new, empty map, sorted according to the
- * {@linkplain Comparable natural ordering} of the keys.
- * 构造一个新的ConcurrentSkipListMap实例
- */
- public ConcurrentSkipListMap() {
- this.comparator = null;
- // 初始化方法
- initialize();
- }
- /**
- * Constructs a new, empty map, sorted according to the specified
- * comparator.
- * <p>
- * 指定比较器,构造一个新的ConcurrentSkipListMap实例
- *
- * @param comparator the comparator that will be used to order this map.
- * If <tt>null</tt>, the {@linkplain Comparable natural
- * ordering} of the keys will be used.
- */
- public ConcurrentSkipListMap(Comparator<? super K> comparator) {
- this.comparator = comparator;
- // 初始化方法
- initialize();
- }
- /**
- * Constructs a new map containing the same mappings as the given map,
- * sorted according to the {@linkplain Comparable natural ordering} of
- * the keys.
- * <p>
- * 构造一个新的ConcurrentSkipListMap实例,并将传入的Map中的元素添加到实例中
- *
- * @param m the map whose mappings are to be placed in this map
- * @throws ClassCastException if the keys in <tt>m</tt> are not
- * {@link Comparable}, or are not mutually comparable
- * @throws NullPointerException if the specified map or any of its keys
- * or values are null
- */
- public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
- this.comparator = null;
- initialize();
- // 调用putAll()方法添加元素
- putAll(m);
- }
- /**
- * Constructs a new map containing the same mappings and using the
- * same ordering as the specified sorted map.
- * <p>
- * 构造一个新的ConcurrentSkipListMap实例,并将传入的SortedMap中的元素添加到实例中
- *
- * @param m the sorted map whose mappings are to be placed in this
- * map, and whose comparator is to be used to sort this map
- * @throws NullPointerException if the specified sorted map or any of
- * its keys or values are null
- */
- public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
- // 将传入Map的比较器作为ConcurrentSkipListMap的比较器
- this.comparator = m.comparator();
- initialize();
- // 调用buildFromSorted()方法添加元素
- buildFromSorted(m);
- }
可以发现,所有的构造方法都调用了initialize()
方法进行初始化,该方法的源码如下:
- final void initialize() {
- keySet = null;
- entrySet = null;
- values = null;
- descendingMap = null;
- // 最小为4
- randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
- head = new HeadIndex<K, V>(new Node<K, V>(null, BASE_HEADER, null), null, null, 1);
- }
initialize()
方法重置了一些与ConcurrentSkipListMap息息相关的成员变量,同时生成了randomSeed
,通过位运算保证了randomSeed
最小为4。同时还初始化了head
字段的指向,该字段是一个HeadIndex节点,level
级别为1,right
和down
都为null,同时其node
字段指向了BASE_HEADER
对象,这个对象在前面讲到过,就是一个简单的Object对象,仅用作标记使用。
以Map为参数的构造方法中,ConcurrentSkipListMap(Map<? extends K, ? extends V> m)
使用了putAll(m)
将传入的Map中的元素添加到新创建的ConcurrentSkipListMap中,putAll(Map<? extends K, ? extends V>)
方法继承自AbstractMap类,源码如下:
- public void putAll(Map<? extends K, ? extends V> m) {
- for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
- put(e.getKey(), e.getValue());
- }
可以看出,它内部调用了put(K, V)
,具体细节在后面会讲解。
我们主要关注一下ConcurrentSkipListMap(SortedMap<K, ? extends V> m)
方法内部调用的buildFromSorted(m)
,源码如下:
- /**
- * Streamlined bulk insertion to initialize from elements of
- * given sorted map. Call only from constructor or clone
- * method.
- *
- * 批量将有序集合中依次添加到新创建的ConcurrentSkipListMap中
- * 该方法只会在构造方法和clone()方法中被调用
- */
- private void buildFromSorted(SortedMap<K, ? extends V> map) {
- // 参数检查
- if (map == null)
- throw new NullPointerException();
- // 头节点
- HeadIndex<K, V> h = head;
- // 头节点的node
- Node<K, V> basepred = h.node;
- // Track the current rightmost node at each level. Uses an
- // ArrayList to avoid committing to initial or maximum level.
- // 跟踪当前层最右边的节点
- ArrayList<Index<K, V>> preds = new ArrayList<Index<K, V>>();
- // initialize
- /**
- * 这一步的操作大致效果是这样的,假设当前跳表的结构如下:
- * head -> n1 -> n2
- * |
- * index1 -> n3 -> n4 -> n5
- * |
- * index2 -> n6 -> n7
- * 则会将head、index1、index2添加到preds中,顺序如下:
- * (index2, index1, head)
- */
- // 根据头节点层级向preds添加null占位
- for (int i = 0; i <= h.level; ++i)
- preds.add(null);
- // 反向往preds中添加头节点这一列的节点
- Index<K, V> q = h;
- for (int i = h.level; i > 0; --i) {
- preds.set(i, q);
- q = q.down;
- }
- // 迭代传入的map,将元素添加到新创建的ConcurrentSkipListMap中
- Iterator<? extends Map.Entry<? extends K, ? extends V>> it = map.entrySet().iterator();
- while (it.hasNext()) {
- // 取出元素
- Map.Entry<? extends K, ? extends V> e = it.next();
- // 随机一个级别j
- int j = randomLevel();
- // j与head级别进行比较,如果大于,就置j为只比head级别大1
- if (j > h.level) j = h.level + 1;
- // 取出并检查键值对
- K k = e.getKey();
- V v = e.getValue();
- if (k == null || v == null)
- throw new NullPointerException();
- // 根据键值对构造一个节点,后继节点为null
- Node<K, V> z = new Node<K, V>(k, v, null);
- // 将z作为head的node的后继节点
- basepred.next = z;
- // basepred后移,指向新添加的节点z
- basepred = z;
- if (j > 0) {
- Index<K, V> idx = null;
- // 从1遍历到j
- for (int i = 1; i <= j; ++i) {
- // 构建一个新的Index节点,其node指向上面新创建的节点z,后继节点为null
- idx = new Index<K, V>(z, idx, null);
- if (i > h.level)
- /**
- * 如果i比head节点的层级大,应该新构建一个head节点,补在idx前面
- * 这一步的操作大致是这样的,假设当前跳表的结构如下:
- * head(1)
- * 此时head级别为1,此时插入一个级别为2的Index,级别比head大,因此需要新构建一个head,级别也为2
- * 修改后结构如下:
- * head(new, 2) -> idx(2)
- * |
- * head(old, 1)
- */
- h = new HeadIndex<K, V>(h.node, h, idx, i);
- // 更新preds数组
- if (i < preds.size()) {
- preds.get(i).right = idx;
- preds.set(i, idx);
- } else
- preds.add(idx);
- }
- }
- }
- // 更新head
- head = h;
- }
buildFromSorted(SortedMap<K, ? extends V>)
方法接收一个Map参数,内部会通过迭代该Map将其所有元素添加到ConcurrentSkipListMap中;在添加过程中,通过randomLevel()
方法随机获取一个级别,然后根据该级别调整跳表的结构。randomLevel()
方法的源码如下:
- /**
- * Returns a random level for inserting a new node.
- * Hardwired to k=1, p=0.5, max 31 (see above and
- * Pugh's "Skip List Cookbook", sec 3.4).
- * <p>
- * This uses the simplest of the generators described in George
- * Marsaglia's "Xorshift RNGs" paper. This is not a high-quality
- * generator but is acceptable here.
- *
- * 为即将插入的新的Index节点生成一个随机级别,
- * 这里使用的算法是Pugh's的论文《Skip List Cookbook》中3.4节提到的内容,level最大为31。
- * 这使用了George Marsaglia的《Xorshift RNGs》论文中描述的最简单的生成器。
- * 这个生成器并不完美,但在这里是可以接受的。
- */
- private int randomLevel() {
- int x = randomSeed;
- x ^= x << 13;
- x ^= x >>> 17;
- randomSeed = x ^= x << 5;
- if ((x & 0x80000001) != 0) // test highest and lowest bits
- return 0;
- int level = 1;
- while (((x >>>= 1) & 1) != 0) ++level;
- return level;
- }
关于级别的生成方法使用的算法我们不做讨论,这里只需要知道生成的级别最大为31。
这里我们模拟一下构造方法的过程,就以ConcurrentSkipListMap(SortedMap<K, ? extends V> m)
方法为例,假设传入的SortedMap中有以下键值对:
- <15, "Mary">
- <20, "Jack">
在构造ConcurrentSkipListMap的过程中,记录了传入的SortedMap的比较器后,会先调用initialize()
方法构建一个HeadIndex节点,HeadIndex节点的node
变量指向了BASE_HEADER
,而成员变量head
指向了该HeadIndex节点;示意图如下:
接下来会调用buildFromSorted(SortedMap<K, ? extends V>)
方法将传入的SortedMap中的键值对添加到跳表中,我们重点分析该方法。buildFromSorted(SortedMap<K, ? extends V>)
方法内部使用列表preds
来记录添加键值对过程中,最后产生的一列Index节点,用于维护跳表结构,在起始时刻preds
按照从下往上的顺序存储了head
这一列的Index节点,在本例中即只有head
节点。示意图如下:
接下来会迭代遍历传入的SortedMap的键值对,依次进行添加。在每次添加时,会随机生成一个级别,添加过程中会根据这个级别维护跳表结构。SortedMap的每一个键值对都会被构造为一个Node节点;以我们的示例数据为例,第一组键值对数据所对应的Node节点会作为head
节点所包含的Node节点(即BASE_HEADER
)的后继节点。
成功将新的Node节点添加到跳表中后,会根据随机生成的级别来维护跳表的级别结构。这里我们假设随机级别为2,大于当前head
节点的级别1,且是符合条件的;因此buildFromSorted(SortedMap<K, ? extends V>)
方法会创建两个Index节点,第二个Index节点的down
指向第一个Index节点,且两个Index节点的node
都指向上面新创建的Node节点。根据代码逻辑,还会创建一个level
为2、down
域指向head
节点的HeadIndex节点;此时跳表中存在两个HeadIndex节点,需要将这两个HeadIndex节点的right
域分别于两个Index节点相连接,并更新head
成员变量,使其指向新创建的level
为2的HeadIndex节点。最后会将新添加的两个Index节点记录在preds
列表中。示意图如下:
添加第二组数据的流程与上面的类似,不过这里假设随机生成的级别为3,因此也需要维护跳表的级别;添加第二组数据的示意图如下:
从上面的分析过程和示意图可知,在buildFromSorted(SortedMap<K, ? extends V>)
方法内向跳表添加数据的过程有以下要点:
- ConcurrentSkipListMap跳表结构中,第一列的索引节点都为HeadIndex节点,从上往下进行衔接;除此之外其他列的索引节点都为Index节点。
- 所有装载键值对的Node数据节点位于跳表结构的最底层,即每一列节点的最下面的节点为Node数据节点。
- 每一列的Index节点的
node
变量都引用着这一列最下面的Node数据节点。第一列所有的HeadIndex节点的node
变量引用的Node节点的值是BASE_HEADER
节点,该Node代表基础头节点,并不装载用户有效数据。 - 每层的节点(无论是Node数据节点,还是HeadIndex和Index索引节点)都构成了一个单向链表结构。
其实ConcurrentSkipListMap内部的跳表结构就是典型的跳表结构,根据以上的数据添加过程我们可以理清HeadIndex、Index、Node这三类节点在跳表中充当的角色,这也便于我们后面理解ConcurrentSkipListMap的各类操作。下面是buildFromSorted(SortedMap<K, ? extends V>)
方法的完整过程示意图:
1.2. 查找辅助方法
在介绍ConcurrentSkipListMap的各项主要操作之前,我们先关注一个大类别的方法:查找辅助方法。由于ConcurrentSkipListMap中跳表结构的复杂性,ConcurrentSkipListMap提供了大量的查找辅助方法,声明如下:
- // 查找第一个数据节点
- Node<K, V> findFirst()
- // 查找最后一个数据节点
- Node<K, V> findLast()
- // 根据key查找符合条件的前驱节点
- private Node<K, V> findPredecessor(Comparable<? super K> key)
- // 具体用处请参考该方法的完整注释的讲解
- private Node<K, V> findPredecessorOfLast()
- // 根据key查找数据节点
- private Node<K, V> findNode(Comparable<? super K> key)
- // 根据指定操作查找附近的数据节点
- Node<K, V> findNear(K kkey, int rel)
这些方法都有很强大的用处,在这里我们先对这几个方法进行简单地介绍,大家如果对这部分的源码有疑问也没关系,只需要在脑海中有个印象即可,在后面的示例操作中会对这些方法做详细演示和解析。
findFirst()
方法
首先关注findFirst()
方法,从方法声明就可以得知,它是用于查找第一个第一个有效的数据节点,源码如下:
- /**
- * Specialized variant of findNode to get first valid node.
- * 找到第一个有效的节点
- *
- * @return first node or null if empty
- */
- Node<K, V> findFirst() {
- // 无限循环
- for (; ; ) {
- // head头索引的Node
- Node<K, V> b = head.node;
- // 头节点的后继节点n
- Node<K, V> n = b.next;
- if (n == null)
- // n为null则返回null
- return null;
- if (n.value != null)
- // n的value不为null则返回该后继节点
- return n;
- // n的value为null,删除n节点
- n.helpDelete(b, n.next);
- }
- }
从findFirst()
可以得知,第一个有效的数据节点就是基础头节点(即head
指向的、值为BASE_HEADER
的那个节点)的有效的后继节点。由于主体代码在一个无限for循环中,因此如果刚好遇到BASE_HEADER
节点的后继节点n
为被删除的节点,会使用n.helpDelete(b, n.next)
协助删除n
节点,然后进行新的循环再次查找。
findLast()
方法
findLast()
方法用于查找最末尾的有效的数据节点,它的源码如下:
- /**
- * Specialized version of find to get last valid node.
- *
- * 查找最后一个有效的数据节点
- *
- * @return last node or null if empty
- */
- Node<K, V> findLast() {
- /*
- * findPredecessor can't be used to traverse index level
- * because this doesn't use comparisons. So traversals of
- * both levels are folded together.
- */
- // 头节点
- Index<K, V> q = head;
- // 无限循环
- for (; ; ) {
- Index<K, V> d, r;
- // 头节点的right节点
- if ((r = q.right) != null) {
- // 如果r为被删除的节点,将其从索引链中断开
- if (r.indexesDeletedNode()) {
- q.unlink(r);
- // 重新赋值q,然后进行下一次循环
- q = head; // restart
- } else
- // 否则q进行右移操作,即指向r,然后进行下一次循环
- q = r;
- } else if ((d = q.down) != null) { // 如果q的right节点为null,则尝试下潜
- // 走到这里表示q的down节点不为null,下潜成功,将进行下一次循环
- q = d;
- } else {
- // 走到这里,表示q的right点为null,down点也为null,即q无法右移也无法下潜
- // 获取q指向的Node节点b
- Node<K, V> b = q.node;
- // 获取b的后继
- Node<K, V> n = b.next;
- for (; ; ) {
- if (n == null)
- /**
- * n为null的话,表示b已经是Node链中最后一个节点
- * 如果此时b不是BASE_HEADER,那么b就是要找的最后一个节点
- */
- return b.isBaseHeader() ? null : b;
- // 走到这里表示n不为null,获取n的后继为f
- Node<K, V> f = n.next; // inconsistent read
- // 检查n是否改变,如果改变的话跳出本次循环,重新进行外层循环重试
- if (n != b.next)
- break;
- /**
- * 检查n是否是被删除的节点
- * 如果是,调用helpDelete将其从Node链中断开,
- * 然后跳出本次循环,重新进行外层循环重试
- */
- Object v = n.value;
- if (v == null) { // n is deleted
- n.helpDelete(b, f);
- break;
- }
- /**
- * 走到这里说明n是有效的
- * 这里用于判断b是否是被删除的节点
- * 当v(n.value)== n时,表示n是删除标记节点
- * 此时如果b的value也为null,则b也是被删除的节点
- * 如果是,就跳出本次循环,重新进行外层循环重试
- */
- if (v == n || b.value == null) // b is deleted
- break;
- // 走到这里说明b和n都有效,后移,再次进行循环判断
- b = n;
- n = f;
- }
- q = head; // restart
- }
- }
- }
源码比较复杂,这里将仔细梳理。findLast()
方法内有两个无限for循环,第一个无限for循环中,首先引用head
节点为q
,然后有if…else if…else的三个条件分支,其实这三个分支各有用处;其中第一个if分支中的代码用于在跳表结构中对q
进行右移操作,第二个else if分支中的代码用于在跳表结构中对q
进行下潜操作,而第三个else分支中的代码则最为重要,当代码走到这个分支,说明q
节点已经无法右移且无法下潜了,此时在该分支里的无限for循环中将从q
指向的Node节点开始往右遍历,并进行各类检查,直到遍历到某个节点的后继为null时,即该节点已经没有后继节点了,判断该节点是否是BASE_HEADER,如果不是,该节点就是我们要找的最后一个有效的数据节点。
findPredecessor(Comparable<? super K>)
方法
由于ConcurrentSkipListMap中的键是具有可比较特性的(通过比较器或者key
实现了Comparable接口),因此所有的键值对都是键有序的。findPredecessor(Comparable<? super K> key)
方法用于寻找给定键所对应的符合条件的前驱节点,这里的符合条件是指有效且未被标记为删除,该方法的源码如下:
- /**
- * Returns a base-level node with key strictly less than given key,
- * or the base-level header if there is no such node. Also
- * unlinks indexes to deleted nodes found along the way. Callers
- * rely on this side-effect of clearing indices to deleted nodes.
- *
- * 根据传入的key,查找符合条件的前驱节点
- *
- * @param key the key
- * @return a predecessor of key
- */
- private Node<K, V> findPredecessor(Comparable<? super K> key) {
- if (key == null)
- throw new NullPointerException(); // don't postpone errors
- for (; ; ) {
- // 头节点
- Index<K, V> q = head;
- Index<K, V> r = q.right;
- for (; ; ) {
- if (r != null) { // 头节点的右Index节点不为null
- // 查看当前Index包装的Node节点
- Node<K, V> n = r.node;
- K k = n.key;
- // 这一步if会尝试将失效的Index节点移除
- if (n.value == null) {
- /**
- * value为空,表示这个Node节点已经被删除了
- * 就调用unlink()将其从链表中移除,unlink()操作会将q的右节点换成r的右节点
- * 移除成功会跳出内层循环;否则继续往下执行
- */
- if (!q.unlink(r))
- // 跳出内层循环
- break; // restart
- // 走到这里说明unlink()操作失败,这一步会将r指向q新的右节点
- r = q.right; // reread r
- // 继续
- continue;
- }
- /**
- * 走到这里说明value不为null,对比key
- * 当传入的key比遍历到的节点的k大,就继续往后移
- * 即将q指向r,将r指向r的右节点(即后继)
- * 然后继续下一个循环
- */
- if (key.compareTo(k) > 0) {
- q = r;
- r = r.right;
- continue;
- }
- }
- /**
- * 走到这里有两种情况:
- * 1. r为null;此时表示head节点没有右节点,所以需要查找下一层
- * 2. r不为null,但key小于或等于k
- * 注:r为q的后继节点
- */
- // 获取head的下节点
- Index<K, V> d = q.down;
- if (d != null) {
- /**
- * 下节点不为null,将q指向该下节点,将r指向下节点的右节点
- * 这个操作相当于往下移了一层,因为当前层已经没有可找的了
- */
- q = d;
- r = d.right;
- } else
- /**
- * 走到这里,说明d为null,即只有一层,因此直接将当前q的Node返回
- * 此时应该有两种情况:
- * 1. q的右节点r(即后继节点)不为null时,但r的key比传入的key大;
- * 2. q的右节点r为null;
- * 这两种情况下,q的Node就应该是要找的前驱节点
- */
- return q.node;
- }
- }
- }
上面源码中的注释已经把findPredecessor(Comparable<? super K>)
方法的原理讲解地比较清楚了,这里就补充几个注意点:
findPredecessor(Comparable<? super K>)
方法由于需要遍历跳表数据,使用了两层for循环,其中第一个for循环用于查找失败时的重试操作,而第二个for循环则是用于主要的各类逻辑判断和移位操作。findPredecessor(Comparable<? super K>)
方法利用跳表的特性,从head
节点开始,从上层往下层,从左往右进行查找。- 在
findPredecessor(Comparable<? super K>)
方法遍历的过程中,它会协助将失效的Index节点移除掉,当一个Index节点指向的Node节点的值为null,该Index节点就被视为无效,这一功能非常重要。 - 从遍历查找的过程可知,ConcurrentSkipListMap中跳表数据的顺序是从小到大进行排列的。
findPredecessorOfLast()
方法
findPredecessorOfLast()
方法的字面意思是“查找最后一个数据节点的前驱节点”,但其实从该方法的实现和对该方法的使用严格来讲,它返回的节点并不是最后一个数据节点的前驱。它会遍历到跳表底层靠后的位置,然后返回特定的Index对应的Node节点;这里的“特定”也存在两种情况。我们先观察它的源码:
- /**
- * Specialized variant of findPredecessor to get predecessor of last
- * valid node. Needed when removing the last entry. It is possible
- * that all successors of returned node will have been deleted upon
- * return, in which case this method can be retried.
- *
- * 这个方法的字面意思是查找最后一个Node节点的前驱节点
- * 其实分析这个方法的内容以及对它的使用,可以发现它并不是返回最后一个Node节点的前驱节点
- * 它会遍历到跳表底层靠后的位置,然后返回特定的Index对应的Node节点
- * 这时这个Node节点可能还有后继节点
- *
- * @return likely predecessor of last node
- */
- private Node<K, V> findPredecessorOfLast() {
- // 无限循环
- for (; ; ) {
- // 获取head节点
- Index<K, V> q = head;
- // 无限循环
- for (; ; ) {
- Index<K, V> d, r;
- // 头节点的right节点
- if ((r = q.right) != null) {
- // 如果r为被删除的节点,将其从索引链中断开
- if (r.indexesDeletedNode()) {
- // 重新赋值q,然后进行下一次循环
- q.unlink(r);
- break; // must restart
- }
- // proceed as far across as possible without overshooting
- /**
- * 否则查看r对应的Node节点的后继是否为null
- * 如果不为null,则将q往后移动
- */
- if (r.node.next != null) {
- // q往后移动
- q = r;
- continue;
- }
- }
- /**
- * 走到这里有两种情况:
- * 1. q的right节点r为null
- * 2. q的right节点r不为null,但r对应的Node节点的后继为null
- * 尝试下潜操作
- */
- if ((d = q.down) != null)
- // 走到这里表示q的down节点不为null,下潜成功,将进行下一次循环
- q = d;
- /**
- * 走到这里表示q节点的down节点也为null,下潜失败
- */
- else
- return q.node;
- }
- }
- }
从源码可知,该方法最后确实遍历到了跳表底层靠后的位置,最后返回的Node节点q.node
并不是Node链中最后一个节点的前驱,q.node
节点之后还可能存在1个以上的Node节点,这些Node节点并没有被跳表的索引结构所引用。
大家对findPredecessorOfLast()
方法如果有疑问也不用深究,该方法仅仅会在后面讲解的doRemoveLastEntry()
方法中用到,到时候讲解这个方法的时候大家就能真正理解findPredecessorOfLast()
方法的实际作用了。
findNear(K, int)
方法
在关注findNear(K, int)
方法之前,我们先关注如下三个变量:
- // Control values OR'ed as arguments to findNear
- private static final int EQ = 1;
- private static final int LT = 2;
- private static final int GT = 0; // Actually checked as !LT
这三个变量在findNear(K, int)
方法中使用频繁,分别表示Equal、Great Than和Less Than,即相等、大于和小于。注意它们的取值从大到小依次是0、1、2,转换为二进制即是0000、0010、0100,这样表示有助于在后面使用OR位运算时对它们进行合并。
再来查看findNear(K, int)
方法的源码:
- /**
- * Utility for ceiling, floor, lower, higher methods.
- *
- * @param kkey the key
- * @param rel the relation -- OR'ed combination of EQ, LT, GT;查找关系,值为EQ, LT, GT其中一个
- * @return nearest node fitting relation, or null if no such
- */
- Node<K, V> findNear(K kkey, int rel) {
- // 包装key
- Comparable<? super K> key = comparable(kkey);
- // 无限循环
- for (; ; ) {
- // 获取符合条件的前驱节点,标识为b
- Node<K, V> b = findPredecessor(key);
- // 获取b的后继为n
- Node<K, V> n = b.next;
- // 无限循环
- for (; ; ) {
- if (n == null)
- /**
- * n为null时,说明b已经是Node链中最后一个节点了
- * rel & LT为0时,表示rel不为LT
- * 此时表示,要查找的Node并不是小于传入的key的Node,即要查找的是键大于或等于key的Node
- * 由于跳表中节点时按键从小到大的顺序排列的,因此b就是要查找的Node节点
- */
- return ((rel & LT) == 0 || b.isBaseHeader()) ? null : b;
- // 走到这里说明b不为null,获取n的后继为f,此时有关系b -> n -> f
- Node<K, V> f = n.next;
- // 检查n是否被改变
- if (n != b.next) // inconsistent read
- break;
- /**
- * 如果n的value为null表示n是被删除的节点,使用helpDelete()协助将其从Node链移除
- * 然后进行直接跳出内层循环
- */
- Object v = n.value;
- if (v == null) { // n is deleted
- n.helpDelete(b, f);
- break;
- }
- /**
- * 检查n是否是标记删除节点,
- * 如果n是标记删除节点,则检查b是否是被删除节点,如果b是被删除的节点,
- * 就跳出内层循环,重新在外层循环中使用findPredecessor(key)查找
- */
- if (v == n || b.value == null) // b is deleted
- break;
- // 走到这里表示b和n都是正常的Node节点,因此比较传入key与n的key
- int c = key.compareTo(n.key);
- /**
- * 首先关注其中两个条件的意思:
- * 1. (rel & EQ) != 0表示指定比较操作存在EQ操作,即可能是EQ(等于)、LT | EQ(小于等于)或GT | EQ(大于等于)
- * 2. (rel & LT) == 0表示指定操作不存在LT操作,即可能是EQ(等于)、GT(大于)或GT | EQ(大于等于)
- * 再来看if的判断过程
- * 1. 如果传入key与n的key相等,且比较操作存在EQ操作(即查找等于、小于等于或大于等于key的节点),就返回n;
- * 2. 如果传入key与n的key相等,但比较操作不存在EQ操作(即查找不等于key的节点),n不符合条件,什么都不做;
- * 3. 如果传入key小于n的key,且不存在LT操作(即查找等于、大于或大于等于key的节点),就返回n;(我一直觉得此处有问题,当传入带有EQ的操作,此时也将n返回了)
- * 4. 如果传入key小于n的key,但存在LT操作(即查找小于、小于等于key的节点),n不符合条件,什么都不做。
- */
- if ((c == 0 && (rel & EQ) != 0) || (c < 0 && (rel & LT) == 0))
- return n;
- /**
- * 如果传入key小于等于n的key,且比较操作存在LT操作(即可能是小于、小于等于)
- * 判断b是否是BASE_HEADER,如果不是就返回b
- */
- if (c <= 0 && (rel & LT) != 0)
- return b.isBaseHeader() ? null : b;
- // 走到这里说明传入的key大于n的key,因此需要进行后移操作
- b = n;
- n = f;
- }
- }
- }
findNear(K, int)
方法接收的第二个参数用于表示操作类型,是一个int值,这个值可以是LT、EQ和GT之间的任意组合。findNear(K, int)
方法的实现过程其实比较简单,同样是使用两层嵌套的无限for循环,在第二个for循环中不断遍历Node链,对拿到的一系列Node节点进行了各类检查工作,然后根据传入的操作类型判断遍历到的节点是否符合条件,如果符合就返回,否则进行新的尝试。
findNode(Comparable<? super K>)
方法
findNode(key)
方法用于寻找给定键所对应的有效的Node节点,它的源码如下:
- private Node<K, V> findNode(Comparable<? super K> key) {
- // 无限循环
- for (; ; ) {
- // 尝试找到符合条件的key的前驱节点
- Node<K, V> b = findPredecessor(key);
- // 获取b的后继节点
- Node<K, V> n = b.next;
- for (; ; ) {
- // n为null,表示没找到,直接返回null
- if (n == null)
- return null;
- // 得到n的后继节点
- Node<K, V> f = n.next;
- // 检查b的后继是否被改变,如果改变就跳出内层循环
- if (n != b.next) // inconsistent read
- break;
- // 获取n的value
- Object v = n.value;
- // n的value为null表示n已经被删掉了
- if (v == null) { // n is deleted
- // 调用helpDelete()方法将其删除,并跳出内层循环
- n.helpDelete(b, f);
- break;
- }
- // 走到这里说明n的value不为null
- if (v == n || b.value == null) // b is deleted
- /**
- * 两种情况:
- * 1. v == n说明n是标记节点
- * 2. v != n,但b.value为null,说明b是被删除的节点
- * 跳出内层循环
- */
- break;
- // 比较key和n的key的大小
- int c = key.compareTo(n.key);
- // key和n的key相同,表示找到
- if (c == 0)
- return n;
- // 表示没有找到
- if (c < 0)
- return null;
- // 往后移,跳出进行下一次循环,即往后遍历
- b = n;
- n = f;
- }
- }
- }
findNode(Comparable<? super K> key)
方法首先通过上面讲过的findPredecessor(Comparable<? super K>)
方法找到给定键所对应的符合条件的前驱节点,然后从该前驱节点开始,对后面的节点一一进行比较,找到键与传入的key
相同的节点返回即可,如果找不到就返回null。具体的操作源码中的注释已经讲解的非常清楚了。
注意在findNode(Comparable<? super K>)
方法查找过程中,会通过helpDelete(Node<K, V> b, Node<K, V> f)
方法将失效的n
节点从Node链中移除,这在后面删除键值对相关的方法中会有体现,敬请期待。
这六个查找辅助方法在ConcurrentSkipListMap的各项功能实现中充当了很重要的责任,这里将它们一并进行讲解也是为后面的内容做铺垫。另外与这六个方法相关,有一些功能性的方法内部就是调用的它们,由于比较简单,这里只贴出源码如下:
- /**
- * @throws NoSuchElementException {@inheritDoc}
- * 获取第一个数据节点的键
- */
- public K firstKey() {
- Node<K, V> n = findFirst();
- if (n == null)
- throw new NoSuchElementException();
- return n.key;
- }
- /**
- * @throws NoSuchElementException {@inheritDoc}
- * 获取最后一个数据节点的键
- */
- public K lastKey() {
- Node<K, V> n = findLast();
- if (n == null)
- throw new NoSuchElementException();
- return n.key;
- }
- /**
- * Returns SimpleImmutableEntry for results of findNear.
- *
- * 根据指定操作查找附近的数据节点,包装为SimpleImmutableEntry实例返回
- *
- * @param key the key
- * @param rel the relation -- OR'ed combination of EQ, LT, GT
- * @return Entry fitting relation, or null if no such
- */
- AbstractMap.SimpleImmutableEntry<K, V> getNear(K key, int rel) {
- for (; ; ) {
- Node<K, V> n = findNear(key, rel);
- if (n == null)
- return null;
- AbstractMap.SimpleImmutableEntry<K, V> e = n.createSnapshot();
- if (e != null)
- return e;
- }
- }
- /**
- * Returns a key-value mapping associated with the greatest key
- * strictly less than the given key, or <tt>null</tt> if there is
- * no such key. The returned entry does <em>not</em> support the
- * <tt>Entry.setValue</tt> method.
- *
- * 获取比给定key小的最大键对应的键值对
- *
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public Map.Entry<K, V> lowerEntry(K key) {
- return getNear(key, LT);
- }
- /**
- * 获取比给定key小的最大键对应的键值对的键
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public K lowerKey(K key) {
- Node<K, V> n = findNear(key, LT);
- return (n == null) ? null : n.key;
- }
- /**
- * Returns a key-value mapping associated with the greatest key
- * less than or equal to the given key, or <tt>null</tt> if there
- * is no such key. The returned entry does <em>not</em> support
- * the <tt>Entry.setValue</tt> method.
- *
- * 获取键小于或等于给定key的最大键值对
- *
- * @param key the key
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public Map.Entry<K, V> floorEntry(K key) {
- return getNear(key, LT | EQ);
- }
- /**
- * 获取键小于或等于给定key的最大键值对对应的键
- * @param key the key
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public K floorKey(K key) {
- Node<K, V> n = findNear(key, LT | EQ);
- return (n == null) ? null : n.key;
- }
- /**
- * Returns a key-value mapping associated with the least key
- * greater than or equal to the given key, or <tt>null</tt> if
- * there is no such entry. The returned entry does <em>not</em>
- * support the <tt>Entry.setValue</tt> method.
- *
- * 获取键大于或等于给定key的最大键值对
- *
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public Map.Entry<K, V> ceilingEntry(K key) {
- return getNear(key, GT | EQ);
- }
- /**
- * 获取键大于或等于给定key的最大键
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public K ceilingKey(K key) {
- Node<K, V> n = findNear(key, GT | EQ);
- return (n == null) ? null : n.key;
- }
- /**
- * Returns a key-value mapping associated with the least key
- * strictly greater than the given key, or <tt>null</tt> if there
- * is no such key. The returned entry does <em>not</em> support
- * the <tt>Entry.setValue</tt> method.
- *
- * 获取键大于给定key的最大键值对
- *
- * @param key the key
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public Map.Entry<K, V> higherEntry(K key) {
- return getNear(key, GT);
- }
- /**
- * 获取键大于给定key的最大键
- * @param key the key
- * @throws ClassCastException {@inheritDoc}
- * @throws NullPointerException if the specified key is null
- */
- public K higherKey(K key) {
- Node<K, V> n = findNear(key, GT);
- return (n == null) ? null : n.key;
- }
- /**
- * Returns a key-value mapping associated with the least
- * key in this map, or <tt>null</tt> if the map is empty.
- * The returned entry does <em>not</em> support
- * the <tt>Entry.setValue</tt> method.
- * 获取第一个数据节点的键值对,包装为SimpleImmutableEntry实例返回
- */
- public Map.Entry<K, V> firstEntry() {
- for (; ; ) {
- Node<K, V> n = findFirst();
- if (n == null)
- return null;
- AbstractMap.SimpleImmutableEntry<K, V> e = n.createSnapshot();
- if (e != null)
- return e;
- }
- }
- /**
- * Returns a key-value mapping associated with the greatest
- * key in this map, or <tt>null</tt> if the map is empty.
- * The returned entry does <em>not</em> support
- * the <tt>Entry.setValue</tt> method.
- * 获取最后一个数据节点的键值对,包装为SimpleImmutableEntry实例返回
- */
- public Map.Entry<K, V> lastEntry() {
- for (; ; ) {
- Node<K, V> n = findLast();
- if (n == null)
- return null;
- AbstractMap.SimpleImmutableEntry<K, V> e = n.createSnapshot();
- if (e != null)
- return e;
- }
- }
1.3. size()和isEmpty()方法
ConcurrentSkipListMap提供了size()
方法用于获取键值对个数,实现是非常简单的,由于所有的键值对都存放在跳表最下面一层的由Node构成的单向链表中,因此只需要遍历这个单向链表就可以获取键值对的总个数,源码如下:
- /**
- * Returns the number of key-value mappings in this map. If this map
- * contains more than <tt>Integer.MAX_VALUE</tt> elements, it
- * returns <tt>Integer.MAX_VALUE</tt>.
- *
- * <p>Beware that, unlike in most collections, this method is
- * <em>NOT</em> a constant-time operation. Because of the
- * asynchronous nature of these maps, determining the current
- * number of elements requires traversing them all to count them.
- * Additionally, it is possible for the size to change during
- * execution of this method, in which case the returned result
- * will be inaccurate. Thus, this method is typically not very
- * useful in concurrent applications.
- *
- * @return the number of elements in this map
- */
- public int size() {
- long count = 0;
- // 遍历计算
- for (Node<K, V> n = findFirst(); n != null; n = n.next) {
- if (n.getValidValue() != null)
- ++count;
- }
- // 最大只能为Integer.MAX_VALUE
- return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
- }
注:需要注意的是,如果ConcurrentSkipListMap中键值对个数大于等于Integer.MAX_VALUE,则取Integer.MAX_VALUE,避免溢出。
其中findFirst()
在前面已经介绍过了,用于查找第一个有效的Node,作为遍历操作的起始Node。从findFirst()
可以得知,起始节点起始就是基础头节点(即head
指向的、值为BASE_HEADER
的那个节点)的后继节点。
而isEmpty()
方法则更简单了,只需要判断是否能找到符合条件的起始节点即可,源码如下:
- /**
- * Returns <tt>true</tt> if this map contains no key-value mappings.
- *
- * @return <tt>true</tt> if this map contains no key-value mappings
- */
- public boolean isEmpty() {
- // 如果找到的第一个有效节点为null则表示Map为空
- return findFirst() == null;
- }
1.4. contains()相关方法
ConcurrentSkipListMap中contains()
方法有两类:containsKey(Object key)
和containsValue(Object value)
,分别用于判断是否包含某个键和是否包含某个值。先考察containsValue(Object value)
方法,源码如下:
- /**
- * Returns <tt>true</tt> if this map maps one or more keys to the
- * specified value. This operation requires time linear in the
- * map size. Additionally, it is possible for the map to change
- * during execution of this method, in which case the returned
- * result may be inaccurate.
- *
- * 判断是否包含某个值,需要遍历Node节点
- * 返回的值可能不准确,因为存在其他线程并发修改的情况
- *
- * @param value value whose presence in this map is to be tested
- * @return <tt>true</tt> if a mapping to <tt>value</tt> exists;
- * <tt>false</tt> otherwise
- * @throws NullPointerException if the specified value is null
- */
- public boolean containsValue(Object value) {
- // 参数检查
- if (value == null)
- throw new NullPointerException();
- // 遍历Node链
- for (Node<K, V> n = findFirst(); n != null; n = n.next) {
- V v = n.getValidValue();
- // 判断值是否相等
- if (v != null && value.equals(v))
- return true;
- }
- return false;
- }
containsValue(Object value)
方法的原理是遍历Node链,对每个Node的值进行判断,如果存在某个Node的值与传入value
相同,则返回true表示包含。需要注意的是,由于多线程并发的情况,在遍历判断的过程中可能存在节点数据被其他线程修改的情况,因此containsValue(Object value)
方法的结果并不一定准确。
至于containsKey(Object key)
的实现则是直接通过doGet(Object okey)
方法直接根据key
查找对应的值来实现的,关于`doGet(Object)
方法会在后面讲解,这里先贴出containsKey(Object key)
方法的源码:
- /**
- * Returns <tt>true</tt> if this map contains a mapping for the specified
- * key.
- *
- * 判断是否存在某个键
- *
- * @param key key whose presence in this map is to be tested
- * @return <tt>true</tt> if this map contains a mapping for the specified key
- * @throws ClassCastException if the specified key cannot be compared
- * with the keys currently in the map
- * @throws NullPointerException if the specified key is null
- */
- public boolean containsKey(Object key) {
- // 调用doGet()方法获取key对应的值,如果返回不为null则表示存在这个key
- return doGet(key) != null;
- }
1.5. put()相关方法
与ConcurrentHashMap非常类似,ConcurrentSkipListMap的put()
相关的方法也有三类:put(K, V)
、putIfAbsent(K, V)
和putAll(Map<? extends K, ? extends V>)
。其中putAll(Map<? extends K, ? extends V>)
在某个构造方法中用到了,底层调用的是put(K, V)
方法,而put(K, V)
和putIfAbsent(K, V)
底层都调用了doPut(K kkey, V value, boolean onlyIfAbsent)
方法,区别在于传入的第三个参数不一样,源码如下:
- /**
- * Associates the specified value with the specified key in this map.
- * If the map previously contained a mapping for the key, the old
- * value is replaced.
- * <p>
- * 添加键值对到Map中,键存在时会覆盖原来的值
- *
- * @param key key with which the specified value is to be associated
- * @param value value to be associated with the specified key
- * @return the previous value associated with the specified key, or
- * <tt>null</tt> if there was no mapping for the key 当覆盖时会返回旧值,否则返回null
- * @throws ClassCastException if the specified key cannot be compared
- * with the keys currently in the map
- * @throws NullPointerException if the specified key or value is null 键或值为空都会抛出异常
- */
- public V put(K key, V value) {
- // value不能为null
- if (value == null)
- throw new NullPointerException();
- // 调用doPut()方法添加元素,onlyIfAbsent传入false
- return doPut(key, value, false);
- }
- /**
- * {@inheritDoc}
- *
- * 添加键值对到Map中,只有在键不存在时才会添加
- *
- * @return the previous value associated with the specified key,
- * or <tt>null</tt> if there was no mapping for the key
- * @throws ClassCastException if the specified key cannot be compared
- * with the keys currently in the map
- * @throws NullPointerException if the specified key or value is null
- */
- public V putIfAbsent(K key, V value) {
- // value不能为null
- if (value == null)
- throw new NullPointerException();
- // 调用doPut()方法添加元素,onlyIfAbsent传入true
- return doPut(key, value, true);
- }
因此我们需要着重关注doPut(K, V, boolean)
方法,它的源码如下:
- /**
- * Main insertion method. Adds element if not present, or
- * replaces value if present and onlyIfAbsent is false.
- * 主要的插入方法,当键不存在时会添加,当键存在时,如果onlyIfAbsent为false则会覆盖旧值
- *
- * @param kkey the key
- * @param value the value that must be associated with key
- * @param onlyIfAbsent if should not insert if already present 是否只在不存在时添加
- * @return the old value, or null if newly inserted 旧值(当覆盖、或onlyIfAbsent为true但key已存在时),或者null(新添加)
- */
- private V doPut(K kkey, V value, boolean onlyIfAbsent) {
- /**
- * 在comparable()方法中,key为null会抛出NullPointerException。
- * 这个地方得到的key其实就是一个Comparable类型实例,可能是ComparableUsingComparator对象
- * ComparableUsingComparator将key和Comparator封装了一下
- */
- Comparable<? super K> key = comparable(kkey);
- // 无限循环
- for (; ; ) {
- /**
- * 在跳表中尝试找到符合条件的key的前驱节点
- * 查找的规则在于:跳表中底层Node的key比上层的key要大,Node链中后面Node的key比前面的key要大
- * 因此查找时是从上往下,从前往后找
- */
- Node<K, V> b = findPredecessor(key);
- // 获取b的后继节点
- Node<K, V> n = b.next;
- // 无限循环
- for (; ; ) {
- if (n != null) {
- // b的后继节点n不为null,得到n的后继节点
- Node<K, V> f = n.next;
- // 检查b的后继是否被改变,如果改变就跳出内层循环
- if (n != b.next) // inconsistent read
- break;
- // 获取n的value
- Object v = n.value;
- // n的value为null表示n已经被删掉了
- if (v == null) { // n is deleted
- /**
- * 调用helpDelete()方法将其删除,并跳出内层循环
- * 此时可能返回的情况可能有两种:
- * 修改前:b -> n -> f -> f.next(f.next可能有,也可能没有)
- * 修改后:
- * 1. b -> n -> marker -> f
- * 2. b -> f.next
- * 然后跳出内层循环
- */
- n.helpDelete(b, f);
- break;
- }
- // 走到这里说明n的value不为null
- if (v == n || b.value == null) // b is deleted
- /**
- * 两种情况:
- * 1. v == n说明n是标记节点
- * 2. v != n,但b.value为null,说明b是被删除的节点
- * 跳出内层循环
- */
- break;
- // 比较key和n的key的大小
- int c = key.compareTo(n.key);
- if (c > 0) {
- /**
- * 要比n的key大,就往后移,跳出进行下一次循环,即往后遍历:
- * 之前:b -> n -> f
- * 现在:(old b) -> b(old n) -> n(old f)
- */
- b = n;
- n = f;
- continue;
- }
- if (c == 0) {
- /**
- * key与n的key相等,则表示键已存在,此时如果onlyIfAbsent为false,
- * 就修改n的value为新value,如果修改成功就返回旧value,
- * 如果onlyIfAbsent为true,则直接返回n的value,不做任何处理,
- * 否则进行下一次尝试
- */
- if (onlyIfAbsent || n.casValue(v, value))
- return (V) v;
- else
- // 否则跳出内层循环
- break; // restart if lost race to replace value
- }
- // else c < 0; fall through
- }
- /**
- * 走到这里有两种情况:
- * 1. n为null;此时b就是新节点的前驱
- * 2. n不为null,且已经确认传入的key小于等于n的key,因此n就是新节点的后继,b就是新节点的前驱
- */
- // 构建一个新节点z,将n作为该节点的后继
- Node<K, V> z = new Node<K, V>(kkey, value, n);
- // CAS修改b的后继为新节点z,如果修改不成功就跳出内层循环
- if (!b.casNext(n, z))
- break; // restart if lost race to append to b
- // 随机一个级别,这个方法后面讲解
- int level = randomLevel();
- if (level > 0)
- // 插入一个级别
- insertIndex(z, level);
- return null;
- }
- }
- }
下面将doPut(K, V, boolean)
方法分为几个大的模块来一一讲解,同时在讲解过程中将配合一个插入案例进行演示,我们以前面讲解ConcurrentSkipListMap(SortedMap<K, ? extends V> m)
方法时最终构建好的ConcurrentSkipListMap为例,插入一组新数据:17 => "Tom"
;当前ConcurrentSkipListMap中跳表的结构如下:
1.5.1. 处理可比较键
在doPut(K, V, boolean)
方法的最开始,使用comparable(Object key)
将传入的key
转换成一个Comparable对象,comparable(Object key)
方法源码如下:
- /**
- * If using comparator, return a ComparableUsingComparator, else
- * cast key as Comparable, which may cause ClassCastException,
- * which is propagated back to caller.
- *
- * 如果使用比较器,将返回一个ComparableUsingComparator
- * 否则将key转换为Comparable对象,可能会抛出ClassCastException异常
- */
- private Comparable<? super K> comparable(Object key) throws ClassCastException {
- if (key == null)
- throw new NullPointerException();
- if (comparator != null)
- // 使用构造器,构造一个ComparableUsingComparator实例,并返回
- return new ComparableUsingComparator<K>((K) key, comparator);
- else
- // 不使用构造器,将key转换为Comparable对象
- return (Comparable<? super K>) key;
- }
从源码可知,如果使用了比较器,则将key
包装为一个ComparableUsingComparator对象,如果没有使用比较器,则将key
强制转为Comparable对象,转换过程中可能会抛出ClassCastException异常。在我们的示例中,插入的键值对数据的类型是Integer => String
,即键是Integer类型,该类型是实现了Comparable接口的。
1.5.2. 定位合适的前驱节点
在接下来的代码中,doPut(K, V, boolean)
方法使用了两个无限for循环来实现主要的添加操作,其中第一个for循环用于添加失败时的重试操作,而第二个for循环则是用于对Node节点的遍历操作;与findPredecessor(Comparable<? super K>)
方法相比,但后者的遍历是大范围真实的遍历,而前者的遍历只有一小部分,因为findPredecessor(Comparable<? super K>)
方法已经帮其缩小了目标范围。另外从for循环中的代码逻辑可知,continue操作会结束本次内层循环并直接进行新的内层循环,而break操作会跳出内层循环并直接进行新的外层循环。
外层循环首先通过findPredecessor(key)
在跳表中查找符合条件的、可以作为key
的前驱的第一个节点;findPredecessor(key)
方法在前面已经简单讲解过了,这里以我们的示例为例演示findPredecessor(key)
的具体步骤,现在跳表中已经有两组数据:<15, "Mary">
和<20, "Jack">
,现在要插入一条新数据<17, "Tom">
,有以下的查找步骤:
- 首先从
head
这一层开始查找,最后会查找到<20, "Jack">
,查找顺序如下:
由于这组数据的键比新数据的键大,因此需要下移一层重新查找。
- 下移一层开始查找,最后会查找到
<15, "Mary">
,查找顺序如下:
由于这组数据的键比新数据的键小,因此需要往后移动,重新查找。
- 往后移动后开始查找,最后又会查找到
<20, "Jack">
,查找顺序如下:
由于这组数据的键比新数据的键大,因此需要下移一层重新查找。
- 下移一层开始查找,最后还是会查找到
<20, "Jack">
,查找顺序如下:
虽然这组数据的键比新数据的键大,但此时已无法下潜(由于q
的down
为null),因此q
的node
,即数据<15, "Mary">
所在的节点即是符合条件的前驱节点。
从findPredecessor(Comparable<? super K>)
方法的查找过程可知,其实该方法存在一定的查找效率,即跳表层级越高,需要循环的次数越多。下面贴出这四步循环查找的全图:
1.5.3. 添加操作的情况分类
回到doPut(K, V, boolean)
方法中,当找到了合适的前驱节点(标识为b
)后,又获取了该前驱节点的后继节点(标识为n
),然后在内层for循环首先对这两个节点进行了几类检查,删除一些无效的节点。在各类检查通过之后,就会根据键来寻找合适的位置了,分三类情况:
- 当传入的键比节点
n
的键要大时,就往后移动,主要表现代码如下:
- // 比较key和n的key的大小
- int c = key.compareTo(n.key);
- if (c > 0) {
- /**
- * 要比n的key大,就往后移,跳出进行下一次循环,即往后遍历:
- * 之前:b -> n -> f
- * 现在:(old b) -> b(old n) -> n(old f)
- */
- b = n;
- n = f;
- continue;
- }
注:这种情况是存在的,比如
<15, "Mary">
和<20, "Jack">
中间还存在其他的数据,例如<16, "Jerry">
。我们的示例是一种简单的情况。
当一直往后移动到n
被置为null时,第一类情况会在下一次内层循环转换为第三类情况。
- 当传入的键与节点
n
的键相同时,则根据onlyIfAbsent
参数决定是否覆盖旧值,主要表现代码如下:
- // 比较key和n的key的大小
- int c = key.compareTo(n.key);
- ...
- if (c == 0) {
- /**
- * key与n的key相等,则表示键已存在,此时如果onlyIfAbsent为false,
- * 就修改n的value为新value,如果修改成功就返回旧value,
- * 如果onlyIfAbsent为true,则直接返回n的value,不做任何处理,
- * 否则进行下一次尝试
- */
- if (onlyIfAbsent || n.casValue(v, value))
- return (V) v;
- else
- // 否则跳出内层循环
- break; // restart if lost race to replace value
- }
- 当传入的键比节点
n
的键要小时,则表示此时b
和n
之间就应该是新添加的键值对对应节点所在位置,需要根据添加的键值对新创建一个节点,插入到b
和n
之间,很显然,我们的示例就是这种情况。主要表现代码如下:
- /**
- * 走到这里有两种情况:
- * 1. n为null;此时b就是新节点的前驱
- * 2. n不为null,且已经确认传入的key小于等于n的key,因此n就是新节点的后继,b就是新节点的前驱
- */
- // 构建一个新节点z,将n作为该节点的后继
- Node<K, V> z = new Node<K, V>(kkey, value, n);
- // CAS修改b的后继为新节点z,如果修改不成功就跳出内层循环
- if (!b.casNext(n, z))
- break; // restart if lost race to append to b
需要注意的是,第三类情况也会出现在当n
节点为null的情况下,即此时找到的前驱节点b
没有后继节点。
根据第三类情况,则我们添加的键值对数据所在的Node节点就应该处于b
和n
之间,插入数据的示意图如下:
1.5.4. 新增情况下的级别维护
在上面的第三类情况中,当插入一个新的Node节点成功后一定会执行后面插入级别的操作,即下面的代码:
- // 随机一个级别,这个方法后面讲解
- int level = randomLevel();
- if (level > 0)
- // 插入一个级别
- insertIndex(z, level);
- return null;
randomLevel()
方法在前面已经讲解过了,用于随机生成一个级别。这里主要关注一下insertIndex(z, level)
方法,它根据新创建的Node节点z和随机得到的级别向跳表中插入了对应的Index节点,方法insertIndex(Node<K, V>, int)
源码如下:
- /**
- * Creates and adds index nodes for the given node.
- *
- * 根据Node节点和随机级别添加对应的Index节点
- *
- * @param z the node Node数据节点
- * @param level the level of the index 随机级别
- */
- private void insertIndex(Node<K, V> z, int level) {
- HeadIndex<K, V> h = head;
- // 当前head节点的level
- int max = h.level;
- /**
- * 如果插入的level小于等于max,则不用创建新层级,
- * 只需要构建相应层级的Index列,添加到跳表中即可
- */
- if (level <= max) {
- Index<K, V> idx = null;
- /**
- * 循环创建Index节点,
- * 这种循环方式可以构建一列Index节点,
- * 最终idx指向最顶层的那个Index节点,这一列Index节点的node都指向z
- * 即假设level为2,则最终会有这样的结构:
- * Index(idx)
- * |
- * Index
- * |
- * Node(z)
- */
- for (int i = 1; i <= level; ++i)
- idx = new Index<K, V>(z, idx, null);
- // 使用addIndex()方法将idx添加到跳表中,后面分析
- addIndex(idx, h, level);
- } else { // Add a new level
- /*
- * To reduce interference by other threads checking for
- * empty levels in tryReduceLevel, new levels are added
- * with initialized right pointers. Which in turn requires
- * keeping levels in an array to access them while
- * creating new head index nodes from the opposite
- * direction.
- *
- * 走到这里,说明level比head的level还要大,
- * 因此需要添加一个新的层级,即需要构建新的HeadIndex
- */
- // 指定新层级只比当前层级大1
- level = max + 1;
- // 将要创建的Index放在一个数组中
- Index<K, V>[] idxs = (Index<K, V>[]) new Index[level + 1];
- Index<K, V> idx = null;
- // 循环创建Index节点,效果与上面的一样;并把创建的Index节点放在idxs数组中
- for (int i = 1; i <= level; ++i)
- idxs[i] = idx = new Index<K, V>(z, idx, null);
- HeadIndex<K, V> oldh;
- int k;
- for (; ; ) {
- // 旧的head
- oldh = head;
- // 旧的head的级别
- int oldLevel = oldh.level;
- if (level <= oldLevel) { // lost race to add level
- /**
- * 执行到这里,说明存在并发情况,
- * 其他线程由于维护了级别,导致head节点的级别出现了增加,
- * 因此直接记录level,跳出for循环,由addIndex()完成添加Index操作
- */
- k = level;
- break;
- }
- //
- HeadIndex<K, V> newh = oldh;
- // 旧的HeadIndex指向的Node节点
- Node<K, V> oldbase = oldh.node;
- // 从旧的HeadIndex节点层级加1开始,一直到level级别,创建新的HeadIndex
- for (int j = oldLevel + 1; j <= level; ++j)
- /**
- * 所有新的HeadIndex的层级为j,node都指向oldbase
- * right指向对应层级的Index(从idxs中获取),down指向当前顶层的HeadIndex
- * 具体效果如下:
- * HeadIndex(level) -> Index(idxs[level])
- * | |
- * ...... ......
- * | |
- * HeadIndex(oldLevel + 1) -> Index(idxs[oldLevel + 1])
- * | |
- * HeadIndex(oldLevel) -> Index(idxs[level])
- */
- newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j);
- // 修改head
- if (casHead(oldh, newh)) {
- // 如果修改成功,就记录旧的层级,并跳出for循环,由addIndex()完成添加Index操作
- k = oldLevel;
- break;
- }
- }
- // 会将构建好的一列Index添加到跳表中
- addIndex(idxs[k], oldh, k);
- }
- }
insertIndex(Node<K, V>, int)
方法根据传入级别是否大于当前head
节点的级别分了两种情况。
当传入级别小于等于当前head
节点的级别时,不需要构造新的HeadIndex节点,只需构造好相应列的Index节点,Index列的构建方式源码注释已经讲解得很清楚了;然后通过addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)
方法添加到跳表中即可,该方法会在后面讲解。
当传入级别大于当前head
节点的级别时,则需要构建新的HeadIndex节点并添加到跳表中;在这个过程中需要注意并发情况的发生,当尝试添加HeadIndex节点过程中恰好有其他线程并发增加了HeadIndex的级别,则当前添加HeadIndex的操作可以直接结束,仅仅将Index列添加到跳表中即可。关于HeadIndex的添加源码注释解释的比较清楚,这里也不赘述。
我们主要需要关注addIndex(Index<K, V>, HeadIndex<K, V>, int)
方法,它的源码如下:
- /**
- * Adds given index nodes from given level down to 1.
- * 添加Index节点
- *
- * @param idx the topmost index node being inserted 最高层的将被插入的节点
- * @param h the value of head to use to insert. This must be
- * snapshotted by callers to provide correct insertion level
- * @param indexLevel the level of the index 插入的级别
- */
- private void addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel) {
- // Track next level to insert in case of retries
- // 跟踪下一个待插入的级别,用于重试
- int insertionLevel = indexLevel;
- // 包装idx中node节点的key
- Comparable<? super K> key = comparable(idx.node.key);
- // 检查key
- if (key == null) throw new NullPointerException();
- // Similar to findPredecessor, but adding index nodes along path to key.
- // 与findPredecessor()类似,但会根据遍历到相应key的路径添加Index节点
- for (; ; ) {
- // 头节点的级别
- int j = h.level;
- // 记录头节点
- Index<K, V> q = h;
- // 头节点的后继节点
- Index<K, V> r = q.right;
- // 待插入的Index节点
- Index<K, V> t = idx;
- for (; ; ) {
- if (r != null) { // 头节点的后继节点不为null
- // 取出该后继节点的Node
- Node<K, V> n = r.node;
- // compare before deletion check avoids needing recheck
- // 与传入的待插入Index节点比较Node节点的key
- int c = key.compareTo(n.key);
- // 如果头节点的后继节点r中node的value为null,表明该节点被删除了
- if (n.value == null) {
- /**
- * 调用unlink()将其从链中移除
- * 移除之前:q -> r -> r.right
- * 移除之后:q -> r.right
- */
- if (!q.unlink(r))
- // 移除不成功就跳出内层循环重试
- break;
- // 此时表示移除r成功,将q新的后继节点赋值给r,进行下一次内层循环
- r = q.right;
- continue;
- }
- // 走到这里表示r不是被删除的节点
- if (c > 0) {
- // 如果传入的节点idx的key比r所代表节点的key大,则往后移,并继续查找
- q = r;
- r = r.right;
- continue;
- }
- }
- /**
- * 走到这里,说明待插入Index节点的node的key小于或等于r节点的node的key
- * 即已经找到了Index节点正确的待插入位置
- * 需要判断当前待插入Index节点的级别与q与r的级别是否相同,
- * 如果相同则可以插入,如果不同,应该在当前层的下面层进行尝试插入,
- * 以保持插入节点后Index列的高度与HeadIndex列的高度保持低部对齐
- */
- if (j == insertionLevel) { // 待插入Index节点的级别等于头节点的级别
- // Don't insert index if node already deleted
- // 检查待插入的Index节点的Node节点是否是被删除的节点
- if (t.indexesDeletedNode()) {
- // 待插入的Index节点中的Node节点是一个被删除的节点
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key); // cleans up
- // 返回,结束这次Index节点的插入
- return;
- }
- /**
- * 可以插入,将r作为t的后继,然后将t作为q的后继,即将t插在r的前面,q的后面
- * 插入前:q -> r -> r.right
- * 插入后:q -> t -> r -> r.right
- */
- if (!q.link(r, t))
- // 插入不成功,跳出内层循环重试
- break; // restart
- // 插入成功,insertionLevel自减后为0时,就可以返回了
- if (--insertionLevel == 0) {
- // need final deletion check before return
- // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
- if (t.indexesDeletedNode())
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key);
- // 返回
- return;
- }
- }
- /**
- * 走到这里,有两种情况:
- * 1. 尝试插入Index时,发现需要连接的前驱Index和后继Index与待插入的Index不在一个层级,因此需要下潜一层再尝试插入
- * 2. Index插入成功了,但insertionLevel自减后还不为0,即表示插入的Index其实是一个多层的列,还剩余有层级没有进行连接处理
- */
- if (--j >= insertionLevel && j < indexLevel)
- // 下潜到待插入Index列的下一层级
- t = t.down;
- // 前驱节点也下潜,然后进行下一次内层循环,处理该层级待插入的Index节点的连接
- q = q.down;
- r = q.right;
- }
- }
- }
insertIndex(Node<K, V>, int)
方法虽然完成了Index列的构建,但真正的插入工作还是交给了addIndex(Index<K, V>, HeadIndex<K, V>, int)
方法。addIndex(Index<K, V>, HeadIndex<K, V>, int)
会根据传入的Index节点idx
和indexLevel
(注意,此时如果indexLevel
大于1,表示idx
其实是一个Index列的顶层Index节点),逐级维护跳表与待添加的Index节点间的层级关系。
要维护层级关系,首先需要在跳表中根据Index节点对应的Node节点的键(标识为key
),从传入的HeadIndex节点h
开始遍历查找符合条件的前驱Index节点;这种查找分为两种情况:
- 当查找到的Index节点对应的Node节点的键小于
key
,则需要继续后移查找,表现代码如下:
- Comparable<? super K> key = comparable(idx.node.key);
- Index<K, V> q = h;
- // 头节点的后继节点
- Index<K, V> r = q.right;
- // 取出该后继节点的Node
- Node<K, V> n = r.node;
- // compare before deletion check avoids needing recheck
- // 与传入的待插入Index节点比较Node节点的key
- int c = key.compareTo(n.key);
- if (c > 0) {
- // 如果传入的节点idx的key比r所代表节点的key大,则往后移,并继续查找
- q = r;
- r = r.right;
- continue;
- }
- 当查找到的Index节点对应的Node节点的键大于等于
key
,则标识找到了Index节点正确的待插入位置,代码中表示为q
与r
之间,此时需要将待插入的Index节点插到q
与r
两个节点之间,表现代码如下:
- /**
- * 走到这里,说明待插入Index节点的node的key小于或等于r节点的node的key
- * 即已经找到了Index节点正确的待插入位置
- * 需要判断当前待插入Index节点的级别与q与r所在的层级是否相同,
- * 如果相同则可以插入,如果不同,应该在当前层的下面层进行尝试插入,
- * 以保持插入节点后Index列的高度与HeadIndex列的高度保持低部对齐
- */
- if (j == insertionLevel) { // 待插入Index节点的级别等于头节点的级别
- // Don't insert index if node already deleted
- // 检查待插入的Index节点的Node节点是否是被删除的节点
- if (t.indexesDeletedNode()) {
- // 待插入的Index节点中的Node节点是一个被删除的节点
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key); // cleans up
- // 返回,结束这次Index节点的插入
- return;
- }
- /**
- * 可以插入,将r作为t的后继,然后将t作为q的后继,即将t插在r的前面,q的后面
- * 插入前:q -> r -> r.right
- * 插入后:q -> t -> r -> r.right
- */
- if (!q.link(r, t))
- // 插入不成功,跳出内层循环重试
- break; // restart
- // 插入成功,insertionLevel自减后为0时,就可以返回了
- if (--insertionLevel == 0) {
- // need final deletion check before return
- // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
- if (t.indexesDeletedNode())
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key);
- // 返回
- return;
- }
- }
注意,这里在插入之前其实做了一次j == insertionLevel
的判断,以保证当前插入的Index的层级与q
与r
两个节点所在的层级相同的情况下才插入。大家可以回顾跳表的结构,跳表中所有的索引列其实是保持逻辑上底部对齐的,因此在插入Index时一定要保证插入过程完成后,Index列每一层的Index都应该处在正确的层级上。
当j == insertionLevel
条件不满足时,表示当前插入的Index的层级与q
与r
两个节点所在的层级不同,此时应该下潜一级,然后重新进行前驱节点的查找,表现代码如下:
- /**
- * 走到这里,有两种情况:
- * 1. 尝试插入Index时,发现需要连接的前驱Index和后继Index与待插入的Index不在一个层级,因此需要下潜一层再尝试插入
- * 2. Index插入成功了,但insertionLevel自减后还不为0,即表示插入的Index其实是一个多层的列,还剩余有层级没有进行连接处理
- */
- if (--j >= insertionLevel && j < indexLevel)
- // 下潜到待插入Index列的下一层级
- t = t.down;
- // 前驱节点也下潜,然后进行下一次内层循环,处理该层级待插入的Index节点的连接
- q = q.down;
- r = q.right;
不过我们需要注意,这部分代码不仅仅作用于上面这种情况。在上面j == insertionLevel
满足的情况下,如果Index节点插到q
与r
两个节点之间的工作顺利完成,还会执行如下代码:
- // 插入成功,insertionLevel自减后为0时,就可以返回了
- if (--insertionLevel == 0) {
- // need final deletion check before return
- // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
- if (t.indexesDeletedNode())
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key);
- // 返回
- return;
- }
这段代码的意思是,insertionLevel
自减后如果为0,则表示待插入的Index列上所有的Index节点都已经连接到跳表中了,因此可以直接return返回结束了;但如果insertionLevel
自减后如果不为0,则表示待插入的Index列上还有Index节点没有维护,因此也会执行下潜一层的操作,处理剩下的Index节点。注意下面的代码:
- if (--j >= insertionLevel && j < indexLevel)
- // 下潜到待插入Index列的下一层级
- t = t.down;
这里的操作中,t
是引用的传入的idx
,表示最开始传入的idx
是指向了一个Index列的顶层节点,t = t.down
即是在这个Index列中进行下潜。
这部分对addIndex(Index<K, V>, HeadIndex<K, V>, int)
方法的讲解非常抽象,比较难以理解,这里我们以示例来演示。在前面的操作中,虽然我们已经将数据<17, "Tom">
添加到了跳表中,但并没有维护层级关系。此时head
头节点的level是3,假设随机得到的级别是2,根据insertIndex(Node<K, V> z, int level)
方法,将会构建2层的Index列(所有的Index节点的node
都指向新添加的Node数据节点),示意图如下:
此时Index列的所有Index节点只是与新添加的Node节点有关联关系,并没有与跳表中的索引结构进行关联。
1.5.4.1 查找符合条件的前驱Index节点
通过addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)
可以将Index列添加到跳表的索引结构中。在添加之前,首先需要定位合适的添加位置,即遍历查找符合条件的前驱Index节点,查找流程图如下:
从查找流程可知,在第一次查找中,虽然最后得到的Node节点的键比新添加的Node节点的键(通过传给addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)
方法的idx.node.key
获取)大,但由于此时q
指向的节点的层级大于Index列的总层级(即传给addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)
方法的indexLevel
参数),因此并不满足条件,需要q
节点进行下潜后再次查找。
而在第二次查找中,最后得到的Node节点的键比新添加的Node节点的键小,因此需要q
节点向后移动,此时r
节点也会在q
节点移动后被更新,然后再次进行查找。
第三次查找中,最后得到的Node节点与第一次查找时相同,但是此时q
节点的层级是等于Index列的总层级的,因此符合插入条件,即此时q
节点就是符合条件的前驱Index节点。
1.5.4.2 插入Index列
找到了符合条件的前驱Index节点,就可以进行Index列的插入工作了;需要注意的是,传给addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)
方法的idx
虽然只是一个Index对象,但从insertIndex(Node<K, V> z, int level)
方法可以看出,当level
大于1时,最后构成的idx
对象其实是一个Index列中最高层的Index对象,它们从上到下依次通过down
指针进行连接,这个注意点已经强调过很多次了;回顾代码:
- private void insertIndex(Node<K, V> z, int level) {
- HeadIndex<K, V> h = head;
- // 当前head节点的level
- int max = h.level;
- /**
- * 如果插入的level小于等于max,则不用创建新层级,
- * 只需要构建相应层级的Index列,添加到跳表中即可
- */
- if (level <= max) {
- Index<K, V> idx = null;
- /**
- * 循环创建Index节点,
- * 这种循环方式可以构建一列Index节点,
- * 最终idx指向最顶层的那个Index节点,这一列Index节点的node都指向z
- * 即假设level为2,则最终会有这样的结构:
- * Index(idx)
- * |
- * Index
- * |
- * Node(z)
- */
- for (int i = 1; i <= level; ++i)
- idx = new Index<K, V>(z, idx, null);
- // 使用addIndex()方法将idx添加到跳表中,后面分析
- addIndex(idx, h, level);
- } else { // Add a new level
- /*
- *
- * 走到这里,说明level比head的level还要大,
- * 因此需要添加一个新的层级,即需要构建新的HeadIndex
- */
- // 指定新层级只比当前层级大1
- level = max + 1;
- // 将要创建的Index放在一个数组中
- Index<K, V>[] idxs = (Index<K, V>[]) new Index[level + 1];
- Index<K, V> idx = null;
- // 循环创建Index节点,效果与上面的一样;并把创建的Index节点放在idxs数组中
- for (int i = 1; i <= level; ++i)
- idxs[i] = idx = new Index<K, V>(z, idx, null);
- ...
- // 会将构建好的一列Index添加到跳表中
- addIndex(idxs[k], oldh, k);
- }
- }
因此,插入Index列其实涉及整个列中所有Index节点与跳表索引结构之间的维护工作;下面是本示例中Index列插入过程的示意图:
在本例中Index列只有两层,因此插入Index列涉及两次连接操作。在插入上层Index节点时,会根据上面查找到的符合条件的前驱Index节点q
以及q
的后继节点r
,把上层Index节点插在q
和r
之间,这一步是通过addIndex(Index<K, V> idx, HeadIndex<K, V> h, int indexLevel)
方法下面的片段实现的:
- /**
- * 走到这里,说明待插入Index节点的node的key小于或等于r节点的node的key
- * 即已经找到了Index节点正确的待插入位置
- * 需要判断当前待插入Index节点的级别与q与r的级别是否相同,
- * 如果相同则可以插入,如果不同,应该在当前层的下面层进行尝试插入,
- * 以保持插入节点后Index列的高度与HeadIndex列的高度保持低部对齐
- */
- if (j == insertionLevel) { // 待插入Index节点的级别等于头节点的级别
- // Don't insert index if node already deleted
- // 检查待插入的Index节点的Node节点是否是被删除的节点
- if (t.indexesDeletedNode()) {
- // 待插入的Index节点中的Node节点是一个被删除的节点
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key); // cleans up
- // 返回,结束这次Index节点的插入
- return;
- }
- /**
- * 可以插入,将r作为t的后继,然后将t作为q的后继,即将t插在r的前面,q的后面
- * 插入前:q -> r -> r.right
- * 插入后:q -> t -> r -> r.right
- */
- if (!q.link(r, t))
- // 插入不成功,跳出内层循环重试
- break; // restart
- // 插入成功,insertionLevel自减后为0时,就可以返回了
- if (--insertionLevel == 0) {
- // need final deletion check before return
- // 返回之前检查t是否被删除了,如果被删除了就调用findNode(key)进行清理
- if (t.indexesDeletedNode())
- // findNode(key)方法的代码会移除已标记为删除的节点
- findNode(key);
- // 返回
- return;
- }
- }
q.link(r, t)
就是最终的连接操作;插入成功后,会对insertionLevel
进行自减,这一步操作相当于对已插入的层级进行计数,表示已经有一层插入成功了。如果自减后的insertionLevel
为0,表示所有层级的Index节点都插入完成,因此可以return返回结束。
当自减后的insertionLevel
不为0,则会继续往下执行下面的代码:
- /**
- * 走到这里,有两种情况:
- * 1. 尝试插入Index时,发现需要连接的前驱Index和后继Index与待插入的Index不在一个层级,因此需要下潜一层再尝试插入
- * 2. Index插入成功了,但insertionLevel自减后还不为0,即表示插入的Index其实是一个多层的列,还剩余有层级没有进行连接处理
- */
- if (--j >= insertionLevel && j < indexLevel)
- // 下潜到待插入Index列的下一层级
- t = t.down;
- // 前驱节点也下潜,然后进行下一次内层循环,处理该层级待插入的Index节点的连接
- q = q.down;
- r = q.right;
这部分代码使t
指针下潜了,同时q
指针也下潜了,然后在下一次内层循环中,就可以在新的q
和r
之间插入Index列中下层的Index节点了;下层的Index节点插入完成后insertionLevel
再次自减就会变为0,表示所有层级的Index节点都插入完成,直接return返回结束插入工作。到此,添加键值对的操作也就全部宣告完成了。
从上面的分析可知,doPut(K kkey, V value, boolean onlyIfAbsent)
方法的实现的确非常复杂,其实主要复杂的地方并不是新键值对的添加,而是对整个跳表结构的维护。这部分的解析大家如果看不明白也没关系,只需要明白以下的两个注意点即可:
- 在ConcurrentSkipListMap添加键值对时,会首先找到符合条件的前驱Node节点,然后将键值对包装为Node节点添加到这个前驱Node节点的后面。
- 每次添加之后会随机得到一个级别,如果级别大于0,就会构建相应数量的Index节点构成Index列,插入到跳表中。
能够读懂doPut(K, V, boolean)
方法的实现,后面的各种操作就非常简单了。doPut(K, V, boolean)
方法是所有操作方法里最复杂的一个。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...