深入浅出理解 Java 并发 AQS 的共享锁模式
概述
这篇文章深入浅出理解Java并发AQS的独占锁模式讲解了 AQS 的独占锁实现原理,那么本篇文章在阐述 AQS 另外一个重要模式,共享锁模式,那什么是共享锁呢?
共享锁可以由多个线程同时获取, 比较典型的就是读锁,读操作并不会产生副作用,所以可以允许多个线程同时对数据进行读操作而不会有线程安全问题,jdk 中的很多并发工具比如 ReadWriteLock 和 CountdownLatch 就是依赖 AQS 的共享锁实现的。
本文重点讲解下 AQS 是如何实现共享锁的。
自定义共享锁例子
首先我们通过 AQS 实现一个非常最最最轻量简单的共享锁例子,帮助大家对共享锁有一个整体的感知。
创建内部类共享帮助锁
ShareSync
类,继承自AbstractQueuedSynchronizer
类,实现了共享锁相关的方法tryAcquireShared()
和tryReleaseShared()
。创建
ShareLock
,提供了lockShare()
加锁和releaseShare()
两个 API。
验证:
一共创建最多共同有 3 个线程共享的共享锁。
创建 5 个线程去竞争共享锁。
运行结果:
运行结果显示每次最多只有 3 个
lock success
,说明同时只有 3 个线程共享。只有在释放共享锁以后,其他线程才能获取锁。
下面对它的实现原理一探究竟。
核心原理机制
共享模式也是由 AQS 提供的,首先我们关注下 AQS 的数据结构。
AQS 内部维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。
AQS 作为一个抽象方法,提供了加锁、和释放锁的框架,这里采用的模板方模式,在上面中提到的tryAcquireShared
、tryReleaseShared
就是和共享模式相关的模板方法。
共享模式的入口方法如下:
源码解析
上图是 AQS 的类结构图,其中标红部分是组成 AQS 的重要成员变量。
成员变量
state 共享变量
AQS 中里一个很重要的字段 state,表示同步状态,是由 volatile 修饰的,用于展示当前临界资源的获锁情况。通过 getState(),setState(),compareAndSetState()三个方法进行维护。
关于 state 的几个要点:
使用 volatile 修饰,保证多线程间的可见性。
getState()、setState()、compareAndSetState()使用 final 修饰,限制子类不能对其重写。
compareAndSetState()采用乐观锁思想的 CAS 算法,保证原子性操作。
CLH 队列(FIFO 队列)
AQS 里另一个重要的概念就是 CLH 队列,它是一个双向链表队列,其内部由 head 和 tail 分别记录头结点和尾结点,队列的元素类型是 Node。
Node 的结构如下:
注意,waitSstatus 负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0 来判断结点的状态是否正常。
exclusiveOwnerThread
AQS 通过继承 AbstractOwnableSynchronizer 类,拥有的属性。表示独占模式下同步器持有的线程。
共享锁获取 acquireShared(int)
acquireShared(int)是共享锁模式下线程获取共享资源的入口方法,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程无法响应中断。
方法的整体流程如下:
tryAcquireShared()尝试获取资源,需要自定义同步器去实现,返回负值代表获取失败;0 代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
如果失败则通过 doAcquireShared()进入等待队列,直到获取到资源为止才返回。
doAcquireShared(int)
此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。
doAcquireShared
方法的实现和获取独占锁中的acquireQueued
方法很类似,但是主要有一点不同,那就是线程在被唤醒后,若成功获取到了共享锁,还需要判断共享锁是否还能被其他线程获取,若可以,则继续向后唤醒它的下一个节点对应的线程。
setHeadAndPropagate(Node, int)
该方法主要将当前节点设置为头节点,同时判断条件是否符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。
共享释放 releaseShared(int)
releaseShared(int)
是共享模式下线程释放共享资源的入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
方法的整体流程如下:
tryReleaseShared 尝试释放锁,这由自定义同步器去实现, 返回 true 表示释放成功。
doReleaseShared 唤醒后续队列中等待的节点,
doReleaseShared()
此方法主要用于唤醒队列中等待的共享节点。
逻辑是一个死循环,每次循环中重新读取一次 head,然后保存在局部变量 h 中,再配合
if(h == head) break;
,这样,循环检测到 head 没有变化时就会退出循环。注意,head 变化一定是因为:acquire thread 被唤醒,之后它成功获取锁,然后 setHead 设置了新 head。而且注意,只有通过if(h == head) break;
即 head 不变才能退出循环,不然会执行多次循环。if (h != null && h != tail)
判断队列是否至少有两个 node,如果队列从来没有初始化过(head 为 null),或者 head 就是 tail,那么中间逻辑直接不走,直接判断 head 是否变化了。如果队列中有两个或以上个 node,那么检查局部变量 h 的状态:
如果状态为 SIGNAL,说明 h 的后继是需要被通知的。通过对 CAS 操作结果取反,将
compareAndSetWaitStatus(h, Node.SIGNAL, 0)
和unparkSuccessor(h)
绑定在了一起。说明了只要 head 成功得从 SIGNAL 修改为 0,那么 head 的后继的代表线程肯定会被唤醒了。如果状态为 0,说明 h 的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于 acquire thread 获取锁失败再次设置 head 为 SIGNAL 并再次阻塞,要么由于 acquire thread 获取锁成功而将自己(head 后继)设置为新 head 并且只要 head 后继不是队尾,那么新 head 肯定为 SIGNAL。所以设置这种中间状态的 head 的 status 为 PROPAGATE,让其 status 又变成负数,这样可能被被唤醒线程检测到。
如果状态为 PROPAGATE,直接判断 head 是否变化。
两个 continue 保证了进入那两个分支后,只有当 CAS 操作成功后,才可能去执行 if(h == head) break;,才可能退出循环。
if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新 head,就会再次循环。目的当然是为了再次执行 unparkSuccessor(h),即唤醒队列中第一个等待的线程。
总结
本文主要讲解了 AQS 的共享模式,通过一个自定义简单的 demo 帮助大家深入浅出的理解,同时深入分析了源码实现,希望对大家有帮助。
参考
https://developer.aliyun.com/article/779674
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
https://www.cnblogs.com/tuyang1129/p/12670014.html
https://www.cnblogs.com/waterystone/p/4920797.html
https://www.cnblogs.com/moxiaotao/p/10283347.html
版权声明: 本文为 InfoQ 作者【JAVA旭阳】的原创文章。
原文链接:【http://xie.infoq.cn/article/989e4c7855355d9dd4013fa9f】。文章转载请联系作者。
评论