Spring Cloud Eureka
Spring Cloud Eureka 实现了服务的注册与发现, 同时包含了服务端和客户端组件,均采用 Java 编写。
Eureka 服务端,也称为服务注册中心,支持高可用配置,依托于强一致性提供良好的服务实例可用性,可以应对多种不同的故障场景。
Eureka 客服端,处理服务的注册与发现。在应用程序运行时,Eureka 客户端向注册中心注册自身提供的服务并定期性的发送心跳来更新服务租约。同时,也从服务端查询当前注册的服务信息并更新本地缓存。
Eureka 架构体系
Eureka 架构中有三个角色:
服务注册中心,Eureka 提供的服务端,提供服务注册与发现的功能
服务提供者,集成了 Eureka 客户端,提供服务的应用,遵循 Eureka 通信机制,将自己提供的服务注册到 Eureka,以供其他应用发现
服务消费者,集成了 Eureka 客户端,从服务注册中心获取服务列表,能够调用服务提供者的接口。服务提供者和服务消费者只是根据服务调用方向的逻辑定义。
服务提供者
服务注册
服务提供者在启动的时候会通过发送 REST 请求将自己注册到 Eureka Server,同时携带自身服务的一些元数据信息。Eureka Server 将元数据进行存储。Eureka Server 之间可以互相注册。
服务同步
不同的服务提供者可能会注册到不同的 Eureka Server 上。当服务提供者发送注册请求到一个 Eureka Servcer 时,会将该请求转发给其他的 Eureka Server(需要注册到当前节点),实现注册中心之间的服务同步,保证服务提供者的信息就可以通过任意 Eureka Server 获取到。
服务续约
在注册完服务之后,服务提供者会维护一个心跳用来持续通知 Eureka Server 服务存活状态(Renew),防止 Eureka Server 的将该服务实例从服务列表中排除出去(会定时监测未存活的实例,进行清除)。
服务下线
在系统运行过程中会面临关闭或重启服务的某个实例的情况,在服务关闭期间,不希望客户端会继续调用关闭了的实例。所以在客户端程序中,当服务实例进行正常的关闭操作时,会发送一个服务下线的 REST 请求给 Eureka Server,在接收到请求之后,将该服务状态设置为下线(DOWN),并转发给其他的 Eureka Server。
服务消费者
获取服务
服务消费者在启动的时会发送 REST 请求给任意 Eureka Server 获取注册的服务清单。为了性能考虑,Eureka Server 会维护一份只读的服务清单来返回给客户端(会定时更新)。
服务调用
服务消费者在获取服务清单后,通过服务名可以获得具体提供服务的实例名和该实例的元数据信息。因为有这些服务实例的详细信息,所以客户端可以根据自己的需要决定具体需要调用的实例,例如,如果使用 Ribbon,会默认采用轮询的方式进行调用。
对于访问实例的选择,Eureka 中有 Region 和 Zone 的概念,一个 Region 中可以包含多个 Zone,每个服务客户端需要被注册到一个 Zone 中,即每个客户端对应一个 Region和一个Zone。在进行服务调用的时候,优先访问同处一个 Zone 中的服务提供者,访问不到再访问其他 Zone 中的服务提供者。
服务注册中心
失效剔除
有时服务实例并不是正常下线,可能由于内存溢出、网络故障等原因使得服务不能正常工作,而服务注册中心并未收到服务下线的请求。为了从服务列表中将这些无法提供服务的实例剔除,Eureka Server 在启动的时候会创建一个定时任务,定时的(默认为60秒)将清单中超时(超过指定时间没有接收到心跳)的服务剔除出去。
自我保护
Eureka Server 在运行期间,会统计服务实力心跳的次数,在单位时间内的心跳次数设置一定的预期。如果 Eureka Server 开启了自我保护,在服务续约超时的时候,如果单位时间内的心跳次数大于预期,不会自动剔除这些服务实例。当触发了自我保护机制时,日志会打印如下信息:
EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY’RE NOT. RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPIRED JUST TO BE SAFE.
但是,在自我保护的状态下,那么客户端有可能获取到实际已经不存活的服务实例,会出现调用失败的情况,所以客户端必须要有容错机制,比如可以使用请求重试、断路器等。
Eureka 配置
Eureka 服务端
maven
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-server</artifactId>
</dependency>
@EnableEurekaServer
注解启用
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
配置文件 application.yml
eureka:
instance:
hostname: ${ipAddress}
prefer-ip-address: true
server:
enable-self-preservation: false
eviction-interval-timer-in-ms: 5000
client:
register-with-eureka: false
fetch-registry: false
service-url:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
常见参数配置
高可用配置
eureka:
client:
register-with-eureka: false
fetch-registry: false
service-url:
defaultZone: http://${eureka.host1}:${server.port}/eureka/,http://${eureka.host1}:${server.port}/eureka/
Eureka 客户端
maven
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
@EnableDiscoveryClient
注解启用
@SpringBootApplication
@EnableDiscoveryClient
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
}
配置文件 application.yml
eureka:
instance:
lease-renewal-interval-in-seconds: 5
lease-expiration-duration-in-seconds: 15
client:
serviceUrl:
defaultZone: http://${EUREKA_HOST:10.16.10.35}:${EUREKA_PORT:6200}/eureka/
registry-fetch-interval-seconds: 5
常见参数配置
Eureka 源码
Eureka 客户端
初始化
EurekaClientAutoConfiguration
* 配置注入
*/
@Bean
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) {
String hostname = this.getProperty("eureka.instance.hostname");
boolean preferIpAddress = Boolean.parseBoolean(this.getProperty("eureka.instance.prefer-ip-address"));
String ipAddress = this.getProperty("eureka.instance.ip-address");
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setInstanceId(IdUtils.getDefaultInstanceId(this.env));
instance.setPreferIpAddress(preferIpAddress);
return instance;
}
@Bean
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = (new InstanceInfoFactory()).create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
* 客户端 EurekaClient 注入
*/
@Bean
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}
进入 Client 构造函数,启动了两个核心定时线程负责:1. 刷新本地缓存;2. 发送心跳请求
public class CloudEurekaClient extends DiscoveryClient {
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
}
}
public class DiscoveryClient implements EurekaClient {
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
});
}
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
this.initScheduledTasks();
}
private void initScheduledTasks() {
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh",
this.scheduler,
this.cacheRefreshExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.CacheRefreshThread()),
(long)renewalIntervalInSecs, TimeUnit.SECONDS);
this.scheduler.schedule(new TimedSupervisorTask("heartbeat",
this.scheduler,
this.heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new DiscoveryClient.HeartbeatThread()),
(long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
}
刷新缓存线程和心跳线程
class CacheRefreshThread implements Runnable {
public void run() {
DiscoveryClient.this.refreshRegistry();
}
}
class HeartbeatThread implements Runnable {
public void run() {
if (DiscoveryClient.this.renew()) {
DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
服务注册/续约
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
this.register();
}
}
注册方法的入口还有一处在 InstanceInfoReplicator
,在配置信息改变的时候去调用 register()
方法
class InstanceInfoReplicator implements Runnable {
public void start(int initialDelayMs) {
Future next = this.scheduler.schedule(this, (long)initialDelayMs, TimeUnit.SECONDS);
}
public void run() {
this.discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
this.discoveryClient.register();
this.instanceInfo.unsetIsDirty(dirtyTimestamp);
}
}
}
服务获取
服务获取这里有两种拉取方式,一种是全量拉取,一种是增量拉取。默认配置下,首先执行一次全量获取注册信息进行本地缓存,而后定时增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。
void refreshRegistry() {
boolean success = this.fetchRegistry(remoteRegionsModified);
if (success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
}
boolean fetchRegistry(boolean forceFullRegistryFetch) {
Applications applications = this.getApplications();
if (!this.clientConfig.shouldDisableDelta()
&& Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress())
&& !forceFullRegistryFetch
&& applications != null
&& applications.getRegisteredApplications().size() != 0
&& applications.getVersion() != -1L) {
this.getAndUpdateDelta(applications);
} else {
this.getAndStoreFullRegistry();
}
applications.setAppsHashCode(applications.getReconcileHashCode());
this.onCacheRefreshed();
}
此处应用了一致性哈希算法
增量获取注册的实例(Applications)时,会获取到:
Eureka Server 近期变化(注册、下线)的应用集合
Eureka Server 应用集合哈希码
Eureka-Client 将变化的应用集合和本地缓存的应用集合进行合并后进行计算本地的应用集合一致性哈希码。若两个哈希码相等,意味着增量获取成功;若不相等,意味着增量获取失败,重新向 Eureka Server 全量获取应用集合。
哈希算法
appsHashCode = ${status}_${count}_
使用应用实例状态status
和对应数量count
拼接出一致性哈希码(若数量为 0,该应用实例状态不进行拼接)。状态以字符串大小排序。
举个例子,8 个 UP,0 个 DOWN ,则 appsHashCode = UP_8_
;8 个 UP ,2 个 DOWN ,则 appsHashCode = DOWN_2_UP8
。
增量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = this.eurekaTransport.queryClient.getDelta((String[])this.remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = (Applications)httpResponse.getEntity();
}
if (delta == null) {
this.getAndStoreFullRegistry();
} else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) {
String reconcileHashCode = "";
this.updateDelta(delta);
reconcileHashCode = this.getReconcileHashCode(applications);
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || this.clientConfig.shouldLogDeltaDiff()) {
this.reconcileAndLogDifference(delta, reconcileHashCode);
}
}
}
服务下线
注入 EurekaClient 时,设置了 destroyMethod
,应用正常退出时调度用
@Bean(
destroyMethod = "shutdown"
)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}
class DiscoveryClient implements EurekaClient {
public synchronized void shutdown() {
this.cancelScheduledTasks();
this.unregister();
}
}
Eureka 服务端
Server 初始化
Eureka 是一个 Servlet 应用,它使用 jersey
作为 RESTful
框架来提供 REST 服务。
EurekaServerAutoConfiguration
private static final String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};
* Eureka Server 提供 REST API,Application 负责处理 http 请求
* URL 映射到 Controller
*/
@Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// ...
// 扫描到的 class 封装为 BeanDefinition,加入 Application 返回
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
接收客户端注册请求
package com.netflix.eureka.resources;
public class ApplicationResource {
public Response addInstance(InstanceInfo info, String isReplication) {
this.registry.register(info, "true".equals(isReplication));
}
}
调用注册方法
public void register(final InstanceInfo info, final boolean isReplication) {
this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
public void register(InstanceInfo info, boolean isReplication) {
int leaseDuration = 90;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
this.replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
注册服务存储
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
private final AbstractInstanceRegistry.CircularQueue<Pair<Long, String>> recentRegisteredQueue;
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue;
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
if (existingLease != null && existingLease.getHolder() != null) {
Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = (InstanceInfo)existingLease.getHolder();
}
} else {
synchronized(this.lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
++this.expectedNumberOfClientsSendingRenews;
this.updateRenewsPerMinThreshold();
}
}
}
Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
((Map)gMap).put(registrant.getId(), lease);
synchronized(this.recentRegisteredQueue) {
this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")"));
}
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
}
接收客户端续约请求
package com.netflix.eureka.resources;
public class InstanceResource {
public Response renewLease(String isReplication, String overriddenStatus, String status, String lastDirtyTimestamp) {
this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
}
}
调用续约方法
public boolean renew(String appName, String id, boolean isReplication) {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = (Lease)gMap.get(id);
}
leaseToRenew.renew();
}
接收客户端下线请求
package com.netflix.eureka.resources;
public class InstanceResource {
public Response cancelLease(String isReplication) {
this.registry.cancel(this.app.getName(), this.id, "true".equals(isReplication));
}
}
调用下线方法
private final AbstractInstanceRegistry.CircularQueue<Pair<Long, String>> recentCanceledQueue;
protected boolean internalCancel(String appName, String id, boolean isReplication) {
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = (Lease)gMap.remove(id);
}
synchronized(this.recentCanceledQueue) {
this.recentCanceledQueue.add(new Pair(System.currentTimeMillis(), appName + "(" + id + ")"));
}
leaseToCancel.cancel();
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
}
服务剔除
当客户端长时间没有给服务端发送请求的时候,就说明客户端 DOWN 了。Eureka Server 会启动一个定时线程进行检查,启动路径:
EurekaServerInitializerConfiguration#start()
-> EurekaServerBootstrap#initEurekaServerContext()
-> InstanceRegistry#openForTraffic()
-> AbstractInstanceRegistry#postInit()
void postInit() {
this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask()); // 定时器
this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(),
this.serverConfig.getEvictionIntervalTimerInMs(),
this.serverConfig.getEvictionIntervalTimerInMs());
}
定时器执行逻辑
public void evict(long additionalLeaseMs) {
if (!this.isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
} else {
List<Lease<InstanceInfo>> expiredLeases = new ArrayList();
Iterator var4 = this.registry.entrySet().iterator();
while(true) {
Map leaseMap;
do {
if (!var4.hasNext()) {
int registrySize = (int)this.getLocalRegistrySize();
int registrySizeThreshold = (int)((double)registrySize
* this.serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
for(int i = 0; i < toEvict; ++i) {
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = (Lease)expiredLeases.get(i);
String appName = ((InstanceInfo)lease.getHolder()).getAppName();
String id = ((InstanceInfo)lease.getHolder()).getId();
this.internalCancel(appName, id, false);
}
}
return;
}
Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry = (Entry)var4.next();
leaseMap = (Map)groupEntry.getValue();
} while(leaseMap == null);
Iterator var7 = leaseMap.entrySet().iterator();
while(var7.hasNext()) {
Entry<String, Lease<InstanceInfo>> leaseEntry = (Entry)var7.next();
Lease<InstanceInfo> lease = (Lease)leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
}
自我保护
在剔除服务时,判断服务端是否开启了自我保护
protected volatile int numberOfRenewsPerMinThreshold;
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfClientsSendingRenews
* (60.0D / (double)this.serverConfig.getExpectedClientRenewalIntervalSeconds())
* this.serverConfig.getRenewalPercentThreshold());
}
public boolean isLeaseExpirationEnabled() {
if (!this.isSelfPreservationModeEnabled()) {
return true;
} else {
return this.numberOfRenewsPerMinThreshold > 0
&& this.getNumOfRenewsInLastMin() > (long)this.numberOfRenewsPerMinThreshold;
}
}
服务信息同步
在服务注册/续约、服务剔除修改完本地数据之后会调用 PeerAwareInstanceRegistryImpl#replicateToPeers()
方法,信息同步到其他服务器
private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
if (this.peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();
while(var8.hasNext()) {
PeerEurekaNode node = (PeerEurekaNode)var8.next();
if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
}
}
private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
InstanceInfo infoFromRegistry = null;
switch(action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
}
}
这里的 isReplication
参数控制 Eureka Server 集群二次传播注册信息。当客服端发送请求时 isReplication=false
,当请求是从 Eureka Server 发出时 isReplication=true
。
通常会配置多个 Eureka Server ,假设当前有服务器 eureka-A、eureka-B、eureka-C。如果 Eureka A 注册到 B,B 注册到 C,那么当服务向 A 注册时,B 中会有该服务的注册信息,但是 C 中是没有的。如果希望只向一台 Eureka Server 注册,并且其它所有实例都能得到注册信息,那么就必须把所有节点都互相注册。
接收客户端拉取请求
全量拉取
public class ApplicationsResource {
public Response getContainers(String version, String acceptHeader, String acceptEncoding, String eurekaAccept, UriInfo uriInfo, String regionsStr) {
Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
}
多级缓存
Eureka Server 为了避免同时读写内存数据结构造成的并发冲突问题,采用了多级缓存机制来进一步提升服务请求的响应速度。
拉取注册表的操作:
首先从 ReadOnlyCacheMap 里查缓存的注册表。
若没有,就找 ReadWriteCacheMap 里缓存的注册表。
如果还没有,就从内存中获取实际的注册表数据。
注册表发生变更的操作:
内存中更新注册表,同时过期掉 ReadWriteCacheMap,此时不会影响 ReadOnlyCacheMap。
在一段时间内(默认30秒),各服务拉取注册表会直接读 ReadOnlyCacheMap。
30秒过后,后台线程发现 ReadWriteCacheMap 已经清空了,也会清空 ReadOnlyCacheMap 中的缓存。
下次有服务拉取注册表,从内存中获取最新的数据了,同时填充各个缓存。
ResponseCache(ResponseCacheImpl)
的内部实现
class ResponseCacheImpl {
private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap();
private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap = CacheBuilder.newBuilder()
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, ResponseCacheImpl.Value>(){
})
.build(new CacheLoader<Key, ResponseCacheImpl.Value>() {
});
if (this.shouldUseReadOnlyResponseCache) {
this.timer.schedule(this.getCacheUpdateTask(), new Date(), responseCacheUpdateIntervalMs);
}
}
public String get(Key key) {
return this.get(key, this.shouldUseReadOnlyResponseCache);
}
String get(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = this.getValue(key, useReadOnlyCache);
return payload != null && !payload.getPayload().equals("") ? payload.getPayload() : null;
}
ResponseCacheImpl.Value getValue(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = null;
if (useReadOnlyCache) {
ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
this.readOnlyCacheMap.put(key, payload);
}
} else {
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
}
return payload;
}
}
增量拉取
与全量拉取的 cacheKey
不同,影响读写缓存的 load
方法不同,增量拉取从最近变更租约队列中获取,全量拉取从注册表获取。
public class ApplicationsResource {
public Response getContainerDifferential(String version, String acceptHeader, String acceptEncoding, String eurekaAccept, UriInfo uriInfo, String regionsStr) {
Key cacheKey = new Key(EntityType.Application, "ALL_APPS_DELTA", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
}
其他后台线程
最近租约变更队列清理
后台任务定时顺序扫描队列 recentlyChangedQueue
,当 lastUpdateTime
超过一定时长后进行移除。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.deltaRetentionTimer.schedule(this.getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
public void run() {
Iterator it = AbstractInstanceRegistry.this.recentlyChangedQueue.iterator();
while(it.hasNext() &&
it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
}
}
};
}
}
Eureka 视图
评论