写点什么

Ray 分布式计算框架详解

用户头像
lipi
关注
发布于: 2020 年 08 月 30 日
Ray 分布式计算框架详解

Ray 是 UC Berkeley RISELab 出品的机器学习分布式框架。UC Berkeley 教授 Ion Stoica 写了一篇文章:The Future of Computing is Distributed。里面详细说了 Ray 产生的原由。总结一下,就是由于 AI 和大数据的快速发展,对于应用和硬件能力的要求提出了更高的挑战。需要有更适合的软件架构来匹配大规模实时的计算需求。


Ion Stoica 同时也是 Spark 产品的公司 Databricks 的创始人,Apache Mesos、Alluxio、Clipper 的项目主导人


Ray 的特点


我们从了解 Ray 到实践上线,大概有 1 年半的时间。为什么会找到 Ray ,主要还是基于高性能计算的需求。我们的场景主要是做投资的实时归因分析,由于涉及的数据量很大,对于计算的要求也很高。还有一个关键的门槛,金融模型大量使用 Pandas 和 Numpy 来做矩阵计算,需要针对 Pandas/Numpy 有更好的支持。实践下来,我觉得 Ray 有如下特点:


  • 分布式异步调用

  • 内存调度

  • Pandas/Numpy 的分布式支持

  • 支持 Python

  • 整体性能出众


存在很多和 Ray 类似的框架,但是如果把范围缩小,针对 python 用户,类似的主要就是 Dask、Mars、Celery 等 。


和 Dask 对比


Dask 是 Anaconda 的产品,背后的主要贡献者是 Matthew Rocklin 。Dask 的 blog 和 Matthew Rocklin 的 Blog 可以经常去看看,更新很频繁,内容很不错。最近 Matthew Rocklin 加入了 Nvidia ,开始做 Dask_cudf,开发基于 GPU 的 Dask,结合 Rapids cudf(基于 GPU 的 Pandas)。最新的 blog 里面,显示他准备建立一个 Dask 的公司,推进 python 的分布式数据平台


Dask 和核心是弥补 python 在数据科学中的不足,主要是性能上。Python 单机的能力不能够支持数据科学中大数据集的快速计算。


Dask 提供了基础的数据结构,底层是分布式计算架构。数据结构包括:Array 、Dataframe。Array 兼容 numpy 的 ndarray,Dataframe 兼容 Pandas 的 Dataframe 。"兼容"是个相对的说法,毕竟 Pandas 和 Numpy 发展多年,本身也在发展,接口非常多。Dask 应该是在兼容这块做的非常好的,但是和 Pandas/Numpy 还是有差异,这一点要注意。


就像刚才提到的,Dask 的目标是为了弥补 Python 在数据科学上的不足。而 Ray 的出发点是为了加速机器学习的调优和训练的速度。Ray 除了基础的计算平台,还包括 Tune (超参数调节) 和 RLlib (增强学习)


因为数据科学和机器学习基础都是 Python 也是核心,所以分布式和对 Python Pandas/Numpy 的支持是这两个框架的基础。但是有一点不一样的,也是我们决定要采用 Ray 的核心原因。Ray 的底层内存数据结构的基础是 Apache Arrow,而 Dask 是 xarray/xray 。xray 是 NumFocus 赞助的开源类似 numpy 的数据结构。Apache Arrow 拥有更好的生态,被大部分的数据处理系统接受,非常有利于和其他系统的融合


Apache Arrow 和 Plasma


Apache Arrow 是列式内存数据结构,已经成为数据处理领域最通用的数据结构。Arrow 最突出的特点就是生态非常丰富和性能出众


Arrow 的背后还有一位 committer 和 co-creator,叫 Wes McKinney 。Wes McKinney 是 pandas 的作者,所以 Arrow 对 Pandas 的支持也非常好


Ray 团队基于 Arrow 开发了一个内存数据服务,叫做 Plasma 。Plasma 在 Linux 共享内存创建了 Arrow 封装的对象,单独作为一个进程运行。其他进程可以通过 Plasma Client Library 来访问这块共享内存里的 Arrow 存储。这个功能是 Ray 团队开发的,贡献给 Arrow 作为 Arrow 生态的一部分。


除了 Plasma ,Ray 团队还贡献了一个叫做 Modin 的功能,就是基于 Ray 的分布式能力,提供了 Pandas 的实现。类似于 Dask 的 Dataframe。这个功能已经从 Ray 独立,成为一个独立的项目


软件架构


Ray 的基础结构可以参考 Paper https://arxiv.org/abs/1712.05889 



经过几个版本的迭代,有些内容做了一些优化,主要的结构还是如上图。GCS 作为集中的服务端,是 Worker 之间传递消息的纽带。每个 Server 都有一个共用的 Object Store,也就是用 Apache Arrow/Plasma 构建的内存数据。 Local Scheduler 是 Server 内部的调度,同时通过 GCS 来和其他 Server 上的 Worker 通信。Object Store 时间也有通信,作用是传递 Worker 之间的数据。


在 Paper 里面描述了一个典型的远程调用流程:


可以看到,GCS 储存了代码、输入参数、返回值。Worker 通过 Local Scheduler 来和 GCS 通信。Local Scheduler 就是 Raylet, 是单机上的基础调度服务。



Object > 100 kb 会通过 Object Store 之间的并行 RPC 来传输,而不通过任务调度 RPC 来实现。Apache Arrow 在 0.15 之后提供了一个 Apache Arrow Flight 的 RPC 框架,0.16 又做了强化。不知道 Ray 的 Object 的并行传递是不是采用 Arrow Flight。下图是一个 任务调度的 RPC 示例图:



以上两个示意图来自于How Ray Uses gRPC and Arrow to outperform gRPC


下面我们详细来了解了一下 Raylet


Raylet


我重新画了一个简单一点的 Worker 和 GCS 的关系图:

Raylet 在中间的作用非常关键,Raylet 包含了几个重要内容:


  • Node Manager

  • Object Manager

  • gcs_client 或者 gcs server


Node Manager 是基于 boost::asio 的异步通信模块,主要是通信的连接和消息处理管理;Object Manager 是 Object Store 的管理;gcs_client 是连接 GCS 客户端。如果设置 RAY_GCS_SERVICE_ENABLED 为 True 的话 ,这个 Server 就是 作为 GCS 启动。


我们先看一下 Raylet 的启动过程:

首先,要做 Raylet 的初始化,这里面包含很多参数,包括 Node Manager 和 gcs client 的初始化。然后 Raylet Start 之后,注册 GCS,准备接收消息。一旦有消息进来,就进入 Node Manager 的 ProcessClientMessage 过程。在解释 ProcessClientMessage 的操作之前,我们需要了解一下 Ray Worker 和 Raylet 的进程/线程和通信的模型


通信模型


Ray 采用的是 Boost::asio 的异步通信模型,这里有一个很丰富全面的关于 asio 的介绍


Asio 采用的是 Proactor 模型。一个操作经过 Initiator 之后分解为 Asynchronous Operation Processor(AOP) 、Asynchronouse Operation(AO) 和 Completion Hanlder(CH) 。AOP 做具体的工作,执行异步操作。执行完成之后,把结果放入 Completion Event Queue(CEQ)。Asynchronous Event Demultiplexer(AED)等待 CEQ ,如果 CEQ 出现完成事件,则返回一个完成事件到 CH


Raylet 启动了一个 main_service , 是 boost::asio::io_service 。io_service 也是 asio 运转的核心组件。前面的 AOP、AED 和 Proactor 都是由 io_service 串联起来的。io_service 内部实现了一个任务队列,队列的任务就是 void(void) 函数


io_service 的接口有 run 、run_one、poll、poll_one、stop、reset 、 dispatch 、post 。run 方式就是轮询执行队列里面的所有任务,无任务执行的时候就 epoll_wait 上阻塞等待



// Initialize the node manager.boost::asio::io_service main_service;main_service.run();
复制代码

Node Manager 在初始化的时候,会按照 num_initial_workers 的数量初始化 worker pool 。然后 Node Manager 会按照 asio 的异步机制,分配任务到这些 worker pool 里面的进程


接下来我们看一下 Raylet 、Worker 和 GCS 的消息传递和调度机制


消息传递和调度


Ray 后面的公司 Anyscale.io 的 blog 有一篇文章,叫做 Fast Scheduling in Ray 0.8 。讲了怎么在 ray 0.8 里面优化调度



Worker 提交 task 到 raylet,raylet 分配 task 到其他 worker。同时 raylet 还需要把 task 、相关 worker 信息提交给 GCS。task 执行的参数和返回都需要通过 Object Store 来获取

接下来,我们看一下详细的消息传递和对应的一些执行过程

先看一下 Submit Task 这个操作: Worker 提交一个 Task ,就调用 SubmitTask 的任务到 Raylet 。Task 在 Raylet 内部有一个 Lineage 的机制。这个也是上面 Anyscale 图里面的 task lineage

我们先了解一下 Task Lineage 的机制


Task Lineage


Task Lineage 里面包含几个概念,Lineage Cache 、Lineage Entry 和 Lineage 。Lineage 是管理 Task 执行的 DAG (有向无环图) ;Lineage Entry 是对 Task 状态的一些管理;Lineage Cache 是对 Task 在本机执行缓存的管理。在上面 Fast Scheduling in Ray 0.8 文章里面,主要就是通过对 Lineage 的优化来提升 Ray 0.8 的调度性能



在 Ray 0.8 里面,把调用其他 Worker 的流程,从 Raylet 到 GCS 然后到 woker ,改为直接查询 Lineage Cache,如果 Worker 曾经调用过,就直接请求对应的 Worker。减少调用路径,提升效率。


回到我们对 Lineage 的分析


Task 在 GCS 里面有几个状态:None、Uncommitted、Committing、Committed 。None 意思是在 Lineage Cache 里面不存在;当任务从 Woker 提交之后,是 uncommited 状态了;当任务发生一些变化,经过一些操作或者重新提交,就是 Committing 状态。意思就是正在进行 Committing,等待返回状态;提交的任务得到了反馈,就是 Committed 状态。但是有一个不同,任务没有删除,当下一个任务还是调度这个 Worker 的时候,就可以直接调用这个 Task Entry 来实现。这就是上面说的优化过程


TaskEntry 保存 Task 的状态和相关的联系。主要包含这么几个内容:


  • GcsStatus:就是上面说的 Task 的状态

  • parent_task_ids_:一个 Set ,保存了 Task 的父任务 ID 列表

  • forwarded_to_:一个 Set,保存了任务明确提交之后提交到的 Node Manager 的 ID 列表


Lineage 维护了两个 map。一个是 Task 和 LineageEntry 的 map;一个是 TaskID 和 TaskID Set 的 Map。第二个的意思就是 Task 和它子 Task 组的映射


LineageCache 是 Task 的 Cache Table 。包含了 Task 的信息和状态。Lineage Cache 的策略是把所有的任务成为 Uncommitted 状态。为了安全起见,只有当 Task 的父任务都删除了,子任务才能删除


Lineage 的细节还很多,而且还处在优化的状态。我们先看看通过一个 Task 提交的过程来看看 Lineage 是怎么运转的


提交任务


Submit Task 之后,先记录增加了一个 Task。然后拿到需要提交的 Task Spec,就是 Task 详细信息。然后提交。Task 有几个状态:


  • Placeable:就绪的状态,可以分配到 Node, 可以是本地或者远端。分配的原则根据资源状况,例如本地的内存、是否超过 Task 最大数量等。如果本地资源不够,就会提交到其他的 Node ,也就是服务器。当然,如果其他 Node 资源也不够,就会继续分配。

  • WaitForActorCreation:这个转改是针对 Actore Task ,代表 Actor 方法等待 Actore 完成返回

  • Waiting:Task 在等待它的参数的依赖关系满足要求。也就是 Task 的参数需要放到 local object store

  • Ready:Task 可以运行,所有的参数已经传输到 local object store 了

  • Running:Task 已经分配冰运行到一个 worker

  • Blocked:Task 暂停。可能是因为 Task 正在等待启动其他 Task 并且等待结果返回

  • Infeasible:Task 所需要的资源所有机器都不满足

  • Swap:两个状态中的一个转换状态。例如一个 Ready 状态的 Task ,提交到了一个 worker,在等待返回的时候。就处在 Swap 状态。如果 Worker 接收了这个 Task,task 状态会变为 Running,否则它就会返回到 Ready 状态


在 design_docs/task_states.rst 文档里面有一个图描述了 Task 的状态变化过程:



在 SubmitTask 最后:


// if the task was forwarded.
if (forwarded) { // Check for local dependencies and enqueue as waiting or ready for dispatch.} else { // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueueTasks({task}, TaskState::PLACEABLE); ScheduleTasks(cluster_resource_map_);}
复制代码


如果要提交的 Task 需要 forward (在收到 HandleForwardTask 操作的时候),则进行 Task 如队列操作。入队的时候,如果参数都满足,也就是本地资源足够。Task 就入队列,成为 READY 状态,如果不满足,就是 WAITING 状态。同时改变 Task 状态为 Pending


if (args_ready) {    local_queues_.QueueTasks({task}, TaskState::READY);    DispatchTasks(MakeTasksByClass({task}));  } else {    local_queues_.QueueTasks({task}, TaskState::WAITING);  }
task_dependency_manager_.TaskPending(task);
复制代码

调度策略


在 SubmitTask 之后,如果不是 forward ,则执行两个操作:


local_queues_.QueueTasks({task}, TaskState::PLACEABLE);ScheduleTasks(cluster_resource_map_);
复制代码


第一个是把 Task 放到本地,并且把 Task 状态置为 Placeable;第二是把 Task 在集群进行调度


Ray 针对 Task 有两个 Queue:ReadyQueue、SchedulingQueue 。 ReadyQueue 是已经准备好的 Task 的队列;SchedulingQueue 是已经提交的 Task 的队列。这两个队列用来存储不同状态的 Task,实现上面说的 Task 状态变化过程。


调度任务的步骤是这样:


  1. 先尝试把 Tasks 放在 Local Node

  2. 如果 Local Node 有资源

  3. 如果没有合适的计算资源,就采用硬分配的方式,给 Client 安排计算资源


可以看出,调度就是基于资源的分配。资源包括计算、内存/数据,在 Ray 体现为 Worker 、Task 、Object Store(Arrow)。所以我们需要搞清楚资源,才能更好的理解调度


集群架构


按照上面的描述,Ray 集群有 Worker 、Gcs 和 Raylet 等模块。Worker 是一个执行单元。 Worker 的执行是通过 gRPC 来远程提交的。整个架构有点像 istio 的 service mesh 的结构



对应以上的粗粒度的组件,拆解开来就像下面这样:



这里面有几个关键组件。Raylet 是处理 Worker 和 GCS 的关键连接点,还有处理 Local Worker 之间的调度。Raylet 里面包含 Node Manager,这是处理消息传递和调度的基础模块;还有 Object Manager ,这是处理本机 Arrow 内存读取的组件,相对容易理解;Core worker 组件针对 Python Driver 提供支持,主要是完成 Task 的调度。就是 python 里面使用 ray 时候需要加的 remote 注解。这个是 Ray 的核心。Python Driver 主要是针对 python 提供支持,当然 Ray 也有 Java Driver ,这里没有列出


我们先从 Raylet 看起


Raylet


在 Raylet 初始化的时候,初始化了一个 main_service 。 这是一个boost::asio::io_service 实例。这个在上面的通信模型里面简单描述了一下 asio 的机制。main_service 在 main.cc 启动(main_service.run()),main_service 的引用传递到了 Raylet ,然后 Raylet 应用传递到了 Node Manager


Node Manager(下称 NM)是 Raylet 的一个负责通信的模块,处理 Raylet 和其他分布式节点(服务器)、Worker、Task 分配还有 GCS 的通信


从 Raylet 到 Node Manager 的入口在 HandlerAccept :


ClientHandler<local_stream_protocol> client_handler =        [this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); };    MessageHandler<local_stream_protocol> message_handler =        [this](std::shared_ptr<LocalClientConnection> client, int64_t message_type,               const uint8_t *message) {          node_manager_.ProcessClientMessage(client, message_type, message);        };    // Accept a new local client and dispatch it to the node manager.    auto new_connection = LocalClientConnection::Create(        client_handler, message_handler, std::move(socket_), "worker",        node_manager_message_enum,        static_cast<int64_t>(protocol::MessageType::DisconnectClient));
复制代码


client_handler 是处理连接请求,message_handler 是处理这个 Client 的消息。LocalClientConnection 是一个针对客户端请求到服务端的抽象,主要是基于 asio 机制把读写,和异步读写封装了一下


ProcessNewClient 主要是记录 Client 的一些信息 ProcessClientMessage 就是上面架构图里面的消息处理,对应不同的消息处理流程。可以看一下附录:《Node Manager 处理消息的列表》


这里面最重要的一个,是 SubmitTask。是针对 task 的处理,Task 作为主要任务调度的模块,贯穿 Ray 分布式任务调度的全过程。所以我们有必要从源头来了解和跟踪一下 Task 的发起到完成的整个过程。同时,我们也可以通过这个过程,了解从 Python Driver 到 Core Worker ,然后到 Raylet 的处理过程。


Submit Task


Task 是表示一个任务及其执行的资源等信息。Task 的发起是从 Python Driver


@ray.remotedef borrower(inner_ids):     inner_id = inner_ids[0]     ray.get(foo.remote(inner_id))
inner_id = ray.put(1)outer_id = ray.put([inner_id])res = borrower.remote(outer_id)
复制代码

例如以上的代码,@ray.remote 注解下面的函数,就是一个执行体。对应的是 RemoteFuntion Class 。在 _remote 这一段:


self._pickled_function = pickle.dumps(self._function)
复制代码

_function 序列化为 _pickled_funtion,然后再 hash 为 pickled_function_hash


self._function_descriptor = PythonFunctionDescriptor.from_function(	self._function, self._pickled_function)
def from_function(cls, function, pickled_function): pickled_function_hash = hashlib.sha1(pickled_function).hexdigest()
复制代码

然后就调用了 Core Worker 的 SubmitTask


Status SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,const TaskOptions &task_options, std::vector<ObjectID> *return_ids,int max_retries);
复制代码

在 SubmitTask 里面。生成一个 task id ,然后通过 BuildCommonTaskSpec 函数,把 Task 所有信息封装成一个 TaskSpecification 实例。然后把这个 TaskSpecification 提交到 Task 的任务队列里面。


if (task_options.is_direct_call) {    task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, max_retries);    return direct_task_submitter_->SubmitTask(task_spec);  } else {    return local_raylet_client_->SubmitTask(task_spec);  }
复制代码

这里面 Task Manager 是对 Task 管理的一个封装。包含了对应的内存 in_memory_store_、引用计数 reference_counter_(主要用作对 ObjectID 的管理,用在 GC 上),任务的状态和 Retry 次数等。这里面 task_manager->AddPendingTask ,主要是针对 Task 提交前做了记录,记录 Task ID,为了 reference_manager_ 之后的 GC 用


is_direct_call 是针对 Actor worker 的直接调用。local_raylet_client_ 就是上面提到的 Raylet Client,Core worker 把接收到的 remote 调用提交到 Raylet ,Raylet 来做调度。就是下图红色的那一段:



在 Raylet 的 Node Manager 接收到 SubmitTask 消息,按照 Task 的依赖次序来提交 Task。意思就是,如果一个 Task B 依赖于另外一个 Task A,那就先提交 Task A


如果任务是提交到另外一个 Node(这个取决于 Lineage 调度,forwarded 是 True,forwarded 是 SubmitTask 的最后一个参数),则在 Lineage Cache 增加一个 UncommittedLineage


lineage_cache_.AddUncommittedLineage(task_id, uncommitted_lineage)
复制代码

这里面第二个参数,是 SubmitTask 的时候,生成的一个 Lineage 的实例。


如果任务是提交到本地(forwarded 是 False,默认),则异步 commit task 到 GCS:


lineage_cache_.CommitTask(task)
复制代码

调用的是 Lineage 的 CommitTask,然后调用 Lineage 的 FlushTask,接着调用 gcs_client_->Tasks().AsyncAdd 把 Task 状态提交到 GCS,然后根据返回状态更新本地 Lineage 的 Task 状态为 GcsStatus::COMMITTED。同时 Evict Task 和 UnSubscribeTask


在处理好 Lineage Cache 之后,SubmitTask 在本地的 Task Queue 里面增加这个 Task,然后调用 ScheduleTasks 来调度()


整个过程流程图如下:



在 DispatchTasks 里面,按照 class order 分配。避免其中一个任务执行的时候卡住,导致 Ray 启动多个 worker 来执行。这个问题在 #3644 有描述。


参考文档

Ray Paper



发布于: 2020 年 08 月 30 日阅读数: 2223
用户头像

lipi

关注

fn 2018.08.28 加入

上海,从事金融行业 喜欢 Rust、React 和 Nodejs ,对分布式计算和数据处理比较熟悉。喜欢和热爱开发的人交流。微信:lipengsh

评论 (2 条评论)

发布
用户头像
大佬可以加个微信吗
2020 年 09 月 02 日 13:47
回复
好啊,lipengsh
2020 年 09 月 02 日 15:36
回复
没有更多了
Ray 分布式计算框架详解