☕【Java 技术之旅】如何彻底认识 AQS 的原理 (上篇)
📕每日一句📕
不知道大家喜不喜欢我新的写作风格呢,标题风格随便设计了一下,哈哈,希望给一些意见哦,
📕原子特性📕
在研究 AQS 框架时,会发现这个类很多地方都使用了 CAS 操作,在并发实现中 CAS 操作必须具备原子性,而且是硬件级别的原子性,Java 被隔离在硬件之上,明显力不从心,这时为了能直接操作操作系统层面,肯定要通过用 C++编写的 native 本地方法来扩展实现。
🚀Unsafe 类的引入🚀
JDK 提供了一个类来满足 CAS 的要求,sun.misc.Unsafe,从名字上可以大概知道它用于执行低级别、不安全的操作,AQS 就是使用此类完成硬件级别的原子操作。最底层原子性处理器机器指令主要为:[LOCK] CMPXCHG
。
🤖Unsafe 的定义🤖
Unsafe 是一个很强大的类,它可以分配内存、释放内存、可以定位对象某字段的位置、可以修改对象的字段值、可以使线程挂起、使线程恢复、可进行硬件级别原子的 CAS 操作等等。
🤖Unsafe 的使用🤖
但平时我们没有这么特殊的需求去使用它,而且必须在受信任代码(一般由 JVM 指定)中调用此类,例如直接 Unsafe unsafe = Unsafe.getUnsafe();获取一个 Unsafe 实例是不会成功的,因为这个类的安全性很重要,设计者对其进行了如下判断,它会检测调用它的类是否由启动类加载器 Bootstrap ClassLoader(它的类加载器为 null)加载,由此保证此类只能由 JVM 指定的类使用。
public static Unsafe getUnsafe() {
Class cc = sun.reflect.Reflection.getCallerClass(2);
if (cc.getClassLoader() != null)
throw new SecurityException("Unsafe");
return theUnsafe;
}
当然可以通过反射绕过上面的限制,用下面的 getUnsafeInstance 方法可以获取 Unsafe 实例,这段代码演示了如何获取 java 对象的相对地址偏移量及使用 Unsafe 完成 CAS 操作,最终输出的是 flag 字段的内存偏移量及 CAS 操作后的值。分别为 8 和 101。
public class UnsafeTest {
private int flag = 100;
private static long offset;
private static Unsafe unsafe = null;
static{
try{
unsafe= getUnsafeInstance();
offset= unsafe.objectFieldOffset(UnsafeTest.class .getDeclaredField("flag"));
}catch (Exception e) {
e.printStackTrace();
}
}
private boolean doSwap(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(this, offset, expect, update);
}
public int getFlag() {
return flag;
}
private static Unsafe getUnsafeInstance() throws
SecurityException,NoSuchFieldException,IllegalArgumentException,IllegalAccessException{
Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafeInstance.setAccessible(true);
return(Unsafe) theUnsafeInstance.get(Unsafe.class);
}
public static void main(String[] args) throws Exception {
int expect = 100;
int update = 101;
UnsafeTest unsafeTest = new UnsafeTest();
System.out.println("unsafeTest 对象的 flag 字段的地址偏移量为:"+offset);
unsafeTest.doSwap(offset,expect, update);
System.out.println("CAS 操作后的 flag 值为:" +unsafeTest.getFlag());
}
}
Unsafe 类让我们明白了 java 是如何实现对操作系统操作的,一般我们使用 java 是不需要在内存中处理 java 对象及内存地址位置的,但有的时候我们确实需要知道 java 对象相关的地址,于是我们使用 Unsafe 类,尽管 java 对其提供了足够的安全管理。
Java 语言的设计者们极力隐藏涉及底层操作系统的相关操作,但此节我们本着对 AQS 框架实现的目的,不得不剖析了 Unsafe 类,因为 AQS 里面即是使用 Unsafe 获取对象字段的地址偏移量、相关原子操作来实现 CAS 操作的。
📕中断支持📕
线程的定义给我们提供了并发执行多个任务的方式,大多数情况下我们会让每个任务都自行执行结束,这样能保证事务的一致性,但是有时我们希望在任务执行中取消任务,使线程停止。
🚀中断的类型🚀
Java 也没有提供任何可靠的方法终止线程的执行。线程调度策略中有抢占式和协作式两个概念,与之类似的是中断机制也有协作式和抢占式。
历史上 Java 曾经使用 stop()方法终止线程的运行,他们属于抢占式中断。但它引来了很多问题,早已被 JDK 弃用。
stop()方法则意味着将释放该线程所持的所有锁,而且锁的释放不可控。即刻将抛出 ThreadDeath 异常,不管程序运行到哪里,但它不总是有效,如果存在被终止线程的锁竞争;
🚀抢占式中断🚀
第一点,将导致数据一致性问题,这个很好理解,一般数据加锁就是为了保护数据的一致性,而线程停止伴随所持锁的释放,很可能导致被保护的数据呈现不一致性,最终导致程序运算出现错误。
第二点,比较模糊,它要说明的问题就是可能存在某种情况 stop()方法不能及时终止线程,甚至可能终止不了线程。
看如下代码会发生什么情况,看起来线程 mt 因为执行了 stop()方法将停止,按理来说就算 execute 方法是一个死循环,只要执行了 stop()方法线程将结束,无限循环也将结束。
其实不然,因为我们在 execute 方法使用了 synchronized 修饰,同步方法表示在执行 execute 时将对 mt 对象进行加锁,另外,Thread 的 stop()方法也是同步的,于是在调用 mt 线程的 stop()方法前必须获取 mt 对象锁,但 mt 对象锁被 execute 方法占用,且不释放,于是 stop()方法永远获取不了 mt 对象锁,最后得到一个结论,使用 stop()方法停止线程不可靠,它未必总能有效终止线程。
public class ThreadStop {
public static void main(String[] args) {
Thread mt= new MyThread();
mt.start();
try {
Thread.currentThread().sleep(100);
} catch(InterruptedException e) {
e.printStackTrace();
}
mt.stop();
}
static class MyThread extends Thread {
public void run() {
execute();
}
private synchronized void execute() {
while(true) {
}
}
}
}
🚀协作式中断🚀
经历了很长时间的发展,Java 最终选择用一种协作式的中断机制实现中断。协作式中断的原理很简单,其核心是先对中断标识进行标记,某线程设置某线程的中断标识位,被标记了中断位的线程在适当的时间节点会抛出异常,捕获异常后做相应的处理。实现协作中断有三个要点需要考虑:
在 Java 层面实现轮询中断标识还是在 JVM 中实现;
轮询的颗粒度的控制,一般颗粒度要尽量小周期尽量短以保证响应的及时性;
轮询的时间节点的选择,其实就是在哪些方法里面轮询,例如,JVM 将 Thread 类的 wait()、sleep()、join()等方法都实现中断标识的轮询操作。
中断标识放在哪里?中断是针对线程实例而言,从 Java 层面上看,标识变量放到线程中肯定再合适不过了,但由于由 JVM 维护,所以中断标识具体由本地方法维护。在 Java 层面仅仅留下几个 API 用于操作中断标识,如下。
public class Thread{
public void interrupt() {……} //设置线程为中断状态
public Boolean isInterrupted() {……} //判断线程状态是否中断
public static Boolean interrupted() {……} //清除当前线程中断状态并返回它之前的值
}
通过 interrupt()方法设置中断标识:
如果在非阻塞线程则仅仅只是改变了中断状态,线程将继续往下运行,
如果在可取消阻塞线程中,如正在执行 sleep()、wait()、join()等方法的线程则会因为被设置了中断状态而抛出 InterruptedException 异常,程序对此异常捕获处理。(Lock 也有相关的 LockInterrupt 方法)
第一是轮询在哪个层面实现,这个没有特别的要求,在实际中只要不出现逻辑问题,在 Java 层面或 JVM 层面实现都是可以的。例如,常用的线程睡眠、等待等操作是通过 JVM 实现,而 AQS 框架里面的中断则放到 Java 实现(AQS 和用户自定义的判断),不管在哪个层面上去实现,在轮询过程中都一定要能保证不会产生阻塞。
第二是要保证轮询的颗粒度尽可能的小周期尽可能短,这关系到中断响应的速度。
第三点是关于轮询的时间节点的选取。
针对三要点来看看 AQS 框架中是如何支持中断的,主要在等待获取锁的过程中提供中断操作,下面是伪代码。只需增加加红加粗部分逻辑即可实现中断支持,在循环体中每次循环都对当前线程中断标识位进行判断,一旦检查到线程被标记为中断则抛出 InterruptedException 异常,高层代码对此异常捕获处理即完成中断处理。
总结起来就是 AQS 框架获取锁的中断机制是在 Java 层面实现的,轮询时间节点选择在不断做尝试获取锁操作过程中,每个循环的颗粒度比较小,响应速度得以保证,且循环过程不存在阻塞风险,保证中断检测不会失效。
if(尝试获取锁失败) {
创建 node
使用 CAS 方式把 node 插入到队列尾部
while(true){
if(尝试获取锁成功并且 node 的前驱节点为头节点){
把当前节点设置为头节点
跳出循环
}else{
使用 CAS 方式修改 node 前驱节点的 waitStatus 标识为 signal
if(修改成功){
挂起当前线程
if(当前线程中断位标识为 true)
抛出 InterruptedException 异常
}
}
}
判断线程是否处于中断状态其实很简单,只需使用 Thread.interrupted()操作,如果为 true 则说明线程处于中断位,并清除中断位。至此 AQS 实现了支持中断的获取锁操作。
🚀中断的总结🚀
从 java 发展过程分析了抢占式中断及协作式中断,由于抢占式存在一些缺陷现在已不推荐使用,而协作式中断作为推荐做法,尽管在响应时间较长,但其具有无可比拟的优势。
协作式中断我们可以在 JVM 层面实现,同样也可以在 Java 层面实现,例如 AQS 框架的中断即是在 Java 层面实现,不过如果继续深究是因为 Java 留了几个 API 供我们操作线程的中断标识位,这才使 Java 层面实现中断操作得以实现。
对于 java 的协作式中断机制有人肯定有人批评,批评者说 java 没有抢占式中断机制,且协作式中断机制迫使开发者必须维护中断状态,迫使开发者必须处理 InterruptedException。
但肯定者则认为,虽然协作式中断机制推迟了中断请求的处理,但它为开发人员提供更灵活的中断处理策略,响应性可能不及抢占式,但程序健壮性更强。
📕阻塞/唤醒📕
根据前面的线程阻塞与唤醒小节知道,目前在 Java 语言层面能实现阻塞唤醒的方式一共有三种:suspend 与 resume 组合、wait 与 notify 组合、park 与 unpark 组合。
suspend 与 resume 因为存在无法解决的竟态问题而被 Java 废弃。
wait 与 notify 也存在竟态条件,wait 必须在 notify 之前执行,假如一个线程先执行 notify 再执行 wait 将可能导致一个线程永远阻塞,如此一来,必须要提出另外一种解决方案
park 与 unpark 组合,它位于 juc 包下,应该也是因为当时编写 juc 时发现 java 现有方式无法解决问题而引入的新阻塞唤醒方式,由于 park 与 unpark 使用的是许可机制,许可最大为 1,所以 unpark 与 park 操作不会累加,而且 unpark 可以在 park 之前执行,如 unpark 先执行,后面 park 将不阻塞。
Java 真正意义上的语言层面上的并发编程应该从并发专家 Doug Lea 领导的 JSR-166 开始,此规范请求向 JCP 提交了向 Java 语言中添加并发编程工具,即在 jdk 中添加 java.util.concurrent 工具包供开发者使用,开发者可以轻松构建自己的同步器,而在此之前并发过程中同步都只能依靠 JVM 内置的管程。
AQS 框架的阻塞和唤醒显然使用的是 LockSupport 类的 park 与 unpark 方法,分别调用的是 Unsafe 类的 park 与 unpark 本地方法。逻辑如下:
if(尝试获取锁失败) {
创建 node
使用 CAS 方式把 node 插入到队列尾部
while(true){
if(尝试获取锁成功 并且 node 的前驱节点为头节点){
把当前节点设置为头节点
跳出循环
}else{
使用 CAS 方式修改 node 前驱节点的 waitStatus 标识为 signal
if(修改成功){
LockSupport.park();
}
}
}
if(尝试释放锁成功){
LockSupport.unpark(下一节点包含的线程);
}
一条线程参与锁竞争,首先先尝试获取锁,失败的话创建节点并插入队列尾部,然后再次尝试获取锁,如若成功则不做其他任务处理直接返回,否则设置节点状态为待运行状态,最后使用 LockSupport 的 park 阻塞当前线程。
前驱节点运行完后将尝试唤醒后继节点,使用的即是 LockSupport 的 unpark 唤醒。
总的来说,java 提供的 juc 并发工具包,在阻塞与唤醒操作方面由于 suspend 与 resume 存在各种各样问题,必须使用 LockSupport 中提供的方法操作。
AQS 框架提供的另外一个优秀机制是锁获取超时的支持,当大量线程对某一锁竞争时可能导致某些线程在很长一段时间都获取不了锁,在某些场景下可能希望如果线程在一段时间内不能成功获取锁就取消对该锁的等待以提高性能,这时就需要用到超时机制。
在 JDK1.5 之前还没有 juc 工具,当时的并发控制职能通过 JVM 内置的 synchronized 关键词实现锁,但对一些特殊要求却力不从心,例如超时取消控制。
JDK1.5 开始引入 juc 工具完美解决了此问题,而这正得益于并发基础框架 AQS 提供了超时的支持。
为了更精确地保证时间间隔统计的准确性,实现时使用了 System.nanoTime()更为精确的方法,它能精确到纳秒级别。超时机制的思想就是在不断进行锁竞争的同时记录竞争的时间,一旦时间段超过指定的时间则停止轮询直接返回,返回前对等待队列中对应节点进行取消操作。往下看实现的逻辑,
if(尝试获取锁失败) {
long lastTime = System.nanoTime();
创建 node
使用 CAS 方式把 node 插入到队列尾部
while(true){
if(尝试获取锁成功 并且 node 的前驱节点为头节点){
把当前节点设置为头节点
跳出循环
}else{
if (nanosTimeout <= 0){
取消等待队列中此节点
跳出循环
}
使用 CAS 方式修改 node 前驱节点的 waitStatus 标识为 signal
if(修改成功)
if(nanosTimeout > spinForTimeoutThreshold)
阻塞当前线程 nanosTimeout 纳秒
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
else{
使用 CAS 方式修改 node 前驱节点的 waitStatus 标识为 signal
if(修改成功){
LockSupport.park();
}
}
}
上面正是在前面章节锁的获取逻辑中添加超时处理,核心逻辑是不断循环减去处理的时间消耗,一旦小于 0 就取消节点并跳出循环,其中有两点必须要注意,一个是真正的阻塞时间应该是扣除了竞争入队的时间后剩余的时间,保证阻塞事件的准确性,我们可以看到每次循环都会减去相应的处理时间;
另外一个是关于 spinForTimeoutThreshold 变量阀值,它是决定使用自旋方式消耗时间还是使用系统阻塞方式消耗时间的分割线,juc 工具包作者通过测试将默认值设置为 1000ns,即如果在成功插入等待队列后剩余时间大于 1000ns 则调用系统底层阻塞,否则不调用系统底层,取而代之的是仅仅让之在 Java 应用层不断循环消耗时间,属于优化的措施。
至此 AQS 框架在获取锁的过程中提供了超时机制,超时的支持让 Java 在并发方面提供了更完善的机制,更多的并发策略满足开发者更多需求。
评论