元数据
除了普通的基础设置之外,eureka 支持自定义元数据。配置方式如下
eureka: instance: metadata-map: cluster: cl1 name: zhaozhen
复制代码
获取元数据代码
List<ServiceInstance> list = discoveryClient.getInstances("zhao-service-resume"); ServiceInstance serviceInstance = list.get(0); list.stream().forEach(s->{ System.out.println(s.getMetadata()); });
复制代码
在调用时通过断点可以知道具体的元数据。在实际使用过程中,我们可以针对配置的不同元数据采取不同的执行
可用性
从技术网站上搜到的一个面试题就有这样的问题:eureka 怎么保证可用性.众所周知,eureka 采用的是 AP 模式,实现高可用最好的方式就是利用最少三台 eureke server 实例,实现两两之间的服务注册。从而达到同步数据的目的那么这就涉及到如下的方面
eureka client 和 eureka server 之间如何进行通信
eureka 注册在客户端和服务端分别怎么操作实现可用性的
eureka 续约/心跳在客户端和服务端分别怎么操作实现可用性的
eureka 下线是怎么操作的
eureka client 和 eureka server 之间如何进行通信
通过查询各种资料并追踪自动配置类发现,eureka 和 eureka 之间的通信是采用类似 springmvc 的 Jersey 框架暴露接口进行通信的。通信的形式基本类似于我们使用 http 进行请求的方式。在 EurekaServerAutoConfiguration 中通过注入 FilterRegistrationBean 实现了在 filter 中加入包含了指定包名下的所有的 Jersey 的外部接口
/** * Register the Jersey filter */ @Bean public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean; }
/** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. */ @Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( false, environment);
// Filter to include only classes that have a particular annotation. // provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages) // Set<Class<?>> classes = new HashSet<>(); for (String basePackage : EUREKA_PACKAGES) { Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); for (BeanDefinition bd : beans) { Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader()); classes.add(cls); } }
// Construct the Jersey ResourceConfig // Map<String, Object> propsAndFeatures = new HashMap<>(); propsAndFeatures.put( // Skip static content used by the webapp ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes); rc.setPropertiesAndFeatures(propsAndFeatures);
return rc; }
复制代码
代码中扫描的 EUREKA_PACKAGES(private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery","com.netflix.eureka" };)即是 Jersey 框架的具体的接口类
另外可以提一点的就是,eureka 对外暴露的 dashboard 依然采用的是 springmvc 的 controller 形式。具体的可以看到在 EurekaServerAutoConfiguration 中注入的 EurekaController
@Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); }
复制代码
感兴趣的可以再研究下后续 EurekaController 的内部实现
eureka 注册在客户端和服务端分别怎么操作实现可用性的
服务每隔 30 秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在 90 秒后到期,然后服务会被失效。每隔 30 秒的续约操作我们称之为⼼跳检测首先在服务端,通过上述的 Jersey 框架暴露的接口进行注册,在 ApplicationResource 中通过 addInstance 进行注册,在这个过程中另一个 eureka server 也相当于是一个 eureka client,同样会进行注册
通过 addInstance 中的 register 方法,一直向下调试到 PeerAwareInstanceRegistryImpl 的 replicateInstanceActionsToPeers 相互注册方法
/** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */ private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
复制代码
此时,注册时,进入的是 Register
public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }
复制代码
查阅源码可知此处的 getLeaseRenewalOf(info)的默认值为 90 秒,这就印证了 90 秒到期的说法
private static int getLeaseRenewalOf(InstanceInfo info) { return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000; }
复制代码
发起请求
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
复制代码
发起请求的地址可以追踪到的是 ApplicationsResource 中的
@Path("{appId}") public ApplicationResource getApplicationResource( @PathParam("version") String version, @PathParam("appId") String appId) { CurrentRequestVersion.set(Version.toEnum(version)); return new ApplicationResource(appId, serverConfig, registry); }
复制代码
此处重新构建了一个 ApplicationResource 对象。并将服务的信息配置等传递到 application 中,等待后续使用分析完这一段之后,我对 addInstance 如何接收请求的还是有疑问,经过断点调试发现,这个过程实际上是通过 EurekaServerAutoConfiguration 引入的 EurekaServerInitializerConfiguration 来完成的,
@Configurationpublic class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {}
复制代码
EurekaServerInitializerConfiguration 实现了 SmartLifecycle 方法,start 方法会再容器初始化时执行。而 start 方法的内容
@Override public void start() { new Thread(new Runnable() { @Override public void run() { try { //TODO: is this class even needed now? eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }
复制代码
具体的业务内容在
public void contextInitialized(ServletContext context) { try { initEurekaEnvironment(); initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
复制代码
第一步 initEurekaEnvironment 为初始化环境,第二步 initEurekaServerContext 为业务操作而随后的操作中最主要的是
int registryCount = this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics. EurekaMonitors.registerAllStats();
复制代码
openForTraffic 中主要是为开启服务通信做准备
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }
复制代码
引发向 addIntsance 发起请求的就是 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);这个方法内部执行一串事件其中就有向 addInstance 发起请求的
public synchronized void setInstanceStatus(InstanceStatus status) { InstanceStatus next = instanceStatusMapper.map(status); if (next == null) { return; }
InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } }
复制代码
DiscoveryClient 类内部
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; }
@Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };
复制代码
指向
public void run() { try { discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
复制代码
最后即是向 addInstance 发起请求的地方
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
复制代码
发起请求即是向 ApplicationResource 的 Instance 方法发起。
eureka 续约在客户端和服务端分别怎么操作实现可用性的
从上面注册中可推测出续约/心跳接口可能也是在 DiscoveryClient 中完成的。搜索 HeatBeat 之后发现注入 DiscoveryClient 方法中有一个初始化定时任务的方法
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); }
if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS);
复制代码
其中就有心跳的定时任务。默认的心跳间隔时间 renewalIntervalInSecs 为 30 秒
/** * The heartbeat task that renews the lease in the given intervals. */ private class HeartbeatThread implements Runnable {
public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
复制代码
renew 方法中即是向服务端发起调用的过程,与上述注册基本相同
Eureka 下线服务
Eureka 下线是在 EurekaClientAutoConfiguration 中注入 EurekaClient 时定义的 shutDown 方法。我们可以看到
@PreDestroy @Override public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); }
cancelScheduledTasks();
// If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); }
if (eurekaTransport != null) { eurekaTransport.shutdown(); }
heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient"); } }
复制代码
同样的。执行了一个取消定时任务的状态。。另外利用上面说的 applicationInfoManager.setInstanceStatus()方法进行了事件通知,另外 unregister();进行了取消注册操作。eurekaTransport.shutdown();关闭传输。
Eureka 的功能特性总体上来说就是这样。有些地方可能还是不够清楚。欢迎大家一起沟通探讨
评论