1. 概述
本文主要分享 Elastic-Job-Lite 作业失效转移。
当作业节点执行作业异常崩溃时,其所分配的作业分片项在下次重新分片之前不会被重新执行。开启失效转移功能后,这部分作业分片项将被其他作业节点抓取后“执行”。为什么此处的执行打引号呢?下文我们会分享到噢,卖个关子。
笔者对失效转移理解了蛮久时间,因此引用官方对它的解释,让你能更好地理解:
来源地址:https://my.oschina.net/u/719192/blog/506062 失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器空闲,抓取未完成的孤儿分片项执行。-- 分隔符 --来源地址:http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/03-design/lite-design/实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
这样看概念可能还是比较难理解,代码搞起来!
涉及到主要类的类图如下( 打开大图 ):
粉色的内在 com.dangdang.ddframe.job.lite.internal.failover 包下,实现了 Elastic-Job-Lite 作业失效转移。
FailoverService,作业失效转移服务。
FailoverNode,作业失效转移数据存储路径。
FailoverListenerManager,作业失效转移监听管理器。
你行好事会因为得到赞赏而愉悦同理,开源项目贡献者会因为 Star 而更加有动力为 Elastic-Job 点赞!传送门
2. 作业节点崩溃监听
当作业节点崩溃时,监听器 JobCrashedJobListener 会监听到该情况,进行作业失效转移处理。
// JobCrashedJobListener.java
class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType
&& instanceNode.isInstancePath(path)) { // /${JOB_NAME}/instances/${INSTANCE_ID}
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId); // /${JOB_NAME}/sharding/${ITEM_ID}/failover
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
} else {
for (int each : shardingService.getShardingItems(jobInstanceId)) { // /${JOB_NAME}/sharding/${ITEM_ID}/instance
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
复制代码
复制代码
通过判断 /${JOB_NAME}/instances/${INSTANCE_ID} 被移除,执行作业失效转移逻辑。❓说好的作业节点崩溃呢?经过确认,目前这块存在 BUG,未判断作业节点是否为崩溃。所以在当前版本,作业失效转移面向的是所有作业节点关闭逻辑,不仅限于作业崩溃关闭。
优先调用 FailoverService#getFailoverItems(...) 方法,获得关闭作业节点( ${JOB_INSTANCE_ID} )对应的 ${JOB_NAME}/sharding/${ITEM_ID}/failover 作业分片项。
若该作业分片项为空,再调用 ShardingService#getShardingItems(...) 方法,获得关闭作业节点( ${JOB_INSTANCE_ID} )对应的 /${JOB_NAME}/sharding/${ITEM_ID}/instance 作业分片项。
为什么是这样的顺序呢?放在 FailoverService#failoverIfNecessary() 一起讲。这里先看下 FailoverService#getFailoverItems(...) 方法的实现:
// FailoverService
public List<Integer> getFailoverItems(final String jobInstanceId) {
List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
List<Integer> result = new ArrayList<>(items.size());
for (String each : items) {
int item = Integer.parseInt(each);
String node = FailoverNode.getExecutionFailoverNode(item); // `${JOB_NAME}/sharding/${ITEM_ID}/failover`
if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
result.add(item);
}
}
Collections.sort(result);
return result;
}
复制代码
// FailoverService.java
public void setCrashedFailoverFlag(final int item) {
if (!isFailoverAssigned(item)) {
jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item)); // /${JOB_NAME}/leader/failover/items/${ITEM_ID}
}
}
private boolean isFailoverAssigned(final Integer item) {
return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
}
复制代码
3. 作业失效转移
调用 FailoverService#failoverIfNecessary() 方法,如果需要失效转移, 则执行作业失效转移。
// FailoverService.java
public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
复制代码
复制代码
private boolean needFailover() {
// `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 有失效转移的作业分片项
return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
// 当前作业不在运行中
&& !JobRegistry.getInstance().isJobRunning(jobName);
}
复制代码
条件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效转移的作业分片项。
条件二:当前作业不在运行中。此条件即是上文提交的作业节点空闲的定义。
失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器【空闲】,抓取未完成的孤儿分片项执行
FailoverLeaderExecutionCallback 回调逻辑如下:
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
// 判断需要失效转移
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
// 获得一个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
// 设置这个 `${JOB_NAME}/sharding/${ITEM_ID}/failover` 作业分片项 为 当前作业节点
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
// 移除这个 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作业分片项
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不应使用triggerJob, 而是使用executor统一调度 疑问:为什么要用executor统一,后面研究下
// 触发作业执行
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
}
}
复制代码
复制代码
再次调用 #needFailover() 方法,确保经过分布式锁获取等待过程中,仍然需要失效转移。因为可能多个作业节点调用了该回调,第一个作业节点执行了失效转移,可能第二个作业节点就不需要执行失效转移了。
调用 JobNodeStorage#getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT)#get(0) 方法,获得一个 ${JOB_NAME}/leader/failover/items/${ITEM_ID} 作业分片项。 调用 JobNodeStorage#fillEphemeralJobNode(...) 方法,设置这个临时数据节点 ${JOB_NAME}/sharding/${ITEM_ID}failover 作业分片项为当前作业节点( ${JOB_INSTANCE_ID} )。 调用 JobNodeStorage#removeJobNodeIfExisted(...) 方法,移除这个 ${JOB_NAME}/leader/failover/items/${ITEM_ID} 作业分片项。
调用 JobScheduleController#triggerJob() 方法,立即启动作业。调用该方法,实际作业不会立即执行,而仅仅是进行触发。如果有多个失效转移的作业分片项,多次调用 JobScheduleController#triggerJob() 方法会不会导致作业是并行执行的?答案是不会,因为一个作业的 Quartz 线程数设置为 1。
// JobScheduler.java
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
// ... 省略无关代码
result.put("org.quartz.threadPool.threadCount", "1"); // Quartz 线程数:1
// ... 省略无关代码
return result;
}
复制代码
如果说作业分片项实现转移时,每个作业节点都不处于非空闲状态,岂不是 FailoverLeaderExecutionCallback 一直无法被回调?答案当然不是的。作业在执行完分配给自己的作业分片项,会调用 LiteJobFacade#failoverIfNecessary() 方法,进行失效转移的作业分片项抓取:
public final void execute() {
// ... 省略无关代码
// 执行 普通触发的作业
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// 执行 被跳过触发的作业
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// 执行 作业失效转移
jobFacade.failoverIfNecessary();
// ... 省略无关代码
}
// LiteJobFacade.java
@Override
public void failoverIfNecessary() {
if (configService.load(true).isFailover()) {
failoverService.failoverIfNecessary();
}
}
// FailoverService.java
public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
复制代码
复制代码
让我们在返回 JobCrashedJobListener 处代码,为什么获取失效转移的作业分片项是这样的优先顺序?一个作业节点拥有 ${JOB_NAME}/sharding/${ITEM_ID}/failover 数据分片项,意味着分配给它的作业分片项已经执行完成,否则怎么回调 FailoverLeaderExecutionCallback 方法,抓取失效转移的作业分片项呢?!
此处 JobFacade#failoverIfNecessary() 方法,只会抓取一个失效转移的作业分片,这样带来的好处是,多个作业分片可以一起承担执行失效转移的分片集合。举个例子:一个作业集群有 A / B / C 三个节点,分成六个作业分片,如果 C 节点挂了,A / B 节点分担 C 节点的两个分片。但是,也可能会存在失效转移的分片被漏执行。举个例子:一个作业集群有 A / B / C 三个节点,分成九个作业分片,如果 C 节点挂了,A / B 节点分担 C 节点的两个分片,有一个被漏掉,只能等下次作业分片才能执行。未来这块算法会进行优化。
4. 获取作业分片上下文集合
在《Elastic-Job-Lite 源码分析 —— 作业执行》「4.2 获取当前作业服务器的分片上下文」中,我们可以看到作业执行器( AbstractElasticJobExecutor ) 执行作业时,会获取当前作业服务器的分片上下文进行执行。获取过程总体如下顺序图( 打开大图 ):
实现代码如下:
// LiteJobFacade.java
@Override
public ShardingContexts getShardingContexts() {
// 获得 失效转移的作业分片项
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
// 【忽略,作业分片详解】获取当前作业服务器分片上下文
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
// 【忽略,作业分片详解】作业分片,如果需要分片且当前节点为主节点
shardingService.shardingIfNecessary();
// 【忽略,作业分片详解】获得 分配在本机的作业分片项
List<Integer> shardingItems = shardingService.getLocalShardingItems();
// 移除 分配在本机的失效转移的作业分片项目
if (isFailover) {
shardingItems.removeAll(failoverService.getLocalTakeOffItems());
}
// 移除 被禁用的作业分片项
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
// 【忽略,作业分片详解】获取当前作业服务器分片上下文
return executionContextService.getJobShardingContext(shardingItems);
}
复制代码
复制代码
// FailoverService.java
public List<Integer> getLocalFailoverItems() {
if (JobRegistry.getInstance().isShutdown(jobName)) {
return Collections.emptyList();
}
return getFailoverItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); // `${JOB_NAME}/sharding/${ITEM_ID}/failover`
}
复制代码
shardingItems.removeAll(failoverService.getLocalTakeOffItems())。为什么呢?举个例子,作业节点 A 持有作业分片项[0, 1],此时异常断网,导致[0, 1]被作业节点 B 失效转移抓取,此时若作业节点 A 恢复,作业分片项[0, 1]依然属于作业节点 A,但是可能已经在作业节点 B 执行,因此需要进行移除,避免多节点运行相同的作业分片项
FailoverService#getLocalTakeOffItems() 方法实现代码如下:
// FailoverService.java
/**
* 获取运行在本作业服务器的被失效转移的序列号.
*
* @return 运行在本作业服务器的被失效转移的序列号
*/
public List<Integer> getLocalTakeOffItems() {
List<Integer> shardingItems = shardingService.getLocalShardingItems();
List<Integer> result = new ArrayList<>(shardingItems.size());
for (int each : shardingItems) {
if (jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(each))) {
result.add(each);
}
}
return result;
}
复制代码
5. 监听作业失效转移功能关闭
class FailoverSettingsChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType
&& !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) { // 关闭失效转移功能
failoverService.removeFailoverInfo();
}
}
}
复制代码
原文链接:https://juejin.cn/post/6844904176259072013
最后,小编还整理了大厂的面试宝典,有需要的添加小助理 vx:mxzFAFAFA 来获取!
评论