【源码系列】Spring Cloud Eureka

用户头像
Alex🐒
关注
发布于: 2020 年 06 月 28 日

Spring Cloud Eureka

Spring Cloud Eureka 实现了服务的注册与发现, 同时包含了服务端和客户端组件,均采用 Java 编写。



Eureka 服务端,也称为服务注册中心,支持高可用配置,依托于强一致性提供良好的服务实例可用性,可以应对多种不同的故障场景。



Eureka 客服端,处理服务的注册与发现。在应用程序运行时,Eureka 客户端向注册中心注册自身提供的服务并定期性的发送心跳来更新服务租约。同时,也从服务端查询当前注册的服务信息并更新本地缓存。

Eureka 架构体系

Eureka 架构中有三个角色:

  • 服务注册中心,Eureka 提供的服务端,提供服务注册与发现的功能

  • 服务提供者,集成了 Eureka 客户端,提供服务的应用,遵循 Eureka 通信机制,将自己提供的服务注册到 Eureka,以供其他应用发现

  • 服务消费者,集成了 Eureka 客户端,从服务注册中心获取服务列表,能够调用服务提供者的接口。服务提供者和服务消费者只是根据服务调用方向的逻辑定义。

服务提供者

服务注册

服务提供者在启动的时候会通过发送 REST 请求将自己注册到 Eureka Server,同时携带自身服务的一些元数据信息。Eureka Server 将元数据进行存储。Eureka Server 之间可以互相注册。



服务同步

不同的服务提供者可能会注册到不同的 Eureka Server 上。当服务提供者发送注册请求到一个 Eureka Servcer 时,会将该请求转发给其他的 Eureka Server(需要注册到当前节点),实现注册中心之间的服务同步,保证服务提供者的信息就可以通过任意 Eureka Server 获取到。



服务续约

在注册完服务之后,服务提供者会维护一个心跳用来持续通知 Eureka Server 服务存活状态(Renew),防止 Eureka Server 的将该服务实例从服务列表中排除出去(会定时监测未存活的实例,进行清除)。



服务下线

在系统运行过程中会面临关闭或重启服务的某个实例的情况,在服务关闭期间,不希望客户端会继续调用关闭了的实例。所以在客户端程序中,当服务实例进行正常的关闭操作时,会发送一个服务下线的 REST 请求给 Eureka Server,在接收到请求之后,将该服务状态设置为下线(DOWN),并转发给其他的 Eureka Server。

服务消费者

获取服务

服务消费者在启动的时会发送 REST 请求给任意 Eureka Server 获取注册的服务清单。为了性能考虑,Eureka Server 会维护一份只读的服务清单来返回给客户端(会定时更新)。



服务调用

服务消费者在获取服务清单后,通过服务名可以获得具体提供服务的实例名和该实例的元数据信息。因为有这些服务实例的详细信息,所以客户端可以根据自己的需要决定具体需要调用的实例,例如,如果使用 Ribbon,会默认采用轮询的方式进行调用。



对于访问实例的选择,Eureka 中有 Region 和 Zone 的概念,一个 Region 中可以包含多个 Zone,每个服务客户端需要被注册到一个 Zone 中,即每个客户端对应一个 Region和一个Zone。在进行服务调用的时候,优先访问同处一个 Zone 中的服务提供者,访问不到再访问其他 Zone 中的服务提供者。

服务注册中心

失效剔除

有时服务实例并不是正常下线,可能由于内存溢出、网络故障等原因使得服务不能正常工作,而服务注册中心并未收到服务下线的请求。为了从服务列表中将这些无法提供服务的实例剔除,Eureka Server 在启动的时候会创建一个定时任务,定时的(默认为60秒)将清单中超时(超过指定时间没有接收到心跳)的服务剔除出去。



自我保护

Eureka Server 在运行期间,会统计服务实力心跳的次数,在单位时间内的心跳次数设置一定的预期。如果 Eureka Server 开启了自我保护,在服务续约超时的时候,如果单位时间内的心跳次数大于预期,不会自动剔除这些服务实例。当触发了自我保护机制时,日志会打印如下信息:

EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY’RE NOT. RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPIRED JUST TO BE SAFE.



但是,在自我保护的状态下,那么客户端有可能获取到实际已经不存活的服务实例,会出现调用失败的情况,所以客户端必须要有容错机制,比如可以使用请求重试、断路器等。

Eureka 配置

Eureka 服务端

maven

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-server</artifactId>
</dependency>



@EnableEurekaServer 注解启用

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}



配置文件 application.yml

eureka:
instance:
hostname: ${ipAddress}
prefer-ip-address: true
server:
enable-self-preservation: false # 关闭自我保护模式(默认为打开)
eviction-interval-timer-in-ms: 5000 # 续期时间,即扫描失效服务的间隔时间(缺省为60*1000ms)
client:
register-with-eureka: false # 不作为一个客户端注册到注册中心
fetch-registry: false
service-url:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/



常见参数配置

高可用配置

eureka:
client:
register-with-eureka: false # 不作为一个客户端注册到注册中心
fetch-registry: false
service-url: # 多个 Eureka Server 互相注册
defaultZone: http://${eureka.host1}:${server.port}/eureka/,http://${eureka.host1}:${server.port}/eureka/

Eureka 客户端

maven

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>



@EnableDiscoveryClient 注解启用

@SpringBootApplication
@EnableDiscoveryClient
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
}



配置文件 application.yml

eureka:
instance:
# 客户端需要多长时间发送心跳给注册中心,默认30秒
lease-renewal-interval-in-seconds: 5
# 注册中心在接受到实例的最后一次发出的心跳后,等待多久才可以将此实例删除
lease-expiration-duration-in-seconds: 15
client:
serviceUrl:
defaultZone: http://${EUREKA_HOST:10.16.10.35}:${EUREKA_PORT:6200}/eureka/
# 客户端间隔多久去拉取服务器注册信息,默认为30秒
registry-fetch-interval-seconds: 5



常见参数配置

Eureka 源码

Eureka 客户端

初始化

EurekaClientAutoConfiguration

/**
* 配置注入
*/
@Bean
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) {
String hostname = this.getProperty("eureka.instance.hostname");
boolean preferIpAddress = Boolean.parseBoolean(this.getProperty("eureka.instance.prefer-ip-address"));
String ipAddress = this.getProperty("eureka.instance.ip-address");
// 读取配置 ...
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setInstanceId(IdUtils.getDefaultInstanceId(this.env));
instance.setPreferIpAddress(preferIpAddress);
// Set instance ...
return instance;
}
@Bean
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = (new InstanceInfoFactory()).create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
/**
* 客户端 EurekaClient 注入
*/
@Bean
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}



进入 Client 构造函数,启动了两个核心定时线程负责:1. 刷新本地缓存;2. 发送心跳请求

public class CloudEurekaClient extends DiscoveryClient {
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
// ...
}
}
public class DiscoveryClient implements EurekaClient {
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
});
}
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
// ...
// 启动定时任务
this.initScheduledTasks();
}
private void initScheduledTasks() {
// 定时刷新缓存
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh",
this.scheduler,
this.cacheRefreshExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.CacheRefreshThread()),
(long)renewalIntervalInSecs, TimeUnit.SECONDS);
// 定时发送心跳
this.scheduler.schedule(new TimedSupervisorTask("heartbeat",
this.scheduler,
this.heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.HeartbeatThread()),
(long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
}



刷新缓存线程和心跳线程

class CacheRefreshThread implements Runnable {
public void run() {
// 发送请求到注册中心,拉取最新注册表
DiscoveryClient.this.refreshRegistry();
}
}
class HeartbeatThread implements Runnable {
public void run() {
if (DiscoveryClient.this.renew()) {
DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}

服务注册/续约

boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
// 返回 404 (NOT_FOUND)则调用注册方法 register()
this.register();
}
}



注册方法的入口还有一处在 InstanceInfoReplicator,在配置信息改变的时候去调用 register() 方法

class InstanceInfoReplicator implements Runnable {
public void start(int initialDelayMs) {
Future next = this.scheduler.schedule(this, (long)initialDelayMs, TimeUnit.SECONDS);
}
public void run() {
// 刷新 instanceInfo
this.discoveryClient.refreshInstanceInfo();
// 发送心跳
Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
this.discoveryClient.register();
this.instanceInfo.unsetIsDirty(dirtyTimestamp);
}
}
}

服务获取

服务获取这里有两种拉取方式,一种是全量拉取,一种是增量拉取。默认配置下,首先执行一次全量获取注册信息进行本地缓存,而后定时增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。

void refreshRegistry() {
// 拉取注册服务
boolean success = this.fetchRegistry(remoteRegionsModified);
if (success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
}
boolean fetchRegistry(boolean forceFullRegistryFetch) {
Applications applications = this.getApplications();
if (!this.clientConfig.shouldDisableDelta()
&& Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress())
&& !forceFullRegistryFetch
&& applications != null
&& applications.getRegisteredApplications().size() != 0
&& applications.getVersion() != -1L) {
// 增量拉取
this.getAndUpdateDelta(applications);
} else {
// 全量拉取
this.getAndStoreFullRegistry();
}
// 设置应用集合哈希码
applications.setAppsHashCode(applications.getReconcileHashCode());
// 广播事件(刷新本地缓存)
this.onCacheRefreshed();
}

此处应用了一致性哈希算法



增量获取注册的实例(Applications)时,会获取到:

  1. Eureka Server 近期变化(注册、下线)的应用集合

  2. Eureka Server 应用集合哈希码



Eureka-Client 将变化的应用集合和本地缓存的应用集合进行合并后进行计算本地的应用集合一致性哈希码。若两个哈希码相等,意味着增量获取成功;若不相等,意味着增量获取失败,重新向 Eureka Server 全量获取应用集合。

哈希算法
appsHashCode = ${status}_${count}_

使用应用实例状态status和对应数量count拼接出一致性哈希码(若数量为 0,该应用实例状态不进行拼接)。状态以字符串大小排序



举个例子,8 个 UP,0 个 DOWN ,则 appsHashCode = UP_8_;8 个 UP ,2 个 DOWN ,则 appsHashCode = DOWN_2_UP8



增量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = this.eurekaTransport.queryClient.getDelta((String[])this.remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = (Applications)httpResponse.getEntity();
}
if (delta == null) {
// 增量拉取失败,全量拉取
this.getAndStoreFullRegistry();
} else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) {
// CAS 处理并发的场景
String reconcileHashCode = "";
// 增量更新
this.updateDelta(delta);
reconcileHashCode = this.getReconcileHashCode(applications);
// 如果本地的应用哈希码,与服务器的应用哈希码不一致,重新全量获取一次
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || this.clientConfig.shouldLogDeltaDiff()) {
this.reconcileAndLogDifference(delta, reconcileHashCode);
}
}
}

服务下线

注入 EurekaClient 时,设置了 destroyMethod,应用正常退出时调度用

@Bean(
destroyMethod = "shutdown"
)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}



class DiscoveryClient implements EurekaClient {
public synchronized void shutdown() {
// 停止定时限线程
this.cancelScheduledTasks();
// 服务下线,通知注册中心
this.unregister();
}
}

Eureka 服务端

Server 初始化

Eureka 是一个 Servlet 应用,它使用 jersey 作为 RESTful 框架来提供 REST 服务。

EurekaServerAutoConfiguration

private static final String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};
/**
* Eureka Server 提供 REST API,Application 负责处理 http 请求
* URL 映射到 Controller
*/
@Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
// 扫描 EUREKA_PACKAGES 下的 @Path 和 @Provider,功能类似 @Component
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// ...
// 扫描到的 class 封装为 BeanDefinition,加入 Application 返回
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}

接收客户端注册请求

package com.netflix.eureka.resources;
public class ApplicationResource {
public Response addInstance(InstanceInfo info, String isReplication) {
// 校验 instance 信息 ...
this.registry.register(info, "true".equals(isReplication));
}
}



调用注册方法

public void register(final InstanceInfo info, final boolean isReplication) {
// 广播注册事件
this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
public void register(InstanceInfo info, boolean isReplication) {
// 有效时长,客户端设置 eureka.instance.lease-expiration-duration-in-seconds
int leaseDuration = 90;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 实例注册
super.register(info, leaseDuration, isReplication);
// 同步到其他节点
this.replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}



注册服务存储

// 实例注册表,线程安全的双层 map 结构,第一层 key 是服务名,第二层 key 是 instanceId
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
// 最近变更租约实例队列,循环队列
private final AbstractInstanceRegistry.CircularQueue<Pair<Long, String>> recentRegisteredQueue;
// 实例变更表
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue;
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
// 第一个请求初始化 registry ...
Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
if (existingLease != null && existingLease.getHolder() != null) {
// 实例已经注册过
Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
// 当前注册实例的时间戳早于已经注册的实例的时间戳,使用已经注册的实例信息
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = (InstanceInfo)existingLease.getHolder();
}
} else {
synchronized(this.lock) {
// 记录客户端注册次数,用于统计单位时间内的注册频率(如果设置了)
if (this.expectedNumberOfClientsSendingRenews > 0) {
++this.expectedNumberOfClientsSendingRenews;
this.updateRenewsPerMinThreshold();
}
}
}
Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
((Map)gMap).put(registrant.getId(), lease);
synchronized(this.recentRegisteredQueue) {
// 租约入队列
this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// 变更租约入队列
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
}

接收客户端续约请求

package com.netflix.eureka.resources;
public class InstanceResource {
public Response renewLease(String isReplication, String overriddenStatus, String status, String lastDirtyTimestamp) {
this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
}
}



调用续约方法

public boolean renew(String appName, String id, boolean isReplication) {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = (Lease)gMap.get(id);
}
leaseToRenew.renew(); // 更新 lastUpdateTimestamp
}

接收客户端下线请求

package com.netflix.eureka.resources;
public class InstanceResource {
public Response cancelLease(String isReplication) {
this.registry.cancel(this.app.getName(), this.id, "true".equals(isReplication));
}
}



调用下线方法

// 最近下线列表
private final AbstractInstanceRegistry.CircularQueue<Pair<Long, String>> recentCanceledQueue;
protected boolean internalCancel(String appName, String id, boolean isReplication) {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// 移除实例
leaseToCancel = (Lease)gMap.remove(id);
}
// 加入下线列表
synchronized(this.recentCanceledQueue) {
this.recentCanceledQueue.add(new Pair(System.currentTimeMillis(), appName + "(" + id + ")"));
}
leaseToCancel.cancel(); // 更新 evictionTimestamp
// 加入最近变更队列
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
}

服务剔除

当客户端长时间没有给服务端发送请求的时候,就说明客户端 DOWN 了。Eureka Server 会启动一个定时线程进行检查,启动路径:

EurekaServerInitializerConfiguration#start()
-> EurekaServerBootstrap#initEurekaServerContext()
-> InstanceRegistry#openForTraffic()
-> AbstractInstanceRegistry#postInit()
void postInit() {
this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask()); // 定时器
this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(),
this.serverConfig.getEvictionIntervalTimerInMs(),
this.serverConfig.getEvictionIntervalTimerInMs());
}



定时器执行逻辑

public void evict(long additionalLeaseMs) {
// 判断是否可以剔除实例,自我保护机制
if (!this.isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
} else {
// 过期租约列表
List<Lease<InstanceInfo>> expiredLeases = new ArrayList();
Iterator var4 = this.registry.entrySet().iterator();
// 下面的代码逻辑有点奇怪,标记了真正的执行顺序
while(true) {
Map leaseMap;
do {
// 假设 registry 是有值的,第一次不会进入 if...
// 循环n次以后,注册表都遍历完,进入 if...
if (!var4.hasNext()) {
// 【n】注册表已经全部加入 leaseMap,并且过期租约已经加入 expiredLeases
int registrySize = (int)this.getLocalRegistrySize();
int registrySizeThreshold = (int)((double)registrySize
* this.serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
// 【n+1】假设 expiredLeases 有值,并且 evictionLimit >0(getRenewalPercentThreshold<1)
for(int i = 0; i < toEvict; ++i) {
// 随机选择一个过期的 lease
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = (Lease)expiredLeases.get(i);
String appName = ((InstanceInfo)lease.getHolder()).getAppName();
String id = ((InstanceInfo)lease.getHolder()).getId();
// 调度用下线入口
this.internalCancel(appName, id, false);
}
}
// 【n+1】如果没有过期的对象,退出方法
return;
}
// 【0】leaseMap 初始化
Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry = (Entry)var4.next();
leaseMap = (Map)groupEntry.getValue();
// 【1】退出 do...while
} while(leaseMap == null);
// 【2】找到过期租约,加入 expiredLeases
Iterator var7 = leaseMap.entrySet().iterator();
while(var7.hasNext()) {
Entry<String, Lease<InstanceInfo>> leaseEntry = (Entry)var7.next();
Lease<InstanceInfo> lease = (Lease)leaseEntry.getValue();
// 判断过期
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
}

自我保护

在剔除服务时,判断服务端是否开启了自我保护

// 期望最小每分钟续租次数
protected volatile int numberOfRenewsPerMinThreshold;
// 每次 Renew/下线都会调用,更新 numberOfRenewsPerMinThreshold
// 计算公式:服务总数
// * (期望)每分钟续约数(eureka.server.expected-client-renewal-interval-seconds)
// * 自我保护续约百分比阀值因子(eureka.server.renewal-percent-threshold)
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfClientsSendingRenews
* (60.0D / (double)this.serverConfig.getExpectedClientRenewalIntervalSeconds())
* this.serverConfig.getRenewalPercentThreshold());
}
public boolean isLeaseExpirationEnabled() {
// 默认打开自我保护
if (!this.isSelfPreservationModeEnabled()) {
// 自我保护关闭, 可以剔除实例
return true;
} else {
return this.numberOfRenewsPerMinThreshold > 0
// 上一分钟的续约数 > 期望值
&& this.getNumOfRenewsInLastMin() > (long)this.numberOfRenewsPerMinThreshold;
}
}

服务信息同步

在服务注册/续约、服务剔除修改完本地数据之后会调用 PeerAwareInstanceRegistryImpl#replicateToPeers()方法,信息同步到其他服务器

private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
if (this.peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();
while(var8.hasNext()) {
PeerEurekaNode node = (PeerEurekaNode)var8.next();
if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
// 排除自身
this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
}
}
private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
InstanceInfo infoFromRegistry = null;
// 同步给其他节点
switch(action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
// ...
}
}



这里的 isReplication 参数控制 Eureka Server 集群二次传播注册信息。当客服端发送请求时 isReplication=false,当请求是从 Eureka Server 发出时 isReplication=true



通常会配置多个 Eureka Server ,假设当前有服务器 eureka-A、eureka-B、eureka-C。如果 Eureka A 注册到 B,B 注册到 C,那么当服务向 A 注册时,B 中会有该服务的注册信息,但是 C 中是没有的。如果希望只向一台 Eureka Server 注册,并且其它所有实例都能得到注册信息,那么就必须把所有节点都互相注册。

接收客户端拉取请求

全量拉取
public class ApplicationsResource {
public Response getContainers(String version, String acceptHeader, String acceptEncoding, String eurekaAccept, UriInfo uriInfo, String regionsStr) {
Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
// 通过 `ResponseCache(ResponseCacheImpl)` 返回服务注册表
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
}



多级缓存

Eureka Server 为了避免同时读写内存数据结构造成的并发冲突问题,采用了多级缓存机制来进一步提升服务请求的响应速度



拉取注册表的操作:

  1. 首先从 ReadOnlyCacheMap 里查缓存的注册表。

  2. 若没有,就找 ReadWriteCacheMap 里缓存的注册表。

  3. 如果还没有,就从内存中获取实际的注册表数据。



注册表发生变更的操作:

  1. 内存中更新注册表,同时过期掉 ReadWriteCacheMap,此时不会影响 ReadOnlyCacheMap。

  2. 在一段时间内(默认30秒),各服务拉取注册表会直接读 ReadOnlyCacheMap。

  3. 30秒过后,后台线程发现 ReadWriteCacheMap 已经清空了,也会清空 ReadOnlyCacheMap 中的缓存。

  4. 下次有服务拉取注册表,从内存中获取最新的数据了,同时填充各个缓存。



ResponseCache(ResponseCacheImpl) 的内部实现

class ResponseCacheImpl {
// ReadOnlyCacheMap ConcurrentHashMap 实现
private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap();
// ReadWriteCacheMap GuavaCache 实现
private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
// 启用只读缓存
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
// 更新只读缓存间隔
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap = CacheBuilder.newBuilder()
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, ResponseCacheImpl.Value>(){
// ... 监听移出 key
})
.build(new CacheLoader<Key, ResponseCacheImpl.Value>() {
//... 不存在的 key 加载 value 方法
});
if (this.shouldUseReadOnlyResponseCache) {
// 定时更新 ReadOnlyCacheMap
this.timer.schedule(this.getCacheUpdateTask(), new Date(), responseCacheUpdateIntervalMs);
}
}
public String get(Key key) {
return this.get(key, this.shouldUseReadOnlyResponseCache);
}
String get(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = this.getValue(key, useReadOnlyCache);
return payload != null && !payload.getPayload().equals("") ? payload.getPayload() : null;
}
ResponseCacheImpl.Value getValue(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = null;
if (useReadOnlyCache) {
// 从只读缓存中获取注册信息
ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
// 只读缓存不存在便从读写缓存中获取信息
// 读写缓存使用 GuavaCache 实现,如果 key 不存在,会执行逻辑加载到缓存中
// 同时更新到只读缓存
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
this.readOnlyCacheMap.put(key, payload);
}
} else {
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
}
return payload;
}
}
增量拉取

与全量拉取的 cacheKey 不同,影响读写缓存的 load 方法不同,增量拉取从最近变更租约队列中获取,全量拉取从注册表获取。

public class ApplicationsResource {
public Response getContainerDifferential(String version, String acceptHeader, String acceptEncoding, String eurekaAccept, UriInfo uriInfo, String regionsStr) {
// 通过 `ResponseCache(ResponseCacheImpl)` 返回服务注册表
Key cacheKey = new Key(EntityType.Application, "ALL_APPS_DELTA", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
}

其他后台线程

最近租约变更队列清理

后台任务定时顺序扫描队列 recentlyChangedQueue,当 lastUpdateTime 超过一定时长后进行移除。

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
// delta-retention-timer-interval-in-ms 执行频率,默认值:30*1000 毫秒
this.deltaRetentionTimer.schedule(this.getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
public void run() {
Iterator it = AbstractInstanceRegistry.this.recentlyChangedQueue.iterator();
// retention-time-in-ms-in-delta-queue 保留租约变更记录过期时长,默认值:3*60*1000 毫秒。
while(it.hasNext() &&
it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
}
}
};
}
}

Eureka 视图



发布于: 2020 年 06 月 28 日 阅读数: 78
用户头像

Alex🐒

关注

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
【源码系列】Spring Cloud Eureka