写点什么

workflow 之 Dagster 基本用法(qbit)

作者:qbit
  • 2022 年 1 月 11 日
  • 本文字数:1536 字

    阅读完需:约 5 分钟

workflow 之 Dagster 基本用法(qbit)

前言

  • 软件版本

Python  3.8poetry  1.1.7dagster 0.13.13
复制代码

安装

  • poetry 初始化项目后在 pyproject.toml 添加以下依赖,然后运行 poetry update

# 国内镜像源(可选)[[tool.poetry.source]]name = "aliyun"url = "https://mirrors.aliyun.com/pypi/simple/"default = true
[tool.poetry.dependencies]python = "^3.8"dagster = "~0.13.13"dagit = "~0.13.13"
复制代码

测试代码

  • test_dagster.py

# encoding: utf-8# author: qbit# date: 2022-01-11# summary: 测试 dagster,加减乘除from dagster import get_dagster_logger, job, op
loggerDag = get_dagster_logger()
@opdef OpSeed(): r""" 种子函数,生成初始参数 """ return (2, 1)
@opdef OpAdd(seed): r""" 读入参数做加法 """ x, y = seed result = x + y loggerDag.info(f"{x} + {y} = {result}") return result
@opdef OpSubtract(x): r""" 读入参数减 1 """ result = x - 1 loggerDag.info(f"{x} - 1 = {result}") return result
@opdef OpMultiply(x): r""" 读入参数乘以 2 """ result = x * 2 loggerDag.info(f"{x} * 2 = {result}") return result
@opdef OpDivide(x, y): r""" 读入参数做除法 """ result = y / x loggerDag.info(f"{y} / {x} = {result}") return result
@jobdef arithmetic(): r""" 四则运算 """ seed = OpSeed() addResult = OpAdd(seed) subResult = OpSubtract(addResult) mulResult = OpMultiply(addResult) OpDivide(subResult, mulResult)
if __name__ == "__main__": result = arithmetic.execute_in_process(run_config={"loggers": {"console": {"config": {"log_level": "info"}}}})
复制代码

直接运行

  • 直接使用 python 运行

poetry run python test_dagster.py
复制代码
  • 运行结果

2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpAdd - 2 + 1 = 32022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpMultiply - 3 * 2 = 62022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpSubtract - 3 - 1 = 22022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpDivide - 6 / 2 = 3.0
复制代码

daster 运行

  • 编辑配置文件 run_config.yaml

loggers:  console:    config:      log_level: INFO
复制代码
  • 运行命令

poetry run dagster job execute -f test_dagster.py -c run_config.yaml
复制代码
  • 运行结果

To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
2022-01-11 16:35:25 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpAdd - 2 + 1 = 32022-01-11 16:35:30 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpSubtract - 3 - 1 = 22022-01-11 16:35:30 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpMultiply - 3 * 2 = 62022-01-11 16:35:35 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpDivide - 6 / 2 = 3.0
复制代码

dagit 运行

  • 运行命令

poetry run dagit -f test_dagster.py
复制代码
  • 用浏览器打开 http://127.0.0.1:3000 查看

  • 点击 Launchpad 标签页,点击 Launch Run 按钮运行,运行结果如下:


出自 qbit snap

发布于: 2022 年 01 月 11 日阅读数: 92
用户头像

qbit

关注

开箱即用,拿走不谢。 2018.10.10 加入

软件设计师 网络工程师

评论

发布
暂无评论
workflow 之 Dagster 基本用法(qbit)_工作流_qbit_InfoQ写作社区