写点什么

DeepSeek Smallpond 在火山引擎 AI 数据湖的探索实践

  • 2025-03-27
    北京
  • 本文字数:5628 字

    阅读完需:约 18 分钟

资料来源:火山引擎-开发者社区

DeepSeek Smallpond 介绍

Smallpond 是一套由 DeepSeek 推出的 、针对 AI 领域,基于 Ray 和 DuckDB 实现的轻量级数据处理引擎,具有以下优点:

1.轻量级

2.高性能

3.支持规模大

4.无需运维

5.Per Job 的资源调度

快速开始

Smallpond 提供了两套 API(具体介绍见下文),一套是 High-level 的 Dataframe API,一套是 Low-level 的 Logicalplan API。前者简单、易理解,使用上非常类似 Pandas、PySpark 等引擎;后者灵活度高,可以实现更加复杂的数据处理逻辑。

·Dataframe API

import smallpond

sp = smallpond.init()

df = sp.read_parquet("path/to/dataset/*.parquet")

df = df.repartition(10)

df = df.map("x + 1")

df.write_parquet("path/to/output")

当前 Dataframe API 功能还比较薄弱,针对一些高级场景,比如定义 Ray 运行参数、GPU 等尚无法设置。

·LogicalPlan API

from smallpond.logical.dataset import ParquetDataSet

from smallpond.logical.node import Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan

from smallpond.execution.driver import Driver

def my_pipeline(input_paths: List[str], npartitions: int):

ctx = Context()

dataset = ParquetDataSet(input_paths)

node = DataSourceNode(ctx, dataset)

node = DataSetPartitionNode(ctx, (node,), npartitions=npartitions)

node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")

return LogicalPlan(ctx, node)

if name == "__main__":

driver = Driver()

driver.add_argument("-i", "--input_paths", nargs="+")

driver.add_argument("-n", "--npartitions", type=int, default=10)

plan = my_pipeline(**driver.get_arguments())

driver.run(plan)

python script.py -i "path/to/.parquet" -n 10 Ray # Ray 引擎 python script.py -i "path/to/.parquet" -n 10 scheduler # built-in 引擎

注意,Smallpond 支持两种分布式引擎(具体介绍见下文),一种是 Ray 引擎,一种是 Built-in 引擎。使用方式见上文脚本所示。

架构介绍

下图为 Smallpond 架构:

整体架构类似于 Spark 的架构,其组件 Dataframe、Logicalplan、Physicalplan、Scheduler 等 Spark 都有对应,是一个典型的  批处理形式 SQL 内核架构 。

1.DataFrame 的接口目前只能支持 Ray Engine。

2.最底层是 存储层 。这个存储有两个作用:

·作为源数据和中间执行数据的存储,可以被 mount 到本地路径;

·如果选用 Built-in 执行引擎,这个存储还是 task 的序列化存储用于从 driver 节点向 executor 节点派发任务。除了 3FS 存储,Smallpond 还支持 fsspec 接口 ,从而对接其他存储。

3.引擎层 。这里有两个选项,一个是 Ray,一个是 Built-in (run driver 的时候通过 mode 来指定,如果选项是 Ray,走 Ray 引擎,如果选项是 Scheduler,走 Built-in 引擎)。官方说两套引擎是历史原因,未来会逐渐合并。

4.执行层 。完全类似 Spark 的实现,有 Logicalplan,如果选用 Dataframe 接口,还有优化器支持。最后的物理计划生成 task,会被调度器扔到远端的 worker 计算。task 的执行有两种选择:DuckDB 和 Arrow(官方文档未给出)。

5.API 层 。支持 High-level 与 Low-level 的 API。

主要特点

特点 1:使用 Ray 做分布式调度和执行

Smallpond 使用 Ray 作为其分布式执行引擎之一,另一种为 Built-in 引擎。根据笔者测试,Ray 引擎相较于 Built-in 引擎有明显的性能方面的优势。一个可能重要的原因是,在 Built-in 引擎中,driver 向 executor 发送序列化任务不是通过 RPC 进行,而是通过共享存储方式进行,这个过程 task 序列化需要落共享存储。另外 Ray 相比有非常高效的 task 调度能力。

特点 2:MPI 的支持与 numa 绑定

这是一个可能容易被忽视的创新点:借助 MPI 框架,用户可以自定义任务,使用 MPI 做高效的集合通信。上文已经说到,用户完全可以自定义自己的 Python 脚本,而用户可以在自己的 Python 脚本里写 MPI 程序,从而使用 MPI 做高效的集合通信。

同时,worker 也做了 NUMA 绑定,做到更加高效的内存存取。

另外,代码中设置了 openmp 的环境变量。用户可以使用单核多线程来加速程序。

特点 3:极具灵活性的 Low Level API

通过 Low Level API,用户不但可以自己定义 map、filter 等典型的 SQL 类型算子,也可以定义非 SQL 算子,例如可以定义 PythonScriptNode,用于执行 Python 脚本。

这样做的好处在于, 极大地增强了数据处理的灵活性 :某些数据处理需求可能不方便使用类似 Dataframe 的 map 算子来处理,就可以写 Python 代码自由地处理这些任务了。

从这个角度看,这个 Logicalplan 已经超出了 Spark Logicalplan 的范畴 ,兼具了一些类似于“ 工作流调度 ”的能力,可以调度进程。

特点 4:与 3FS 的结合

Smallpond 将 3FS 挂载到本地,可以利用 3FS 的性能优势,结合 DuckDB 的优秀处理能力,达到很高的处理效率。

不过,笔者认为与 3FS 的结合不能作为一个创新点来看待,因为两者是松耦合的(非深度结合),只是说运行在 3FS 上会使得 Smallpond 运行地更快,而挂载 filesystem 到本地实现分布式计算也是一个常规的行为。

火山引擎

 AI 数据湖 LAS 介绍

总体架构

随着 LLM 和多模态 AI 技术的飞速发展,非结构化数据量呈指数级增长,这极大地增加了数据管理、计算和存储的复杂性。传统的数据湖解决方案已难以适应 AI 场景下对数据的新需求。为了应对这一挑战,新一代数据湖必须解决以下多模态数据带来的关键问题:

·数据管理 :传统数据管理侧重于库表结构,而面对多模态非结构化数据,如何实现高效管理。

·数据计算 :如何从非结构化数据中挖掘潜在价值,如何提高 CPU 和 GPU 利用率,如何使用模型来处理数据。

·数据存储 :传统数据湖格式在非结构化数据存储方面存在局限,是否可实现全模态数据的统一湖格式存储。

·AI 场景支撑 :多模态数据湖如何支撑 预训练、后训练、知识库、AI 搜索、智能体、安全合规 等场景的智能化应用。

火山引擎

基于内外部客户的实践,推出了一款面向 AI 场景的多模态数据湖服务 。总体架构如下 

功能介绍

LAS 提供了如下功能:

1.数据集管理 。用户可以根据数据的使用场景创建不同类型的数据集。比如,针对大规模预训练场景的大规模数据集,可以使用 LAS 的分布式处理能力;对于后训练阶段的 SFT 场景,LAS 推出了数据洞察以及细粒度的数据编辑功能。此外,LAS 还支持数据集多版本,满足算法人员在不同数据版本之间做对比实验的需求。

2.统一 Catalog 。用户可以将自己的数据注册为 catalog table,即能够使用平台提供的针对格式化数据的计算与分析工具。

3.丰富的算子支持 。LAS 提供了针对文本、图片、视频、音频、文档等类型的 100+ 算子,用户可以一键调用,标准化自己的数据处理流程。

4.工作流支持 。通过工作流,用户可以提交各种类型的数据处理作业,比如除了内置的算子标准化作业,还支持用户提交 Python 作业、Spark 作业、Ray 作业等等。

5.多数据湖格式/数据源 。支持 lance、Iceberg、Parquet、Json、CSV、VikingDB、Opensearch 等,满足各种场景需求。例如,针对训练或者微调过程,需要有高性能的点查需求,用户可以选择 lance;针对线上业务数据回流场景,可以选择 Iceberg;针对 RAG 场景,可以选择 VikingDB 作为数据 sink。

6.存算分离架构与分布式数据缓存 。LAS 推荐使用存算分离架构,以减少存储成本,提升计算的可扩展性。同时,LAS 针对存算分离场景提供了 Proton 缓存服务,以加速对 TOS 数据的访问。

Smallpond 与 LAS 融合实践

Smallpond DataFrame + LAS Ray 计算资源组

当前,有很多客户在云上运行他们的计算解决方案的同时,也希望能够在云上用上 Smallpond。为此,LAS 提出了基于 Ray 的云上方案,如下图所示:

该云上方案具备五大优点:

·环境准备简单:无需用户需手工添加节点,打通 SSH,构建 MPI 集群。

·资源隔离:支持对 IO/网络/内存等更加严格的资源条件。

·认证鉴权:对资源的申请做用户鉴权。

·资源统一管理:用户无需手动管理计算资源,开箱即用。

·K8s 调度:完全交由平台运维解决,支持排队,抢占等。

在该方案中,LAS 中的集群能够无缝的与 Smallpond 融合,只需要在云上开通资源 ,将 ray_address 设置成已开通的资源队里,其余逻辑无需改造,就可以完成数据预处理。

sp = smallpond.init(ray_address= "ray://192.xxxx:10001")

Smallpond 基于 Proton/TOS-FS 对接云存储

Smallpond 不仅支持 3FS 协议的存储,还支持 fuse 和 fsspec 的接口。因此,我们可以针对大规模数据处理的场景,将数据存储在 TOS 对象存储上(LAS 支持 TOS 的 fsspec 协议访问),而将训练场景的数据放置到 vePFS 存储中。

扩展 Datasource-Lance 多模态数据湖

Lance 是新一代的列式存储结构,它被设计用来存储视频,图像,音频以及普通列式数据。它可以被存储在任何 POSIX 文件系统以及像 S3,TOS 等云存储上。Lance 允许数据被随机访问,在随机访问场景下它比 Parquet 性能快 100 倍。同时它具备向量检索,零拷贝的能力,并且与 Pyarrow,DuckDB 生态紧密结合。

Lance 的主要能力:

1.多版本管理 : Lance 是数据湖, 提供了 多版本的能力 , 能够快速的实现增删查改以及结构变更的需求, 也提供 time travel 的能力。

2.多维分析 : Lance 能够对接分布式计算引擎, 例如 Spark/Ray, 完成大规模数据分析需求。

3.随机检索 : Lance 构建了 主键索引和二级索引 , 能够实现快速的随机检索。

4.向量检索 : Lance 上实现了 IVF-PQ 和 IVF-HNSW 向量索引, 以及全文索引, 具有 混合搜索能力 。

5.多模数据 : Lance 自定义了底层文件格式, 能够写入大宽表和大宽列, 直接在表字段中存储多模数据, 例如文本/图像。

6.开放生态 : Lance 支持 Python/Java 客户端, 内存采用 arrow 格式, 适配了很多 AI 生态的引擎和大数据计算引擎。

LAS 中提供了完整的产品化的 Lance 湖服务能力,包括元数据管理,小文件合并服务, 而 Smallpond 也是能够无缝的接入 lance 的数据源。

以下是 Smallpond 支持 Lance 的实践样例:

import lance

import arrow

from smallpond.logical.dataset import ArrowTableDataSet

从 Lance 格式的文件中读取数据

lance_ds = lance.dataset("example.lance")

将数据转换为 Arrow 的 Table

arrow_table = lance_ds.to_table()

将 arrow_table 转换成 smallpond 的 dataset

smallpond_dataset = ArrowTableDataSet()

集成 LAS 的算子

Smallpond 支持 map/map_batches 的并行算子逻辑,其接口方式跟 Ray 类似。而火山引擎 LAS 的算子服务能力接口是可以同时兼容 Spark/Ray。因此 LAS Built-in 的算子也都能够直接跑在 Smallpond 上。

实践示例

场景描述:在 RAG 架构的离线入库场景,通过 LAS 产品提供的 分布式计算能力 Smallpond,实现从对象存储到向量数据库的全流程优化。在该链路中,读取数据后,利用 Smallpond 高效完成数据的切分(chunk)和向量化处理,并最终将向量数据批量入库至向量数据库。

同时,LAS 提供有 Chunk 和 Embedding 的算子,平台产品界面中,有对算子进行详细和介绍和 Demo 示例,便于用户快速搭建该链路。此外,也支持自定义算子。

以下按照自定义算子示例:

Step 1:创建 LAS 计算资源

在 LAS 平台中提供有 CPU 和 GPU 的计算资源队列。由于 Embedding 消耗的算力较大,建议采用 GPU 计算资源。

Step 2:实现该流程的代码

定义三种处理器:Chunk、Embedding、写向量数据库。由于 SmallPond 未提供写入向量数据库的接口,可以利用 map_batches 方法实现。

import copy

import logging

import pyarrow as pa

import smallpond

from FlagEmbedding import FlagModel

from llama_index.core import Document

from llama_index.core.node_parser import SentenceSplitter

from volcengine.viking_db import Data, VikingDBService

class ChunkProcessor:

"""将文本切分成 chunk 片段,用于文本检索等场景。"""

class EmbeddingProcessor:"""使用 BGE 系列模型计算文本的 embedding。"""

class VikingdbSinkProcessor:""" 将数据写入到火山的向量数据库 VikingDB 中 """

if name == "__main__":

初始化

sp = smallpond.init()

数据处理流水线 

sp.read\_csv(paths, schema)\ .flat\_map(ChunkProcessor(input\_col\_name = "index"))\ .map\_batches(EmbeddingProcessor(input\_col\_name = "chunk"))\ .map\_batches(VikingdbSinkProcessor(collection\_name = vikingdb\_dataset))\ .take\_all() 

Step 3:在 LAS 平台提交该示例代码

LAS 平台提供直接运行 Python 脚本的能力。

1.在 LAS 平台中,算子管理菜单中 上传上述代码,以及代码依赖的镜像。也可以使用 LAS 平台提供的镜像。LAS 平台提供的镜像有 LAS 算子执行的镜像,也提供含 PyTorch 基础镜像等等。

2.LAS 平台中,工作流中通过拖拽式方式,将该算子拖到画布中点击执行按钮,便可启动任务。界面上可以查看到执行日志和进度。

小结与规划

Smallpond 是 DeepSeek 开源的一个优秀的轻量级、高性能 AI 场景数据处理框架,一经推出便引起了业界的关注,项目 Star 数快速增长。其优点以及创新点上文已经有了详细的介绍,但由于项目处于开源初期,仍有很多问题有待解决,比如:

1.支持已有 Ray Cluster 的接入方式;

2.数据源需要支持 S3 协议、TOS 等其他对象存储的协议;

3.适配更多的数据格式,尤其是面向多模的数据格式,例如 Lance,LMDB,Webdataset,Pickle 等。

相信随着时间的推移,上述这些问题都能得到很好的解决。

根据项目介绍,Smallpond 的目标是解决 AI 场景灵活的数据处理需求,这与火山引擎 LAS 多模数据湖的目标是相同的。LAS 数据湖配套有自己的数据处理框架,以及大量的用于多模数据处理的算子,用户可以开箱使用。而如果用户选择使用类似 Smallpond 的数据处理框架,通过 LAS 也能很好的支持,同时也能很好的发挥云的优势。

在未来,火山引擎 LAS 会考虑 结合 Smallpond 优秀的架构能力与云低成本、易运维、以及生态协同的优势 ,为用户提供更加强大的 AI 数据处理功能。


用户头像

还未添加个人签名 2022-01-25 加入

还未添加个人简介

评论

发布
暂无评论
DeepSeek Smallpond 在火山引擎 AI 数据湖的探索实践_火山引擎开发者社区_InfoQ写作社区