写点什么

什么?JDK8 的 ConcurrentHashMap 有 Bug

作者:skow
  • 2022 年 5 月 23 日
  • 本文字数:5544 字

    阅读完需:约 18 分钟

什么?JDK8的ConcurrentHashMap 有 Bug

大家好,我是 Skow


大家看到标题是不是第一反应,怎么可能 JDK8 的 ConcurrentHsahMap 会有 bug,肯定是个标题党


且慢,的确我们 JDK8 中存在这个 Bug,但是这个 Bug 不影响使用,并且在 JDK12 中对其进行了修复



在这个 JDK 的 Bug 收集论坛上,我们可以看到这个报告指出的是在 ConcurrentHsahMap 进行 addCount() 的时候会有问题存在,我们先看一下具体在 JDK8 和 JDK12 中该方法的体现


JDK8


private final void addCount(long x, int check) {        //...        if (check >= 0) {            Node<K,V>[] tab, nt; int n, sc;            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&                   (n = tab.length) < MAXIMUM_CAPACITY) {                int rs = resizeStamp(n);                if (sc < 0) {                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                        transferIndex <= 0)                        break;                    //...                }                //...            }        }    }
复制代码


JDK12


private final void addCount(long x, int check) {    //...    if (check >= 0) {        Node<K,V>[] tab, nt;        int n, sc;        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&                (n = tab.length) < MAXIMUM_CAPACITY) {            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;            if (sc < 0) {                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||                        (nt = nextTable) == null || transferIndex <= 0)                    break;                //...            }             //...        }    }}
复制代码


我们仔细对比一下代码,就可以发现在 JDK12 中去除了 (sc >>> RESIZE_STAMP_SHIFT) != rs


这个条件判断,那问题就来,这个条件出现了什么问题?


Doug Lea 这么细的人当初为什么没考虑到,欢迎走入今天的文章,揭秘一下这个 BUG 的前世今生,对 ConcurrentHashMap 扩容和计数逻辑熟悉的同学,可直接跳转第三部份看问题分析即可


基本定义

默认小伙伴对 ConcurrentHashMap 有初步的了解,以及清楚 CAS 算法等基础知识,此文不做展开说明


想要知道 ConcurrentHashMap 中的扩容和计数逻辑,我们需要先明白一些定义,不然直接开始读源码,我们会有点吃力


接下来会罗列一下一会我们 addCount 方法中会用到的一些基本定义,我们先混个眼熟


  • sizeCtl

  • sizeCtl 的定义有点复杂,但是也有点重要

  • sizeCtl < -1 表示当前有 N-1 个线程正在执行扩容操作,如 -2 就表示有 2-1 个线程正在扩容

  • sizeCtl = -1 表示当前正在初始化数组

  • sizeCtl = 0 默认状态,表示数组还没有被初始化

  • sizeCtl > 0 记录下一次需要扩容的大小。

  • counterCells

  • 计数单元,非空的时候,大小通常为 2 次幂

  • RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS

  • 用于生成记录扩容线程个数的戳记的移位数

  • MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1

  • 辅助扩容的最大线程数

  • transferIndex

  • 扩容任务分配的进度标示。初始为 n,逆序扩容,每次减一个步长的位置,最终减至<= 0,表示整个扩容任务分配结束

计数逻辑

假设现在让你设计一个并发下的 int 类型的数值相加,你会怎么考虑?


第一反应是不是使用 volatile 或者 synchroieed 来保证数据相加的准确性,或许会有同学想到用 volatile 修饰变量然后利用 CAS 进行相加,性能会不会较优


但是在并发激烈的情况下,CAS 也可能会导致部份线程一直处于自旋的情况


为了避免这种极端的并发激烈情况,Doug Lea 采取了一种分而治之的思想,来尽可能避免这种竞争的发生


那就是利用 baseCount + CounterCell


baseCount 顾名思义就是一个基础的计数变量,counterCells 上面我们有简短说明,它是一个计数单元,非空的时候,大小通常为 2 次幂,在未发生竞争的时候,cas 修改会直接去修改 baseCount,假设 CAS 修改失败也不会进行自旋,而是以哈希映射的方式在 counterCells 中获取到一个数组下标的位置来进行计数,若依然没法找到对应合适的位置会继续重复这个操作,多次失败后就会针对 counterCells 进行扩容,继续孜孜不倦的寻找合适的格子来计数


了解了 addCount 的大体设计思想,我们来看下源码


/** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available.  Rechecks occupancy * after a transfer to see if another resize is already needed * because resizings are lagging additions. * * @param x the count to add * @param check if <0, don't check resize, if <= 1 only check if uncontended */private final void addCount(long x, int check) {    CounterCell[] as; long b, s;    // 这边 compareAndSwapxxx 都是 CAS操作    // 此处的意思就是如果 counterCells 不为 null,就直接往 counterCells 里操作,反之则直接操作 baseCount    if ((as = counterCells) != null ||        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {        CounterCell a; long v; int m;        // 默认线程安全,没有竞争        boolean uncontended = true;        if (as == null || (m = as.length - 1) < 0 ||            // ThreadLocalRandom.getProbe() 取当前线程的 hash 值            // 此处的 compareAndSwapLong 做的事情是找到不为 null 的格子,进行 value+x,CAS 失败就进入 fullAddCount            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||            !(uncontended =              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {            fullAddCount(x, uncontended);            return;        }        if (check <= 1)            return;        s = sumCount();    }    // 扩容代码暂时省略}
复制代码


简单的逻辑大致在代码里都已经注释了,摘出来两点和大家在分享一下


fullAddCount 这个方法,其实就是尽可能去保证计数成功,里面的逻辑不展开说明,是一堆 if lese,感兴趣的小伙伴可以去进一步阅读


第二点,此处 check 方法的判断



check 的值,我们可以理解为链表的长度,会根据不同的调用,入参进行变化


如果我们是在 put 方法中调用的 addCount 那么 check 一定大于等于 2,如果是在 computeIfAbsent 方法中调用的,那么 check 为 可能为 1


当 check <= 1 就不会进行扩容检查操作,也是为了保障性能,所以直接 return

扩容逻辑

话外音:咋还不说 bug,都看了这么久了


小伙伴们别着急,只有明白了这个计数和扩容逻辑,我们才能理解到这个 bug 的精髓所在


ConcurrentHashMap 的扩容逻辑,我认为是 ConcurrentHashMap 较为核心的逻辑之一,我们来细细品鉴一下


private final void addCount(long x, int check) {    // 只有当外部传进来的 check 大于等于0的时候,才会开始检查扩容    if (check >= 0) {        Node<K,V>[] tab, nt; int n, sc;        // sizeCtl 会被赋值为下一次扩容的大小,,此处比较 s 和 下一次扩容的大小        // tab 未发生改变并且数组长度未达到阈值        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&               (n = tab.length) < MAXIMUM_CAPACITY) {            int rs = resizeStamp(n);            if (sc < 0) {                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                    transferIndex <= 0)                    break;                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                    transfer(tab, nt);            }            else if (U.compareAndSwapInt(this, SIZECTL, sc,                                         (rs << RESIZE_STAMP_SHIFT) + 2))                transfer(tab, null);            s = sumCount();        }    }}
复制代码


进入 while 循环后,我们可以看到 resizeStamp 这个方法,虽然和我们的 bug 理解无关,但是这个方法很有意思,作为拓展我们展开说下,不感兴趣的小伙伴可以往下扒拉扒拉


/** * Returns the stamp bits for resizing a table of size n. * 返回值作为正在扩容的数组的长度n的一个标志位 * Must be negative when shifted left by RESIZE_STAMP_SHIFT. * 通过 RESIZE_STAMP_SHIFT 左移 16 位时一定是一个负数 */static final int resizeStamp(int n) {    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));}
复制代码


numberOfLeadingZeros 这个方法主要的目的就是为了得到 n 转成二进制后的最高非 0 位前的 0 的个数。


小张:啥玩意,你这说的,听不懂


上例子


假设传入的 n 为 16,numberOfLeadingZeros 得到结果即如图所示



然后在当前 JDK8 版本中 RESIZE_STAMP_BITS 默认为 16


1 << (RESIZE_STAMP_BITS - 1) 等于 1 << 15 等于 32678


27 | 32678 = 32795 二进制就是:1000 0000 0001 1011


利用 rs 即可反推出,当前需要扩容的数组长度!切记,切记,切记。后文理解 bug 需要用到


理解了 resizeStamp 的意图之后我们继续往下看,当 sc 小于 0 的时候,表示 ConcurrentHashMap 已经开始扩容,会去判断是否扩容完成,是否需要线程去辅助继续帮助扩容


反之则说明还没有开始扩容,则触发第一个扩容线程,进行 cas 修改,这边是 rs << RESIZE_STAMP_SHIFT) + 2,为什么是 + 2,而不是 +1


我们又要回到我们刚才算出来的 rs 的作用是什么


  • 高 16 位代表当前扩容的标记

  • 低 16 位代表了扩容的线程数


知道 rs 作用我们就比较好理解为什么需要 +2,而不是+1 了


因为我们的 rs 最终都是要赋值给 sizeCtl 的,而 sizeCtl 负数才代表扩容,而将 rs 左移 16 位就刚好使得最高位为 1,此时低 16 位全部是 0,而因为低 16 位要记录扩容线程数,按常理 +1 即可,但是 sizeCtl 中 -1 这个数值已经被使用了,用来代替当前有线程准备扩容,所以只好 +2

问题分析

前面铺垫完,我们进入正题


我们来看下扩容的五个条件判断


if (check >= 0) {            Node<K,V>[] tab, nt; int n, sc;            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&                   (n = tab.length) < MAXIMUM_CAPACITY) {                int rs = resizeStamp(n);                if (sc < 0) {                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                        transferIndex <= 0)                        break;                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                        transfer(tab, nt);                }                else if (U.compareAndSwapInt(this, SIZECTL, sc,                                             (rs << RESIZE_STAMP_SHIFT) + 2))                    transfer(tab, null);                s = sumCount();            }        }
复制代码


  • sc == rs + 1

  • 最后一个线程正在处理扩容工作,扩容工作即将结束

  • sc == rs + MAX_RESIZERS

  • 当前扩容线程数已满,无需辅助帮助扩容

  • (nt = nextTable) == null

  • 扩容完成之后把 nextTable 置为 null

  • transferIndex <= 0

  • 当前可供扩容的下标已经全部分配完毕,也代表了当前线程扩容结束


(sc >>> RESIZE_STAMP_SHIFT) != rs


这个条件也是我们的 bug 所在,我们理解这个条件为 将 sc 右移 16 位 判断是否不等于我们的 rs


sc = sizeCtl,即扩容完成后 sc 会被赋值为 扩容后的数组的大小


rs 即刚才我们分析的扩容时间戳


那么当进入这个 if 判断,其实一定意味着 我们的 tab 被发生的改变


那么一定有另一个线程正在使 ConcurrentHashMap 处于扩容中,或者处于扩容结束


  1. 当处于扩容中 (nt = nextTable) == null 可能为 true


   private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {   ...          if (finishing) {           nextTable = null;           // table changed after "nextTable = null"。           table = nextTab;           sizeCtl = (n << 1) - (n >>> 1);           return;       }   ...   }
复制代码


  1. 当扩容结束,这个 sizeCtl 已被改变,则和当前线程拿到的 sizeCtl 不符,即 (sc >>> RESIZE_STAMP_SHIFT) != rs 这个条件判断永远为 false, (sc == rs + 1 || sc == rs + MAX_RESIZERS ) 永远不会为 true,扩容线程数永远不受限制


那么,这个 bug 是否会影响到我们的使用呢?


当这个条件判断为 false 的时候,进入第二个 if 又是一个 cas 判断,这个 cas 判断将会失败


并且当前线程在下一个迭代中将会 break 出 while 循环 或者继续尝试去帮助扩容


虽然设计的思路略有偏差,但是好在不影响实际使用

小结

至此,我们的分析已经结束,这个 bug 比较隐蔽的,但是我们理解 ConcurrentHashMap 的扩容逻辑后,也还是可以去 get 到 Bug 的产生


我们简单归纳下,就是判断扩容的时候的一个条件判断写错,导致扩容的线程数不受控制,但是不影响实际使用,因为在真正的扩容逻辑 transfer 方法中,参与扩容的线程也会发现其实扩容已经结束,不参与扩容即可


本文没有分析 transfer 的真正逻辑,感兴趣的同学可以去看下

发布于: 刚刚阅读数: 2
用户头像

skow

关注

分享,前方就是一片花海 2019.05.30 加入

微信公众号:issues

评论

发布
暂无评论
什么?JDK8的ConcurrentHashMap 有 Bug_Java_skow_InfoQ写作社区