写点什么

OneFlow 源码解析:Global Tensor

作者:OneFlow
  • 2022-10-31
    重庆
  • 本文字数:6732 字

    阅读完需:约 22 分钟

OneFlow源码解析:Global Tensor

撰文 | 郑建华

更新|赵露阳


上文中讲到的类似于PyTorch中的普通Tensor,在 OneFlow 中称为 Local Tensor。Local Tensor 是单卡视角下的普通 Tensor。与之相对,OneFlow 中还有一个独有的概念——Global Tensor。


Global Tensor 是指被 placement 和 SBP 属性所指定的,一个全局视角下的逻辑 Tensor。Global Tensor 的 shape 是逻辑形状,其真实数据根据 placement 和 SBP 的规则分布在多个 rank 上。


Global Tensor 既可以通过普通的 Local Tensor 通过 tensor.to_global()转换得到,也可以直接用数据或 Numpy 来构造。


下面的小节将通过一个示例(https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html),

展示从普通数据构造 Global Tensor 的过程,以及分别描述 SBP、Placement 和 Global Tensor 构造的细节。


1

Global Tensor 示例


开启 2 个终端,终端一、二分别设置环境变量:

# 终端一export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=0 LOCAL_RANK=0
# 终端二export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=1 LOCAL_RANK=1
复制代码


终端一、二分别执行相同代码:


import oneflow as flowp = flow.placement("cpu", ranks=[0, 1])sbp = flow.sbp.split(0)x = flow.tensor([[1,2,3],[4,5,6]], placement=p, sbp=sbp)print(x.shape)print(x.to_local())
复制代码


终端一、二的输出如下:


# 终端一oneflow.Size([2, 3])tensor([[1, 2, 3]], dtype=oneflow.int64)
# 终端二oneflow.Size([2, 3])tensor([[4, 5, 6]], dtype=oneflow.int64)
复制代码


这个例子中:

  • export xxx 环境变量告诉 oneflow 环境用于通信的 IP 和 Port,以及全局共有 2 个 rank(WORLD_SIZE=2),终端一所在的是 rank0,终端二所在的是 rank1。


  • p = flow.placement("cpu", ranks=[0, 1])设置了 global tensor 将会被放置于 rank0 和 rank1。


  • sbp = flow.sbp.split(0)设置了 global tensor 的 sbp 属性为 split,即按第 0 维度进行切分。


  • x = flow.tensor([[1,2,3],[4,5,6]], placement=p, sbp=sbp)从 python list 数据配合 sbp 和 placement 构造了一个 global tensor x。


这里,x是由[[1,2,3],[4,5,6]]构造而来,其 shape 为(2,3),所以我们print(x.shape)得到的是:oneflow.Size([2, 3]),x 是一个 global tensor,其 shape 表示全局范围内的逻辑形状。


然后,在特定 rank 上执行x.to_local()表示将 global tensor 转为当前 rank 上的 local tensor,由于 x 的 sbp 是 split(0),表示 tensor 按第 0 维切分,即[1,2,3]存放于 rank0;[4,5,6]存放于 rank1。


所以,print(x.to_local())得到终端一的输出为:

tensor([[1, 2, 3]], dtype=oneflow.int64)


终端二的输出为:

tensor([[4, 5, 6]], dtype=oneflow.int64)


当然,上述只是一个小例子,用于理解 global tensor 以及 sbp 和 placement 属性的概念,真实应用场景下,通常都会直接用 local tensor 通过 tensor.to_global(https://oneflow.readthedocs.io/en/master/generated/oneflow.Tensor.to_global.html?highlight=to_global)的方式,来创建 global tensor 并使用。


2

SBP


SBP 由 split, broadcast, partial 的首字母组合而成,SBP 是一种规则,其描述了逻辑 tensor(global tensor)在物理设备上的分布策略。


  • split 表示 global tensor 在各个 rank(物理设备)都存在分片,每个分片可以看作是将 global tensor 沿着某一维度切分得到的本 rank 分量(rank 由 placement 指定)。

  • broadcast 表示 global tensor 在每个 rank 上完全一样,等价于从某个 rank 复制并广播至所有 rank。

  • partial 表示 global tensor 与物理设备上的 tensor 的形状相同,但是物理设备上的值,只是 global tensor 的一部分,global tensor 的值需要这些 rank 上的 local tensor 进行 sum、max、mean 等类似操作。


Python 端 flow.sbp

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/python/oneflow/sbp.py

包定义了 split 等 3 种类型。其 C++ binding 代码在 sbp_symbol.cpp

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/sbp_symbol.cpp#L106-L108)中。这些类型都是 SbpParallel

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/sbp_parallel.proto#L57)类型,是 protobuf message 对象。三种类型通过 oneof parallel_type(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/sbp_parallel.proto#L58)共享存储。


其中 broadcast partial_sum 都是空消息,赋值时需要调用 mutable 方法

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/sbp_parallel.cpp#L83)显式表明 oneof 字段具体是哪种类型。split 的值表示在 tensor 的哪个轴上切分数据。轴的 index 值是一个[[0, 5]之间的整数]。所有的 split SbpParallel 对象被保存到一个静态 vector

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/sbp_symbol.cpp#L47)中。


3

Placement 的构造


placement 属性指定逻辑 tensor 实际存放在哪些物理设备上,更具体的,是存放于哪些 rank 上。


在上述例子中:


flow.placement("cpu", ranks=[0, 1])创建了一个 placement 对象。第一个参数是设备类型,目前支持 cpu 或 cuda。ranks[0, 1]表示 tensor 分布在 rank 0 和 rank1 上。


sbp = flow.sbp.split(0)表明 tensor 的数据分布是按 split 切分,且是沿着第 0 维进行切分。


ranks 只列出了 rank id(全局唯一),没有指定节点 host。是因为 rank 与 host 关系已经根据环境变量所确定。环境变量 RANK 表示全局唯一的 rank id,LOCAL_RANK 表示节点内的本地 rank id。在 GPU 环境下,一般一个进程对应一块设备(https://docs.oneflow.org/master/parallelism/04_launch.html#_1)。WORLD_SIZE 表示所有节点的设备(进程)总数。


在通过import oneflow初始化 oneflow 时,会根据环境变量在各个节点间建立控制面通信连接(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/env_global_objects_scope.cpp#L173-L175),以及数据面通信连接。这样每个进程就知道有多少个节点、有多少个设备/进程、当前进程在整个集群的位置。


通过 placement 的构造函数绑定(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L202)可以知道,其对应的 C++类型是 ParallelDesc

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/parallel_desc.h#L42)。对象构造由函数 CreateParallelDescSymbol

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L154)完成。主要调用流程如下:



3.1 确定 machine 和 device


ParseAndFormatRanks

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L113)会将 ranks 数组[0, 1]转为形如"machine_id:device_id"的字符串数组,供后续处理使用。这里的逻辑决定了如何根据 ranks 中的 id,确定 tensor 数据在节点和设备上的分布:


  • machine_id=rank / NumOfProcessPerNode

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/rpc/lib/global_process_ctx.cpp#L56

  • device_id=rank % NumOfProcessPerNode

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/rpc/lib/global_process_ctx.cpp#L84


从上述公式可以看出,各个节点的设备/进程数量需要是一致的。


3.2 构造并缓存 ParallelDesc 对象


CreateParallelDesc

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L67)函数完成 ParallelDesc 的构造。其中

MakeParallelConf

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/parallel_conf_util.cpp#L34)会先根据"machine_id:device_id"等数据构造一个 cfg::ParallelConf 对象,这是一个类似 oneflow::ParallelConf

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/job/placement.proto#L12)的类型,文件位于 build/oneflow/core/job/placement.cfg.h,是 cmake 构建过程中自动生成的文件。


cfg::ParallelConf 等对象的接口类似 protobuf message,但实现了 hash 方法,可以作为 hash map 的 key。


之后的 PhysicalRun

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/symbol/placement_symbol.cpp#L72)虽然涉及虚拟机,但实际执行的 op 指令应该是空的,实质性的逻辑只是调用 builder 的 GetParallelDescSymbol

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/instructions_builder.cpp#L216),其中的核心逻辑是 FindOrCreate

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/instructions_builder.cpp#L217),从缓存中查找 ParallelDesc 或创建新的缓存。


4

 Global Tensor 构造调用流程


下面以本文开始的例子分析一下构造 global tensor 的调用流程。这可能不是一个典型的场景,只是人为指定一个简单的数据便于展示和 debug。


通过之前讨论 local tensor 时的类关系图可以知道,EagerGlobalTensorImpl 内含一个 local tensor 的变量(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/tensor_impl.h#L339)。可以想象,构造 global tensor 时,会先构造一个 local tensor、再做一些后续处理。


Python 端创建 tensor 对象时,如果像本文开始的例子那样指定 placement、sbp 和数据,对应的 Functor 是 GlobalTensorWithDataCtorFunctor

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/functional/tensor_api.cpp#L158)。核心逻辑在 MakeGlobalTensorFromData

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L227)中,其主要调用流程如下:


上述各个部分的主要职能如下:


  • DataConsistencyCheck(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L251)会在 tensor 的 placement 涉及的各个节点间拷贝数据、校验数据是否一致。


  • functional::Empty

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L256)会根据 shape 和 dtype 构造一个 local tensor,并等待随后填充数据(这里和之前讨论 local tensor 的过程一致)。


  • SwitchCopyLocalTensorFromUntypedArray(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L257)为 empty 的 local tensor 填充数据,数据既可以是本例中的 python list,也可以是 numpy 的 ndarray。


  • functional::Cast

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L267)进行数据类型 dtype 的转换。


  • functional::LocalToGlobal

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L272-L274)把 local tensor 转为 global tensor,但这个只是用于 broadcast 至指定 placement 的临时的 global tensor(sbp list 全部为 broadcast,用于广播)。


  • functional::ToGlobal

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L277-L279)将临时的 global tensor 根据 placement 和 sbp,ToGlobal 转换为最终的 global tensor。


5

用 flow.randn 构造 Global Tensor


下面看一个通过 op 构造 global tensor 的例子


# 终端一# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=0 LOCAL_RANK=0# 终端二# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=1 LOCAL_RANK=1
import oneflow as flowp = flow.placement("cpu", ranks=[0, 1])sbp = flow.sbp.split(0)x = flow.randn(4, 5, placement=p, sbp=sbp)print(x.shape) # (4,5)print(x.to_local().shape) # (2,5)
复制代码


randn op 在 local 和 global 下分别对应着不同的 functor 实现:


# oneflow/core/functional/functional_api.yaml- name: "randn"  signature: [      "Tensor (Shape size, *, DataType dtype=None, Device device=None,      Generator generator=None, Bool requires_grad=False) => RandN",      "Tensor (Shape size, *, Placement placement, SbpList sbp, DataType dtype=None,      Generator generator=None, Bool requires_grad=False) => GlobalRandN",    ]  bind_python: True
复制代码


普通的 flow.randn 对应RandNFunctor,而 global 版本(带 placement 和 sbp 参数)的 randn 则对应的是GlobalRandNFunctor


可以看到:


  • GlobalRandNFunctor

https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/functional/impl/random_functor.cpp#L194)中主要 dispatch 了"normal" op,在 Eager Global 的 mode 下, 会交给EagerGlobalInterpreter进行各种推导和准备工作(Interpret[https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/op_interpreter/eager_global_op_interpreter.cpp#L110]),并在Interpret方法里通过PhysicalRun,将 normal op 执行的指令交给虚拟机调度并执行。


  • EagerGlobalTensorImpl::New(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/op_interpreter/eager_global_op_interpreter.cpp#L138)时会调用 GetPhysicalShape(https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/tensor_impl.cpp#L207)获取 local tensor 的 shape。


这里,我们可以合理猜测,在每个 rank 上都会经过同样的 Interpret、调用同样的 normal op,生成本 rank 下部分的 randn 结果——local tensor,其 shape 都为(2, 5),经过组装得到 global tensor x,其 shape 为(4, 5)。经过 debug 验证了上述猜测是正确的。从这个例子中,大致可以得到结论:


1.Global Tensor 其实是基于 Local Tensor 以及 SBP 和 placement 的一层封装,其 shape 为全局逻辑形状;其数据由各个 ranks 所持有(ranks 由 placement 指定)。


2.每个 rank 上的数据分片都是独立的 Local Tensor,经过 SBP 规则的组装,得到上层的 Global Tensor。


3.Global Tensor 的计算实际上就是通过不同 rank 上数据分片(Local Tensor)独立经过 kernel 计算、boxing 机制等组合完成的。


参考资料:


其他人都在看


欢迎下载体验 OneFlow v0.8.0 最新版本:https://github.com/Oneflow-Inc/oneflow/


发布于: 刚刚阅读数: 4
用户头像

OneFlow

关注

不至于成为世界上最快的深度学习框架。 2022-03-23 加入

★ OneFlow深度学习框架:github.com/Oneflow-Inc/oneflow ★ OF云平台:oneflow.cloud

评论

发布
暂无评论
OneFlow源码解析:Global Tensor_人工智能_OneFlow_InfoQ写作社区