Java Core 「10」J.U.C 同步工具类 -2
Java Core 「9」J.U.C 同步工具类-1 中介绍了 java.util.concurrent 包中提供的控制线程间同步的锁工具。除了锁工具外,还有其他的同步机制,例如信号量机制。
01-Semaphore
A counting semaphore. Conceptually, a semaphore maintains a set of permits.
前面 J.U.C 同步工具类-1 中介绍 LockSupport 类时提到,其关联了一个许可。信号量可以看作是一个维护了多个许可的集合。信号量常用来限制同时访问某个资源的线程数量:
acquire,许可数减一,若无可用许可,则阻塞当前线程。
release,许可数加一,可能会唤醒已阻塞线程。
当信号量的值为 1 时,可把它当作是一个排他锁。不过需特别留意的是,信号量并未维护许可与获取许可的线程之间的关系。在介绍 ReentrantLock#unlock 方法时有提到,如果未获得锁的线程尝试释放锁,会抛 IllegalMonitorStateException 异常。但 Semaphore#release 并不会关心当前线程是否曾得到过许可。
Semaphore 也是基于 AQS 实现的。内部类 Semaphore#Sync 继承了 AbstractQueuedSynchronizer 并派生出两个子类:NonfairSync 和 FairSync。
初始化方法 Semaphore(int) 默认使用 NonfairSync,也可使用 Semaphore(int, boolean) 指定使用的底层实现。
Semaphore 还提供了 acquireUninterruptibly() / tryAcquire() / tryAcquire(long, TimeUnit) 实现。除此之外,Seamphore 中还提供了可同时获取/释放多个许可的 acquire(int) / acquireUninterruptibly(int) / tryAcquire(int) / tryAcquire(int, long, TImeUnit) / release(int) 方法。
为了更好地理解信号量,下面有几个问题可以体会下:
一个初始化为 5 的信号量,线程 A 调用 6 次 acquire 方法会怎么样?线程 A 第 6 次调用时会阻塞。
一个初始化为 5 的信号量,线程 A 调用 1 次 acquire 后,再调用 2 次 release,线程 B 调用 6 次 acquire 会怎么样?B 可以正常获取 6 次许可。release 会增加许可的数量,这个数量并不会受初始化时的数量所限制。
02-CountDownLatch
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
CountDownLatch 可以被理解为一个一次性的计数器。CountDownLatch(int) 初始化计数器为 n,线程间可通过如下的接口同步:
await,调用此方法的线程会被阻塞,直至计数器为 0。await(long, TimeUnit) 是带超时版本的方法,当超时时间为 0 时,等同于 await();当超时时间大于 0 时,调用 await 方法的线程会被阻塞,直至以下三种情况之一发生:1)CountDownLatch 计数器值为 0;2)超时时间已过;3)当前线程(调用 await 的线程)被中断。
countDown,调用此方法的线程会将计数器的值减 1。
CountDownLatch 的典型应用场景之一是:一个或多个线程调用 await 方法等待,n 个其他线程调用 countDown 后,等待的线程被唤醒。
CountDownLatch 是一次性的计数器,即计数器值无法被重置。如果想要重置计数器,则可以选择使用 CyclicBarrier。
03-CyclicBarrier
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarrier 并非基于 AQS 实现。与 CountDownLatch 一样,CyclicBarrier 也是用于控制多个线程的工具。在初始化时可指定计数器值 CyclicBarrier(int)。而且,从名字中可以看出,CyclicBarrier 是可复用的。因此,其内部包含了一个 reset() 方法,将其计数器值重置为初始值。
await,与 CountDownLatch#await 方法的语义稍有区别。调用 await 的当前线程会阻塞,直至以下 5 种条件之一发生:1)所有线程均到达(即 n 个线程都调用了 await 方法,n 为 CyclicBarrier 初始化的计数器值);2)当前线程被中断;3)其他调用了 await 方法的线程被中断;4)调用了 await 方法的某个线程超时了(即其调用的是超市版本的 await);5)CyclicBarrier 被重置,即其他线程调用了 reset 方法。
CyclicBarrier 还包含了另外一个构造器 CyclicBarrier(int, Runnable)。最后一个到达线程除了会唤醒其他的所有阻塞的线程,还会执行初始化时的 Runnable 任务。
CyclicBarrier 与 CountDownLatch 对比:
前者可循环使用,后者为一次性(one-shot)
从计数方式上,前者累加,后者递减
两者都有控制多个线程等待同步,然后开始下一步动作的语义。
04-Phaser
A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
Phaser 是 JDK 1.7 时引入的。在上节介绍 CyclicBarrier 时提到,其初始化时会指定一个计数器值 n,当 n 个线程调用 await 之后(这些线程也被称为参与者,party),这些线程会先阻塞,然后等 n 个线程全都调用过 await 后(也称为抵达,arrive barrier),所有的参与者恢复行动。
在 Phaser 中,参与者的数量是可变的,可通过 register() / bulkRegister(int) 增加参与者的数量,通过 arriveAndDeregister() 减少参与者的数量。与 CyclicBarrier 一样,Phaser 是分代的(generation),每代都有一个数字(从 0 开始)与其关联,例如上图中的 phase-0 / phase-1 / phase-2。phase-0 到 phase-1 称为晋升(advance)。
CyclicBarrier 可以看作是一个参与者数量固定的 Phaser。
Phaser 中有一些概念需要理解下:
Registration,Phaser 中的参与者数量是可变的,可以随时注册,并在任何抵达点可以通过 arriveAndDeregister 撤销注册。
Synchronization,参与者可以调用两类方法:Arrival,arrive() 和 arriveAndDeregister() 方法不会阻塞,但会返回一个整型值,表示到达的 Phaser(例如上图中的 phase-0)。当最后的参与者到达 Phaser 后,可以执行特定的动作(类似于 CyclicBarrier 中最后到达的参与者执行的 Runnable 任务)。特定的动作可以通过重写 onAdvance(int, int) 方法实现。onAdvance 的返回值也控制着 Termination。Waiting,awaitAdvance(int) 方法,若参数不是当前的 phase-number 或 Phaser 已停止,则立即返回;否则,阻塞当前线程至 phase-number 晋升,返回值为晋升后的 phase-number。
Termination,Phaser 变成终止状态后,所有的同步方法(Synchronization 中提到方法)都会立即返回,不会等到 phase-number 晋升。终止状态后,任何的注册尝试都是无效的。onAdvance(int, int) 方法的返回值为 true 时,Phaser 进入到终止状态。
Tiering,Phaser 是可以分层次的,即可以有父子结构。
Monitoring,getRegisteredParties 可以获得参与者总数,getArrivedParties 可以获得到达当前 Phase 的参与者数量,getPhase 可以获得当前的 phase-number,getUnarrivedParties 获得剩余的参与者数。
05-Exchanger
A synchronization point at which threads can pair and swap elements within pairs.
Exchanger 用于在两个线程之间交换数据,线程 A 调用 exchange 方法后会阻塞至线程 B 调用 exchange,之后两个线程互相交换数据。
Exchanger 可以看作是“双工”的 SynchronousQueue。
下面是一个交换双方线程名的示例。
历史文章推荐
版权声明: 本文为 InfoQ 作者【Samson】的原创文章。
原文链接:【http://xie.infoq.cn/article/0b7860fa2a0046d04f7ddb05b】。文章转载请联系作者。
评论