写点什么

从 0 构建深度学习框架——揭秘深度学习框架的黑箱

作者:方品
  • 2025-12-07
    广东
  • 本文字数:13548 字

    阅读完需:约 44 分钟

引言你有没有好奇过,当你在 PyTorch 或 TensorFlow 中调用 .backward() 计算梯度时,框架到底在背后做了什么?我们每天都在使用这些成熟的深度学习工具,但很少有人真正去探索它们的底层实现——自动微分的魔法、计算图的构建、张量运算的优化……这些隐藏在 API 背后的核心原理,才是深度学习的真正基石。今天,我将向大家介绍一个从零开始实现的轻量级深度学习框架——Needle。它不是为了替代现有的成熟框架,而是为了帮你揭开深度学习的神秘面纱:你将亲手见证自动微分如何通过计算图和链式法则实现你将理解张量运算的底层逻辑和实现原理你将学会如何从 0 到 1 构建自己的深度学习组件 Needle 的代码结构清晰,注释详细,所有功能都从零实现,是学习深度学习核心原理的最佳实践项目。Needle 的核心功能 Needle 虽然是一个轻量级框架,但却实现了深度学习框架的所有核心功能:🧮 自动微分系统自动微分是深度学习的核心技术之一,Needle 实现了基于计算图的自动微分系统。当你执行张量运算时,框架会自动构建计算图,记录所有运算操作和依赖关系。当需要计算梯度时,框架会沿着计算图反向传播,自动计算每个参数的梯度,无需手动推导。➕ 丰富的张量运算 Needle 支持各种基本的张量运算,包括:元素级运算(Add, Mul, Div, Pow 等)矩阵运算(MatMul, Transpose, Dot 等)归约运算(Sum, Mean, Max 等)变形运算(Reshape, Broadcast, Concatenate 等)这些运算与 PyTorch 等主流框架的 API 设计非常相似,易于上手,同时保持了代码的简洁性和可理解性。🔌 多后端支持 Needle 支持两种后端:NumPy 后端:用于快速原型开发和 CPU 计算,便于调试和学习自定义 NDArray 后端:支持 CPU 和 CUDA 加速,提供了更高的性能通过环境变量 NEEDLE_BACKEND 可以轻松切换后端,满足不同场景的需求。📦 神经网络模块 Needle 提供了完整的神经网络模块支持,包括:线性层(Linear)、卷积层(Conv)等基础组件 ReLU、Sigmoid、Softmax 等激活函数损失函数(CrossEntropyLoss, MSELoss 等)这些模块与自动微分系统无缝集成,方便构建各种复杂的神经网络模型。🎯 优化算法 Needle 实现了多种常用的优化算法,包括:随机梯度下降(SGD)MomentumRMSpropAdam 这些优化算法可以直接用于训练神经网络模型,支持权重衰减和学习率调度等功能。📊 数据加载 Needle 提供了数据加载和预处理支持,包括:自定义 Dataset 接口 DataLoader 用于批量加载数据常用的数据变换操作这些功能可以帮助用户轻松处理各种数据集,提高训练效率。接下来,让我们深入了解 Needle 背后的深度学习原理。我将用简单的例子和代码来说明自动微分、计算图等核心概念。计算图 & 自动微分的从零实现早期的 ML 通常从手动计算梯度开始,但是随着网络变得越来越复杂,手动计算梯度变得越来越困难。为了解决这个问题,自动微分被提出。自动微分是一种计算梯度的方法,它可以自动计算函数在任意点的梯度。自动微分通过将所有计算操作表示为计算图,其中值(包括张量)为计算图中的节点,计算(op)是图中的边(一个 op 可能是多条边)。将梯度计算转换为计算图上的反向传播。最终利用链式法的迭代计算,一步步算出最终的完整梯度。我们来从零实现一个自动微分计算框架,完整代码参考.先定义计算图的边,也就是计算(op)本身,其抽象类非常简单:class Op:"""Operator definition."""def call(self, *args):raise NotImplementedError()


def compute(self, *args: Tuple[NDArray]):    """Calculate forward pass of operator.
Parameters ---------- input: np.ndarray A list of input arrays to the function
Returns ------- output: nd.array Array output of the operation
""" raise NotImplementedError()
def gradient( self, out_grad: "Value", node: "Value") -> Union["Value", Tuple["Value"]]: """Compute partial adjoint for each input value for a given output adjoint.
Parameters ---------- out_grad: Value The adjoint wrt to the output value.
node: Value The value node of forward evaluation.
Returns ------- input_grads: Value or Tuple[Value] A list containing partial gradient adjoints to be propagated to each of the input node. """ raise NotImplementedError()
复制代码


compute 和 gradient 分别是这条边上两个方向的计算,compute 是前向计算,gradient 是反向计算。 compute 依赖与这个计算的所有输入作为入参,并计算出输出节点的值;gradient 依赖与这个计算的所有输入和最终输出的 adjoint 作为入参,并计算出这个计算的所有输入节点的 adjoint。比起单值,深度学习更通常处理张量值,因此我们再定义一组专门处理张量的计算(op).class TensorOp(Op):"""Op class specialized to output tensors, will be alternate subclasses for other structures"""def call(self, *args):return Tensor.make_from_op(self, args)


class TensorTupleOp(Op):"""Op class specialized to output TensorTuple"""def call(self, *args):return TensorTuple.make_from_op(self, args)接着定义计算图的节点,也就是值(value)本身,其抽象类也非常简单:class Value:"""A value in the computational graph."""


# trace of computational graphop: Optional[Op]inputs: List["Value"]# The following fields are cached fields for# dynamic computationcached_data: NDArrayrequires_grad: bool
def realize_cached_data(self): """Run compute to realize the cached data""" # avoid recomputation if self.cached_data is not None: return self.cached_data # note: data implicitly calls realized cached data self.cached_data = self.op.compute( *[x.realize_cached_data() for x in self.inputs] ) return self.cached_data
def is_leaf(self): return self.op is None
def __del__(self): global TENSOR_COUNTER TENSOR_COUNTER -= 1
def _init( self, op: Optional[Op], inputs: List["Tensor"], *, num_outputs: int = 1, cached_data: List[object] = None, requires_grad: Optional[bool] = None,): global TENSOR_COUNTER TENSOR_COUNTER += 1 if requires_grad is None: requires_grad = any(x.requires_grad for x in inputs) self.op = op self.inputs = inputs self.num_outputs = num_outputs self.cached_data = cached_data self.requires_grad = requires_grad
@classmethoddef make_const(cls, data, *, requires_grad=False): value = cls.__new__(cls) value._init( None, [], cached_data=data, requires_grad=requires_grad, ) return value
@classmethoddef make_from_op(cls, op: Op, inputs: List["Value"]): value = cls.__new__(cls) value._init(op, inputs)
if not LAZY_MODE: if not value.requires_grad: return value.detach() value.realize_cached_data() return value
复制代码


Value 的定义串联起来了整个计算图,每个值节点中记录了这个值是由哪个计算(op)得到的,以及这个计算的所有输入节点(也是值节点)。 为了避免重复计算定义了缓存区域,以及是否需要计算梯度的 flag。 is_leaf 方法用于判断这个值节点是否是叶子节点,也就是是否没有输入节点。通过这个信息可以对计算图进行拓扑排序,从而完成前向和反向传播的计算。接下来进入主角,Tensor 这种最常用 Value 的定义。class Tensor(Value):grad: "Tensor"


def __init__(    self,    array,    *,    device: Optional[Device] = None,    dtype=None,    requires_grad=True,    **kwargs,):    if isinstance(array, Tensor):        if device is None:            device = array.device        if dtype is None:            dtype = array.dtype        if device == array.device and dtype == array.dtype:            cached_data = array.realize_cached_data()        else:            # fall back, copy through numpy conversion            cached_data = Tensor._array_from_numpy(                array.numpy(), device=device, dtype=dtype            )    else:        device = device if device else cpu()        cached_data = Tensor._array_from_numpy(array, device=device, dtype=dtype)
self._init( None, [], cached_data=cached_data, requires_grad=requires_grad, )
@staticmethoddef make_const(data, requires_grad=False): tensor = Tensor.__new__(Tensor) tensor._init( None, [], cached_data=( data if not isinstance(data, Tensor) else data.realize_cached_data() ), requires_grad=requires_grad, ) return tensor
@propertydef data(self): return self.detach()
@propertydef shape(self): return self.realize_cached_data().shape
@propertydef dtype(self): return self.realize_cached_data().dtype
@propertydef device(self): data = self.realize_cached_data() # numpy array always sits on cpu if array_api is numpy: return cpu() return data.device
def backward(self, out_grad=None): out_grad = ( out_grad if out_grad else init.ones(*self.shape, dtype=self.dtype, device=self.device) ) compute_gradient_of_variables(self, out_grad)
def __add__(self, other): if isinstance(other, Tensor): return needle.ops.EWiseAdd()(self, other) else: return needle.ops.AddScalar(other)(self)
复制代码


这里 device 是用来指定计算设备的,支持 cpu 和 gpu,我们后续为分别基于 CPU 和 GPU 实现不同的计算后端。 Tensor 本身需要支持各类矩阵数值计算,包括加减乘除、矩阵乘法、求和、广播、reshape 等。这里为了简单只定义了加法和加法标量的实现。完整代码请参考 autograd.py。上面的 computegradientof_variables 函数用于计算一个张量的反向传播。我们需要遍历这个张量所在计算图,并按照拓扑排序进行反向传播,主要实现如下:def compute_gradient_of_variables(output_tensor, out_grad):"""Take gradient of output node with respect to each node in node_list.


Store the computed result in the grad field of each Variable."""# a map from node to a list of gradient contributions from each output nodenode_to_output_grads_list: Dict[Tensor, List[Tensor]] = {}# Special note on initializing gradient of# We are really taking a derivative of the scalar reduce_sum(output_node)# instead of the vector output_node. But this is the common case for loss function.node_to_output_grads_list[output_tensor] = [out_grad]
# Traverse graph in reverse topological order given the output_node that we are taking gradient wrt.reverse_topo_order = list(reversed(find_topo_sort([output_tensor])))
for tenser in reverse_topo_order: tensor_grad = sum_node_list(node_to_output_grads_list[tenser]) tenser.grad = tensor_grad # leaf node if tenser.op is None: continue inputs_out_grads = tenser.op.gradient_as_tuple(tensor_grad, tenser) for input, out_grad in zip(tenser.inputs, inputs_out_grads): if input not in node_to_output_grads_list: node_to_output_grads_list[input] = [] node_to_output_grads_list[input].append(out_grad)
复制代码


def find_topo_sort(node_list: List[Value]) -> List[Value]:"""Given a list of nodes, return a topological sort list of nodes ending in them.


A simple algorithm is to do a post-order DFS traversal on the given nodes,going backwards based on input edges. Since a node is added to the orderingafter all its predecessors are traversed due to post-order DFS, we get a topologicalsort."""visited = set()order = []for node in node_list:    topo_sort_dfs(node, visited, order)return order
复制代码


def topo_sort_dfs(node, visited, topo_order):"""Post-order DFS"""if node not in visited:visited.add(node)for n in node.inputs:topo_sort_dfs(n, visited, topo_order)topo_order.append(node)"""Post-order DFS"""上面我们已经完成了整个计算图的构建,接下来便是细化上面提到的张量需要支持的各类计算(op)的正向传播和反向传播的逻辑。 张量内部的数据是矩阵,因此支持各类矩阵的常用运算,这里的实现主要是体力活,通过底层提供的矩阵计算来实现正向和反向传播,具体完整实现可以参考 opslogarithmic.py 和 opsmathematic.py。矩阵计算的从零实现矩阵的表示:无论任何维度(shape)的矩阵,我们都可以将其表示为一个一维数组,只是需要根据 strides 来计算每个元素的位置。理解 strides 的含义是实现高效矩阵计算的关键。strides 表示每个维度上的步长,即每个元素在内存中的偏移量。 例如,对于一个 的矩阵,其 strides 为 ,表示在内存中每个元素的偏移量为 个元素大小(如果元素大小为 字节),而在第二个维度上每个元素的偏移量为 个元素大小。 如果需要对这个 的矩阵进行转置操作得到一个 的矩阵,只需要对原矩阵的 strides 进行交换即可,即 。完全不需要对底层矩阵元素进行复制。类似的操作包括但不限于:transposereshapebroadcastviewsqueeze/unsqueezesliceflipreversepermute 无论是 CPU 还是 GPU 介质,优化矩阵运算的关键都是如何利用好更告诉的存储。对 CPU 来说是如何实现 cache 友好的矩阵运算,减少 cache miss 带来的对 main memory 的访问。对 GPU 来说是如何利用好 shared memory,减少对 GPU global memory 的访问。我们将分别在下面的 CPU 和 GPU 矩阵运算实现中详细介绍。CPU 矩阵运算实现完整代码参考 ndarraybackend_cpu.cc.使用矩阵分块乘法,将矩阵乘法分解为多个小的矩阵乘法,每个小的矩阵乘法可以利用 cache 友好的方式进行计算,减少 cache miss 带来的对 main memory 的访问。 每个小的矩阵乘法的大小为 ,其中 是一个超参数,一般取 。 取 是因为现代 CPU 的 cache 行大小为 字节,即两个 类型的元素。#define ALIGNMENT 256#define TILE 8typedef float scalar_t;const size_t ELEM_SIZE = sizeof(scalar_t);


/**


  • This is a utility structure for maintaining an array aligned to ALIGNMENT

  • boundaries in memory. This alignment should be at least TILE * ELEM_SIZE,

  • though we make it even larger here by default.*/struct AlignedArray {AlignedArray(const size_t size) {int ret = posix_memalign((void **)&ptr, ALIGNMENT, size * ELEM_SIZE);if (ret != 0)throw std::bad_alloc();this->size = size;}~AlignedArray() { free(ptr); }size_t ptr_as_int() { return (size_t)ptr; }scalar_t *ptr;size_t size;};


void MatmulTiled(const AlignedArray &a, const AlignedArray &b,AlignedArray out, uint32_t m, uint32_t n, uint32_t p) {/*


  • Matrix multiplication on tiled representations of array. In this

  • setting, a, b, and out are all 4D compact arrays of the appropriate

  • size, e.g. a is an array of size a[m/TILE][n/TILE][TILE][TILE] You should

  • do the multiplication tile-by-tile to improve performance of the array

  • (i.e., this function should call AlignedDot() implemented above).

  • Note that this function will only be called when m, n, p are all

  • multiples of TILE, so you can assume that this division happens without

  • any remainder.

  • Args:

  • a: compact 4D array of size m/TILE x n/TILE x TILE x TILE

  • b: compact 4D array of size n/TILE x p/TILE x TILE x TILE

  • out: compact 4D array of size m/TILE x p/TILE x TILE x TILE to write to

  • m: rows of a / out

  • n: columns of a / rows of b

  • p: columns of b / out


*/for (size_t i = 0; i < m * p; ++i) {out->ptr[i] = 0.;}for (size_t i = 0; i < m / TILE; i++) {for (size_t j = 0; j < p / TILE; j++) {float *_out = out->ptr + i * p * TILE + j * TILE * TILE;for (size_t k = 0; k < n / TILE; k++) {const float *_a = a.ptr + i * n * TILE + k * TILE * TILE;const float *_b = b.ptr + k * p * TILE + j * TILE * TILE;AlignedDot(_a, _b, _out);}}}}GPU 矩阵运算实现完整代码参考 ndarraybackend_cuda.cu.使用矩阵分块乘法,将矩阵乘法分解为多个小的矩阵乘法,每个小的矩阵乘法可以利用 shared memory 友好的方式进行计算,减少对 GPU global memory 的访问。 每个小的矩阵乘法的大小为 ,其中 是一个超参数,一般取 。 取 是因为现代 GPU 的 shared memory 行大小为 字节,即两个 类型的元素。#define BASE_THREAD_NUM 256


#define TILE 4typedef float scalar_t;const size_t ELEM_SIZE = sizeof(scalar_t);


struct CudaArray {CudaArray(const size_t size) {cudaError_t err = cudaMalloc(&ptr, size * ELEM_SIZE);if (err != cudaSuccess)throw std::runtime_error(cudaGetErrorString(err));this->size = size;}~CudaArray() { cudaFree(ptr); }size_t ptr_as_int() { return (size_t)ptr; }


scalar_t *ptr;size_t size;};


struct CudaDims {dim3 block, grid;};


CudaDims CudaOneDim(size_t size) {/**


  • Utility function to get cuda dimensions for 1D call*/CudaDims dim;size_t num_blocks = (size + BASE_THREAD_NUM - 1) / BASE_THREAD_NUM;dim.block = dim3(BASE_THREAD_NUM, 1, 1);dim.grid = dim3(num_blocks, 1, 1);return dim;}


#define MAX_VEC_SIZE 8struct CudaVec {uint32_t size;int32_t data[MAX_VEC_SIZE];};


CudaVec VecToCuda(const std::vector<int32_t> &x) {CudaVec shape;if (x.size() > MAX_VEC_SIZE)throw std::runtime_error("Exceeded CUDA supported max dimesions");shape.size = x.size();for (size_t i = 0; i < x.size(); i++) {shape.data[i] = x[i];}return shape;}


global void MatmulKernel(const scalar_t *a, const scalar_t *b,scalar_t *out, uint32_t M, uint32_t N,uint32_t P) {shared scalar_t a_tile[TILE][TILE];shared scalar_t b_tile[TILE][TILE];


size_t thread_x = threadIdx.x;size_t thread_y = threadIdx.y;


size_t block_x = blockIdx.x;size_t block_y = blockIdx.y;


size_t x = thread_x + block_x * blockDim.x;size_t y = thread_y + block_y * blockDim.y;


size_t cnt = (N + TILE - 1) / TILE;scalar_t sum = 0;for (size_t i = 0; i < cnt;i++) // 遍历 a 的某一行和 b 某一列的 TILE 块,对应位置累加得到这个块的 out{if ((i * TILE + thread_y) < N) // 防越界{a_tile[thread_x][thread_y] = a[x * N + i * TILE + thread_y];}if ((i * TILE + thread_x) < N) {b_tile[thread_x][thread_y] = b[(i * TILE + thread_x) * P + y];}


__syncthreads();
if (x < M && y < P) { for (size_t j = 0; j < TILE; j++) if (i * TILE + j < N) sum += a_tile[thread_x][j] * b_tile[j][thread_y];}
__syncthreads();
复制代码


}


if (x < M && y < P)out[x * P + y] = sum;}


void Matmul(const CudaArray &a, const CudaArray &b, CudaArray out, uint32_t M,uint32_t N, uint32_t P) {/*


  • Multiply two (compact) matrices into an output (also comapct) matrix. You

  • will want to look at the lecture and notes on GPU-based linear algebra to

  • see how to do this. Since ultimately mugrade is just evaluating

  • correctness, you can implement a version that simply parallelizes over

  • (i,j) entries in the output array. However, to really get the full benefit

  • of this problem, we would encourage you to use cooperative fetching, shared

  • memory register tiling, and other ideas covered in the class notes. Note

  • that unlike the tiled matmul function in the CPU backend, here you should

  • implement a single function that works across all size matrices, whether or

  • not they are a multiple of a tile size. As with previous CUDA

  • implementations, this function here will largely just set up the kernel

  • call, and you should implement the logic in a separate MatmulKernel() call.

  • Args:

  • a: compact 2D array of size m x n

  • b: comapct 2D array of size n x p

  • out: compact 2D array of size m x p to write the output to

  • M: rows of a / out

  • N: columns of a / rows of b

  • P: columns of b / out*/


dim3 block = dim3(TILE, TILE, 1);size_t grid_x = (M + TILE - 1) / TILE;size_t grid_y = (P + TILE - 1) / TILE;dim3 grid = dim3(grid_x, grid_y, 1);MatmulKernel<<<grid, block>>>(a.ptr, b.ptr, out->ptr, M, N, P);}Triton 矩阵运算实现也可以使用 triton 实现矩阵乘法,triton 是一个基于 python 的编译器,用于将 python 代码编译为 cuda kernel。 如果你对 triton 感兴趣,可以参考我之前关于的一些其他文章:突破性能瓶颈:深入解析 Flash Attention 内核优化技术深入解析:使用 Triton 实现 Flash Attention2 - 让大模型训练飞起来深度学习框架搭建以上内容串联起了一个完整的深度学习框架的基本底层要素:包括自动微分计算,矩阵计算的 CPU、GPU 实现。 接下来补齐深度学习框架中其他的基本要素,从而构建起一个基本开箱可用的深度学习框架。model parameter 本质是 Tensor,封装一层方便 nn Module 管理。class Parameter(Tensor):"""A special kind of tensor that represents parameters."""nn modulenn.Module 是所有神经网络模块的基类。Module 是封装模型结构、参数和计算的统一单元。每个基础模型或者自定义模型都通过继承 nn.Module 来定义。它可以包含可学习参数(通过 nn.Parameter 或子模块如 nn.Linear 自动注册)和前向计算逻辑。Module 支持层级嵌套(一个 Module 可包含其他 Module),自动管理参数、状态(如训练/评估模式)和设备(CPU/关键代码如下,完整代码参考 nn_basic.py。class Module:def init(self):self.training = True


def parameters(self) -> List[Tensor]:    """Return the list of parameters in the module."""    return _unpack_params(self.__dict__)
def _children(self) -> List["Module"]: return _child_modules(self.__dict__)
def eval(self): self.training = False for m in self._children(): m.training = False
def train(self): self.training = True for m in self._children(): m.training = True
def __call__(self, *args, **kwargs): return self.forward(*args, **kwargs)
复制代码


def _unpack_params(value: object) -> List[Tensor]:if isinstance(value, Parameter):return [value]elif isinstance(value, Module):return value.parameters()elif isinstance(value, dict):params = []for k, v in value.items():params += _unpack_params(v)return paramselif isinstance(value, (list, tuple)):params = []for v in value:params += _unpack_params(v)return paramselse:return []


def _child_modules(value: object) -> List["Module"]:if isinstance(value, Module):modules = [value]modules.extend(_child_modules(value.dict))return modulesif isinstance(value, dict):modules = []for k, v in value.items():modules += _child_modules(v)return moduleselif isinstance(value, (list, tuple)):modules = []for v in value:modules += _child_modules(v)return moduleselse:return []常见内置模块为了做到开箱即用,我们直接 cosplay pytorch,实现一些常见的神经网络模块,如 Linear、ReLU、Softmax 等。最简单的 Linear:class Linear(Module):def init(self, in_features, out_features, bias=True, device=None, dtype="float32"):super().init()self.in_features = in_featuresself.out_features = out_featuresself.weight = Parameter(init.kaiming_uniform(in_features,out_features,device=device,dtype=dtype,requires_grad=True,))if bias:self.bias = Parameter(init.kaiming_uniform(out_features, 1, device=device, dtype=dtype, requires_grad=True).transpose())else:self.bias = None


def forward(self, X: Tensor) -> Tensor:    ret = X @ self.weight    if self.bias:        b = self.bias.broadcast_to(ret.shape)        ret = ret + b    return ret
复制代码


class Flatten(Module):def forward(self, X):s = X.shapebatch = s[0]other = 1for i, x in enumerate(s):if i == 0:continueother = other * xreturn X.reshape((batch, other))


class ReLU(Module):def forward(self, x: Tensor) -> Tensor:return ops.relu(x)


class Sequential(Module):def init(self, *modules):super().init()self.modules = modules


def forward(self, x: Tensor) -> Tensor:    input = x    for module in self.modules:        input = module(input)    return input
复制代码


class SoftmaxLoss(Module):def forward(self, logits: Tensor, y: Tensor):one_hot_y = init.one_hot(logits.shape[-1], y)z_y = ops.summation(logits * one_hot_y, axes=(-1,))log_sum = ops.logsumexp(logits, axes=(-1,))return ops.summation((log_sum - z_y) / logits.shape[0])


class BatchNorm1d(Module):def init(self, dim, eps=1e-5, momentum=0.1, device=None, dtype="float32"):super().init()self.dim = dimself.eps = epsself.momentum = momentumself.weight = Parameter(init.ones(dim, device=device, requires_grad=True))self.bias = Parameter(init.zeros(dim, device=device, requires_grad=True))self.running_mean = init.zeros(dim)self.running_var = init.ones(dim)


def forward(self, x: Tensor) -> Tensor:    if self.training:        batch_mean = x.sum((0,)) / x.shape[0]        batch_var = ((x - batch_mean.broadcast_to(x.shape)) ** 2).sum(            (0,)        ) / x.shape[0]        self.running_mean = (            1 - self.momentum        ) * self.running_mean + self.momentum * batch_mean.data        self.running_var = (            1 - self.momentum        ) * self.running_var + self.momentum * batch_var.data        norm = (x - batch_mean.broadcast_to(x.shape)) / (            batch_var.broadcast_to(x.shape) + self.eps        ) ** 0.5        return self.weight.broadcast_to(x.shape) * norm + self.bias.broadcast_to(            x.shape        )    else:        norm = (x - self.running_mean.broadcast_to(x.shape)) / (            self.running_var.broadcast_to(x.shape) + self.eps        ) ** 0.5        return self.weight.broadcast_to(x.shape) * norm + self.bias.broadcast_to(            x.shape        )
复制代码


class LayerNorm1d(Module):def init(self, dim, eps=1e-5, device=None, dtype="float32"):super().init()self.dim = dimself.eps = epsself.weights = Parameter(init.ones(dim, requires_grad=True))self.bias = Parameter(init.zeros(dim, requires_grad=True))


def forward(self, x: Tensor) -> Tensor:    mean = (        (ops.summation(x, axes=(-1,)) / x.shape[-1])        .reshape((x.shape[0], 1))        .broadcast_to(x.shape)    )    var = (        (ops.summation((x - mean) ** 2, axes=(-1,)) / x.shape[-1])        .reshape((x.shape[0], 1))        .broadcast_to(x.shape)    )    deno = (var + self.eps) ** 0.5    return self.weights.broadcast_to(x.shape) * (        (x - mean) / deno    ) + self.bias.broadcast_to(x.shape)
复制代码


class Dropout(Module):def init(self, p=0.5):super().init()self.p = p


def forward(self, x: Tensor) -> Tensor:    if self.training:        mask = init.randb(*x.shape, p=1 - self.p) / (1 - self.p)        x = x * mask    return x
复制代码


class Residual(Module):def init(self, fn: Module):super().init()self.fn = fn


def forward(self, x: Tensor) -> Tensor:    return self.fn(x) + x
复制代码


Module 参数初始化时,我们需要实现一些常见的参数初始化方法,如 kaiminguniform 等。具体代码参考 initinitializers.py,这里就不展开了。还有一些更复杂的 Module,如 RNN、CNN,这里暂时没有实现了。如果你对大语言模型中常见的 Module,比如 transformer 的实现感兴趣,可以参考我的文章:从 0 搭建 LLM 不再难!这个 PyTorch 项目帮你吃透大模型底层逻辑以及对应的代码仓库:llm-from-scratchoptimizeroptimizer 用于更新模型参数以最小化损失函数。 这里实现了 SGD 和 Adam 两种 optimizer。class Optimizer:def init(self, params):self.params = params


def step(self):    raise NotImplementedError()
def reset_grad(self): for p in self.params: p.grad = None
复制代码


class SGD(Optimizer):def init(self, params, lr=0.01, momentum=0.0, weight_decay=0.0):super().init(params)self.lr = lrself.momentum = momentumself.u = defaultdict(float)self.weight_decay = weight_decay


def step(self):    for p in self.params:        if self.weight_decay > 0:            grad = p.grad.data + self.weight_decay * p.data        else:            grad = p.grad.data        self.u[p] = self.momentum * self.u[p] + (1 - self.momentum) * grad        p.data = p.data - ndl.Tensor(self.lr * self.u[p], dtype=p.dtype)
复制代码


class Adam(Optimizer):def init(self,params,lr=0.01,beta1=0.9,beta2=0.999,eps=1e-8,weight_decay=0.0,):super().init(params)self.lr = lrself.beta1 = beta1self.beta2 = beta2self.eps = epsself.weight_decay = weight_decayself.t = 0


    self.m = defaultdict(float)    self.v = defaultdict(float)
def step(self): self.t += 1 for p in self.params: if self.weight_decay > 0: grad = p.grad.data + self.weight_decay * p.data else: grad = p.grad.data self.m[p] = self.beta1 * self.m[p] + (1 - self.beta1) * grad self.v[p] = self.beta2 * self.v[p] + (1 - self.beta2) * (grad**2) unbiased_m = self.m[p] / (1 - self.beta1**self.t) unbiased_v = self.v[p] / (1 - self.beta2**self.t) p.data = p.data - ndl.Tensor( self.lr * unbiased_m / (unbiased_v**0.5 + self.eps), dtype=p.dtype, )
复制代码


DataLoaderDataLoader 用于批量加载数据集,支持 shuffle、并行加载等功能。 这里实现了一个简单的 Dataset 和对应的 DataLoader,具体代码参考 databasic.py。以及常见的数据转化方法总结通过本文的介绍,我们从零构建了一个完整的轻量级深度学习框架 Needle,它包含了深度学习框架的所有核心功能:自动微分系统:基于计算图实现了自动微分功能,支持动态构建计算图和反向传播。张量运算系统:实现了丰富的张量操作,包括元素级运算、矩阵运算、归约运算和变形运算等。多后端支持:提供了 NumPy 后端用于快速原型开发和 CPU 计算,以及自定义 NDArray 后端用于更高性能的 CPU 和 CUDA 加速。神经网络模块:实现了线性层、卷积层等基础组件,以及 ReLU、Sigmoid 等激活函数,支持构建复杂的神经网络模型。优化算法:实现了 SGD、Momentum、RMSprop 和 Adam 等常用优化算法,支持权重衰减和学习率调度等功能。数据加载系统:提供了 Dataset 接口和 DataLoader,支持批量加载数据和数据预处理。如果你对深度学习框架的实现感兴趣,可以访问项目的 GitHub 仓库获取完整代码:https://github.com/fangpin/dl-framework。如有任何疑问欢迎与我交流,你也可以参考获取更多理论知识。

用户头像

方品

关注

还未添加个人签名 2025-11-16 加入

还未添加个人简介

评论

发布
暂无评论
从0构建深度学习框架——揭秘深度学习框架的黑箱_AI_方品_InfoQ写作社区