写点什么

一文带你学会 AQS 和并发工具类的关系

用户头像
比伯
关注
发布于: 2021 年 01 月 15 日
一文带你学会AQS和并发工具类的关系

1. 存在的意义

AQS(AbstractQueuedSynchronizer)是 JAVA 中众多锁以及并发工具的基础,其底层采用乐观锁,大量使用了 CAS 操作, 并且在冲突时,采用自旋方式重试,以实现轻量级和高效地获取锁。


提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(semaphore 等)。 此类旨在为大多数依赖单个原子 int 值表示状态的同步器提供有用的基础。 子类必须定义更改此状态的受保护方法,并定义该状态对于获取或释放此对象而言意味着什么。 鉴于这些,此类中的其他方法将执行所有排队和阻塞机制。 子类可以维护其他状态字段,但是相对于同步,仅跟踪使用方法 getState,setState 和 compareAndSetState 操作的原子更新的 int 值。


此类支持默认独占模式和共享模式之一或两者。 当以独占方式进行获取时,其他线程尝试进行的获取将无法成功。 由多个线程获取的共享模式可能(但不一定)成功。 该类不“理解”这些差异,当共享模式获取成功时,下一个等待线程(如果存在)还必须确定它是否也可以获取。 在不同模式下等待的线程共享相同的 FIFO 队列。 通常,实现子类仅支持这些模式之一,但例如可以在 ReadWriteLock 发挥作用。 仅支持独占模式或仅支持共享模式的子类无需定义支持未使用模式的方法。

2. 核心知识点

2.1 state

private volatile int state; // 同步状态
复制代码


state 是整个工具的核心,通常整个工具都是在设置和修改状态,很多方法的操作都依赖于当前状态是什么。由于状态是全局共享的,一般会被设置成 volatile 类型,为了保证其修改的可见性;


2.2 CLH 队列

AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。队列采用的是悲观锁的思想,表示当前所等待的资源,状态或者条件短时间内可能无法满足。因此,它会将当前线程包装成某种类型的数据结构,扔到一个等待队列中,当一定条件满足后,再从等待队列中取出。


2.3 CAS 操作

CAS 操作是最轻量的并发处理,通常我们对于状态的修改都会用到 CAS 操作,因为状态可能被多个线程同时修改,CAS 操作保证了同一个时刻,只有一个线程能修改成功,从而保证了线程安全。CAS 采用的是乐观锁的思想,因此常常伴随着自旋,如果发现当前无法成功地执行 CAS,则不断重试,直到成功为止。


3. 核心实现原理

3.1 作为同步器的基础

要将此类用作同步器的基础,请使用 getState,setState 或 compareAndSetState 检查或修改同步状态,以重新定义以下方法(如适用):


  1. tryAcquire


独占方式,arg 为获取锁的次数,尝试获取资源,成功则返回 True,失败则返回 False。


  1. tryRelease


独占方式,arg 为释放锁的次数,尝试释放资源,成功则返回 True,失败则返回 False。


  1. tryAcquireShared


共享方式,arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。


  1. tryReleaseShared


共享方式,arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 True,否则返回 False。


  1. isHeldExclusively


该线程是否正在独占资源。只有用到 Condition 才需要去实现它。


默认情况下,这些方法中的每一个都会引发 UnsupportedOperationException 。 这些方法的实现必须在内部是线程安全的,并且通常应简短且不阻塞。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为 final 方法,因为它们不能独立变化。


3.2 同步状态 state

AQS 中维护了一个名为 state 的字段,意为同步状态,是由 volatile 修饰的,用于展示当前临界资源的获锁情况。


private volatile int state;
复制代码


下面提供了几个访问这个 state 字段的方法:

返回同步状态的当前值。 此操作具有 volatile 读取的内存语义


protected final int getState() {    return state;}
复制代码


设置同步状态的值。 此操作具有 volatile 写操作的内存语义。


protected final void setState(int newState) {    state = newState;}
复制代码


如果当前状态值等于期望值,则以原子方式将同步状态设置为给定的更新值。 此操作具有 volatile 读写的内存语义


protected final boolean compareAndSetState(int expect, int update) {    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
复制代码


这几个方法都是 Final 修饰的,说明子类中无法重写它们。我们可以通过修改 State 字段表示的同步状态来实现多线程的独占模式和共享模式

state 的值即表示了锁的状态,state 为 0 表示锁没有被占用,state 大于 0 表示当前已经有线程持有该锁,这里之所以说大于 0 而不说等于 1 是因为可能存在可重入的情况。你可以把 state 变量当做是当前持有该锁的线程数量。


public abstract class AbstractOwnableSynchronizer    protected AbstractOwnableSynchronizer() {          private transient Thread exclusiveOwnerThread;    protected final void setExclusiveOwnerThread(Thread thread) {        exclusiveOwnerThread = thread;    }    protected final Thread getExclusiveOwnerThread() {        return exclusiveOwnerThread;    }}
复制代码


exclusiveOwnerThread 属性的值即为当前持有锁的线程独占模式获取锁流程:



共享模式获取锁流程:



3.3 数据结构

  1. AQS 中最基本的数据结构是 Node,Node 即为 CLH 变体队列中的节点。


static final class Node {    // 表示线程以共享的模式等待锁    static final Node SHARED = new Node();    // 表示线程正在以独占的方式等待锁    static final Node EXCLUSIVE = null;    // 为1,表示线程获取锁的请求已经取消了    static final int CANCELLED =  1;    // 为-1,表示线程已经准备好了,就等资源释放了    static final int SIGNAL    = -1;    // 为-2,表示节点在等待队列中,节点线程等待唤醒    static final int CONDITION = -2;    // 为-3,当前线程处在SHARED情况下,该字段才会使用    static final int PROPAGATE = -3;    // 当前节点在队列中的状态    volatile int waitStatus;    // 前驱节点    volatile Node prev;    // 后续节点    volatile Node next;    // 当前节点的线程    volatile Thread thread;    // 指向下一个处于CONDITION状态的节点    Node nextWaiter;    ...}
复制代码


  1. AQS 中 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。


// 队列头节点private transient volatile Node head;// 队列尾节点private transient volatile Node tail;
复制代码



在 AQS 中的队列是一个 FIFO 队列,它的 head 节点永远是一个虚拟节点(dummy node), 它不代表任何线程,因此 head 所指向的 Node 的 thread 属性永远是 null。但是我们不会在构建过程中创建它们,因为如果没有争用,这将是浪费时间。 而是构造节点,并在第一次争用时设置头和尾指针。只有从次头节点往后的所有节点才代表了所有等待锁的线程。也就是说,在当前线程没有抢到锁被包装成 Node 扔到队列中时,即使队列是空的,它也会排在第二个,我们会在它的前面新建一个虚拟节点。


4. 获取锁实现

4.1 ReentrantLock 独占锁内部结构

构造函数源代码


// 默认创建非公平锁public ReentrantLock() {    sync = new NonfairSync();}// 通过传值为true来进行创建公平锁public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}
复制代码


ReentrantLock 里面有三个内部类:


  1. 一个是抽象的 Sync 实现了 AbstractQueuedSynchronizer

  2. NonfairSync 继承了 Sync

  3. FairSync 继承了 Sync



4.2 非公平锁的实现

ReentrantLock 种获取锁的方法


public void lock() {    sync.lock();}
复制代码


ReentrantLock 的非公平锁实现


static final class NonfairSync extends Sync {    final void lock() {    // 如果设置state的值从0变为1成功        if (compareAndSetState(0, 1))        // 则将当前线程设置为独占线程            setExclusiveOwnerThread(Thread.currentThread());        else            acquire(1);    }    protected final boolean tryAcquire(int acquires) {        return nonfairTryAcquire(acquires);    }}
复制代码


compareAndSetState(0,1)


protected final boolean compareAndSetState(int expect, int update) {    // 通过unsafe.compareAndSwapInt方法来进行设置值    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
复制代码


stateOffset 为 AQS 中维护的 state 属性的偏移量



setExclusiveOwnerThread(Thread.currentThread());


protected final void setExclusiveOwnerThread(Thread thread) {    exclusiveOwnerThread = thread;}
复制代码


acquire(1); 调用的是 AQS 中的 acquire(int arg) 方法


public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}
复制代码


tryAcquire(arg) 该方法是 protected 的由子类去具体实现的



我们需要看的是 NonfairSync 中实现的 tryAcquire 方法,里面又调用了 nonfairTryAcquire 方法,再进去看看


static final class NonfairSync extends Sync {    protected final boolean tryAcquire(int acquires) {        return nonfairTryAcquire(acquires);    }}
复制代码


nonfairTryAcquire(int acquires) 方法实现


final boolean nonfairTryAcquire(int acquires) {    // 获取当前线程    final Thread current = Thread.currentThread();    // 获取当前state的值    int c = getState();     if (c == 0) {      // 看看设置值是否能成功            if (compareAndSetState(0, acquires)) {           // 则将当前线程设置为独占线程            setExclusiveOwnerThread(current);            return true;        }    } // 返回由setExclusiveOwnerThread设置的最后一个线程;如果从不设置,则返回null     else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        // 设置state的值        setState(nextc);        return true;    }    return false;}
复制代码


acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法实现,先看里 addWaiter(Node.EXCLUSIVE)方法注意:Node.EXCLUSIVE 此时是空值,所以 mode 就是空的,所以此时创建的 Node 节点中的 nextWaiter 是空值。


static final class Node {  Node(Thread thread, Node mode) {     // Used by addWaiter    this.nextWaiter = mode;    this.thread = thread;  }}private Node addWaiter(Node mode) {    // 创建一个新的节点    Node node = new Node(Thread.currentThread(), mode);    // 将当前CLH队列的尾部节点赋予给 pred    Node pred = tail;    if (pred != null) { // 如果尾节点不为空        node.prev = pred; // 将当前node节点的前驱节点指向CLH队列的尾部节点        if (compareAndSetTail(pred, node)) { // CAS设置值            pred.next = node; // CLH队列的尾部节点的后继节点指向新的node节点            return node;        }    }    enq(node);    return node;}
复制代码


如果 CLH 队列的尾部节点为空值的话,执行 enq(node)方法


// 通过CAS方式设置队列的头节点private final boolean compareAndSetHead(Node update) {    return unsafe.compareAndSwapObject(this, headOffset, null, update);}// 通过CAS方式设置队列的尾部节点private final boolean compareAndSetTail(Node expect, Node update) {    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}// 节点入队操作private Node enq(final Node node) {    for (;;) {        Node t = tail;        // 如果尾部节点为空值        if (t == null) {        // 则进行初始化一个节点,将其设置为头节点            if (compareAndSetHead(new Node()))                tail = head; //头尾指向同一个空节点        } else {      // 如果尾部节点不为空,那么将传进来的入队节点的前驱节点指向当前队列的尾部节点            node.prev = t;      // 将当前传进来的入队节点设置为当前队列的尾部节点            if (compareAndSetTail(t, node)) {           // 将当前队列的尾部节点的后继节点指向传进来的新节点                 t.next = node;                return t;            }        }    }}
复制代码


查看 acquireQueued 方法实现


// 获取当前节点的前驱节点final Node predecessor() throws NullPointerException {    Node p = prev;    if (p == null)        throw new NullPointerException();    else        return p;}// 检查并更新无法获取的节点的状态。 如果线程应阻塞,则返回trueprivate static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//SIGNAL这个状态就有点意思了,它不是表征当前节点的状态,而是当前节点的下一个节点 //的状态。当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继 // 节点)已经被挂起了(或者马上就要被挂起了),因此在当前节点释放了锁或者放弃获取 // 锁时,如果它的waitStatus属性为SIGNAL,它还要完成一个额外的操作——唤醒它的后继节点。    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        return true;    if (ws > 0) {     // 当前节点的 ws > 0, 则为 Node.CANCELLED 说明前驱节点                                        // 已经取消了等待锁(由于超时或者中断等原因)        // 既然前驱节点不等了, 那就继续往前找, 直到找到一个还在等待锁的节点        // 然后我们跨过这些不等待锁的节点, 直接排在等待锁的节点的后面         do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {     // 前驱节点的状态既不是SIGNAL,也不是CANCELLED        // 用CAS设置前驱节点的ws为 Node.SIGNAL,给自己定一个闹钟        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}// 停止的便捷方法,然后检查是否中断private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}// 取消正在进行的获取尝试private void cancelAcquire(Node node) {    if (node == null)        return;    node.thread = null;    Node pred = node.prev;    while (pred.waitStatus > 0)        node.prev = pred = pred.prev;    Node predNext = pred.next;    node.waitStatus = Node.CANCELLED;    if (node == tail && compareAndSetTail(node, pred)) {        compareAndSetNext(pred, predNext, null);    } else {        int ws;        if (pred != head &&            ((ws = pred.waitStatus) == Node.SIGNAL ||             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&            pred.thread != null) {            Node next = node.next;            if (next != null && next.waitStatus <= 0)                compareAndSetNext(pred, predNext, next);        } else {            unparkSuccessor(node);        }        node.next = node;    }}// 能执行到该方法, 说明addWaiter 方法已经成功将包装了当前Thread的节点添加到了等待队列的队尾// 该方法中将再次尝试去获取锁// 在再次尝试获取锁失败后, 判断是否需要把当前线程挂起final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {         // 获取当前节点的前驱节点            final Node p = node.predecessor();         // 在前驱节点就是head节点的时候,继续尝试获取锁            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 将当前线程挂起,使CPU不再调度它            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)             cancelAcquire(node);    }}
复制代码


为什么前面获取锁失败了, 这里还要再次尝试获取锁呢?

首先, 这里再次尝试获取锁是基于一定的条件的,即:当前节点的前驱节点就是 HEAD 节点,因为我们知道,head 节点就是个虚拟节点,它不代表任何线程,或者代表了持有锁的线程,如果当前节点的前驱节点就是 head 节点,那就说明当前节点已经是排在整个等待队列最前面的了。


setHead(node); 方法


// 这个方法将head指向传进来的node,并且将node的thread和prev属性置为nullprivate void setHead(Node node) {    head = node;    node.thread = null;    node.prev = null;}
复制代码


  可以看出,这个方法的本质是丢弃原来的 head,将 head 指向已经获得了锁的 node。但是接着又将该 node 的 thread 属性置为 null 了,这某种意义上导致了这个新的 head 节点又成为了一个虚拟节点,它不代表任何线程。为什么要这样做呢,因为在 tryAcquire 调用成功后,exclusiveOwnerThread 属性就已经记录了当前获取锁的线程了,此处没有必要再记录。这某种程度上就是将当前线程从等待队列里面拿出来了,是一个变相的出队操作。

shouldParkAfterFailedAcquire(Node pred, Node node)方法


  1. 如果为前驱节点的 waitStatus 值为 Node.SIGNAL 则直接返回 true

  2. 如果为前驱节点的 waitStatus 值为 Node.CANCELLED (ws > 0), 则跳过那些节点, 重新寻找正常等待中的前驱节点,然后排在它后面,返回 false

  3. 其他情况, 将前驱节点的状态改为 Node.SIGNAL, 返回 false


acquireQueued 方法中的 Finally 代码


private void cancelAcquire(Node node) {  // 将无效节点过滤    if (node == null)        return;  // 设置该节点不关联任何线程,也就是虚节点    node.thread = null;    Node pred = node.prev;  // 通过前驱节点,跳过取消状态的node    while (pred.waitStatus > 0)        node.prev = pred = pred.prev;  // 获取过滤后的前驱节点的后继节点    Node predNext = pred.next;  // 把当前node的状态设置为CANCELLED    node.waitStatus = Node.CANCELLED;  // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点  // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null    if (node == tail && compareAndSetTail(node, pred)) {        compareAndSetNext(pred, predNext, null);    } else {        int ws;    // 如果当前节点不是head的后继节点,    // 1.判断当前节点前驱节点的是否为SIGNAL    // 2.如果不是,则把前驱节点设置为SINGAL看是否成功    // 如果1和2中有一个为true,再判断当前节点的线程是否为null    // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {            Node next = node.next;            if (next != null && next.waitStatus <= 0)                compareAndSetNext(pred, predNext, next);        } else {   // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点            unparkSuccessor(node);        }        node.next = node; // help GC    }}
复制代码


4.3 非公平锁获取流程图

非公平锁获取锁成功的流程图



非公平锁获取锁失败的流程图



5.释放锁实现

5.1 释放锁代码分析

  尝试释放此锁。如果当前线程是此锁的持有者,则保留计数将减少。 如果保持计数现在为零,则释放锁定。 如果当前线程不是此锁的持有者,则抛出 IllegalMonitorStateException。


## ReentrantLockpublic void unlock() {    sync.release(1);}
复制代码


sync.release(1) 调用的是 AbstractQueuedSynchronizer 中的 release 方法


## AbstractQueuedSynchronizerpublic final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}
复制代码


分析 tryRelease(arg)方法



tryRelease(arg)该方法调用的是 ReentrantLock 中


protected final boolean tryRelease(int releases) {// 获取当前锁持有的线程数量和需要释放的值进行相减    int c = getState() - releases;     // 如果当前线程不是锁占有的线程抛出异常    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    // 如果此时c = 0就意味着state = 0,当前锁没有被任意线程占有    // 将当前所的占有线程设置为空    if (c == 0) {        free = true;        setExclusiveOwnerThread(null);    }    // 设置state的值为 0    setState(c);    return free;}
复制代码


  如果头节点不为空,并且 waitStatus != 0,唤醒后续节点如果存在的话。

这里的判断条件为什么是 h != null && h.waitStatus != 0?


  因为 h == null 的话,Head 还没初始化。初始情况下,head == null,第一个节点入队,Head 会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现 head == null 的情况。


  1. h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒

  2. h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒


private void unparkSuccessor(Node node) {// 获取头结点waitStatus    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);// 获取当前节点的下一个节点    Node s = node.next;//如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点    if (s == null || s.waitStatus > 0) {        s = null;        // 就从尾部节点开始找往前遍历,找到队列中第一个waitStatus<0的节点。        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }  // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点唤醒    if (s != null)        LockSupport.unpark(s.thread);}
复制代码


为什么要从后往前找第一个非 Cancelled 的节点呢?

看一下 addWaiter 方法


private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
复制代码


我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred, compareAndSetTail(pred, node) 这两个地方可以看作 Tail 入队的原子操作,但是此时 pred.next = node;还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node

所以,如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next 指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。


5.2 释放锁流程图


6.注意

由于篇幅较长公平锁地实现在下一篇的博客中讲述,谢谢大家的关注和支持!有问题希望大家指出问题,共同进步!!!


最后还给大家整理了一份面试宝典,有需要的大哥们添加小编的 vx:mxzFAFAFA 来免费获取!






发布于: 2021 年 01 月 15 日阅读数: 19
用户头像

比伯

关注

还未添加个人签名 2020.11.09 加入

还未添加个人简介

评论

发布
暂无评论
一文带你学会AQS和并发工具类的关系