ConcurrentHashMap 锁的前世今生,了解一下
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
同样为了提高取模运算效率,通过如下计算,ssize 即为大于 concurrencyLevel 的最小的 2 的 N 次方,同时 segmentMask 为 2^N-1。这一点跟上文中计算数组长度的方法一致。对于某一个 Key 的哈希值,只需要向右移 segmentShift 位以取高 sshift 位,再与 segmentMask 取与操作即可得到它在 Segment 数组上的索引。
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
同步方式
Segment 继承自 ReentrantLock,所以我们可以很方便的对每一个 Segment 上锁。
对于读操作,获取 Key 所在的 Segment 时,需要保证可见性(请参考如何保证多线程条件下的可见性)。具体实现上可以使用 volatile 关键字,也可使用锁。但使用锁开销太大,而使用 volatile 时每次写操作都会让所有 CPU 内缓存无效,也有一定开销。ConcurrentHashMap 使用如下方法保证可见性,取得最新的 Segment。
Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)
获取 Segment 中的 HashEntry 时也使用了类似方法
HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
对于写操作,并不要求同时获取所有 Segment 的锁,因为那样相当于锁住了整个 Map。它会先获取该 Key-Value 对所在的 Segment 的锁,获取成功后就可以像操作一个普通的 HashMap 一样操作该 Segment,并保证该 Segment 的安全性。
同时由于其它 Segment 的锁并未被获取,因此理论上可支持 concurrencyLevel(等于 Segment 的个数)个线程安全的并发读写。
获取锁时,并不直接使用 lock 来获取,因为该方法获取锁失败时会挂起(参考可重入锁)。事实上,它使用了自旋锁,如果 tryLock 获取锁失败,说明锁被其它线程占用,此时通过循环再次以 tryLock 的方式申请锁。如果在循环过程中该 Key 所对应的链表头被修改,则重置 retry 次数。如果 retry 次数超过一定值,则使用 lock 方法申请锁。
这里使用自旋锁是因为自旋锁的效率比较高,但是它消耗 CPU 资源比较多,因此在自旋次数超过阈值时切换为互斥锁。
size 操作
put、remove 和 get 操作只需要关心一个 Segment,而 size 操作需要遍历所有的 Segment 才能算出整个 Map 的大小。一个简单的方案是,先锁住所有 Sgment,计算完后再解锁。但这样做,在做 size 操作时,不仅无法对 Map 进行写操作,同时也无法进行读操作,不利于对 Map 的并行操作。
为更好支持并发操作,ConcurrentHashMap 会在不上锁的前提逐个 Segment 计算 3 次 size,如果某相邻两次计算获取的所有 Segment 的更新次数(每个 Segment 都与 HashMap 一样通过 modCount 跟踪自己的修改次数,Segment 每修改一次其 modCount 加一)相等,说明这两次计算过程中无更新操作,则这两次计算出的总 size 相等,可直接作为最终结果返回。如果这三次计算过程中 Map 有更新,则对所有 Segment 加锁重新计算 Size。该计算方法代码如下
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for
(int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
=========================================================================================
Java 7 为实现并行访问,引入了 Segment 这一结构,实现了分段锁,理论上最大并发度与 Segment 个数相等。Java 8 为进一步提高并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。同时为了提高哈希碰撞下的寻址性能,Java 8 在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(long(N)))。其数据结构如下图所示:
寻址方式
Java 8 的 ConcurrentHashMap 同样是通过 Key 的哈希值与数组长度取模确定该 Key 在数组中的索引。同样为了避免不太好的 Key 的 hashCode 设计,它通过如下方法计算得到 Key 的最终哈希值。不同的是,Java 8 的 ConcurrentHashMap 作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将 Key 的 hashCode 值与其高 16 位作异或并保证最高位为 0(从而保证最终结果为正整数)。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
同步方式
对于 put 操作,如果 Key 对应的数组元素为 null,则通过 CAS 操作将其设置为当前值。如果 Key 对应的数组元素(也即链表表头或者树的根元素)不为 null,则对该元素使用 synchronized 关键字申请锁,然后进行操作。如果该 put 操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率。
对于读操作,由于数组被 volatile 关键字修饰,因此不用担心数组的可见性问题。同时每个元素是一个 Node 实例(Java 7 中每个元素是一个 HashEntry),它的 Key 值和 hash 值都由 final 修饰,不可变更,无须关心它们被修改后的可见性问题。而其 Value 及对下一个元素的引用由 volatile 修饰,可见性也有保障。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
对于 Key 对应的数组元素的可见性,由 Unsafe 的 getObjectVolatile 方法保证。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
size 操作
put 方法和 remove 方法都会通过 addCount 方法维护 Map 的 size。size 方法通过 sumCount 获取由 addCount 方法维护的 Map 的 size。
下面,我们结合部分源码来看一下:
put 操作
首先通过 hash 找到对应链表过后, 查看是否是第一个 object, 如果是, 直接用 cas 原则插入,无需加锁。
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 这里在整个 map 第一次操作时,初始化 hash 桶, 也就是一个 table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果是第一个 object, 则直接 cas 放入, 不用锁
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
评论