写点什么

Java 高并发之设计模式,深入 linux 内核架构 mobi

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:4569 字

    阅读完需:约 15 分钟

1?public?static?synchronized?Singleton?getInstance()?{


2??????????if?(single ==?null) {


3??????????????single =?new?Singleton();


4??????????}


5?????????return?single;


6?}


这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差


懒汉式: 使用双检锁 + volatile


1?????private?volatile?Singleton singleton =?null;


2?????public?static?Singleton?getInstance()?{


3?????????if?(singleton ==?null) {


4?????????????synchronized?(Singleton.class) {


5?????????????????if?(singleton ==?null) {


6?????????????????????singleton =?new?Singleton();


7?????????????????}


8?????????????}


9?????????}


10?????????return?singleton;


11?????}


本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁.


后续调用 getInstance 已经是无锁状态. 只是写法上稍微繁琐点.


至于为什么要 volatile 关键字, 主要涉及到 jdk 指令重排, 详见之前的博文:?Java 内存模型与指令重排


懒汉式: 使用静态内部类


1?public?class?Singleton?{


2?????private?static?class?LazyHolder?{


3????????private?static?final?Singleton INSTANCE =?new?Singleton();


4?????}


5?????private?Singleton?(){}


6?????public?static?final?Singleton?getInstance()?{


7????????return?LazyHolder.INSTANCE;


8?????}


9?}


该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法.


缺点在于无法响应事件来重新初始化 INSTANCE.


饿汉式


1?public?class?Singleton1?{?


2?????private?Singleton1()?{}?


3?????private?static?final?Singleton1 single =?new?Singleton1();?


4?????public?static?Singleton1?getInstance()?{?


5?????????return?single;?


6?????}?


7?}


缺点在于对象在一开始就直接初始化了.

Future 模式

该模式的核心思想是异步调用. 有点类似于异步的 ajax 请求.


当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果.


因此可以让调用者立刻返回一个凭证, 该方法放到另外线程执行,


后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下



jdk 中内置了 Future 模式的支持, 其接口如下:



通过 FutureTask 实现


注意其中两个耗时操作.


  • 如果 doOtherThing 耗时 2s, 则整个函数耗时 2s 左右.

  • 如果 doOtherThing 耗时 0.2s, 则整个函数耗时取决于 RealData.costTime, 即 1s 左右结束.


1?public?class?FutureDemo1?{


2


3?????public?static?void?main(String[] args)?throws?InterruptedException, ExecutionException?{


4?????????FutureTask<String> future =?new?FutureTask<String>(new?Callable<String>() {


5?????????????@Override


6?????????????public?String?call()?throws?Exception?{


7?????????????????return?new?RealData().costTime();


8?????????????}


9?????????});


10?????????ExecutorService service = Executors.newCachedThreadPool();


11?????????service.submit(future);


12


13?????????Sy


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


stem.out.println("RealData 方法调用完毕");


14?????????// 模拟主函数中其他耗时操作


15?????????doOtherThing();


16?????????// 获取 RealData 方法的结果


17?????????System.out.println(future.get());


18?????}


19


20?????private?static?void?doOtherThing()?throws?InterruptedException?{


21?????????Thread.sleep(2000L);


22?????}


23?}


24


25?class?RealData?{


26


27?????public?String?costTime()?{


28?????????try?{


29?????????????// 模拟 RealData 耗时操作


30?????????????Thread.sleep(1000L);


31?????????????return?"result";


32?????????}?catch?(InterruptedException e) {


33?????????????e.printStackTrace();


34?????????}


35?????????return?"exception";


36?????}


37


38?}


通过 Future 实现


与上述 FutureTask 不同的是, RealData 需要实现 Callable 接口


1?public?class?FutureDemo2?{


2


3?????public?static?void?main(String[] args)?throws?InterruptedException, ExecutionException?{


4?????????ExecutorService service = Executors.newCachedThreadPool();


5?????????Future<String> future = service.submit(new?RealData2());


6


7?????????System.out.println("RealData2 方法调用完毕");


8?????????// 模拟主函数中其他耗时操作


9?????????doOtherThing();


10?????????// 获取 RealData2 方法的结果


11?????????System.out.println(future.get());


12?????}


13


14?????private?static?void?doOtherThing()?throws?InterruptedException?{


15?????????Thread.sleep(2000L);


16?????}


17?}


18


19?class?RealData2?implements?Callable<String>{


20


21?????public?String?costTime()?{


22?????????try?{


23?????????????// 模拟 RealData 耗时操作


24?????????????Thread.sleep(1000L);


25?????????????return?"result";


26?????????}?catch?(InterruptedException e) {


27?????????????e.printStackTrace();


28?????????}


29?????????return?"exception";


30?????}


31


32?????@Override


33?????public?String?call()?throws?Exception?{


34?????????return?costTime();


35?????}


36?}


另外 Future 本身还提供了一些额外的简单控制功能, 其 API 如下


1?????// 取消任务


2?????boolean?cancel(boolean?mayInterruptIfRunning);


3?????// 是否已经取消


4?????boolean?isCancelled();


5?????// 是否已经完成


6?????boolean?isDone();


7?????// 取得返回对象


8?????V?get()?throws?InterruptedException, ExecutionException;


9?????// 取得返回对象, 并可以设置超时时间


10?????V?get(long?timeout, TimeUnit unit)


11?throws?InterruptedException, ExecutionException, TimeoutException;


生产消费者模式


生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。


在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。


生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。


生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下



PCData 为我们需要处理的元数据模型, 生产者构建 PCData, 并放入缓冲队列.


消费者从缓冲队列中获取数据, 并执行计算.


生产者核心代码


1?????????while(isRunning) {


2?????????????Thread.sleep(r.nextInt(SLEEP_TIME));


3?????????????data =?new?PCData(count.incrementAndGet);


4?????????????// 构造任务数据


5?????????????System.out.println(data +?" is put into queue");


6?????????????if?(!queue.offer(data,?2, TimeUnit.SECONDS)) {


7?????????????????// 将数据放入队列缓冲区中


8?????????????????System.out.println("faild to put data : "?+ data);


9?????????????}


10?????????}


消费者核心代码


1?????????while?(true) {


2?????????????PCData data = queue.take();


3?????????????// 提取任务


4?????????????if?(data !=?null) {


5?????????????????// 获取数据, 执行计算操作


6?????????????????int?re = data.getData() *?10;


7?????????????????System.out.println("after cal, value is : "?+ re);


8?????????????????Thread.sleep(r.nextInt(SLEEP_TIME));


9?????????????}


10?????????}


生产消费者模式可以有效对数据解耦, 优化系统结构.


降低生产者和消费者线程相互之间的依赖与性能要求.


一般使用 BlockingQueue 作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,?


如果对缓冲队列有性能要求, 则可以使用基于 CAS 无锁设计的 ConcurrentLinkedQueue.

分而治之

严格来讲, 分而治之不算一种模式, 而是一种思想.


它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.


我们主要讲两个场景, Master-Worker 模式, ForkJoin 线程池.


Master-Worker 模式


该模式核心思想是系统由两类进行协助工作: Master 进程, Worker 进程.


Master 负责接收与分配任务, Worker 负责处理任务.?当各个 Worker 处理完成后,?


将结果返回给 Master 进行归纳与总结.



假设一个场景, 需要计算 100 个任务, 并对结果求和, Master 持有 10 个子进程.


Master 代码


1?public?class?MasterDemo?{


2?????// 盛装任务的集合


3?????private?ConcurrentLinkedQueue<TaskDemo> workQueue =?new?ConcurrentLinkedQueue<TaskDemo>();


4?????// 所有 worker


5?????private?HashMap<String, Thread> workers =?new?HashMap<>();


6?????// 每一个 worker 并行执行任务的结果


7?????private?ConcurrentHashMap<String, Object> resultMap =?new?ConcurrentHashMap<>();


8


9?????public?MasterDemo(WorkerDemo worker,?int?workerCount)?{


10?????????// 每个 worker 对象都需要持有 queue 的引用, 用于领任务与提交结果


11?????????worker.setResultMap(resultMap);


12?????????worker.setWorkQueue(workQueue);


13?????????for?(int?i =?0; i < workerCount; i++) {


14?????????????workers.put("子节点: "?+ i,?new?Thread(worker));


15?????????}


16?????}


17


18?????// 提交任务


19?????public?void?submit(TaskDemo task)?{


20?????????workQueue.add(task);


21?????}


22


23?????// 启动所有的子任务


24?????public?void?execute(){


25?????????for?(Map.Entry<String, Thread> entry : workers.entrySet()) {


26?????????????entry.getValue().start();


27?????????}


28?????}


29


30?????// 判断所有的任务是否执行结束


31?????public?boolean?isComplete()?{


32?????????for?(Map.Entry<String, Thread> entry : workers.entrySet()) {


33?????????????if?(entry.getValue().getState() != Thread.State.TERMINATED) {


34?????????????????return?false;


35?????????????}


36?????????}


37


38?????????return?true;


39?????}


40


41?????// 获取最终汇总的结果


42?????public?int?getResult()?{


43?????????int?result =?0;


44?????????for?(Map.Entry<String, Object> entry : resultMap.entrySet()) {


45?????????????result += Integer.parseInt(entry.getValue().toString());


46?????????}


47


48?????????return?result;


49?????}


50


51?}


Worker 代码


1?public?class?WorkerDemo?implements?Runnable{


2


3?????private?ConcurrentLinkedQueue<TaskDemo> workQueue;


4?????private?ConcurrentHashMap<String, Object> resultMap;


5


6?????@Override


7?????public?void?run()?{


8?????????while?(true) {


9?????????????TaskDemo input =?this.workQueue.poll();


10?????????????// 所有任务已经执行完毕


11?????????????if?(input ==?null) {


12?????????????????break;


13?????????????}


14?????????????// 模拟对 task 进行处理, 返回结果


15?????????????int?result = input.getPrice();


16?????????????this.resultMap.put(input.getId() +?"", result);


17?????????????System.out.println("任务执行完毕, 当前线程: "?+ Thread.currentThread().getName());


18?????????}


19?????}


20


21?????public?ConcurrentLinkedQueue<TaskDemo>?getWorkQueue()?{


22?????????return?workQueue;


23?????}


24


25?????public?void?setWorkQueue(ConcurrentLinkedQueue<TaskDemo> workQueue)?{


26?????????this.workQueue = workQueue;


27?????}


28


29?????public?ConcurrentHashMap<String, Object>?getResultMap()?{


30?????????return?resultMap;


31?????}

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Java 高并发之设计模式,深入linux内核架构mobi