话说 AQS

一、前言
什么是 AQS (AbstractQueuedSynchronizer)翻译过来叫抽象同步队列,
他是除 synchronized 以外的另一种同步机制
Lock 锁的实现 就依赖 AQS 后期会写 Lock 锁的使用及原理
AQS 的中心思想是:现场来了看一下共享资源是否空闲,如果共享资源空闲就上锁(修改状态位),等线程执行完业务代码就释放锁(状态位复位),其他线程来 如果共享资源有被上锁(状态位标志位占用),就进入等待队列监控状态位 一旦被复位 就去抢锁(争着修改状态位)
AQS 维护了一个 FIFO 双向队列 ,FIFO 就是先进先出 双向就是有 pre next 两个指向前后两个节点的属性
总结一句话 3+4 ===》 state+双向队列
AOS 提供了 2 中方式获取锁资源
一种是:独占锁 如: ReentrantLock
一种是:共享锁 如:CountDownLatch
二、DEMO
AQS 如果没有具体的实现类,DEMO 是没有意义的 , 我们先简单看一下里边常用的一些方法吧
1. 排队的时候 我们喜欢偷瞟美女 ,所以我们先看 NODE 他是 AbstractQueuedSynchronizer 的内部类,也就是平时排队的个体(有美女 有帅哥 )
static final class Node {
// 标记Node为共享模式
static final Node SHARED = new Node();
// 标记Node为独占模式
static final Node EXCLUSIVE = null;
// waitStatus 的几种状态
// 当前Node因为超时或中断被取消,进入该状态的节点不会再一次被阻塞
static final int CANCELLED = 1;
// node进入队列后 要确保前一个节点为SIGNAL 前一个节点释放的时候需要unpark进入队列的这个节点
static final int SIGNAL = -1;
//
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 还有 默认0 无状态
volatile int waitStatus;
// 双向队列 指向前一个个Node
volatile Node prev;
// 双向队列 指向后一个Nodesss
volatile Node next;
// 当前等待线程 !! Node的心脏
volatile Thread thread;
// 指向下一个在同一个condition里等待的node
// 或者一个特殊的值:SHARED
//conditions队列仅仅只能是独占模式 如果是共享模式 我们会用一个特殊的值标记SHARED
Node nextWaiter;
// 是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前一个Node
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 初始化head 和 共享标记的时候用
Node() {
}
// addWaiter用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// Condition 用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}2. 排队的时候大家要试图去问问收银员 是否到自己了 这时候 tryAcquire 上场了(没有插队)
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
* 尝试以独占锁模式去获取,他会查询state状态,如果允许他以独占锁模式获取 那就获取
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
* 如果获取失败,会把当前线程放到队列里 直到这个线程被其他释放线程唤醒
* 也就是说你排队呢 拿着手机看抖音小媚媚 前边的人结账完了 回头叫你一声
* 嘿!别看了 结账了
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument.
* 这个值一般情况下都是1
*
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* 如果获取成功 返回true
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}什么鬼 ? 就给我抛一个异常 ? AQS 只是一个思想,它的类里只有一个流程 没有具体实现 。
什么模式? 是不是跟”模板方法“很像
我们找一个实现吧 ReentrantLock#FairSync#tryAcquire 看一下类图
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
//锁方法
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
* tryAcquire的公平版,只有是递归调用或者是waiters或者是first的是会后才给获取权限
* 没有很懂:理解就是满足一定条件(公平)才可以获取锁
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取状态 这个就是两个核心中的一个 : 状态位state
// 这个直接调用的父类 AQS的getState()方法
// 继承的好处: 就是爸爸有的你也有
int c = getState();
// 如果是0 那就是木有人占有 这时候准备修改状态
if (c == 0) {
// 1.hasQueuedPredecessors: 判断是否有其他线程比当前线程等待时间长
// 2.compareAndSetState : CAS 线程安全的 设置state状态 0->acquires
//
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置独占线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果state不等于0 看一下占用state的是不是当前线程
else if (current == getExclusiveOwnerThread()) {
// 如果当前线程占用state 那就是重入了 state总数就+acquires
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果没有获取 就返回false
return false;
}
}3. tryAcquire 是获取锁 也就是去看看能不能到自己呢
比如你去逛街买烧饼,烧饼摊前排起了长龙,你后边看了一眼,轮不到自己,那怎么办 ?
为了爱吃烧饼的女朋友,当然是需要排队了
AbstractQueuedSynchronizer#acquire
/** * Acquires in exclusive mode, ignoring interrupts. * 申请独占锁,忽略中断 * Implemented by invoking at least once {@link #tryAcquire}, * returning on success. * Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. * 至少调用一次tryAcquire ,如果成功就得了,不成功就把当前线程放到等待队列里 * 核心2:双向等待队列出现了 */ public final void acquire(int arg) { // tryAcquire 看2例子 // addWaiter(独占方式加入队列) AQS 方法 // acquireQueued 这是一个死循环动作, 直到获取锁 也就是买到烧饼 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }AbstractQueuedSynchronizer#addWaiter
/** * Creates and enqueues node for current thread and given mode. * 你排队了,需要把你包装一下 因为你不再是你,你成了队伍中的一员 * 现实中可能会给你一个号码 或者给你一个宣传单 或者先给你点儿小吃吃着 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 包装 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 获取队列尾部元素 Node pred = tail; // 如果尾部元素不是null 那就追加 // 下边这个过程 就是队列尾部cas方式追加新元素 tail指向新元素 if (pred != null) {X // 当前插入的node的前一个node设置为tail node.prev = pred; // cas把tail设置为刚添加的元素 if (compareAndSetTail(pred, node)) { // 老tail 的下一个元素 指向新node pred.next = node; return node; } } // 如果是null 那就一般情况下是需要初始化队列 enq(node); return node; } /** * Inserts node into queue, initializing if necessary * 插入node到队列,如果有必要,会进行队列初始化 * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { // 死等 直到成功为止 for (;;) { Node t = tail; // 如果是null 初始化 if (t == null) { // 设置一个新Node为head 并且tail也指向这个node if (compareAndSetHead(new Node())) tail = head; } else { // 设置node上一个元素为t(刚添加的空node) node.prev = t; // cas设置tail为node (来排队的人) if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * 在queue中的线程以独占并且不中断模式申请锁 * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 中断设置为false boolean interrupted = false; for (;;) { //获取当前Node的前一个Node final Node p = node.predecessor(); // 如果前一个Node是head,该小强买了 来1个烧饼 if (p == head && tryAcquire(arg)) { // 设置小强为head 这样他的下一个人就知道可以轮到他了 setHead(node); // 加快gc p.next = null; // help GC failed = false; return interrupted; } // 如果没有获取 接着来 //1. shouldParkAfterFailedAcquire 检查状态 如果状态合适就park //2. parkAndCheckInterrupt 稍等片刻 //3. 等着被unpark吧 坐等前边人买完了叫你 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 失败 因为某种异常结束了 但是没有获取成功 // 取消获取锁的这个资格 // 比如小月月肚子疼 那就让他走吧 没资格吃烧饼了 // 这个方法就不看了 主要就是吧小月月的前一个人的next 设置为 小月月的后一个人 // 然后把小月月的后一个人的prev设置为小月月的前一个人 // 类比:链表的删除操作 if (failed) cancelAcquire(node); } } /** * Checks and updates status for a node that failed to acquire. * 检查并设置获取锁失败的node的状态 * Returns true if thread should block. This is the main signal * 如果node需要阻塞 返回true * control in all acquire loops. Requires that pred == node.prev. * * @param pred 前一个节点 * @param node 当前加入的节点 * @return {@code true} 如果可以阻塞 返回true */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // if (ws == Node.SIGNAL) // SIGNAL 状态 可安全park return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * 状态大于0 那就是被取消了 需要往前找一个合法的node * 也就是下月月肚子疼要走了,需要让他前边的那个人的后一个人变成小月月后边的那个人 * 类似于删除链表节点 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 将合法的node的next设置为当前node pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 设置状态为 为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * Convenience method to park and then check if interrupted * PARK 并检查是否是终端 * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { // park 也是阻塞的一种 类比wait LockSupport.park(this); // 需要看一下 park过程中有没有被中断 return Thread.interrupted(); }4. new Node() -> tryAcquire ->tryRelease 小强-->排队买到饼-->走开通知下一个人买饼
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 如果head不为空 并且waitStatus不是0 需要通知下一个Node // 买完了 告诉后边那个看抖音的人 该他买了 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // state-需要释放的数量 int c = getState() - releases; // 如果占有线程不是当前线程 抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // boolean free = false; // 如果c ==0 也就是释放完了 lock lock release release if (c == 0) { free = true; // 设置占有线程为null setExclusiveOwnerThread(null); } // 设置状态位 == 0 setState(c); return free; } /** * Wakes up node's successor, if one exists. * 如果存在下一个Node的话 唤醒它 * @param node the node */ private void unparkSuccessor(Node node) { // 先设置当前Node状态 为0 也就是说拜拜了 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 循环找一个合法的next // 小强买完了看看后那个人是不是合法 万一他是小月月,正闹肚子呢 已经取消了cancelled // 那肯定不唤醒他 让他拉粑粑去吧 接着看下月月的下一个是否合法 直到找见一个合法的 叫他来买饼 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒 unpark if (s != null) LockSupport.unpark(s.thread); }三、 假装学术讨论
本来挺清楚,看了看代码,感觉进山了,那句话说的好:只缘身在此山中
来一个山的全貌:
我看完了觉得 其实也没几件事
(1) 尝试获取锁
(2)如果自己是第一个来 初始化列表 获取锁
(3) 如果自己不是第一个来 加入队列等待被叫
(4)被叫醒 执行操作
(5)操作执行完 释放锁 叫下一个人执行
本文有很多图,如果不清楚的话,大家可以关注公众号:木子的昼夜
发送"aqs" 即可获得高清图访问地址
发送"路线" 即可获得本系列文章大纲
也可发送自己想问的问题给我,我会在看到的第一时间回复
最后附上自己公众号刚开始写 愿一起进步:
注意: 以上文字 仅代表个人观点,仅供参考,如有问题还请指出,立即马上连滚带爬的从被窝里出来改正。
版权声明: 本文为 InfoQ 作者【木子的昼夜】的原创文章。
原文链接:【http://xie.infoq.cn/article/00bd3b146330e3a213bc124e7】。文章转载请联系作者。
木子的昼夜
还未添加个人签名 2018.03.28 加入
还未添加个人简介











评论