写点什么

​Elastic Stack 最佳实践系列:Beats->ES,一个更轻型的架构选择

  • 2023-08-22
    广东
  • 本文字数:6844 字

    阅读完需:约 22 分钟

​Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择

作者:李捷,Elastic 首席云解决方案架构师

ELK 生态下,构建日志分析系统的选择

说起开源的日志分析系统,ELK 几乎无人不晓,这个生态并非是 Elastic 特意而为,毕竟 Elasticsearch 的初心是分布式的搜索引擎,被广泛用作日志系统纯粹一个“美丽的意外”,这是社区使用者推动而成。而现在各大云厂商推广自己的日志服务时,也往往将各种指标对标于 ELK,可见其影响之广。

但其实,流行的架构中并非只有 ELKB,当我们使用 ELKB 搭建一套日志系统时,除了 Elasticsearch, Logstash, Kibana, beats 之外,其实被广泛使用的还有另一个工具 —— Kafka。在这当中,Kafka 的作用是明显的,作为一个中间件,一个缓冲,它起到了提高吞吐,隔离峰值影响,缓存日志数据,快速落盘,同时通过 producer/consumer 模式,让 Logstash 能够横向拓展的作用,还能够用作数据的多路分发。因此,大多数时候,我们看到的实际架构,按数据流转顺序排列,应该是 BKLEK 架构



但我们在使用 Kafka 的时候,也并非是没有成本的,额外的一套分布式系统,更长的数据链路,都会是我们在做最后架构选型时的一些痒点,特别是随着我们产生的数据越来越多,BKLEK 架构会变得越来越大,越来越重,成本、性能、运维的简易性都会成为我们评估日志系统的重要指标。因此,我们的问题是:在 Elastic Stack 都已经进化到了 8.1 的当下,我们是否还需要延续一直以来的惯性思维,认为在我们仍然在任何情况下都是需要 BKLEK 的架构呢?

在我们开始正式探讨之前,我们可以从现在普遍看到的新的架构图可以一猜端倪:



在这个架构中,所有的 Integration 的输出都是 Elasticsearch,所有的数据处理都由 ingest pipeline 完成,数据的完整性和可靠性,由端点和 elasticsearch 之间的应答确认来保证。因此,我们本文探讨的是:

  1. 我们数据采集端直接到 ES 的架构,是否可取

  2. 我们什么时候可以使用这种架构

数据采集端直接到 ES 的架构分析

虽然 Elastic 原厂的 Fleet 与 Elastic Agent 已经处于 GA(普遍可用)的阶段,但因为其本身的一些限制,比如:

  • Integration Repository 需要连接外网

  • 需要单独的 Fleet Server (在最新版本已经与 APM server 合并为 Integration Server)

  • Elastic Agent 还没有能完全覆盖 Beats 所支持的数据源


因此,现阶段,我们讨论数据采集端直接到 ES 的架构时,会主要集中在 Beats->Elasticsearch 这一架构。这一架构,相对于 BKLEK 架构来说,少了中间的 Kafka,甚至我们可以忽略 Logstash,因此,架构会相对精简,带来的好处包括:

  • 相对更低的成本

  • 更高的传输和数据处理效率

  • 更一致的安全特性

  • 更容易进行监控


当然,在带来以上好处的同时,我们也会失去 Kafka 所带来的各种好处。不过不用担心,Kafka 的特性只是构建一个稳健的日志分析系统的充分条件,而非必要条件,在不少场景下,我们不一定是非 Kafka 不可。接下来,我们将讨论几个我们相对会比较关心的问题,以让大家了解,我们是否可以选择这种架构,什么时候选择这种架构,以及相应的最佳实践。

Beats -> Elasticsearch 链路的健康保障

对于不使用 Kafka 的场景,我们可能始终会有点觉得不踏实。因为对 Beats -> Elasticsearch 这个简单架构不够了解,以至于我们信心不足。接下来,我们讨论一下,在这种简单架构下,我们是如何面对各种可能出现的问题的。

大多数架构师会担心的问题是流量波动的问题,如果突然出现日志流量的洪峰,是不是会影响到后端的日志系统。这个问题的答案是,有影响,但影响有限

我们先明确一下日志系统的主要作用:即日志的集中管理,在统一的日志平台上提供所有日志的准实时的关联查询与分析能力。这里的核心是准实时和查询能力。

在流量洪峰的情况下,受影响的是“准实时”的能力,因为受限于日志系统的处理能力,如果日志产生的速度,大于日志系统处理的速度,则我们无法读取到最新的数据。这个问题,即便是我们有了 Kafka 也是无法解决的。

而查询的能力,几乎不受影响。原因如下:

  • ES 的读写是由不同的线程处理的,有各自独立的线程池。过载的写流量,会导致后到的写请求,因为写线程池已满,而被拒绝。但不会导致查询情况的拒接服务

  • ES 从 7.0 开始,使用真实内存断路器,能够避免由于过多的请求,导致节点内存 OOM。但在日志场景,特定时间内日志请求的数量是有明确上限的,该上限为 Beats 的数量,相比于高并发的读场景,Beats 几乎不可能造成请求数量的过载

  • Beats 与 Elasticsearch 之间有背压检测机制,当 Beats 检测到 ES 有拒绝写服务的情况出现时,会主动限流降低,避免对 ES 产生持续的压力与影响。具体参见,附录 1


或许这种“轻描淡写”的描述,让大家觉得有点难以相信,甚至与平时的经验有点不符。但实际上真正的问题来自于不合理的架构:

  • 大多数企业会选择维护一个“怪物”级别的 ES 集群,即,将业务搜索和日志查询两个完全不同的场景混用与同一个 ES 集群。因此,当某些日志流量洪峰来临,打满了“write”线程池之后,会发现业务数据也无法写入了。但仍然的,kafka 也解决不了这个问题,因为,即便有 kafka 在前面挡住真正的日志数据洪流,你也很难判断 kafka 后面的 logtash 集群,会不会导致 ES 集群拒绝写服务,因为消费日志永远比索引日志要快,logstash 会持续消费 kafka 中的数据,直到感受到背压,才会减缓输出的频率,这点和 Filebeat 是一样的

  • 大多数在早期就已经在使用 ES 的企业,由于缺乏合理的架构设计,或者缺乏足够的升级集群的信心,集群一直停留在 5,6 等老版本的,因此,整个架构缺乏足够的健壮性和可恢复性,一些关键能力,比如背压检测,内存断路器,Heap memory offload 等功能是没有的。导致集群较为脆弱,在流量洪峰时,出现 OOM 等问题,也是有的。但仍然,kafka 也解决不了这个问题。


因此,我们在做架构选型之前,确定要不要使用 Beats -> Elasticsearch 这一架构之前,不妨先审视一下,我们当前是不是在业务和日志混用 ES 集群,我们的 ES 集群版本是不是过低,运维缺乏升级的能力?这两个反而是更致命的问题

回到流量洪峰问题的解决。即便加上 kafka,也只是治标不治本,看起来架构更加健壮了一点,但实际上也并不能帮助我们提高后端的消费能力,我们整体面临的情况和使用 Beats -> Elasticsearch 这一架构是没有多大的区别的(Kafka 最大的好处,就是把这些数据落盘到自己的磁盘上了,但这真的是我们在这个场景下需要解决的问题吗?)。

流量洪峰的治本方案,应该是着眼于快速的故障排查,快速的找到出现故障的机器并解决故障。(出现业务暴增也不是不可能的情况,但正常的业务日志撑爆日志系统的可能性非常小,因为首先会撑爆的是产生日志的业务系统,因此,一旦业务暴增,我们需要考虑业务系统和日志系统的同步扩容)

我们可以从 Elastic Stack 上快速的找到这样问题的答案,而且方法很多,这里举例一二。

  • 监控日志集群的 bulk 拒绝率



  • 监控日志错误率



  • 使用机器学习判断日志异常



在发现异常之后,我们可以进一步定位,通过仪表板,查看具体是哪个服务出现了异常:



另一个方面,在使用 Kafka 与 Logstah 的情况下,其实也是会有数据丢失的风险的。数据处理的链路越长,架构越复杂,带来的脆弱性就越多。在这个架构中,Kafka 通过冗余提供了高可用。但 Logstash 确成为了风险的一环,虽然其和 Kafka、ES 之间都有应答确认的机制。但一旦 Logstash 在消费 Kafka 之后,正确投递 ES 之前发生了崩溃,则数据将会丢失。并且我们很可能并不知道数据丢失了。

Beats->Elasticsearch 的链路效率

在使用 Kafka 与 Logstah 的情况下,数据需要由 Beats 首先落盘到 Kafka 的分布式日志文件中,再由 Logstash 从 Kafka 中消费,之后,数据又要根据 Logstash 与 ES 之间的背压,将数据落盘到 Logstash 的 Disk Queue 上。相比于直接由 Beats 到 ES 的架构,这里有了两次数据落盘,读盘的操作;

另一方方面,在基线测试中,我们可以看到 Elasticsearch 的 Ingest node 相对 Logstash 的 ETL 处理的能力会来得更加高效。其原因在于,Logstash 是一个 Java-Ruby 解析器上的应用,在 JVM 上运行 Ruby 解析器的运行效率,不如纯 java 应用的 ingest node。并且处理完的数据,相对于已经包含了集群内部路由信息的 ingest node,在 Logstsh 作为 ETL 的架构下,还需要中间一个额外的 ES 节点负责 bulk request 的路由分发,需要更多的网络跳转才可以最终写到对应的 data node。

更一致的安全特性

而在链路安全方面,ingest node 处于 ES 集群内,天然的使用统一的安全策略。而 Beats->ES 本身就是通过 HTTPS,再配合证书或者密码验证的方式,可以保证链路的安全。

而如果在链路中再增加 Kafka 与 Logstash,则整条链路的安全配置则会更加复杂,稍有配置不慎,还会存在更多的数据泄露的风险。

更容易进行监控

在监控方面,简单的架构会让我们更容易发现系统的异常。并且,我们有现成的手段和现成的告警规则



什么是最佳的架构?

“最佳架构”这个词一定是一个伪命题,因为不存在能解决所有问题的银弹。但指导准则是有的,即按需选择合适的架构



企业通用架构

我们的指导原则是:

  • 大多数的日志数据链路,尽量选择简单且健壮的架构。Beats->Elasticsearch 的架构足够简单和健壮,这个应该是大多数情况下,以 Elastic Stack 为基础的日志系统应该选择的数据摄入方式

  • 对于缺乏可靠性的数据链路,以及需要高吞吐的能力、快速将数据从边缘节点移出的,需选择 Kafka 的帮助

  • 整个架构中的数据链路可以是异构的,根据数据的属性,选择合适的链路


大家可以看到,我们在这里建议的是大多数情况下,我们应该选择 Beats->Elasticsearch 这一简单的架构。其原因就像我们之前提到的:

  • Beats->Elasticsearch 的链路足够健壮

  • Beats->Elasticsearch 的链路效率更高

  • Beats->Elasticsearch 有更好的安全一致性

而当我们确实需要 Kafka 和 Logstash 的数据链路,可以参考我们的一些配置建议[1][2]。

接下来,我们再总结一下我们该如何判断是否需要 Kafka

我们在什么情况下,需要 Kafka

作为一个 Queue,Kafka 在日志场景的作用,并不像在其他的业务应用场景中那么重要。它最主要的作用就是能够将数据快速落盘,并且以冗余的方式存储在分布式的日志文件中。同时,通过 producer/consumer 机制,让后端的 ETL 工具可以并发的消费,并且提供再消费的容错能力。

这里的重点是数据“快速”“落盘”。那我们在什么场景下非常需要这两个能力呢?我们先说“落盘”

数据需要落盘

有一些数据,是转瞬即逝型的数据,比如,我们通过各种探针,exporter 采集数据后,在采集端没有落盘,如果不及时存储起来,就会丢失这部分数据。类似于物联网数据,指标数据,apm 数据都属于这种类型的数据。



数据产生于那一个瞬间,并且在产生数据的地方,我们并没有什么机制去存储这些数据,因此,在数据真正进入后端的存储分析之前,我们往往需要一个 Queue 能够帮我们把这些数据落盘。这时,Kafka 几乎就是我们在技术选型时的不二选择。

数据需要高速转移

再说“快速”,Kafka 的高吞吐也是非常重要的一个能力,也是其得以让人追捧的主要原因之一。可以设想这样的一种场景,在云原生环境下,我们非常动态的去创建各种计算资源,以应对业务流量的变化,虽然每个计算资源产生的数据落盘了,但由于它可能会被销毁,因此,我们需要在它被销毁之前,把其产生的日志数据搬移到 Pods 之外,对于这种转瞬即逝的资源所产生的数据,也需要 Kafka 的能力。

应对数据高峰

日志数据或基于事件的数据很少具有一致的、可预测的大小或流速。考虑一个在周五晚上升级应用程序的场景,您部署的应用程序有一个严重的错误,即信息被过度记录,淹没了您的日志基础设施。在其他多租户用例(例如游戏和电子商务行业)中,这种峰值或数据爆发也相当普遍。在这种情况下使用像 Kafka 这样的消息队列也能来缓冲数据,减缓影响。

数据的多路分发

还有一种情况,是我们可能需要 Kafka 将数据分发到别的地方,我们可以定义多个消费端,分别去消费 Kafka 里的数据,将数据分发到不同的数据处理系统。注意,不要图省事,只建立 Logstash 作为消费端,然后尝试用 Logstash 的多个 Output 去分发。因为 Logstash 并不能保证过个 Output 之间的数据同步与一致,我相信不仅是 Logstash,其他的消费端也无法做此保证,因此需要多个消费端分别消费 Kafka 里的数据。当然,这里不是只有 kafka 一个选择,也可以数据入湖,再从湖中消费。

因此,总结一下,如果你的数据因为没有快速落盘,而存在丢失的风险;如果你的数据吞吐很大,需要及时转移,以及需要应对可能出现的数据洪峰,则你需要 Kafka。

我们在什么情况下,可以不需要 Kafka

先总结一下我们在什么时候必须要用 Kafka:

我们稍微回想一下,在通常的日志场景下我们是如何采集日志的:

filebeat.autodiscover:      providers:        - type: kubernetes          node: ${NODE_NAME}          hints.enabled: true          templates:            - condition:                contains:                  kubernetes.container.name: "opbeans-"              config:                - type: container                  paths:                    - "/var/log/containers/*-${data.kubernetes.container.id}.log"                  include_lines: ['^{']                  tail_files: true
复制代码

复制

我们需要指定特定的日志目录,也就意味着,我们是从磁盘上采集日志数据的。而在云环境上,如果我们采用了云盘作为数据盘,这些日志数据在产生端,即已经实现了冗余的落盘存储。因此,相对于物联网数据,指标数据,apm 数据这种 ephemeral 数据,已经落盘的日志数据大多数时候,并不需要额外用一个 Queue 再去做一次数据的落盘与冗余存储。

但在之前的分析中,我们也说过,在某些 ephemeral 环境下,如果存储数据的本地文件缓存,在我们将数据搬移出去之前就被销毁的话,就会有数据丢失的风险。但幸运的是,大多数情况下,我们是能够轻松判断出这种风险的,大多数非动态扩缩容的应用,比如那些部署在虚拟机上的应用,这种风险的可能性极小,并且可以控制。相对于快速将数据搬出到 kafka,增加本地文件存储的大小,反而是一个更简单,更低成本的选择

并且,当我们发现数据没有正确的写入日志分析系统时,重新让 filebeat 采集一次日志,会比让 Logstash 去调整 offset,在 Kafka 上重新 consume 来得简单得多,直观得多,你不需要了解被采集的文件和 kafka topic、partition 之间的映射关系,你不需要知道文件与 offset 的映射关系,以及如何操作 re-consume。而且通过 Beats 与 Elasticsearch 之间的端到端的应答确认机制和背压探测机制,我们更容易保证数据的正确写入。

因此,总结一下,如果你的数据是日志,这些日志被写入到磁盘,并且不会在短期内被删除。我们可以选择不用 Kafka

总结

为了防止大家漏掉关键的点,在最后再强调三遍:

  • 日志系统切勿与业务搜索系统混用一个 ES 集群

  • 日志系统切勿与业务搜索系统混用一个 ES 集群

  • 日志系统切勿与业务搜索系统混用一个 ES 集群

这是我看过的最悲剧的日志分析系统的架构,没有之一。

回到我们本文内容,一个常见的问题是,我们常常忽略架构设计的重要性,懒于去思考其中变量的影响,希望通过一刀切的简单方案去做事情(一上来就是 BKLEK 架构,不去思考这样做是否值得,我们是否还有更多的选择)。但即便是我们全部使用了 Beats, Kafka, Logstash, Elasticsearch, Kibana 五个组件,也不是所有的数据链路都走一样的路径,我们可以针对接入系统的特性,优先级,重要程度,要求的指标,有所取舍。

因此,虽然选择 Elastic Stack 是一个一定不会错的选择,但如果你觉得这套系统复杂了,成本高了,那希望本文可能给你帮助,或许在只有日志数据的这个阶段,你的系统还是可以简化的,有更多选择的。

附录

1, filebeat 与 Elasticsearch 之间的背压检测协议

Filebeat 在将数据发送到 Logstash 或 Elasticsearch 时使用背压敏感协议来处理大量数据。如果 Logstash 忙于处理数据,它会让 Filebeat 知道减慢其读取速度。一旦拥塞得到解决,Filebeat 将恢复到原来的速度并继续发送数据。



关于细节,可以查看以下配置:

 # The number of seconds to wait before trying to reconnect to Elasticsearch  # after a network error. After waiting backoff.init seconds, the Beat  # tries to reconnect. If the attempt fails, the backoff timer is increased  # exponentially up to backoff.max. After a successful connection, the backoff  # timer is reset. The default is 1s.  backoff.init: 1s  # The maximum number of seconds to wait before attempting to connect to  # Elasticsearch after a network error. The default is 60s.  backoff.max: 60s
复制代码

复制

与 Libbeat 的实现(https://github.com/elastic/beats/下):

/libbeat/outputs/elasticsearch/elasticsearch.go (elasticsearch output 会使用 backoff 初始化 client)

/libbeat/outputs/elasticsearch/client.go#L204 (client 会判断接口调用是否成功,如果失败,会返回 err)

/libbeat/outputs/backoff.go#L38(判断接口的返回,如果有 err,等待)

  func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error {	err := b.client.Publish(ctx, batch)	if err != nil {		b.client.Close()	}	backoff.WaitOnError(b.backoff, err)	return err}
复制代码


用户头像

还未添加个人签名 2020-06-19 加入

欢迎关注,邀您一起探索数据的无限潜能!

评论

发布
暂无评论
​Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择_ES_腾讯云大数据_InfoQ写作社区