写点什么

Apache APISIX 集成 Kafka 实现高效率实时日志监控

  • 2022 年 2 月 23 日
  • 本文字数:3718 字

    阅读完需:约 12 分钟

Apache Kafka 是由 Apache 管理的开源流处理平台,由 Scala 和 Java 编写,为处理实时数据提供了统一、高吞吐、低延迟的功能特性。


其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。目前已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序等领域。

实现方式:kafka-logger

Apache APISIX 早在 1.2 版本开始就已经提供了 kafka-logger 插件的支持,其后又经过多次功能强化,目前已具备非常成熟且完善的功能。支持将 API 请求日志,甚至请求体和响应体以 JSON 格式推送至 Kafka 集群中。


使用 kafka-logger 时,用户可以发送多种数据并自定义发送的日志格式,同时还支持以批处理的方式打包发送日志或进行自动重试等功能。

如何使用

步骤一:启动 Kafka 集群

本文示例只演示了一种启动方式,其他启动方式细节可参考官方文档


# 使用 docker-compose 启动一个具有 1个 zookeeper 节点、3个 kafka 节点的集群# 同时还启动一个 EFAK 用于数据监控。version: '3'
services: zookeeper: image: confluentinc/cp-zookeeper:6.2.1 hostname: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zookeeper:2888:3888
kafka1: image: confluentinc/cp-kafka:6.2.1 hostname: kafka1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" depends_on: - zookeeper
kafka2: image: confluentinc/cp-kafka:6.2.1 hostname: kafka2 ports: - "9093:9093" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" depends_on: - zookeeper

kafka3: image: confluentinc/cp-kafka:6.2.1 hostname: kafka3 ports: - "9094:9094" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" depends_on: - zookeeper
efak: image: nickzurich/kafka-eagle:2.0.9 hostname: efak ports: - "8048:8048" depends_on: - kafka1 - kafka2 - kafka3
复制代码

步骤二:创建 Topic

接下来我们创建 test Topic 用于收集日志。


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ehdgn5Ju-1645610464291)(https://tfzcfxawmk.feishu.cn/space/api/box/stream/download/asynccode/?code=ZmYzYWIyZmVhNzExYjIzYTExNmY4NWMyOWZhMjJmNTRfeE8zZjNsOVZ4VXFVYjRqcGRtSWo2VUhPSkhTcGdITDhfVG9rZW46Ym94Y25OZkJsUjg3RHVUSGYwMnU3Y3R5N2VmXzE2NDU2MTA0NTQ6MTY0NTYxNDA1NF9WNA)]

步骤三:创建路由并开启插件

通过下方命令可进行路由创建与 kafka-logger 插件的开启。


curl -XPUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \--header 'Content-Type: application/json' \--data-raw '{    "uri": "/*",    "plugins": {        "kafka-logger": {            "batch_max_size": 1,            "broker_list": {                "127.0.0.1": 9092            },            "disable": false,            "kafka_topic": "test",            "producer_type": "sync"        }    },    "upstream": {        "nodes": {            "httpbin.org:80": 1        },        "type": "roundrobin"    }}'
复制代码


上述代码中配置了 kafka broker 地址、目标 Topic、同步的生产模式和每一批次包含的最大日志数量。这里我们可以先将 batch_max_size 设置为 1,即每产生一条日志就向 Kafka 写入一条消息。


通过上述设置,就可以实现将 /* 路径下的 API 请求日志发送至 Kafka 的功能。

步骤四:发送请求

接下来我们通过 API 发送一些请求,并记录下请求次数。


# 向 API 发送 10 次请求curl http://127.0.0.1:9080/get
复制代码


通过下图可以看到,有一些日志消息已经被写入到我们创建的 test topic 中。点击查看日志内容,可以发现上述进行的 API 请求日志已经被写入了。


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h2giHqly-1645610464292)(https://tfzcfxawmk.feishu.cn/space/api/box/stream/download/asynccode/?code=NmM5OGE5NWVkNGM5ZDU3YWMwZWNkNjY4ODZmM2FhOWJfUkphUVpsUG8zQmFIVGFpMU9rcFZPUjBzQWZEYnIzcDNfVG9rZW46Ym94Y25GOVRZYzBBekVSSnZ2WlgzNEpUUUJkXzE2NDU2MTA0NTQ6MTY0NTYxNDA1NF9WNA)]


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aNTOrlFk-1645610464293)(https://tfzcfxawmk.feishu.cn/space/api/box/stream/download/asynccode/?code=ZThiOGY2ODMzMDliY2EwNjQ4YTAwMTJlN2E0MzgzYjNfWDJCT25jWDZkTHpOMGdIOXYxY3NBZnFwbWhsNEt6NHRfVG9rZW46Ym94Y25lNFB3d2hMak9iQ0hYUGNEZll0cVlkXzE2NDU2MTA0NTQ6MTY0NTYxNDA1NF9WNA)]

自定义日志结构

当然,在使用过程中我们也可以通过 kafka-logger 插件提供的元数据配置,来设置发送至 Kafka 的日志数据结构。通过设置 log_format 数据,可以控制发送的数据类型。


比如以下数据中的 $host$time_iso8601 等,都是来自于 Nginx 提供的内置变量;也支持如 $route_id$service_id 等 APISIX 提供的变量配置。


curl -XPUT 'http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger' \--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \--header 'Content-Type: application/json' \--data-raw '{    "log_format": {        "host": "$host",        "@timestamp": "$time_iso8601",        "client_ip": "$remote_addr",        "route_id": "$route_id"    }}'
复制代码


通过发送请求进行简单测试,可以看到上述日志结构设置已生效。目前 Apache APISIX 提供多种日志格式模板,在配置上具有极大的灵活性,更多日志格式细节可参考官方文档


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xhTCoRSU-1645610464294)(https://tfzcfxawmk.feishu.cn/space/api/box/stream/download/asynccode/?code=YzUzM2U0M2YzZDdiOTRhZTliZWQzMzdlOTI4OGIwMDRfSWpkVUZrOXhydEJ5WTl4aTBWQ2lTMWRpRGYzRjBBN0FfVG9rZW46Ym94Y241M2tQbXFjWVZkUUFQMUxoSUZmakRhXzE2NDU2MTA0NTQ6MTY0NTYxNDA1NF9WNA)]

关闭插件

如使用完毕,只需移除路由配置中 kafka-logger 插件相关配置并保存,即可关闭路由上的插件。得益于 Apache APISIX 的动态化优势,开启关闭插件的过程都不需要重启 Apache APISIX,十分方便。


$ curl http://127.0.0.1:9080/apisix/admin/routes/1  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '{    "methods": ["GET"],    "uri": "/hello",    "plugins": {},    "upstream": {        "type": "roundrobin",        "nodes": {            "127.0.0.1:1980": 1        }    }}'
复制代码

总结

本文为大家介绍了关于 kafka-logger 插件的功能前瞻与使用步骤,更多关于 kafka-logger 插件说明和完整配置列表,可以参考官方文档


目前,我们也在开发其他日志类插件以便与更多相关服务进行集成。如果您对此类集成项目感兴趣,也欢迎随时在 GitHub Discussions 中发起讨论,或通过邮件列表进行交流。

用户头像

Github:https://github.com/apache/apisix 2021.06.02 加入

Apache APISIX 是一个云原生、高性能、可扩展的微服务 API 网关。它是基于 OpenResty 和 etcd 来实现,和传统 API 网关相比,Apache APISIX 具备动态路由和插件热加载,特别适合微服务体系下的 API 管理。

评论

发布
暂无评论
Apache APISIX 集成 Kafka 实现高效率实时日志监控