阻塞与非阻塞
阻塞是指程序会一直等待该进程或线程完成当前任务期间不做其它事情。而非阻塞,是指当前线程在处理一些事情的同时,还可以处理其它的事情,并不需要等待当前事件完成才执行其它事件。
阻塞与非阻塞客户端
对于请求当中,我们有需要借助一些请求封装的客户端,这里可以分为两大类:阻塞式、非阻塞式。
阻塞式客户端以常见的 RestTemplate
为例,这是一种常见的客户端请求封装,要创建负载平衡RestTemplate
,下面看看其 Bean:
@LoadBalanced
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
复制代码
在底层,RestTemplate 使用了基于每个请求对应一个线程模型(thread-per-request)的 Java Servlet API。在阻塞客户端中,这意味着,直到 Web 客户端收到响应之前,线程都将一直被阻塞下去。而阻塞带来的问题是:每个线程都消耗了一定的内存和 CPU 周期。
如果在并发下,等待结果的请求迟早都会堆积起来。这样,程序将创建很多线程,这些线程将耗尽线程池或占用所有可用内存。由于频繁的 CPU 线程切换,我们还会遇到性能下降的问题。
这在 Spring5 中,提出了一种新的客户端抽象:反应式客户端 WebClient
,而 WebClient 使用了 Spring Reactive Framework
所提供的异步非阻塞解决方案。所以,当 RestTemplate
创建一个个新的线程时,Webclient 是为其创建类似 task 的线程,并且在底层,Reactive 框架将对这些 task 进行排队,并且仅在适当的响应可用时再执行它们。WebClient 是 Spring WebFlux 库的一部分。所以,我们还可以使用了流畅的函数式 API 编程,并将响应类型作为声明来进行组合。如果需要使用 WebClient,同样可以创建:
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
复制代码
案例
假设这里有一个响应非常慢的服务 rest-service,我们分别用阻塞式、非阻塞式客户端来测试一下。
阻塞式
我们利用 RestTemplate
实现阻塞式请求:
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Autowired
RestTemplate restTemplate;
@GetMapping("/getClientRes")
public Response<Object> getClientRes() throws Exception {
System.out.println("block api enter");
HttpHeaders headers = new HttpHeaders();
MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
HttpEntity<String> formEntity = new HttpEntity<String>(null, headers);
String body = "";
try {
ResponseEntity<String> responseEntity = restTemplate.exchange("http://diff-ns-service-service/getservicedetail?servicename=cas-server-service",
HttpMethod.GET, formEntity, String.class);
System.out.println(JSON.toJSONString(responseEntity));
if (responseEntity.getStatusCodeValue() == 200) {
System.out.println("block api exit");
return Response.ok(responseEntity.getBody());
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("block api failed, exit");
return Response.error("failed");
}
复制代码
在启动服务请求后,发现其打印:
block api enter
[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]
block api exit
复制代码
非阻塞式
上面的打印符合我们的逾期,接下来我们来看看非阻塞、反应式客户端请求:
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
@GetMapping(value = "/getClientResByWebClient", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<String> getClientResByWebClient() throws Exception {
System.out.println("no block api enter");
Mono<String> resp = webClientBuilder.build().get()
.uri("http://diff-ns-service-service/getservicedetail?servicename=cas-server-service").retrieve()
.bodyToMono(String.class);
resp.subscribe(body -> System.out.println(body.toString()));
System.out.println("no block api exit");
return resp;
}
复制代码
执行完代码后,看打印:
no block api enter
no block api exit
[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]
复制代码
在本例中,WebClient 返回一个 Mono 生产者后完成方法的执行。如果一旦结果可用,发布者将开始向其订阅者发送数据。调用这个 API 的客户端(浏览器)也将订阅返回的 Mono 对象。
阻塞式转非阻塞式
可以将前面的阻塞式请求,直接转为非阻塞请求,前提是你使用的是 Spring5,此时,可以直接这样来写,贴代码:
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.fromCallable(() -> restTemplate.getForObject("http://diff-ns-service-service/all/getService", String.class))
.subscribeOn(Schedulers.elastic());
}
复制代码
这样后,在请求访问时,直接返回了提供者服务返回的信息体:
{"result":{"status":200,"code":0,"msg":"success"},"data":[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]}
复制代码
这里需要注意的是,请求时,需要直接返回服务提供者的标准信息体,不能再作二次封装返回,否则,只能拿到信息:
{"result":{"status":200,"code":0,"msg":"success"},"data":{"scanAvailable":true}}
复制代码
表示本次 callable 为 true,但这不是我们需要的信息,我们还是需要其本身返回的业务数据。所以需要提供者的返回标准化,因为直接将信息返回给可接收的浏览器等前端。
自定义返回体
从前面例子,我们可以看到:当发送请求获取响应后会直接返回给订阅者(浏览器),但有时候,我们可能需要作一些自定义的返回体,比如加一些状态码、说明、描述等。
此时,我们该如何处理呢?我们可以看到在 webClient 中,提供了 block 函数,该函数在返回信息时,可以阻塞其返回给前端,可以通过其来封装返回结果:
@GetMapping("/test2")
public Response<String> test2() {
Mono<String> resp = webClientBuilder.build()
.get()
.uri("http://diff-ns-service-service/all/getService")
.retrieve()
.bodyToMono(String.class);
return Response.ok(resp.block(Duration.ofSeconds(2)));
}
复制代码
这里在等待异常出现给定一个最大的时间,超时将抛出异常:RunTimeException
。这样,我们就可以自定义消息体返回格式了。
{"result":{"status":200,"code":0,"msg":"success"},"data":""\"[{\\\"host\\\":\\\"10.244.0.55\\\",\\\"instanceId\\\":\\\"71f96128-3bb1-11ec-97e6-ac1f6ba00d36\\\",\\\"metadata\\\":{\\\"kubectl.kubernetes.io/last-applied-configuration\\\":\\\"{\\\\\\\"apiVersion\\\\\\\":\\\\\\\"v1\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"Service\\\\\\\",\\\\\\\"metadata\\\\\\\":{\\\\\\\"annotations\\\\\\\":{},\\\\\\\"name\\\\\\\":\\\\\\\"cas-server-service\\\\\\\",\\\\\\\"namespace\\\\\\\":\\\\\\\"system-server\\\\\\\"},\\\\\\\"spec\\\\\\\":{\\\\\\\"ports\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"cas-server01\\\\\\\",\\\\\\\"port\\\\\\\":2000,\\\\\\\"targetPort\\\\\\\":\\\\\\\"cas-server01\\\\\\\"}],\\\\\\\"selector\\\\\\\":{\\\\\\\"app\\\\\\\":\\\\\\\"cas-server\\\\\\\"}}}\\\\n\\\",\\\"port.cas-server01\\\":\\\"2000\\\",\\\"k8s_namespace\\\":\\\"system-server\\\"},\\\"namespace\\\":\\\"system-server\\\",\\\"port\\\":2000,\\\"scheme\\\":\\\"http\\\",\\\"secure\\\":false,\\\"serviceId\\\":\\\"cas-server-service\\\",\\\"uri\\\":\\\"http://10.244.0.55:2000\\\"},{\\\"host\\\":\\\"10.244.0.56\\\",\\\"instanceId\\\":\\\"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36\\\",\\\"metadata\\\":{\\\"$ref\\\":\\\"$[0].metadata\\\"},\\\"namespace\\\":\\\"system-server\\\",\\\"port\\\":2000,\\\"scheme\\\":\\\"http\\\",\\\"secure\\\":false,\\\"serviceId\\\":\\\"cas-server-service\\\",\\\"uri\\\":\\\"http://10.244.0.56:2000\\\"}]\"""}
复制代码
当然,如果对方的消息体已经按照我们标准的格式输出了,我们可以直接返回这个消息体。
结论
在大部分场景下, RestTemplate
还是继续被使用的,但有些场景下,反应式非阻塞请求还是必须的,系统资源要少得多。WebClient
不失为是一个更好的选择。
评论