写点什么

Milvus 入门小册

作者:Quincy
  • 2025-01-13
    湖北
  • 本文字数:6127 字

    阅读完需:约 20 分钟

Milvus 架构



Milvus 的架构设计是将存储和计算分离的,从系统层次上划分,分为以下四类:

  • 访问层:访问层由一组无状态代理组成,作为系统的前层和用户的终端

  • 协调者服务:协调者服务将任务分配给工作节点,并作为系统的大脑发挥作用

  • worker 节点:工作者节点是系统的胳膊和腿,遵循协调者服务的指示,执行用户触发的 DML/DDL 命令

  • 存储:存储是系统的骨骼,负责数据的持久性。它包括元数据存储、日志 broker 和对象存储。


Milvus 是依赖外部组件实现存储能力的,存储相关的组件包含如下:

  • metadata storage:元数据存储,实现使用 etcd,负责维护集群 segment 分配及分布情况、集合信息、用户的角色权限、Milvus 内部组件的服务注册和发现

  • message storage:消息存储组件,支持 pulsar 或 kafka 实现,所有查询请求(包括插入、删除)数据都会先写入到 message storage,再由 query node 或 datanode 节点进行消费

  • object storage:对象存储,支持 minio、s3 兼容接口对象存储、azureblob 实现,是数据最终持久化的地方,数据包含:binlog、index、增量文件


抛开外部依赖的存储组件,milvus 内部的其他组件都是无状态的,支持水平扩展


Milvus 数据的组织方式



Collection(集合)

类似于关系型数据库中的表,是 milvus 中最大的数据单元表示


Shard(分片)

milvus 默认通过主键 hash 来实现分片,通过对一个集合切分成多个 shard,可以将不同分片发送给不同的 worker node 去处理,这样充分利用集群的并行计算能力。一个分片对应一个 vchannel,vchannel 会映射到一个 pchannel(对应为 kafka 中的一个 topic);切记,shard 主要作用是提高并行写入能力,而不是读取能力



Partition(分区)

一个 Shard 中可以有多个 partition。Milvus 中的 partition 是指在一个集合中标有相同标签的一组数据。常见的分区方法包括按日期、性别、用户年龄等进行分区。创建 partition 可以有利于查询过程,因为巨大的数据可以通过分区标签进行过滤。相比之下,Shard 更多的是提高写数据时的扩展能力,而 Partition 更多的是读数据时的系统性能提升。



segments(段)

在每个 partition 中,都有多个 segment。segment 是 Milvus 中系统调度的最小单位。有两种类型的段:growing segment(增长段)和 sealed segment(封存段);growing segment 是由查询节点订阅的。Milvus 用户不断地把数据写进 growing segment。当一个 growing segment 的大小达到一个上限(默认为 512MB)时,系统将不再允许向这个 growing segment 写入数据,因此这个 growing segment 会变为 sealed。索引是建立在 sealed segment 上的。

为了实时访问数据,系统会同时读取 growing segment(流数据)和 sealed segment 中的数据(批数据)。


Entity(实体)

每个段都包含大量的实体。Milvus 中的一个实体相当于传统数据库中的一行。每个实体都有一个唯一的主键字段,它也可以自动生成。实体还必须包含时间戳(ts),和向量字段(Milvus 的核心)。

Milvus 组件介绍

milvus 集群包含如下内部组件:

Proxy(代理服务)

是 client 请求的前端入口,无状态,校验 client 请求,为 client 请求创建对应的 log msg 发送到 log broker 对应的 pchannel,并把多个 query node 返回的查询结果集进行 reduce 处理后再返回 client


协调服务组件

协调器服务负责将任务分配给工作节点,并作为系统的大脑发挥作用。它承担的任务包括集群拓扑结构管理、负载平衡、时间戳生成、数据声明和数据管理。有四种协调器类型:根协调器(root coordinator)、数据协调器(data coordinator)、查询协调器(query coordinator)和索引协调器(index coordinator)

Root coord(根协调器):

负责处理数据定义语言(DDL)和数据控制语言(DCL)的请求,如创建或删除集合、索引,以及管理 TSO(时间戳 Oracle)和时间标签的发布。

Query coord(查询协调器):

查询协调器负责管理拓扑结构和查询节点的负载平衡,以及从增长段到密封段的衔接。

Data coord(数据协调器):

数据协调器负责管理数据节点的拓扑结构,维护元数据,并触发刷新、压缩和其他后台数据操作,如下:

  • 分配段空间

数据协调器将 growing segment 中的空间分配给 proxy,以便 proxy 可以使用段中的 free 空间插入数据。

  • 记录段的分配和段内分配空间的到期时间

分配的每个段内的空间不是永久性的,因此,data coord 还需要记录每个段分配的到期时间。

  • 自动刷新段数据

如果段满了,数据协调器会自动触发数据刷新,使 growing segment 变为 sealed segment。

  • 为 data node 分配 vchannel

一个集合可以有多个 vchannels。data coord 决定哪些 vchannel 被哪些 data node 所消费。

Index coord(索引协调器):

索引协调器负责管理索引节点的拓扑结构,建立索引,并维护索引元数据。


工作节点

工作节点是数据实际的执行者,它遵循协调者服务的指令,执行来自 Proxy 的数据操作语言(DML)命令。由于存储和计算的分离,工作节点是无状态的,当部署在 Kubenetes 上时,可以提高系统的扩展性和灾难恢复能力。有三种类型的节点:

Query node(查询节点)

查询节点检索增量的日志数据,并通过订阅 log broker 消费并将其变成不断增长的 growing segment;从对象存储加载历史数据,并在向量和标量数据之间进行混合搜索。

Data node(数据节点)

数据节点通过订阅 log broker 检索增量日志数据,处理写请求,并将日志数据打包成日志快照(binlog)并存储在对象存储中。如下:

  • 消费数据

从 data coord 分配的 channel 中消费数据,并为数据创建一个序列。

  • 数据持久性

在内存中缓存插入的数据,当数据量达到一定的阈值时,自动将这些插入的数据刷入对象存储。

Index node(索引节点)

索引节点负责执行索引构建工作。索引节点不需要常驻内存,可以用 serverless framework 框架实现。


每类的协调器和工作节点是一一对应的


Milvus 如何进行数据处理

MsgStream 接口是 milvus 数据处理的关键,接口提供了 AsProducer(channels []string)和 AsConsumer(channels []string, subNamestring)方法,分别作为生产者或消费者向 log broker 中指定的 pchannel(对应 kafka topic)生产或消费数据。



当集合创建的时候可以指定 shard 数量,每个 shard 对应一个 vchannel,因此一个集合可以有多个 vchannels(这些 vchannels 又被称作 DmChannels(data manipulation channels)),不同集合的 vchannel 可以分配到同一个 pchannel 以达到复用目的,如下图:



MsgStream 的 Producer()负责向 pchannel 写数据,有两种写数据方式:

  • Single write:通过对不同的 entity 的主键进行 hash 得到不同的 vchannel,然后将这些 entity 写入到对应 pchannel

  • Broadcast write:将 entity 写到指定的 channels

写数据

简单流程是 proxy 将插入请求写入 log broker 的 pchannel,data node 通过消费 pchannel 将写入数据进行序列化后持久化到对象存储。如下图:



proxy 如何知道要发送到哪些 pchannel(kafka topic)?data node 又是如何知道各自从哪些 pchannel 消费?

从更细的角度讲,当 proxy 收了插入请求,会从 rootcoord 获取 vchannels 和 pchannels 的映射关系,然后创建 MsgStream 对象;然后 proxy 请求 data coord 为插入请求分配 segments,然后将所请求的 sements 信息插入到 log broker 对应的 pchannel 中,这样 segment 信息便不会丢失



data node 启动时候从 etcd 读取元数据信息(vchannel 的分配信息以及和 pchannel 的映射关系),这些 vchannels 要怎么分配给这些 data node 是由 data coord 负责调度的,然后每个 data node 创建自己的 Msgstream 对象作为消费者从 log broker 对应的 pchannel 消费数据并持久化到对象存储。然后将存储路径信息反馈到 data coord 从而记录到 etcd 中。



由于不同的集合的 vchannels 会共享一个 pchannel,所以在 data node 从 pchannel 消费数据时会得到多个集合的数据,为解决这个问题,milvus 引入了 flowgragh,flowgraph 被集成在 data node(也包括 query node)中,用于过滤出同一个集合的数据



读数据

load 阶段:

proxy 组件收到 data load 请求后会发送给 query coord,query coord 负责分配 shard 到不同的 query node,这个分配过程是使用 rpc 通信的

query node 根据分配的 vchannel 映射的 pchannel,创建 MsgStream 对象然后从 log broker 对应 pchannel 进行消费,这个过程同样需要使用到 flowgraph 去过滤对应的集合 msg



search 阶段:

对于 search 请求,proxy 会将请求写入到 DqRequestChannel 中,query node 通过订阅 DqRequestChannel 得到 search 请求时,需要同时对存储在 sealed segments 中的批量数据和动态插入 Milvus 并存储在 growing segments 中的流数据进行本地查询(通过 load 阶段分配好的 shard,连接到 log broker 对应的 pchannels 进行消费)。之后,查询节点需要对 sealed segments 和 growing segments 的查询结果进行汇总。这些汇总的结果通过 gRPC 传递给 proxy。

proxy 收集来自多个查询节点的所有结果,然后聚合它们以获得最终结果。然后,proxy 将最终的查询结果返回给客户端。由于每个查询请求和其相应的查询结果都由相同的唯一请求 ID 标记,proxy 可以找出哪些查询结果对应于哪个查询请求。



DDL 操作

常见的 DDL 操作包括查询集合 schema、查询索引信息、创建集合、删除集合、构建索引、删除索引等。

DDL 请求从客户端发送到 proxy,proxy 进一步将这些请求按照接收的顺序传递给 root coord,root coord 为每个 DDL 请求分配一个时间戳并对请求进行动态检查。proxy 以串行方式处理每个请求,即每次处理一个 DDL 请求。proxy 在完成对前一个请求的处理并收到 root coord 的结果之前,不会处理下一个请求。proxy 与 root coord 之间是使用 gRPC 通信。root coord 会按分配的时间戳顺序来处理这些请求,并记录最新已处理的时间戳。当收到一个时间戳比最新处理时间戳要小的请求,root coord 为保证时间顺序会直接拒绝掉该请求。



索引构建和删除

创建索引

在收到来自客户端的索引构建请求后,proxy 首先对这些请求进行静态检查,并将其发送到 root coord。然后,root coord 将这些索引构建请求持久化到 etcd 中,并将这些请求发送给 index coord。



如上图所示,当 index coord 收到来自 root coord 的索引构建请求时,它首先将任务持久化到 etcd 中。索引构建任务的初始状态为 Unissued。index coord 维护每个 index node 的任务负载记录,并将入站任务发送到负载较少的 index node。在任务完成后,index node 将任务的状态修改为完成或失败并写入 etcd。然后,index coord 将通过在 etcd 中查找来了解索引构建任务是成功还是失败。如果任务由于系统资源有限或 index node 退出而失败,index coord 将重新触发整个过程,并将同一任务分配给另一个 index node。


删除索引

同样的,index coord 也负责处理索引的删除请求。当 root coord 收到来自客户端的删除索引请求时,它首先将该索引标记为 "dropped",并将结果返回给客户端,同时通知 index coord。然后,index coord 过滤出所有的被标记为 dropped 的索引任务的 IndexID,通过后台协程逐渐删除对象存储(MinIO 和 S3)中对应 IndexID 的索引记录。这个过程涉及 recycleIndexFiles 接口。当所有相应的索引文件被删除后,被删除索引的元数据信息也会从 etcd 中删除。



数据插入和持久化

数据插入

Proxy 把请求收到的数据哈希到多个桶里,然后向 data coord (data coordinator) 去请求分配 segment 的空间。再把请求到的空间的这一部分数据插入到 message storage 里面。插入到 message storage 之后,这些数据就不会再丢失了。

持久化



Data node 订阅 message store,通过订阅它然后去消费这个 insert message, 接着会把这个插入请求放到一个内存的 buffer 里面,在积累到一定的大小后,它会被 flush 到一个对象存储里面。有点类似 MySQL flush innodb buffer pool 的机制,到了一定的脏页比例就开始刷硬盘

小结



第一步:Insert message 从 SDK 发到 proxy 之后,proxy 把这个 insert message 插到相应的 log broker 中,插入到 log broker 中的每条消息都有唯一的主键和一个时间戳;

第二步:插入到 log broker 之后,数据会被 data node 消费;

第三步:Data node 会把数据写入进持久化存储当中,最终数据在持久化存储中是基于 segment 的粒度来组织的,也就是说这个消息除了中主键和时间戳,还会被额外赋予一个 segment ID,以标识出这条数据最终会属于哪个 segment。Data note 在收到这些信息之后,会把相应的信息写入相应的 segment 中,并最终写入到持久化存储中去。

第四、五步:在数据被持久化之后,如果说基于这些数据直接做查询的话,查询速度会比较慢,因此一般情况下会考虑去构建一些索引去以加速查询速度。这时 index node 就会把信息从持久化存储里拉出来并构建索引,而构建的索引文件又会被回写进持久化存储中(S3 或 Minio 等等)

Log broker 和 object storage 也是 Milvus 架构中保障数据可靠性很重要的两部分,在系统设计中这两部分也可以分别选择一些第三方组件,来保障不同情况下的可靠性。



实时搜索实现原理

数据载入 QueryNode 流程



用户需要执行 collection.load()将集合数据加载到内存中,milvus 才能提供数据查询。这个过程中 querycoord 会请求 datacoord 获取当前哪些 segment 已经被持久化到对象存储以及最新的 checkpoint 时间,对于比这个 checkpoint 时间还新的数据是仍未被持久化到对象存储的数据,然后 querycoord 根据 datacoord 返回的已持久化 segment 信息和增量数据信息(对应为消息队列中的增量消息 topic)按照负载均衡策略分配给 querynode,querynode 从对象存储中获取持久化 segment 加载到内存中执行索引搜索、从消息队列中消费增量消息进行暴力搜索(即:FLAT 类型的索引),最后将结果进行合并

数据查询一致性的实现

一致性级别及适用场景:

查询一致性级别从高到低分为以下 4 种,级别越低查询性能越好但读取的数据一致性越差

  • Strong

强一致性,某一时刻在任意查询节点数据副本上读取到的数据是一致的



  • Bounded staleness

尽可能一致,允许在一个较短的时间区间内不同副本上读取到的数据出现不一致,但从全局角度看,在其担保的时间区间内数据是一致的



  • Session

会话一致性,同一个会话中写入的数据能立即读到



  • Eventually

最终一致,在所有数据副本中不担保读写顺序,但最终数据是一致的



如何指定查询一致性级别:

搜索时通过 consistency_level 参数指定

res = collection.query(  expr = "book_id in [2,4,6,8]",  offset = 0,  limit = 10,   output_fields = ["book_id", "book_intro"],  consistency_level="Strong")
复制代码

实时搜索流程

相关概念:

  • Service Timstamp

定义 querynode 当前可提供查询服务支持的数据时间点

  • Guarantee Timestamp

querynode 获取到的 Service Timstamp 未达到 Guarantee Timestamp 之前查询不被执行。不同的一致性是通过 guarantee ts 来控制的,其控制查询节点在数据时间没达到 guarantee ts 前查询不被执行;strong 级别是 guarantee ts 等价与当前查询时间、bounded staleness 级别时 guarantee ts 为相对于当前查询时间更小的一个时间、session 级别时,guarantee ts 为同一个 client 中最近一次写操作的时间、eventually 级别时,只是将 guarantee ts 设置为一个比较靠前的时间

  • Travel Timestamp

又称之为“时间旅行”,当在查询中指定了 traval ts,所返回的数据是 travel ts 之前产生的数据,即使数据在当前时间已经被删除(前提是 travel ts 之前的数据还未被 compact)

以下实时查询的执行流程图:


快速安装

本地安装

 curl -O https://raw.githubusercontent.com/milvus-io/milvus/v2.4.0/deployments/docker-compose/docker-compose.yml
复制代码

启动命令

  docker-compose up -d
复制代码


发布于: 刚刚阅读数: 3
用户头像

Quincy

关注

路过岁月 慢慢生活 2018-08-01 加入

履历:滴滴、金山 资深开发 深耕运营领域 一个心怀远方的搬砖懒汉

评论

发布
暂无评论
Milvus入门小册_Milvus_Quincy_InfoQ写作社区