
[Hands-On] Shell Scheduling in One Move: A Detailed DolphinScheduler + ProcessBuilder Tutorial

Author: 白鲸开源
  • 2025-04-28
    Tianjin
  • Word count: 5,784

    Estimated reading time: about 19 minutes


This article shows how DolphinScheduler uses ProcessBuilder to execute shell commands. By default, a BashShellInterceptorBuilder wraps the shell script and generates the launch command, supporting both normal and sudo execution modes. A Spring Boot example then demonstrates how to set the working directory, merge the error stream into standard output, monitor execution status, and print the log output, enabling efficient management and scheduling of shell tasks.

1. Using ProcessBuilder in DolphinScheduler

1.1 Command Construction

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory


public class ShellInterceptorBuilderFactory {

    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

    @SuppressWarnings("unchecked")
    public static IShellInterceptorBuilder newBuilder() {
        // By default this branch is taken
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
            return new BashShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
            return new ShShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
            return new CmdShellInterceptorBuilder();
        }
        throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
    }
}


By default, BashShellInterceptorBuilder is used.
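As a rough standalone sketch of the factory's dispatch logic, the snippet below swaps PropertyUtils for `System.getProperty` and represents the builder classes by plain strings; class and method names here are made up for illustration, not part of DolphinScheduler:

```java
// Simplified standalone imitation of ShellInterceptorBuilderFactory's dispatch:
// the `shell.interceptor.type` property picks the builder, defaulting to bash.
public class FactorySketch {

    static String newBuilderName() {
        String type = System.getProperty("shell.interceptor.type", "bash");
        if (type.equalsIgnoreCase("bash")) {
            return "BashShellInterceptorBuilder";
        }
        if (type.equalsIgnoreCase("sh")) {
            return "ShShellInterceptorBuilder";
        }
        if (type.equalsIgnoreCase("cmd")) {
            return "CmdShellInterceptorBuilder";
        }
        throw new IllegalArgumentException("not support shell type: " + type);
    }

    public static void main(String[] args) {
        // With no property set, the bash builder is chosen
        System.out.println(newBuilderName());
    }
}
```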


org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder


public class BashShellInterceptorBuilder
        extends
            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {

    @Override
    public BashShellInterceptorBuilder newBuilder() {
        return new BashShellInterceptorBuilder();
    }

    @Override
    public BashShellInterceptor build() throws FileOperateException, IOException {
        // This is where the shell script is generated
        generateShellScript();
        List<String> bootstrapCommand = generateBootstrapCommand();
        // Instantiate the BashShellInterceptor
        return new BashShellInterceptor(bootstrapCommand, shellDirectory);
    }

    // When sudo mode is not used, this is the command prefix for execution
    @Override
    protected String shellInterpreter() {
        return "bash";
    }

    @Override
    protected String shellExtension() {
        return ".sh";
    }

    @Override
    protected String shellHeader() {
        return "#!/bin/bash";
    }
}


org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand


protected List<String> generateBootstrapCommand() {
    if (sudoEnable) {
        // Default path: effectively `sudo -u <tenant> -i /opt/xx.sh`
        return bootstrapCommandInSudoMode();
    }
    // Otherwise: `bash /opt/xx.sh`
    return bootstrapCommandInNormalMode();
}


bootstrapCommandInSudoMode():


private List<String> bootstrapCommandInSudoMode() {
    if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
        return bootstrapCommandInResourceLimitMode();
    }
    List<String> bootstrapCommand = new ArrayList<>();
    bootstrapCommand.add("sudo");
    if (StringUtils.isNotBlank(runUser)) {
        bootstrapCommand.add("-u");
        bootstrapCommand.add(runUser);
    }
    bootstrapCommand.add("-i");
    bootstrapCommand.add(shellAbsolutePath().toString());
    return bootstrapCommand;
}


bootstrapCommandInNormalMode():


private List<String> bootstrapCommandInNormalMode() {
    List<String> bootstrapCommand = new ArrayList<>();
    bootstrapCommand.add(shellInterpreter());
    bootstrapCommand.add(shellAbsolutePath().toString());
    return bootstrapCommand;
}
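The two command shapes above can be sketched in a small standalone program; the tenant name and script path below are made-up examples, not DolphinScheduler defaults:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the two bootstrap command shapes built by
// bootstrapCommandInSudoMode() and bootstrapCommandInNormalMode().
public class BootstrapCommandSketch {

    static List<String> sudoMode(String runUser, String scriptPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add("sudo");
        if (runUser != null && !runUser.isEmpty()) {
            cmd.add("-u");
            cmd.add(runUser);
        }
        cmd.add("-i"); // run a login shell as the tenant user
        cmd.add(scriptPath);
        return cmd;
    }

    static List<String> normalMode(String interpreter, String scriptPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add(interpreter); // e.g. "bash", from shellInterpreter()
        cmd.add(scriptPath);
        return cmd;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", sudoMode("tenant", "/opt/xx.sh")));  // sudo -u tenant -i /opt/xx.sh
        System.out.println(String.join(" ", normalMode("bash", "/opt/xx.sh"))); // bash /opt/xx.sh
    }
}
```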

1.2 Command Execution

org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor


public abstract class BaseShellInterceptor implements IShellInterceptor {

    protected final String workingDirectory;
    protected final List<String> executeCommands;

    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
        this.executeCommands = executeCommands;
        this.workingDirectory = workingDirectory;
    }

    @Override
    public Process execute() throws IOException {
        // init process builder
        ProcessBuilder processBuilder = new ProcessBuilder();
        // Set the working directory so the script can load resources
        // (e.g. jar files) relative to this location
        processBuilder.directory(new File(workingDirectory));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        log.info("Executing shell command : {}", String.join(" ", executeCommands));
        return processBuilder.start();
    }
}
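The `redirectErrorStream(true)` call above is what lets DolphinScheduler capture a task's full log from a single stream. A minimal demo of that effect (assumes a POSIX `sh` on the PATH; class and method names are made up):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.stream.Collectors;

// With redirectErrorStream(true), lines the child writes to stderr arrive on
// the same stream as stdout, so one reader sees everything it prints.
public class MergedStreamDemo {

    static String run() {
        try {
            ProcessBuilder pb = new ProcessBuilder("sh", "-c", "echo to-stdout; echo to-stderr 1>&2");
            pb.redirectErrorStream(true); // stderr is merged into stdout
            Process process = pb.start();
            String output;
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                output = reader.lines().collect(Collectors.joining("\n"));
            }
            process.waitFor();
            return output;
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Both lines appear, read from a single stream
        System.out.println(run());
    }
}
```

Reading the merged stream continuously, as DolphinScheduler does, also prevents the child from blocking once the OS pipe buffer fills up.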

2. A Best-Practice Example

2.1 pom.xml Configuration

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>2.6.1</version>
</dependency>

2.2 Application Code

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);

        List<String> executeCommands = new ArrayList<>();
        executeCommands.add("sudo");
        executeCommands.add("-u");
        executeCommands.add("qiaozhanwei");
        executeCommands.add("-i");
        executeCommands.add("/opt/test/my.sh");

        ProcessBuilder processBuilder = new ProcessBuilder();
        // Set the working directory so the script can load resources
        // (e.g. jar files) relative to this location
        processBuilder.directory(new File("/opt/test"));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        Process process = processBuilder.start();

        try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = inReader.readLine()) != null) {
                // Print the process output to the terminal
                System.out.println(line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Wait up to 10 minutes; if the process has not finished by then,
        // waitFor returns and status is false
        boolean status = process.waitFor(10, TimeUnit.MINUTES);

        System.out.println("status ->" + status);
    }
}
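When `waitFor(timeout, unit)` returns false, the child is still running and the caller normally kills it. A minimal sketch of that pattern (assumes a POSIX `sh`; the commands and timeouts are made-up examples):

```java
import java.util.concurrent.TimeUnit;

// Timeout handling around Process.waitFor(timeout, unit): if the process
// does not finish in time, waitFor returns false and the caller can kill
// it with destroyForcibly().
public class WaitForTimeoutSketch {

    // Runs `sh -c command`, waiting at most timeoutMillis; returns true if
    // the process exited in time, false if it had to be killed.
    static boolean runWithTimeout(String command, long timeoutMillis) {
        try {
            Process process = new ProcessBuilder("sh", "-c", command).start();
            boolean finished = process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!finished) {
                process.destroyForcibly(); // kill the still-running child
            }
            return finished;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fast: " + runWithTimeout("true", 5000));
        System.out.println("slow: " + runWithTimeout("sleep 30", 200));
    }
}
```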

2.3 Log Output

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2024-06-15 18:33:16.090 INFO 31834 --- [ main] com.journey.test.Application : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in /Users/qiaozhanwei/IdeaProjects/springboot2)
2024-06-15 18:33:16.091 INFO 31834 --- [ main] com.journey.test.Application : No active profile set, falling back to default profiles: default
2024-06-15 18:33:16.244 INFO 31834 --- [ main] com.journey.test.Application : Started Application in 0.252 seconds (JVM running for 0.42)
Number of Maps = 1
Samples per Map = 100000
2024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Wrote input for Map #0
Starting Job
2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:8032
2024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_0931
2024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 1
2024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:1
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_0931
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found
2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_0931
2024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/
2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_0931
2024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false
2024-06-15 18:33:24,978 INFO mapreduce.Job: map 0% reduce 0%
2024-06-15 18:33:29,153 INFO mapreduce.Job: map 100% reduce 0%
2024-06-15 18:33:34,384 INFO mapreduce.Job: map 100% reduce 100%
2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully
2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=28
		FILE: Number of bytes written=548863
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=278
		HDFS: Number of bytes written=215
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=3
		HDFS: Number of bytes read erasure-coded=0
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=37968
		Total time spent by all reduces in occupied slots (ms)=79360
		Total time spent by all map tasks (ms)=2373
		Total time spent by all reduce tasks (ms)=2480
		Total vcore-milliseconds taken by all map tasks=2373
		Total vcore-milliseconds taken by all reduce tasks=2480
		Total megabyte-milliseconds taken by all map tasks=4859904
		Total megabyte-milliseconds taken by all reduce tasks=10158080
	Map-Reduce Framework
		Map input records=1
		Map output records=2
		Map output bytes=18
		Map output materialized bytes=28
		Input split bytes=160
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=28
		Reduce input records=2
		Reduce output records=0
		Spilled Records=4
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=87
		CPU time spent (ms)=1420
		Physical memory (bytes) snapshot=870387712
		Virtual memory (bytes) snapshot=9336647680
		Total committed heap usage (bytes)=2716860416
		Peak Map Physical memory (bytes)=457416704
		Peak Map Virtual memory (bytes)=3773362176
		Peak Reduce Physical memory (bytes)=412971008
		Peak Reduce Virtual memory (bytes)=5563285504
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=118
	File Output Format Counters
		Bytes Written=97
Job Finished in 17.292 seconds
Estimated value of Pi is 3.14120000000000000000
status ->true
Process finished with exit code 0


Reposted from Journey.

Original article: https://segmentfault.com/a/1190000044966157
