这是真实发生在生产环境的 case,实例启动后正常运行,而在注册中心的状态一直保持STARTING
,而本地的状态为UP
。导致服务的消费方无法发现可用实例。
这种情况的出现概率非常低,运行一年多未发现两个实例同时出现问题的情况,因此多实例运行可以避免。文末有问题的解决方案,不想花时间看分析过程可直接跳到最后。
环境说明:
eureka-client: 1.7.2 spring-boot: 1.5.12.RELEASE spring-cloud: Edgware.SR3
问题重现
借助Btrace
重现, java -noverify -cp .:btrace-boot.jar -javaagent:btrace-agent.jar=script=<pre-compiled-btrace-script> <MainClass> <AppArguments>
思路
主线程更新实例本地状态(STARTING->UP)前, 等待心跳线程完成第一次心跳并尝试注册实例, 获取到当前的状态STARTING
. 主线程更新状态后触发
Btrace 脚本
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnMethod;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.sun.btrace.BTraceUtils.currentThread;
import static com.sun.btrace.BTraceUtils.println;
/**
* @author Addo.Zhang
* @date 2019-07-31
*/
@BTrace(unsafe = true)
public class EurekaRequest {
public static final AtomicBoolean heartbeatThreadRegistrationStarted = new AtomicBoolean(false);
public static final AtomicBoolean replicatorThreadRegistrationCompleted = new AtomicBoolean(false);
public static final AtomicBoolean statusUP = new AtomicBoolean(false);
@OnMethod(clazz = "org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry", location = @Location(value = Kind.LINE, line = 45))
public static void waitHeartbeatExecution() {
println(currentThread() + " is waiting heartbeatThreadRegistrationStarted thread executing first");
while (!heartbeatThreadRegistrationStarted.get()) ;
}
@OnMethod(clazz = "org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry", location = @Location(value = Kind.LINE, line = 46))
public static void markStatusUp() {
statusUP.set(true);
println("Heartbeat thread executed and " + currentThread() + " continues procedure to change status to [UP]");
}
@OnMethod(clazz = "com.netflix.discovery.converters.EurekaJacksonCodec$InstanceInfoSerializer", location = @Location(value = Kind.LINE, line = 369))
public static void continueRegistrationExecution() {
doExecution();
}
@OnMethod(clazz = "com.logancloud.forge.discovery.converters.LoganEurekaJacksonCodec$LoganInstanceInfoSerializer", location = @Location(value = Kind.LINE, line = 117))
public static void continueRegistrationExecution2() {
doExecution();
}
private static void doExecution() {
println(currentThread() + " started to proceed registration");
if (Thread.currentThread().getName().contains("HeartbeatExecutor")) {
heartbeatThreadRegistrationStarted.set(true);
while (!statusUP.get() || !replicatorThreadRegistrationCompleted.get()) {
}
try {
Thread.sleep(500); //interval for replicator registration request completed.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else if (Thread.currentThread().getName().contains("InstanceInfoReplicator")) {
replicatorThreadRegistrationCompleted.set(true);
}
println(currentThread() + " thread registration completed");
}
}
复制代码
心跳线程HeartbeatThread
发送心跳请求(PUT
), 注册中心返回 404.
实例信息同步线程InstanceInfoReplicator
发送注册请求(POST
): 状态为UP
, lastDirtyTimestamp
为a
心跳线程发送实例注册请求(POST
): 状态为STARTING
, lastDirtyTimestamp
为a
服务注册
先分析服务实例的注册逻辑.
InstanceInfo 初始化
通过InstanceInfoFactory#create()
方法来初始化ApplicationInfoManager.instanceInfo
实例时, 实例状态被设置为STARTING
服务实例注册
服务实例注册的真正逻辑是在DiscoveryClient#register()
中完成的. 但是这个方法的调用却有两个入口, 在整个过程中可解释为主动注册和被动注册.
一. 主动注册
EurekaAutoServiceRegistration
实现了SmartLifecycle
接口, 在EurekaClientAutoConfiguration#eurekaAutoServiceRegistration()
被实例化.
EurekaAutoServiceRegistration#start()
方法将EurekaRegistration
注册给EurekaServiceRegistry
:
public void start() {
...
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//调用EurekaServiceRegistry进行注册
this.serviceRegistry.register(this.registration);
//发布实例注册的事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
复制代码
EurekaServiceRegistry#register()
: 先将实例状态设置为初始状态 UP(可通过eureka.instance.initial-status
修改, 默认为UP
). 这里会触发StatusChangeListener#notify()
.
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getInstanceConfig().getAppname()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
//1
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
if (reg.getHealthCheckHandler() != null) {
//2
reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
}
}
复制代码
DiscoveryClient
内部匿名类提供了StatusChangeListener
的实现, 调用InstanceInfoReplicator#onDemandUpdate()
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
复制代码
InstanceInfoReplicator
是在DiscoveryClient#initScheduledTasks()
中实例化的Runnable
的实现, 实例化之后, 使用其内部的调度线程池调度一个线程. 而onDemandUpdate()
也同样会使用调度线程池调度一个线程.
其#run()
方法会调用DiscoveryClient#refreshInstanceInfo()
来更新状态. 状态的更新是通过HealthCheckHandler
来实现的, 具体请看状态检查. 然后调用DiscoveryClient#register()
方法进行注册.
二. 被动注册
上面提到了DiscoveryClient#initScheduledTasks()
, 这里的 task 除了InstanceInfoReplicator
之外还有其他的线程. 其中一个是线条线程HeartbeatThread
. 这个线程会每隔一段时间向注册中心发送一个PUT
类型的 HTTP 请求: 上报实例的状态(状态(status), 以及状态修改的时间(lastDirtyTimestamp)).
这个请求可能会有两种结果: 404
和200
. 前者说明注册中心中还没有这个实例的注册信息; 后者说明状态上报成功.
假如是404
, 便直接发起注册的动作, 即调用DiscoveryClient#register()
方法进行注册.
状态检查
CloudEurekaClient
通过HealthCheckHandler
来检查实例的健康状态, 看下HealthCheckCallbackToHandlerBridge
实现: callback 为空, 或者当前状态为STARTING
或者OUT_OF_SERVICE
时, 返回当前的状态. 我们没有设置 callback, 故而总是会返回当前的状态. 比如应用启动的初始状态为STARTING
public InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus) {
if (null == callback || InstanceInfo.InstanceStatus.STARTING == currentStatus
|| InstanceInfo.InstanceStatus.OUT_OF_SERVICE == currentStatus) { // Do not go to healthcheck handler if the status is starting or OOS.
return currentStatus;
}
return callback.isHealthy() ? InstanceInfo.InstanceStatus.UP : InstanceInfo.InstanceStatus.DOWN;
}
复制代码
问题分析
现象
TCP 抓包
HeartBeat 请求和 Fetch 请求正常. status=UP&lastDirtyTimestamp=1545039481813
堆信息
本地状态为 UP, lastDirtyTimestamp
为 1545039481813, lastUpdatedTimestamp
为 1545039472888
注册中心里的实例信息
状态为 STARTING, lastDirtyTimestamp
为 1545039481813, registrationTimestamp
为 1545039481898, lastUpdatedTimestamp
为 1545039481899
<instance>
<instanceId>xp-xtower-webapp-boot-6-txcxb:xp-xtower-webapp-boot:10100</instanceId>
<hostName>10.128.41.74</hostName>
<app>XP-XTOWER-WEBAPP-BOOT</app>
<ipAddr>10.128.41.74</ipAddr>
<status>STARTING</status>
<overriddenstatus>UNKNOWN</overriddenstatus>
<port enabled="true">10100</port>
<securePort enabled="false">443</securePort>
<countryId>1</countryId>
<dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
<name>MyOwn</name>
</dataCenterInfo>
<leaseInfo>
<renewalIntervalInSecs>5</renewalIntervalInSecs>
<durationInSecs>20</durationInSecs>
<registrationTimestamp>1545039481898</registrationTimestamp>
<lastRenewalTimestamp>1545950719063</lastRenewalTimestamp>
<evictionTimestamp>0</evictionTimestamp>
<serviceUpTimestamp>0</serviceUpTimestamp>
</leaseInfo>
<metadata>
<forge>1.0.0</forge>
<management.port>10100</management.port>
<jmx.port>1099</jmx.port>
<group>innovation</group>
</metadata>
<homePageUrl>http://10.128.41.74:10100/</homePageUrl>
<statusPageUrl></statusPageUrl>
<healthCheckUrl>http://10.128.41.74:10100/health</healthCheckUrl>
<vipAddress>xp-xtower-webapp-boot</vipAddress>
<secureVipAddress>xp-xtower-webapp-boot</secureVipAddress>
<isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
<lastUpdatedTimestamp>1545039481899</lastUpdatedTimestamp>
<lastDirtyTimestamp>1545039481813</lastDirtyTimestamp>
<actionType>ADDED</actionType>
</instance>
复制代码
ScopeStatuslastDirtyTimestamplastUpdatedTimestampregistrationTimestampRequestUP1545039481813LocalUP15450394818131545039472888RemoteSTARTING154503948181315450394818991545039481898
结合起来看, 问题出在lastDirtyTimestamp
未更新, 导致注册中心的状态未更新. 而lastUpdatedTimestamp
的时间为 1545039481899, 与lastDirtyTimestamp
相差86毫秒
.
服务端InstanceResource#validateDirtyTimestamp()
根据本地保存的实例的信息, 和心跳请求发送过来的请求做比较, 决定响应的状态码200
, 404
或者409
推理
注册中心里实例的状态为STARTING
, 可以确定实例是[被动注册](#二. 被动注册)的.
这里有几个时间点:
1545039472888
: InstanceInfo
对象实例化的时间, 因为本地对象的#lastUpdatedTimestamp
字段只有在实例化才会赋值, 此后不会被修改. 见堆信息
1545039481813
: 状态从STARTING
变为UP
的时间, 也是实例状态的最后一次更新时间. 此后的心跳请求都会带上实例的最新状态(UP
)和状态的最后一次更新时间(1545039481813
), 见TCP抓包.
1545039481898
: 注册中心收到实例的注册请求的时间. 见注册中心里的实例信息
1545039481899
: 注册中心中的实例信息被更新的时间. 这个时间只比注册的时间晚了 1 毫秒. 见注册中心里的实例信息
综上可见, 被动注册时发送请求, 拿到的实例的旧的状态STARTING
, 修改时间确实最新的1545039481813
. 后续的心跳上报实例状态为最新的UP
, 修改时间也是最新的1545039481813
. 但是由于最后修改时间与注册时的最后修改时间相同, 即使状态已经变为UP
, 注册中心在收到心跳请求之后也不会将状态更新为UP
.
服务端InstanceResource#renewLease()
-> InstanceResource#validateDirtyTimestamp()
: 如果请求中的lastDirtyTimestamp
与当前保存的实例的相同, 则直接返回 OK, 不会更新注册中心中保存的实例的状态.
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
boolean isReplication) {
InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
if (appInfo != null) {
if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.NOT_FOUND).build();
} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
// In the case of replication, send the current instance info in the registry for the
// replicating node to sync itself with this one.
if (isReplication) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.CONFLICT).entity(appInfo).build();
} else {
return Response.ok().build();
}
}
}
}
return Response.ok().build();
}
复制代码
为什么会出现这种情况?
应用启动过程中会有两个线程会触发注册的动作
InstanceInfoReplicator
线程: DiscoveryClient
中的ApplicationInfoManager.StatusChangeListener
监听到实例状态发生变化, 会新建一个线程将实例注册到注册中心
DiscoveryClient$HeartbeatThread
线程: 这个线程在DiscoveryClient
实例初始化后延迟(与心跳间隔时间相同, 默认是30s
, 中台为提高实例发现效率将其改为了5s
)启动运行. 第一次发送心跳请求是如果注册中心返回 404(说明心跳线程提实例状态更新线程先启动), 则会先将实例注册到注册中心.
上面两个线程都通过调用AbstractJerseyEurekaHttpClient$register()
方法并使用EurekaJacksonCodec$InstanceInfoSerializer
将实例信息序列化. 序列化的过程中先记录实例的状态后记录实例状态的最后修改时间(lastDirtyTimestamp), 这两个操作不是一个原子操作.
非常极端的情况下(缩小心跳间隔增加了出现的概率, 但依然极地), 两个操作之间(心跳线程先拿到实例状态STARTING
)主线程修改了实例状态为UP
, 同时修改了lastDirtyTimestamp
, 并触发了InstanceInfoReplicator
线程的注册操作, 此时心跳线程获取到的实例的最后修改时间与STARTING
状态并不一致. 之后同样注册动作覆盖了实例在注册中心的状态: UP -> STARTING
.
后续的心跳请求带去的最新状态UP
和lastDirtyTimestamp
, 并不会更新在注册中心的状态.
解决方案
在EurekaJacksonCodec$InstanceInfoSerializer#serialize()
方法中, 将#autoMarshalEligible()
的调用移到jgen.writeStartObject()
后面. 这样就使得lastDirtyTimestamp
的获取比status
早, 就能保证即使注册时的lastDirtyTimestamp
小于真正的, 但是状态是与实际相符. lastDirtyTimestamp
会在后续的心跳请求中更新.
addInstance-info.getStatus(): UP
addInstance-info.getLastDirtyTimestamp(): 1565164484429
addInstance-info.getStatus(): STARTING
addInstance-info.getLastDirtyTimestamp(): 1565164484415
renew-status-in-registry: UP
renew-lastDirtyTimestamp: 1565164484429
renew-appInfo.getLastDirtyTimestamp(): 1565164484429
复制代码
PR 已经提交并合并完成,然而 1.7.x 的版本不知何时会发布修复版本
参考:
文章同步发送到公众号:云编码 (微信号:sevenfeet)。
评论