概述
本文首先用导图总结一下上一篇文档写的内容,然后通过 Mutex,ReentrantLock 说明如何使用 AQS,同时关注公平和非公平这个条件,最后关注一下可中断和条件这两个特性。上一篇的总结:
Mutex&ReentrantLock&公平/非公平
在 jdk8 的 current 包中并没有找到 Doug Lea 的 Mutex 类,不过这个不影响,就当我们自己写的也一样。
class Mutex implements Lock, java.io.Serializable {
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判断是否锁定状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 尝试获取资源,立即返回。成功则返回true,否则false。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 这里限定只能为1个量
if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
return true;
}
return false;
}
// 尝试释放资源,立即返回。成功则为true,否则false。
protected boolean tryRelease(int releases) {
assert releases == 1; // 限定为1个量
if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//释放资源,放弃占有状态
return true;
}
}
// 真正同步类的实现都依赖继承于AQS的自定义同步器!
private final Sync sync = new Sync();
//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
public void lock() {
sync.acquire(1);
}
//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
public boolean tryLock() {
return sync.tryAcquire(1);
}
//unlock<-->release。两者语义一样:释放资源。
public void unlock() {
sync.release(1);
}
//锁是否占有状态
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
复制代码
读过 AQS 代码前面的注释,就会了解这种实现就是注释里面建议的 AbstractQueuedSynchronizer 使用方式。
通过内部类 Sync 继承 AbstractQueuedSynchronizer,通过实现 tryAcquire()/tryRelease()或者 ryAcquireShared()/tryReleaseShared()方法实现独占/共享方式获取锁。
定义私有变量 sync,提供外部接口 lock()/unlock()。
接下来再来看一下 ReentrantLock(主要关注它关于公平和非公平的特性)
ReentrantLock 把所有 Lock 接口的操作都委派到一个 Sync 类上,该类继承了 AbstractQueuedSynchronizer:
static abstract class Sync extends AbstractQueuedSynchronizer
复制代码
Sync 又有两个子类:
final static class NonfairSync extends Sync
final static class FairSync extends Sync
复制代码
显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁
非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
公平锁
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
共同的 Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final boolean isLocked() {
return getState() != 0;
}
}
复制代码
这里只贴出了部分代码。小总结:
这里和 Mutex 的实现方式相似,都是通过内部的 Sync 类来实现。
公平锁和非公平锁的区别只是在获取锁的时候(在入 AQS 的 CLH 队列之前)代码有点区别,其它的都是一样的。一旦进入了队列,所有线程都是按照队列中先来后到的顺序请求锁。
非公平锁的代码中总是优先尝试当前是否有线程持有锁,一旦没有任何线程持有锁,那么非公平锁就霸道的尝试将锁“占为己有”。
ps:小插曲。这个时候如果有人再问 Lock 和 Synchronized 的区别,可以这么回答了:
AbstractQueuedSynchronizer 通过构造一个基于阻塞的 CLH 队列容纳所有的阻塞线程,而对该队列的操作均通过 Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock 实现了偏向锁的功能。synchronized 的底层也是一个基于 CAS 操作的等待队列,但 JVM 实现的更精细,把等待队列分为 ContentionList 和 EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但 synchronized 还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而 Lock 则完全依靠系统阻塞挂起等待线程。当然 Lock 比 synchronized 更适合在应用层扩展,可以继承 AbstractQueuedSynchronizer 定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock 对应的 Condition 也比 wait/notify 要方便的多、灵活的多。
但是后续的问题,只能祝你好运了……(学海无涯啊)
可中断
前提是 AQS 的 acquire 是不响应中断的(关于 Intereput 的知识点,自行百度一下即可,需要知道线程在运行状态下是不响应中断的)。
可中断和超时的实现都是在 AbstractQueuedSynchronizer,没有交给子类来实现。如果在获取一个通过网络交互实现的锁时,这个锁资源突然进行了销毁,那么使用 acquireInterruptibly 的获取方式就能够让该时刻尝试获取锁的线程提前返回。而同步器的这个特性被实现 Lock 接口中的 lockInterruptibly 方法。根据 Lock 的语义,在被中断时,lockInterruptibly 将会抛出 InterruptedException 来告知使用者
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 检测中断标志位
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
上述逻辑主要包括:
1. 检测当前线程是否被中断;
判断当前线程的中断标志位,如果已经被中断了,那么直接抛出异常并将中断标志位设置为 false。
2. 尝试获取状态;
调用 tryAcquire 获取状态,如果顺利会获取成功并返回。
3. 构造节点并加入 sync 队列;
获取状态失败后,将当前线程引用构造为节点并加入到 sync 队列中。退出队列的方式在没有中断的场景下和 acquireQueued 类似,当头结点是自己的前驱节点并且能够获取到状态时,即可以运行,当然要将本节点设置为头结点,表示正在运行。
4. 中断检测。
在每次被唤醒时,进行中断检测,如果发现当前线程被中断,那么抛出 InterruptedException 并退出循环。
# 超时
针对超时控制这部分的实现,主要需要计算出睡眠的 delta,也就是间隔值。间隔可以表示为 nanosTimeout = 原有 nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。如果 nanosTimeout 大于 0,那么还需要使当前线程睡眠,反之则返回 false。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
//计算时间,当前时间减去睡眠之前的时间得到睡眠的时间,然后被
//原有超时时间减去,得到了还应该睡眠的时间
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
1. 加入 sync 队列;
将当前线程构造成为节点 Node 加入到 sync 队列中。
2. 条件满足直接返回;
退出条件判断,如果前驱节点是头结点并且成功获取到状态,那么设置自己为头结点并退出,返回 true,也就是在指定的 nanosTimeout 之前获取了锁。
3. 获取状态失败休眠一段时间;
通过 LockSupport.unpark 来指定当前线程休眠一段时间。
4. 计算再次休眠的时间;
唤醒后的线程,计算仍需要休眠的时间,该时间表示为 nanosTimeout = 原有 nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。其中 now – lastTime 表示这次睡眠所持续的时间。
5. 休眠时间的判定。
唤醒后的线程,计算仍需要休眠的时间,并无阻塞的尝试再获取状态,如果失败后查看其 nanosTimeout 是否大于 0,如果小于 0,那么返回完全超时,没有获取到锁。 如果 nanosTimeout 小于等于 1000L 纳秒,则进入快速的自旋过程。那么快速自旋会造成处理器资源紧张吗?结果是不会,经过测算,开销看起来很小,几乎微乎其微。Doug Lea 应该测算了在线程调度器上的切换造成的额外开销,因此在短时 1000 纳秒内就让当前线程进入快速自旋状态,如果这时再休眠相反会让 nanosTimeout 的获取时间变得更加不精确。
![Qz7aVLTPSvEul5y](https://i.loli.net/2021/07/02/Qz7aVLTPSvEul5y.png)
条件中断
参考
https://www.cnblogs.com/onlywujun/articles/3531568.html
http://ifeve.com/introduce-abstractqueuedsynchronizer/
评论