写点什么

快速掌握并发编程 ---Semaphore 原理和实战

用户头像
田维常
关注
发布于: 2020 年 11 月 02 日

关注Java 后端技术全栈”**


回复“000”获取大量电子书


生活案例


停车场:停车场只有 2 个车位,即同时只能容纳 2 辆车,车辆都是停一会再走的,如何保证同一时刻最多有 2 个车停在停车位?请用代码实现。


女厕所:女厕所里只有五个位置,即最多只能有五位女性同时上厕所,如何保证同一时刻最多有五位女性在上厕所?请用 java 代码实现。(原本就只想用厕所来说,没想用女厕所来举例,但是之前遇到杠精,说男厕可以两个人一起上,无比尴尬)。


多线程读某个文件:实现一个文件允许同一时刻的并发访问数。


Semaphore 入场


上面的两个案例可以用 JDK1.5 出的 Semaphore 来实现。实现停车场案例后,第二个案例也就很轻松实现了。公司大楼来了 6 辆车,怎么办?保安大哥手里只有两张停车卡(一个车位一张卡)。


import java.util.Random;import java.util.concurrent.Semaphore;public class SemaphoreDemo {    //车位个数    private static final int PARK_LOT = 2;    //车辆数    private static final int CARS = 6;    private static Semaphore semaphore = new Semaphore(PARK_LOT);    private static void park() {        for (int i = 1; i <= CARS; i++) {            int finalI = i;            new Thread(() -> {                try {                    //看看有没有空车位                    if (semaphore.availablePermits() == 0) {                        System.out.println("第" + finalI + "辆司机看了看,哎,还没有空停车位,继续排队");                    }                    //尝试进入停车位                    semaphore.acquire();                    System.out.println("第" + finalI + "成功进入停车场");                    Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间                    System.out.println("第" + finalI + "驶出停车场");                    //离开停车场                    semaphore.release();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }).start();        }    }    public static void main(String[] args) {        park();    }}
复制代码


运行输出



四辆车等待,两辆车进入停车位。然后后面就是出来一辆进入一辆,这个停车场确实保证了任何时可最多可以停两辆车了。


可能前面排队等着别人开出来,可是有的人不按规矩出牌,看到有车位一下就进去把车位抢了。所以这个停车也分公平和非公平的。


整个过程:





假设上面的场景是:司机们能看到车位是否为空,而且只能看到自己前面的车,更前面的车看不见。


通常有六种做法:


1.看到有空车位,便直接开过去、不用排队了(非公平

2.看到有车位了,还得看看前面还有没有车在排队(公平

3.不然,就一直看车位是否为空,以及自己前面还有没有车(死循环--自旋

4.能够停车了,尝试往空车位开去,但却被插队(不公平)的那家伙抢了车位(CAS

5.有时候保安也会说:今天车位满了,不让停进来了,回去吧(中断

6.保安叫你回去了,你却傻傻地不走,想再等等看(中断不响应

7.汽车从车位上离开,每离开一个,空车位就多一个(资源释放


以上是自己对生活和技术的理解,如有不对的地方希望指正。


深入 Semaphore


Semaphore 翻译



Semaphore 是 Doug Lea 在 JDK1.5 时候搞出来的。Semaphore 是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。


UML 类图



方法和主要属性



上面案例中我们一共使用了如下几个关键方法:


new Semaphore(2);availablePermits() acquire();release();
复制代码


构造方法


从构造方法new Semaphore(PARK_LOT)为入口,咱们一步一步把这个 Semaphore 的源码给看一遍,然后梳理一下,最后总结一下。


 //两个车位 permits=2   public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }
复制代码


有两个构造方法,并且这里我们看到了两个熟悉的面孔:


FairSync 公平的

NonfairSync 非公平的


上面的案例中,使用的是一个单个参数的构造方法,即此时使用的是非公平的。就想上面停车的场景,你排队在前面也没用,后面的直接插队,停到车位上去了。


我们来看看这个非公平 NonfairSync 是怎么实现的


 static final class NonfairSync extends Sync {        private static final long serialVersionUID = -2694183684443567898L;        //两个车位 permits=2        NonfairSync(int permits) {            super(permits);        }    }
复制代码


这里面什么逻辑,通过 super(permits)到父类中


 abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 1192457210091910933L;        //两个车位 permits=2        Sync(int permits) {            //就是给AQS设置state=2            setState(permits);        }        //返回AQS中state的值        final int getPermits() {            return getState();        }}
复制代码


构造方法就是创建一个 Semaphore 对象,给 AQS 中的 state 赋值。


availablePermits() 方法--检测是否有可用凭证(资源)


司机看看有没有空车位


semaphore.availablePermits()
复制代码


方法的源码


 public int availablePermits() {        return sync.getPermits();  }
复制代码


其实这里就是获取 AQS 中的 sate 的值。如果 state==0 则证明已满。否则 semaphore.acquire();


acquire 方法--中断式


在 Semaphore 中获取资源的有两种方式:中断式和不中断式。


  1. 当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于 0,则当前信号量的计数会减 1,然后该方法直接返回。

  2. 否则如果当前信号量个数等于 0,则当前线程会被放入 AQS 的阻塞队列。

  3. 当其他线程调用了当前线程的 interrupt()方法中断了当前线程时,则当前线程会抛出 InterruptedException 异常返回。


其中 acquire()为中断式


public void acquire() throws InterruptedException {    //入参是证明只需要一个信号    sync.acquireSharedInterruptibly(1);}
复制代码


是 AQS 中的方法


 public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {        //判断是否中断        if (Thread.interrupted()) throw new InterruptedException();        //tryAcquireShared 和 doAcquireSharedInterruptibly 方法        if (tryAcquireShared(arg) < 0){            doAcquireSharedInterruptibly(arg);        }    }    //空方法,有子类去实现    protected int tryAcquireShared(int arg) {        throw new UnsupportedOperationException();    }
复制代码


tryAcquireShared 方法


该方法是在 AQS 中没有实现,是一个空方法,这里在 Semaphore 中有两个实现类



由于我们前面使用就是非公平模式,所以这里我们进入NonfairSync


 static final class NonfairSync extends Sync {        private static final long serialVersionUID = -2694183684443567898L;        NonfairSync(int permits) {            super(permits);        }        //这方法也没有什么逻辑,直接调用nonfairTryAcquireShared        protected int tryAcquireShared(int acquires) {            return nonfairTryAcquireShared(acquires);        }    }
复制代码


而 nonfairTryAcquireShared 方法则是在其父类里实现的


 abstract static class Sync extends AbstractQueuedSynchronizer {               //acquires=1        final int nonfairTryAcquireShared(int acquires) {            //死循环--自旋            for (;;) {                //获取AQS中的state,就是我们前面给的permits=2                //也称之为获取剩余许可数                int available = getState();                int remaining = available - acquires;                // 剩余的许可小于0或者比较设置成功                if (remaining < 0 || compareAndSetState(available, remaining))                    return remaining;            }        }
复制代码


这里是一个 CAS 自旋。因为 Semaphore 是一个共享锁,可能有多个线程同时申请共享资源,因此 CAS 操作可能失败。直到成功获取返回剩余资源数目,或者发现没有剩余资源返回负值代表申请失败。在这里我们看看公平模式


 static final class FairSync extends Sync {        protected int tryAcquireShared(int acquires) {            for (;;) {                //公平模式多了这个方法                if (hasQueuedPredecessors())                    return -1;                int available = getState();                int remaining = available - acquires;                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }    }
复制代码


公平模式下的 tryAcquireShared 方法在试图获取之前做了一个判断,如果发现等对队列中有线程在等待获取资源,就直接返回-1 表示获取失败。当前线程会被上层的 acquireSharedInterruptibly 方法调用 doAcquireShared 方法放入等待队列中。这正是“公平”模式的语义:如果有线程先于我进入等待队列且正在等待,就直接进入等待队列,效果便是各个线程按照申请的顺序获得共享资源,具有公平性。


hasQueuedPredecessors 这个方法我们在之前的 ReentranLock 文章中说过了,这里就不在多说了。


doAcquireSharedInterruptibly 方法


这个方法我们在之前的 Reentranlock 文章中也都讲过了快速掌握并发编程---细说ReentrantLock和AQS,在这里的主要是当获取锁失败后,他就是将当前线程放入等待队列并开始自旋检测获取资源。


acquireUnInterruptibly 方法-- 不中断式


该方法与 acquire()方法类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源时(包含被阻塞后),其他线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志,此时当前线程并不会抛出 InterruptedException 异常而返回。


 public void acquireUninterruptibly() {        sync.acquireShared(1);    }    public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }
复制代码


release()方法--释放资源


不管是在 ReentranLock 还是 Semaphore 里,释放锁和释放资源都不会公平性。


该方法的作用是把当前 Semaphore 对象的信号量值增加 1,如果当前有线程因为调用 aquire 方法被阻塞而被放入了 AQS 的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。


 public void release() {        //注意入参是1        sync.releaseShared(1);    }   //释放掉资源后,唤醒后继    public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }
复制代码


而在这个方法中一共调用了两个方法tryReleaseShareddoReleaseShared


tryReleaseShared


tryReleaseShared方法的实现



既然不分公平和不公平,那么这个实现类就肯定是在他们两的父类里实现的,点进去,果然是在java.util.concurrent.Semaphore.Sync中实现的


 protected final boolean tryReleaseShared(int releases) {            //自旋            for (;;) {                int current = getState();                //记得前面aquire方法是state-1                //而这里是+1,表示把凭证归还到池子里了,下一个人来就可以获取凭证了                int next = current + releases;                if (next < current) // overflow                    throw new Error("Maximum permit count exceeded");                //CAS设置state的值=之前state的值+release                if (compareAndSetState(current, next))                    return true;            }        }
复制代码


doReleaseShared


此方法主要用于唤醒后继


 private void doReleaseShared() {        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    unparkSuccessor(h);                }                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }
复制代码


希望结合源码的梳理和文章前面的说的几种做法,能让您更好地理解 Semaphore。


总结


核心知识点:利用了 AQS 中的 state 和同步阻塞队列、CAS、死循环


关注公众号“Java 后端技术全栈”


**每天解锁一本电子书





发布于: 2020 年 11 月 02 日阅读数: 32
用户头像

田维常

关注

关注公众号:Java后端技术全栈,领500G资料 2020.10.24 加入

关注公众号:Java后端技术全栈,领500G资料

评论

发布
暂无评论
快速掌握并发编程---Semaphore原理和实战