性能优化 - 技术专题 - 并发编程

系统性能优化之并发编程
一.什么是高并发
高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一。它通常是指,通过设计保证系统能够同时并行处理很多请求。
高并发指标通常有四点:响应时间(RT)、吞吐量(Throughput)、QPS(Queries-per-second)、并发用户数。
注:一个用户一秒内狂点按钮 10 次,这不是并发。只有 10 次请求在同一时刻同时向服务器发送请求(比如用 CountDownLatch 模拟实现),这才是并发。
1.1 响应时间(RT)
响应时间是指系统对请求作出响应的时间,一般来讲,用户能接受的响应时间是小于 5 秒。
1.2 吞吐量(Throughput)
吞吐量是指系统在单位时间内处理请求的数量。对于无并发的应用系统而言,吞吐量与响应时间成严格的反比关系,实际上此时吞吐量就是响应时间的倒数。
一个系统的吞度量(承压能力)与 request 对 CPU 的消耗、外部接口、IO 等等紧密关联。
系统吞吐量几个重要参数:QPS(TPS)、并发数、响应时间
QPS(TPS):每秒钟 request/事务数量
并发数: 系统同时处理的 request/事务数量
响应时间: 一般取平均响应时间
理解了上面三个要素的意义之后,就能推算出它们之间的关系:
QPS(TPS)= 并发数/平均响应时间
一个系统吞吐量通常由 QPS(TPS)、并发数两个因素决定,每套系统这两个值都有一个相对极限值,在应用场景访问压力下,只要某一项达到系统最高值,系统的吞吐量就上不去了,如果压力继续增大,系统的吞吐量反而会下降,原因是系统超负荷工作,上下文切换、内存等等其它消耗导致系统性能下降。
1.3 QPS(Queries-per-second)
每秒响应请求数
1.4 并发用户数
并发用户数是指系统可以同时承载的正常使用系统功能的用户的数量。与吞吐量相比,并发用户数是一个更直观但也更笼统的性能指标。实际上,并发用户数是一个非常不准确的指标,因为用户不同的使用模式会导致不同用户在单位时间发出不同数量的请求。
二、高并发带来的问题
2.1 服务端
某一时间片刻系统流量异常高,系统濒临阀值;
服务器 CPU,内存爆满,磁盘 IO 繁忙;
系统雪崩:分布式系统中经常会出现某个基础服务不可用造成整个系统不可用的情况,这种现象被称为服务雪崩效应。服务雪崩效应是一种因服务提供者的不可用导致服务调用者的不可用,并将不可用逐渐放大的过程。A 为服务提供者,B 为 A 的服务调用者,C 和 D 是 B 的服务调用者。当 A 的不可用,引起 B 的不可用,并将不可用逐渐放大 C 和 D 时,服务雪崩就形成了。
2.2 用户角度
使用体验差
三、并发解决方案-四大法定
缓存:Redis
异步:消息中间件 MQ
并发编程
分布式
四、优化方案——并发编程
4.1 阿里笔试题
支付宝登录后,请问你可以从哪些角度优化提升性能?业务场景架构图
4.2 模拟业务场景
采用 SpringBoot 创建两个项目,一个项目(remote-server)为模拟远程服务接口项目,提供用户基本信息、用户余额查询等 API。另外一个项目(app-server)调用远程服务接口,然后用 fastjson 组装数据后向 APP 端提供统一的用户信息 API。
4.2.1 remote-server 项目
1、端口设置 8081
server: port: 8081
2、导入我们需要用到的 fastjson
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency>3、创建两个实体类,分别为:用户基本信息
public class UserBaseInfo { private String userId; private String userName; private String sex; private int age; // get set 省略}
用户余额
public class UserMoney { private String userId; private String money; // get set 省略}
4、分别提供两个 API 供远程调用,分别为:用户基本信息 API
@RestController@RequestMapping("users-base")public class UserBaseInfoController { @GetMapping("/{userId}") public UserBaseInfo getUserBaseInfo(@PathVariable String userId){ System.out.println("userId:"+userId); try { // 模拟业务逻辑,等待2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } UserBaseInfo userBaseInfo = new UserBaseInfo(); userBaseInfo.setUserId("1"); userBaseInfo.setUserName("AlanChen"); userBaseInfo.setSex("男"); userBaseInfo.setAge(18); return userBaseInfo; }}用户余额 API
@RestController@RequestMapping("users-money")public class UserMoneyController {
@GetMapping("/{userId}") public UserMoney getUserMoney(@PathVariable String userId){ System.out.println("userId:"+userId); try { // 模拟业务逻辑,等待2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } UserMoney userMoney = new UserMoney(); userMoney.setUserId("1"); userMoney.setMoney("1000"); return userMoney; }}
5、用 Postman 测试接口如下
用户基本信息
用户余额
4.2.2 app-server 项目
1、端口设置为 8888
server: port: 8888
2、导入我们需要用到的 fastjson
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency>
3、用 Spring 的 RestTemplate 接口实现远程调用服务
/** * @author Alan Chen * @description 远程接口 * @date 2020-07-20 */@Servicepublic class RemoteService {
/** * 获取用户基本信息 * @param userId * @return */ public String getUserInfo(String userId){ RestTemplate restTemplate = new RestTemplate(); long t1 = System.currentTimeMillis(); String result = restTemplate.getForObject("http://127.0.0.1:8081/users-base/{userId}",String.class,userId);
System.out.println("获取用户基本信息时间为:"+(System.currentTimeMillis()-t1)); return result; }
/** * 获取用户余额 * @param userId * @return */ public String getUserMoney(String userId){ RestTemplate restTemplate = new RestTemplate(); long t1 = System.currentTimeMillis(); String result = restTemplate.getForObject("http://127.0.0.1:8081/users-money/{userId}",String.class,userId);
System.out.println("获取用户余额时间为:"+(System.currentTimeMillis()-t1)); return result; }}
4、调用远程接口,数据组装后返回给客户端
@Servicepublic class UserService {
@Autowired RemoteService remoteService;
ExecutorService task = Executors.newFixedThreadPool(10);
/** * 串行调用远程接口 * @param userId * @return */ public String getUserInfo(String userId){
long t1 = System.currentTimeMillis();
// 分别调用两个接口 String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1);
String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2);
// 结果集进行合并 JSONObject result = new JSONObject(); result.putAll(userInfo); result.putAll(moneyInfo);
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString(); }
/** * 线程并行调用远程接口(Thread+FutureTask+Callable) * @param userId * @return */ public String getUserInfoThread(String userId){ // Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1); return userInfo; } };
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2); return moneyInfo; } };
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo); FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
new Thread(queryUserInfoFutureTask).start(); new Thread(queryUserMoneyFutureTask).start();
// 结果集进行合并 JSONObject result = new JSONObject(); try { result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果 result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果 } catch (Exception e) { e.printStackTrace(); } System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1)); return result.toString(); }
/** * 线程并行调用远程接口(Thread+Callable+自定义FutureTask) * @param userId * @return */ public String getUserInfoThreadMyFutureTask(String userId){ // Runnable没有返回值,Callable有返回值 long t1 = System.currentTimeMillis(); Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1); return userInfo; } }; Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2); return moneyInfo; } }; AlanChenFutureTask<JSONObject> queryUserInfoFutureTask = new AlanChenFutureTask<>(queryUserInfo); AlanChenFutureTask <JSONObject> queryUserMoneyFutureTask = new AlanChenFutureTask<>(queryUserMoney); new Thread(queryUserInfoFutureTask).start(); new Thread(queryUserMoneyFutureTask).start(); // 结果集进行合并 JSONObject result = new JSONObject(); try { result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果 result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果 } catch (Exception e) { e.printStackTrace(); } System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1)); return result.toString(); }
/** * 线程并行调用远程接口(Thread+FutureTask+Callable+ExecutorService) * @param userId * @return */ public String getUserInfoThreadPool(String userId){ // Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1); return userInfo; } };
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2); return moneyInfo; } };
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo); FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
//用线程池执行 task.submit(queryUserInfoFutureTask); task.submit(queryUserMoneyFutureTask);
// 结果集进行合并 JSONObject result = new JSONObject(); try { result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果 result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果 } catch (Exception e) { e.printStackTrace(); }
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString(); }
/** * 异步请求(节省tomcat线程池线程) Callable或DeferredResult * @param userId * @return */ public Callable<String> getUserInfoAsync(@PathVariable String userId){ long t = System.currentTimeMillis(); System.out.println("主线程开始..."+Thread.currentThread());
Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { long t1 = System.currentTimeMillis(); System.out.println("子线程开始..."+Thread.currentThread());
String result = getUserInfoThreadPool(userId);
System.out.println("子线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t1)); return result; } };
System.out.println("主线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t)); return callable; }}
5、提供 Controller 接口给客户端访问
@RestController@RequestMapping("users")public class UserController {
@Autowired UserService userService;
@GetMapping("/{userId}") public String getUserInfo(@PathVariable String userId){ return userService.getUserInfo(userId); }
@GetMapping("/thread/{userId}") public String getUserInfoThread(@PathVariable String userId){ return userService.getUserInfoThread(userId); }
@GetMapping("/thread/pool/{userId}") public String getUserInfoThreadPool(@PathVariable String userId){ return userService.getUserInfoThreadPool(userId); }
@GetMapping("/thread/my_future_task/{userId}") public String getUserInfoThreadMyFutureTask(@PathVariable String userId){ return userService.getUserInfoThreadMyFutureTask(userId); }
@GetMapping("/thread/async/{userId}") public Callable<String> getUserInfoAsync(@PathVariable String userId){ return userService.getUserInfoAsync(userId); }
}
4.3 优化方案详解
所有的优化方案都在 app-server 的 UserService 实现类里
4.3.1 常规实现方式:串行调用远程接口
/** * 串行调用远程接口 * @param userId * @return */ public String getUserInfo(String userId){
long t1 = System.currentTimeMillis();
// 分别调用两个接口 String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1);
String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2);
// 结果集进行合并 JSONObject result = new JSONObject(); result.putAll(userInfo); result.putAll(moneyInfo);
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString(); }
执行结果
postman 执行结果
控制台打印执行时间
我看可以看到,执行时间大约是 4 秒钟。这种串行调用远程接口的方式,时间执行为各远程接口调用的时间总和,接口响应慢,不可取。
4.3.2 线程并行调用远程接口:Thread+FutureTask+Callable
/** * 线程并行调用远程接口(Thread+FutureTask+Callable) * @param userId * @return */ public String getUserInfoThread(String userId){ // Runnable没有返回值,Callable有返回值 long t1 = System.currentTimeMillis(); Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1); return userInfo; } }; Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2); return moneyInfo; } }; FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo); FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney); new Thread(queryUserInfoFutureTask).start(); new Thread(queryUserMoneyFutureTask).start(); // 结果集进行合并 JSONObject result = new JSONObject(); try { result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果 result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果 } catch (Exception e) { e.printStackTrace(); } System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1)); return result.toString(); }
控制台打印执行时间
我看可以看到,执行时间大约是 2 秒钟,执行时间减少了一半。利用 Thread+FutureTask+Callable,可以同时发送多个远程调用请求,再用 FutureTask 的 get()方法阻塞拿到各异步请求的结果集,再进行合并。这种方案,接口执行的总时间取决于各异步远程接口调用的最长的那个时间。
线程并行调用接口
4.3.3 线程并行调用远程接口:Thread+Callable+自定义 FutureTask
FutureTask 除了用 JDK 自带的接口外,我们自己同样也可以实现一个简单的 FutureTask。
/** * @author Alan Chen * @description 自定义FutureTask * @date 2020-07-20 */public class AlanChenFutureTask<V> implements Runnable, Future<V> {
Callable<V> callable; V result;
public AlanChenFutureTask(Callable<V> callable){ this.callable = callable; }
@Override public void run() { try { result = callable.call(); synchronized (this){ this.notifyAll(); } } catch (Exception e) { e.printStackTrace(); } }
@Override public V get() throws InterruptedException, ExecutionException { if(result!=null){ return result; } synchronized (this){ //阻塞等待获取返回值 this.wait(); } return result; }
@Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
@Override public boolean isCancelled() { return false; }
@Override public boolean isDone() { return false; }
@Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; }}
/** * 线程并行调用远程接口(Thread+Callable+自定义FutureTask) * @param userId * @return */ public String getUserInfoThreadMyFutureTask(String userId){ // Runnable没有返回值,Callable有返回值 long t1 = System.currentTimeMillis(); Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1); return userInfo; } }; Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2); return moneyInfo; } }; AlanChenFutureTask<JSONObject> queryUserInfoFutureTask = new AlanChenFutureTask<>(queryUserInfo); AlanChenFutureTask <JSONObject> queryUserMoneyFutureTask = new AlanChenFutureTask<>(queryUserMoney); new Thread(queryUserInfoFutureTask).start(); new Thread(queryUserMoneyFutureTask).start(); // 结果集进行合并 JSONObject result = new JSONObject(); try { result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果 result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果 } catch (Exception e) { e.printStackTrace(); } System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1)); return result.toString(); }
控制台打印执行时间
4.3.4 线程池并行调用远程接口:Thread+FutureTask+Callable+ExecutorService
我们可以进一步进行优化,将线程换成线程池
/** * 线程池并行调用远程接口(Thread+FutureTask+Callable+ExecutorService) * @param userId * @return */ public String getUserInfoThreadPool(String userId){ // Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v1 = remoteService.getUserInfo(userId); JSONObject userInfo = JSONObject.parseObject(v1); return userInfo; } };
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { String v2 = remoteService.getUserMoney(userId); JSONObject moneyInfo = JSONObject.parseObject(v2); return moneyInfo; } };
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo); FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
//用线程池执行 task.submit(queryUserInfoFutureTask); task.submit(queryUserMoneyFutureTask);
// 结果集进行合并 JSONObject result = new JSONObject(); try { result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果 result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果 } catch (Exception e) { e.printStackTrace(); }
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString(); }
控制台打印执行时间
4.3.5 异步请求(节省 tomcat 线程池线程) :Callable 或 DeferredResult
我们知道 tomcat 的请求连接数是有限的,如果接口响应时间过长,会占用和消耗 tomcat 的连接数。如果 tomcat 主线程接收到请求后,立即开启一个子线程异步去执行业务逻辑,然后 tomcat 主线程快速返回释放连接,等有结果后子线程再返回给前端客户端,这样就可以大大节省 tomcat 的连接数,提高 tomcat 连接利用率。具体实现如下:
异步请求架构图
/** * 异步请求(节省tomcat线程池线程) Callable或DeferredResult * @param userId * @return */ public Callable<String> getUserInfoAsync(@PathVariable String userId){ long t = System.currentTimeMillis(); System.out.println("主线程开始..."+Thread.currentThread()); Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { long t1 = System.currentTimeMillis(); System.out.println("子线程开始..."+Thread.currentThread()); String result = getUserInfoThreadPool(userId); System.out.println("子线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t1)); return result; } }; System.out.println("主线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t)); return callable; }
postman 执行测试
控制台打印结果
我们从打印结果中可以看到,tomcat 主线程几乎只用了 0 毫秒就快速返回了,等子线程取到结果后再返回给客户端,消耗时间大约为 2 秒。
李浩宇/Alex
我们始于迷惘,终于更高的迷惘. 2020.03.25 加入
一个酷爱计算机技术、健身运动、悬疑推理的极客狂人,大力推荐安利Java官方文档:https://docs.oracle.com/javase/specs/index.html











评论