写点什么

AQS 源码分析看这一篇就够了

  • 2021 年 11 月 10 日
  • 本文字数:4890 字

    阅读完需:约 16 分钟

private static Lock lock = new ReentrantLock();


// 操作共享变量的方法


public static void incr(){


// 为了演示效果 休眠一下子


try {


lock.lock();


Thread.sleep(1);


count ++;


// 调用了另外一个方法。


decr();


} catch (InterruptedException e) {


e.printStackTrace();


}finally {


lock.unlock();


}


}


public static void decr(){


try {


// 重入锁


lock.lock();


count--;


}catch(Exception e){


}finally {


lock.unlock();


}


}


public static void main(String[] args) throws InterruptedException {


for (int i = 0; i < 1000 ; i++) {


new Thread(()->AtomicDemo.incr()).start();


}


Thread.sleep(4000);


System.out.println("result:" + count);


}


}


首先大家考虑这段代码会死锁吗? 大家给我个回复,我看看大家的理解的怎么样


好了,有说会死锁的,有说不会,其实这儿是不会死锁的,而且结果就是 0.为什么呢?


这个其实是锁的一个嵌套,因为这两把锁都是同一个 线程对象,我们讲共享变量的设计是


当 state=0;线程可以抢占到资源 state =1; 如果进去嵌套访问 共享资源,这时 state = 2 如果有多个嵌套 state 会一直累加,释放资源的时候, state–,直到所有重入的锁都释放掉 state=0,那么其他线程才能继续抢占资源,说白了重入锁的设计目的就是为了防止 死锁


AQS 类图


====================================================================



通过类图我们可以发现右车的业务应用其实内在都有相识的设计,这里我们只需要搞清楚其中的一个,其他的你自己应该就可以看懂~,好了我们就具体结合前面的案例代码,以 ReentrantLock 为例来介绍 AQS 的代码实现。


源码分析


===================================================================


在看源码之前先回顾下这个图,带着问题去看,会更轻松



Lock.lock()




final void lock() {


if (compareAndSetState(0, 1))


setExclusiveOwnerThread(Thread.currentThread());


else


acquire(1);


}


这个方法逻辑比较简单,if 条件成立说明 抢占锁成功并设置 当前线程为独占锁


else 表示抢占失败,acquire(1) 方法我们后面具体介绍


compareAndSetState(0, 1):用到了 CAS 是一个原子操作方法,底层是 UnSafe.作用就是设置 共享操作的 state 由 0 到 1. 如果 state 的值是 0 就修改为 1


setExclusiveOwnerThread:代码很简单,进去看一眼即可

acquire 方法

public final void acquire(int arg) {


if (!tryAcquire(arg) &&


acquireQueued(addWaiter(Node.EXCLUSIVE), arg))


selfInterrupt();


}


  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而 CLH 队列中可能还有别的线程在等待);

  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。


当然这里代码的作用我是提前研究过的,对于大家肯定不是很清楚,我们继续里面去看,最后大家可以回到这儿再论证。

tryAcquire(int)

再次尝试抢占锁


protected final boolean tryAcquire(int acquires) {


return nonfairTryAcquire(acquires);


}


final boolean nonfairTryAcquire(int acquires) {


final Thread current = Thread.currentThread();


int c = getState();


//再次尝试抢占锁


if (c == 0) {


if (compareAndSetState(0, acquires)) {


setExclusiveOwnerThread(current);


return true;


}


}


// 重入锁的情况


else if (current == getExclusiveOwnerThread()) {


int nextc = c + acquires;


if (nextc < 0) // overflow


throw new Error("Maximum lock count exceeded");


setState(nextc);


return true;


}


// false 表示抢占失败


return false;


}

addWaiter

将阻塞的线程添加到双向链表的结尾


private Node addWaiter(Node mode) {


//以给定模式构造结点。mode 有两种:EXCLUSIVE(独占)和 SHARED(共享)


Node node = new Node(Thread.currentThread(), mode);


//尝试快速方式直接放到队尾。


Node pred = tail;


if (pred != null) {


node.prev = pred;


if (compareAndSetTail(pred, node)) {


pred.next = node;


return node;


}


}


//上一步失败则通过 enq 入队。


enq(node);


return node;


}

enq(Node)

private Node enq(final Node node) {


//CAS"自旋",直到成功加入队尾


for (;;) {


Node t = tail;


if (t == null) { // 队列为空,创建一个空的标志结点作为 head 结点,并将 tail 也指向它。


if (compareAndSetHead(new Node()))


tail = head;


} else {//正常流程,放入队尾


node.prev = t;


if (compareAndSetTail(t, node)) {


t.next = node;


return t;


}


}


}


}


第一个 if 语句



else 语句



线程 3 进来会执行如下代码



那么效果图


acquireQueued(Node, int)

OK,通过 tryAcquire()和 addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,还是上源码吧:


final boolean acquireQueued(final Node node, int arg) {


boolean failed = true;//标记是否成功拿到资源


try {


boolean interrupted = false;//标记等待过程中是否被中断过


//又是一个“自旋”!


for (;;) {


final Node p = node.predecessor();//拿到前驱


//如果前驱是 head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被 interrupt 了)。


if (p == head && tryAcquire(arg)) {


setHead(node);//拿到资源后,将 head 指向该结点。所以 head 所指的标杆结点,就是当前获取到资源的那个结点或 null。


p.next = null; // setHead 中 node.prev 已置为 null,此处再将 head.next 置为 null,就是为了方便 GC 回收以前的 head 结点。也就意味着之前拿完资源的结点出队了!


failed = false; // 成功获取资源


return interrupted;//返回等待过程中是否被中断过


}


//如果自己可以休息了,就通过 park()进入 waiting 状态,直到被 unpark()。如果不可中断的情况下被中断了,那么会从 park()中醒过来,发现拿不到资源,从而继续进入 park()等待。


if (shouldParkAfterFailedAcquire(p, node) &&


parkAndCheckInterrupt())


interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将 interrupted 标记为 true


}


} finally {


if (failed) // 如果等待过程中没有成功获取资源(如 timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。


cancelAcquire(node);


}


}


到这里了,我们先不急着总结 acquireQueued()的函数流程,先看看 shouldParkAfterFailedAcquire()和 parkAndCheckInterrupt()具体干些什么。

shouldParkAfterFailedAcquire(Node, Node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {


int ws = pred.waitStatus;//拿到前驱的状态


if (ws == Node.SIGNAL)


//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了


return true;


if (ws > 0) {


/*


  • 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。

  • 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC 回收)!


*/


do {


node.prev = pred = pred.prev;


} while (pred.waitStatus > 0);


pred.next = node;


} else {


//如果前驱正常,那就把前驱的状态设置成 SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!


compareAndSetWaitStatus(pred, ws, Node.SIGNAL);


}


return false;


}


整个流程中,如果前驱结点的状态不是 SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。


private final boolean parkAndCheckInterrupt() {


LockSupport.park(this);//调用 park()使线程进入 waiting 状态


return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。


}


好了,我们可以小结下了。


看了 shouldParkAfterFailedAcquire()和 parkAndCheckInterrupt(),现在让我们再回到 acquireQueued(),总结下该函数的具体流程:


  1. 结点进入队尾后,检查状态,找到安全休息点;

  2. 调用 park()进入 waiting 状态,等待 unpark()或 interrupt()唤醒自己;

  3. 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head 指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程 1。


最后我们再回到前面的 acquire 方法来总结下


public final void acquire(int arg) {


if (!tryAcquire(arg) &&


acquireQueued(addWaiter(Node.EXCLUSIVE), arg))


selfInterrupt();


}


总结下它的流程吧


  1. 调用自定义同步器的 tryAcquire()尝试直接去获取资源,如果成功则直接返回;

  2. 没成功,则 addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被 unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false。

  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。



Lock.unlock()




好了,lock 方法看完后,我们再来看下 unlock 方法

release(int)

它会释放指定量的资源,如果彻底释放了(即 state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是 unlock()的语义,当然不仅仅只限于 unlock()


public final boolean release(int arg) {


if (tryRelease(arg)) {


Node h = head;//找到头结点


if (h != null && h.waitStatus != 0)


unparkSuccessor(h);//唤醒等待队列里的下一个线程


return true;


}


return false;


}

tryRelease(int)

此方法尝试去释放指定量的资源。下面是 tryRelease()的源码:


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


public final boolean release(int arg) {


if (tryRelease(arg)) {//这里是先尝试释放一下资源,一般都可以释放成功,除了多次重入但只释放一次的情况。


Node h = head;


//这里判断的是 阻塞队列是否还存在和 head 节点是否是 tail 节点,因为之前说过,队列的尾节点的 waitStatus 是为 0 的


if (h != null && h.waitStatus != 0)


//到这里就说明 head 节点已经释放成功啦,就先去叫醒后面的直接节点去抢资源吧


unparkSuccessor(h);


return true;


}


return false;


}


private void unparkSuccessor(Node node) {


//这里,node 一般为当前线程所在的结点。


int ws = node.waitStatus;


if (ws < 0)//置零当前线程所在的结点状态,允许失败。


compareAndSetWaitStatus(node, ws, 0);


Node s = node.next;//找到下一个需要唤醒的结点 s


if (s == null || s.waitStatus > 0) {//如果为空或已取消


s = null;


for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。


if (t.waitStatus <= 0)//从这里可以看出,<=0 的结点,都是还有效的结点。


s = t;


}


if (s != null)


LockSupport.unpark(s.thread);//唤醒


}


这个函数并不复杂。一句话概括:用 unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用 s 来表示吧。此时,再和 acquireQueued()联系起来,s 被唤醒后,进入 if (p == head && tryAcquire(arg))的判断(即使 p!=head 也没关系,它会再进入 shouldParkAfterFailedAcquire()寻找一个安全点。这里既然 s 已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()的调整,s 也必然会跑到 head 的 next 结点,下一次自旋 p==head 就成立啦),然后 s 把自己设置成 head 标杆结点,表示自己已经获取到资源了,acquire()也返回了


好了,到这我们就因为把源码看完了,再回头来看下这张图

评论

发布
暂无评论
AQS源码分析看这一篇就够了