写点什么

xxl-job 客户端架构流程

作者:IT巅峰技术
  • 2022-11-30
    上海
  • 本文字数:3557 字

    阅读完需:约 12 分钟

一、xxl-job 的调度流程和配置

任务调度器和执行器使用 http 协议通信,各自有轮询线程处理不同业务。



二、XXL-JOB 客户端启动流程

  1. 加载 Bean:

  2. 从 spring 容器获取所有对象,并遍历查找方法上标记 XxlJob 注解的方法 将 xxljob 配置的 jobname 作为 key,对象 Handle 作为 value 注册 Map 中  ConcurrentMap jobHandlerRepository 的 Map 中维护;

  3. 创建执行任务的线程池;

  4. 启动内嵌的 Netty 服务;

  5. 启动注册线程,每隔 30s 上报一次注册信息。


public class EmbedServer {    public void start(final String address, final int port, final String appname, final String accessToken) {        executorBiz = new ExecutorBizImpl();        thread = new Thread(new Runnable() {            @Override            public void run() {                // param                EventLoopGroup bossGroup = new NioEventLoopGroup();                EventLoopGroup workerGroup = new NioEventLoopGroup();                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(                        0,                        200,                        60L,                        TimeUnit.SECONDS,                        new LinkedBlockingQueue<Runnable>(2000));                // start server                ServerBootstrap bootstrap = new ServerBootstrap();                bootstrap.group(bossGroup, workerGroup)                        .channel(NioServerSocketChannel.class)                        .childHandler(new ChannelInitializer<SocketChannel>() {                            @Override                            public void initChannel(SocketChannel channel) throws Exception {                                channel.pipeline()                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle                                        .addLast(new HttpServerCodec())                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));                             }                        }).childOption(ChannelOption.SO_KEEPALIVE, true);                ChannelFuture future = bootstrap.bind(port).sync();                // start registry                startRegistry(appname, address);                // wait util stop                future.channel().closeFuture().sync();            }        });        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave        thread.start();    }}
复制代码

三、任务的下发与执行

任务的下发与执行(服务端发送给客户端):


收到服务器不动执行进行任务分发:


private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {      switch (uri) {          case "/beat":              return executorBiz.beat();          case "/idleBeat":              IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);              return executorBiz.idleBeat(idleBeatParam);          case "/run":              TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);              return executorBiz.run(triggerParam);          case "/kill":              KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);              return executorBiz.kill(killParam);          case "/log":              LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);              return executorBiz.log(logParam);          default:              return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");        }}
复制代码

1 /beat:心跳保活检测

直接 return success,用户服务器探活;

2 /idleBeat:任务执行策略配置为忙碌转移时使用;

等待队列如果存在待执行任务时,返回 false;


等待队列为空时:返回 true;

3 /run:接收到执行任务指令

将任务提交到执行队列中,并返回 true;


队列满或 handler 不存在时返回 false;

4 /kill:中断任务

对执行任务的线程执行 JobThread.interrupt();


每个任务 Id 会有一个线程,Kill 仅杀死执行该任务 Id 的线程,下次再下发任务发现线程已中断会重新创建线程。

5 /log:获取执行 log

返回客户端执行的本地 log 给服务端。

四、客户端注册和执行结果上报

客户端注册和执行结果上报(客户端发送给服务端)


@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);}@Overridepublic ReturnT<String> registry(RegistryParam registryParam) {    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);}@Overridepublic ReturnT<String> registryRemove(RegistryParam registryParam) {    return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);}
复制代码

1 /registry:注册客户端信息

启动线程定时注册自己的服务到调度器;


创建线程,30s 轮询一次,上报注册信息。

2 /registryRemove:移出执行器请求

将自己从执行器列表移除;


程序退出时会调用一次,在 Netty 的 finally 代码块自动执行。

3 /callback:异步回调结果

执行器异步回调给调度器执行任务结果;


每次任务完成时上报。

五、附录

1 网络通信格式:

(1)客户端注册


http://127.0.0.1:8080/xxl-job-admin/api/registry{    "registryGroup": "EXECUTOR"    "registryKey": "xxl-job-executor-sample"    "registryValue": "http://172.30.0.67:9999/"}Response:{    "code": 200    "msg": null    "content": null}
复制代码


(2)客户端移除注册


http://127.0.0.1:8080/xxl-job-admin/api/registryRemove{    "registryGroup": "EXECUTOR"    "registryKey": "xxl-job-executor-sample"    "registryValue": "http://xxljob-axzo.cn"}Response:{    "code": 200    "msg": null    "content": null}
复制代码


(3)客户端执行任务结果上报


http://127.0.0.1:8080/xxl-job-admin/api/callback{    "logId": 1238    "logDateTim": 1667197980007    "handleCode": 200}Response:{    "code": 200    "msg": null    "content": null}
复制代码


(4)执行器下发任务:同步回调仅代表任务是否发送成功


http://172.30.0.67:9999/run{    "jobId": 4    "executorHandler": "demoJobHandler"    "executorParams": ""    "executorBlockStrategy": "SERIAL_EXECUTION"    "executorTimeout": 0    "logId": 1238    "logDateTime": 1667197980007    "glueType": "BEAN"    "glueSource": ""    "glueUpdatetime": 1666683613000    "broadcastIndex": 0    "broadcastTotal": 1}Response:{    "code": 200    "msg": null    "content": null}
复制代码

2 Token 配置详解

1.配置了 token 后,client 发送的每隔 http 请求头会带上 XXL-JOB-ACCESS-TOKEN :{xxl.job.accessToken} ;


2.该参数不会对请求参数加密;


3.如果配置不匹配,客户端请求报错:


{"code": 500"msg": "The access token is wrong.""content": null}
复制代码


4.发送配置 token 的请求,Header 中新增了 Token 参数



5.配置错 token 的返回





程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT 巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。


作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

发布于: 刚刚阅读数: 3
用户头像

一线架构师、二线开发、三线管理 2021-12-07 加入

Redis6.X、ES7.X、Kafka3.X、RocketMQ5.0、Flink1.X、ClickHouse20.X、SpringCloud、Netty5等热门技术分享;架构设计方法论与实践;作者热销新书《RocketMQ技术内幕》;

评论

发布
暂无评论
xxl-job客户端架构流程_IT巅峰技术_InfoQ写作社区