鲁迅说,“世上本没有路,走的人多了,也便成了路。”。
鲁迅说,“我家墙外有两株树,一株是枣树,还有一株也是枣树。”
鲁迅还说,“猛兽总是独行,牛羊才成群结对。”
网络上流传着鲁迅说过的各种名言,我们不禁怀疑,鲁迅到底说没说?原文是什么样的,出处又是哪里?想回答这个问题,最好的办法就是搜索原文。但是,使用传统搜索方式,错了一个字可能就搜索不到,不如试试语义搜索吧。
我们可以把鲁迅作品集向量化,储存到向量数据库中。然后搜索某条据说是鲁迅说过的话,最后通过大模型组织语言输出回答,告诉我们鲁迅有没有说过这句话。如果有,再让它附上原文和出处。这个过程,就是 RAG(Retrieval-Augmented Generation,检索增强生成)。
而对于较长的本文,直接向量化会导致信息缺失,需要把文本分割成多个块,分别向量化。打个比方,如果一篇文章是一张图片,组成文章的块就是图片的像素点。文章分割成的块越多,意味着图片的像素点越多,分辨率越高,图片也就越清晰。我会介绍三种常见的分块方法,并且比较基于它们的向量搜索和 RAG 响应有什么区别。
图片来源:Photo by Master Unknown on Unsplash
字数太多怎么向量化
在 [[01-怎么把大白话“变成”古诗词]] 这篇文章中,我详细介绍了使用 Milvus 创建向量数据库的整个过程,相关内容我就不再赘述了,直接给出代码。
Milvus 版本:>=2.4.0
安装依赖。
!pip install "pymilvus[model]" torch
复制代码
定义函数 vectorize_query 把文本向量化的函数。
import torch
import json
from pymilvus import DataType, MilvusClient
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
# 将输入的文本向量化
def vectorize_query(query , model_name = "BAAI/bge-small-zh-v1.5"):
# 检查是否有可用的CUDA设备
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# 根据设备选择是否使用fp16
use_fp16 = device.startswith("cuda")
# 创建嵌入模型实例
bge_m3_ef = BGEM3EmbeddingFunction(
model_name=model_name,
device=device,
use_fp16=use_fp16
)
# 把输入的文本向量化
vectors = bge_m3_ef.encode_documents(query)
return vectors
复制代码
下一步就是把鲁迅作品集向量化了。但是且慢,让我们先看一下鲁迅作品集[^1]的文本格式:
[
{
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"content": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...
},
{
"book": "伪自由书",
"title": "王道诗话",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"content": "《人权论》是从鹦鹉开头的,据说古时候有一只高飞远走的鹦哥儿...
},
...
]
复制代码
文本中的“content”字段的值,就是一篇文章。有的文章字数多达几万字,用几百维的向量根本无法表达文章的语义细节。怎么办?就像前面说的,既然全文字数太多,我们就把文章切成几块,对每个块再做向量化。这个操作叫做“分块”。
根据固定字数分块
最简单的分块方法是 fixed_chunk
(固定分块),是按照字数分块,比如每隔 150 个字就分割一次。比如,对于《最艺术的国家》这篇文章使用 fixed_chunk
,再通过 ChunkViz 把分块结果可视化,如下图所示:
我们用代码来实现 fixed_chunk
。
import json
# fixed_chunk
def fixed_chunk(
input_file_path,
output_file_path,
chunk_size,
field_name
):
with open(input_file_path, 'r', encoding='utf-8') as file:
data_list = json.load(file)
chunk_data_list = []
for data in data_list:
# 获取指定字段的值
text = data[field_name]
# 对指定字段分割
chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
for idx, chunk in enumerate(chunks):
chunk_data_list.append({
# 使用原始文章的 id 生成chunk的id
"id": f'{data["book"]}#{data["title"]}#chunk{idx}',
"book" : data["book"],
"title" : data["title"],
"author" : data["author"],
"type" : data["type"],
"source" : data["source"],
"date" : data["date"],
"chunk" : chunk,
# window 字段在这里只是占位,没有实际作用,后面会详细介绍它的用处
"window": "",
"method": "fixed_chunk"
})
with open(output_file_path, 'w', encoding='utf-8') as json_file:
json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)
input_file_path = "luxun_sample.json"
output_file_path = "luxun_sample_fixed_chunk.json"
chunk_size = 150
field_name = "content"
fixed_chunk(input_file_path, output_file_path, chunk_size, field_name)
复制代码
运行代码,得到 luxun_sample_fixed_chunk.json
文件,格式和上文中的可视化结果一致。
[
{
"id": "伪自由书 #最艺术的国家 #chunk0 ",
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"chunk": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...",
"window": "",
"method": "fixed_chunk"
},
{
"id": "伪自由书 #最艺术的国家 #chunk1 ",
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"chunk": "民国。然而这民国年久失修...",
"window": "",
"method": "fixed_chunk"
},
...
]
复制代码
你可能已经发现了,fixed_chunk
经常在句子中间分割,导致句子不连贯,语义的完整性被破坏。
根据标点符号分割
怎么解决这个问题呢?我们可以在标点符号处分割。但是这还不够,因为这样分割的话,块与块之间仍然是相互独立的了,缺少关联。打个比方,如果看《生活大爆炸》这样的单元剧,我们跳着看也没关系,不影响理解剧情。但是如果看《天龙八部》这样的连续剧,上一集讲的还是段誉为救钟灵去万劫谷拿解药,下一集他就瞬移到了少室山,用六脉神剑大战慕容复。我们会一头雾水,这中间到底发生了什么?
所以,连续剧的开头有“前情提要”,结尾有“下集预告”。同样,为了保证块与块之间语义的连贯,我们也要设计一个“重叠”部分,让下一个块的开头部分,重复上一个块的结尾部分。
听起来很复杂?不用担心,我们可以使用 LlamaIndex[^2] 库轻松实现这种分块方法—— semantic_chunk
。
安装 LlamaIndex 库。
pip install llama_index==0.11.16
复制代码
定义 semantic_chunk
分块函数。
# 导入SentenceSplitter用来分块
from llama_index.core.node_parser import SentenceSplitter
# 定义semantic_chunk分块函数
def semantic_chunk(
input_file_path,
output_file_path,
# 块的大小
chunk_size,
# 重叠部分的大小。小于 chunk_overlap 的句子,就会作为重叠部分
chunk_overlap,
# 指定分块的字段
field_name,
) :
# 初始化 SentenceSplitter,设置分块的参数
text_splitter = SentenceSplitter(
# 指定段落分隔符
paragraph_separator="\n\n\n",
# 指定主要分隔符
separator="。",
# 指定次要分隔符
secondary_chunking_regex="[^,.;、。:]+[,.;、。:]?",
# 指定块的大小
chunk_size=chunk_size,
# 指定重叠部分的大小
chunk_overlap=chunk_overlap,
)
with open(input_file_path, 'r', encoding='utf-8') as file:
data_list = json.load(file)
chunk_data_list = []
for data in data_list:
text = data[field_name]
chunks = text_splitter.split_text(text)
for idx, chunk in enumerate(chunks):
chunk_data_list.append({
# 使用原始文章的 id 生成chunk的id
"id": f'{data["book"]}#{data["title"]}#chunk{idx}',
"book" : data["book"],
"title" : data["title"],
"author" : data["author"],
"type" : data["type"],
"source" : data["source"],
"date" : data["date"],
"chunk" : chunk,
# window 字段在这里只是占位,没有实际作用,后面会详细介绍它的用处
"window": "",
"method": "semantic_chunk"
})
with open(output_file_path, 'w', encoding='utf-8') as json_file:
json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)
# 示例:
input_file_path = "luxun_sample.json"
output_file_path = "luxun_sample_semantic_chunk.json"
chunk_size = 150
chunk_overlap = 20
field_name = "content"
semantic_chunk(
input_file_path,
output_file_path,
chunk_size,
chunk_overlap,
field_name
)
复制代码
执行上面的代码,得到 luxun_sample_semantic_chunk.json
文件,我们来看一下分块的结果:
[
{
"id": "伪自由书#最艺术的国家#chunk0",
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"chunk": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...中国的固有文化是科举制度,外加捐班之类。",
"window": "",
"method": "semantic_chunk"
},
{
"id": "伪自由书#最艺术的国家#chunk1",
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"chunk": "外加捐班之类。当初说这太不像民权...这对于民族是不忠,对于祖宗是不孝,",
"window": "",
"method": "semantic_chunk"
},
...
]
复制代码
果然是在我们设置的标点符号处分块的,而且附带重叠部分,这样就能保证块与块之间语义的连贯了。
根据句子分块
对于上面的分块结果,你可能还不满意。虽然它根据标点符号分割,但是并不一定在句号处分割,无法保证句子的完整性。比如,对于这句话 我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”
。 可能分割成 我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。这艺术的可贵
和 是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”
两个块。
为了解决这个问题,又诞生了一种分块方法,它根据句子而不是字数分割,也就是说,根据“。”、“!”和“?”这三个表示句子结束的标点符号分割,而不会受到字数的限制。但是,这种分割方式怎么实现重叠的功能呢?这也简单,把整个句子作为重叠部分就行了,叫做“窗口句子”。这种分块方法叫做 sentence_window
。
比如,对于句子 ABCD
,设置窗口大小为 1,表示原始句子的左右各 1 个句子为“窗口句子”。分块如下:
第一个句子:A。窗口句子:B。因为第一个句子的左边没有句子。
第二个句子:B。窗口句子:A 和 C。
第三个句子:C。窗口句子:B 和 D。
第四个句子:D。窗口句子:C。因为最后一个句子的右边没有句子。
前面两种分块方法,都是对 chunk 字段向量化。而这种分块方法,除了对 chunk 字段(也就是原始句子)向量化外,还会把窗口句子作为原始句子的上下文,以元数据的形式储存在文件中。
原始句子用来做向量搜索,而在生成回答时,窗口句子和原始句子会一起传递给大模型。这样做的好处是,只向量化原始句子,节省了储存空间。提供窗口句子作为原始句子的上下文,可以帮助大模型理解原始句子的语境。
理解原理了,我们用代码来实现吧。
导入依赖。
import re
import json
from typing import List
from llama_index.core import Document
from llama_index.core.node_parser import SentenceWindowNodeParser
复制代码
定义函数 split_text_into_sentences
,用来分割中英文句子。
def split_text_into_sentences(text):
# 使用正则表达式识别中英文句子结束符
sentence_endings = re.compile(r'(?<=[。!?.!?])')
sentences = sentence_endings.split(text)
return [s.strip() for s in sentences if s.strip()]
复制代码
定义函数 sentence_window
,基于句子对文本分块。
# 定义sentence_window分块函数
def sentence_window(
input_file_path,
output_file_path,
field_name,
window_size
):
# 设置一个用于文本解析的节点解析器
node_parser = SentenceWindowNodeParser.from_defaults(
window_size=window_size,
# 为窗口元数据指定一个键名为"window",用于在解析过程中存储窗口数据
window_metadata_key="window",
# 为原始文本元数据指定一个键名为"original_text",用于在解析过程中存储原始文本
original_text_metadata_key="original_text",
sentence_splitter = split_text_into_sentences
)
with open(input_file_path, 'r', encoding='utf-8') as file:
data_list = json.load(file)
chunk_data_list = []
for data in data_list:
text = data[field_name]
# 将分割后的句子处理成节点。节点包含多个句子,类似于块
document = Document(text=text)
nodes = node_parser.get_nodes_from_documents([document])
for idx, node in enumerate(nodes):
chunk = node.metadata["original_text"]
window = node.metadata["window"]
chunk_data_list.append({
"id": f'{data["book"]}#{data["title"]}#chunk{idx}',
"book": data["book"],
"title": data["title"],
"author": data["author"],
"type": data["type"],
"source": data["source"],
"date": data["date"],
"chunk": chunk,
"window": window,
"method": "sentence_window"
})
with open(output_file_path, 'w', encoding='utf-8') as json_file:
json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)
# 执行句子分块
input_file_path = "luxun_sample.json"
output_file_path = "luxun_sample_sentence_window.json"
field_name = "content"
window_size = 1
sentence_window(
input_file_path,
output_file_path,
field_name,
window_size
)
复制代码
让我们来看下分块的结果,字段“chunk”是原始句子,“window”里面包含了原始句子和窗口句子。
[
{
"id": "伪自由书#最艺术的国家#chunk0",
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"chunk": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。",
"window": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。 这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。",
"method": "sentence_window"
},
{
"id": "伪自由书#最艺术的国家#chunk1",
"book": "伪自由书",
"title": "最艺术的国家",
"author": "鲁迅",
"type": "",
"source": "",
"date": "",
"chunk": "这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。",
"window": "我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。 这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。 表面上是中性,骨子里当然还是男的。",
"method": "sentence_window"
},
...
]
复制代码
创建向量数据库
文本分块完成,接下来就是文本向量化,导入向量数据库了,这部分你应该比较熟悉了,我直接给出代码。
定义函数 vectorize_file,向量化 json 文件中指定的字段。
def vectorize_file(input_file_path, output_file_path, field_name):
# 读取 json 文件,把chunk字段的值向量化
with open(input_file_path, 'r', encoding='utf-8') as file:
data_list = json.load(file)
# 提取该json文件中的所有chunk字段的值
query = [data[field_name] for data in data_list]
# 向量化文本数据
vectors = vectorize_query(query)
# 将向量添加到原始文本中
for data, vector in zip(data_list, vectors['dense']):
# 将 NumPy 数组转换为 Python 的普通列表
data['vector'] = vector.tolist()
# 将更新后的文本内容写入新的json文件
with open(output_file_path, 'w', encoding='utf-8') as outfile:
json.dump(data_list, outfile, ensure_ascii=False, indent=4)
复制代码
为了比较 RAG 使用不同分块方法的效果,我们把三个分块文件全部向量化。
# 向量化固定分块的文件
vectorize_file("luxun_sample_fixed_chunk.json", "luxun_sample_fixed_chunk_vector.json", "chunk")
# 向量化通过标点符号分块的文件
vectorize_file("luxun_sample_semantic_chunk.json", "luxun_sample_semantic_chunk_vector.json", "chunk")
# 向量化通过句子分块的文件
vectorize_file("luxun_sample_sentence_window.json","luxun_sample_sentence_window_vector.json", "chunk")
复制代码
接下来创建集合。为了能够在同一个集合中区分三种分块方法的搜索结果,我们设置参数 partition_key_field
的值为 method
,它表示采用的分块方法。Milvus 会根据 method
字段的值,把数据插入到对应的分区中。打个比方,如果把集合看作一个 excel 文件,partition (分区)就是表格的工作表(Worksheet)。一个 excel 文件包含多张工作表,不同的数据填写在对应的工作表中。相应的,我们把不同的数据插入到对应分区中,搜索时指定分区,就可以提高搜索效率。
# 创建集合
from pymilvus import MilvusClient, DataType
import time
def create_collection(collection_name):
# 检查同名集合是否存在,如果存在则删除
if milvus_client.has_collection(collection_name):
print(f"集合 {collection_name} 已经存在")
try:
# 删除同名集合
milvus_client.drop_collection(collection_name)
print(f"删除集合:{collection_name}")
except Exception as e:
print(f"删除集合时出现错误: {e}")
# 创建集合模式
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
# 设置partition key
partition_key_field = "method",
# 设置分区数量,默认为16
num_partitions=16,
description=""
)
# 添加字段到schema
schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=256)
schema.add_field(field_name="book", datatype=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="author", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="type", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="date", datatype=DataType.VARCHAR, max_length=32)
schema.add_field(field_name="chunk", datatype=DataType.VARCHAR, max_length=2048)
schema.add_field(field_name="window", datatype=DataType.VARCHAR, default_value="", max_length=6144)
schema.add_field(field_name="method", datatype=DataType.VARCHAR, max_length=32)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=512)
# 创建集合
try:
milvus_client.create_collection(
collection_name=collection_name,
schema=schema,
shards_num=2
)
print(f"创建集合:{collection_name}")
except Exception as e:
print(f"创建集合的过程中出现了错误: {e}")
# 等待集合创建成功
while not milvus_client.has_collection(collection_name):
# 获取集合的详细信息
time.sleep(1)
if milvus_client.has_collection(collection_name):
print(f"集合 {collection_name} 创建成功")
collection_info = milvus_client.describe_collection(collection_name)
print(f"集合信息: {collection_info}")
collection_name = "LuXunWorks"
uri="http://localhost:19530"
milvus_client = MilvusClient(uri=uri)
create_collection(collection_name)
复制代码
把数据插入到向量数据库。
from tqdm import tqdm
# 数据入库
def insert_vectors(file_path, collection_name, batch_size):
# 读取和处理文件
with open(file_path, 'r') as file:
data = json.load(file)
# 将数据插入集合
print(f"正在将数据插入集合:{collection_name}")
total_count = len(data)
# pbar 是 tqdm 库中的一个进度条对象,用于显示插入数据的进度
with tqdm(total=total_count, desc="插入数据") as pbar:
# 每次插入 batch_size 条数据
for i in range(0, total_count, batch_size):
batch_data = data[i:i + batch_size]
res = milvus_client.insert(
collection_name=collection_name,
data=batch_data
)
pbar.update(len(batch_data))
# 验证数据是否成功插入集合
print(f"插入的实体数量: {total_count}")
# 设置每次插入的数据量
batch_size = 100
insert_vectors("luxun_sample_fixed_chunk_vector.json", collection_name, batch_size)
insert_vectors("luxun_sample_semantic_chunk_vector.json", collection_name, batch_size)
insert_vectors("luxun_sample_sentence_window_vector.json", collection_name, batch_size)
复制代码
创建索引。我们使用倒排索引,首先创建索引参数。
index_params = milvus_client.prepare_index_params()
index_params.add_index(
# 指定索引名称
index_name="IVF_FLAT",
# 指定创建索引的字段
field_name="vector",
# 设置索引类型
index_type="IVF_FLAT",
# 设置度量方式
metric_type="IP",
# 设置索引聚类中心的数量
params={"nlist": 128}
)
复制代码
接下来创建索引。
milvus_client.create_index(
# 指定为创建索引的集合
collection_name=collection_name,
# 使用前面创建的索引参数创建索引
index_params=index_params
)
复制代码
验证下索引是否成功创建。查看集合的所有索引。
res = milvus_client.list_indexes(
collection_name=collection_name
)
print(res)
复制代码
返回我们创建的索引 ['IVF_FLAT']
。再查看下索引的详细信息。
res = milvus_client.describe_index(
collection_name=collection_name,
index_name="IVF_FLAT"
)
print(res)
复制代码
返回下面的索引信息,表示索引创建成功:
{'nlist': '128', 'index_type': 'IVF_FLAT', 'metric_type': 'IP', 'field_name': 'vector', 'index_name': 'IVF_FLAT', 'total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state': 'Finished'}
复制代码
接下来加载集合到内存。
print (f"正在加载集合:{collection_name}")
milvus_client.load_collection (collection_name=collection_name)
复制代码
验证下加载状态。
print (milvus_client.get_load_state (collection_name=collection_name))
复制代码
如果返回 {'state': <LoadState: Loaded>}
,说明加载完成。接下来,我们定义搜索函数。
先定义搜索参数。
search_params = {
# 度量类型
"metric_type": "IP",
# 搜索过程中要查询的聚类单元数量。增加nprobe值可以提高搜索精度,但会降低搜索速度
"params": {"nprobe": 16}
}
复制代码
再定义搜索函数。还记得前面我们在创建集合时,设置的 partition_key_field
吗?它会根据 method
字段的值,把数据插入到相应的分区中。而搜索函数中的 filter
参数,就是用来指定在哪个分区中搜索的。
def vector_search(
query,
search_params,
limit,
output_fields,
partition_name
):
# 将查询转换为向量
query_vectors = [vectorize_query(query)['dense'][0].tolist()]
# 向量搜索
res = milvus_client.search(
collection_name=collection_name,
# 指定查询向量
data=query_vectors,
# 指定搜索的字段
anns_field="vector",
# 设置搜索参数
search_params=search_params,
# 设置搜索结果的数量
limit=limit,
# 设置输出字段
output_fields=output_fields,
# 在指定分区中搜索
filter=f"method =='{partition_name}'"
)
return res
复制代码
再定义一个打印搜索结果的函数,方便查看。
# 打印向量搜索结果
def print_vector_results(res):
# hit是搜索结果中的每一个匹配的实体
res = [hit["entity"] for hit in res[0]]
for item in res:
print(f"title: {item['title']}")
print(f"chunk: {item['chunk']}")
print(f"method: {item['method']}")
print("-"*50)
复制代码
下面我们就来看一看,fixed_chunk
、semantic_chunk
和 sentence_window
三位选手在向量搜索上表现如何。首先搜索第一个句子:“世上本没有路,走的人多了,也便成了路。”。
# 比较不同分块方法产生的搜索结果
query1 = ["世上本没有路,走的人多了,也便成了路。"]
limit = 1
output_fields = ["title", "chunk", "window", "method"]
# 定义分块方法列表
chunk_methods = ["fixed_chunk", "semantic_chunk", "sentence_window"]
# 定义一个函数来执行搜索并打印结果
def compare_chunk_methods(query, search_params, limit, output_fields, methods):
for method in methods:
res = vector_search(query, search_params, limit, output_fields, method)
print(f"{method} 的搜索结果是:\n")
print_vector_results(res)
print("*" * 50)
# 调用函数进行比较
compare_chunk_methods(query1, search_params, limit, output_fields, chunk_methods)
复制代码
搜索结果如下:
fixed_chunk 的搜索结果是:
title: 故乡
chunk: 的人多了,也便成了路。 一九二一年一月。
method: fixed_chunk
--------------------------------------------------
semantic_chunk 的搜索结果是:
title: 六十六生命的路
chunk: 跨过了灭亡的人们向前进。什么是路?就是从没路的地方践踏出来的,从只有荆棘的地方开辟出来的。以前早有路了,以后也该永远有路。人类总不会寂寞,因为生命是进步的,是乐天的。昨天,我对我的朋友L说,“一个人死了,在死者自身和他的眷属是悲惨的事,
method: semantic_chunk
--------------------------------------------------
sentence_window 的搜索结果是:
title: 故乡
chunk: 这正如地上的路;其实地上本没有路,走的人多了,也便成了路。
method: sentence_window
--------------------------------------------------
复制代码
fixed_chunk
选手的确搜索到了原文,但是并不完整。这也是 fixed_chunk
分块的典型问题。
semantic_chunk
选手的表现让人失望,它并没有搜索到原文。但是它给了我们意外收获,搜索结果的意思和原文有些类似。也是向量数据库语义搜索功能的体现。
原文其实在这个块中:
我的愿望茫远罢了。我在朦胧中,眼前展开一片海边碧绿的沙地来,上面深蓝的天空中挂着一轮金黄的圆月。我想:希望本是无所谓有,无所谓无的。这正如地上的路;其实地上本没有路,走的人多了,也便成了路。 一九二一年一月。"
semantic_chunk
选手没有搜索到它,可能是因为这个块的前半部分和查询句子的语义相差较远。这也反应了分块对搜索结果的影响。
最后出场的 sentence_window
选手,给出了标准答案:
这正如地上的路;其实地上本没有路,走的人多了,也便成了路。
恭喜 sentence_window
选手完美找到了原文。因为它基于句子分割,能够更好地保存句子的语义。当然,这样做也是有代价的。你可以比较下这三种分块方法向量化后的文件,luxun_sample_fixed_chunk_vector.json
的大小是 11.5MPa,luxun_sample_semantic_chunk_vector.json
增加到了 16.1MPa,而 luxun_sample_sentence_window_vector.json
则达到了 49.2MPa,是前两者的 3 到 4 倍。
我们再来看看第二个句子,三位选手的表现如何:
“我家墙外有两株树,一株是枣树,还有一株也是枣树。”
fixed_chunk
选手给出的句子仍然不完整,但是包含了完整的原文:
在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,似乎自以为大有深意,而将繁霜洒在我的园里的野花草上。我不知
semantic_chunk
选手这次正常发挥,也找到了原文:
在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,
sentence_window
选手依旧给出了完美答案:
在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。
虽然三位选手都找到了原文,但是 sentence_window
选手返回的原文不但完整,而且没有包含无关内容,减少了干扰信息。
再来看看最后一个句子:
“猛兽总是独行,牛羊才成群结对。”
fixed_chunk
选手找到了类似的句子,但是包含了较多的无关内容:
兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。人民与牛马同流,——此就中国而言,夷人别有分类法云,——治之之道,自然应该禁止集合:这方法是对的。其次要防说话。人能说话,已经是祸胎了,而况有时还要做文章。所以苍颉造字,夜有鬼哭。鬼且反对,而况于官?猴子不会说话
semantic_chunk
找到的则是另外一句完全不相关的句子:
和一些狐群狗党趁势来开除她私意所不喜的学生们么?而几个在“男尊女卑”的社会生长的男人们,此时却在异性的饭碗化身的面前摇尾,简直并羊而不如,羊,诚然的弱的,但还不至于如此,我敢给我所敬爱的羊们保证!但是,在黄金世界还未到来之前,
我们最后看看 sentence_window
选手的表现:
猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。
别忘了 sentence_window
选手除了搜索到的原始句子,还能提供“窗口句子”作为上下文:
# 查看`sentence_window`方法的窗口句子
method = "sentence_window"
res_sentence_window = vector_search(query3, search_params, limit, output_fields, method)
res_sentence_window = [hit["entity"] for hit in res_sentence_window[0]]
for item in res_sentence_window:
print(f"window: {item['window']}")
复制代码
窗口句子如下:
window: 然亦可见至道嘉猷,人同此心,心同此理,固无华夷之限也。 猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。 人民与牛马同流,——此就中国而言,夷人别有分类法云,——治之之道,自然应该禁止集合:这方法是对的。
复制代码
在 RAG 应用中,把上下文句子一起传递给大模型,能让大模型更好地理解句子的语义,作出更好的回答。
调用大模型的 API
创建向量数据库这部分想必你已经轻车熟路了,下面我们来完成 RAG 应用的最后一个部分:生成。我们要把搜索到的句子传递给大模型,让它根据提示词重新组装成回答。
首先,我们要创建一个大模型的 api key,用来调用大模型。我使用的是 deepseek。为了保护 api key 的安全,把 api key 设置为环境变量“DEEPSEEK_API_KEY”。请把 <you_api_key>
替换成你自己的 api key。
import os
os.environ['DEEPSEEK_API_KEY'] = <you_api_key>
复制代码
然后,再从环境变量中读取 api key。
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
复制代码
deepseek 使用与 OpenAI 兼容的 API 格式,我们可以使用 OpenAI SDK 来访问 DeepSeek API。
# 安装 openai 库
pip install openai
复制代码
接下来创建 openai 客户端实例。
# 导入openai库
from openai import OpenAI
# 导入os库
import os
# 创建openai客户端实例
OpenAI_client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com")
复制代码
根据 deepseek api 文档的说明,定义生成响应的函数 generate_response
。model 是我们使用的大模型,这里是 deepseek-chat
。temperature
决定大模型回答的随机性,数值在 0-2 之间,数值越高,生成的文本越随机;值越低,生成的文本越确定。
# 定义生成响应的函数
def generate_response(
system_prompt,
user_prompt,
model,
temperature
):
# 大模型的响应
response = OpenAI_client.chat.completions.create(
model=model,
messages=[
# 设置系统信息,通常用于设置模型的行为、角色或上下文。
{"role": "system", "content": system_prompt},
# 设置用户消息,用户消息是用户发送给模型的消息。
{"role": "user", "content": user_prompt},
],
# 设置温度
temperature=temperature,
stream=True
)
# 遍历响应中的每个块
for chunk in response:
# 检查块中是否包含选择项
if chunk.choices:
# 打印选择项中的第一个选项的增量内容,并确保立即刷新输出
print(chunk.choices[0].delta.content, end="", flush=True)
复制代码
响应函数接收的参数中,system_prompt
是系统提示词,主要用于设置模型的行为、角色或上下文。你可以理解为这是系统给大模型的提示词,而且始终有效。我们可以使用下面的提示词规范大模型的响应:
system_prompt = "你是鲁迅作品研究者,熟悉鲁迅的各种作品。"
复制代码
user_prompt
是用户提示词,是用户发给大模型的。大模型会在系统提示词和用户提示词的共同作用下,生成响应。用户提示词由查询句子 query
和向量数据库搜索到的句子组成。对于 fixed_chunk
和 semantic_chunk
,我们需要获取 chunk
字段的值。对于 sentence_window
,我们需要获取 window
字段的值。定义下面的函数可以帮助我们方便获取想要的值。
def get_ref_info (query, search_params, limit, output_fields, method):
res = vector_search (query, search_params, limit, output_fields, method)
for hit in res[0]:
ref_info = {
"ref": hit["entity"]["window"] if method == "sentence_window" else hit["entity"]["chunk"],
"title": hit["entity"]["title"]
}
return ref_info
复制代码
最后,针对不同的分块方法,获取对应的响应。
for method in chunk_methods:
print(f"分块方法: {method}")
# 获取参考信息
ref_info = get_ref_info(query, search_params, limit, output_fields, method)
# 生成用户提示词
user_prompt = (
f"请你根据提供的参考信息,查找是否有与问题语义相似的内容。参考信息:{ref_info}。问题:{query}。\n"
f"如果找到了相似的内容,请回复“鲁迅的确说过类似的话,原文是[原文内容],这句话来自[文章标题]”。\n"
f"[原文内容]是参考信息中ref字段的值,[文章标题]是参考信息中title字段的值。如果引用它们,请引用完整的内容。\n"
f"如果参考信息没有提供和问题相关的内容,请回答“据我所知,鲁迅并没有说过类似的话。”"
)
# 生成响应
generate_response(system_prompt, user_prompt, model, temperature)
print("\n" + "*" * 50 + "\n")
复制代码
好啦,一切准备就绪,让我们看看使用不同分块方法的 RAG,究竟有什么区别。先看第一句话,“世上本没有路,走的人多了,也便成了路。”
分块方法: fixed_chunk
鲁迅的确说过类似的话,原文是“的人多了,也便成了路。 一九二一年一月。”,这句话来自《故乡》。
**************************************************
分块方法: semantic_chunk
鲁迅的确说过类似的话,原文是“跨过了灭亡的人们向前进。什么是路?就是从没路的地方践踏出来的,从只有荆棘的地方开辟出来的。以前早有路了,以后也该永远有路。人类总不会寂寞,因为生命是进步的,是乐天的。昨天,我对我的朋友L说,“一个人死了,在死者自身和他的眷属是悲惨的事,”这句话来自《六十六生命的路》。
**************************************************
分块方法: sentence_window
鲁迅的确说过类似的话,原文是“我想:希望本是无所谓有,无所谓无的。 这正如地上的路;其实地上本没有路,走的人多了,也便成了路。 一九二一年一月。”,这句话来自《故乡》。
复制代码
fixed_chunk
选手虽然给出了原文,但是遗憾的是不够完整。semantic_chunk
选手没有搜索到原文,但是给出的句子语义也和原文类似,算是意外收获。而 sentence_window
选手,则给出了标准答案。
再来看看第二句,“我家墙外有两株树,一株是枣树,还有一株也是枣树。”
分块方法: fixed_chunk
鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,似乎自以为大有深意,而将繁霜洒在我的园里的野花草上。我不知”,这句话来自《秋夜》。
**************************************************
分块方法: semantic_chunk
鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,”,这句话来自《秋夜》。
**************************************************
分块方法: sentence_window
鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。 这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。”,这句话来自《秋夜》。
**************************************************
复制代码
三位选手表现差不多,sentence_window
选手给出的原文更精准。
最后来看看第三句,“猛兽总是独行,牛羊才成群结对。”
分块方法: fixed_chunk
鲁迅的确说过类似的话,原文是“兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。”,这句话来自《春末闲谈》。
**************************************************
分块方法: semantic_chunk
据我所知,鲁迅并没有说过类似的话。
**************************************************
分块方法: sentence_window
鲁迅的确说过类似的话,原文是“猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。”,这句话来自《春末闲谈》。
**************************************************
复制代码
fixed_chunk
选手虽然搜索结果包含了无关内容,但是大模型从中筛选出了合适的句子。semantic_chunk
选手搜索到的句子并没有被大模型采纳。sentence_window
选手仍然不负众望,给出了标准答案。请为 sentence_window
选手的精彩表现鼓掌。
探索
其实,RAG 的响应和很多因素相关,你可以多多尝试,看看结果有什么不同。比如,修改 vector_search
函数的 limit
参数,让向量数据库多返回几个句子,增加命中概率。或者增加 generate_response
函数的 temperature
参数,看看 RAG 的响应如何变化。还有提示词,它直接影响大模型如何回答。
另外,你还可以基于本应用,开发其他功能,比如鲁迅作品智能问答功能,解答关于鲁迅作品的问题。或者鲁迅作品推荐功能,输入你想要阅读的作品类型,让 RAG 为你做推荐。玩法多多,祝你玩得开心。
藏宝图
老规矩,推荐一些资料供你参考。
ChunkViz 是一个在线网站,提供分块可视化功能。
想了解 RAG 更多有趣应用,可以看看这个视频:当我开发出史料检索RAG应用,正史怪又该如何应对?。想了解更多技术细节,看这里: 揭秘「 B 站最火的 RAG 应用」是如何炼成的。
想了解更多分块技术,可以阅读检索增强生成(RAG)的分块策略指南和从固定大小到NLP分块 - 文本分块技术的深入研究两篇文章。
参考
[^1]: 鲁迅作品集数据基于 luxun_dataset ,增加了一些字段。luxun_sample.json
为鲁迅部分作品,方便试用。luxun.json
为完整的鲁迅作品集。
[^2]: LlamaIndex 是一个用于构建带有上下文增强功能的生成式 AI 应用的框架,支持大型语言模型(LLMs)。
代码文件
链接: https://pan.baidu.com/s/16nSrOh7jM0c6naB0JgsUhQ?pwd=1234 提取码: 1234
作者介绍
Zilliz 黄金写手:江浩
------------------------------------------------------------------------------------------------------------
Zilliz 是向量数据库系统的开拓者和全球领先者,研发面向企业级 AI 应用的向量数据库系统。作为全球最受欢迎的开源向量数据库 Milvus 的创造者,Zilliz 提供面向 AI 应用的新一代数据库技术,帮助企业便捷开发 AI 应用。当前,Zilliz 旗下向量数据库产品,以开源的 Milvus 和 商业化的 Zilliz Cloud 为代表,目前在全球范围内拥有上万企业级用户。其中,Milvus 能够处理数百万乃至数十亿级的向量数据,是最受欢迎的开源向量数据数据库之一;而 Zilliz Cloud 能为用户提供百亿级向量数据毫秒级检索能力,以及开箱即用的向量数据库服务。
免费试用 Zilliz Cloud: http://zilliz.com.cn/cloud
评论