写点什么

简单设计一个 JAVA 并行处理工具类

  • 2024-08-07
    福建
  • 本文字数:6154 字

    阅读完需:约 20 分钟

在工作中,我们肯定遇到过一个接口要处理 N 多事项导致接口响应速度很慢的情况,通常我们会综合使用两种方式来提升接口响应速度


  1. 优化查询 SQL,提升查询效率

  2. 开启多线程并发处理业务数据


这里讨论第二种方案:使用多线程并发处理业务数据,最后处理完成以后,拼装起来返回给前端,每个人的实现方案都不一样,我在工作的这几年也经历了几种写法。


一、几种常见的并行处理写法


方法一:Future 写法

其代码形式如下

@Testpublic void test1() {    //定义线程池    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 30,                    TimeUnit.SECONDS,                    new ArrayBlockingQueue<>(10),                    Executors.defaultThreadFactory(),                    new ThreadPoolExecutor.DiscardPolicy());    //异步执行    Future<String> getUserName = threadPoolExecutor.submit(() -> {        //do something...        return "kdyzm";    });    //异步执行    Future<Integer> getUserAge = threadPoolExecutor.submit(() -> {        //do something...        return 12;    });    //拼装回调结果    try {        UserInfo user = new UserInfo();        user.setName(getUserName.get());        user.setAge(getUserAge.get());        log.info(JsonUtils.toPrettyString(user));    } catch (InterruptedException | ExecutionException e) {        e.printStackTrace();    }}
@Datastatic class UserInfo { private String name; private Integer age;}
复制代码


多几个 submit 一起执行,最后集中 get 获取最终结果。

这种方式任务一旦多了,就会显得代码很乱,一堆的变量名会让代码可读性很差。


方法二:CompletableFuture.allOf 写法

其代码形式如下

@Testpublic void test2() {    try {        UserInfo userInfo = new UserInfo();                CompletableFuture.allOf(            	//异步执行                CompletableFuture.runAsync(() -> {                    userInfo.setName("kdyzm");                }),            	//异步执行                CompletableFuture.runAsync(() -> {                    userInfo.setAge(12);                })        //同步返回        ).get();
log.info(JsonUtils.toPrettyString(userInfo)); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }}
@Datastatic class UserInfo { private String name; private Integer age;}
复制代码


这种方法使用了 CompletableFuture 的 API,通过将多个异步任务收集起来统一调度最后通过一个 get 方法同步到主线程。比直接使用 Future 简化了些。


方法三:CompletableFuture::join 写法

其代码形式如下

@Testpublic void test3(){    UserInfo userInfo = new UserInfo();    Arrays.asList(			//异步执行            CompletableFuture.supplyAsync(()->{                return "kdyzm";            //回调执行            }).thenAccept(name->{                userInfo.setName(name);            }),
//异步执行 CompletableFuture.supplyAsync(()->{ return 12; //回调执行 }).thenAccept(age->{ userInfo.setAge(age); }) //等待所有线程执行完毕 ).forEach(CompletableFuture::join);
log.info(JsonUtils.toPrettyString(userInfo));
}
@Datastatic class UserInfo { private String name; private Integer age;}
复制代码


这种写法和上面的写法相比具有更高的可读性,但是它也有缺点:thenAccept 只能接收一个返回值,如果想处理多个值,则没有办法,只能使用方法 2。


总结

几种写法中第二、三种写法比较常见,使用起来也更加方便,两者各有优缺点:方法 2 能处理多个返回值,方法 3 可读性更高。但是无论是方法 2 还是方法 3,它们的使用总是要记住相关的 API,使用起来总不是很顺手,可读性虽然方法 3 更强一些,但是总还是差点意思。此时我就有了自己设计一个简单的并行处理工具类的想法,既要易用,还要可读性高。


二、并行处理工具类设计


1、设计模式选型

因为平时比较喜欢链式调用的 API,所以一开始一开始设计,我就想用建造者模式来实现这个工具类。关于建造者模式,详情可以看我之前的文章:设计模式(六):建造者模式 。建造者模式在实际应用中的特点就是链式调用,无论是 StringBuilder 还是 lombok 的 @Data 注解,都使用了建造者模式。


2、第一版代码

仿照方法三,我开发了第一版代码

import lombok.Data;import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.function.Consumer;import java.util.function.Supplier;
/** * @author kdyzm */@Slf4jpublic class ConcurrentWorker {
private List<Task> workers = new ArrayList<>();
public static ConcurrentWorker runner() { return new ConcurrentWorker(); }
public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) { Task<R> worker = new Task<>(action, value); this.workers.add(worker); return this; }
public void run() { workers.forEach(item -> { CompletableFuture completableFuture = CompletableFuture.supplyAsync(item.getValue()); item.setCompletableFuture(completableFuture); }); workers .stream() .map( item -> { return item.completableFuture.thenAccept(item.getAction()); } ) .forEach(CompletableFuture::join); }
@Data public static class Task<R> { private Consumer<? super R> action; private Supplier<R> value; private CompletableFuture<R> completableFuture;
public Task(Consumer<? super R> action, Supplier<R> value) { this.action = action; this.value = value; } }}
复制代码


这段代码一共不到 60 行,使用了 Lambda 表达式和函数式编程相关的 API 对方法三进行改造,最终使用效果如下

@Test    public void test() {
UserInfo userInfo = new UserInfo();
ConcurrentWorker.runner() //添加任务 .addTask(userInfo::setName, () -> { //延迟1000毫秒打印线程执行情况 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+"-name"); return "张三"; }) //添加任务 .addTask(userInfo::setAge, () -> { //延迟1000毫秒打印线程执行情况 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+"-age"); return 13; }) //执行任务 .run(); log.info(JsonUtils.toPrettyString(userInfo)); }
@Data static class UserInfo { private String name; private Integer age; private String sex; }
复制代码


它的使用方式就是

ConcurrentWorker.runner()                .addTask(setter function, return_value function )    			.addTask(setter function, return_value function)    			.run()
复制代码


可以看到易用性够了,可读性也很好,但是它的缺点和方法三一样,都只能接收一个参数,毕竟它是根据方法 3 封装的,接下来改造代码让它支持多参数处理。


3、第二版代码

已知,第一版代码已经支持了如下形式的功能

ConcurrentWorker.runner()                .addTask(setter function, return_value function )    			.addTask(setter function, return_value function)    			.run()
复制代码


现在我想添加以下形式的重载方法

.addTask(handle function)
复制代码


没错,就一个参数,在这个方法中可以任意设置对象值。最终使用的效果如下

@Testpublic void test() {
UserInfo userInfo = new UserInfo();
ConcurrentWorker.runner() .addTask(userInfo::setName, () -> { try { Thread.sleep(1000); log.info(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+"-name"); return "张三"; }) .addTask(userInfo::setAge, () -> { try { Thread.sleep(1000); log.info(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+"-age"); return 13; }) //新方法:处理任意多属性值填充 .addTask(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+"-sex"); userInfo.setSex("男"); }) .run(); log.info(JsonUtils.toPrettyString(userInfo));}
@Datastatic class UserInfo { private String name; private Integer age; private String sex;}
复制代码


完整工具类方法如下

import lombok.Data;import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.function.Consumer;import java.util.function.Supplier;
/** * @author kdyzm */@Slf4jpublic class ConcurrentWorker {
private List<Task> workers = new ArrayList<>();
public static ConcurrentWorker runner() { return new ConcurrentWorker(); }
public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) { Task<R> worker = new Task<>(action, value); this.workers.add(worker); return this; }
public <R> ConcurrentWorker addTask(Runnable runnable) { Task<R> worker = new Task<>(runnable); this.workers.add(worker); return this; }
public void run() { workers.forEach(item -> { int taskType = item.getTaskType(); CompletableFuture completableFuture = null; switch (taskType) { case TaskType.RETURN_VALUE: completableFuture = CompletableFuture.supplyAsync(item.getValue()); break; case TaskType.VOID_RETURN: completableFuture = CompletableFuture.runAsync(item.getRunnable()); break; default: break; } item.setCompletableFuture(completableFuture); }); workers .stream() .map( item -> { int taskType = item.getTaskType(); switch (taskType) { case TaskType.RETURN_VALUE: return item.completableFuture.thenAccept(item.getAction()); default: return item.completableFuture.thenAccept(temp->{ //空 }); } } ) .forEach(CompletableFuture::join); }
@Data public static class Task<R> { private Consumer<? super R> action; private Supplier<R> value; private CompletableFuture<R> completableFuture; private Runnable runnable; private int taskType;
public Task(Consumer<? super R> action, Supplier<R> value) { this.action = action; this.value = value; this.taskType = TaskType.RETURN_VALUE; }

public Task(Runnable runnable) { this.runnable = runnable; this.taskType = TaskType.VOID_RETURN; } }

public static class TaskType {
/** * 有返回值的 */ public static final int RETURN_VALUE = 1;
/** * 没有返回值的 */ public static final int VOID_RETURN = 2; }}
复制代码


我将任务类型分为两种,并使用 TaskType 类封装成常量值:1 表示任务执行回调有返回值;2 表示任务执行没有返回值,属性填充将在任务执行过程中完成,该类型任务使用 Runnable 接口实现。


4、工具类 jar 包

相关代码我已经打包成 jar 包上传到 maven 中央仓库,可以通过引入以下 maven 依赖使用ConcurrentWorker工具类

<dependency>    <groupId>cn.kdyzm</groupId>    <artifactId>kdyzm-util</artifactId>    <version>0.0.2</version></dependency>
复制代码


文章转载自:狂盗一枝梅

原文链接:https://www.cnblogs.com/kuangdaoyizhimei/p/18344600

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
简单设计一个JAVA并行处理工具类_Java_快乐非自愿限量之名_InfoQ写作社区