[KubeFlow] MPI-Operator 深度解读
MPI
MPI(Message Passing Interface,消息传递接口)是一种消息传递编程模型。消息传递指用户必须通过显式地发送和接收消息来实现处理器间的数据交换。在这种并行编程中,每个控制流均有自己独立的地址空间,不同的控制流之间不能直接访问彼此的地址空间,必须通过显式的消息传递来实现。这种编程方式是大规模并行处理机(MPP)和机群(Cluster)采用的主要编程方式。由于消息传递程序设计要求用户很好地分解问题,组织不同控制流间的数据交换,并行计算粒度大,特别适合于大规模可扩展并行算法。
MPI是基于进程的并行环境。进程拥有独立的虚拟地址空间和处理器调度,并且执行相互独立。MPI设计为支持通过网络连接的机群系统,且通过消息传递来实现通信,消息传递是MPI的最基本特色。
Open-MPI
OpenMPI[1]是一种高性能消息传递库,最初是作为融合的技术和资源从其他几个项目(FT-MPI, LA-MPI, LAM/MPI, 以及 PACX-MPI),它是MPI-2标准的一个开源实现,由一些科研机构和企业一起开发和维护。因此,OpenMPI能够从高性能社区中获得专业技术、工业技术和资源支持,来创建最好的MPI库。OpenMPI提供给系统和软件供应商、程序开发者和研究人员很多便利。易于使用,并运行本身在各种各样的操作系统,网络互连,以及一批/调度系统。
MPICH
MPICH是MPI标准的一种最重要的实现。MPICH的开发与MPI规范的制订是同步进行的,因此MPICH最能反映MPI的变化和发展。 MPICH的开发主要是由Argonne National Laboratory和Mississippi State University共同完成的,在这一过程中IBM也做出了自己的贡献,但是MPI规范的标准化工作是由MPI论坛完成的。MPICH是MPI最流行的非专利实现,由Argonne国家实验室和密西西比州立大学联合开发,具有更好的可移植性,现阶段多流行的是MPICH2.
MPI-Operator
在很多场景的训练中,用户可以根据自己的选择,使用不同的MPI实现。在mpi-operator中,只是针对open-mpi做了特定的处理,因此接下来我们也会针对open-mpi多机训练,以及如何将其运用到Kubernetes中进行说明。
Open-MPI与多机通信
一般在AI训练过程中,单机无法解决的问题,就通过多机并行来解决,多机通信我们就可以考虑使用Open-MPI来进行,一般我们会给每一张卡分配一个进程,然后多机多线程做数据通信,典型的通信方式比如Ring-AllReduce,可以参考下图的样子:
这是一种简单高效的通信方式,当然现在也有了众多变种算法。
当然为了实现这种多机通信,可能就会有如下问题,比如:
这个多台机器Open-MPI是如何发现并建立连接的呢?
多机多卡在训练过程中,传输环如何建立,这个也是决定了训练效率,那么Open-MPI如何去做呢?
针对上面的两个问题,我们分别说明一下:
在Open-MPI启动的时候,可以指定
--hostfile
或者--host
去指定运行要运行任务的IP或Hostname,这样Open-MPI就会试图通过ssh免秘钥的方式试图去链接对方机器,并执行一系列命令,主要是为了同步环境变量、当前路径以及下发启动命令;当然用户也可以通过其他方式给远程机器下发命令,这个可以通过环境变量
OMPI_MCA_plm_rsh_agent
指定;当所有的机器建立好连接了,准备开始计算了,那么为了能够最高效的去通信,Open-MPI中集成了组件——hwloc。该组件主要是为了单机硬件资源拓扑构建,进而构建最短路径通信;
数据通信方式就是如上图所示方式;
当某一个进程出现了问题,那么整个多机程序就会终止,没有容错性;
代码
Task is cheap, show me the code.
言归正传,说完上面的场景和特性了,那么我们来看下如何实现的。
目前在mpi-operator中主要的版本有两个:
v1alpha1
v1alpha2
其中v1alpha2用的人比较多,我们主要以他为例来介绍如何操作。
因为Kubernetes现有的资源类型无法满足我们的需求,因此需要通过Custom Resource Definition的机制进行扩展,比如mpi-operator里面扩展出来的新CRD,名为MPIJob
,他的具体定义可以在这里找到:mpijob-v1alpha2-crd.yaml.
新CRD的定义
简单介绍下该新CRD Spec的组成:
launcher:目前只是一个,只运行启动mpijob的pod,不运行workload;
worker:可以是一个也可以是多个,真正运行workload 的Pod;
slotsPerWorker:每个worker运行的slots数目;
backoffLimit:最多重试次数;
cleanPodPolicy:任务结束时,清除Pod策略;
runPolicy:多机任务运行策略;
实现
有了上面的定义,就可以具体执行。一般来说新的CRD都是无法复用Kubernetes现有资源类型的情况,那么就会通过operator进行转换,转换成Kubernetes可以识别的资源类型。
比如上面的Launcher
会被转换成Kubernetes中的job资源类型,worker会被转换成Kubernetes中的Statefulset,进而通过informers的机制来监控Kubernetes中的Job和Statefulset这两个资源更新MPIJob的资源状态,下面我们以两个典型的操作来介绍如何执行的:
MPIJob的创建
MPIJob的定义前文已经介绍过了,那么当用户创建了一个MPIJob,其中包含一个Launcher,2个Worker这样的配置,进行多机训练时,当是如何进行的呢?下面依次介绍:
与一般的controller写法相同,监听MPIJob创建,并将其放入队列中;
多线程从队列中去处新的mpijob,进行处理,判断launcher和worker是否存在,如果不存在就进行创建,具体可以参考这个函数;
创建launcher和worker的同时,会在launcher job创建时添加一个额外的init container,这个init container主要的工作就是监控所有的worker都已经就位了,然后执行执行后面launcher job里面定义的命令;
除此之外,为了能够帮用户减少一些额外的配置,基于worker pod的名字,会将其加入到一个configmap中,并mount到每个pod中,这样通过环境变量将hostfile设置为这个mount的configmap路径,就可以发现多机程序,进而去链接了;
前面也介绍过rsh agent,默认是用sshd,这个要设置面秘钥登录,设置起来会稍显麻烦,那么在Kubernetes中运行有没有更简单的办法?答案是有的。我们通过kubectl命令来达到同样的效果,参见此处代码。在这里会创建一个可执行程序,然后去通知worker pod去执行相应的命令等操作;
至此,mpijob就被转换成Kubernetes可以识别的类型,并开始运行了。
MPIJob的终止 终止分为两种类型,分别是正确,或者是出错了。
针对正常终止,Launcher Job的状态会变成Completed状态,mpi-operator会发现监听的job状态变化,进而去找到对应的mpijob,并更新其状态,代码在这里;
针对异常终止,某一个worker或者launcher出现了错误,那么会进行重试(笔者注:这里面的重试其实没有意义),如果超过了
backoffLimit
,那么就会认为是failed状态,执行上面步骤中同样的函数,并更新mpijob状态为failed;当mpijob终止了,就会通过
cleanPodPolicy
去删除没用的pod;
至此就是比较典型的mpi-operator的工作原理了,但是实际上还有诸多问题,已经在计划中修复了。
展望
目前在社区正在讨论v1版本的规划,希望能够尽快提升质量,规划新的API定义,主要的点罗列一下:
去掉
backoffLimit
;将launcher和worker转换为Pod,并自己维护其状态,PR已经就位;
支持抢占,被抢占的任务,可以重新进入队列;
整体质量,测试;
launcher可以运行workload;
目前我们正在与社区积极合作,共同推进项目的逐渐稳定。
版权声明: 本文为 InfoQ 作者【薛磊】的原创文章。
原文链接:【http://xie.infoq.cn/article/ba4915b878b4ddfc348761bc4】。文章转载请联系作者。
评论