写点什么

Kubernetes 原生 CI/CD 构建框架 Argo 详解!

用户头像
火山引擎
关注
发布于: 2021 年 02 月 05 日
Kubernetes 原生 CI/CD 构建框架 Argo 详解!

作者:FogDong(字节跳动火山引擎

本系列第二篇已发布:Kubernetes 原生 CI/CD 构建框架 Tekton 详解!

什么是流水线?

在计算机中,流水线是把一个重复的过程分解为若干个子过程,使每个子过程可以与其他子过程并行进行的技术,也叫 Pipeline。由于这种工作方式与工厂中的生产流水线十分相似, 因此也被称为流水线技术。从本质上讲,流水线技术是一种时间并行技术。


以我们最熟悉的“构建镜像”过程为例:如下图,在每一次构建镜像中,我们都需要首先拉下代码仓库中的代码,进行代码构建,接着打出镜像,推往镜像仓库。每一次代码更改过后,这一过程都是不变的。使用流水线工具可以极大的提升这一过程的效率,只需要进行简单的配置便可以轻松的完成重复性的工作。这样的过程也被称之为 CI。

上图流程中使用的是 Jenkins。Jenkins 作为老牌流水线框架被大家所熟知。在云原生时代,Jenkins 推出了 Jenkins X 作为基于 Kubernetes 的新一代流水线,另外云原生时代还诞生了两大流水线框架—— Argo 和 Tekton。本文就详细介绍了 Argo 的相关内容。

Argo

Argo Workflows 是一个开源的容器原生的工作流引擎,用于在 Kubernetes 上编排并行作业。Argo Workflows 实现为 Kubernetes CRD。

Quick Start

kubectl create ns argokubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo/stable/manifests/quick-start-postgres.yaml
复制代码

Argo 基于 Kubernetes,可以直接使用 kubectl 安装,安装的组件主要包括了一些 CRD 以及对应的 controller 和一个 server。


注意,上述安装只会执行同 namespace 内的 Workflow,cluster install 详见 文档

三级定义

要了解 Argo 定义的 CRD,先从其中的三级定义入手。概念上的从大到小分别为 WorkflowTemplate,Workflow,template。这些资源的命名有些相似,所以会稍微有些迷惑性。

Template

从最简单的 template 说起,一个 template 有多种类型,分别为 container,script,dag,steps,resource 以及 suspend。对于 template,我们可以简单的将其理解为一个 Pod ——container/script/resource 类型的 template 都会去实际控制一个 Pod。而 dag/steps 类型的 template 则是由多个基础类型的 template (container/script/resource)组成的。

  • container:最常见的模板类型,与 Kubernetes container spec 保持一致。

  • script:该类型基于 Container,支持用户在 template 定义一段脚本,另有一个 Source 字段来表示脚本的运行环境。

  • resource:该类型支持我们在 template 中对 kubernetes 的资源进行操作,有一个 action 字段可以指定操作类型,如 create, apply, delete 等,并且支持设定相关的成功与失败条件用于判断该 template 的成功与失败。

  • suspend:Suspend template 将在一段时间内或在手动恢复执行之前暂停执行。可以从 CLI (使用 argo resume)、API 或 UI 恢复执行。

  • steps:Steps Template 允许用户以一系列步骤定义任务。在 Steps 中,[--] 代表顺序执行,[-] 代表并行执行。

  • dag:DAG template 允许用户将任务定义为带依赖的有向无环图。在 DAG 中,通过 dependencies 设置在特定任务开始之前必须完成的其他任务。没有任何依赖项的任务将立即运行。有关 DAG 的详细逻辑可见源码

Workflow

在一个 Workflow 中,其 spec 中有一个名为 templates 的字段,在其中至少需要一个 template 作为其组成的任务。


一个最简单的 hello world 例子如下:

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: hello-world-  labels:    workflows.argoproj.io/archive-strategy: "false"spec:  entrypoint: whalesay  templates:  - name: whalesay    container:      image: docker/whalesay:latest      command: [cowsay]      args: ["hello world"]
复制代码

在这个例子中,该 Workflow 的 templates 字段中指定了一个类型为 container 的 template,使用了 whalesay 镜像。


接着,来看一个稍微复杂一点的 workflow:

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: steps-spec:  entrypoint: hello-hello-hello
# 在 templates 中有两个 template,一个为 hello-hello-hello,一个为 whalesay templates: - name: hello-hello-hello # Instead of just running a container # This template has a sequence of steps steps: # 该 template 的类型是 steps - - name: hello1 # 在 steps 类型中,[--] 代表顺序执行,[-] 代表并行执行 template: whalesay # 这里引用了下面的 template arguments: parameters: - name: message value: "hello1" - - name: hello2a # 两个短杠 [--] => 顺序执行 template: whalesay arguments: parameters: - name: message value: "hello2a" - name: hello2b # 一个短杠 [-] => 并行执行 template: whalesay arguments: parameters: - name: message value: "hello2b"### 第二个 template - name: whalesay inputs: parameters: - name: message container: image: docker/whalesay command: [cowsay] args: ["{{inputs.parameters.message}}"]
复制代码

WorkflowTemplate

WorkflowTemplate 相当于是 Workflow 的模板库,和 Workflow 一样,也由 template 组成。用户在创建完 WorkflowTemplate 后,可以通过直接提交它们来执行 Workflow。

apiVersion: argoproj.io/v1alpha1kind: WorkflowTemplatemetadata:  name: workflow-template-submittablespec:  entrypoint: whalesay-template      arguments:    parameters:      - name: message        value: hello world  templates:    - name: whalesay-template      inputs:        parameters:          - name: message      container:        image: docker/whalesay        command: [cowsay]        args: ["{{inputs.parameters.message}}"]
复制代码

Workflow Overview

在了解了 Argo 的三级定义后,我们首先来深入一下 Argo 中最为关键的定义,Workflow。Workflow 是 Argo 中最重要的资源,有两个重要的功能:

  • 它定义了要执行的工作流。

  • 它存储了工作流的状态。


由于这些双重职责,Workflow 应该被视为一个 Active 的对象。它不仅是一个静态定义,也是是上述定义的一个“实例”。


可以看到 Workflow Template 的定义与 Workflow 几乎一致,除了类型不同。正因为 Workflow 既可以是一个定义也可以是一个实例,所以才需要 WorkflowTemplate 作为 Workflow 的模板,WorkflowTemplate 在定义后可以通过提交(Submit)来创建一个 Workflow。


而 Workflow 由一个 entrypoint 及一系列 template 组成,entrypoint 定义了这个 workflow 执行的入口,而 template 会实际去执行一个 Pod,其中,用户定义的内容会在 Pod 中以 Main Container 体现。此外,还有两个 Sidecar 来辅助运行。

Sidecar

在 Argo 中,这些 Sidecar 的镜像都是 argoexec。Argo 通过这个 executor 来完成一些流程控制。

Init

当用户的 template 中需要使用到 inputs 中的 artifact 或者是 script 类型时(script 类型需要注入脚本),Argo 都会为这个 pod 加上一个 Init Container —— 其镜像为 argoexec,而命令是 argoexec init。


在这个 Init Container 中,主要做的工作便是加载 artifact:

func loadArtifacts() error {        wfExecutor := initExecutor()        defer wfExecutor.HandleError()        defer stats.LogStats()
// Download input artifacts err := wfExecutor.StageFiles() if err != nil { wfExecutor.AddError(err) return err } err = wfExecutor.LoadArtifacts() if err != nil { wfExecutor.AddError(err) return err } return nil}
复制代码

Wait

除了 Resource 类型外的 template,Argo 都会注入一个 Wait Container,用于等待 Main Container 的完成并结束所有 Sidecar。这个 Wait Container 的镜像同样为 argoexec,而命令是 argoexec wait。(Resource 类型的不需要是因为 Resource 类型的 template 直接使用 argoexec 作为 Main Container 运行)

func waitContainer() error {        wfExecutor := initExecutor()        defer wfExecutor.HandleError()        defer stats.LogStats()        stats.StartStatsTicker(5 * time.Minute)
defer func() { // Killing sidecar containers err := wfExecutor.KillSidecars() if err != nil { log.Errorf("Failed to kill sidecars: %s", err.Error()) } }()
// Wait for main container to complete waitErr := wfExecutor.Wait() if waitErr != nil { wfExecutor.AddError(waitErr) // do not return here so we can still try to kill sidecars & save outputs }
// Capture output script result err := wfExecutor.CaptureScriptResult() if err != nil { wfExecutor.AddError(err) return err } // Capture output script exit code err = wfExecutor.CaptureScriptExitCode() if err != nil { wfExecutor.AddError(err) return err } // Saving logs logArt, err := wfExecutor.SaveLogs() if err != nil { wfExecutor.AddError(err) return err } // Saving output parameters err = wfExecutor.SaveParameters() if err != nil { wfExecutor.AddError(err) return err } // Saving output artifacts err = wfExecutor.SaveArtifacts() if err != nil { wfExecutor.AddError(err) return err } err = wfExecutor.AnnotateOutputs(logArt) if err != nil { wfExecutor.AddError(err) return err }
// To prevent the workflow step from completing successfully, return the error occurred during wait. if waitErr != nil { return waitErr }
return nil}
复制代码

Inputs and Outputs

在运行 Workflow 时,一个非常常见的场景是输出产物的传递。通常,一个 Step 的输出产物可以用作后续步骤的输入产物。


在 Argo 中,产物可以通过 Artifact 或是 Parameter 传递。

Artifact

要使用 Argo 的 Artifact,首先必须配置和使用 Artifact 存储仓库。具体的配置方式可以通过修改存有 Artifact Repository 信息的默认 Config Map 或者在 Workflow 中显示指定,详见 配置文档,在此不做赘述。 下表为 Argo 支持的仓库类型。

一个简单的使用了 Artifact 的例子如下:

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: artifact-passing-spec:  entrypoint: artifact-example  templates:  - name: artifact-example    steps:    - - name: generate-artifact        template: whalesay    - - name: consume-artifact        template: print-message        arguments:          artifacts:          # bind message to the hello-art artifact          # generated by the generate-artifact step          - name: message            from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"
- name: whalesay container: image: docker/whalesay:latest command: [sh, -c] args: ["cowsay hello world | tee /tmp/hello_world.txt"] outputs: artifacts: # generate hello-art artifact from /tmp/hello_world.txt # artifacts can be directories as well as files - name: hello-art path: /tmp/hello_world.txt
- name: print-message inputs: artifacts: # unpack the message input artifact # and put it at /tmp/message - name: message path: /tmp/message container: image: alpine:latest command: [sh, -c] args: ["cat /tmp/message"]
复制代码

默认情况下,Artifact 被打包为 tar 包和 gzip 包。也可以使用 archive 字段指定存档策略。


在上面的例子里,名为 whalesay 的 template 使用 cowsay 命令生成一个名为 /tmp/hello-world.txt 的文件。然后将该文件作为一个名为 hello-art 的 Artifact 输出。名为 print-message 的 template 接受一个名为 message 的输入 Artifact,在 /tmp/message 的路径上解包它,然后使用 cat 命令打印 /tmp/message 的内容。


在前面的 Sidecar 介绍中提到过,Init Container 主要用于拉取 Artifact 产物。这些 Sidecar 正是产物传递的关键。接着,我们通过介绍另一种产物传递的方式来体验 Argo 中传递产物的关键。

Scripts

先来看一个简单的例子:

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: scripts-bash-spec:  entrypoint: bash-script-example  templates:  - name: bash-script-example    steps:    - - name: generate        template: gen-random-int-bash    - - name: print        template: print-message        arguments:          parameters:          - name: message            value: "{{steps.generate.outputs.result}}"  # The result of the here-script
- name: gen-random-int-bash script: image: debian:9.4 command: [bash] source: | # Contents of the here-script cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'
- name: gen-random-int-python script: image: python:alpine3.6 command: [python] source: | import random i = random.randint(1, 100) print(i)
- name: print-message inputs: parameters: - name: message container: image: alpine:latest command: [sh, -c] args: ["echo result was: {{inputs.parameters.message}}"]
复制代码

在上面的例子中,有两个类型为 script 的 template,script 允许使用 source 规范脚本主体。这将创建一个包含脚本主体的临时文件,然后将临时文件的名称作为最后一个参数传递给 command(执行脚本主体的解释器),这样便可以方便的执行不同类型的脚本(bash、python、js etc)。


Script template 会将脚本的标准输出分配给一个名为 result 的特殊输出参数从而被其他 template 调用。在这里,通过 {{steps.generate.outputs.result}} 即可获取到名为 generate 的 template 的脚本输出。


{{xxx}} 是 Argo 固定的变量替换格式:

关于变量的格式详见 文档

关于变量替换的逻辑详见 源码


那么,容器内部应该如何获取这个脚本输出呢?


还是回到 Sidecar,在 Wait Container 中,有这样一段逻辑:

// CaptureScriptResult will add the stdout of a script template as output resultfunc (we *WorkflowExecutor) CaptureScriptResult() error {                ...
log.Infof("Capturing script output") mainContainerID, err := we.GetMainContainerID() if err != nil { return err } reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false) if err != nil { return err } defer func() { _ = reader.Close() }() bytes, err := ioutil.ReadAll(reader) if err != nil { return errors.InternalWrapError(err) } out := string(bytes) // Trims off a single newline for user convenience outputLen := len(out) if outputLen > 0 && out[outputLen-1] == '\n' { out = out[0 : outputLen-1] }
const maxAnnotationSize int = 256 * (1 << 10) // 256 kB // A character in a string is a byte if len(out) > maxAnnotationSize { log.Warnf("Output is larger than the maximum allowed size of 256 kB, only the last 256 kB were saved") out = out[len(out)-maxAnnotationSize:] }
we.Template.Outputs.Result = &out return nil}
复制代码

再来看看这个 Wait Container 的 Volume Mount 情况:

volumeMounts:    - mountPath: /argo/podmetadata      name: podmetadata    - mountPath: /var/run/docker.sock      name: docker-sock      readOnly: true    - mountPath: /argo/secret/my-minio-cred      name: my-minio-cred      readOnly: true    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount      name: default-token-b5grl      readOnly: true
复制代码

现在就十分明确了,Wait Container 通过挂载 docker.sock 以及 service account,获取到 Main Container 中的输出结果,并保存到 Workflow 中。当然,也因为 Workflow 中保存了大量的信息,当一个 Workflow 的 Step 过多时,整个 Workflow 的结构会过于庞大。

Parameter

Parameter 提供了一种通用机制,可以将步骤的结果用作参数。Parameter 的工作原理与脚本结果类似,除了输出参数的值会被设置为生成文件的内容,而不是 stdout 的内容。如:

  - name: whalesay    container:      image: docker/whalesay:latest      command: [sh, -c]      args: ["echo -n hello world > /tmp/hello_world.txt"]  # generate the content of hello_world.txt    outputs:      parameters:      - name: hello-param       # name of output parameter        valueFrom:          path: /tmp/hello_world.txt    # set the value of hello-param to the contents of this hello-world.txt
复制代码

Volume

这并不是 Argo 处理产物传递的一种标准方式,但是通过共享存储,显然我们也能达到共通产物的结果。当然,若使用 Volume,我们则无需借助 Inputs 和 Outputs。


在 Workflow 的 Spec 中,我们可以定义一个 Volume 模板:


apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: volumes-pvc-spec:  entrypoint: volumes-pvc-example  volumeClaimTemplates:                 # define volume, same syntax as k8s Pod spec  - metadata:      name: workdir                     # name of volume claim    spec:      accessModes: [ "ReadWriteOnce" ]      resources:        requests:          storage: 1Gi                  # Gi => 1024 * 1024 * 1024
复制代码

并在其他的 template 中 mount 该 volume:

  - name: whalesay    container:      image: docker/whalesay:latest      command: [sh, -c]      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]      # Mount workdir volume at /mnt/vol before invoking docker/whalesay      volumeMounts:                     # same syntax as k8s Pod spec      - name: workdir        mountPath: /mnt/vol
复制代码

其他流程控制功能

循环

在编写 Workflow 时,能够循环迭代一组输入通常是非常有用的,如下例所示:

  templates:  - name: loop-example    steps:    - - name: print-message        template: whalesay        arguments:          parameters:          - name: message            value: "{{item}}"        withItems:              # invoke whalesay once for each item in parallel        - hello world           # item 1        - goodbye world         # item 2
复制代码

在源码实现中,将会去判断 withItems,如果存在,则对其中的每个元素进行一次 step 的扩展。

// expandStepGroup looks at each step in a collection of parallel steps, and expands all steps using withItems/withParamfunc (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.WorkflowStep, stepsCtx *stepsContext) ([]wfv1.WorkflowStep, error) {        newStepGroup := make([]wfv1.WorkflowStep, 0)        for _, step := range stepGroup {                if !step.ShouldExpand() {                        newStepGroup = append(newStepGroup, step)                        continue                }                expandedStep, err := woc.expandStep(step)                if err != nil {                        return nil, err                }                if len(expandedStep) == 0 {                        // Empty list                        childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)                        if woc.wf.GetNodeByName(childNodeName) == nil {                                stepTemplateScope := stepsCtx.tmplCtx.GetTemplateScope()                                skipReason := "Skipped, empty params"                                woc.log.Infof("Skipping %s: %s", childNodeName, skipReason)                                woc.initializeNode(childNodeName, wfv1.NodeTypeSkipped, stepTemplateScope, &step, stepsCtx.boundaryID, wfv1.NodeSkipped, skipReason)                                woc.addChildNode(sgNodeName, childNodeName)                        }                }                newStepGroup = append(newStepGroup, expandedStep...)        }        return newStepGroup, nil}
复制代码

条件判断

通过 when 关键字指定:

  templates:  - name: coinflip    steps:    # flip a coin    - - name: flip-coin        template: flip-coin    # evaluate the result in parallel    - - name: heads        template: heads                 # call heads template if "heads"        when: "{{steps.flip-coin.outputs.result}} == heads"      - name: tails        template: tails                 # call tails template if "tails"        when: "{{steps.flip-coin.outputs.result}} == tails"
复制代码

错误重尝

  templates:  - name: retry-backoff    retryStrategy:      limit: 10      retryPolicy: "Always"      backoff:        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"        factor: 2        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
复制代码

递归

Template 可以递归地相互调用,这是一个非常实用的功能。例如在机器学习场景中:可以设定准确率必需满足一个值,否则就持续进行训练。在下面这个抛硬币例子中,我们可以持续抛硬币,直到出现正面才结束整个工作流。

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata:  generateName: coinflip-recursive-spec:  entrypoint: coinflip  templates:  - name: coinflip    steps:    # flip a coin    - - name: flip-coin        template: flip-coin    # evaluate the result in parallel    - - name: heads        template: heads                 # call heads template if "heads"        when: "{{steps.flip-coin.outputs.result}} == heads"      - name: tails                     # keep flipping coins if "tails"        template: coinflip        when: "{{steps.flip-coin.outputs.result}} == tails"
- name: flip-coin script: image: python:alpine3.6 command: [python] source: | import random result = "heads" if random.randint(0,1) == 0 else "tails" print(result)
- name: heads container: image: alpine:3.6 command: [sh, -c] args: ["echo \"it was heads\""]
复制代码

以下是两次执行的结果,第一次执行直接抛到正面,结束流程;第二次重复三次后才抛到正面,结束流程。

argo get coinflip-recursive-tzcb5
STEP PODNAME MESSAGE ✔ coinflip-recursive-vhph5 ├───✔ flip-coin coinflip-recursive-vhph5-2123890397 └─┬─✔ heads coinflip-recursive-vhph5-128690560 └─○ tails
STEP PODNAME MESSAGE ✔ coinflip-recursive-tzcb5 ├───✔ flip-coin coinflip-recursive-tzcb5-322836820 └─┬─○ heads └─✔ tails ├───✔ flip-coin coinflip-recursive-tzcb5-1863890320 └─┬─○ heads └─✔ tails ├───✔ flip-coin coinflip-recursive-tzcb5-1768147140 └─┬─○ heads └─✔ tails ├───✔ flip-coin coinflip-recursive-tzcb5-4080411136 └─┬─✔ heads coinflip-recursive-tzcb5-4080323273 └─○ tails
复制代码

退出处理

退出处理是一个指定在 workflow 结束时执行的 template,无论成功或失败。

spec:  entrypoint: intentional-fail  onExit: exit-handler                  # invoke exit-handler template at end of the workflow  templates: ...
复制代码

对比 Tekton

相较于 Tekton 而言,Argo 的流程控制功能更加丰富。拥有着循环、递归等功能,这对于一些机器学习的场景都是十分适用的。而 Argo 社区对自己的定位也是 MLOps、AIOps、Data/Batch Processing,这也正是 Kubeflow Pipeline 底层基于 Argo 的原因(尽管 KFP 也在做 Tekton 的 backend)。


但是在权限控制方面,Argo 做的就不如 Tekton;并且我个人认为,Tekton 的结构定义更为清晰。二者各有优劣,可以根据自己的需求进行选择。


参考文档

  • Argo Roadmap:https://github.com/argoproj/argo/blob/master/docs/roadmap.md

  • Argo Examples:https://argoproj.github.io/argo/examples/#welcome

  • Argo Source Code:https://github.com/argoproj/argo


发布于: 2021 年 02 月 05 日阅读数: 2336
用户头像

火山引擎

关注

还未添加个人签名 2020.07.28 加入

火山引擎是字节跳动旗下的数字服务与智能科技品牌,基于公司服务数亿用户的大数据、人工智能和基础服务等能力,为企业客户提供系统化全链路解决方案,助力企业务实地创新,实现业务持续快速的增长。

评论

发布
暂无评论
Kubernetes 原生 CI/CD 构建框架 Argo 详解!