Java 并发之 AQS 源码分析

用户头像
指尖流逝
关注
发布于: 2020 年 05 月 15 日
Java并发之AQS源码分析

如何使用

使用AQS来实现同步器,只需要在其子类中实现以下方法:

  • tryAcquire

  • tryRelease

  • tryAcquireShared

  • tryReleaseShared

  • isHeldExclusively

并在适当的时候,通过

  • getState

  • setState

  • compareAndSetState

来检查、修改状态变量state

ReentrantLock中的非公平锁的同步器实现

由于ReentrantLock只支持独占模式,因此其同步器实现方法:

  • tryAcquire

  • tryRelease

而无需实现

  • tryAcquireShared

  • tryReleaseShared

对于ReentrantLock来说,其lock方法AQS同步获取资源的入口,其内部会调用tryAcquire方法.tryAcquire方法由ReentrantLock的同步器实现,操作如下:

  • 通过CAS来竞争锁,成功则直接返回

  • 竞争锁失败,则当前线程进入等待队列,经过一次自旋后进入WAITING状态,直到当前线程被其前驱结点唤醒、或者线程中断,而且线程中断发生时不保证及时处理,只是在中断时记录下中断状态,继续自旋或阻塞,直到成功获取资源后,返回这期间的中断状态,交由上层程序处理;

/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// AQS同步获取资源的入口,其内部会调用tryAcquire方法。
// tryAcquire的实现决定了如何进行锁竞争,由子类来实现。
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync#nonfairTryAcquire
/**
* 通过getState、compareAndSetState、setState以保证原子性地维护和读取AQS内部的state变量
*
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入概念体现在这里
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync#tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// Sync#isHeldExclusively
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Sync#newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
// Sync#getOwner
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// Sync#getHoldCount
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// Sync#isLocked
final boolean isLocked() {
return getState() != 0;
}
}

源码分析

加锁主流程

节点的共享模式和独占模式

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

节点状态

对于普通队列,初始的节点状态为0;对于条件队列,初始的节点状态为CONDITION

// 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
// 表示后续节点在等待当前节点唤醒。后续节点入队时会将前去节点的状态更新为SIGNAL。看shouldParkAfterFailedAcquire方法
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
// 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
// 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

入队操作

入队的操作很简单,要注意的是:

  • 只在发生竞争时才初始化CLH队列

  • 将入队的节点通过CAS设置到tail

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 队列只在发生锁竞争的时候创建,这里初始化队列的头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

自旋锁、锁等待与线程挂起

/**
* 独占模式下线程获取资源的入口。如果获取到资源,线程直接返回。否则进入等待队列,直到获取到资源为止,整个过程忽略中断,并不会对线程的中断状态进行及时响应,而是在获取到资源之后,恢复线程的中断状态,直接返回。由上层应用程序觉得如何处理中断
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 通过死循环,实现线程的自旋过程。如果当前线程的前驱结点是队首,那么尝试获取资源,获取到则线程返回。如果当前线程的前驱结点不是队首或者获取资源失败,先检查前驱结点状态,当前驱结点状态为Node.SIGNAL,挂起当前线程;如果前驱结点状态 > 0说明前驱结点被取消,需要维护队列,将当前节点的前驱结点指向到有效节点上;如果前驱结点的状态为0或者Node.PROPAGATE时,将被修改为Node.SIGNAL,当前线程进入下一个循环,下个循环中,如果前驱结点状态为Node.SIGNAL,将挂起当前线程,进入WAITING状态,等待前驱结点唤醒或者线程被中断;而且不会立即响应中断,而是在获取到资源后将中断状态返回
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取不到资源,当前线程将在队列上阻塞等待。阻塞由LockSupport支持来实现,如果在等待过程中线程发生中断不会立即响应,而是在获取到资源之后,重新设置上中断状态。由上层应用代码来决定如何处理
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 在不支持中断和无限期等待的情况下,不会走到这一步
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

解锁主流程

释放共享资源

/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
// 调用子类同步器实现的tryRelease释放共享资源
if (tryRelease(arg)) {
Node h = head;
// 如果发生过锁竞争,等待队列不为空,且当前线程的后继节点线程需要被唤醒(后继节点入队时,会修改前驱结点的waitStatus = Node.SIGNAL)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Sync#tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 重入体现在这里,只有当state = 0时才释放锁。意味着lock、unlock要成对出现。否则将无法正确释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 为什么从后往前遍历?因为节点入队时,是通过CAS的方式设置tail节点后,再设置此前tail.next的值。这两个操作不是原子的,可能出现链表中插入了节点,但是遍历不到最后一个节点。具体可以看看enq(Node node)方法
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

取消获取资源

对于取消获取资源的操作,发生在中断或者timeout的场景中。

实际上,取消等待就是将需要取消的节点从CLH队列中出队。分三种情况讨论:

  • 当前节点node是tail节点,直接出队即可。设置node前驱结点的next为null

  • 当前节点node不是tail也不是head的后继节点:需要更改前驱结点的状态为Node.SIGNAL,前驱结点的next为node.next

  • 当前节点时head的后继节点,直接唤醒当前节点的后继节点线程。由该线程进行当前节点node的出队。体现在AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法上

/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// 进行出队操作,分三种场景考虑
// If we are the tail, remove ourselves.
// 场景一:当前节点是尾结点,很好处理,直接出队即可
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
// 场景二:当前节点node不是tail而且不是head的后继节点。
int ws;
if (pred != head &&
// 修改node前驱结点状态为SIGNAL,并设置前驱结点的next为node.next
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 场景三:node是head的后继节点,直接唤醒node的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

延伸阅读

AQS论文



发布于: 2020 年 05 月 15 日 阅读数: 34
用户头像

指尖流逝

关注

还未添加个人签名 2017.10.17 加入

还未添加个人简介

评论

发布
暂无评论
Java并发之AQS源码分析