元数据
除了普通的基础设置之外,eureka 支持自定义元数据。配置方式如下
eureka:
instance:
metadata-map:
cluster: cl1
name: zhaozhen
复制代码
获取元数据代码
List<ServiceInstance> list = discoveryClient.getInstances("zhao-service-resume");
ServiceInstance serviceInstance = list.get(0);
list.stream().forEach(s->{
System.out.println(s.getMetadata());
});
复制代码
在调用时通过断点可以知道具体的元数据。在实际使用过程中,我们可以针对配置的不同元数据采取不同的执行
可用性
从技术网站上搜到的一个面试题就有这样的问题:eureka 怎么保证可用性.众所周知,eureka 采用的是 AP 模式,实现高可用最好的方式就是利用最少三台 eureke server 实例,实现两两之间的服务注册。从而达到同步数据的目的那么这就涉及到如下的方面
eureka client 和 eureka server 之间如何进行通信
eureka 注册在客户端和服务端分别怎么操作实现可用性的
eureka 续约/心跳在客户端和服务端分别怎么操作实现可用性的
eureka 下线是怎么操作的
eureka client 和 eureka server 之间如何进行通信
通过查询各种资料并追踪自动配置类发现,eureka 和 eureka 之间的通信是采用类似 springmvc 的 Jersey 框架暴露接口进行通信的。通信的形式基本类似于我们使用 http 进行请求的方式。在 EurekaServerAutoConfiguration 中通过注入 FilterRegistrationBean 实现了在 filter 中加入包含了指定包名下的所有的 Jersey 的外部接口
/**
* Register the Jersey filter
*/
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
/**
* Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
* required by the Eureka server.
*/
@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
false, environment);
// Filter to include only classes that have a particular annotation.
//
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages)
//
Set<Class<?>> classes = new HashSet<>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
//
Map<String, Object> propsAndFeatures = new HashMap<>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
复制代码
代码中扫描的 EUREKA_PACKAGES(private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery","com.netflix.eureka" };)即是 Jersey 框架的具体的接口类
另外可以提一点的就是,eureka 对外暴露的 dashboard 依然采用的是 springmvc 的 controller 形式。具体的可以看到在 EurekaServerAutoConfiguration 中注入的 EurekaController
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
复制代码
感兴趣的可以再研究下后续 EurekaController 的内部实现
eureka 注册在客户端和服务端分别怎么操作实现可用性的
服务每隔 30 秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在 90 秒后到期,然后服务会被失效。每隔 30 秒的续约操作我们称之为⼼跳检测首先在服务端,通过上述的 Jersey 框架暴露的接口进行注册,在 ApplicationResource 中通过 addInstance 进行注册,在这个过程中另一个 eureka server 也相当于是一个 eureka client,同样会进行注册
通过 addInstance 中的 register 方法,一直向下调试到 PeerAwareInstanceRegistryImpl 的 replicateInstanceActionsToPeers 相互注册方法
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
复制代码
此时,注册时,进入的是 Register
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
复制代码
查阅源码可知此处的 getLeaseRenewalOf(info)的默认值为 90 秒,这就印证了 90 秒到期的说法
private static int getLeaseRenewalOf(InstanceInfo info) {
return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
}
复制代码
发起请求
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
复制代码
发起请求的地址可以追踪到的是 ApplicationsResource 中的
@Path("{appId}")
public ApplicationResource getApplicationResource(
@PathParam("version") String version,
@PathParam("appId") String appId) {
CurrentRequestVersion.set(Version.toEnum(version));
return new ApplicationResource(appId, serverConfig, registry);
}
复制代码
此处重新构建了一个 ApplicationResource 对象。并将服务的信息配置等传递到 application 中,等待后续使用分析完这一段之后,我对 addInstance 如何接收请求的还是有疑问,经过断点调试发现,这个过程实际上是通过 EurekaServerAutoConfiguration 引入的 EurekaServerInitializerConfiguration 来完成的,
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
}
复制代码
EurekaServerInitializerConfiguration 实现了 SmartLifecycle 方法,start 方法会再容器初始化时执行。而 start 方法的内容
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
复制代码
具体的业务内容在
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
复制代码
第一步 initEurekaEnvironment 为初始化环境,第二步 initEurekaServerContext 为业务操作而随后的操作中最主要的是
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
复制代码
openForTraffic 中主要是为开启服务通信做准备
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
复制代码
引发向 addIntsance 发起请求的就是 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);这个方法内部执行一串事件其中就有向 addInstance 发起请求的
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
复制代码
DiscoveryClient 类内部
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
复制代码
指向
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
复制代码
最后即是向 addInstance 发起请求的地方
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
复制代码
发起请求即是向 ApplicationResource 的 Instance 方法发起。
eureka 续约在客户端和服务端分别怎么操作实现可用性的
从上面注册中可推测出续约/心跳接口可能也是在 DiscoveryClient 中完成的。搜索 HeatBeat 之后发现注入 DiscoveryClient 方法中有一个初始化定时任务的方法
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
复制代码
其中就有心跳的定时任务。默认的心跳间隔时间 renewalIntervalInSecs 为 30 秒
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
复制代码
renew 方法中即是向服务端发起调用的过程,与上述注册基本相同
Eureka 下线服务
Eureka 下线是在 EurekaClientAutoConfiguration 中注入 EurekaClient 时定义的 shutDown 方法。我们可以看到
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
复制代码
同样的。执行了一个取消定时任务的状态。。另外利用上面说的 applicationInfoManager.setInstanceStatus()方法进行了事件通知,另外 unregister();进行了取消注册操作。eurekaTransport.shutdown();关闭传输。
Eureka 的功能特性总体上来说就是这样。有些地方可能还是不够清楚。欢迎大家一起沟通探讨
评论