Java 并发之 AQS 源码分析
如何使用
使用AQS来实现同步器,只需要在其子类中实现以下方法:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
并在适当的时候,通过
getState
setState
compareAndSetState
来检查、修改状态变量state
ReentrantLock中的非公平锁的同步器实现
由于ReentrantLock只支持独占模式,因此其同步器实现方法:
tryAcquire
tryRelease
而无需实现
tryAcquireShared
tryReleaseShared
对于ReentrantLock来说,其lock方法AQS同步获取资源的入口,其内部会调用tryAcquire方法.tryAcquire方法由ReentrantLock的同步器实现,操作如下:
通过CAS来竞争锁,成功则直接返回
竞争锁失败,则当前线程进入等待队列,经过一次自旋后进入WAITING状态,直到当前线程被其前驱结点唤醒、或者线程中断,而且线程中断发生时不保证及时处理,只是在中断时记录下中断状态,继续自旋或阻塞,直到成功获取资源后,返回这期间的中断状态,交由上层程序处理;
/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else// AQS同步获取资源的入口,其内部会调用tryAcquire方法。// tryAcquire的实现决定了如何进行锁竞争,由子类来实现。acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}// Sync#nonfairTryAcquire/*** 通过getState、compareAndSetState、setState以保证原子性地维护和读取AQS内部的state变量** Performs non-fair tryLock. tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入概念体现在这里else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// Sync#tryReleaseprotected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}// Sync#isHeldExclusivelyprotected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}// Sync#newConditionfinal ConditionObject newCondition() {return new ConditionObject();}// Methods relayed from outer class// Sync#getOwnerfinal Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}// Sync#getHoldCountfinal int getHoldCount() {return isHeldExclusively() ? getState() : 0;}// Sync#isLockedfinal boolean isLocked() {return getState() != 0;}}
源码分析
加锁主流程
节点的共享模式和独占模式
/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;
节点状态
对于普通队列,初始的节点状态为0;对于条件队列,初始的节点状态为CONDITION
// 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;// 表示后续节点在等待当前节点唤醒。后续节点入队时会将前去节点的状态更新为SIGNAL。看shouldParkAfterFailedAcquire方法/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;// 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;// 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
入队操作
入队的操作很简单,要注意的是:
只在发生竞争时才初始化CLH队列
将入队的节点通过CAS设置到tail
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize// 队列只在发生锁竞争的时候创建,这里初始化队列的头节点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
自旋锁、锁等待与线程挂起
/*** 独占模式下线程获取资源的入口。如果获取到资源,线程直接返回。否则进入等待队列,直到获取到资源为止,整个过程忽略中断,并不会对线程的中断状态进行及时响应,而是在获取到资源之后,恢复线程的中断状态,直接返回。由上层应用程序觉得如何处理中断* Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}/*** 通过死循环,实现线程的自旋过程。如果当前线程的前驱结点是队首,那么尝试获取资源,获取到则线程返回。如果当前线程的前驱结点不是队首或者获取资源失败,先检查前驱结点状态,当前驱结点状态为Node.SIGNAL,挂起当前线程;如果前驱结点状态 > 0说明前驱结点被取消,需要维护队列,将当前节点的前驱结点指向到有效节点上;如果前驱结点的状态为0或者Node.PROPAGATE时,将被修改为Node.SIGNAL,当前线程进入下一个循环,下个循环中,如果前驱结点状态为Node.SIGNAL,将挂起当前线程,进入WAITING状态,等待前驱结点唤醒或者线程被中断;而且不会立即响应中断,而是在获取到资源后将中断状态返回* Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 如果获取不到资源,当前线程将在队列上阻塞等待。阻塞由LockSupport支持来实现,如果在等待过程中线程发生中断不会立即响应,而是在获取到资源之后,重新设置上中断状态。由上层应用代码来决定如何处理if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 在不支持中断和无限期等待的情况下,不会走到这一步if (failed)cancelAcquire(node);}}/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
解锁主流程
释放共享资源
/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/// AbstractQueuedSynchronizer#releasepublic final boolean release(int arg) {// 调用子类同步器实现的tryRelease释放共享资源if (tryRelease(arg)) {Node h = head;// 如果发生过锁竞争,等待队列不为空,且当前线程的后继节点线程需要被唤醒(后继节点入队时,会修改前驱结点的waitStatus = Node.SIGNAL)if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// Sync#tryReleaseprotected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 重入体现在这里,只有当state = 0时才释放锁。意味着lock、unlock要成对出现。否则将无法正确释放锁if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}/*** Wakes up node's successor, if one exists.** @param node the node*/// AbstractQueuedSynchronizer#unparkSuccessorprivate void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 为什么从后往前遍历?因为节点入队时,是通过CAS的方式设置tail节点后,再设置此前tail.next的值。这两个操作不是原子的,可能出现链表中插入了节点,但是遍历不到最后一个节点。具体可以看看enq(Node node)方法for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
取消获取资源
对于取消获取资源的操作,发生在中断或者timeout的场景中。
实际上,取消等待就是将需要取消的节点从CLH队列中出队。分三种情况讨论:
当前节点node是tail节点,直接出队即可。设置node前驱结点的next为null
当前节点node不是tail也不是head的后继节点:需要更改前驱结点的状态为Node.SIGNAL,前驱结点的next为node.next
当前节点时head的后继节点,直接唤醒当前节点的后继节点线程。由该线程进行当前节点node的出队。体现在AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法上
/*** Cancels an ongoing attempt to acquire.** @param node the node*/private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessorsNode pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;// 进行出队操作,分三种场景考虑// If we are the tail, remove ourselves.// 场景一:当前节点是尾结点,很好处理,直接出队即可if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.// 场景二:当前节点node不是tail而且不是head的后继节点。int ws;if (pred != head &&// 修改node前驱结点状态为SIGNAL,并设置前驱结点的next为node.next((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 场景三:node是head的后继节点,直接唤醒node的后继节点unparkSuccessor(node);}node.next = node; // help GC}}
延伸阅读
版权声明: 本文为 InfoQ 作者【指尖流逝】的原创文章。
原文链接:【http://xie.infoq.cn/article/d441c999adf8b62b4558d2805】。文章转载请联系作者。
指尖流逝
还未添加个人签名 2017.10.17 加入
还未添加个人简介
评论