Java 高并发之设计模式,深入 linux 内核架构 mobi
1?public?static?synchronized?Singleton?getInstance()?{
2??????????if?(single ==?null) {
3??????????????single =?new?Singleton();
4??????????}
5?????????return?single;
6?}
这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差
懒汉式: 使用双检锁 + volatile
1?????private?volatile?Singleton singleton =?null;
2?????public?static?Singleton?getInstance()?{
3?????????if?(singleton ==?null) {
4?????????????synchronized?(Singleton.class) {
5?????????????????if?(singleton ==?null) {
6?????????????????????singleton =?new?Singleton();
7?????????????????}
8?????????????}
9?????????}
10?????????return?singleton;
11?????}
本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁.
后续调用 getInstance 已经是无锁状态. 只是写法上稍微繁琐点.
至于为什么要 volatile 关键字, 主要涉及到 jdk 指令重排, 详见之前的博文:?Java 内存模型与指令重排
懒汉式: 使用静态内部类
1?public?class?Singleton?{
2?????private?static?class?LazyHolder?{
3????????private?static?final?Singleton INSTANCE =?new?Singleton();
4?????}
5?????private?Singleton?(){}
6?????public?static?final?Singleton?getInstance()?{
7????????return?LazyHolder.INSTANCE;
8?????}
9?}
该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法.
缺点在于无法响应事件来重新初始化 INSTANCE.
饿汉式
1?public?class?Singleton1?{?
2?????private?Singleton1()?{}?
3?????private?static?final?Singleton1 single =?new?Singleton1();?
4?????public?static?Singleton1?getInstance()?{?
5?????????return?single;?
6?????}?
7?}
缺点在于对象在一开始就直接初始化了.
Future 模式
该模式的核心思想是异步调用. 有点类似于异步的 ajax 请求.
当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果.
因此可以让调用者立刻返回一个凭证, 该方法放到另外线程执行,
后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下
jdk 中内置了 Future 模式的支持, 其接口如下:
通过 FutureTask 实现
注意其中两个耗时操作.
如果 doOtherThing 耗时 2s, 则整个函数耗时 2s 左右.
如果 doOtherThing 耗时 0.2s, 则整个函数耗时取决于 RealData.costTime, 即 1s 左右结束.
1?public?class?FutureDemo1?{
2
3?????public?static?void?main(String[] args)?throws?InterruptedException, ExecutionException?{
4?????????FutureTask<String> future =?new?FutureTask<String>(new?Callable<String>() {
5?????????????@Override
6?????????????public?String?call()?throws?Exception?{
7?????????????????return?new?RealData().costTime();
8?????????????}
9?????????});
10?????????ExecutorService service = Executors.newCachedThreadPool();
11?????????service.submit(future);
12
13?????????Sy
stem.out.println("RealData 方法调用完毕");
14?????????// 模拟主函数中其他耗时操作
15?????????doOtherThing();
16?????????// 获取 RealData 方法的结果
17?????????System.out.println(future.get());
18?????}
19
20?????private?static?void?doOtherThing()?throws?InterruptedException?{
21?????????Thread.sleep(2000L);
22?????}
23?}
24
25?class?RealData?{
26
27?????public?String?costTime()?{
28?????????try?{
29?????????????// 模拟 RealData 耗时操作
30?????????????Thread.sleep(1000L);
31?????????????return?"result";
32?????????}?catch?(InterruptedException e) {
33?????????????e.printStackTrace();
34?????????}
35?????????return?"exception";
36?????}
37
38?}
通过 Future 实现
与上述 FutureTask 不同的是, RealData 需要实现 Callable 接口
1?public?class?FutureDemo2?{
2
3?????public?static?void?main(String[] args)?throws?InterruptedException, ExecutionException?{
4?????????ExecutorService service = Executors.newCachedThreadPool();
5?????????Future<String> future = service.submit(new?RealData2());
6
7?????????System.out.println("RealData2 方法调用完毕");
8?????????// 模拟主函数中其他耗时操作
9?????????doOtherThing();
10?????????// 获取 RealData2 方法的结果
11?????????System.out.println(future.get());
12?????}
13
14?????private?static?void?doOtherThing()?throws?InterruptedException?{
15?????????Thread.sleep(2000L);
16?????}
17?}
18
19?class?RealData2?implements?Callable<String>{
20
21?????public?String?costTime()?{
22?????????try?{
23?????????????// 模拟 RealData 耗时操作
24?????????????Thread.sleep(1000L);
25?????????????return?"result";
26?????????}?catch?(InterruptedException e) {
27?????????????e.printStackTrace();
28?????????}
29?????????return?"exception";
30?????}
31
32?????@Override
33?????public?String?call()?throws?Exception?{
34?????????return?costTime();
35?????}
36?}
另外 Future 本身还提供了一些额外的简单控制功能, 其 API 如下
1?????// 取消任务
2?????boolean?cancel(boolean?mayInterruptIfRunning);
3?????// 是否已经取消
4?????boolean?isCancelled();
5?????// 是否已经完成
6?????boolean?isDone();
7?????// 取得返回对象
8?????V?get()?throws?InterruptedException, ExecutionException;
9?????// 取得返回对象, 并可以设置超时时间
10?????V?get(long?timeout, TimeUnit unit)
11?throws?InterruptedException, ExecutionException, TimeoutException;
生产消费者模式
生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。
在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。
生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。
生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下
PCData 为我们需要处理的元数据模型, 生产者构建 PCData, 并放入缓冲队列.
消费者从缓冲队列中获取数据, 并执行计算.
生产者核心代码
1?????????while(isRunning) {
2?????????????Thread.sleep(r.nextInt(SLEEP_TIME));
3?????????????data =?new?PCData(count.incrementAndGet);
4?????????????// 构造任务数据
5?????????????System.out.println(data +?" is put into queue");
6?????????????if?(!queue.offer(data,?2, TimeUnit.SECONDS)) {
7?????????????????// 将数据放入队列缓冲区中
8?????????????????System.out.println("faild to put data : "?+ data);
9?????????????}
10?????????}
消费者核心代码
1?????????while?(true) {
2?????????????PCData data = queue.take();
3?????????????// 提取任务
4?????????????if?(data !=?null) {
5?????????????????// 获取数据, 执行计算操作
6?????????????????int?re = data.getData() *?10;
7?????????????????System.out.println("after cal, value is : "?+ re);
8?????????????????Thread.sleep(r.nextInt(SLEEP_TIME));
9?????????????}
10?????????}
生产消费者模式可以有效对数据解耦, 优化系统结构.
降低生产者和消费者线程相互之间的依赖与性能要求.
一般使用 BlockingQueue 作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,?
如果对缓冲队列有性能要求, 则可以使用基于 CAS 无锁设计的 ConcurrentLinkedQueue.
分而治之
严格来讲, 分而治之不算一种模式, 而是一种思想.
它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.
我们主要讲两个场景, Master-Worker 模式, ForkJoin 线程池.
Master-Worker 模式
该模式核心思想是系统由两类进行协助工作: Master 进程, Worker 进程.
Master 负责接收与分配任务, Worker 负责处理任务.?当各个 Worker 处理完成后,?
将结果返回给 Master 进行归纳与总结.
假设一个场景, 需要计算 100 个任务, 并对结果求和, Master 持有 10 个子进程.
Master 代码
1?public?class?MasterDemo?{
2?????// 盛装任务的集合
3?????private?ConcurrentLinkedQueue<TaskDemo> workQueue =?new?ConcurrentLinkedQueue<TaskDemo>();
4?????// 所有 worker
5?????private?HashMap<String, Thread> workers =?new?HashMap<>();
6?????// 每一个 worker 并行执行任务的结果
7?????private?ConcurrentHashMap<String, Object> resultMap =?new?ConcurrentHashMap<>();
8
9?????public?MasterDemo(WorkerDemo worker,?int?workerCount)?{
10?????????// 每个 worker 对象都需要持有 queue 的引用, 用于领任务与提交结果
11?????????worker.setResultMap(resultMap);
12?????????worker.setWorkQueue(workQueue);
13?????????for?(int?i =?0; i < workerCount; i++) {
14?????????????workers.put("子节点: "?+ i,?new?Thread(worker));
15?????????}
16?????}
17
18?????// 提交任务
19?????public?void?submit(TaskDemo task)?{
20?????????workQueue.add(task);
21?????}
22
23?????// 启动所有的子任务
24?????public?void?execute(){
25?????????for?(Map.Entry<String, Thread> entry : workers.entrySet()) {
26?????????????entry.getValue().start();
27?????????}
28?????}
29
30?????// 判断所有的任务是否执行结束
31?????public?boolean?isComplete()?{
32?????????for?(Map.Entry<String, Thread> entry : workers.entrySet()) {
33?????????????if?(entry.getValue().getState() != Thread.State.TERMINATED) {
34?????????????????return?false;
35?????????????}
36?????????}
37
38?????????return?true;
39?????}
40
41?????// 获取最终汇总的结果
42?????public?int?getResult()?{
43?????????int?result =?0;
44?????????for?(Map.Entry<String, Object> entry : resultMap.entrySet()) {
45?????????????result += Integer.parseInt(entry.getValue().toString());
46?????????}
47
48?????????return?result;
49?????}
50
51?}
Worker 代码
1?public?class?WorkerDemo?implements?Runnable{
2
3?????private?ConcurrentLinkedQueue<TaskDemo> workQueue;
4?????private?ConcurrentHashMap<String, Object> resultMap;
5
6?????@Override
7?????public?void?run()?{
8?????????while?(true) {
9?????????????TaskDemo input =?this.workQueue.poll();
10?????????????// 所有任务已经执行完毕
11?????????????if?(input ==?null) {
12?????????????????break;
13?????????????}
14?????????????// 模拟对 task 进行处理, 返回结果
15?????????????int?result = input.getPrice();
16?????????????this.resultMap.put(input.getId() +?"", result);
17?????????????System.out.println("任务执行完毕, 当前线程: "?+ Thread.currentThread().getName());
18?????????}
19?????}
20
21?????public?ConcurrentLinkedQueue<TaskDemo>?getWorkQueue()?{
22?????????return?workQueue;
23?????}
24
25?????public?void?setWorkQueue(ConcurrentLinkedQueue<TaskDemo> workQueue)?{
26?????????this.workQueue = workQueue;
27?????}
28
29?????public?ConcurrentHashMap<String, Object>?getResultMap()?{
30?????????return?resultMap;
31?????}
评论