写点什么

夜莺二次开发指南 - 任务执行中心

用户头像
qinyening
关注
发布于: 2021 年 01 月 13 日
夜莺二次开发指南-任务执行中心

前言

本系列将对夜莺平台各个模块的主要逻辑进行介绍,方便大家进行二次开发,本篇是系列的第六篇,任务执行中心 (JOB) 模块


首先贴下夜莺的项目地址和架构图,正在使用夜莺的读者欢迎给夜莺加一个 star


本节主要讲解任务执行中心 JOB 模块,关于功能的介绍可以到 bilibili 观看视频,本篇主要讲解 JOB 模块的主要设计思路,以及如何基于 job 平台,开发自己的发布系统。


设计思路

任务执行中心的功能由 agent 和 job 两个模块来实现。任务执行中心可以同时在上万台机器上执行任务,同时还要保证任务可以正常执行,不会重复执行。下面我们从几个问题入手,来帮助大家理解 job 是如何完成上述的工作的。


1.如何支撑千万级别的任务分发和执行?


为了支撑海量的任务,job 模块不能是一个单点,这样这样会遇到性能瓶颈,需要是多个实例,多个实例的话,就涉及到了任务调度,哪些任务由哪个实例去分配。下面介绍一下任务的调度逻辑,首先看一下任务是如何调度到不同的实例上去的,job 模块设计了一个 task_scheduler 的表,有 id 和 scheduler 两个字段,当每一个 job 实例启动的时候,都会去查询 scheduler 为空的数据,然后去接管任务,因为 mysql update 操作是有锁的,所以一个任务可以保证只会被一个 job 实例接管处理,这样就解决了调度问题和扩展问题,当 job 模块遇到性能瓶颈时,直接增加新的 job 实例即可,下面代码是具体的实现。


//获取没被认领的任务func OrphanTaskIds() ([]int64, error) {    var ids []int64    err := DB["job"].Table("task_scheduler").Where("scheduler = ''").Select("id").Find(&ids)    return ids, err}
//尝试去接管任务func TakeOverTask(id int64, pre, current string) (bool, error) { ret, err := DB["job"].Exec("UPDATE task_scheduler SET scheduler=? WHERE id = ? and scheduler = ?", current, id, pre) if err != nil { return false, err }
affected, err := ret.RowsAffected() if err != nil { return false, err }
return affected > 0, nil}
复制代码


2.当某个 job 实例异常挂了之后,刚接管还没有下发的任务会不会永远不会执行,如果处理?


job 模块会定期上报自己的心跳,并且在上报心跳的同时会对所有实例是否上报心跳进行探测,如果某个实例不上报心跳了,之前的任务就被接管,接管的逻辑和获取新任务的逻辑类似,不再赘述


//定时上报心跳func Heartbeat() {    for {        heartbeat()        time.Sleep(time.Second)    }}
//dss, err := models.DeadTasSchedulers()if err != nil { logger.Errorf("cannot get dead task schedulers: %v", err) return}
cnt := len(dss)if cnt == 0 { return}
for i := 0; i < cnt; i++ { ids, err := models.TasksOfScheduler(dss[i]) if err != nil { logger.Errorf("cannot get tasks of scheduler(%s): %v", dss[i], err) return } if len(ids) == 0 { err = models.DelDeadTaskScheduler(dss[i]) if err != nil { logger.Errorf("cannot del dead task scheduler(%s): %v", dss[i], err) return } } takeOverTasks(ident, dss[i], ids)}


复制代码


3..如何保证任务不被重复执行?


当 agent 执行完任务之后,由于网络问题或者调度任务的服务端实例挂了,都会导致服务端感知不到任务已经执行完成了,会再次进行任务的下发,那 agent 如何避免任务被再次执行呢?对应执行完的任务,agent 会把执行结果持久化到磁盘,agent 在执行任务的之前,会检测待执行的任务之前是否执行过,如果之前执行过了,会直接返回结果,不在重复执行,下面是主要的实现逻辑


if resp.AssignTasks != nil {    count := len(resp.AssignTasks)    for i := 0; i < count; i++ {        at := resp.AssignTasks[i]        assigned[at.Id] = struct{}{}        Locals.AssignTask(at)    }}
func (lt *LocalTasksT) AssignTask(at dataobj.AssignTask) { local, found := lt.GetTask(at.Id) //... if local.doneBefore() { local.loadResult() return } //...}
//任务完成之后,落盘记录func persistResult(t *Task) { metadir := config.Config.Job.MetaDir
stdout := path.Join(metadir, fmt.Sprint(t.Id), "stdout") stderr := path.Join(metadir, fmt.Sprint(t.Id), "stderr") doneFlag := path.Join(metadir, fmt.Sprint(t.Id), fmt.Sprintf("%d.done", t.Clock))
file.WriteString(stdout, t.GetStdout()) file.WriteString(stderr, t.GetStderr()) file.WriteString(doneFlag, t.GetStatus())}
复制代码


二次开发 demo

任务执行中心在机器上提供了一种命令通道的能力,基于任务执行中心可以开发适合自己公司的机器初始化、服务发布变更、故障自愈等系统。下面以构建服务发布变更举例,介绍如何基于任务执行中心进行二次开发


如果要做一个变更发布系统,我们通常需要关注一下几点


  1. 需要支持灰度发布

  2. 需要支持随时暂停发布任务

  3. 需要支持快速回滚


这能能力任务执行中心都已经具备了,所以基于任务执行中心开发变更发布系统非常方便,只需要维护一些元信息,然后调用任务管理的接口即可

    //创建任务userLogin.POST("/tasks", taskPost)
//更新任务的状态userLogin.PUT("/task/:id/action", taskActionPut)
//查看任务的执行情况notLogin.GET("/task/:id/stdout", taskStdout)notLogin.GET("/task/:id/stderr", taskStderr)notLogin.GET("/task/:id/state", apiTaskState)notLogin.GET("/task/:id/result", apiTaskResult)
复制代码


拿实现发布系统的上线功能举例

//创建一个 jobMeta 的数据结构type JobMeta struct {    Title     string   `json:"title"`    Account   string   `json:"account"`   //设置执行账号    Batch     int      `json:"batch"`     //并发度控制    Tolerance int      `json:"tolerance"` //暂停点,实现灰度功能要用到这个字段    Timeout   int64    `json:"timeout"`   //部署超时时间    Pause     string   `json:"pause"`     //暂停点,实现灰度功能要用到这个字段    Script    string   `json:"script"`    //发布系统,执行发布任务的脚本    Args      string   `json:"args"`        //执行脚本需要的参数    Action    string   `json:"action"`    //待执行的动作,支持 ignore|pause|kill|redo    Hosts     []string `json:"hosts"`     //待执行的机器}
func main() { http.Start()}
func httpConfig(r *gin.Engine) { demo := r.Group("/api/") { demo.POST("/deploy", createDepoly) } }
func createDepoly(c *gin.Context) { userName := loginUsername(c) var jobMeta JobMeta c.ShouldBindJSON(&jobMeta) data, err := json.Marshal(jobDat) errors.Dangerous(err) httplib.PostJSON("job.addr/api/job-ce/tasks", time.Second*5, data, nil) renderData(c, nil, err)}
复制代码


只要我们准备好发布服务需要执行的脚本文件,然后在每次发布服务的时候,将 jobMeta 根据需求补充完整,这样再实现一个 web 页面,一个简单发布系统就实现了。当然想要投入到生产环境使用,还需要在高可用方面在做一些工作。


本篇是夜莺二次开发系列的最后一篇,希望本系列可以给大家带来一些帮助,后面我们将会开放更多的功能出来,敬请期待吧。

作者简介

秦叶宁 企业级开源运维平台 Nightingale 主程,Urlooker 作者,现负责滴滴私有云运维产品方向的工作,如有运维平台的搭建需求,欢迎与我联系:)


发布于: 2021 年 01 月 13 日阅读数: 374
用户头像

qinyening

关注

-_- 2018.03.16 加入

微信 qinyening2100,欢迎交流:)

评论

发布
暂无评论
夜莺二次开发指南-任务执行中心