写点什么

一文搞定 Flink Job 的运行过程

用户头像
shengjk1
关注
发布于: 2021 年 04 月 22 日

背景

之前我们知道了Flink 是如何生成 StreamGraph 以及 如何生成 job如何生成Task,现在我们通过 Flink Shell 将他们串起来,这样我们就学习了从写代码开始到 Flink 运行 task 的整个过程是怎么样的。

正文

我们经常通过 Flink Shell 提交代码,如 flink run -p 2 -m yarn-cluster -ynm test -c test ./test-1.0-SNAPSHOT.jar "file" "./test.properties"& 通过 flink shell 我们可以知道 org.apache.flink.client.cli.CliFrontend 为整个 Flink Job 的入口类


/**   * Submits the job based on the arguments.   */  public static void main(final String[] args) {    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines( configuration, configurationDirectory);
try { final CliFrontend cli = new CliFrontend( configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration)); int retCode = SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseParameters(args)); System.exit(retCode); } catch (Throwable t) { final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); LOG.error("Fatal error while running command line interface.", strippedThrowable); strippedThrowable.printStackTrace(); System.exit(31); } }
复制代码


main 很简单,主要就两步,发现并加载配置文件,加载并解析命令。在解析命令的过程当中,如果传入的命令是 run,则可以一直追踪到 executeProgram 方法


protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {    logAndSysout("Starting execution of program");    final JobSubmissionResult result = client.run(program, parallelism);    ......  }
复制代码


通过 client run 方法来执行,最终调用我们传入的主方法( 通过 -c 参数),然后就开始执行用户代码了,首先会构建 StreamGraph ,最终调用 StreamContextEnvironment execute(String jobName) 方法


@Override  public JobExecutionResult execute(String jobName) throws Exception {        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
StreamGraph streamGraph = this.getStreamGraph(); streamGraph.setJobName(jobName);
transformations.clear();
// execute the programs 存在 -d 时 if (ctx instanceof DetachedEnvironment) { LOG.warn("Job was executed in detached mode, the results will be available on completion."); ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph); return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE; } else { return ctx .getClient() .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings()) .getJobExecutionResult(); } }
复制代码


然后


public JobSubmissionResult run(FlinkPlan compiledPlan,      List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)      throws ProgramInvocationException {    // 构建 jobGraph    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);    //将 job 提交至 cluster 上    return submitJob(job, classLoader);  }
复制代码


主要就是构建 jobGraph ,关于构建 jobGraph 的细节可以参考 如何构建 job ,构建成功之后就开始提交 job 了。我们以 MiniCluster 为例


@Override  public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {    final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);    ......  }
复制代码


public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();        // we have to allow queued scheduling in Flip-6 mode because we need to request slots    // from the ResourceManager    jobGraph.setAllowQueuedScheduling(true);        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);        // cache jars and files    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture      .thenCombine(        dispatcherGatewayFuture,        // 这里真正 submit 操作,交给了 dispatcher 去执行        (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))      .thenCompose(Function.identity());        return acknowledgeCompletableFuture.thenApply(      (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));  }
复制代码


接下来就到了 job 正式运行的时候了


private CompletableFuture<Void> runJob(JobGraph jobGraph) {    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
//创建 jobManagerRunner 同时也会创建 jobMaster,在创建 JobMaster 的时候构建了 ExecutionGraph final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
// start jobManagerRunner 同时也启动了 jobMaster 等一系列 service,然后就开始调度 executionGraph,execution.deploy task.start return jobManagerRunnerFuture .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner)) .thenApply(FunctionUtils.nullFn()) .whenCompleteAsync( (ignored, throwable) -> { if (throwable != null) { jobManagerRunnerFutures.remove(jobGraph.getJobID()); } }, getMainThreadExecutor()); }
复制代码


这部分内容与 如何构建 Job 是一致的,省略若干,具体可以参考 如何构建 job ,需要强调一点就是当 执行到 ExecutionGraph 的 scheduleForExecution 方法时


// 调度 execution  public void scheduleForExecution() throws JobException {
assertRunningInJobMasterMainThread();
final long currentGlobalModVersion = globalModVersion;
// 会启动 startCheckpointScheduler if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { ...... }
复制代码


会启动 CheckpointScheduler 从而开始出发 checkpoint。接下来就开始部署,可以参考 如何构建 job如何生成Task至此为止,从写代码到代码的计算执行,整个过程我们都已经学习清楚了。

总结


发布于: 2021 年 04 月 22 日阅读数: 17
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
一文搞定 Flink Job 的运行过程