Workflow: Basic Usage of prefect (qbit)
Author: qbit
January 13, 2022
Preface
Software versions
Python 3.8
poetry 1.1.7
prefect 0.15.12
poetry GitHub: https://github.com/python-poetry/poetry
prefect GitHub: https://github.com/PrefectHQ/prefect
Installation
After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update -vvv:
[tool.poetry.dependencies]
python = "^3.8"
prefect = "~0.15.12"
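For readers unfamiliar with Poetry's tilde constraint: a three-part requirement such as ~0.15.12 accepts patch-level updates only, that is, versions >=0.15.12 and <0.16.0. The sketch below illustrates the rule for plain numeric versions. The helper name tilde_allows is made up for this illustration and is not part of Poetry.

```python
def tilde_allows(constraint: str, version: str) -> bool:
    """Check a plain three-part tilde constraint such as "~0.15.12".

    Hypothetical helper for illustration only; it does not handle
    pre-releases or two-part constraints.
    """
    base = tuple(int(part) for part in constraint.lstrip("~").split("."))
    candidate = tuple(int(part) for part in version.split("."))
    # ~X.Y.Z means >=X.Y.Z and <X.(Y+1).0
    upper = (base[0], base[1] + 1, 0)
    return base <= candidate < upper


print(tilde_allows("~0.15.12", "0.15.13"))  # patch update, accepted
print(tilde_allows("~0.15.12", "0.16.0"))   # minor update, rejected
```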
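Then enable checkpointing through an environment variable, so that task results are persisted when the flow runs locally:
# Linux
export PREFECT__FLOWS__CHECKPOINTING=true
# Windows PowerShell
$env:PREFECT__FLOWS__CHECKPOINTING="true"
# Windows cmd (make sure there is no trailing space at the end of the line)
set PREFECT__FLOWS__CHECKPOINTING=true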
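Because a stray trailing space in Windows cmd ends up inside the variable's value, it can help to check what Python actually sees before running the flow. The following is a minimal sketch using only the standard library. The function name checkpointing_env_status and its return strings are assumptions for this example, not part of prefect.

```python
import os

ENV_NAME = "PREFECT__FLOWS__CHECKPOINTING"


def checkpointing_env_status(environ=None):
    """Describe how the checkpointing variable looks from Python.

    Hypothetical helper: `environ` defaults to os.environ and can be
    replaced by a plain dict for testing.
    """
    env = os.environ if environ is None else environ
    raw = env.get(ENV_NAME)
    if raw is None:
        return "missing"
    if raw != raw.strip():
        # e.g. `set NAME=true ` in cmd keeps the trailing space
        return "has surrounding whitespace"
    if raw.lower() == "true":
        return "ok"
    return "unexpected value"


print(checkpointing_env_status())
```

Run it in the same shell session in which the variable was set.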
Test code
test_prefect.py
# encoding: utf-8
# author: qbit
# date: 2022-01-12
# summary: test prefect with add, subtract, multiply, and divide tasks

import os
import sys
import shutil

import prefect
from prefect import task, Flow
from prefect.engine.results import LocalResult

logger = prefect.context.get("logger")

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
cur_filename = os.path.basename(__file__)
dirname = f".{os.path.splitext(cur_filename)[0]}"  # use the current .py file name as the cache directory name
PrefectLocalResultDir = os.path.join(cur_dir_fullpath, dirname)


def ClearDirectory(dir):
    r""" Empty the directory """
    if not os.path.isdir(dir):
        return  # nothing to clear, e.g. "restart" before the first run
    for filename in os.listdir(dir):
        file = os.path.join(dir, filename)
        try:
            if os.path.isfile(file) or os.path.islink(file):
                os.remove(file)
            elif os.path.isdir(file):
                shutil.rmtree(file)
        except Exception as e:
            print(f'Failed to delete {file}. Reason: {e}')


# The result is written to "<task name>.target" inside PrefectLocalResultDir;
# if that file already exists on a later run, the task is not executed again.
@task(target="{task_name}.target", checkpoint=True, result=LocalResult(dir=PrefectLocalResultDir))
def TaskAdd(x, y):
    result = x + y
    logger.info(f"{x} + {y} = {result}")
    return result


@task
def TaskSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    logger.info(f"{x} - 1 = {result}")
    return result


@task
def TaskMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    logger.info(f"{x} * 2 = {result}")
    return result


@task(log_stdout=True)
def TaskDivide(x, y):
    r""" Divide y by x """
    result = y / x
    logger.info(f"{y} / {x} = {result}")
    print(f"result: {result}")  # captured into the log because log_stdout=True
    return result


if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    # The flow name means "Example: four arithmetic operations"
    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow_state = flow.run()
Run
First run (note that the state of the first computation, TaskAdd, is Success)
# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
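The task order in the log (TaskAdd, TaskMultiply, TaskSubtract, TaskDivide) is one valid topological order of the flow's dependency graph: TaskSubtract and TaskMultiply both depend on TaskAdd, and TaskDivide depends on both of them. The sketch below reproduces such an order with Kahn's algorithm using only the standard library. It is an illustration of the idea, not prefect's scheduler, and the alphabetical tie-break between independent tasks is an assumption of this sketch.

```python
from collections import deque

# Upstream dependencies of each task, read off the Flow definition.
UPSTREAM = {
    "TaskAdd": [],
    "TaskSubtract": ["TaskAdd"],
    "TaskMultiply": ["TaskAdd"],
    "TaskDivide": ["TaskSubtract", "TaskMultiply"],
}


def topological_order(upstream):
    """Return the tasks in an order where every task follows its upstreams."""
    indegree = {task: len(deps) for task, deps in upstream.items()}
    downstream = {task: [] for task in upstream}
    for task, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(task)

    # Start with tasks that have no upstream dependency.
    ready = deque(sorted(t for t, n in indegree.items() if n == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in sorted(downstream[task]):  # alphabetical tie-break (assumption)
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(upstream):
        raise ValueError("cycle detected in task graph")
    return order


print(topological_order(UPSTREAM))
# ['TaskAdd', 'TaskMultiply', 'TaskSubtract', 'TaskDivide']
```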
Second run (note that the state of the first computation, TaskAdd, is Cached)
# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
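The difference between the first and second run comes from the target argument on TaskAdd: when the target file already exists, the stored value is reused and the task body is skipped, which is why the "2 + 1 = 3" log line is missing in the second run. The following is a simplified sketch of that idea with the standard library only. It is not prefect's implementation. The function run_with_target is hypothetical, pickle stands in for whatever serializer prefect uses, and the file name TaskAdd.target assumes that {task_name} renders as the task's name.

```python
import os
import pickle
import tempfile


def run_with_target(func, target_path, *args):
    """Return (state, value), reusing the value stored at target_path if present."""
    if os.path.exists(target_path):
        # Target exists: skip the computation and load the stored result.
        with open(target_path, "rb") as fh:
            return "Cached", pickle.load(fh)
    value = func(*args)
    os.makedirs(os.path.dirname(target_path), exist_ok=True)
    with open(target_path, "wb") as fh:
        pickle.dump(value, fh)
    return "Success", value


def add(x, y):
    return x + y


with tempfile.TemporaryDirectory() as cache_dir:
    target = os.path.join(cache_dir, "TaskAdd.target")
    first = run_with_target(add, target, 2, 1)
    second = run_with_target(add, target, 2, 1)
    os.remove(target)  # stands in for the "restart" clean-up
    third = run_with_target(add, target, 2, 1)

print(first, second, third)
# ('Success', 3) ('Cached', 3) ('Success', 3)
```

Note that this sketch only checks whether the file exists and ignores the arguments, so changing the inputs without clearing the directory would still return the old value.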
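Third run, with the restart argument (note that the state of the first computation, TaskAdd, is Success again because the cache directory was cleared)
# Command
poetry run python ./test_prefect.py restart
# Output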
****** Clear D:\Python3Project\test_prefect\.test_prefect ...
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Static DAG diagram
Official documentation: https://docs.prefect.io/core/advanced_tutorials/visualization.html
Download Graphviz and add it to the PATH environment variable.
Modify pyproject.toml to add the viz extra, then run poetry update -vvv:
[tool.poetry.dependencies]
python = "^3.8"
prefect = { version = "~0.15.12", extras = ["viz"] }
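Before rendering, it may be worth confirming that the Graphviz binaries are really reachable through PATH. A minimal check, assuming the Graphviz executable is named dot (the usual name of its layout program), could look like this. The helper graphviz_on_path is made up for this example.

```python
import shutil


def graphviz_on_path(executable="dot"):
    """Return True if the given executable can be found via PATH."""
    return shutil.which(executable) is not None


if graphviz_on_path():
    print("Graphviz found:", shutil.which("dot"))
else:
    print("Graphviz 'dot' not found on PATH; check the installation")
```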
Modify the main block of test_prefect.py:
if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow.visualize(filename='flow_start', format='png')
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state, filename='flow_end', format='png')
Running the code generates two images, flow_start.png and flow_end.png:
poetry run python ./test_prefect.py restart
flow_start.png
flow_end.png
This article is from qbit snap
Copyright notice: This is an original article by InfoQ author qbit.
Original link: http://xie.infoq.cn/article/c9bec89de2562ba7bed5884b0. Please contact the author before reposting.