Big Data: Implementing Simple Workflow Scheduling with Airflow
- March 17, 2022
Word count: 5,422
Estimated reading time: about 18 minutes
Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows.
With Airflow, you author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes the tasks on a set of workers while respecting the specified dependencies. A rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot failures when they occur.
Part 1: Writing the DAG Task Script
1. Start the Alibaba Cloud servers and bring up the Hadoop cluster.
2. Set up passwordless SSH login between the cluster nodes.
[root@airflow airflow]# vim /etc/hosts
172.26.16.78 airflow airflow
172.26.16.41 hadoop101 hadoop101
172.26.16.39 hadoop102 hadoop102
172.26.16.40 hadoop103 hadoop103
[root@airflow ~]# ssh-keygen -t rsa
[root@airflow ~]# ssh-copy-id hadoop101
[root@airflow ~]# ssh-copy-id hadoop102
[root@airflow ~]# ssh-copy-id hadoop103
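Optionally, verify the passwordless login before moving on. The following is a minimal sketch (not part of the original setup) that shells out to each worker from the airflow node; it assumes the hostnames above and a Python 3 environment:

# Sketch: verify passwordless SSH from the airflow node to each worker.
# BatchMode=yes makes ssh fail immediately instead of prompting for a password.
import subprocess

for host in ('hadoop101', 'hadoop102', 'hadoop103'):
    result = subprocess.run(
        ['ssh', '-o', 'BatchMode=yes', host, 'hostname'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(host, 'ok' if result.returncode == 0 else 'FAILED')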
3. Create the work-py directory and write the Python script in it.
[root@airflow ~]# mkdir -p /opt/module/work-py
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
Script walkthrough:
default_args: sets the default arguments shared by every task in the DAG
depends_on_past: whether a task run depends on the previous scheduled run of the same task
schedule_interval: the scheduling frequency
retries: the number of retries on failure
start_date: the start date of the schedule
BashOperator: declares a task; when depends_on_past is True, a task instance runs only if its previous scheduled run completed successfully
task_id: the unique identifier of the task (required)
bash_command: the shell command the task executes
set_upstream: declares dependencies. As the graph above shows, the ads task depends on the dws task, which in turn depends on the dwd task (a shorthand form is sketched after the note below).
Note:
You must import the required packages:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
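For reference, Airflow also provides bitshift operators as shorthand for set_upstream/set_downstream. A minimal sketch (the DAG name dependency_demo and the echo commands are illustrative only, not part of the tutorial's script):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# The same dwd -> dws -> ads chain as in test.py, written with the
# bitshift operators Airflow offers as shorthand for set_upstream().
dag = DAG('dependency_demo',
          start_date=datetime(2020, 12, 15),
          # schedule_interval also accepts cron strings, e.g. '0 0 * * *'
          schedule_interval=timedelta(days=1))

t1 = BashOperator(task_id='dwd', bash_command='echo dwd', dag=dag)
t2 = BashOperator(task_id='dws', bash_command='echo dws', dag=dag)
t3 = BashOperator(task_id='ads', bash_command='echo ads', dag=dag)

t1 >> t2 >> t3  # equivalent to t2.set_upstream(t1); t3.set_upstream(t2)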
4. Configure the JDK.
Note: the Java environment variables (e.g. JAVA_HOME and PATH) must be configured in /etc/bashrc on the SSH target machine (hadoop103); source the file after editing it.
(python3)[root@airflow work-py]# vim /etc/bashrc
(python3)[root@airflow work-py]# source /etc/bashrc

5. Open the Airflow config file to find the directory where DAG files are stored.
(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
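The setting to look for is dags_folder. If you prefer not to open the file, the effective value can also be read through Airflow's configuration API, as in this small sketch:

# Sketch: print the effective DAG folder from Airflow's own configuration.
from airflow.configuration import conf

print(conf.get('core', 'dags_folder'))   # e.g. /root/airflow/dags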

6. Create the DAG file directory at the path configured in airflow.cfg, and copy the .py script into it.
(python3)[root@airflow work-py]# mkdir ~/airflow/dags
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
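If the test DAG fails to show up later, a quick way to check for parse or import errors is Airflow's DagBag, the same mechanism the scheduler uses to load this folder; a small sketch:

# Sketch: load DAG files the way the scheduler does, so parse and import
# errors surface immediately instead of the DAG silently not appearing.
from airflow.models import DagBag

dagbag = DagBag(dag_folder='/root/airflow/dags', include_examples=False)
print(list(dagbag.dags))      # should include 'test'
print(dagbag.import_errors)   # should be an empty dict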
7. Wait a moment, then refresh the DAG list; the test DAG appears in it.
(python3)[root@airflow work-py]# airflow list_dags

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial
8. Refresh the Airflow web page; the test DAG now shows up.

9. Click to run the test DAG.

10. Click a successful task to view its logs.




11. View the DAG graph and the Gantt chart.



12. View the script code.

Part 2: DAG Task Operations
1. Delete the DAG task.

2. The DAG task can be re-added by running the following command.
(python3)[root@airflow work-py]# airflow list_tasks test --tree
The 'list_tasks' command is deprecated and removed in Airflow 2.0, please use 'tasks list' instead
[2020-12-15 11:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
<Task(BashOperator): dwd>
    <Task(BashOperator): dws>
        <Task(BashOperator): ads>
3. List all current DAGs again to confirm that the test DAG has been re-added.
(python3)[root@airflow work-py]# airflow list_dags
The 'list_dags' command is deprecated and removed in Airflow 2.0, please use 'dags list' or 'dags report' instead
[2020-12-15 11:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial
4. The re-added DAG task appears again.

Part 3: Configuring the Mail Server
1. First, make sure the SMTP service is enabled on all the mailbox accounts involved.

2. Modify the Airflow config file as follows:
(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg

smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = 2473196869@qq.com
# smtp_user =
smtp_password = wjmfbxkfvypdebeg
# smtp_password =
smtp_port = 587
smtp_mail_from = 2473196869@qq.com
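As an aside, once SMTP is configured, failure alerts can also be enabled per DAG through default_args alone, without an explicit EmailOperator. A minimal sketch (reusing the addresses from this tutorial):

from datetime import datetime

# Sketch: with smtp_* configured in airflow.cfg, these flags are enough
# for Airflow to mail the listed addresses whenever a task fails.
default_args = {
    'owner': 'test_owner',
    'email': ['2473196869@qq.com'],   # alert recipients
    'email_on_failure': True,          # send mail on task failure
    'email_on_retry': False,
    'start_date': datetime(2020, 12, 15),
}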
3. Restart Airflow.
(python3)[root@airflow airflow]# ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
(python3)[root@airflow airflow]# ps -ef | grep airflow
root       745     1  0 09:50 ?      00:00:00 /sbin/dhclient -1 -q -lf /var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -H airflow eth0
root      7875  1851  0 12:51 pts/1  00:00:00 grep --color=auto airflow
(python3)[root@airflow airflow]# kill -15 745
(python3)[root@airflow airflow]# airflow webserver -p 8080 -D
(python3)[root@airflow airflow]# airflow scheduler -D
4. Edit the test.py script again and replace the old copy.
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

email = EmailOperator(
    task_id="email",
    to="2473196869@qq.com",
    subject="test-subject",
    html_content="<h1>test-content</h1>",
    cc="chaosong@qq.com",
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)

(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
5. Check the web page to confirm the change took effect.


6. Run the test, then check the run status and the email.


