写点什么

SpringCloud Gateway 动态路由

用户头像
中原银行
关注
发布于: 2021 年 06 月 17 日
SpringCloud Gateway 动态路由

SpringCloud Gateway 系列文章共五篇,由我行开发工程师 @Aaron 提供,带大家深入剖析 Gateway 工作原理,及如何基于 Gateway 进行定制化开发以适应企业特定环境需求。


第一篇:SpringCloud Gateway 动态路由

第二篇:SpringCloud Gateway 路由数量对性能的影响研究

第三篇:SpringCloud Gateway 路由转发性能优化

第四篇:SpringCloud Gateway 路由断言。

第五篇:SpringCloud Gateway 过滤器。


SpringCloud Gateway 简介

SpringCloud Gateway 是 Spring 出品的 SpringCloud 微服务体系中的 API Gateway,基于 Spring5、Project Reactor、WebFlux 构建的一个高效的非阻塞式网关,提供了以下功能:

  • 可以根据所有的请求属性作为路由条件(Routing Predicates)

  • 提供了作用于指定路由的网关过滤器(Gateway Filter)

  • 提供了作用于所有路由的全局过滤器(Global filter)

  • 提供了断路器集成(Circuit Breaker)

  • 提供了服务发现集成(Discovery Client)

  • 提供了限流组件集成(Rate Limiting)

  • 路径重写(Path Rewriting)

本文内容主要涉及第一部分,路由查找。

Gateway 提供了多种路由加载方式,包括基于配置文件、服务发现、存储器等,本文从源码入手,详细解析路由查找-加载过程,并通过分析这个过程,研究如何进行拓展路由存储及加载方式,进而实现动态路由。

何谓动态路由

动态路由即:在不进行网关应用重启的情况下,可以通过管理 API 或者管理 UI 的方式添加路由,能实时或准实时生效;且在网关应用重启后,动态添加的路由仍然存在。

动态路由的两个基本要求:实时性和持久性。

Gateway 工作原理

查阅 SringCloud Gateway 官方文档,Gateway 工作原理如下图(本文主要涉及红框部分):


Clients make requests to Spring Cloud Gateway. If the Gateway Handler Mapping determines that a request matches a route, it is sent to the Gateway Web Handler. This handler runs the request through a filter chain that is specific to the request. The reason the filters are divided by the dotted line is that filters can run logic both before and after the proxy request is sent. All “pre” filter logic is executed. Then the proxy request is made. After the proxy request is made, the “post” filter logic is run.

客户端请求,首先会被Gateway Handler Mapping处理,用以在 路由表 中查找一个与请求匹配的 路由 ,然后将请求交由 Web Handler 处理,Web Handler 维护了一个过滤器链,链式执行这些过滤器,这些过滤器在逻辑上存在两个执行阶段 prepost

Gateway 源码解读

通过解读官方源码,梳理 gateway 工作机制,并寻找扩展点,以便实现 持久化动态路由表

RoutePredicateHandlerMapping



Gateway 中实现路由查找逻辑的 Gateway Handler MappingRoutePredicateHandlerMapping 类,该类在GatewayAutoConfiguration 中实现自动装配(Gateway 的 Bean 自动装备都是由此类实现) ,源码 260-266 行如下

@Beanpublic RoutePredicateHandlerMapping routePredicateHandlerMapping(    FilteringWebHandler webHandler, RouteLocator routeLocator,    GlobalCorsProperties globalCorsProperties, Environment environment) {  return new RoutePredicateHandlerMapping(webHandler, routeLocator,      globalCorsProperties, environment);}
复制代码

首先可以看到,这里装配是无条件的,没有留出拓展点(我 之前文章 对此用了特殊的方法进行了拓展),重点是两个 Bean 的注入:

  • FilteringWebHandler :创建过滤器链,加载全局过滤器并转化为网关过滤器,组合二者,并执行过滤器链,本文不展开这部分;

  • RouteLocator :有多个实现类,本文重点 。

下面代码为路由查找的过程,重点是this.routeLocator.getRoutes(), 源码上增加了注释便于理解

@Overrideprotected Mono<?> getHandlerInternal(ServerWebExchange exchange) {//... 忽略之上代码  // lookupRoute 用于查找路由  return lookupRoute(exchange)      // 将查找到的路由记录到 ServerWebExchange 上下文中,然后,返回 FilteringWebHandler      .flatMap((Function<Route, Mono<?>>) r -> {        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);        if (logger.isDebugEnabled()) {          logger.debug(              "Mapping [" + getExchangeDesc(exchange) + "] to " + r);        }        // 后续会从Attributes获取到路由对象,进而获取路由过滤器,执行过滤器等列操作        exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);        return Mono.just(webHandler);//... 忽略后续代码
复制代码


 protected Mono<Route> lookupRoute(ServerWebExchange exchange) {   // this.routeLocator.getRoutes() 该方法是重点,后续需要继续分析    // RouteLocator 的实现类 是如何 getRoutes()   return this.routeLocator.getRoutes()       .concatMap(route -> Mono.just(route).filterWhen(r -> {         exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());         // 根据请求、当前路由的断言,判断当前当前路由断言是否命中,非本文重点,不展开         return r.getPredicate().apply(exchange);       })//... 忽略后续代码
复制代码

RouteLocator 实现类



// 转化方法 @Overridepublic Flux<Route> getRoutes() {  // 注意这里的 getRouteDefinitions()   Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()      .map(this::convertToRoute);  // ... 下略
复制代码

我们再次看下GatewayAutoConfiguration 中自动装配的情况,源码 223-240 行

@Beanpublic RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,  // 注入所有过滤器工厂(不含全局过滤器),用于根据RouteDefinition组装Route  List<GatewayFilterFactory> gatewayFilters,  // 注入所有断言工厂,用于根据RouteDefinition组装Route  List<RoutePredicateFactory> predicates,  // 注入RouteDefinition的加载器,这里是下文重点  RouteDefinitionLocator routeDefinitionLocator,  ConfigurationService configurationService) {  return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,      gatewayFilters, properties, configurationService);}@Bean@Primary@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {  return new CachingRouteLocator(      new CompositeRouteLocator(Flux.fromIterable(routeLocators)));}
复制代码

GatewayAutoConfiguration 中并没有直接装配 CompositeRouteLocator,而是嵌套在了 CachingRouteLocator 中,上述代码块 17 行表示所有 RouteLocator 的实现类都会被装配到cachedCompositeRouteLocator 中(也包含cachedCompositeRouteLocator,16 行巧妙的通过一个 Conditional 避免了对自身的循环依赖),这样注入到cachedCompositeRouteLocator 其实只有 routeDefinitionRouteLocator ,这里可以由玩家进行拓展。

CachingRouteLocator 本文不进行展开说明,在你创建路由后,需发布一个RefreshRoutesEvent 事件,然后这个 Locator 就可以监听到该事件,并刷新路由。

我们看一下RouteDefinitionRouteLocator的构造方法:

public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,    List<RoutePredicateFactory> predicates,    List<GatewayFilterFactory> gatewayFilterFactories,    GatewayProperties gatewayProperties,    ConfigurationService configurationService) {  this.routeDefinitionLocator = routeDefinitionLocator;  this.configurationService = configurationService;  // 断言工厂初始化,同下面过滤器器工厂类似,也有截去RoutePredicateFactory的操作  initFactories(predicates);  gatewayFilterFactories.forEach(      // factory.name() 用以获取过滤器工厂的名字,具体代码实现就是 末尾截去"GatewayFilterFactory"      // 这也是为什么我们自定义过滤器工厂时需要以GatewayFilterFactory结尾命名      factory -> this.gatewayFilterFactories.put(factory.name(), factory));  this.gatewayProperties = gatewayProperties;}
复制代码

结合上面代码块的内容,例如我们在使用 Gateway 时定义一个路由:

spring:  cloud:    gateway:      routes:      - id: path_route        uri: https://example.org        predicates:        - Path=/red/{segment},/blue/{segment}        filters:        - AddRequestHeader=X-Request-red, blue
复制代码

Path 对应 PathRoutePredicateFactory,AddRequestHeader 对应 AddRequestHeaderGatewayFilterFactory。

RouteDefinitionLocator 实现类

装配 RouteDefinitionRouteLocator 时注入了一个 类型为 RouteDefinitionLocator的 Bean,回到GatewayAutoConfiguration 看 RouteDefinitionLocator 是如何装配的,源码 208-214 行

@Bean@Primary // 注意 @Primary 决定了上面被注入的是这个Bean,// 在GatewayAutoConfiguration 中装配了多个RouteDefinitionLocator的子类,包括// CompositeRouteDefinitionLocator、InMemoryRouteDefinitionRepository,// 还在其他配置类中装配的// PropertiesRouteDefinitionLocator 、DiscoveryClientRouteDefinitionLocator public RouteDefinitionLocator routeDefinitionLocator(  List<RouteDefinitionLocator> routeDefinitionLocators) {  return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));}
复制代码

这里采用了一个 CompositeRouteDefinitionLocator对所有 RouteDefinitionLocator的实现类进行了组合封装,这些实现了,都实现了具体的 getRouteDefinitions() 方法。


# 开启此locatorspring.cloud.gateway.discovery.locator.enabled=true# 开启默认为 reactive 模式,需显示关闭可调整为阻塞模式spring.cloud.discovery.reactive.enabled=false
复制代码
  • CachingRouteDefinitionLocator 通过翻阅源码,发现此加载器并未实装,可能是考虑在底层 RouteLocator 已经具备了缓存。

  • InMemoryRouteDefinitionRepository 基于内存存储的路由加载器,可以通过 SpringCloud Gateway 提供的 management endpoint 进行路由管理,但由于基于内存实现,并未持久化。

    通过GatewayAutoConfiguration 查看其装配代码,源码 202-206 行:

    @Bean // 只有在没有装配 RouteDefinitionRepository 的其他实现 Bean 时,才生效, // 我们可以通过该扩展点实现动态路由及持久化 @ConditionalOnMissingBean(RouteDefinitionRepository.class) public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() { return new InMemoryRouteDefinitionRepository(); }

  • CompositeRouteDefinitionLocator 是对其他 RouteDefinitionLocator 的组合,通过GatewayAutoConfiguration 可以看到其装配代码,源码 208-214 行

    @Bean @Primary public RouteDefinitionLocator routeDefinitionLocator(    List<RouteDefinitionLocator> routeDefinitionLocators) {  return new CompositeRouteDefinitionLocator(      Flux.fromIterable(routeDefinitionLocators)); }

    可以看到前面 RouteDefinitionRouteLocator 中注入的 routeDefinitionLocator 就是 CompositeRouteDefinitionLocator,而装配它是注入了所有 RouteDefinitionLocator 的实现,其中包括 RouteDefinitionRepository 的实现(默认情况下为 InMemoryRouteDefinitionRepository)。

分析到这里就比较明确了,如果需要实现可持久化存储的动态路由,我们只需基于数据库(或其他持久化存储),参考 InMemoryRouteDefinitionRepository 实现一个 RouteDefinitionRepository 即可。

另外 Gateway 自带了管理端点,用于对路由进行动态管理,不过默认的实现是基于内存的,无法持久化:



基于数据库存储的动态路由

对上文归纳总结如下:

  1. 从 RoutePredicateHandlerMapping 入手,注入其中的 RouteLocator 为 CachingRouteLocator ;

  2. CachingRouteLocator 封装了 CompositeRouteLocator ;

  3. CompositeRouteLocator 组合的唯一 RouteLocator 就是 RouteDefinitionRouteLocator ;

  4. RouteDefinitionRouteLocator 装配时注入了 CompositeRouteDefinitionLocator 并实现了 DefinitionRoute 到 Route 的转换 ;

  5. CompositeRouteDefinitionLocator 组合了所有的 RouteDefinitionLocator,其中包括 RouteDefinitionRepository 的一个实现,即 InMemoryRouteDefinitionRepository ;

  6. InMemoryRouteDefinitionRepository 装配是有条件的,仅在不存在其他 RouteDefinitionRepository 的 Bean 才生效;

  7. 参考 InMemoryRouteDefinitionRepository,实现一个基于数据库的路由存储。

下文是采用 MongoDB 进行路由存储的实现代码示例,选用 MongoDB 出于几点考虑:

  1. 路由的定义结构本身就是 JSON 格式;

  2. 路由的断言及过滤器支持自定义,模式比较灵活,关系型数据库支持困难;

  3. MongoDB 支持响应式驱动,与 Reactor 模式比较契合;

  4. MongoDB 的 ChangeStream 机制,能较好的支持路由的准实时刷新。

代码片段

MongoRouteDefinition 仅仅对原生的类进行了继承,未添加任何属性,仅用于添加数据库存储注解@Document(这个类是非必须的,不过定义后,后面代码会有较大简化)

@Document("gwRoutes")public class MongoRouteDefinition extends RouteDefinition {  public static MongoRouteDefinition from(RouteDefinition route) {    MongoRouteDefinition newRoute = new MongoRouteDefinition();    BeanUtils.copyProperties(route, newRoute);    return newRoute;  }}
复制代码

RouteRepositoryOperations 数据库操作接口,JPA 风格

public interface RouteRepositoryOperations extends    ReactiveMongoRepository<MongoRouteDefinition, String> {  /**   * 获取所有路由(分页查询)   *   * @param pageable 分页   * @return 当前页   */  @Query(value = "{}", sort = "{_id:1}")  Flux<MongoRouteDefinition> findAll(Pageable pageable);}
复制代码

MongoRouteDefinitionRepository 实现动态路由的主体代码,封装了路由操作

@Slf4j@Componentpublic class MongoRouteDefinitionRepository    implements RouteDefinitionRepository, ApplicationEventPublisherAware {  private ApplicationEventPublisher eventPublisher;  private Map<String, RouteDefinition> cache = new ConcurrentHashMap<>();  private final RouteRepositoryOperations repositoryOperation;  public MongoRouteDefinitionRepository(RouteRepositoryOperations repositoryOperation) {    this.repositoryOperation = repositoryOperation;  }  @Override  public Flux<RouteDefinition> getRouteDefinitions() {    return Flux.fromIterable(cache.values());  }  @Override  public Mono<Void> save(Mono<RouteDefinition> route) {    return route.flatMap(        r -> repositoryOperation.save(MongoRouteDefinition.from(r))            .doOnNext(this::addCache)            .then(Mono.empty())    );  }  @Override  public Mono<Void> delete(Mono<String> routeId) {    return repositoryOperation.findById(routeId)        .map(RouteDefinition::getId)        .doOnNext(this::removeCache)        .flatMap(repositoryOperation::deleteById);  }  @Override  public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {    this.eventPublisher = eventPublisher;  }  /**   * 将指定路由加入到缓存中。   * <p>   * 为了能实时加载路由,可以通过MongoDB的ChangeStream,监听到数据变化后调用此方法   */  public void addCache(RouteDefinition route) {    this.cache.putIfAbsent(route.getId(), route);    // 前文中的CachingRouteLocator会监听此事件,并清空缓存重新从RouteDefinitionLocator中加载路由    this.publishEvent();  }  /**   * 将指定路由从缓存中删除。   * <p>   * 为了能实时加载路由,可以通过MongoDB的ChangeStream,监听到数据变化后调用此方法   */  public void removeCache(String routeId) {    if (this.cache.remove(routeId) != null) {      this.publishEvent();    }  }  void publishEvent() {    eventPublisher.publishEvent(new RefreshRoutesEvent(this));  }  RouteRepositoryOperations getRepositoryOperation() {    return repositoryOperation;  }  Map<String, RouteDefinition> getCache() {    return cache;  }  void setCache(      Map<String, RouteDefinition> cache) {    this.cache = cache;  }}
复制代码

RouteChangeStreamHandler 实现路由的准实时动态加载(注意单机版 MongoDB 不支持 ChangeStream):

@Lazy(false)@Component@ConditionalOnProperty(value = "changeStream.enabled", havingValue = "true", matchIfMissing = true)public class RouteChangeStreamHandler {  private final ReactiveMongoTemplate mongoTemplate;  private final MongoRouteDefinitionRepository routeRepository;  public RouteChangeStreamHandler(      MongoRouteDefinitionRepository routeRepository, ReactiveMongoTemplate mongoTemplate) {    this.routeRepository = routeRepository;    this.mongoTemplate = mongoTemplate;  }  @PostConstruct  public void run() {    new Thread(this::startMonitor, "ChangeStream-Monitor-routes").start();  }  public void startMonitor() {    Aggregation aggregation = Aggregation.newAggregation(Aggregation        .match(Criteria.where("operationType").in("insert", "delete", "update", "replace")));    ChangeStreamOptions options = ChangeStreamOptions.builder()        .filter(aggregation)        .returnFullDocumentOnUpdate()        .build();    String collectionName = MongoRouteDefinition.class.getAnnotation(Document.class).value();    Flux<ChangeStreamEvent<MongoRouteDefinition>> changeStream = mongoTemplate        .changeStream(collectionName, options, MongoRouteDefinition.class);    changeStream        .log()        .doOnNext(e -> {          if (OperationType.INSERT == e.getOperationType()              || OperationType.UPDATE == e.getOperationType()              || OperationType.REPLACE == e.getOperationType()) {            Optional.ofNullable(e.getBody()).ifPresent(routeRepository::addCache);          } else if (OperationType.DELETE == e.getOperationType()) {            getId(e).ifPresent(routeRepository::removeCache);          }        }).blockLast();  }  private Optional<String> getId(ChangeStreamEvent<MongoRouteDefinition> e) {    return Optional.ofNullable(e.getRaw())        .flatMap(raw -> Optional.ofNullable(raw.getDocumentKey()))        .flatMap(docKey -> Optional.ofNullable(docKey.getObjectId("_id")))        .flatMap(bson -> Optional.of(bson.getValue().toHexString()));  }}
复制代码

RouteRefresher 定时任务,全量刷新数据库中存储的路由

@Slf4j@Component@ConditionalOnProperty(value = "route.schedule.enabled", havingValue = "true", matchIfMissing = true)public class RouteRefresher {  private final MongoRouteDefinitionRepository repository;  public RouteRefresher(      MongoRouteDefinitionRepository repository) {    this.repository = repository;  }  /**   * 固定间隔重新加载一次缓存   */  @Scheduled(initialDelay = 10000, fixedDelay = 60 * 60 * 1001)  private void refresh() {    RouteRepositoryOperations operation = repository.getRepositoryOperation();    int page = 0;    int pageSize = 1000;    int total = Math.toIntExact(operation.count().blockOptional().orElse(0L));    Map<String, RouteDefinition> oldCache = repository.getCache();    Map<String, RouteDefinition> newCache = new ConcurrentHashMap<>(total);    int oldTotal = oldCache.size();    if (oldTotal < 1) {      // 首次同步刷新      repository.setCache(newCache);    }    while (page * pageSize < total) {      operation.findAll(PageRequest.of(page++, pageSize))          .doOnNext(route -> newCache.putIfAbsent(route.getId(), route))          .blockLast();      log.info("动态路由表当前总大小为:{}, 新路由表当前大小为:{}", oldTotal, newCache.size());    }    repository.setCache(newCache);    log.info("新路由表加载完成,当前大小为:{}", newCache.size());    oldCache.clear();    repository.publishEvent();  }}
复制代码


全文总结

本文依据 SpringCloud Gateway 工作原理,从入口代码入手,逐渐深入解析路由加载的机制,进而找到动态路由的拓展点,然后基于路由数据结构的特征及 SpringCloud Gateway 的特点,选择了基于 MongoDB 实现一个简单的动态路由示例。

通过阅读本文,你可以了解 SpringCloud Gateway 的工作原理,掌握对其源码的分析方法(适用于其他 Springboot 工程),初步领略响应式编程的魅力。

希望本文可以为你带来更多收获与思考。


发布于: 2021 年 06 月 17 日阅读数: 50
用户头像

中原银行

关注

打造科技驱动、创新引领的数字化未来银行。 2020.02.06 加入

中原银行是河南省属法人银行,总部位于河南省郑州市。我行坚持“科技立行、科技兴行”,秉承“稳健 创新 进取 高效”理念,发展移动金融、线上金融,提升综合金融服务能力,金融科技应用水平居国内城商行领先地位。

评论

发布
暂无评论
SpringCloud Gateway 动态路由