鲁迅说,“世上本没有路,走的人多了,也便成了路。”。
鲁迅说,“我家墙外有两株树,一株是枣树,还有一株也是枣树。”
鲁迅还说,“猛兽总是独行,牛羊才成群结对。”
网络上流传着鲁迅说过的各种名言,我们不禁怀疑,鲁迅到底说没说?原文是什么样的,出处又是哪里?想回答这个问题,最好的办法就是搜索原文。但是,使用传统搜索方式,错了一个字可能就搜索不到,不如试试语义搜索吧。
我们可以把鲁迅作品集向量化,储存到向量数据库中。然后搜索某条据说是鲁迅说过的话,最后通过大模型组织语言输出回答,告诉我们鲁迅有没有说过这句话。如果有,再让它附上原文和出处。这个过程,就是 RAG(Retrieval-Augmented Generation,检索增强生成)。
而对于较长的本文,直接向量化会导致信息缺失,需要把文本分割成多个块,分别向量化。打个比方,如果一篇文章是一张图片,组成文章的块就是图片的像素点。文章分割成的块越多,意味着图片的像素点越多,分辨率越高,图片也就越清晰。我会介绍三种常见的分块方法,并且比较基于它们的向量搜索和 RAG 响应有什么区别。
图片来源:Photo by Master Unknown on Unsplash
字数太多怎么向量化
在 [[01-怎么把大白话“变成”古诗词]] 这篇文章中,我详细介绍了使用 Milvus 创建向量数据库的整个过程,相关内容我就不再赘述了,直接给出代码。
Milvus 版本:>=2.4.0
安装依赖。
!pip install "pymilvus[model]" torch
复制代码
定义函数 vectorize_query 把文本向量化的函数。
import torchimport jsonfrom pymilvus import DataType, MilvusClientfrom pymilvus.model.hybrid import BGEM3EmbeddingFunction
# 将输入的文本向量化def vectorize_query(query , model_name = "BAAI/bge-small-zh-v1.5"): # 检查是否有可用的CUDA设备 device = "cuda:0" if torch.cuda.is_available() else "cpu" # 根据设备选择是否使用fp16 use_fp16 = device.startswith("cuda") # 创建嵌入模型实例 bge_m3_ef = BGEM3EmbeddingFunction( model_name=model_name, device=device, use_fp16=use_fp16 ) # 把输入的文本向量化 vectors = bge_m3_ef.encode_documents(query) return vectors
复制代码
下一步就是把鲁迅作品集向量化了。但是且慢,让我们先看一下鲁迅作品集[^1]的文本格式:
[ { "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "content": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人... }, { "book": "伪自由书", "title": "王道诗话", "author": "鲁迅", "type": "", "source": "", "date": "", "content": "《人权论》是从鹦鹉开头的,据说古时候有一只高飞远走的鹦哥儿... }, ...]
复制代码
文本中的“content”字段的值,就是一篇文章。有的文章字数多达几万字,用几百维的向量根本无法表达文章的语义细节。怎么办?就像前面说的,既然全文字数太多,我们就把文章切成几块,对每个块再做向量化。这个操作叫做“分块”。
根据固定字数分块
最简单的分块方法是 fixed_chunk(固定分块),是按照字数分块,比如每隔 150 个字就分割一次。比如,对于《最艺术的国家》这篇文章使用 fixed_chunk,再通过 ChunkViz 把分块结果可视化,如下图所示:
我们用代码来实现 fixed_chunk。
import json
# fixed_chunkdef fixed_chunk( input_file_path, output_file_path, chunk_size, field_name ): with open(input_file_path, 'r', encoding='utf-8') as file: data_list = json.load(file) chunk_data_list = [] for data in data_list: # 获取指定字段的值 text = data[field_name] # 对指定字段分割 chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)] for idx, chunk in enumerate(chunks): chunk_data_list.append({ # 使用原始文章的 id 生成chunk的id "id": f'{data["book"]}#{data["title"]}#chunk{idx}', "book" : data["book"], "title" : data["title"], "author" : data["author"], "type" : data["type"], "source" : data["source"], "date" : data["date"], "chunk" : chunk, # window 字段在这里只是占位,没有实际作用,后面会详细介绍它的用处 "window": "", "method": "fixed_chunk" }) with open(output_file_path, 'w', encoding='utf-8') as json_file: json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)
input_file_path = "luxun_sample.json"output_file_path = "luxun_sample_fixed_chunk.json"chunk_size = 150field_name = "content"
fixed_chunk(input_file_path, output_file_path, chunk_size, field_name)
复制代码
运行代码,得到 luxun_sample_fixed_chunk.json 文件,格式和上文中的可视化结果一致。
[ { "id": "伪自由书 #最艺术的国家 #chunk0 ", "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "chunk": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...", "window": "", "method": "fixed_chunk" }, { "id": "伪自由书 #最艺术的国家 #chunk1 ", "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "chunk": "民国。然而这民国年久失修...", "window": "", "method": "fixed_chunk" }, ...]
复制代码
你可能已经发现了,fixed_chunk 经常在句子中间分割,导致句子不连贯,语义的完整性被破坏。
根据标点符号分割
怎么解决这个问题呢?我们可以在标点符号处分割。但是这还不够,因为这样分割的话,块与块之间仍然是相互独立的了,缺少关联。打个比方,如果看《生活大爆炸》这样的单元剧,我们跳着看也没关系,不影响理解剧情。但是如果看《天龙八部》这样的连续剧,上一集讲的还是段誉为救钟灵去万劫谷拿解药,下一集他就瞬移到了少室山,用六脉神剑大战慕容复。我们会一头雾水,这中间到底发生了什么?
所以,连续剧的开头有“前情提要”,结尾有“下集预告”。同样,为了保证块与块之间语义的连贯,我们也要设计一个“重叠”部分,让下一个块的开头部分,重复上一个块的结尾部分。
听起来很复杂?不用担心,我们可以使用 LlamaIndex[^2] 库轻松实现这种分块方法—— semantic_chunk 。
安装 LlamaIndex 库。
pip install llama_index==0.11.16
复制代码
定义 semantic_chunk 分块函数。
# 导入SentenceSplitter用来分块from llama_index.core.node_parser import SentenceSplitter
# 定义semantic_chunk分块函数def semantic_chunk( input_file_path, output_file_path, # 块的大小 chunk_size, # 重叠部分的大小。小于 chunk_overlap 的句子,就会作为重叠部分 chunk_overlap, # 指定分块的字段 field_name, ) : # 初始化 SentenceSplitter,设置分块的参数 text_splitter = SentenceSplitter( # 指定段落分隔符 paragraph_separator="\n\n\n", # 指定主要分隔符 separator="。", # 指定次要分隔符 secondary_chunking_regex="[^,.;、。:]+[,.;、。:]?", # 指定块的大小 chunk_size=chunk_size, # 指定重叠部分的大小 chunk_overlap=chunk_overlap, ) with open(input_file_path, 'r', encoding='utf-8') as file: data_list = json.load(file) chunk_data_list = [] for data in data_list: text = data[field_name] chunks = text_splitter.split_text(text) for idx, chunk in enumerate(chunks): chunk_data_list.append({ # 使用原始文章的 id 生成chunk的id "id": f'{data["book"]}#{data["title"]}#chunk{idx}', "book" : data["book"], "title" : data["title"], "author" : data["author"], "type" : data["type"], "source" : data["source"], "date" : data["date"], "chunk" : chunk, # window 字段在这里只是占位,没有实际作用,后面会详细介绍它的用处 "window": "", "method": "semantic_chunk" }) with open(output_file_path, 'w', encoding='utf-8') as json_file: json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)
# 示例:input_file_path = "luxun_sample.json"output_file_path = "luxun_sample_semantic_chunk.json"chunk_size = 150chunk_overlap = 20field_name = "content"
semantic_chunk( input_file_path, output_file_path, chunk_size, chunk_overlap, field_name)
复制代码
执行上面的代码,得到 luxun_sample_semantic_chunk.json 文件,我们来看一下分块的结果:
[ { "id": "伪自由书#最艺术的国家#chunk0", "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "chunk": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...中国的固有文化是科举制度,外加捐班之类。", "window": "", "method": "semantic_chunk" }, { "id": "伪自由书#最艺术的国家#chunk1", "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "chunk": "外加捐班之类。当初说这太不像民权...这对于民族是不忠,对于祖宗是不孝,", "window": "", "method": "semantic_chunk" }, ...]
复制代码
果然是在我们设置的标点符号处分块的,而且附带重叠部分,这样就能保证块与块之间语义的连贯了。
根据句子分块
对于上面的分块结果,你可能还不满意。虽然它根据标点符号分割,但是并不一定在句号处分割,无法保证句子的完整性。比如,对于这句话 我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。 可能分割成 我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。这艺术的可贵 和 是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮” 两个块。
为了解决这个问题,又诞生了一种分块方法,它根据句子而不是字数分割,也就是说,根据“。”、“!”和“?”这三个表示句子结束的标点符号分割,而不会受到字数的限制。但是,这种分割方式怎么实现重叠的功能呢?这也简单,把整个句子作为重叠部分就行了,叫做“窗口句子”。这种分块方法叫做 sentence_window。
比如,对于句子 ABCD,设置窗口大小为 1,表示原始句子的左右各 1 个句子为“窗口句子”。分块如下:
第一个句子:A。窗口句子:B。因为第一个句子的左边没有句子。
第二个句子:B。窗口句子:A 和 C。
第三个句子:C。窗口句子:B 和 D。
第四个句子:D。窗口句子:C。因为最后一个句子的右边没有句子。
前面两种分块方法,都是对 chunk 字段向量化。而这种分块方法,除了对 chunk 字段(也就是原始句子)向量化外,还会把窗口句子作为原始句子的上下文,以元数据的形式储存在文件中。
原始句子用来做向量搜索,而在生成回答时,窗口句子和原始句子会一起传递给大模型。这样做的好处是,只向量化原始句子,节省了储存空间。提供窗口句子作为原始句子的上下文,可以帮助大模型理解原始句子的语境。
理解原理了,我们用代码来实现吧。
导入依赖。
import reimport jsonfrom typing import Listfrom llama_index.core import Documentfrom llama_index.core.node_parser import SentenceWindowNodeParser
复制代码
定义函数 split_text_into_sentences,用来分割中英文句子。
def split_text_into_sentences(text): # 使用正则表达式识别中英文句子结束符 sentence_endings = re.compile(r'(?<=[。!?.!?])') sentences = sentence_endings.split(text) return [s.strip() for s in sentences if s.strip()]
复制代码
定义函数 sentence_window,基于句子对文本分块。
# 定义sentence_window分块函数def sentence_window( input_file_path, output_file_path, field_name, window_size ): # 设置一个用于文本解析的节点解析器 node_parser = SentenceWindowNodeParser.from_defaults( window_size=window_size, # 为窗口元数据指定一个键名为"window",用于在解析过程中存储窗口数据 window_metadata_key="window", # 为原始文本元数据指定一个键名为"original_text",用于在解析过程中存储原始文本 original_text_metadata_key="original_text", sentence_splitter = split_text_into_sentences ) with open(input_file_path, 'r', encoding='utf-8') as file: data_list = json.load(file) chunk_data_list = [] for data in data_list: text = data[field_name] # 将分割后的句子处理成节点。节点包含多个句子,类似于块 document = Document(text=text) nodes = node_parser.get_nodes_from_documents([document]) for idx, node in enumerate(nodes): chunk = node.metadata["original_text"] window = node.metadata["window"] chunk_data_list.append({ "id": f'{data["book"]}#{data["title"]}#chunk{idx}', "book": data["book"], "title": data["title"], "author": data["author"], "type": data["type"], "source": data["source"], "date": data["date"], "chunk": chunk, "window": window, "method": "sentence_window" }) with open(output_file_path, 'w', encoding='utf-8') as json_file: json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)
# 执行句子分块input_file_path = "luxun_sample.json"output_file_path = "luxun_sample_sentence_window.json"field_name = "content"window_size = 1
sentence_window( input_file_path, output_file_path, field_name, window_size)
复制代码
让我们来看下分块的结果,字段“chunk”是原始句子,“window”里面包含了原始句子和窗口句子。
[ { "id": "伪自由书#最艺术的国家#chunk0", "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "chunk": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。", "window": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。 这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。", "method": "sentence_window" }, { "id": "伪自由书#最艺术的国家#chunk1", "book": "伪自由书", "title": "最艺术的国家", "author": "鲁迅", "type": "", "source": "", "date": "", "chunk": "这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。", "window": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。 这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。 表面上是中性,骨子里当然还是男的。", "method": "sentence_window" }, ...]
复制代码
创建向量数据库
文本分块完成,接下来就是文本向量化,导入向量数据库了,这部分你应该比较熟悉了,我直接给出代码。
定义函数 vectorize_file,向量化 json 文件中指定的字段。
def vectorize_file(input_file_path, output_file_path, field_name): # 读取 json 文件,把chunk字段的值向量化 with open(input_file_path, 'r', encoding='utf-8') as file: data_list = json.load(file) # 提取该json文件中的所有chunk字段的值 query = [data[field_name] for data in data_list] # 向量化文本数据 vectors = vectorize_query(query) # 将向量添加到原始文本中 for data, vector in zip(data_list, vectors['dense']): # 将 NumPy 数组转换为 Python 的普通列表 data['vector'] = vector.tolist() # 将更新后的文本内容写入新的json文件 with open(output_file_path, 'w', encoding='utf-8') as outfile: json.dump(data_list, outfile, ensure_ascii=False, indent=4)
复制代码
为了比较 RAG 使用不同分块方法的效果,我们把三个分块文件全部向量化。
# 向量化固定分块的文件vectorize_file("luxun_sample_fixed_chunk.json", "luxun_sample_fixed_chunk_vector.json", "chunk")
# 向量化通过标点符号分块的文件vectorize_file("luxun_sample_semantic_chunk.json", "luxun_sample_semantic_chunk_vector.json", "chunk")
# 向量化通过句子分块的文件vectorize_file("luxun_sample_sentence_window.json","luxun_sample_sentence_window_vector.json", "chunk")
复制代码
接下来创建集合。为了能够在同一个集合中区分三种分块方法的搜索结果,我们设置参数 partition_key_field 的值为 method,它表示采用的分块方法。Milvus 会根据 method 字段的值,把数据插入到对应的分区中。打个比方,如果把集合看作一个 excel 文件,partition (分区)就是表格的工作表(Worksheet)。一个 excel 文件包含多张工作表,不同的数据填写在对应的工作表中。相应的,我们把不同的数据插入到对应分区中,搜索时指定分区,就可以提高搜索效率。
# 创建集合from pymilvus import MilvusClient, DataTypeimport time
def create_collection(collection_name): # 检查同名集合是否存在,如果存在则删除 if milvus_client.has_collection(collection_name): print(f"集合 {collection_name} 已经存在") try: # 删除同名集合 milvus_client.drop_collection(collection_name) print(f"删除集合:{collection_name}") except Exception as e: print(f"删除集合时出现错误: {e}") # 创建集合模式 schema = MilvusClient.create_schema( auto_id=False, enable_dynamic_field=True, # 设置partition key partition_key_field = "method", # 设置分区数量,默认为16 num_partitions=16, description="" ) # 添加字段到schema schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=256) schema.add_field(field_name="book", datatype=DataType.VARCHAR, max_length=128) schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=128) schema.add_field(field_name="author", datatype=DataType.VARCHAR, max_length=64) schema.add_field(field_name="type", datatype=DataType.VARCHAR, max_length=64) schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=64) schema.add_field(field_name="date", datatype=DataType.VARCHAR, max_length=32) schema.add_field(field_name="chunk", datatype=DataType.VARCHAR, max_length=2048) schema.add_field(field_name="window", datatype=DataType.VARCHAR, default_value="", max_length=6144) schema.add_field(field_name="method", datatype=DataType.VARCHAR, max_length=32) schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=512) # 创建集合 try: milvus_client.create_collection( collection_name=collection_name, schema=schema, shards_num=2 ) print(f"创建集合:{collection_name}") except Exception as e: print(f"创建集合的过程中出现了错误: {e}") # 等待集合创建成功 while not milvus_client.has_collection(collection_name): # 获取集合的详细信息 time.sleep(1) if milvus_client.has_collection(collection_name): print(f"集合 {collection_name} 创建成功") collection_info = milvus_client.describe_collection(collection_name) print(f"集合信息: {collection_info}")
collection_name = "LuXunWorks"uri="http://localhost:19530"milvus_client = MilvusClient(uri=uri)create_collection(collection_name)
复制代码
把数据插入到向量数据库。
from tqdm import tqdm# 数据入库def insert_vectors(file_path, collection_name, batch_size): # 读取和处理文件 with open(file_path, 'r') as file: data = json.load(file) # 将数据插入集合 print(f"正在将数据插入集合:{collection_name}") total_count = len(data) # pbar 是 tqdm 库中的一个进度条对象,用于显示插入数据的进度 with tqdm(total=total_count, desc="插入数据") as pbar: # 每次插入 batch_size 条数据 for i in range(0, total_count, batch_size): batch_data = data[i:i + batch_size] res = milvus_client.insert( collection_name=collection_name, data=batch_data ) pbar.update(len(batch_data)) # 验证数据是否成功插入集合 print(f"插入的实体数量: {total_count}")
# 设置每次插入的数据量batch_size = 100insert_vectors("luxun_sample_fixed_chunk_vector.json", collection_name, batch_size)insert_vectors("luxun_sample_semantic_chunk_vector.json", collection_name, batch_size)insert_vectors("luxun_sample_sentence_window_vector.json", collection_name, batch_size)
复制代码
创建索引。我们使用倒排索引,首先创建索引参数。
index_params = milvus_client.prepare_index_params()
index_params.add_index( # 指定索引名称 index_name="IVF_FLAT", # 指定创建索引的字段 field_name="vector", # 设置索引类型 index_type="IVF_FLAT", # 设置度量方式 metric_type="IP", # 设置索引聚类中心的数量 params={"nlist": 128})
复制代码
接下来创建索引。
milvus_client.create_index( # 指定为创建索引的集合 collection_name=collection_name, # 使用前面创建的索引参数创建索引 index_params=index_params)
复制代码
验证下索引是否成功创建。查看集合的所有索引。
res = milvus_client.list_indexes( collection_name=collection_name)print(res)
复制代码
返回我们创建的索引 ['IVF_FLAT']。再查看下索引的详细信息。
res = milvus_client.describe_index( collection_name=collection_name, index_name="IVF_FLAT")print(res)
复制代码
返回下面的索引信息,表示索引创建成功:
{'nlist': '128', 'index_type': 'IVF_FLAT', 'metric_type': 'IP', 'field_name': 'vector', 'index_name': 'IVF_FLAT', 'total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state': 'Finished'}
复制代码
接下来加载集合到内存。
print (f"正在加载集合:{collection_name}")milvus_client.load_collection (collection_name=collection_name)
复制代码
验证下加载状态。
print (milvus_client.get_load_state (collection_name=collection_name))
复制代码
如果返回 {'state': <LoadState: Loaded>},说明加载完成。接下来,我们定义搜索函数。
先定义搜索参数。
search_params = { # 度量类型 "metric_type": "IP", # 搜索过程中要查询的聚类单元数量。增加nprobe值可以提高搜索精度,但会降低搜索速度 "params": {"nprobe": 16}}
复制代码
再定义搜索函数。还记得前面我们在创建集合时,设置的 partition_key_field 吗?它会根据 method 字段的值,把数据插入到相应的分区中。而搜索函数中的 filter 参数,就是用来指定在哪个分区中搜索的。
def vector_search( query, search_params, limit, output_fields, partition_name):# 将查询转换为向量query_vectors = [vectorize_query(query)['dense'][0].tolist()]# 向量搜索res = milvus_client.search( collection_name=collection_name, # 指定查询向量 data=query_vectors, # 指定搜索的字段 anns_field="vector", # 设置搜索参数 search_params=search_params, # 设置搜索结果的数量 limit=limit, # 设置输出字段 output_fields=output_fields, # 在指定分区中搜索 filter=f"method =='{partition_name}'")return res
复制代码
再定义一个打印搜索结果的函数,方便查看。
# 打印向量搜索结果def print_vector_results(res): # hit是搜索结果中的每一个匹配的实体 res = [hit["entity"] for hit in res[0]] for item in res: print(f"title: {item['title']}") print(f"chunk: {item['chunk']}") print(f"method: {item['method']}") print("-"*50)
复制代码
下面我们就来看一看,fixed_chunk、semantic_chunk 和 sentence_window 三位选手在向量搜索上表现如何。首先搜索第一个句子:“世上本没有路,走的人多了,也便成了路。”。
# 比较不同分块方法产生的搜索结果query1 = ["世上本没有路,走的人多了,也便成了路。"]limit = 1output_fields = ["title", "chunk", "window", "method"]
# 定义分块方法列表chunk_methods = ["fixed_chunk", "semantic_chunk", "sentence_window"]
# 定义一个函数来执行搜索并打印结果def compare_chunk_methods(query, search_params, limit, output_fields, methods): for method in methods: res = vector_search(query, search_params, limit, output_fields, method) print(f"{method} 的搜索结果是:\n") print_vector_results(res) print("*" * 50)
# 调用函数进行比较compare_chunk_methods(query1, search_params, limit, output_fields, chunk_methods)
复制代码
搜索结果如下:
fixed_chunk 的搜索结果是:
title: 故乡chunk: 的人多了,也便成了路。 一九二一年一月。 method: fixed_chunk--------------------------------------------------semantic_chunk 的搜索结果是:
title: 六十六生命的路chunk: 跨过了灭亡的人们向前进。什么是路?就是从没路的地方践踏出来的,从只有荆棘的地方开辟出来的。以前早有路了,以后也该永远有路。人类总不会寂寞,因为生命是进步的,是乐天的。昨天,我对我的朋友L说,“一个人死了,在死者自身和他的眷属是悲惨的事,method: semantic_chunk--------------------------------------------------sentence_window 的搜索结果是:
title: 故乡chunk: 这正如地上的路;其实地上本没有路,走的人多了,也便成了路。method: sentence_window--------------------------------------------------
复制代码
fixed_chunk 选手的确搜索到了原文,但是并不完整。这也是 fixed_chunk 分块的典型问题。
semantic_chunk 选手的表现让人失望,它并没有搜索到原文。但是它给了我们意外收获,搜索结果的意思和原文有些类似。也是向量数据库语义搜索功能的体现。
原文其实在这个块中:
我的愿望茫远罢了。我在朦胧中,眼前展开一片海边碧绿的沙地来,上面深蓝的天空中挂着一轮金黄的圆月。我想:希望本是无所谓有,无所谓无的。这正如地上的路;其实地上本没有路,走的人多了,也便成了路。 一九二一年一月。"
semantic_chunk 选手没有搜索到它,可能是因为这个块的前半部分和查询句子的语义相差较远。这也反应了分块对搜索结果的影响。
最后出场的 sentence_window 选手,给出了标准答案:
这正如地上的路;其实地上本没有路,走的人多了,也便成了路。
恭喜 sentence_window 选手完美找到了原文。因为它基于句子分割,能够更好地保存句子的语义。当然,这样做也是有代价的。你可以比较下这三种分块方法向量化后的文件,luxun_sample_fixed_chunk_vector.json 的大小是 11.5MPa,luxun_sample_semantic_chunk_vector.json 增加到了 16.1MPa,而 luxun_sample_sentence_window_vector.json 则达到了 49.2MPa,是前两者的 3 到 4 倍。
我们再来看看第二个句子,三位选手的表现如何:
“我家墙外有两株树,一株是枣树,还有一株也是枣树。”
fixed_chunk 选手给出的句子仍然不完整,但是包含了完整的原文:
在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,似乎自以为大有深意,而将繁霜洒在我的园里的野花草上。我不知
semantic_chunk 选手这次正常发挥,也找到了原文:
在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,
sentence_window 选手依旧给出了完美答案:
在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。
虽然三位选手都找到了原文,但是 sentence_window 选手返回的原文不但完整,而且没有包含无关内容,减少了干扰信息。
再来看看最后一个句子:
“猛兽总是独行,牛羊才成群结对。”
fixed_chunk 选手找到了类似的句子,但是包含了较多的无关内容:
兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。人民与牛马同流,——此就中国而言,夷人别有分类法云,——治之之道,自然应该禁止集合:这方法是对的。其次要防说话。人能说话,已经是祸胎了,而况有时还要做文章。所以苍颉造字,夜有鬼哭。鬼且反对,而况于官?猴子不会说话
semantic_chunk 找到的则是另外一句完全不相关的句子:
和一些狐群狗党趁势来开除她私意所不喜的学生们么?而几个在“男尊女卑”的社会生长的男人们,此时却在异性的饭碗化身的面前摇尾,简直并羊而不如,羊,诚然的弱的,但还不至于如此,我敢给我所敬爱的羊们保证!但是,在黄金世界还未到来之前,
我们最后看看 sentence_window 选手的表现:
猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。
别忘了 sentence_window 选手除了搜索到的原始句子,还能提供“窗口句子”作为上下文:
# 查看`sentence_window`方法的窗口句子method = "sentence_window"res_sentence_window = vector_search(query3, search_params, limit, output_fields, method)res_sentence_window = [hit["entity"] for hit in res_sentence_window[0]]for item in res_sentence_window: print(f"window: {item['window']}")
复制代码
窗口句子如下:
window: 然亦可见至道嘉猷,人同此心,心同此理,固无华夷之限也。 猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。 人民与牛马同流,——此就中国而言,夷人别有分类法云,——治之之道,自然应该禁止集合:这方法是对的。
复制代码
在 RAG 应用中,把上下文句子一起传递给大模型,能让大模型更好地理解句子的语义,作出更好的回答。
调用大模型的 API
创建向量数据库这部分想必你已经轻车熟路了,下面我们来完成 RAG 应用的最后一个部分:生成。我们要把搜索到的句子传递给大模型,让它根据提示词重新组装成回答。
首先,我们要创建一个大模型的 api key,用来调用大模型。我使用的是 deepseek。为了保护 api key 的安全,把 api key 设置为环境变量“DEEPSEEK_API_KEY”。请把 <you_api_key> 替换成你自己的 api key。
import osos.environ['DEEPSEEK_API_KEY'] = <you_api_key>
复制代码
然后,再从环境变量中读取 api key。
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
复制代码
deepseek 使用与 OpenAI 兼容的 API 格式,我们可以使用 OpenAI SDK 来访问 DeepSeek API。
# 安装 openai 库pip install openai
复制代码
接下来创建 openai 客户端实例。
# 导入openai库from openai import OpenAI
# 导入os库import os
# 创建openai客户端实例OpenAI_client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com")
复制代码
根据 deepseek api 文档的说明,定义生成响应的函数 generate_response。model 是我们使用的大模型,这里是 deepseek-chat。temperature 决定大模型回答的随机性,数值在 0-2 之间,数值越高,生成的文本越随机;值越低,生成的文本越确定。
# 定义生成响应的函数def generate_response( system_prompt, user_prompt, model, temperature ): # 大模型的响应 response = OpenAI_client.chat.completions.create( model=model, messages=[ # 设置系统信息,通常用于设置模型的行为、角色或上下文。 {"role": "system", "content": system_prompt}, # 设置用户消息,用户消息是用户发送给模型的消息。 {"role": "user", "content": user_prompt}, ], # 设置温度 temperature=temperature, stream=True ) # 遍历响应中的每个块 for chunk in response: # 检查块中是否包含选择项 if chunk.choices: # 打印选择项中的第一个选项的增量内容,并确保立即刷新输出 print(chunk.choices[0].delta.content, end="", flush=True)
复制代码
响应函数接收的参数中,system_prompt 是系统提示词,主要用于设置模型的行为、角色或上下文。你可以理解为这是系统给大模型的提示词,而且始终有效。我们可以使用下面的提示词规范大模型的响应:
system_prompt = "你是鲁迅作品研究者,熟悉鲁迅的各种作品。"
复制代码
user_prompt 是用户提示词,是用户发给大模型的。大模型会在系统提示词和用户提示词的共同作用下,生成响应。用户提示词由查询句子 query 和向量数据库搜索到的句子组成。对于 fixed_chunk 和 semantic_chunk,我们需要获取 chunk 字段的值。对于 sentence_window,我们需要获取 window 字段的值。定义下面的函数可以帮助我们方便获取想要的值。
def get_ref_info (query, search_params, limit, output_fields, method): res = vector_search (query, search_params, limit, output_fields, method) for hit in res[0]: ref_info = { "ref": hit["entity"]["window"] if method == "sentence_window" else hit["entity"]["chunk"], "title": hit["entity"]["title"] } return ref_info
复制代码
最后,针对不同的分块方法,获取对应的响应。
for method in chunk_methods: print(f"分块方法: {method}") # 获取参考信息 ref_info = get_ref_info(query, search_params, limit, output_fields, method) # 生成用户提示词 user_prompt = ( f"请你根据提供的参考信息,查找是否有与问题语义相似的内容。参考信息:{ref_info}。问题:{query}。\n" f"如果找到了相似的内容,请回复“鲁迅的确说过类似的话,原文是[原文内容],这句话来自[文章标题]”。\n" f"[原文内容]是参考信息中ref字段的值,[文章标题]是参考信息中title字段的值。如果引用它们,请引用完整的内容。\n" f"如果参考信息没有提供和问题相关的内容,请回答“据我所知,鲁迅并没有说过类似的话。”") # 生成响应 generate_response(system_prompt, user_prompt, model, temperature) print("\n" + "*" * 50 + "\n")
复制代码
好啦,一切准备就绪,让我们看看使用不同分块方法的 RAG,究竟有什么区别。先看第一句话,“世上本没有路,走的人多了,也便成了路。”
分块方法: fixed_chunk鲁迅的确说过类似的话,原文是“的人多了,也便成了路。 一九二一年一月。”,这句话来自《故乡》。**************************************************
分块方法: semantic_chunk鲁迅的确说过类似的话,原文是“跨过了灭亡的人们向前进。什么是路?就是从没路的地方践踏出来的,从只有荆棘的地方开辟出来的。以前早有路了,以后也该永远有路。人类总不会寂寞,因为生命是进步的,是乐天的。昨天,我对我的朋友L说,“一个人死了,在死者自身和他的眷属是悲惨的事,”这句话来自《六十六生命的路》。**************************************************
分块方法: sentence_window鲁迅的确说过类似的话,原文是“我想:希望本是无所谓有,无所谓无的。 这正如地上的路;其实地上本没有路,走的人多了,也便成了路。 一九二一年一月。”,这句话来自《故乡》。
复制代码
fixed_chunk 选手虽然给出了原文,但是遗憾的是不够完整。semantic_chunk 选手没有搜索到原文,但是给出的句子语义也和原文类似,算是意外收获。而 sentence_window 选手,则给出了标准答案。
再来看看第二句,“我家墙外有两株树,一株是枣树,还有一株也是枣树。”
分块方法: fixed_chunk鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,似乎自以为大有深意,而将繁霜洒在我的园里的野花草上。我不知”,这句话来自《秋夜》。**************************************************
分块方法: semantic_chunk鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,”,这句话来自《秋夜》。**************************************************
分块方法: sentence_window鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。 这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。”,这句话来自《秋夜》。**************************************************
复制代码
三位选手表现差不多,sentence_window 选手给出的原文更精准。
最后来看看第三句,“猛兽总是独行,牛羊才成群结对。”
分块方法: fixed_chunk鲁迅的确说过类似的话,原文是“兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。”,这句话来自《春末闲谈》。**************************************************
分块方法: semantic_chunk据我所知,鲁迅并没有说过类似的话。**************************************************
分块方法: sentence_window鲁迅的确说过类似的话,原文是“猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。”,这句话来自《春末闲谈》。**************************************************
复制代码
fixed_chunk 选手虽然搜索结果包含了无关内容,但是大模型从中筛选出了合适的句子。semantic_chunk 选手搜索到的句子并没有被大模型采纳。sentence_window 选手仍然不负众望,给出了标准答案。请为 sentence_window 选手的精彩表现鼓掌。
探索
其实,RAG 的响应和很多因素相关,你可以多多尝试,看看结果有什么不同。比如,修改 vector_search 函数的 limit 参数,让向量数据库多返回几个句子,增加命中概率。或者增加 generate_response 函数的 temperature 参数,看看 RAG 的响应如何变化。还有提示词,它直接影响大模型如何回答。
另外,你还可以基于本应用,开发其他功能,比如鲁迅作品智能问答功能,解答关于鲁迅作品的问题。或者鲁迅作品推荐功能,输入你想要阅读的作品类型,让 RAG 为你做推荐。玩法多多,祝你玩得开心。
藏宝图
老规矩,推荐一些资料供你参考。
ChunkViz 是一个在线网站,提供分块可视化功能。
想了解 RAG 更多有趣应用,可以看看这个视频:当我开发出史料检索RAG应用,正史怪又该如何应对?。想了解更多技术细节,看这里: 揭秘「 B 站最火的 RAG 应用」是如何炼成的。
想了解更多分块技术,可以阅读检索增强生成(RAG)的分块策略指南和从固定大小到NLP分块 - 文本分块技术的深入研究两篇文章。
参考
[^1]: 鲁迅作品集数据基于 luxun_dataset ,增加了一些字段。luxun_sample.json 为鲁迅部分作品,方便试用。luxun.json 为完整的鲁迅作品集。
[^2]: LlamaIndex 是一个用于构建带有上下文增强功能的生成式 AI 应用的框架,支持大型语言模型(LLMs)。
代码文件
链接: https://pan.baidu.com/s/16nSrOh7jM0c6naB0JgsUhQ?pwd=1234 提取码: 1234
作者介绍
Zilliz 黄金写手:江浩
------------------------------------------------------------------------------------------------------------
Zilliz 是向量数据库系统的开拓者和全球领先者,研发面向企业级 AI 应用的向量数据库系统。作为全球最受欢迎的开源向量数据库 Milvus 的创造者,Zilliz 提供面向 AI 应用的新一代数据库技术,帮助企业便捷开发 AI 应用。当前,Zilliz 旗下向量数据库产品,以开源的 Milvus 和 商业化的 Zilliz Cloud 为代表,目前在全球范围内拥有上万企业级用户。其中,Milvus 能够处理数百万乃至数十亿级的向量数据,是最受欢迎的开源向量数据数据库之一;而 Zilliz Cloud 能为用户提供百亿级向量数据毫秒级检索能力,以及开箱即用的向量数据库服务。
免费试用 Zilliz Cloud: http://zilliz.com.cn/cloud
评论