Workflow: basic usage of prefect (qbit)
Author: qbit
- January 13, 2022
Preface
Software versions

Python 3.8
poetry 1.1.7
prefect 0.15.12

poetry GitHub: https://github.com/python-poetry/poetry
prefect GitHub: https://github.com/PrefectHQ/prefect
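Installation
After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update -vvv

[tool.poetry.dependencies]
python = "^3.8"
prefect = "~0.15.12"

Enable checkpointing through an environment variable so that task results are written to disk when the flow is run locally:

# Linux
export PREFECT__FLOWS__CHECKPOINTING=true
# Windows powershell
$env:PREFECT__FLOWS__CHECKPOINTING="true"
# Windows cmd (make sure there is no trailing space at the end of the line)
set PREFECT__FLOWS__CHECKPOINTING=true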
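Because a missing variable or a stray trailing space is easy to overlook, it may help to check the variable from Python before running the flow. The following is a minimal sketch using only the standard library; `check_checkpointing_env` is a hypothetical helper, not part of prefect, and it assumes that the exact value `true` (in any letter case) is what is wanted, as in the commands above.

```python
import os

ENV_NAME = "PREFECT__FLOWS__CHECKPOINTING"


def check_checkpointing_env(environ=None):
    """Return (ok, message) describing the checkpointing variable.

    Hypothetical helper for illustration; it only inspects the
    environment and does not call into prefect.
    """
    env = os.environ if environ is None else environ
    raw = env.get(ENV_NAME)
    if raw is None:
        return False, f"{ENV_NAME} is not set"
    if raw != raw.strip():
        # Catches the Windows cmd pitfall of a trailing space after "true"
        return False, f"{ENV_NAME} has leading or trailing whitespace: {raw!r}"
    if raw.lower() != "true":
        return False, f"{ENV_NAME} is {raw!r}, expected 'true'"
    return True, f"{ENV_NAME} is enabled"


if __name__ == "__main__":
    ok, message = check_checkpointing_env()
    print(message)
```

Passing a plain dict instead of the real environment makes the helper easy to try out without changing any shell settings.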
Test code
test_prefect.py

# encoding: utf-8
# author: qbit
# date: 2022-01-12
# summary: test prefect with addition, subtraction, multiplication and division

import os
import sys
import shutil
import prefect
from prefect import task, Flow
from prefect.engine.results import LocalResult

logger = prefect.context.get("logger")
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
cur_filename = os.path.basename(__file__)
dirname = f".{os.path.splitext(cur_filename)[0]}"  # use the current .py file name as the cache directory name
PrefectLocalResultDir = os.path.join(cur_dir_fullpath, dirname)

def ClearDirectory(dir):
    r""" Empty the directory """
    if not os.path.isdir(dir):
        return  # nothing to clear yet, e.g. "restart" was passed before any run created the directory
    for filename in os.listdir(dir):
        file = os.path.join(dir, filename)
        try:
            if os.path.isfile(file) or os.path.islink(file):
                os.remove(file)
            elif os.path.isdir(file):
                shutil.rmtree(file)
        except Exception as e:
            print(f'Failed to delete {file}. Reason: {e}')

# The result is checkpointed to "<task_name>.target" inside PrefectLocalResultDir
@task(target="{task_name}.target", checkpoint=True, result=LocalResult(dir=PrefectLocalResultDir))
def TaskAdd(x, y):
    result = x + y
    logger.info(f"{x} + {y} = {result}")
    return result

@task
def TaskSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    logger.info(f"{x} - 1 = {result}")
    return result

@task
def TaskMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    logger.info(f"{x} * 2 = {result}")
    return result

# log_stdout=True sends the print() output to the task logger
@task(log_stdout=True)
def TaskDivide(x, y):
    r""" Divide y by x """
    result = y / x
    logger.info(f"{y} / {x} = {result}")
    print(f"result: {result}")
    return result

if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    # Flow name: "Example: the four arithmetic operations"
    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow_state = flow.run()
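The `target` argument on TaskAdd can be thought of as "skip the work if the result file already exists". The sketch below illustrates that idea with the standard library only. It is a conceptual model under stated assumptions, not prefect's implementation: `run_with_target` and `add` are hypothetical names, and pickle is used here simply as a convenient way to store the value.

```python
import os
import pickle
import tempfile


def run_with_target(func, args, target_path):
    """Run func(*args) unless a result already exists at target_path.

    Returns (state, value), where state is "Cached" when the value was
    read from the file and "Success" when it was freshly computed.
    """
    if os.path.exists(target_path):
        with open(target_path, "rb") as f:
            return "Cached", pickle.load(f)
    value = func(*args)
    parent = os.path.dirname(target_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(target_path, "wb") as f:
        pickle.dump(value, f)
    return "Success", value


def add(x, y):
    return x + y


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as result_dir:
        target = os.path.join(result_dir, "TaskAdd.target")
        print(run_with_target(add, (2, 1), target))  # computes and writes the file
        print(run_with_target(add, (2, 1), target))  # reads the existing file
```

In this sketch the file name depends only on the task name, so changing the inputs alone does not trigger a recomputation; the file has to be removed first, which is the role ClearDirectory plays in the test script.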
Running
First run (note that the state of the first computation is Success)

# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Second run (note that the state of the first computation is Cached, and TaskAdd no longer logs "2 + 1 = 3")

# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Third run, with the restart argument (note that the state of the first computation is Success again, because the cache directory was cleared first)

# Command
poetry run python ./test_prefect.py restart
# Output
****** Clear D:\Python3Project\test_prefect\.test_prefect ...
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
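When comparing several runs, the only lines that matter are the "final state" lines. As a rough sketch, a regular expression can pull the task name and final state out of captured log text. `final_states` is a hypothetical helper, and the pattern assumes the log format shown in the output above; the two sample lines are copied from the second run.

```python
import re

# Matches lines such as:
#   ... | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
FINAL_STATE = re.compile(
    r"Task '(?P<task>[^']+)': Finished task run for task "
    r"with final state: '(?P<state>[^']+)'"
)


def final_states(log_text):
    """Map each task name to the final state reported in the log text."""
    return {m.group("task"): m.group("state") for m in FINAL_STATE.finditer(log_text)}


SAMPLE = """\
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
"""

if __name__ == "__main__":
    print(final_states(SAMPLE))
```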
Static DAG diagram
Official documentation: https://docs.prefect.io/core/advanced_tutorials/visualization.html
Download Graphviz and add it to the PATH environment variable.
Modify pyproject.toml to add the viz extra, then run poetry update -vvv

[tool.poetry.dependencies]
python = "^3.8"
prefect = { version = "~0.15.12", extras = ["viz"] }
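Since rendering depends on the Graphviz executable being reachable through PATH, a quick check from Python can save a confusing error later. This is a minimal sketch with the standard library; `find_graphviz` is a hypothetical helper, and it assumes the Graphviz executable is named `dot`, which is the usual name of its layout program.

```python
import shutil


def find_graphviz(executable="dot"):
    """Return the full path of the Graphviz executable, or None if it is not on PATH."""
    return shutil.which(executable)


if __name__ == "__main__":
    path = find_graphviz()
    if path is None:
        print("Graphviz 'dot' was not found on PATH")
    else:
        print(f"Graphviz found at {path}")
```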
Modify the main block of test_prefect.py

if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    # Draw the DAG before the run, then again afterwards with the task states
    flow.visualize(filename='flow_start', format='png')
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state, filename='flow_end', format='png')
Running the code generates two images, flow_start.png and flow_end.png

poetry run python ./test_prefect.py restart
flow_start.png
flow_end.png
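The dependency graph behind the two images follows directly from the Flow block in test_prefect.py: TaskAdd feeds TaskSubtract and TaskMultiply, and both feed TaskDivide. As an illustration only, the sketch below writes those edges out by hand, emits them as Graphviz DOT text, and computes one valid execution order. `EDGES`, `to_dot` and `topological_order` are hypothetical names; this is not what flow.visualize produces, and the alphabetical tie-breaking is an arbitrary choice made here to keep the output deterministic.

```python
# Edges (upstream, downstream) read off the Flow block in test_prefect.py
EDGES = [
    ("TaskAdd", "TaskSubtract"),
    ("TaskAdd", "TaskMultiply"),
    ("TaskSubtract", "TaskDivide"),
    ("TaskMultiply", "TaskDivide"),
]


def to_dot(edges, name="flow"):
    """Render the edges as Graphviz DOT text."""
    lines = [f"digraph {name} {{"]
    for upstream, downstream in edges:
        lines.append(f"    {upstream} -> {downstream};")
    lines.append("}")
    return "\n".join(lines)


def topological_order(edges):
    """Return one valid execution order using Kahn's algorithm.

    Ties between ready tasks are broken alphabetically.
    """
    nodes = sorted({node for edge in edges for node in edge})
    indegree = {node: 0 for node in nodes}
    downstream = {node: [] for node in nodes}
    for up, down in edges:
        indegree[down] += 1
        downstream[up].append(down)

    ready = [node for node in nodes if indegree[node] == 0]
    order = []
    while ready:
        ready.sort()
        node = ready.pop(0)
        order.append(node)
        for nxt in downstream[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


if __name__ == "__main__":
    print(to_dot(EDGES))
    print(topological_order(EDGES))
```

The DOT text can be saved to a file and rendered with the Graphviz command-line tools if a hand-made diagram is wanted for comparison.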
This article comes from qbit snap
Copyright notice: this is an original article by InfoQ author [qbit].
Original link: [http://xie.infoq.cn/article/c9bec89de2562ba7bed5884b0]. Please contact the author before republishing.