写点什么

如何借助 Kafka 持久化存储 K8S 事件数据?

作者:SEAL安全
  • 2023-05-22
    广东
  • 本文字数:3348 字

    阅读完需:约 11 分钟

如何借助Kafka持久化存储K8S事件数据?

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。 

$ kubectl get events
15m Warning FailedCreate replicaset/ml-pipeline-visualizationserver-865c7865bc
Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found
复制代码


尽管这些信息十分有用,但它只是临时的,保留时间最长为 30 天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka 这样更持久、高效的存储中。然后你可以借助其他工具(如 Argo Events)或自己的应用程序订阅 Kafka 主题来对某些事件做出响应。 


构建 K8s 事件处理链路

我们将构建一整套 Kubernetes 事件处理链路,其主要构成为:

  • Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。

  • Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。

  • 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。 


为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。 


本示例中所有的配置、源代码和详细设置指示都已经放在以下代码仓库中:https://github.com/esys/kube-events-kafka 



创建 Kafka broker 和主题

我选择使用 Strimzi(strimzi.io/) 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明:https://strimzi.io/docs/operators/latest/overview.html 


首先,创建一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1kind: Kafkametadata:  name: kube-eventsspec:  entityOperator:    topicOperator: {}    userOperator: {}  kafka:    config:      default.replication.factor: 3      log.message.format.version: "2.6"      offsets.topic.replication.factor: 3      transaction.state.log.min.isr: 2      transaction.state.log.replication.factor: 3    listeners:    - name: plain      port: 9092      tls: false      type: internal    - name: tls      port: 9093      tls: true      type: internal    replicas: 3    storage:      type: jbod      volumes:      - deleteClaim: false        id: 0        size: 10Gi        type: persistent-claim    version: 2.6.0  zookeeper:    replicas: 3    storage:      deleteClaim: false      size: 10Gi      type: persistent-claim
复制代码


然后创建 Kafka 主题来接收我们的事件:

apiVersion: kafka.strimzi.io/v1beta1kind: KafkaTopicmetadata:  name: cluster-eventsspec:  config:    retention.ms: 7200000    segment.bytes: 1073741824  partitions: 1  replicas: 1
复制代码


设置 EventRouter

在本教程中使用 kubectl apply 命令即可,我们需要编辑 router 的配置,以指明我们的 Kafka 端点和要使用的主题:

apiVersion: v1data:  config.json: |-    {      "sink": "kafka",      "kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",      "kafkaTopic": "cluster-events"    }kind: ConfigMapmetadata:  name: eventrouter-cm
复制代码


验证设置是否正常工作

我们的 cluster-events Kafka 的主题现在应该收到所有的事件。最简单的方法是在主题上运行一个 consumer 来检验是否如此。为了方便期间,我们使用我们的一个 Kafka broker pods,它已经有了所有必要的工具,你可以看到事件流:


kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \  --bootstrap-server kube-events-kafka-bootstrap:9092 \  --topic kube-events \  --from-beginning{"verb":"ADDED","event":{...}}{"verb":"ADDED","event":{...}}...
复制代码


编写 Golang 消费者

现在我们想将我们的 Kubernetes 事件依据其所在的命名空间分发到多个主题中。我们将编写一个 Golang 消费者和生产者来实现这一逻辑:


  • 消费者部分在 cluster-events 主题上监听传入的集群事件

  • 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中 


如果为 Kafka 配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。


p, err := kafka.NewProducer(cfg.Endpoint)if err != nil {        sugar.Fatal("cannot create producer")}defer p.Close()
c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)if err != nil { sugar.Fatal("cannot create consumer")}defer c.Close()
run := truesigs := make(chan os.Signal, 1)signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)go func() { sig := <-sigs sugar.Infof("signal %s received, terminating", sig) run = false}()
var wg sync.WaitGroupgo func() { wg.Add(1) for run { data, err := c.Read() if err != nil { sugar.Errorf("read event error: %v", err) time.Sleep(5 * time.Second) continue } if data == nil { continue } msg, err := event.CreateDestinationMessage(data) if err != nil { sugar.Errorf("cannot create destination event: %v", err) } p.Write(msg.Topic, msg.Message) } sugar.Info("worker thread done") wg.Done()}()
wg.Wait()
复制代码


完整代码在此处:https://github.com/esys/kube-events-kafka/blob/master/events-fanout/cmd/main.go 


当然还有更高性能的选择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更强大的实现,使用 Spark Structured Streaming 的消费者将是一个很好的选择。 


部署消费者

构建并将二进制文件推送到 Docker 镜像之后,我们将它封装为 Kubernetes deployment:


apiVersion: apps/v1kind: Deploymentmetadata:  labels:    app: events-fanout  name: events-fanoutspec:  replicas: 1  selector:    matchLabels:      app: events-fanout  template:    metadata:      labels:        app: events-fanout    spec:      containers:        - image: emmsys/events-fanout:latest          name: events-fanout          command: [ "./events-fanout"]          args:            - -logLevel=info          env:            - name: ENDPOINT              value: kube-events-kafka-bootstrap:9092            - name: TOPIC              value: cluster-events
复制代码


检查目标主题是否创建

现在,新的主题已经创建完成:


kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name
kafkatopic.kafka.strimzi.io/cluster-eventskafkatopic.kafka.strimzi.io/kube-systemkafkatopic.kafka.strimzi.io/defaultkafkatopic.kafka.strimzi.io/kafkakafkatopic.kafka.strimzi.io/kube-events
复制代码


你会发现你的事件根据其命名空间整齐地存储在这些主题中。 


总结

访问 Kubernetes 历史事件日志可以使你对 Kubernetes 系统的状态有了更好的了解,但这单靠 kubectl 比较难做到。更重要的是,它可以通过对事件做出反应来实现集群或应用运维自动化,并以此来构建可靠、反应灵敏的软件。 


原文链接:https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0

发布于: 刚刚阅读数: 2
用户头像

SEAL安全

关注

开发者友好的企业级解决方案 2020-11-05 加入

公众号:Seal软件 Seal-io

评论

发布
暂无评论
如何借助Kafka持久化存储K8S事件数据?_kafka_SEAL安全_InfoQ写作社区