写点什么

Java 并发(五),大厂程序员 35 岁后的职业出路在哪

  • 2021 年 11 月 10 日
  • 本文字数:5251 字

    阅读完需:约 17 分钟


先认识到这,接下来我们回去看看 ReentrantLock


接下来,我们看看 ReentrantLock 的结构



可以看到,ReentrantLock 里面有 3 个内部类


分别为


  • FairSync:公平锁

  • NonfairSync:非公平锁

  • Sync:锁

FairSync:公平锁


可以看到 FairSync 继承了 Sync,而且自己只有一个序列化 Id 属性(当然还包含 Sync 的一切属性和方法)

lock 方法

不过最重要的,我们还是要看他的 lock 方法



可以看到,他调用 AbstractQueuedSynchronizer 里面的 acquire 方法,而且给了一个参数为 1,因为 Sync 是继承了 AbstractQueuedSynchronizer,AbstractQueuedSynchronizer 是一个抽象类,所以 FairSync 是拥有 AbstractQueuedSynchronizer 的所有属性和方法

AbstractQueuedSynchronizer 的 acquire 方法

接下来,我们看看 AbstratQueuedSynchronizer 的 acquire 方法



我们来看一下他的注释


![在这里插入图片描述](https://img-blog.csdnimg.cn/20210616224207487.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0dE


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


VVRfVHJpbQ==,size_16,color_FFFFFF,t_70#pic_center)


注释说明了,这个方法就是 lock 方法的底层,独占模式其实就是获取锁,通过调用 tryAcquire 去获取锁,如果成功获取锁就返回,获取不了就线程排队,线程排队过程中,可能会发生反复阻塞和解除阻塞


acquire 方法的步骤如下


  • 调用 tryAcquire 方法,这个方法是去获取锁

  • 调用 accquireQueued 方法,前面的一个判断是 tryAcquire 的非返回值,所以代表没有获取锁,所以接下来这个方法就是进行入线程队列,就是进入 AQS 的底层线程队列

  • 如果结果为 True,就执行 selfInterupt 方法,因为结果为 true,就代表拿不到锁,而且进入队列失败,所以需要进行打断

tryAcquire 方法


可以看到这个方法是 AbstractQueuedSynchronizer 里面的,而且并没有实现,在下面这些类里面都有实现



因为我们看的是 FairSync,所以 FairSync 调用的 tryAcquire 方法肯定是 FairSync 里面的


下面是他的底层实现



照样先看看他的注释



可以从注释中知道,这里的 tryAcquire 是公平版本的,符合 FairSync 的本质,接下来分析源代码(公平锁传进来的 acquire 为 1)


protected final boolean tryAcquire(int acquires) {


//获取当前执行该方法的线程


final Thread current = Thread.currentThread();


//获取锁的状态,前面提到过,AQS 是依靠一个 volatile 变量来实现的


//而且该变量称为 state,在 tryAcquire 这里就实现了


int c = getState();


//等于 0,代表锁还没有获取


if (c == 0) {


//调用了 hasQueuedPredecessors 与 compareAndSetState 方法


//hasQueuedPredecessors,来判断是否需要排队,因为是公平锁


//compareAndSetState 方法其实就是 CAS 去获取锁


//而且可以看到,如果 CAS 成功,那么 state 是改成 acquire,也就是 1


//所以 1 代表了锁正在被占用中


//所以 ReentrantLock 的底层是 CAS


if (!hasQueuedPredecessors() &&


compareAndSetState(0, acquires)) {


//如果不需要等待,且拿到了锁


//将锁的拥有者设为当前线程


setExclusiveOwnerThread(current);


return true;


}


}


//如果锁已经被占用


//而锁的拥有者再一次去进行获得锁


//即拥有锁的线程又去获得锁


else if (current == getExclusiveOwnerThread()) {


//计算当前状态+acquires


int nextc = c + acquires;


//如果小于 0 抛出异常


if (nextc < 0)


throw new Error("Maximum lock count exceeded");


//锁的状态变成


setState(nextc);


//继续放行,所以允许锁的拥有者再次获得锁


//但仅仅修改了底层 state 状态


return true;


}


//如果锁被占用,而且请求获得的线程不是锁的占用者


//返回 false


return false;


}


getState 方法



就是返回了底层的 state 变量,也就是 AQS 的 state


hasQueuedPredecessors 方法


这个方法是用来判断线程是否需要排队的(返回 false 不需要排队),因为是公平锁,所以这个判断是必须的,否则就不公平了,因为没有判断排队操作,那么只要一个线程释放了锁,那么后来的线程也是可以抢到的(透露一下:非公平锁的实现就是仅仅少了这个判断而已)



可以看到,这个方法是 AQS 里面的,tail 和 head 属性前面已经提到过,是底层队列的头尾结点


关键在于 return 语句


他主要有两个判断


  • 头结点是否等于尾结点

  • 头结点是否尾空,或者头结点的线程是否为当前线程


关于这个弹出,必须要先清楚,底层队列里面的结点是怎么添加的


下面就是 AQS 的插入方法 addWaiter



可以看到,如果尾结点不为空,也就代表不是第一次插入,那么直接就使用尾插法,如果是第一次插入,只是调用 enq 方法,所以 enq 方法是针对第一次插入使用的,这两个方法都是 AQS 的



可与看到里面是一个死循环,如果是第一次插入,让创建一个新结点为头结点(相当于一个哨兵),然后让新尾结点一样,这一点可以看成是初始化头尾结点,接下来下一轮循环,此时尾结点已经不为 null,接下来就让新结点指向尾结点,让新结点又称为了尾结点,再让旧的尾结点指向新的尾结点


但这里有一个疑问,如果并不是第一次入队,为什么最后还要执行多一次 enq 方法呢?


我们可以看到,如果并不是第一次入队,因为前面的 addWaiter,使用了一次 CAS,那如果 CAS 失败了怎么办?即中途有线程把尾结点改了怎么办?


此时就需要重复的操作,不断的执行 CAS 直到成功为止,所以就有了无论哪次插入都要执行 enq 方法,因为 enq 方法里面就是一个死循环的 CAS 执行,直到 CAS 执行成功,才会结束


总结一下添加结点


  • 所以头结点只是一个哨兵,是不存储线程的,而插入的方法则是尾插法

  • 其实头结点也不完全是哨兵,下面我们看线程自旋时,会发现其实头结点是当前轮到执行的线程,这里先埋下个伏笔


知道了插入方法后,我们回过头来看 hasQueuedPredecessors 的返回值,也就是如何判断是否需要排队



步骤如下


  • 判断头结点是否等于尾结点,只有在初始化的时候,头结点才会等于尾结点,或者队列为空,头尾结点都为 null,所以,如果头结点等于尾结点(直接返回 false),就代表了队列为空,所以就不需要进行排队

  • 通过第一个判断就已经知道头尾结点之间是有其他结点存在的,但由于前面的 enq 方法是先处理头结点的,然后除了两个 CAS 操作之外,其他都不是原子性的,也就是基本上整个 enq 方法都不是原子性的,所以当正在插入第一个线程的时候,会出现一个 h.next == null 的问题(这时已经代表有一个线程正在插入了,但还没完成插入,所以当前线程需要排队),所以当 h.next == null 时就代表要排队

  • 最后一个判断就是判断头结点的下一个线程是不是当前线程(也就是下一个是不是轮到你,不需要排队,因为插入方法是尾插法,所以弹出方法要从头弹出),经过前面两次判断,可以得知队列中有人在排队,并且此时并不存在队列第一次插入结点的 Null 问题,那么最后就需要判断下一个是否轮到当前线程


setExclusiveOwnerThread 方法



当前面判断无人排队,或者下一个就是轮到当前线程,那么首先要做的是调用 setExclusiveOwnerThread 方法


这个方法唯一执行的是:将 exclusiveOwnerThread 改成为自己,表明这个锁被这个线程获取了


总结 tryAcquire 方法


  • 首先获取当线程

  • 获取锁的当前状态,0 代表无人占用

  • 如果无人占用,判断当前线程是否需要进行排队

  • 如果队列为空,不需要排队

  • 如果队列不为空,但下一个是轮到自己,也不需要为空

  • 这里要注意,第一次插入的 null 问题

  • 如果当前线程不需要进行排队,下一步就是执行 CAS 来获得锁(此时锁 state 的状态要由 0 变为了 1),CAS 保证了同时争夺锁的并发安全性,如果此时 CAS 失败,就要重头来(本质上是 CAS 修改 state 从 0 变为 1)

  • 如果抢到了锁,那就将锁的拥有者改成自己

  • 返回 true 给上一层

  • 如果锁已经被占用了,也就是 state 不为 0,分为两种情况

  • 其他线程占有这把锁,所以后续操作直接返回 false

  • 当前线程占有这把锁,又再次进行加锁,相当于加了多层锁,又要去修改锁的状态 state,一般来说会加 1(源码上是加上 acquire,而 acquire 传进来是为 1),代表这个锁进入了需要被释放多次,才可以被其他线程获取,然后返回 True 给上一层

  • 如果是第二种情况,是不需要使用 CAS 来保证线程安全性的,因为上一层已经有一层锁在保护这个线程的安全性


整个判断是否排队的 tryAcquire 方法已经结束了


接下来就是回到上一层的 acquire 方法

返回 AbstractQueuedSynchronized 的 acquire 方法


可以看到,如果需要排队,就会执行 acquireQueued 方法,接下来在执行 addWaiter


addWaiter 方法已经看过了,其实就是一个结点进入底层队列而已,返回的是插入的结点,相当于是尾结点,也就是入队操作,不过这里离要注意的是他的参数是一个 Node.EXCLUSIVE,我们接下来来看一下他究竟是什么



其实本质上这只是一个状态表示,表示入队这个结点是进入等待状态

acquireQueued 方法

源码如下



接下来我们看看这个方法干了什么


前面已经判断了是否需要等待,如果需要等待,就会先执行插入队列的方法,然后执行 acquireQueued,既然进入了队列了,那么此时就应该是 park 此时的线程,也就是停止此时的线程,但其实停止并不是休眠或者阻塞,在 ReentrantLock 中,是使用自旋的方式来进行的


其实,这个方式其实就是 ReentranLock 底层的自旋操作


接下来分析整一段源码


final boolean acquireQueued(final Node node, int arg) {


//定义一个 failed 变量


//该变量用来判断当前线程拿到锁是否失败


boolean failed = true;


try {


//定义一个 interrupted 变量


//这个变量用来判断当前线程是否还在排队(还在自旋)


boolean interrupted = false;


//死循环


//这个死循环就是底层的 CAS 自旋


for (;;) {


//获取当前线程结点的前一个结点


final Node p = node.predecessor();


//如果前一个线程是头结点,然后当前线程就会去看是否需要排队


//前面已经提到过,头结点的线程要么是一个哨兵,要么就是当前拥有锁的线程


//如果前一个线程是头结点,那就代表当前线程就要时刻注意前一个线程是否结束


//所以要死循环不断地去看需不需要排队


//如果前一个结点并不是头结点,因为是公平锁,那就不需要在意,继续等待


if (p == head && tryAcquire(arg)) {


//进入到这里,就代表前一个线程已经结束了


//并且当前线程抢到锁,正在执行


//先将自己改成头结点,代表自己正在操作,让下一个线程注意


//并且这个 setHead 不是普通让自己成为头结点


//他会将自己里面的线程设为


//因为这是要满足一个原则:线程队列里面不允许存在拥有锁的线程


setHead(node);


//断开原来头结点与自己的连接


p.next = null; // help GC


//failed 变量设为 false


//代表线程拿到锁


failed = false;


//返回 interrupted


//该线程结束自旋,开始执行


return interrupted;


}


//当争夺锁失败时


//调用 shouldParkAfterFailedAcquire 方法


//这个方法是检查和更新未能获取锁的线程状态,判断是否需要进行阻塞


//如果需要阻塞就会调用 parkAndCheckInterrupt


//也就是让这个线程进行阻塞


if (shouldParkAfterFailedAcquire(p, node) &&


parkAndCheckInterrupt())


//获取不到锁,failed 依然为 true


//获取不到锁,还需要继续自旋,执行下一轮循环


interrupted = true;


}


} finally {


//当轮到线程执行时,需要结束自旋,开始执行动作


//即 failed 会变为 false


//但首先要做的是,


if (failed)


cancelAcquire(node);


}


}


前面的自旋循环很简单,但是一直自旋下去并不是好事,因为自旋也是会消耗 CPU 的,假如有一个线程迟迟不释放锁,就白白消耗了很多的 CPU,所以,自旋是要限制次数的,而限制次数就存在于当获得锁失败时可能会调用的两个方法


setHead



可以看到,setHead 方法将头结点里面的线程设为了空,因为要满足获取锁的线程不存在于队列中,代表获取锁的线程已经不需要再进行排队了,而且不需要被叫醒


shouldParkAfterFailedAcquire 方法


前面已经提到过这个线程是用来判断获取锁失败的线程是否需要 park,即是否需要阻塞(因为 CAS 太多次了)


我们先来看看源码


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {


//获取前一个线程的 waitStatus 状态


int ws = pred.waitStatus;


//如果前一个线程状态为 SIGNAL


if (ws == Node.SIGNAL)


/*


  • This node has already set status asking a release

  • to signal it, so it can safely park.


*/


//返回 true,即当前线程需要阻塞、休眠


return true;


//如果前一个线程的状态大于 0


if (ws > 0) {


/*


  • Predecessor was cancelled. Skip over predecessors and

  • indicate retry.


*/


//代表前一个线程取消执行了,就要从队列中删除前一个线程


//当然这是一个循环,可能前面有其他取消的线程,那么也是要删除的


do {


//让当前线程的前一个线程为前前一个线程


node.prev = pred = pred.prev;


} while (pred.waitStatus > 0);


pred.next = node;


} else {


/*


  • waitStatus must be 0 or PROPAGATE. Indicate that we

  • need a signal, but don't park yet. Caller will need to

  • retry to make sure it cannot acquire before parking.


*/


//如果是 0 或者其他状态


//那么前一个线程还是可以继续自旋的,不过将状态改成了 SIGNAL


//当前线程下一轮抢锁如果还是失败,那他的上一个线程就会因为变成了 SIGNAL


//而被取消线程,转而变成阻塞了


compareAndSetWaitStatus(pred, ws, Node.SIGNAL);


}


//不需要 park 掉线程


return false;


}


那么我们来看看这个 SIGNAL 状态是什么

评论

发布
暂无评论
Java并发(五),大厂程序员35岁后的职业出路在哪