写点什么

AQS 阻塞队列机制源码分享

作者:new life
  • 2022 年 9 月 11 日
    北京
  • 本文字数:7568 字

    阅读完需:约 25 分钟

AQS 阻塞队列机制源码分享

1.1 AQS 介绍:

AbstractQueuedSynchronizer 是一个抽象类,以下我们会简称 AQS,翻译成中文就是抽象队列同步器。他是是一个用来实现同步器,封装了各种底层的同步细节,我们想自定义自己的同步工具的时候,只需要定义这个类的子类并覆盖它提供的一些方法就好了。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 中实现都有使用。

本次分享主要结合 ReentrantLock 的获取锁和释放锁分享基于 AQS 排他锁的实现;借助 ReentrantReadWriteLock 的读锁分享共享锁的实现。

1.2 AQS 组成

1.2.1 AQS 内部关键属性

private volatile int state;private transient volatile Node head;private transient volatile Node tail;
复制代码


AQS 主要通过 volatile 修饰的 state 值的改变,标识是否有线程持有锁,对于排它锁,在持有锁之后将 state 值加 1,后续线程线程看到 state 值为 1 之后,只能排队等待。当持有锁的线程,释放锁的时候再将状态 state 值设置为 0,通过 state 的切换,来控制线程的切换。


AQS 另外两个重要的属性是 head,tail,这个是 AQS 阻塞队列的头结点和尾结点,当有节点需要阻塞,封装成节点放到阻塞队列的时候,必须要确保 head 和 tail 的初始化,初始化后 head 节点并不是排队的第一个节点,它的下一个才是,后面分析源码的时候会看到。


接下来我们看下将需要阻塞的请求封装成 Node,这个 Node 的内容,我在 node 的属性上都有注释,后面分享的时候也会提到。

1.2.2 内部属性

static final class Node {    // 节点状态有默认0及下面1,-1,-2,-3 四种状态    volatile int waitStatus;    // 前节点指针    volatile Node prev;    // 后续节点指针    volatile Node next;    // 当前请求的线程信息    volatile Thread thread;    // 只有条件队列才用这个属性,指下一个等待节点    Node nextWaiter;    // 创建的节点等待共享锁    static final Node SHARED = new Node();    // 创建的节点等待排他锁    static final Node EXCLUSIVE = null;    // 当前请求取消抢锁    static final int CANCELLED =  1;    // 当前请求处在阻塞队列    static final int SIGNAL    = -1;    // 当前请求处在等待队列    static final int CONDITION = -2;    // 当前请求等共享锁    static final int PROPAGATE = -3;}
复制代码


1.3 内部固定方法

public final void acquire(int arg);public final void acquireInterruptibly(int arg);public final boolean release(int arg);public final void acquireShared(int arg);
复制代码

这些方法都已经定义好了实现方式,并且这些方法是 final 修饰的不能重写,比如下面这个:

public final void acquire(int arg) {    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){     		selfInterrupt();    } }
复制代码

凡是调用了像 acquire() 这样的方法,就需要按照方法里的逻辑执行,但是 AQS 同样提供了许多待扩展的方法,让使用者根据自己的需求,自行实现,比如:

protected boolean tryAcquire(int arg);protected boolean tryRelease(int arg);protected int tryAcquireShared(int arg);protected boolean tryReleaseShared(int arg);protected boolean isHeldExclusively();
复制代码

这些方法内部只是抛出一个异常,要想使用这些方法,需要在子类内重写实现逻辑。比如 tryAcquire() 只用重写才可以使用,后面会结合 ReentrantLock 分享这个方法的使用。

protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();} 
复制代码


1.4 ReentrantLock 中的 AQS

上图为 ReentrantLock.lock() 的实现机制,lock() 获取锁分为公平方式和非公平方式,这俩者的区别主要是:线程是否可以插队,公平和非公平都有各自的优缺点,而在 ReentrantLock 里的实现默认使用的是非公平的方式获取锁。下面是公平锁和非公平锁的优缺点:

公平锁:

优点:各个线程公平平等,每个线程等待一段时间后,都有执行的机会;

缺点:就在于整体执行速度更慢,吞吐量更小;

非公平锁:

优点:就在于整体执行速度更快,吞吐量更大;

缺点:但同时也可能产生线程饥饿问题,也就是说如果一直有线程插队,那么在等待队列中的线程可能长时间得不到运行;

1.4.1 非公平锁-获取锁的源码分析

final void lock() {    // 非公平的体现,先CAS获取一次    if (compareAndSetState(0, 1)) {      setExclusiveOwnerThread(Thread.currentThread());    } else {      acquire(1);    }}
复制代码

我们从 lock() 入口开始分析,对于 ReentrantLock 的非公平锁(默认使用):

1、首先尝试 CAS 获取锁,即 state == 0 的时候更新状态;

2、如果获取锁失败,走 acquire() 逻辑(公平锁直接走这块逻辑),这里面包含将线程安排到阻塞队列排队的逻辑;但是并不是一定会排队,后面还是会尝试获取锁,如果还是获取不到,只能进入队列排队了。后面详述。

 public final void acquire(int arg) {        if (!tryAcquire(arg) &&                  ③            ②acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {            selfInterrupt();        } }
复制代码

如果首次通过 CAS 方式获取锁没有成功,会重新获取锁,acquire() 主要由 3 块组成:

1、先尝试获取锁 tryAcquire(arg);

2、如果没有获取到的情况下将当前请求封装成节点,添加到阻塞队列尾部 addWaiter(Node.EXCLUSIVE), arg);

3、判断是否需要阻塞当前线程 acquireQueued();

1.4.1.1 tryAcquire() 方法分析

protected final boolean tryAcquire(int acquires) {   return nonfairTryAcquire(acquires);}
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 当前线程在排队前,再争取一次获取锁,获取到退出 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 重入锁的体现 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
复制代码

tryAcquire() 方法里使用的是 nonfairTryAcquire() 的方式获取锁。

nonfairTryAcquire() 里如果此刻有线程释放了锁,本线程可以去抢锁,抢到就把本线程设置为锁的独占线程。如果此刻是持有锁的线程来抢锁,这个就是重入锁的范围,只需要将 state 值做加法操作,对应后面 release() 方法自然有减法操作。

注:

线程的阻塞,需要操作的系统的支持,线程的阻塞唤醒,涉及到内核态和用户态的切换,所以为了减少不必要的线程上下文切换,这个地方又会抱着侥幸的心理抢一次锁,减少上下文切换也是选择非公平方式的一个原因。

1.4.1.2 addWaiter() 方法分析

 private Node addWaiter(Node mode) {     Node node = new Node(Thread.currentThread(), mode);     Node pred = tail;     // 队列不为空,新节点直接添加到队尾,返回     if (pred != null) {       node.prev = pred;       if (compareAndSetTail(pred, node)) {         pred.next = node;         return node;       }     }     // 初始化头节点,并尝试把新节点添加到到位     enq(node);     return node; }private Node enq(final Node node) {    for (; ; ) {      Node t = tail;      if (t == null) {        if (compareAndSetHead(new Node())) tail = head;      } else {        node.prev = t;        if (compareAndSetTail(t, node)) {          t.next = node;          return t;        }      }    }}
复制代码

addWaiter() 作用是将节点添加到阻塞队列队尾:

1、构造新节点,包含本线程信息;

2、如果队列的尾结点不为 null,说明阻塞队列已经初始化过了,直接在队列尾节点后面排上新节点;

3、如果队列未初始化,需要走 enq() 方法逻辑:

(1)初始化头节点,注意这里的头节点 (head)、尾节点(tail) 指向同一节点;

(2)队列初始化完成后,将新节点排在头节点之后。

enq() 方法里是个自旋操作,保证新节点可以排队成功。

1.4.1.3 acquireQueued() 方法分析

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {      boolean interrupted = false;      for (; ; ) {        final Node p = node.predecessor();        if (p == head && tryAcquire(arg)) {          setHead(node);          p.next = null; // help GC          failed = false;          return interrupted;        }        if (shouldParkAfterFailedAcquire(p, node) &&            parkAndCheckInterrupt())          interrupted = true;      }    } finally {      if (failed)        cancelAcquire(node);    } }private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)      return true;    if (ws > 0) {      do {        node.prev = pred = pred.prev;      } while (pred.waitStatus > 0);      pred.next = node;    } else {      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;    }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}
复制代码

走完 tryAcquire() 和 addWaiter() 逻辑就到 acquireQueued() 这块,acquireQueued()通过自旋的操作:

1、取前节点 node.predecessor() 如果自己就是第一个排队等待的节点直接去获取锁:获取成功直接退出。不用走后面的阻塞逻辑(这个地方再次体现了非公平锁和减少上下文切换)。

2、如果自己当前不是第一个排队的,接下来就需要判断是否阻塞了 shouldParkAfterFailedAcquire():

(1)如果自己之前的节点都在阻塞状态(SIGNAL),自己肯定阻塞;

(2)如果有节点状态是取消状态(CANCEL),移除这种取消的节点。

(3)将前节点的状态改为 SIGNAL,标识为阻塞状态。

3、如果当前节点需要阻塞,直接使用 LockSupport.park(),阻塞当前线程。注意 acquireQueued 里的自旋(for (; ; ))这个动作确实在循环处理,但并不是一直在消耗 CPU 的自旋,因为下面这个 parkAndCheckInterrupt()方法会将没获取到锁的线程阻塞,让出 CPU,直到有其他线程释放锁,并且这个线程被唤醒了,这个线程才会重新在循环里争抢锁。

1.4.2 非公平锁-释放锁的源码分析

上图为释放锁 unlock() 流程,内容主要是修改 state 值,

唤醒一个阻塞队列的线程去抢锁,源码分析如下:

public void unlock() {   sync.release(1);}
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}

复制代码

unLock() 里调用的是 AQS 的 release() 方法,这个方法内部:

(1)tryRelease()释放锁,即对 AQS 的 state 字段做减法操作;

(2)在修改完 state 字段成功之后,unparkSuccessor() 唤醒第一个在排队的线程;

注:头节点不是第一个排队的节点;

protected final boolean tryRelease(int releases) {    int c = getState() - releases;    if (Thread.currentThread() != getExclusiveOwnerThread())      throw new IllegalMonitorStateException();    boolean free = false;    if (c == 0) {      free = true;      setExclusiveOwnerThread(null);    }    /**     	这个 state 值更新,就是单纯的赋值,和获取锁的时候不一样,获取锁的时候更新是CAS的方式设置state值,    	但是这个地方已经获取到锁了,直接set也是线程安全的    */    setState(c);    return free; }private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    if (ws < 0) {      compareAndSetWaitStatus(node, ws, 0);    }    Node s = node.next;    /**    * 获取头节点之后第一个有效的阻塞节点,如果头结点后面的第一个节点为空或者节点取消抢锁了(state > 0),    * 那就从队列尾部开始向前遍历找一个阻塞的节点唤醒    */    if (s == null || s.waitStatus > 0) {      s = null;      for (Node t = tail; t != null && t != node; t = t.prev)        if (t.waitStatus <= 0) {          s = t;        }    }    if (s != null)      LockSupport.unpark(s.thread); }protected final void setState(int newState) {     state = newState;}
复制代码

1、tryRelease() 释放锁:

(1) 对于一般排他锁而言,获取锁,对 state 做加 1 操作,释放锁对 state 做减 1 操作,最终 state 会是 0;但是作为重入锁,state 字段的值可能会大于 1;在释放锁的时候,一次操作 release() ,state 字段不一定是 0;所以在 tryRelease() 只有 c 值为 0,代表锁彻底释放。

(2) 当 AQS 的 state 字段状态为 0 时,说明锁彻底释放,这时候要清除锁独占的标识 setExclusiveOwnerThread(null);

2、当持有锁的线程彻底释放锁了,接下来就该唤醒一个在排队的节点了:

(1)更新头节点状态为 0(0 也是默认状态);注意头节点在实际使用中,不会充当真正的阻塞节点。

(2)获取头节点之后第一个有效的阻塞节点,如果头结点后面的第一个节点为空或者节点取消抢锁了,那就从队列尾部开始向前遍历找一个阻塞的节点唤醒。

(3)LockSupport.unpark() 唤醒这个满足要求的阻塞的节点。

1.5 ReentrantReadWriteLock 的读锁-源码分析

ReentrantLock 实现的互斥锁一次只允许一个线程访问共享数据,哪怕是只读操作,但是读写锁 ReentrantReadWriteLock 允许对共享数据进行更高性能的并发访问,对于写操作,一次只有一个线程(write 线程)可修改共享数据对于读操作,允许多个线程同时读取。与互斥锁相比,使用读写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,以及数据的争用,即在同一时间试图对该数据执行读取或写入操作的线程数。

所以,读写锁适用于读多写少的场景。

ReentrantReadWriteLock 里的 ReadLock 就是基于 AQS 共享锁开发的。读写锁里的 ReadLock 和 WriteLock 是 ReentrantReadWriteLock 俩静态内部类。而写锁的加锁和释放锁逻辑和 ReentrantLock 相似,就不分析了,这里只看读锁的加锁过程。

分享读锁机制前,先看下读写锁的内部轮廓:

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {    private static final long serialVersionUID = -6992448646407690164L;    private final ReentrantReadWriteLock.ReadLock readerLock;    private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync; public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } abstract static class Sync extends AbstractQueuedSynchronizer { // 省略部分代码 } public static class ReadLock implements Lock, java.io.Serializable { // 省略部分代码 } public static class WriteLock implements Lock, java.io.Serializable { // 省略部分代码 }}

复制代码


 public static final Object get(String key) {     readLock.lock();     try {       return map.get(key);     } finally {       readLock.unlock();     }   } public void lock() {    sync.acquireShared(1); }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) { doAcquireShared(arg); } } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // SHARED_UNIT 为 2^16,读锁通过int的高16位表示 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 省略部分代码 return 1; } return fullTryAcquireShared(current);}final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive();}
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;}
复制代码


分享这个读锁的目的,主要是读写锁在使用 AQS 的基础上,

将 state 值分为高低位,高 16 位供读锁使用,低 16 位供写锁使用。

读锁获取锁成功的条件是:阻塞队列的第一个节点是等待读锁,并且持有读锁

的数量没有到达最大值(2 的 16 次方)且设置锁状态成功;否则锁等待。

1、readLock.lock() 方法内调用 AQS 的 acquireShared() 方法;

2、acquireShared() 方法里要获取锁成功就退出,没有的话需要 doAcquireShared()

方法中将请求排入队列。

3、tryAcquireShared() 方法中会判断该线程是否该阻塞,读锁持有者有没有到达最大值,

如果符合要求才可以更新锁的状态。

  private void doAcquireShared(int arg) {    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {      boolean interrupted = false;      for (; ; ) {        final Node p = node.predecessor();        if (p == head) {          int r = tryAcquireShared(arg);          if (r >= 0) {            // 省略部分代码            return;          }        }        // 标红的部分是读锁阻塞的逻辑,和排他锁逻辑一致,就不赘述了        if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {          interrupted = true;}      }    } finally {      if (failed) cancelAcquire(node);    }    }
复制代码


用户头像

new life

关注

还未添加个人签名 2019.03.04 加入

还未添加个人简介

评论

发布
暂无评论
AQS 阻塞队列机制源码分享_AQS_new life_InfoQ写作社区