写点什么

JUC 并发—并发安全集合一

  • 2025-02-21
    福建
  • 本文字数:41927 字

    阅读完需:约 138 分钟

1.JDK 1.7 的 HashMap 的死循环与数据丢失

 

(1)JDK 1.7 的 HashMap 工作原理


一.Hash 算法

put(key, value) => 对 key 执行 Hash 算法 => 根据 Hash 值用类似取模的算法 => 定位数组的某一个元素 => 如果数组元素为空,则将 value 存放在该数组元素里。

 

二.Hash 冲突

如果两个 key 的 Hash 值,经过取模算法定位到数组的同一个位置,此时就会用链表处理这种 Hash 冲突。

 

三.数组扩容

如果数组元素达到了:数组大小 * loadFactor(0.75),此时就会数组扩容。扩容时会按照倍数扩容,首先创建一个两倍大小的新数组。然后遍历原来的数组元素,对每个元素的 key 值进行 Hash 运算。接着将 Hash 运算后的 Hash 值对新数组大小进行取模,定位到新数组位置。

 

(2)JDK 1.7 的 HashMap 并发下导致的环形链表


多线程并发操作 HashMap 时,可能会在扩容过程中形成一个环形链表。比如两个线程同时插入一个元素,而此时恰好两个线程同时触发了数组扩容。那么在数组扩容的过程中,就可能会形成一个环形链表。

 

下面是 JDK1.7 中 HashMap 扩容的核心源码。进行数组扩容时,会使用头插法来进行链表迁移。如果并发执行的两个线程同时使用头插法进行链表迁移,那么就有可能形成一个环形链表。


//JDK1.7的HashMap扩容的核心方法void transfer(Entry[] newTable) {    Entry[] src = table;//旧的数组    int newCapacity = newTable.length;    for (int j = 0; j < src.length; j++) {        Entry<K,V> e = src[j];        if (e != null) {            src[j] = null;            do {                Entry<K,V> next = e.next;                //线程1执行到这里,假设此时的链表为:newTable[i] = <k1,v1> -> <k2,v2>                //那么可知:e = <k1,v1>,next = <k2,v2>                //恰好此时CPU发生了上下文切换,于是切换到线程2去执行扩容                //线程2扩容时处理完链表的这两个节点后,newTable[i]就变成了:<k2,v2> -> <k1,v1>                //然后CPU又切换回线程1来执行,由于此时e = <k1,v1>,那么后续代码对e.next赋值后,e就成为环形链表了:                //也就是e = <k1,v1> -> <k2,v2> -> <k1,v1>,最后e又赋值给newTable[i]                int i = indexFor(e.hash, newCapacity);//在新数组的位置                //头插法:刚开始newTable[i]为null,后来newTable[i]变为<k1,v1>;                //然后当e=<k2,v2>时,这里e设为<k2,v2> -> <k1,v1>,并又赋值给newTable[i]                //接着遍历链表当e=<k3,v3>时,这里e设为<k3,v3> -> <k2,v2> -> <k1,v1> ...                e.next = newTable[i];                newTable[i] = e;                e = next;            } while (e != null);        }    }}
复制代码


(3)环形链表引发的死循环与数据丢失


一.环形链表导致死循环

假如执行 get(k3)时,k3 的 Hash 取模算法定位到环形链表的位置。于是开始遍历环形链表,但由于环形链表里没有 k3 的值,所以会导致在环形链表中无法找到 k3 对应的值进行返回。这样就导致了一直在环形链表中进行死循环,无法退出遍历。最后导致 CPU 飙升,线上系统被这个 get 操作卡死。

 

二.环形链表导致丢失数据

上面例子就导致了从出发是无法找到的,因此这条数据就永久丢失了,甚至会被垃圾回收掉。

 

(4)JDK 1.7 和 JDK 1.8 的 HashMap 对比


在 JDK1.7 中,HashMap 采用数组 + 链表的数据结构来存储数据。在多个线程并发扩容时,可能会造成环形链表最终导致死循环和数据丢失。

 

在 JDK1.8 中,HashMap 采用数组 + 链表 + 红黑树的数据结构来存储数据,并且优化了 JDK1.7 中的数组扩容方案,解决了死循环和数据丢失的问题。但是在并发场景下调用 put()方法时,有可能会存在数据覆盖的问题。

 

(5)并发安全的集合


比如 HashTable 使用 synchronized 来保证线程的安全性,比如 Collections.synchronizedMap 可以把一个线程不安全的 Map,通过 synchronized 的方式,将其变成安全的。

 

但是这些方法在线程竞争激烈的情况下,效率都比较低。因为它们都是在方法层面上使用了 synchronized 实现的锁机制,从而导致不管是 put 操作还是 get 操作都需要去竞争同一把锁。

 

ConcurrentHashMap 既能保证并发安全,性能也好于 HashTable 等集合。

 

2.ConcurrentHashMap 的并发安全


(1)如何理解 ConcurrentHashMap 的并发安全


只能保证多线程并发执行时,容器中的数据不会被破坏。无法保证涉及多个线程的复合操作的正确性,复合操作会有并发安全问题。

 

(2)ConcurrentHashMap 在复合操作中的安全问题


假设需要通过一个 ConcurrentHashMap 来记录每个用户的访问次数。如果指定用户已经有访问次数的记录,则进行递增,否则添加新访问记录。

 

如下代码在多线程并发调用时,会存在并发安全问题。虽然 ConcurrentHashMap 对于数据操作本身是安全的,但这里是复合操作,也就是"读—修改—写",而这三个操作作为一个整体却不是原子的。所以当多个线程访问同一个用户时,很可能会覆盖相互操作的结果,从而造成该用户的访问记录次数少于实际访问次数。


public class Demo {    private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);    		public static void main(String[] args) throws InterruptedException {        Long accessCount = USER_ACCESS_COUNT.get("张三");        if (accessCount == null) {            USER_ACCESS_COUNT.put("张三", 1L);        } else {            USER_ACCESS_COUNT.put("张三", accessCount + 1);        }    }}
复制代码


(3)ConcurrentMap 可解决复合操作的安全问题


虽然 ConcurrentHashMap 是并发安全的,但对于其复合操作需要特别关注。上述复合操作的安全问题的解决方案是,可以对复合操作加锁,也可以使用 ConcurrentMap 接口来解决复合操作的安全问题。

 

ConcurrentMap 是一个支持并发访问的 Map 集合,相当于在原本的 Map 集合上新增了一些方法来扩展 Map 的功能。

 

ConcurrentMap 接口定义的如下 4 个方法,都能满足原子性的,可以用在 ConcurrentHashMap 的复合操作场景中。


//A java.util.Map providing thread safety and atomicity guarantees.public interface ConcurrentMap<K, V> extends Map<K, V> {    ...    //向ConcurrentHashMap集合插入数据    //如果插入数据的key不存在于集合中,则保存当前数据并且返回null    //如果key已经存在,则返回存在的key对应的value    V putIfAbsent(K key, V value);        //根据key和value来删除ConcurrentHashMap集合中的元素    //该删除操作必须保证key和value完全匹配,删除成功则返回true,否则返回false    boolean remove(Object key, Object value);        //根据key和oldValue来替换ConcurrentHashMap中已经存在的值,新的值是newValue    //该替换操作必须保证key和oldValue玩去匹配,替换成功则返回true,否则返回false    boolean replace(K key, V oldValue, V newValue);        //和replace(key, oldValue, newValue)不同之处在于,少了对oldValue的判断    //如果替换成功,则返回替换之前的value,否则返回null    V replace(K key, V value);    ...}
复制代码


因此,可以基于 ConcurrentMap 提供的接口对上述 Demo 进行改造。将原来 ConcurrentHashMap 第一次的 put()方法替换为 putIfAbsent()方法,将原来 ConcurrentHashMap 修改用的 put()方法替换为 replace()方法。由于 putIfAbsent()方法和 replace()方法都能保证原子性,所以并发安全了。同时增加一个 while(true)方法以实现一个类似自旋的操作,确保操作成功。


public class KeyUtil {    private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
public static void main(String[] args) throws InterruptedException { while (true) { Long accessCount = USER_ACCESS_COUNT.get("张三"); if (accessCount == null) { if (USER_ACCESS_COUNT.putIfAbsent("张三", 1L) == null) { break; } } else { if (USER_ACCESS_COUNT.replace("张三", accessCount, accessCount + 1)) { break; } } } }}
复制代码


(4)ConcurrentMap 支持 lambda 表达式操作


一.computeIfAbsent()方法

该方法通过判断传入的 key 是否存在来对 ConcurrentMap 进行数据初始化。如果 key 存在,则不做任何处理。如果 key 不存在,则调用 mappingFunction 计算出 value 值添加到 Map。


//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);
复制代码


二.computeIfPresent()方法

该方法对已经存在的 key 对应的 value 值进行修改。如果 key 不存在,则返回 null。如果 key 存在,则调用 mappingFunction 计算出 value 值修改 Map。


//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);
复制代码


三.compute()方法

compute()方法是 computeIfAbsent()方法和 computeIfPresent()方法的结合体。不管 key 是否存在,都会调用 mappingFunction 计算出 value 值。如果 key 存在,则对 value 进行修改。如果 key 不存在,则进行初始化处理。


//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);
//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);
//如果张三这个用户存在,则对其value加1,否则初始化其值为1USER_ACCESS_COUNT.compute("张三", (k,v) -> (v == null) ? 1L : v + 1);
复制代码


3.ConcurrentHashMap 的设计介绍


(1)JDK1.8 相比于 JDK1.7 的改进


一.取消了 segment 分段设计,直接使用 Node 数组来保存数据

采用 Node 数组元素作为锁的粒度,进一步减少并发冲突的范围和概率。

 

二.引入红黑树设计

红黑树降低了极端情况下查询某个结点数据的时间复杂度,从 O(n)降低到了 O(logn),提升了查找性能。

 

(2)ConcurrentHashMap 的设计思想


一.通过对数组元素加锁来降低锁的粒度

二.多线程进行并发扩容

三.高低位迁移方法

四.链表转红黑树及红黑树转链表

五.分段锁来实现数据统计

 

(3)ConcurrentHashMap 的数据结构定义


ConcurrentHashMap 采用 Node 数组来存储数据,数组长度默认为 16。Node 表示数组中的一个具体的数据结点,并且实现了 Map.Entry 接口。Node 的 key 和 val 属性,表示实际存储的 key 和 value。Node 的 hash 属性,表示当前 key 对应的 hash 值。Node 的 next 属性,表示如果是链表结构,则指向下一个 Node 结点。

 

当链表长度大于等于 8 + Node 数组长度大于 64 时,链表会转为红黑树,红黑树的存储使用 TreeNode 来实现。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //The default initial table capacity.    //Must be a power of 2 (i.e., at least 1) and at most MAXIMUM_CAPACITY.    private static final int DEFAULT_CAPACITY = 16;        //The array of bins. Lazily initialized upon first insertion.    //Size is always a power of two. Accessed directly by iterators.    transient volatile Node<K,V>[] table;//用来存储ConcurrentHashMap数据的Node数组        //Key-value entry.      //This class is never exported out as a user-mutable Map.Entry     //(i.e., one supporting setValue; see MapEntry below),     //but can be used for read-only traversals used in bulk tasks.    //Subclasses of Node with a negative hash field are special,     //and contain null keys and values (but are never exported).      //Otherwise, keys and vals are never null.    static class Node<K,V> implements Map.Entry<K,V> {        final int hash;//当前key对应的hash值        final K key;//实际存储的key        volatile V val;//实际存储的value        volatile Node<K,V> next;//如果是链表结构,则指向下一个Node结点             Node(int hash, K key, V val, Node<K,V> next) {            this.hash = hash;            this.key = key;            this.val = val;            this.next = next;        }        ...    }        //Nodes for use in TreeBins    static final class TreeNode<K,V> extends Node<K,V> {        TreeNode<K,V> parent;//red-black tree links        TreeNode<K,V> left;        TreeNode<K,V> right;        TreeNode<K,V> prev;//needed to unlink next upon deletion        boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } ... } ...}
复制代码


4.ConcurrentHashMap 的 put 操作流程


(1)ConcurrentHashMap 的 put 操作流程


首先通过 key 的 hashCode 的高低 16 位的位与操作来计算 key 的 hash 值,让 32 位的 hashCode 都参与运算以降低数组大小小于 32 时哈希冲突的概率。

 

然后判断 Node 数组是否为空或者 Node 数组的长度是否为 0。如果为空或者为 0,则调用 initTable()方法进行初始化。如果不为空,则通过 hash & (n - 1)计算当前 key 在 Node 数组中的下标位置。并通过 tabAt()方法获取该位置的值 f,然后判断该位置的值 f 是否为空。

 

如果该位置的值 f 为空,则把当前的 key 和 value 封装成 Node 对象。然后尝试通过 casTabAt()方法使用 CAS 设置该位置的值 f 为封装好的 Node 对象。如果 CAS 设置成功,则退出 for 循环,否则继续进行下一次 for 循环。

 

如果该位置的值 f 不为空,则判断 Node 数组是否正处于扩容中。如果是,那么当前线程就调用 helpTransfer()方法进行并发扩容。如果不是,那么说明当前的 key 在 Node 数组中出现了 Hash 冲突。于是通过 synchronized 关键字,对该位置的值 f 进行 Hash 冲突处理。其实 JUC 还可以继续优化,比如先用 CAS 尝试修改哈希冲突下的链表或红黑树。如果 CAS 修改失败,那么再通过使用 synchronized 对该数组元素加锁来进行处理。

 

最后,会调用 addCount()方法统计 Node 数组中的元素个数。


public class ConcurrentHashMapDemo {    public static void main(String[] args) {        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();        map.put("k1", "v1");    }}
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { ... //The array of bins. Lazily initialized upon first insertion. //Size is always a power of two. Accessed directly by iterators. transient volatile Node<K,V>[] table; //Creates a new, empty map with the default initial table size (16). public ConcurrentHashMap() { } //Creates a new, empty map with an initial table size //accommodating the specified number of elements without the need to dynamically resize. //@param initialCapacity The implementation performs internal sizing to accommodate this many elements. public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) { throw new IllegalArgumentException(); } int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //Returns a power of two table size for the given desired capacity. private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } static final int spread(int h) { //通过hashCode的高低16位的异或运算来计算hash值,以降低数组大小比32小的时候的哈希冲突概率 return (h ^ (h >>> 16)) & HASH_BITS; } //Maps the specified key to the specified value in this table. //Neither the key nor the value can be null. public V put(K key, V value) { return putVal(key, value, false); } //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性 //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的 //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //CAS设置Node数组的元素为某个Node对象 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) { throw new NullPointerException(); } //通过key的hashCode的高低16位的位与操作来计算hash值 int hash = spread(key.hashCode()); int binCount = 0; //这是一个没有结束条件的for循环,用来自旋 //其中Node数组的引用赋值给了tab变量 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) { //调用initTable()方法初始化Node数组 tab = initTable(); } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) { break;// no lock when adding to empty bin } } else if ((fh = f.hash) == MOVED) { //如果发现Node数组正处于扩容中,那么就进行并发扩容 tab = helpTransfer(tab, f); } else { V oldVal = null; //处理Hash冲突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) {//如果是链表 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) { e.val = value; } break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) {//如果是红黑树 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) { p.val = value; } } } } } if (binCount != 0) { //如果链表的元素大于等于8 if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8 treeifyBin(tab, i);//链表转红黑树 } if (oldVal != null) { return oldVal; } break; } } } //调用addCount()方法统计Node数组元素的个数 addCount(1L, binCount); return null; } ...}
复制代码


(2)ConcurrentHashMap 和 HashMap 的 put 操作


都是通过 key 的 hashCode 的高低 16 位的异或运算,来降低 Hash 冲突概率。

 

都是通过 Hash 值与数组大小-1 的位与运算(取模),来定位 key 在数组的位置。

 

但 ConcurrentHashMap 使用了自旋 + CAS + synchronized 来处理 put 操作,从而保证了多个线程对数组里某个 key 进行赋值时的效率 + 并发安全性。


public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {    static final int TREEIFY_THRESHOLD = 8;//链表转红黑树的阈值    ...        public V put(K key, V value) {        return putVal(hash(key), key, value, false, true);    }      final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {        Node<K,V>[] tab; Node<K,V> p; int n, i;        if ((tab = table) == null || (n = tab.length) == 0) {            n = (tab = resize()).length;        }        if ((p = tab[i = (n - 1) & hash]) == null) {            //如果通过哈希寻址算法定位到的下标为i的数组元素为空(即tab[i]为空)            //那么就可以直接将一个新创建的Node对象放到数组的tab[i]这个位置;            tab[i] = newNode(hash, key, value, null);        } else {            Node<K,V> e; K k;            if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) {                //通过哈希寻址算法定位到的数组位置已有Node元素                //那么判断是否为相同的key,如果是相同的key则进行value覆盖                e = p;            } else if (p instanceof TreeNode) {                //通过哈希寻址算法定位到的数组位置已有Node元素,而且不是相同的key                //那么通过"p instanceof TreeNode)",判断数组的tab[i]元素是否是一颗红黑树                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);            } else {                ...            }            ...        }        ++modCount;        //判断数组大小size,是否已经达到了扩容阈值threshold大小        if (++size > threshold) {            resize();        }        afterNodeInsertion(evict);        return null;    }    ...}
复制代码


(3)为什么 ConcurrentHashMap 是并发安全的


首先在初始化 Node 数组时,会通过自旋 + CAS 去设置 sizeCtl 的值来获得锁。然后在 put()操作时,也会通过自旋 + CAS 去设置数组某个位置的值。当出现 Hash 冲突时,则使用 synchronized 关键字来修改数组某个位置的值。

 

5.ConcurrentHashMap 的 Node 数组初始化


(1)调用 put()方法时才初始化 Node 数组


Node 数组的初始化过程是被动的,当调用 ConcurrentHashMap.put()方法时,如果发现 Node 数组还没有被初始化,才会调用 initTable()方法完成初始化。

 

(2)initTable()方法的初始化逻辑


initTable()方法和一般的初始化方法不同,因为需要考虑多线程并发情形。

 

首先 while 循环的退出条件是 Node 数据即 table 初始化成功,否则一直循环。这其实就使用到了自旋的机制,因为多个线程调用 initTable()必然会竞争。而在竞争的情况下如果不采用独占锁机制,就只能通过自旋来不断重试。

 

然后通过 sizeCtl 是否小于 0 来判断当前是否有其他线程正在进行初始化。如果有,则通过 Thread.yield()把自己变成就绪状态,释放 CPU 资源。如果没有,则通过 CAS 修改 sizeCtl 变量的值为-1。

 

如果 CAS 修改 sizeCtl 成功,则表示当前线程获取初始化 Node 数组的锁成功了;

如果 CAS 修改 sizeCtl 失败,则表示当前线程获取初始化 Node 数组的锁失败了;

 

对于获取锁失败的线程,会继续进入下一次 while 循环进行重试,这样设计是为了避免出现多个线程同时初始化 Node 数组。

 

对于获取锁成功的线程,首先会判断 Node 数组是否已经初始化完成。如果 Node 数组已经初始化完成,则退出 while 循环。如果 Node 数组还是空,则创建一个 Node 数组,然后赋值给 table 变量。并且计算下次扩容的阈值(0.75 倍当前数组容量),然后赋值给 sizeCtl。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     //sizeCtl = -1,表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组    //sizeCtl < -1,用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量    //sizeCtl = 0,表示Node数组未初始化,并且在ConcurrentHashMap构造方法中没有指定初始容量    //sizeCtl > 0,如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75),如果未初始化,则表示数组的初始容量    private transient volatile int sizeCtl;    private static final long SIZECTL;      static {        try {            U = sun.misc.Unsafe.getUnsafe();//获取UnSafe对象            Class<?> k = ConcurrentHashMap.class;            SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));//获取sizeCtl变量的偏移量            ...        } catch (Exception e) {            throw new Error(e);        }    }    ...        //初始化Node数组    //Initializes table, using the size recorded in sizeCtl.    private final Node<K,V>[] initTable() {        Node<K,V>[] tab; int sc;        //退出while循环的条件是Node数组即table初始化成功        while ((tab = table) == null || tab.length == 0) {            //判断当前是否有其他线程正在进行初始化            if ((sc = sizeCtl) < 0) {                //如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源                Thread.yield();//lost initialization race; just spin            } else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                //如果没有线程正在进行初始化,则通过CAS修改sizeCtl变量的值为-1                //如果CAS修改成功,则表示当前线程获得了初始化数组的锁                //如果CAS修改失败,则表示当前线程获取初始化数组的锁失败                try {                    //再次判断Node数组是否为空,即Node数组是否已经初始化完成                    //因为执行Thread.yield()让出CPU资源的线程必然会再次执行到这里                    if ((tab = table) == null || tab.length == 0) {                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                        @SuppressWarnings("unchecked")                        //初始化大小为n的Node数组,然后赋值给tab变量和table变量                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                        //赋值给ConcurrentHashMap的全局Node数组table                        table = tab = nt;                        //计算下次扩容的阈值,阈值的计算是当前数组容量的0.75倍                        sc = n - (n >>> 2);                    }                } finally {                    //最后将扩容的阈值赋值给sizeCtl                    sizeCtl = sc;                }                break;            }        }        return tab;    }    ...}
复制代码


(3)sizeCtl 的状态流转


一.sizeCtl = -1

表示当前有线程抢占到了初始化 Node 数组的资格,正在初始化 Node 数组。


二.sizeCtl < -1

用 sizeCtl 值的二进制低 16 位来记录当前参与扩容的线程数量。


三.sizeCtl = 0

表示 Node 数组未初始化,且创建 ConcurrentHashMap 时没有指定初始容量。


四.sizeCtl > 0

如果 Node 数组已经初始化,那么 sizeCtl 表示扩容的阈值(初始容量 * 0.75)。如果 Node 数组未初始化,则表示数组的初始容量。


 

6.ConcurrentHashMap 对 Hash 冲突的处理


(1)Hash 冲突的几个解决方案


一.开放寻址法

如果位置 i 被占用,那么就探查 i+1、i+2、i+3 的位置。ThreadLocal 采用的就是开放寻址法。

 

二.链式寻址法

Hash 表的每个位置都连接一个链表。当发生 Hash 冲突时,冲突的元素会被加入到这个位置的链表中。ConcurrentHashMap 就是基于链式寻址法解决 Hash 冲突的。

 

三.再 Hash 法

提供多个不同的 Hash 函数,当发生 Hash 冲突时,使用第二个、第三个等。

 

(2)ConcurrentHashMap 对 Hash 冲突的处理


首先使用 synchronized 对当前位置的 Node 对象 f 进行加锁。由于这种锁控制在数组的单个数据元素上,所以长度为 16 的数组理论上就可以支持 16 个线程并发写入数据。

 

然后判断当前位置的 Node 对象 f 是链表还是红黑树。如果是链表,那么就把当前的 key/value 封装成 Node 对象插入到链表的尾部。如果是红黑树,那么就调用 TreeBin 的 putTreeVal()方法往红黑树插入结点。

 

最后判断链表的长度是否大于等于 8,如果链表的长度大于等于 8,再调用 treeifyBin()方法决定是扩容数组还是将链表转化为红黑树。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //The array of bins. Lazily initialized upon first insertion.    //Size is always a power of two. Accessed directly by iterators.    transient volatile Node<K,V>[] table;        //Maps the specified key to the specified value in this table.    //Neither the key nor the value can be null.    public V put(K key, V value) {        return putVal(key, value, false);    }        //获取Node数组在位置i的元素    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的    //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);    }        //CAS设置Node数组的元素为某个Node对象    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);    }        final V putVal(K key, V value, boolean onlyIfAbsent) {        if (key == null || value == null) {            throw new NullPointerException();        }              //通过key的hashCode的高低16位的位与操作来计算hash值        int hash = spread(key.hashCode());        int binCount = 0;               //这是一个没有结束条件的for循环,用来自旋        //其中Node数组的引用赋值给了tab变量        for (Node<K,V>[] tab = table;;) {            Node<K,V> f; int n, i, fh;            if (tab == null || (n = tab.length) == 0) {                //调用initTable()方法初始化Node数组                tab = initTable();            } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                //如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {                    break;// no lock when adding to empty bin                }            } else if ((fh = f.hash) == MOVED) {                //发现Node数组正处于扩容中,那么就进行并发扩容                tab = helpTransfer(tab, f);            } else {                V oldVal = null;                //处理Hash冲突                synchronized (f) {//使用synchronized对当前数组位置的Node对象f进行加锁                    if (tabAt(tab, i) == f) {                        if (fh >= 0) {//如果是链表                            binCount = 1;//binCount用来记录链表的长度                            //从链表的头结点开始遍历每个结点                            for (Node<K,V> e = f;; ++binCount) {                                K ek;                                //如果存在相同的key,则修改该key的value                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {                                    oldVal = e.val;                                    if (!onlyIfAbsent) {                                        e.val = value;                                    }                                    break;                                }                                Node<K,V> pred = e;                                //找到链表的最后一个结点                                if ((e = e.next) == null) {                                    //把当前的key/value封装成Node对象插入到链表的尾部                                    pred.next = new Node<K,V>(hash, key, value, null);                                    break;                                }                            }                        } else if (f instanceof TreeBin) {//如果是红黑树                            Node<K,V> p;                            binCount = 2;                            //调用TreeBin的putTreeVal()方法往红黑树插入结点                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {                                oldVal = p.val;                                if (!onlyIfAbsent) {                                    p.val = value;                                }                            }                        }                    }                }                //最后判断链表的长度是否大于等于8                if (binCount != 0) {                    //如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是转化为红黑树                    if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8                        treeifyBin(tab, i);//是扩容数组还是转化为红黑树                    }                    if (oldVal != null) {                        return oldVal;                    }                    break;                }            }        }        //调用addCount()方法统计Node数组元素的个数        addCount(1L, binCount);        return null;    }    ...}
复制代码


(3)链表长度大于 8 时是扩容还是转化为红黑树


当链表长度 >= 8 时ConcurrentHashMap 会对链表采用两种方式进行优化。

 

方式一:对数组进行扩容

当数组长度 <= 64,且链表长度 >= 8 时,优先选择对数组进行扩容。

 

方式二:把链表转化为红黑树

当数组长度 > 64,且链表长度 >= 8 时,会将链表转化为红黑树。

 

treeifyBin()方法的作用是根据相关阈值来决定是扩容还是把链表转为红黑树。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     static final int MIN_TREEIFY_CAPACITY = 64;    ...    //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.    private final void treeifyBin(Node<K,V>[] tab, int index) {        Node<K,V> b; int n, sc;        if (tab != null) {            //如果当前数组的长度小于64,则调用tryPresize()方法进行数组扩容            if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {                tryPresize(n << 1);//数组扩容            } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {                synchronized (b) {                    if (tabAt(tab, index) == b) {                        TreeNode<K,V> hd = null, tl = null;                        for (Node<K,V> e = b; e != null; e = e.next) {                            //构建一个TreeNode并插入红黑树中                            TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);                            if ((p.prev = tl) == null) {                                hd = p;                            } else {                                tl.next = p;                            }                            tl = p;                        }                        setTabAt(tab, index, new TreeBin<K,V>(hd));                    }                }            }        }    }    ...}
复制代码


7.ConcurrentHashMap 的并发扩容机制


(1)ConcurrentHashMap 扩容的前置操作


ConcurrentHashMap 的 tryPresize()方法用于处理数组扩容前的前置操作,该方法主要分为四部分。

 

第一部分:

首先通过 tableSizeFor()方法计算传入 size 的最小的 2 的幂次方。

 

第二部分:

然后判断 Node 数组是否已初始化,如果还没初始化则要先进行初始化。初始化时会计算扩容阈值为数组大小的 0.75 倍 + 将扩容阈值赋值给 sizeCtl。

 

第三部分:

如果 Node 数组已经初始化,则判断是否需要进行扩容。如果 Node 数组已经被其他线程完成扩容,则当前线程退出循环,无需扩容。如果 Node 数组已达到最大容量,则无法再进行扩容,也需退出循环。

 

第四部分:

调用 transfer()方法开始执行扩容操作。如果 sizeCtl < 0,说明此时已经有其他线程在执行扩容了。如果 sizeCtl >= 0,说明此时没有其他线程进行扩容。当前线程都会先通过 CAS 成功设置 sizeCtl 后,再调用 transfer()方法来扩容。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //Returns a power of two table size for the given desired capacity.    private static final int tableSizeFor(int c) {        int n = c - 1;        n |= n >>> 1;        n |= n >>> 2;        n |= n >>> 4;        n |= n >>> 8;        n |= n >>> 16;        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;    }
//Tries to presize table to accommodate the given number of elements. private final void tryPresize(int size) { //一.首先通过tableSizeFor()方法计算传入size的最小的2的幂次方 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //二.判断Node数组是否已经初始化,如果还没初始化,需要先进行初始化 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2);//扩容阈值为数组大小的0.75倍 } } finally { sizeCtl = sc;//将扩容阈值赋值给sizeCtl } } } //三.如果Node数组已经初始化,则判断是否需要进行扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) { //c <= sc,说明Node数组已经被其他线程完成扩容了,不需要再进行扩容 //n >= MAXIMUM_CAPACITY,说明Node数组已达到最大容量,无法再进行扩容 break; } //四.调用transfer()方法开始执行扩容操作 else if (tab == table) { int rs = resizeStamp(n); //如果sc < 0,说明此时已经有其他线程在执行扩容了 //于是当前线程可以先通过CAS成功设置sizeCtl的值后,再调用transfer()方法协助扩容 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) { break; } if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nt); } } //如果sc >= 0,说明此时没有其他线程进行扩容 //于是当前线程也是先通过CAS成功设置sizeCtl的值后,再调用transfer()方法进行扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) { transfer(tab, null); } } } } ...}
复制代码


(2)ConcurrentHashMap 并发扩容的机制


一.ConcurrentHashMap 中的扩容设计


扩容就是创建一个 2 倍原大小的数组,然后把原数组的数据迁移到新数组中。但多线程环境下的扩容,需要考虑其他线程会同时往数组添加元素的情况。如果简单地对扩容过程增加一把同步锁,保证扩容过程不存在其他线程操作,那么就会对性能的损耗特别大,特别是数据量比较大时,阻塞的线程会很多。

 

首先使用 CAS 来实现计算每个线程的迁移区间。然后使用 synchronized 把锁粒度控制到每个数组元素上。如果数组有 16 个元素就有 16 把锁,如果数组有 32 个元素就有 32 把锁。接着如果线程 A 在进行数组扩容时,线程 B 要修改数组的某个元素 f。那么就让修改元素的线程加入迁移,从而实现多线程并发扩容来提高效率。等数组扩容完成后,线程 B 才继续去修改元素 f。最后通过高低位迁移逻辑计算出高位链和低位链,大大减少了数据迁移次数。

 

二.多线程并发扩容的原理


当存在多个线程并发扩容及数据迁移时,默认会给每个线程分配一个区间。这个区间的默认长度是 16,每个线程会负责自己区间内的数据迁移工作。如果只有两个线程对长度为 64 的数组迁移数据,则每个线程要做 2 次迁移,迁移过程会依赖 transferIndex 来更新每个线程的迁移区间。

 

(3)ConcurrentHashMap 并发扩容的流程


ConcurrentHashMap 的 transfer()方法用于处理数组扩容时的流程细节,该方法主要分为五部分:

 

第一部分:创建扩容后的数组


这部分代码主要做两件事情:

一.计算每个线程处理的迁移区间长度,默认是 16。

二.初始化一个新的数组 nt,赋值给方法入参 nextTab 和全局变量 nextTable。该数组的长度是原数组的 2 倍,并且设置 transferIndex 的值为为原数组大小。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     //The next table to use; non-null only while resizing.    private transient volatile Node<K,V>[] nextTable;    ...        //Moves and/or copies the nodes in each bin to new table.     //tab是原数组,nextTab是扩容后的数组    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        int n = tab.length, stride;        //计算每个线程处理的迁移区间长度,默认是16        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {            stride = MIN_TRANSFER_STRIDE;//subdivide range        }        //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍        //并且设置transferIndex的值为为原数组大小        if (nextTab == null) {//initiating            try {                @SuppressWarnings("unchecked")                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n                nextTab = nt;//将创建的扩容数组赋值给nextTab            } catch (Throwable ex) {//try to cope with OOME                sizeCtl = Integer.MAX_VALUE;                return;            }            nextTable = nextTab;//将创建的扩容数组赋值给nextTable            transferIndex = n;//设置transferIndex为原来的数组大小        }        ...    }}
复制代码


第二部分:计算当前线程的数据迁移区间


下面的 while 循环会计算每个执行到此处的线程需要负责的数据迁移区间。假设当前数组长度是 32,需要扩容到 64。那么此时 transferIndex = 32,nextn = 64,n = 32。

 

当前线程第一次 for 循环:nextIndex 被 transferIndex 赋值为 32,之后 CAS 修改 transferIndex。CAS 修改成功后,nextBound = 32 - 16 = 16,transferIndex = 16。所以 bound = 16,i = 31,当前线程负责的迁移区间为[bound, i] = [16, 31]。

 

当前线程第二次 for 循环,或者有其他线程进来第一次 for 循环:由于此时 transferIndex = 16,所以 nextIndex 会被 transferIndex 赋值为 16。之后 CAS 修改 transferIndex 为 0,修改成功后,nextBound = 16 - 16 = 0。所以 bound = 0,i = 15,此时线程负责的迁移区间为[bound, i] = [0, 15]。

 

需要注意的是:每次循环都会通过 if (--i >= bound || finishing)判断区间是否已迁移完成。如果已完成,则会继续进入 while 循环中的 CAS,获取新的迁移区间。

 

数组从高位往低位进行迁移,比如第一次 for 循环,处理的区间是[16, 31]。那么就会从位置为 31 开始往前进行遍历,对每个数组元素进行数据迁移。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     //The next table to use; non-null only while resizing.    private transient volatile Node<K,V>[] nextTable;    ...        //Moves and/or copies the nodes in each bin to new table.     //tab是原数组,nextTab是扩容后的数组    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        ...        int nextn = nextTab.length;//扩容后的数组长度        //继承自Node的ForwardingNode表示一个正在被迁移的Node        //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        //advance字段用来判断是否还有待处理的数据迁移工作,也就是扩容标记        boolean advance = true;        boolean finishing = false; // to ensure sweep before committing nextTab        //当前线程负责的迁移区间是[bound, i]        for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;            //while循环会计算每个执行到此处的线程需要负责的数据迁移区间            while (advance) {                //假设当前数组长度是32,需要扩容到64;                //那么此时transferIndex = 32,nextn = 64,n = 32;                //刚开始循环时i = 0,nextIndex被transferIndex赋值为32                int nextIndex, nextBound;                if (--i >= bound || finishing) {                    //一开始i = bound = 0,所以不会进入这里,而是进入U.compareAndSwapInt()的条件中                    //但后来bound = 16, i = 31后,就会进入这里,退出循环                    //此后,每次--i,当i = bound = 16时,就又会进入U.compareAndSwapInt()的条件中,重新获取数据迁移区间                    advance = false;                } else if ((nextIndex = transferIndex) <= 0) {                    //判断当前线程是否已经分配到了新的迁移区间                    i = -1;                    advance = false;                } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {                     //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,                    //那么advance设置为false,退出while循环                    //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间                    bound = nextBound;//第一次for循环nextBound = 16                    i = nextIndex - 1;//第一次for循环i = 31                    advance = false;                }            }            ...        }        ...    }        //A node inserted at head of bins during transfer operations.    static final class ForwardingNode<K,V> extends Node<K,V> {        final Node<K,V>[] nextTable;        ForwardingNode(Node<K,V>[] tab) {            super(MOVED, null, null, null);            this.nextTable = tab;        }        ...    }}
复制代码


第三部分:更新扩容标记 advance


如果位置 i 的数组元素 Node 为空,说明该 Node 对象不需要迁移。所以通过 casTabAt()方法修改原数组在位置 i 的元素为 fwd 对象,这样其他线程在进行 put()操作的时候就可以发现当前数组正在扩容。

 

如果位置 i 的数组元素 Node 的 hash 值为 MOVED,那么说明该 Node 对象已经被迁移了。所以设置扩容标记位 advance 为 true,等下次 for 循环时进入 while 循环--i。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     //The next table to use; non-null only while resizing.    private transient volatile Node<K,V>[] nextTable;        //The array of bins. Lazily initialized upon first insertion.    //Size is always a power of two. Accessed directly by iterators.    transient volatile Node<K,V>[] table;        ...    //Moves and/or copies the nodes in each bin to new table.     //tab是原数组,nextTab是扩容后的数组    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        ...        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        ...        //当前线程负责的迁移区间是[bound, i]        for (int i = 0, bound = 0;;) {            ...            } else if ((f = tabAt(tab, i)) == null) {                //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容                advance = casTabAt(tab, i, null, fwd);            } else if ((fh = f.hash) == MOVED) {                //设置扩容标记位advance为true,等下次for循环时进入while循环--i                advance = true; // already processed                //第三部分结束            } else {            ...        }        ...    }        //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的     //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);    }
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }}
复制代码


第四部分:开始数据迁移和扩容


首先对当前要迁移的 Node 结点 f 添加同步锁 synchronized,避免多线程竞争。

 

如果结点 f 的哈希值大于 0,则表示 Node 结点 f 为链表或普通结点,那么此时就需要按照链表或普通结点的方式来进行数据迁移。

 

如果结点 f 属于 TreeBin 类型,则表示结点 f 为红黑树,那么此时就要按红黑树的规则进行数据迁移。

 

需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于 6 时,红黑树就会转化为链表。

 

接着使用高位链和低位链的计算方法构造高位链和低位链,遍历链表的每一个结点,计算 p.hash & n 的值。如果值为 0,表示需要迁移,属于高位链;否则不需要迁移,属于低位链。

 

比如在数组长度为 16 的一个链表中,hash 值为:4, 20, 52, 68, 84, 100。经过 hash & (n - 1)得到的下标位置都是 4,接着数组长度需要扩容到 32。于是经过 hash & (n - 1)计算,发现 20, 52, 84 对应的下标变成了 20。这就意味着,这个链表中 hash 值为 20, 52, 84 的结点需要迁移到位置 20。

 

最后把低位链设置到扩容后的数组的位置 i,把高位链设置到位置 i + n。此时当前线程已处理完位置为 i 的数据迁移,于是设置 advance 为 true,让后续的 for 循环可以进入 while 循环来实现对 i 的递减继续迁移数据。

 

第五部分:完成迁移后的判断


如果数据迁移完成了,则把扩容后的数组赋值给 table。如果还没完成数据迁移,则通过 CAS 修改并发扩容的线程数。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     //The next table to use; non-null only while resizing.    private transient volatile Node<K,V>[] nextTable;    ...        //Moves and/or copies the nodes in each bin to new table.     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        //第一部分开始:创建扩容后的数组        int n = tab.length, stride;//n就是原数组大小        //计算每个线程处理的迁移区间长度,默认是16        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {            stride = MIN_TRANSFER_STRIDE;//subdivide range        }        //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍        //并且设置transferIndex的值为为原数组大小        if (nextTab == null) {//initiating            try {                @SuppressWarnings("unchecked")                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n                nextTab = nt;//将创建的扩容数组赋值给nextTab            } catch (Throwable ex) {//try to cope with OOME                sizeCtl = Integer.MAX_VALUE;                return;            }            nextTable = nextTab;//将创建的扩容数组赋值给nextTable            transferIndex = n;//设置transferIndex为原来的数组大小        }        //第一部分结束        //第二部分开始:计算当前线程的数据迁移区间        int nextn = nextTab.length;//扩容后的数组长度        //继承自Node的ForwardingNode表示一个正在被迁移的Node        //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        //advance字段用来判断是否还有待处理的数据迁移工作        boolean advance = true;        boolean finishing = false; // to ensure sweep before committing nextTab               //当前线程负责的迁移区间是[bound, i]        for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;            //while循环会计算每个执行到此处的线程需要负责的数据迁移区间            while (advance) {                //假设当前数组长度是32,需要扩容到64;                //那么此时transferIndex = 32,nextn = 64,n = 32;                //刚开始循环时i = 0,nextIndex被transferIndex赋值为32                int nextIndex, nextBound;                if (--i >= bound || finishing) {                    //一开始i = bound = 0,所以不会进入这里,而是进入U.compareAndSwapInt()的条件中                    //但后来bound = 16, i = 31后,就会进入这里,退出循环                    //此后,每次--i,当i = bound = 16时,就又会进入U.compareAndSwapInt()的条件中,重新获取数据迁移区间                    advance = false;                } else if ((nextIndex = transferIndex) <= 0) {                    //判断当前线程是否已经分配到了新的迁移区间                    i = -1;                    advance = false;                } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {                     //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,                    //那么advance设置为false,退出while循环                    //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间                    bound = nextBound;//第一次for循环nextBound = 16                    i = nextIndex - 1;//第一次for循环i = 31                    advance = false;                }            }            //第二部分结束            if (i < 0 || i >= n || i + n >= nextn) {                //第五部分开始:完成迁移后的判断                int sc;                //如果数据迁移完成了,则把扩容后的数组赋值给table                if (finishing) {                    nextTable = null;                    table = nextTab;                    sizeCtl = (n << 1) - (n >>> 1);                    return;                }                //如果还没完成数据迁移,则通过CAS修改并发扩容的线程数                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {                        return;                    }                    finishing = advance = true;                    i = n; // recheck before commit                }                //第五部分结束            } else if ((f = tabAt(tab, i)) == null) {                //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容                advance = casTabAt(tab, i, null, fwd);            } else if ((fh = f.hash) == MOVED) {                //设置扩容标记位advance为true,等下次for循环时进入while循环--i                advance = true; // already processed                //第三部分结束            } else {                //第四部分开始:开始数据迁移和扩容                synchronized (f) {//首先对当前要迁移的Node结点f添加同步锁synchronized,避免多线程竞争                    if (tabAt(tab, i) == f) {                        Node<K,V> ln, hn;                        //如果fh >= 0,则表示Node结点f为链表或普通结点,此时需要按照链表或普通结点的方式来进行数据迁移                        if (fh >= 0) {                            int runBit = fh & n;                            Node<K,V> lastRun = f;                            //for循环遍历链表,计算出当前链表最后一个需要迁移或者不需要迁移的结点位置                            //遍历链表的每一个结点,计算p.hash & n,如果值为0,表示需要迁移,否则不需要迁移                            for (Node<K,V> p = f.next; p != null; p = p.next) {                                int b = p.hash & n;                                if (b != runBit) {                                    runBit = b;                                    lastRun = p;                                }                            }                            if (runBit == 0) {                                ln = lastRun;                                hn = null;                            } else {                                hn = lastRun;                                ln = null;                            }                            for (Node<K,V> p = f; p != lastRun; p = p.next) {                                int ph = p.hash; K pk = p.key; V pv = p.val;                                if ((ph & n) == 0) {                                    //将ln作为参数,以ln为基础构造低位链,不需要迁移                                    ln = new Node<K,V>(ph, pk, pv, ln);                                } else {                                    //将hn作为参数,以hn为基础构造高位链,需要迁移                                    hn = new Node<K,V>(ph, pk, pv, hn);                                }                            }                            //把低位链设置到扩容后的数组的位置i                            setTabAt(nextTab, i, ln);                            //把高位链设置到扩容后的数组的位置i + n                            setTabAt(nextTab, i + n, hn);                            //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了                            setTabAt(tab, i, fwd);                            //当前线程已处理完位置为i的数据的迁移,于是设置advance为true,让后续的for循环继续进入while循环来实现对i的递减                            advance = true;                        } else if (f instanceof TreeBin) {                            //如果f instanceof TreeBin,则表示结点f为红黑树,需要按照红黑树的规则进行数据迁移                            //需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表                            TreeBin<K,V> t = (TreeBin<K,V>)f;                            TreeNode<K,V> lo = null, loTail = null;                            TreeNode<K,V> hi = null, hiTail = null;                            int lc = 0, hc = 0;                            for (Node<K,V> e = t.first; e != null; e = e.next) {                                int h = e.hash;                                TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);                                if ((h & n) == 0) {                                    if ((p.prev = loTail) == null) {                                        lo = p;                                    } else {                                        loTail.next = p;                                    }                                    loTail = p;                                    ++lc;                                } else {                                    if ((p.prev = hiTail) == null) {                                        hi = p;                                    } else {                                        hiTail.next = p;                                    }                                    hiTail = p;                                    ++hc;                                }                            }                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;                            setTabAt(nextTab, i, ln);                            setTabAt(nextTab, i + n, hn);                            setTabAt(tab, i, fwd);                            advance = true;                        }                    }                }                //第四部分结束            }        }    }    ...}
复制代码


8.ConcurrentHashMap 的分段锁统计元素数据


(1)ConcurrentHashMap 维护数组元素个数思路


当调用完 put()方法后,ConcurrentHashMap 必须增加当前元素的个数,以方便在 size()方法中获得存储的元素个数。

 

在常规的集合中,只需要一个全局 int 类型的字段保存元素个数即可。每次添加一个元素,就对这个 size 变量 + 1。

 

在 ConcurrentHashMap 中,需要保证对该变量修改的并发安全。如果使用同步锁 synchronized,那么性能开销比较大,不合适。所以 ConcurrentHashMap 使用了自旋 + 分段锁来维护元素个数。

 

(2)ConcurrentHashMap 维护数组元素个数流程


ConcurrentHashMap 采用了两种方式来保存元素的个数。当线程竞争不激烈时,直接使用 baseCount + 1 来增加元素个数。当线程竞争比较激烈时,构建一个 CounterCell 数组,默认长度为 2。然后随机选择一个 CounterCell,针对该 CounterCell 中的 value 进行保存。

 

增加元素个数的流程如下:



(3)维护数组元素个数的 addCount()方法


addCount()方法的作用主要包括两部分:

一.累加 ConcurrentHashMap 中的元素个数

二.通过 check >= 0 判断是否需要进行数组扩容

 

其中增加数组元素个数的核心逻辑是:

首先通过 CAS 修改全局成员变量 baseCount 来进行累加。注意会先判断(as = counterCells) != null,再尝试对 baseCount 进行累加。这是因为如果一个集合发生过并发,那么后续发生并发的可能性会更大。如果 CAS 累加 baseCount 失败,则尝试使用 CounterCell 来进行累加。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //Base counter value, used mainly when there is no contention,    but also as a fallback during table initialization races. Updated via CAS.    private transient volatile long baseCount;        //Table of counter cells. When non-null, size is a power of 2.    private transient volatile CounterCell[] counterCells;
private static final long BASECOUNT; static { try { U = sun.misc.Unsafe.getUnsafe(); ... BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount")); ... } catch (Exception e) { throw new Error(e); } } //Maps the specified key to the specified value in this table. //Neither the key nor the value can be null. public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { ... //调用addCount()方法统计Node数组元素的个数 addCount(1L, binCount); return null; } //Adds to count, and if table is too small and not already resizing, initiates transfer. //If already resizing, helps perform transfer if work is available. //Rechecks occupancy after a transfer to see if another resize is already needed because resizings are lagging additions. //x是要增加的数组元素个数 private final void addCount(long x, int check) { CounterCell[] as; long b, s; //首先通过CAS修改全局成员变量baseCount来进行累加 //注意:这里先判断(as = counterCells) != null,再尝试对baseCount进行CAS累加 //这是因为如果一个集合发生过并发,那么后续发生并发的可能性会更大,这种思想在并发编程中很常见 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //增加数组元素个数 CounterCell a; long v; int m; boolean uncontended = true; //如果CAS修改baseCount失败,则尝试使用CounterCell来进行累加 //1.as == null,说明CounterCell数组还没初始化 //2.(m = as.length - 1) < 0,说明CounterCell数组还没初始化 //3.(a = as[ThreadLocalRandom.getProbe() & m]) == null,说明CounterCell数组已经创建了, //但是Hash定位到的数组位置没有对象实例,说明这个数字还存在没有CounterCell实例对象的情况 //4.如果U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)返回false,说明存在多线程竞争 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //调用fullAddCount()方法实现数组元素个数的累加 fullAddCount(x, uncontended); return; } if (check <= 1) { return; } //sumCount()方法返回总的元素个数,也就是CounterCell数组的元素个数和baseCount两者的和 s = sumCount(); } if (check >= 0) { //处理数组扩容 Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) { break; } if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nt); } } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) { transfer(tab, null); } s = sumCount(); } } } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; } } } return sum; } ...}
复制代码


(4)维护数组元素个数的 fullAddCount()方法


fullAddCount()方法的作用主要包括三部分:初始化 CounterCell 数组、增加数组元素个数、对 CounterCell 数组扩容。

 

注意:为了定位当前线程添加的数组元素个数应落到 CounterCell 数组哪个元素,会使用 ThreadLocalRandom 确定当前线程对应的 hash 值,由该 hash 值和 CounterCell 数组大小进行类似于取模的位与运算来决定。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //Spinlock (locked via CAS) used when resizing and/or creating CounterCells.    private transient volatile int cellsBusy;
//Table of counter cells. When non-null, size is a power of 2. private transient volatile CounterCell[] counterCells; //x是要增加的数组元素个数 private final void fullAddCount(long x, boolean wasUncontended) { //通过ThreadLocalRandom来确定当前线程对应的hash值 int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit();//force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false;// True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; //(as = counterCells) != null && (n = as.length) > 0,表示counterCells数组已经完成初始化 if ((as = counterCells) != null && (n = as.length) > 0) { //第二部分:增加数组元素个数,分两种情况 if ((a = as[(n - 1) & h]) == null) { //情况一:(a = as[(n - 1) & h]) == null,表示将当前线程定位到counterCells数组的某位置的元素为null //此时直接把当前要增加的元素个数x保存到新创建的CounterCell对象,然后将该对象赋值到CounterCell数组的该位置即可 if (cellsBusy == 0) {//Try to attach new Cell //先创建一个CounterCell对象,并把x保存进去 CounterCell r = new CounterCell(x);//Optimistic create //U.compareAndSwapInt(this, CELLSBUSY, 0, 1)返回true,表示当前线程占有了锁 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try {//Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //把新构建的保存了元素个数x的CounterCell对象保存到rs[j]的位置 rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) { break; } continue;//Slot is now non-empty } } collide = false; } else if (!wasUncontended) {//CAS already known to fail wasUncontended = true;//Continue after rehash } else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) { //情况二:如果将当前线程定位到counterCells数组的某位置的元素不为null, //那么直接通过U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)操作,对counterCells数组的指定位置进行累加 break; } else if (counterCells != as || n >= NCPU) { collide = false;//At max size or stale } else if (!collide) { collide = true; } else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //第三部分:counterCells数组扩容 //需要先通过cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1),抢占锁 try { if (counterCells == as) {// Expand table unless stale //在原有的基础上扩容一倍 CounterCell[] rs = new CounterCell[n << 1]; //通过for循环进行数据迁移 for (int i = 0; i < n; ++i) { rs[i] = as[i]; } //把扩容后的对象赋值给counterCells counterCells = rs; } } finally { //恢复标识 cellsBusy = 0; } collide = false; continue;//继续下一次自旋 } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //第一部分:初始化CounterCell数组 //cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1),通过cellsBusy字段来抢占锁,通过CAS修改该字段值为1表示抢到锁 boolean init = false; try {//Initialize table if (counterCells == as) { //构造一个元素个数为2的CounterCell数组 CounterCell[] rs = new CounterCell[2]; //把要增加的数组元素个数x,保存到CounterCell数组的某个元素中 rs[h & 1] = new CounterCell(x); //把初始化的CounterCell数组赋值给全局对象counterCells counterCells = rs; init = true; } } finally { //恢复标识 cellsBusy = 0; } if (init) { break; } } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) { break;//Fall back on using base } } } ...}
复制代码


(5)获取数组元素个数的 size()方法


sumCount()方法会先得到 baseCount 的值,保存到 sum 字段中。然后遍历 CounterCell 数组,把每个 value 进行累加。

 

注意:size()方法在计算总的元素个数时并没有加锁,所以 size()方法返回的元素个数不一定代表此时此刻总数量。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //Spinlock (locked via CAS) used when resizing and/or creating CounterCells.    private transient volatile int cellsBusy;
//Table of counter cells. When non-null, size is a power of 2. private transient volatile CounterCell[] counterCells; public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; } } } return sum; } ...}
复制代码


9.ConcurrentHashMap 的查询操作是否涉及锁


(1)put 操作会加锁


首先尝试通过 CAS 设置 Node 数组对应位置的 Node 元素。如果该位置的 Node 元素非空,或者 CAS 设置失败,则说明发生了哈希冲突。此时就会使用 synchronized 关键字对该数组元素加锁来处理链表或者红黑树。

 

其实 JUC 还可以继续优化,先用 CAS 尝试修改哈希冲突下的链表或红黑树。如果 CAS 修改失败,再通过使用 synchronized 对该数组元素加锁来处理。

 

(2)size 操作不会加锁


size()方法在计算总的元素个数时并没有加锁,所以 size()方法返回的元素个数不一定代表此时此刻数组元素的总数量。

 

(3)get 操作也不会加锁


get()方法也使用了 CAS 操作,通过 Unsafe 类让数组中的元素具有可见性。保证线程根据 tabAt()方法获取数组的某个位置的元素时,能获取最新的值。所以 get 不加锁,但通过 volatile 读数组,可以保证读到数组元素的最新值。

 

虽然 table 变量使用了 volatile,但这只保证了 table 引用对所有线程的可见性,还不能保证 table 数组中的元素的修改对于所有线程是可见的。因此才通过 Unsafe 类的 getObjectVolatile()来保证 table 数组中元素的可见性。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //The array of bins. Lazily initialized upon first insertion.    //Size is always a power of two. Accessed directly by iterators.    transient volatile Node<K,V>[] table;        public V get(Object key) {        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;        int h = spread(key.hashCode());        if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {            if ((eh = e.hash) == h) {                if ((ek = e.key) == key || (ek != null && key.equals(ek))) {                    return e.val;                }            } else if (eh < 0) {                return (p = e.find(h, key)) != null ? p.val : null;            }            while ((e = e.next) != null) {                if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {                    return e.val;                }            }        }        return null;    }        //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性    //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的     //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);    }    ...}
复制代码


10.ConcurrentHashMap 中红黑树的使用


(1)treeifyBin()方法的逻辑


put 操作中当发现链表元素>=8 时会调用 treeifyBin()方法将链表转为红黑树。首先通过 tabAt()方法从 Node 数组中获取位置为 index 的元素并赋值给变量 b,然后使用 synchronized 对 Node 数组中位置为 index 的元素 b 进行加锁,接着通过 for 循环遍历 Node 数组中位置为 index 的元素 b 这个链表,并且根据链表中每个结点的数据封装成一个 TreeNode 对象来组成新链表,最后把新链表的头结点作为参数传给 TreeBin 构造方法来完成红黑树的构建。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.    //将Node数组tab中位置为index的元素,从链表转化为红黑树    private final void treeifyBin(Node<K,V>[] tab, int index) {        Node<K,V> b; int n, sc;        if (tab != null) {            if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {                tryPresize(n << 1);//数组扩容            } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {                synchronized (b) {//b就是链表,先用synchronized对b加锁,保证并发安全                    if (tabAt(tab, index) == b) {                        TreeNode<K,V> hd = null, tl = null;//hd是新链表的头结点,tl是新链表的尾结点                        //将链表b赋值给e,然后遍历通过e.next赋值回给e来遍历链表                        for (Node<K,V> e = b; e != null; e = e.next) {                            //根据Node结点e来封装一个TreeNode对象                            TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);                            if ((p.prev = tl) == null) {                                hd = p;                            } else {                                //尾插法构造新链表                                tl.next = p;                            }                            tl = p;                        }                        //将构造好的新链表的头结点hd作为参数,创建一个TreeBin对象                        setTabAt(tab, index, new TreeBin<K,V>(hd));                    }                }            }        }    }    ...}
复制代码


(2)TreeBin 的成员变量和方法


ConcurrentHashMap 中红黑树用继承自 Node 的 TreeNode 来表示。TreeBin 则主要提供了红黑树的一系列功能的实现,并且实现了读写锁。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    //Nodes for use in TreeBins    static final class TreeNode<K,V> extends Node<K,V> {        TreeNode<K,V> parent;//red-black tree links        TreeNode<K,V> left;        TreeNode<K,V> right;        TreeNode<K,V> prev;//needed to unlink next upon deletion        boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } ... } //TreeNodes used at the heads of bins. //TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root. //They also maintain a parasitic read-write lock forcing writers (who hold bin lock) //to wait for readers (who do not) to complete before tree restructuring operations. static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root;//红黑树根结点 volatile TreeNode<K,V> first;//链表头结点,由构造函数传入 volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态) volatile int lockState;//表示锁的状态 // values for lockState static final int WRITER = 1;//写锁状态 static final int WAITER = 2;//等待获取写锁状态 static final int READER = 4;//读锁状态 ... //构造函数,将以b为头结点的链表转换为红黑树 //Creates bin with initial set of nodes headed by b. TreeBin(TreeNode<K,V> b) { ... } //对红黑树的根结点加写锁 //Acquires write lock for tree restructuring. private final void lockRoot() { if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) { contendedLock(); // offload to separate method } } //释放写锁 //Releases write lock for tree restructuring. private final void unlockRoot() { lockState = 0; } //根据key获取指定的结点 //Returns matching node or null if none. //Tries to search using tree comparisons from root, but continues linear search when lock not available. final Node<K,V> find(int h, Object k) { ... } ... } ...}
复制代码


(3)TreeBin 在构造方法中将链表转为红黑树


treeifyBin()方法在对链表进行转化时,会先构建一个双向链表,然后将该双向链表传入 TreeBin 的构造方法。

 

TreeBin 的构造方法会通过如下处理将该双向链表转化为红黑树:

一.如果红黑树为空,则初始化红黑树的根结点

二.如果红黑树不为空,则按平衡二叉树逻辑插入

三.通过 balanceInsertion()方法进行自平衡

 

TreeBin 的构造方法可以分为三部分:

 

第一部分:初始化红黑树

遍历链表 b,将链表 b 的头结点设置为红黑树的根结点,接着设置红黑树根结点的左右子结点为 null,以及设置红黑树根结点为黑色。

 

第二部分:将链表中的结点添加到初始化好的红黑树

首先计算 dir 的值。如果 dir = -1,表示红黑树中被插入结点的 hash 值大于新添加结点 x 的 hash 值。如果 dir = 1,表示红黑树中被插入结点的 hash 值小于新添加结点 x 的 hash 值。然后根据 dir 的值来决定新添加结点 x 是被插入结点的左结点还是右结点,最后调用 TreeBin 的 balanceInsertion()方法对红黑树进行自平衡处理。

 

第三部分:对红黑树进行自平衡

调用 TreeBin 的 balanceInsertion()方法对红黑树进行自平衡处理。


public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {     ...    static final class TreeBin<K,V> extends Node<K,V> {        TreeNode<K,V> root;//红黑树根结点        volatile TreeNode<K,V> first;//链表头结点,由构造函数传入        volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)        volatile int lockState;//表示锁的状态        //values for lockState        static final int WRITER = 1;//写锁状态        static final int WAITER = 2;//等待获取写锁状态        static final int READER = 4;//读锁状态        ...        //构造函数,将以b为头结点的链表转换为红黑树        TreeBin(TreeNode<K,V> b) {            //第一部分开始:初始化红黑树            super(TREEBIN, null, null, null);            this.first = b;            //r表示红黑树的根结点            TreeNode<K,V> r = null;            //遍历链表b,x将作为新添加的红黑树结点            for (TreeNode<K,V> x = b, next; x != null; x = next) {                next = (TreeNode<K,V>)x.next;                //把新添加的红黑树结点x的左右子结点设置为null                x.left = x.right = null;                //r表示红黑树的根结点,r == null表示红黑树为空,将x结点设置为红黑树的根结点                if (r == null) {                    x.parent = null;                    //把红黑树的根结点设置为黑色                    x.red = false;                    r = x;                    //第一部分结束                } else {                    //第二部分开始:将链表中的结点添加到初始化好的红黑树中                    //x是新添加的红黑树结点                    K k = x.key;                    int h = x.hash;                    Class<?> kc = null;                    //p是红黑树中被插入的结点                    for (TreeNode<K,V> p = r;;) {                        int dir, ph;                        K pk = p.key;                        //首先计算dir的值                        //dir = -1,表示红黑树中被插入结点的hash值大于新添加结点x的hash值                        //dir = 1,表示红黑树中被插入结点的hash值小于新添加结点x的hash值                        if ((ph = p.hash) > h) {                            dir = -1;                        } else if (ph < h) {                            dir = 1;                        } else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) {                             dir = tieBreakOrder(k, pk);                            TreeNode<K,V> xp = p;                        }                        //然后根据dir的值来决定新添加的结点x是左结点还是右结点                        if ((p = (dir <= 0) ? p.left : p.right) == null) {                            x.parent = xp;                            if (dir <= 0) {                                xp.left = x;                            } else {                                xp.right = x;                            }                            //第二部分结束                            //第三部分开始:红黑树进行自平衡                            //r代表一棵红黑树,x代表往红黑树r添加的结点                            r = balanceInsertion(r, x);                            //第三部分结束                            break;                        }                    }                }            }            this.root = r;            assert checkInvariants(root);        }        ...    }    ...}
复制代码


(4)TreeBin 在插入元素时实现红黑树的自平衡



public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { ... static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root;//红黑树根结点 volatile TreeNode<K,V> first;//链表头结点,由构造函数传入 volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态) volatile int lockState;//表示锁的状态 // values for lockState static final int WRITER = 1;//写锁状态 static final int WAITER = 2;//等待获取写锁状态 static final int READER = 4;//读锁状态 ... //root代表一棵红黑树,x代表往红黑树r添加的结点 static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) { //所有往红黑树添加的结点默认为红色 x.red = true; //自旋,xp表示添加结点x的父结点,xpp表示添加结点x的爷结点,xppl表示爷结点的左结点,xppr表示爷结点的右结点 for (TreeNode<K,V> xp, xpp, xppl, xppr;;) { if ((xp = x.parent) == null) {//此处判断条件表示:x结点的父结点为空 //由于只有根结点的父结点才会为空,所以此时x结点为根结点,于是设置根结点x为黑色 x.red = false; return x; } else if (!xp.red || (xpp = xp.parent) == null) {//此处判断条件表示:表示x结点的父结点为黑色,或者x结点的爷结点为空 //那么直接返回红黑树root,不需要处理 return root; } //代码执行到这里,说明x结点的父结点为红色 if (xp == (xppl = xpp.left)) {//此处判断条件表示:表示x结点的父结点xp是其爷结点xpp的左子结点xppl if ((xppr = xpp.right) != null && xppr.red) {//此处判断条件表示:x结点的叔结点存在且为红色 //那么直接修改父结点和叔结点的颜色为黑色 xppr.red = false; xp.red = false; xpp.red = true; x = xpp; } else {//此处判断条件表示:如果x结点的叔结点不存在,或者叔结点存在且为黑色 if (x == xp.right) {//如果x结点是父结点的右子结点,则按x结点的父结点进行左旋 root = rotateLeft(root, x = xp);//将x结点的父结点赋值给x结点 xpp = (xp = x.parent) == null ? null : xp.parent; } if (xp != null) { xp.red = false; if (xpp != null) { xpp.red = true; root = rotateRight(root, xpp); } } } } else {//表示x结点的父结点是其爷结点的右子结点 if (xppl != null && xppl.red) { xppl.red = false; xp.red = false; xpp.red = true; x = xpp; } else { if (x == xp.left) { root = rotateRight(root, x = xp); xpp = (xp = x.parent) == null ? null : xp.parent; } if (xp != null) { xp.red = false; if (xpp != null) { xpp.red = true; root = rotateLeft(root, xpp); } } } } } } static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) { TreeNode<K,V> r, pp, rl; if (p != null && (r = p.right) != null) { if ((rl = p.right = r.left) != null) { rl.parent = p; } if ((pp = r.parent = p.parent) == null) { (root = r).red = false; } else if (pp.left == p) { pp.left = r; } else { pp.right = r; } r.left = p; p.parent = r; } return root; }
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) { TreeNode<K,V> l, pp, lr; if (p != null && (l = p.left) != null) { if ((lr = p.left = l.right) != null) { lr.parent = p; } if ((pp = l.parent = p.parent) == null) { (root = l).red = false; } else if (pp.right == p) { pp.right = l; } else { pp.left = l; } l.right = p; p.parent = l; } return root; } ... }}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18727528

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
JUC并发—并发安全集合一_算法_不在线第一只蜗牛_InfoQ写作社区