写点什么

深入浅出理解 Java 并发 AQS 的共享锁模式

作者:JAVA旭阳
  • 2022-10-17
    浙江
  • 本文字数:5966 字

    阅读完需:约 1 分钟

概述

这篇文章深入浅出理解Java并发AQS的独占锁模式讲解了 AQS 的独占锁实现原理,那么本篇文章在阐述 AQS 另外一个重要模式,共享锁模式,那什么是共享锁呢?


共享锁可以由多个线程同时获取, 比较典型的就是读锁,读操作并不会产生副作用,所以可以允许多个线程同时对数据进行读操作而不会有线程安全问题,jdk 中的很多并发工具比如 ReadWriteLock 和 CountdownLatch 就是依赖 AQS 的共享锁实现的。


本文重点讲解下 AQS 是如何实现共享锁的。

自定义共享锁例子

首先我们通过 AQS 实现一个非常最最最轻量简单的共享锁例子,帮助大家对共享锁有一个整体的感知。


@Slf4jpublic class ShareLock {
/** * 共享锁帮助类 */ private static class ShareSync extends AbstractQueuedSynchronizer {
private int lockCount;
/** * 创建共享锁帮助类,最多有count把共享锁,超过了则阻塞 * * @param count 共享锁数量 */ public ShareSync(int count) { this.lockCount = count; }
/** * 尝试获取共享锁 * * @param arg 每次获取锁的数量 * @return 返回正数,表示后续其他线程获取共享锁可能成功; 返回0,表示后续其他线程无法获取共享锁;返回负数,表示当前线程获取共享锁失败 */ @Override protected int tryAcquireShared(int arg) { // 自旋 for (;;) { int c = getState(); // 如果持有锁的数量大于指定数量,返回-1,线程进入阻塞 if(c >= lockCount) { return -1; } int nextc = c + 1; // cas设置成功,返回1,获取到共享锁 if (compareAndSetState(c, nextc)) { return 1; } } }
/** * 尝试释放共享锁 * * @param arg 释放锁的数量 * @return 如果释放后允许唤醒后续等待结点返回true,否则返回false */ @Override protected boolean tryReleaseShared(int arg) { // 自旋操作 for (; ; ) { int c = getState(); // 如果没有锁了 if (c == 0) { return false; } // 否则锁量-1 int nextc = c - 1; // cas修改状态 if (compareAndSetState(c, nextc)) { return true; } } } }
private final ShareSync sync;
public ShareLock(int count) { this.sync = new ShareSync(count); }
/** * 加共享锁 */ public void lockShare() { sync.acquireShared(1); }
/** * 释放共享锁 */ public void releaseShare() { sync.releaseShared(1); }}
复制代码


  • 创建内部类共享帮助锁ShareSync类,继承自AbstractQueuedSynchronizer类,实现了共享锁相关的方法tryAcquireShared()tryReleaseShared()

  • 创建ShareLock,提供了lockShare()加锁和releaseShare()两个 API。


验证:


public static void main(String[] args) throws InterruptedException {        ShareLock shareLock = new ShareLock(3);        for (int i = 0; i < 5; i++) {            new Thread(() -> {                shareLock.lockShare();                try {                    log.info("lock success");                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    shareLock.releaseShare();                    log.info("release success");                }
}, "thread-" + i).start(); } Thread.sleep(10000); }
复制代码


  • 一共创建最多共同有 3 个线程共享的共享锁。

  • 创建 5 个线程去竞争共享锁。


运行结果:



  • 运行结果显示每次最多只有 3 个lock success,说明同时只有 3 个线程共享。

  • 只有在释放共享锁以后,其他线程才能获取锁。


下面对它的实现原理一探究竟。

核心原理机制

共享模式也是由 AQS 提供的,首先我们关注下 AQS 的数据结构。



AQS 内部维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。


AQS 作为一个抽象方法,提供了加锁、和释放锁的框架,这里采用的模板方模式,在上面中提到的tryAcquireSharedtryReleaseShared就是和共享模式相关的模板方法。



共享模式的入口方法如下:


源码解析


上图是 AQS 的类结构图,其中标红部分是组成 AQS 的重要成员变量。

成员变量

  1. state 共享变量


AQS 中里一个很重要的字段 state,表示同步状态,是由 volatile 修饰的,用于展示当前临界资源的获锁情况。通过 getState(),setState(),compareAndSetState()三个方法进行维护。


关于 state 的几个要点:


  • 使用 volatile 修饰,保证多线程间的可见性。

  • getState()、setState()、compareAndSetState()使用 final 修饰,限制子类不能对其重写。

  • compareAndSetState()采用乐观锁思想的 CAS 算法,保证原子性操作。


  1. CLH 队列(FIFO 队列)


AQS 里另一个重要的概念就是 CLH 队列,它是一个双向链表队列,其内部由 head 和 tail 分别记录头结点和尾结点,队列的元素类型是 Node。


private transient volatile Node head;private transient volatile Node tail;
复制代码


Node 的结构如下:


static final class Node {    //共享模式下的等待标记    static final Node SHARED = new Node();    //独占模式下的等待标记    static final Node EXCLUSIVE = null;    //表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。    static final int CANCELLED =  1;    //表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。    static final int SIGNAL    = -1;    //表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。    static final int CONDITION = -2;    //共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。    static final int PROPAGATE = -3;    //状态,包括上面的四种状态值,初始值为0,一般是节点的初始状态    volatile int waitStatus;    //上一个节点的引用    volatile Node prev;    //下一个节点的引用    volatile Node next;    //保存在当前节点的线程引用    volatile Thread thread;    //condition队列的后续节点    Node nextWaiter;}
复制代码


注意,waitSstatus 负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0 来判断结点的状态是否正常。


  1. exclusiveOwnerThread


AQS 通过继承 AbstractOwnableSynchronizer 类,拥有的属性。表示独占模式下同步器持有的线程。

共享锁获取 acquireShared(int)

acquireShared(int)是共享锁模式下线程获取共享资源的入口方法,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程无法响应中断。


public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}
复制代码


方法的整体流程如下:


  1. tryAcquireShared()尝试获取资源,需要自定义同步器去实现,返回负值代表获取失败;0 代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。

  2. 如果失败则通过 doAcquireShared()进入等待队列,直到获取到资源为止才返回。


doAcquireShared(int)


此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。


private void doAcquireShared(int arg) {    //封装线程为共享Node 加入队列尾部    final Node node = addWaiter(Node.SHARED);    //是否成功标志    boolean failed = true;    try {        //等待过程中是否被中断过的标志        boolean interrupted = false;        // 自旋操作        for (;;) {            // 获取前驱节点            final Node p = node.predecessor();            //如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的            if (p == head) {                //尝试获取资源                int r = tryAcquireShared(arg);                //成功                if (r >= 0) {                    //将head指向自己,还有剩余资源可以再唤醒之后的线程                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    //如果等待过程中被打断过,此时将中断补上。                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
复制代码


doAcquireShared方法的实现和获取独占锁中的acquireQueued方法很类似,但是主要有一点不同,那就是线程在被唤醒后,若成功获取到了共享锁,还需要判断共享锁是否还能被其他线程获取,若可以,则继续向后唤醒它的下一个节点对应的线程。


setHeadAndPropagate(Node, int)


该方法主要将当前节点设置为头节点,同时判断条件是否符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。


private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;    //head指向自己    setHead(node);     //如果还有剩余量,继续唤醒下一个邻居线程    if (propagate > 0 || h == null || h.waitStatus < 0) {        Node s = node.next;        if (s == null || s.isShared())            // 唤醒操作            doReleaseShared();    }}
复制代码

共享释放 releaseShared(int)

releaseShared(int)是共享模式下线程释放共享资源的入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。


public final boolean releaseShared(int arg) {    //尝试释放资源    if (tryReleaseShared(arg)) {        //唤醒后继结点        doReleaseShared();        return true;    }    return false;}
复制代码


方法的整体流程如下:


  • tryReleaseShared 尝试释放锁,这由自定义同步器去实现, 返回 true 表示释放成功。

  • doReleaseShared 唤醒后续队列中等待的节点,


doReleaseShared()


此方法主要用于唤醒队列中等待的共享节点。


private void doReleaseShared() {    // 自旋操作    for (;;) {        // 获取头节点        Node h = head;        if (h != null && h != tail) {            // 获取节点的等待状态            int ws = h.waitStatus;            // 如果节点等待状态是-1, -1表示有责任唤醒后续节点的状态            if (ws == Node.SIGNAL) {                // cas修改当前节点的等待状态为0                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                //唤醒后续节点                unparkSuccessor(h);            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;        }        if (h == head)// head发生变化            break;    }}
复制代码


  • 逻辑是一个死循环,每次循环中重新读取一次 head,然后保存在局部变量 h 中,再配合if(h == head) break;,这样,循环检测到 head 没有变化时就会退出循环。注意,head 变化一定是因为:acquire thread 被唤醒,之后它成功获取锁,然后 setHead 设置了新 head。而且注意,只有通过if(h == head) break;即 head 不变才能退出循环,不然会执行多次循环。

  • if (h != null && h != tail)判断队列是否至少有两个 node,如果队列从来没有初始化过(head 为 null),或者 head 就是 tail,那么中间逻辑直接不走,直接判断 head 是否变化了。

  • 如果队列中有两个或以上个 node,那么检查局部变量 h 的状态:


  • 如果状态为 SIGNAL,说明 h 的后继是需要被通知的。通过对 CAS 操作结果取反,将compareAndSetWaitStatus(h, Node.SIGNAL, 0)unparkSuccessor(h)绑定在了一起。说明了只要 head 成功得从 SIGNAL 修改为 0,那么 head 的后继的代表线程肯定会被唤醒了。

  • 如果状态为 0,说明 h 的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于 acquire thread 获取锁失败再次设置 head 为 SIGNAL 并再次阻塞,要么由于 acquire thread 获取锁成功而将自己(head 后继)设置为新 head 并且只要 head 后继不是队尾,那么新 head 肯定为 SIGNAL。所以设置这种中间状态的 head 的 status 为 PROPAGATE,让其 status 又变成负数,这样可能被被唤醒线程检测到。


  • 如果状态为 PROPAGATE,直接判断 head 是否变化。

  • 两个 continue 保证了进入那两个分支后,只有当 CAS 操作成功后,才可能去执行 if(h == head) break;,才可能退出循环。

  • if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新 head,就会再次循环。目的当然是为了再次执行 unparkSuccessor(h),即唤醒队列中第一个等待的线程。

总结

本文主要讲解了 AQS 的共享模式,通过一个自定义简单的 demo 帮助大家深入浅出的理解,同时深入分析了源码实现,希望对大家有帮助。

参考

https://developer.aliyun.com/article/779674


https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html


https://www.cnblogs.com/tuyang1129/p/12670014.html


https://www.cnblogs.com/waterystone/p/4920797.html


https://www.cnblogs.com/moxiaotao/p/10283347.html


https://blog.csdn.net/anlian523/article/details/106319538/

发布于: 刚刚阅读数: 6
用户头像

JAVA旭阳

关注

还未添加个人签名 2018-07-18 加入

还未添加个人简介

评论

发布
暂无评论
深入浅出理解Java并发AQS的共享锁模式_Java_JAVA旭阳_InfoQ写作社区