
通过阿里云向量检索 Milvus 版和通义千问快速构建基于专属知识库的问答系统

  • 2024-04-18
  • 本文字数:3821 字

    阅读完需:约 13 分钟


阿里云向量检索 Milvus 版是一款 Serverless 全托管服务,确保了与开源 Milvus 的完全兼容性,并支持无缝迁移。它在开源版本的基础上增强了可扩展性,能提供大规模 AI 向量数据的相似性检索服务。凭借其开箱即用的特性、灵活的扩展能力和全链路监控告警,Milvus 云服务成为多样化 AI 应用场景的理想选择,包括多模态搜索、检索增强生成(RAG)、搜索推荐、内容风险识别等。您还可以利用开源的 Attu 工具进行可视化操作,进一步促进应用的快速开发和部署。

阿里云向量检索 Milvus 版已开启免费公测。您可以在E-MapReduce控制台,选择 EMR Serverless > Milvus,进入 Milvus 页面创建入门版的实例,公测期间您可以免费试用 Milvus 服务。



请确保您的运行环境中已安装 Python 3.8 或以上版本,以便顺利安装并使用 DashScope。


▶ 准备工作

  1. 安装相关的依赖库。

pip3 install pymilvus tqdm dashscope

2. 下载所需的知识库。本文示例使用了公开数据集CEC-Corpus。CEC-Corpus 数据集包含 332 篇针对各类突发事件的新闻报道,语料和标注数据,这里我们只需要提取原始的新闻稿文本,并将其向量化后入库。

git clone https://github.com/shijiebei2009/CEC-Corpus.git

▶ 步骤一:知识库向量化

1. 创建 embedding.py 文件,内容如下所示。

import osimport timefrom tqdm import tqdmimport dashscopefrom dashscope import TextEmbeddingfrom pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

def prepareData(path, batch_size=25): batch_docs = [] for file in os.listdir(path): with open(path + '/' + file, 'r', encoding='utf-8') as f: batch_docs.append(f.read()) if len(batch_docs) == batch_size: yield batch_docs batch_docs = []
if batch_docs: yield batch_docs

def getEmbedding(news): model = TextEmbedding.call( model=TextEmbedding.Models.text_embedding_v1, input=news ) embeddings = [record['embedding'] for record in model.output['embeddings']] return embeddings if isinstance(news, list) else embeddings[0]

if __name__ == '__main__':
current_path = os.path.abspath(os.path.dirname(__file__)) # 当前目录 root_path = os.path.abspath(os.path.join(current_path, '..')) # 上级目录 data_path = f'{root_path}/CEC-Corpus/raw corpus/allSourceText' # 数据下载git clone https://github.com/shijiebei2009/CEC-Corpus.git
# 配置Dashscope API KEY dashscope.api_key = '<YOUR_DASHSCOPE_API_KEY>'
# 配置Milvus参数 COLLECTION_NAME = 'CEC_Corpus' DIMENSION = 1536 MILVUS_HOST = 'c-97a7d8038fb8****.milvus.aliyuncs.com' MILVUS_PORT = '19530' USER = 'root' PASSWORD = '<password>'
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, user=USER, password=PASSWORD)
# Remove collection if it already exists if utility.has_collection(COLLECTION_NAME): utility.drop_collection(COLLECTION_NAME)
# Create collection which includes the id, title, and embedding. fields = [ FieldSchema(name='id', dtype=DataType.INT64, descrition='Ids', is_primary=True, auto_id=False), FieldSchema(name='text', dtype=DataType.VARCHAR, description='Text', max_length=4096), FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='Embedding vectors', dim=DIMENSION) ] schema = CollectionSchema(fields=fields, description='CEC Corpus Collection') collection = Collection(name=COLLECTION_NAME, schema=schema)
# Create an index for the collection. index_params = { 'index_type': 'IVF_FLAT', 'metric_type': 'L2', 'params': {'nlist': 1024} } collection.create_index(field_name="embedding", index_params=index_params)
id = 0 for news in tqdm(list(prepareData(data_path))): ids = [id + i for i, _ in enumerate(news)] id += len(news)
vectors = getEmbedding(news) # insert Milvus Collection for id, vector, doc in zip(ids, vectors, news): insert_doc = (doc[:498] + '..') if len(doc) > 500 else doc ins = [[id], [insert_doc], [vector]] # Insert the title id, the text, and the text embedding vector collection.insert(ins) time.sleep(2)


  1. 在 Attu 中您可以看到创建的 Collection,具体操作请参见Attu操作指南

在本文示例中,我们将 Embedding 向量和新闻报道文稿一起存入 Milvus 中,同时构建索引类型采用了 IVF_FLAT,在向量检索时,同时可以召回原始文稿。

▶ 步骤二:向量检索与知识问答

数据写入完成后,即可进行快速的向量检索。在通过提问搜索到相关的知识点后,我们可以按照特定的模板将“提问 + 知识点”作为 prompt 向 LLM 发起提问。在这里我们所使用的 LLM 是通义千问,这是阿里巴巴自主研发的超大规模语言模型,能够在用户自然语言输入的基础上,通过自然语言理解和语义分析,理解用户意图。通过提供尽可能清晰详细的指令(prompt),可以获得更符合预期的结果。这些能力都可以通过通义千问来获得。


创建 answer.py 文件,内容如下所示。

import osimport dashscopefrom dashscope import Generationfrom pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collectionfrom embedding import getEmbedding

def getAnswer(query, context): prompt = f'''请基于```内的报道内容,回答我的问题。 ``` {context} ``` 我的问题是:{query}。 '''
rsp = Generation.call(model='qwen-turbo', prompt=prompt) return rsp.output.text

def search(text): # Search parameters for the index search_params = { "metric_type": "L2" }
results = collection.search( data=[getEmbedding(text)], # Embeded search value anns_field="embedding", # Search across embeddings param=search_params, limit=1, # Limit to five results per search output_fields=['text'] # Include title field in result )
ret = [] for hit in results[0]: ret.append(hit.entity.get('text')) return ret

if __name__ == '__main__':
current_path = os.path.abspath(os.path.dirname(__file__)) # 当前目录 root_path = os.path.abspath(os.path.join(current_path, '..')) # 上级目录 data_path = f'{root_path}/CEC-Corpus/raw corpus/allSourceText'
# 配置Dashscope API KEY dashscope.api_key = '<YOUR_DASHSCOPE_API_KEY>'
# 配置Milvus参数 COLLECTION_NAME = 'CEC_Corpus' DIMENSION = 1536 MILVUS_HOST = 'c-97a7d8038fb8****.milvus.aliyuncs.com' MILVUS_PORT = '19530' USER = 'root' PASSWORD = '<password>'
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, user=USER, password=PASSWORD)
fields = [ FieldSchema(name='id', dtype=DataType.INT64, descrition='Ids', is_primary=True, auto_id=False), FieldSchema(name='text', dtype=DataType.VARCHAR, description='Text', max_length=4096), FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='Embedding vectors', dim=DIMENSION) ] schema = CollectionSchema(fields=fields, description='CEC Corpus Collection') collection = Collection(name=COLLECTION_NAME, schema=schema)
# Load the collection into memory for searching collection.load()
question = '北京中央电视台工地发生大火,发生在哪里?出动了多少辆消防车?人员伤亡情况如何?' context = search(question) answer = getAnswer(question, context) print(answer)



如有疑问,可加入向量检索 Milvus 版用户钉群 59530004993 咨询。


还未添加个人签名 2020-10-15 加入



通过阿里云向量检索 Milvus 版和通义千问快速构建基于专属知识库的问答系统_人工智能_阿里云大数据AI技术_InfoQ写作社区