OneFlow 源码解析:Global Tensor
撰文 | 郑建华
更新|赵露阳
上文中讲到的类似于PyTorch中的普通Tensor,在 OneFlow 中称为 Local Tensor。Local Tensor 是单卡视角下的普通 Tensor。与之相对,OneFlow 中还有一个独有的概念——Global Tensor。
Global Tensor 是指被 placement 和 SBP 属性所指定的,一个全局视角下的逻辑 Tensor。Global Tensor 的 shape 是逻辑形状,其真实数据根据 placement 和 SBP 的规则分布在多个 rank 上。
Global Tensor 既可以通过普通的 Local Tensor 通过 tensor.to_global()转换得到,也可以直接用数据或 Numpy 来构造。
下面的小节将通过一个示例(https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html),
展示从普通数据构造 Global Tensor 的过程,以及分别描述 SBP、Placement 和 Global Tensor 构造的细节。
1
Global Tensor 示例
开启 2 个终端,终端一、二分别设置环境变量:
终端一、二分别执行相同代码:
终端一、二的输出如下:
这个例子中:
export xxx 环境变量告诉 oneflow 环境用于通信的 IP 和 Port,以及全局共有 2 个 rank(WORLD_SIZE=2),终端一所在的是 rank0,终端二所在的是 rank1。
p = flow.placement("cpu", ranks=[0, 1])
设置了 global tensor 将会被放置于 rank0 和 rank1。
sbp = flow.sbp.split(0)
设置了 global tensor 的 sbp 属性为 split,即按第 0 维度进行切分。
x = flow.tensor([[1,2,3],[4,5,6]], placement=p, sbp=sbp)
从 python list 数据配合 sbp 和 placement 构造了一个 global tensor x。
这里,x
是由[[1,2,3],[4,5,6]]构造而来,其 shape 为(2,3),所以我们print(x.shape)
得到的是:oneflow.Size([2, 3])
,x 是一个 global tensor,其 shape 表示全局范围内的逻辑形状。
然后,在特定 rank 上执行x.to_local()
表示将 global tensor 转为当前 rank 上的 local tensor,由于 x 的 sbp 是 split(0),表示 tensor 按第 0 维切分,即[1,2,3]存放于 rank0;[4,5,6]存放于 rank1。
所以,print(x.to_local())
得到终端一的输出为:
tensor([[1, 2, 3]], dtype=oneflow.int64)
终端二的输出为:
tensor([[4, 5, 6]], dtype=oneflow.int64)
当然,上述只是一个小例子,用于理解 global tensor 以及 sbp 和 placement 属性的概念,真实应用场景下,通常都会直接用 local tensor 通过 tensor.to_global(https://oneflow.readthedocs.io/en/master/generated/oneflow.Tensor.to_global.html?highlight=to_global)的方式,来创建 global tensor 并使用。
2
SBP
SBP 由 split, broadcast, partial 的首字母组合而成,SBP 是一种规则,其描述了逻辑 tensor(global tensor)在物理设备上的分布策略。
split 表示 global tensor 在各个 rank(物理设备)都存在分片,每个分片可以看作是将 global tensor 沿着某一维度切分得到的本 rank 分量(rank 由 placement 指定)。
broadcast 表示 global tensor 在每个 rank 上完全一样,等价于从某个 rank 复制并广播至所有 rank。
partial 表示 global tensor 与物理设备上的 tensor 的形状相同,但是物理设备上的值,只是 global tensor 的一部分,global tensor 的值需要这些 rank 上的 local tensor 进行 sum、max、mean 等类似操作。
Python 端 flow.sbp
包定义了 split 等 3 种类型。其 C++ binding 代码在 sbp_symbol.cpp
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/sbp_symbol.cpp#L106-L108)中。这些类型都是 SbpParallel
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/sbp_parallel.proto#L57)类型,是 protobuf message 对象。三种类型通过 oneof parallel_type(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/sbp_parallel.proto#L58)共享存储。
其中 broadcast 和 partial_sum 都是空消息,赋值时需要调用 mutable 方法
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/sbp_parallel.cpp#L83)显式表明 oneof 字段具体是哪种类型。split 的值表示在 tensor 的哪个轴上切分数据。轴的 index 值是一个[[0, 5]之间的整数]。所有的 split SbpParallel 对象被保存到一个静态 vector
3
Placement 的构造
placement 属性指定逻辑 tensor 实际存放在哪些物理设备上,更具体的,是存放于哪些 rank 上。
在上述例子中:
flow.placement("cpu", ranks=[0, 1])
创建了一个 placement 对象。第一个参数是设备类型,目前支持 cpu 或 cuda。ranks[0, 1]表示 tensor 分布在 rank 0 和 rank1 上。
sbp = flow.sbp.split(0)
表明 tensor 的数据分布是按 split 切分,且是沿着第 0 维进行切分。
ranks 只列出了 rank id(全局唯一),没有指定节点 host。是因为 rank 与 host 关系已经根据环境变量所确定。环境变量 RANK 表示全局唯一的 rank id,LOCAL_RANK 表示节点内的本地 rank id。在 GPU 环境下,一般一个进程对应一块设备(https://docs.oneflow.org/master/parallelism/04_launch.html#_1)。WORLD_SIZE 表示所有节点的设备(进程)总数。
在通过import oneflow
初始化 oneflow 时,会根据环境变量在各个节点间建立控制面通信连接(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/env_global_objects_scope.cpp#L173-L175),以及数据面通信连接。这样每个进程就知道有多少个节点、有多少个设备/进程、当前进程在整个集群的位置。
通过 placement 的构造函数绑定(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L202)可以知道,其对应的 C++类型是 ParallelDesc
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/parallel_desc.h#L42)。对象构造由函数 CreateParallelDescSymbol
3.1 确定 machine 和 device
ParseAndFormatRanks
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L113)会将 ranks 数组[0, 1]转为形如"machine_id:device_id"的字符串数组,供后续处理使用。这里的逻辑决定了如何根据 ranks 中的 id,确定 tensor 数据在节点和设备上的分布:
machine_id=rank / NumOfProcessPerNode
device_id=rank % NumOfProcessPerNode
从上述公式可以看出,各个节点的设备/进程数量需要是一致的。
3.2 构造并缓存 ParallelDesc 对象
CreateParallelDesc
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L67)函数完成 ParallelDesc 的构造。其中
MakeParallelConf
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/parallel_conf_util.cpp#L34)会先根据"machine_id:device_id"等数据构造一个 cfg::ParallelConf 对象,这是一个类似 oneflow::ParallelConf
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/placement.proto#L12)的类型,文件位于 build/oneflow/core/job/placement.cfg.h,是 cmake 构建过程中自动生成的文件。
cfg::ParallelConf 等对象的接口类似 protobuf message,但实现了 hash 方法,可以作为 hash map 的 key。
之后的 PhysicalRun
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L72)虽然涉及虚拟机,但实际执行的 op 指令应该是空的,实质性的逻辑只是调用 builder 的 GetParallelDescSymbol
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/instructions_builder.cpp#L216),其中的核心逻辑是 FindOrCreate
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/instructions_builder.cpp#L217),从缓存中查找 ParallelDesc 或创建新的缓存。
4
Global Tensor 构造调用流程
下面以本文开始的例子分析一下构造 global tensor 的调用流程。这可能不是一个典型的场景,只是人为指定一个简单的数据便于展示和 debug。
通过之前讨论 local tensor 时的类关系图可以知道,EagerGlobalTensorImpl 内含一个 local tensor 的变量(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/tensor_impl.h#L339)。可以想象,构造 global tensor 时,会先构造一个 local tensor、再做一些后续处理。
Python 端创建 tensor 对象时,如果像本文开始的例子那样指定 placement、sbp 和数据,对应的 Functor 是 GlobalTensorWithDataCtorFunctor
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/functional/tensor_api.cpp#L158)。核心逻辑在 MakeGlobalTensorFromData
上述各个部分的主要职能如下:
DataConsistencyCheck(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L251)会在 tensor 的 placement 涉及的各个节点间拷贝数据、校验数据是否一致。
functional::Empty
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L256)会根据 shape 和 dtype 构造一个 local tensor,并等待随后填充数据(这里和之前讨论 local tensor 的过程一致)。
SwitchCopyLocalTensorFromUntypedArray(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L257)为 empty 的 local tensor 填充数据,数据既可以是本例中的 python list,也可以是 numpy 的 ndarray。
functional::Cast
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L267)进行数据类型 dtype 的转换。
functional::LocalToGlobal
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L272-L274)把 local tensor 转为 global tensor,但这个只是用于 broadcast 至指定 placement 的临时的 global tensor(sbp list 全部为 broadcast,用于广播)。
functional::ToGlobal
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L277-L279)将临时的 global tensor 根据 placement 和 sbp,ToGlobal 转换为最终的 global tensor。
5
用 flow.randn 构造 Global Tensor
下面看一个通过 op 构造 global tensor 的例子
randn op 在 local 和 global 下分别对应着不同的 functor 实现:
普通的 flow.randn 对应RandNFunctor
,而 global 版本(带 placement 和 sbp 参数)的 randn 则对应的是GlobalRandNFunctor
。
可以看到:
GlobalRandNFunctor
(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/functional/impl/random_functor.cpp#L194)中主要 dispatch 了"normal" op,在 Eager Global 的 mode 下, 会交给EagerGlobalInterpreter
进行各种推导和准备工作(Interpret[https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/op_interpreter/eager_global_op_interpreter.cpp#L110]),并在Interpret
方法里通过PhysicalRun
,将 normal op 执行的指令交给虚拟机调度并执行。
EagerGlobalTensorImpl::New(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/op_interpreter/eager_global_op_interpreter.cpp#L138)时会调用 GetPhysicalShape(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/tensor_impl.cpp#L207)获取 local tensor 的 shape。
这里,我们可以合理猜测,在每个 rank 上都会经过同样的 Interpret、调用同样的 normal op,生成本 rank 下部分的 randn 结果——local tensor,其 shape 都为(2, 5),经过组装得到 global tensor x,其 shape 为(4, 5)。经过 debug 验证了上述猜测是正确的。从这个例子中,大致可以得到结论:
1.Global Tensor 其实是基于 Local Tensor 以及 SBP 和 placement 的一层封装,其 shape 为全局逻辑形状;其数据由各个 ranks 所持有(ranks 由 placement 指定)。
2.每个 rank 上的数据分片都是独立的 Local Tensor,经过 SBP 规则的组装,得到上层的 Global Tensor。
3.Global Tensor 的计算实际上就是通过不同 rank 上数据分片(Local Tensor)独立经过 kernel 计算、boxing 机制等组合完成的。
参考资料:
OneFlow 源码
(https://github.com/Oneflow-Inc/oneflow/commit/fca713f45a2f55379eb4284848a8f62d0f266283)
Global Tensor:https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html
集群的全局视角:https://docs.oneflow.org/master/parallelism/02_sbp.html
其他人都在看
九大深度学习库;谷歌文字生成视频的两大利器
欢迎下载体验 OneFlow v0.8.0 最新版本:https://github.com/Oneflow-Inc/oneflow/
版权声明: 本文为 InfoQ 作者【OneFlow】的原创文章。
原文链接:【http://xie.infoq.cn/article/9230c70f75d108759b4fef351】。文章转载请联系作者。
评论