Airflow Operators 及案例
Airflow 中最重要的还是各种 Operator,其允许生成特定类型的任务,这个任务在实例化时称为 DAG 中的任务节点,所有的 Operator 均派生自 BaseOparator,并且继承了许多属性和方法。关于 BaseOperator 的参数可以参照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator
BaseOperator 中常用参数如下:
task_id(str) : 唯一 task_id 标记
owner(str):任务的所有者,建议使用 linux 用户名
email(str or liststr):出问题时,发送报警 Email 的地址,可以填写多个,用逗号隔开。
email_on_retry(bool):当任务重试时是否发送电子邮件
email_on_failure(bool):当任务执行失败时是否发送电子邮件
**retries(int):**在任务失败之前应该重试的次数
**retry_delay(datetime.timedelta):**重试间隔,必须是 timedelta 对象
**start_date(datetime.datetime):**DAG 开始执行时间,这个参数必须是 datetime 对象,不可以使用字符串。
end_date(datetime.datetime):DAG 运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。
**depends_on_past(bool,默认 False):**是否依赖于过去,如果为 True,那么必须之前的 DAG 调度成功了,现在的 DAG 调度才能执行。
**dag(airflow.models.DAG):**指定的 dag。
execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。
**trigger_rule(str):**定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。
一、BashOperator 及调度 Shell 命令及脚本
BashOperator 主要执行 bash 脚本或命令,BashOperator 参数如下:
bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)
复制代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner':'zhangsan',
'start_date':datetime(2021, 9, 23),
'email':'kettle_test1@163.com', #pwd:kettle123456
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_shell_cmd',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
t1=BashOperator(
task_id='print_date',
bash_command='date',
dag = dag
)
t2=BashOperator(
task_id='print_helloworld',
bash_command='echo "hello world!"',
dag=dag
)
t3=BashOperator(
task_id='tempplated',
bash_command="""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ params.name}}"
echo "{{ params.age}}"
{% endfor %}
""",
params={'name':'wangwu','age':10},
dag=dag
)
t1 >> t2 >> t3
复制代码
注意在 t3 中使用了 Jinja 模板,“{% %}”内部是 for 标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.name 和 params.age 是自定义变量。
在 default_args 中的 email 是指当 DAG 执行失败时,发送邮件到指定邮箱,想要使用 airflow 发送邮件,需要在 $AIRFLOW_HOME/airflow.cfg 中配置如下内容:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5
复制代码
此外,关于邮箱的信息如下:
邮箱 1:kettle_test1@163.com password:kettle123456
邮箱 2:kettle_test2@163.com password:kettle123456
163 邮箱 SMTP 服务器地址: smtp.163.com 端口:25
配置 163 邮箱时需要开启“POP3/SMTP/IMAP 服务”服务,设置如下:
kettle_test1@163.com FECJJVEPGPTZJYMQ
kettle_test2@163.com VIOFSYMFDIKKIUEA
BashOperator 调度 Shell 脚本案例
准备如下两个 shell 脚本,将以下两个脚本放在 $AIRFLOW_HOME/dags 目录下,
BashOperator 默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp 目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。
first_shell.sh
#!/bin/bash
dt=$1
echo "==== execute first shell ===="
echo "---- first : time is ${dt}"
复制代码
second_shell.sh
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"
复制代码
编写 airflow python 配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner':'zhangsan',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_shell_sh',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=BashOperator(
task_id='first',
#脚本路径建议写绝对路径
bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag = dag
)
second=BashOperator(
task_id='second',
#脚本路径建议写绝对路径
bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag=dag
)
first >> second
复制代码
执行结果:
特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:
二、SSHOperator 及调度远程 Shell 脚本
在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用 SSHOperator 来调用远程机器上的脚本任务。SSHOperator 使用 ssh 协议与远程主机通信,需要注意的是 SSHOperator 调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息:
#Ubunto系统
. ~/.profile
#CentoOS或者RedHat系统
. ~/.bashrc
复制代码
关于 SSHOperator 参数详解可以参照:
airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh Documentation
SSHOperator 的常用参数如下:
ssh_conn_id(str):ssh连接id,名称自取,需要在airflow webserver界面配置,具体配置参照案例。
remote_host(str):远程连接节点host,如果配置,可替换ssh_conn_id中配置的远程host,可选。
command(str):在远程主机上执行的命令或脚本。
复制代码
按照如下步骤来使用 SSHOperator 调度远程节点脚本:
1、安装“apache-airflow-providers-ssh ”provider package
首先停止 airflow webserver 与 scheduler,在 node4 节点切换到 python37 环境,安装 ssh Connection 包。另外,关于 Providers package 安装方式可以参照如下官网地址:
https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh
#切换Python37环境
[root@node4 ~]# conda activate python37
#安装ssh provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1
#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler
复制代码
2、配置 SSH Connection 连接
登录 airflow webui ,选择“Admin”->“Connections”:
点击“+”添加连接,这里 host 连接的是 node5 节点:
3、准备远程执行脚本
在 node5 节点/root 路径下创建 first_shell.sh,内容如下:
#!/bin/bash
echo "==== execute first shell ===="
复制代码
在 node3 节点/root 路径下创建 second_shell.sh,内容如下:
#!/bin/bash
echo "==== execute second shell ===="
复制代码
4、编写 DAG python 配置文件
注意在本地开发工具编写 python 配置时,需要用到 SSHOperator,需要在本地对应的 python 环境中安装对应的 provider package。
C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-ssh==2.1.1
复制代码
python 配置文件:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
default_args = {
'owner':'lisi',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_remote_shell',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=SSHOperator(
task_id='first',
ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
command='sh /root/first_shell.sh ',
dag = dag
)
second=SSHOperator(
task_id='second',
ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
command='sh /root/second_shell.sh ',
remote_host="192.168.179.6",#如果配置remote_host ,将会替换Connection中的SSH 配置的host
dag=dag
)
first >> second
复制代码
5、调度 python 配置脚本
将以上配置好的 python 文件上传至 node4 节点 $AIRFLOW_HOME/dags 下,重启 Airflow websever 与 scheduler,登录 webui,开启调度:
调度结果如下:
三、HiveOperator 及调度 HQL
可以通过 HiveOperator 直接操作 Hive SQL ,HiveOperator 的参数如下:
hql(str):需要执行的Hive SQL。
hive_cli_conn_id(str):连接Hive的conn_id,在airflow webui connection中配置的。
复制代码
想要在 airflow 中使用 HiveOperator 调用 Hive 任务,首先需要安装以下依赖并配置 Hive Metastore:
#切换Python37环境
[root@node4 ~]# conda activate python37
#安装hive provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2
#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler
复制代码
登录 Airflow webui 并设置 Hive Metastore,登录后找到”Admin”->”Connections”,点击“+”新增配置:
1、启动 Hive,准备表
启动 HDFS、Hive Metastore,在 Hive 中创建以下三张表:
create table person_info(id int,name string,age int) row format delimited fields terminated by '\t';
create table score_info(id int,name string,score int) row format delimited fields terminated by '\t';
复制代码
向表 person_info 加载如下数据:
1 zs 18
2 ls 19
3 ww 20
向表 score_info 加载如下数据:
1 zs 100
2 ls 200
3 ww 300
2、在 node4 节点配置 Hive 客户端
由于 Airflow 使用 HiveOperator 时需要在 Airflow 安装节点上有 Hive 客户端,所以需要在 node4 节点上配置 Hive 客户端。
将 Hive 安装包上传至 node4 “/software”下解压,并配置 Hive 环境变量
#在/etc/profile文件最后配置Hive环境变量
export HIVE_HOME=/software/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin
#使环境变量生效
source /etc/profile
复制代码
修改 HIVE_HOME/conf/hive-site.xml ,写入如下内容:
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1:9083</value>
</property>
</configuration>
复制代码
3、编写 DAG python 配置文件
注意在本地开发工具编写 python 配置时,需要用到 HiveOperator,需要在本地对应的 python 环境中安装对应的 provider package。
C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-apache-hive==2.0.2
注意:这里本地安装也有可能缺少对应的C++环境,我们也可以不安装,直接跳过也可以。
复制代码
Python 配置文件:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
default_args = {
'owner':'wangwu',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_hive_sql',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=HiveOperator(
task_id='person_info',
hive_cli_conn_id="node1-hive-metastore",
hql='select id,name,age from person_info',
dag = dag
)
second=HiveOperator(
task_id='score_info',
hive_cli_conn_id="node1-hive-metastore",
hql='select id,name,score from score_info',
dag=dag
)
third=HiveOperator(
task_id='join_info',
hive_cli_conn_id="node1-hive-metastore",
hql='select a.id,a.name,a.age,b.score from person_info a join score_info b on a.id = b.id',
dag=dag
)
first >> second >>third
复制代码
4、调度 python 配置脚本
将以上配置好的 python 文件上传至 node4 节点 $AIRFLOW_HOME/dags 下,重启 Airflow websever 与 scheduler,登录 webui,开启调度:
调度结果如下:
四、PythonOperator
PythonOperator 可以调用 Python 函数,由于 Python 基本可以调用任何类型的任务,如果实在找不到合适的 Operator,将任务转为 Python 函数,使用 PythonOperator 即可。
关于 PythonOperator 常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation
python_callable(python callable):调用的python函数
op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。
复制代码
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# python中 * 关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
print(a)
print(b)
print("hello airflow1")
# 返回的值只会打印到日志中
return{"sss1":"xxx1"}
def print__hello2(random_base):
print(random_base)
print("hello airflow2")
# 返回的值只会打印到日志中
return{"sss2":"xxx2"}
default_args = {
'owner':'maliu',
'start_date':datetime(2021, 10, 1),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_pythoncode',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=PythonOperator(
task_id='first',
#填写 print__hello1 方法时,不要加上“()”
python_callable=print__hello1,
# op_args 对应 print_hello1 方法中的a参数
op_args=[1,2,3,"hello","world"],
# op_kwargs 对应 print__hello1 方法中的b参数
op_kwargs={"id":"1","name":"zs","age":18},
dag = dag
)
second=PythonOperator(
task_id='second',
#填写 print__hello2 方法时,不要加上“()”
python_callable=print__hello2,
# random_base 参数对应 print_hello2 方法中参数“random_base”
op_kwargs={"random_base":random.randint(0,9)},
dag=dag
)
first >> second
复制代码
评论