OneFlow 源码解析:Eager 模式下的设备管理与并发执行
作者|郑建华
更新|赵露阳
通过这篇笔记,希望能初步了解 OneFlow 在 Eager 模式下对设备的管理方式、设备执行计算的过程以及如何充分利用设备计算能力。这里的设备主要指类似 CUDA 这样的并行计算加速设备。
1、设备、流相关类型及关系
框架通过流(Stream)向设备(Device)提交计算任务。一个 Stream 是一个命令序列,可以类比 CUDA Stream(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#streams),或者 CPU Thread 的指令序列。同一个 Stream 中的命令按顺序执行;不同 Stream 之间的命令有依赖关系时,需要同步。不同的任务,比如 kernel 计算、host2device、device2host 等都有自己独立的 Stream,可以并发执行,从而在 Eager 模式下尽可能充分利用设备的异步并发执行能力。
OneFlow 中 Device 和 Stream 相关的部分类结构如下所示:
Device 相关类型
oneflow::Deviceoneflow::Device 是用于表示设备的基础类型,例如:构建 tensor 时 flow.tensor(shape, device="cuda:1")
就会在内部构造出这个基础的 Device 类型,其中设备编号为 1、设备类型为 CUDA。
oneflow/core/framework/device.h:
oneflow::Device
中最重要的两个成员变量分别是用于表示设备类型的DeviceType
;用于表示设备编号的 device_id_。
DeviceType
DeviceType
是一个枚举类,不同的值代表不同的计算设备类型,其定义位于 oneflow/core/common/device_type.proto
:
目前在 oneflow master 分支中,主要有kCPU
表示 cpu 设备;kCUDA
表示 nvidia cuda 设备;在其他多设备支持的分支中,这里还增加了更多的设备类型。
oneflow::ep::Device
oneflow::Device
是 oneflow 中对设备的基础封装类型,而oneflow::ep::Device
则是一个抽象类,属于 oneflow 的 ep 模块(execution provider),是对设备行为的封装,ep 模块为多硬件设备提供了更高层次的抽象,方便 oneflow 支持和兼容多硬件设备提供了更高的灵活性和可拓展性。
oneflow::ep::Device
不仅提供了表示设备类型的device_type()
方法、表示设备编号的device_index()
方法,还提供了创建/销毁ep::Stream
、创建/销毁Event
、在设备上申请/释放内存的各种方法。
oneflow/core/ep/include/device.h
:
oneflow::ep::Device
有如下子类实现:
Stream 相关类型
oneflow::Stream 和 cuda device 以及 stream 的关系类似,oneflow 中也存在类似的基础 Stream 类型。
oneflow/core/framework/stream.h
:
可以看见 Stream 类中的成员变量:
device_ 表示该 Stream 对象将在何种设备上执行
streamtype_ 表示该 Stream 的类型,是用于计算的 compute stream 还是用于数据搬运的 host2device、device2host stream 等
threaduid_ 表示负责启动该 Stream 的线程 id
unique_streamid_ 表示这个 stream 自身的 unique id
StreamType
和DeviceType
分为 kCpu 和 kCuda 类似,Stream
也有各种类型之分,具体如下:
oneflow/core/common/stream_type.h
oneflow::ep::Stream
oneflow 中的 ep 模块提供了一个更高层次的对 Stream 的抽象类,除了可以获取设备的device()
、获取设备类型的device_type()
方法外,还提供了一系列虚方法如:
同步 Sync()
执行 Event 事件 RecordEvent()
oneflow/core/ep/include/stream.h
:
oneflow::ep::Stream
有如下子类实现:
oneflow::vm::Stream
oneflow vm(virtual machine)中的oneflow::vm::Stream
类型,用于 vm 内部维护 stream 极其依赖关系、StreamPolicy、调度线程等。
oneflow/core/vm/stream.h
:
StreamPolicy
StreamPolicy
是 oneflow vm 中独有的概念,提供了一系列虚方法如:
stream() 获取
oneflow::ep::Stream
指针mut_allocator() 获取
vm::Allocator
指针(用于 tensor 内存管理)device_type() 获取 device 设备类型
除此之外,提供了一系列 vm 相关的指令状态初始化、查询、删除等方法。
oneflow/core/vm/stream_policy.h
:
StreamPolicy 有如下子类实现:
2、Eager Local 模式下的 Device 和 Stream 推导
下面,梳理一下普通的 eager 模式(eager local mode)下,算子执行全过程中 device 和 stream 相关的推导流程。
2.1 推导 Device
首先,对于一个算子(op)来说,要为其设置一个默认的 device 用于实际计算,这一步在:
这里GetDefaultDevice
的逻辑是:
1.如果 inputs tensor 非空,则根据第一个 input tensor 的 device 来设置 default 的 device
2.如果 inputs tensor 为空,则优先从 OpExprInterpContext 中获取 device,若 OpExprInterpContext 中未设置,则会通过
Device::New("cpu")
;默认给一个 cpu device
值得说明的是,在 1.种情况时,如果 input tensor 创建时指定了 device 为 cuda 设备,则这里推导出的 default device 同样为相同的 cuda device;如果未显示指定,则默认还是 cpu device。
2.2 推导 Stream
oneflow::Stream 的推导主要在:
JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args))
);
Symbol<Stream> stream = JUST(InferDeviceAndStream(...));
InferDeviceAndStream
中,Stream
推导的逻辑是会根据 user_op_expr 是否定义了 device_and_stream_infer_fn 而有所区别
(少数情况)如果该 op 定义了推导函数,则调用此推导函数来推导 Stream,例如 tensor.cuda()方法,inputs 在 CPU 上, outputs 在 CUDA,二者的设备类型不同。这时就不会默认推导而是利用 op 注册的推导函数获取 oneflow::Stream(
。例如 CopyOp::InferDeviceAndStream。
(多数情况)否则会通过
stream = JUST(GetDefaultStreamByDevice(default_device))
;
来推导。
GetDefaultStreamByDevice
的具体实现:
可以看见,根据传入的device
、StreamType::kCompute
,new 了一个oneflow::Stream
。
2.3 InstructionsBuilder::Call 和 vm::Stream 推导
在上述 device 和 stream 推导完成后,会通过 InstructionsBuilder 调用 Call 方法:
Call 方法中会通过
JUST(SoftSyncStream(output_eager_blob_objects, stream));
JUST(SoftSyncStream(input_eager_blob_objects, stream));
auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
完成 outputs inputs tensor 的流同步(SoftSyncStream)过程以及vm::Stream
的推导,然后通过构造OpCallInstructionPolicy
指令派发至 vm 执行。
SoftSyncStream 的同步这里省略,具体过程见第 4 节。
2.3.1 构造 ThreadCtx 对象,启动执行指令的线程
ThreadCtx 对象指针保存在 VirtualMachine 的 HashMap 中。每个 DeviceType(CPU 或 CUDA)对应一个 ThreadCtx 对象;临界区和 LazyJob 有自己的 ThreadCtx 对象。
首次访问 HashMap 时得到的是零值(空指针),需要调用 CreateThreadCtx 创建对象。实际通过虚拟机指令创建对象,ThreadCtx 对象保存在 VirtualMachineEngine::thread_ctx_list_ 中。
ThreadCtx 对象构造后,会创建一个 worker 线程、执行 WorkerLoop 方法,并添加到 worker_threads_。所以 worker_threads_ 是与 ThreadCtx 对象一一对应的。
这个线程负责其所归属的指令的执行:
WorkerLoop 在收到通知后,会调用 ThreadCtx::TryReceiveAndRun 处理指令。
在这个函数中,将 ThreadCtx 的指令挪到临时列表、通过 StreamPolicy 执行每个指令。
ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 时添加进去的。
ThreadCtx 创建完成后,将持有 vm::Stream 对象。oneflow::vm::Stream
和oneflow::Stream
的数量是一一对应的,vm::Stream 按照<DeviceType, StreamRole>分组存储在对应的 ThreadCtx 中。
vm::Stream
的推导流程细节如下:
auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
VirtualMachine::GetVmStream()
Maybevm::Stream* VirtualMachine::CreateStream(Symbol<Stream> stream)
Stream::Init(ThreadCtx* thread_ctx, Symbol<Device> device, StreamType stream_type...)
stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
2.4 执行 OpCall 指令和 ep::Stream 推导
有几个场景会创建(获取) ep::Stream 对象。比如 kernel 执行时。
OpCall 指令在构造时,指令策略类型是 OpCallInstructionPolicy。虚拟机在 DispatchInstruction 时,无论哪个分支,后续都会调用 EpStreamType::Run,最终通过
EpStreamPolicyBase::Run()
instruction->Compute()
OpCallInstructionPolicy::Compute()
OpCallInstructionUtil::Compute()
OpCallInstructionUtil::OpKernelCompute()
op_call_instruction_policy->mut_opkernel()->Compute()执行 kernel 的 Compute 方法
例如 GpuL2NormalizeKernel::Compute,最终在其 kernel 的 Compute 方法中,会通过 ctx->stream()创建(获取)ep::Stream 对象,launch kernel 执行计算。
2.4.1 获取/创建 ep::Stream
下面,我们重点看一下 OpCall 指令实际执行时,调用的 OpCallInstructionUtil::Compute()方法:
其中会通过ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream()
;完成ep::Stream
的推导,之后在OpKernelCompute()
方法中实际完成 op/kernel 的执行。
ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();
ep::Stream* stream() override { return GetOrCreateEpStream(); }
GetOrCreateEpStream()
ep_stream_ = GetOrCreateEpDevice()->CreateStream();
这里->stream()会调用 ep_stream_policy_base.h 中的:
ep::Stream* stream() override { return GetOrCreateEpStream(); }
这是一个 private 方法:
可以看到,如果成员变量ep_stream_
非空,则直接返回;否则,通过 ep_stream_ = GetOrCreateEpDevice()->CreateStream(); 来创建创建ep::Stream
。
2.4.2 获取/创建 ep::Device
而这里的GetOrCreateEpDevice
方法如下:
ep::Device* GetOrCreateEpDevice() const {if (unlikely(ep_device_ == nullptr)) {ep_device_ = Singletonep::DeviceManagerRegistry::Get()->GetDevice(device_->enum_type(),device_->device_id());CHECK(ep_device_);}return ep_device_.get();}
根据oneflow::Device
中拿到的 device id 和 device type,去全局单例的ep::DeviceManagerRegistry
中取出对应的oneflow::ep::Device
oneflow::vm::StreamPolicy 和 oneflow::vm::EpStreamPolicy 推导
stream_policy_ =
CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device));
3、Eager Global 模式下的 Device 和 Stream 推导
eager global 模式下,device 信息隐藏在 placement 中,placement 不仅包括了 device type 信息还包括其 tensor 具体分布在哪些 ranks 上的信息,placement
在 C++ 中的对应类型是 ParallelDesc。
所以 device 以及 stream 的部分推导过程和 eager local 模式下有所区别,但 OpCall 指令执行;device、vm::Stream 和 ep::Stream 的推导过程都和 eager local 模式下是类似的。
3.1 推导 Device
3.1.1 placement 的 parallel_id
oneflow 中的 placement 表示 tensor 存放的设备集群(device group),如:
p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])
表示 tensor 分布于 1 台机器上,cuda device 0、1、2、3 四个设备上;
p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])
则表示 tensor 分布于 2 台机器上,host1 的 device0、1 以及 host2 的 device2、3。
在 oneflow 的分布式环境下,各个 host 上需要有相同数量的 device,每个进程使用一个 device。这样根据环境变量 RANK
可以得出 machine_id,LOCAL_RANK
就是进程在 制定 host 上的 rank 序号。
如果 input tensor 的 placement 与当前进程无关,可以省掉很多不必要的计算。通过 placement 的 parallel_id 可以判断计算任务是否与当前进程相关。
placement
在 C++ 中的对应类型是 ParallelDesc,其中并没有 parallel_id 字段,这个信息隐含在其它字段中。
ParallelDesc 在构造时会调用 ClearUp 函数,从中可以看到
ParallelDesc::parallel_id2machine_id_ 是 placement 分布的 machine。
ParallelDesc::parallel_id2device_id_ 是 placement 分布的 device_id。
parallel_id 是上述 2 个数组的索引,一个 parallel_id 对应一个 machine_id:device_id 组合。这样,根据 parallel_id 可以查到对应的 machine_id 和 device_id。
反过来,根据 machine_id:device_id 也可以从 machine_id2device_id2parallel_id_ 查到 parallel_id。
3.1.2 eager 模式下根据 parallel_id 忽略无关计算任务
在 eager 分布时场景处理计算任务时,会调用 GetTensorDevice4CurrentProcessCtx
,推导得到输出 tensor 的 device,以及获取当前进程的 machine_id、device_id 在 placement 中的 parallel_id 值。
如果当前进程与该 placement 无关,parallel_id 就是空,后续处理时就可以忽略一些计算:
EagerGlobalTensorImpl::New 中只需要用 functional::Empty 构造一个 shape 为 0 的空的 tensor。
GetBoxingOutput 计算时,如果 parallel_id 为空则表示当前 rank 进程无效,无需计算直接返回。
Interpret 可以不给 vm 提交指令、提前返回。
3.2 推导 Stream
在 ConsistentTensorInferCache
中推导 SBP Signature 时,也会同时推导出当前的 tensor 计算任务、在当前进程所用的 device。推导时,会先确认所有 inputs 的 placement 是一致的,都分布在相同的 device 上。如前所述,如果计算任务与当前进程无关,会提前返回;而一个进程只使用一个 device。
这里和 eager local 模式下 stream 的推导类似,通过JUST(InferDeviceAndStream(user_op_expr, infer_args))
推导出oneflow::Stream
对象,StreamRole 是 kCompute。区别在于 eager global 模式下
3.2.1 unique_stream_id
unique_stream_id
表示 oneflow::Stream
对象的创建次序。
所有的 oneflow::Stream
对象都保存在全局的 StreamMgr::stream2unique_stream_id_ 中。unique_stream_id2stream_symbol_
可看作是引用类型的副本,unique_stream
_id 就是 Stream 对象在这个数组中的索引。与 parallel_id 不同,unique_stream_id 是 Stream 对象在进程内的唯一标识。
并不是每次都需要加锁访问 StreamMgr
。oneflow::Stream
包含的都是描述性信息,其引用是以 ThreadLocal 的方式存储的,可以提升后续读取的效率。虚拟机在执行指令时,也会用 unique_stream_id 进行逻辑判断。
4、Eager 模式下的 Stream 同步——SoftSyncStream
设想以下场景:将 CPU 下的 tensor 拷贝到 CUDA 设备,然后在 CUDA 上再进行 tensor add 的计算。这涉及到两个流,一个是 Host2Device,一个是 CUDA Compute。这两个流的计算任务是并发执行的。需要有同步措施,才能保证拷贝完再执行 add 计算。
Eager 模式下,在 InstructionsBuilder::Call
中构造指令时,对 SoftSyncStream 的调用会在必要时向指令列表插入同步指令。
SoftSyncStream
中,几个重要概念:
tensor 在 oneflow 内存中的实际承载者是 eager_blob_object
last_used_stream 表示一个 tensor(blob)上一次使用到的 stream,可能是 compute stream、h2d stream、d2h stream、集合通信 ccl stream 等
如果 last_used_stream 与当前计算执行的流 stream 相同,则可以忽略,因为相同 stream 间天然顺序执行所以无需同步,否则就需要进行后续的同步处理
SoftSyncStream 代码如下:
主体逻辑是,会在ForEachEagerBlobObjectsNeedingSoftSync
方法中遍历每一个 tensor 对象(eager blob object),对于每一个需要同步的 blob 运用 lambda 方法并最终调用SoftSyncStreamBetween
完成 stream 间的同步。
这里,我们看一下 ForEachEagerBlobObjectsNeedingSoftSync 的逻辑:
首先 if/else 的主体业务逻辑是类似的,主要区别在于,当 blob 的 size <= kOpArgsReservedSize 时(默认为 4)会使用 small vector 来存放 LocalDepObject 变量,效率会更快一些(否则会走到 else 分支,主体逻辑类似,这里就不看了)。
const auto& opt_last_used_stream = eager_blob_object->last_used_stream()
;
if (unlikely(!opt_last_used_stream.has_value())) { continue; }
这两句是查询该 tensor(blob)上一次被使用时用到的 stream——last_used_stream,如果为空,则直接 continue 跳过,因为如果此 tensor 之前并未被任何 stream 使用,则无需进行 stream 间的同步操作,因为在当前 stream 上不会有关于该 tensor 的其他依赖关系;
如果last_used_stream!=stream
则表示需要在两个 stream 间进行同步,则会应用传入的 lambda 函数 DoEach 进行处理,在这里 lambda 函数即:
既实际调用的是 SoftSyncStreamBetween 来完成实际的 stream 间同步,这里主要有 3 个变量:
dep_objects
存储了 tensor 间的依赖关系
last_used_stream
则是该 tensor 上一次使用的 stream
stream
该 tensor 当前使用的 stream
SoftSyncStreamBetween 的代码如下:
SoftSyncStreamBetween
的主要逻辑如下:
先额外做了一次 check,检测如果待同步的两个 stream 相同,则 check 会报错并提示"synchronization is unnecessary"
通过
SupportingStreamWait
判断 from 和 to stream 间是否支持 stream wait,是则调用 StreamWait 方法;否则,直接调用RecordEvent
方法SupportingStreamWait
的主要逻辑是,通过 stream 的 device、以及StreamType
的 Visit 方法来判断。简单来说,如果 from 和 to stream 之间是不同的 device(譬如 cpu stream <-> cuda stream 之间的同步),或者 from stream 的 device 为 cpu,则 SupportingStreamWait 一定是 false;如果是相同的,则继续通过其他判断条件进行判断。
SupportingStreamWait 为 True
SupportingStreamWait
为 True 时,即 from to stream 同为 Cuda Stream 间的同步情况,在这种情况下会走到 StreamWait 的函数,该函数最终会派发一个StreamWaitEventInstructionPolicy
的指令给 vm 执行,StreamWaitEventInstructionPolicy 的执行逻辑主要是两个 cuda event:
cudaEventRecord
cudaStreamWaitEvent
对于 from_stream 来说,插入一个
cudaEventRecord
,用于标志 from stream 是否完成该 stream 上的 event 事件;对于 to_stream 来说,插入一个
cudaStreamWaitEvent
等待 from stream 上的事件完成后,再继续执行 to_stream。
SupportingStreamWait 为 False
SupportingStreamWait
为 False 时,会直接调用JUST(RecordEvent(std::move(dependences)
, from_stream)); 其内部实现会从对象池中获取可复用的 cuda event 对象并执行 event。
这里有个细节,由于 cuda event 的创建和销毁都会引发 cuda kernel 的 launch 由异步转同步,所以基于对象池的 cuda event 可以避免这个开销。
实际上最终调用的还是cudaEventRecord
,而cudaEventRecord
本身只是起到一个“占位符”的作用,并不能起到(保证该 stream 上其他 kernel 全部执行完)的作用,真正能保证 stream 同步作用的是 oneflow vm(vitual machine)控制下的指令间依赖关系/执行顺序。
5、CPU 下的并行计算
CpuStream 只有一个线程。CPU kernel 应该是通过 OpenMP 或者 Intel OneApi 等实现并行计算加速。
参考资料 1.https://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa
其他人都在看
欢迎 Star、试用 OneFlow 最新版本:https://github.com/Oneflow-Inc/oneflow/
版权声明: 本文为 InfoQ 作者【OneFlow】的原创文章。
原文链接:【http://xie.infoq.cn/article/fc331d59437f4686f7cab7aaa】。文章转载请联系作者。
评论