写点什么

Luigi 3.x 基本用法(qbit)

作者:qbit
  • 2022 年 1 月 10 日
  • 本文字数:1311 字

    阅读完需:约 4 分钟

前言

  • 软件版本

Python3 3.8luigi 3.0.3poetry 1.1.7
复制代码


安装

  • poetry 初始化项目后在 pyproject.toml 添加以下依赖,然后运行 poetry update

[tool.poetry.dependencies]python = "^3.8"luigi = "~3.0.3"
复制代码

配置文件

  • 设置环境变量

# Linuxexport LUIGI_CONFIG_PATH=my_path/luigi.tomlexport LUIGI_CONFIG_PARSER=toml# Windows(注意行尾不要有空格)set LUIGI_CONFIG_PATH=my_path/luigi.tomlset LUIGI_CONFIG_PARSER=toml
复制代码
  • 编辑配置文件 luigi.toml ,全部配置项参见官方文档

[core]default_scheduler_host = '192.168.1.101'default_scheduler_port = 8082default_scheduler_url = 'http://192.168.1.101:8082/'
[scheduler]# luigid 终止后保存的状态文件state_path = 'my_path/state.pickle'# 任务终止后多久移除(默认 10 分钟)remove_delay = 3600
复制代码

测试代码

  • luigi_test.py

# encoding: utf-8# author: qbit# date: 2022-01-10# summary: 测试 luigi,加减乘除
import luigi
class TaskAdd(luigi.Task): r""" 读入参数做加法 """ x = luigi.IntParameter() y = luigi.IntParameter()
def output(self): return luigi.LocalTarget(f'add_{self.x}_{self.y}.result')
def run(self): with self.output().open('w') as f: f.write(str(self.x + self.y))
class TaskSubtract(luigi.Task): r""" 把加法的结果 -1 """ def requires(self): return TaskAdd(2, 1)
def output(self): return luigi.LocalTarget('subtract.result')
def run(self): with self.input().open('r') as infile: num = int(infile.read().strip())
with self.output().open('w') as outfile: outfile.write(str(num-1))
class TaskMultiply(luigi.Task): r""" 把加法的结果乘以 2 """ def requires(self): return TaskAdd(2, 1)
def output(self): return luigi.LocalTarget('multiply.result')
def run(self): with self.input().open('r') as infile: num = int(infile.read().strip())
with self.output().open('w') as outfile: outfile.write(str(num*2))
class TaskDivide(luigi.Task): r""" 用乘法的结果除以减法的结果 """ def requires(self): return {'x': TaskSubtract(), 'y': TaskMultiply()}
def output(self): return luigi.LocalTarget('divide.result')
def run(self): with self.input()['x'].open('r') as infile: x = int(infile.read().strip())
with self.input()['y'].open('r') as infile: y = int(infile.read().strip())
with self.output().open('w') as outfile: outfile.write(str(y / x))
if __name__ == '__main__': luigi.build([TaskDivide()], local_scheduler=False)
复制代码

运行

  • 运行后台 scheduler

poetry run luigid
复制代码
  • 运行任务

poetry run python .\luigi_test.py
复制代码


本文出自 qbit snap


发布于: 刚刚阅读数: 2
用户头像

qbit

关注

开箱即用,拿走不谢。 2018.10.10 加入

软件设计师 网络工程师

评论

发布
暂无评论
Luigi 3.x 基本用法(qbit)