写点什么

Java 并发编程——AQS 源码学习

  • 2022 年 5 月 06 日
  • 本文字数:4243 字

    阅读完需:约 14 分钟

this.thread = thread;}


Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}


Node 内部类中,声明了 prenext 节点用于队列的连接,同时保存了 waitStatus 状态。

2.2 AQS 类解析

在面向对象的世界中,想要了解一个类有什么特点,就要看它的属性。通过查看源码,我们看到 AQS 类包含:


/**


  • Head of the wait queue, lazily initialized. Except for

  • initialization, it is modified only via method setHead. Note:

  • If head exists, its waitStatus is guaranteed not to be

  • CANCELLED.*/private transient volatile Node head;


/**


  • Tail of the wait queue, lazily initialized. Modified only via

  • method enq to add new wait node.*/private transient volatile Node tail;


/**


  • The synchronization state.*/private volatile int state;

  • 前驱头节点 - head

  • 后驱尾节点 - tail

  • 同步器状态 - state


AQS 基于 FIFO 队列,接下来就依照 acquire-releaseacquireShared-releaseShared 的次序来分析入队和出队。

2.2.1. acquire(int)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源成功,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是 lock() 的语义,当然不仅仅只限于 lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是 acquire 源码:


public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}


acquire 方法中调用了 tryAcquire()acquireQueued()addWaiter() 三个方法。


首先通过tryAcquire方法尝试申请独占锁。如果获取成功则返回 true,否则返回 false


protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}


源码中直接 throw 一个异常。结合我们前面自定义锁的知识,AQS 只是一个框架,具体资源获取和释放方式交由自定义同步器实现。AQS 这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过 stateget/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。


这里之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire-tryRelease,而共享模式下只用实现 tryAcquireShared-tryReleaseShared。如果都定义成 abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea 还是站在咱们开发者的角度,尽量减少不必要的工作量。

1. addWaiter()

接着就是 addWaiter() 方法用于将当前线程添加到等待队列的队尾,并返回当前线程所在的节点


private Node addWaiter(Node mode) {//以给定的 Node 节点模式构建当前线程的 Node 节点,在 acquire 方法中传入的是 EXCLUSIVE 独占式节点 Node node = new Node(Thread.currentThread(), mode);// 将尾节点进行保存 Node pred = tail;if (pred != null) {//如果尾节点不为 null//将尾节点设置尾新节点的 prev 节点 node.prev = pred;if (compareAndSetTail(pred, node)) {//通过 CAS 保证,确保节点能够被线程安全的添加//将当前界定指向前驱的 next 节点 pred.next = node;return node;}}//如果尾节点为 null,则通过 enq 进行入队 enq(node);return node;}


//同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过 CAS 将节点设置成为尾节点之后,//当前线程才能从该方法中返回,否则当前线程不断的尝试设置。private Node enq(final Node node) {//CAS"自旋",直到成功加入队尾 for (;;) {//尾节点临时存储 Node t = tail;if (t == null) { // Must initialize//如果 tail 为 null,则将 if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}


addWaiter(Node node) 方法中,将当前线程节点添加到等待队列中。


2. acquireQueued()

acquireQueued 在队列中的线程获取锁


/**


  • Acquires in exclusive uninterruptible mode for thread already in

  • queue. Used by condition wait methods as well as acquire.

  • @param node the node

  • @param arg the acquire argument

  • @return {@code true} if interrupted while waiting

  • acquireQueued 方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态(锁)( p == head && tryAcquire(arg))


*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//死循环检查(自旋检查)当前节点的前驱节点是否为头结点,才能获取锁 for (;;) {// 获取节点的前驱节点 final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {//节点中的线程循环的检查,自己的前驱节点是否为头节点//将当前节点设置为头结点,移除之前的头节点 setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 否则检查前一个节点的状态,看当前获取锁失败的线程是否要挂起 if (shouldParkAfterFailedAcquire(p, node) &&//如果需要挂起,借助 JUC 包下面的 LockSupport 类的静态方法 park 挂起当前线程,直到被唤醒 parkAndCheckInterrupt())interrupted = true;}} finally {//如果有异常 if (failed)//取消请求,将当前节点从队列中移除 cancelAcquire(node);}}


通过 addWaiter 方法添加到等待队列中后,在通过 acquireQueued 方法进行锁的获取。


大体流程如下:


  • tryAcquire() 尝试直接去获取资源,如果成功则直接返回;

  • addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;

  • acquireQueued() 使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false

  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。

3. 独占式锁获取流程

调用同步器的 acquire(int arg) 方法可以获取同步状态,该方法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。获取流程:


  1. 当前线程通过 tryAcquire() 方法尝试获取锁,成功则直接返回,失败则进入队列排队等待,通过 CAS 获取同步状态。

  2. 如果尝试获取锁失败的话,构造同步节点(独占式的 Node.EXCLUSIVE),通过 addWaiter(Node node,int args) 方法,将节点加入到同步队列的队列尾部。

  3. 最后调用 acquireQueued(final Node node, int args) 方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued 方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。


2.2.2 release(int) 独占锁的释放

AQS 中通过 release 方法进行锁的释放。


public final boolean release(int arg) {//调用 tryRelease 方法释放 if (tryRelease(arg)) {//如果释放成功 Node h = head;//如果头节点不为 null,并且头结点的 waitStatus 值不为 0,即有状态 if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}//没有释放成功返回 falsereturn false;}


// tryRelease() 尝试释放当前线程的同步状态(锁)protected final boolean tryRelease(int releases) {//c 为释放后的同步状态 int c = getState() - releases;//判断当前释放锁的线程是否为获取到锁(同步状态)的线程,不是抛出异常(非法监视器状态异常)if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;//如果锁(同步状态)已经被当前线程彻底释放,则设置锁的持有者为 null,同步状态(锁)变的可获取 if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}


private void unparkSuccessor(Node node) {/*


  • If status is negative (i.e., possibly needing signal) try

  • to clear in anticipation of signalling. It is OK if this

  • fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);


/*


  • Thread to unpark is held in successor, which is normally

  • just the next node. But if cancelled or apparently null,

  • traverse backwards from tail to find the actual

  • non-cancelled successor.*/Node s = node.next;//从队列尾部开始往前去找最前面的一个 waitStatus 小于 0 的节点。if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//唤醒后继节点对应的线程 if (s != null)LockSupport.unpark(s.thread);}


release() 是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即 state=0),它会唤醒等待队列里的其他线程来获取资源。

2.2.3 acquireShared(int)

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是 acquireShared() 的源码:


public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShar 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 ed(arg);}


private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);//加入队列尾部 boolean failed = true;//是否成功标志 try {boolean interrupted = false;//等待过程中是否被中断过的标志 for (;;) {final Node p = node.predecessor();//前驱 if (p == head) {//如果到 head 的下一个,因为 head 是拿到资源的线程,此时 node 被唤醒,很可能是 head 用完资源来唤醒自己的 int r = tryAcquireShared(arg);//尝试获取资源 if (r >= 0) {//成功 setHeadAndPropagate(node, r);//将 head 指向自己,还有剩余资源可以再唤醒之后的线程 p.next = null; // help GCif (interrupted)//如果等待过程中被打断过,此时将中断补上。selfInterrupt();failed = false;return;}}


//判断状态,寻找安全点,进入 waiting 状态,等着被 unpark()或 interrupt()if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
Java 并发编程——AQS 源码学习_Java_爱好编程进阶_InfoQ写作社区