同步屏障 CyclicBarrier
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); }}
复制代码
因为主线程和子线程的调度是由 CPU 决定的,两个线程都有可能先执行,所以会产生两种输出 12 和 21
如果把 new CyclicBarrier(2)修改成 new CyclicBarrier(3),则主线程和子线程会永远等待,因为没有第三个线程执行 await 方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景
public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); }
static class A implements Runnable { @Override public void run() { System.out.println(3); } }}
复制代码
因为 CyclicBarrier 设置了拦截线程的数量是 2,所以必须等代码中的第一个线程和线程 A 都执行完之后,才会继续执行主线程,然后输出 312
public class BankWaterService implements Runnable { /** * 创建4个屏障,处理完之后执行当前类的run方法 */ private CyclicBarrier c = new CyclicBarrier(4, this); /** * 假设只有4个sheet,所以只启动4个线程 */ private Executor executor = Executors.newFixedThreadPool(4); /** * 保存每个sheet计算出的银流结果 */ private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
private void count() { for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { // 计算当前sheet的银流数据,计算代码省略 sheetBankWaterCount .put(Thread.currentThread().getName(), 1); // 银流计算完成,插入一个屏障 try { c.await(); } catch (Exception e) { e.printStackTrace(); } } }); } }
@Override public void run() { int result = 0; // 汇总每个sheet计算出的结果 for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result += sheet.getValue(); }// 将结果输出 sheetBankWaterCount.put("result", result); System.out.println(result); }
public static void main(String[] args) { BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); }}
复制代码
使用线程池创建 4 个线程,分别计算每个 sheet 里的数据,每个 sheet 计算结果是 1,再由 BankWaterService 线程汇总 4 个 sheet 计算出的结果,输出结果 4
CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset()方法重置。所以 CyclicBarrier 能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier 还提供其他有用的方法,比如 getNumberWaiting 方法可以获得 Cyclic-Barrier 阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断
public class CyclicBarrierTest3 { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { Thread thread = new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } } }); thread.start(); thread.interrupt(); try { c.await(); } catch (Exception e) { System.out.println(c.isBroken()); } }}
复制代码
评论