写点什么

Eureka 可用性解读

用户头像
赵镇
关注
发布于: 2021 年 07 月 12 日

元数据

除了普通的基础设置之外,eureka 支持自定义元数据。配置方式如下


eureka:  instance:       metadata-map:              cluster: cl1              name: zhaozhen
复制代码


获取元数据代码


        List<ServiceInstance> list = discoveryClient.getInstances("zhao-service-resume");        ServiceInstance serviceInstance = list.get(0);        list.stream().forEach(s->{            System.out.println(s.getMetadata());        });
复制代码


在调用时通过断点可以知道具体的元数据。在实际使用过程中,我们可以针对配置的不同元数据采取不同的执行


可用性

从技术网站上搜到的一个面试题就有这样的问题:eureka 怎么保证可用性.众所周知,eureka 采用的是 AP 模式,实现高可用最好的方式就是利用最少三台 eureke server 实例,实现两两之间的服务注册。从而达到同步数据的目的那么这就涉及到如下的方面


  • eureka client 和 eureka server 之间如何进行通信

  • eureka 注册在客户端和服务端分别怎么操作实现可用性的

  • eureka 续约/心跳在客户端和服务端分别怎么操作实现可用性的

  • eureka 下线是怎么操作的

eureka client 和 eureka server 之间如何进行通信

通过查询各种资料并追踪自动配置类发现,eureka 和 eureka 之间的通信是采用类似 springmvc 的 Jersey 框架暴露接口进行通信的。通信的形式基本类似于我们使用 http 进行请求的方式。在 EurekaServerAutoConfiguration 中通过注入 FilterRegistrationBean 实现了在 filter 中加入包含了指定包名下的所有的 Jersey 的外部接口


/**   * Register the Jersey filter   */  @Bean  public FilterRegistrationBean jerseyFilterRegistration(      javax.ws.rs.core.Application eurekaJerseyApp) {    FilterRegistrationBean bean = new FilterRegistrationBean();    bean.setFilter(new ServletContainer(eurekaJerseyApp));    bean.setOrder(Ordered.LOWEST_PRECEDENCE);    bean.setUrlPatterns(        Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean; }
/** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. */ @Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( false, environment);
// Filter to include only classes that have a particular annotation. // provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages) // Set<Class<?>> classes = new HashSet<>(); for (String basePackage : EUREKA_PACKAGES) { Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); for (BeanDefinition bd : beans) { Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader()); classes.add(cls); } }
// Construct the Jersey ResourceConfig // Map<String, Object> propsAndFeatures = new HashMap<>(); propsAndFeatures.put( // Skip static content used by the webapp ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes); rc.setPropertiesAndFeatures(propsAndFeatures);
return rc; }
复制代码


代码中扫描的 EUREKA_PACKAGES(private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery","com.netflix.eureka" };)即是 Jersey 框架的具体的接口类



另外可以提一点的就是,eureka 对外暴露的 dashboard 依然采用的是 springmvc 的 controller 形式。具体的可以看到在 EurekaServerAutoConfiguration 中注入的 EurekaController


  @Bean  @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)  public EurekaController eurekaController() {    return new EurekaController(this.applicationInfoManager);  }
复制代码


感兴趣的可以再研究下后续 EurekaController 的内部实现

eureka 注册在客户端和服务端分别怎么操作实现可用性的

服务每隔 30 秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在 90 秒后到期,然后服务会被失效。每隔 30 秒的续约操作我们称之为⼼跳检测首先在服务端,通过上述的 Jersey 框架暴露的接口进行注册,在 ApplicationResource 中通过 addInstance 进行注册,在这个过程中另一个 eureka server 也相当于是一个 eureka client,同样会进行注册



通过 addInstance 中的 register 方法,一直向下调试到 PeerAwareInstanceRegistryImpl 的 replicateInstanceActionsToPeers 相互注册方法


   /**     * Replicates all instance changes to peer eureka nodes except for     * replication traffic to this node.     *     */    private void replicateInstanceActionsToPeers(Action action, String appName,                                                 String id, InstanceInfo info, InstanceStatus newStatus,                                                 PeerEurekaNode node) {        try {            InstanceInfo infoFromRegistry = null;            CurrentRequestVersion.set(Version.V2);            switch (action) {                case Cancel:                    node.cancel(appName, id);                    break;                case Heartbeat:                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);                    break;                case Register:                    node.register(info);                    break;                case StatusUpdate:                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);                    break;                case DeleteStatusOverride:                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);                    node.deleteStatusOverride(appName, id, infoFromRegistry);                    break;            }        } catch (Throwable t) {            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);        }    }
复制代码


此时,注册时,进入的是 Register


    public void register(final InstanceInfo info) throws Exception {        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);        batchingDispatcher.process(                taskId("register", info),                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {                    public EurekaHttpResponse<Void> execute() {                        return replicationClient.register(info);                    }                },                expiryTime        );    }
复制代码


查阅源码可知此处的 getLeaseRenewalOf(info)的默认值为 90 秒,这就印证了 90 秒到期的说法


    private static int getLeaseRenewalOf(InstanceInfo info) {        return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;    }
复制代码


发起请求


    @Override    public EurekaHttpResponse<Void> register(InstanceInfo info) {        String urlPath = "apps/" + info.getAppName();        ClientResponse response = null;        try {            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();            addExtraHeaders(resourceBuilder);            response = resourceBuilder                    .header("Accept-Encoding", "gzip")                    .type(MediaType.APPLICATION_JSON_TYPE)                    .accept(MediaType.APPLICATION_JSON)                    .post(ClientResponse.class, info);            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();        } finally {            if (logger.isDebugEnabled()) {                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),                        response == null ? "N/A" : response.getStatus());            }            if (response != null) {                response.close();            }        }    }
复制代码


发起请求的地址可以追踪到的是 ApplicationsResource 中的


    @Path("{appId}")    public ApplicationResource getApplicationResource(            @PathParam("version") String version,            @PathParam("appId") String appId) {        CurrentRequestVersion.set(Version.toEnum(version));        return new ApplicationResource(appId, serverConfig, registry);    }
复制代码


此处重新构建了一个 ApplicationResource 对象。并将服务的信息配置等传递到 application 中,等待后续使用分析完这一段之后,我对 addInstance 如何接收请求的还是有疑问,经过断点调试发现,这个过程实际上是通过 EurekaServerAutoConfiguration 引入的 EurekaServerInitializerConfiguration 来完成的,


@Configurationpublic class EurekaServerInitializerConfiguration    implements ServletContextAware, SmartLifecycle, Ordered {}
复制代码


EurekaServerInitializerConfiguration 实现了 SmartLifecycle 方法,start 方法会再容器初始化时执行。而 start 方法的内容


@Override  public void start() {    new Thread(new Runnable() {      @Override      public void run() {        try {          //TODO: is this class even needed now?          eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);          log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }
复制代码


具体的业务内容在


  public void contextInitialized(ServletContext context) {    try {      initEurekaEnvironment();      initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
复制代码


第一步 initEurekaEnvironment 为初始化环境,第二步 initEurekaServerContext 为业务操作而随后的操作中最主要的是


    int registryCount = this.registry.syncUp();    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics. EurekaMonitors.registerAllStats();
复制代码


openForTraffic 中主要是为开启服务通信做准备


  @Override    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.        this.expectedNumberOfClientsSendingRenews = count;        updateRenewsPerMinThreshold();        logger.info("Got {} instances from neighboring DS node", count);        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);        this.startupTime = System.currentTimeMillis();        if (count > 0) {            this.peerInstancesTransferEmptyOnStartup = false;        }        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();        boolean isAws = Name.Amazon == selfName;        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {            logger.info("Priming AWS connections for all replicas..");            primeAwsReplicas(applicationInfoManager);        }        logger.info("Changing status to UP");        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);        super.postInit();    }
复制代码


引发向 addIntsance 发起请求的就是 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);这个方法内部执行一串事件其中就有向 addInstance 发起请求的


public synchronized void setInstanceStatus(InstanceStatus status) {        InstanceStatus next = instanceStatusMapper.map(status);        if (next == null) {            return;        }
InstanceStatus prev = instanceInfo.setStatus(next); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, next)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } }
复制代码


DiscoveryClient 类内部


         statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {                @Override                public String getId() {                    return "statusChangeListener";                }
@Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };
复制代码


指向


    public void run() {        try {            discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
复制代码


最后即是向 addInstance 发起请求的地方


    boolean register() throws Throwable {        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);        EurekaHttpResponse<Void> httpResponse;        try {            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);        } catch (Exception e) {            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);            throw e;        }        if (logger.isInfoEnabled()) {            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());        }        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();    }
复制代码


发起请求即是向 ApplicationResource 的 Instance 方法发起。

eureka 续约在客户端和服务端分别怎么操作实现可用性的

从上面注册中可推测出续约/心跳接口可能也是在 DiscoveryClient 中完成的。搜索 HeatBeat 之后发现注入 DiscoveryClient 方法中有一个初始化定时任务的方法


private void initScheduledTasks() {        if (clientConfig.shouldFetchRegistry()) {            // registry cache refresh timer            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();            scheduler.schedule(                    new TimedSupervisorTask(                            "cacheRefresh",                            scheduler,                            cacheRefreshExecutor,                            registryFetchIntervalSeconds,                            TimeUnit.SECONDS,                            expBackOffBound,                            new CacheRefreshThread()                    ),                    registryFetchIntervalSeconds, TimeUnit.SECONDS);        }
if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS);
复制代码


其中就有心跳的定时任务。默认的心跳间隔时间 renewalIntervalInSecs 为 30 秒


    /**     * The heartbeat task that renews the lease in the given intervals.     */    private class HeartbeatThread implements Runnable {
public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
复制代码


renew 方法中即是向服务端发起调用的过程,与上述注册基本相同

Eureka 下线服务

Eureka 下线是在 EurekaClientAutoConfiguration 中注入 EurekaClient 时定义的 shutDown 方法。我们可以看到


   @PreDestroy    @Override    public synchronized void shutdown() {        if (isShutdown.compareAndSet(false, true)) {            logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); }
cancelScheduledTasks();
// If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); }
if (eurekaTransport != null) { eurekaTransport.shutdown(); }
heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient"); } }
复制代码


同样的。执行了一个取消定时任务的状态。。另外利用上面说的 applicationInfoManager.setInstanceStatus()方法进行了事件通知,另外 unregister();进行了取消注册操作。eurekaTransport.shutdown();关闭传输。


Eureka 的功能特性总体上来说就是这样。有些地方可能还是不够清楚。欢迎大家一起沟通探讨

发布于: 2021 年 07 月 12 日阅读数: 7
用户头像

赵镇

关注

还未添加个人签名 2017.12.20 加入

还未添加个人简介

评论

发布
暂无评论
Eureka可用性解读