【SpringCloud 技术专题】「Eureka 源码分析」从源码层面让你认识 Eureka 工作流程和运作机制(下)
承接上文的对应的 Eureka 的上篇介绍,我们开始介绍,详见 [【SpringCloud 技术专题】「Eureka 源码分析」从源码层面让你认识 Eureka 工作流程和运作机制(上)]
原理回顾
Eureka Server 提供服务注册服务,各个节点启动后,会在 Eureka Server 中进行注册,这样 Eureka Server 中的服务注册表中将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观的看到。
Eureka Client 是一个 Java 客户端,用于简化与 Eureka Server 的交互,客户端同时也具备一个内置的、使用轮询负载算法的负载均衡器。
在应用启动后,将会向 Eureka Server 发送心跳(默认周期为 30 秒),如果 Eureka Server 在多个心跳周期(默认 3 个心跳周期=90 秒)没有收到某个节点的心跳,Eureka Server 将会从服务注册表中把这个服务节点移除。
高可用情况下的:Eureka Server 之间将会通过复制的方式完成数据的同步;
Eureka Client 具有缓存的机制,即使所有的 Eureka Server 都挂掉的话,客户端依然可以利用缓存中的信息消费其它服务的 API;
EurekaServer 启动流程分析
EurekaServer 处理服务注册、集群数据复制
EurekaClient 是如何注册到 EurekaServer 的?
刚才在 org.springframework.cloud.netflix.eureka.server.InstanceRegistry 的每个方法都打了一个断点,而且现在 EurekaServer 已经处于 Debug 运行状态,那么我们就随便找一个被 @EnableEurekaClient 的微服务启动试试微服务来试试吧,直接 Run。
当启动后,就一定会调用注册 register 方法,那么就接着往下看,拭目以待;
实例注册方法机制
InstanceRegistry.register 顺着堆栈信息往上看,是 ApplicationResource.addInstance 方法被调用了,分析 addInstance;
ApplicationResource 类
主要是处理接收 Http 的服务请求。
这里的写法貌似看起来和我们之前 Controller 的 RESTFUL 写法有点不一样,仔细一看,原来是 Jersey RESTful 框架,是一个产品级的 RESTful service 和 client 框架。与 Struts 类似,它同样可以和 hibernate,spring 框架整合。
看到 registry.register(info, "true".equals(isReplication)); 注册啊,原来 EurekaClient 客户端启动后会调用会通过 Http(s)请求,直接调到 ApplicationResource.addInstance 方法,只要是和注册有关的,都会调用这个方法。
接着我们深入 registry.register(info, "true".equals(isReplication)) 查看;
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) 方法;
然后通过 ApplicationContext 发布了一个事件 EurekaInstanceRegisteredEvent 服务注册事件,可以给 EurekaInstanceRegisteredEvent 添加监听事件,那么用户就可以在此刻实现自己想要的一些业务逻辑。
然后我们再来看看 super.register(info, isReplication) 方法,该方法是 InstanceRegistry 的父类 PeerAwareInstanceRegistryImpl 的方法。
服务户厕机制
进入 PeerAwareInstanceRegistryImpl 类的 register(final InstanceInfo info, final boolean isReplication) 方法;
进入 super.register(info, leaseDuration, isReplication),如何写入 EurekaServer 的注册表的,进入 AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication) 方法。
发现这个方法有点长,大致阅读,主要更新了注册表的时间之外,还更新了缓存等其它东西,大家有兴趣的可以深究阅读该方法;
集群之间的复制
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) 的这个方法。
每当有注册请求,首先更新 EurekaServer 的注册表,然后再将信息同步到其它 EurekaServer 的节点上去;
接下来我们看看 node 节点是如何进行复制操作的,进入 replicateInstanceActionsToPeers 方法。
节点之间的复制状态操作,都在这里体现的淋漓尽致,那么我们就拿 Register 类型 node.register(info) 来看,我们来看看 node 究竟是如何做到同步信息的,进入 node.register(info) 方法看看;
同级之间的复制机制
PeerEurekaNode.register(final InstanceInfo info) 方法,一窥究竟如何同步数据。
这里涉及到了 Eureka 的任务批处理,通常情况下 Peer 之间的同步需要调用多次,如果 EurekaServer 一多的话,那么将会有很多 http 请求,所以自然而然的孕育出了任务批处理,但是也在一定程度上导致了注册和下线的一些延迟,突出优势的同时也势必会造成一些劣势,但是这些延迟情况还是能符合常理在容忍范围之内的。
在 expiryTime 超时时间之内,批次处理要做的事情就是合并任务为一个 List,然后发送请求的时候,将这个批次 List 直接打包发送请求出去,这样的话,在这个批次的 List 里面,可能包含取消、注册、心跳、状态等一系列状态的集合 List。
我们再接着看源码,batchingDispatcher.process 这么一调用,然后我们就直接看这个 TaskDispatchers.createBatchingTaskDispatcher 方法。
这里的 process 方法会将任务添加到队列中,有入队列自然有出队列,具体怎么取任务,我就不一一给大家讲解了,我就讲讲最后是怎么触发任务的。进入 final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor) 这句代码的 TaskExecutors.batchExecutors 方法。
我们发现 TaskExecutors 类中的 batchExecutors 这个静态方法,有个 BatchWorkerRunnable 返回的实现类,因此我们再次进入 BatchWorkerRunnable 类看看究竟,而且既然是 Runnable,那么势必会有 run 方法。
这就是我们 BatchWorkerRunnable 类的 run 方法,这里面首先要获取信号量释放,才能获得任务集合,一旦获取到了任务集合的话,那么就直接调用 processor.process(tasks) 方法请求 Peer 节点同步数据,接下来我们看看 ReplicationTaskProcessor.process 方法;
感觉快要见到真相了,所以我们迫不及待的进入 JerseyReplicationClient.submitBatchUpdates(ReplicationList replicationList) 方法一窥究竟。
看到了相对路径地址,我们搜索下"batch"这样的字符串看看有没有对应的接收方法或者被 @Path 注解进入的;在 eureka-core-1.4.12.jar 这个包下面,果然搜到到了 @Path("batch") 这样的字样,直接进入,发现这是 PeerReplicationResource 类的方法 batchReplication,我们进入这方法看看。
看到了循环一次遍历任务进行处理,不知不觉觉得心花怒放,胜利的重点马上就要到来了,我们进入 PeerReplicationResource.dispatch 方法看看。
随便抓一个类型,那我们也拿 Register 类型来看,进入 PeerReplicationResource.handleRegister 看看。
Peer 节点的同步旅程终于结束了,最终又回调到了 ApplicationResource.addInstance 这个方法,这个方法在最终是 EurekaClient 启动后注册调用的方法,然而 Peer 节点的信息同步也调用了这个方法,仅仅只是通过一个变量 isReplication 为 true 还是 false 来判断是否是节点复制。剩下的 ApplicationResource.addInstance 流程前面已经提到过了,相信大家已经明白了注册的流程是如何扭转的,包括批量任务是如何处理 EurekaServer 节点之间的信息同步的了。
EurekaClient 启动流程分析
调换运行模式
Run 运行 discovery-eureka 服务,Debug 运行 provider-user 服务,先观察日志先;
服务提供方主体加载流程
【1】:仔细查看下日志,先是 DefaultLifecycleProcessor 类处理了一些 bean,然后接下来肯定会调用一些实现 SmartLifecycle 类的 start 方法;
【2】: 接着初始化设置了 EurekaClient 的状态为 STARTING,初始化编码使用的格式,哪些用 JSON,哪些用 XML;
【3】: 紧接着打印了强制获取注册信息状态为 false,已注册的应用大小为 0,客户端发送心跳续约,心跳续约间隔为 30 秒,最后打印 Client 初始化完成;
EnableEurekaClient 组件。
@EnableEurekaClient
这个注解类竟然也使用了注解 @EnableDiscoveryClient,那么我们有必要去这个注解类看看。
@EnableDiscoveryClient
这个注解类有个比较特殊的注解 @Import,由此我们猜想,这里的大多数逻辑是不是都写在这个 EnableDiscoveryClientImportSelector 类呢?
EnableDiscoveryClientImportSelector
EnableDiscoveryClientImportSelector 类继承了 SpringFactoryImportSelector 类,但是重写了一个 isEnabled() 方法,默认值返回 true,为什么会返回 true。
EnableDiscoveryClientImportSelector.selectImports
首先通过注解获取了一些属性,然后加载了一些类名称,我们进入 loadFactoryNames 方法看看。
加载了一个配置文件,配置文件里面写了啥呢?打开 SpringFactoryImportSelector 该文件所在的 jar 包的 spring.factories 文件一看。
都是一些 Configuration 后缀的类名,所以这些都是加载的一堆堆的配置文件类。factories 对象里面只有一个类名路径为 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 。
EurekaDiscoveryClientConfiguration
首先看到该类实现了 SmartLifecycle 接口,那么就肯定会实现 start 方法,而且这个 start 方法感觉应在会被加载执行的。this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 这段代码有一个观察者模式的回调存在。
这个方法会因为状态的改变而回调所有实现 StatusChangeListener 这个类的地方,前提得先注册到 listeners 中去才行。
于是乎,我们断定,若想要回调,那么就必须有地方先注册这个事件,而且这个注册还必须提前执行在 start 方法前执行,于是我们得先在 ApplicationInfoManager 这个类中找到注册到 listeners 的这个方法。
于是我们逆向找下 registerStatusChangeListener 被调用的地方。
很不巧的是,尽然只有 1 个地方被调用,这个地方就是 DiscoveryClient.initScheduledTasks 方法,而且 initScheduledTasks 方法又是在 DiscoveryClient 的构造函数里面调用的,同时我们也对 initScheduledTasks 以及 initScheduledTasks 被调用的构造方法地方打上断点。
果不其然,EurekaDiscoveryClientConfiguration.start 方法被调用了,紧接着 this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 也进入断点,然后在往下走,又进入的 DiscoveryClient.initScheduledTasks 方法中的 notify 回调处。
看着断点依次经过我们上述分析的地方,然后也符合日志打印的顺序,所以我们现在应该是有必要好好看看 DiscoveryClient.initScheduledTasks 这个方法究竟干了什么伟大的事情。然而又想了想,还不如看看 initScheduledTasks 被调用的构造方法。
DiscoveryClient 经过 @Inject 注解过的构造方法。
从往下看,initScheduledTasks 这个方法顾名思义就是初始化调度任务,所以这里面的内容应该就是重头戏,进入看看。
在这个方法从上往下一路注释分析下来,干了 EurekaClient 我们最想知道的一些事情,定时任务获取注册信息,定时任务刷新缓存,定时任务心跳续约,定时任务同步数据中心数据,状态变化监听回调等。但是唯独没看到注册,这是怎么回事呢?
instanceInfoReplicator.onDemandUpdate() 就是在状态改变的时候。
onDemandUpdate 这个方法,唯独 InstanceInfoReplicator.this.run() 这个方法还有点用,而且还是 run 方法呢,感情 InstanceInfoReplicator 这个类还是实现了 Runnable 接口?经过查看这个类,还真是实现了 Runnable 接口。
这个方法应该我们要找的注册所在的地方。
discoveryClient.register() 这个 register 方法,原来注册方法就是这个。
原来调用了 EurekaHttpClient 封装的客户端请求对象来进行注册的,再继续深探 registrationClient.register 方法,于是我们来到了 AbstractJerseyEurekaHttpClient.register 方法。
调用的是 Jersey RESTful 框架来进行请求的,然后在 EurekaServer 那边就会在 ApplicationResource.addInstance 方法接收客户端的注册请求,因此我们的 EurekaClient 是如何注册的就到此为止了。
版权声明: 本文为 InfoQ 作者【李浩宇/Alex】的原创文章。
原文链接:【http://xie.infoq.cn/article/18895ee3d3f097d395327e140】。文章转载请联系作者。
评论