写点什么

一文带你学会 AQS 和并发工具类的关系 2

用户头像
伯阳
关注
发布于: 2021 年 01 月 18 日
一文带你学会AQS和并发工具类的关系2

1.创建公平锁


1.使用方式


Lock reentrantLock = new ReentrantLock(true);reentrantLock.lock(); //加锁try{  // todo} finally{  reentrantLock.unlock(); // 释放锁}
复制代码

2.创建公平锁


在 new ReentrantLock(true)的时候加入关键字 true


public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}
复制代码

当传入的参数值为 true 的时候创建的对象为 new FairSync()公平锁。

2.加锁的实现


1.普通的获取锁


reentrantLock.lock(); //加锁
复制代码

加锁的实际调用的方法是创建的公平锁里面的 lock 方法


static final class FairSync extends Sync {    final void lock() {        acquire(1);    }    ...}
复制代码

代码中的 acquire 方法和非公平锁中的 acquire 方法一样都是调用的 AQS 中的 final 方法

## AbstractQueuedSynchronizerpublic final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
复制代码

不过不同之处是这里面的 tryAcquire(arg)方法是调用的公平锁里面实现的方法


这个方法其实和非公平锁方法特别相似,只有一处不同公平锁中含有一个特殊的方法叫做 hasQueuedPredecessors()该方法也是 AQS 中的方法,该方法的实质就是要判断该节点的前驱节点是否是 head 节点


## AbstractQueuedSynchronizerpublic final boolean hasQueuedPredecessors() {    Node t = tail;     Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}
复制代码

剩下的部分和前一篇分析的非公平锁几乎是一个流程

protected final boolean tryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            // 设置当前线程为当前锁的独占线程            setExclusiveOwnerThread(current);            // 获取锁成功            return true;        }    }    // 如果是当前线程持有的锁信息,在原来的state的值上加上acquires的值    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");        // 设置state的值        setState(nextc);        // 获取锁成功        return true;    }    // 获取锁失败了才返回false    return false;}
复制代码

注意一下只有当返回 false 的时候才是 tryAcquire 失败的时候。此时就会走到繁琐的 addWaiter(Node.EXCLUSIVE)方法

2.普通获取锁失败


如果前面 tryAcquire 失败就会进行接下来的 addWaiter(Node.EXCLUSIVE)


## AbstractQueuedSynchronizerprivate Node addWaiter(Node mode) {    // 创建一个新的node节点 mode 为Node.EXCLUSIVE = null    Node node = new Node(Thread.currentThread(), mode);    // 获取尾部节点    Node pred = tail;    // 如果尾部节点不为空的话将新加入的节点设置成尾节点并返回当前node节点    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 如果尾部节点为空整明当前队列是空值得需要将当前节点入队的时候先初始化队列    enq(node);    return node;}
复制代码

3.节点入队方法


enq(node)方法是节点入队的方法我们来分析一下,enq 入队方法也是 AQS 中的方法,注意该方法的死循环,无论如何也要将该节点加入到队列中。


## AbstractQueuedSynchronizerprivate Node enq(final Node node) {for (;;) {Node t = tail;// 如果尾节点为空的话,那么需要插入一个新的节点当头节点        if (t == null) {if (compareAndSetHead(new Node()))tail = head;} else {    // 如果不为空的话,将当前节点变为尾节点并返回当前节点的前驱节点        node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
复制代码

其实和非公平锁的 addWaiter(Node node)是一样的流程,分析完。

4.acquireQueued 方法


此时当前节点已经被加入到了阻塞队列中了,进入到了 acquireQueued 方法。该方法也是 AQS 中的方法。


## AbstractQueuedSynchronizerfinal boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {         // 获取当前节点的前驱节点            final Node p = node.predecessor();            // 如果当前节点的前驱节点是头节点的话会再一次执行tryAcquire方法获            // 取锁            if (p == head && tryAcquire(arg)) {             // 设置当前节点为头节点                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码

注意 setHead(node) 中具体的实现细节 thread 为 null,prev 也为 null 其实就是如果当前节点的前驱节点为头节点的话,那么当前节点变成了头节点也就是之前阻塞队列的虚拟头节点。

private void setHead(Node node) {    head = node;    node.thread = null;    node.prev = null;}
复制代码

如果不是头节点或者 tryAcquire()方法执行失败执行下面的更加繁琐的方法 shouldParkAfterFailedAcquire(p, node),如果该方法返回 true 才会执行到下面的 parkAndCheckInterrupt()方法,这两个方法都是 AQS 中的方法。

## AbstractQueuedSynchronizerprivate static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    // 获取前驱节点的状态int ws = pred.waitStatus;// 如果前驱节点的状态为SIGNAL那么直接就可以沉睡了,因为如果一个节点要是进入    // 阻塞队列的话,那么他的前驱节点的waitStatus必须是SIGNAL状态。    if (ws == Node.SIGNAL)return true;// 如果前驱节点不是Node.SIGNAL状态就往前遍历一值寻找节点的waitStatus必须    // 是SIGNAL状态的节点    if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {    // 如果没有找到符合条件的节点,那么就将当前节点的前驱节点的waitStatus        // 设置成SIGNAL状态    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
复制代码

如果返回的值是 false,就要注意此时又继续进入了下一次死循环中,因为如果往前遍历的过程中有可能他的前驱节点变成了头节点,那么就可以再次的获取锁,如果不是的话那么只能

执行 parkAndCheckInterrupt()方法进行线程的挂起了。


private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
复制代码

5.取消请求


无论如何最终都走到了 cancelAcquire 方法


private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;Node pred = node.prev;// 跳过所有取消请求的节点    while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;// 将当前节点设置成取消状态,为了后续遍历跳过我们    node.waitStatus = Node.CANCELLED;// 如果当前节点是尾节点,并且将当前节点的前驱节点设置成尾节点成功    if (node == tail && compareAndSetTail(node, pred)) {    // 当前节点的前驱节点的后续节点为空    compareAndSetNext(pred, predNext, null);} else {    int ws;// 如果前驱节点不是头节点        if (pred != head &&        // 前驱节点的状态是Node.SIGNAL或者前驱节点的waitStatus设置                   // 成Node.SIGNAL        ((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&// 前驱节点的thread 不为空            pred.thread != null) {// 获取当前节点的后继节点            Node next = node.next;// 如果后继节点不为空,并且后继节点waitStatus 小于0            if (next != null && next.waitStatus <= 0)// 将当前节点的后继节点设置成当前节点的前驱节点的后继节点            compareAndSetNext(pred, predNext, next);} else {// 如果上面当前节点的前驱节点是head或者其他条件不满足那么就唤醒当前节点        unparkSuccessor(node);}node.next = node; // help GC}}
复制代码

unparkSuccessor(node)唤醒当前节点,该方法也是 AbstractQueuedSynchronizer 中的方法

private void unparkSuccessor(Node node) {    // 获取当前节点的状态    int ws = node.waitStatus;    // 如果当前节点状态小于0那么设置成0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 获取当前节点的后继节点    Node s = node.next;    // 如果后继节点为空,或者后继节点的状态小于0    if (s == null || s.waitStatus > 0) {        // 后继节点置为null。视为取消请求的节点        s = null;        // 获取尾节点,并且尾节点不为空,不是当前节点,那么就往前遍历寻找        // 节点waitStatus 状态小于0的节点赋予给当前节点的后继节点        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)     // 唤醒后继节点        LockSupport.unpark(s.thread);}
复制代码

3.获取锁的流程图


流程图和上一篇非公平锁的获取流程图十分相似只有一点点区别这里就不过多的描述了。


4.释放锁的实现


4.1 释放锁代码分析


尝试释放此锁。如果当前线程是此锁的持有者,则保留计数将减少。 如果保持计数现在为零,则释放锁定。 如果当前线程不是此锁的持有者,则抛出 IllegalMonitorStateException。


## ReentrantLockpublic void unlock() {    sync.release(1);}
复制代码

sync.release(1) 调用的是 AbstractQueuedSynchronizer 中的 release 方法

## AbstractQueuedSynchronizerpublic final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}
复制代码

分析 tryRelease(arg)方法


tryRelease(arg)该方法调用的是 ReentrantLock 中


protected final boolean tryRelease(int releases) {// 获取当前锁持有的线程数量和需要释放的值进行相减    int c = getState() - releases;     // 如果当前线程不是锁占有的线程抛出异常    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    // 如果此时c = 0就意味着state = 0,当前锁没有被任意线程占有    // 将当前所的占有线程设置为空    if (c == 0) {        free = true;        setExclusiveOwnerThread(null);    }    // 设置state的值为 0    setState(c);    return free;}
复制代码

如果头节点不为空,并且 waitStatus != 0,唤醒后续节点如果存在的话。

这里的判断条件为什么是 h != null && h.waitStatus != 0?


因为 h == null 的话,Head 还没初始化。初始情况下,head == null,第一个节点入队,Head 会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现 head == null 的情况。


  1. h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒

  2. h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒

private void unparkSuccessor(Node node) {// 获取头结点waitStatus    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);// 获取当前节点的下一个节点    Node s = node.next;//如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点    if (s == null || s.waitStatus > 0) {        s = null;        // 就从尾部节点开始找往前遍历,找到队列中第一个waitStatus<0的节点。        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }  // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点唤醒    if (s != null)        LockSupport.unpark(s.thread);}
复制代码

为什么要从后往前找第一个非 Cancelled 的节点呢?

看一下 addWaiter 方法


private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    enq(node);    return node;}
复制代码

我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred, compareAndSetTail(pred, node) 这两个地方可以看作 Tail 入队的原子操作,但是此时 pred.next = node;还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node

所以,如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next 指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。


4.2 释放锁流程图



5.注意


下一篇讲解并发工具包下的 LockSupport,谢谢大家的关注和支持!有问题希望大家指出,共同进步!!!


发布于: 2021 年 01 月 18 日阅读数: 20
用户头像

伯阳

关注

所有牛逼的人都有一段苦逼的岁月!只有坚持 2019.07.03 加入

一个懂得生活的程序员,世界是多维度的,我们要看的开心、玩的开心、活的开心

评论

发布
暂无评论
一文带你学会AQS和并发工具类的关系2