Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API
网关。
在ShenYu
网关中,注册中心是用于将客户端信息注册到shenyu-admin
,admin
再通过数据同步将这些信息同步到网关,网关通过这些数据完成流量筛选。客户端信息主要包括接口信息
和URI信息
。
本文基于shenyu-2.4.1
版本进行源码分析,官网的介绍请参考 客户端接入原理 。
1. 注册中心原理
当客户端启动时,读取接口信息和uri信息
,通过指定的注册类型,将数据发送到shenyu-admin
。
图中的注册中心需要用户指定使用哪种注册类型,ShenYu
当前支持Http
、Zookeeper
、Etcd
、Consul
和Nacos
进行注册。具体如何配置请参考 客户端接入配置 。
ShenYu
在注册中心的原理设计上引入了Disruptor
,Disruptor
队列在其中起到数据与操作解耦,利于扩展。如果注册请求过多,导致注册异常,也有数据缓冲作用。
如图所示,注册中心分为两个部分,一是注册中心客户端register-client
,负载处理客户端数据读取。另一个是注册中心服务端register-server
,负载处理服务端(就是shenyu-admin
)数据写入。通过指定注册类型进行数据发送和接收。
客户端:通常来说就是一个微服务,可以是springmvc
,spring-cloud
,dubbo
,grpc
等。
register-client
:注册中心客户端,读取客户接口和uri
信息。
Disruptor
:数据与操作解耦,数据缓冲作用。
register-server
:注册中心服务端,这里就是shenyu-admin
,接收数据,写入数据库,发数据同步事件。
注册类型:指定注册类型,完成数据注册,当前支持Http
、Zookeeper
、Etcd
、Consul
和Nacos
。
本文分析的是使用Http
的方式进行注册,所以具体的处理流程如下:
在客户端,数据出队列后,通过http
传输数据,在服务端,提供相应的接口,接收数据,然后写入队列。
2. 客户端注册流程
当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。官方提供的例子是一个由springboot
构建的微服务。注册中心的相关配置可以参考官网 客户端接入配置 。
2.1 加载配置,读取属性
先用一张图串联下注册中心客户端初始化流程:
我们分析的是通过http
的方式进行注册,所以需要进行如下配置:
shenyu:
register:
registerType: http
serverLists: http://localhost:9095
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false
复制代码
每个属性表示的含义如下:
registerType
: 服务注册类型,填写 http
。
serverList
: 为http
注册类型时,填写Shenyu-Admin
项目的地址,注意加上http://
,多个地址用英文逗号分隔。
port
: 你本项目的启动端口,目前springmvc/tars/grpc
需要进行填写。
contextPath
: 为你的这个mvc
项目在shenyu
网关的路由前缀, 比如/order
,/product
等等,网关会根据你的这个前缀来进行路由。
appName
:你的应用名称,不配置的话,会默认取 spring.application.name
的值。
isFull
: 设置 true
代表代理你的整个服务,false
表示代理你其中某几个controller
;目前适用于springmvc/springcloud
。
项目启动后,会先加载配置文件,读取属性信息,生成相应的Bean
。
首先读取到的配置文件是 ShenyuSpringMvcClientConfiguration
,它是shenyu
客户端http
注册配置类,通过@Configuration
表示这是一个配置类,通过@ImportAutoConfiguration
引入其他配置类。创建SpringMvcClientBeanPostProcessor
,主要处理元数据。创建ContextRegisterListener
,主要处理 URI
信息。
/**
* shenyu 客户端http注册配置类
*/
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
//创建SpringMvcClientBeanPostProcessor,主要处理元数据
@Bean
public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
return new SpringMvcClientBeanPostProcessor(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);
}
// 创建ContextRegisterListener,主要处理 URI信息
@Bean
public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
}
}
复制代码
ShenyuClientCommonBeanConfiguration
是shenyu
客户端通用配置类,会创建注册中心客户端通用的bean
。
创建ShenyuClientRegisterRepository
,通过工厂类创建而成。
创建ShenyuRegisterCenterConfig
,读取shenyu.register
属性配置。
创建ShenyuClientConfig
,读取shenyu.client
属性配置。
/**
* shenyu客户端通用配置类
*/
@Configuration
public class ShenyuClientCommonBeanConfiguration {
// 创建ShenyuClientRegisterRepository,通过工厂类创建而成。
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}
// 创建ShenyuRegisterCenterConfig,读取shenyu.register属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
// 创建ShenyuClientConfig,读取shenyu.client属性配置
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}
复制代码
2.2 用于注册的 HttpClientRegisterRepository
上面的配置文件中生成的ShenyuClientRegisterRepository
是客户端注册的具体实现,它是一个接口,它的实现类如下。
HttpClientRegisterRepository
:通过http
进行注册;
ConsulClientRegisterRepository
:通过Consul
进行注册;
EtcdClientRegisterRepository
:通过Etcd
进行注册;
NacosClientRegisterRepository
:通过nacos
进行注册;
ZookeeperClientRegisterRepository
通过Zookeeper
进行注册。
具体是哪一种方式,是通过SPI
进行加载实现的,实现逻辑如下:
/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {
private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
复制代码
加载类型通过registerType
指定,也就是我们在配置文件中指定的类型:
shenyu:
register:
registerType: http
serverLists: http://localhost:9095
复制代码
我们指定的是http
,所以会去加载HttpClientRegisterRepository
。对象创建成功后,执行的初始化方法init()
如下:
@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
@Override
public void init(final ShenyuRegisterCenterConfig config) {
this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));
}
// 暂时省略其他逻辑
}
复制代码
读取配置文件中的serverLists
,即sheenyu-admin
的地址,为后续数据发送做准备。类注解@Join
用于SPI
的加载。
SPI
全称为 Service Provider Interface
, 是 JDK
内置的一种服务提供发现功能, 一种动态替换发现的机制。
shenyu-spi 是Apache ShenYu
网关自定义的SPI
扩展实现,设计和实现原理参考了Dubbo
的 SPI扩展实现 。
2.3 构建元数据的 SpringMvcClientBeanPostProcessor
创建SpringMvcClientBeanPostProcessor
,负责元数据的构建和注册,它的构造函数逻辑如下:
/**
* spring mvc 客户端bean的后置处理器
*/
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
/**
* 通过构造函数进行实例化
*/
public SpringMvcClientBeanPostProcessor(final PropertiesConfig clientConfig,
final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 读取配置属性
Properties props = clientConfig.getProps();
// 获取端口信息,并校验
int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
if (port <= 0) {
String errorMsg = "http register param must config the port must > 0";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
// 获取appName
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// 获取contextPath
this.contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
// 校验appName和contextPath
if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the appName or contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
// 获取 isFull
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// 开始事件发布
publisher.start(shenyuClientRegisterRepository);
}
// 暂时省略了其他逻辑
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 暂时省略了其他逻辑
}
}
复制代码
在构造函数中,主要是读取属性信息,然后进行校验。
shenyu:
client:
http:
props:
contextPath: /http
appName: http
port: 8189
isFull: false
复制代码
最后,执行了 publisher.start()
,开始事件发布,为注册做准备。
ShenyuClientRegisterEventPublisher
通过单例模式实现,主要是生成元数据
和URI
订阅器(后续用于数据发布),然后启动Disruptor
队列。提供了一个共有方法publishEvent()
,发布事件,向 Disruptor 队列发数据。
public class ShenyuClientRegisterEventPublisher {
// 私有变量
private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
private DisruptorProviderManage providerManage;
private RegisterClientExecutorFactory factory;
/**
* 公开静态方法
*
* @return ShenyuClientRegisterEventPublisher instance
*/
public static ShenyuClientRegisterEventPublisher getInstance() {
return INSTANCE;
}
/**
* Start方法执行
*
* @param shenyuClientRegisterRepository shenyuClientRegisterRepository
*/
public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
// 创建客户端注册工厂类
factory = new RegisterClientExecutorFactory();
// 添加元数据订阅器
factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
// 添加URI订阅器
factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
// 启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}
/**
* 发布事件,向Disruptor队列发数据
*
* @param <T> the type parameter
* @param data the data
*/
public <T> void publishEvent(final T data) {
DisruptorProvider<Object> provider = providerManage.getProvider();
provider.onData(f -> f.setData(data));
}
}
复制代码
SpringMvcClientBeanPostProcessor
的构造函数逻辑分析完了,主要是读取属性配置,创建元数据和URI
订阅器, 启动Disruptor
队列。要注意到它实现了BeanPostProcessor
,这是Spring
提供的一个接口,在Bean
的生命周期中,真正开始使用之前,会执行后置处理器的postProcessAfterInitialization()
方法。
SpringMvcClientBeanPostProcessor
作为一个后置处理器,它的功能是:读取注解中的元数据,并向admin
注册。
// 后置处理器
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
// 省略了其他逻辑
// 后置处理器:读取注解中的元数据,并向admin注册
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
// 配置属性,如果 isFull=true 的话,表示注册整个微服务
if (isFull) {
return bean;
}
// 获取当前bean的Controller注解
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
// 获取当前bean的RequestMapping注解
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
// 如果这个bean是一个接口
if (controller != null || requestMapping != null) {
// 获取当前bean的 ShenyuSpringMvcClient 注解
ShenyuSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), ShenyuSpringMvcClient.class);
String prePath = "";
//如果没有 ShenyuSpringMvcClient 注解,就返回,表示这个接口不需要注册
if (Objects.isNull(clazzAnnotation)) {
return bean;
}
//如果 ShenyuSpringMvcClient 注解中的path属性包括 * ,表示注册整个接口
if (clazzAnnotation.path().indexOf("*") > 1) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(clazzAnnotation, prePath));
return bean;
}
prePath = clazzAnnotation.path();
// 获取当前bean的所有方法
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
// 遍历方法
for (Method method : methods) {
// 获取当前方法上的注解 ShenyuSpringMvcClient
ShenyuSpringMvcClient shenyuSpringMvcClient = AnnotationUtils.findAnnotation(method, ShenyuSpringMvcClient.class);
// 如果方法上有注解ShenyuSpringMvcClient,就表示该方法需要注册
if (Objects.nonNull(shenyuSpringMvcClient)) {
// 构建元数据,发送注册事件
publisher.publishEvent(buildMetaDataDTO(shenyuSpringMvcClient, prePath));
}
}
}
return bean;
}
// 构造元数据
private MetaDataRegisterDTO buildMetaDataDTO(final ShenyuSpringMvcClient shenyuSpringMvcClient, final String prePath) {
// contextPath上下文名称
String contextPath = this.contextPath;
// appName应用名称
String appName = this.appName;
// path注册路径
String path;
if (StringUtils.isEmpty(contextPath)) {
path = prePath + shenyuSpringMvcClient.path();
} else {
path = contextPath + prePath + shenyuSpringMvcClient.path();
}
// desc描述信息
String desc = shenyuSpringMvcClient.desc();
// ruleName规则名称,没有填写的话就和path一致
String configRuleName = shenyuSpringMvcClient.ruleName();
String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
// 构建元数据
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(path)
.pathDesc(desc)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(shenyuSpringMvcClient.enabled())
.ruleName(ruleName)
.registerMetaData(shenyuSpringMvcClient.registerMetaData())
.build();
}
}
复制代码
在后置处理器中,需要读取配置属性,如果 isFull=true
的话,表示注册整个微服务。获取当前bean
的Controller
注解、RequestMapping
注解、ShenyuSpringMvcClient
注解,通过读取这些注解信息判断当前bean
是否是接口?接口是否需要注册?方法是否需要注册?然后根据ShenyuSpringMvcClient
注解中的属性构建元数据,最后通过publisher.publishEvent()
发布事件进行注册。
Controller
注解和RequestMapping
注解是由Spring
提供的,这个大家应该很熟悉,不过多赘述。ShenyuSpringMvcClient
注解是由Apache ShenYu
提供的,用于注册SpringMvc
客户端,它的定义如下:
/**
* shenyu 客户端接口,用于方法上或类上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ShenyuSpringMvcClient {
// path 注册路径
String path();
// ruleName 规则名称
String ruleName() default "";
// desc 描述信息
String desc() default "";
// enabled是否启用
boolean enabled() default true;
// registerMetaData 注册元数据
boolean registerMetaData() default false;
}
复制代码
它的使用如下:
@RestController
@RequestMapping("/test")
@ShenyuSpringMvcClient(path = "/test/**") // 表示整个接口注册
public class HttpTestController {
//......
}
复制代码
@RestController
@RequestMapping("/order")
@ShenyuSpringMvcClient(path = "/order")
public class OrderController {
/**
* Save order dto.
*
* @param orderDTO the order dto
* @return the order dto
*/
@PostMapping("/save")
@ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法
public OrderDTO save(@RequestBody final OrderDTO orderDTO) {
orderDTO.setName("hello world save order");
return orderDTO;
}
复制代码
该方法会将数据发送到Disruptor
队列中,关于Disruptor
队列更多细节这里不做更多介绍,这不影响分析注册的流程。
当数据发送后,Disruptor
队列的消费者会处理数据,进行消费。
QueueConsumer
是一个消费者,它实现了WorkHandler
接口,它的创建过程在providerManage.startup()
逻辑中。WorkHandler
接口是disruptor
的数据消费接口,只有一个方法是onEvent()
。
package com.lmax.disruptor;
public interface WorkHandler<T> {
void onEvent(T var1) throws Exception;
}
复制代码
QueueConsumer
重写了onEvent()
方法,主要逻辑是生成消费任务,然后在线程池中去执行。
/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
// 省略了其他逻辑
@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}
复制代码
QueueConsumerExecutor
是在线程池中被执行的任务,它实现了Runnable
接口,具体的实现类有两个:
顾名思义,一个负责处理客户端任务,一个负责处理服务端任务(服务端就是admin
,在下文进行分析)。
重写的run()
逻辑如下:
public final class RegisterClientConsumerExecutor extends QueueConsumerExecutor<DataTypeParent> {
//......
@Override
public void run() {
// 获取数据
DataTypeParent dataTypeParent = getData();
// 根据数据类型调用相应的处理器进行处理
subscribers.get(dataTypeParent.getType()).executor(Lists.newArrayList(dataTypeParent));
}
}
复制代码
根据不同的数据类型调用不同的处理器去执行相应的任务。数据类型有两种,一个是元数据,记录客户端注册信息。一个是URI
数据,记录客户端服务信息。
//数据类型
public enum DataType {
// 元数据
META_DATA,
// URI数据
URI,
}
复制代码
执行器订阅者也分为两类,一个是处理元数据,一个是处理URI
。在客户端和服务端分别有两个,所以一共是四个。
客户端这边对元数据处理逻辑是:遍历元数据信息,调用接口方法persistInterface()
完成数据的发布。
public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.META_DATA; // 元数据
}
@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// 调用接口方法persistInterface()完成数据的发布
shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);
}
}
}
复制代码
ShenyuClientRegisterRepository
是一个接口,用于表示客户端数据注册,它的实现类目前有五种,每一种就表示一种注册方法。
ConsulClientRegisterRepository
:通过Consul
实现客户端注册;
EtcdClientRegisterRepository
:通过Etcd
实现客户端注册;
HttpClientRegisterRepository
:通过Http
实现客户端注册;
NacosClientRegisterRepository
:通过Nacos
实现客户端注册;
ZookeeperClientRegisterRepository
:通过Zookeeper
实现客户端注册;
从图中可以看出,注册中心的加载是通过SPI
的方式完成的。这个在前面提到过了,在客户端通用配置文件中,通过指定配置文件中的属性完成具体的类加载。
/**
* 加载 ShenyuClientRegisterRepository
*/
public final class ShenyuClientRegisterRepositoryFactory {
private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
/**
* 创建 ShenyuClientRegisterRepository
*/
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通过SPI的方式进行加载,类型由registerType决定
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
//执行初始化操作
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
复制代码
本文的源码分析是基于Http
的方式进行注册,所以我们先分析HttpClientRegisterRepository
,其他的注册方式后续再分析。
通过http
的方式注册很简单,就是调用工具类发送http
请求。注册元数据和 URI 都是调用的同一个方法doRegister()
,指定接口和类型就好。
@Join
public class HttpClientRegisterRepository implements ShenyuClientRegisterRepository {
// 服务端提供的接口用于注册元数据
private static final String META_PATH = "/shenyu-client/register-metadata";
// 服务端提供的接口用于注册URI
private static final String URI_PATH = "/shenyu-client/register-uri";
//注册URI
@Override
public void persistURI(final URIRegisterDTO registerDTO) {
doRegister(registerDTO, URI_PATH, Constants.URI);
}
//注册接口(就是元数据信息)
@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {
doRegister(metadata, META_PATH, META_TYPE);
}
// 进行注册
private <T> void doRegister(final T t, final String path, final String type) {
// 遍历admin服务列表(admin可能是集群)
for (String server : serverList) {
try {
// 调用工具类发送 http 请求
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), server + path, type);
return;
} catch (Exception e) {
LOGGER.error("register admin url :{} is fail, will retry", server);
}
}
}
}
复制代码
将数据序列化后,通过OkHttp
发送数据。
public final class RegisterUtils {
//......
// 通过OkHttp发送数据
public static void doRegister(final String json, final String url, final String type) throws IOException {
String result = OkHttpTools.getInstance().post(url, json);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}
复制代码
至此,客户端通过http
的方式注册元数据的逻辑就分析完了。小结一下:通过读取自定义的注解信息构造元数据,将数据发到Disruptor
队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http
请求到admin
。
客户端元数据注册流程的源码分析过程完成了,用流程图描述如下:
2.4 构建 URI 的 ContextRegisterListener
创建 ContextRegisterListener
,负责客户端URI
数据的构建和注册,它的创建是在配置文件中完成。
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
public class ShenyuSpringMvcClientConfiguration {
// ......
// 创建 ContextRegisterListener
@Bean
public ContextRegisterListener contextRegisterListener(final ShenyuClientConfig clientConfig) {
return new ContextRegisterListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()));
}
}
复制代码
ContextRegisterListener
实现了ApplicationListener
接口,并重写了onApplicationEvent()
方法,当有 Spring 事件发生后,该方法会执行。
// 实现了ApplicationListener接口
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
//......
//通过构造函数完成实例化
public ContextRegisterListener(final PropertiesConfig clientConfig) {
// 读取 shenyu.client.http 配置信息
Properties props = clientConfig.getProps();
// isFull是否注册整个服务
this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));
// contextPath上下文路径
String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH);
this.contextPath = contextPath;
if (isFull) {
if (StringUtils.isBlank(contextPath)) {
String errorMsg = "http register param must config the contextPath";
LOG.error(errorMsg);
throw new ShenyuClientIllegalArgumentException(errorMsg);
}
this.contextPath = contextPath + "/**";
}
// port 客户端端口信息
int port = Integer.parseInt(props.getProperty(ShenyuClientConstants.PORT));
// appName 应用名称
this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);
// host信息
this.host = props.getProperty(ShenyuClientConstants.HOST);
this.port = port;
}
// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent contextRefreshedEvent) {
//保证该方法的内容只执行一次
if (!registered.compareAndSet(false, true)) {
return;
}
// 如果是 isFull=true 代表注册整个服务,构建元数据并注册
if (isFull) {
publisher.publishEvent(buildMetaDataDTO());
}
// 构建URI数据并注册
publisher.publishEvent(buildURIRegisterDTO());
}
// 构建URI数据
private URIRegisterDTO buildURIRegisterDTO() {
String host = IpUtils.isCompleteHost(this.host) ? this.host : IpUtils.getHost(this.host);
return URIRegisterDTO.builder()
.contextPath(this.contextPath)
.appName(appName)
.host(host)
.port(port)
.rpcType(RpcTypeEnum.HTTP.getName())
.build();
}
// 构建元数据
private MetaDataRegisterDTO buildMetaDataDTO() {
String contextPath = this.contextPath;
String appName = this.appName;
return MetaDataRegisterDTO.builder()
.contextPath(contextPath)
.appName(appName)
.path(contextPath)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(contextPath)
.build();
}
}
复制代码
在构造函数中主要是读取属性配置。
onApplicationEvent()
方法是有Spring
事件发生时会执行,这里的参数是ContextRefreshedEvent
,表示上下文刷新事件。当Spring
容器就绪后执行此处逻辑:如果是 isFull=true
代表注册整个服务,构建元数据并注册,在前面分析的后置处理器SpringMvcClientBeanPostProcessor
中没有处理 isFull=true
的情况,所以在此处进行了处理。然后再构建URI
数据并注册。
ContextRefreshedEvent
是Spring
内置事件。ApplicationContext
被初始化或刷新时,该事件被触发。这也可以在 ConfigurableApplicationContext
接口中使用 refresh()
方法来发生。此处的初始化是指:所有的Bean
被成功装载,后处理Bean
被检测并激活,所有Singleton Bean
被预实例化,ApplicationContext
容器已就绪可用。
注册逻辑都是通过 publisher.publishEvent()
完成。在前面都已经分析过了:向Disruptor
队列写入数据,再从中消费数据,最后通过ExecutorSubscriber
去处理。
执行器订阅者分为两类,一个是处理元数据,一个是处理URI
。在客户端和服务端分别有两个,所以一共是四个。
这里是注册URI
信息,所以执行类是ShenyuClientURIExecutorSubscriber
。
主要逻辑是遍历 URI 数据集合,通过persistURI()
方法实现数据注册。
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.URI; //数据类型是URI
}
// 注册URI数据
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
//添加hook,优雅停止客户端
ShenyuClientShutdownHook.delayOtherHooks();
// 注册URI
shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
}
}
}
复制代码
代码中的while(true)
循环是为了保证客户端已经成功启动了,通过host
和port
可以连接上。
后面的逻辑是:添加hook
函数,用于优雅停止客户端 。
通过persistURI()
方法实现数据注册。整个逻辑也在前面分析过了,最终就是通过OkHttp
客户端向shenyu-admin
发起http
,通过http
的方式注册URI
。
分析到这里就将客户端的注册逻辑分析完了,将构建的元数据和 URI 数据发送到Disruptor
队列,再从中消费,读取数据,通过http
向admin
发送数据。
客户端URI
注册流程的源码分析完成了,流程图如下:
3. 服务端注册流程
3.1 注册接口 ShenyuHttpRegistryController
从前面的分析可以知道,服务端提供了注册的两个接口:
这两个接口位于ShenyuHttpRegistryController
中,它实现了ShenyuServerRegisterRepository
接口,是服务端注册的实现类。它用@Join
标记,表示通过SPI
进行加载。
// shenuyu客户端接口
@RequestMapping("/shenyu-client")
@Join
public class ShenyuHttpRegistryController implements ShenyuServerRegisterRepository {
private ShenyuServerRegisterPublisher publisher;
@Override
public void init(final ShenyuServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
}
// 注册元数据
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
publish(metaDataRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
// 注册URI
@PostMapping("/register-uri")
@ResponseBody
public String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {
publish(uriRegisterDTO);
return ShenyuResultMessage.SUCCESS;
}
// 发布注册事件
private <T> void publish(final T t) {
publisher.publish(Collections.singletonList(t));
}
}
复制代码
两个注册接口获取到数据好,就调用了publish()
方法,把数据发布到Disruptor
队列中。
ShenyuServerRegisterRepository
接口是服务注册接口,它有五个实现类,表示有五种注册方式:
ConsulServerRegisterRepository
:通过Consul
实现注册;
EtcdServerRegisterRepository
:通过Etcd
实现注册;
NacosServerRegisterRepository
:通过Nacos
实现注册;
ShenyuHttpRegistryController
:通过Http
实现注册;
ZookeeperServerRegisterRepository
:通过Zookeeper
实现注册。
具体用哪一种方式,是通过配置文件指定的,然后通过SPI
进行加载。
在shenyu-admin
中的application.yml
文件中配置注册方式,registerType
指定注册类型,当用http
进行注册时,serverLists
不需要填写,更多配置说明可以参考官网 客户端接入配置 。
shenyu:
register:
registerType: http
serverLists:
复制代码
在引入相关依赖和属性配置后,启动shenyu-admin
时,会先加载配置文件,和注册中心相关的配置文件类是RegisterCenterConfiguration
。
// 注册中心配置类
@Configuration
public class RegisterCenterConfiguration {
// 读取配置属性
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
//创建ShenyuServerRegisterRepository,用于服务端注册
@Bean
public ShenyuServerRegisterRepository shenyuServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig, final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
// 1.从配置属性中获取注册类型
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 2.通过注册类型,以SPI的方法加载实现类
ShenyuServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuServerRegisterRepository.class).getJoin(registerType);
// 3.获取publisher,向Disruptor队列中写数据
RegisterServerDisruptorPublisher publisher = RegisterServerDisruptorPublisher.getInstance();
// 4.注册Service, rpcType -> registerService
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
// 5.事件发布的准备工作
publisher.start(registerServiceMap);
// 6.注册的初始化操作
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}
复制代码
在配置类中生成了两个bean
:
在创建shenyuServerRegisterRepository
的过程中,也进行了一系列的准备工作:
1.从配置属性中获取注册类型。
2.通过注册类型,以SPI
的方法加载实现类:比如指定的类型是http
,就会加载ShenyuHttpRegistryController
。
3.获取publisher
,向Disruptor
队列中写数据。
4.注册Service
, rpcType -> registerService
:获取注册的Service
,每种rpc
都有对应的Service
。本文的客户端构建是通过springboot
,属于http
类型,还有其他客户端类型:dubbo
,Spring Cloud
,gRPC
等。
5.事件发布的准备工作:添加服务端元数据和URI
订阅器,处理数据。并且启动Disruptor
队列。
6.注册的初始化操作:http
类型的注册初始化操作就是保存publisher
。
RegisterServerDisruptorPublisher#publish()
服务端向Disruptor
队列写入数据的发布者 ,通过单例模式构建。
public class RegisterServerDisruptorPublisher implements ShenyuServerRegisterPublisher {
//私有属性
private static final RegisterServerDisruptorPublisher INSTANCE = new RegisterServerDisruptorPublisher();
//公开静态方法获取实例
public static RegisterServerDisruptorPublisher getInstance() {
return INSTANCE;
}
//事件发布的准备工作,添加服务端元数据和URI订阅器,处理数据。并且启动Disruptor队列。
public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
//服务端注册工厂
factory = new RegisterServerExecutorFactory();
//添加URI数据订阅器
factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
//添加元数据订阅器
factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
//启动Disruptor队列
providerManage = new DisruptorProviderManage(factory);
providerManage.startup();
}
// 向队列中写入数据
@Override
public <T> void publish(final T data) {
DisruptorProvider<Object> provider = providerManage.getProvider();
provider.onData(f -> f.setData(data));
}
@Override
public void close() {
providerManage.getProvider().shutdown();
}
}
复制代码
配置文件的加载,可看作是注册中心服务端初始化流程,用图描述如下:
3.2 消费数据 QueueConsumer
在前面分析了客户端disruptor
队列消费数据的过。服务端也是一样的逻辑,只是其中执行任务的执行者变了。
QueueConsumer
是一个消费者,它实现了WorkHandler
接口,它的创建过程在providerManage.startup()
逻辑中。WorkHandler
接口是disruptor
的数据消费接口,只有一个方法是onEvent()
。
package com.lmax.disruptor;
public interface WorkHandler<T> {
void onEvent(T var1) throws Exception;
}
复制代码
QueueConsumer
重写了onEvent()
方法,主要逻辑是生成消费任务,然后在线程池中去执行。
/**
*
* 队列消费者
*/
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
// 省略了其他逻辑
@Override
public void onEvent(final DataEvent<T> t) {
if (t != null) {
// 通过工厂创建队列消费任务
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
// 保存数据
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
// 放在线程池中执行 消费任务
executor.execute(queueConsumerExecutor);
}
}
}
复制代码
QueueConsumerExecutor
是在线程池中被执行的任务,它实现了Runnable
接口,具体的实现类有两个:
顾名思义,一个负责处理客户端任务,一个负责处理服务端任务。
RegisterServerConsumerExecutor
是服务端消费者执行器,它通过QueueConsumerExecutor
间接实现了Runnable
接口,并重写了run()
方法。
public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<List<DataTypeParent>> {
// ...
@Override
public void run() {
//获取从disruptor队列中拿到的数据
List<DataTypeParent> results = getData();
// 数据校验
results = results.stream().filter(data -> isValidData(data)).collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
return;
}
//根据类型执行操作
getType(results).executor(results);
}
// 根据类型获取订阅者
private ExecutorSubscriber getType(final List<DataTypeParent> list) {
DataTypeParent result = list.get(0);
return subscribers.get(result.getType());
}
}
复制代码
执行器订阅者分为两类,一个是处理元数据,一个是处理URI
。在客户端和服务端分别有两个,所以一共是四个。
如果是注册元数据,则通过MetadataExecutorSubscriber#executor()
实现:根据类型获取注册Service
,调用register()
。
public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.META_DATA; // 元数据类型
}
@Override
public void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {
// 遍历元数据列表
for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {
// 根据类型获取注册Service
ShenyuClientRegisterService shenyuClientRegisterService = this.shenyuClientRegisterService.get(metaDataRegisterDTO.getRpcType());
Objects.requireNonNull(shenyuClientRegisterService);
// 对元数据进行注册,加锁确保顺序执行,防止并发错误
synchronized (ShenyuClientRegisterService.class) {
shenyuClientRegisterService.register(metaDataRegisterDTO);
}
}
}
}
复制代码
如果是注册元数据,则通过URIRegisterExecutorSubscriber#executor()
实现:构建URI
数据,根据注册类型查找Service,
通过registerURI
方法实现注册。
public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
//......
@Override
public DataType getType() {
return DataType.URI; // URI数据类型
}
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
// 构建URI数据类型,通过registerURI方法实现注册
findService(dataList).ifPresent(service -> {
Map<String, List<URIRegisterDTO>> listMap = buildData(dataList);
listMap.forEach(service::registerURI);
});
}
// 根据类型查找Service
private Optional<ShenyuClientRegisterService> findService(final Collection<URIRegisterDTO> dataList) {
return dataList.stream().map(dto -> shenyuClientRegisterService.get(dto.getRpcType())).findFirst();
}
}
复制代码
ShenyuClientRegisterService
是注册方法接口,它有多个实现类:
AbstractContextPathRegisterService
:抽象类,处理部分公共逻辑;
AbstractShenyuClientRegisterServiceImpl
::抽象类,处理部分公共逻辑;
ShenyuClientRegisterDivideServiceImpl
:divide
类,处理http
注册类型;
ShenyuClientRegisterDubboServiceImpl
:dubbo
类,处理dubbo
注册类型;
ShenyuClientRegisterGrpcServiceImpl
:gRPC
类,处理gRPC
注册类型;
ShenyuClientRegisterMotanServiceImpl
:Motan
类,处理Motan
注册类型;
ShenyuClientRegisterSofaServiceImpl
:Sofa
类,处理Sofa
注册类型;
ShenyuClientRegisterSpringCloudServiceImpl
:SpringCloud
类,处理SpringCloud
注册类型;
ShenyuClientRegisterTarsServiceImpl
:Tars
类,处理Tars
注册类型;
从上面可以看出每种微服务都有对应的注册实现类,本文的源码分析是 以官方提供的 shenyu-examples-http 为例,是属http
注册类型,所以元数据和 URI 数据的注册实现类是 ShenyuClientRegisterDivideServiceImpl
:
public String register(final MetaDataRegisterDTO dto) {
// 1.注册选择器信息
String selectorHandler = selectorHandler(dto);
String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);
// 2.注册规则信息
String ruleHandler = ruleHandler();
RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);
ruleService.registerDefault(ruleDTO);
// 3.注册元数据信息
registerMetadata(dto);
// 4.注册contextPath
String contextPath = dto.getContextPath();
if (StringUtils.isNotEmpty(contextPath)) {
registerContextPath(dto);
}
return ShenyuResultMessage.SUCCESS;
}
复制代码
整个注册逻辑可以分为 4 个步骤:
1.注册选择器信息
2.注册规则信息
3.注册元数据信息
4.注册contextPath
在admin
这一侧通过客户端的元数据信息需要构建选择器、规则、元数据和ContextPath
。具体的注册过程和细节处理跟rpc
类型有关。我们就不再继续向下追踪了,对于注册中心的逻辑分析,跟踪到这里就够了。
服务端元数据注册流程的源码分析完了,流程图描述如下:
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {
if (CollectionUtils.isEmpty(uriList)) {
return "";
}
// 对应的选择器是否存在
SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
if (Objects.isNull(selectorDO)) {
return "";
}
// 处理选择器中的handler信息
String handler = buildHandle(uriList, selectorDO);
selectorDO.setHandle(handler);
SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));
selectorData.setHandle(handler);
// 更新数据库中的记录
selectorService.updateSelective(selectorDO);
// 发布事件
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));
return ShenyuResultMessage.SUCCESS;
}
复制代码
admin
拿到URI
数据后,主要是更新选择器中的handler
信息,然后写入到数据库,最后发布事件通知网关。通知网关的逻辑是由数据同步操作完成,这在之前的文章中已经分析过了,就不再赘述。
服务端URI
注册流程的源码分析完成了,用图描述如下:
至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor
队列,再从中消费数据,根据接收到的元数据和URI
数据更新admin
的选择器、规则、元数据和选择器的handler
。
4. 总结
本文主要对Apache ShenYu
网关中的http注册
模块进行了源码分析。涉及到的主要知识点,归纳如下:
注册中心是为了将客户端信息注册到admin
,方便流量筛选;
http
注册是将客户端元数据信息和URI
信息注册到admin
;
http
服务的接入通过注解@ShenyuSpringMvcClient
标识;
注册信息的构建主要通过Spring
的后置处理器BeanPostProcessor
和应用监听器ApplicationListener
;
注册类型的加载通过SPI
完成;
引入Disruptor
队列是为了数据与操作解耦,以及数据缓冲。
注册中心的实现采用了面向接口编程,使用模板方法、单例、观察者等设计模式。
评论