写点什么

Java 并发编程—— Semaphore

用户头像
Antway
关注
发布于: 2021 年 06 月 07 日

1. 简介

前面我们针对 CountDownLatch 和 CyclicBarrier 进行了学习,CountDownLatch 用于帮助我们实现“倒计时”的功能,当 count=0 的时候则触发 await 的线程进行执行。而 CyclicBarrier 则适用于到达某一状态后接着执行下一步操作的情况。本次介绍的 Semaphore 是一个计信号数量。


Semaphore 通过 acquire()方法申请信号量,如果当前无信号量可用,则线程处于阻塞状态,如果有可用信号量,线程正常进行。release 方法用于释放一个“锁定占用”的信号量。


Semaphore 通常用于限制访问某些资源(物理或逻辑的)的线程数目。


下面看一个 jdk 中带的例子,该例子中展示了使用 Semaphone 控制数据的访问。


class Pool {    private static final int MAX_AVAILABLE = 100;    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); }
public void putItem(Object x) { if (markAsUnused(x)) available.release(); }
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached }
protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; }}
复制代码

2、Api 分析

Semaphore 提供了两个关键的方法 acquire()和 release()方法。分别用于申请信号量和释放信号量。


如下面示例代码所示:


public class FirstDemo {
static Semaphore semaphore = new Semaphore(2); /** * @param args */ public static void main(String[] args) { for(int i=0;i<4;i++){ new Thread(){ @Override public void run() { System.out.println(Thread.currentThread().getName() + " ready"); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " go"); semaphore.release(); } }.start(); } }}
复制代码


执行结果:


Thread-0 readyThread-0 goThread-2 readyThread-2 goThread-1 readyThread-3 readyThread-1 goThread-3 go
复制代码


从执行结果来看,最多只能连续执行 go 两次,间接的印证最多同时只有两个信号量在工作。补充一个特殊情况,当信号量 Semaphore = 1 时,它可以当作互斥锁使用。其中 0、1 就相当于它的状态,当=1 时表示其他线程可以获取,当=0 时,排他,即其他线程必须要等待。

3. 源码解析

在源码中,Semaphore 同样包含一个继承 AQS 的子类 Sync。


public class Semaphore implements java.io.Serializable {    private static final long serialVersionUID = -3222578661600680210L;    /** All mechanics via AbstractQueuedSynchronizer subclass */    private final Sync sync;
/** * Synchronization implementation for semaphore. Uses AQS state * to represent permits. Subclassed into fair and nonfair * versions. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) { setState(permits); }
final int getPermits() { return getState(); }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } }
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } ....}
复制代码


通过使用 AQS 类的 state 值来存储信号量,原理类似于 CountDownLatch,都是通过 AQS 的 state 值进行计数。


创建一个 Semaphore 对象可以通过构造方法:


/**指定信号量个数*/public Semaphore(int permits) {    sync = new NonfairSync(permits);}
/** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
复制代码


通过上面的构造函数,可以看到默认情况下 Semaphore 使用的非公平同步状态,同样也可以通过构造函数进行指定状态。


acquire()方法


public void acquire() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) { super(permits); }
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }}
复制代码


在 acquire 方法中,调用 sync 的 acquireSharedInterruptibly(1)方法,如果线程被 iterrupted 则抛出 InterruptedException 异常。通过 tryAcquireShared(arg) 获取状态值,最关键的代码是:


final int nonfairTryAcquireShared(int acquires) {    for (;;) {        int available = getState();        int remaining = available - acquires;        if (remaining < 0 ||            compareAndSetState(available, remaining))            return remaining;    }}
复制代码


获取当前的 state 值,如果可用 state 值-1 后小于 0,即当前信号量被占满,则返回一个负数,然后执行 doAcquireSharedInterruptibly()方法添加到等待队列。

4. 总结

无论 ReentrantLock 或 CyclicBarrier 内部都是基于 AQS 进行实现,实现的原理有所差异而已。

用户头像

Antway

关注

持续精进,尽管很慢 2019.05.27 加入

专注开源库

评论

发布
暂无评论
Java 并发编程—— Semaphore