Logstash
一、简介
网址:https://github.com/elastic/logstash
Logstash 是具有实时流水线能力的开源的数据收集引擎。Logstash 可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。
二、流程图
inputs(输入阶段):
会生成事件。包括:file、kafka、beats 等
filters(过滤器阶段):
可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate 等
outputs(输出阶段):
将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file 等
Codecs(解码器):
基本上是流过滤器,作为输入和输出的一部分进行操作,可以轻松地将消息的传输与序列化过程分开。
1. 工作原理
Logstash 管道中每个输入阶段都运行在自己的线程中,输入将事件写入到内存或磁盘的中央队列。每个管道工作线程(pipeline worker)从队列中获取一批事件,通过配置的过滤器运行这批事件,然后将过滤的事件运行到所有输出。批处理的大小和工作线程数可以通过pipeline.batch.size
和pipeline.workers
进行配置。
默认 Logstash 在管道各阶段之间使用内存队列来缓存事件,如果发生意外的终止,则内存中的事件都将丢失。为了防止数据丢失,可以启用 Logstash 配置queue.type: persisted
将正在运行的事件持久保存到磁盘。
2. 事件顺序
默认 Logstash 不保证事件顺序,重新排序可以发送在两个地方:
批处理中的事件可以在过滤器处理期间重新排序
当一个或多个批次的处理速度快于其他批次时,可以对批次重新排序
当维护事件顺序非常重要时,排序设置:
设置
pipeline.ordered: auto
且设置pipeline.workers: 1
,则自动启用排序。设置
pipeline.ordered: true
,这种方法可以确保批处理是一个一个的执行,并确保确保事件在批处理中保持其顺序。设置
pipeline.ordered: false
则禁用排序处理,但可以节省排序所需的成本。
三、如何使用
Logstash Module 提供了一种快速的端到端的解决方案,用于提取数据并使用专用仪表盘对其进行可视化。
每个模块都内置了 Logstash 配置、Kibana 仪表盘和其他元文件。使您可以更轻松地为特定用例或数据源设置 Elastic Stack。
为了更轻松的上手,Logstash Module 提供了三种基本功能,运行模块时将执行以下步骤:
创建 ElasticSearch 索引
设置 Kibana 仪表盘和可视化数据所需要的索引模式,搜索和可视化。
使用配置运行 Logstash pipeline
四、弹性数据
当数据流过事件处理管道时,Logstash 可能会遇到阻止其事件传递到输出的情况。如:意外的数据类型或异常终止。 为了防止数据丢失并确保事件不中断的流过管道,Logstash 提供了两种功能。
持久队列(persistent queues)
死信队列(dead letter queues-DLQ)
持久队列
默认 Logstash 在管道阶段(inputs->pipeline worker)之间使用内存中有边界队列来缓冲事件。这些内存队列的大小是固定的,并且不可配置。如果 Logstash 遇到暂时的计算机故障,那内存队列中的数据将丢失。
为了防止异常终止期间的数据丢失,Logstash 具有持久队列功能,该功能将消息队列存储在磁盘上,提供数据的持久性。 持久队列对于需要大缓冲区的 Logstash 部署也很有用,无需部署和管理消息代理(Kafka、Redis 等)以促进缓冲的发布-订阅模型,可以启用持久队列在磁盘上缓冲消息并删除消息代理。
使用 queue.max_bytes 可配置磁盘上队列的总容量,当队列已满时,Logstash 向输入施加压力阻止数据流入,不再接受新事件,这种机制有助于在输入阶段控制数据流速,不会压倒性的到输出。
持久队列的好处:
Logstash 异常终止或重启启动时避免数据丢失,将消息存储在磁盘上,直到传递至少成功一次。
无需使用 Kafka 外部缓冲消息代理。应对大缓冲区和吸收突发事件。
无法解决的问题:
永久性机器故障(如磁盘损坏),持久队列无法防止数据丢失。具有确认能力的 Beats 和 http 之类的插件,将受到持久队列的良好保护。
不使用请求-响应协议的输入插件(如 TCP、UDP),持久队列无法防止数据丢失。
工作原理
队列位于输入和过滤器阶段之间:
input → queue → filter + output
。当输入阶段可处理事件时将事件写入队列,成功写入后,输入可以向数据源发送确认(acknowledgement)。
处理队列中的事件时,Logstash 仅在过滤器和输出已完全处理该事件时,该事件才记录(队列保留管道已处理的事件记录)为已处理(acknowledged/ACKed)- 这意味着该事件已由所有已配置的过滤器和输出处理。
在正常关闭时,Logstash 将停止从队列读取数据,并将完成正在由过滤器和输出处理中的事件。重启后,Logstash 将恢复处理持久队列中的事件,并接受来自输入的新事件。
如果 Logstash 异常终止,任何运行中的事件都不会被记录为 ACKed,并且在 Logstash 重新启动时将被过滤器和输出重新处理。Logstash 在批处理事件,当发生异常终止时,可能有一些批处理已经成功完成,但没有记录为 ACKed。
页
队列本身就是一个页(page)集合,分为头页(head page)和尾页(tail page),仅有一个头页,达到具体大小(queue.page_capacity)时将变成尾页,并创建一个新的头页。尾页是不可变的,头页是仅追加的。 每个页都是一个文件,页中的所有事件确认后,将被删除,如果较旧的页中至少有一个未被确认,整个页将保留在磁盘上,直到成功处理该页上的所有事件为止。
检查点
启用持久队列功能后,Logstash 通过一种称为检查点(checkpoint)的机制提交到磁盘。检查点文件在单独文件中记录有关自身的详细信息(页信息,确认等)。 当记录检查点时,Logstash 将调用头页的同步操作和以原子的方式将队列的当前状态写入磁盘。检查点的过程是原子的,意味着如果成功,将保存对文件的任何修改。 如果 Logstash 终止,或者出现硬件级别的故障,则持久队列中缓冲但尚未提交检查点的所有数据都将丢失。 可以通过设置 queue.checkpoint.writes,强制 Logstash 更频繁地检查点。为了确保最大的持久性避免丢失数据,可以设置 queue.checkpoint.writes 为 1,在每次事件后强制执行检查点。
死信队列
死信队列提供了另一层数据弹性。(当前仅对 Elasticsearch 输出支持死信队列,用于响应码为 400 和 404 的文档,二者均表示无法重试的事件。) 默认情况,当 Logstash 遇到由于数据错误而无法处理事件时,会挂起或删除失败的事件。为了防止数据丢失,可以配置将未成功的事件写入死信队列,而不是丢弃。 写入死信队列的每个事件包括原始事件、无法处理的原因、写入事件的插入信息以及事件时间戳。 要处理死信队列的事件,需要创建一个管道配置,使用 dead_letter_queue 插件从死信队列中读取数据。
工作原理
Elasticsearch 无法访问的 HTTP 请求失败,Elasticsearch 输出插件将无限期的重试整个请求,这些场景中死信队列不会拦截。
部署和扩展
从操作日志和指标分析到企业和应用程序搜索,Elastic Stack 可用于大量用例。确保将数据可扩展、持久和安全地传输到 Elasticsearch 极为重要,尤其是对于关键任务环境。 本文重点介绍 Logstash 的常见体系结构模型,以及如何随着部署的增长而有效的扩展。重点放在操作日志、指标、安全分析用例上,因为它们往往需要大规模部署。
Beats to Elasticsearch
使用 Filebeat Modules,可以快速的收集、解析和索引流行的日志类型和预建的 Kibana 仪表盘。这种情况下 Beats 会将数据直接发送到 ES,由摄取节点处理并索引数据。
Beats and Logstash to Elasticsearch
Beats 和 Logstash 共同提供了可扩展且具有弹性的全面解决方案。Beats 运行在数千台边缘主机服务器上,将日志收集、拖尾和传输到 Logstash。 Logstash 是水平可伸缩的,可以形成运行同一管道的节点组。Logstash 的自适应缓冲功能即使在吞吐量变化不定的情况下也有助于流畅的传输。如果 Logstash 成为瓶颈,只需要添加更多节点即可进行横向扩展。以下是一些建议:
扩展:
Beats 应该在一组 Logstash 节点之间实现负载均衡
建议至少使用两个 Logstash 节点已实现高可用性
通常每个 Logstash 节点仅部署一个 Beats 输入,但也可以为每个 Logstash 节点部署多个 Beats 输入。
弹性:
使用 Filebeat/Winlogbeat 进行日志收集时,可以保证至少一次交付
从 Filebeat/Winlogbeat 到 Logstash,以及从 Logstash 到 Elasticsearch 这两种通讯协议都是同步且支持确认。其他的 Beats 不支持。
处理:
Logstash 通常将使用 grok 或 dissect 提取字段,增强地理信息,并可以进一步利用文件、数据库或 Elasticsearch 查找数据集来丰富事件。
处理复杂性会影响整体吞吐量和 CPU 利用率,确保检查其他可用的过滤器插件。
Integrating with Messaging Queues
如果现有的基础架构中有消息队列,那么将数据放入 Elastic Stack 会很容易。如果仅使用消息队列用于 Logstash 缓冲数据,建议使用 Logstash 持久队列,消除不必要的复杂性。
性能调优
包括性能故障排除和调优和分析 Logstash 性能。
JVM
建议堆的大小应不小于 4G 且不大于 8G,太小会导致 JVM 不断的进行垃圾回收,造成增加 CPU 利用率
堆的大小不要超过物理内存量的水平,必须保留一些内存以运行 OS 和其他进程,一般不要超过物理内存的 50-75%。
将最小(Xms)和最大(Xmx)堆分配大小设置为相同的值,以防止在运行时调整堆大小,这是一个非常昂贵的过程。
调优和分析 Logstash 性能
Logstash 提供了以下选项来优化管道性能,pipeline.workers,pipeline.batch.size 和 pipeline.batch.delay。
pipeline.workers
此设置确定要运行多少个线程以进行过滤和输出处理。如果发现事件正在备份或者 CPU 没有饱和可以考虑增加此参数以更好的利用可用的处理能力。
pipeline.batch.size
此设置定义单个工作线程在尝试执行过滤器和输出之前收集的最大事件数。较大的批处理大小通常更有效,但会增加内存开销。 某些硬件配置要求您增加 jvm.options 配置文件中的 JVM 堆空间,以避免性能下降。由于频繁的垃圾收集或与内存不足异常相关的 JVM 崩溃,超出最佳范围的值会导致性能下降。 输出插件可以将每个批次作为逻辑单元进行处理。例如,Elasticsearch 输出为收到的每个批次发出批量请求。调整 pipeline.batch.size 设置可调整发送到 Elasticsearch 的批量请求的大小。
pipeline.batch.delay
很少需要调整。此设置调整 Logstash 管道的延迟。管道批处理延迟是 Logstash 在当前管道工作线程中接收到事件后等待新消息的最长时间(以毫秒为单位)。经过这段时间后,Logstash 开始执行过滤器和输出。Logstash 在接收事件和在过滤器中处理该事件之间等待的最长时间是 pipeline.batch.delay 和 pipeline.batch.size 设置的乘积。
管道的配置和优化
进行中事件的总数由 pipeline.workers 和 pipeline.batch.size 设置的乘积确定。注意在间歇地不规则的接收大型事件的管道,需要足够的内存来处理这些峰值。 可以将工作线程数设置高于 CPU 内核数,因为输出通常度过空闲时间在 I/O 等待条件下。
评论