写点什么

Kubernetes 任务调用 Job 与 CronJob 及源码分析

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:8579 字

    阅读完需:约 28 分钟

spec.backoffLimit


我们在上面的字段中定义了为 4,表示重试次数为 4。


restartPolicy


在运行过程中,可能发生各种系统问题导致的 Pod 运行失败,如果设置 restartPolicy 为 OnFailure,那么在运行中发生的失败后 Job Controller 会重启 Pod 里面的容器,而不是创建新的 Pod。


还可以设置为 Never,表示容器运行失败之后不会重启。


spec.activeDeadlineSeconds


表示最长运行时间,单位是秒。如:


spec:


backoffLimit: 5


activeDeadlineSeconds: 100


这样设置之后会进入 pastActiveDeadline 进行校验job.Spec.ActiveDeadlineSeconds是不是为空,不是空的话,会比较 Pod 的运行时间 duration 是否大于job.Spec.ActiveDeadlineSeconds设置的值,如果大于,那么会标记 Pod 终止的原因是 DeadlineExceeded。


在 job Controller 的源码中,我们可以看到这部分的逻辑:


job Controller 首先会去校验任务是不是处理次数是不是超过了 BackoffLimit 设置,如果没有超过的话就校验有没有设置 ActiveDeadlineSeconds,如果设置了的话,就校验当前 job 运行时间是否超过了 ActiveDeadlineSeconds 设置的的时间,超过了那么会打上标记,表示这个 job 运行失败。


...


jobHaveNewFailure := failed > job.Status.Failed


exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&


(int32(previousRetry)+1 > *job.Spec.BackoffLimit)


if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {


// check if the number of pod restart exceeds backoff (for restart OnFailure only)


// OR if the number of failed jobs increased since the last syncJob


jobFailed = true


failureReason = "BackoffLimitExceeded"


failureMessage = "Job has reached the specified backoff limit"


} else if pastActiveDeadline(&job) {


jobFailed = true


failureReason = "DeadlineExceeded"


failureMessage = "Job was active longer than specified deadline"


}


...


func pastActiveDeadline(job *batch.Job) bool {


if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {


return false


}


now := metav1.Now()


start := job.Status.StartTime.Time


duration := now.Time.Sub(start)


allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second


return duration >= allowedDuration


}

Job 的并行任务

在 Job 对象中,负责并行控制的参数有两个:


  1. spec.parallelism表示一个 Job 在任意时间最多可以启动多少个 Pod 同时运行;

  2. spec.completions表示 Job 的最小完成数。


举例:


apiVersion: batch/v1


kind: Job


metadata:


name: pi


spec:


parallelism: 2


completions: 4


template:


spec:


containers:


  • name: pi


image: perl


command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]


restartPolicy: Never


backoffLimit: 4


在创建任务之后,我们可以看到最多只会有两个 Pod 同时运行:


$ kubectl get pod


NAME READY STATUS RESTARTS AGE


pi-8fsrn 0/1 ContainerCreating 0 30s


pi-job-67kwg 0/1 Completed 0 14h


pi-wlbm5 0/1 ContainerCreating 0 30s


每当有一个 Pod 完成计算进入 Completed 状态时,就会有一个新的 Pod 被自动创建出来,并且快速地从 Pending 状态进入到 ContainerCreating 状态。


最终我们可以看到 job 的 COMPLETIONS 会标记全部完成:


$ kubectl get job


NAME COMPLETIONS DURATION AGE


pi 4/4 2m52s 2m52s


Job Controller 中会会根据配置的并发数来确认当前处于 active 的 pods 数量是否合理,如果不合理的话则进行调整。


如果处于 active 状态的 pods 数大于 job 设置的并发数?job.Spec.Parallelism,则并发删除多余的 active pods。

Job 源码分析

通过上面的使用例子,我们可以看到 job 的使用时非常的简单的,下面我们通过源码来理解一下这 job 的运行逻辑。


核心源码位置在 job_controller.go 中 Controller 类的 syncJob 方法中:


syncJob 方法很长,我还是想要将这个方法拆开来进行说明。


Controller#syncJob


func (jm *Controller) syncJob(key string) (bool, error) {


...


job := *sharedJob


// if job was finished previously, we don't want to redo the termination


// 如果 job 已经跑完了,那么直接返回,避免重跑


if IsJobFinished(&job) {


return true, nil


}


// retrieve the previous number of retry


// 获取 job 的重试次数


previousRetry := jm.queue.NumRequeues(key)


jobNeedsSync := jm.expectations.SatisfiedExpectations(key)


//获取这个 job 的 pod 列表


pods, err := jm.getPodsForJob(&job)


if err != nil {


return false, err


}


//找到这个 job 中仍然活跃的 pod


activePods := controller.FilterActivePods(pods)


active := int32(len(activePods))


//获取 job 中运行成功的 pod 数和运行失败的 pod 数


succeeded, failed := getStatus(pods)


conditions := len(job.Status.Conditions)


// job first start


//设置 job 的启动时间


if job.Status.StartTime == nil {


now := metav1.Now()


job.Status.StartTime = &now


// enqueue a sync to check if job past ActiveDeadlineSeconds


if job.Spec.ActiveDeadlineSeconds != nil {


klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",


key, *job.Spec.ActiveDeadlineSeconds)


jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)


}


}


...


}


这部分的代码会校验 job 是否已经跑完了,如果跑完了直接返回;


然后获取 job 的重试次数,以及与 job 关联的 pod 列表,并计算出活跃的 pod 数量、运行成功的 pod 数量、以及失败的 pod 数量;


接下来如果 job 是首次启动,那么需要设置 job 的启动时间。


继续:


func (jm *Controller) syncJob(key string) (bool, error) {


...


var manageJobErr error


jobFailed := false


var failureReason string


var failureMessage string


//failed 次数超过了 job.Status.Failed 说明有新的 pod 运行失败了


jobHaveNewFailure := failed > job.Status.Failed


// new failures happen when status does not reflect the failures and active


// is different than parallelism, otherwise the previous controller loop


// failed updating status so even if we pick up failure it is not a new one


//如果有新的 pod 运行失败,并且活跃的 pod 不等于并行 Parallelism 数


//并且重试次数超过了 BackoffLimit


exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&


(int32(previousRetry)+1 > *job.Spec.BackoffLimit)


//重试次数是否超标


if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {


// check if the number of pod restart exceeds backoff (for restart OnFailure only)


// OR if the number of failed jobs increased since the last syncJob


jobFailed = true


failureReason = "BackoffLimitExceeded"


failureMessage = "Job has reached the specified backoff limit"


// job 运行时间是否超过了 ActiveDeadlineSeconds


} else if pastActiveDeadline(&job) {


jobFailed = true


failureReason = "DeadlineExceeded"


failureMessage = "Job was active longer than specified deadline"


}


...


}


这段代码是用来判断 job 是否运行失败,判断依据是 job 重试次数是否超过了 BackoffLimit,以及 job 的运行时间是否超过了设置的 ActiveDeadlineSeconds。


上面这里会获取上一次运行的 Failed 次数和这次的 job 的 failed 次数进行比较,如果 failed 多了表示又产生了新的运行失败的 pod。如果运行失败会标识出失败原因,以及设置 jobFailed 为 true。


在上面的代码中调用了 pastBackoffLimitOnFailure 方法和 pastActiveDeadline 方法,我们分别看一下:


pastBackoffLimitOnFailure


func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {


//如果 RestartPolicy 为 OnFailure,那么直接返回


if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {


return false


}


result := int32(0)


for i := range pods {


po := pods[i]


//如果 pod 状态为 Running 或 Pending


//获取到 pod 对应的重启次数以及 Container 状态,包含 pod 中的 InitContainer


if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {


for j := range po.Status.InitContainerStatuses {


stat := po.Status.InitContainerStatuses[j]


result += stat.RestartCount


}


for j := range po.Status.ContainerStatuses {


stat := po.Status.ContainerStatuses[j]


result += stat.RestartCount


}


}


}


//如果 BackoffLimit 等于,那么只要重启了一次,则返回 true


if *job.Spec.BackoffLimit == 0 {


return result > 0


}


//比较重启次数是否超过了 BackoffLimit


return result >= *job.Spec.BackoffLimit


}


这个方法会校验 job 的 RestartPolicy 策略,不是 OnFailure 才继续往下执行。然后会遍历 pod 列表,将 pod 列表中的重启次数累加并与 BackoffLimit 进行比较,超过了则返回 true。


pastActiveDeadline


func pastActiveDeadline(job *batch.Job) bool {


if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {


return false


}


now := metav1.Now()


start := job.Status.StartTime.Time


duration := now.Time.Sub(start)


allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second


return duration >= allowedDuration


}


这个方法会算出 job 的运行时间 duration,然后和 ActiveDeadlineSeconds 进行比较,如果超过了则返回 true。


我们回到 syncJob 中继续往下:


func (jm *Controller) syncJob(key string) (bool, error) {


...


//job 运行失败


if jobFailed {


errCh := make(chan error, active)


//将 job 里面的 active 的 pod 删除


jm.deleteJobPods(&job, activePods, errCh)


select {


case manageJobErr = <-errCh:


if manageJobErr != nil {


break


}


default:


}


// update status values accordingly


//清空 active 数


failed += active


active = 0


job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))


jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)


} else {


//如果 job 需要同步,并且 job 没有被删除,则调用 manageJob 进行同步工作


if jobNeedsSync && job.DeletionTimestamp == nil {


active, manageJobErr = jm.manageJob(activePods, succeeded, &job)


}


//完成数等于 pod 运行成功的数量


completions := succeeded


complete := false


//如果没有设置 Completions,那么只要有 pod 完成,那么 job 就算完成


if job.Spec.Completions == nil {


if succeeded > 0 && active == 0 {


complete = true


}


} else {


//如果实际完成数大于或等于 Completions


if completions >= *job.Spec.Completions {


complete = true


//如果还有 pod 处于 active 状态,发送 EventTypeWarning 事件


if active > 0 {


jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")


}


//如果实际完成数大于 Completions,发送 EventTypeWarning 事件


if completions > *job.Spec.Completions {


jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")


}


}


}


//job 完成了则更新 job.Status.Conditions 和 job.Status.CompletionTime 字段


if complete {


job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))


now := metav1.Now()


job.Status.CompletionTime = &now


jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")


}


}


...


}


这一段中会根据 jobFailed 的状态进行判断。


如果 jobFailed 为 true 则表示这个 job 运行失败,需要删除这个 job 关联的所有 pod,并且清空 active 数。


如果 jobFailed 为 false 则表示这个 job 处于非 false 状态。如果 job 需要同步,并且 job 没有被删除,则调用 manageJob 进行同步工作;


接下来会对设置的 Completions 进行处理,如果 Completions 没有设置,那么只要有一个 pod 运行完毕,那么这个 pod 就算完成;


如果实际完成的 pod 数量大于 completions 或仍然有 pod 处于 active 中,则发送相应的事件信息。最后更新 job 的状态为完成。


我们接下来一口气看看 manageJob 中这个同步方法里面做了什么,这个方法是 job 管理 pod 运行数量的核心方法:


Controller#manageJob


func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {


...


//如果处于 active 状态的 pods 数大于 job 设置的并发数 job.Spec.Parallelism


if active > parallelism {


//多出的个数


diff := active - parallelism


errCh = make(chan error, diff)


jm.expectations.ExpectDeletions(jobKey, int(diff))


klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)


//pods 排序,以便可以优先删除一些 pod:


// 判断 pod 状态:Not ready < ready


// 是否已经被调度:unscheduled< scheduled


//判断 pod phase :pending < running


sort.Sort(controller.ActivePods(activePods))


active -= diff


wait := sync.WaitGroup{}


wait.Add(int(diff))


for i := int32(0); i < diff; i++ {


//并发删除多余的 active pods


go func(ix int32) {


defer wait.Done()


if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {


// Decrement the expected number of deletes because the informer won't observe this deletion


jm.expectations.DeletionObserved(jobKey)


if !apierrors.IsNotFound(err) {


klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)


activeLock.Lock()


active++


activeLock.Unlock()


errCh <- err


utilruntime.HandleError(err)


}


}


}(i)


}


wait.Wait()


//若处于 active 状态的 pods 数小于 job 设置的并发数,则需要创建出新的 pod


} else if active < parallelism {


wantActive := int32(0)


//如果没有声明 Completions,那么 active 的 pod 应该等于 parallelism,如果有 pod 已经完成了,那么不再创建新的。


if job.Spec.Completions == nil {


if succeeded > 0 {


wantActive = active


} else {


wantActive = parallelism


}


// 如果声明了 Completions,那么需要比较 Completions 和 succeeded


// 如果 wantActive 大于 parallelism,那么需要创建的 Pod 数等于 parallelism


} else {


// Job specifies a specific number of completions. Therefore, number


// active should not ever exceed number of remaining completions.


wantActive = *job.Spec.Completions - succeeded


if wantActive > parallelism {


wantActive = parallelism


}


}


//计算出 diff 数


diff := wantActive - active


if diff < 0 {


utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))


diff = 0


}


//表示已经有足够的 pod,不需要再创建了


if diff == 0 {


return active, nil


}


jm.expectations.ExpectCreations(jobKey, int(diff))


errCh = make(chan error, diff)


klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)


active += diff


wait := sync.WaitGroup{}


//创建的 pod 数依次为 1、2、4、8......,呈指数级增长


for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {


errorCount := len(errCh)


wait.Add(int(batchSize))


for i := int32(0); i < batchSize; i++ {


//并发程创建 pod


go func() {


defer wait.Done()


//创建 pod


err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))


if err != nil {


...


}


//创建失败的处理


if err != nil {


defer utilruntime.HandleError(err)


klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)


jm.expectations.CreationObserved(jobKey)


activeLock.Lock()


active--


activeLo


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


ck.Unlock()


errCh <- err


}


}()


}


wait.Wait()


...


diff -= batchSize


}


}


...


return active, nil


}


这个方法的逻辑十分的清晰,我们下面撸一撸~


这段代码在开始用一个 if 判断来校验 active 的 pod 是否超过了 parallelism,如果超过了需要算出超过了多少,存在 diff 字段中;然后需要删除多余的 pod,不过这个时候有个细节的地方,这里会根据 pod 的状态进行排序,会首先删除一些不是 ready 状态、unscheduled、pending 状态的 pod;


若 active 的 pod 小于 parallelism,那么首先需要判断 Completions,如果没有被设置,并且已经有 pod 运行成功了,那么不需要创建新的 pod,否则还是需要创建 pod 至 parallelism 指定个数;如果设置了 Completions,那么还需要根据 pod 完成的数量来做一个判断需要创建多少新的 pod;


如果需要创建的 pod 数小于 active 的 pod 数,那么直接返回即可;


接下来会在一个 for 循环中循环并发创建 pod,不过创建的数量是依次指数递增,避免一下子创建太多 pod。


定时任务 CronJob



基本使用

我们从一个例子开始,如下:


apiVersion: batch/v1beta1


kind: CronJob


metadata:


name: hello


spec:


schedule: "*/1 * * * *"


jobTemplate:


spec:


template:


spec:


containers:


  • name: hello


image: busybox


args:


  • /bin/sh

  • -c

  • date; echo Hello from the Kubernetes cluster


restartPolicy: OnFailure


这个 CronJob 会每分钟创建一个 Pod:


$ kubectl get pod


NAME READY STATUS RESTARTS AGE


hello-1596406740-tqnlb 0/1 ContainerCreating 0 8s


cronjob 会记录最近的调度时间:


$ kubectl get cronjob hello


NAME SCHEDULE SUSPEND ACTIVE LAST SCHEDULE AGE


hello */1 * * * * False 1 16s 2m33s


spec.concurrencyPolicy


如果设置的间隔时间太短,那么可能会导致任务还没执行完成又创建了新的 Pod。所以我们可以通过修改spec.concurrencyPolicy来定义处理策略:


  • Allow,这也是默认情况,这意味着这些 Job 可以同时存在;

  • Forbid,这意味着不会创建新的 Pod,该创建周期被跳过;

  • Replace,这意味着新产生的 Job 会替换旧的、没有执行完的 Job。


如果某一次 Job 创建失败,这次创建就会被标记为“miss”。当在指定的时间窗口内,miss 的数目达到 100 时,那么 CronJob 会停止再创建这个 Job。


spec.startingDeadlineSeconds可以指定这个时间窗口。startingDeadlineSeconds=200 意味着过去 200 s 里,如果 miss 的数目达到了 100 次,那么这个 Job 就不会被创建执行了。

cronjob 源码分析

CronJob 的源码在 cronjob_controller.go 中,主要实现是在 Controller 的 syncAll 方法中。


下面我们看看 CronJob 是在源码中如何创建运行的:


Controller#syncAll


func (jm *Controller) syncAll() {


//列出所有的 job


jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {


return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)


}


js := make([]batchv1.Job, 0)


//遍历 jobListFunc 然后将状态正常的 job 放入到 js 集合中


err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {


jobTmp, ok := object.(*batchv1.Job)


if !ok {


return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)


}


js = append(js, *jobTmp)


return nil


})


...


//列出所有的 cronJobs


cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {


return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)


}


//遍历所有的 jobs,根据 ObjectMeta.OwnerReference 字段确定该 job 是否由 cronJob 所创建


//key 为 uid,value 为 job 集合


jobsByCj := groupJobsByParent(js)


klog.V(4).Infof("Found %d groups", len(jobsByCj))


//遍历 cronJobs


err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {


cj, ok := object.(*batchv1beta1.CronJob)


if !ok {


return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)


}


//进行同步


syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)


//清理所有已经完成的 jobs


cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)


return nil


})


if err != nil {


utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))


return


}

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Kubernetes任务调用Job与CronJob及源码分析