拿捏了!ConcurrentHashMap!,宝塔 linux 建站教程
另外,比较重要的一个属性就是 sizeCtl。如果看过 Doug Lea 老爷子在 JUC 下的其他类,经常会有一个特殊的变量表示当前对象的状态。并且已 CAS 的方式去修改这个变量,实现自旋锁的功能(例如:AQS 中的 state)。这里的 sizeCtl 就是一个富有特殊意义的变量。当 sizeCtl 大于 0 时,表示扩容的阈值(没错,就是 HashMap 中 threshold 变量的作用),而且上文我们也了解到在 JDK8 中由于 loadfactor 已经被固定为 0.75f。因此在正常状态下(非扩容状态),sizeCtl = oldCap >> 1 - (oldCap << 2)。 而 sizeCtl == -1 是一个特殊的状态标志,表示 ConcurrentHashMap 正在初始化底层数组。当 sizeCtl 为其他负数时,表示 ConcurrentHashMap 正在进程扩容,其中,高 16 位可以反应出扩容前数组的大小,而后 16 位可以反应出此时参与扩容的线程数。
内部类
ConcurrentHashMap 拥有大量的内部类,但其中大部分都是用来遍历或是在 Fork/Join 框架中平行遍历时使用的。这部分类内部类我们不在过多介绍。主要看 CountCell 和几个 Node 的类。
CounterCell
首先,CounterCell 是用来统计 ConcurrentHashMap 用的,其内部有个 value,用来表示元素个数。size()函数就是通过累加 countCells 数组中所有 CounterCell 的 value 值,再加上 BaseCount 得到的。相当于 ConcurrentHashMap 把 size 这个属性拆散保存在了个多个地方。
Node
同 HashMap 一样,为了提高链表的遍历速度,ConcurrentHashMap 也引用了红黑树。而 Node 就表示链表中的节点,并且他还是其他节点的父类。
TreeNode
TreeNode 表示红黑树中的节点,按照红黑树的标准,它还拥有父节点和左右子节点的属性,此外还需要标识是否为红节点。
TreeBin
TreeBin 是一个特殊的节点,用来指向红黑树的根节点,并不存储真实的元素,因此它的节点的哈希值是一个固定的特殊值-2。
ForwardingNode
ForwardingNode 和 TreeBin 一样,并不存储实际元素,而是指向 nextTable,哈希值也是一个特殊的固定值(-1)。它在扩容中会使用,表示这个桶上的元素已经迁移到新的数组中去了。
ReservationNode
同样是一个特殊值,在 putIfAbsent 时使用。因为 put 时需要对桶上的元素上对象锁(ConcurrentHashMap 并非是完全无锁的,只是尽可能少的去使用锁),这时就会添加一个临时占位用的节点 ReservationNode。
构造函数
因为构造函数是公有的 API,所以必须要和 JDK7 中保持一致。虽然其中的部分含义可能发生了一些变化。我们看一下参数最全的构造函数。
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;}
上述方法中的三个参数分别是初始容量 initialCapacity,负载因子 loadfactor 和并行等级 concurrenyLevel。首先,loadfactor 负载因子在 JDK8 的 ConcurrentHashMap 运行时都已经固定为 0.75f,因此这里的参数只能在创建时,帮助确定初始的数组容量。同样的,由于不在使用 JDK7 的 Segement 实现方式,因此这里的 concurrencyLevel 不在用来确定 Segement 的数量。对于 JDK8 中的 ConcurrentHashMap 而言,锁的粒度是对数组的每个桶(理论上可以对每个桶进行并发操作),因此 concurrencyLevel 的含义也就是用来确定底层数据的初始容量。这也正是size = (long)(1.0 + (long)initialCapacity / loadFactor);
这行代码的意义(这里的 initialCapacity 是取参数中 initialCapacity 和 concurrenyLevel 中的最大值)。
另外需要注意的一点是,size 并不是最终我们数组的容量,ConcurrentHashMap 会通过tableSizeFor()
方法找出大于等于 size 的最小 2 的幂次方数作为容量。(这和 HashMap 是一样的,需要保证容量为 2 的幂次,因为之后的散列操作都是基于这一前提)。最后,在得出了初始容量后,ConcurrentHashMap 仅是将容量通过 sizeCtl 来保存,而并没有直接初始化数组。数组的初始化会被延迟到第一次 put 数据时(这样设计可能是出于节省内存的目的)。
put 过程
有了前文的铺垫,我们就可以开始了解 ConcurrentHashMap 的 put 过程了。
先在这里做个声明,本文不会对红黑树的部分展开详细分析,之后用链表升级成红黑树,红黑树退化成链表,在红黑树中查找直接概括某些过程。
put()的具体实现都是由 putVal()这个函数实现的。因此这里我们对 putVal()函数展开分析。
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;//for 循环,一直尝试,直到 put 成功 for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//tab 未初始化,先初始化 tabif (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//对应的 bucket 上还没有元素//采用 CAS 尝试 PUT 元素,如果此时没有其它线程操作,这里将会 PUT 成功 if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)//如果 tab 正在扩容 tab = helpTransfer(tab, f);else {//bucket 上已经存在元素 V oldVal = null;//只针对头节点同步,不影响其他 bucket 上的元素,提高效率 synchronized (f) {//同步块内在做一次检查 if (tabAt(tab, i) == f) {//说明头节点未发生改变,如果发生改变,则直接退出同步块,并再次尝试 if (fh >= 0) { //哈希值大于 0 说明是 tab[i]上放的是链表 因为对于红黑树而言 tab[i]上放的是 TreeBin 一个虚拟的节点 其哈希值固定为-2binCount = 1;for (Node<K,V> e = f;; ++binCount) {//查询链表,如果存在相同 key,则更新,否则插入新节点 K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {//如果是红黑树,则以红黑树的方式插入 Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}//判断链表是否需要转成树//值得注意的一点是,这段代码并未在同步块中,应该也是出于效率考虑 if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;}
整体了解 putVal()的流程
先整体的了解下 putVal(),不对 for()循环中代码具体分析。第一步是校验要 put 的 Key/Value 不能为 null。因此 ConcurrentHashMap 和 HashMap 不同,不支持空的键值。第二步是 spread(key.hashCode())是对键的哈希值做一个扰动,这里通过h^(h>>>16)
的算法实现的,这样做的目的有两个,一是避免了设计不好的 hashCode 函数造成碰撞的概率加大,二是确保了扰动后的哈希值均为正数(因为负数哈希值都是一些特殊的节点)。第三步是 for()循环,这里通过 CAS+自选保证线程安全,暂时先不具体分析。第四步 addCount()应该是表示成功往 ConcurrentHashMap 添加了元素后,让更新元素的数量(当然,我们可以猜想对于替换节点的情况,应该是不会执行这一步的)。这个方法的具体分析我们放在扩容的步骤中。
分析 for()循环中的代码
for()循环中的代码 同样分成了四个部分:
第一步:如果底层数组还没有初始化,通过 initTable()初始化数组
initTable()方法如下:
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;//同样 采用不断重试的方式,而非直接使用锁 while ((tab = table) == null || tab.length == 0) {//sizeCtl < 0 表示 table 正在被初始化或是 reszieif ((sc = sizeCtl) < 0)//当前线程先等待 Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //使用 CAS 操作更新 sizeCtl,标记为正在初始化中//由于采用了 CAS 操作,因此该块的方法可以认为是线程安全的 try {if ((tab = table) == null || tab.length == 0) {//初始化 int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//表示下次需要扩容的值 (1 - 1/4) = 0.75fsc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;}
initTable()不算复杂,首先为了避免多个线程同时进行初始化,需要通过 sizeCtl 进行控制。当线程发现sizeCtl<0
时,就知道此时已经有其他线程在初始化了,那么它会主动让出 CPU 时间,等待初始化完成。如果 sizeCtl 并不是小于 0,说明暂时没有其他线程在初始化,这时候要先通过 CAS 更新 sizeCtl 的值为-1(相当于抢占了自旋锁),然后开始初始化底层数组并设置为 table,然后计算下次扩容的阈值存放在 sizeCtl(具体值为 n - (n >>>2),即容量的 0.75*n)。
第二步:如果已经初始化但是对应桶上元素为 null,那么尝试 CAS 更新
首先,这里确定桶的算法是通过之前 spread()得到的哈希值 h 和数组容量 n 进行一次h & (n -1)
。这个方式和 HashMap 是相同的,因为 n 是 2 的幂,换成二进制,就是高位为 1 之后的低位全为 0 的数,那么这个数减 1 就成了全为 1 的一个数。以这样的方式代替取余的运算不仅计算更快,也能更好的利用哈希值散列。如果,这一步 CAS 失败,说明此时有其他线程也在操作该桶,那么当前线程在下次 for()循环时会进入下列的第三和第四步中。
第三步:说明已经初始化且桶上有元素,那么判断元素是否为 ForwardNode
如果线程发现自己要操作的桶上的节点是 ForwardNode(可以通过其特殊的哈希值判断),那么就说明此时 ConcurrentHashMap 扩容,线程可能会加入帮助扩容。具体的我们放在扩容的部分介绍。
第四步:说明桶上元素是正常元素,那么就要比对这个桶所有元素,进行更新或插入
这里说明该桶上存放的是正常的元素(TreeBin 虽然是一个特殊节点,但也是正常状态下存在的节点),为了线程安全,这里需要对桶上的元素进行上锁synchronized(f)
。然后在遍历桶上所有的元素,选择更新或者插入。第一,需要注意的是,上锁后的第一件事就是进行 double-check 的判断,看上锁过程中头节点是否发生了变化。这很重要,如果头节点发生了变化,那么对之前的头节点 f 上锁是无法保证线程安全的。第二,对于桶上是链表的情况(f.hash > 0
),ConcurrentHashMap 会遍历链表,比较链表的各个节点,如果之前存在相同的 key,那么替换该节点的 value 值(保存节点的旧值用于返回)。如果不存在相同的 key,那么创建新的节点插入链表(注意,ConcurrentHashMap 用的是尾插发,即插入链表尾部)。第三,针对是 TreeBin 的节点,说明桶上关联的是红黑树,则通过红黑树的方式进行插入或更新。
扩容过程
扩容过程过程可能要比 put 过程要稍微复杂一些。首先我们从上文提到的 addCount()函数开始分析。
addCount()更新元素的容器个数
当 ConcurrentHashMap 添加了元素之后,需要通过 addCount()更新元素的个数。并且如果发现元素的个数达到了扩容阈值(sizeCtl),那么将进行 resize()操作。
private final void addCount(long x, int check) {CounterCell[] as; long b, s;
//更新 sizeif ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}//resizeif (check >= 0) {Node<K,V>[] tab, nt; int n, sc;//不断 CAS 重试 while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {//需要 resize//为每个 size 生成一个独特的 stamp 这个 stamp 的第 16 为必为 1 后 15 位针对每个 n 都是一个特定的值 表示 n 最高位的 1 前面有几个零 int rs = resizeStamp(n);//sc 会在库容时变成 rs << RESIZE_STAMP_SHIFT + 2;上面说了 rs 的第 16 位为 1 因此在左移 16 位后 该位的 1 会到达符号位 因此在扩容是 sc 会成为一个负数//而后 16 位用来记录参与扩容的线程数//此时 sc < 0 说明正在扩 if (sc < 0) {/**
分别对五个条件进行说明
sc >>> RESIZE_STAMP_SHIFT != rs 取 sc 的高 16 位 如果!=rs 则说明 HashMap 底层数据的 n 已经发生了变化
sc == rs + 1 此处可能有问题 我先按自己的理解 觉得应该是 sc == rs << RESIZE_STAMP_SHIFT + 1; 因为开始 transfer 时 sc = rs << RESIZE_STAMP_SHIFT + 2(一条线程在扩容,且之后有新线程参与扩容 sc 均会加 1,而一条线程完成后 sc - 1)说明是参与 transfer 的线程已经完成了 transfer
同理 sc == rs + MAX_RESIZERS 这个应该也改为 sc = rs << RESIZE_STAMP_SHIFT + MAX_RESIZERS 表示参与迁移的线程已经到达最大数量 本线程可以不用参与
(nt = nextTable) == null 首先 nextTable 是在扩容中间状态才使用的数组(这一点和 redis 的渐进式扩容方式很像) 当 nextTable 重新为 null 时 说明 transfer 已经 finish
transferIndex <= 0 也是同理
遇上以上这些情况 说明此线程都不需要参与 transfer 的工作
PS: 翻了下 JDK16 的代码 这部分已经改掉了 rs = resizeStamp(n) << RESIZE_STAMP_SHIFT 证明我们的猜想应该是正确的*/if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;//否则该线程需要一起 transferif (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}//说明没有其他线程正在扩容 该线程会将 sizeCtl 设置为负数 表示正在扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}
如上文所说,这个方法有两个作用,一是更新元素个数,二是判断是否需要 resize()。
更新 size()
我们可以单独看 addCount 中更新 size 的部分
if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}
首先判断 countCells 是否已经被初始化,如果没有被初始化,那么将尝试在 size 的更新操作放在 baseCount 上。如果此时没有冲突,那么 CAS 修改 baseCount 就能成功,size 的更新就落在了 baseCount 上。如果此时已经有 countCells 了,那么会根据线程的探针随机落到 countCells 的某个下标上。对 size 的更新就是更新对应 CountCells 的 value 值。如果还是不行,将会进入fullAddCount
方法中,自旋重试直到更新成功。这里不对fullAddCount
展开介绍,具体操作也类似,size 的变化要么累加在对应的 CountCell 上,要么累加在 baseCount 上。这里说一下我个人对 ConcurrentHashMap 采用这么复杂的方式进行计数的理解。因为 ConcurrenthHashMap 是出于吞吐量最大的目的设计的,因此,如果单纯的用一个 size 直接记录元素的个数,那么每次增删操作都需要同步 size,这会让 ConcurrentHashMap 的吞吐量大大降低。因为,将 size 分散成多个部分,每次修改只需要对其中的一部分进行修改,可以有效的减少竞争,从而增加吞吐量。
resize()
对于 resize()过程,我其实在代码的注释中说明的比较详细了。首先,是一个 while()循环,其中的条件是元素的 size(由上一步计算而来)已经大于等于 sizeCtl(说明到达了扩容条件,需要进行 resize),这是用来配合 CAS 操作的。接着,是根据当前数组的容量计算了 resizeStamp(该函数会根据不同的容量得到一个确定的数)。得到的这个数会在之后的扩容过程中被使用。然后是比较 sizeCtl,如果 sizeCtl 小于 0,说明此时已经有线程正在扩容,排
除了几种不需要参与扩容的情况(例如,扩容已经完成,或是参与的扩容线程数已经到最大值,具体情况代码上的注解已经给出了分析),剩下的情况当前线程会帮助其他线程一起扩容,扩容前需要修改 CAS 修改 sizeCtl(因为在扩容时,sizeCtl 的后 16 位表示参与扩容的线程数,每当有一个线程参与扩容,需要对 sizeCtl 加 1,当该线程完成时,对 sizeCtl 减 1,这样比对 sizeCtl 就可以知道是否所有线程都完成了扩容)。另外如果 sizeCtl 大于 0,说明还没有线程参与扩容,此时需要 CAS 修改 sizeCtl 为 rs << RESIZE_STAMP_SHIFT + 2(其中 rs 是有 resizeStamp(n)得到的),这是一个负数,上文也说了这个数的后 16 位表示参与扩容的线程,当所有线程都完成了扩容时,sizeCtl 应该为 rs << RESIZE_STAMP_SHIFT + 1。这是我们结束扩容的条件,会在后文看到。
transfer()
transfer()方法负责对数组进行扩容,并将数据 rehash 到新的节点上。这一过程中会启用 nextTable 变量,并在扩容完成后,替换成 table 变量。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {//stride 是步长,transfer 会依据 stride,把 table 分为若干部分,依次处理,好让多线程能协助 transferint n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiating //nextTab 等于 null 表示第一个进来扩容的线程 try {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //第一个线程需要对扩容的数组翻倍 nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}//用 nextTable 和 transferIndex 表示扩容的中间状态 nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true; // advance 表示是否可以继续执行下一个 strideboolean finishing = false; // to ensure sweep before committing nextTab finish 表示 transfer 是否已经完成 nextTable 已经替换了 table
//开始转移各个槽 for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;//STEP1 判断是否可以进入下一个 stride 确认 i 和 bound//通过 stride 领取一部分的 transfer 任务,while 循环就是确认边界 while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing) //认领的部分已经被执行完(一个 stride 执行完)advance = false;else if ((nextIndex = transferIndex) <= 0) { //transfer 任务被认领完 i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) { //认领一个 stride 的任务 bound = nextBound;i = nextIndex - 1;advance = false;}}
/**
i < 0 说明要转移的桶 都已经处理过了
以上条件已经说明 transfer 已经完成了*/if (i < 0 || i >= n || i + n >= nextn) { //transfer 结束 int sc;if (finishing) {//如果完成整个 transfer 的过程 清空 nextTable 让 table 等于扩容后的数组 nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1); //0.75f * n 重新计算下次扩容的阈值 return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//一个线程完成了 transfer//如果还有其他线程在 transfer 先返回 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;//说明这是最后一个在 transfer 的线程 因此 finish 标志被置为 truefinishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null) //如果该节点为 null,则对该节点的迁移立马完成,设置成 forwardNodeadvance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse { //开始迁移该节点 synchronized (f) {//同步,保证线程安全 if (tabAt(tab, i) == f) { //double-checkNode<K,V> ln, hn; //ln 是扩容后依旧保留在原 index 上的 node 链表;hn 是移到 index + n 上的 node 链表 if (fh >= 0) { //普通链表 int runBit = fh & n;Node<K,V> lastRun = f;//这一次遍历的目的是找到最后一个一个节点,其后的节点 hash & N 都不发生改变//例如 有 A->B->C->D,其 hash & n 为 0,1,1,1 那就是找到 B 点//这样做的目的是之后对链表进行拆分时 C 和 D 不需要单独处理 维持和 B 的关系 B 移动到新的 tab[i]或 tab[i+cap]上即可//还有不理解的可以参考我的测试代码:https://github.com/insaneXs/all-mess/blob/master/src/main/java/com/insanexs/mess/collection/TestConHashMapSeq.javafor (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}//如果 runBit == 0 说明之前找到的节点应该在 tab[i]if (runBit == 0) {ln = lastRun;hn = null;}//否则说明之前的节点在 tab[i+cap]else {hn = lastRun;ln = null;}//上面分析了链表的拆分只用遍历到 lastRun 的前一节点 因为 lastRun 及之后的节点已经移动好了 for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;//这里不再继续使用尾插法而是改用了头插法 因此链表的顺序可能会发生颠倒(lastRun 及之后的节点不受影响)if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//将新的链表移动到 nextTab 的对应坐标中 setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);//tab 上对应坐标的节点变为 ForwardingNodesetTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) { //红黑树节点 TreeBin<K,V> t = (TreeBin<K,V>)f;//同样拆成两棵树
评论