写点什么

Reactive Spring 实战 -- WebFlux 使用教程

用户头像
binecy
关注
发布于: 2021 年 01 月 17 日

WebFlux 是 Spring 5 提供的响应式 Web 应用框架。

它是完全非阻塞的,可以在 Netty,Undertow 和 Servlet 3.1+等非阻塞服务器上运行。

本文主要介绍 WebFlux 的使用。

FluxWeb vs no FluxWeb

WebFlux 是完全非阻塞的。

在 FluxWeb 前,我们可以使用 DeferredResult 和 AsyncRestTemplate 等方式实现非阻塞的 Web 通信。

我们先来比较一下这两者。


注意:关于同步阻塞与异步非阻塞的性能差异,本文不再阐述。

阻塞即浪费。我们通过异步实现非阻塞。只有存在阻塞时,异步才能提高性能。如果不存在阻塞,使用异步反而可能由于线程调度等开销导致性能下降。


下面例子模拟一种业务场景。

订单服务提供接口查找订单信息,同时,该接口实现还需要调用仓库服务查询仓库信息,商品服务查询商品信息,并过滤,取前 5 个商品数据。


OrderService 提供如下方法

public void getOrderByRest(DeferredResult<Order> rs, long orderId) {    // [1]    Order order = mockOrder(orderId);    // [2]    ListenableFuture<ResponseEntity<User>> userLister = asyncRestTemplate.getForEntity("http://user-service/user/mock/" + 1, User.class);    ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =                    asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),                            HttpMethod.GET,  null, new ParameterizedTypeReference<List<Goods>>(){});    // [3]    CompletableFuture<ResponseEntity<User>> userFuture = userLister.completable().exceptionally(err -> {        logger.warn("get user err", err);        return new ResponseEntity(new User(), HttpStatus.OK);    });    CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {        logger.warn("get goods err", err);        return new ResponseEntity(new ArrayList<>(), HttpStatus.OK);    });    // [4]    warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes)-> {            order.setWarehouse(warehouseRes.getBody());            List<Goods> goods = goodsRes.getBody().stream()                    .filter(g -> g.getPrice() > 10).limit(5)                    .collect(Collectors.toList());            order.setGoods(goods);        return order;    }).whenCompleteAsync((o, err)-> {        // [5]        if(err != null) {            logger.warn("err happen:", err);        }        rs.setResult(o);    });}
复制代码
  1. 加载订单数据,这里 mack 了一个数据。

  2. 通过 asyncRestTemplate 获取仓库,产品信息,得到 ListenableFuture。

  3. 设置 ListenableFuture 异常处理,避免因为某个请求报错导致接口失败。

  4. 合并仓库,产品请求结果,组装订单数据

  5. 通过 DeferredResult 设置接口返回数据。


可以看到,代码较繁琐,通过 DeferredResult 返回数据的方式也与我们同步接口通过方法返回值返回数据的方式大相径庭。


这里实际存在两处非阻塞

  1. 使用 AsyncRestTemplate 实现发送异步 Http 请求,也就是说通过其他线程调用仓库服务和产品服务,并返回 CompletableFuture,所以不阻塞 getOrderByRest 方法线程。

  2. DeferredResult 负责异步返回 Http 响应。

getOrderByRest 方法中并不阻塞等待 AsyncRestTemplate 返回,而是直接返回,等到 AsyncRestTemplate 返回后通过回调函数设置 DeferredResult 的值将数据返回给 Http,可对比以下阻塞等待的代码

ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();order.setWarehouse(warehouseRes.getBody());order.setGoods(goodsRes.getBody());return order;
复制代码


下面我们使用 WebFlux 实现。

pom 引入依赖

    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-webflux</artifactId>    </dependency>
复制代码


服务启动类 OrderServiceReactive

@EnableDiscoveryClient@SpringBootApplicationpublic class OrderServiceReactive{    public static void main( String[] args )    {        new SpringApplicationBuilder(                OrderServiceReactive.class)                .web(WebApplicationType.REACTIVE).run(args);    }}
复制代码

WebApplicationType.REACTIVE 启动 WebFlux。


OrderController 实现如下

@GetMapping("/{id}")public Mono<Order> getById(@PathVariable long id) {    return service.getOrder(id);}
复制代码

注意返回一个 Mono 数据,Mono 与 Flux 是 Spring Reactor 提供的异步数据流。

WebFlux 中通常使用 Mono,Flux 作为数据输入,输出值。

当接口返回 Mono,Flux,Spring 知道这是一个异步请求结果。

关于 Spring Reactor,可参考理解Reactor的设计与实现


OrderService 实现如下

public Mono<Order> getOrder(long orderId) {    // [1]    Mono<Order> orderMono = mockOrder(orderId);    // [2]    return orderMono.flatMap(o -> {        // [3]        Mono<User> userMono =  getMono("http://user-service/user/mock/" + o.getUserId(), User.class).onErrorReturn(new User());        Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +                StringUtils.join(o.getGoodsIds(), ","), Goods.class)                .filter(g -> g.getPrice() > 10)                .take(5)                .onErrorReturn(new Goods());        // [4]        return userMono.zipWith(goodsFlux.collectList(), (u, gs) -> {            o.setUser(u);            o.setGoods(gs);            return o;        });    });}
private <T> Mono<T> getMono(String url, Class<T> resType) { return webClient.get().uri(url).retrieve().bodyToMono(resType);}
// getFlux
复制代码
  1. 加载订单数据,这里 mock 了一个 Mono 数据

  2. flatMap 方法可以将 Mono 中的数据转化类型,这里转化后的结果还是 Order。

  3. 获取仓库,产品数据。这里可以看到,对产品过滤,取前 5 个的操作可以直接添加到 Flux<Goods>上。

  4. zipWith 方法可以组合两个 Mono,并返回新的 Mono 类型,这里组合仓库、产品数据,最后返回 Mono<Order>。

可以看到,代码整洁不少,并且接口返回 Mono<Order>,与我们在同步接口中直接数据的做法类似,不需要借助 DeferredResult 这样的工具类。

我们通过 WebClient 发起异步请求,WebClient 返回 Mono 结果,虽然它并不是真正的数据(它是一个数据发布者,等请求数据返回后,它才把数据送过来),但我们可以通过操作符方法对他添加逻辑,如过滤,排序,组合,就好像同步操作时已经拿到数据那样。

而在 AsyncRestTemplate,则所有的逻辑都要写到回调函数中。


WebFlux 是完全非阻塞的。

Mono、Flux 的组合函数非常有用。

上面方法中先获取订单数据,再同时获取仓库,产品数据,

如果接口参数同时传入了订单 id,仓库 id,产品 id,我们也可以同时获取这三个数据,再组装起来

public Mono<Order> getOrder(long orderId, long warehouseId, List<Long> goodsIds) {    Mono<Order> orderMono = mockOrderMono(orderId);
return orderMono.zipWith(getMono("http://warehouse-service/warehouse/mock/" + warehouseId, Warehouse.class), (o,w) -> { o.setWarehouse(w); return o; }).zipWith(getFlux("http://goods-service/goods/mock/list?ids=" + StringUtils.join(goodsIds, ","), Goods.class) .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> { o.setGoods(gs); return o; });}
复制代码


如果我们需要串行获取订单,仓库,商品这三个数据,实现如下

public Mono<Order> getOrderInLabel(long orderId) {    Mono<Order> orderMono = mockOrderMono(orderId);
return orderMono.zipWhen(o -> getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class), (o, w) -> { o.setWarehouse(w); return o; }).zipWhen(o -> getFlux("http://goods-service/goods/mock/list?ids=" + StringUtils.join(o.getGoodsIds(), ",") + "&label=" + o.getWarehouse().getLabel() , Goods.class) .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> { o.setGoods(gs); return o; });}
复制代码

zipWith 方法会同时请求待合并的两个 Mono 数据,而 zipWhen 方法则会阻塞等待第一个 Mono 数据到达在请求第二个 Mono 数据。

orderMono.zipWhen(...).zipWhen(...)中,第一个 zipWhen 方法会阻塞等待 orderMono 数据返回再使用 order 数据构造新的 Mono 数据,第二个 zipWhen 方法也会等待前面 zipWhen 构建的 Mono 数据返回再构建新 Mono,

所以在第二个 zipWhen 方法中,可以调用 o.getWarehouse().getLabel(),因为第一个 zipWhen 已经获取到仓库信息。


下面说一个 WebFlux 的使用。

分为两部分,WebFlux 服务端与 WebClient。


WebFlux 服务端

底层容器切换

WebFlux 默认使用 Netty 实现服务端异步通信,可以通过更换依赖包切换底层容器

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-webflux</artifactId>    <exclusions>    <exclusion>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-netty</artifactId>    </exclusion>    </exclusions></dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId></dependency>
复制代码


注解

WebFlux 支持 SpringMvc 大部分的注解,如

映射:@Controller,@GetMapping,@PostMapping,@PutMapping,@DeleteMapping

参数绑定:@PatchMapping,@RequestParam,@RequestBody,@RequestHeader,@PathVariable,@RequestAttribute,@SessionAttribute

结果解析:@ResponseBody,@ModelAttribute

这些注解的使用方式与 springMvc 相同


命令式映射

WebFlux 支持使用命令式编程指定映射关系

@Beanpublic RouterFunction<ServerResponse> monoRouterFunction(InvoiceHandler invoiceHandler) {    return route()            .GET("/invoice/{orderId}",  accept(APPLICATION_JSON), invoiceHandler::get)            .build();}
复制代码

调用"/invoice/{orderId}",请求会转发到 invoiceHandler#get 方法


invoiceHandler#get 方法实现如下

public Mono<ServerResponse> get(ServerRequest request) {    Invoice invoice = new Invoice();    invoice.setId(999L);    invoice.setOrderId(Long.parseLong(request.pathVariable("orderId")));    return ok().contentType(APPLICATION_JSON).body(Mono.just(invoice), Warehouse.class);}
复制代码


Filter

可以通过实现 WebFilter 接口添加过滤器

@Componentpublic class TokenCheckFilter implements WebFilter {    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {        if(!exchange.getRequest().getHeaders().containsKey("token")) {            ServerHttpResponse response =  exchange.getResponse();            response.setStatusCode(HttpStatus.FORBIDDEN);            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);            return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"msg\":\"no token\"}".getBytes())));        } else {            exchange.getAttributes().put("auth", "true");            return chain.filter(exchange);        }    }}
复制代码

上面实现的是前置过滤器,在调用逻辑方法前的检查请求 token


实现后置过滤器代码如下

@Componentpublic class LogFilter  implements WebFilter {    private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {        // [1]        logger.info("request before, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());        return chain.filter(exchange)            .doFinally(s -> {                // [2]                logger.info("request after, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());            });    }}
复制代码

注意,[1]处 exchange.getResponse()返回的是初始化状态的 response,并不是请求处理后返回的 response。


异常处理

通过 @ExceptionHandler 注解定义一个全局的异常处理器

@ControllerAdvicepublic class ErrorController {    private static final Logger logger = LoggerFactory.getLogger(ErrorController.class);
@ResponseBody @ExceptionHandler({NullPointerException.class}) @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) public String nullException(NullPointerException e) { logger.error("global err handler", e); return "{\"msg\":\"There is a problem\"}"; }}
复制代码


WebFluxConfigurer

WebFlux 中可以通过 WebFluxConfigurer 做自定义配置,如配置自定义的结果解析

@Configuration@EnableWebFluxpublic class WebConfig implements WebFluxConfigurer {    public void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {        configurer.addCustomResolver(new HandlerMethodArgumentResolver() {            ...        });    }
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) { configurer.customCodecs().register(new HttpMessageWriter() { ... }); }}
复制代码

configureArgumentResolvers 方法配置参数绑定处理器

configureHttpMessageCodecs 方法配置 Http 请求报文,响应报文解析器


@EnableWebFlux 要求 Spring 从 WebFluxConfigurationSupport 引入 Spring WebFlux 配置。如果你的依赖中引入了 spring-boot-starter-webflux,Spring WebFlux 将自动配置,不需要添加该注解。

但如果你只使用 Spring WebFlux 而没有使用 Spring Boot,这是需要添加 @EnableWebFlux 启动 Spring WebFlux 自动化配置。


Spring Flux 支持 CORS,Spring Security,HTTP/2,更多内容不再列出,请参考官方文档。


WebClient

WebClient 可以发送异步 Web 请求,并支持响应式编程。

下面说一个 WebClient 的使用。


底层框架

WebClient 底层使用的 Netty 实现异步 Http 请求,我们可以切换底层库,如 Jetty

@Beanpublic JettyResourceFactory resourceFactory() {    return new JettyResourceFactory();}
@Beanpublic WebClient webClient() { HttpClient httpClient = HttpClient.create(); ClientHttpConnector connector = new JettyClientHttpConnector(httpClient, resourceFactory()); return WebClient.builder().clientConnector(connector).build();}
复制代码


连接池

WebClient 默认是每个请求创建一个连接。

我们可以配置连接池复用连接,以提高性能。

ConnectionProvider provider = ConnectionProvider.builder("order")    .maxConnections(100)    .maxIdleTime(Duration.ofSeconds(30))    .pendingAcquireTimeout(Duration.ofMillis(100))      .build();return WebClient    .builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)));
复制代码

maxConnections:允许的最大连接数

pendingAcquireTimeout:没有连接可用时,请求等待的最长时间

maxIdleTime:连接最大闲置时间


超时

底层使用 Netty 时,可以如下配置超时时间

import io.netty.handler.timeout.ReadTimeoutHandler;import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create() .doOnConnected(conn -> conn .addHandlerLast(new ReadTimeoutHandler(10)) .addHandlerLast(new WriteTimeoutHandler(10)));
复制代码


或者直接使用 responseTimeout

HttpClient httpClient = HttpClient.create()        .responseTimeout(Duration.ofSeconds(2));
复制代码


Post Json

WebClient 可以发送 json,form,文件等请求报文,

看一个最常用的 Post Json 请求

webClient.post().uri("http://localhost:9004/order/")    .contentType(MediaType.APPLICATION_JSON)    .body(Mono.just(order), Order.class)    .retrieve().bodyToMono(String.class)
复制代码


异常处理

可以在 ResponseSpec 中指定异常处理

private <T> Mono<T> getMono(String url, Class<T> resType) {return webClient    .get().uri(url).retrieve()    .onStatus(HttpStatus::is5xxServerError, clientResponse -> {        return Mono.error(...);    })    .onStatus(HttpStatus::is4xxClientError, clientResponse -> {        return Mono.error(...);    })    .onStatus(HttpStatus::isError, clientResponse -> {        return Mono.error(...);    })    .bodyToMono(resType)}
复制代码


也可以在 HttpClient 上配置

HttpClient httpClient = HttpClient.create()        .doOnError((req, err) -> {            log.error("err on request:{}", req.uri(), err);        }, (res, err) -> {            log.error("err on response:{}", res.uri(), err);        })
复制代码


同步返回结果

使用 block 方法可以阻塞线程,等待请求返回

private <T> T syncGetMono(String url, Class<T> resType) {    return webClient            .get().uri(url).retrieve()            .bodyToMono(resType).block();}
复制代码


获取响应信息

exchangeToMono 可以获取到响应的 header,statusCode 等信息

private <T> Mono<T> getMonoWithInfo(String url, Class<T> resType) {    return webClient            .get()            .uri(url)            .exchangeToMono(response -> {                logger.info("request url:{},statusCode:{},headers:{}", url, response.statusCode(), response.headers());                return response.bodyToMono(resType);            });}
复制代码


注册中心与 Ribbon

经验证,WebClient 支持 Eureka 注册中心与 Ribbon 转发,使用方式与 restTemplate 相同。

不过 @LoadBalanced 需要添加在 WebClient.Builder 上

@Bean@LoadBalancedpublic WebClient.Builder loadBalancedWebClientBuilder() {    return WebClient.builder();}
复制代码


官方文档:https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

文章完整代码:https://gitee.com/binecy/bin-springreactive/tree/master/order-service


实际项目中,线程阻塞场景往往不只有 Http 请求阻塞,还有 Mysql 请求,Redis 请求,Kafka 请求等等导致的阻塞。从这些数据源中获取数据时,大多数都是阻塞直到数据源返回数据。

而 Reactive Spring 强大在于,它也支持这些数据源的非阻塞响应式编程。

下一篇文章,我们来看一个如何实现 Redis 的非阻塞响应式编程。


如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!



发布于: 2021 年 01 月 17 日阅读数: 111
用户头像

binecy

关注

还未添加个人签名 2020.08.26 加入

还未添加个人简介

评论

发布
暂无评论
Reactive Spring实战 -- WebFlux使用教程