写点什么

Java Core 「10」J.U.C 同步工具类 -2

作者:Samson
  • 2022 年 6 月 17 日
  • 本文字数:3826 字

    阅读完需:约 13 分钟

Java Core 「9」J.U.C 同步工具类-1 中介绍了 java.util.concurrent 包中提供的控制线程间同步的锁工具。除了锁工具外,还有其他的同步机制,例如信号量机制。

01-Semaphore

A counting semaphore. Conceptually, a semaphore maintains a set of permits.

前面 J.U.C 同步工具类-1 中介绍 LockSupport 类时提到,其关联了一个许可。信号量可以看作是一个维护了多个许可的集合。信号量常用来限制同时访问某个资源的线程数量:

  • acquire,许可数减一,若无可用许可,则阻塞当前线程。

  • release,许可数加一,可能会唤醒已阻塞线程。

当信号量的值为 1 时,可把它当作是一个排他锁。不过需特别留意的是,信号量并未维护许可与获取许可的线程之间的关系。在介绍 ReentrantLock#unlock 方法时有提到,如果未获得锁的线程尝试释放锁,会抛 IllegalMonitorStateException 异常。但 Semaphore#release 并不会关心当前线程是否曾得到过许可。

Semaphore 也是基于 AQS 实现的。内部类 Semaphore#Sync 继承了 AbstractQueuedSynchronizer 并派生出两个子类:NonfairSync 和 FairSync。



初始化方法 Semaphore(int) 默认使用 NonfairSync,也可使用 Semaphore(int, boolean) 指定使用的底层实现。

Semaphore 还提供了 acquireUninterruptibly() / tryAcquire() / tryAcquire(long, TimeUnit) 实现。除此之外,Seamphore 中还提供了可同时获取/释放多个许可的 acquire(int) / acquireUninterruptibly(int) / tryAcquire(int) / tryAcquire(int, long, TImeUnit) / release(int) 方法。

为了更好地理解信号量,下面有几个问题可以体会下:

  1. 一个初始化为 5 的信号量,线程 A 调用 6 次 acquire 方法会怎么样?线程 A 第 6 次调用时会阻塞。

"A" #14 prio=5 os_prio=0 cpu=0.00ms elapsed=8.03s tid=0x0000022a96fe0800 nid=0x18f4 waiting on condition  [0x000000e88a9fe000]   java.lang.Thread.State: WAITING (parking)        at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)        - parking to wait for  <0x0000000711a3e848> (a java.util.concurrent.Semaphore$NonfairSync)        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)        at java.util.concurrent.Semaphore.acquire(java.base@11.0.15/Semaphore.java:318)        at self.samson.example.basic.juc.lock.SemaphoreExamples$1.run(SemaphoreExamples.java:14)        at java.lang.Thread.run(java.base@11.0.15/Thread.java:834)
复制代码
  1. 一个初始化为 5 的信号量,线程 A 调用 1 次 acquire 后,再调用 2 次 release,线程 B 调用 6 次 acquire 会怎么样?B 可以正常获取 6 次许可。release 会增加许可的数量,这个数量并不会受初始化时的数量所限制。

02-CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

CountDownLatch 可以被理解为一个一次性的计数器。CountDownLatch(int) 初始化计数器为 n,线程间可通过如下的接口同步:

  • await,调用此方法的线程会被阻塞,直至计数器为 0。await(long, TimeUnit) 是带超时版本的方法,当超时时间为 0 时,等同于 await();当超时时间大于 0 时,调用 await 方法的线程会被阻塞,直至以下三种情况之一发生:1)CountDownLatch 计数器值为 0;2)超时时间已过;3)当前线程(调用 await 的线程)被中断。

  • countDown,调用此方法的线程会将计数器的值减 1。

CountDownLatch 的典型应用场景之一是:一个或多个线程调用 await 方法等待,n 个其他线程调用 countDown 后,等待的线程被唤醒。

CountDownLatch 是一次性的计数器,即计数器值无法被重置。如果想要重置计数器,则可以选择使用 CyclicBarrier。

03-CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

CyclicBarrier 并非基于 AQS 实现。与 CountDownLatch 一样,CyclicBarrier 也是用于控制多个线程的工具。在初始化时可指定计数器值 CyclicBarrier(int)。而且,从名字中可以看出,CyclicBarrier 是可复用的。因此,其内部包含了一个 reset() 方法,将其计数器值重置为初始值。

  • await,与 CountDownLatch#await 方法的语义稍有区别。调用 await 的当前线程会阻塞,直至以下 5 种条件之一发生:1)所有线程均到达(即 n 个线程都调用了 await 方法,n 为 CyclicBarrier 初始化的计数器值);2)当前线程被中断;3)其他调用了 await 方法的线程被中断;4)调用了 await 方法的某个线程超时了(即其调用的是超市版本的 await);5)CyclicBarrier 被重置,即其他线程调用了 reset 方法。

CyclicBarrier 还包含了另外一个构造器 CyclicBarrier(int, Runnable)。最后一个到达线程除了会唤醒其他的所有阻塞的线程,还会执行初始化时的 Runnable 任务。

CyclicBarrier 与 CountDownLatch 对比:

  • 前者可循环使用,后者为一次性(one-shot)

  • 从计数方式上,前者累加,后者递减

  • 两者都有控制多个线程等待同步,然后开始下一步动作的语义。

04-Phaser

A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.

Phaser 是 JDK 1.7 时引入的。在上节介绍 CyclicBarrier 时提到,其初始化时会指定一个计数器值 n,当 n 个线程调用 await 之后(这些线程也被称为参与者,party),这些线程会先阻塞,然后等 n 个线程全都调用过 await 后(也称为抵达,arrive barrier),所有的参与者恢复行动。



在 Phaser 中,参与者的数量是可变的,可通过 register() / bulkRegister(int) 增加参与者的数量,通过 arriveAndDeregister() 减少参与者的数量。与 CyclicBarrier 一样,Phaser 是分代的(generation),每代都有一个数字(从 0 开始)与其关联,例如上图中的 phase-0 / phase-1 / phase-2。phase-0 到 phase-1 称为晋升(advance)。

CyclicBarrier 可以看作是一个参与者数量固定的 Phaser。

Phaser 中有一些概念需要理解下:

  • Registration,Phaser 中的参与者数量是可变的,可以随时注册,并在任何抵达点可以通过 arriveAndDeregister 撤销注册。

  • Synchronization,参与者可以调用两类方法:Arrival,arrive() 和 arriveAndDeregister() 方法不会阻塞,但会返回一个整型值,表示到达的 Phaser(例如上图中的 phase-0)。当最后的参与者到达 Phaser 后,可以执行特定的动作(类似于 CyclicBarrier 中最后到达的参与者执行的 Runnable 任务)。特定的动作可以通过重写 onAdvance(int, int) 方法实现。onAdvance 的返回值也控制着 Termination。Waiting,awaitAdvance(int) 方法,若参数不是当前的 phase-number 或 Phaser 已停止,则立即返回;否则,阻塞当前线程至 phase-number 晋升,返回值为晋升后的 phase-number。

  • Termination,Phaser 变成终止状态后,所有的同步方法(Synchronization 中提到方法)都会立即返回,不会等到 phase-number 晋升。终止状态后,任何的注册尝试都是无效的。onAdvance(int, int) 方法的返回值为 true 时,Phaser 进入到终止状态。

  • Tiering,Phaser 是可以分层次的,即可以有父子结构。

  • Monitoring,getRegisteredParties 可以获得参与者总数,getArrivedParties 可以获得到达当前 Phase 的参与者数量,getPhase 可以获得当前的 phase-number,getUnarrivedParties 获得剩余的参与者数。

05-Exchanger

A synchronization point at which threads can pair and swap elements within pairs.

Exchanger 用于在两个线程之间交换数据,线程 A 调用 exchange 方法后会阻塞至线程 B 调用 exchange,之后两个线程互相交换数据。

Exchanger 可以看作是“双工”的 SynchronousQueue。

下面是一个交换双方线程名的示例。

Exchanger<String> stringExchanger = new Exchanger<>();Runnable runnable = () ->  {    System.out.println(Thread.currentThread().getName() + " before exchange().");    try {        String exchange = stringExchanger.exchange("I'm " + Thread.currentThread().getName());        System.out.println(Thread.currentThread().getName() + " after exchange(). and exchange = " + exchange);    } catch (Exception e) {        e.printStackTrace();    }};
new Thread(runnable, "thread-1").start();new Thread(runnable, "thread-2").start();}
复制代码



历史文章推荐

Java Core 「9」J.U.C 同步工具类 -1

Java Core 「8」字节码增强技术

Java Core 「7」各种不同类型的锁

Java Core「6」反射与 SPI 机制

Java Core「5」自定义注解编程

Java Core「4」java.util.concurrent 包简介

Java Core「3」volatile 关键字

Java Core「2」synchronized 关键字

Java Core「1」JUC- 线程基础

发布于: 刚刚阅读数: 3
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Java Core 「10」J.U.C 同步工具类-2_学习笔记_Samson_InfoQ写作社区