Java多线程 31 - ConcurrentHashMap(JDK1.7)详解(2)
发布于 / 2018-08-16
简介:ConcurrentHashMap是线程安全的哈希表,用法与HashMap或Hashtable类似。
1. ConcurrentHashMap源码解析
在接下来的内容中将以ConcurrentHashMap源码为基础详细解析其各类操作的实现。
1.1. ConcurrentHashMap的构造方法
ConcurrentHashMap有五个构造方法:
- // 创建一个带有默认初始容量(16)、加载因子(0.75)和concurrencyLevel(16)的新的空映射
- ConcurrentHashMap()
- // 创建一个带有指定初始容量、默认加载因子(0.75)和concurrencyLevel(16)的新的空映射
- ConcurrentHashMap(int initialCapacity)
- // 创建一个带有指定初始容量、加载因子和默认concurrencyLevel(16)的新的空映射
- ConcurrentHashMap(int initialCapacity, float loadFactor)
- // 构造一个与给定映射具有相同映射关系的新映射
- ConcurrentHashMap(Map<? extends K, ? extends V> m)
- // 创建一个带有指定初始容量、加载因子和并发级别的新的空映射
- ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
其中前四个构造方法内部都是调用的第五个构造方法,无非是传入的参数不同而已:
- public ConcurrentHashMap(int initialCapacity, float loadFactor) {
- this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
- }
- public ConcurrentHashMap(int initialCapacity) {
- this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
- }
- public ConcurrentHashMap() {
- this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
- }
- public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
- this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
- putAll(m);
- }
这里需要注意的是第四个构造方法内部通过Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY)
来计算初始容量,然后通过putAll(Map<? extends K, ? extends V>)
方法将传入的集合m
中的元素依次添加到新构造的ConcurrentHashMap中。
我们主要关注第五个构造方法,它的源码如下:
- /**
- * Creates a new, empty map with the specified initial
- * capacity, load factor and concurrency level.
- * 根据初始容量、平衡因子和并发级别创建一个ConcurrentHashMap
- *
- * @param initialCapacity 初始容量,the initial capacity. The implementation
- * performs internal sizing to accommodate this many elements.
- * @param loadFactor 平衡因子,用于计算阈值,the load factor threshold, used to control resizing.
- * 并发级别,Resizing may be performed when the average number of elements per
- * bin exceeds this threshold.
- * @param concurrencyLevel the estimated number of concurrently
- * updating threads. The implementation performs internal sizing
- * to try to accommodate this many threads.
- * @throws IllegalArgumentException 当参数不规范时会抛出异常,if the initial capacity is
- * negative or the load factor or concurrencyLevel are
- * nonpositive.
- */
- @SuppressWarnings("unchecked")
- public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
- // 检查并发级别,从这里可以得知,最大并发级别为65,535
- if (concurrencyLevel > MAX_SEGMENTS)
- concurrencyLevel = MAX_SEGMENTS;
- // Find power-of-two sizes best matching arguments
- /**
- * 根据并发级别,来计算Segment的最大个数,最大为65535
- * 其中sshift记录了左移次数,ssize则计算了大小
- */
- int sshift = 0;
- int ssize = 1;
- // 2的次方,不小于concurrencyLevel的那个值
- while (ssize < concurrencyLevel) {
- ++sshift;
- ssize <<= 1;
- }
- // 这两个值后面会讲解
- this.segmentShift = 32 - sshift;
- this.segmentMask = ssize - 1;
- /**
- * 检查哈希表的初始容量
- * 哈希表的实际容量 = Segments的容量 * Segment中数组的长度
- */
- if (initialCapacity > MAXIMUM_CAPACITY)
- initialCapacity = MAXIMUM_CAPACITY;
- // 计算每个Segment Table,即HashEntry链,应该装载的数据量
- int c = initialCapacity / ssize;
- if (c * ssize < initialCapacity)
- ++c;
- int cap = MIN_SEGMENT_TABLE_CAPACITY;
- // cap即是Segments中HashEntry链的长度,2的次方,不小于c的那个值
- while (cap < c)
- cap <<= 1;
- // create segments and segments[0]
- // 创建Segment数组的第0个Segment对象s0
- Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
- // 创建Segment数组
- Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
- // 将s0装入到Segment数组ss索引为0的位置
- UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
- this.segments = ss;
- }
该构造方法的代码比较多,而且也不太好理解,但它的功能是非常简单的:检查传入的参数,根据并发级别构造了Segment数组,从代码的的实现来说,Segment数组的大小即为并发级别量,这种设定也相对好理解,当有N个线程并发访问ConcurrentHashMap时,如果有N个Segment分段,则能够尽量减少这N个线程的冲突访问次数;而HashEntry数组的大小则是由初始容量除以并发级别得到的,即保证每个Segment中HashEntry的数量是平均的。需要注意的是,代码中在使用这些量级指标时,都将相关数量转换为了2的次方,以方便后面的操作。
1.2. hash()方法
hash()方法作为ConcurrentHashMap重要的方法之一,承担了对存储的键值对的键做哈希操作的任务,它定义为ConcurrentHashMap类中的hash(Object)
方法,代码如下:
- /**
- * Applies a supplemental hash function to a given hashCode, which
- * defends against poor quality hash functions. This is critical
- * because ConcurrentHashMap uses power-of-two length hash tables,
- * that otherwise encounter collisions for hashCodes that do not
- * differ in lower or upper bits.
- * hash方法,根据k计算hash值
- */
- private int hash(Object k) {
- int h = hashSeed;
- if ((0 != h) && (k instanceof String)) {
- return sun.misc.Hashing.stringHash32((String) k);
- }
- h ^= k.hashCode();
- // Spread bits to regularize both segment and index locations,
- // using variant of single-word Wang/Jenkins hash.
- h += (h << 15) ^ 0xffffcd7d;
- h ^= (h >>> 10);
- h += (h << 3);
- h ^= (h >>> 6);
- h += (h << 2) + (h << 14);
- return h ^ (h >>> 16);
- }
当传入Object类型的k
之后,将会计算出相应的哈希值;从实现代码可知,如果hashSeed
的值不为0且k
为字符串时将会直接调用sun.misc.Hashing.stringHash32((String) k)
来计算哈希值。hashSeed
在前面提到过,它是ConcurrentHashMap的一个常量,定义如下:
- /**
- * A randomizing value associated with this instance that is applied to
- * hash code of keys to make hash collisions harder to find.
- * 随机Hash种子,这个值取决于 {@link Holder#ALTERNATIVE_HASHING}
- */
- private transient final int hashSeed = randomHashSeed(this);
- // 根据Holder.ALTERNATIVE_HASHING计算得到hashSeed
- private static int randomHashSeed(ConcurrentHashMap instance) {
- if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
- return sun.misc.Hashing.randomHashSeed(instance);
- }
- return 0;
- }
从代码可知,hashSeed
取决于Holder.ALTERNATIVE_HASHING
的值,如果Holder.ALTERNATIVE_HASHING
为true,会通过sun.misc.Hashing.randomHashSeed(this)
计算得到hashSeed
;Holder是ConcurrentHashMap的一个静态内部类,用于存放一些在虚拟机启动后才能初始化的值,其中Holder.ALTERNATIVE_HASHING
就是在该类的静态代码块中初始化的:
- /**
- * holds values which can't be initialized until after VM is booted.
- * 存放一些在虚拟机启动后才能初始化的值
- */
- private static class Holder {
- /**
- * Enable alternative hashing of String keys?
- *
- * <p>Unlike the other hash map implementations we do not implement a
- * threshold for regulating whether alternative hashing is used for
- * String keys. Alternative hashing is either enabled for all instances
- * or disabled for all instances.
- */
- static final boolean ALTERNATIVE_HASHING;
- static {
- // Use the "threshold" system property even though our threshold behaviour is "ON" or "OFF".
- // 获得JVM的参数jdk.map.althashing.threshold的值,这个值将作为阈值
- String altThreshold = java.security.AccessController.doPrivileged(new sun.security.action.GetPropertyAction("jdk.map.althashing.threshold"));
- int threshold;
- try {
- // 强转altThreshold,如果altThreshold为null则取2,147,483,647
- threshold = (null != altThreshold) ? Integer.parseInt(altThreshold) : Integer.MAX_VALUE;
- // disable alternative hashing if -1
- // 当threshold为-1时,关掉alternative hashing
- if (threshold == -1) {
- threshold = Integer.MAX_VALUE;
- }
- if (threshold < 0) {
- throw new IllegalArgumentException("value must be positive integer.");
- }
- } catch (IllegalArgumentException failed) {
- throw new Error("Illegal value for 'jdk.map.althashing.threshold'", failed);
- }
- /**
- * 当threshold <= 1,073,741,824时ALTERNATIVE_HASHING为true
- * 其实这种情况表明,当-Djdk.map.althashing.threshold参数传入-1时,ALTERNATIVE_HASHING必然为false,也即关掉alternative hashing
- */
- ALTERNATIVE_HASHING = threshold <= MAXIMUM_CAPACITY;
- }
- }
通过JVM虚拟机启动参数jdk.map.althashing.threshold
可以控制是否开启Alternative Hashing;回到hash(Object)
方法,一旦开启Alternative Hashing意味着hashSeed
的值不为0,此时如果传入的需要进行哈希的键k
是字符串,就会直接调用sun.misc.Hashing.stringHash32((String) k)
进行哈希操作,否则将使用后面的Wang/Jenkins hash算法的变种来计算哈希值,以减少哈希冲突,使元素能够均匀的分布在不同的Segment上,提高容器的存取效率。
注:在JDK 8中,
hashSeed
已经被移除掉了,移除掉的原因是调用sun.misc.Hashing.randomHashSeed
计算hashSeed
时会调用方法java.util.Random.nextInt()
,该方法使用AtomicLong,在多线程情况下会有性能问题。
1.3. size()和isEmpty()方法
ConcurrentHashMap提供了size()
方法用于获取键值对个数,源码如下:
- /**
- * Returns the number of key-value mappings in this map. If the
- * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
- * <tt>Integer.MAX_VALUE</tt>.
- * 返回Map中键值对的个数,当超过Integer.MAX_VALUE时会返回Integer.MAX_VALUE
- *
- * @return the number of key-value mappings in this map
- */
- public int size() {
- /**
- * Try a few times to get accurate count. On failure due to continuous async changes in table, resort to locking.
- * 多次尝试以获取准确的数量。当持续进行table的异步更新、重排和锁定时可能会失败
- */
- // 获取Segment数组
- final Segment<K, V>[] segments = this.segments;
- int size;
- // 标识是否溢出
- boolean overflow; // true if size overflows 32 bits
- // 本次计算后得到的modCount
- long sum; // sum of modCounts
- // 上一次计算后得到的modCount
- long last = 0L; // previous sum
- int retries = -1; // first iteration isn't retry
- try {
- // 循环不断尝试
- for (; ; ) {
- /**
- * 前两次是不加锁的,在不加锁的情况下先后计算两次,如果两次计算的modCount一致,
- * 则认为在统计的时间内,没有其它线程对该map修改或删除,直接返回size;
- * 如果两次计算的modCount不一致,则对所有的Segment加锁,并计算size。
- */
- if (retries++ == RETRIES_BEFORE_LOCK) {
- // 遍历操作,对所有的Segment上锁
- for (int j = 0; j < segments.length; ++j)
- /**
- * 获取Segment数组中索引为j位置上的Segment,并上锁
- * 这个操作可以保证如果索引为j位置上没有Segment,就新创建一个
- */
- ensureSegment(j).lock(); // force creation
- }
- sum = 0L;
- size = 0;
- overflow = false;
- // 遍历所有的Segment,叠加计算其存储的元素的数量
- for (int j = 0; j < segments.length; ++j) {
- // 获取Segment
- Segment<K, V> seg = segmentAt(segments, j);
- if (seg != null) {
- // 更新modCount计数器
- sum += seg.modCount;
- int c = seg.count;
- // 叠加并判断相关数量是否溢出
- if (c < 0 || (size += c) < 0)
- overflow = true;
- }
- }
- /**
- * 前两次计算得到的modCount计数如果相同,说明没有并发修改,就跳出for循环,结束计数
- * 这种方式可以保证计数的精确性,如果两次计算得到modCount不一致表示可能有其他的线程修改了Map导致元素数量发生了变化,
- * 在下一次的循环中将加锁统计。
- */
- if (sum == last)
- break;
- // 记录这一次计算后得到modCount
- last = sum;
- }
- } finally {
- // 循环对所有的Segment解锁
- if (retries > RETRIES_BEFORE_LOCK) {
- for (int j = 0; j < segments.length; ++j)
- segmentAt(segments, j).unlock();
- }
- }
- // 返回值
- return overflow ? Integer.MAX_VALUE : size;
- }
注:
size()
和后面讲到的isEmpty()
两个方法中用到的segmentAt(Segment<K, V>[] ss, int j)
方法在前面有过介绍,即获取Segment数组ss
上的索引为i位置上的元素。
size()
方法的原理是比较简单的,即遍历Segment数组中所有的Segment,将每个Segment包含的元素数量进行累加;如果ConcurrentHashMap中的元素个数超过Integer.MAX_VALUE
时,会直接返回Integer.MAX_VALUE
。需要注意的是,这里巧妙地先使用两次循环试图避开加锁,如果在前两次循环中得到的统计值是相同的,则认为没有其他线程并发修改ConcurrentHashMap中的元素,直接跳出循环返回统计值即可;但如果前两次循环得到的统计值不同,那么在第三次循环中会对所有的Segment进行lock()
上锁后再统计,计算完后会解锁。这种方式既可以在理想情况下避开加锁操作,也可以保证计数的精确性。
另外,加锁过程中用到的ensureSegment(int)
方法可以保证Segment数组中每个索引位置都有Segment对象填充,如果没有就新建;ensureSegment(int)
方法源码如下:
- /**
- * Returns the segment for the given index, creating it and
- * recording in segment table (via CAS) if not already present.
- * 返回指定索引位置的Segment,当不存在时将创建它
- *
- * @param k the index
- * @return the segment
- */
- @SuppressWarnings("unchecked")
- private Segment<K, V> ensureSegment(int k) {
- // 获取Segment数组
- final Segment<K, V>[] ss = this.segments;
- // 计算索引为k的Segment在Segment数组中的原始偏移量
- long u = (k << SSHIFT) + SBASE; // raw offset
- Segment<K, V> seg;
- // 尝试获取
- if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
- // 走到这里表示获取到的Segment为null
- // 使用索引为0位置上的Segment作为原型,主要将其table长度、加载因子等值用于新Segment的创建
- Segment<K, V> proto = ss[0]; // use segment 0 as prototype
- // table长度
- int cap = proto.table.length;
- // 加载因子
- float lf = proto.loadFactor;
- // 阈值
- int threshold = (int) (cap * lf);
- // 创建HashEntry链数组
- HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];
- // 重新检查索引为k的Segment是否为null
- if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
- // 创建新的Segment
- Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);
- while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
- // CAS方式赋值
- if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
- // 赋值成功即跳出while循环
- break;
- }
- }
- }
- return seg;
- }
size()
方法中使用两次modCount
比较的方式在isEmpty()
方法中也有体现,ConcurrentHashMap的isEmpty()
用于判断当前Map是否为空,它的源码如下:
- /**
- * Returns <tt>true</tt> if this map contains no key-value mappings.
- * 判断Map是否为空
- *
- * @return <tt>true</tt> if this map contains no key-value mappings
- */
- public boolean isEmpty() {
- /*
- * Sum per-segment modCounts to avoid mis-reporting when
- * elements are concurrently added and removed in one segment
- * while checking another, in which case the table was never
- * actually empty at any point. (The sum ensures accuracy up
- * through at least 1<<31 per-segment modifications before
- * recheck.) Methods size() and containsValue() use similar
- * constructions for stability checks.
- */
- long sum = 0L;
- // 获取Segment数组
- final Segment<K, V>[] segments = this.segments;
- // 循环遍历Segment数组
- for (int j = 0; j < segments.length; ++j) {
- Segment<K, V> seg = segmentAt(segments, j);
- if (seg != null) {
- // 一旦某个Segment中的元素不为0,就返回false
- if (seg.count != 0)
- return false;
- // 更新modCount总和,这次是加操作
- sum += seg.modCount;
- }
- }
- // 重新检查
- if (sum != 0L) { // recheck unless no modifications
- for (int j = 0; j < segments.length; ++j) {
- Segment<K, V> seg = segmentAt(segments, j);
- if (seg != null) {
- if (seg.count != 0)
- return false;
- // 更新modCount总和,这次是减操作
- sum -= seg.modCount;
- }
- }
- // 二次检查后,sum应该为0,如果不为0表示有其他线程修改了Map导致元素数量发生了变化,则返回false
- if (sum != 0L)
- return false;
- }
- return true;
- }
isEmpty()
方法的原理也非常简单,遍历Segment数组中所有的Segment,如果每个Segment中的元素个数都为0,则表示Map是空的。同时在遍历过程中也将所有Segment的modCount
值进行累加统计并记为sum
,如果sum
不为0(此时表示Map中是有元素的)就进行第二次遍历;与第一次遍历不同的是,第二次遍历是从sum
中减去所有Segment的modCount
值,如果经过两次遍历后sum
的值为0,则表示两次遍历中所有Segment的modCount
值的和是相同的,即元素数量没有发生变化,本次判断结果有效,否则直接返回false。
注1:需要注意的是,在Segment类中,put、remove、replace和clear操作都会使
modCount
自增,rehash操作是不会修改modCount
的,这些后面会讲解。
注2:关于Segment的lock()
和unlock()
操作,由于Segment继承自ReentrantLock,因此Segment类本身就是一把互斥锁,拥有加锁和解锁操作;关于ReentrantLock锁的内容可以关注前面的文章Java多线程系列——ReentrantLock互斥锁。
1.4. contains()相关方法
ConcurrentHashMap中contains()
方法有三类:contains(Object value)
、containsKey(Object key)
和containsValue(Object value)
,contains(Object value)
内部调用的就是containsValue(Object value)
,用于判断是否包含某个值,containsKey(Object key)
则用于判断是否包含某个键。
先考察contains(Object)
和containsValue(Object)
两个方法,源码如下:
- /**
- * Legacy method testing if some key maps into the specified value
- * in this table. This method is identical in functionality to
- * {@link #containsValue}, and exists solely to ensure
- * full compatibility with class {@link java.util.Hashtable},
- * which supported this method prior to introduction of the
- * Java Collections framework.
- *
- * 该方法调用了containsValue(Object value)
- *
- * @param value a value to search for
- * @return <tt>true</tt> if and only if some key maps to the
- * <tt>value</tt> argument in this table as
- * determined by the <tt>equals</tt> method;
- * <tt>false</tt> otherwise
- * @throws NullPointerException if the specified value is null
- */
- public boolean contains(Object value) {
- return containsValue(value);
- }
- /**
- * Returns <tt>true</tt> if this map maps one or more keys to the
- * specified value. Note: This method requires a full internal
- * traversal of the hash table, and so is much slower than
- * method <tt>containsKey</tt>.
- *
- * 判断某个值是否存在于Map中,可能需要遍历整个Map,因此比containsKey要慢
- *
- * @param value value whose presence in this map is to be tested
- * @return <tt>true</tt> if this map maps one or more keys to the
- * specified value
- * @throws NullPointerException if the specified value is null 值不能为null
- */
- public boolean containsValue(Object value) {
- // Same idea as size() 实现方式与size()方法类似
- // 检查value是否为空
- if (value == null)
- throw new NullPointerException();
- // 获取Segment数组
- final Segment<K, V>[] segments = this.segments;
- // 标识是否找到
- boolean found = false;
- long last = 0;
- int retries = -1;
- try {
- outer:
- for (; ; ) {
- // 先对所有Segment上锁
- if (retries++ == RETRIES_BEFORE_LOCK) {
- for (int j = 0; j < segments.length; ++j)
- ensureSegment(j).lock(); // force creation
- }
- long hashSum = 0L;
- int sum = 0;
- // 遍历Segment
- for (int j = 0; j < segments.length; ++j) {
- HashEntry<K, V>[] tab;
- // 获取指定索引的Segment
- Segment<K, V> seg = segmentAt(segments, j);
- if (seg != null && (tab = seg.table) != null) {
- // 遍历Segment中的HashEntry链
- for (int i = 0; i < tab.length; i++) {
- HashEntry<K, V> e;
- // 遍历HashEntry链
- for (e = entryAt(tab, i); e != null; e = e.next) {
- V v = e.value;
- // 判断值是否相同
- if (v != null && value.equals(v)) {
- // 如果有相同的表示是包含这个值的
- found = true;
- // 跳出到外层循环
- break outer;
- }
- }
- }
- sum += seg.modCount;
- }
- }
- // 判断得到的modCount是否一致,保证精确性
- if (retries > 0 && sum == last)
- break;
- last = sum;
- }
- } finally {
- // 遍历Segment并解锁
- if (retries > RETRIES_BEFORE_LOCK) {
- for (int j = 0; j < segments.length; ++j)
- segmentAt(segments, j).unlock();
- }
- }
- return found;
- }
contains(Object value)
内部调用了containsValue(Object value)
,而containsValue(Object value)
的实现方式与前面的size()
方法很类似:循环遍历整个Map,查找相应的value
,同时通过modCount
来尝试避免加锁及提高精确性。上面的源码注释解释得非常清楚了。需要注意的是,containsValue(Object)
可能需要遍历整个Map,所以它的效率及其低下。
containsKey(Object)
方法的效率则相对较高,它的源码如下:
- /**
- * Tests if the specified object is a key in this table.
- * 判断某个键是否存在于Map中
- *
- * @param key possible key
- * @return <tt>true</tt> if and only if the specified object
- * is a key in this table, as determined by the
- * <tt>equals</tt> method; <tt>false</tt> otherwise.
- * @throws NullPointerException if the specified key is null 键不可为空
- */
- @SuppressWarnings("unchecked")
- public boolean containsKey(Object key) {
- Segment<K, V> s; // same as get() except no need for volatile value read
- HashEntry<K, V>[] tab;
- // 获取key的哈希值
- int h = hash(key);
- // 根据哈希值计算相应的Segment在Segment数组中的偏移量
- long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
- // 根据偏移量获取Segment
- if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
- // 获取到的Segment不为null,且该Segment的table不为null,就遍历该Segment的table中的HashEntry对象
- for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
- (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
- e != null; e = e.next) {
- K k;
- // key相同,key的hash也相同,表示找到了,就返回true;否则继续查找下一个HashEntry
- if ((k = e.key) == key || (e.hash == h && key.equals(k)))
- return true;
- }
- }
- return false;
- }
在containsKey(Obejct key)
方法中可以根据传入的key
直接计算出该key
所在的Segment,使用算法是(((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE
,这里我们需要考察segmentShift
和segmentMask
两个变量;这两个变量是在ConcurrentHashMap中被初始化的,回顾源码:
- public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- ...
- /**
- * 根据并发级别,来计算Segment的最大个数,最大为65535
- * 其中sshift记录了左移次数,ssize则计算了大小
- */
- int sshift = 0;
- int ssize = 1;
- // 2的次方,不小于concurrencyLevel的那个值
- while (ssize < concurrencyLevel) {
- ++sshift;
- ssize <<= 1;
- }
- // 这两个值后面会讲解
- this.segmentShift = 32 - sshift;
- this.segmentMask = ssize - 1;
- ...
- }
从源码可知,segmentShift
为32 - sshift
,sshift
是并发级别量的二进制形式表示时1按位左移的位数,因此32 - sshift
即是高位0的位数,举个例子,假设我们的并发级别是32,32是2的5次方,转换为二进制即为0010 0000
,即1按位左移5位,所以sshift
就为5,segmentShift
就为27;而segmentMask
为ssize - 1
,即31,表示为二进制码即是0001 1111
。为什么这么设计呢?我们知道,并发级别量是多少,则有多少个Segment;再看看根据哈希值计算相应的Segment在Segment数组中的偏移量的算法:(((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE
,其中哈希值h
是一个32位的int类型的整数,将其无符号右移segmentShift
(27)位,就只剩下高位的5个二进制位了,然后将其与segmentMask
(二进制码为0001 1111
)相与,这样一番操作之后,能够将哈希值的高5位取出来作为最后定位Segment偏移量的根据值,可以保证这个根据值不会超过并发级别量,也即不超过Segment数组的大小。在ConcurrentHashMap中根据哈希值定位Segment都是使用的这个算法,希望大家可以好好体会一下。
回到containsKey(Object)
方法,在根据哈希值定位到Segment之后,就开始遍历Segment中的HashEntry链,查找相应键,如果查找到了则表示键存在,直接返回true,否则如果遍历到最后也没有找到就返回false。
1.5. put()相关方法
终于到了最关键的put()
相关方法了,其实接下来的一些方法都非常重要,涉及的操作也相对复杂。
put()
相关的方法也有三类:put(K, V)
、putIfAbsent(K, V)
和putAll(Map<? extends K, ? extends V>)
。其中putAll(Map<? extends K, ? extends V>)
已经和我们打过照面了,在ConcurrentHashMap的某个构造方法中就用到了它,其实它继承自AbstractMap类,底层调用的就是put(K, V)
方法,源码如下:
- /**
- * Copies all of the mappings from the specified map to this one.
- * These mappings replace any mappings that this map had for any of the
- * keys currently in the specified map.
- *
- * 将m中的键值对拷贝到ConcurrentHashMap中,会覆盖ConcurrentHashMap中原有的键值对
- *
- * @param m mappings to be stored in this map
- */
- public void putAll(Map<? extends K, ? extends V> m) {
- // 循环遍历m中所有的键值对,使用put方法依次添加到ConcurrentHashMap中
- for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
- put(e.getKey(), e.getValue());
- }
而put(K, V)
和putIfAbsent(K, V)
基本类似,底层其实都调用了Segment的put(K key, int hash, V value, boolean onlyIfAbsent)
方法,区别在于最后一个参数传的不同。putIfAbsent(K, V)
会判断Map中是否已存在相应的键,如果存在就不添加,而put(K, V)
则是无论存在与否一律添加,覆盖原来的旧值。put(K, V)
和putIfAbsent(K, V)
的源码如下:
- /**
- * Maps the specified key to the specified value in this table.
- * Neither the key nor the value can be null.
- *
- * <p> The value can be retrieved by calling the <tt>get</tt> method
- * with a key that is equal to the original key.
- *
- * 添加键值对到Map中,键和值都不可以为null
- *
- * @param key key with which the specified value is to be associated
- * @param value value to be associated with the specified key
- * @return the previous value associated with <tt>key</tt>, or
- * <tt>null</tt> if there was no mapping for <tt>key</tt>
- * @throws NullPointerException if the specified key or value is null
- */
- @SuppressWarnings("unchecked")
- public V put(K key, V value) {
- Segment<K, V> s;
- // 检查value是否为null
- if (value == null)
- throw new NullPointerException();
- // 在hash()方法中如果key为null会抛出NullPointerException异常
- int hash = hash(key);
- // 根据hash计算对应Segment的偏移量
- int j = (hash >>> segmentShift) & segmentMask;
- // nonvolatile; recheck
- // 获取对应的Segment
- if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
- // 如果对应的Segment为空就使用ensureSegment()方法来创建
- s = ensureSegment(j);
- // 使用Segment的put方法添加元素,onlyIfAbsent传入的是false,即如存在就覆盖旧值
- return s.put(key, hash, value, false);
- }
- /**
- * {@inheritDoc}
- *
- * 该方法与{@link #put} 方法的实现基本一样
- *
- * @return the previous value associated with the specified key,
- * or <tt>null</tt> if there was no mapping for the key
- * @throws NullPointerException if the specified key or value is null
- */
- @SuppressWarnings("unchecked")
- public V putIfAbsent(K key, V value) {
- Segment<K, V> s;
- if (value == null)
- throw new NullPointerException();
- int hash = hash(key);
- int j = (hash >>> segmentShift) & segmentMask;
- if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
- s = ensureSegment(j);
- // 与put方法的地方在于,此处onlyIfAbsent传入的是true,即如存在就不添加
- return s.put(key, hash, value, true);
- }
可以发现,putIfAbsent(K, V)
和put(K, V)
,最后调用Segment对象的put(K key, int hash, V value, boolean onlyIfAbsent)
方法时传入的onlyIfAbsent
不同。另外,关于Segment索引的定位算法,与前面的contains()
相关方法里的是一致的,这里不再赘述。
1.5.1. Segment类的put()方法
上面源码中的注释对两个方法的实现已经解释得非常清楚了,这里我们要关注的自然是Segment的put(K key, int hash, V value, boolean onlyIfAbsent)
方法,该方法源码如下:
- // 添加元素
- final V put(K key, int hash, V value, boolean onlyIfAbsent) {
- /**
- * 先尝试获取锁,tryLock()方法是继承自ReentrantLock的方法
- * 如果获取到锁了,node为null,否则执行scanAndLockForPut(key, hash, value)获取node
- * scanAndLockForPut()内部会不断自旋尝试获取锁,一旦其返回了即说明获取到锁了
- * 如果返回值不为null,表示是相应HashEntry链表的头节点
- *
- * 当获取到锁之后,会根据hash找到对应的HashEntry链表(此处具体实现是找到链表的头元素)
- * 然后在链表中根据key进行遍历查找
- * 如果查找到了key相同的HashEntry,就判断onlyIfAbsent是否为true,如果是就更新该HashEntry的值为value,否则直接跳出
- * 如果没有查找到,可能有两种情况,链表为空或者确实没有
- * 则根据传入的key、hash、value创建一个HashEntry,设置到index位置上
- */
- HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
- V oldValue;
- try {
- HashEntry<K, V>[] tab = table;
- // 计算index,即 (table的长度 - 1) & hash值
- int index = (tab.length - 1) & hash;
- // 获取table上索引为index上的元素
- HashEntry<K, V> first = entryAt(tab, index);
- for (HashEntry<K, V> e = first; ; ) {
- if (e != null) {
- // 获取到的元素不为null,表示index上已经有元素了
- K k;
- if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
- // 走到这里,表示put进的键与已经存在于index位置上HashEntry的键相同
- // 先记录旧值
- oldValue = e.value;
- if (!onlyIfAbsent) {
- /**
- * 如果没有指定onlyIfAbsent为true,即没有要求必须是不存在的情况下添加
- * 就更新index位置上已存在的元素的值为value
- */
- e.value = value;
- // modCount自增
- ++modCount;
- }
- break;
- }
- // 走到这里表示key与e.key不相同,则遍历链表下一个元素进行判断
- e = e.next;
- } else {
- // 获取到的元素为null,表示index上没有元素
- if (node != null)
- // 如果node不为null,将first设置为node的后继元素
- node.setNext(first);
- else
- // 如果node为null,则创建一个新的HashEntry赋值给node,并将first作为node的后继节点
- node = new HashEntry<K, V>(hash, key, value, first);
- /**
- * 计算Segment中元素个数是否超过阈值,如果超过了就进行rehash操作
- * 否则将node放在table的index位置上
- */
- int c = count + 1;
- if (c > threshold && tab.length < MAXIMUM_CAPACITY)
- rehash(node);
- else
- setEntryAt(tab, index, node);
- // 更新modCount
- ++modCount;
- // 更新Segment元素个数计数器
- count = c;
- oldValue = null;
- break;
- }
- }
- } finally {
- // 解锁
- unlock();
- }
- return oldValue;
- }
1.5.1.1. 加锁与解锁
从Segment的put(K, int, V, boolean)
方法源码可以得知,在每次调用put(K, int, V, boolean)
方法时,都会先调用tryLock()
尝试加锁,这里的tryLock()
其实是一种优化操作,如果tryLock()
一次性成功,就可以顺利往下执行,否则需要调用scanAndLockForPut(key, hash, value)
来尝试加锁,该方法的源码如下:
- /**
- * Scans for a node containing given key while trying to
- * acquire lock, creating and returning one if not found. Upon
- * return, guarantees that lock is held. UNlike in most
- * methods, calls to method equals are not screened: Since
- * traversal speed doesn't matter, we might as well help warm
- * up the associated code and accesses as well.
- *
- * 扫描元素并尝试加锁,扫描元素主要用于寻找与给定key相同的HashEntry节点,
- * 如果没找到就创建一个新的节点,然后将这个节点返回。
- * 该方法可以保证锁已获取到
- *
- * @return a new node if key not found, else null
- */
- private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) {
- // 尝试获取HashEntry链的第一个元素
- HashEntry<K, V> first = entryForHash(this, hash);
- HashEntry<K, V> e = first;
- HashEntry<K, V> node = null;
- int retries = -1; // negative while locating node
- while (!tryLock()) {
- // 走到这里说明获取锁失败
- HashEntry<K, V> f; // to recheck first below
- if (retries < 0) {
- // 如果重试次数retries小于0,表示可能是第一次尝试
- /**
- * 从entryForHash()方法可知,e为null有两种情况:
- * 1. Segment为空
- * 2. Segment的table为空则返回空
- */
- if (e == null) {
- // 创建一个HashEntry节点
- if (node == null) // speculatively create node
- node = new HashEntry<K, V>(hash, key, value, null);
- // retries次数置为0,下次就不会走if (retries < 0)的分支了
- retries = 0;
- } else if (key.equals(e.key))
- // 当e不为null,且key与e.key相同时,将retries次数置为0
- retries = 0;
- else
- // 走到这里,说明e不为null,且key与e.key不同,则继续往后查找
- e = e.next;
- } else if (++retries > MAX_SCAN_RETRIES) {
- // 当重试次数超过限制,强制上锁(可能会进入阻塞状态)
- lock();
- break;
- } else if ( // 是否是偶数次尝试
- (retries & 1) == 0 &&
- // 重新尝试获取HashEntry链的第一个元素,然后判断本次获取的第一个元素是否与之前的first相同
- (f = entryForHash(this, hash)) != first) {
- /**
- * 走到这里说明是偶数次重试,并且HashEntry链的第一个元素发生了改变
- * 如果是的话,就应该重新遍历,
- * 将e和first置为新获取的HashEntry链第一个元素
- * 重置retries为-1,然后进入下一次循环
- */
- e = first = f; // re-traverse if entry changed
- retries = -1;
- }
- }
- /**
- * 该方法最后返回node,即新创建的HashEntry表头节点
- * 只有在tryLock()方法或lock()获取到锁的时候才会返回,否则会一直自旋
- */
- return node;
- }
scanAndLockForPut(key, hash, value)
是可以保证锁被获取到的,同时它可以协助找到或创建一个键与需要添加的键相同的节点,这个节点在put(K, int, V, boolean)
方法后面会用到。需要注意的是,在该方法中可能会调用lock()
方法来获取锁,因此执行添加操作的线程可能会进入阻塞状态。
另外,在put(K, int, V, boolean)
方法中的finally块会释放获取的锁。
1.5.1.2. HashEntry数组中元素的定位
获取到锁后,接下来的工作是找到Segment中table
数组里相应的HashEntry元素(也即是HashEntry链的头节点)。在table
数组上定位HashEntry元素的索引的方式是(tab.length - 1) & hash
,即数组大小减1,然后与hash
值相与。这里我们回顾一下前面ConcurrentHashMap的构造方法:
- public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- ...
- /**
- * 检查哈希表的初始容量
- * 哈希表的实际容量 = Segments的容量 * Segment中数组的长度
- */
- if (initialCapacity > MAXIMUM_CAPACITY)
- initialCapacity = MAXIMUM_CAPACITY;
- // 计算每个Segment Table,即HashEntry链,应该装载的数据量
- int c = initialCapacity / ssize;
- if (c * ssize < initialCapacity)
- ++c;
- int cap = MIN_SEGMENT_TABLE_CAPACITY;
- // cap即是Segments中HashEntry链的长度,2的次方,不小于c的那个值
- while (cap < c)
- cap <<= 1;
- // create segments and segments[0]
- // 创建Segment数组的第0个Segment对象s0
- Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
- // 创建Segment数组
- Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
- // 将s0装入到Segment数组ss索引为0的位置
- UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
- this.segments = ss;
- }
从构造方法可以得知,HashEntry数组的大小为cap
,cap
值也是一个2的次方,所以HashEntry数组大小减1的值用二进制表示必然是低位为连续的1,举个例子,在前面的讲解中我们假设ssize
为32,这里假设initialCapacity
为4096,则计算出的cap
为128,二进制表示为1000 0000
,则cap - 1
用二进制表示即为0111 1111
;使用0111 1111
与hash
相与可以保证计算出的索引在HashEntry数组中不会越界(最大为0111 1111
,即31)。
根据哈希值计算出索引之后,使用entryAt(HashEntry<K, V>[] tab, int i)
方法在Segment中的table
数组上找出对应的HashEntry,也即是对应的HashEntry链的头节点(可能为空)。entryAt(HashEntry<K, V>[], int)
在前面已经介绍过了。
1.5.1.3. 添加元素
找到了HashEntry链的头节点,就可以进行添加键值对操作了。整个添加操作存在于for循环中,有多种情况:
- 当HashEntry链的头节点不为空,就遍历这个链,查找键与添加操作传入的键相同的节点,如果查找到键相同的节点,会根据传入的
onlyIfAbsent
参数判断是覆盖旧值还是放弃添加;如果没有查找到,就会将根据传入的键、值和哈希值构建一个新的HashEntry添加到链的尾部。 - HashEntry链的头节点为空,就根据传入的键、值和哈希值构建一个新的HashEntry,作为头节点添加到Segment类的HashEntry数组
table
中。
需要注意的是,添加节点的操作其实都位于else分支中,就算HashEntry链的头节点不为空,如果没有找到键与添加操作传入的键相同的节点,也会因为遍历到链表结尾然后进入else分支。另外,在else分支中会计算table
的数量是否超过了阈值,如果超过了会进行重哈希操作(调用rehash()
方法),并将添加操作放在重哈希过程中。关于rehash()
方法会在后面的内容中讲解。
1.6. remove()相关方法
ConcurrentHashMap的删除相关的操作是两个重载方法:remove(Object key)
和remove(Object key, Object value)
,具体实现也是调用了Segment的删除方法:
- /**
- * Removes the key (and its corresponding value) from this map.
- * This method does nothing if the key is not in the map.
- *
- * @param key the key that needs to be removed
- * @return the previous value associated with <tt>key</tt>, or
- * <tt>null</tt> if there was no mapping for <tt>key</tt>
- * @throws NullPointerException if the specified key is null
- */
- public V remove(Object key) {
- // 根据哈希值获取Segment
- int hash = hash(key);
- Segment<K, V> s = segmentForHash(hash);
- // 调用Segment的删除方法
- return s == null ? null : s.remove(key, hash, null);
- }
- /**
- * {@inheritDoc}
- *
- * @throws NullPointerException if the specified key is null
- */
- public boolean remove(Object key, Object value) {
- int hash = hash(key);
- Segment<K, V> s;
- // 调用Segment的删除方法
- return value != null && (s = segmentForHash(hash)) != null &&
- s.remove(key, hash, value) != null;
- }
流程都是根据键得到相应的哈希值,然后使用Segment类的删除方法,接下来我们查看Segment的remove(Object key, int hash, Object value)
方法,源码如下:
- /**
- * Remove; match on key only if value null, else match both.
- * 移除操作,返回移除的元素
- */
- final V remove(Object key, int hash, Object value) {
- // 先尝试上锁
- if (!tryLock())
- scanAndLock(key, hash);
- V oldValue = null;
- try {
- HashEntry<K, V>[] tab = table;
- int index = (tab.length - 1) & hash;
- // 获取对应的元素e
- HashEntry<K, V> e = entryAt(tab, index);
- HashEntry<K, V> pred = null;
- while (e != null) {
- K k;
- // e的后继
- HashEntry<K, V> next = e.next;
- // key相同,hash值也相同
- if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
- // 获取元素的value
- V v = e.value;
- if (value == null || value == v || value.equals(v)) {
- if (pred == null)
- // 如果pred为null,将e的后继节点放在e现在的位置上
- setEntryAt(tab, index, next);
- else
- // 否则将e的后继节点作为pred的后继,即跳过了原来的e节点
- pred.setNext(next);
- // 更新modCount计数器
- ++modCount;
- // 更新map大小
- --count;
- // 赋值给oldValue
- oldValue = v;
- }
- break;
- }
- // 如果key不同,hash值也不同,则往后遍历
- pred = e;
- e = next;
- }
- } finally {
- // 解锁
- unlock();
- }
- return oldValue;
- }
上面源码中的注释已经将remove(Object, int, Object)
方法的原理解释得比较清楚了,这里主要解释几个注意点:
- 删除时,首先找到对应的HashEntry链的头节点,然后遍历进行判断。
- 遍历过程中,当找到符合条件的待删除节点后,如果该节点时头节点,就将其后继节点作为所在链的头节点,具体代码是
setEntryAt(tab, index, next)
;否则就将其后继节点作为其前驱节点的后继节点,也即是跳过待删除节点,具体代码是pred.setNext(next)
,使用的setNext(HashEntry)
方法位于HashEntry类中,前面有提到过。 - 该删除方法会返回删除节点的值,也即是删除掉的键值对的值。
1.7. replace()相关方法
ConcurrentHashMap提供了两个更新操作:replace(K key, V oldValue, V newValue)
和replace(K key, V value)
,这两个方法的不同点在于,前一个方法传入了oldValue
参数,删除前会判断节点的值是否与oldValue
一致,如果一致才会删除;源码如下:
- /**
- * {@inheritDoc}
- *
- * @throws NullPointerException if any of the arguments are null
- */
- public boolean replace(K key, V oldValue, V newValue) {
- int hash = hash(key);
- // 检查传入参数是否合法
- if (oldValue == null || newValue == null)
- throw new NullPointerException();
- // 根据哈希值获取对应的Segment
- Segment<K, V> s = segmentForHash(hash);
- // 调用Segment的replace方法
- return s != null && s.replace(key, hash, oldValue, newValue);
- }
- /**
- * {@inheritDoc}
- *
- * @return the previous value associated with the specified key,
- * or <tt>null</tt> if there was no mapping for the key
- * @throws NullPointerException if the specified key or value is null
- */
- public V replace(K key, V value) {
- int hash = hash(key);
- // 检查传入参数是否合法
- if (value == null)
- throw new NullPointerException();
- // 根据哈希值获取对应的Segment
- Segment<K, V> s = segmentForHash(hash);
- // 调用Segment的replace方法
- return s == null ? null : s.replace(key, hash, value);
- }
从源码可知,底层也分别调用了Segment的两个方法:replace(K key, int hash, V oldValue, V newValue)
和replace(K key, int hash, V value)
,源码如下:
- // 替换节点,返回结果标识是否进行了更新
- final boolean replace(K key, int hash, V oldValue, V newValue) {
- // 上锁
- if (!tryLock())
- scanAndLock(key, hash);
- boolean replaced = false;
- try {
- HashEntry<K, V> e;
- // 遍历
- for (e = entryForHash(this, hash); e != null; e = e.next) {
- K k;
- if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
- // 走到这里说明找到了key和hash都相同的节点
- if (oldValue.equals(e.value)) {
- // 如果找到的节点当前的value与oldValue一致则进行修改
- e.value = newValue;
- // 更新计数器
- ++modCount;
- replaced = true;
- }
- break;
- }
- }
- } finally {
- // 解锁
- unlock();
- }
- // 返回的结果标识是否进行了更新
- return replaced;
- }
- /**
- * 替换节点的值为value,并返回旧值
- * 与上面的replace相比这个方法不会比较旧值是否一致,同时返回值不一样,其他流程一样
- */
- final V replace(K key, int hash, V value) {
- // 加锁
- if (!tryLock())
- scanAndLock(key, hash);
- V oldValue = null;
- try {
- HashEntry<K, V> e;
- // 遍历寻找节点
- for (e = entryForHash(this, hash); e != null; e = e.next) {
- K k;
- if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
- // key相同,hash值也相同,表示找到了节点
- // 记录节点旧值
- oldValue = e.value;
- // 修改节点的value值
- e.value = value;
- ++modCount;
- break;
- }
- }
- } finally {
- // 解锁
- unlock();
- }
- // 返回旧值
- return oldValue;
- }
Segment的两个更新方法也比较简单,上面代码中的注释都解释得比较清楚了。
1.8. get()相关方法
ConcurrentHashMap的get(Object key)
可以根据传入的键获取对应的值,如果没有找到就返回null。该方法底层没有像上面一样调用Segment的方法,而是直接实现的代码逻辑,源码如下:
- /**
- * Returns the value to which the specified key is mapped,
- * or {@code null} if this map contains no mapping for the key.
- *
- * <p>More formally, if this map contains a mapping from a key
- * {@code k} to a value {@code v} such that {@code key.equals(k)},
- * then this method returns {@code v}; otherwise it returns
- * {@code null}. (There can be at most one such mapping.)
- *
- * 获取指定键对应的值,如果在Map中没有找到该键,将返回null
- *
- * @throws NullPointerException if the specified key is null
- */
- public V get(Object key) {
- Segment<K, V> s; // manually integrate access methods to reduce overhead
- HashEntry<K, V>[] tab;
- // 哈希值
- int h = hash(key);
- // 获取Segment对应的索引
- long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
- // 获取Segment
- if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
- // 获取并遍历HashEntry链
- for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
- (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
- e != null; e = e.next) {
- K k;
- // 键相同,哈希值也相同
- if ((k = e.key) == key || (e.hash == h && key.equals(k)))
- // 返回值
- return e.value;
- }
- }
- // 否则返回null
- return null;
- }
根据上面对源码的注释,和对之前的某些操作方法的理解,对于get(Object)
方法的原理想必已经非常简单了,这里就不再赘述。
1.9. rehash()方法
在Segment类的put(K key, int hash, V value, boolean onlyIfAbsent)
方法中提到过,当添加键值时Segment中元素个数超过阈值,就会进行重哈希操作,具体代码片段如下:
- final V put(K key, int hash, V value, boolean onlyIfAbsent) {
- ...
- /**
- * 计算Segment中元素个数是否超过阈值,如果超过了就进行rehash操作
- * 否则将node放在table的index位置上
- */
- int c = count + 1;
- if (c > threshold && tab.length < MAXIMUM_CAPACITY)
- rehash(node);
- else
- setEntryAt(tab, index, node);
- ...
- }
重哈希操作调用的是Segment的rehash(HashEntry)
方法,传入了新构建的HashEntry节点,rehash(HashEntry)
方法源码如下:
- /**
- * Doubles size of table and repacks entries, also adding the
- * given node to new table
- *
- * 重哈希操作,会对Segment进行扩容,然后将传入的节点添加到Segment的table数组中
- */
- @SuppressWarnings("unchecked")
- private void rehash(HashEntry<K, V> node) {
- /*
- * Reclassify nodes in each list to new table. Because we
- * are using power-of-two expansion, the elements from
- * each bin must either stay at same index, or move with a
- * power of two offset. We eliminate unnecessary node
- * creation by catching cases where old nodes can be
- * reused because their next fields won't change.
- * Statistically, at the default threshold, only about
- * one-sixth of them need cloning when a table
- * doubles. The nodes they replace will be garbage
- * collectable as soon as they are no longer referenced by
- * any reader thread that may be in the midst of
- * concurrently traversing table. Entry accesses use plain
- * array indexing because they are followed by volatile
- * table write.
- */
- // 记录旧table
- HashEntry<K, V>[] oldTable = table;
- // 记录旧的容量
- int oldCapacity = oldTable.length;
- // 新的容量是旧的容量的两倍
- int newCapacity = oldCapacity << 1;
- // 计算新的阈值
- threshold = (int) (newCapacity * loadFactor);
- // 先创建一个新的HashEntry数组newtable
- HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity];
- int sizeMask = newCapacity - 1;
- for (int i = 0; i < oldCapacity; i++) {
- // 遍历取出旧元素e
- HashEntry<K, V> e = oldTable[i];
- if (e != null) {
- // e的后继
- HashEntry<K, V> next = e.next;
- // 计算新的index为idx
- int idx = e.hash & sizeMask;
- if (next == null) // Single node on list
- // 如果e的后继为空,表示e是一个单节点,则直接将其存入newTable的idx位置
- newTable[idx] = e;
- else {
- // Reuse consecutive sequence at same slot
- // 走到这里说明e是一个HashEntry链的头节点
- HashEntry<K, V> lastRun = e;
- int lastIdx = idx;
- /**
- * 这是一种优化做法,可以将一部分新索引一致的元素一次性迁移
- * 从next开始向后遍历,该过程主要是找到第一个新的index与旧的index不同的节点
- * 通过这个遍历过程,可以找到由于HashEntry链容量较之原来扩大了一倍,计算出的新的index与原来不同的第一个元素,赋值给lastRun
- * 而这个lastRun元素之后的所有元素的新的index都应该与之相同,否则后续的遍历还会更新lastRun的指向
- */
- for (HashEntry<K, V> last = next; last != null; last = last.next) {
- // 计算当前遍历到的节点新的index为k
- int k = last.hash & sizeMask;
- if (k != lastIdx) {
- // 如果当前节点新的index与前驱节点的新的index不同,则记录当前节点新的index为lastIdx
- lastIdx = k;
- // 记录当前节点为lastRun
- lastRun = last;
- }
- }
- /**
- * 将上面计算出来的lastRun放在newTable的lastIdx位置的槽中,这个lastIdx也是上面新计算出来的
- * 由于HashEntry链是一个链表,因此lastRun后面的元素也会一并移动到newTable的lastIdx位置的槽中
- */
- newTable[lastIdx] = lastRun;
- // 复制e元素对应的HashEntry链上剩余的元素到新的newTable中
- for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
- V v = p.value;
- int h = p.hash;
- // 计算新的index为k
- int k = h & sizeMask;
- // 取出原来k位置上的元素n
- HashEntry<K, V> n = newTable[k];
- // 构建新节点,将n作为新节点的后继节点,然后将新节点放在newTable的k位置上,相当于把新节点插在链表头部
- newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
- }
- }
- }
- }
- // 将传入的node添加到对应的HashEntry链表头
- int nodeIndex = node.hash & sizeMask; // add the new node
- node.setNext(newTable[nodeIndex]);
- newTable[nodeIndex] = node;
- // 将newTable赋值给table
- table = newTable;
- }
从上面的源码可以得知,rehash(HashEntry)
方法会对Segment的table
数组进行扩容,扩容过程是创建一个大小为原数组两倍的新数组,然后将原数组中的HashEntry节点全部添加到新数组中;在将原来的HashEntry添加到新数组的过程中,使用了一种优化做法:通过遍历HashEntry链,先找出位于链尾的、扩容后计算出的新的哈希值相同的节点,归功于链表的特性,可以将这些节点一次性进行迁移,减少遍历的次数以提升效率。大家可以好好体会一下重哈希方法的实现。Rehash优化的示意图如下:
1.10. clear()相关方法
ConcurrentHashMap的clear()
方法用于清空Map,源码如下:
- /**
- * Removes all of the mappings from this map.
- * 清空Map中所有的键值对
- */
- public void clear() {
- final Segment<K, V>[] segments = this.segments;
- // 遍历Segment进行清空处理
- for (int j = 0; j < segments.length; ++j) {
- Segment<K, V> s = segmentAt(segments, j);
- if (s != null)
- s.clear();
- }
- }
它的底层也是调用的Segment的clear()
方法,源码如下:
- // 清理操作,会将table中已有的节点全部置为null
- final void clear() {
- lock();
- try {
- // 遍历table数组,将每个元素都置为null
- HashEntry<K, V>[] tab = table;
- for (int i = 0; i < tab.length; i++)
- setEntryAt(tab, i, null);
- ++modCount;
- count = 0;
- } finally {
- unlock();
- }
- }
原理非常简单,就是将Segment实例的table
数组的所有元素都置为null。
1.11. 遍历相关方法
ConcurrentHashMap也像HashMap一样提供了相应的遍历方法,使用方式基本相同。ConcurrentHashMap的遍历方法有以下几个:
- public Set<K> keySet() { ... }
- public Collection<V> values() { ... }
- public Set<Map.Entry<K, V>> entrySet() { ... }
- public Enumeration<K> keys() { ... }
- public Enumeration<V> elements() { ... }
我们这里只介绍前面三个基于Iterator实现的遍历方法,后面两个以Enumeration实现的遍历大家可以参考之前讲解HashMap时的讲解。
使用过HashMap的人应该对keySet()
和values()
这两个方法都比较熟悉,其中keySet()
可以得到键的Set,而values()
可以得到值的Collection,它们的源码如下:
- /**
- * Returns a {@link Set} view of the keys contained in this map.
- * The set is backed by the map, so changes to the map are
- * reflected in the set, and vice-versa. The set supports element
- * removal, which removes the corresponding mapping from this map,
- * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
- * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
- * operations. It does not support the <tt>add</tt> or
- * <tt>addAll</tt> operations.
- *
- * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
- * that will never throw {@link ConcurrentModificationException},
- * and guarantees to traverse elements as they existed upon
- * construction of the iterator, and may (but is not guaranteed to)
- * reflect any modifications subsequent to construction.
- */
- public Set<K> keySet() {
- Set<K> ks = keySet;
- return (ks != null) ? ks : (keySet = new KeySet());
- }
- /**
- * Returns a {@link Collection} view of the values contained in this map.
- * The collection is backed by the map, so changes to the map are
- * reflected in the collection, and vice-versa. The collection
- * supports element removal, which removes the corresponding
- * mapping from this map, via the <tt>Iterator.remove</tt>,
- * <tt>Collection.remove</tt>, <tt>removeAll</tt>,
- * <tt>retainAll</tt>, and <tt>clear</tt> operations. It does not
- * support the <tt>add</tt> or <tt>addAll</tt> operations.
- *
- * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
- * that will never throw {@link ConcurrentModificationException},
- * and guarantees to traverse elements as they existed upon
- * construction of the iterator, and may (but is not guaranteed to)
- * reflect any modifications subsequent to construction.
- */
- public Collection<V> values() {
- Collection<V> vs = values;
- return (vs != null) ? vs : (values = new Values());
- }
以keySet()
方法为例,底层其实返回了一个KeySet实例,我们查看KeySet类的代码:
- final class KeySet extends AbstractSet<K> {
- public Iterator<K> iterator() {
- return new KeyIterator();
- }
- public int size() {
- return ConcurrentHashMap.this.size();
- }
- public boolean isEmpty() {
- return ConcurrentHashMap.this.isEmpty();
- }
- public boolean contains(Object o) {
- return ConcurrentHashMap.this.containsKey(o);
- }
- public boolean remove(Object o) {
- return ConcurrentHashMap.this.remove(o) != null;
- }
- public void clear() {
- ConcurrentHashMap.this.clear();
- }
- }
这里我们主要关注的是iterator()
方法返回的迭代器类KeyIterator,该类是ConcurrentHashMap的内部类,源码如下:
- final class KeyIterator
- extends HashIterator
- implements Iterator<K>, Enumeration<K> {
- public final K next() {
- return super.nextEntry().key;
- }
- public final K nextElement() {
- return super.nextEntry().key;
- }
- }
KeyIterator继承自HashIterator,而它主要的两个方法next()
和nextElement()
都是调用的继承自HashIterator类的方法:
- abstract class HashIterator {
- // ConcurrentHashMap里下一个Segment的索引
- int nextSegmentIndex;
- // 下一个HashEntry链的索引
- int nextTableIndex;
- // 当前的HashEntry链
- HashEntry<K, V>[] currentTable;
- // 下一个HashEntry
- HashEntry<K, V> nextEntry;
- // 最后返回
- HashEntry<K, V> lastReturned;
- HashIterator() {
- // 初始化nextSegmentIndex为Segment数组最大索引
- nextSegmentIndex = segments.length - 1;
- nextTableIndex = -1;
- advance();
- }
- /**
- * Set nextEntry to first node of next non-empty table
- * (in backwards order, to simplify checks).
- * 设置nextEntry为下一个不为空的HashEntry链的头节点
- */
- final void advance() {
- for (; ; ) {
- if (nextTableIndex >= 0) {
- // 获取table中的HashEntry,,即在table数组中从后向前的顺序依次获取
- if ((nextEntry = entryAt(currentTable, nextTableIndex--)) != null)
- break;
- } else if (nextSegmentIndex >= 0) {
- // 获取Segment,即在Segment数组中从后向前的顺序依次获取
- Segment<K, V> seg = segmentAt(segments, nextSegmentIndex--);
- // 获取Segment相应的table数组
- if (seg != null && (currentTable = seg.table) != null)
- // 将nextTableIndex置为table数组最大索引
- nextTableIndex = currentTable.length - 1;
- } else
- break;
- }
- }
- final HashEntry<K, V> nextEntry() {
- // 获取nextEntry为返回的HashEntry
- HashEntry<K, V> e = nextEntry;
- if (e == null)
- throw new NoSuchElementException();
- lastReturned = e; // cannot assign until after null check
- if ((nextEntry = e.next) == null)
- advance();
- return e;
- }
- // 判断是否有下一个元素
- public final boolean hasNext() {
- return nextEntry != null;
- }
- // 判断是否还有更多的元素
- public final boolean hasMoreElements() {
- return nextEntry != null;
- }
- public final void remove() {
- if (lastReturned == null)
- throw new IllegalStateException();
- // 从ConcurrentHashMap中移除当前遍历到的元素
- ConcurrentHashMap.this.remove(lastReturned.key);
- lastReturned = null;
- }
- }
上面的注释已经将HashIterator实现讲解得很详细了,我们主要关注的是HashIterator的advance()
方法的实现,在这个方法中可以得知,对ConcurrentHashMap的遍历方式其实是从后向前遍历,即在Segment数组的遍历中,是从Segment数组的最后一个元素开始往前遍历,而在HashEntry数组中也是一样,是从HashEntry数组的最后一个元素开始往前遍历,两者综合起来,对ConcurrentHashMap的遍历方式则是从后向前遍历。
关于values()
方法的实现其实和keySet()
方法是基本类似的,这里不再赘述,读者可以自行研究源码。
1.12. 序列化相关方法
ConcurrentHashMap重写了writeObject(ObjectOutputStream)
和readObject(ObjectInputStream)
两个序列化相关的方法,源码如下:
- /**
- * Save the state of the <tt>ConcurrentHashMap</tt> instance to a
- * stream (i.e., serialize it).
- *
- * 序列化写方法
- *
- * @param s the stream
- * @serialData the key (Object) and value (Object)
- * for each key-value mapping, followed by a null pair.
- * The key-value mappings are emitted in no particular order.
- */
- private void writeObject(java.io.ObjectOutputStream s) throws IOException {
- // force all segments for serialization compatibility
- for (int k = 0; k < segments.length; ++k)
- ensureSegment(k);
- // 调用缺省序列化写
- s.defaultWriteObject();
- // 遍历Segment数组
- final Segment<K, V>[] segments = this.segments;
- for (int k = 0; k < segments.length; ++k) {
- Segment<K, V> seg = segmentAt(segments, k);
- // 上锁
- seg.lock();
- try {
- // 遍历Segment的table
- HashEntry<K, V>[] tab = seg.table;
- for (int i = 0; i < tab.length; ++i) {
- HashEntry<K, V> e;
- // 分别写出HashEntry的键和值
- for (e = entryAt(tab, i); e != null; e = e.next) {
- s.writeObject(e.key);
- s.writeObject(e.value);
- }
- }
- } finally {
- // 解锁
- seg.unlock();
- }
- }
- // 写出两个null
- s.writeObject(null);
- s.writeObject(null);
- }
- /**
- * Reconstitute the <tt>ConcurrentHashMap</tt> instance from a
- * stream (i.e., deserialize it).
- *
- * 序列化读方法
- *
- * @param s the stream
- */
- @SuppressWarnings("unchecked")
- private void readObject(java.io.ObjectInputStream s)
- throws IOException, ClassNotFoundException {
- // 调用缺省序列化读
- s.defaultReadObject();
- // 设置哈希种子
- UNSAFE.putIntVolatile(this, HASHSEED_OFFSET, randomHashSeed(this));
- // Re-initialize segments to be minimally sized, and let grow.
- // 先初始化最小的Segment数组,后期根据键值对的添加自动增长
- int cap = MIN_SEGMENT_TABLE_CAPACITY;
- final Segment<K, V>[] segments = this.segments;
- for (int k = 0; k < segments.length; ++k) {
- Segment<K, V> seg = segments[k];
- if (seg != null) {
- seg.threshold = (int) (cap * seg.loadFactor);
- seg.table = (HashEntry<K, V>[]) new HashEntry[cap];
- }
- }
- // Read the keys and values, and put the mappings in the table
- // 遍历读入键值对的数据,然后调用put方法添加到Map中
- for (; ; ) {
- K key = (K) s.readObject();
- V value = (V) s.readObject();
- if (key == null)
- break;
- put(key, value);
- }
- }
ConcurrentHashMap的序列化写方法中,首先调用了缺省的序列化写方法,然后遍历了整个Map的键值对数据依次写出;而序列化读方法则相反,首先调用了缺省的序列化读方法,然后根据默认配置初始化了最小的Segment数组和HashEntry数组(在后期的添加方法中会对Map进行动态扩容),接下来依次调用输入流s
的readObject()
方法读取键值对,使用put(K, V)
方法添加到Map中。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...