写点什么

大数据使用 Airflow 实现简单的工作流调度

作者:编程江湖
  • 2022 年 3 月 17 日
  • 本文字数:5422 字

    阅读完需:约 18 分钟

Airflow 是一个以编程方式编写,安排和监视工作流的平台。

使用 Airflow 将实用工作流任务编写的有向无环图(DAG)。一个流程计划程序在遵循指定的依赖项同时在一组工作线程上执行任务。丰富的用户使查看生产运行的管道问题,监视中的故障以及正在显示的故障时需要对进行解决改变的容易。

1、编写 Dag 任务脚本

1. 启动阿里云服务器,并启动 hadoop 集群。

2. 集群集群节点间 ssh 免密登录。

[root@airflowairflow]# vim /etc/hosts172.26.16.78airflow airflow172.26.16.41hadoop101 hadoop101172.26.16.39hadoop102 hadoop102172.26.16.40hadoop103 hadoop103 [root@airflow~]# ssh-keygen -t rsa[root@airflow~] # ssh-copy-id hadoop101[root@airflow~]# ssh-copy-id hadoop102[root@airflow~]# ssh-copy-id hadoop103
复制代码

3.创建 work-py 目录下写 python 脚本,编写.py

[root@airflow~]# mkdir -p /opt/module/work-py[root@airflow~]# cd /opt/module/work-py/[root@airflowwork-py]# vim test.py #!/ usr/bin/pythonfromairflow import DAGfromairflow.operators.bash_operator import BashOperatorfromdatetime import datetime, timedelta default_args= { 'owner': 'test_owner', 'depends_on_past': True, 'email': ['2473196869@qq.com'], 'start_date ':datetime(2020,12,15), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1),}dag =DAG('test', default_args=default_args,schedule_interval=timedelta(days=1)) t1 =BashOperator(task_id='dwd', bash_command='ssh hadoop103"spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwdMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',重试=3, dag=dag) t2 =BashOperator(task_id='dws', bash_command='ssh hadoop103"spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 -- executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"', retries=3 , dag=dag) t3 =BashOperator(task_id='ads',bash_command='ssh hadoop103"spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu。 member.controller.AdsMemberController--队列火花 /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"', retries=3, dag=dag) t2.set_upstream(t1)t3.set_upstream(t2)
复制代码

脚本解读:

default_args 设置默认参数

depends_on_past 是否开启任务依赖

schedule_interval 调度频率

重试次数

start_date 开始时间

Bash 操作者任务,如果为真为假指定执行一个必须成功完成的任务,则执行是否成功完成。

task_id 任务唯一标识(必填)

bash_command 具体任务执行命令

如 set_upstream 设置依赖上图展示广告任务依赖 dws 任务依赖 dwd 任务


注意:

必须导包

从气流导入 DAG

从 airflow.operators.bash_operator importBashOperator


4.配置 JDK

注意:ssh 的目标机(hadoop002) /etc/bashrc 里必须配置 java 环境变量,配置完后源码。

(python3)[root@airflow work-py]# vim /etc/bashrc(python3)[root@airflow work-py]# source /etc/bashrc
复制代码



5. 查看 Airflow,获取 dag 文件存放目录文件


(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
复制代码



6.按照配置文件中配置的路径,创建 dd 文件存放目录,将.py 脚本调用此目录。

(python3)[root@airflow work-py]# mkdir ~/airflow/dags(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
复制代码

7.等待,刷新任务列表,可以看到列表中,出现测试任务。

(python3)[root@airflow work-py]#airflow list_dags------------------------------------ --------------------------------------------DAGS------------------ -------------------------------------------------example_bash_operatorexample_branch_dop_operator_v3example_branch_operatorexample_complexexample_external_task_marker_childexample_external_task_marker_parentexample_http_operatorexample_kubernetes_executor_configexample_nested_branch_dagexample_passing_params_via_test_commandexample_pig_operatorexample_python_operatorexample_short_circuit_operatorexample_skip_dagexample_operatorexample_sub_dagexample .section-1example_subdag_operator.section-2example_trigger_controller_dagexample_trigger_target_dagexample_xcomlatest_onlylatest_only_with_triggertesttest_utilstutorial
复制代码

8.刷新 Airflow 的网页页面,已经出现测试任务。



9. 点击运行测试任务。



10.点击成功任务,查看日志。






11.查看 dag 图,甘特图。





12.查看脚本代码。



2、Dag 任务操作

1. 删除 dag 任务。



2.通过执行以下命令,可以重新添加 dag 任务。

(python3)[root@airflow work-py]# airflow list_tasks test --treeThe'list_tasks' 命令在 Airflow 2.0 中已弃用并删除,请改用'tasks list' [2020-12-1511:17:08,981] {__init__ .py:50} INFO - 使用执行器 SequentialExecutor[2020-12-1511:17:08,982] {dagbag.py:417} INFO - 从 /root/airflow/dags<Task(BashOperator):dwd> 填充 DagBag任务(BashOperator):dws> <任务(BashOperator):广告>
复制代码

3.查看当前所有 dag 任务,可以回来查看测试任务被重新添加了。

(python3)[root@airflow work-py]#(python3)[root@airflow work-py]#airflow list_dags 'list_dags' 命令在 Airflow 2.0 中已弃用并删除,请使用 'dags list' 或 'dags report'相反[2020-12-1511:33:57,106] {__init__.py:50} 信息 - 使用执行器 SequentialExecutor [2020-12-1511:33:57,106] {dagbag.py:417} 信息 - 从 /root/airflow/dags 填充 DagBag ------------------------------------ ------------------DAGS-- -------------------------------------------------- example_bash_operatorexample_branch_dop_operator_v3example_branch_operatorexample_complexexample_external_task_marker_childexample_external_task_marker_parentexample_http_operatorexample_kubernetes_executor_configexample_nested_branch_dagexample_passing_params_via_test_commandexample_pig_operatorexample_python_operatorexample_short_circuit_operatorexample_skip_dagexample_subdag_operatorexample_subdag_operator.section-1example_subdag_operator.section-2example_trigger_controller_dagexample_trigger_target_dagexample_xcomlatest_onlylatest_only_with_triggertesttest_utilstutorialsection-1example_subdag_operator.section-2example_trigger_controller_dagexample_trigger_target_dagexample_xcomlatest_onlylatest_only_with_triggertesttest_utilstutorialsection-1example_subdag_operator.section-2example_trigger_controller_dagexample_trigger_target_dagexample_xcomlatest_onlylatest_only_with_triggertesttest_utilstutorial
复制代码


4.重新添加的 dag 任务。



3、配置邮件服务器

1.首先确保所有邮箱已经开启 SMTP 服务。



2. 修改气流配置文件,如下:

(Python3)[root @ airflow work-py] #vim〜/ airflow / airflow.cfgsmtp_host = smtp.qq.comsmtp_starttls = truesmtp_ssl = falsesmtp_user = 2473196869@qq.com#smtp_user = smtp_password = wjmfbxkfvypdebeg#smtp_password = smtp_port = 587smtp_mail_from = 2473196869 @qq.com
复制代码

3.重启气流。

(python3)[root@airflow 气流]# ps -ef|egrep 'scheduler|airflow-webserver'|grep -vgrep|awk '{print $2}'|xargs kill -15(python3)[root@airflow 气流]# ps -ef |grep 气流根 745 1 0 09:50 ?00:00:00 /sbin/dhclient -1 -q -lf/var/lib/dh​​client/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -Hairflow eth0root 7875 1851 0 12:51 分/1 00:00:00 grep --color=auto 气流(python3)[root@airflow 气流]# kill -15 745 (python3)[root@airflow 气流]# 气流网络服务器 -p 8080 -D (python3)[root @airflow 气流]# 气流调度器 -D
复制代码

4.重新编辑 test.py 脚本文件,并替换。

[root@airflow~]# cd /opt/module/work-py/[root@airflowwork-py]# vim test.py #!/usr/bin/pythonfromairflow import DAGfromairflow.operators.bash_operator import BashOperatorfromairflow.operators.email_operator import EmailOperatorfromdatetime import datetime, timedelta default_args= { 'owner': 'test_owner', 'depends_on_past': True, 'email': ['2473196869@qq.com'], 'start_date':datetime(2020,12,15), ' email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight' : 10, # 'end_date': datetime(2016, 1, 1),}dag =DAG('test', default_args=default_args,schedule_interval=timedelta(days=1)) t1 =BashOperator(task_id='dwd', bash_command='ssh hadoop103"spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwdMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',重试=3, dag=dag) t2 =BashOperator(task_id='dws', bash_command='ssh hadoop103"spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 -- executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.DwsMemberController--queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"', retries=3 , dag=dag) t3 =BashOperator(task_id='ads',bash_command='ssh hadoop103"spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu。 member.controller.AdsMemberController--队列火花 /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"', retries=3,dag=dag) email=EmailOperator( task_id="email", to= "2473196869@qq.com", subject="test-subject", html_content="<h1>test-content</h1>", cc="chaosong@qq.com", dag=dag) t2.set_upstream(t1 )t3.set_upstream(t2)email.set_upstream(t3) (python3)[root@airflow work-py]# cp test.py ~/airflow/dags/spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.AdsMemberController- -queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"', retries=3,dag=dag) email=EmailOperator( task_id="email", to="2473196869@qq.com ", subject="test-subject", html_content="<h1>test-content</h1>", cc="chaosong@qq.com", dag=dag) t2.set_upstream(t1)t3.set_upstream(t2 )email.set_upstream(t3) (python3)[root@airflow work-py]# cp test.py ~/airflow/dags/spark-submit --master yarn--deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2--executor-memory 2g --class com.atguigu.member.controller.AdsMemberController- -queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"', retries=3,dag=dag) email=EmailOperator( task_id="email", to="2473196869@qq.com ", subject="test-subject", html_content="<h1>test-content</h1>", cc="chaosong@qq.com", dag=dag) t2.set_upstream(t1)t3.set_upstream(t2 )email.set_upstream(t3) (python3)[root@airflow work-py]# cp test.py ~/airflow/dags/dag=dag) email=EmailOperator( task_id="email", to="2473196869@qq.com", subject="test-subject", html_content="<h1>test-content</h1>", cc=" chaosong@qq.com", dag=dag) t2.set_upstream(t1)t3.set_upstream(t2)email.set_upstream(t3) (python3)[root@airflow work-py]# cp test.py ~/airflow/dags /dag=dag) email=EmailOperator( task_id="email", to="2473196869@qq.com", subject="test-subject", html_content="<h1>test-content</h1>", cc=" chaosong@qq.com", dag=dag) t2.set_upstream(t1)t3.set_upstream(t2)email.set_upstream(t3) (python3)[root@airflow work-py]# cp test.py ~/airflow/dags /
复制代码

5.查看页面是否有效。




6. 运行测试,查看运行情况和邮件。





关键词:大数据培训

用户头像

编程江湖

关注

IT技术分享 2021.11.23 加入

关注【IT云文化】微信公众号,获取学习资源

评论

发布
暂无评论
大数据使用Airflow实现简单的工作流调度_编程江湖_InfoQ写作平台