Java 并发编程系列——常用并发工具类

用户头像
孙苏勇
关注
发布于: 2020 年 05 月 01 日
Java并发编程系列——常用并发工具类

接上一篇,写写几个常用的并发编程中使用的工具类,CountDownLatch,CyclicBarrier,Semaphore和Exchanger。



接下来分别以示例的方式来介绍这几个工具类。



CountDownLatch,其场景主要应用在一个任务等待其他任务执行N次后才执行。假设我们定义了两个线程类,这两个线程类中当执行完本线程的任务将计数减1,再定义一个等待线程,当计数为0时执行本线程的任务。



示例代码:

public class ShowCountDownLatch {
private static final CountDownLatch lock = new CountDownLatch(3);
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(new CountDownRunnable1()).start();
new Thread(new CountDownRunnable2()).start();
}
new Thread(new WaitRunnable()).start();
}
public static class WaitRunnable implements Runnable {
@Override
public void run() {
System.out.println("wait runnable start waiting");
try {
lock.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait runnable end");
}
}
public static class CountDownRunnable1 implements Runnable {
@Override
public void run() {
System.out.println("countdown1 start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.countDown();
System.out.println("countdown1 end");
}
}
public static class CountDownRunnable2 implements Runnable {
@Override
public void run() {
System.out.println("countdown2 start");
try {
Thread.sleep(1300);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.countDown();
System.out.println("countdown2 end");
}
}
}



如上代码所示,我们定义了计数值为3,共开启6个线程,其中这6个线程中都会执行计数扣减,当扣减为0时,等待线程中的任务执行。这里需要注意,计数为0时等待线程并非马上执行,而是要等做扣减的任务执行完后以后。这一点我们可以从结果输出上看出来。

countdown1 start
countdown2 start
countdown1 start
countdown2 start
countdown1 start
countdown2 start
wait runnable start waiting
countdown1 end
countdown1 end
countdown1 end
wait runnable end
countdown2 end
countdown2 end
countdown2 end



await方法可以带超时参数,当有等待时间要求时,可使用带超时的await方法。



CyclicBarrier,其场景主要为有一组线程,当所有线程执行到某一个时刻,这组线程才能继续向下执行。CyclicBarrier构造时可以传入一个Runnable,即当一组线程执行到某一时刻点时这个Runnable随这一组线程一起执行。



示例代码:

public class ShowCyclicBarrier {
private static final CyclicBarrier lock = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println("all thread end");
}
});
public static void main(String[] args) {
new Thread(new Runnable1()).start();
new Thread(new Runnable2()).start();
}
public static class Runnable1 implements Runnable {
@Override
public void run() {
System.out.println("Runnable1 start");
try {
lock.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Runnable1 end");
}
}
public static class Runnable2 implements Runnable {
@Override
public void run() {
System.out.println("Runnable2 start");
try {
lock.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Runnable2 end");
}
}
}



从如下输出结果上可以看出,当线程全部执行到await方法时,这一组线程继续执行后续任务,并且同时执行CyclicBarrier中的Runnable。同样可使用带超时的await方法,结果:

Runnable1 start
Runnable2 start
all thread end
Runnable2 end
Runnable1 end



CountDownLatch与CyclicBarrier的区别

CountDownLatch主要由外部线程来控制线程是否往下执行,而CyclicBarrier是由一组线程自身来控制。比如,我们有一个计算任务,必须等到前置的若干个计算完成后才能启动,这时候就可以用CountDownLatch来实现。又比如,我们要测试一个服务的瞬间响应能力,希望启动一批线程,当线程全部准备好后,同时执行调用该服务,就可以用CyclicBarrier来实现。



Semaphore,其场景主要可用于对某一资源有使用数量的限制。我们假设有一个复杂的计算任务会被很多线程调用,而该计算方法是很资源的,我们希望同时能执行的计算在一个数值内,当多个线程调用时,超过则要排队,而计算完毕后排队中的任务可以接着执行。



示例代码:

public class ShowSemaphore {
public static void main(String[] args) {
CalculatorPool pool = new CalculatorPool();
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
pool.calculate(finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
public static class CalculatorPool {
private final int MAX_POOL_SIZE = 3;
private final LinkedList<Calculator> pool = new LinkedList<>();
private final Semaphore remain;
public int calculate(int base) throws InterruptedException {
long start = System.currentTimeMillis();
remain.acquire();
System.out.printf("Thread %d take %d ms to get resource\n",
Thread.currentThread().getId(),
System.currentTimeMillis() - start);
Calculator calculator;
synchronized (pool) {
calculator = pool.removeFirst();
}
int result = calculator.calculate(base);
synchronized (pool) {
pool.addLast(calculator);
}
System.out.printf("%d threads is waiting for calculator\n",
remain.getQueueLength());
remain.release();
return result;
}
public CalculatorPool() {
this.remain = new Semaphore(MAX_POOL_SIZE);
for (int i = 0; i < MAX_POOL_SIZE; i++) {
pool.addLast(new Calculator());
}
}
}
public static class Calculator {
public int calculate(int base) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return base;
}
}
}



如上代码所示,我们定义了一个计算池,计算池中允许同时最多十个计算任务,要想执行计算,首先需要通过acquire来看是否有资源可用,而用后需要release资源以供后续任务使用。



输出结果如下:

Thread 12 take 7 ms to get resource
Thread 14 take 0 ms to get resource
Thread 13 take 0 ms to get resource
7 threads is waiting for calculator
7 threads is waiting for calculator
Thread 15 take 556 ms to get resource
6 threads is waiting for calculator
Thread 16 take 554 ms to get resource
Thread 17 take 555 ms to get resource
4 threads is waiting for calculator
Thread 18 take 1059 ms to get resource
3 threads is waiting for calculator
3 threads is waiting for calculator
Thread 19 take 1060 ms to get resource
Thread 20 take 1059 ms to get resource
1 threads is waiting for calculator
Thread 21 take 1559 ms to get resource
0 threads is waiting for calculator
0 threads is waiting for calculator
0 threads is waiting for calculator

可以从线程等待计算资源的时间上看出其对资源使用的限制。



Exchanger,其应用场景主要是两个线程进行数据交换。这里我们假设一个生产者消费者的情形,消费者如果手里没商品呢就把自己的容器给生产者,生产者呢生产好商品把填满的容器再给回消费者。



示例代码:

public class ShowExchanger {
private static final Exchanger<Set<String>> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Consumer().start();
new Producer().start();
}
public static class Producer extends Thread {
@Override
public void run() {
Set<String> set = new HashSet<>();
while (!isInterrupted()) {
try {
if (!set.isEmpty()) {
set.clear();
System.out.printf("consumer over, size is %d\n", set.size());
}
set = exchanger.exchange(set);
System.out.printf("get products from producer, size is %d\n", set.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static class Consumer extends Thread {
@Override
public void run() {
Set<String> set = new HashSet<>();
while (!isInterrupted()) {
for (int i = 0; i < 10; i++) {
set.add(Integer.toString(i));
}
try {
Thread.sleep(1000);
System.out.printf("%d products produced\n", set.size());
set = exchanger.exchange(set);
System.out.printf("get empty set from consumer, size is %d\n", set.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}



如上代码所示,我们不断的进行商品的生产与消费,从输出结果上可以看出交换的过程。

10 products produced
get empty set from consumer, size is 0
get products from producer, size is 10
consumer over, size is 0
10 products produced
get empty set from consumer, size is 0
get products from producer, size is 10
consumer over, size is 0

这些并发工具类相对使用会比较多,可以帮助我们解决不同应用场景下的并发调度要求。

本系列其他文章:

Java并发编程系列——Fork-Join

Java并发编程系列——线程的等待与唤醒

Java并发编程系列插曲——对象的内存结构

Java并发编程系列——线程

发布于: 2020 年 05 月 01 日 阅读数: 185
用户头像

孙苏勇

关注

不读书,思想就会停止。 2018.04.05 加入

公众号“像什么",记录想记录的。

评论

发布
暂无评论
Java并发编程系列——常用并发工具类