Workflow: Basic Usage of prefect (qbit)

Author: qbit
  • January 13, 2022

Preface

  • Software versions

Python  3.8
poetry  1.1.7
prefect 0.15.12

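Installation

  • After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update -vvv

[tool.poetry.dependencies]
python = "^3.8"
prefect = "~0.15.12"

  • Enable checkpointing through an environment variable; for local runs with flow.run(), task results are only persisted when it is turned on

# Linux
export PREFECT__FLOWS__CHECKPOINTING=true
# Windows powershell
$env:PREFECT__FLOWS__CHECKPOINTING="true"
# Windows cmd (make sure there is no trailing space at the end of the line)
set PREFECT__FLOWS__CHECKPOINTING=true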
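
The variable name follows Prefect's convention of a PREFECT__ prefix with double underscores separating configuration levels, so PREFECT__FLOWS__CHECKPOINTING corresponds to the flows.checkpointing setting. The sketch below only illustrates that naming scheme with the standard library. It is not Prefect's own parser, and the function name parse_prefect_env is a hypothetical helper introduced here.

```python
def parse_prefect_env(environ, prefix="PREFECT__"):
    """Turn PREFECT__SECTION__KEY=value pairs into a nested dict (illustrative only)."""
    config = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue  # unrelated variables such as PATH are ignored
        # Double underscores separate nesting levels; keys are lower-cased.
        path = name[len(prefix):].lower().split("__")
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        # Interpret the exact spellings "true"/"false" as booleans,
        # and leave every other value as plain text.
        lowered = raw.lower()
        if lowered in ("true", "false"):
            node[path[-1]] = lowered == "true"
        else:
            node[path[-1]] = raw
    return config


# A hand-written mapping stands in for os.environ so the example is repeatable.
example_env = {"PREFECT__FLOWS__CHECKPOINTING": "true", "PATH": "/usr/bin"}
config = parse_prefect_env(example_env)
print(config)
```

In an environment where prefect 0.15 is installed, printing prefect.config.flows.checkpointing should be a quick way to confirm that the variable was picked up.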

Test code

  • test_prefect.py

# encoding: utf-8
# author: qbit
# date: 2022-01-12
# summary: test prefect with addition, subtraction, multiplication and division

import os
import sys
import shutil
import prefect
from prefect import task, Flow
from prefect.engine.results import LocalResult

logger = prefect.context.get("logger")
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
cur_filename = os.path.basename(__file__)
dirname = f".{os.path.splitext(cur_filename)[0]}"  # cache directory named after this .py file
PrefectLocalResultDir = os.path.join(cur_dir_fullpath, dirname)


def ClearDirectory(dir_path):
    r""" Empty the directory """
    if not os.path.isdir(dir_path):
        return  # nothing to clear, e.g. before the very first run
    for filename in os.listdir(dir_path):
        file = os.path.join(dir_path, filename)
        try:
            if os.path.isfile(file) or os.path.islink(file):
                os.remove(file)
            elif os.path.isdir(file):
                shutil.rmtree(file)
        except Exception as e:
            print(f'Failed to delete {file}. Reason: {e}')


# The result is written to <PrefectLocalResultDir>/TaskAdd.target and reused while that file exists
@task(target="{task_name}.target", checkpoint=True,
      result=LocalResult(dir=PrefectLocalResultDir))
def TaskAdd(x, y):
    result = x + y
    logger.info(f"{x} + {y} = {result}")
    return result


@task
def TaskSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    logger.info(f"{x} - 1 = {result}")
    return result


@task
def TaskMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    logger.info(f"{x} * 2 = {result}")
    return result


@task(log_stdout=True)  # log_stdout=True routes print() output to the task logger
def TaskDivide(x, y):
    r""" Divide y by x """
    result = y / x
    logger.info(f"{y} / {x} = {result}")
    print(f"result: {result}")
    return result


if __name__ == '__main__':
    # Passing "restart" on the command line clears the cached results first
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    # Flow name means "Example: the four arithmetic operations"
    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow_state = flow.run()

Running

  • First run (note that the state of the first calculation, TaskAdd, is Success)

# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

  • Second run (note that the state of the first calculation, TaskAdd, is now Cached and its body is not executed)

# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

  • Third run, this time with the restart argument (the cache directory is cleared first, so the state of the first calculation, TaskAdd, is Success again)

# Command
poetry run python ./test_prefect.py restart
# Output
****** Clear D:\Python3Project\test_prefect\.test_prefect ...
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
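
The Success, Cached, Success pattern of the three runs comes from the target file: when the rendered target already exists in the result directory, the stored value is loaded instead of running the task. The following is a minimal standard-library sketch of that idea, not Prefect's implementation. The decorator cached_task, the function task_add, and the temporary directory are hypothetical names used only for illustration.

```python
import os
import pickle
import tempfile


def cached_task(target, result_dir):
    """Reuse a pickled result if the rendered target file already exists."""
    def decorator(func):
        def wrapper(*args):
            # Render the target template; here only {task_name} is supported.
            location = os.path.join(result_dir, target.format(task_name=func.__name__))
            if os.path.isfile(location):
                with open(location, "rb") as fh:
                    return "Cached", pickle.load(fh)
            value = func(*args)
            os.makedirs(result_dir, exist_ok=True)
            with open(location, "wb") as fh:
                pickle.dump(value, fh)
            return "Success", value
        return wrapper
    return decorator


result_dir = tempfile.mkdtemp()  # stand-in for the .test_prefect directory


@cached_task(target="{task_name}.target", result_dir=result_dir)
def task_add(x, y):
    return x + y


first = task_add(2, 1)   # no target file yet, so the function body runs
second = task_add(2, 1)  # target file exists, so the stored value is loaded
os.remove(os.path.join(result_dir, "task_add.target"))  # plays the role of "restart"
third = task_add(2, 1)   # target is gone, so the function body runs again
print(first, second, third)
```

Because the lookup depends only on the file name, the sketch would return the stored value even if the arguments changed. If inputs are expected to vary, the target template would need to include something that distinguishes them.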

Static DAG diagram

  • Add the viz extra to the prefect dependency in pyproject.toml

[tool.poetry.dependencies]
python = "^3.8"
prefect = { version = "~0.15.12", extras = ["viz"] }

  • Modify the main block of test_prefect.py

if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow.visualize(filename='flow_start', format='png')
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state, filename='flow_end', format='png')

  • Running the code generates two images, flow_start.png and flow_end.png

poetry run python ./test_prefect.py restart
  • flow_start.png


  • flow_end.png
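
For readers without the images, the dependency structure that the diagrams show can be read directly off the flow definition: TaskAdd feeds TaskSubtract and TaskMultiply, and both feed TaskDivide. The sketch below writes that structure down by hand, derives one valid execution order with Kahn's algorithm, and emits Graphviz DOT text. It is only an illustration under those assumptions. The upstream dictionary and the helper names are hypothetical, and nothing here calls Prefect or reproduces its scheduler.

```python
from collections import deque

# Upstream dependencies of each task, copied by hand from test_prefect.py.
upstream = {
    "TaskAdd": [],
    "TaskSubtract": ["TaskAdd"],
    "TaskMultiply": ["TaskAdd"],
    "TaskDivide": ["TaskSubtract", "TaskMultiply"],
}


def topological_order(upstream):
    """Kahn's algorithm: emit a task only after all of its upstream tasks."""
    remaining = {task: len(deps) for task, deps in upstream.items()}
    downstream = {task: [] for task in upstream}
    for task, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(task)
    # Ties are broken alphabetically so the result is deterministic.
    ready = deque(sorted(task for task, count in remaining.items() if count == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(upstream):
        raise ValueError("cycle detected, not a DAG")
    return order


def to_dot(upstream):
    """Render the edges as Graphviz DOT text."""
    lines = ["digraph flow {"]
    for task, deps in upstream.items():
        for dep in deps:
            lines.append(f'    "{dep}" -> "{task}";')
    lines.append("}")
    return "\n".join(lines)


order = topological_order(upstream)
dot_source = to_dot(upstream)
print(order)
print(dot_source)
```

Any order that places TaskAdd first and TaskDivide last is valid for this graph; the alphabetical tie-break is just one arbitrary choice.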


This article is from qbit snap
