概述
ReentrantReadWriteLock
读写锁是使用 AQS 的集大成者,用了独占模式和共享模式。本文和大家一起理解下ReentrantReadWriteLock
读写锁的实现原理。在这之前建议大家阅读下下面 3 篇关联文章:
深入浅出理解Java并发AQS的独占锁模式
深入浅出理解Java并发AQS的共享锁模式
通俗易懂读写锁ReentrantReadWriteLock的使用
原理概述
上图是ReentrantReadWriteLock
读写锁的类结构图:
实现了ReadWriteLock
接口,该接口提供了获取读锁和写锁的 API。
ReentrantReadWriteLock
读写锁内部的成员变量 readLock 是读锁,指向内部类 ReadLock。
ReentrantReadWriteLock
读写锁内部的成员变量 writeLock 是写锁,指向内部类 WriteLock。
ReentrantReadWriteLock
读写锁内部的成员变量 sync 是继承 AQS 的同步器,他有两个子类FairSync
公平同步器和NoFairSync
非公平同步器,读写锁内部也有一个 sync,他们使用的是同一个 sync。
读写锁用的同一个 sync 同步器,那么他们共享同一个 state, 这样不会混淆吗?
不会,ReentrantReadWriteLock
读写锁使用了 AQS 中 state 值得低 16 位表示写锁得计数,用高 16 位表示读锁得计数,这样就可以使用同一个 AQS 同时管理读锁和写锁。
ReentrantReadWriteLock 类重要成员变量
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;
复制代码
ReentrantReadWriteLock 构造方法
//默认是非公平锁,可以指定参数创建公平锁
public ReentrantReadWriteLock(boolean fair) {
// true 为公平锁
sync = fair ? new FairSync() : new NonfairSync();
// 这两个 lock 共享同一个 sync 实例,都是由 ReentrantReadWriteLock 的 sync 提供同步实现
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
复制代码
Sync 类重要成员变量
// 用来移位
static final int SHARED_SHIFT = 16;
// 高16位的1
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 65535,16个1,代表写锁的最大重入次数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 低16位掩码:0b 1111 1111 1111 1111,用来获取写锁重入的次数
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 获取读写锁的读锁分配的总次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁(独占)锁的重入次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
加锁原理
图解过程
设计一个加锁场景,t1 线程加写锁,t2 线程加读锁,我们看下它们整个加锁得流程。
t1 加写锁w.lock()
成功,占了 state 的低 16 位。
t2 线程执行加读锁 r.lock()
,尝试获取锁,发现已经被写锁占据了,加锁失败。
t2 线程被封装成一个共享模式 Node.SHARED 的节点,加入到 AQS 的队列中。
在阻塞前,t2 线程发现自己是队列中的老二,会尝试再次获取读锁,因为 t1 没有释放,它会失败,然后它会把队列的前驱节点的状态改为-1,然后阻塞自身,也就是 t2 线程。
后面如过有其他线程如 t3,t4 加读锁或者写锁,由于 t1 线程没有释放锁,会变成下面的状态。
上面是整个解锁的流程,下面深入源码验证这个流程。
源码解析
写锁加锁源码
WriteLock 类的 lock()方法是加写锁的入口方法。
static final class NonfairSync extends Sync {
// ... 省略无关代码
// 外部类 WriteLock 方法, 方便阅读, 放在此处
public void lock() {
sync.acquire(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
// 尝试获得写锁失败
!tryAcquire(arg) &&
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
// 进入 AQS 队列阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
//获得锁的状态
int c = getState();
// 获得低 16 位, 代表写锁的 state 计数
int w = exclusiveCount(c);
// c不等于0表示加了读锁或者写锁
if (c != 0) {
if (
// c != 0 and w == 0 表示有读锁返回错误,读锁不支持锁升级, 或者
w == 0 ||
// w != 0 说明有写锁,写锁的拥有者不是自己,获取失败
current != getExclusiveOwnerThread()
) {
// 获得锁失败
return false;
}
// 写锁计数超过低 16 位最大数量, 报异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入, 获得锁成功,没有并发,所以不使用 CAS
setState(c + acquires);
return true;
}
if (
// c == 0,说明没有任何锁,判断写锁是否该阻塞,是 false 就尝试获取锁,失败返回 false
writerShouldBlock() ||
// 尝试更改计数失败
!compareAndSetState(c, c + acquires)
) {
// 获得锁失败
return false;
}
// 获得锁成功,设置锁的持有线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
// 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
final boolean writerShouldBlock() {
return false;
}
// 公平锁会检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
}
复制代码
2.读锁加锁源码
ReadLock
类的lock()
方法是加读锁的入口方法,调用tryAcquireShared()
方法尝试获取读锁,返回负数,失败,加入到队列中。
// 加读锁的方法入口
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// tryAcquireShared 返回负数, 表示获取读锁失败,加入到队列中
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
tryAcquireShared()
方法是一个模板方法,AQS 类中定义语义,子类实现,如果返回 1,表示获取锁成功,还有剩余资源,返回 0 表示获取成功,没有剩余资源,返回-1 表示失败。
// 尝试以共享模式获取,返回1表示获取锁成功,还有剩余资源,返回0表示获取成功,没有剩余资源,返回-1,表示失败
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// exclusiveCount(c) 代表低 16 位, 写锁的 state,成立说明有线程持有写锁
// 写锁的持有者不是当前线程,则获取读锁失败,【写锁允许降级】
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 高 16 位,代表读锁的 state,共享锁分配出去的总次数
int r = sharedCount(c);
// 读锁是否应该阻塞
if (!readerShouldBlock() && r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 尝试增加读锁计数
// 加锁成功
// 加锁之前读锁为 0,说明当前线程是第一个读锁线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 第一个读锁线程是自己就发生了读锁重入
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// cachedHoldCounter 设置为当前线程的 holdCounter 对象,即最后一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
// 说明还没设置 rh
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程的锁重入的对象,赋值给 cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
// 还没重入
else if (rh.count == 0)
readHolds.set(rh);
// 重入 + 1
rh.count++;
}
// 读锁加锁成功
return 1;
}
// 逻辑到这 应该阻塞,或者 cas 加锁失败
// 会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
return fullTryAcquireShared(current);
}
// 非公平锁 readerShouldBlock 偏向写锁一些,看 AQS 阻塞队列中第一个节点是否是写锁,是则阻塞,反之不阻塞
// 防止一直有读锁线程,导致写锁线程饥饿
// true 则该阻塞, false 则不阻塞
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// 下面是公平锁的readerShouldBlock
// 公平锁会检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
复制代码
final int fullTryAcquireShared(Thread current) {
// 当前读锁线程持有的读锁次数对象
HoldCounter rh = null;
for (;;) {
int c = getState();
// 说明有线程持有写锁
if (exclusiveCount(c) != 0) {
// 写锁不是自己则获取锁失败
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 条件成立说明当前线程是 firstReader,当前锁是读忙碌状态,而且当前线程也是读锁重入
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
// 最后一个读锁的 HoldCounter
rh = cachedHoldCounter;
// 说明当前线程也不是最后一个读锁
if (rh == null || rh.tid != getThreadId(current)) {
// 获取当前线程的 HoldCounter
rh = readHolds.get();
// 条件成立说明 HoldCounter 对象是上一步代码新建的
// 当前线程不是锁重入,在 readerShouldBlock() 返回 true 时需要去排队
if (rh.count == 0)
// 防止内存泄漏
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 越界判断
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 读锁加锁,条件内的逻辑与 tryAcquireShared 相同
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
复制代码
doAcquireShared()
是在获取读锁失败的时候加入 AQS 队列的逻辑。
private void doAcquireShared(int arg) {
// 将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点就头节点就去尝试获取锁
if (p == head) {
// 再一次尝试获取读锁
int r = tryAcquireShared(arg);
// r >= 0 表示获取成功
if (r >= 0) {
//【这里会设置自己为头节点,唤醒相连的后序的共享节点】
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 是否在获取读锁失败时阻塞 park 当前线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置自己为 head 节点
setHead(node);
// propagate 表示有共享资源(例如共享读锁或信号量),为 0 就没有资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取下一个节点
Node s = node.next;
// 如果当前是最后一个节点,或者下一个节点是【等待共享读锁的节点】
if (s == null || s.isShared())
// 唤醒后继节点
doReleaseShared();
}
}
复制代码
解锁原理
图解过程
由于上面 t1 线程加的写锁,所有其他的线程都被阻塞了,只有在 t1 线程解锁以后,其他线程才能被唤醒,我们现在看下 t1 线程被唤醒了,会发生什么?
t1 线程执行解锁w.unlock()
成功,修改 AQS 中的 state。
t1 线程唤醒队列中等待的老二, 为什么不是老大,因为老大是一个空节点,不会设置任何的线程。t2 线程被唤醒后,抢锁成功,修改 state 中高 16 位为 1。
老二的线程节点变为蓝色节点
AQS 中的 state 变为 1_0。
t2 线程恢复运行,设置原来的老二节点为头节点
t2 线程要做的事情还没结束呢,因为是共享模式,它现在释放了,就此时也唤醒队列中的下一个共享节点。
t3 线程恢复去竞争读锁成功,这时 state 的高位+1,变成 2。
这时候 t3 线程所在的 Node 设置为头节点,同时发现对列的下一个节点不是共享节点,而是独占节点,就不会唤醒后面的节点了。
之后 t2 线程和 t3 线程进入尾声,执行r.unlock
操作,state 的计数减一,直到变为 0。
最后写锁线程 t4 被唤醒,去抢占锁成功,整个流程结束。
上面是整个解锁的流程,下面深入源码验证这个流程。
源码解析
写锁释放流程
WriteLock
类的unlock()
方法是入口方法,调用 tryRelease()方法释放锁,如果成功,调用unparkSuccessor()
方法唤醒线程。
public void unlock() {
// 释放锁
sync.release(1);
}
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 头节点不为空并且不是等待状态不是 0,唤醒后继的非取消节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
tryRelease()
方法是 AQS 提供的模板方法,返回 true 表示成功,false 失败,由自定义同步器实现。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因为可重入的原因, 写锁计数为 0, 才算释放成功
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 设置占用线程为null
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
复制代码
读锁释放流程
ReadLock
类的unlock()
方法是释放共享锁的入口方法。
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
tryReleaseShared()
方法是由 AQS 提供的模板方法,由自定义同步器实现。
protected final boolean tryReleaseShared(int unused) {
//自选
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程,计数为 0 才是真正释放
if (compareAndSetState(c, nextc))
// 返回是否已经完全释放了
return nextc == 0;
}
}
复制代码
调用doReleaseShared()
方法唤醒等待的线程,这个方法调用的地方有两处,还记得吗,一个这是里的解锁,还有一个是前面加共享锁阻塞的地方,唤醒后获取锁成功,也会调用doReleaseShared()
方法。
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// SIGNAL 唤醒后继
if (ws == Node.SIGNAL) {
// 因为读锁共享,如果其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
// 防止 unparkSuccessor 被多次执行
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的 head,
// 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
if (h == head)
break;
}
}
复制代码
总结
本文讲解了读写锁ReentrantReadWriteLock
的整个加锁、解锁的实现原理,并从源码的角度深入分析,希望对大家有帮助。
评论