Luigi 3.x 基本用法(qbit)
作者:qbit
- 2022 年 1 月 10 日
本文字数:1311 字
阅读完需:约 4 分钟
前言
软件版本
Python3 3.8
luigi 3.0.3
poetry 1.1.7
复制代码
安装
用
poetry
初始化项目后在pyproject.toml
添加以下依赖,然后运行poetry update
[tool.poetry.dependencies]
python = "^3.8"
luigi = "~3.0.3"
复制代码
配置文件
设置环境变量
# Linux
export LUIGI_CONFIG_PATH=my_path/luigi.toml
export LUIGI_CONFIG_PARSER=toml
# Windows(注意行尾不要有空格)
set LUIGI_CONFIG_PATH=my_path/luigi.toml
set LUIGI_CONFIG_PARSER=toml
复制代码
编辑配置文件
luigi.toml
,全部配置项参见官方文档
[core]
default_scheduler_host = '192.168.1.101'
default_scheduler_port = 8082
default_scheduler_url = 'http://192.168.1.101:8082/'
[scheduler]
# luigid 终止后保存的状态文件
state_path = 'my_path/state.pickle'
# 任务终止后多久移除(默认 10 分钟)
remove_delay = 3600
复制代码
测试代码
luigi_test.py
# encoding: utf-8
# author: qbit
# date: 2022-01-10
# summary: 测试 luigi,加减乘除
import luigi
class TaskAdd(luigi.Task):
r""" 读入参数做加法 """
x = luigi.IntParameter()
y = luigi.IntParameter()
def output(self):
return luigi.LocalTarget(f'add_{self.x}_{self.y}.result')
def run(self):
with self.output().open('w') as f:
f.write(str(self.x + self.y))
class TaskSubtract(luigi.Task):
r""" 把加法的结果 -1 """
def requires(self):
return TaskAdd(2, 1)
def output(self):
return luigi.LocalTarget('subtract.result')
def run(self):
with self.input().open('r') as infile:
num = int(infile.read().strip())
with self.output().open('w') as outfile:
outfile.write(str(num-1))
class TaskMultiply(luigi.Task):
r""" 把加法的结果乘以 2 """
def requires(self):
return TaskAdd(2, 1)
def output(self):
return luigi.LocalTarget('multiply.result')
def run(self):
with self.input().open('r') as infile:
num = int(infile.read().strip())
with self.output().open('w') as outfile:
outfile.write(str(num*2))
class TaskDivide(luigi.Task):
r""" 用乘法的结果除以减法的结果 """
def requires(self):
return {'x': TaskSubtract(), 'y': TaskMultiply()}
def output(self):
return luigi.LocalTarget('divide.result')
def run(self):
with self.input()['x'].open('r') as infile:
x = int(infile.read().strip())
with self.input()['y'].open('r') as infile:
y = int(infile.read().strip())
with self.output().open('w') as outfile:
outfile.write(str(y / x))
if __name__ == '__main__':
luigi.build([TaskDivide()], local_scheduler=False)
复制代码
运行
运行后台 scheduler
poetry run luigid
复制代码
运行任务
poetry run python .\luigi_test.py
复制代码
本文出自 qbit snap
划线
评论
复制
发布于: 刚刚阅读数: 2
版权声明: 本文为 InfoQ 作者【qbit】的原创文章。
原文链接:【http://xie.infoq.cn/article/ec48c85daf9792c7f833e0322】。文章转载请联系作者。
qbit
关注
开箱即用,拿走不谢。 2018.10.10 加入
软件设计师 网络工程师
评论