Java 并发(十):独占式、共享式的获取与释放
下面就画一下流程图吧
具体的细节还是回看 ReentrantLock
释放的代码如下
释放成功,就会去唤醒后面的线程,进而使后续结点可以重新尝试获取同步状态
共享式同步状态获取与释放
共享式与独占式获取的最主要的区别就是在于同一时刻能否有多个线程同时获取到锁,也就是获取到同步状态
以 InnoDB 的共享锁与排他锁为例
共享锁其实就是共享式同步状态,允许同时进行读取
排他锁就是独占式同步状态,一个事务在写
的时候,不允许其他事务进行写甚至读
共享式同步
AQS 也支持共享式同步状态的操作
共享式同步状态获取的实现源码为 acquireShared 方法
步骤如下
调用 tryAcquireShared,即判断能否去获得共享锁
如果可以获得共享锁,调用 doAcquireShared 方法
可以看到,其是一个抽象方法,这里用到的设置模式为模板模式
由于这里只讲 AQS,这里就先不细讲
只有当 tryAcquireShared 返回值小于 0 的时候,才会进入这个方法,而 tryAcquire 的返回值对应的结果如下
小于 0:获取共享锁失败
等于 0:获取共享锁成功,但后面线程不可以获得共享锁
大于 0:获取共享锁成功,且后面线程可以继续获得共享锁
可以看出,只有获取共享锁失败,才会进入这个方法,如果获取共享锁失败,那么就要自旋,而且还要通知后面的线程无法获取
源码如下,细节跟 AQS 的 acquireQueued 很像(acquireQueued 就是入队自旋的)
private void doAcquireShared(int arg) {
//将获取共享锁失败的结点添加进底层线程队列中
final Node node = addWaiter(Node.SHARED);
//定义一个 failed 变量
//定义线程是否没有出错
boolean failed = true;
try {
//定义一个变量
//标记线程是否被挂起
boolean interrupted = false;
//死循环
for (;;) {
//获取当前结点在队列里的上一个结点
final Node p = node.predecessor();
//如果上一个结点是头结点
if (p == head) {
//头结点是正在执行的结点
//此时又尝试去获取共享锁,自旋去获取
int r = tryAcquireShared(arg);
//如果获取共享锁成功
//这里如果大于等于 0,
//肯定是获取共享锁成功的,但不一定锁可以继续被后续线程获取
//可以看到,一旦获取共享锁成功,最后面就会直接 return
//但也有个例外,
//假如这个线程自旋太多次,被挂起了,然后又被唤醒了,获取到了锁,
//但被唤醒是因为别的线程去中止他,那么最后将会被中止
if (r >= 0) {
//将头结点设置为当前结点(队列的头结点为拥有锁的结点)
//即使共享锁可以被多个线程拥有
//也要遵守队列中只能有正在排队的结点
//所以获得共享锁的线都会被设置为头结点
//然后清掉里面的 thread
setHeadAndPropagate(node, r);
//断开之前的头结点
p.next = null; // help GC
//如果该线程此前是 Park 的,然后是被其他线程 interrupt 唤醒的
if (interrupted)
//将其挂起
//此时就挂在这里了
//线程就被中止了
selfInterrupt();
//当前线程正常执行
failed = false;
//直接返回
return;
}
}
//如果走到这里,就判断获取乐观锁失败
//这一步是判断是否还需要继续自旋
//如果不需要自旋,代表要被 park 掉
//后面就会调用 parkAndCheckInterrupt 将线程挂起终止
//这里与 ReentrantLock 一样,也是自旋两次
//如果判断要被挂起了,
//就让线程进入等待状态,并且此时如果有其他线程中止当前线程
//就会将 interrupted 修改为 true
//再下一轮的抢锁,如果抢到锁,就会被终止,然后释放锁
//parkAndCheckInterrupt 会将线程 Park
//并且判断唤醒后的线程是不是被别人 interrupt 唤醒的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果有其他线程尝试终止这个线程
//当线程唤醒回来时,会先修改 interrupted 为 true
interrupted = true;
}
} finally {、
//如果出现异常
//就在这里取消获取锁
if (failed)
cancelAcquire(node);
}
}
总结一下整个步骤
循环去判断上一个是不是头结点(相当于看前面的人多不多)
如果是头结点,那么就可以再进行抢锁
如果成功抢到锁,返回上一层
如果不是头结点,那么就判断前面的人是否已经休眠了,如果已经休眠,自己也休眠
shouldParkAfterFailedAcquire 方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果是休眠状态,返回 true,代表可以 park
if (ws == Node.SIGNAL)
/*
This node has already set status asking a release
to signal it, so it can safely park.
*/
return true;
//如果是取消状态
//底层队列跳过这个结点
if (ws > 0) {
/*
Predecessor was cancelled. Skip over predecessors and
indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//如果是 0 或者 propagate 方法
//单纯只是将状态改为休眠,并没有实际 park 掉
//因为最后返回了 false
else {
/*
waitStatus must be 0 or PROPAGATE. Indicate that we
need a signal, but don't park yet. Caller will need to
retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这个方法就是判断要不要继续去等待,需不需要 Park
如果前面的线程是状态 Signal,代表前面的已经 park 了,所以自己也要 Park
如果前面的线程不是 Signal(是 0 或者 Propagate),那么就将前面的线程改为 Signal,再 CAS 多一次
如果前面的线程是跳过的,那么就维护底层队列,将前面的线程删掉
这个方法检查后面的结点是否也可以拥有共享锁,tryAcquireShared 返回值如果为负值,代表当前线程获取共享锁失败;如果为 0,代表当前线程获取共享锁成功,但后续线程不能获取;如果为正值,代表当前线程获取共享锁成功,且后续线程也可以获取共享锁
源码如下
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//setHead 这个方法前面已经讲过
//这个方法只是简单地将头结点设置成当前结点,但并不保留里面的线程
setHead(node);
//如果 tryAcquireShared 的返回值为正值
//即 propagete > 0,那么即当前线程获取共享锁成功,并且后续线程也可以获得共享锁
//判断 h == null 是因为线程入队时,会有一个 null 问题,此处是解决 Null 问题的
//然后判断头结点的线程状态是否允许共享锁传播
//其实判断 waitStatus 状态小于 0,是会唤醒一些不必要的线程
//因为只有 waitStatus == -3 即,为 Propagate 时,才是允许传播
//但这里是直接判断小于 0,那么有一些线程休眠了,状态为-1 也是会唤醒的
//不过我这里没看懂的是,为什么后面又来了一次判断头结点?
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取下一个结点
Node s = node.next;
//如果下一个结点为空,或者也是来获取共享锁的
if (s == null || s.isShared())
//唤醒后续结点并且保证共享锁传播
doReleaseShared();
}
}
我们先来看一下 isShared 方法
这个方法是 Node 结点里面的方法,判断当前线程是否在以共享模式等待,即是否等待获取共享锁(通过判断其 nextWaiter 状态)。
此前疏忽了在 addWaiter 里面的对于 Node 使用的构造方法
可以看到,这个构造方法,就一直在维护着所有进来的线程等待的状态,而共享状态也是在这里维护着的(反过来,独占状态也要加入队列,那么独占状态也会在这里存着)
总结一下,setHeadAndPropagate
这个方法是当线程拿到锁时,那么自己就要跳出那个队列了(成为头结点)
跳出队列,执行 setHead 方法,去成为头结点
传播状态,唤醒后面的结点,唤醒第一个正在 Park 的结点,让他去可以 CAS 抢锁
接下来我们看看这个方法是干了什么
这个方法是唤醒后续的结点然后保证共享锁传播的
private void doReleaseShared() {
//死循环 CAS,也就是自旋
for (;;) {
//获取头结点
Node h = head;
//判断队列中是否拥有结点
//如果没有那就代表队列里面没有线程等待
//不需要唤醒
if (h != null && h != tail) {
//获取头结点线程的状态
int ws = h.waitStatus;
//如果是正在休眠的,代表后续线程需要被唤醒
if (ws == Node.SIGNAL) {
//循环 CAS 修改其状态为正常
//因为从 shouldParkAfterFailedAcquire 方法里面知道
//只有前面一个线程为 0 或者 propagate 才能继续自旋
//否则就会被 Park 掉
//所以这里改成 0,否则一唤醒马上就被 Park 掉了
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//如果 CAS 修改成功,那么就唤醒后续线程
//这里唤醒的是最先的一个处于休眠,或者处于无条件传播共享状态的线程
//也就是唤醒后续线程
//注意这里是一个个唤醒的
//一个线程获得锁,就唤醒后面的一个线程
unparkSuccessor(h);
}
//如果头结点是正常执行的状态的
//代表后面的一个线程已经被唤醒了
//那么这个头结点线程就要变成另一个状态
//那么就将其状态修改为无条件传播共享状态
//因为当后面的线程如果获取锁失败
//CAS 一次之后,就会将前面的线程状态给设置成休眠
//然后后面的线程又会被 Park 掉
//这里将状态改为 Propagate
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头结点改变,那么就要重新进行
//如果此时又有一个线程进来,获得了锁
//会去修改头节点,执行 SetHead 方法
//那么此时就要重新去唤醒第一个执行的线程
//如果头结点没变,那么结束循环
if (h == head) // loop if head changed
break;
}
}
总结一下 doReleaseShare(只有前面是头结点才会进入这里的)
判断队列里面是否有结点等待
如果没有结点等待,就不需要传播了
如果有结点等待且头结点状态为休眠的
先将头结点状态改成 0,让后一个结点可以继续自旋至多两次,而不会判断自己应该休眠
然后唤醒后一个正常的结点
如果有结点等待且头结点状态为 0 的
将头结点状态改成 Propagate,同样也是为了让后一个结点可以继续自旋至多两次
下面来说明一下这个 propagate 状态
这个状态有什么必要性吗?注释上说,如果第一个线程不是 Signal 状态,那么就需要变成 PropaGate 状态来确保可以继续传播,不过改为 0 可以自旋,改为 Propagate 也可以自旋,那么为什么要分开呢?
那我们就回看一下传播代码咯,传播代码就是 setHeadAndPropagate 方法嘛
这里再贴上来
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//setHead 这个方法前面已经讲过
//这个方法只是简单地将头结点设置成当前结点,但并不保留里面的线程
setHead(node);
//如果 tryAcquireShared 的返回值为正值
//即 propagete > 0,那么即当前线程获取共享锁成功,并且后续线程也可以获得共享锁
//判断 h == null 是因为线程入队时,会有一个 null 问题,此处是解决 Null 问题的
//然后判断头结点的线程状态是否允许共享锁传播
//其实判断 waitStatus 状态小于 0,是会唤醒一些不必要的线程
//因为只有 waitStatus == -3 即,为 Propagate 时,才是允许传播
//但这里是直接判断小于 0,那么有一些线程休眠了,状态为-1 也是会唤醒的
//不过我这里没看懂的是,为什么后面又来了一次判断头结点?
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取下一个结点
Node s = node.next;
//如果下一个结点为空,或者也是来获取共享锁的
if (s == null || s.isShared())
//唤醒后续结点并且保证共享锁传播
doReleaseShared();
}
}
评论