在使用 Apache DolphinScheduler 编排任务的过程中,Shell 类型任务是最常见的任务类型之一。然而,很多用户在实际使用中都会遇到一个看似简单却常常引发问题的问题——环境变量怎么设置才有效?
如果你也曾经因为任务执行环境不一致、找不到命令路径、引用变量失败等问题而抓狂,那么这篇文章将为你拨开迷雾。本文将深入解析 DolphinScheduler 中 Shell 任务的环境变量设置机制,分享几种常见的配置方式、注意事项以及实战踩坑经验,帮助你高效、稳定地配置任务运行环境。
任务类型总结
注意 : 所谓的 SHELL 任务类型,都是对 SHELL 任务类型进行的封装,说白了底层调用的就是Java ProcessBudiler封装的 SHELL。
注意 : SQL 任务类型其实使用的就是各个 DB 驱动的 JDBC 进行的操作。
注意 : HTTP 任务类型其实就是访问其 OPEN API,进行HttpClient封装调用的操作。
注意 : 所谓的逻辑节点是虚拟任务,这类任务不会调度到 Worker 节点上去运行,只会在 Master 节点作为控制节点。
注意 : 其实就是调用各个任务的开放的 Client 进行任务的封装。
Shell 任务怎么配置环境变量呢?
因为可能涉及到一个组件的不同的版本的客户端,比如说 Spark2、Spark3。还有就是针对不同集群的不同客户端,比如说集群 1 的 Spark3 客户端和集群 2 的 Spark 客户端。 像这样的需求,怎么在 dolphinscheduler 中进行配置呢?或者说有几种配置方式呢?
两种方式 : 1、通过 task 不同的环境变量 2、默认的环境变量
1. 通过 task 不同的环境变量
安全中心 -> 环境管理
任务中引用
默认的环境变量
common.properties
# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profileshell.env_source_list=/etc/profile# The interceptor type of Shell task, e.g. bash, sh, cmdshell.interceptor.type=bash
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
public class ShellInterceptorBuilderFactory {
private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
@SuppressWarnings("unchecked") public static IShellInterceptorBuilder newBuilder() { // TODO 默认的走的是这个逻辑 if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) { return new BashShellInterceptorBuilder(); } if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) { return new ShShellInterceptorBuilder(); } if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) { return new CmdShellInterceptorBuilder(); } throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE); }
}
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder
public class BashShellInterceptorBuilder extends BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {
@Override public BashShellInterceptorBuilder newBuilder() { return new BashShellInterceptorBuilder(); }
@Override public BashShellInterceptor build() throws FileOperateException, IOException { // TODO 这里是生成shell脚本的核心点 generateShellScript(); List<String> bootstrapCommand = generateBootstrapCommand(); // TODO 实例化BashShellInterceptor return new BashShellInterceptor(bootstrapCommand, shellDirectory); }
@Override protected String shellInterpreter() { return "bash"; }
@Override protected String shellExtension() { return ".sh"; }
@Override protected String shellHeader() { return "#!/bin/bash"; }
}
复制代码
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run
public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder, TaskCallBack taskCallBack) throws Exception { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); // todo: we need to use state like JDK Thread to make sure the killed task should not be executed iShellInterceptorBuilder = iShellInterceptorBuilder // TODO 设置执行路径 .shellDirectory(taskRequest.getExecutePath()) // TODO 这里设置shell 名字 .shellName(taskRequest.getTaskAppId());
// Set system env // TODO 在这里是设置默认的,其实也是可以设置为 /opt/dolphinscheduler/bin/env/dolphinscheduler_env.sh if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) { // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表 ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv); }
// Set custom env // TODO 设置自定义的env if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { // TODO 向 customEnvScripts 中加入 iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig()); }
// Set k8s config (This is only work in Linux) if (taskRequest.getK8sTaskExecutionContext() != null) { iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml()); }
// Set sudo (This is only work in Linux) // TODO 设置sudo为true的模式 iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable()); // Set tenant (This is only work in Linux) // TODO 设置租户 iShellInterceptorBuilder.runUser(taskRequest.getTenantCode()); // Set CPU Quota (This is only work in Linux) if (taskRequest.getCpuQuota() != null) { iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota()); } // Set memory Quota (This is only work in Linux) if (taskRequest.getMemoryMax() != null) { iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax()); }
IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build(); // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式 process = iShellInterceptor.execute();
// parse process output // TODO 这里解析到进程的输出 parseProcessOutput(this.process);
// collect pod log collectPodLogIfNeeded();
int processId = getProcessId(this.process);
result.setProcessId(processId);
// cache processId taskRequest.setProcessId(processId);
// print process id log.info("process start, process id is: {}", processId);
// if timeout occurs, exit directly long remainTime = getRemainTime();
// update pid before waiting for the run to finish if (null != taskCallBack) { // TODO 这里其实就是更新任务实例西悉尼 taskCallBack.updateTaskInstanceInfo(taskInstanceId); }
// waiting for the run to finish boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
TaskExecutionStatus kubernetesStatus = ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
if (taskOutputFuture != null) { try { // Wait the task log process finished. taskOutputFuture.get(); } catch (ExecutionException e) { log.error("Handle task log error", e); } }
if (podLogOutputFuture != null) { try { // Wait kubernetes pod log collection finished podLogOutputFuture.get(); // delete pod after successful execution and log collection ProcessUtils.cancelApplication(taskRequest); } catch (ExecutionException e) { log.error("Handle pod log error", e); } }
// if SHELL task exit if (status && kubernetesStatus.isSuccess()) {
// SHELL task state result.setExitStatusCode(this.process.exitValue());
} else { log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", taskRequest.getTaskTimeout()); result.setExitStatusCode(EXIT_CODE_FAILURE); cancelApplication(); } int exitCode = this.process.exitValue(); String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited."; log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode); return result;
}
复制代码
重点就是:
// Set system env // TODO 在这里是设置默认的,其实也是可以设置为 /opt/dolphinscheduler/bin/env/dolphinscheduler_env.sh if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) { // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表 ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv); }
// Set custom env // TODO 设置自定义的env if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { // TODO 向 customEnvScripts 中加入 iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig()); }
复制代码
其实就是说自定的环境变量是可以覆盖默认的环境变量的。
转载自 Journey
原文链接:https://segmentfault.com/a/1190000044954252
评论