写点什么

【Java 试题】AQS 解析

发布于: 2021 年 04 月 07 日
【Java 试题】AQS解析

系列文章:

【Java 试题】从一道题目再看 Java 继承


一 背景

AQS 即 AbstractQueuedSynchronizer,是 java.util.concurrent.locks 包的一个重要概念。Java 中锁实现/同步的几种方式:synchronized,ReentrantLock,CAS。其中,可重入锁 ReentrantLock 就是基于 AbstractQueuedSynchronizer(AQS)的。因此,理解 AQS 的实现原理,对 Java 锁理解非常重要。本篇将结合 JDK1.8 源码,对 AQS 进行分析。

二 基本信息

2.1 官方注解

/*** Provides a framework for implementing blocking locks and related* synchronizers (semaphores, events, etc) that rely on* first-in-first-out (FIFO) wait queues.  This class is designed to* be a useful basis for most kinds of synchronizers that rely on a* single atomic {@code int} value to represent state. Subclasses* must define the protected methods that change this state, and which* define what that state means in terms of this object being acquired* or released.  Given these, the other methods in this class carry* out all queuing and blocking mechanics. Subclasses can maintain* other state fields, but only the atomically updated {@code int}* value manipulated using methods {@link #getState}, {@link* #setState} and {@link #compareAndSetState} is tracked with respect* to synchronization.
复制代码

核心部分翻译过来,AQS 依赖于先进先出(FIFO)阻塞队列,提供了一个实现阻塞锁和同步器(semaphores-信号量,events-事件等)的框架。AbstractQueuedSynchronizer 这个类的设计是作为大多数类型依赖一个单原子值来表示状态的同步器的一个有用的基础。

2.2 AQS 核心思想

AQS 的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。AQS 是将每一条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node),来实现锁的分配。

CLH 队列:

简单来说,AQS 是基于 CLH 队列,用 volatile 修饰共享变量 state,线程通过 CAS 去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。

有一点需要注意:AQS 是自旋锁。也就是说,在等待唤醒的时候,经常会使用自旋(while(!cas()))的方式,不停地尝试获取锁,直到被其他线程获取成功。

2.3 AQS 继承关系

先看一下类定义:

public abstract class AbstractQueuedSynchronizer    extends AbstractOwnableSynchronizer    implements java.io.Serializable 
复制代码

可见 AbstractQueuedSynchronizer 继承了 AbstractOwnableSynchronizer,并且实现了序列化接口。

AbstractOwnableSynchronizer 是什么?通过源码可见一斑:


/** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread;
/** * Sets the thread that currently owns exclusive access. * A {@code null} argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * {@code volatile} field accesses. * @param thread the owner thread */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
/** * Returns the thread last set by {@code setExclusiveOwnerThread}, * or {@code null} if never set. This method does not otherwise * impose any synchronization or {@code volatile} field accesses. * @return the owner thread */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }
复制代码

代码中可知,关键属性只有一个:Thread exclusiveOwnerThread,表示独占模式同步器的当前所有者(线程)。可以由某个线程独占,AbstractOwnableSynchronizer 为创建锁和相关同步器(伴随着所有权的概念)提供了基础。

三 AQS 详解

3.1 队列节点

AQS 所使用的 CLH 队列结构示意:

3.1.1 队列节点定义

队列中节点的结构,在 AQS 的 Node 中定义,这是一个 static final 类型。除了双向链表的相关定义:前驱节点、后继节点、节点值(线程 Thread):

volatile Node prev;volatile Node next;volatile Thread thread;
复制代码

之外,还有很多附加信息:

3.1.1.1 节点等待模式

标记节点等待模式的标记,类型也是 Node,SHARED 值不为 null 时,表示是共享模式;

EXCLUSIVE 不为 null 时,表示是独占模式

/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;
复制代码

3.1.1.2 等待状态值

waitStatus,用于表示线程已取消:

/** waitStatus value to indicate thread has cancelled */static final int CANCELLED =  1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */static final int PROPAGATE = -3;
volatile int waitStatus;
复制代码

3.1.1.3 nextWaiter

这个属性比较特殊,因为前面已经有指向后继节点的 next,又增加了一个指向下一个条件等待节点,或特殊 SHARED 值的指针。从下面的官方注释中来看,因为条件队列只有在独占模式下保持时才被访问,所以我们只需要一个简单的链接队列来保持等待条件的节点。

/** * Link to next node waiting on condition, or the special * value SHARED.  Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */Node nextWaiter;
复制代码

AQS 的阻塞队列是以双向的链表的形式保存的,是通过 prev 和 next 建立起关系的,但是 AQS 中的条件队列是以单向链表的形式保存的,是通过 nextWaiter 建立起关系的,也就是 AQS 的阻塞队列和 AQS 中的条件队列并非同一个队列。

3.1.2 Node 的构造方法

除了默认的无参构造方法外,还有两个有参数的构造方法:

Node(Thread thread, Node mode) {     // Used by addWaiter    this.nextWaiter = mode;    this.thread = thread;}
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread;}
复制代码

3.2 AQS 的队列结构

3.2.1 队列核心属性

队列的头尾节点、核心资源(state):

private transient volatile Node head;private transient volatile Node tail;private volatile int state;
复制代码

可见队列中为了处理同步,这几个都是 volatile 修饰的。

3.2.2 核心方法

AQS 提供了 get/set 方法来读和设置 state 字段;另外还有 compareAndSetState 来解决争用设置状态的方法,使用了 CAS 机制(unsafe 的 compareAndSwapInt()方法):

protected final int getState() {    return state;}
protected final void setState(int newState) { state = newState;}
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
复制代码

3.3 队列操作

有了队列,下一步就是如何操作队列了。因为大部分都是在线程争用场景下进行,所以,如何保证对队列的操作是正确同步的,这点至关重要。

3.3.1 队尾添加节点

private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    enq(node);    return node;}
复制代码

可见操作是通过 compareAndSetTail()方法来操作的,依然是 CAS 保障同步。但在 return 之前,还有 enq(node)方法调用,我们再看这段代码:

private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}
复制代码

操作是在一个死循环中,根据 tail 节点是否为空分为两块大的逻辑:tail == null,cas 设置 head;否则,节点的前驱指向 tail 节点,然后 cas 设置 tail。

3.3.2 操作队列头结点

private void setHead(Node node) {    head = node;    node.thread = null;    node.prev = null;}
复制代码

这里只是一个简单的设置方法,node 的线程值和前驱设置为 null。

四 基于 AQS 的实现

包括:Reentrant、Semaphore,CountDownLatch、ReentrantReadWriteLock、SynchronousQueue 和 FutureTask。

4.1 关于资源获取与释放

获取是一种依赖状态的操作,并且通常会阻塞,直到达到了理想状态为止。以前面提到的几种 AQS 的实现为例:

  • ReentrantLock,“获取”操作意味着“等待直到锁可被获取”。

  • Semaphore,“获取”操作意味着“等待直到许可可被获取”。

  • CountDownLatch,“获取”操作意味着“等待直到闭锁达到结束状态”。

  • FutureTask,“获取”操作意味着“等待直到任务完成”。

释放并不是一个可阻塞的操作,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行。

4.2 状态机

AQS 使用 state(int)以表示状态,提供了 getState、setState 及 compareAndSetState 等 protected 类型方法进行状态转换。通过巧妙地使用 state,表示各种关键状态:

  • ReentrantLock 用 state 表示所有者线程已经重复获取该锁的次数。

  • Semaphore 用 state 表示剩余的许可数量。

  • CountDownLatch 用 state 表示闭锁的状态,如关闭、打开。

  • FutureTask 用 state 表示任务的状态,如尚未开始、正在运行、已完成、已取消。


除了 state,在同步器类中还可以自行管理一些额外的状态变量。如:

  • ReentrantLock 保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。

  • FutureTask 用 result 表示任务的结果,该结果可能是计算得到的答案,也可能是抛出的异常。


发布于: 2021 年 04 月 07 日阅读数: 66
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
【Java 试题】AQS解析