写点什么

巧用输出变量,提升 Dolphinscheduler 工作流灵活性和可维护性

作者:白鲸开源
  • 2025-03-24
    天津
  • 本文字数:5053 字

    阅读完需:约 17 分钟

巧用输出变量,提升Dolphinscheduler工作流灵活性和可维护性

输出变量是 DolphinScheduler 任务调度中实现数据流动与任务协作的核心机制,通过显式定义和传递参数,解决了跨节点数据共享、优先级冲突等问题,同时支持复杂流程编排(如子流程、条件分支)。合理使用输出变量能显著提升工作流的灵活性和可维护性。本文将介绍 DolphinScheduler 中重要的输出变量及其使用方法,


Shell 脚本中,单引号 (')、双引号 (") 和反引号 (`)使用


在 Shell 脚本中,单引号 (')、双引号 (") 和反引号 (`) 各自有不同的作用和用法。理解它们的区别和用法对于编写和调试 Shell 脚本非常重要


1.1 单引号 (')


  • 用途:完全引用,用于保护字符串中的所有字符,不进行变量替换或命令替换

  • 特性:引号内的任何字符都将原样输出,不会进行任何解释

VAR="world"echo 'Hello, $VAR'  # 输出:Hello, $VAR
复制代码


1.2 双引号 (")


  • 用途:部分引用,用于保护字符串中的大部分字符,但允许变量替换和命令替换

  • 特性:引号内的变量和命令会被解释,其余字符原样输出

VAR="world"echo "Hello, $VAR"  # 输出:Hello, world
复制代码


1.3 反引号 (`)


  • 用途:命令替换,用于执行引号内的命令,并将命令的输出作为结果返回

  • 特性:引号内的命令会被执行,结果会替换反引号中的内容

  • 注意:反引号是旧的命令替换方式,更推荐使用 $()

DATE=`date`echo "Current date and time: $DATE"  # 输出当前日期和时间
复制代码


1.4 推荐使用 $() 进行命令替换


  • 更现代和可读性更好

  • 可以嵌套使用,而反引号嵌套使用比较困难

DATE=$(date)echo "Current date and time: $DATE"  # 输出当前日期和时间
# 嵌套命令替换示例OUTER=``(echo "Outer ``(echo "Inner")")echo $OUTER # 输出:Outer Inner
复制代码


1.5 使用场景

  • 单引号:当你不希望字符串中的任何内容被解释时使用,例如正则表达式、特殊字符等

  • 双引号:当你希望字符串中包含变量或命令替换时使用

  • 反引号/$():当你需要执行命令并使用其输出时使用


Dolphinscheduler 中的输出变量和文件传递


2.1 Shell 输出变量

  • 2.1.1、流程如下



  • 2.1.2、任务配置


  • taskA



echo 'taskA'echo "#{setValue(linesNum=${lines_num})}"echo '${setValue(words=20)}'
复制代码

注意 : 这里 ${lines_num}其实直接就是通过 Worker 进行变量进行替换的

taskB



echo 'taskB'echo ${linesNum}echo ${words}
复制代码
  • 2.1.3、结果输出

  • 主要看 taskB 的输出

}[INFO] 2024-07-05 10:09:54.539 +0800 - Success initialized task plugin instance successfully[INFO] 2024-07-05 10:09:54.539 +0800 - Set taskVarPool: [{"prop":"linesNum","direct":"IN","type":"VARCHAR","value":"100"},{"prop":"words","direct":"IN","type":"VARCHAR","value":"20"}] successfully[INFO] 2024-07-05 10:09:54.539 +0800 - ***********************************************************************************************[INFO] 2024-07-05 10:09:54.539 +0800 - *********************************  Execute task instance  *************************************[INFO] 2024-07-05 10:09:54.539 +0800 - ***********************************************************************************************[INFO] 2024-07-05 10:09:54.540 +0800 - Final Shell file is: [INFO] 2024-07-05 10:09:54.540 +0800 - ****************************** Script Content *****************************************************************[INFO] 2024-07-05 10:09:54.540 +0800 - #!/bin/bashBASEDIR=$(cd `dirname $0`; pwd)cd $BASEDIRsource /etc/profileexport HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}export FLINK_HOME=/home/flink-1.18.1export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}export SEATUNNEL_HOME=/opt/software/seatunnelexport CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=``HADOOP_HOME/bin:``SPARK_HOME/bin:``PYTHON_HOME/bin:``JAVA_HOME/bin:``HIVE_HOME/bin:``FLINK_HOME/bin:``DATAX_HOME/bin:``SEATUNNEL_HOME/bin:``CHUNJUN_HOME/bin:``PATHecho 'taskB'echo 100echo 20[INFO] 2024-07-05 10:09:54.540 +0800 - ****************************** Script Content *****************************************************************[INFO] 2024-07-05 10:09:54.540 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14171397340576_3/1961/1454/1961_1454.sh[INFO] 2024-07-05 10:09:54.544 +0800 - process start, process id is: 588336[INFO] 2024-07-05 10:09:56.544 +0800 - -> taskB 100 20[INFO] 2024-07-05 10:09:56.546 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14171397340576_3/1961/1454, processId:588336 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0
复制代码


2.2 Shell 文件传递

  • 2.2.1、流程如下



  • 2.2.2、任务配置

fileUploadTask



echo 'fileUploadTask'
mkdir -p data/test1 data/test2echo "test1 message" >> data/test1/text.txtecho "test2 message" >> data/test2/text.txt
tree.
复制代码

fileDownloadTask



echo 'fileDownloadTask'
cat input_dir/test1/text.txtcat input_dir/test2/text.txt
复制代码

2.2.3、结果输出

fileDownloadTask

[INFO] 2024-07-05 11:11:08.160 +0800 - Success initialized task plugin instance successfully[INFO] 2024-07-05 11:11:08.160 +0800 - Set taskVarPool: [{"prop":"fileUploadTask.file-text","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240705/14171986797856/2_1962/fileUploadTask_1455_text.txt"},{"prop":"fileUploadTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240705/14171986797856/2_1962/fileUploadTask_1455_data_ds_pack.zip"}] successfully[INFO] 2024-07-05 11:11:08.160 +0800 - ***********************************************************************************************[INFO] 2024-07-05 11:11:08.160 +0800 - *********************************  Execute task instance  *************************************[INFO] 2024-07-05 11:11:08.160 +0800 - ***********************************************************************************************[INFO] 2024-07-05 11:11:08.160 +0800 - Final Shell file is: [INFO] 2024-07-05 11:11:08.160 +0800 - ****************************** Script Content *****************************************************************[INFO] 2024-07-05 11:11:08.160 +0800 - #!/bin/bashBASEDIR=$(cd `dirname $0`; pwd)cd $BASEDIRsource /etc/profileexport HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}export FLINK_HOME=/home/flink-1.18.1export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}export SEATUNNEL_HOME=/opt/software/seatunnelexport CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=``HADOOP_HOME/bin:``SPARK_HOME/bin:``PYTHON_HOME/bin:``JAVA_HOME/bin:``HIVE_HOME/bin:``FLINK_HOME/bin:``DATAX_HOME/bin:``SEATUNNEL_HOME/bin:``CHUNJUN_HOME/bin:``PATHecho 'fileDownloadTask'
cat input_dir/test1/text.txtcat input_dir/test2/text.txt[INFO] 2024-07-05 11:11:08.161 +0800 - ****************************** Script Content *****************************************************************[INFO] 2024-07-05 11:11:08.161 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14171986797856_2/1962/1456/1962_1456.sh[INFO] 2024-07-05 11:11:08.164 +0800 - process start, process id is: 590323[INFO] 2024-07-05 11:11:10.164 +0800 - -> fileDownloadTask test1 message test2 message[INFO] 2024-07-05 11:11:10.166 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14171986797856_2/1962/1456, processId:590323 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0
复制代码


2.3 SQL 任务输出变量

模式从 SqlTask 任务进行结果的输出,ShellTask 对结果使用

2.3.1、流程如下



  • 2.3.2、任务配置

    sqlOutVarTask



select user_name as userNameList from t_ds_user
复制代码

readOutVarTask



echo 'readOutVarTask'echo ${userNameList}
复制代码
  • 2.3.3、结果输出

  • readOutVarTask

[INFO] 2024-07-05 11:19:00.294 +0800 - Success initialized task plugin instance successfully[INFO] 2024-07-05 11:19:00.294 +0800 - Set taskVarPool: [{"prop":"userNameList","direct":"IN","type":"LIST","value":"[\"admin\",\"qiaozhanwei\",\"test\"]"}] successfully[INFO] 2024-07-05 11:19:00.294 +0800 - ***********************************************************************************************[INFO] 2024-07-05 11:19:00.294 +0800 - *********************************  Execute task instance  *************************************[INFO] 2024-07-05 11:19:00.294 +0800 - ***********************************************************************************************[INFO] 2024-07-05 11:19:00.295 +0800 - Final Shell file is: [INFO] 2024-07-05 11:19:00.295 +0800 - ****************************** Script Content *****************************************************************[INFO] 2024-07-05 11:19:00.295 +0800 - #!/bin/bashBASEDIR=$(cd `dirname $0`; pwd)cd $BASEDIRsource /etc/profileexport HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}export FLINK_HOME=/home/flink-1.18.1export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}export SEATUNNEL_HOME=/opt/software/seatunnelexport CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=``HADOOP_HOME/bin:``SPARK_HOME/bin:``PYTHON_HOME/bin:``JAVA_HOME/bin:``HIVE_HOME/bin:``FLINK_HOME/bin:``DATAX_HOME/bin:``SEATUNNEL_HOME/bin:``CHUNJUN_HOME/bin:``PATHecho 'readOutVarTask'echo ["admin","qiaozhanwei","test"][INFO] 2024-07-05 11:19:00.295 +0800 - ****************************** Script Content *****************************************************************[INFO] 2024-07-05 11:19:00.295 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14172048617888_1/1963/1458/1963_1458.sh[INFO] 2024-07-05 11:19:00.299 +0800 - process start, process id is: 590781[INFO] 2024-07-05 11:19:02.299 +0800 - -> readOutVarTask [admin,qiaozhanwei,test][INFO] 2024-07-05 11:19:02.301 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14172048617888_1/1963/1458, processId:590781 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0
复制代码

以上就是今天的全部内容了,希望对大家更好地使用 DolphinScheduler 有所帮助。

转载自 Journey

原文链接:https://segmentfault.com/a/1190000045035406

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
巧用输出变量,提升Dolphinscheduler工作流灵活性和可维护性_开源_白鲸开源_InfoQ写作社区