xxl-job 源码运行解析,java 基础编程视频


现在我们在调度中心点击启动,会调用 start 接口:



在上图的接口中我们可以看到,它会添加一个 job 任务到 quartz 中,任务的执行类是 RemoteHttpJobBean,现在我们查看一下这个类:



上图中我们可以看到会通过一个快慢线程池去执行 trigger,1 分钟内若有 10 次运行超过 500ms 就会启动慢线程池,继续进入 XxlJobTrigger.trigger:

上述代码中首先通过 jobId 查询数据库中保存的定时任务信息,然后是失败重试次数和分片机制,我们先跳过,直接看??processTrigger:


上述代码中首先获取阻塞策略,默认是?Serial execution 串行执行,然后是路由策略,默认是 FIRST 取第一台机器执行,然后是重点执行定时任务的函数 runExecutor:?

我们直接看?XxlJobDynamicScheduler.getExecutorBiz(address) , 这里的?address 是执行器管理中注册的 ip 地址,如下图:


这里实例化了一个?XxlRpcReferenceBean :

这里初始化了一个 netty_http 服务端和客户端?

采用?HessianSerializer 作为序列化,?CallType.SYNC? 采用同步的请求方式,?LoadBalance.ROUND 负载使用轮询的方式。
现在我们查看 getObject 方法:


看到这里我们知道这是一个典型的动态代理的设计方式(ps: 不熟悉动态代理的,可以看这里我的这篇博客?动态代理白话解析)
我们继续往下看??executorBiz.run(triggerParam) ,在执行这个方法时会去调用代理处理类? InvocationHandler 中的?invoke 方法,我们直接看上图中的 invoke 方法:
前面一段是 如果代理?XxlRpcGenericService 的 invoke 方法时做的一些特殊处理,然后若 被代理类为 Object 则会抛出异常,否则 就是通过路由策略路由到一个最终请求地址。最后是来到了 RPC 的请求调用处理
xxl-job? 的 RPC 请求调用处理
首先实例化了一个?XxlRpcRequest ,有? className 类名 ( 即? com.xxl.job.core.biz.ExecutorBiz ),methodName
方法名? ( 即 run ),parameterTypes 参数类型名称,parameters? 方法参数 这 4 个重要属性。我们继续看?XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest) 这里会去调用?NettyHttpClient?的?asyncSend :


在?getPool 方法中获取到 NettyHttpClient 后,会执行它的 init 方法,也就是 netty 的?http 客户端初始化?:


然后这里通过连接池获取? NettyHttpConnectClient 连接,调用其 send 方法发送请求:

然后会通过 NettyHttpClientHandler 获取返回结果 :



这里获取到返回结果后会调用?channelRead0 方法,将? XxlRpcResponse 放进? XxlRpcFutureResponse 中 ,我们继续往下走 看? futureResponse.get 查看获取结果的方法:

这里会加锁调用 wait? 方法阻塞只到获取返回结果,我们查看 获取返回结果的响应方法?futureResponse.setResponse :

在获取到返回结果后会调用?notifyAll 唤醒 wait 阻塞,继续往下执行? xxlRpcResponse.getResult() 得到返回结果,到这里 调度中心的请求就执行完成了。
xxl-job 执行器源码解析? ( RPC 请求服务端)
现在我们看执行器任务执行的源码解析:

首先执行器在启动时会实例化 XxlJobSpringExecutor 类,调用其 start? 方法进行初始化:

调用父类的 start 方法:


这里通过配置的 ip , port ,appName 初始化了 XxlRpcProviderFactory,添加了? ExecutorBizImpl 执行类 ,继续往下看 start 方法:?



这里 使用一个守护线程启动了一个 netty 的 http 服务端, 在接受到调度中心调度请求后,会执行? NettyHttpServerHandler 的 channelRead0 方法:

这里采用一个线程池处理请求,继续往下看? process 方法:

这里的调度请求 uri 不是包含 services ,所以我们直接看? this.xxlRpcProviderFactory.invokeService 方法 :

评论