是它,是它,就是它,并发包的基石;
一、概述
闲来不卷,随便聊一点。
一般情况下,大家系统中至少也是 JDK8 了,那想必对于 JDK5 加入的一系列功能并不陌生吧。那时候重点加入了java.util.concurrent
并发包,我们简称为 JUC。JUC 下提供了很多并发编程实用的工具类,比如并发锁 lock、原子操作 atomic、线程池操作 Executor 等等。下面,我对 JUC 做了整理,大致分为下面几点:
基于 JDK8,今天重点来聊下 JUC 并发包下的一个类,AbstractQueuedSynchronizer
。
首先,浅显的从名字上看,抽象的队列同步器;实际上,这名字也跟它的作用如出一辙。抽象,即需要被继承;队列同步器,其内部维护了一个队列,供线程入队等待;最终实现多个线程访问共享资源的功能。
二、源码解析
进入AbstractQueuedSynchronizer
内部,需要掌握三个重要的属性:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
复制代码
我们调试 AQS 的源码,必须寻找一个源码调试的切入点,我这里用我们并发编程常用的 Lock 锁作为调试 AQS 的切入点,因为这是解决线程安全问题常用的手段之一。
2.1、源码的切入点
AQS 的源码调试,从Lock
接口出发,JDK 源码定义如下:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
复制代码
从源码中看到,Lock
是一个接口,所以该接口会有一些实现类,其中有一个实现类ReentrantLock
,可重入锁,想必大家都不会陌生。
2.2、ReentrantLock 的 lock 方法
通过跟踪源码可以看到,ReentrantLock#lock 内部实现貌似比较简单,只有简短的一行代码
public void lock() {
sync.lock();
}
复制代码
其实内部是维护了一个Sync
的抽象类,调用的是Sync
的 lock()方法。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// ...
}
复制代码
可以看到,Sync
也是个抽象类,它有两个实现类:NonfairSync
和FairSync
,这里其实就引出了我们今天的主角,AbstractQueuedSynchronizer
,Sync
继承了它。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
下面我整理了这一系列类的 UML 图
通过类图可知,lock()方法最终调用的是ReentrantLock
类下,内部类NonfairSync
或FairSync
的 lock 方法;对于这两个类,前者叫非公平锁,后者叫公平锁。通过ReentrantLock
的构造器可知,默认使用NonfairSync
类。
public ReentrantLock() {
sync = new NonfairSync();
}
复制代码
从NonfairSync
类的 lock 方法出发,引出第一个 AQS 下的方法 compareAndSetState。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
复制代码
从 compareAndSetState 方法的命名可以发现,就是比较并交换的意思,典型的 CAS 无锁机制。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
我们可以观察到,这里其实调用的是 Unsafe 类的 compareAndSwapInt 方法,传入的 expect 为 0,update 为 1;意思是如果当前值为 0,那我就把值最终更新为 1。
Unsafe
这个类下面,发现好多方法都是用native
这个关键词进行修饰的(也包括 compareAndSwapInt 方法),用native
关键词修饰的方法,表示原生的方法;原生方法的实现并不是 Java 语言,最终实现是 C/C++;这并不是本文的讨论范围。
回到 AQS 的 compareAndSetState 方法,返回值是 boolean 类型,true 表示值更新为 1 成功,false 表示不成功。这里出现两个分支,成功,走 setExclusiveOwnerThread 方法;不成功,走 acquire 方法。咱优先讨论 acquire 方法。
2.3、AQS 的 acquire 方法
先来看一下该方法的源码;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
这里的核心是两个方法,tryAcquire 方法和 acquireQueued 方法。首先会调用 tryAcquire()方法,看方法命名是尝试获取;实际上这个方法确实在就在做一件事“尝试获取资源”。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制代码
不过 AQS 中的这个方法是protected
修饰,并没有去实现,仅仅只是预留了方法入口,后期需要由其子类去实现;这里的子类就是上文中的NonfairSync
类,该类的源码在上文中已经贴出。这段源码其实运用了我们常见的一个设计模式,“模板方法模式”。
2.4、NonfairSync 的 tryAcquire 方法
NonfairSync 的 tryAcquire 方法源码如下
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
复制代码
这里并没有直接去实现 tryAcquire 方法,而是调用了Sync
类下的 nonfairTryAcquire 方法。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
这里有个 getState 方法,最终返回的是 AQS 中的 state 字段,这个字段就是多个线程抢占的共享资源,所以这个字段很重要;volatile
关键字修饰,保证内存的可见性,int
类型,对于 ReentrantLock 锁而言,当 state=0 时,表示无锁,当 state>0 时,表示资源已被线程锁定。
下面分析下这段代码:
如果 state=0 表示无锁,通过 cas 去更新 state 的值,这里更新为 1。
将持有锁的线程更新为当前线程。
如果上述 cas 未更新成功,或者 state!=0,表示已上锁。
继续判断下持有锁的线程如果是当前线程,state 字段做叠加,这里表示 ReentrantLock 的含义,表示可重入锁。
最后,state!=0,持有锁的线程也不是当前线程,表示不能对资源加锁,返回 false。
tryAcquire 方法的判断至此结束,不过最终的走向需要看它的返回值;返回 true,表示当前线程抢占到锁,或者当前线程就是抢占锁的线程,直接重入,加锁流程结束;返回 false,表示没有抢占到锁,流程继续,这里就引出下个话题,CLH 线程等待队列。
2.5、AQS 的 addWaiter 方法
2.5.1、CLH 队列
首先咱来看一段源码中的注释
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks
大致意思是:CLH 队列是由 Craig、Landin、Hagersten 这三位老哥名字的首字母叠加在一起命名的,它是一个等待队列,它是一个变种队列,用到了自旋。
这里的信息要抓住三点:等待队列、变种队列、自旋。
2.5.2、Node 类
在解析 addWaiter 方法实现之前,就不得不提到一个内部类Node
;addWaiter 方法的入参是这个类型,所以先来看看这个类。源码如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
复制代码
这里先大致介绍下,每个属性的意思:
SHARED:类型就是 Node,表示共享模式。
EXCLUSIVE:类型也是 Node,表示独占模式,这里的 ReentrantLock 就是独占模式。
waitStatus:int 类型,当前 Node 节点下,存储的线程状态。
CANCELLED:int 类型,等于 1,waitStatus 属性的值之一,表示节点被取消状态。
SIGNAL:int 类型,等于-1,waitStatus 属性的值之一,表示当前节点需要去唤醒下一个节点。
CONDITION:int 类型,等于-2,waitStatus 属性的值之一,表示节点处于等待状态。
PROPAGATE:int 类型,等于-2,waitStatus 属性的值之一,表示下一个被获取的对象应该要无条件传播,该值仅在共享模式下使用。
prev:Node 类型,指向队列中当前节点的前一个节点。
next:Node 类型,指向队列中当前节点的下一个节点。
thread:存储当前线程信息。
nextWaiter:用来存储节点的指针,不过会出现两种情况;等待队列中,会将该属性的值设置成 SHARED 或者 EXCLUSIVE,用来区分当前节点处于共享模式还是独享模式;条件队列中,用于存放下一个节点的指针,所以当是条件队列的情况下,这个队列是单向队列。
isShared():返回是否属于共享模式,true 表示共享模式,false 表示独享模式。
predecessor():获取当前节点的前一个节点。
另外,Node 类还有两个有参构造器:从作者的注释就能看出来,第一个构造器是在等待队列的时,创建节点使用,第二个构造器是在条件队列时,创建节点使用。
2.5.3、方法解析
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
其实这段方法是在创建 Node 对象,Node 对象就是组成 CLH 队列的基础元素。
创建一个 Node 对象,mode 参数由上述的 acquire()方法传递而来,可以看到传入Node.EXCLUSIVE
,表示独占模式。
判断队尾有指向节点,刚创建的节点放入队列的队尾,并且通过 cas 将队尾指针改成当前创建节点,最后返回当前创建节点。
如果队尾没有指向节点,调用 enq 方法,做队列的初始化操作。
这里出现了第一个自旋,enq 方法是无限循环的,就像作者注释的一样,Must initialize,必须初始化。
这里先是重新 new 了一个新的 node(也可以叫空节点),标记它为队列头。
随后再将 addWaiter 方法中创建的 node,加入到队列尾。
总结下 addWaiter 方法干的事情:
创建一个节点,存储当前线程,并标记独占模式。
判断队列是否为空,不为空,通过 cas 将存储当前线程的 node 节点加入到对尾,并且对该节点做对尾标记。
队列为空,通过自旋,做初始化操作。
初始化过后的队列,队列头是一个空节点,队列尾是存储当前线程的节点。
2.6、AQS 的 acquireQueued 方法
还是先来看下这个方法的源码;
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
复制代码
从这个方法看到,又是运用了无限循环,需要分两个步骤去观察:1.当前方法中的判断,自己的上一个节点是否是头部节点(头部节点就是占用资源的节点);2.当前节点正式入队列,并且被挂起。
2.6.1、acquireQueued 方法中的判断
当前节点的前一个节点是队列头部,意味着当前节点的前一个节点,就是持有资源的节点;当资源被释放,当前节点会去尝试争夺锁资源;如果拿到锁资源,当前节点会被标记为队列头部节点,它的上个节点(老的头部节点)会被置为 null,需要被 GC 及时清除,所以作者在这里添加了一个注释:help GC;下图就是描述了这个流程:
2.6.2、shouldParkAfterFailedAcquire 方法实现
如果当前节点的上一个节点,并不是头部节点;这里就需要用到上述 Node 类中介绍的各种状态字段了;先来重点介绍下Node
类中的两个状态属性:
进入的 shouldParkAfterFailedAcquire 这个方法内部,该方法接受两个参数:当前节点前一个节点和当前节点。首先,获取上一个节点的 waitStatus 属性,然后通过这个属性做如下判断:
如果状态是 SIGNAL(即等于-1),直接返回 true,后续就会交给 parkAndCheckInterrupt 方法去将当前线程挂起。
如果不是 SIGNAL,对于当前 ReentrantLock 而言,ws>0 的操作是满足的,所以下面的步骤就是当前节点一直往前寻找,跳过已被标记状态为 CANCELLED 的节点,直到找到状态是 SIGNAL 的节点,将该节点作为当前节点的上一个节点。也印证了 SIGNAL 状态的解释:**当前节点的上一个节点是 SIGNAL,那么当前节点需要挂起,等待被唤醒。**最后进入下个循环,直到上个节点状态是 SIGNAL,执行上面的第一步,返回 true。
这里可以想象成一个排队去食堂打饭的场景,你在低头玩手机前,跟你前面的同学说,我玩会手机,快到了叫我一下;结果你前面的同学嫌队伍长走了(CANCELLED 状态),所以你只能继续找他的上一个同学;直到有同学回答你,好的(该同学被标记 SIGNAL 状态);然后你就低头玩手机,等待回答你“好的”的那个同学叫你。
最后 compareAndSetWaitStatus 方法其实不用看也知道,通过 cas 机制,将当前节点的上一个节点的 waitStatus 修改成 SIGNAL 状态,这样的话,当前节点才能被挂起,等待唤醒。
再来看下 parkAndCheckInterrupt 这个方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// LockSupport#park
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
复制代码
其中最终又是这个Unsafe
类,通过它的原生方法 park,去挂起当前线程,这里就不展开赘述了。
2.7、资源上锁总结
下面整理下从 lock 方法作为切入点,一系列的调用:
2.8、ReentrantLock 的 unlock 方法
之前一直在讲资源“上锁”,那么这个方法就是给资源解锁。这里给出重要的部分源码
// AQS中
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// AQS中
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// ReentrantLock中
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
2.9、ReentrantLock 的 tryRelease 方法
在调用 unlock 方法去解锁后,最终是调用 AQS 中的 release 方法去实现这个解锁功能的;在该方法中,首先会调用 ReentrantLock 中的 tryRelease 方法,去做 state 状态值的递减操作。
首先,获取 state 值(在 AQS 中有这个公共属性,上文提到过),这里是对当前 state 值减去 1。
再判断当前解锁的线程与持有锁的线程是不是同一个,不是的话,直接抛异常。所以 t 线程占用锁,只有 t 线程才能解锁,解铃还须系铃人。
最后判断做完递减的值是不是等于 0,如果为 0,将持有锁的线程清空,更新 state 字段为递减值(这里是 0),最后返回 true,代表锁已经被释放了。
如果不是 0,更新 state 字段为递减值(不是 0),也不会清空持有锁的线程,意味着资源还是被线程加锁中,最后返回 false。
2.10、AQS 的 release 方法
在 tryRelease 方法返回 false 的时候,release 方法并不会做任何操作,直接就结束了,意味着解锁并没有完成;但是在返回 true 的时候,具体分以下几部操作:
拿到 CLH 队列被标记头部的节点。
判断不是空(队列不能是空的),并且头部节点的等待状态不是 0,在这种情况下,它只能是-1(SIGNAL),所以是需要去唤醒下个节点的。
最后,调用 AQS 中的 unparkSuccessor 方法,去唤醒线程。
2.11、AQS 的 unparkSuccessor 方法
上面说到了,这个方法主要是用来唤醒线程的,下面还是做一下具体的解析:
该方法传参是一个 Node 节点,这里传入的是被标记队列头的节点(头部节点是持有锁资源的节点)。
拿到头部节点的 waitStatus 状态属性,并且判断小于 0 的情况下(该情况是 waitStatus=-1),通过 cas 机制将头部节点的状态改为 0,初始化状态。
拿到头部节点的下个节点,也就是真正意义上处于等待中的第一个节点。
它还是先判断了这个拿到的节点是否为 null,或者状态大于 0(亦或说判断状态等于 1);如果条件成立,说明头节点的下个节点是空,或者下个节点被取消了。
如果第四个判断条件满足,从队尾一直从后往前找,找到离头节点最近的那个节点。
通过Unsafe
类的 unpark 原生方法去唤醒上述找到的,距离头部节点最近的未处于取消状态下的节点。
2.12、资源解锁总结
通过上面的描述可以发现,资源解锁是相对简单的;它只能被上锁的线程去解锁;通过递减 AQS 内部维护的 state 属性值,直到 state 减为 0,表示资源已被解锁;当资源被解锁后,需要通过Unsafe
的 unpark 方法,去唤醒 CLH 队列中,被挂起的第一个节点上的线程。
2.13、公平锁与非公平锁的差异
在 2.2 中说过,当我们使用无参构造器创建一把“锁”的时候,默认是使用 NonfairSync 这个内部类,也就是非公平锁;但是在源码中发现ReentrantLock
还存在一个有参构造器,参数是一个boolean
类型;
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
很明显,这种方式就是将选择权交给开发人员,当我们传入 true 时,就会创建一把“公平锁”。还是一样,先来看下公平锁的内部;
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
从源码的角度,咱来看下,为什么一个叫“非公平锁”,另一个叫“公平锁”?
其实不难发现,NonfairSync
内部的 lock 方法,它是一上来就通过 cas 机制去抢占 state 公共资源,抢不到才去执行 acquire 方法实现后续入队列等一系列的操作;而这里FairSync
的 lock 方法,它是直接执行 acquire 方法,执行后续的操作。等于非公平锁,会去多争取一次资源,对于在 CLH 队列中等待的线程,是“不公平”的。
除了 lock 方法存在差异之外,在 tryAcquire 方法中,也存在着不同。FairSync
类中,会多执行 hasQueuedPredecessors 方法,它是 AQS 下的一个公用方法,下面具体看下这个方法;
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
复制代码
只有简短的几行,却有很多种可能性,但是整个方法主要功能就是判断当前线程是否需要入队列:返回 false,队列为空,不对等待;返回 true,队列不是空,去排队等待。下面需要重点讲下这一行代码:return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
2.13.1、hasQueuedPredecessors 返回 false
返回 false,情况也有两种:1、h != t** **是 false,2、h != t 是 true,并且 (s = h.next) == null 是 false, s.thread != Thread.currentThread()是 false。
第一种情况比较简单,意思是头结点和尾节点是同一个,说明队列是空的,不需要排队等待,所以直接返回 false。
第二种情况,头尾不是同一个节点,头部节点的下个节点也不是空,并且头部节点的下一个节点就是当前线程。其实就可以理解为,前面的资源刚释放,正好轮到当前线程来抢占资源,这种情况相对较少。
2.13.2、hasQueuedPredecessors 返回 true
返回 true,有两种情况:1、h != t 是 true,并且 (s = h.next) == null 是 true。2、h != t 是 true,并且 (s = h.next) == null 是 false, s.thread != Thread.currentThread()是 true。
1、这里的头尾不是同一个节点是必要满足的条件,保证了队列起码不是空的。然后(s = h.next) == null 满足是 true,这里解释起来就必须回顾下 enq 初始化队列这个方法。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
从这个方法可知,先是将节点的 prev 指向前一个节点,然后再通过 cas 修改尾部标识,最后再将前一个节点的 next 指向当前节点;因此 AQS,入队操作是非原子性的。
继续回到判断本身,头部节点拿到锁在执行;中间节点没拿到锁在入队;此时头部节点执行完后释放锁,当前节点尝试不入队拿锁,但是中间线程已经在排队了,但是还没来得及执行 t.next = node 的操作,导致(s = h.next) == null 满足,所以当前节点必须入队,最终返回 true。
2、满足 s.thread != Thread.currentThread()的情况,执行到这里,可以明确队列首先不是空,并且 h.next != null,也就是头节点之后还有其他节点,最后再判断了下,s.thread != Thread.currentThread 为 true,也就是头节点的下个节点并不是当前节点,既然如此,那只能乖乖去队列中排队了,所以最终返回 true。
三、业务运用
想必大家对于并发锁并不陌生了,上文我也是通过ReentrantLock
这个并发锁为入口,一步步来解析 AQS 中的实现。所以这里就不用 ReentrantLock 举例,这里换一个同步工具:CountDownLatch
,它也是基于 AQS 来实现的。
CountDownLatch
是通过一个计数器来实现的,初始值为线程的数量。每当一个线程完成了自己的任务,计数器的值就相应得减 1。当计数器到达 0 时,表示所有的线程都已执行完毕,然后在等待的线程就可以恢复执行任务。
这个其实跟ReentrantLock
思路差不多,一个是 state 初始值就是 0,通过“上锁”一步步叠加这个值;一个是 state 让使用者自己设定初始值,通过线程调用,一步步递减这个值。
CountDownLatch
具体的运用情况如下:1、一个主线程中,需要开启多个子线程,并且要在多个子线程执行完毕后,主线程才能继续往下执行。2、通过多个线程一起执行,提高执行的效率。
下面,通过一个真实的业务场景,来进一步了解下CountDownLatch
这个同步工具,具体是怎么使用的。
现在有这么一个接口,查询用户的详情信息;用户信息由五部分组成:1、用户基本信息;2、用户影像信息;3、用户工商信息;4、用户账户信息;5、用户组织架构信息;按照原本的逻辑是按照顺序 1-5 这样一步步查询,最后组装用户 VO 对象,接口返回。但是这里可以用上CountDownLatch
这个工具类,申请五个线程,分别去查询这五种信息,提高接口效率。
/**
* @author 往事如风
* @version 1.0
* @date 2023/4/11 18:10
* @description:导出报表
*/
@RestController
public class QueryController {
@GetMapping("/query")
public Result download() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 模拟查询数据
List<String> row1 = CollUtil.newArrayList("aa", "bb", "cc", "dd");
List<String> row2 = CollUtil.newArrayList("aa1", "bb1", "cc1", "dd1");
List<String> row3 = CollUtil.newArrayList("aa2", "bb2", "cc2", "dd2");
List<String> row4 = CollUtil.newArrayList("aa3", "bb3", "cc3", "dd3");
List<String> row5 = CollUtil.newArrayList("aa4", "bb4", "cc4", "dd4");
CountDownLatch count = new CountDownLatch(5);
DataQuery d = new DataQuery();
// 开始时间
long start = System.currentTimeMillis();
System.out.println("开始查询数据。。。。");
executorService.execute(() -> {
System.out.println("查询用户基本信息。。。。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
d.setBaseInfo(row1);
count.countDown();
});
executorService.execute(() -> {
System.out.println("查询用户影像信息。。。。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
d.setImgInfo(row2);
count.countDown();
});
executorService.execute(() -> {
System.out.println("查询用户工商信息。。。。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
d.setBusinessInfo(row3);
count.countDown();
});
executorService.execute(() -> {
System.out.println("查询用户账户信息。。。。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
d.setAccountInfo(row4);
count.countDown();
});
executorService.execute(() -> {
System.out.println("查询用户组织架构信息。。。。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
d.setOrgInfo(row5);
count.countDown();
});
// 阻塞:直到count的值减为0
count.await();
executorService.shutdown();
// 结束时间
long end = System.currentTimeMillis();
System.out.println("查询结束。。。。。");
System.out.println("用时时间:" + (end - start));
return Result.success(d);
}
@Data
class DataQuery {
private List<String> baseInfo;
private List<String> imgInfo;
private List<String> businessInfo;
private List<String> accountInfo;
private List<String> orgInfo;
}
}
/*
控制台输出:
开始查询数据。。。。
查询用户基本信息。。。。。。
查询用户影像信息。。。。。。
查询用户工商信息。。。。。。
查询用户账户信息。。。。。。
查询用户组织架构信息。。。。。。
查询结束。。。。。
用时时间:1017
*/
复制代码
这段代码做了模拟查询各种用户信息的操作,其中每个线程都暂停 1 秒,代表在查询这五种数据;最终打印的用时时间是 1017ms,说明这五个线程是同时进行的,大大提高了接口的效率。
四、写在最后
AQS 提供了一个 FIFO 队列,这里称为 CLH 队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock
、CountDownLatch
、Semaphore
等。
AQS 是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。
可以这么说,只要搞懂了 AQS,那么 J.U.C 中绝大部分的 api 都能轻松掌握。
本文主要提供了从ReentrantLock
出发,解析了 AQS 中的各种公用的方法,如果需要知道其他类中怎么去使用 AQS 中的方法,其实也只需要找到切入点,一步步调试下去即可,不过,我想很多地方都是和ReentrantLock
中一致的。
五、参考源码
编程文档:
https://gitee.com/cicadasmile/butte-java-note
应用仓库:
https://gitee.com/cicadasmile/butte-flyer-parent
复制代码
评论