写点什么

从 Milvus 迁移 DashVector

作者:DashVector
  • 2024-09-05
    陕西
  • 本文字数:4776 字

    阅读完需:约 16 分钟

从Milvus迁移DashVector

本文档演示如何从 Milvus 将 Collection 数据全量导出,并适配迁移至 DashVector。方案的主要流程包括:

  1. 首先,升级 Milvus 版本,目前 Milvus 只有在最新版本(v.2.3.x)中支持全量导出

  2. 其次,将 Milvus Collection 的 Schema 信息和数据信息导出到具体的文件中

  3. 最后,以导出的文件作为输入来构建 DashVector Collection 并数据导入

下面,将详细阐述迁移方案的具体操作细节。


1. Milvus 升级 2.3.x 版本

本文中,我们将借助 Milvus 的query_iterator来全量导出数据(query接口无法导出完整数据),由于该接口目前只在 v2.3.x 版本中支持,所以在导出数据前,需要先将 Milvus 版本升级到该版本。Milvus 版本升级的详细操作参考Milvus用户文档

注意:在进行 Milvus Upgrade 时需要注意数据的备份安全问题。

2. Milvus 全量数据导出

数据的导出包含 Schema 以及数据记录,Schema 主要用于完备地定义 Collection,数据记录对应于每个 Partition 下的全量数据,这两部分涵盖了需要导出的全部数据。下文展示如何将单个 Milvus Collection 全量导出。

2.1. Schema 导出

DashVector 和 Milvus 在 Schema 的设计上有一些区别,DashVector 向用户透出的接口非常简单,Milvus 则更加详尽。从 Milvus 迁移 DashVector 时会涉及到部分 Schema 参数的删除(例如 Collection 的 index_param 参数),只会保留 DashVector 构建 Collection 的必要参数,以下为一个 Schema 转换的简单示例(其中,Collection 已有的数据参考Milvus示例代码写入)。

python 示例:

from pymilvus import (    connections,    utility,    Collection,    DataType)import osimport jsonfrom pathlib import Path
fmt = "\n=== {:30} ===\n"
print(fmt.format("start connecting to Milvus"))host = os.environ.get('MILVUS_HOST', "localhost")print(fmt.format(f"Milvus host: {host}"))connections.connect("default", host=host, port="19530")
metrics_map = { 'COSINE': 'cosine', 'L2': 'euclidean', 'IP': 'dotproduct',}
dtype_map = { DataType.BOOL: 'bool', DataType.INT8: 'int', DataType.INT16: 'int', DataType.INT32: 'int', DataType.INT64: 'int',
DataType.FLOAT: 'float', DataType.DOUBLE: 'float',
DataType.STRING: 'str', DataType.VARCHAR: 'str',}
def load_collection(collection_name: str) -> Collection: has = utility.has_collection(collection_name) print(f"Does collection hello_milvus exist in Milvus: {has}") if not has: return None
collection = Collection(collection_name) collection.load() return collection def export_collection_schema(collection, file: str): schema = collection.schema.to_dict() index = collection.indexes[0].to_dict() export_schema = dict() milvus_metric_type = index['index_param']['metric_type'] try: export_schema['metrics'] = metrics_map[milvus_metric_type] except: raise Exception(f"milvus metrics_type{milvus_metric_type} not supported") export_schema['fields_schema'] = {} for field in schema['fields']: if 'is_primary' in field and field['is_primary']: continue if field['name'] == index['field']: # vector if field['type'] == DataType.FLOAT_VECTOR: export_schema['dtype'] = 'float' export_schema['dimension'] = field['params']['dim'] else: raise Exception(f"milvus dtype{field['type']} not supported yet") else: try: # non-vector export_schema['fields_schema'][field['name']] = dtype_map[field['type']] except: raise Exception(f"milvus dtype{field['type']} not supported yet") with open(file, 'w') as file: json.dump(export_schema, file, indent=4) if __name__ == "__main__": collection_name = "YOUR_MILVUS_COLLECTION_NAME" collection = load_collection(collection_name) dump_path_str = collection_name+'.dump' dump_path = Path(dump_path_str) dump_path.mkdir(parents=True, exist_ok=True) schema_file = dump_path_str + "/schema.json" export_collection_schema(collection, schema_file)
复制代码

JSON 示例:

{    "metrics": "euclidean",    "fields_schema": {        "random": "float",        "var": "str"    },    "dtype": "float",    "dimension": 8}
复制代码

2.2. Data 导出

DashVector 和 Milvus 在设计上都有 Partition 的概念,所以向量以及其他数据进行导出时,需要注意按照 Partition 粒度进行导出。此外,DashVector 的主键类型为 str,而 Milvus 设计其为自定义类型,所以在导出时需要考虑主键类型的转换。以下为一个基于query_iterator接口导出的简单代码示例:

from pymilvus import (    connections,    utility,    Collection,    DataType)import osimport jsonimport numpy as npfrom pathlib import Path
fmt = "\n=== {:30} ===\n"
print(fmt.format("start connecting to Milvus"))host = os.environ.get('MILVUS_HOST', "localhost")print(fmt.format(f"Milvus host: {host}"))connections.connect("default", host=host, port="19530")pk = "pk"vector_field_name = "vector"
def load_collection(collection_name: str) -> Collection: has = utility.has_collection(collection_name) print(f"Does collection hello_milvus exist in Milvus: {has}") if not has: return None
collection = Collection(collection_name) collection.load() return collection def export_partition_data(collection, partition_name, file: str): batch_size = 10 output_fields=["pk", "random", "var", "embeddings"] query_iter = collection.query_iterator( batch_size=batch_size, output_fields = output_fields, partition_names=[partition_name] ) export_file = open(file, 'w') while True: docs = query_iter.next() if len(docs) == 0: # close the iterator query_iter.close() break for doc in docs: new_doc = {} new_doc_fields = {} for k, v in doc.items(): if k == pk: # primary key new_doc['pk'] = str(v) elif k == vector_field_name: new_doc['vector'] = [float(k) for k in v] else: new_doc_fields[k] = v new_doc['fields'] = new_doc_fields json.dump(new_doc, export_file) export_file.write('\n') export_file.close() if __name__ == "__main__": collection_name = "YOUR_MILVUS_COLLECTION_NAME" collection = load_collection(collection_name) pk = collection.schema.primary_field.name vector_field_name = collection.indexes[0].field_name dump_path_str = collection_name+'.dump' dump_path = Path(dump_path_str) dump_path.mkdir(parents=True, exist_ok=True)
for partition in collection.partitions: partition_name = partition.name if partition_name == '_default': export_path = dump_path_str + '/default.txt' else: export_path = dump_path_str + '/' + partition_name + ".txt" export_partition_data(collection, partition_name, export_path)
复制代码

3. 将数据导入 DashVector

3.1. 创建 Cluster

参考 DashVector 官方用户手册构建 Cluster。

3.2. 创建 Collection

根据 2.1 章节中导出的 Schema 信息以及参考 Dashvector 官方用户手册来创建 Collection。下面的示例代码会根据 2.1 章节中导出的 schema.json 来创建一个 DashVector 的 Collection。

from dashvector import Client, DashVectorException
from pydantic import BaseModelfrom typing import Dict, Typeimport json
dtype_convert = { 'int': int, 'float': float, 'bool': bool, 'str': str}
class Schema(BaseModel): metrics: str dtype: Type dimension: int fields_schema: Dict[str, Type] @classmethod def from_dict(cls, json_data): metrics = json_data['metrics'] dtype = dtype_convert[json_data['dtype']] dimension = json_data['dimension'] fields_schema = {k: dtype_convert[v] for k, v in json_data['fields_schema'].items()} return cls(metrics=metrics, dtype=dtype, dimension=dimension, fields_schema=fields_schema)
def read_schema(schema_path) -> Schema: with open(schema_path) as file: json_data = json.loads(file.read()) return Schema.from_dict(json_data)
if __name__ == "__main__": milvus_dump_path = f"{YOUR_MILVUS_COLLECTION_NAME}.dump" milvus_dump_scheme_path = milvus_dump_path + "/schema.json" schema = read_schema(milvus_dump_scheme_path) client = dashvector.Client( api_key='YOUR_API_KEY', endpoint='YOUR_CLUSTER_ENDPOINT' ) # create collection rsp = client.create(name="YOUR_DASHVECTOR_COLLECTION_NAME", dimension=schema.dimension, metric=schema.metrics, dtype=schema.dtype, fields_schema=schema.fields_schema) if not rsp: raise DashVectorException(rsp.code, reason=rsp.message)
复制代码

3.3. 导入 Data

根据 2.2 章节中导出的数据以及参考 DashVector 官方用户手册来批量插入 Doc。下面的示例代码会依次解析各个 Partition 导出的数据,然后依次创建 DashVector 下的 Partition 并导入数据。

from dashvector import Client, DashVectorException, Doc
from pydantic import BaseModelfrom typing import Dict, Typeimport jsonimport globfrom pathlib import Path
def insert_data(collection, partition_name, partition_file): if partition_name != 'default': rsp = collection.create_partition(partition_name) if not rsp: raise DashVectorException(rsp.code, reason=rsp.message) with open(partition_file) as f: for line in f: if line.strip(): json_data = json.loads(line) rsp = collection.insert( [ Doc(id=json_data['pk'], vector=json_data['vector'], fields=json_data['fields']) ] ) if not rsp: raise DashVectorException(rsp.code, reason=rsp.message)
if __name__ == "__main__": milvus_dump_path = f"{YOUR_MILVUS_COLLECTION_NAME}.dump"
client = dashvector.Client( api_key='YOUR_API_KEY', endpoint='YOUR_CLUSTER_ENDPOINT' ) # create collection collection = client.get("YOUR_DASHVECTOR_COLLECTION_NAME") partition_files = glob.glob(milvus_dump_path+'/*.txt', recursive=False) for partition_file in partition_files: # create partition partition_name = Path(partition_file).stem insert_data(collection, partition_name, partition_file)
复制代码


发布于: 刚刚阅读数: 4
用户头像

DashVector

关注

还未添加个人签名 2024-05-14 加入

向量检索服务DashVector基于通义实验室自研的高效向量引擎Proxima内核,提供具备水平拓展能力的云原生、全托管的向量检索服务。

评论

发布
暂无评论
从Milvus迁移DashVector_数据库_DashVector_InfoQ写作社区