写点什么

源码分析 ElasticJob 选主实现原理

  • 2021 年 11 月 12 日
  • 本文字数:2188 字

    阅读完需:约 7 分钟

}


}


选主直接使用 cautor 开源框架提供的实现类 org.apache.curator.framework.recipes.leader.LeaderLatch。


LeaderLatch 需要传入两个参数:


  • CuratorFramework client:curator 框架客户端。

  • latchPath:锁节点路径,elasticJob 的 latchPath 为: n a m e s p a c e / {namespace}/ namespace/{Jobname}/leader/election/latch。


代码 @1、@2:启动 LeaderLatch,其主要过程就是去锁路径下创建一个临时排序节点,如果创建的节点序号最小,await 方法将返回,否则在前一个节点监听该节点事件,并阻塞,如何获得分布式锁后,执行 callback 回调方法。


LeaderService$LeaderElectionExecutionCallback


@RequiredArgsConstructor


class LeaderElectionExecutionCallback implements LeaderExecutionCallback {


@Override


public void execute() {


if (!hasLeader()) {


jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());


}


}


}


成功获取选主的分布式锁后,如果{namespace}/{jobname}/leader/election/instance 节点不存在,则创建该临时节点,节点存储的内容为 IP 地址 @-@进程 ID,其代码为:jobInstanceId = IpUtils.getIp() + “@-@”+ ManagementFactory.getRuntimeMXBean().getName().split("@")[0];


选主流程如图所示:



上面完成一次选主过程,如果主服务器宕机怎么办?从节点如何接管主服务器的角色呢?基于 ZK 的开发模式一般是监听节点的变化事件,做成相应的处理。


2、ElectionListenerManager 主节点选举监听管理器





  • String jobName:job 名称。

  • LeaderNode leaderNode:主节点信息,封装主节点在 zk 中存储节点信息。

  • ServerNode serverNode:服务器节点信息。

  • LeaderService leaderService:选主服务实现类。

  • ServerService serverService:作业服务器服务类。


LeaderNode、ServerNode 代表存储在 zk 服务器上的路径, LeaderNode 的类图如图所示:



其中 JobNamePath 定义了每一个 Job 在 zk 服务器的存储组织目录,根据器代码显示,例如数据同步项目(MyProject)下定义了两个定时任务(SyncUserJob、SyncRoleJob)。


注册中心命名空间取名为项目名:MyProject,在 zk 的节点存储节点类似如下目录结构,节点存放内容在具体用到时再分析。


2.1 ElectionListenerManager#start

public void start() {


addDataListener(new LeaderElectionJobListener());


addDataListener(new LeaderAbdicationJobListener());


}


首先关注一下使用 ZK 如何添加自定义监听器。


JobNodeStorage#addDataListener


/**


  • 注册数据监听器.

  • @param listener 数据监听器


*/


public void addDataListener(final TreeCacheListener listener) {


TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);


cache.getListenable().addListener(listener);


}


首先获取 TreeCache,然后获取 cahce.getListenable().addListener(TreeCacheListener)。


根据节点路径创建 TreeCache 的方法如下:


ZookeeperRegistryCenter#addCacheData


public void addCacheData(final String cachePath) {


TreeCache cache = new TreeCache(client, cachePath);


try {


cache.start();


//CHECKSTYLE:OFF


} catch (final Exception ex) {


//CHECKSTYLE:ON


RegExceptionHandler.handleException(ex);


}


caches.put(cachePath + "/", cache);


}

2.2 LeaderElectionJobListener

选主事件监听器,监听节点主节点 LeaderNode.INSTANCE{namespace}/{jobname}/leader/election/instance。


如果主节点失去与 zk 的连接,由于 LeaderNode.INSTANCE 为临时节点,当节点被 zk 删除后,会触发其他从节点的选主,但由于任务调度服务器重新建立与 zk 的连接后,并不能直接参与选主,所以当 LeaderNode.INSTANCE 每发送一次变化后,尝试发起一次选主,调用 LeaderService.electLeader 方法。


LeaderElectionJobListener #dataChanged


protected void dataChanged(final String path, final Type eventType, final String data) {


if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {


leaderService.electLeader();


}


}


如果该 job 未停止,并且可以进行选主或 LeaderNode.INSTANCE 节点被删除时,触发一次选主。


LeaderElectionJobListener #isActiveElection


private boolean isActiveElection(final String path, final String data) {


return !leaderService.hasLeader() && isLocalServerEnabled(path, data);


}


如果当前节点不是主节点,并且当前服务器运行正常,运行正常的依据是存在{namespace}/{jobname}/servers/server-ip,并且节点内容不为 DISABLED。


LeaderElectionJobListener #isPassiveElection


private boolean isPassiveElection(final String path, final Type eventType) {


return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobIns


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


tance(jobName).getIp());


}


如果当前事件节点为 LeaderNode.INSTANCE 并且事件类型为删除,并且该 job 的当前对应的实例({namespace}/{jobname}/instances/ip)存在并且状态不为 DISABLED。

2.3 LeaderAbdicationJobListener

主退位监听器,其目的就是删除 LeaderNode.INSTANCE 节点。

评论

发布
暂无评论
源码分析ElasticJob选主实现原理