写点什么

深入剖析 Spring WebFlux

发布于: 刚刚

一、WebFlux 简介


WebFlux 是 Spring Framework5.0 中引入的一种新的反应式 Web 框架。通过 Reactor 项目实现 Reactive Streams 规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步 IO 能够以少量而稳定的线程处理更高的吞吐,规避文件 IO/网络 IO 阻塞带来的线程堆积。


1.1 WebFlux 的特性


WebFlux 具有以下特性:

  • 异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞 IO 模型,Spring WebFlux 这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。

  • 响应式函数编程 - 相对于 Java8 Stream 同步、阻塞的 Pull 模式,Spring Flux 采用 Reactor Stream 异步、非阻塞 Push 模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。

  • 不拘束于 Servlet - 可以运行在传统的 Servlet 容器(3.1+版本),还能运行在 Netty、Undertow 等 NIO 容器中。


1.2 WebFlux 的设计目标


  • 适用高并发

  • 高吞吐量

  • 可伸缩性

二、Spring WebFlux 组件介绍


2.1 HTTPHandler


一个简单的处理请求和响应的抽象,用来适配不同 HTTP 服务容器的 API。



2.2 WebHandler


一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;



2.3 DispatcherHandler


请求处理的总控制器,实际工作是由多个可配置的组件来处理。



WebFlux 是兼容 Spring MVC 基于 @Controller,@RequestMapping 等注解的编程开发方式的,可以做到平滑切换。


2.4 Functional Endpoints


这是一个轻量级函数编程模型。是基于 @Controller,@RequestMapping 等注解的编程模型的替代方案,提供一套函数式 API 用于创建 Router,Handler 和 Filter。调用处理组件如下:



简单的 RouterFuntion 路由注册和业务处理过程:

@Beanpublic RouterFunction<ServerResponse> initRouterFunction() {    return RouterFunctions.route()        .GET("/hello/{name}", serverRequest -> {            String name = serverRequest.pathVariable("name");            return ServerResponse.ok().bodyValue(name);        }).build();}
复制代码


请求转发处理过程:



2.5 Reactive Stream


这是一个重要的组件,WebFlux 就是利用 Reactor 来重写了传统 Spring MVC 逻辑。其中 Flux 和 Mono 是 Reactor 中两个关键概念。掌握了这两个概念才能理解 WebFlux 工作方式。


Flux 和 Mono 都实现了 Reactor 的 Publisher 接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux 或者 Mono 会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。


Mono 工作流程图



只会在发送出单个结果后完成。


Flux 工作流程图



发送出零个或者多个,可能无限个结果后才完成。

对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。对于非流类型:application/json  WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux<?> 是以反应式方式写入网络的,没有任何障碍。
复制代码


三、WebFlux 工作原理


3.1 组件装配过程



流程相关源码解析-WebFluxAutoConfiguration

@Configuration//条件装配 只有启动的类型是REACTIVE时加载@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)//只有存在 WebFluxConfigurer实例  时加载@ConditionalOnClass(WebFluxConfigurer.class)//在不存在  WebFluxConfigurationSupport实例时 加载@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })//在之后装配@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,      CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })//自动装配顺序@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)public class WebFluxAutoConfiguration {   @Configuration   @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })   //接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration   @Import({ EnableWebFluxConfiguration.class })   public static class WebFluxConfig implements WebFluxConfigurer {      //隐藏部分源码     /**     * Configuration equivalent to {@code @EnableWebFlux}.     */   }     @Configuration    public static class EnableWebFluxConfiguration            extends DelegatingWebFluxConfiguration {        //隐藏部分代码    }    @Configuration    @ConditionalOnEnabledResourceChain    static class ResourceChainCustomizerConfiguration {        //隐藏部分代码    }    private static class ResourceChainResourceHandlerRegistrationCustomizer            implements ResourceHandlerRegistrationCustomizer {        //隐藏部分代码    }
复制代码


WebFluxAutoConfiguration 自动装配时先自动装配 EnableWebFluxConfiguration 而 EnableWebFluxConfiguration->DelegatingWebFluxConfiguration ->WebFluxConfigurationSupport。


最终 WebFluxConfigurationSupport 不仅配置 DispatcherHandler 还同时配置了其他很多 WebFlux 核心组件包括 异常处理器 WebExceptionHandler,映射处理器处理器 HandlerMapping,请求适配器 HandlerAdapter,响应处理器 HandlerResultHandler 等。


DispatcherHandler 创建初始化过程如下;

public class WebFluxConfigurationSupport implements ApplicationContextAware {   //隐藏部分代码   @Nullable   public final ApplicationContext getApplicationContext() {      return this.applicationContext;   }	//隐藏部分代码   @Bean   public DispatcherHandler webHandler() {      return new DispatcherHandler();   }
复制代码


public class DispatcherHandler implements WebHandler, ApplicationContextAware {   @Nullable   private List<HandlerMapping> handlerMappings;   @Nullable   private List<HandlerAdapter> handlerAdapters;   @Nullable   private List<HandlerResultHandler> resultHandlers;   	@Override   public void setApplicationContext(ApplicationContext applicationContext) {		initStrategies(applicationContext);   }   protected void initStrategies(ApplicationContext context) {   	  //注入handlerMappings      Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(            context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings); this.handlerMappings = Collections.unmodifiableList(mappings); //注入handlerAdapters Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); //注入resultHandlers Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); }
复制代码


流程相关源码解析-HTTPHandlerAutoConfiguration


上面已讲解过 WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext      implements ConfigurableWebServerApplicationContext {   @Override   protected void onRefresh() {      super.onRefresh();      try {         createWebServer();      }      catch (Throwable ex) {         throw new ApplicationContextException("Unable to start reactive web server", ex);      }   }   private void createWebServer() {      WebServerManager serverManager = this.serverManager;      if (serverManager == null) {         String webServerFactoryBeanName = getWebServerFactoryBeanName();         ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);         boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();         // 这里创建容器管理时注入httpHandler         this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);         getBeanFactory().registerSingleton("webServerGracefulShutdown",               new WebServerGracefulShutdownLifecycle(this.serverManager));         // 注册一个 web容器启动服务类,该类继承了SmartLifecycle         getBeanFactory().registerSingleton("webServerStartStop",               new WebServerStartStopLifecycle(this.serverManager));      }      initPropertySources();   }   protected HttpHandler getHttpHandler() {		String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);		if (beanNames.length == 0) {			throw new ApplicationContextException(					"Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");		}		if (beanNames.length > 1) {			throw new ApplicationContextException(					"Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "							+ StringUtils.arrayToCommaDelimitedString(beanNames));		}        //容器上下文获取httpHandler		return getBeanFactory().getBean(beanNames[0], HttpHandler.class);	}
复制代码


而这个 HTTPHandler 是由 HTTPHandlerAutoConfiguration 装配进去的。

@Configuration@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)@ConditionalOnMissingBean(HttpHandler.class)@AutoConfigureAfter({ WebFluxAutoConfiguration.class })@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)public class HttpHandlerAutoConfiguration {   @Configuration   public static class AnnotationConfig {      private ApplicationContext applicationContext;      public AnnotationConfig(ApplicationContext applicationContext) {         this.applicationContext = applicationContext;      }      //构建WebHandler      @Bean      public HttpHandler httpHandler() {         return WebHttpHandlerBuilder.applicationContext(this.applicationContext)               .build();      }   }
复制代码


流程相关源码解析-web 容器

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建 WebServerManager 容器管理器时会获取对应 web 容器实例,并注入响应的 HTTPHandler。

class WebServerManager {   private final ReactiveWebServerApplicationContext applicationContext;   private final DelayedInitializationHttpHandler handler;   private final WebServer webServer;   WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,         Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {      this.applicationContext = applicationContext;      Assert.notNull(factory, "Factory must not be null");      this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);      this.webServer = factory.getWebServer(this.handler);   }}
复制代码


以 Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接 Servlet 请求到 HTTPHandler 组件。

public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {    //隐藏部分代码       @Override    public WebServer getWebServer(HttpHandler httpHandler) {        if (this.disableMBeanRegistry) {            Registry.disableRegistry();        }        Tomcat tomcat = new Tomcat();        File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");        tomcat.setBaseDir(baseDir.getAbsolutePath());        Connector connector = new Connector(this.protocol);        connector.setThrowOnFailure(true);        tomcat.getService().addConnector(connector);        customizeConnector(connector);        tomcat.setConnector(connector);        tomcat.getHost().setAutoDeploy(false);        configureEngine(tomcat.getEngine());        for (Connector additionalConnector : this.additionalTomcatConnectors) {            tomcat.getService().addConnector(additionalConnector);        }        TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);        prepareContext(tomcat.getHost(), servlet);        return getTomcatWebServer(tomcat);    }}
复制代码


最后 Spring 容器加载后通过 SmartLifecycle 实现类 WebServerStartStopLifecycle 来启动 Web 容器。

WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer


3.2 完整请求处理流程



(引用自:https://blog.csdn.net)


该图给出了一个 HTTP 请求处理的调用链路。是采用 Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于 WebFlux 开发时要避免 controller 中存在阻塞逻辑。列举下面例子可以看到 Spring MVC 和 Spring Webflux 之间的请求处理区别。

@RestControllerpublicclass TestController {    private Logger logger = LoggerFactory.getLogger(this.getClass());    @GetMapping("sync")    public String sync() {        logger.info("sync method start");        String result = this.execute();        logger.info("sync method end");        return result;    }    @GetMapping("async/mono")    public Mono<String> asyncMono() {        logger.info("async method start");        Mono<String> result = Mono.fromSupplier(this::execute);        logger.info("async method end");        return result;    }    private String execute() {        try {            TimeUnit.SECONDS.sleep(5);        } catch (InterruptedException e) {            e.printStackTrace();        }        return "hello";    }}
复制代码


日志输出

2021-05-31 20:14:52.384  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method start2021-05-31 20:14:57.385  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method end2021-05-31 20:15:09.659  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method start2021-05-31 20:15:09.660  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method end
复制代码


从上面例子可以看出 sync() 方法阻塞了请求,而 asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了 Mono 中 Supplier 中的了。当 execute 处理完业务逻辑后通过回调方式响应给浏览器。


四、存储支持


一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。


NOSQL Database


  • MongoDB (org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。

  • Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。


Relational Database


  • H2 (io.r2dbc:r2dbc-h2)

  • MariaDB (org.mariadb:r2dbc-mariadb)

  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)

  • MySQL (dev.miku:r2dbc-mysql)

  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)

  • Postgres (io.r2dbc:r2dbc-postgresql)

  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)


五、总结


关于 Spring MVC 和 Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。


基本依赖

<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-data-r2dbc</artifactId> </dependency> <!-- r2dbc 连接池 --> <dependency>     <groupId >io.r2dbc</groupId>     <artifactId>r2dbc-pool</artifactId> </dependency> <!--r2dbc mysql 库--> <dependency>     <groupId>dev.miku</groupId>     <artifactId>r2dbc- mysql</artifactId> </dependency> <!--自动配置需要引入一个嵌入式数据库类型对象--> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-data-jdbc</artifactId> </dependency><!-- 反应方程式 web 框架 webflux--> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
复制代码


相同数据下效果如下



Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。



Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。


作者:vivo 互联网服务器团队-Zhou Changqing

发布于: 刚刚阅读数: 2
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020.07.10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
深入剖析 Spring WebFlux