Airflow 分布式集群搭建及测试
一、节点规划
二、airflow 集群搭建步骤
1、在所有节点安装 python3.7
参照单节点安装 Airflow 中安装 anconda 及 python3.7
2、在所有节点上安装 airflow
每台节点安装 airflow 需要的系统依赖
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib
复制代码
每台节点配置 airflow 环境变量
vim /etc/profile
export AIRFLOW_HOME=/root/airflow
#使配置的环境变量生效
source /etc/profile
复制代码
每台节点切换 airflow 环境,安装 airflow,指定版本为 2.1.3
(python37) conda activate python37
(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
复制代码
默认 Airflow 安装在 $ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow 目录下。配置了 AIRFLOW_HOME,Airflow 安装后文件存储目录在 AIRFLOW_HOME 目录下。可以每台节点查看安装 Airflow 版本信息:
(python37) airflow version
2.1.3
复制代码
在 Mysql 中创建对应的库并设置参数
aiflow 使用的 Metadata database 我们这里使用 mysql,在 node2 节点的 mysql 中创建 airflow 使用的库及表信息。
CREATE DATABASE airflow CHARACTER SET utf8;
create user 'airflow'@'%' identified by '123456';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
复制代码
在 mysql 安装节点 node2 上修改”/etc/my.cnf”,在 mysqld 下添加如下内容:
[mysqld]
explicit_defaults_for_timestamp=1
复制代码
以上修改完成“my.cnf”值后,重启 Mysql 即可,重启之后,可以查询对应的参数是否生效:
#重启mysql
[root@node2 ~]# service mysqld restart
#重新登录mysql查询
mysql> show variables like 'explicit_defaults_for_timestamp';
复制代码
每台节点配置 Airflow airflow.cfg 文件
修改 AIRFLOW_HOME/airflow.cfg 文件,确保所有机器使用同一份配置文件,在 node1 节点上配置 airflow.cfg,配置如下:
[core]
dags_folder = /root/airflow/dags
#修改时区
default_timezone = Asia/Shanghai
#配置Executor类型,集群建议配置CeleryExecutor
executor = CeleryExecutor
# 配置数据库
sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8
[webserver]
#设置时区
default_ui_timezone = Asia/Shanghai
[celery]
#配置Celery broker使用的消息队列
broker_url = redis://node4:6379/0
#配置Celery broker任务完成后状态更新使用库
result_backend = db+mysql://root:123456@node2:3306/airflow
复制代码
将 node1 节点配置好的 airflow.cfg 发送到 node2、node3、node4 节点上:
(python37) [root@node1 airflow]# scp ./airflow.cfg node2:`pwd`
(python37) [root@node1 airflow]# scp ./airflow.cfg node3:`pwd`
(python37) [root@node1 airflow]# scp ./airflow.cfg node4:`pwd`
复制代码
三、初始化 Airflow
1、每台节点安装需要的 python 依赖包
初始化 Airflow 数据库时需要使用到连接 mysql 的包,执行如下命令来安装 mysql 对应的 python 包。
(python37) # pip install mysqlclient -i https://pypi.tuna.tsinghua.edu.cn/simple
复制代码
2、在 node1 上初始化 Airflow 数据库
(python37) [root@node1 airflow]# airflow db init
复制代码
初始化之后在 MySQL airflow 库下会生成对应的表。
四、创建管理员用户信息
在 node1 节点上执行如下命令,创建操作 Airflow 的用户信息:
airflow users create \
--username airflow \
--firstname airflow \
--lastname airflow \
--role Admin \
--email xx@qq.com
复制代码
执行完成之后,设置密码为“123456”并确认,完成 Airflow 管理员信息创建。
五、配置 Scheduler HA
1、下载 failover 组件
登录https://github.com/teamclairvoyant/airflow-scheduler-failover-controller下载 airflow-scheduler-failover-controller 第三方组件,将下载好的 zip 包上传到 node1 “/software”目录下。
在 node1 节点安装 unzip,并解压 failover 组件:
(python37) [root@node1 software]# yum -y install unzip
(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip
复制代码
2、使用 pip 进行安装 failover 需要的依赖包
需要在 node1 节点上安装 failover 需要的依赖包。
(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master
(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .
复制代码
3、node1 节点初始化 failover
(python37) [root@node1 ~]# scheduler_failover_controller init
Adding Scheduler Failover configs to Airflow config file...
Finished adding Scheduler Failover configs to Airflow config file.
Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.
复制代码
注意:初始化 airflow 时,会向 airflow.cfg 配置中追加配置,因此需要先安装 airflow 并初始化。
4、修改 airflow.cfg
首先修改 node1 节点的 AIRFLOW_HOME/airflow.cfg
[scheduler_failover]
# 配置airflow Master节点,这里配置为node1,node2,两节点需要免密
scheduler_nodes_in_cluster = node1,node2
#在1088行,特别注意,需要去掉一个分号,不然后期自动重启Scheduler不能正常启动
airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
复制代码
配置完成后,可以通过以下命令进行验证 Airflow Master 节点:
(python37) [root@node1 airflow]# scheduler_failover_controller test_connection
Testing Connection for host 'node1'
(True, ['Connection Succeeded', ''])
Testing Connection for host 'node2'
(True, ['Connection Succeeded\n'])
复制代码
将 node1 节点配置好的 airflow.cfg 同步发送到 node2、node3、node4 节点上:
(python37) [root@node1 ~]# cd /root/airflow/
(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`
(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`
(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`
复制代码
六、启动 Airflow 集群
1、在所有节点安装启动 Airflow 依赖的 python 包
(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
复制代码
2、在 Master1 节点(node1)启动相应进程
#默认后台启动可以使用-D ,这里使用-D有时不能正常启动Airflow对应进程
airflow webserver
airflow scheduler
复制代码
3、在 Master2 节点(node2)启动相应进程
4、在 Worker1(node3)、Worker2(node4)节点启动 Worker
在 node3、node4 节点启动 Worker:
(python37) [root@node3 ~]# airflow celery worker
(python37) [root@node4 ~]# airflow celery worker
复制代码
5、在 node1 启动 Scheduler HA
(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &
复制代码
至此,Airflow 高可用集群搭建完成。
七、访问 Airflow 集群 WebUI
浏览器输入 node1:8080,查看 Airflow WebUI:
八、测试 Airflow HA
1、准备 shell 脚本
在 Airflow 集群所有节点{AIRFLOW_HOME}目录下创建 dags 目录,准备如下两个 shell 脚本,将以下两个脚本放在 $AIRFLOW_HOME/dags 目录下,BashOperator 默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp 目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。
first_shell.sh
#!/bin/bash
dt=$1
echo "==== execute first shell ===="
echo "---- first : time is ${dt}"
复制代码
second_shell.sh
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"
复制代码
2、编写 airflow python 配置
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner':'zhangsan',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_shell_sh',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=BashOperator(
task_id='first',
#脚本路径建议写绝对路径
bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag = dag
)
second=BashOperator(
task_id='second',
#脚本路径建议写绝对路径
bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag=dag
)
first >> second
复制代码
将以上内容写入 execute_shell.py 文件,上传到所有 Airflow 节点{AIRFLOW_HOME}/dags 目录下。
3、重启 Airflow,进入 Airflow WebUI 查看对应的调度
重启 Airflow 之前首先在 node1 节点关闭 webserver ,Scheduler 进程,在 node2 节点关闭 webserver ,Scheduler 进程,在 node3,node4 节点上关闭 worker 进程。
如果各个进程是后台启动,查看后台进程方式:
(python37) [root@node1 dags]# ps aux |grep webserver
(python37) [root@node1 dags]# ps aux |grep scheduler
(python37) [root@node2 dags]# ps aux |grep webserver
(python37) [root@node2 dags]# ps aux |grep scheduler
(python37) [root@node3 ~]# ps aux|grep "celery worker"
(python37) [root@node4 ~]# ps aux|grep "celery worker"
找到对应的启动命令对应的进程号,进行kill。
复制代码
重启后进入 Airflow WebUI 查看任务:
点击“success”任务后,可以看到脚本执行成功日志:
4、测试 Airflow HA
当我们把 node1 节点的 websever 关闭后,可以直接通过 node2 节点访问 airflow webui:
在 node1 节点上,查找“scheduler”进程并 kill,测试 scheduler HA 是否生效:
(python37) [root@node1 ~]# ps aux|grep scheduler
root 23744 0.9 3.3 326940 63028 pts/2 S 00:08 0:02 airflow scheduler -- DagFileProcessorManager
#kill 掉scheduler进程
(python37) [root@node1 ~]# kill -9 23744
#访问webserver webui
复制代码
#在 node1 节点查看 scheduler_failover_controller 进程日志中有启动 schudler 动作,注意:这里是先从 node1 启动,启动不起来再从其他 Master 节点启动 Schduler。
评论