Syncing Related MySQL Tables into Elasticsearch Nested Documents with Logstash
Author: 北桥苏
- 2023-05-13, Guangdong
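Preface:
The previous article synced several related MySQL tables into Elasticsearch with Logstash, using parent-child documents. For the same business requirement, nested documents cost fewer resources and query faster than parent-child documents, at least when the amount of data is small. This article therefore walks through the basic use of nested documents and then shows how Logstash can sync tables in a one-to-many relationship into Elasticsearch nested documents.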
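RESTful walkthrough:
Using blog posts and their comments as the example, the requests below cover creating the mapping and then adding, deleting, updating, querying and aggregating nested documents. The index name is "blog_new".
1. Create the mapping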
PUT blog_new
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "body": {
        "type": "text"
      },
      "tags": {
        "type": "keyword"
      },
      "published_on": {
        "type": "keyword"
      },
      "comments": {
        "type": "nested",
        "properties": {
          "name": {
            "type": "text"
          },
          "comment": {
            "type": "text"
          },
          "age": {
            "type": "short"
          },
          "rating": {
            "type": "short"
          },
          "commented_on": {
            "type": "text"
          }
        }
      }
    }
  }
}
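To see why `comments` is mapped as `nested` rather than left as a plain object array, the following minimal Python sketch imitates the two matching behaviours in memory. It is only a model of the semantics, not Elasticsearch code. The sample document with two commenters and the helper names `flattened_match` and `nested_match` are made up for illustration.

```python
# Toy model: how an array of objects is matched when flattened vs. nested.
doc = {
    "title": "Hero",
    "comments": [
        {"name": "steve", "age": 24},
        {"name": "william", "age": 34},
    ],
}

def flattened_match(doc, name, age):
    """Plain object arrays: each field becomes one bag of values per document."""
    names = {c["name"] for c in doc["comments"]}
    ages = {c["age"] for c in doc["comments"]}
    return name in names and age in ages

def nested_match(doc, name, age):
    """Nested type: every comment is matched as its own hidden document."""
    return any(c["name"] == name and c["age"] == age for c in doc["comments"])

print(flattened_match(doc, "steve", 34))  # True: values come from two different comments
print(nested_match(doc, "steve", 34))     # False: no single comment has both values
```

With the nested mapping, a query for "steve, aged 34" no longer matches a post just because one comment is by steve and another is by a 34-year-old.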
2. Add a document
POST blog_new/_doc/2
{
  "title": "Hero",
  "body": "Hero test body...",
  "tags": ["Heros", "happy"],
  "published_on": "6 Oct 2018",
  "comments": [
    {
      "name": "steve",
      "age": 24,
      "rating": 18,
      "comment": "Nice article..",
      "commented_on": "3 Nov 2018"
    }
  ]
}
3. Delete (remove matching comments from a document)
POST blog_new/_update/1
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.comments.removeIf(it -> it.name == 'John');"
  }
}
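The commenter name is hard-coded in the script above. As a hedged alternative, the sketch below builds the same update body in Python but passes the name through the script's `params` object, so the script source stays constant across calls. The helper name `remove_comments_body` is hypothetical, and the body still has to be sent to the update endpoint of the document in question.

```python
import json

def remove_comments_body(name):
    """Build an update body that removes every comment written by `name`."""
    return {
        "script": {
            "lang": "painless",
            # params.name is filled in from the "params" object below
            "source": "ctx._source.comments.removeIf(it -> it.name == params.name);",
            "params": {"name": name},
        }
    }

# Request body for the update call on document 1
print(json.dumps(remove_comments_body("John"), indent=2))
```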
4. Update (modify matching comments in a document)
POST blog_new/_update/2
{
  "script": {
    "source": "for(e in ctx._source.comments){if (e.name == 'steve') {e.age = 25; e.comment= 'very very good article...';}}"
  }
}
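To make the effect of the update script easier to follow, here is a small Python mirror of the same loop applied to an in-memory copy of `_source`. It is a model only: the function name `apply_update` and the second comment by "anna" are made up for illustration.

```python
import copy

def apply_update(source, name, age, comment):
    """Mirror of the Painless loop: edit every comment whose name matches."""
    updated = copy.deepcopy(source)  # leave the input untouched
    for e in updated["comments"]:
        if e["name"] == name:
            e["age"] = age
            e["comment"] = comment
    return updated

before = {
    "title": "Hero",
    "comments": [
        {"name": "steve", "age": 24, "rating": 18, "comment": "Nice article.."},
        {"name": "anna", "age": 30, "rating": 5, "comment": "ok"},  # made-up second comment
    ],
}
after = apply_update(before, "steve", 25, "very very good article...")
print(after["comments"][0])
print(after["comments"][1] == before["comments"][1])  # True: other comments are untouched
```

Because the whole `comments` array lives inside one document, changing a single comment still reindexes the entire blog post.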
5. Query
GET /blog_new/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        {
          "nested": {
            "path": "comments",
            "query": {
              "bool": {
                "must": [
                  {
                    "match": {
                      "comments.name": "William"
                    }
                  },
                  {
                    "match": {
                      "comments.age": 34
                    }
                  }
                ]
              }
            }
          }
        }
      ]
    }
  }
}
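When several nested conditions have to be combined, the request body gets deep quickly. The sketch below is one possible Python helper, `nested_must` (a hypothetical name), that generates the nested part of such a query from a dictionary of field/value pairs. It omits the outer `bool`/`must` wrapper, which has only one clause in the request above and should not change which documents match.

```python
import json

def nested_must(path, conditions):
    """Build a nested query in which all conditions must hold in one nested object."""
    return {
        "query": {
            "nested": {
                "path": path,
                "query": {
                    "bool": {
                        "must": [
                            {"match": {f"{path}.{field}": value}}
                            for field, value in conditions.items()
                        ]
                    }
                },
            }
        }
    }

body = nested_must("comments", {"name": "William", "age": 34})
print(json.dumps(body, indent=2))
```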
6. Aggregate
GET blog_new/_search
{
  "size": 0,
  "aggs": {
    "comm_aggs": {
      "nested": {
        "path": "comments"
      },
      "aggs": {
        "min_age": {
          "min": {
            "field": "comments.age"
          }
        }
      }
    }
  }
}
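As a rough illustration of what this aggregation computes, the following Python sketch performs the same calculation over a few in-memory documents: it first descends into every comment (the `nested` step) and then takes the minimum `age`. The three sample documents are invented, and the result dictionary only approximates the shape of the `aggregations` section of a search response.

```python
docs = [
    {"title": "Hero", "comments": [{"name": "steve", "age": 24}]},
    {"title": "Other", "comments": [{"name": "William", "age": 34},
                                    {"name": "John", "age": 19}]},
    {"title": "Empty", "comments": []},
]

# "nested" step: collect every comment of every blog post
ages = [c["age"] for d in docs for c in d["comments"]]

# "min" step: doc_count counts nested comments, not blog posts
result = {"comm_aggs": {"doc_count": len(ages), "min_age": {"value": float(min(ages))}}}
print(result)  # {'comm_aggs': {'doc_count': 3, 'min_age': {'value': 19.0}}}
```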
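Logstash sync:
Syncing into nested documents differs a little from the parent-child approach used earlier: only one jdbc input is needed. The related rows are fetched with a single join query, then aggregated per parent record and imported into Elasticsearch. Blog posts and comments serve as the example again. Creating the MySQL tables and similar setup is omitted so that we can go straight to the configuration and the run command.
1. Create the nested index and mapping
The mapping from the RESTful walkthrough above can be adapted for this; the key point is that the embedded field has the type nested. Before running the configuration, execute the SQL query on its own to preview the joined rows.
[Screenshot: result of the SQL query]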
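Since the mapping for the synced index is omitted here, the sketch below shows one way it could look, written as a Python dictionary and printed as the JSON body for `PUT test_nested_community_content`. The field names follow the SQL aliases and the aggregate code in the configuration, but every field type other than `nested` is an assumption and should be adjusted to the real MySQL column types.

```python
import json

# Assumed field types; only "comment" being nested is taken from the article.
mapping = {
    "mappings": {
        "properties": {
            "id": {"type": "long"},
            "content": {"type": "text"},
            "location": {"type": "keyword"},
            "images": {"type": "keyword"},
            "comment": {
                "type": "nested",
                "properties": {
                    "comment_id": {"type": "long"},
                    "content": {"type": "text"},
                },
            },
        }
    }
}

# Request body for: PUT test_nested_community_content
print(json.dumps(mapping, indent=2))
```

Fields that are not listed, such as the helper array `comment_list` produced by the aggregate code, would be picked up by dynamic mapping.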
2. Write the sync configuration
input {
  stdin {}
  jdbc {
    jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    # Run the statement once every minute
    schedule => "*/1 * * * *"
    # ORDER BY keeps all rows of one community together, which the aggregate filter relies on
    statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content , comment.id AS comment_id FROM yiqi_comment comment LEFT JOIN yiqi_community community ON community.id = comment.community_id ORDER BY community.id"
  }
}
filter {
  aggregate {
    # One map per community; its comments are collected into map['comment']
    task_id => "%{community_id}"
    code => "
      map['id'] = event.get('community_id')
      map['content'] = event.get('content')
      map['location'] = event.get('location')
      map['images'] = event.get('images')
      map['comment_list'] ||= []
      map['comment'] ||= []
      if (event.get('comment_id') != nil)
        if !(map['comment_list'].include? event.get('comment_id'))
          map['comment_list'] << event.get('comment_id')
          map['comment'] << {
            'comment_id' => event.get('comment_id'),
            'content' => event.get('comment_content')
          }
        end
      end
      event.cancel()
    "
    # Emit the finished map as a new event as soon as the next community_id shows up
    push_previous_map_as_event => true
    # Flush the last map after 5 seconds without new rows
    timeout => 5
  }
  json {
    source => "message"
    remove_field => ["message"]
    #remove_field => ["message", "type", "@timestamp", "@version"]
  }
  mutate {
    # Drop fields that are not needed so they are not stored in ES
    remove_field => ["tags", "@timestamp", "@version"]
  }
}
output {
  stdout {
    #codec => json_lines
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test_nested_community_content"
    document_id => "%{id}"
  }
}
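The aggregate filter with `push_previous_map_as_event` emits the collected map whenever the `task_id` changes, so it expects all rows of one community to arrive consecutively. The Python sketch below models that behaviour to show what happens otherwise, which is why the SQL statement should sort by the community id. It is a simplified model rather than the plugin's implementation, and the sample rows are made up.

```python
def aggregate(rows):
    """Model of push_previous_map_as_event: flush the map when task_id changes."""
    events, current_id, current = [], None, None
    for row in rows:
        task_id = row["community_id"]
        if task_id != current_id:
            if current is not None:
                events.append(current)  # the previous map is pushed as an event
            current_id = task_id
            current = {"id": task_id, "content": row["content"],
                       "comment_list": [], "comment": []}
        cid = row["comment_id"]
        if cid is not None and cid not in current["comment_list"]:
            current["comment_list"].append(cid)
            current["comment"].append({"comment_id": cid,
                                       "content": row["comment_content"]})
    if current is not None:
        events.append(current)  # in Logstash the last map is flushed by the timeout
    return events

rows = [
    {"community_id": 1, "content": "post A", "comment_id": 10, "comment_content": "first"},
    {"community_id": 2, "content": "post B", "comment_id": 20, "comment_content": "hello"},
    {"community_id": 1, "content": "post A", "comment_id": 11, "comment_content": "second"},
]
unsorted_events = aggregate(rows)
sorted_events = aggregate(sorted(rows, key=lambda r: r["community_id"]))
print(len(unsorted_events))  # 3: community 1 is emitted twice, each time with one comment
print(len(sorted_events))    # 2: one event per community, community 1 has both comments
```

With unsorted rows, the second event for community 1 would overwrite the first one in Elasticsearch because both use the same `document_id`, and comment 10 would be lost.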
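3. Run the command to start syncing
The aggregate filter only works correctly when all events pass through a single worker thread, so the pipeline is started with one worker (-w 1).
bin\logstash -f mysql\mysql.conf -w 1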
4. Query
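One way to check the synced data is a nested query against the `comment` field of the target index. The Python sketch below prints such a request body. It assumes that `comment` was mapped as `nested` when the index was created, and the search term "hello" is only a placeholder. The empty `inner_hits` object asks Elasticsearch to also return the individual comments that matched.

```python
import json

body = {
    "query": {
        "nested": {
            "path": "comment",
            "query": {"match": {"comment.content": "hello"}},  # placeholder search term
            "inner_hits": {},  # also return the comments that matched
        }
    }
}

# Request body for: GET test_nested_community_content/_search
print(json.dumps(body, indent=2, ensure_ascii=False))
```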