写点什么

Logstash 中 Ruby filter 使用指南

用户头像
Langer
关注
发布于: 2021 年 03 月 28 日
Logstash 中 Ruby filter 使用指南

引言

许多小伙伴在使用 ELK 进行数据处理的过程中,对 Logstash 中 rugy 插件的使用产生许多疑问,Logstash 插件功能强大,详细理清 ruby 插件的使用方法对小伙伴日常工作多有助益。本文将对 ruby 插件的使用方式、部分原理进行详细说明,并附带丰富示例帮助小伙伴们理解消化。

Logstash 插件原理简述

数据在 logstash 中从 input 插件流入到 filter 插件,最终通过 output 插件输出。在 fiter 阶段,可以通过 logstash 提供的数十个 filter 插件处理 event 中各字段的数据,包括但不限于 event 过滤、字段增加、字段删除、字段匹配选择修改等。logstash 工具本身由 java 和 ruby 实现,Ruby 插件灵活性高,处理 logstash 事件实用高效。

ruby 插件介绍

ruby 插件的使用包含两种方式:inline 和 script,在 ruby 插件的配置选项中,配置参数如下所述,其中 code 和 init 用在 inline 方式中, path 和 script_params 用在 script 方式中。 tag_on_exception 参数用在 ruby 代码运行出错时的处理中。

inline 方式

inline 方式通过将需要执行的 ruby 代码以字符串的形式配置在 logstash 的配置文件中,如下所示:

filter {ruby {init => "ruby code run in plugin init stage"code => "ruby code run within every event"}}
复制代码

在 inline 方式中, init 中的代码在插件启动时执行, code 中的代码在每次 event 中都会执行,所以 init 中可以放置外部库导入代码,实例参数初始化代码等, init 和 code 在 inline 方式中起到的作用和 script 中的 register 方法和 filter 方法起到的作用分别对应。后面介绍 script 方式时会具体介绍。

  • init:初始化中执行的 ruby 代码

  • code:每条 event 都会执行的代码

script 方式

如果插件处理逻辑不复杂,代码量不大,可以直接用 inline 方式使用插件就好,但如果逻辑复杂且代码量大,可以考虑使用 script 方式使用插件,优点是代码易读易维护。script 方式用到另外两个配置参数: path 和 script_params

  • path: 插件脚本在执行环境中的存储绝对路径

  • script_params:从配置中传递给脚本的外部参数

script 方式和 inline 方式最大的不同就是 script 方式需要按照插件要求的格式完成两个函数的编写,脚本模板如下:

def register(params)enddef filter(event)return [event]end
复制代码

其中 register 起到的作用和 inline 模式中 init 类似,filter 和 code 类似,两个方法有各自要求和特点:

register方法与script_params配置参数相关,可通过其传参数到registerfilter方法返回有具体要求,即返回event对象组成的数组。
复制代码

script 方式配置文件和脚本文件缺一不可,配置方式如下:

ruby {path => "path in logstash environment"script_params => { "param_key" => "param_value" }}
复制代码

event 对象

在使用这 inline 和 script 两种方式使用 Ruby 插件时,代码中能操作的对象并不多,通过如下代码,可以输出 code setting 中代码的运行 context 有哪些:

ruby {code => "puts local_variables; "}
复制代码

logstash 运行输出两个变量: event 和 new_event_block ,其中 new_event_block 只在需要主动生成新的 event 时才起作用,需要重点关注的是 event 对象。接下来,查看一下 event 对象有哪些变量和方法可以使用。

ruby {code => "puts event.instance_variables; puts event.public_methods"}
复制代码

运行发现,event 对象并没有任何实例变量,公共方法却有不少,其中,比较重要的方法有:

to_json, to_hash : 格式化输出set, get, remove, cancel :方法用来对数据字段进行自定义化的操作
复制代码

举例说明

为了更好理解 ruby 插件各个参数的作用,以及 inline 方式和 script 方式的具体使用方法,通过几个例子来具体说明,举例按照从易到难 的方式排列。

为了更好说明 ruby 插件的使用方法,下列所有举例都使用相同的运行环境和测试数据。

运行环境为: kafka producer —-> kafka —-> logstash —-> es

测试数据为:

{"id" : "5f0970c84c1de6c4fdc907ee","comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=","parent_comment_id" : 123 ,"like_count" : 2 ,"comment_url" : "test_url","update_time" : "2020-07-11 15:56:56","is_new" : 1 ,"data": "test data"}
复制代码

两种方式读取 event 中数据 data 字段

配置代码:

ruby {code => "p event.get('[data]')"}
复制代码

output: “test data”

脚本代码:

def register(params)enddef filter(event)p event.get('[data]')return [event]end
复制代码

output: “test data”

两种方式删除 event 数据字段

配置代码:

ruby {code => "event.remove('data')"}
复制代码

脚本代码:

def register(params)enddef filter(event)event.remove('data')return [event]end
复制代码

两种方式修改 event 中原有数据字段

配置代码:

ruby {code => "event.set('[data]', 'new data')"}
复制代码

脚本代码:

def register(params)enddef filter(event)event.set('[data]', 'new data')return [event]end
复制代码

两种方式新增 event 中数据字段

配置代码:

ruby {code => "event.set('[new_key]', 'new data') if not event.include?('new_key')"}
复制代码

脚本代码:

def register(params)enddef filter(event)if not event.include?('new_key')event.set('[new_key]', 'new data')endreturn [event]end
复制代码

两种方式匹配 event 中数据字段的字段⻓度

配置代码:

ruby {code => "id_valid = event.get('[id]').length;event.set('[is_id_valid]', 1) if id_valid != 24"}
复制代码

脚本代码:

def register(params)enddef filter(event)	id_valid = event.get('[id]').length   if id_valid == 24		event.set('[is_id_valid]', 0)   else		event.set('[is_id_valid]', 1)   end  return [event] end
复制代码

两种方式匹配 event 中数据字段的字段是否包含 html 标签

配置代码: *

ruby {code => "is_html = event.get('[data]').index(/<\/?[a-z][\s\S]*>/);event.set('[is_data_valid]', 0) if not is_html"}
复制代码

脚本代码:

def register(params)enddef filter(event)is_html = event.get('[data]').index(/<\/?[a-z][\s\S]*>/)if is_htmlevent.set('[is_data_valid]', 1 )elseevent.set('[is_data_valid]', 0 )endreturn [event]end
复制代码

两种方式匹配 event 中数据字段中时间格式是否为 UTC 格式

配置代码:

ruby {code => "is_utc = event.get('[update_time]').index(/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/);event.set('[is_time_valid]', 0) ifis_utc"}
复制代码

脚本代码:

def register(params)enddef filter(event)is_utc = event.get('[update_time]').index(/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/)if is_utcevent.set('[is_time_valid]', 0 )elseevent.set('[is_data_valid]', 1 )endreturn [event]end
复制代码

两种方式匹配 event 中数据字段是否为合法 URL

配置代码:

ruby {code => "is_url = event.get('[comment_url]').index(/https?:\/\/[\S]+$/);event.set('[is_url_valid]', 0) if is_url"}
复制代码

脚本代码:

def register(params)enddef filter(event)is_url = event.get('[comment_url]').index(/https?:\/\/[\S]+$/)if is_urlevent.set('[is_url_valid]', 0 )elseevent.set('[is_url_valid]', 1 )endreturn [event]end
复制代码

插件中引入外部库

当 event 内置 API 和 ruby string 的各类方法不能满足需求时,可以引入 ruby 的公共库来解决相关问题。下面距离说明引入 Ruby 公共 json 解析库来解析 event 相关字段,示例如下:

inline 方式:

ruby {init => "require 'json'"code => "data=event.to_json;new_data=JSON.parse(data);event.set('[new_data]', new_data)"}
复制代码

output:

{"id" : "5f0970c84c1de6c4fdc907ee","comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=","parent_comment_id" : 123 ,"like_count" : 2 ,"comment_url" : "test_url","update_time" : "2020-07-11 15:56:56","is_new" : 1 ,"data": "test data""new_data" => {"id" : "5f0970c84c1de6c4fdc907ee","comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=","parent_comment_id" : 123 ,"like_count" : 2 ,"comment_url" : "test_url","update_time" : "2020-07-11 15:56:56","is_new" : 1 ,"data": "test data"}"@version" => "1","@timestamp" => 2020 - 07 - 16T02: 11 :39.081Z}
复制代码

脚本方式:

def register(params)#para1 = params['param_key']#p para#output: "param_value"require 'json'end
def filter(event)data=event.to_jsonnew_data=JSON.parse(data)event.set('[newdata]', newdata)return [event]end
复制代码

output: 与 inline 方式相同


至此,关于 Logstash 中 ruby 插件的使用方式、部分原理和示例说明完毕,对以上内容有问题的小伙伴欢迎评论区留言交流。


发布于: 2021 年 03 月 28 日阅读数: 8
用户头像

Langer

关注

还未添加个人签名 2021.03.22 加入

还未添加个人简介

评论

发布
暂无评论
Logstash 中 Ruby filter 使用指南