写点什么

Ray 1.0 架构解读

用户头像
lipi
关注
发布于: 2021 年 02 月 12 日

本来想独立写一个关于 Ray 怎么执行 worker 的文章,然后发现了 Ray 1.0 Architecure 这篇官方文章。觉得挺好,索性就在这个文章上边做翻译,边写一些理解。原文在 google docs,我放在网盘上了,可以下载

本文的目标是为了对 Ray 分布式系统做一个综述。它也是一个手册,可以帮助以下的人:

  • 对 Ray 的底层存在疑问的开发者

  • 把 Ray 作为分布式后台的工程师

  • Ray 后台的贡献者

这个文档并不是对 Ray 的介绍。如果需要了解 Ray ,可以看 A Gentle Introduction to RayRay GithubRay Slack。也可以看一下 Ray Design Patterns。这个文档可以取代之前的 Ray Paper,因为基础架构从 0.7 升级到了 0.8

综述

API 哲学

Ray 致力于提供一个针对分布式系统的通用 API。达到这个目标的一部分核心能力是提供简单但通用的程序抽象(general programming abstractions),让系统完成所有困难的工作。

Ray 的哲学是尽可能让开发者使用现有的 Python 库系统,而不是造新的轮子

Ray 开发者使用一些 Python 原语来实现他们的逻辑,系统负责执行,处理分布式和并行管理。Ray 的用户从资源角度考虑用户管理,而系统基于以下资源需求管理调度自动缩放


Ray 框架


一些应用可能需要不同的系统级别的平衡,这些不能通过 Ray 的核心能力的抽象来完成。因此,Ray API 的第二个目标是提供对系统能力的细粒度的控制。这个是通过参数的配置来实现,例如任务替换故障处理应用生命周期

Ray 提供了基础的原语(Primitives)。原语可以理解成系统的基本操作。就像 Alan Turing 定义图灵机,定义了六个基本操作。Ray 提供了 init、remote、put、get 等基本原语。但是通过这些基本原语达不到更细粒度的控制,例如针对不同的计算配置不同的 CPU、内存。或者调度的时候,能够提供人工设定调度的方法。Ray 希望通过参数配置来实现任务、故障和应用的细粒度控制。配置能够达到什么样粒度的控制,可以从下文看到

系统范围

Ray 寻找能够在分布式系统上开发的一种通用方法。具体来说,这包括粗粒度的弹性任务分配(例如,serverless 计算的类型),机器学习训练(例如,Ray Tune,RLib,RaySGD), Serving(例如, Ray Serve 等),数据处理(例如,Modin、Dask-on-Ray、Mars-on-Ray),和特定(ad-hoc)计算(例如,Python 应用的并行、粘合不同的分布式框架)

Ray API 能够让开发者容易的组合在一个分布式框架上组合不同类型的库。举个例子,Ray 任务和 Actor 可以在分布式训练(例如,torch.distributed)里面调用或者被调用。在这种情况下,Ray 设计为一个分布式的粘合系统,因为它的 API 是通用的,足够支持在不同的工作类型中作为界面层服务

这里一致强调 Ray API 设计的目的是作为分布式的“胶水”。通过多层次匹配,例如计算、训练、数据、ad-hoc 等,来达到这一目的

系统设计的目标

Ray 架构的核心原则是 API 的简单和通用,而核心系统的目标是性能(低开销和横向扩展)和可靠性。有时候,我们愿意牺牲也挺好的目的。举个例子,Ray 包含了一些组件,例如,分布式引用计数和分布式内存,它增加了架构的复杂度,但是对于性能和可靠性来说这是必须的

简单和通用,和系统的性能和可靠性的目标,必须有所平衡

为了性能,Ray 基于 gRPC 构建,并且能够在很多场景下匹配甚至超过原生 gRPC 的性能。相对于 gRPC 本身,Ray 让应用平衡并行和分布式操作,和分布式内存共享(通过一个共享的对象存储来实现)更简单

为了可靠性,Ray 的内部协议设计提供了发生故障的时候的纠错性,同时减少了通用情况下的开销。Ray 开发了一个分布式引用计数协议来保障内存安全,并且提供了故障恢复的不同选项

由于 Ray 用户通常用资源而不是机器来描述计算,Ray 应用可以从笔记本扩展到集群,而不同更改代码。Ray 的分布式溢出调度(distributed spillback scheduler)对象管理就是为了能够达到这种无缝的扩展而设计的,并且开销较低

相关系统

下面的表格比较了 Ray 和一些相关的系统列表。我们忽略了一些高阶的库(例如,RLib、Rune、RaySGD、Serve、Modin、Dask-on-Ray、Mars-on-Ray),这样的比较超过了这个文档的范围,这里只关注 Ray 核心,你可以参考 Ray 相关库的全列表

集群编排 Ray 能够基于例如 Kubernetes 或者 SLURM 之上的集群调度来提供轻量级、集成语言原语(例如任务和 Actor),而不是用容器和服务这样的原语。并行框架和 Python 的并行框架,例如多进程或 Celery ,Ray 提供了更多的通用的、高性能的 API。Ray 系统也显性的支持内存共享数据处理框架对于一些数据处理框架,例如 Spark、Flink、Mars 或 Dask,Ray 提供了一个底层的并且较窄的 API。这个 API 更加灵活,设计为一种分布式“胶水”框架。换句话说,Ray 并没有固定的对于数据 schema 、关系型表、或者流数据的定义,这些能力都是通过相关的库来提供(例如 Modin、Dask-on-Ray、MARS-on-Ray)Actor 框架和一些专门的 Actor 框架(例如,Erlang 和 Akka)不同,Ray 集成现有的语言,提供跨语言操作和语言层的原生类库。Ray 也透明的管理无状态计算的并行,并且显性的提供 Actor 之间的内存共享机制 HPC 系统很多 HPC(高性能计算)系统暴露一个消息接口,它是一个底层的任务和 Actor 的借口。这能够允许应用非常灵活,但是会让开发者花费更多的开发精力。很多这些系统和类库(例如,NCCL、MPI)也提供优化过的集合消息原语(例如,allreduce)。Ray 应用能够通过初始化 Ray actors 的通讯组来使用这些原语(例如 RaySGD 使用 torch distributed


对比 Ray 和相关系统的比较。Ray 定位为简单通用的原语的基本分布式系统,基于系统上的数据处理模式,可以基于 Ray 的基本原语或者类库来提供

框架概览

应用概念

Task - 在和调用程序不同的进程上的单个的函数调用。Task 可能是无状态的(通过@ray.remote函数注解),也可能是有状态的(通过@ray.remote做类注解,下面 Actor 部分会描述)。Task 是个异步操作。调用了 .remote装饰器的函数,会马上返回一个 ObjectRef 对象。未来可以通过这个对象来获取实际运行的返回

Task 是函数,也可能是 Actor 的类。调用的时候,会把 Task 的输入输出和中间计算代码封装成一个包,提交给 Ray 集群。这时候 Task 还不会真正进行计算,只是做了分配资源和向 GCS 注册的过程。这时候返回一个 ObjectRef 对象,当对这个对象执行 get 操作的时候。Task 才真正开始执行,get 操作会阻塞等待结果返回

Object - 应用的值。当执行 ray.put 的时候,会生成 Object。或者提交任务的时候,会生成 Object。Object 是不可变的,创建之后就不能修改。Worker 通过 ObjectRef 对象获得对 Object 的引用

Object 是什么?可以理解为存储在 Apache Arrow 内存里的值,以及对这个值的操作的封装

Actor - 一个有状态的 worker 进程(@ray.remote类的实例)。Actor task 必须通过一个句柄或者一个 Python 引用来提交到 Ray 集群

Driver - 程序起点,就是调用 ray.init() 的代码

Job - tasks、objects 和 actors 的集合,通过同一个 driver 发起

粗粒度的任务,更多的是一个概念。没有对应的函数或者类来对应

设计


一个 Ray 集群包括一组同类型的 Worker Nodes 和一个中心的全局控制储存实例(Global Control Store、GCS) 组成

系统的一些元数据由 GCS 管理,例如 actor 的地址。GCS 管理的元数据会较少频次的访问,但是会使用在绝大多数或者所有的 worker 熵。例如当前节点在集群的关系。这能够让 GCS 不要影响应用性能

GCS 和 Workers 是 Ray 最重要的组件。集中式的管理能够提供很好的调度监控,但是也会影响性能。GCS 希望能够较少被 worker 访问,后续还采用了一些 Local Scheduler 的方式,避免 worker 对 GCS 的频繁调用

所有权关系



大部分的 Ray 元数据都通过一种去中心化的概念来管理,叫做所有权(ownershop):每个 worker 进程管理并拥有它提交的 task 以及 task 的返回(ObjectRef)。这个所有者对 task 是否执行以及 ObjectRef 对应的值是否能够被解析负责。worker 拥有过它通过 ray.put创造的 object。

所有权机制有如下的好处:(相对于 Ray < 0.8 的中心化机制)

  1. 低延迟(~1 RTT,<200 us,RTT 往返时间)。经常访问的系统元数据在做更新的时候是在进程的本地的

  2. 高吞吐(~10k tasks/s per client;集群内线性扩展),因为系统的元数据通过嵌套的远程调用函数自然的分布在多个 worker 进程中

  3. 架构简化。所有者集中处理垃圾回收和系统元数据的逻辑

  4. 增强可靠性。worker 的故障会被隔离。例如一个远程调用的故障不会影响另一个

所有者机制可能会带来一个取舍(trade-off):

  1. 为了解析 ObjectRef,object 的所有者必须是可访问的。这意味着一个 object 必须和他的所有者分享。可以看 Object failures 和 Object spilling 部分获得更多关于 object 恢复和持久化的信息

  2. 所有权不能被转移

所有权机制的提出可以解决中心化负担过重的问题。把很多处理逻辑让 task 的调用进程来处理。这个和 rust 的所有权机制有些类似,后面可以具体看一下 Ray 怎么实现分布式任务的所有权机制

组件

一个 Ray 的实例包含了一个或者多个 worker 节点(nodes)。每个节点包含以下的物理进程:

  1. 一个或者多个 worker processes,负责 task 的提交和执行。一个 worker process 既可以是无状态的,也可以是一个 Actor。每个 worker process 对应一个特定的 job。默认的初始化 worker 数量等于机器的 CPU 的数量。每个 worker 储存如下信息:

  2. Raylet ,Raylet 在同一个集群共享所有的 jobs 。raylet 有两个主要的线程:

每个 worker 进程和 raylet 分配唯一的 20 位(bytes)的 ID 和一个 IP 地址和端口。同样的地址和端口能够被子组件重用(例如,如果一个之前的 worker 进程销毁了),但是这个唯一 ID 不能重用(ID 随着进程销毁而销毁)。worker 进程和它的本地 raylet 进程资源共享(fate-share)

worker 节点的其中一个设计为 head 节点。区别于其他进程,head 节点承担如下任务:

  1. Global Control Store(GCS)。GCS 是一个 key-value 的服务器,包含了系统级别的元数据,例如 objects 和 actors 的位置。有一个进行中的对 GCS 的优化,以便 GCS 可以运行在任意或者多个节点,而不是设定的 head 节点

  2. dirver 进程。dirver 是一个特殊的 worker 进程,它执行上层应用(例如,python 里的__main__)。它能够提交 tasks,但是自己本身不能执行。Driver 进程能够运行在任何 node 熵,但是默认是在 head node 里

这里面强调了 small 和 large 对象。Ray 里面,对于小于一定大小(100Kb,可以设置)的对象,就通过进程传递。如果超过一定大小,就通过 plasma 共享内存来传递

Driver 可以支持 python 和 java,具体怎么实现会后面另起一篇来描述

GCS 和 worker 之间既要去中心化,又需要 GCS 来做调度。worker 自己也增加了互相调度的机制。这种混合模式不能说完美,也是一个适应场景的需要。还是很好的方式

连接到 Ray

一个应用的 Driver 可以通过下面的方式连接到 Ray:

  1. 不带参数调用 ray.init()。这样就启动了一个嵌入的单节点的 Ray 实例,并且可以立刻准备好让应用来调用

  2. 通过指定 ray.init(address=<GCS addr),连接到一个已经启动的 Ray 集群。在这种调用方式下,driver 连接到 GCS,寻找集群其他组件的地址。例如本地 raylet 地址。因为 Ray 有共享内存的机制,driver 必须在与位于集群存在节点的同一位置。

第二条的意思就是,driver 如果连接到一个存在的 ray 集群。driver 需要连接到和它在一个 node 的 raylet ,加入进去。否则 driver 就没办法使用这个 node 的共享内存,也就没办法进行 task 的调用和 object 的操作。

Ray 还在做一个优化工作。就是允许通过一个不是集群的节点机器连接到 Ray 集群。当前可行的办法是模拟一个“零 cpu ”的 node 来加入,并且作为客户端连接到这个 node。注意这需要集群和客户端机器的双向网络连接

这里也说明了,目前,driver 需要在和 Ray 集群的一个节点上。而不是在集群外

语言运行时

Ray 的核心组件是用 C++ 开发的。 Ray 通过一个嵌入式的 C++ 的库叫“core worker” 来支持 Python 和 Java 。这个库提供了所有权表、进程内存储、和其他 worker 的 gRPC 通讯和 raylets 的能力。因为类库是用 C++ 系的,所有语音的运行时共享了一个通用的高性能 Ray worker 协议



代码参考:

Task 生命周期

task 的所有者负责提交 task,并且把 task 的执行返回(ObjectRef)解析为它的对应值



进程提交了 task,进程就作为 task 返回的所有者。可以通过 raylet 获取资源来执行 task。在这里,driver 是结果 “A” 的所有者,“Worker 1” 是结果 “B” 的所有者

当一个 task 被提交后,task 的所有者等待任何依赖可用。例如 ObjectRef 是我们作为参数带入到 task 里的依赖。注意到这个依赖需要是本地的;task 所有者考虑这些依赖在集群里尽可能快的可用。当这些依赖准备好的时候,所有者从分布式调度里面获得资源来执行这个 task。一旦资源可用,调度就批准这个提交。并且以 task 所有者所在的 worker 地址里给出相应

task 所有者在所在的 worker 通过 gRPC 发送一个特定的 task 描述来调度 task 。当执行 task 的时候,worker 必须存储这个执行的返回值。如果返回值很小,worker 就直接在所有者 worker 的内部返回,把返回值拷贝到进程内的对象存储。如果返回值很大,worker 在它的本地共享内存对象存储这个结果,并且反馈给所有者这个对象在分布式内存。这就允许所有者可以在它所在的本地 node 引用这个对象和获取这个对象

这一段说的非常绕。task 被提交之后,需要等待 Ray 集群确认资源,然后才能提交。提交之后,获取返回。

当一个 task 带一个 ObjectRef的参数提交之后,对象的值在 worker 开始执行的时候必须是可解析的。如果值很小,就从所有者的进程内对象存储到 task 描述里面,这个 task 的描述可以被执行的 worker 引用。如果值很大,对象就必须通过分布式内存来获取,因此 worker 就会在它的本地共享内存有一个这个值的拷贝。调度协调这个对象就通过对象的位置和从其他节点获取副本来完成对象的传输

反复强调对 ObjectRef 的操作,针对小的和大的对象有不同的处理方式。在 raylet 之间有一层通讯机制,调度 raylet 所在的共享内存的数据。worker 本身也有个消息传递机制,小的数据可以直接传输。这也是 Ray 优化数据传输的一个策略

Task 可能出错。Ray 区分两种类型的 task 错误:

  1. 应用层级。在 worker 还存在,但是任务执行出错的任何场景。例如,一个任务在 python 抛出一个 IndexError 错误

  2. 系统层级。当 worker 异常中止的场景。例如,一个进程出现了 segfaults错误,或者 worker 的本地 raylet 中止了

如果是在应用层级的 Tasks 错误不会重试。异常被捕捉并且出存在 task 的返回值里。系统层级的 task 错误会自动重试一定次数

代码参考:

Object 生命周期


Ray 的分布式内存管理。 Worker 可以创建和获得 Objects。Object 的所有者负责确定什么时候 Object 可以安全释放

Object 的所有者是初始化 ObjectRef,并且提交了 task 或者调用了 ray.put的 worker。所有者管理 Object 的生命周期。 Ray 确认如果所有者还存在,Object 最后会解析为它的值(或者抛出一个 worker 异常)。如果所有者不存在,试图获取 Object 值的操作不会悬停,而是抛出异常,即使存在 Object 的物理拷贝

每个 worker 储存它所属的 Object 的引用计数。可以看 Reference Counting 获得更多信息。引用仅仅在以下情况下被统计:

  1. 传递一个 ObjectRef 或者包含 ObjectRef 这个参数的 task

  2. Task 返回一个 ObjectRef 或者包含 ObjectRef 的返回

Object 可以出存在进程内的内存或者分布式对象存储。这个决定是为了减少内存占用量和 Object 的解析时间

当没有错误发生,所有者确认 Object 至少还有一个拷贝还在最终可用,并且还在使用范围内(非零引用计数)。可以查看Memory Management 获得更多信息

有两种方法可以获得 ObjectRef 对应的值

  1. 对 ObjectRef 调用 ray.get

  2. ObjectRef作为参数传递给 task。执行的 worker 会解析 ObjectRef 并且用解析完成的值替换为任务的参数

当一个 Object 很小的时候,它可以解析接收到进程内的存储。大的 Object 会存储在分布式对象,并且采用分布式协议来解析。查看 Object Resolution 获取更多信息

当没有错误发生,可以保证解析最终会成功(但是可能会抛出应用级别的异常,例如,如果 worker segfaults)。如果有异常,解析会抛出一个系统级别的异常但是不会中止。如果 raylet 出现错误,一个储存在分布式内存中的 Object 可能会失败,并且所有的 object 的拷贝都丢失。Ray 也提供了一个选项通过 reconstruction 来自动化恢复这些丢失的 Objects。如果 Object 的所有者出现问题,Object 也可能失败

代码参考:

src/ray/core_worker/store_provider/memory_store/memory_store.cc

src/ray/core_worker/store_provider/plasma_store_provider.cc

src/ray/core_worker/reference_count.cc

src/ray/object_manager/object_manager.cc

Actor 生命周期

Actor 的生命周期和它的元数据(例如 IP 地址和端口)是通过 GCS 服务来管理的。每个 actor 的客户端都可能缓存这些元数据到本地,并且使用这些元数据直接通过 gRPC 发送 task 到其他 actor


不同于提交 task,它是完全的去中心化,并且被 task 的所有者管理。actor 生命周期是通过 GCS 服务中心化的方式来管理的

当一个 actor 在 python 里面创建之后,创建的 worker 就同步 actor 信息到 GCS 注册。这可以确保正确性,以防 worker 在 actor 创建之前失败。一旦 GCS 响应了,actor 剩下的创建过程就是异步的了。创建 worker 的进程会在把本地化的一个特殊 task(这个 task 叫做 创建 actor 的 task)进入队列。这有点像一个通常的非 actor task,除了它的对 actor 进程生命周期的特殊资源的需求。这个创造者异步解析创建 actor 的 task,然后发送到 GCS service 来调度。于此同时,创建这个 actor 的 python 调用程序立即返回一个 “actor 句柄”。即使创建 actor 的 worker 没有调度好,这个 actor 句柄也可以用。查看 Actor Creation 获得更多信息

Actor 是需要注册到 GCS。Actor 被一个启动 Task 创建并且被本地和 GCS 调度。

actor task 的执行有点类似于通常的 task:通过 gRPC 提交,返回 future,然后如果 ObjectRef 没有准备好,就不会执行。但是有亮点不同:

  1. 执行一个 actor task,不需要通过调度获取资源。因为 actor 在它的创建 task 被调度的时候,已经在它的生命周期获得了资源

  2. 每次 actor 的调用,task 都按照它们被提交的顺序来执行

在 actor 的创建者退出,或者集群没有更多可用的 task 或者 句柄的时候,actor 就会被清除(查看 Reference Counting 了解更多中止 actor 的情况)。注意这不针对 detached actors,它是设计为长期存在的 actor。detached actor 可以通过名称来引用,它的清除必须使用 ray.kill(no_restart=True)来操作。查看 Actor Death 获得更多信息

Ray 也支持异步 actor,它是用 asyncio 事件驱动来并行执行的。提交任务到这些 actor ,从调用的角度来看,和常规 actor 一样。唯一的区别是当 task 在 actor 运行的时候,它是提交到一个 asyncio 的事件循环,在后台的线程或者线程池里运行。而不是在当前的线程运行

代码参考:

src/ray/core_worker/core_worker.cc

src/ray/core_worker/transport/direct_actor_transport.cc

src/ray/gcs/gcs_server/gcs_actor_manager.cc

src/ray/gcs/gcs_server/gcs_actor_scheduler.cc

src/ray/protobuf/core_worker.proto

故障模式

系统故障

Ray Worker 节点设计为同样的,因为任何一个节点丢失都不会影响。系统故障指的是针对 head 节点的故障。Ray 还在做一个优化,就是让 GCS 可以运行在多个节点上,降低系统故障的可能性

所有节点都配置一个唯一的 ID ,通过心跳机制互相通讯。GCS 对集群的相互关系做出响应。例如,确定那个节点在线。GCS 如果发现那个节点 ID 中止了,就会启动一个新的 raylet 和对应的新的唯一 ID 来重用资源。一个 raylet 但是心跳连接不存在了就会退出。节点的故障检测机制现在还不支持网络分区:如果一个 worker 节点从 GCS 分离了,它就会连接超时并且被标记为中止了

每个 raylet 都会把自己本地的 worker process 汇报给 GCS。GCS 广播这些故障事件并且处理 actor death。所有 worker 进程在节点上和 raylet 共存(fate-share)

raylet 设计目的是为了在单个的 worker 进程失败后,防止集群资源和系统状态的泄漏。如果一个 worker 进程(本地或者远程的)失败了,每个 raylet 都会做出响应,如下:

  • 释放集群资源,例如 CPU。通过清除失败的 worker 进程来市场资源(可以查看 Resource Fulfillment)。如果 worker 失败了,任何对这个 worker 的资源请求都会被取消

  • 释放被 worker 使用的分布式内存对象(查看Memory Management)。通过在 object directory 清除相关关系来实现

故障处理说的比较简单。围绕 worker 来,worker 失败了,就清除 worker 占用的 cpu 和其他系统资源,包括内存对象

应用模型故障



系统故障的设计代表 Ray 集群图的 task 和 obejct 和它们的所有者共存。例如上图,如果 worker 运行 a 在这个场景发生了故障,那通过 a 创建的 bz (灰色部分)都会被统计。同样的,如果 b 是一个 actor ,由 a 创建,这就有一些意义:

  • 如果其他进程时图获取这样一个 object 的值,就会收到一个应用层的异常。例如,如果 z ObjectRef 已经传回给 driver,driver 会通过 rat.get(z) 收到一个错误

  • 通过让程序运行在不同的 task,并且有不同的子所有权树。故障就会被隔离

  • 应用和 driver 共存,driver 是所有权的根节点

避免共存行为的应用级的选项是通过使用 detached actor 来实现,它可能超过它原始 driver 的生命周期,并且仅仅能通过显式的调用来消除。detached actor 自身能够拥有任何其他的 task 和 objects,它一旦销毁,就会和 actor 共存

还有一个优化在进行中,就是支持 object spilling ,它允许 object 持久化到它的所有者的生命周期之后

最后,Ray 提供了一些可选机制来增强透明的恢复。包括自动化 task retries, actor restart, 和 object reconstruction.

对象管理



进程内的存储 vs 分布式存储。这里显示了当提交一个依赖于 obejct(x)的 task(a)的时候,内存是如何分配的

通常,小的对象存储在它自己的进程里,而大的对象出存在分布式对象存储里。如何储存取决于每个对象的内存占用和解析时间。如果是后一种情况,就是大的对象的情况,在进程内会有一个占位的对象,标记对象已经提交到了共享内存

在进程中的对象能够通过直接内存拷贝的方式快速解析,但是可能会带来更多的内存占用。因为如果对象被很多进程引用,就会造成很多额外的拷贝操作。单个 worker 的进程容量受限于所在机器的内存容量,也受限于一定时间对象可以被引用的数量。对于多次被应用的对象,吞吐量也可能被所有者进程的处理容量限制

相反,解析一个分布式内存的对象至少需要一个 worker 之间的 IPC (进程间通信)。如果 worker 本地共享不存在这个对象的拷贝,还会产生一个 RPC 通信。花句话说,因为共享内存存储是通过“共享内存机制”实现的,一个节点上的多个 worker 可能引用同一块对象的拷贝。采用“零拷贝反序列化”的方式就可以降低总内存占用。这种用法可以让进程应用一个对象而不同有本地的对象,意味着一个进程可以应用一个超过单台机器内存容量的对象,因为对象的多个拷贝可以储存在不同的节点上。

这块应该是 Ray 的精髓之一了。如果把 Ray 作为一个大内存来使用,Ray 可以帮助实现多机的内存调用,还提供多机内存数据的调度。底层是基于 Apache Arrow Plasma 实现的,Plasma 也是 Ray 团队贡献给 Apache Arrow 的

代码参考:

src/ray/core_worker/store_provider/memory_store/memory_store.cc

src/ray/core_worker/store_provider/plasma_store_provider.cc

src/ray/common/buffer.h

src/ray/protobuf/object_manager.proto

对象解析

object 的值能够通过一个叫 ObjectRef 来解析。ObjectRef 包含两个字段:

  • 一个唯一的 20-byte 的标识符。这是产生这个 Object 的 task 的 ID 和 Object 的 ID 的组合

  • Object 所有者进程的地址。包含 worker 进程的唯一 ID ,IP 地址和端口,和本地 raylet 的 ID

一些 Object 通过进程内存储直接拷贝。举个例子,如果所有者调用了 ray.get,系统从本地进程存储查询和反序列化调用对象的值。如果所有者提交了一个独立的任务,这个任务会把这个 Object 内联(拷贝)到 task 的描述里面。因为这些 Objects 是在它所在进程的本地,如果一个其他进程想试图解析这个值,Object 就提交到分布式共享内存,然后就可以通过分布式对象解析的协议来获取



解析一个大的 Object。这个 Object x 在 Node 2 初始化。上面这个例子显示了一个过程:当 Task 的所有者(调用 ray.get 的进程)1、在 GCS 查找 Object 的地址;2、选择了一个地址并且发出了一个获取 Object 的请求;3、获得了这个 Object

大的 Object 存储在分布式内存,并且必须通过分布式协议来解析。如果这个 Object 已经在这个发出请求的 Caller 的本地内存有引用,这个应用的所有者就能够通过 IPC 来获取这个 Object。这返回了一个指向共享内存的指针,这个指针冷可能同时在同一个节点被其他的 worker 也引用了

如果一个 Object 在本地共享内存不存在,这个应用的所有者会提醒本地 raylet ,让它通过远程 raylet 获取这个 Object。本地 raylet 通过 Object directory 寻找对应的 Object 地址,然后向这个地址对应的 raylet 请求这个 Object

代码参考:

src/ray/common/id.h

src/ray/object_manager/object_directory.h

内存管理

对于远程调用 task 的情况,object 的值是在远程的执行 worker 上计算的。如果这个值很小,worker 直接用值返回给调用者。这个值是拷贝到调用者的进程空间。如果值很大,执行的 worker 就把值存储在它的本地共享内存。这个共享内存里面的初始化 copy 就被称为主拷贝(primary copy)



主拷贝和可清除的拷贝:主拷贝(在 Node 2)不能被清除。在 Node 1(通过ray.get创建) 和 Node 3(通过任务提交创建) 的拷贝在内存不够的情况下可以被清除

主拷贝是唯一的。在主拷贝的所有者对这个对象的引用计数大于 0 的时候,主拷贝不能被清除。对于其他的 object 拷贝,它可以根据 LRU 的策略在内存紧张的时候被清除。因此,如果单个 object 存储包含了所有的主拷贝,并且这些主拷贝都在内存,如果另外一个 object 必须要存储在内存,可能会接收到一个 OutOfMemoryError 的错误

大部分情况下,主拷贝都是在对象创建的时候第一次产生。如果初始化的拷贝因为某个故障丢失了,这个拷贝的所有者会在对象可用的位置指定一个新的主拷贝

一旦对象的引用计数变为 0 , 所有的对象拷贝都最终会被自动垃圾回收。小的对象会被所有者在进程空间直接清除。大的对象会通过 raylets 在分布式对象存储异步清除

raylets 也管理分布式对象的传输,他会在对象需要的时候创建额外的拷贝。例如,如果一个依赖于某一个对象的任务提交到远程的节点上

引用计数

引用计数是计算机编程语言中的一种内存管理技术,是指将资源(可以是对象、内存或磁盘空间等等被引用次数保存起来,当被引用次数变为零时就将其释放的过程。引用的意思,就是有一个指针指向了这个内存,拥有这个指针的程序可以使用这个内存。引用计数就是统计有多少个程序可以使用这个内存

每个 worker 为每个所属的 object 储存了一个引用计数。所有者本地的引用计数包括了本地 python 的引用计数和所提交的依赖这个 object 的任务数量。

意思就是,object 如果被本地进程通过 python 引用了,计算在内。提交的任务如果也引用了,也计算在内

前者(本地 python 的引用计数)在 python ObjectRef 释放的时候会递减。后者(提交任务引用了 Object )当任务成功结束的时候会递减

ObjectRef 也可以拷贝到另外一个进程,方法是把 ObjectRef 储存到另外一个 Object 里面。这个进程获得的这个 ObjectRef 的拷贝被称为“借用”。举个例子:

@ray.remotedef temp_borrow(obj_refs):  # Can use obj_refs temporarily as if I am the owner.  x = ray.get(obj_refs[0])
@ray.remoteclass Borrower: def borrow(self, obj_refs): self.x = obj_refs[0]
x_ref = foo.remote()temp_borrow.remote([x_ref]) # Passing x_ref in a list will allow `borrow` to run before the value is ready.b = Borrower.remote()b.borrow.remote([x_ref]) # x_ref can also be borrowed permanently by an actor.
复制代码


这些引用都是通过分布式引用计数协议来跟踪的。简单的说,每当一个对 Object 的引用离开了本地范围,Object 的所有者就增加一个引用计数。例如,在上面的代码里面,在调用 borrower.remoteb.borrower.remote 的时候,所有者会为 x_ref 增加一个在等待任务的计数。一旦任务完成,系统就会返回给所有者还在“借用”的引用计数的列表。例如,在上面的代码里面,如果temp_borrow 不再借用 x_ref了,temp_borrow 的 worker 会返回说不再借用了,但是 Borrower actor 会返回还会继续借用 x_ref

因为 actor 是一种有状态的,对于获取的数据,会一直保留。除非 actor 自己的生命周期结束

如果 worker 仍然借用了任何一个引用,所有者会增加这个 worker 的 ID 到一个储存在本地的借用者的列表里。借用者保留第二份引用计数,在借用者的本地。这份引用计数和所有者本地保存的一直,并且所有者会在引用计数为 0 的时候询问借用者。在这个时候,所有者可能会从借用者列表里面删除这个 worker 的 ID,并且回收这个 object。在上面的例子里,Borrower actor 永久借用了这个引用,因为所有者不能释放这个 object,除非 Borrow actor 它自己生命周期结束

借用者也可以递归的增加到所有者的本地列表里。如果借用者自己传递了 ObjectRef 到其他进程的时候,就产生了递归。在这个情况下,当借用者返回给所有者,说它本地的引用计数为 0 了。所有者会把借用关系转给新的借用者,也就是有递归借用关系的那一个 worker

一个类似的协议也可以用于跟踪所有者的返回 ObjectRef。例如:

@ray.remotedef parent():  y_ref = child.remote()  x_ref = ray.get(y_ref)  x = ray.get(x_ref)
@ray.remotedef child(): x_ref = foo.remote() return x_ref
复制代码


child 函数返回的时候,x_ref 的所有者(运行 child 函数的进程)会标记 x_refy_ref包含。所有者会增加“父” worker 到借用 x_ref 的借用者列表里。在这里,协议类似:当所有者对 y_refx_ref 的引用都失效的时候,它发送了一个信息给 “父” worker,要求借用者反馈信息

引用类型描述何时引用更新本地 Python 引用计数本地 ObjectRef 实例数量。等于 worker 本地进程 python 引用计数当 Python ObjectRef分配或者释放的时候递增或者递减提交的任务数量依赖这个 Object 的提交的还未执行的任务数量在 worker 提交任务的时候递增(例如 foo.remote(x_ref))。在任务完成的时候递减。如果 Object 比较小,以任务描述符的形式储存在进程空间里,递增的操作就是 early 模式,就是任务完成的时候就递增。否则需要等待 ray.get 的时候才进行递增计算借用者一组当前借用了 ObjectRef 的 worker ID。借用者是任何一个有 python ObjectRef 但是却不是这个 ObjectRef拥有者的 worker 。如果 worker 把 ObjectRef 发送给另外一个借用者,则这个借用者的 worker 也维护这个列表 worker 在发现 ObjectRef 被另外一个 worker 借用的时候,会增加这个 worker 的 ID 到这个列表。例如,当 一个 actor 任务保存 ObjectRef 在本地的时候,调用这个 actor 的进程会增加这个 actor 的 worker ID 所有者清除的情况:所有者发送一个长运行的异步 RPC 给每个借用的 worker。如果借用者的 ObjectRef 为 0 ,借用者就返回结果。所有者就会清除这个 worker ID 借用者清除的情况:worker 等待所有者的 RPC 返回。一旦 worker 的引用计数(本地 python 计数+提交任务计数)为 0, worker 就去除它本地的借用者的列表返回给所有者。这样,所有者就可以了解并跟踪借用者嵌套计数在范围内或者包含ObjectRefObjectRef的数量在另外一个 object(例如 ray.put或者return x_ref的时候 ) 并且 ObjectRef被储存的时候递增。当ObjectRef超过范围的时候递减线性计数仅仅在重建的时候维护。依赖ObjectRef的任务数量。这个ObjectRef的值是出存在分布式内存里(也可能会因为故障丢失)当提交一个依赖这个 object 的任务的时候递增。当任务返回的ObjectRef超过范围的时候递减,或者任务完成在本进程空间返回值之后递减

不同类型的引用计数和更新情况的总结

在远程函数或者类定义被使用的引用也可能被永久固定(pinned),例如:

x_ref = foo.remote()
复制代码


x_ref = foo.remote()@ray.remotedef capture():  ray.get(x_ref)  # x_ref is captured. It will be pinned as long as the driver lives.
复制代码


在通过 ray.cloudpickle 的方式把 ObjectRef序列化,引用也可能超过界限(out-of-band)。在这种情况下,会把一个永久引用增加到 object 的计数里,防止 object 超过范围。另外一个超过界限的序列化(例如传输一个标示ObjectRef的唯一二进制 ID)不能保证对 worker 有效,因为这种序列化不包含所有者的地址,并且引用不能被所有者追踪

代码参考:

src/ray/core_worker/reference_count.cc

python/ray/includes/object_ref.pxi

java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java


Actor 句柄

类似上面的引用计数协议也用于跟踪 Actor 的生命周期。用一个虚拟对象来表示这个 Actor。这个对象的创建 task 生成了这个虚拟对象的 ID,并且拥有这个虚拟对象。

当一个 Python 的 actor 句柄被释放的时候,针对这个虚拟对象的引用计数递减。当一个 task 提交到 actor 句柄的时候,针对这个虚拟对象的任务计数递增。当引用计数为 0 的时候, 这个虚拟对象的所有者提醒 GCS 可以安全清除这个 Actor 了

代码参考:

src/ray/core_worker/actor_handle.cc

python/ray/actor.py

java/api/src/main/java/io/ray/api/ActorCall.java


和 Python GC 交互

当对象在 python 里面是一个引用循环的一部分,python 的垃圾回收(garbage collector)机制就不允许这个对象在需要的时候及时回收。因为还有没有被垃圾回收机制搜集的 Python ObjectRef 这个对象还在分布式存储存在,Ray 会定期在系统容量接近饱和的时候,触发 gc.collect() 在所有的 python worker 来做垃圾回收。这样确保 Python 引用循环不会导致虚拟的对象存储被占满


对象丢失

小对象: 比较小的对象存储在进程空间,并且和它的所有者共享生命周期。因为借用的对象会提升到共享内存,所以这样的对象检测丢失的机制在下面奋部署存储里描述

如果一个对象在分布式存储丢失: 如果对象是一个非主要的拷贝,这个对象就可以不做操作的丢失。如果是主拷贝,这个对象的所有者会试图在对象目录里面寻找一个可用的位置指定一个新的主拷贝。如果不存在这样的位置,对象的所有者会在解析对象的时候抛出一个系统错误

Ray 也支持对象重构,或者通过重新执行创建这个对象的 Task 的方式来恢复对象。当这个功能被打开的时候,对象的所有者会线性的缓存这个对象:拥有这个对象的所有者会根据 Task 的描述在内存重建这个对象。之后,如果所有的对象拷贝都丢失了,所有者会重新提交这个 Task ,获得对象的返回。任何依赖这个 Task 的对象都会递归式的重构

通过 ray.put 的对象重构在 Ray 是不支持的:因为这些对象的主拷贝永远在所有者的本地共享内存里。因此,如果主拷贝不能独立于所有者进程存在

如果对象所有者在分布式内存丢失了 : 当在解析对象的时候, raylet 会试图定位对象的拷贝。同时,raylet 会周期性的和对象的所有者通信以确认对象还存在。如果所有者中止了,raylet 会在解析对象的时候抛出一个系统错误


对象溢出和持久化

Ray 1.3 支持当对象存储满了的时候将对象溢出到外部存储。默认的情况下,对象溢出到本地文件系统


资源管理和调度

在 Ray 的资源表现为一个 "key" -> float 数量的键值对。为了方便起见, Ray 调度与安生支持 CPU、GPU 和内存资源类型,意思就是 Ray 自动检测每个节点可用的无力资源。然而,用户也可以定义一个自定义资源需求(custom resource requirements),可以采用任何可用的 string 类型,例如 {"something": 1}

分布式调度的目的是为了匹配集群里可用资源的需求。资源需求很难约束。例如,{"COU":1.0,"GPU":1.0} 表示一个需求,需要一个 CPU 和 一个 GPU。Task 仅仅能在一个 >1 CPU 和 >1 GPU 的节点上运行。每个 @ray.remote 函数需要一个 CPU 来执行,一个 actor ,例如 @ray.remote 类,默认需要 {"CPU":0} 服务

还有一些特殊的资源:

  • CPU、GPU 和内存资源在 Ray 启动的时候自动检测

  • 在 Task 设置 GPU 会自动在 worker 设置 CUDA_VISIBLE_DEVICES 环境变量,让任务在特定的 GPU 运行

任务调度(raylet 所有者协议)



依赖解析

Task 调用者在所有的任务参数都创建好之后,向分布式调度提交资源需求。在很多情况下,任务的调用者也是任务参数的所有者。例如,一个 foo.remote(bar.remote())的程序,调用者也同时拥有这个任务,并且在 bar执行完之后才会执行 foo。因为调用者在本地储存 bar 的结果,所有也可以直接从本地,也就是进程存储获取这个结果继续执行

任务的调用者可能会借用任务参数。例如,当任务的调用者获取了一个 ObjectRef类型的反序列化的拷贝作为参数。任务就必须和参数的所有者确定什么时候这个参数会创建。一个借用进程会通过反序列化 ObjectRef 的方式和参数的所有者通信。所有者一旦返回对象创建完成,借用者就会标记这个对象可用。如果所有者返回失败,借用者也会标记这个对象可用,因为对象和所有者生命周期相同

任务有三种类型的参数:简单数值类型、内嵌的对象和非内嵌的对象

值类型:f.remote(2)

内嵌对象:f.remote(small_obj_id)

非内嵌对象:f.remote(large_or_pending_obj_id)

值类型不需要依赖解析

内嵌对象(小于 100k )可以存储在进程空间。所有者能够直接通过任务描述来拷贝获得

非内嵌对象存储在分布式存储。存储的内容包括大的对象本身、以及对象被进程借用的相关对象。在一个 worker 可以被 raylet 授权使用这个任务依赖之前,先要等待这些对象在节点的本地空间可用。这样就能确保执行的 wroker 不会在接收任务的时候被阻塞,因为要等待对象在本地就绪

资源安排

一个所有者第一次调度任务,是通过发送资源请求到它本地的 raylet。raylet 把这些请求放入请求队列。如果请求包含了对资源的要求,raylet 就会把还没有分配给这个所有者的本地 worker 的地址返回给调用者。在调用者和需要分配的 worker 都存在的时候, raylet 会确保没有其他的客户端会请求这个 worker。为了保证公平,一个所有者如果在足够的时间内(几秒内)没有任务执行,就会释放这个 worker



所有者可能会调度很多任务到借用的 worker,只要授权的资源是足够的。因为,借用可以理解为对同类调度请求的一种优化



如果 raylet 针对请求不通过,raylet 就会返回给所有者一个可以重新请求资源的地址。这个被称为 "spillback" (溢出返回)调度。溢出返回调度是可以反复的:和直接提交资源请求不同,每个 raylet 会把下一个 raylet 地址回复给调用者来重试。这样确保所有者的任务地址元数据是一致的



代码参考:

src/ray/core_worker/core_worker.cc

src/ray/common/task/task_spec.h

src/ray/core_worker/transport/direct_task_transport.cc

src/ray/core_worker/transport/dependency_resolver.cc

src/ray/core_worker/task_manager.cc

src/ray/protobuf/common.proto

分布式调度(raylet-raylet 协议)

资源记录(Resource accounting)

每个 raylet 都会跟踪记录在它这个节点的本地资源。当一个资源被申请成功之后,raylet 就会相应减少一个可用资源。一旦资源释放返回了(或者是因为申请者进程中止了),ralet 就会相应增加一个可用资源。这样 raylet 就可以对本地资源有一个可靠的一致性管理

每个 raylet 在集群里也从 GCS 获得其他节点可用资源的信息。用来做分布式调度,例如在集群做负载均衡。为了减少信息搜集和传送的开销,这些信息只是做最终一致性;信息有可能过时,因为信息是通过周期性的广播来传送。每个间隔性的心跳信息(默认 100 毫秒),每个 raylet 传送它当前可用资源到 GCS 服务。GCS 汇总这些心跳信息,然后重新传播到每个 raylet 节点

调度策略

raylet 总是先分配本地资源给请求者。当本地资源不够的时候,有三种可能性:

  1. 根据之前从 GCS 获取的信息,其他节点有足够的资源。raylet 就会把请求转给其他的 raylet

  2. 没有节点有足够的资源。请求任务就会被推到本地队列,等待本地或者其他节点有资源可用

  3. 请求的资源(例如:{"GPU":1})在集群里没有(集群里只有 CPU)。这样的任务就被认为是不能执行的。raylet 就会发送一个警告给相应的 driver 。 raylet 会把这个请求放入队列,等待有可用的资源。例如,集群里新增了一个 GPU 节点

未来,raylet 可能也可以做本地调度策略(例如,把一个已经有本地化参数的任务提交到节点)。对象本地化目前还没有支持

代码参考:

src/ray/raylet/node_manager.cc

src/ray/protobuf/node_manager.proto

弹性伸缩

Ray 的弹性伸缩(也被称为 Cluster Launcher),是指初始化集群节点,和根据需求增加额外的节点的能力

在 Ray 版本 <= 1.0.1 ,弹性伸缩实现了如下能力:

  1. 估计最近分配资源的使用率。例如,假设一个集群有 100/200 CPU ,和 15/25 GPU。集群的使用率就是 max(100/200,15/25)=60%

  2. 如果使用率的估算超过了一个设定值(默认是 80%) ,弹性伸缩就会自动在集群增加节点

  3. 如果一个节点空闲超过一定时间(默认是 5 分钟),这个节点就会从集群移除

在 Ray 版本 1.0.2+,弹性伸缩的控制逻辑是如下这样:

根据当前所有等待的任务、actor 和群组调用需求,来计算节点数量

增加新的节点的情况

如果一个集群所需要的节点数除以当前节点数量超过了 `1+upscaling_speed,这样设置的节点数就会被限制在一个阈值

upscaling_speed 是一个设定值,代表集群的弹性扩充应该要高于集群目前的使用速度。默认设置为 1

当节点通过 request_resources()来申请资源的时候,upscaling_speed的限制就被绕过了

request_resources()是一个函数或者命令行工具,可以来分配资源,例如:

>>> # Request 1000 CPUs.>>> request_resources(num_cpus=1000)>>> # Request 64 CPUs and also fit a 1-GPU/4-CPU task.>>> request_resources(num_cpus=64, bundles=[{"GPU": 1, "CPU": 4}])>>> # Same as requesting num_cpus=3.>>> request_resources(bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}])
复制代码


如果一个节点空闲超过一定时间(默认是 5 分钟),这个节点就会从集群移除

Ray 1.0.2 版本的新的算法的优点是可以根据需求弹性伸缩到确切的节点数量。因为之前的算法只是使用汇总的利用率,所以很难准确的计算需要多少个节点

Ray 也支持多类型节点。集群节点类型的概念包括物理实例类型(例如,AWS p3.8xl GPU 节点 vs m4.16xl CPU 节点),和其他属性(例如 IAM 角色,机器镜像等)。"定制资源"能够为每个节点定制资源,这样 Ray 就可以从系统层面了解每个节点的类型(例如,一个任务需要以特定角色和特定的机器镜像来运行)

代码参考:


发布于: 2021 年 02 月 12 日阅读数: 26
用户头像

lipi

关注

fn 2018.08.28 加入

上海,从事金融行业 喜欢 Rust、React 和 Nodejs ,对分布式计算和数据处理比较熟悉。喜欢和热爱开发的人交流。微信:lipengsh

评论

发布
暂无评论
Ray 1.0 架构解读