引言
许多小伙伴在使用 ELK 进行数据处理的过程中,对 Logstash 中 rugy 插件的使用产生许多疑问,Logstash 插件功能强大,详细理清 ruby 插件的使用方法对小伙伴日常工作多有助益。本文将对 ruby 插件的使用方式、部分原理进行详细说明,并附带丰富示例帮助小伙伴们理解消化。
Logstash 插件原理简述
数据在 logstash 中从 input 插件流入到 filter 插件,最终通过 output 插件输出。在 fiter 阶段,可以通过 logstash 提供的数十个 filter 插件处理 event 中各字段的数据,包括但不限于 event 过滤、字段增加、字段删除、字段匹配选择修改等。logstash 工具本身由 java 和 ruby 实现,Ruby 插件灵活性高,处理 logstash 事件实用高效。
ruby 插件介绍
ruby 插件的使用包含两种方式:inline 和 script,在 ruby 插件的配置选项中,配置参数如下所述,其中 code 和 init 用在 inline 方式中, path 和 script_params 用在 script 方式中。 tag_on_exception 参数用在 ruby 代码运行出错时的处理中。
inline 方式
inline 方式通过将需要执行的 ruby 代码以字符串的形式配置在 logstash 的配置文件中,如下所示:
filter {
ruby {
init => "ruby code run in plugin init stage"
code => "ruby code run within every event"
}
}
复制代码
在 inline 方式中, init 中的代码在插件启动时执行, code 中的代码在每次 event 中都会执行,所以 init 中可以放置外部库导入代码,实例参数初始化代码等, init 和 code 在 inline 方式中起到的作用和 script 中的 register 方法和 filter 方法起到的作用分别对应。后面介绍 script 方式时会具体介绍。
init:初始化中执行的 ruby 代码
code:每条 event 都会执行的代码
script 方式
如果插件处理逻辑不复杂,代码量不大,可以直接用 inline 方式使用插件就好,但如果逻辑复杂且代码量大,可以考虑使用 script 方式使用插件,优点是代码易读易维护。script 方式用到另外两个配置参数: path 和 script_params
script 方式和 inline 方式最大的不同就是 script 方式需要按照插件要求的格式完成两个函数的编写,脚本模板如下:
def register(params)
end
def filter(event)
return [event]
end
复制代码
其中 register 起到的作用和 inline 模式中 init 类似,filter 和 code 类似,两个方法有各自要求和特点:
register方法与script_params配置参数相关,可通过其传参数到register
filter方法返回有具体要求,即返回event对象组成的数组。
复制代码
script 方式配置文件和脚本文件缺一不可,配置方式如下:
ruby {
path => "path in logstash environment"
script_params => { "param_key" => "param_value" }
}
复制代码
event 对象
在使用这 inline 和 script 两种方式使用 Ruby 插件时,代码中能操作的对象并不多,通过如下代码,可以输出 code setting 中代码的运行 context 有哪些:
ruby {
code => "puts local_variables; "
}
复制代码
logstash 运行输出两个变量: event 和 new_event_block ,其中 new_event_block 只在需要主动生成新的 event 时才起作用,需要重点关注的是 event 对象。接下来,查看一下 event 对象有哪些变量和方法可以使用。
ruby {
code => "puts event.instance_variables; puts event.public_methods"
}
复制代码
运行发现,event 对象并没有任何实例变量,公共方法却有不少,其中,比较重要的方法有:
to_json, to_hash : 格式化输出
set, get, remove, cancel :方法用来对数据字段进行自定义化的操作
复制代码
举例说明
为了更好理解 ruby 插件各个参数的作用,以及 inline 方式和 script 方式的具体使用方法,通过几个例子来具体说明,举例按照从易到难 的方式排列。
为了更好说明 ruby 插件的使用方法,下列所有举例都使用相同的运行环境和测试数据。
运行环境为: kafka producer —-> kafka —-> logstash —-> es
测试数据为:
{
"id" : "5f0970c84c1de6c4fdc907ee",
"comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=",
"parent_comment_id" : 123 ,
"like_count" : 2 ,
"comment_url" : "test_url",
"update_time" : "2020-07-11 15:56:56",
"is_new" : 1 ,
"data": "test data"
}
复制代码
两种方式读取 event 中数据 data 字段
配置代码:
ruby {
code => "p event.get('[data]')"
}
复制代码
output: “test data”
脚本代码:
def register(params)
end
def filter(event)
p event.get('[data]')
return [event]
end
复制代码
output: “test data”
两种方式删除 event 数据字段
配置代码:
ruby {
code => "event.remove('data')"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
event.remove('data')
return [event]
end
复制代码
两种方式修改 event 中原有数据字段
配置代码:
ruby {
code => "event.set('[data]', 'new data')"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
event.set('[data]', 'new data')
return [event]
end
复制代码
两种方式新增 event 中数据字段
配置代码:
ruby {
code => "event.set('[new_key]', 'new data') if not event.include?('new_key')"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
if not event.include?('new_key')
event.set('[new_key]', 'new data')
end
return [event]
end
复制代码
两种方式匹配 event 中数据字段的字段⻓度
配置代码:
ruby {
code => "id_valid = event.get('[id]').length;event.set('[is_id_valid]', 1) if id_valid != 24"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
id_valid = event.get('[id]').length
if id_valid == 24
event.set('[is_id_valid]', 0)
else
event.set('[is_id_valid]', 1)
end
return [event]
end
复制代码
两种方式匹配 event 中数据字段的字段是否包含 html 标签
配置代码: *
ruby {
code => "is_html = event.get('[data]').index(/<\/?[a-z][\s\S]*>/);event.set('[is_data_valid]', 0) if not is_html"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
is_html = event.get('[data]').index(/<\/?[a-z][\s\S]*>/)
if is_html
event.set('[is_data_valid]', 1 )
else
event.set('[is_data_valid]', 0 )
end
return [event]
end
复制代码
两种方式匹配 event 中数据字段中时间格式是否为 UTC 格式
配置代码:
ruby {
code => "is_utc = event.get('[update_time]').index(/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/);event.set('[is_time_valid]', 0) if
is_utc"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
is_utc = event.get('[update_time]').index(/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/)
if is_utc
event.set('[is_time_valid]', 0 )
else
event.set('[is_data_valid]', 1 )
end
return [event]
end
复制代码
两种方式匹配 event 中数据字段是否为合法 URL
配置代码:
ruby {
code => "is_url = event.get('[comment_url]').index(/https?:\/\/[\S]+$/);event.set('[is_url_valid]', 0) if is_url"
}
复制代码
脚本代码:
def register(params)
end
def filter(event)
is_url = event.get('[comment_url]').index(/https?:\/\/[\S]+$/)
if is_url
event.set('[is_url_valid]', 0 )
else
event.set('[is_url_valid]', 1 )
end
return [event]
end
复制代码
插件中引入外部库
当 event 内置 API 和 ruby string 的各类方法不能满足需求时,可以引入 ruby 的公共库来解决相关问题。下面距离说明引入 Ruby 公共 json 解析库来解析 event 相关字段,示例如下:
inline 方式:
ruby {
init => "require 'json'"
code => "data=event.to_json;new_data=JSON.parse(data);event.set('[new_data]', new_data)"
}
复制代码
output:
{
"id" : "5f0970c84c1de6c4fdc907ee",
"comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=",
"parent_comment_id" : 123 ,
"like_count" : 2 ,
"comment_url" : "test_url",
"update_time" : "2020-07-11 15:56:56",
"is_new" : 1 ,
"data": "test data"
"new_data" => {
"id" : "5f0970c84c1de6c4fdc907ee",
"comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=",
"parent_comment_id" : 123 ,
"like_count" : 2 ,
"comment_url" : "test_url",
"update_time" : "2020-07-11 15:56:56",
"is_new" : 1 ,
"data": "test data"
}
"@version" => "1",
"@timestamp" => 2020 - 07 - 16T02: 11 :39.081Z
}
复制代码
脚本方式:
def register(params)
#para1 = params['param_key']
#p para
#output: "param_value"
require 'json'
end
def filter(event)
data=event.to_json
new_data=JSON.parse(data)
event.set('[newdata]', newdata)
return [event]
end
复制代码
output: 与 inline 方式相同
至此,关于 Logstash 中 ruby 插件的使用方式、部分原理和示例说明完毕,对以上内容有问题的小伙伴欢迎评论区留言交流。
评论