【Java 技术探索】,区块链技术 kafka
ConcurrentHashMap 是 concurrent 家族中的一个类,由于它可以高效地支持并发操作,以及被广泛使用,经典的开源框架 Spring 的底层数据结构就是使用 ConcurrentHashMap 实现的。
与同是线程安全的老大哥 HashTable 相比,它已经更胜一筹,因此它的锁更加细化,而不是像 HashTable 一样为几乎每个方法都添加了 synchronized 锁,这样的锁无疑会影响到性能。
[](
)原理简介
本文的分析的源码是 JDK8 的版本,与 JDK7 的版本有很大的差异。实现线程安全的思想也已经完全变了,它摒弃了 Segment(锁段)的概念,而是启用了一种全新的方式实现,利用 CAS 算法。
它沿用了与它同时期(JDK1.8)的 HashMap 版本的思想,底层依然由“数组”+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如 TreeBin,Traverser 等对象内部类。
[](
)重要的属性
首先来看几个重要的属性,与 HashMap 相同的就不再介绍了,这里重点解释一下 sizeCtl 这个属性。可以说它是 ConcurrentHashMap 中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。
[](
)sizeCtl
负数代表正在进行初始化或扩容操作
-1 代表正在初始化
-N 表示有 N-1 个线程正在进行扩容操作
正数或 0 代表 hash 表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。
还后面可以看到,它的值始终是当前 ConcurrentHashMap 容量的 0.75 倍,这与 loadfactor 是对应的。
/**
盛装 Node 元素的数组 它的大小是 2 的整数次幂
Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
Table initialization and resizing control. When negative, the
table is being initialized or resized: -1 for initialization,
else -(1 + the number of active resizing threads). Otherwise,
when table is null, holds the initial table size to use upon
creation, or 0 for default. After initialization, holds the
next element count value upon which to resize the table.
hash 表初始化或扩容时的一个控制位标识量。
负数代表正在进行初始化或扩容操作
-1 代表正在初始化
-N 表示有 N-1 个线程正在进行扩容操作
正数或 0 代表 hash 表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
// 以下两个是用来控制扩容的时候 单线程进入的变量
/**
The number of bits used for generation stamp in sizeCtl.
Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash 值是-1,表示是一个 forwardNode 节点
static final int TREEBIN = -2; // hash 值是-2 表示这时一个 TreeBin 节点
[](
)重要的内部类
[](
)Node(链表)
Node 是最核心的内部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这里面。
如下所示:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; //hash 值
final K key; //键值 key
volatile V val;//带有 sync 锁的 value
volatile Node<K,V> next;//带有 sync 锁的 next 指针
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash; // hash
this.key = key; // key
this.val = val; // value
this.next = next; //下一个节点的引用
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允许直接改变 value 的值
publi
c final V setValue(V value) {
throw new UnsupportedOperationException();
}
// 比较两个 node 是否相等
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null && // 先比类型
(k == key || k.equals(key)) && // 再比 key
(v == (u = val) || v.equals(u))); // 再比 value 值
}
/**
Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
这个 Node 内部类与 HashMap 中定义的 Node 类很相似
但是有一些差别它对 value 和 next 属性设置了 volatile 的 sync 锁
它不允许调用 setValue 方法直接改变 Node 的 value 域
它增加了 find 方法辅助 map.get()方法
[](
)TreeNode(树节点)
树节点类,另外一个核心的数据结构。
当链表长度过长的时候,会转换为 TreeNode。
但是与 HashMap 不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成 TreeNode 放在 TreeBin 对象中,由 TreeBin 完成对红黑树的包装。
而且 TreeNode 在 ConcurrentHashMap 集成自 Node 类,而并非 HashMap 中的集成自 LinkedHashMap.Entry<K,V>类,也就是说 TreeNode 带有 next 指针,这样做的目的是方便基于 TreeBin 的访问。
[](
)TreeBin
这个类并不负责包装用户的 key、value 信息,而是包装的很多 TreeNode 节点。它代替了 TreeNode 的根节点,也就是说在实际的 ConcurrentHashMap“数组”中,存放的是 TreeBin 对象,而不是 TreeNode 对象,这是与 HashMap 的区别。另外这个类还带有了读写锁。
可以看到在构造 TreeBin 节点时,仅仅指定了它的 hash 值为 TREEBIN 常量,这也就是个标识为。同时也看到我们熟悉的红黑树构造方法
/**
Creates bin with initial set of nodes headed by b.
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b; // 包装进去作为 first 节点
TreeNode<K,V> r = null;
//初始化根节点
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next; //传入的节点的 next 赋值为当前 treebin 节点的 next 节点,作为数据同步使用
x.left = x.right = null; //将左子树和右子树都是 null
if (r == null) {
x.parent = null;//父节点为空
x.red = false; // 不属于红子树节点
r = x; //将红节点,临时存储起来,用于下次循环使用
}
else {
K k = x.key; //如果不是根节点则就是下一个子树节点(属于 next 值)
int h = x.hash; // 如果不是根节点就取出 hash 值
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph; //
K pk = p.key;//获取根节点的 key 值
if ((ph = p.hash) > h) // 获取 hash 值判断是否属于小于根节点 hash 值
dir = -1; //属于左子树,
else if (ph < h) // 否则属于右子树
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
// 当 hash 值相等的时候,比较 class 类以及比较 key 值
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
// 判断比较结果是否小于零,且左右子树都为 null 的情况下
x.parent = xp;// 将父节点进行赋值
if (dir <= 0)
xp.left = x; // 进行判断是否属于左子树(<=0)
else
xp.right = x;// 进行判断是否属于右子树(>0)
// 进行平衡插入机制(其实就是建立平衡关系)
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r; // 根节点
assert checkInvariants(root);
}
[](
)ForwardingNode
一个用于连接两个 table 的节点类。它包含一个 nextTable 指针,用于指向下一张表。而且这个节点的 key value next 指针全部为 null,它的 hash 值为-1. 这里面定义的 find 的方法是从 nextTable 里进行查询节点,而不是以自身为头节点进行查找
/**
A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
// 循环遍历查找相关的对应 hash 和 key 值的
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// 如果无法定位或者定位到 null 则直接返回
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
// 遍历查找对比 hash 值和 key 值,从而进行分析是否相等。
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
// 查找到后相等直接返回
return e;
//如果 hash 值为-1 或者 -2
if (eh < 0) {
// 如果是 forwardingNode 则就是代表是-1
if (e instanceof ForwardingNode) {
//直接获取到下一个表。
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
//直到为 null,直接返回
if ((e = e.next) == null)
return null;
}
}
}
}
[](
)Unsafe 与 CAS
在 ConcurrentHashMap 中,随处可以看到 Unsafe, 大量使用了 Unsafe.compareAndSwapXXX 的方法,这个方法是利用一个 CAS 算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。
这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。
因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN 的思想是比较类似的。
[](
)unsafe 静态块
unsafe 代码块控制了一些属性的修改工作,比如最常用的 SIZECTL 。 在这一版本的 concurrentHashMap 中,大量应用来的 CAS 方法进行变量、属性的修改工作。 利用 CAS 进行无锁操作,可以大大提高性能。
// 各个字段属性的偏移量,可以通过偏移量+首地址获取到对应的数据对象
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
[](
)三个核心方法
ConcurrentHashMap 定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了 ConcurrentHashMap 的线程安全。
@SuppressWarnings("unchecked")
//获得在 i 位置上的 Node 节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
利用 CAS 算法设置 i 位置上的 Node 节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
在 CAS 算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于 SVN
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用 volatile 方法设置节点位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
[](
)初始化方法 initTable
对于 ConcurrentHashMap 来说,调用它的构造方法仅仅是设置一些参数。
整个 table 的初始化是在向 ConcurrentHashMap 中插入元素的时候发生的。
如调用 put、computeIfAbsent、compute、merge 等方法的时候,调用时机是检查 table==null。
初始化方法主要应用了关键属性 sizeCtl 如果这个值 <0,表示其他线程正在进行初始化,就放弃这个操作。
在这也可以看出 ConcurrentHashMap 的初始化只能由一个线程完成。
如果获得了初始化权限,就用 CAS 方法将 sizeCtl 置为-1,防止其他线程进入。
初始化数组后,将 sizeCtl 的值改为 0.75 * n,源码如下:
/**
Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl 表示有其他线程正在进行初始化操作,把线程挂起。对于 table 的初始化工作,只能有一个线程在进行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用 CAS 方法把 sizectl 的值置为-1 表示本线程正在进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//相当于 0.75*n 设置一个扩容的阈值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
[](
)扩容方法 transfer
ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。
这个方法的基本思想跟 HashMap 是很像的,但是由于它是支持并发扩容的,所以要复杂的多。
原因是它支持多线程进行扩容操作,而并没有加锁。
我想这样做的目的不仅仅是为了满足 concurrent 的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。
[](
)整个扩容操作分为两个部分
第一部分是构建一个 nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过
RESIZE_STAMP_SHIFT
这个常量经过一次运算来保证的,这个地方在后面会有提到;第二个部分就是将原来 table 中的元素复制到 nextTable 中,这里允许多线程进行操作。
[](
)先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数 i,然后利用 tabAt 方法获得 i 位置的元素:
如果这个位置为空,就在原 table 中的 i 位置放入 forwardNode 节点,这个也是触发并发扩容的关键点;
如果这个位置是 Node 节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上
如果这个位置是 TreeBin 节点(fh<0),也做一个反序处理,并且判断是否需要 untreefi,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上
遍历过所有的节点以后就完成了复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。
[](
)再看一下多线程是如何完成的:
如果遍历到的节点是 forward 节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。
多线程遍历节点,处理了一个节点,就把对应点的值 set 为 forward,另一个线程看到 forward,就向后遍历。
这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。
/**
一个过渡的 table 表,只有在扩容的时候才会使用
*/
private transient volatile Node<K,V>[] nextTable;
/**
Moves and/or copies the nodes in each bin to new table. See
above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个 nextTable 对象 它的容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位
boolean advance = true;//并发扩容的关键属性 如果等于 true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//这个 while 循环体的作用就是在控制 i-- 通过 i--可以依次遍历原 hash 表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作 就把 nextTable 赋值给 table 清空临时对象 nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的 1.5 倍 依然相当于现在容量的 0.75 倍
return;
}
//利用 CAS 方法更新这个扩容阈值,在这里面 sizectl 值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入 ForwardingNode 指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到 ForwardingNode 节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果 fh>=0 证明这是一个 Node 节点
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在 nextTable 的 i 位置上插入一个链表
setTabAt(nextTab, i, ln);
//在 nextTable 的 i+n 的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在 table 的 i 位置上插入 forwardNode 节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置 advance 为 true 返回到上面的 while 循环中 就可以执行 i--操作
advance = true;
}
//对 TreeBin 对象进行处理 与上面的过程类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//构造正序和反序两个链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
评论