关注“Java 后端技术全栈”**
回复“000”获取大量电子书
生活案例
停车场:停车场只有 2 个车位,即同时只能容纳 2 辆车,车辆都是停一会再走的,如何保证同一时刻最多有 2 个车停在停车位?请用代码实现。
女厕所:女厕所里只有五个位置,即最多只能有五位女性同时上厕所,如何保证同一时刻最多有五位女性在上厕所?请用 java 代码实现。(原本就只想用厕所来说,没想用女厕所来举例,但是之前遇到杠精,说男厕可以两个人一起上,无比尴尬)。
多线程读某个文件:实现一个文件允许同一时刻的并发访问数。
Semaphore 入场
上面的两个案例可以用 JDK1.5 出的 Semaphore 来实现。实现停车场案例后,第二个案例也就很轻松实现了。公司大楼来了 6 辆车,怎么办?保安大哥手里只有两张停车卡(一个车位一张卡)。
import java.util.Random;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
//车位个数
private static final int PARK_LOT = 2;
//车辆数
private static final int CARS = 6;
private static Semaphore semaphore = new Semaphore(PARK_LOT);
private static void park() {
for (int i = 1; i <= CARS; i++) {
int finalI = i;
new Thread(() -> {
try {
//看看有没有空车位
if (semaphore.availablePermits() == 0) {
System.out.println("第" + finalI + "辆司机看了看,哎,还没有空停车位,继续排队");
}
//尝试进入停车位
semaphore.acquire();
System.out.println("第" + finalI + "成功进入停车场");
Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间
System.out.println("第" + finalI + "驶出停车场");
//离开停车场
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
public static void main(String[] args) {
park();
}
}
复制代码
运行输出
四辆车等待,两辆车进入停车位。然后后面就是出来一辆进入一辆,这个停车场确实保证了任何时可最多可以停两辆车了。
可能前面排队等着别人开出来,可是有的人不按规矩出牌,看到有车位一下就进去把车位抢了。所以这个停车也分公平和非公平的。
整个过程:
假设上面的场景是:司机们能看到车位是否为空,而且只能看到自己前面的车,更前面的车看不见。
通常有六种做法:
1.看到有空车位,便直接开过去、不用排队了(非公平)
2.看到有车位了,还得看看前面还有没有车在排队(公平)
3.不然,就一直看车位是否为空,以及自己前面还有没有车(死循环--自旋)
4.能够停车了,尝试往空车位开去,但却被插队(不公平)的那家伙抢了车位(CAS)
5.有时候保安也会说:今天车位满了,不让停进来了,回去吧(中断)
6.保安叫你回去了,你却傻傻地不走,想再等等看(中断不响应)
7.汽车从车位上离开,每离开一个,空车位就多一个(资源释放)
以上是自己对生活和技术的理解,如有不对的地方希望指正。
深入 Semaphore
Semaphore 翻译
Semaphore 是 Doug Lea 在 JDK1.5 时候搞出来的。Semaphore 是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。
UML 类图
方法和主要属性
上面案例中我们一共使用了如下几个关键方法:
new Semaphore(2);
availablePermits()
acquire();
release();
复制代码
构造方法
从构造方法new Semaphore(PARK_LOT)
为入口,咱们一步一步把这个 Semaphore 的源码给看一遍,然后梳理一下,最后总结一下。
//两个车位 permits=2
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码
有两个构造方法,并且这里我们看到了两个熟悉的面孔:
FairSync 公平的
NonfairSync 非公平的
上面的案例中,使用的是一个单个参数的构造方法,即此时使用的是非公平的。就想上面停车的场景,你排队在前面也没用,后面的直接插队,停到车位上去了。
我们来看看这个非公平 NonfairSync 是怎么实现的
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
//两个车位 permits=2
NonfairSync(int permits) {
super(permits);
}
}
复制代码
这里面什么逻辑,通过 super(permits)到父类中
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//两个车位 permits=2
Sync(int permits) {
//就是给AQS设置state=2
setState(permits);
}
//返回AQS中state的值
final int getPermits() {
return getState();
}
}
复制代码
构造方法就是创建一个 Semaphore 对象,给 AQS 中的 state 赋值。
availablePermits() 方法--检测是否有可用凭证(资源)
司机看看有没有空车位
semaphore.availablePermits()
复制代码
方法的源码
public int availablePermits() {
return sync.getPermits();
}
复制代码
其实这里就是获取 AQS 中的 sate 的值。如果 state==0 则证明已满。否则 semaphore.acquire();
acquire 方法--中断式
在 Semaphore 中获取资源的有两种方式:中断式和不中断式。
当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于 0,则当前信号量的计数会减 1,然后该方法直接返回。
否则如果当前信号量个数等于 0,则当前线程会被放入 AQS 的阻塞队列。
当其他线程调用了当前线程的 interrupt()方法中断了当前线程时,则当前线程会抛出 InterruptedException 异常返回。
其中 acquire()为中断式
public void acquire() throws InterruptedException {
//入参是证明只需要一个信号
sync.acquireSharedInterruptibly(1);
}
复制代码
是 AQS 中的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断是否中断
if (Thread.interrupted()) throw new InterruptedException();
//tryAcquireShared 和 doAcquireSharedInterruptibly 方法
if (tryAcquireShared(arg) < 0){
doAcquireSharedInterruptibly(arg);
}
}
//空方法,有子类去实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
复制代码
tryAcquireShared 方法
该方法是在 AQS 中没有实现,是一个空方法,这里在 Semaphore 中有两个实现类
由于我们前面使用就是非公平模式,所以这里我们进入NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//这方法也没有什么逻辑,直接调用nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
复制代码
而 nonfairTryAcquireShared 方法则是在其父类里实现的
abstract static class Sync extends AbstractQueuedSynchronizer {
//acquires=1
final int nonfairTryAcquireShared(int acquires) {
//死循环--自旋
for (;;) {
//获取AQS中的state,就是我们前面给的permits=2
//也称之为获取剩余许可数
int available = getState();
int remaining = available - acquires;
// 剩余的许可小于0或者比较设置成功
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
这里是一个 CAS 自旋。因为 Semaphore 是一个共享锁,可能有多个线程同时申请共享资源,因此 CAS 操作可能失败。直到成功获取返回剩余资源数目,或者发现没有剩余资源返回负值代表申请失败。在这里我们看看公平模式
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
//公平模式多了这个方法
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
复制代码
公平模式下的 tryAcquireShared 方法在试图获取之前做了一个判断,如果发现等对队列中有线程在等待获取资源,就直接返回-1 表示获取失败。当前线程会被上层的 acquireSharedInterruptibly 方法调用 doAcquireShared 方法放入等待队列中。这正是“公平”模式的语义:如果有线程先于我进入等待队列且正在等待,就直接进入等待队列,效果便是各个线程按照申请的顺序获得共享资源,具有公平性。
hasQueuedPredecessors 这个方法我们在之前的 ReentranLock 文章中说过了,这里就不在多说了。
doAcquireSharedInterruptibly 方法
这个方法我们在之前的 Reentranlock 文章中也都讲过了快速掌握并发编程---细说ReentrantLock和AQS,在这里的主要是当获取锁失败后,他就是将当前线程放入等待队列并开始自旋检测获取资源。
acquireUnInterruptibly 方法-- 不中断式
该方法与 acquire()方法类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源时(包含被阻塞后),其他线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志,此时当前线程并不会抛出 InterruptedException 异常而返回。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
release()方法--释放资源
不管是在 ReentranLock 还是 Semaphore 里,释放锁和释放资源都不会公平性。
该方法的作用是把当前 Semaphore 对象的信号量值增加 1,如果当前有线程因为调用 aquire 方法被阻塞而被放入了 AQS 的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
public void release() {
//注意入参是1
sync.releaseShared(1);
}
//释放掉资源后,唤醒后继
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
而在这个方法中一共调用了两个方法tryReleaseShared
和doReleaseShared
tryReleaseShared
tryReleaseShared
方法的实现
既然不分公平和不公平,那么这个实现类就肯定是在他们两的父类里实现的,点进去,果然是在java.util.concurrent.Semaphore.Sync
中实现的
protected final boolean tryReleaseShared(int releases) {
//自旋
for (;;) {
int current = getState();
//记得前面aquire方法是state-1
//而这里是+1,表示把凭证归还到池子里了,下一个人来就可以获取凭证了
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS设置state的值=之前state的值+release
if (compareAndSetState(current, next))
return true;
}
}
复制代码
doReleaseShared
此方法主要用于唤醒后继
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
希望结合源码的梳理和文章前面的说的几种做法,能让您更好地理解 Semaphore。
总结
核心知识点:利用了 AQS 中的 state 和同步阻塞队列、CAS、死循环
关注公众号“Java 后端技术全栈”
**每天解锁一本电子书
评论