OneFlow 源码解析:静态图与运行时
作者|郑建华更新|许啸宇、张文骁、成诚
OneFlow 静态图的训练效率远高于动态图(eager 模式)。本文试图通过一个简单例子,结合v0.8.0版本的代码,解读一下静态图和运行时的实现机制。
在开始之前,建议先读一下参考资料中《OneFlow 框架的系统设计(https://zhuanlan.zhihu.com/p/337851255)》等系列文章。对静态图、运行时的基本概念和设计理念有基本的了解,会更容易理解代码。
1 代码示例
下面的示例代码来自官方文档(https://docs.oneflow.org/master/basics/08_nn_graph.html),是一个线性模型的前向计算。后续主要基于这段代码进行分析。
2 oneflow 包的初始化
import oneflow 在初始化包(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py)时,与静态图相关的主要操作如下:
GetEnv(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py#L228)
EnvGlobalObjectsScope::Init(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L126)
启动各个节点的控制面(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L160-L162)网络连接
初始化 VM(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L180)
启动各个节点的数据面网络连接(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L184-L188)
初始化 KernelObserver(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L192-L203)
NewDefaultSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py#L229)
RegsiterSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/multi_client_session.py#L39) 创建 Session,并注册为 default session(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/session_util.cpp#L89)
创建 Python MultiClientSession 并保存到 dict(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/session_context.py#L40),但并不 TryInit
创建 C++ MultiClientSessionContext(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/multi_client_session.py#L41) 但并不 TryInit
EnvGlobalObjectsScope::Init 中先创建一个全局的 ProcessCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/env_global_objects_scope.cpp#L132)对象。然后根据环境变量等配置,在各个进程间创建 gRPC 和 CommNet 的连接,分别负责控制面和数据面的数据传输。其中在 Bootstrap 过程中会初始化全局的 ProcessCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/rpc/lib/grpc.cpp#L42),给每个进程分配一个全局唯一的 rank 编号(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/rpc/lib/global_process_ctx.cpp#L28)(machine_id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/rpc/lib/global_process_ctx.cpp#L24))。
本文不涉及网络层面的操作,只讨论同一进程内各线程间的交互。
3 Module 类
虽然可以直接用 op 和 tensor 构造模型,但是 op 的粒度太细了,直接用 op 构造模型会比较繁琐。
Module(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/module.py#L54)是由 op 和 tensor 构成的、可复用的子模块。利用 Module 可以更高效、更快捷的构建复杂模型。oneflow.nn(https://github.com/Oneflow-Inc/oneflow/blob/d825243aa7aff5cba8bd3a901b4cc56c2b1a36af/python/oneflow/nn/__init__.py)模块导出了很多预定义的 Module。
Module 定义了自己的属性设置逻辑(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/module.py#L262),核心逻辑是
如果 value 是 Parameter 类型,就保存到 Module._parameters 中
如果 value 是 Module 类型,就保存到 Module._modules 中
如果 value 是 Tensor 类型,就保存到 Module._buffers 中
否则按常规属性处理
Module 可以包含子 Module,形成树结构。因为 Module 通过 setattr 将子 Module 和 Parameter 都保存到字典结构中,可以方便的遍历所有 Module 及其参数 tensor。
4 Graph 类
4.1 构造函数
Graph 的构造函数中 GetDefaultSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L145)得到的 session,就是导入 oneflow 包时 NewDefaultSession(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/__init__.py#L229)构建的 session。当时没有初始化,而是在 Graph 构造时进行初始化(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L147)。对应的 C++函数是 MultiClientSessionContext::TryInit(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/multi_client_session_context.cpp#L67),执行时会创建各种全局的资源管理器,比如:
LazyJobBuildAndInferCtxMgr
BufferMgr
RegstMgr
ActorMsgBus
ThreadMgr
4.2 __setattr__: 将 Module 和 Tensor 封装为 Block
Graph.__setattr__ 支持通过设置属性的方式把一个 Module 添加到 Graph 中,之后改 Module 就可以被 Graph 调用了。添加到 Graph 中的 Module,会被包装到 Block 里面,Block 起到了代理执行的作用,它会给原 Eager 下的 Module 扩展出静态执行需要的一些特殊功能。
添加到 Graph 中的 Module 和原 Module 共享了状态(Parameter、Buffer)和 forward 执行逻辑。共享 forward 执行逻辑使得静态和动态执行计算逻辑相同。共享状态则可以使动态图下的模型状态被静态图复用。基于此,两个 Graph,一个用于训练,一个用于预测,他们都复用统一模型 Module,这样训练和预测 Graph 也就实现了模型共享。
setattr 最重要的动作就是对_add_block 的调用(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L1332),_add_block 中主要是调用 get_block_cls 并保存结果(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L1326)。get_block_cls(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L39)的作用是将 Module 及其所有 Tensor 属性都转为对应的 Block 对象。为什么要做这个动作呢?主要是静态图编译需要借助 Block 类型来实现代理执行的功能,这些功能不适合直接写到 eager 下的 Module 和 Tensor 上。
这个转换是在 ModuleBlock 构造时调用 set_origin(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L131)完成的。对于子 Module,会递归调用 get_block_cls 函数(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L145),这样所有子 Module 及其 Tensor 属性都会被转换为对应的 Block 对象。
所以,上述示例代码中,GraphMyLinear 实际存储的是 ModuleBlock,Graph.build 执行时获取的 model 属性也是 ModuleBlock 对象,ModuleBlock.origin 才是 ModuleMyLinear。
Graph.__setattr__不允许将 Tensor 对象设置为属性(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L1340)。Tensor 只能存到 Module 中,因为 Module 是做状态共享的基本单位,而 Graph 是不允许复用的。
4.3 针对不同任务,定义不同的计算图
根据 Oneflow Model Zoo 的模型示例(https://github.com/Oneflow-Inc/models/blob/1b291f78d8f60e5f04ee0c5962e4611cc4bab40a/Vision/classification/image/alexnet/graph/train.py),train/eval 等阶段可以创建不同的 Graph 子类。动态图下提供了 Module、Optimizer、Dataloader 等模块,这些模型都可以被添加到 Graph 中。不同的组合可以构建不同类型的任务。
在这些不同阶段,Graph 构造函数的行为、build 函数的输入输出都有各自特点。了解这些,看后续代码时会更容易理解各个参数的具体含义。
构造函数
train 阶段,需要添加 Module、损失函数、优化器和 dataloader
eval 阶段,只需要添加 Module 和 dataloader
build 函数
train
导入样本和 label
调用 Module 得到前向计算结果
计算损失
计算梯度
返回 loss
eval
导入样本和 label
调用 Module 得到预估结果
返回预估结果和 label
4.4 小结
上述几个类型的关系如下:
下面描述了 GraphMyLinear 的构造流程
5 逻辑图的编译
计算机语言的编译,是将高级语言的语句编译为汇编或机器指令。深度学习框架对计算任务的编译,是将用户的特定语句操作转换为 DAG 图。oneflow 中用 Job(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job.proto#L30)描述逻辑的计算图。
不同于 eager 模式的动态图,静态图在开始执行前可以得到整个计算任务的所有信息,可以对 DAG 进行多轮优化。每轮优化都是输入一个 Job、得到一个新 Job。
最后,根据分布式环境配置,将逻辑图 Job 转换为物理执行的计算图 Plan(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/plan.proto#L34)。在物理图中,一个 op 可能分布在多个节点/进程。
启动 DAG 计算需要调用 Graph.__call__,这个函数的执行主要分以下几个步骤:
__call__
_compile(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L221) if not _is_compiled
build_graph(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L741)
__build_graph(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L759)
finish_complie_and_init_runtime(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L742)
__run(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L226)
逻辑图编译主要在__build_graph 中进行。finish_complie_and_init_runtime 会继续做一些优化 pass,然后构建物理图、初始化运行时 Actor 系统。__run 会启动一次 DAG 的运算。
5.1 graph_build_context: 为逻辑图编译设置基本环境
在 Graph 中,build 函数里面的代码执行都在 graph_build_context 的作用域下,这样实现了动态转静态的功能。
__build_graph 中的 graph_build_context(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L851)虽然只有一行代码,但却做了几件非常重要的事情。
首先在 context 作用域内设置全局的 lazy_mode 为 True(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L46)。在这个 context 作用域内,所有 op 都由 LazyInterpreter 解释执行。
其次,在 JobBuildAndInferCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L47)作用域内,JobBuildAndInferCtx_Open(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L57)调用类似如下 C++代码
OpenJobBuildAndInferCtx 会新建一个 Job 对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx_mgr.cpp#L32)、一个 LazyJobBuildAndInferCtx 对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx_mgr.cpp#L34)。LazyJobBuildAndInferCtx 负责根据用户定制的 op 等操作,修改 Job,其中最主要的功能是添加新 Op。
5.2 __build_io:为计算图添加 input 和 output Op
上面这行代码(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L854-L856)的作用是,对于用户传递给 graph_mylinear(input)的 input 参数,针对其中的每个 tensor 都在逻辑计算图中插入一个 FeedInputOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/system_ops.h#L48)节点。也就是说,model 的输入(比如样本 tensor,具体参考 4.3 节),在静态图中也视为一个 op 操作。
__build_io 内会用 args(即 input)和 kwargs 构造一个 ArgsTree。ArgsTree 把 Python 下的输入、输出抽象成了一个树,输入、输出可以是嵌套的 Tuple、List、Dict,元素是 Tensor,嵌套的结构刚好可以表示为树,而 Tensor 是树中的叶子节点。示例代码中 kwargs 是空的。
遍历 ArgsTree,对 args 和 kwargs 的每个 tensor 都调用传入的 build_func,对于 input 来说,就是 build_graph_input_arg(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L206)。后面会看到,model 的 output 也会调用__build_io,所以这个函数名的意思应该就是对 model 的输入、输出进行静态图的构图工作。
build_graph_input_arg 内部会构造一个 FeedInputOpExpr(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L213),提交给解释器执行。因为是在 lazy 作用域内,由 LazyInterpreter 解释执行(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L471),LazyInterpreter 会将对应的 op 插入静态图。
附:build input 时 ArgsTree 的内部结构
__build_io(input) 中 ArgsTree 的内部数据组织示意
_named_io_args: NamedArg
_value: tuple
[0]: NamedArg
_value: tuple of NamedArg
[0]: NamedArg
_value: args tensor from Graph.__call__
[1]: NamedArg
_value: empty kwargs from Graph.__call__
通过 pdb 命令可以查看变量: p
args_tree._named_io_args._value[0]._value[0]._value.to_numpy()
5.2.1 将 op 添加到逻辑图
LazyInterpreter::ApplyImpl(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L471)在执行时,GetCurInferCtx()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L500)返回的就是 graph_build_context 中 OpenJobBuildAndInferCtx(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L57)创建的那个 LazyJobBuildAndInferCtx 对象,这个对象负责逻辑图的构建。添加 op 的主要调用流程如下:
infer_ctx->AddAndInferConsistentOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L503)
AddAndInferOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx.cpp#L563)
ConstructOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx.cpp#L580)
CheckAndConstructOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/operator.cpp#L1216)
NewObj(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/operator.cpp#L51)
OperatorConf 中,多种 op 配置共享 op_type 字段(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/op_conf.proto#L412),protobuf oneof 的 op_type_case 常量作为注册 NewObj 的 key。
系统预定义的 op 在 oneflow/core/operator(https://github.com/Oneflow-Inc/oneflow/tree/release/v0.8.0/oneflow/core/operator)下,例如 UserOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/operator/user_op.h#L24)。
AddAndInferOp 将返回的 Operator 保存到 LazyJobBuildAndInferCtx 的字典中。后续的函数调用,主要是进行推导并修改静态图 Job,使得各个节点构成一个 DAG。
JobBuildAndInferCtx 相关的类关系如下:
5.2.2 lazy tensor 和 eager tensor 的区别
LazyInterpreter::ApplyImpl 的最后,会调用 BuildTensor(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L518)构造一个 lazy tensor,作为 build_graph_input_arg 的返回值(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L216)。所以__build_io 返回的 lazy_args(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L854)是 lazy tensor,它将替代 eager 的 args(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L828)(也就是用户输入的 input)参与后续的计算图构建。
那么 lazy tensor 和 eager tensor 的区别是什么呢?eager tensor 是要即时计算的,所以需要真实数据;而 lazy tensor 仅在静态图编译阶段用于推导,只需要描述性质的元信息。静态图编译是在 lazy 模式下运行,只是使用 lazy tensor 做计算机构图和校验。
后面会看到,静态图的运行期已经没有 tensor 的概念。运行期看到的只是更广义的 Regst 存储,可能代表 tensor/blob,也可能是其它控制信息。静态图运行时的输入,是直接读取外部 eager tensor 的内存数据到到 regst;输出应该是 op 写到 regst,通过 blob 构造 eager tensor。
5.3 build: 将 UserOp 和 FeedVariableOp 添加到逻辑图
__build_graph 中的 self.build()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L861)会调用 GraphMyLinear.build(),以及 ModuleMyLinear.forward()。因为是在 lazy 模式下运行,matmul 和 add 都会调用 UserOpExpr 重载版本的 LazyInterpreter::ApplyImpl(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L832),进而调用 AddAndInferConsistentOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L940)进行构图操作。
需要说明的是,在引用 Module 的 Parameter 属性时(如 weight/bias),会触发 FeedVariableOp 的构图操作(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L226)、调用对应版本的 LazyInterpreter::ApplyImpl(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L527)。这个是怎么执行的呢?
__build_graph 中,在进入 lazy 模式之前,先调用了_create_states_builder(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L843)。其中 self._state()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L667)返回所有 Module 的所有 Parameter(包括子 Module)。
state_block 的类型是 TensorBlock(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L631)。所有的 state_block 的 lazy_origin_builder().method(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L647)都被设置为调用 build_graph_state(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L683-L688)。
给 build_graph_state(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L220)设置个断点能让整个调用过程显形,主要的调用栈如下:
这个调用过程比较容易困扰的是,执行对象会在 Grpah、GraphMyLinear、ModuleMyLinear、ModuleBlock 之间切换。
前面在讨论 Graph 的构造时已经提过,执行 self.model(input)时,Graph.__getattr__返回的属性 model 是 ModuleBlock 对象,所以实际调用的是 ModuleBlock.__call__。
在这个函数内调用__block_forward(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L234),其中的_origin(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L266)是 ModuleMyLinear,进入到它的 forward 方法,执行到 flow.matmul(input, self.weight) + self.bias 时,matmul 会被 LazyOpInterpreter 所执行,在 LazyOpInterpreter 中调用 AddAndInferConsistentOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L503)
,在 Job 中添加一个 matmul operator。同理后面的加法会在 job 中添加一个 add operator。
self.weight 和 self.bias 会触发调用 ModuleBlock.__getattr__,进而调用_get_from_states(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L483),调用 TensorBlock.try_build()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/block.py#L521)。这里执行的就是进入 lazy 模式之前设置的 build_graph_state(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/framework/graph_build_util.py#L220)。从而增加一个 FeedVariableOp 到计算图(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L527)。为什么设置和调用会距离这么远呢?主要是为了让参数尽量和消费参数的 Operator 在一个作用域下,所以实现成了惰性求值来达到延迟计算的目的。
再后面的步骤就是调用__build_io(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L869-L875)插入 FetchOutputOp(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp#L589)。也就是说,获取 model 的 output 也是一个 op。
到目前为止,前向计算图就构建完成了。它的 json 表示可以参考附录。net.op 是计算图的节点,通过 input 等属性可以看出节点之间的连接关系。
示例代码的前向计算图如下。从这个图可以看到,input、output、weights 等都是 op。
5.4 逻辑图优化
在__build_graph 中会调用 CurJobBuildAndInferCtx_Complete 对静态图进行多轮优化(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L923),对应的 C++函数是 LazyJobBuildAndInferCtx::Complete()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/job_build_and_infer_ctx.cpp#L975)。
这之后生成的 Job 是 full_job。本文的示例代码比较简单,并不是典型的计算场景,其 forwar 和 ful 计算图的拓扑是一样的。实际大部的图优化都实现在这个阶段,如 Op fusion、AMP、ZeRO、常量折叠等等。
到这里,逻辑图构建的主体部分就结束了。
随后会构建一个 CNNGraph 对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L947),对应的 C++类型是 NNGraph(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.h#L33)。这个对象将负责构建物理计算图 Plan。它也是整个运行时的拥有者和维护者。这个对象析构时,整个运行时也会有序终止并释放资源。
5.5 物理图的编译
接下来就是执行 finish_complie_and_init_runtime(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L742),其中的核心调用是 self._c_nn_graph.complie_and_init_runtime()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/python/oneflow/nn/graph/graph.py#L802),对应的 C++函数是 NNGraph::CompileAndInitRuntime(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L265)。
在这个函数中,JobCompleter().Complete()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L280)会继续对逻辑图做几轮修改优化,补全 Runtime 执行所需要的附加信息,Compiler().Compile()(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L285)将逻辑图转为分设备的物理图,并继续对 Plan 进行修改优化。
Plan 的编译是在 master 节点进行的(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L282)。master 节点会将 Plan 通过 gRPC 推送给各个 worker 节点(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L308),worker 节点从 master 拉取物理计算图(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L310)。
之后调用 NewRuntimeBuffers 创建 Buffer 对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L322),Buffer 应该是主要用于进程内的信息同步。
然后就准备初始化运行时了。
示例代码生成的 compiled_job 和物理图 Plan 的 json 参见附录。
最终生成的 compiled 逻辑图如下。框架自动插入了很多系统控制节点。
5.6 Plan 的结构
示例代码输出的 Plan json 数据见附录。
Plan 在逻辑上和 compiled_job 是等价的。这里主要关注 task/op 之间的关系。
Plan.task 中的每个元素是一个 task,其中的 exec_sequence.exec_node 对应 job 中的 op,通常只有一个 op(数组可以支持 sub graph)。
exec_node.kernel_conf.op_attribute 描述了 op 信息。其中 op_conf 包含 op name 信息。
kernel_conf.op_attribute.op_conf 就是 Job 中的 OperatorConf。
kernel_conf.op_attribute.arg_signature.bn_in_op2lbi 体现了 task/op 之间的连接关系。
bn_in_op 就是 blob name in op,即 op 输入的 blob name。
以 System-AutoTick-DstSubsetTick_21 为例
exec_node.bn_in_op2regst_desc_id 在 task 层面体现了连接关系。这个 map 中的 key 表示输入输出,value 是 register id。
task.produced_regst_desc 描述了对应 task 生产的 register,consumer_task_id 是消费者,
produced_regst_desc.out.regst_desc_type.data_regst_desc.lbi2blob_desc.lbi 就是这个 register 的 logic blob id。
task.consumed_regst_desc_id 描述了对应 task 消费的 register 信息
6 运行时的初始化
NNGraph::CompileAndInitRuntime 中,new Runtime 这行代码会初始化运行时(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/framework/nn_graph.cpp#L331)。主要做的事情包括:
创建 Thread
通知 Thread 创建 Actor,Actor 会创建 Regst 和 Kernel
给没有输入的 source_tasks 发送启动信号 kStart
6.1 Runtime 创建 Thread
在 Runtime 的构造函数中,DumpThreadIdsFromPlan(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/runtime.cpp#L65)会将 Plan 中属于当前进程的 task 的 thread id 存入 thread_ids_变量。AddThreads 创建这些 Thread 对象(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/runtime.cpp#L69)。
Thread 在构造时会创建一个物理线程( https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/thread/thread.cpp#L39),线程执行的是 PollMsgChannel 方法(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/thread/thread.cpp#L44),Thread 就是在这里持续等待需要处理的新消息。
Thread 只处理两类命令消息:线程终止消息,创建 Actor 的消息。其它消息交给 Actor::ProcessMsg 处理(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/thread/thread.cpp#L83)。
6.2 Runtime 通知 Thread 创建 Actor
在 Runtime 的构造函数中,tasks 被分为两类:source_tasks 和 other_tasks。在示例代码中,source_tasks(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/runtime.cpp#L84-L85)是没有输入边的 task。
从代码逻辑看,在 Plan proto 中,task 的 consumed_regst_desc_id 字段是一个 map。如果这个 map 的所有 key 都是 in_ctrl(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/runtime.cpp#L54),这个 task 就是 source_tasks。
一些 source_tasks 的示例如下:
System-Src-WaitAndSendIds_16
System-AutoTick-AppendDeviceTick_9
System-EagerCriticalSection-Interface-End-Tick-19
System-EagerCriticalSection-Interface-End-Tick-25
Runtime 调用 HandoutTasks 函数(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/runtime.cpp#L100-L101)会给 ActorMsgBus 发送构建 Actor 的 kConstructActor 消息(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/runtime.cpp#L49)。
6.3 ActorMsgBus 和 Thread 的消息处理
从接口看,ActorMsgBus (https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L24)负责消息的发送(Actor 通过 ActorMsgBus 发送消息),Thread::PollMsgChannel(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L60) 负责消息的接收和处理。
相关实体的协作关系如下
Actor 是自调度的基本单元,接受消息然后工作,工作完后再继续发送消息。
actor_id 就是 task_id,是在编译 Plan 时就确定的。task 是编译时概念,actor 是对等的运行时概念。
task_id 有特定的编码格式(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/graph/task_id.cpp#L21-L29),从中可以解析出 machine_id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/graph/task_id.cpp#L73)和 thread_id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/graph/task_id.cpp#L77)。
在跨网络的整个物理图 Plan 中,actor id 相当于地址,通过它可以定位唯一的 actor 实体。
Actor 通过 ActorMsgBus::SendMsg(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L24) 发送 ActorMsg(https://github.com/Oneflow-Inc/oneflow/blob/4856d691051accd72f13f4139d281e411977b297/oneflow/core/lazy/actor/actor_message.h#L34) 消息。
ActorMsg 包含源和目的 actor id(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message.h#L84-L85)。
如果是进程内通讯(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L26),将通过 ActorMsgBus::SendMsgWithoutCommNet (https://github.com/Oneflow-Inc/oneflow/blob/4856d691051accd72f13f4139d281e411977b297/oneflow/core/lazy/actor/actor_message_bus.cpp#L49)把 ActorMsg 朝目的 actor 所在的 thread 入队消息(https://github.com/Oneflow-Inc/oneflow/blob/4856d691051accd72f13f4139d281e411977b297/oneflow/core/thread/thread.h#L40)。
Thread::EnqueueActorMsg 会判断当前 thread 是否是 actor thread,如果是则入本地队列,否则则入 actor thead 的 channel 队列。
如果 ActorMsg 是跨进程消息,ActorMsgBus 通过 CommNet 发送消息(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor_message_bus.cpp#L42-L44),接收方的 CommNet 应该会根据 actor id 获得线程 id,从 ThreadMgr 查到 Thread,将消息交给 Thread 处理。
Thread::PollMsgChannel(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L60) 负责消息的接收和处理。
如果线程本地队列 local_msg_queue_为空,则从 thread 的 channel 队列中取出全部 ActorMsg 放入本地队列(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L63)。
从本地队列中取出一个 ActorMsg,然后开始处理。
处理一些特殊的 kCmdMsg 消息(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L67-L79),然后普通消息交给 Actor 自行处理(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/thread/thread.cpp#L83)。
Actor 收到消息后,会判断是否满足了 Act 的条件,如果满足,则会执行 Act,从而调用 LaunchKernel 执行计算,Act 执行结束后通过 ActorMsgBus 发消息通知上下游 Actor。
这些对象之间的消息传递关系如下图所示
6.4 激活 source Actor
目前的实现中,Actor 全部是自调度的,只能接受来自其他 Actor 的消息。Actor 中有一类比较特殊的 source actors,它们与 source tasks 对应。
source actors 没有上游 actor,它们会朝下游 actor 发送消息从而激活所有的 Actor 运行。
source actors 本身是如何执行的呢?它们在接受到 kStart 消息后就会一直 Act 直到进入退出流程。但是其 kernel 会阻塞在 Buffer(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/common/buffer.h#L26) 处,一直等待其他线程往 buffer 中添加数据后,阻塞会被激活,然后 kernel 执行读取,kernel 完成后,actor 的 Act 结束,往下游发送消息。
source actors 由于会发生阻塞,所以其必须有单独的 actor thread。
Runtime 初始化的的最后一步就是朝各 source actors 发送 kStart 消息用以激活它们,但 source actors 只有接受到 buffer 的数据后才会往下执行,然后朝下游 actors 发送消息,使所有的 actors 都执行起来。
7 Actor
7.1 Actor 的创建
Thread 在创建 Actor 时,会先尝试创建为 LightActor(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L104),如果不成功,再尝试用预先注册的工厂创建 Actor。
有几种 TaskType 可以用于 LightActor(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/light_actor.cpp#L677-L689):
kNormalForward,比如 matmul、add 等 user op。
kCopyHd
kTick
kCollectiveBoxingGeneric
目前大约有 20 多种 Actor 的子类型。其它 Actor 类型根据 TaskType(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/job/task.proto#L8)预先注册。例如 WaitAndSendIdsActor。
示例代码的各个节点对应的 actor 类型参见附录。
Actor 相关的类关系如下(包含关系只是表示可以访问到相关信息,并不意味着创建或着拥有该类型对象)
7.2 Actor 的初始化
Actor 的构造函数一般都是空的,构建之后需要执行 Init(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L129)函数进行初始化。
LightActor 继承自 ActorBase,不是 Actor 的子类,有自己的 Init 函数实现。这里只讨论 Actor 的初始化。
在 Actor::Init(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L129)中,首先调用 ConstructKernel(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L138)创建 kernel 实例。和 Operator 类似,kernel 也是以 OpTypeCase 作为注册的 key,例如 WaitAndSendIdsKernel(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L51)。一个 Actor 通常只有一个 kernel。
之后调用 NewRegsts 创建 Regst(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L152)。Tensor 是用户侧的概念。对应的运行时概念是 Regst(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/register/register.h#L24),它持有 Kernel 需要读写的内存。Regst 的概念比 Tensor 更宽泛,比如框架自动添加的控制 Op 也会用到 Regst。
Actor 将自己创建的 Regst 保存到 produced_regsts_(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L153)。
TakeOverNaiveConsumed(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L182)只记录需要消费的 regst id,但并不 push 到 consumed_regsts_。
TakeOverNaiveProduced(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L183)既记录生产的 regst id,也 push 到 naive_produced_rs_(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L249)。这种区别是为了首次执行计算时,actor 能顺利执行。后面分析 Actor 的消息处理时会再回过头来讨论一下。
调用 InitBnInOp2BlobInfo 会初始化 BlobInfo(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L184)。
之后就是调用 VirtualActorInit(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.cpp#L185),这里允许各个 Actor 子类定制自己的初始化逻辑。通常会调用 OF_SET_MSG_HANDLER 宏(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.0/oneflow/core/lazy/actor/actor.h#L76-L80)设置 Actor 的消息处理函数。
7.3 Actor 的消息处理
LightActor 首先会根据消息类型分别处理 kRegstMsg 和 kEordMsg 消息。HandleRegstMsg(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/lazy/actor/light_actor.cpp#L424) 中根据 RegstMsg 的 type (kProduced 或 kComsumed) 来分别处理各种读写状态计数。
然后判断读写计数是否达到了判断条件,如果达到了意味着满足了读写 regst 的条件,然后就 执行 ActOnce(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/lazy/actor/light_actor.cpp#L451)。
LightActor::ActOnce 会在第一次执行时去 InitBnInOp2Blob 和 InitActMsg。InitBnInOp2Blob 初始化 resgt 中的 bn 与 Blob 的映射关系,为 kernel 提供通过 bn 访问 Blob 的功能。InitActMsg 会初始化好所有需要发送的消息避免后继发消息时重复的构建消息。
然后就是 LaunchKernel,接着会 ResetState 重置 regst 状态。
LaunchKernel 后就会把之前构建好的消息发送出去,同步消息会直接入队 thread 消息队列,异步消息通过 callback 发送到 ActorMsgBus。
普通 Actor::ProcessMsg 会调用 msg handler 来处理消息,最常见的 msg handler 就是 Actor::HandlerNormal(https://github.com/Oneflow-Inc/oneflow/blob/release/v0.8.1/oneflow/core/lazy/actor/actor.cpp#L329)。
Actor::HandlerNormal 中流程跟 LightActor 中类似,会根据不同的 regst 类型来分别处理,Actor 中对 regst 的状态管理方式与 LightActor 不同,LightActor 中的方式更加高效,Actor 中能处理一些特殊情况。
消息处理完毕后,就会调用 ActUntilFail,ActUntilFail 会判断 IsReadReady 和 IsWriteReady 来决定是否可以进行 Act。
最常见的 NaiveActor::Act() 就是执行 AsyncLaunchKernel。
Act 完成后,就开始朝上下游发送 regst 消息。
还有一些特殊的 Actor,我们以 WaitAndSendIdsActor 为例,观察一下这类 Actor 的消息处理机制。
之所以选择这个例子,一是这个 Actor 比较简单;二是这是一个典型的 source task,想看一下计算图是怎么被触发启动计算的。
Thread 收到的消息如果不是 kStopThread 或 kConstructActor,就调用 Actor::ProcessMsg(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L83),将消息转给 Actor 处理。
ProcessMsg 函数只是简单的将消息转给 handler 处理(https://github.com/Oneflow-Inc/oneflow/blob/b6bf3f8843679111eb1edf79deefce814d250f4e/oneflow/core/lazy/actor/actor.h#L38)。
WaitAndSendIdsActor::VirtualActorInit 中,handler 被设置为 HandlerWaitToStart(https://github.com/Oneflow-Inc/oneflow/blob/22f70a1719f371a54512633bb92086580d9c3c89/oneflow/core/lazy/actor/wait_and_send_ids_actor.cpp#L53)。
Runtime 的构造函数中,发送的第一批消息是给 source_tasks 的 kStart 消息,这个消息就由 HandlerWaitToStart 函数处理。
HandlerWaitToStart 校验消息类型后,将 handler 设置为 HandlerNormal(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/job/runtime.cpp#L109)(这也是大部分 Actor 的默认 handler),然后调用 ProcessMsg(https://github.com/Oneflow-Inc/oneflow/blob/22f70a1719f371a54512633bb92086580d9c3c89/oneflow/core/lazy/actor/wait_and_send_ids_actor.cpp#L74),实际就是调用新设置的 handler HandlerNormal。
HandlerNormal 中,如果是 kCmdMsg,只允许是 kStart(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L377)。通过消息类型校验后,会直接调用 ActUntilFail(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L378)。
7.4 Act 执行的条件
LightActor 和 Actor 判断能否进行 Act 采用了不同的策略,LightActor 的效率更高,Actor 能处理一些特殊情况。
对于 LightActor,当在读的 register 计数 total_reading_cnt_ 归 0,可消费的 register 计数 ready_consumed_ 增加到 max_ready_consumed_,前者表示所有的消费者已经读取当前 LightActor 的 Regst,后者表示当前 LightActor 消费的所有 Regst 已经到达(由上游发送的 Regst 消息)。
对于 Actor,Actor::ActUntilFail 中,Act 方法(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L424)是各个子类自己实现的,一般主要是启动 kernel 计算。
但是在执行 Act 之前,需要先确认:
Act 执行依赖的数据是否都已经就绪?(IsReadReady)
Act 生产出来的数据,消费方是否已经用完、并收到 ack 消息确认?(IsWriteReady)
Actor 有 4 个与此相关的成员变量
RegstSlot naive_produced_rs_;
RegstSlot inplace_produced_rs_;
RegstSlot naive_consumed_rs_;
RegstSlot inplace_consumed_rs_;
xx_produced_rs_存储的是当前 Actor 的下游 consumer 返回的、已经使用完毕的 ack regst 信息。(当前 Actor 生产的 Regst 存储在 produced_regsts_中。)
运行时在初始化的过程中,所有 Actor 都没有运行过,任何 Actor 都不可能收到 ack 消息,所以在 Actor 初始化时,要预先填充 xx_produced_rs_,这样才能保证 Actor 在首次运行前是 WriteReady 的,才能顺利启动执行。
xx_consumed_rs_存储的是上游依赖发来的数据。它不需要预先填充。因为 source_tasks 没有输入依赖,自然就是 ReadReady 的;而 xx_produced_rs_在初始化时的预先填充又保证它是 WriteReady 的,所以 source_tasks 可以直接运行。source_tasks 的输出消息发给下游,下游也会变为 ReadReady,而下游在初始化后也保证是 WriteReady 的。整个 Actor 系统就可以这样运转起来了。
7.5 Actor 上下游之间的通知机制
Act 执行完毕后,需要将结果数据发给下游 consumer。以 WaitAndSendIds 的 Naive Produced 为例,ActUntilFail 中的调用流程如下:
AsyncSendNaiveProducedRegstMsgToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L427)
VirtualAsyncSendNaiveProducedRegstMsgToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L441)
HandleProducedNaiveDataRegstToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L446)
HandleRegstToConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L577)
EnqueueAsyncMsg(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L523)
如果目标线程是当前线程,ActorMsgBus::SendMsg(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L662)
否则,将消息加入 async_msg_queue_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L664)
增加 total_reading_cnt_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L526)(这个变量表示已经发消息给下游、但未收到的 ack 数量)
naive_produced_rs_.PopFrontRegsts(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L581)
AsyncSendProducedCtrlRegstMsgToConsumer
注意 naive_produced_rs_.PopFrontRegsts(https://github.com/Oneflow-Inc/oneflow/blob/06a6af1c7f760ba4b12d2dfb8f73d7fda5c7dbab/oneflow/core/lazy/actor/register_slot.cpp#L53)会将 Regst 指针从队列中删掉,相应的可用(https://github.com/Oneflow-Inc/oneflow/blob/06a6af1c7f760ba4b12d2dfb8f73d7fda5c7dbab/oneflow/core/lazy/actor/register_slot.cpp#L49)register 计数减 1(https://github.com/Oneflow-Inc/oneflow/blob/06a6af1c7f760ba4b12d2dfb8f73d7fda5c7dbab/oneflow/core/lazy/actor/register_slot.cpp#L49)。
而在 Actor::HandlerNormal 中处理收到的 kRegstMsg 消息(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L340)时,如果是 consumer 发来的 ack 消息,会调用 TryUpdtStateAsProducedRegst(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L355),将 Regst 再添加到 naive_produced_rs_ 中(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L654),以保证当前 Actor 在收到所有 ack 后是 WriteReady 的;同时递减在读的 register 计数 total_reading_cnt_。
Actor 对依赖的上游消息的处理是类似的。通过以下函数调用给上游发送 ack 消息、通知 register 已经用完,可以继续更新了:
AsyncSendNaiveConsumedRegstMsgToProducer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L431)
AsyncRetInplaceConsumedRegstIfNoConsumer(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L432)在 Actor::HandlerNormal 中收到 kRegstMsg 消息后,将消息添加到 consumed_rs_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L344),以保证当前 Actor 在收到所有依赖数据后是 ReadReady 的。
LightActor 有自己的消息处理机制(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/light_actor.cpp#L299),大致原理应该是差不多的。
7.6 Act 执行的动作
根据上述讨论,Actor 收到 kRegstMsg 后也会进入 ActUntilFail 执行。如果读写都是 Ready,就执行 Act(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L424)。以 WaitAndSendIdsActor 为例,主要调用链路如下:
AsyncLaunchKernel(https://github.com/Oneflow-Inc/oneflow/blob/22f70a1719f371a54512633bb92086580d9c3c89/oneflow/core/lazy/actor/wait_and_send_ids_actor.cpp#L58)
ek.kernel->Launch(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L562),启动 Kernel 计算
Forward(https://github.com/Oneflow-Inc/oneflow/blob/eae9ff38f074479d79ce24b0f6e0594f82126171/oneflow/core/kernel/kernel.cpp#L52)
ForwardDataContent(https://github.com/Oneflow-Inc/oneflow/blob/eae9ff38f074479d79ce24b0f6e0594f82126171/oneflow/core/kernel/kernel.cpp#L65)
buffer->Pull(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L40)
给 regst 的存储地址 mut_dptr 赋值(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L47)
buffer->Pull 会等待条件变量的通知(https://github.com/Oneflow-Inc/oneflow/blob/49f60e682518436dfeb37344a15902a959e0e4f2/oneflow/core/common/buffer.h#L60)。现在,看上去所有 Actor 都已准备就绪,只等发令枪一响就开跑了。
8 启动静态图的计算
Graph.__run(https://github.com/Oneflow-Inc/oneflow/blob/81edd938826a7ea903174d682348847658b64653/python/oneflow/nn/graph/graph.py#L226)会扣动发令枪的板机,启动计算图的一轮计算。
主要调用流程如下:
RunLazyNNGraph(https://github.com/Oneflow-Inc/oneflow/blob/81edd938826a7ea903174d682348847658b64653/python/oneflow/nn/graph/graph.py#L1076)
builder->LaunchLazyJob(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L568)
LaunchLazyJobInstructionType(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/instructions_builder.cpp#L179)
Buffer::Push(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/instructions_builder.cpp#L179)
这里的 Buffer::Push 就是 WaitAndSendIdsKernel 在等待的起跑信号。
9 运行时的退出机制
整个运行时包含很多对象和资源,安全有序的退出是庞杂而又细致的工作。这里仅以 WaitAndSendIds 为例,从一个侧面观察一下运行时的退出机制。
运行时的退出始于 NNGraph 对象的析构(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L76)。
9.1 Actor 的退出
NNGraph 在析构时,会关闭所有的 Buffer 对象(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L82)。
Buffer 在关闭时,会设置 is_closed_ = true 并通知所有监听者(https://github.com/Oneflow-Inc/oneflow/blob/49f60e682518436dfeb37344a15902a959e0e4f2/oneflow/core/common/buffer.h#L81)。但是 Pull 会继续处理完已经提交的计算。
所以,Buffer 应该是主要用于进程内的通信和异步协调的一个类。
WaitAndSendIdsKernel 这时候正在等待新一轮计算开始(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/kernel/wait_and_send_ids_kernel.cpp#L40),结果收到 Pull 返回的 kBufferStatusErrorClosed(https://github.com/Oneflow-Inc/oneflow/blob/49f60e682518436dfeb37344a15902a959e0e4f2/oneflow/core/common/buffer.h#L61)。
WaitAndSendIdsActor::IsCustomizedReadReady 以后就一直返回 false(https://github.com/Oneflow-Inc/oneflow/blob/22f70a1719f371a54512633bb92086580d9c3c89/oneflow/core/lazy/actor/wait_and_send_ids_actor.cpp#L68),IsReadReady 也返回 false(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L533)。
这之后,ActUntilFail 只会执行异步消息发送(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L437)(不再进入 while 循环)
WaitAndSendIdsActor::HandlerNormal 仍然会处理其它 Actor 发来的消息(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L340)。但因为 IsCustomizedReadReady 返回 false,会进入 AsyncSendEORDMsgForAllProducedRegstDesc(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L394)执行。它会给每个下游发送 kEordMsg 消息(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L614)。
Actor 在收到上游发来的 kEordMsg 消息后,递减 remaining_eord_cnt_(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L331)。
remaining_eord_cnt_被初始化为 Actor 的输入 regst 的数量(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L171)。
total_reading_cnt_是当前 Actor 生产的、已经发给 consumer、但尚未收到 ack 的消息数量。
Actor 目前仍可以正常接收 consumer 发来的 ack 消息。
当上述 2 个变量都为 0 时(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L395),意味着所有上游都发出了 kEordMsg 消息,也收到了所有下游的 ack 消息。Actor 就给 Thread 返回 1(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L397)。
如果上述两个变量有不为 0 的,就修改 handler,由 HandlerZombie(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/lazy/actor/actor.cpp#L399)处理后续收到的消息。
Thread 收到 Actor 返回的 1 后(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L84),将它从自己的存储中删除(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L89),并递减运行 Actor 的数量。
9.2 Thread 的退出
NNGraph 重置 runtime_导致运行时对象被析构(https://github.com/Oneflow-Inc/oneflow/blob/8f672eea116cae4a73bb7309e7496b08d7ec9a32/oneflow/core/framework/nn_graph.cpp#L83)。
Runtime 删除所有 Thread(https://github.com/Oneflow-Inc/oneflow/blob/b17a9cd6b930b5817c63623fb682bd708377a93b/oneflow/core/job/runtime.cpp#L117)。
ThreadMgr 给所有 Thread 发送 kStopThread 消息(https://github.com/Oneflow-Inc/oneflow/blob/c8c6d351fa28c5ebce948d69c06670a783f83f74/oneflow/core/thread/thread_manager.cpp#L64)。同时,重置指针导致 Thread 析构(https://github.com/Oneflow-Inc/oneflow/blob/c8c6d351fa28c5ebce948d69c06670a783f83f74/oneflow/core/thread/thread_manager.cpp#L66)。
Thread 的物理线程退出 PollMsgChannel 循环(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L68)。
Thread 等待物理线程结束,关闭 channel(https://github.com/Oneflow-Inc/oneflow/blob/55b822e4d3c88757d11077d7546981309125c73f/oneflow/core/thread/thread.cpp#L52)。
10 分布式场景的静态图
分布式的 compile_job、物理图 Plan 和单机场景有明显变化。
比如,每个进程都有一套 WaitAndSendIds 等控制节点。这也容易理解,因为每个节点都要执行__run 和 Buffer::Push/Pull,都要启动本进程的 Actors 执行计算。
matmul 和 broadcast_add 等 user op 也会在两个节点进行计算。
10.1 示例代码
启动方式参考 Global Tensor 的官方文档。
11 附录
11.1 断点
11.1.1 Python 断点示例
11.1.2 C++断点示例
启动命令
断点示例
11.2 静态图的 json 表示
forward(https://quip.com/OMc4A0HOOr0C)
full(https://quip.com/JLaMAHGBLXmK)
compiled(https://quip.com/tXjuAiS3J0Ab)
plan(https://quip.com/a0DMAAIte6PQ)
11.3 actor type
naive_actor
light_actor
wait_and_send_ids_actor
call_back_notify_actor
12 参考资料
oneflow v0.8.0(https://github.com/Oneflow-Inc/oneflow/tree/release/v0.8.0)
OneFlow 框架的系统设计(上篇)(https://zhuanlan.zhihu.com/p/337851255)
OneFlow 框架的系统设计(中篇)(https://zhuanlan.zhihu.com/p/338699487)
OneFlow 框架的系统设计(下篇)(https://zhuanlan.zhihu.com/p/339208452)
一个 Job 在 OneFlow 中的执行过程—上篇(https://zhuanlan.zhihu.com/p/344531540)
一个 Job 在 OneFlow 中的执行过程—中篇(https://zhuanlan.zhihu.com/p/355654002)
一个 Job 在 OneFlow 中的执行过程—下篇(https://zhuanlan.zhihu.com/p/363689736)
静态图模块 nn.Graph(https://docs.oneflow.org/master/basics/08_nn_graph.html)
OneFlow 系统设计(https://docs.oneflow.org/v0.4.0/basics_topics/essentials_of_oneflow.html)
torch.nn.Module(https://pytorch.org/docs/1.10/generated/torch.nn.Module.html)
其他人都在看
欢迎 Star、试用 OneFlow 最新版本:https://github.com/Oneflow-Inc/oneflow/
版权声明: 本文为 InfoQ 作者【OneFlow】的原创文章。
原文链接:【http://xie.infoq.cn/article/f518cc05dc923bfece86511b3】。文章转载请联系作者。
评论