写点什么

Apache Airflow 工作流管理平台

作者:qife
  • 2025-07-22
    福建
  • 本文字数:1911 字

    阅读完需:约 6 分钟

Apache Airflow 工作流管理平台

项目标题与描述

Apache Airflow 是一个由社区创建的工作流自动化调度和监控平台,采用 Python 编写。项目核心价值在于:


  • 通过代码定义、调度和监控复杂的工作流

  • 提供可视化界面管理任务依赖关系和执行状态

  • 支持丰富的执行器和集成选项

  • 可扩展的插件体系结构


当前版本:3.1.0

功能特性

核心功能

  • DAG 定义:使用 Python 代码定义有向无环图(DAG)工作流

  • 任务调度:基于时间或外部触发器的灵活调度机制

  • 任务依赖:可视化任务依赖关系管理

  • 执行监控:实时监控任务执行状态和日志

  • 插件系统:支持自定义操作符、传感器和钩子

  • 分布式执行:支持 Celery、Kubernetes 等执行器

  • REST API:提供完整的 API 接口管理平台功能

  • 安全控制:基于角色的访问控制(RBAC)和 JWT 认证

独特价值

  • 代码即配置:工作流通过 Python 代码定义,支持版本控制

  • 丰富的 UI:提供任务树、甘特图、代码查看等多种可视化工具

  • 扩展性强:通过插件机制可轻松集成各类服务

安装指南

基础安装

推荐使用 Python 3.10+环境:


# 使用pyenv管理Python版本pyenv install 3.11.9pyenv global 3.11.9
# 创建虚拟环境python -m venv airflow_venvsource airflow_venv/bin/activate
# 安装Airflowpip install apache-airflow==3.0.0 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.11.txt"
复制代码

配置环境

# 设置Airflow主目录export AIRFLOW_HOME=~/airflow
# 初始化数据库airflow db init
# 创建管理员用户airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com
复制代码

启动服务

# 启动所有组件(开发模式)airflow standalone
# 访问Web UIhttp://localhost:8080
复制代码

使用说明

定义 DAG 示例

from datetime import datetimefrom airflow import DAGfrom airflow.operators.bash import BashOperator
with DAG( dag_id="example_dag", schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False,) as dag: task1 = BashOperator( task_id="print_date", bash_command="date", ) task2 = BashOperator( task_id="sleep", bash_command="sleep 5", ) task1 >> task2 # 定义任务依赖
复制代码

REST API 使用

触发 DAG 运行:


import requests
response = requests.post( "http://localhost:8080/api/v1/dags/example_dag/dagRuns", json={ "logical_date": "2023-01-01T00:00:00Z", "conf": {"key": "value"}, }, headers={"Authorization": "Bearer <JWT_TOKEN>"})
复制代码

核心代码

任务调度核心逻辑

class DagRun(models.Model):    """DAG运行实例模型"""        dag_id = models.CharField(max_length=250)    execution_date = models.DateTimeField()    state = models.CharField(max_length=50)    run_id = models.CharField(max_length=250)    external_trigger = models.BooleanField()    conf = models.TextField()  # 存储为JSON        def update_state(self):        """更新DAG运行状态"""        tis = TaskInstance.objects.filter(dag_run=self)        states = [ti.state for ti in tis]                if all(s == State.SUCCESS for s in states):            self.state = State.SUCCESS        elif any(s == State.FAILED for s in states):            self.state = State.FAILED        elif any(s == State.RUNNING for s in states):            self.state = State.RUNNING        else:            self.state = State.QUEUED                    self.save()
复制代码

REST API 认证中间件

class JWTBearer(HTTPBearer):    """JWT认证中间件"""        def __init__(self, **kwargs):        super().__init__(**kwargs)        self.validator = JWTValidator(            required_claims={"aud", "exp", "iat"},
audience=conf.get_mandatory_list_value("execution_api", "jwt_audience"), ) async def __call__(self, request: Request): credentials = await super().__call__(request) try: payload = self.validator.validate(credentials.credentials) request.state.token = TIToken(id=uuid.UUID(payload["sub"]), claims=payload) except InvalidTokenError as e: raise HTTPException(status.HTTP_403_FORBIDDEN, str(e))
复制代码


更多精彩内容 请关注我的个人公众号 公众号(办公 AI 智能小助手)公众号二维码


办公AI智能小助手


用户头像

qife

关注

还未添加个人签名 2021-05-19 加入

还未添加个人简介

评论

发布
暂无评论
Apache Airflow 工作流管理平台_工作流_qife_InfoQ写作社区