写点什么

同步屏障 CyclicBarrier

作者:周杰伦本人
  • 2022 年 6 月 07 日
  • 本文字数:2385 字

    阅读完需:约 8 分钟

同步屏障 CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。


CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。


public class CyclicBarrierTest {    static CyclicBarrier c = new CyclicBarrier(2);    public static void main(String[] args) {        new Thread(new Runnable() {            @Override            public void run() {                try {                    c.await();                } catch (Exception e) {                }                System.out.println(1);            }        }).start();        try {            c.await();        } catch (Exception e) {        }        System.out.println(2);    }}
复制代码


因为主线程和子线程的调度是由 CPU 决定的,两个线程都有可能先执行,所以会产生两种输出 12 和 21


如果把 new CyclicBarrier(2)修改成 new CyclicBarrier(3),则主线程和子线程会永远等待,因为没有第三个线程执行 await 方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。


CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景


public class CyclicBarrierTest2 {    static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); }
static class A implements Runnable { @Override public void run() { System.out.println(3); } }}
复制代码


因为 CyclicBarrier 设置了拦截线程的数量是 2,所以必须等代码中的第一个线程和线程 A 都执行完之后,才会继续执行主线程,然后输出 312


public class BankWaterService implements Runnable {    /**     * 创建4个屏障,处理完之后执行当前类的run方法     */    private CyclicBarrier c = new CyclicBarrier(4, this);    /**     * 假设只有4个sheet,所以只启动4个线程     */    private Executor executor = Executors.newFixedThreadPool(4);    /**     * 保存每个sheet计算出的银流结果     */    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new            ConcurrentHashMap<String, Integer>();
private void count() { for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { // 计算当前sheet的银流数据,计算代码省略 sheetBankWaterCount .put(Thread.currentThread().getName(), 1); // 银流计算完成,插入一个屏障 try { c.await(); } catch (Exception e) { e.printStackTrace(); } } }); } }
@Override public void run() { int result = 0; // 汇总每个sheet计算出的结果 for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result += sheet.getValue(); }// 将结果输出 sheetBankWaterCount.put("result", result); System.out.println(result); }
public static void main(String[] args) { BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); }}
复制代码


使用线程池创建 4 个线程,分别计算每个 sheet 里的数据,每个 sheet 计算结果是 1,再由 BankWaterService 线程汇总 4 个 sheet 计算出的结果,输出结果 4


CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset()方法重置。所以 CyclicBarrier 能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。


CyclicBarrier 还提供其他有用的方法,比如 getNumberWaiting 方法可以获得 Cyclic-Barrier 阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断


public class CyclicBarrierTest3 {    static CyclicBarrier c = new CyclicBarrier(2);    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {        Thread thread = new Thread(new Runnable() {            @Override            public void run() {                try {                    c.await();                } catch (Exception e) {                }            }        });        thread.start();        thread.interrupt();        try {            c.await();        } catch (Exception e) {            System.out.println(c.isBroken());        }    }}
复制代码


发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2020.02.29 加入

还未添加个人简介

评论

发布
暂无评论
同步屏障CyclicBarrier_6月月更_周杰伦本人_InfoQ写作社区