写点什么

【数据结构】Java 常用集合类 ConcurrentHashMap(JDK 1.8)

用户头像
Alex🐒
关注
发布于: 2020 年 07 月 21 日

整体设计

常用的 HashMap 是线程不安全的, Hashtable 是线程安全的。Hashtable 通过在方法上添加 synchronized 保证线程安全,相当于 Hashtable 实例只有一把锁,导致高并发场景下使用效率低。



JDK 1.7 ConcurrentHashMap 使用分段锁局部锁定的方式,ConcurrentHashMap 由多个 Segment(包含 Node 键值对)组成,每个 Segment 各自持有锁实现线程安全,当一个线程占用锁访问其中一个 Segment 数据的时候,其他 Segment 的数据能够被其他线程访问。对于一些需要跨多个 Segment 的方法(比如:size()containsValue()),需要锁整个表,这时会按顺序锁住所有 Segment,使用后再按顺序释放,防止死锁。读操作不加锁。



JDK 1.8 ConcurrentHashMap 使用了大量的 CAS 操作实现无锁操作,在少数地方(扩容操作、对某个桶中的 Node 操作...)用到 synchronized 同步锁保证线程安全。



ConcurrentHashMap 的实现基本上与 HashMap 类似,大部分属性可以参考 HashMap

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
// 作为 ConcurrentHashMap 的控制标识符(初始化和扩容)
// sizeCtl = -1 表示正在初始化,initTable(或 tryPresize)方法中设置
// sizeCtl = -(1 + resizing_threads_num) 表示正在 resizing,tryPresize 方法中设置
// sizeCtl = 0 | capacity 表示未初始化或当前容量的大小(用于下次扩容判断)
transient volatile int sizeCtl;
// 要使用的下一个表;仅在 resize 时为非空。
private transient volatile Node<K,V>[] nextTable;
}

构造函数

public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
// 初始化表的容量,tableSizeFor 计算大于等于初始容量的的最小的二次幂数值
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 设置 sizeCtl
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
// 设置 sizeCtl
this.sizeCtl = DEFAULT_CAPACITY;
// 调用 putAll,将所有元素存入当前实例中
putAll(m);
}



下面的构造函数指定了一个参数 concurrencyLevel,表示能够同时更新 ConcurrentHashMap 且不产生锁竞争的最大线程数。

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// ...
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

存储结构

Node 对象的实现与 HashMap.Node 类似(有细微区别),存储 key-value,需要注意的是 val 和 next 属性添加了 volatile 修饰保证可见性。并且 val 不允许修改(HashMap 中可以)。

// 哈希桶
transient volatile Node<K,V>[] table;
static class Node<K,V> implements Map.Entry<K,V> {
// volatile 保证可见性
volatile V val;
volatile Node<K,V> next;
// 不允许修改,调用会抛出异常
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
}



当 Node 数量超过 TREEIFY_THRESHOLD 时,会由链表结构转换为红黑树 TreeNode。在 HashMap.TreeNode 中实现了 treeify 方法(链表转换为树),在 ConcurrentHashMap 中,转换的操作交给了 TreeBin 来实现。TreeBin 封装了对 TreeNode 的各种操作。

// TreeNode
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { /****/ }
}
// TreeBin
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 锁状态标识
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
TreeBin(TreeNode<K,V> b) { /****/ }
// 获取写锁
private final void lockRoot() {
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method
}
// 释放写锁
private final void unlockRoot() {
lockState = 0;
}
final Node<K,V> find(int h, Object k) { /****/ }
final TreeNode<K,V> putTreeVal(int h, K k, V v) { /****/ }
final boolean removeTreeNode(TreeNode<K,V> p) { /****/ }
// Red-black tree methods, all adapted from CLR
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root,
TreeNode<K,V> p) {
}
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root,
TreeNode<K,V> p) {
}
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
TreeNode<K,V> x) {
}
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root,
TreeNode<K,V> x) {
}
}

Hash 方法

首先比较一下 ConcurrentHashMap 与 HashMap 对 key 取哈希值的方法,有一些不同。

// HashMap
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
// ConcurrentHashMap
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

Get 方法

首先计算出 key 的 hashCode,找到 table 中的数组索引,即 Node<K, V> 对象,如果存在并且判断该结点是否等于要查询的节点,如果不是,按照链表或者红黑树的查询方式查找。

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 检查 table 是否存在,hashCode 所在的索引是有为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 按照树的方式遍历 Node 查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 按照链表的方式遍历 Node 查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

Put 方法

Put 操作应用到 CAS 算法,调用了形如 compareAndSwapXXX 的 native 方法实现。

public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 计算 key 的 hashCode
int hash = spread(key.hashCode());
int binCount = 0;
// 循环直到插入成功,
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化 table
tab = initTable();
// tabAt 调用 getObjectVolatile
// 当前位置为空可以直接插入的情况
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 通过 CAS 操作插入,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 下面是位置已经有值的情况
// MOVED 表示当前 Map 正在进行扩容
else if ((fh = f.hash) == MOVED)
// 帮助进行扩容,然后进入下一次循环尝试插入
tab = helpTransfer(tab, f);
// 未在扩容的情况
else {
V oldVal = null;
// 对f加锁,f 是存储在当前位置的 Node 的头节点
synchronized (f) {
// 双重检查,保证 Node 头节点没有改变
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 对链表进行操作
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
// 更新值(判断 onlyIfAbsent)或插入链表尾部 ...
break;
}
}
else if (f instanceof TreeBin) {
// 对树进行操作 ...
}
}
}
// 判断是否需要 treeify
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

初始化 table

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 初始化成功退出循环
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 有其他线程在初始化,自旋等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 进入初始化 sizeCtl = -1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc = n-n/4 ???
sc = n - (n >>> 2);
}
} finally {
// 初始化成功设置 sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}

扩容

在 treeifyBin 的操作中会判断是否真的需要转换树,还是对 table 进行扩容。当一个 table 的某个位置上元素比较多时,很大原因并不是因为这些 key 的 hashCode 相同,而是因为 table 比较小(跟 HashMap 的逻辑相同),hashCode 对 table 大小取摸后相同的概率比较大,所以选择对 table 进行扩容。扩容代码如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// ....
}
}
}
private final void tryPresize(int size) {
// 计算新的容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
// sizeCtl
int sc;
// sizeCtl < 0 表示正在初始化或扩容
// 直到不需要扩容或扩容完成跳出循环
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
// 初始化 table ...
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
// 不需要要扩容
break;
else if (tab == table) {
// 生成一个 resize 的标识,更新 sizeCtl(CAS)
int rs = resizeStamp(n);
// 此时 sc 应该是 >= 0
if (sc < 0) {
// ...
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 执行扩容操作
transfer(tab, null);
}
}
}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
if (nextTab == null) {
// 第一个扩容线程会进入 if
// 新的 table 容量是老的 table 的两倍,如果溢出则失败退出
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
// 设置 nextTable
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 省略复杂的扩容过程 ...
// 通过 CAS 将老的 table 中的元素插入 nextTable 中
}



在 put 操作中,有一个判断调用了 helpTransfer 方法,进行帮助扩容,含义是有其它线程正在扩容时,当前线程一起转移元素。

// put 方法
{
if ((fh = f.hash) == MOVED)
// 帮助进行扩容,然后进入下一次循环尝试插入
tab = helpTransfer(tab, f);
}
// f.hash = MOVE 表示 f 是 ForwardingNode,用于扩容操作的节点
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 生成一个 resize 的标识,更新 sizeCtl(CAS)
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 当扩容的线程过多时,跳出循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 进入扩容方法,并更新 sizeCtl
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}



扩容操作 transfer 相当复杂,抽取部分核心代码(添加锁的部分):

synchronized (f) {
if (tabAt(tab, i) == f) {
// 这里与 HashMap 重新离散的思路相同:
// 当前桶的节点在扩容后只能在新桶的idx(当前位置)或 idx+oldSize(原桶大小)
// 就是说原链表 Node,会生成两个新的 Node:ln和hn
Node<K,V> ln, hn;
if (fh >= 0) {
// 处理链表节点 ...
// 调用 putObjectVolatile 方法更新 newTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
}
else if (f instanceof TreeBin) {
// 处理树节点,同样的更新逻辑 ...
}
}
}

遍历方式

ConcurrentHashMap 是支持在遍历的时候,进行修改。但是频繁的修改和遍历 ConcurrentHashMap 还是会出问题,有可能遍历不到最新修改的数据。

已知问题

JDK7 HashMap 在多线程环境下可能造成 CPU 100% 的现象,由于在扩容的时候 put 时产生了环状链表,会在 get 时造成了 CPU 100%,在 JDK8 通过修改重新离散的方法得到解决。

JDK8 ConcurrentHashMap 也有一种情况会造成 CPU 100% 中,在 JDK9 中已经得到修复([https://bugs.openjdk.java.net/browse/JDK-8062841]



什么情况下 JDK8 ConcurrentHashMap 会出现这个问题?

Map<String, String> map = new ConcurrentHashMap<>();
map.computeIfAbsent("AaAa",
key -> map.computeIfAbsent("BBBB", key2 -> "value"));



问题的关键在于递归使用了 computeIfAbsent 方法,不要在递归中使用,具体的原因这里不再分析,避免使用。

与 Collections.synchronizedMap() 的区别

Collections.synchronizedMap() 方法来获取一个线程安全的集合(实现原理是 Collections 定义了一个 SynchronizedMap 的内部类,这个类实现了 Map 接口,在调用方法时使用 synchronized 来保证线程同步,其它 Collections.synchronizedXX 方法也是类似原理),所以 Collections.synchronizedMap() 更类似与 Hashtable,对 Map 进行同步。



还有一个区别是,ConcurrentHashMap 必然是个 HashMap。而 Collections.synchronizedMap() 可以接收任意 Map 实例,实现同步。






发布于: 2020 年 07 月 21 日阅读数: 57
用户头像

Alex🐒

关注

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
【数据结构】Java 常用集合类 ConcurrentHashMap(JDK 1.8)