写点什么

Java 并发(五):ReentrantLock 的加锁实现—

  • 2021 年 11 月 11 日
  • 本文字数:5278 字

    阅读完需:约 17 分钟

在 ReentrantLock 中,调用 lock 方法获取锁,调用 unlock 方法来释放锁,当然要同一个 ReentrantLock 才能锁住,具体的用法回看 Java 复习的锁


ReentrantLock 拥有公平锁和非公平锁,我们可以从构造方法中看出,无参构造出来的锁是非公平锁



ReentrantLock 的实现依赖于 Java 的同步框架 AbstractQueuedSynchronizer(简称为 AQS)


AQS 是使用一个整形的 volatile 变量来维护同步状态的,名字为 state


所以,我们必须先看看 AQS 的底层源码

AQS 的底层

AQS 的变量、常量与内部类


这里我们只先了解 head、tail 与一个内部类 Node,不过在上面,我们也可以看到一个重要的 state 变量,就是前面提到的锁实现


因为这三个就构成了 AQS 的底层,一个双向链表形成的队列,该队列就有用来存储线程的


Node 内部类



可以看到里面不仅有 prev、next 还有要存储的线程 thread,当然里面还有一些变量是表示线程状态的,比如 CANCELLED、CONDITION、PROPAGATE 和 SIGNAL,而且要注意,这个内部类是一个 final 类型,不可以被继承,但结点里面的 prev、next 和 thread 都是非 final,可以被修改的!



先认识到这,接下来我们回去看看 ReentrantLock


接下来,我们看看 ReentrantLock 的结构



可以看到,ReentrantLock 里面有 3 个内部类


分别为


  • FairSync:公平锁

  • NonfairSync:非公平锁

  • Sync:锁

FairSync:公平锁


可以看到 FairSync 继承了 Sync,而且自己只有一个序列化 Id 属性(当然还包含 Sync 的一切属性和方法)

lock 方法

不过最重要的,我们还是要看他的 lock 方法



可以看到,他调用 AbstractQueuedSynchronizer 里面的 acquire 方法,而且给了一个参数为 1,因为 Sync 是继承了 AbstractQueuedSynchronizer,AbstractQueuedSynchronizer 是一个抽象类,所以 FairSync 是拥有 AbstractQueuedSynchronizer 的所有属性和方法

AbstractQueuedSynchronizer 的 acquire 方法

接下来,我们看看 AbstratQueuedSynchronizer 的 acquire 方法



我们来看一下他的注释



注释说明了,这个方法就是 lock 方法的底层,独占模式其实就是获取锁,通过调用 tryAcquire 去获取锁,如果成功获取锁就


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


返回,获取不了就线程排队,线程排队过程中,可能会发生反复阻塞和解除阻塞


acquire 方法的步骤如下


  • 调用 tryAcquire 方法,这个方法是去获取锁

  • 调用 accquireQueued 方法,前面的一个判断是 tryAcquire 的非返回值,所以代表没有获取锁,所以接下来这个方法就是进行入线程队列,就是进入 AQS 的底层线程队列

  • 如果结果为 True,就执行 selfInterupt 方法,因为结果为 true,就代表拿不到锁,而且进入队列失败,所以需要进行打断

tryAcquire 方法


可以看到这个方法是 AbstractQueuedSynchronizer 里面的,而且并没有实现,在下面这些类里面都有实现



因为我们看的是 FairSync,所以 FairSync 调用的 tryAcquire 方法肯定是 FairSync 里面的


下面是他的底层实现



照样先看看他的注释



可以从注释中知道,这里的 tryAcquire 是公平版本的,符合 FairSync 的本质,接下来分析源代码(公平锁传进来的 acquire 为 1)


protected final boolean tryAcquire(int acquires) {


//获取当前执行该方法的线程


final Thread current = Thread.currentThread();


//获取锁的状态,前面提到过,AQS 是依靠一个 volatile 变量来实现的


//而且该变量称为 state,在 tryAcquire 这里就实现了


int c = getState();


//等于 0,代表锁还没有获取


if (c == 0) {


//调用了 hasQueuedPredecessors 与 compareAndSetState 方法


//hasQueuedPredecessors,来判断是否需要排队,因为是公平锁


//compareAndSetState 方法其实就是 CAS 去获取锁


//而且可以看到,如果 CAS 成功,那么 state 是改成 acquire,也就是 1


//所以 1 代表了锁正在被占用中


//所以 ReentrantLock 的底层是 CAS


if (!hasQueuedPredecessors() &&


compareAndSetState(0, acquires)) {


//如果不需要等待,且拿到了锁


//将锁的拥有者设为当前线程


setExclusiveOwnerThread(current);


return true;


}


}


//如果锁已经被占用


//而锁的拥有者再一次去进行获得锁


//即拥有锁的线程又去获得锁


else if (current == getExclusiveOwnerThread()) {


//计算当前状态+acquires


int nextc = c + acquires;


//如果小于 0 抛出异常


if (nextc < 0)


throw new Error("Maximum lock count exceeded");


//锁的状态变成


setState(nextc);


//继续放行,所以允许锁的拥有者再次获得锁


//但仅仅修改了底层 state 状态


return true;


}


//如果锁被占用,而且请求获得的线程不是锁的占用者


//返回 false


return false;


}


getState 方法



就是返回了底层的 state 变量,也就是 AQS 的 state


hasQueuedPredecessors 方法


这个方法是用来判断线程是否需要排队的(返回 false 不需要排队),因为是公平锁,所以这个判断是必须的,否则就不公平了,因为没有判断排队操作,那么只要一个线程释放了锁,那么后来的线程也是可以抢到的(透露一下:非公平锁的实现就是仅仅少了这个判断而已)



可以看到,这个方法是 AQS 里面的,tail 和 head 属性前面已经提到过,是底层队列的头尾结点


关键在于 return 语句


他主要有两个判断


  • 头结点是否等于尾结点

  • 头结点是否尾空,或者头结点的线程是否为当前线程


关于这个弹出,必须要先清楚,底层队列里面的结点是怎么添加的


下面就是 AQS 的插入方法 addWaiter



可以看到,如果尾结点不为空,也就代表不是第一次插入,那么直接就使用尾插法,如果是第一次插入,只是调用 enq 方法,所以 enq 方法是针对第一次插入使用的,这两个方法都是 AQS 的



可与看到里面是一个死循环,如果是第一次插入,让创建一个新结点为头结点(相当于一个哨兵),然后让新尾结点一样,这一点可以看成是初始化头尾结点,接下来下一轮循环,此时尾结点已经不为 null,接下来就让新结点指向尾结点,让新结点又称为了尾结点,再让旧的尾结点指向新的尾结点


但这里有一个疑问,如果并不是第一次入队,为什么最后还要执行多一次 enq 方法呢?


我们可以看到,如果并不是第一次入队,因为前面的 addWaiter,使用了一次 CAS,那如果 CAS 失败了怎么办?即中途有线程把尾结点改了怎么办?


此时就需要重复的操作,不断的执行 CAS 直到成功为止,所以就有了无论哪次插入都要执行 enq 方法,因为 enq 方法里面就是一个死循环的 CAS 执行,直到 CAS 执行成功,才会结束


总结一下添加结点


  • 所以头结点只是一个哨兵,是不存储线程的,而插入的方法则是尾插法

  • 其实头结点也不完全是哨兵,下面我们看线程自旋时,会发现其实头结点是当前轮到执行的线程,这里先埋下个伏笔


知道了插入方法后,我们回过头来看 hasQueuedPredecessors 的返回值,也就是如何判断是否需要排队



步骤如下


  • 判断头结点是否等于尾结点,只有在初始化的时候,头结点才会等于尾结点,或者队列为空,头尾结点都为 null,所以,如果头结点等于尾结点(直接返回 false),就代表了队列为空,所以就不需要进行排队

  • 通过第一个判断就已经知道头尾结点之间是有其他结点存在的,但由于前面的 enq 方法是先处理头结点的,然后除了两个 CAS 操作之外,其他都不是原子性的,也就是基本上整个 enq 方法都不是原子性的,所以当正在插入第一个线程的时候,会出现一个 h.next == null 的问题(这时已经代表有一个线程正在插入了,但还没完成插入,所以当前线程需要排队),所以当 h.next == null 时就代表要排队

  • 最后一个判断就是判断头结点的下一个线程是不是当前线程(也就是下一个是不是轮到你,不需要排队,因为插入方法是尾插法,所以弹出方法要从头弹出),经过前面两次判断,可以得知队列中有人在排队,并且此时并不存在队列第一次插入结点的 Null 问题,那么最后就需要判断下一个是否轮到当前线程


setExclusiveOwnerThread 方法



当前面判断无人排队,或者下一个就是轮到当前线程,那么首先要做的是调用 setExclusiveOwnerThread 方法


这个方法唯一执行的是:将 exclusiveOwnerThread 改成为自己,表明这个锁被这个线程获取了


总结 tryAcquire 方法


  • 首先获取当线程

  • 获取锁的当前状态,0 代表无人占用

  • 如果无人占用,判断当前线程是否需要进行排队

  • 如果队列为空,不需要排队

  • 如果队列不为空,但下一个是轮到自己,也不需要为空

  • 这里要注意,第一次插入的 null 问题

  • 如果当前线程不需要进行排队,下一步就是执行 CAS 来获得锁(此时锁 state 的状态要由 0 变为了 1),CAS 保证了同时争夺锁的并发安全性,如果此时 CAS 失败,就要重头来(本质上是 CAS 修改 state 从 0 变为 1)

  • 如果抢到了锁,那就将锁的拥有者改成自己

  • 返回 true 给上一层

  • 如果锁已经被占用了,也就是 state 不为 0,分为两种情况

  • 其他线程占有这把锁,所以后续操作直接返回 false

  • 当前线程占有这把锁,又再次进行加锁,相当于加了多层锁,又要去修改锁的状态 state,一般来说会加 1(源码上是加上 acquire,而 acquire 传进来是为 1),代表这个锁进入了需要被释放多次,才可以被其他线程获取,然后返回 True 给上一层

  • 如果是第二种情况,是不需要使用 CAS 来保证线程安全性的,因为上一层已经有一层锁在保护这个线程的安全性


整个判断是否排队的 tryAcquire 方法已经结束了


接下来就是回到上一层的 acquire 方法

返回 AbstractQueuedSynchronized 的 acquire 方法


可以看到,如果需要排队,就会执行 acquireQueued 方法,接下来在执行 addWaiter


addWaiter 方法已经看过了,其实就是一个结点进入底层队列而已,返回的是插入的结点,相当于是尾结点,也就是入队操作,不过这里离要注意的是他的参数是一个 Node.EXCLUSIVE,我们接下来来看一下他究竟是什么



其实本质上这只是一个状态表示,表示入队这个结点是进入等待状态

acquireQueued 方法

源码如下



接下来我们看看这个方法干了什么


前面已经判断了是否需要等待,如果需要等待,就会先执行插入队列的方法,然后执行 acquireQueued,既然进入了队列了,那么此时就应该是 park 此时的线程,也就是停止此时的线程,但其实停止并不是休眠或者阻塞,在 ReentrantLock 中,是使用自旋的方式来进行的


其实,这个方式其实就是 ReentranLock 底层的自旋操作


接下来分析整一段源码


final boolean acquireQueued(final Node node, int arg) {


//定义一个 failed 变量


//该变量用来判断当前线程拿到锁是否失败


boolean failed = true;


try {


//定义一个 interrupted 变量


//这个变量用来判断当前线程是否还在排队(还在自旋)


boolean interrupted = false;


//死循环


//这个死循环就是底层的 CAS 自旋


for (;;) {


//获取当前线程结点的前一个结点


final Node p = node.predecessor();


//如果前一个线程是头结点,然后当前线程就会去看是否需要排队


//前面已经提到过,头结点的线程要么是一个哨兵,要么就是当前拥有锁的线程


//如果前一个线程是头结点,那就代表当前线程就要时刻注意前一个线程是否结束


//所以要死循环不断地去看需不需要排队


//如果前一个结点并不是头结点,因为是公平锁,那就不需要在意,继续等待


if (p == head && tryAcquire(arg)) {


//进入到这里,就代表前一个线程已经结束了


//并且当前线程抢到锁,正在执行


//先将自己改成头结点,代表自己正在操作,让下一个线程注意


//并且这个 setHead 不是普通让自己成为头结点


//他会将自己里面的线程设为


//因为这是要满足一个原则:线程队列里面不允许存在拥有锁的线程


setHead(node);


//断开原来头结点与自己的连接


p.next = null; // help GC


//failed 变量设为 false


//代表线程拿到锁


failed = false;


//返回 interrupted


//该线程结束自旋,开始执行


return interrupted;


}


//当争夺锁失败时


//调用 shouldParkAfterFailedAcquire 方法


//这个方法是检查和更新未能获取锁的线程状态,判断是否需要进行阻塞


//如果需要阻塞就会调用 parkAndCheckInterrupt


//也就是让这个线程进行阻塞


if (shouldParkAfterFailedAcquire(p, node) &&


parkAndCheckInterrupt())


//获取不到锁,failed 依然为 true


//获取不到锁,还需要继续自旋,执行下一轮循环


interrupted = true;


}


} finally {


//当轮到线程执行时,需要结束自旋,开始执行动作


//即 failed 会变为 false


//但首先要做的是,


if (failed)


cancelAcquire(node);


}


}


前面的自旋循环很简单,但是一直自旋下去并不是好事,因为自旋也是会消耗 CPU 的,假如有一个线程迟迟不释放锁,就白白消耗了很多的 CPU,所以,自旋是要限制次数的,而限制次数就存在于当获得锁失败时可能会调用的两个方法


setHead



可以看到,setHead 方法将头结点里面的线程设为了空,因为要满足获取锁的线程不存在于队列中,代表获取锁的线程已经不需要再进行排队了,而且不需要被叫醒


shouldParkAfterFailedAcquire 方法


前面已经提到过这个线程是用来判断获取锁失败的线程是否需要 park,即是否需要阻塞(因为 CAS 太多次了)


我们先来看看源码


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {


//获取前一个线程的 waitStatus 状态


int ws = pred.waitStatus;


//如果前一个线程状态为 SIGNAL


if (ws == Node.SIGNAL)


/*


  • This node has already set status asking a release

  • to signal it, so it can safely park.


*/


//返回 true,即当前线程需要阻塞、休眠


return true;


//如果前一个线程的状态大于 0


if (ws > 0) {


/*


  • Predecessor was cancelled. Skip over predecessors and

  • indicate retry.


*/


//代表前一个线程取消执行了,就要从队列中删除前一个线程


//当然这是一个循环,可能前面有其他取消的线程,那么也是要删除的

评论

发布
暂无评论
Java并发(五):ReentrantLock的加锁实现—