写点什么

AQS 介绍和原理分析(下)

用户头像
追风少年
关注
发布于: 2021 年 07 月 02 日
AQS介绍和原理分析(下)

概述

本文首先用导图总结一下上一篇文档写的内容,然后通过 Mutex,ReentrantLock 说明如何使用 AQS,同时关注公平和非公平这个条件,最后关注一下可中断和条件这两个特性。上一篇的总结:



Mutex&ReentrantLock&公平/非公平

在 jdk8 的 current 包中并没有找到 Doug Lea 的 Mutex 类,不过这个不影响,就当我们自己写的也一样。


class Mutex implements Lock, java.io.Serializable {    // 自定义同步器    private static class Sync extends AbstractQueuedSynchronizer {        // 判断是否锁定状态        protected boolean isHeldExclusively() {            return getState() == 1;        }
// 尝试获取资源,立即返回。成功则返回true,否则false。 public boolean tryAcquire(int acquires) { assert acquires == 1; // 这里限定只能为1个量 if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入! setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源 return true; } return false; }
// 尝试释放资源,立即返回。成功则为true,否则false。 protected boolean tryRelease(int releases) { assert releases == 1; // 限定为1个量 if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断! throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0);//释放资源,放弃占有状态 return true; } }
// 真正同步类的实现都依赖继承于AQS的自定义同步器! private final Sync sync = new Sync();
//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。 public void lock() { sync.acquire(1); }
//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。 public boolean tryLock() { return sync.tryAcquire(1); }
//unlock<-->release。两者语义一样:释放资源。 public void unlock() { sync.release(1); }
//锁是否占有状态 public boolean isLocked() { return sync.isHeldExclusively(); }}
复制代码


读过 AQS 代码前面的注释,就会了解这种实现就是注释里面建议的 AbstractQueuedSynchronizer 使用方式。


  • 通过内部类 Sync 继承 AbstractQueuedSynchronizer,通过实现 tryAcquire()/tryRelease()或者 ryAcquireShared()/tryReleaseShared()方法实现独占/共享方式获取锁。

  • 定义私有变量 sync,提供外部接口 lock()/unlock()。


接下来再来看一下 ReentrantLock(主要关注它关于公平和非公平的特性)


ReentrantLock 把所有 Lock 接口的操作都委派到一个 Sync 类上,该类继承了 AbstractQueuedSynchronizer:


static abstract class Sync extends AbstractQueuedSynchronizer
复制代码


Sync 又有两个子类:


final static class NonfairSync extends Syncfinal static class FairSync extends Sync  
复制代码


显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁



非公平锁


static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
复制代码


公平锁


static final class FairSync extends Sync {        final void lock() {            acquire(1);        }        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }
复制代码


共同的 Sync


abstract static class Sync extends AbstractQueuedSynchronizer {              final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final boolean isLocked() { return getState() != 0; } }
复制代码


这里只贴出了部分代码。小总结:


  • 这里和 Mutex 的实现方式相似,都是通过内部的 Sync 类来实现。

  • 公平锁和非公平锁的区别只是在获取锁的时候(在入 AQS CLH 队列之前)代码有点区别,其它的都是一样的。一旦进入了队列,所有线程都是按照队列中先来后到的顺序请求锁。

  • 非公平锁的代码中总是优先尝试当前是否有线程持有锁,一旦没有任何线程持有锁,那么非公平锁就霸道的尝试将锁“占为己有”。


ps:小插曲。这个时候如果有人再问 Lock 和 Synchronized 的区别,可以这么回答了:


AbstractQueuedSynchronizer 通过构造一个基于阻塞的 CLH 队列容纳所有的阻塞线程,而对该队列的操作均通过 Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock 实现了偏向锁的功能。synchronized 的底层也是一个基于 CAS 操作的等待队列,但 JVM 实现的更精细,把等待队列分为 ContentionList 和 EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但 synchronized 还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而 Lock 则完全依靠系统阻塞挂起等待线程。当然 Lock 比 synchronized 更适合在应用层扩展,可以继承 AbstractQueuedSynchronizer 定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock 对应的 Condition 也比 wait/notify 要方便的多、灵活的多。


但是后续的问题,只能祝你好运了……(学海无涯啊)

可中断

前提是 AQS 的 acquire 是不响应中断的(关于 Intereput 的知识点,自行百度一下即可,需要知道线程在运行状态下是不响应中断的)。

可中断和超时的实现都是在 AbstractQueuedSynchronizer,没有交给子类来实现。如果在获取一个通过网络交互实现的锁时,这个锁资源突然进行了销毁,那么使用 acquireInterruptibly 的获取方式就能够让该时刻尝试获取锁的线程提前返回。而同步器的这个特性被实现 Lock 接口中的 lockInterruptibly 方法。根据 Lock 的语义,在被中断时,lockInterruptibly 将会抛出 InterruptedException 来告知使用者

public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 检测中断标志位
            if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码


上述逻辑主要包括:

1. 检测当前线程是否被中断;

判断当前线程的中断标志位,如果已经被中断了,那么直接抛出异常并将中断标志位设置为 false。

2. 尝试获取状态;

调用 tryAcquire 获取状态,如果顺利会获取成功并返回。

3. 构造节点并加入 sync 队列;

获取状态失败后,将当前线程引用构造为节点并加入到 sync 队列中。退出队列的方式在没有中断的场景下和 acquireQueued 类似,当头结点是自己的前驱节点并且能够获取到状态时,即可以运行,当然要将本节点设置为头结点,表示正在运行。

4. 中断检测。

在每次被唤醒时,进行中断检测,如果发现当前线程被中断,那么抛出 InterruptedException 并退出循环。

# 超时

针对超时控制这部分的实现,主要需要计算出睡眠的 delta,也就是间隔值。间隔可以表示为 nanosTimeout = 原有 nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。如果 nanosTimeout 大于 0,那么还需要使当前线程睡眠,反之则返回 false。

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head &amp;&amp; tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout &lt;= 0)                 return false;             if (shouldParkAfterFailedAcquire(p, node) &amp;&amp; nanosTimeout &gt; spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            //计算时间,当前时间减去睡眠之前的时间得到睡眠的时间,然后被
            //原有超时时间减去,得到了还应该睡眠的时间
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码


1. 加入 sync 队列;

将当前线程构造成为节点 Node 加入到 sync 队列中。

2. 条件满足直接返回;

退出条件判断,如果前驱节点是头结点并且成功获取到状态,那么设置自己为头结点并退出,返回 true,也就是在指定的 nanosTimeout 之前获取了锁。

3. 获取状态失败休眠一段时间;

通过 LockSupport.unpark 来指定当前线程休眠一段时间。

4. 计算再次休眠的时间;

唤醒后的线程,计算仍需要休眠的时间,该时间表示为 nanosTimeout = 原有 nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。其中 now – lastTime 表示这次睡眠所持续的时间。

5. 休眠时间的判定。

唤醒后的线程,计算仍需要休眠的时间,并无阻塞的尝试再获取状态,如果失败后查看其 nanosTimeout 是否大于 0,如果小于 0,那么返回完全超时,没有获取到锁。 如果 nanosTimeout 小于等于 1000L 纳秒,则进入快速的自旋过程。那么快速自旋会造成处理器资源紧张吗?结果是不会,经过测算,开销看起来很小,几乎微乎其微。Doug Lea 应该测算了在线程调度器上的切换造成的额外开销,因此在短时 1000 纳秒内就让当前线程进入快速自旋状态,如果这时再休眠相反会让 nanosTimeout 的获取时间变得更加不精确。

![Qz7aVLTPSvEul5y](https://i.loli.net/2021/07/02/Qz7aVLTPSvEul5y.png)


条件中断


参考

https://www.cnblogs.com/onlywujun/articles/3531568.html

http://ifeve.com/introduce-abstractqueuedsynchronizer/

发布于: 2021 年 07 月 02 日阅读数: 15
用户头像

追风少年

关注

昨夜雨疏风骤,却道代码依旧。 2018.11.13 加入

非典型性程序员一枚,某互联网大厂资深开发,专注【Java技术领域、分布式技术领域、云原生技术实践】,喜欢分享金融和日常生活。 对每一行文字负责,对每一行代码负责。欢迎来踩,公众号:帅气的追风少年

评论

发布
暂无评论
AQS介绍和原理分析(下)