写点什么

OneFlow 源码解析:Tensor 类型体系与 Local Tensor

作者:OneFlow
  • 2022 年 9 月 06 日
    重庆
  • 本文字数:7702 字

    阅读完需:约 25 分钟

OneFlow源码解析:Tensor类型体系与Local Tensor

撰文|郑建华

更新|赵露阳


tensor 和 op 是神经网络模型最基本的组件:op 是模型的节点,tensor 是连接节点的边。然而,构建一个 tensor 并不仅仅是构造一个对象那么简单,至少要考虑以下问题:


  • 要支持节点本地的 local tensor,以及分布式的 global tensor;

  • 要支持 eager 和 lazy 执行模式;

  • 要支持不同的数据类型,包括 float、double、int 等;

  • 要支持不同设备。


1、创建 tensor 的方法


与 PyTorch 类似,在 OneFlow 中也可以通过两种主要的方式来创建 tensor:Tensortensor。这两种方式最终都会创建出 OneFlow 内部的 C++ Tensor 对象,即对应 Python 层的 flow.Tensor 类型。


1.1 Tensor


Python 层的 Tensor 是在 tensor.py(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/python/oneflow/framework/tensor.py#L23)中引入的,通过 python c api 注册的 Tensor 类型对象,此对象在 MakeTensorType(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/framework/tensor.cpp#L623)中被定义和返回。


在 MakeTensorType 中主要通过 PyTensorObject_init 创建了 Tensor 对象:


static int PyTensorObject_init(PyObject* self, PyObject* args, PyObject* kwargs) {  HANDLE_ERRORS  auto* temp = functional::_legacy_tensor_ctor(NULL, args, kwargs);  if (PyErr_Occurred()) { throw py::error_already_set(); }  auto* _self = (PyTensorObject*)self;  _self->data = PyTensor_Unpack(temp);  _self->data->set_pyobject(self);

// reset temp data to prevent clearing the pyobject // when the temp is deallocated ((PyTensorObject*)temp)->data.reset(); Py_XDECREF(temp); return 0; END_HANDLE_ERRORS_RET(-1)}
复制代码


通过functional::_legacy_tensor_ctor函数创建了 OneFlow 内部的 c++ Tensor 对象:oneflow::one::Tensor,并作为 data 绑定至 Python 的 Tensor 类型。在 MakeTensorType 中,还通过 PyMethodDef(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/framework/tensor.cpp#L639-L641)为 Tensor 注册了很多 C++方法,如:


static PyMethodDef PyTensorObject_methods[] = {    {"storage_offset", PyTensorObject_storage_offset, METH_NOARGS, NULL},    {"stride", PyTensorObject_stride, METH_NOARGS, NULL},    {"is_contiguous", PyTensorObject_is_contiguous, METH_NOARGS, NULL},    {"contiguous", PyTensorObject_contiguous, METH_NOARGS, NULL},    {"contiguous_", PyTensorObject_contiguous_, METH_NOARGS, NULL},    {"pin_memory", PyTensorObject_pin_memory, METH_NOARGS, NULL},    {"is_pinned", PyTensorObject_is_pinned, METH_NOARGS, NULL},    {"requires_grad_", (PyCFunction)PyTensorObject_requires_grad_, METH_VARARGS | METH_KEYWORDS,     NULL},    {"retain_grad", PyTensorObject_retain_grad, METH_NOARGS, NULL},    {"detach", PyTensorObject_detach, METH_NOARGS, NULL},    {"clone", PyTensorObject_clone, METH_NOARGS, NULL},    {"zero_", PyTensorObject_zero_, METH_NOARGS, NULL},    {"register_hook", PyTensorObject_register_hook, METH_O, NULL},    {"_register_post_grad_accumulation_hook", PyTensorObject__register_post_grad_accumulation_hook,     METH_O, NULL},    {"global_id", PyTensorObject_global_id, METH_NOARGS, NULL},    {"check_meta_consistency", PyTensorObject_check_meta_consistency, METH_NOARGS, NULL},    {"to_numpy", PyTensorObject_to_numpy, METH_NOARGS, NULL},    {"type", (PyCFunction)PyTensorObject_type, METH_VARARGS | METH_KEYWORDS, NULL},
复制代码


此外,在 Python 层通过 RegisterMethods(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/python/oneflow/framework/tensor.py#L502)也为 Tensor 注册了一些 Python 实现的 Tensor 方法或属性(如 tensor.numpy),在 OneFlow 包初始化时会通过 RegisterMethod4Class(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/python/oneflow/framework/register_class_method_util.py#L23)完成这些 Python 方法和属性的注册。RegisterMethod4Class 的调用流程如下:



相比于 Python 实现来说,Tensor 的++实现的方法/属性通常具有较高的性能。


1.2 tensor 函数


Tensor 是类型,而 tensor 则是函数,flow.tensor函数在oneflow/api/python/functional/tensor_api.yaml中被定义:


- name: "tensor"  signature: [      "Tensor (PyObject* data, *, DataType dtype=None, Device device=None,      Bool requires_grad=False, Bool pin_memory=False) => TensorWithData",      "Tensor (PyObject* data, *, DataType dtype=None, Placement placement,      SbpList sbp, Bool requires_grad=False) => GlobalTensorWithData",    ]  bind_python: True
复制代码


其 C++实现位于tensor_api.yaml.pybind.cpp中,这是构建阶段自动生成的文件。


通过函数签名可以看到,flow.tensor()有两种重载的方法:


  • TensorWithData

  • GlobalTensorWithData


它们分别用于构造 local tensor 和 global tensor 的构造。和上面的 Tensor 类似,flow.tensor 返回的也是 OneFlow 内部的oneflow::one::Tensor对象(绑定至 Python 的 Tensor 对象)。


1.3 手动构建 tensor 的两种方式


和 PyTorch 类似,在 OneFlow 中常用创建 tensor 的方式也分为两种:


  • flow.Tensor

  • flow.tensor


创建方式示例:


import oneflowimport numpy as np
oneflow.tensor([[1., -1.], [1., -1.]])# tensor([[ 1., -1.],# [ 1., -1.]], dtype=oneflow.float32)oneflow.tensor(np.array([[1, 2, 3], [4, 5, 6]]))# tensor([[ 1, 2, 3],# [ 4, 5, 6]], dtype=oneflow.int64)flow.Tensor([[1,2,3],[4,5,6]])
复制代码


大多数情况下(和 PyTorch 类似的 eager 模式),可以通过指定 device、dtype、shape 等参数创建普通 tensor(local tensor);


少数情况下(如 OneFlow 特有的 eager global、lazy 模式),需要 global tensor 时,可以通过指定 sbp 和 placement 的方式直接创建 global tensor,也可通过 tensor.to_global 的方式将普通 tensor 转换为 global tensor,可参考:


  • oneflow.tensor(https://oneflow.readthedocs.io/en/master/generated/oneflow.tensor.html#

  • global tensor(https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html


2、OneFlow 的 tensor 类型体系


上述内容中介绍的 oneflow 内部的 C++ Tensor 对象,实际上其定义位于oneflow/core/framework/tensor.h,是一个抽象的 Tensor 类型。



其中LocalTensor即为普通的单卡视角下的 Tensor(和 PyTorch 的 Tensor 类似);GlobalTensor则为 OneFlow 所特有的全局视角下的 Tensor(通常用于 eager global 模式或 lazy 模式下)。Tensor 使用了 Bridge 模式,每个 Tensor 子类内部有一个 TensorImpl 字段,负责抽象 Tensor 的实际实现:



3、local tensor 的构造


我们以flow.tensor([[1,2,3],[4,5,6]])为例,看一下 tensor 构造的过程。主要的流程如下:



在这个例子中,由于使用的是 flow.tensor 方法创建 tensor(且为普通的 local tensor)所以会用到在oneflow/api/python/functional/tensor_api.yaml中定义的 TensorWithData 方法,其实现,是位于oneflow/api/python/functional/tensor_api.cpp的 TensorWithDataFunctor:


class TensorWithDataFunctor { public:  Maybe<Tensor> operator()(PyObject* data, const Optional<Symbol<DType>>& dtype,                           const Optional<Symbol<Device>>& device, const bool requires_grad,                           const bool pin_memory) const {    ...    if (PyTensor_Check(data)) {      // Throw warnings like pytorch.      auto ret = PyErr_WarnEx(          PyExc_UserWarning,          "To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() "          "or sourceTensor.clone().detach().requires_grad_(True), rather than "          "oneflow.tensor(sourceTensor).",          1);      if (ret != 0) { return Error::RuntimeError(); }
const auto& other = PyTensor_Unpack(data); return MakeTensorFromOtherTensor(other, dtype, device, requires_grad, pin_memory); } else { // Make tensor from python sequence or numpy array. return MakeLocalTensorFromData(data, dtype, device, requires_grad, pin_memory); } }};
复制代码


由于这里传入的 data 是一个 Python 的 list 对象,所以最终会调用MakeLocalTensorFromData方法,创建 tensor 主要的逻辑都在这个函数中。其中大量调用 Python 和 Numpy 的接口,检查 PyObject 的数据类型,获取 Shape(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L184)和 DataType(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L185),如果用户没有制定 device,默认会设置为 CPU 设备(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L191)。


后面主要是调用 EmptyFunctor(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L194)和 SwitchCopyLocalTensorFromUntypedArray(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L195)。前者为 tensor 分配内存,后者进行数据拷贝,两个步骤都会通过虚拟机指令完成。其中 EmptyFunctor 会走普通的OpCall指令、而 CopyLocalTensorFromUntypedArray 会根据是否需要同步 copy 走到AccessBlobByCallback/SyncAccessBlobByCallback指令。


为什么要通过虚拟机指令完成呢?无论是内存资源的分配,还是数据拷贝,CPU 和 CUDA 等不同设备上的操作都不一样。之前讨论 Op/Kernel 时已经看到,在 OneFlow 中所有动静态图任务执行、eager 模式下 op/kernel 执行、内存/显存的分配和释放、device、stream 等统一由虚拟机进行管理。


3.1 分配内存:EmptyFunctor


matmulreluinplace=false时)等操作在执行过程中也会创建 output tensor。之前讨论 relu 时重点关注了 op 和 kernel 的计算逻辑,而忽略了 tensor 相关的内容。


而这里只需要先构造一个空 tensor 对象,不需要其它计算,所以是一个 Empty 操作,Empty op 对应的 kernel——EmptyKernel(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/user/kernels/empty_kernel.cpp#L30)没有实质性的计算逻辑,只是先根据 shape、dtype、device 信息创建一个空 tensor,等待后续将实际的数据从内存中 copy 至此空 tensor,从而完成整个 tensor 的创建过程。


EmptyFunctor 同样和其他 functor 一样,最终会被 Dispacth 至对应的 interpreter 被解释执行,这里由于是 eager 模式下的 local tensor,EmptyFunctor 最终会进入 eager local interpreter,交给 NaiveInterpret(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L74)方法处理。流程如下:


  1. 在构造 EagerLocalTensorImpl(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L110)对象,用于存放 tensor 结果。但这只是一个壳子,还没有为 tensor 的数据分配存储空间。

  2. 之后会初始化 EagerBlobObject(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L114)、TensorStorage(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/tensor_impl.cpp#L120),这样 tensor 主要的字段基本构建完毕

  3. 然后构造 OpCall 指令、提交虚拟机 PhysicalRun(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/framework/op_interpreter/eager_local_op_interpreter.cpp#L134-L136),等待 vm 的调度执行。


OpCall 对应的指令策略最终会进入oneflow/core/vm/op_call_instruction_policy.cpp,并在 Prepare 方法中通过AllocateOutputBlobsMemory方法对 TensorStorage 完成实际的内存分配;在Compute方法中启动(empty op 对应的)实际的 kernel 执行。


3.2 拷贝数据:SwitchCopyLocalTensorFromUntypedArray


SwitchCopyMirroredTensorFromUntypedArray其实是 MAKE_SWITCH_ENTRY(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L150)宏展开后的函数名。宏展开后的代码如下。实际会调用 CopyLocalTensorFromUntypedArray(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.cpp#L68)。


template<typename... Args>static Maybe<void> SwitchCopyLocalTensorFromUntypedArray(    const std::tuple<DataType>& switch_tuple, Args&& ... args) {  static const std::map<std::tuple<DataType>, std::function<Maybe<void>(Args && ...)>>      case_handlers {          {SwitchCase(DataType::kFloat),           [](Args&&... args) {             return CopyLocalTensorFromUntypedArray<float>(std::forward<Args>(args)...);           }},           // ...      };  return case_handlers.at(switch_tuple)(std::forward<Args>(args)...);};
复制代码


CopyLocalTensorFromUntypedArray 方法如下:


template<typename T>Maybe<void> CopyLocalTensorFromUntypedArray(const std::shared_ptr<Tensor>& tensor,                                            PyObject* array) {  return CopyBetweenLocalTensorAndNumpy<T>(tensor, array, CopyFromNumpyArray, "mut",                                           /*block_host_until_done=*/false);}
复制代码


其内部实际调用了CopyBetweenLocalTensorAndNumpy方法。


CopyBetweenLocalTensorAndNumpy


顾名思义,这个方法主要是用在 numpy 和 tensor 之间进行数据 copy 的。其中第 3 个参数:CopyFromNumpyArray实际是一个函数回调的 callback 方法,其主要通过 SyncAutoMemcpy 进行 array 和 tensor(blob)之间的内存拷贝:


void CopyFromNumpyArray(ep::Stream* stream,                        const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object,                        const NumPyArrayPtr& array_ptr) {  SyncAutoMemcpy(stream, eager_blob_object->mut_dptr(), array_ptr.data(),                 eager_blob_object->ByteSizeOfBlobBody(), eager_blob_object->mem_case(),                 memory::MakeHostMemCase());}
复制代码


继续看 CopyBetweenLocalTensorAndNumpy(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/api/python/utils/tensor_utils.h#L93)方法,其中最关键的是:


JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {      return builder->AccessBlobByCallback(          tensor,          [array_ptr, Copy](ep::Stream* stream,                            const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object) {            Copy(stream, eager_blob_object, array_ptr);          },          modifier);    }));
复制代码


通过 InstructionsBuilder 构建了AccessBlobByCallback指令,参数为上面通过 EmptyFuncor 创建的空 tensor、callback 的函数指针及参数、以及 modifier(string "mut"表示可动态修改)。


AccessBlobByCallback


和 OpCall 类似,InstructionsBuilder 调用AccessBlobByCallback时,也会实际构造对应的 vm 指令策略——AccessBlobArgCbInstructionPolicy并派发至 vm,等待被调度和实际执行:


template<typename T>Maybe<void> InstructionsBuilder::AccessBlobByCallback(    const T tensor,    const std::function<void(ep::Stream*, const std::shared_ptr<vm::EagerBlobObject>&)>& callback,    const std::string& modifier) {  const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object = JUST(tensor->eager_blob_object());  Symbol<Device> device = JUST(GetDevice(tensor));  ...  Symbol<Stream> stream = JUST(GetDefaultStreamByDevice(device));  JUST(SoftSyncStream({eager_blob_object}, stream));  auto instruction = intrusive::make_shared<vm::Instruction>(      // Never replace `stream` with producer_stream or last_used_stream.      JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream)),      std::make_shared<vm::AccessBlobArgCbInstructionPolicy>(eager_blob_object, callback,                                                             modifier));  instruction_list_->EmplaceBack(std::move(instruction));  return Maybe<void>::Ok();}
复制代码


等该条AccessBlobArgCbInstructionPolicy指令实际执行时,会在指令的 Compute(https://github.com/Oneflow-Inc/oneflow/blob/2e6a72c8734b9929191306df35b4284e9caa8126/oneflow/core/vm/access_blob_arg_cb_instruction_policy.h#L79)方法中调用 callback 完成从 tensor 的 blob <-> numpy 的 ndarray 之间的数据 copy,至此拷贝过程结束,flow.tensor的创建全部完成。


(本文经授权后发布。原文:https://segmentfault.com/a/1190000041989895


参考资料


其他人都在看


欢迎下载体验 OneFlow v0.8.0 最新版本:https://github.com/Oneflow-Inc/oneflow/


发布于: 刚刚阅读数: 4
用户头像

OneFlow

关注

不至于成为世界上最快的深度学习框架。 2022.03.23 加入

★ OneFlow深度学习框架:github.com/Oneflow-Inc/oneflow ★ OF云平台:oneflow.cloud

评论

发布
暂无评论
OneFlow源码解析:Tensor类型体系与Local Tensor_深度学习_OneFlow_InfoQ写作社区