写点什么

如何在生成式 AI 里使用 Ray Data 进行大规模 RAG 应用的 Embedding Inference

作者:Zilliz
  • 2024-08-06
    上海
  • 本文字数:4614 字

    阅读完需:约 15 分钟

如何在生成式AI里使用 Ray Data 进行大规模 RAG 应用的 Embedding Inference

检索增强生成 (RAG) 是企业级生成式 AI(GenAI)应用的热门案例之一。多数 RAG 教程演示了如何利用 OpenAI API 结合 Embedding 模型和大语言模型(LLM)来进行推理(Inference)。然而,在开发过程中,如果能使用开源工具,就可以免去访问自己数据的费用,同时也能加快迭代。

在 Embedding 步骤(即将数据转换为向量的过程)中,使用 Ray Data 取得的性能提升尤为显著。相比于使用 Pandas,采用 Ray Data 等工具对批量推理请求进行汇总处理可以显著节省资源和处理时间。例如,在一台配备 16GB RAM 的 Mac M2 笔记本电脑上,仅使用四个 worker node,Ray Data 的处理速度就比 Pandas 快了 60 倍!本文将详细介绍使用 Milvus + Ray Data 进行 Embedding inference 的性能。

我们的开源 RAG 技术栈包括:

  • BGM-M3 Embedding 模型:该模型能一次性生成三种类型的向量,包括稀疏向量、稠密向量和多向量。

  • Ray Data:高效的分布式 Embedding inference 推理工具。

  • AWS S3:用于临时存储推理结果。

  • Milvus 或 Zilliz Cloud:用作向量数据库。

示例数据来源:Kaggle 的 IMDB 海报数据集

开源 RAG 技术栈

BGM-M3 Embedding 模型

BGM-M3 Embedding 模型是一种强大的多功能 Embedding 工具,其特点在于能够处理多语言(Multi-Linguality)、多功能(Multi-Functionality)和多粒度(Multi-Granularity)的数据,因此得名“M3”。该模型支持超过 100 种语言,并能计算三种常见的 Embedding 类型:稀疏向量、稠密向量和多向量。它还可以处理各种长度的文本——从短句到长文档,最多可支持 8192 个 token。更多详情,请参考论文或通过 HugginFace 网站了解此模型

Milvus 2.4 版本现已集成 BGM-M3 Embedding 模型

Ray Data

有长期运行的数据转换任务?

Ray Data

具备可扩展的数据处理能力,能够便捷和快速地在多台机器(CPU、GPU 等)上并行处理大量数据。Ray Data 尤其适用于数据可以被分割,能够并行处理的场景,例如同时进行的多个数据块切分和 Embedding 转换的任务。Ray Data 底层是一个强大的流式执行引擎,能够最大化集群中的 GPU 利用率。与使用在线服务(如 OpenAI 嵌入 API)运行嵌入相比,使用 Ray Data 进行离线嵌入作业可以节省大部分成本。

Anyscale

Ray 的托管平台。您可以轻松地在 Anyscale 上利用 GPU 机器扩展 Embedding 任务。

Milvus 和 Zilliz Cloud

RAG(Retrieval Augmented Generation)应用之所以能够迅速响应,关键在于其背后强大的向量数据库。Milvus 专为处理大规模业务和海量数据设计。与其他向量数据库不同,Milvus 能够根据数据量的增长灵活地进行扩展。Milvus 的存储、索引和查询是独立的,可以单独垂直扩展或水平扩展,这一设计让 RAG 应用能够迅速响应用户的实时查询需求,同时在收到查询前和收到查询时,Milvus 能够智能地进行离线计算。除了强大的性能外,Milvus 还提供了多种企业级重要特性,如多租户和基于角色的访问控制(RBAC)、高可用等。

Zilliz Cloud

基于开源的 Milvus 搭建的全托管向量数据库云服务。

设置 RAG 工具

本教程中将使用 Milvus 的 Python SDK、Ray Data、Amazon S3 和 Zilliz Cloud。

开始前,请先注册 AWS 账号以使用 Amazon S3。然后前往 console.aws.amazon.com > IAM > My security credentials > Create access key。复制并保存密钥(Access key 和 secret key)。

安装所需的工具库并运行 aws config。请在文件中输入 AWS 密码。

pip install boto3 pip install awscli –force-reinstall –upgradeaws config #fill in your key and secret keymore ~/.aws/credentials #make sure this looks correct
复制代码

安装 Ray Data。

pip install -U "ray[data]"
复制代码

安装 Pymilvus。

pip install -U pymilvus "pymilvus[model]" langchain
复制代码

PyMilvus 2.4 版本及以上已打包 BGE-M3 embedding 模型。

import ray, os, pprint, time, boto3from langchain.text_splitter import RecursiveCharacterTextSplitterimport numpy as npimport pymilvusprint(pymilvus.__version__) # must be >= 2.4.0from pymilvus.model.hybrid import BGEM3EmbeddingFunction
复制代码

如需使用 Zilliz,请先注册账号并创建集群。

准备数据

本文使用了Kaggle IMDB 海报数据作为数据集,其中包含大约 48,000 部电影、影评、海报链接以及一些元数据。

我们将所有文本字段(电影名称、描述、影评文本)复制到名为‘text’的一列中,并将其保存为 Parquet 格式,因为这种文件格式比 CSV 更高效。

生成 Embedding 向量

根据以下步骤生成 Embedding 向量:

  1. 切分数据: 将输入文本切分为片段,将语义上相关的文本片段保存在一起。

  2. 调用推理(inference)模式下的 Embedding 模型,用于生成文本片段的 Embedding 向量。

Ray Data 可以并行处理以下数据操作请求:

  1. flat_map():切分数据(输出的行数将多于输入行数)。

  2. map_batches():调用 Embedding 模型。

chunk_size = 512chunk_overlap = np.round(chunk_size * 0.10, 0)
# Define a LangChain text splitter.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len) #len is a built-in Python function
# 1. Define a regular python function for chunking.def chunk_row(row, splitter=text_splitter):
# Copy the row columns into metadata. metadata = row.copy() del metadata['text'] # Remove text from metadata
# Split the text into chunks. chunks = splitter.create_documents( texts=[row["text"]], metadatas=[metadata]) chunk_list = [{ "text": chunk.page_content, **chunk.metadata} for chunk in chunks]
return chunk_list
# 2. Define a class with a callable method to compute embeddings.class ComputeEmbeddings: def __init__(self): # Initialize a Milvus built-in sparse-dense-late-interaction-reranking encoder. # https://huggingface.co/BAAI/bge-m3 self.model = BGEM3EmbeddingFunction(use_fp16=False, device="cpu") print(f"dense_dim: {self.model.dim['dense']}") print(f"sparse_dim: {self.model.dim['sparse']}")
def __call__(self, batch):
# Ray data batch is a dictionary where values are array values. # BGEM3EmbeddingFunction input is docs as a list of strings. docs = list(batch['text'])
# Encode the documents. bge-m3 dense embeddings are already normalized. embeddings = self.model(docs) batch['vector_dense'] = embeddings['dense'] return batch
if __name__ == "__main__":
FILE_PATH = "s3://zilliz/kaggle_imdb.parquet"
# Load and transform data.
ds = ray.data.read_parquet(FILE_PATH)
# Chunk the input text chunked_ds = ds.flat_map(chunk_row)
# Compute embeddings with a class that calls the embeddings model. embeddings_ds = chunked_ds.map_batches(ComputeEmbeddings, concurrency=4)
# Save the embeddings to S3 in a folder of parquet part files. embeddings_ds.write_parquet('s3://zilliz/kaggle_imdb_embeddings')
复制代码

运行前,需要提交 Ray 任务:

  1. 将代码保存至 Python 脚本文件。本例中将文件命名为 ray_data_demo.py

  2. 在本地运行前,请先创建一个全新的路径,路径中只可包含 .py 脚本文件和 .parquet 数据文件。本例中将这个路径命名为ray_cluster

  3. 运行 Python 脚本以启动 Ray 集群并自动提交任务。

  4. 打开 http://127.0.0.1:8265,查看集群和任务。

Embedding 速度提升 60 倍

表格展示了在 16GB M2 笔记本电脑上 Embedding 任务的用时。示例使用了 1 个单节点 Ray 集群用于批量处理任务,并发为 4。Pandas 用时更长,原因是 Pandas 只有一个处理器。但是 Ray Data 有 4 个处理器。 Pandas 和 Ray 在处理大型数据时性能都更出色。

将数据从 S3 批量插入到 Milvus 或 Zilliz Cloud 中

Milvus 和 Zilliz Cloud 都支持批量插入(bulk insert)数据,可以直接从 AWS、GCP 或 Azure 对象存储中导入 Embedding 数据。除了界面外,Zilliz Cloud 还提供 RESTful API 和 SDK

对于大量 Embedding 数据,使用 bulk insert 可以显著节省机器资源并缩短插入时间,与普通 insert 相比更为高效。此外,通过 bulk insert 构建的向量搜索索引比普通 insert 的索引更高效。

在 Zilliz Cloud 界面上仅需轻击几次鼠标便可轻松批量插入数据。首先,在集群中创建 1 个全新的 collection。创建时,需要开启 AutoID 和动态列,添加向量列,并设置向量列的向量维度。设置完毕后,点击“创建 Collection”。

接下来,点击“导入数据”并按照界面上的提示输入 parquet 文件的路径。(请注意,如果您的 S3 存储桶为非公开链接,您还需要输入 Access Key 和 Secret Key,以便 Zilliz Cloud 读取其中的数据)。Zilliz Cloud 支持 Amazon S3、Google Cloud Storage 或 Azure Blob Storage 等对象存储服务。点击“导入”开始将所有数据导入向量数据库 collection。

导入任务完成后,您可以选择在 collection 上构建索引,加速后续的向量搜索。

查询数据

为测试新导入至 Collection 中的数据,让我们基于电影数据进行提问。

def mc_run_search(question, output_fields, top_k=2, filter_expression=""):     # Embed the question using the same encoder.   embeddings = model_bgem3([question])   query_embeddings = embeddings['dense']
# Run semantic vector search using your query and the vector database. results = mc.search( COLLECTION_NAME, data=query_embeddings, search_params=SEARCH_PARAMS, output_fields=output_fields, # Milvus can utilize metadata in boolean expressions to filter search. filter=filter_expression, limit=top_k, consistency_level="Eventually" )
# Assemble retrieved context and context metadata. # The search result is in the variable `results[0]`, which is type # 'pymilvus.orm.search.SearchResult'. METADATA_FIELDS = [f for f in output_fields if f != 'chunk'] formatted_results, context, context_metadata = _utils.client_assemble_retrieved_context( results, metadata_fields=METADATA_FIELDS, num_shot_answers=top_k) return formatted_results, context, context_metadata
SAMPLE_QUESTION = "muybridge horse movie"
# Return top k unique results with HNSW index.TOP_K = 2
# Define output fields to return.OUTPUT_FIELDS = ["movie_id", "chunk", "PosterLink"]
formatted_results, context, context_metadata = \ mc_run_search(SAMPLE_QUESTION, OUTPUT_FIELDS, TOP_K)
复制代码

以下为查询结果:

完整的 Ray Data 脚本可在此获取。

总结

文本介绍了如何使用 Ray Data 和 Milvus/Zilliz Cloud 的批量插入功能大幅加速向量生成和加载过程。使用 Milvus 能够高效构建索引、节省计算资源、提升向量搜索速度。

发布于: 21 小时前阅读数: 2
用户头像

Zilliz

关注

Data Infrastructure for AI Made Easy 2021-10-09 加入

还未添加个人简介

评论

发布
暂无评论
如何在生成式AI里使用 Ray Data 进行大规模 RAG 应用的 Embedding Inference_人工智能_Zilliz_InfoQ写作社区