写点什么

Apache ShenYu 源码阅读系列 - 注册中心实现原理之 Http 注册

作者:子夜2104
  • 2021 年 12 月 06 日
  • 本文字数:22410 字

    阅读完需:约 74 分钟

Apache ShenYu源码阅读系列-注册中心实现原理之Http注册

Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。


ShenYu网关中,注册中心是用于将客户端信息注册到shenyu-adminadmin再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息URI信息


本文基于shenyu-2.4.1版本进行源码分析,官网的介绍请参考 客户端接入原理

1. 注册中心原理

当客户端启动时,读取接口信息和uri信息,通过指定的注册类型,将数据发送到shenyu-admin



图中的注册中心需要用户指定使用哪种注册类型,ShenYu当前支持HttpZookeeperEtcdConsulNacos进行注册。具体如何配置请参考 客户端接入配置


ShenYu在注册中心的原理设计上引入了DisruptorDisruptor队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。



如图所示,注册中心分为两个部分,一是注册中心客户端register-client,负载处理客户端数据读取。另一个是注册中心服务端register-server,负载处理服务端(就是shenyu-admin)数据写入。通过指定注册类型进行数据发送和接收。


  • 客户端:通常来说就是一个微服务,可以是springmvcspring-clouddubbogrpc等。

  • register-client:注册中心客户端,读取客户接口和uri信息。

  • Disruptor:数据与操作解耦,数据缓冲作用。

  • register-server:注册中心服务端,这里就是shenyu-admin,接收数据,写入数据库,发数据同步事件。

  • 注册类型:指定注册类型,完成数据注册,当前支持HttpZookeeperEtcdConsulNacos


本文分析的是使用Http的方式进行注册,所以具体的处理流程如下:



在客户端,数据出队列后,通过http传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。

2. 客户端注册流程

当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置

2.1 加载配置,读取属性

先用一张图串联下注册中心客户端初始化流程:



我们分析的是通过http的方式进行注册,所以需要进行如下配置:


shenyu:  register:    registerType: http    serverLists: http://localhost:9095  client:    http:        props:          contextPath: /http          appName: http          port: 8189            isFull: false
复制代码


每个属性表示的含义如下:


  • registerType : 服务注册类型,填写 http

  • serverList: 为http注册类型时,填写Shenyu-Admin项目的地址,注意加上http://,多个地址用英文逗号分隔。

  • port: 你本项目的启动端口,目前springmvc/tars/grpc需要进行填写。

  • contextPath: 为你的这个mvc项目在shenyu网关的路由前缀, 比如/order/product 等等,网关会根据你的这个前缀来进行路由。

  • appName:你的应用名称,不配置的话,会默认取 spring.application.name 的值。

  • isFull: 设置 true 代表代理你的整个服务,false表示代理你其中某几个controller;目前适用于springmvc/springcloud


项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean


首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration,它是shenyu 客户端http注册配置类,通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入其他配置类。创建SpringMvcClientBeanPostProcessor,主要处理元数据。创建ContextRegisterListener,主要处理 URI 信息。


/** * shenyu 客户端http注册配置类 */@Configuration@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)public class ShenyuSpringMvcClientConfiguration {
//创建SpringMvcClientBeanPostProcessor,主要处理元数据 @Bean public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { return new SpringMvcClientBeanPostProcessor(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository); } // 创建ContextRegisterListener,主要处理 URI信息 @Bean public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) { return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName())); }}
复制代码


ShenyuClientCommonBeanConfigurationshenyu客户端通用配置类,会创建注册中心客户端通用的bean


  • 创建ShenyuClientRegisterRepository,通过工厂类创建而成。

  • 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置。

  • 创建ShenyuClientConfig,读取shenyu.client属性配置。


/** * shenyu客户端通用配置类 */@Configurationpublic class ShenyuClientCommonBeanConfiguration {       // 创建ShenyuClientRegisterRepository,通过工厂类创建而成。    @Bean    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {        return ShenyuClientRegisterRepositoryFactory.newInstance(config);    }      // 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置    @Bean    @ConfigurationProperties(prefix = "shenyu.register")    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {        return new ShenyuRegisterCenterConfig();    }      // 创建ShenyuClientConfig,读取shenyu.client属性配置    @Bean    @ConfigurationProperties(prefix = "shenyu")    public ShenyuClientConfig shenyuClientConfig() {        return new ShenyuClientConfig();    }}
复制代码

2.2 用于注册的 HttpClientRegisterRepository

上面的配置文件中生成的ShenyuClientRegisterRepository是客户端注册的具体实现,它是一个接口,它的实现类如下。



  • HttpClientRegisterRepository:通过http进行注册;

  • ConsulClientRegisterRepository:通过Consul进行注册;

  • EtcdClientRegisterRepository:通过Etcd进行注册;

  • NacosClientRegisterRepository:通过nacos进行注册;

  • ZookeeperClientRegisterRepository通过Zookeeper进行注册。


具体是哪一种方式,是通过SPI进行加载实现的,实现逻辑如下:


/** * 加载 ShenyuClientRegisterRepository */public final class ShenyuClientRegisterRepositoryFactory {        private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();        /**     * 创建 ShenyuClientRegisterRepository     */    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {            // 通过SPI的方式进行加载,类型由registerType决定            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());            //执行初始化操作            result.init(shenyuRegisterCenterConfig);            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);            return result;        }        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());    }}
复制代码


加载类型通过registerType指定,也就是我们在配置文件中指定的类型:


shenyu:  register:    registerType: http    serverLists: http://localhost:9095
复制代码


我们指定的是http,所以会去加载HttpClientRegisterRepository。对象创建成功后,执行的初始化方法init()如下:


@Joinpublic class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {        @Override    public void init(final ShenyuRegisterCenterConfig config) {        this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));    }    // 暂时省略其他逻辑}
复制代码


读取配置文件中的serverLists,即sheenyu-admin的地址,为后续数据发送做准备。类注解@Join用于SPI的加载。


SPI 全称为 Service Provider Interface, 是 JDK 内置的一种服务提供发现功能, 一种动态替换发现的机制。

shenyu-spiApache ShenYu网关自定义的SPI扩展实现,设计和实现原理参考了DubboSPI扩展实现

2.3 构建元数据的 SpringMvcClientBeanPostProcessor

创建SpringMvcClientBeanPostProcessor,负责元数据的构建和注册,它的构造函数逻辑如下:


/** *  spring mvc 客户端bean的后置处理器 */public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
/** * 通过构造函数进行实例化 */ public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { // 读取配置属性 Properties props = clientConfig.getProps(); // 获取端口信息,并校验 int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT)); if (port <= 0) { String errorMsg = "http register param must config the port must > 0"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } // 获取appName this.appName = props.getProperty(ShenyuClientConstants.APP_NAME); // 获取contextPath this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); // 校验appName和contextPath if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) { String errorMsg = "http register param must config the appName or contextPath"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } // 获取 isFull this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString())); // 开始事件发布 publisher.start(shenyuClientRegisterRepository); }
// 暂时省略了其他逻辑 @Override public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException { // 暂时省略了其他逻辑 }
}
复制代码


在构造函数中,主要是读取属性信息,然后进行校验。


shenyu:  client:    http:        props:          contextPath: /http          appName: http          port: 8189            isFull: false
复制代码


最后,执行了 publisher.start(),开始事件发布,为注册做准备。


  • ShenyuClientRegisterEventPublisher


ShenyuClientRegisterEventPublisher通过单例模式实现,主要是生成元数据URI订阅器(后续用于数据发布),然后启动Disruptor队列。提供了一个共有方法publishEvent(),发布事件,向 Disruptor 队列发数据。


public class ShenyuClientRegisterEventPublisher {    // 私有变量    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();        private DisruptorProviderManage providerManage;        private RegisterClientExecutorFactory factory;        /**     * 公开静态方法     *     * @return ShenyuClientRegisterEventPublisher instance     */    public static ShenyuClientRegisterEventPublisher getInstance() {        return INSTANCE;    }        /**     * Start方法执行     *     * @param shenyuClientRegisterRepository shenyuClientRegisterRepository     */    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {        // 创建客户端注册工厂类        factory = new RegisterClientExecutorFactory();        // 添加元数据订阅器        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));        //  添加URI订阅器        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));        // 启动Disruptor队列        providerManage = new DisruptorProviderManage(factory);        providerManage.startup();    }        /**     * 发布事件,向Disruptor队列发数据     *     * @param <T> the type parameter     * @param data the data     */    public <T> void publishEvent(final T data) {        DisruptorProvider<Object> provider = providerManage.getProvider();        provider.onData(f -> f.setData(data));    }}
复制代码


SpringMvcClientBeanPostProcessor的构造函数逻辑分析完了,主要是读取属性配置,创建元数据和URI订阅器, 启动Disruptor队列。要注意到它实现了BeanPostProcessor,这是Spring提供的一个接口,在Bean的生命周期中,真正开始使用之前,会执行后置处理器的postProcessAfterInitialization()方法。


  • postProcessAfterInitialization() 方法


SpringMvcClientBeanPostProcessor作为一个后置处理器,它的功能是:读取注解中的元数据,并向admin注册。


// 后置处理器public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {   // 省略了其他逻辑        // 后置处理器:读取注解中的元数据,并向admin注册    @Override    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {        // 配置属性,如果 isFull=true 的话,表示注册整个微服务        if (isFull) {            return bean;        }        // 获取当前bean的Controller注解        Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);         // 获取当前bean的RequestMapping注解        RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);        // 如果这个bean是一个接口        if (controller != null || requestMapping != null) {               // 获取当前bean的 ShenyuSpringMvcClient 注解            ShenyuSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);            String prePath = "";            //如果没有 ShenyuSpringMvcClient 注解,就返回,表示这个接口不需要注册            if (Objects.isNull(clazzAnnotation)) {                return bean;            }             //如果 ShenyuSpringMvcClient 注解中的path属性包括 * ,表示注册整个接口            if (clazzAnnotation.path().indexOf("*") > 1) {                // 构建元数据,发送注册事件                publisher.publishEvent(buildMetaDataDTO(clazzAnnotation, prePath));                return bean;            }                        prePath = clazzAnnotation.path();            // 获取当前bean的所有方法            final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());            // 遍历方法            for (Method method : methods) {                // 获取当前方法上的注解 ShenyuSpringMvcClient                ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);                // 如果方法上有注解ShenyuSpringMvcClient,就表示该方法需要注册                if (Objects.nonNull(shenyuSpringMvcClient)) {                    // 构建元数据,发送注册事件                    publisher.publishEvent(buildMetaDataDTO(shenyuSpringMvcClient, prePath));                }            }        }        return bean;    }
// 构造元数据 private MetaDataRegisterDTO buildMetaDataDTO(final ShenyuSpringMvcClient shenyuSpringMvcClient, final String prePath) { // contextPath上下文名称 String contextPath = this.contextPath; // appName应用名称 String appName = this.appName; // path注册路径 String path; if (StringUtils.isEmpty(contextPath)) { path = prePath + shenyuSpringMvcClient.path(); } else { path = contextPath + prePath + shenyuSpringMvcClient.path(); } // desc描述信息 String desc = shenyuSpringMvcClient.desc(); // ruleName规则名称,没有填写的话就和path一致 String configRuleName = shenyuSpringMvcClient.ruleName(); String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName; // 构建元数据 return MetaDataRegisterDTO.builder() .contextPath(contextPath) .appName(appName) .path(path) .pathDesc(desc) .rpcType(RpcTypeEnum.HTTP.getName()) .enabled(shenyuSpringMvcClient.enabled()) .ruleName(ruleName) .registerMetaData(shenyuSpringMvcClient.registerMetaData()) .build(); }}
复制代码


在后置处理器中,需要读取配置属性,如果 isFull=true 的话,表示注册整个微服务。获取当前beanController注解、RequestMapping注解、ShenyuSpringMvcClient 注解,通过读取这些注解信息判断当前bean是否是接口?接口是否需要注册?方法是否需要注册?然后根据ShenyuSpringMvcClient 注解中的属性构建元数据,最后通过publisher.publishEvent()发布事件进行注册。


Controller注解和RequestMapping注解是由Spring提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient 注解是由Apache ShenYu提供的,用于注册SpringMvc客户端,它的定义如下:


/** * shenyu 客户端接口,用于方法上或类上 */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})public @interface ShenyuSpringMvcClient {    // path 注册路径    String path();        // ruleName 规则名称    String ruleName() default "";        // desc 描述信息    String desc() default "";
// enabled是否启用 boolean enabled() default true; // registerMetaData 注册元数据 boolean registerMetaData() default false;}
复制代码


它的使用如下:


  • 注册整个接口


@RestController@RequestMapping("/test")@ShenyuSpringMvcClient(path = "/test/**")  // 表示整个接口注册public class HttpTestController { //......}
复制代码


  • 注册当前方法


@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")public class OrderController {
/** * Save order dto. * * @param orderDTO the order dto * @return the order dto */ @PostMapping("/save") @ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法 public OrderDTO save(@RequestBody final OrderDTO orderDTO) { orderDTO.setName("hello world save order"); return orderDTO; }
复制代码


  • publisher.publishEvent() 发布注册事件


该方法会将数据发送到Disruptor队列中,关于Disruptor队列更多细节这里不做更多介绍,这不影响分析注册的流程。


当数据发送后,Disruptor队列的消费者会处理数据,进行消费。


  • QueueConsumer 消费数据


QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()


package com.lmax.disruptor;
public interface WorkHandler<T> { void onEvent(T var1) throws Exception;}
复制代码


QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/** *  * 队列消费者 */public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {      // 省略了其他逻辑
@Override public void onEvent(final DataEvent<T> t) { if (t != null) { // 通过工厂创建队列消费任务 QueueConsumerExecutor<T> queueConsumerExecutor = factory.create(); // 保存数据 queueConsumerExecutor.setData(t.getData()); // help gc t.setData(null); // 放在线程池中执行 消费任务 executor.execute(queueConsumerExecutor); } }}
复制代码


QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:


  • RegisterClientConsumerExecutor:客户端消费者执行器;

  • RegisterServerConsumerExecutor:服务端消费者执行器。


顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin,在下文进行分析)。



  • RegisterClientConsumerExecutor 消费者执行器


重写的run()逻辑如下:


public final class RegisterClientConsumerExecutor extends QueueConsumerExecutor<DataTypeParent> {      //...... 
@Override public void run() { // 获取数据 DataTypeParent dataTypeParent = getData(); // 根据数据类型调用相应的处理器进行处理 subscribers.get(dataTypeParent.getType()).executor(Lists.newArrayList(dataTypeParent)); } }
复制代码


根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI数据,记录客户端服务信息。


//数据类型public enum DataType {   // 元数据    META_DATA,       // URI数据    URI,}
复制代码


  • ExecutorSubscriber#executor() 执行器订阅者


执行器订阅者也分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。



  • ShenyuClientMetadataExecutorSubscriber#executor()


客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()完成数据的发布。


public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {       //......        @Override    public DataType getType() {        return DataType.META_DATA; // 元数据    }        @Override    public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {        for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {            // 调用接口方法persistInterface()完成数据的发布            shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);        }    }}
复制代码


  • ShenyuClientRegisterRepository#persistInterface()


ShenyuClientRegisterRepository是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。


  • ConsulClientRegisterRepository:通过Consul实现客户端注册;

  • EtcdClientRegisterRepository:通过Etcd实现客户端注册;

  • HttpClientRegisterRepository:通过Http实现客户端注册;

  • NacosClientRegisterRepository:通过Nacos实现客户端注册;

  • ZookeeperClientRegisterRepository:通过Zookeeper实现客户端注册;



从图中可以看出,注册中心的加载是通过SPI的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。


/** * 加载 ShenyuClientRegisterRepository */public final class ShenyuClientRegisterRepositoryFactory {        private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();        /**     * 创建 ShenyuClientRegisterRepository     */    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {            // 通过SPI的方式进行加载,类型由registerType决定            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());            //执行初始化操作            result.init(shenyuRegisterCenterConfig);            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);            return result;        }        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());    }}
复制代码


本文的源码分析是基于Http的方式进行注册,所以我们先分析HttpClientRegisterRepository,其他的注册方式后续再分析。


通过http的方式注册很简单,就是调用工具类发送http请求。注册元数据和 URI 都是调用的同一个方法doRegister(),指定接口和类型就好。


  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。

  • /shenyu-client/register-uri: 服务端提供的接口用于注册 URI。


@Joinpublic class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {    // 服务端提供的接口用于注册元数据        private static final String META_PATH = "/shenyu-client/register-metadata";
// 服务端提供的接口用于注册URI private static final String URI_PATH = "/shenyu-client/register-uri";
//注册URI @Override public void persistURI(final URIRegisterDTO registerDTO) { doRegister(registerDTO, URI_PATH, Constants.URI); } //注册接口(就是元数据信息) @Override public void persistInterface(final MetaDataRegisterDTO metadata) { doRegister(metadata, META_PATH, META_TYPE); } // 进行注册 private <T> void doRegister(final T t, final String path, final String type) { // 遍历admin服务列表(admin可能是集群) for (String server : serverList) { try { // 调用工具类发送 http 请求 RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), server + path, type); return; } catch (Exception e) { LOGGER.error("register admin url :{} is fail, will retry", server); } } }}
复制代码


将数据序列化后,通过OkHttp发送数据。


public final class RegisterUtils {      //...... 
// 通过OkHttp发送数据 public static void doRegister(final String json, final String url, final String type) throws IOException { String result = OkHttpTools.getInstance().post(url, json); if (Objects.equals(SUCCESS, result)) { LOGGER.info("{} client register success: {} ", type, json); } else { LOGGER.error("{} client register error: {} ", type, json); } }}
复制代码


至此,客户端通过http的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin


客户端元数据注册流程的源码分析过程完成了,用流程图描述如下:


2.4 构建 URI 的 ContextRegisterListener

创建 ContextRegisterListener,负责客户端URI数据的构建和注册,它的创建是在配置文件中完成。


@Configuration@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)public class ShenyuSpringMvcClientConfiguration {     // ......        //  创建 ContextRegisterListener    @Bean    public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {        return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));    }}
复制代码


ContextRegisterListener实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有 Spring 事件发生后,该方法会执行。


// 实现了ApplicationListener接口public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
//......
//通过构造函数完成实例化 public ContextRegisterListener(final PropertiesConfig clientConfig) { // 读取 shenyu.client.http 配置信息 Properties props = clientConfig.getProps(); // isFull是否注册整个服务 this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString())); // contextPath上下文路径 String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); this.contextPath = contextPath; if (isFull) { if (StringUtils.isBlank(contextPath)) { String errorMsg = "http register param must config the contextPath"; LOG.error(errorMsg); throw new ShenyuClientIllegalArgumentException(errorMsg); } this.contextPath = contextPath + "/**"; } // port 客户端端口信息 int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT)); // appName 应用名称 this.appName = props.getProperty(ShenyuClientConstants.APP_NAME); // host信息 this.host = props.getProperty(ShenyuClientConstants.HOST); this.port = port; }
// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行 @Override public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) { //保证该方法的内容只执行一次 if (!registered.compareAndSet(false, true)) { return; } // 如果是 isFull=true 代表注册整个服务,构建元数据并注册 if (isFull) { publisher.publishEvent(buildMetaDataDTO()); } // 构建URI数据并注册 publisher.publishEvent(buildURIRegisterDTO()); }
// 构建URI数据 private URIRegisterDTO buildURIRegisterDTO() { String host = IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host); return URIRegisterDTO.builder() .contextPath(this.contextPath) .appName(appName) .host(host) .port(port) .rpcType(RpcTypeEnum.HTTP.getName()) .build(); }
// 构建元数据 private MetaDataRegisterDTO buildMetaDataDTO() { String contextPath = this.contextPath; String appName = this.appName; return MetaDataRegisterDTO.builder() .contextPath(contextPath) .appName(appName) .path(contextPath) .rpcType(RpcTypeEnum.HTTP.getName()) .enabled(true) .ruleName(contextPath) .build(); }}
复制代码


在构造函数中主要是读取属性配置。


onApplicationEvent()方法是有Spring事件发生时会执行,这里的参数是ContextRefreshedEvent,表示上下文刷新事件。当Spring容器就绪后执行此处逻辑:如果是 isFull=true 代表注册整个服务,构建元数据并注册,在前面分析的后置处理器SpringMvcClientBeanPostProcessor中没有处理 isFull=true 的情况,所以在此处进行了处理。然后再构建URI数据并注册。


ContextRefreshedEventSpring内置事件。ApplicationContext被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext接口中使用 refresh() 方法来发生。此处的初始化是指:所有的Bean被成功装载,后处理Bean被检测并激活,所有Singleton Bean 被预实例化,ApplicationContext容器已就绪可用。


注册逻辑都是通过 publisher.publishEvent()完成。在前面都已经分析过了:向Disruptor队列写入数据,再从中消费数据,最后通过ExecutorSubscriber去处理。


  • ExecutorSubscriber#executor()


执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。



这里是注册URI信息,所以执行类是ShenyuClientURIExecutorSubscriber


  • ShenyuClientURIExecutorSubscriber#executor()


主要逻辑是遍历 URI 数据集合,通过persistURI()方法实现数据注册。


public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {        //......        @Override    public DataType getType() {        return DataType.URI; //数据类型是URI    }        // 注册URI数据    @Override    public void executor(final Collection<URIRegisterDTO> dataList) {        for (URIRegisterDTO uriRegisterDTO : dataList) {            Stopwatch stopwatch = Stopwatch.createStarted();            while (true) {                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {                    break;                } catch (IOException e) {                    long sleepTime = 1000;                    // maybe the port is delay exposed                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {                        LOG.error("host:{}, port:{} connection failed, will retry",                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());                        // If the connection fails for a long time, Increase sleep time                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {                            sleepTime = 10000;                        }                    }                    try {                        TimeUnit.MILLISECONDS.sleep(sleepTime);                    } catch (InterruptedException ex) {                        ex.printStackTrace();                    }                }            }            //添加hook,优雅停止客户端             ShenyuClientShutdownHook.delayOtherHooks();                        // 注册URI            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);        }    }}
复制代码


代码中的while(true)循环是为了保证客户端已经成功启动了,通过hostport可以连接上。


后面的逻辑是:添加hook函数,用于优雅停止客户端 。


通过persistURI()方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp客户端向shenyu-admin发起http,通过http的方式注册URI


分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和 URI 数据发送到Disruptor队列,再从中消费,读取数据,通过httpadmin发送数据。


客户端URI注册流程的源码分析完成了,流程图如下:


3. 服务端注册流程

3.1 注册接口 ShenyuHttpRegistryController

从前面的分析可以知道,服务端提供了注册的两个接口:


  • /shenyu-client/register-metadata:服务端提供的接口用于注册元数据。

  • /shenyu-client/register-uri: 服务端提供的接口用于注册 URI。


这两个接口位于ShenyuHttpRegistryController中,它实现了ShenyuServerRegisterRepository接口,是服务端注册的实现类。它用@Join标记,表示通过SPI进行加载。


// shenuyu客户端接口@RequestMapping("/shenyu-client")@Joinpublic class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {
private ShenyuServerRegisterPublisher publisher;
@Override public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) { this.publisher = publisher; } // 注册元数据 @PostMapping("/register-metadata") @ResponseBody public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) { publish(metaDataRegisterDTO); return ShenyuResultMessage.SUCCESS; } // 注册URI @PostMapping("/register-uri") @ResponseBody public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) { publish(uriRegisterDTO); return ShenyuResultMessage.SUCCESS; }
// 发布注册事件 private <T> void publish(final T t) { publisher.publish(Collections.singletonList(t)); }}
复制代码


两个注册接口获取到数据好,就调用了publish()方法,把数据发布到Disruptor队列中。


  • ShenyuServerRegisterRepository接口


ShenyuServerRegisterRepository接口是服务注册接口,它有五个实现类,表示有五种注册方式:


  • ConsulServerRegisterRepository:通过Consul实现注册;

  • EtcdServerRegisterRepository:通过Etcd实现注册;

  • NacosServerRegisterRepository:通过Nacos实现注册;

  • ShenyuHttpRegistryController:通过Http实现注册;

  • ZookeeperServerRegisterRepository:通过Zookeeper实现注册。


具体用哪一种方式,是通过配置文件指定的,然后通过SPI进行加载。


shenyu-admin中的application.yml文件中配置注册方式,registerType指定注册类型,当用http进行注册时,serverLists不需要填写,更多配置说明可以参考官网 客户端接入配置


shenyu:  register:    registerType: http     serverLists: 
复制代码


  • RegisterCenterConfiguration 加载配置


在引入相关依赖和属性配置后,启动shenyu-admin时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration


// 注册中心配置类@Configurationpublic class RegisterCenterConfiguration {    // 读取配置属性    @Bean    @ConfigurationProperties(prefix = "shenyu.register")    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {        return new ShenyuRegisterCenterConfig();    }        //创建ShenyuServerRegisterRepository,用于服务端注册    @Bean    public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {        // 1.从配置属性中获取注册类型        String registerType = shenyuRegisterCenterConfig.getRegisterType();        // 2.通过注册类型,以SPI的方法加载实现类        ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);        // 3.获取publisher,向Disruptor队列中写数据        RegisterServerDisruptorPublisher publisher = RegisterServerDisruptorPublisher.getInstance();        // 4.注册Service, rpcType -> registerService        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));        // 5.事件发布的准备工作        publisher.start(registerServiceMap);        // 6.注册的初始化操作        registerRepository.init(publisher, shenyuRegisterCenterConfig);        return registerRepository;    }}
复制代码


在配置类中生成了两个bean


  • shenyuRegisterCenterConfig:读取属性配置;

  • shenyuServerRegisterRepository:用于服务端注册。


在创建shenyuServerRegisterRepository的过程中,也进行了一系列的准备工作:


  • 1.从配置属性中获取注册类型。

  • 2.通过注册类型,以SPI的方法加载实现类:比如指定的类型是http,就会加载ShenyuHttpRegistryController

  • 3.获取publisher,向Disruptor队列中写数据。

  • 4.注册ServicerpcType -> registerService:获取注册的Service,每种rpc都有对应的Service。本文的客户端构建是通过springboot,属于http类型,还有其他客户端类型:dubboSpring CloudgRPC等。

  • 5.事件发布的准备工作:添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。

  • 6.注册的初始化操作:http类型的注册初始化操作就是保存publisher

  • RegisterServerDisruptorPublisher#publish()


服务端向Disruptor队列写入数据的发布者 ,通过单例模式构建。


public class RegisterServerDisruptorPublisher implements ShenyuServerRegisterPublisher {    //私有属性    private static final RegisterServerDisruptorPublisher INSTANCE = new RegisterServerDisruptorPublisher();
//公开静态方法获取实例 public static RegisterServerDisruptorPublisher getInstance() { return INSTANCE; } //事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。 public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) { //服务端注册工厂 factory = new RegisterServerExecutorFactory(); //添加URI数据订阅器 factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService)); //添加元数据订阅器 factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService)); //启动Disruptor队列 providerManage = new DisruptorProviderManage(factory); providerManage.startup(); } // 向队列中写入数据 @Override public <T> void publish(final T data) { DisruptorProvider<Object> provider = providerManage.getProvider(); provider.onData(f -> f.setData(data)); } @Override public void close() { providerManage.getProvider().shutdown(); }}
复制代码


配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:


3.2 消费数据 QueueConsumer

在前面分析了客户端disruptor队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。


QueueConsumer是一个消费者,它实现了WorkHandler接口,它的创建过程在providerManage.startup()逻辑中。WorkHandler接口是disruptor的数据消费接口,只有一个方法是onEvent()


package com.lmax.disruptor;
public interface WorkHandler<T> { void onEvent(T var1) throws Exception;}
复制代码


QueueConsumer重写了onEvent()方法,主要逻辑是生成消费任务,然后在线程池中去执行。


/** *  * 队列消费者 */public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {      // 省略了其他逻辑
@Override public void onEvent(final DataEvent<T> t) { if (t != null) { // 通过工厂创建队列消费任务 QueueConsumerExecutor<T> queueConsumerExecutor = factory.create(); // 保存数据 queueConsumerExecutor.setData(t.getData()); // help gc t.setData(null); // 放在线程池中执行 消费任务 executor.execute(queueConsumerExecutor); } }}
复制代码


QueueConsumerExecutor是在线程池中被执行的任务,它实现了Runnable接口,具体的实现类有两个:


  • RegisterClientConsumerExecutor:客户端消费者执行器;

  • RegisterServerConsumerExecutor:服务端消费者执行器。


顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。


  • RegisterServerConsumerExecutor#run()


RegisterServerConsumerExecutor是服务端消费者执行器,它通过QueueConsumerExecutor间接实现了Runnable接口,并重写了run()方法。


public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {   // ...
@Override public void run() { //获取从disruptor队列中拿到的数据 List<DataTypeParent> results = getData(); // 数据校验 results = results.stream().filter(data -> isValidData(data)).collect(Collectors.toList()); if (CollectionUtils.isEmpty(results)) { return; } //根据类型执行操作 getType(results).executor(results); } // 根据类型获取订阅者 private ExecutorSubscriber getType(final List<DataTypeParent> list) { DataTypeParent result = list.get(0); return subscribers.get(result.getType()); }}
复制代码


  • ExecutorSubscriber#executor()


执行器订阅者分为两类,一个是处理元数据,一个是处理URI。在客户端和服务端分别有两个,所以一共是四个。



  • MetadataExecutorSubscriber#executor()


如果是注册元数据,则通过MetadataExecutorSubscriber#executor()实现:根据类型获取注册Service,调用register()


public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {     //......
@Override public DataType getType() { return DataType.META_DATA; // 元数据类型 }
@Override public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) { // 遍历元数据列表 for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) { // 根据类型获取注册Service ShenyuClientRegisterService shenyuClientRegisterService = this.shenyuClientRegisterService.get(metaDataRegisterDTO.getRpcType()); Objects.requireNonNull(shenyuClientRegisterService); // 对元数据进行注册,加锁确保顺序执行,防止并发错误 synchronized (ShenyuClientRegisterService.class) { shenyuClientRegisterService.register(metaDataRegisterDTO); } } }}
复制代码


  • URIRegisterExecutorSubscriber#executor()


如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()实现:构建URI数据,根据注册类型查找Service,通过registerURI方法实现注册。


public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {    //......        @Override    public DataType getType() {        return DataType.URI; // URI数据类型    }        @Override    public void executor(final Collection<URIRegisterDTO> dataList) {        if (CollectionUtils.isEmpty(dataList)) {            return;        }        // 构建URI数据类型,通过registerURI方法实现注册        findService(dataList).ifPresent(service -> {            Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);            listMap.forEach(service::registerURI);        });    }        // 根据类型查找Service    private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {        return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();    }}
复制代码


  • ShenyuClientRegisterService#register()


ShenyuClientRegisterService是注册方法接口,它有多个实现类:



  • AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;

  • AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;

  • ShenyuClientRegisterDivideServiceImpldivide类,处理http注册类型;

  • ShenyuClientRegisterDubboServiceImpldubbo类,处理dubbo注册类型;

  • ShenyuClientRegisterGrpcServiceImplgRPC类,处理gRPC注册类型;

  • ShenyuClientRegisterMotanServiceImplMotan类,处理Motan注册类型;

  • ShenyuClientRegisterSofaServiceImplSofa类,处理Sofa注册类型;

  • ShenyuClientRegisterSpringCloudServiceImplSpringCloud类,处理SpringCloud注册类型;

  • ShenyuClientRegisterTarsServiceImplTars类,处理Tars注册类型;


从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http注册类型,所以元数据和 URI 数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl


  • register(): 注册元数据


public String register(final MetaDataRegisterDTO dto) {        // 1.注册选择器信息        String selectorHandler = selectorHandler(dto);        String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);        // 2.注册规则信息        String ruleHandler = ruleHandler();        RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);        ruleService.registerDefault(ruleDTO);        // 3.注册元数据信息        registerMetadata(dto);        // 4.注册contextPath        String contextPath = dto.getContextPath();        if (StringUtils.isNotEmpty(contextPath)) {            registerContextPath(dto);        }        return ShenyuResultMessage.SUCCESS;    }
复制代码


整个注册逻辑可以分为 4 个步骤:


  • 1.注册选择器信息

  • 2.注册规则信息

  • 3.注册元数据信息

  • 4.注册contextPath


admin这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath。具体的注册过程和细节处理跟rpc类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。


服务端元数据注册流程的源码分析完了,流程图描述如下:



  • registerURI(): 注册URI数据


public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {        if (CollectionUtils.isEmpty(uriList)) {            return "";        }        // 对应的选择器是否存在        SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        if (Objects.isNull(selectorDO)) {            return "";        }        // 处理选择器中的handler信息        String handler = buildHandle(uriList, selectorDO);        selectorDO.setHandle(handler);        SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));        selectorData.setHandle(handler);               // 更新数据库中的记录        selectorService.updateSelective(selectorDO);        // 发布事件        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));        return ShenyuResultMessage.SUCCESS;    }
复制代码


admin拿到URI数据后,主要是更新选择器中的handler信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。


服务端URI注册流程的源码分析完成了,用图描述如下:



至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler

4. 总结

本文主要对Apache ShenYu网关中的http注册模块进行了源码分析。涉及到的主要知识点,归纳如下:


  • 注册中心是为了将客户端信息注册到admin,方便流量筛选;

  • http注册是将客户端元数据信息和URI信息注册到admin

  • http服务的接入通过注解@ShenyuSpringMvcClient标识;

  • 注册信息的构建主要通过Spring的后置处理器BeanPostProcessor和应用监听器ApplicationListener

  • 注册类型的加载通过SPI完成;

  • 引入Disruptor队列是为了数据与操作解耦,以及数据缓冲。

  • 注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。

用户头像

子夜2104

关注

还未添加个人签名 2018.03.21 加入

还未添加个人简介

评论

发布
暂无评论
Apache ShenYu源码阅读系列-注册中心实现原理之Http注册