Luigi 3.x 基本用法(qbit)
作者:qbit
- 2022 年 1 月 10 日
本文字数:1311 字
阅读完需:约 4 分钟
前言
软件版本
Python3 3.8luigi 3.0.3poetry 1.1.7复制代码
安装
用
poetry初始化项目后在pyproject.toml添加以下依赖,然后运行poetry update
[tool.poetry.dependencies]python = "^3.8"luigi = "~3.0.3"复制代码
配置文件
设置环境变量
# Linuxexport LUIGI_CONFIG_PATH=my_path/luigi.tomlexport LUIGI_CONFIG_PARSER=toml# Windows(注意行尾不要有空格)set LUIGI_CONFIG_PATH=my_path/luigi.tomlset LUIGI_CONFIG_PARSER=toml复制代码
编辑配置文件
luigi.toml,全部配置项参见官方文档
[core]default_scheduler_host = '192.168.1.101'default_scheduler_port = 8082default_scheduler_url = 'http://192.168.1.101:8082/'
[scheduler]# luigid 终止后保存的状态文件state_path = 'my_path/state.pickle'# 任务终止后多久移除(默认 10 分钟)remove_delay = 3600复制代码
测试代码
luigi_test.py
# encoding: utf-8# author: qbit# date: 2022-01-10# summary: 测试 luigi,加减乘除
import luigi
class TaskAdd(luigi.Task): r""" 读入参数做加法 """ x = luigi.IntParameter() y = luigi.IntParameter()
def output(self): return luigi.LocalTarget(f'add_{self.x}_{self.y}.result')
def run(self): with self.output().open('w') as f: f.write(str(self.x + self.y))
class TaskSubtract(luigi.Task): r""" 把加法的结果 -1 """ def requires(self): return TaskAdd(2, 1)
def output(self): return luigi.LocalTarget('subtract.result')
def run(self): with self.input().open('r') as infile: num = int(infile.read().strip())
with self.output().open('w') as outfile: outfile.write(str(num-1))
class TaskMultiply(luigi.Task): r""" 把加法的结果乘以 2 """ def requires(self): return TaskAdd(2, 1)
def output(self): return luigi.LocalTarget('multiply.result')
def run(self): with self.input().open('r') as infile: num = int(infile.read().strip())
with self.output().open('w') as outfile: outfile.write(str(num*2))
class TaskDivide(luigi.Task): r""" 用乘法的结果除以减法的结果 """ def requires(self): return {'x': TaskSubtract(), 'y': TaskMultiply()}
def output(self): return luigi.LocalTarget('divide.result')
def run(self): with self.input()['x'].open('r') as infile: x = int(infile.read().strip())
with self.input()['y'].open('r') as infile: y = int(infile.read().strip())
with self.output().open('w') as outfile: outfile.write(str(y / x))
if __name__ == '__main__': luigi.build([TaskDivide()], local_scheduler=False)复制代码
运行
运行后台 scheduler
poetry run luigid复制代码
运行任务
poetry run python .\luigi_test.py复制代码
本文出自 qbit snap
划线
评论
复制
发布于: 刚刚阅读数: 2
版权声明: 本文为 InfoQ 作者【qbit】的原创文章。
原文链接:【http://xie.infoq.cn/article/ec48c85daf9792c7f833e0322】。文章转载请联系作者。
qbit
关注
开箱即用,拿走不谢。 2018.10.10 加入
软件设计师 网络工程师











评论