写点什么

队列同步器 AQS

作者:周杰伦本人
  • 2022 年 5 月 12 日
  • 本文字数:5493 字

    阅读完需:约 18 分钟

队列同步器 AQS

同步器的设计是基于模板方法模式的,


重写同步器指定的方法时,需要使用同步器提供的如下 3 个方法来访问或修改同步状态。


  • getState():获取当前同步状态。

  • setState(int newState):设置当前同步状态。

  • compareAndSetState(int expect,int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。


同步器依赖内部的同步队列(一个 FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

独占式同步状态获取与释放

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}
复制代码


private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    enq(node);    return node;}
复制代码


final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码


独占式同步锁获取流程:



通过调用同步器的 release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。


public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}
复制代码


private void unparkSuccessor(Node node) {    /*     * If status is negative (i.e., possibly needing signal) try     * to clear in anticipation of signalling.  It is OK if this     * fails or if status is changed by waiting thread.     */    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);}
复制代码


unparkSuccessor(Node node)方法使用 LockSupport 来唤醒处于等待状态的线程。


在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况。


通过调用同步器的 acquireShared(int arg)方法可以共享式地获取同步状态


public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}
复制代码


private void doAcquireShared(int arg) {    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码


在 acquireShared(int arg)方法中,同步器调用 tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为 int 类型,当返回值大于等于 0 时,表示能够获取到同步状态。


因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(int arg)方法返回值大于等于 0。可以看到,在 doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于 0,表示该次获取同步状态成功并从自旋过程中退出。


通过调用 releaseShared(int arg)方法可以释放同步状态,


public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}
复制代码


该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如 Semaphore),它和独占式主要区别在于 tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和 CAS 来保证的,因为释放同步状态的操作会同时来自多个线程。

独占式超时获取同步状态

通过调用同步器的 doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回 true,否则,返回 false。


private boolean doAcquireNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (nanosTimeout <= 0L)        return false;    final long deadline = System.nanoTime() + nanosTimeout;    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return true;            }            nanosTimeout = deadline - System.nanoTime();            if (nanosTimeout <= 0L)                return false;            if (shouldParkAfterFailedAcquire(p, node) &&                nanosTimeout > spinForTimeoutThreshold)                LockSupport.parkNanos(this, nanosTimeout);            if (Thread.interrupted())                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码


该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上有所不同。如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout 小于等于 0 表示已经超时),如果没有超时,重新计算超时间隔 nanosTimeout,然后使当前线程等待 nanosTimeout 纳秒(当已到设置的超时时间,该线程会从 LockSupport.parkNanos(Objectblocker,long nanos)方法返回)。


acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态


doAcquireNanos(int arg,long nanosTimeout)会使当前线程等待 nanosTimeout 纳秒,如果当前线程在 nanosTimeout 纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。


独占式超时获取同步状态的流程:


自定义同步组件——TwinsLock

该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞,我们将这个同步工具命名为 TwinsLock。


共享式访问,重写 tryAcquireShared(int args)方法和 tryReleaseShared(int args)方法


定义资源数。TwinsLock 在同一时刻允许至多两个线程的同时访问,表明同步资源数为 2,这样可以设置初始状态 status 为 2,当一个线程进行获取,status 减 1,该线程释放,则 status 加 1,状态的合法范围为 0、1 和 2,其中 0 表示当前已经有两个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。在同步状态变更时,需要使用 compareAndSet(int expect,int update)方法做原子性保障。


package com.example.xppdemo.chapter4;
import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;
public class TwinsLock implements Lock { private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); }
public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } }
public boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } }
public void lock() { sync.acquireShared(1); }
@Override public void lockInterruptibly() throws InterruptedException {
}
@Override public boolean tryLock() { return false; }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; }
public void unlock() { sync.releaseShared(1); }
@Override public Condition newCondition() { return null; }}
复制代码


package com.example.xppdemo.chapter4;

import java.util.concurrent.locks.Lock;
public class TwinsLockTest { public static void main(String[] args) { test(); } public static void test() { final Lock lock = new TwinsLock(); class Worker extends Thread { public void run() { while (true) { lock.lock(); try { SleepUtils.second(1); System.out.println(Thread.currentThread().getName()); SleepUtils.second(1); } finally { lock.unlock(); } } } }// 启动10个线程 for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.setDaemon(true); w.start(); }// 每隔1秒换行 for (int i = 0; i < 10; i++) { SleepUtils.second(1); System.out.println(); } }}
复制代码


执行结果:


Thread-3
Thread-2
Thread-2Thread-3

Thread-3Thread-2


Thread-3Thread-2
Thread-3Thread-2
复制代码


该线程在执行过程中获取锁,当获取锁之后使当前线程睡眠 1 秒(并不释放锁),随后打印当前线程名称,最后再次睡眠 1 秒并释放锁

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2020.02.29 加入

还未添加个人简介

评论

发布
暂无评论
队列同步器AQS_5月月更_周杰伦本人_InfoQ写作社区