AQS 源码分析看这一篇就够了
private static Lock lock = new ReentrantLock();
// 操作共享变量的方法
public static void incr(){
// 为了演示效果 休眠一下子
try {
lock.lock();
Thread.sleep(1);
count ++;
// 调用了另外一个方法。
decr();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void decr(){
try {
// 重入锁
lock.lock();
count--;
}catch(Exception e){
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
}
Thread.sleep(4000);
System.out.println("result:" + count);
}
}
首先大家考虑这段代码会死锁吗? 大家给我个回复,我看看大家的理解的怎么样
好了,有说会死锁的,有说不会,其实这儿是不会死锁的,而且结果就是 0.为什么呢?
这个其实是锁的一个嵌套,因为这两把锁都是同一个 线程对象,我们讲共享变量的设计是
当 state=0;线程可以抢占到资源 state =1; 如果进去嵌套访问 共享资源,这时 state = 2 如果有多个嵌套 state 会一直累加,释放资源的时候, state–,直到所有重入的锁都释放掉 state=0,那么其他线程才能继续抢占资源,说白了重入锁的设计目的就是为了防止 死锁
!
====================================================================
通过类图我们可以发现右车的业务应用其实内在都有相识的设计,这里我们只需要搞清楚其中的一个,其他的你自己应该就可以看懂~,好了我们就具体结合前面的案例代码,以 ReentrantLock 为例来介绍 AQS 的代码实现。
===================================================================
在看源码之前先回顾下这个图,带着问题去看,会更轻松
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
这个方法逻辑比较简单,if 条件成立说明 抢占锁成功并设置 当前线程为独占锁
else 表示抢占失败,acquire(1) 方法我们后面具体介绍
compareAndSetState(0, 1):用到了 CAS 是一个原子操作方法,底层是 UnSafe.作用就是设置 共享操作的 state 由 0 到 1. 如果 state 的值是 0 就修改为 1
setExclusiveOwnerThread:代码很简单,进去看一眼即可
acquire 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而 CLH 队列中可能还有别的线程在等待);
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。
当然这里代码的作用我是提前研究过的,对于大家肯定不是很清楚,我们继续里面去看,最后大家可以回到这儿再论证。
tryAcquire(int)
再次尝试抢占锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再次尝试抢占锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁的情况
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 表示抢占失败
return false;
}
addWaiter
将阻塞的线程添加到双向链表的结尾
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode 有两种:EXCLUSIVE(独占)和 SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过 enq 入队。
enq(node);
return node;
}
enq(Node)
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为 head 结点,并将 tail 也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一个 if 语句
else 语句
线程 3 进来会执行如下代码
那么效果图
acquireQueued(Node, int)
OK,通过 tryAcquire()和 addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,还是上源码吧:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过
//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是 head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被 interrupt 了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将 head 指向该结点。所以 head 所指的标杆结点,就是当前获取到资源的那个结点或 null。
p.next = null; // setHead 中 node.prev 已置为 null,此处再将 head.next 置为 null,就是为了方便 GC 回收以前的 head 结点。也就意味着之前拿完资源的结点出队了!
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过 park()进入 waiting 状态,直到被 unpark()。如果不可中断的情况下被中断了,那么会从 park()中醒过来,发现拿不到资源,从而继续进入 park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将 interrupted 标记为 true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如 timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
到这里了,我们先不急着总结 acquireQueued()的函数流程,先看看 shouldParkAfterFailedAcquire()和 parkAndCheckInterrupt()具体干些什么。
shouldParkAfterFailedAcquire(Node, Node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC 回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成 SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前驱结点的状态不是 SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用 park()使线程进入 waiting 状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
好了,我们可以小结下了。
看了 shouldParkAfterFailedAcquire()和 parkAndCheckInterrupt(),现在让我们再回到 acquireQueued(),总结下该函数的具体流程:
结点进入队尾后,检查状态,找到安全休息点;
调用 park()进入 waiting 状态,等待 unpark()或 interrupt()唤醒自己;
被唤醒后,看自己是不是有资格能拿到号。如果拿到,head 指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程 1。
最后我们再回到前面的 acquire 方法来总结下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
总结下它的流程吧
调用自定义同步器的 tryAcquire()尝试直接去获取资源,如果成功则直接返回;
没成功,则 addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被 unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false。
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。
好了,lock 方法看完后,我们再来看下 unlock 方法
release(int)
它会释放指定量的资源,如果彻底释放了(即 state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是 unlock()的语义,当然不仅仅只限于 unlock()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
tryRelease(int)
此方法尝试去释放指定量的资源。下面是 tryRelease()的源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {//这里是先尝试释放一下资源,一般都可以释放成功,除了多次重入但只释放一次的情况。
Node h = head;
//这里判断的是 阻塞队列是否还存在和 head 节点是否是 tail 节点,因为之前说过,队列的尾节点的 waitStatus 是为 0 的
if (h != null && h.waitStatus != 0)
//到这里就说明 head 节点已经释放成功啦,就先去叫醒后面的直接节点去抢资源吧
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//这里,node 一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点 s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
if (t.waitStatus <= 0)//从这里可以看出,<=0 的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
这个函数并不复杂。一句话概括:用 unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用 s 来表示吧。此时,再和 acquireQueued()联系起来,s 被唤醒后,进入 if (p == head && tryAcquire(arg))的判断(即使 p!=head 也没关系,它会再进入 shouldParkAfterFailedAcquire()寻找一个安全点。这里既然 s 已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()的调整,s 也必然会跑到 head 的 next 结点,下一次自旋 p==head 就成立啦),然后 s 把自己设置成 head 标杆结点,表示自己已经获取到资源了,acquire()也返回了
好了,到这我们就因为把源码看完了,再回头来看下这张图
评论