
Syncing MySQL Related Tables into Elasticsearch Nested Documents with Logstash

Author: 北桥苏 (2023-05-13, Guangdong)

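Preface:

The previous article used Logstash to sync several related MySQL tables into Elasticsearch. For the same business requirement, nested documents cost fewer resources and query faster than parent-child documents, at least when the data volume is small. This article therefore walks through the basic usage of nested documents, and then shows how Logstash can sync a one-to-many table relationship into Elasticsearch nested documents.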

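RESTful walkthrough:

Using blog posts and their comments as the example, the steps below cover creating the mapping, then adding, deleting, updating, querying, and aggregating nested documents. The index name is "blog_new".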


1. Create the mapping


PUT blog_new
{
  "mappings": {
    "properties": {
      "title": { "type": "text" },
      "body": { "type": "text" },
      "tags": { "type": "keyword" },
      "published_on": { "type": "keyword" },
      "comments": {
        "type": "nested",
        "properties": {
          "name": { "type": "text" },
          "comment": { "type": "text" },
          "age": { "type": "short" },
          "rating": { "type": "short" },
          "commented_on": { "type": "text" }
        }
      }
    }
  }
}


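2. Add

The mapping above is typeless, so on Elasticsearch 7.x the document is indexed through the `_doc` endpoint rather than a custom type such as `blog`.


POST blog_new/_doc/2
{
  "title": "Hero",
  "body": "Hero test body...",
  "tags": ["Heros", "happy"],
  "published_on": "6 Oct 2018",
  "comments": [
    {
      "name": "steve",
      "age": 24,
      "rating": 18,
      "comment": "Nice article..",
      "commented_on": "3 Nov 2018"
    }
  ]
}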


3. Delete

This removes the comments whose name is 'John' from document 1; the parent document itself stays.


POST blog_new/_update/1
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.comments.removeIf(it -> it.name == 'John');"
  }
}


4. Update


POST blog_new/_update/2
{
  "script": {
    "source": "for(e in ctx._source.comments){if (e.name == 'steve') {e.age = 25; e.comment= 'very very good article...';}}"
  }
}
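The two scripts above both work by editing the `comments` array inside `_source`, after which Elasticsearch reindexes the whole document. A minimal Python sketch of the same logic on an in-memory dictionary may make the behaviour easier to follow. The sample data and the helper names `remove_comments_by_name` and `update_comment` are illustrative assumptions, and no Elasticsearch call is made.

```python
# Hypothetical stand-in for one document's _source (sample data, not from the article).
source = {
    "title": "Hero",
    "comments": [
        {"name": "steve", "age": 24, "rating": 18, "comment": "Nice article.."},
        {"name": "John", "age": 30, "rating": 5, "comment": "Not bad"},
    ],
}


def remove_comments_by_name(src, name):
    """Same effect as: ctx._source.comments.removeIf(it -> it.name == name)."""
    src["comments"] = [c for c in src["comments"] if c["name"] != name]


def update_comment(src, name, age, comment):
    """Same effect as the for-loop script: patch every comment whose name matches."""
    for c in src["comments"]:
        if c["name"] == name:
            c["age"] = age
            c["comment"] = comment


remove_comments_by_name(source, "John")
update_comment(source, "steve", 25, "very very good article...")
print(source["comments"])
```

Only the matching nested objects change; every other field of the parent document is left as it was.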


5. Query


GET /blog_new/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        {
          "nested": {
            "path": "comments",
            "query": {
              "bool": {
                "must": [
                  { "match": { "comments.name": "William" } },
                  { "match": { "comments.age": 34 } }
                ]
              }
            }
          }
        }
      ]
    }
  }
}
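The point of the `nested` query is that both conditions must hold on the same comment. The Python sketch below contrasts that with what happens when an array of objects is not mapped as `nested` and its fields are flattened into independent value lists. The sample documents are hypothetical, and the matching is simplified to plain equality (a real `match` on a text field is analyzed first).

```python
# Hypothetical sample documents; only the comments matter here.
docs = [
    {"id": 1, "comments": [{"name": "William", "age": 20}, {"name": "John", "age": 34}]},
    {"id": 2, "comments": [{"name": "William", "age": 34}]},
]


def matches_flattened(doc, name, age):
    """Non-nested object arrays: each field becomes a bag of values, pairing is lost."""
    names = {c["name"] for c in doc["comments"]}
    ages = {c["age"] for c in doc["comments"]}
    return name in names and age in ages


def matches_nested(doc, name, age):
    """Nested type: both conditions are checked against one comment at a time."""
    return any(c["name"] == name and c["age"] == age for c in doc["comments"])


flattened_hits = [d["id"] for d in docs if matches_flattened(d, "William", 34)]
nested_hits = [d["id"] for d in docs if matches_nested(d, "William", 34)]
print(flattened_hits)  # [1, 2]
print(nested_hits)     # [2]
```

Document 1 is a false positive in the flattened case: it has a William and it has a 34-year-old, but they are different commenters.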


6. Aggregation


GET blog_new/_search
{
  "size": 0,
  "aggs": {
    "comm_aggs": {
      "nested": {
        "path": "comments"
      },
      "aggs": {
        "min_age": {
          "min": {
            "field": "comments.age"
          }
        }
      }
    }
  }
}
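As a rough illustration of what this aggregation computes, the sketch below steps into every comment of every document and takes the minimum `age`. The data and the function name `nested_min_age` are assumptions for the example, and the returned dictionary only imitates the shape of the real response.

```python
# Hypothetical sample documents.
docs = [
    {"id": 1, "comments": [{"name": "steve", "age": 25}, {"name": "John", "age": 30}]},
    {"id": 2, "comments": [{"name": "William", "age": 34}]},
    {"id": 3, "comments": []},
]


def nested_min_age(documents):
    """Count all nested comments and return the smallest age among them."""
    comments = [c for d in documents for c in d["comments"]]
    ages = [c["age"] for c in comments if "age" in c]
    return {
        "doc_count": len(comments),  # nested objects counted, not parent documents
        "min_age": {"value": min(ages) if ages else None},
    }


result = nested_min_age(docs)
print(result)
```

The `doc_count` here is the number of comments, which is why it can exceed the number of blog posts.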

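Logstash sync:

Syncing into nested documents differs from the parent-child approach used earlier: a single jdbc input is enough. The related rows are fetched with one join query, then merged per parent record by the aggregate filter before being written to Elasticsearch. The example is again a post with its comments. Creating the MySQL tables and similar setup is skipped so we can go straight to the configuration and the run command.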


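1. Create the nested index and mapping


The mapping can be adapted from the one in the RESTful section above; the essential point is that the field holding the comments has type nested. Before running the config, it is worth running the SQL query on its own to check the result: it returns one row per comment, with the columns of the parent post repeated on each row.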
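The article does not show the mapping for the target index, so the following is only a sketch of what it might look like for the fields the sync config produces. The `nested` type on `comment` is the one part taken from the text; every other field type is an assumption and should be adjusted to the real column types.

```python
import json

# Assumed field types; only "comment" being "nested" comes from the article.
mapping = {
    "mappings": {
        "properties": {
            "id": {"type": "long"},
            "content": {"type": "text"},
            "location": {"type": "keyword"},
            "images": {"type": "keyword"},
            "comment_list": {"type": "long"},
            "comment": {
                "type": "nested",
                "properties": {
                    "comment_id": {"type": "long"},
                    "content": {"type": "text"},
                },
            },
        }
    }
}

# Prints a body that could be sent with: PUT test_nested_community_content
print(json.dumps(mapping, indent=2))
```

If the index is left to dynamic mapping instead, `comment` is mapped as a plain object array and nested queries against it will not work.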



2. Write the sync config


input {
  # stdin is not needed for the sync itself
  stdin {}
  jdbc {
    jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "*/1 * * * *"
    # ORDER BY keeps all rows of one post adjacent, which the aggregate filter relies on
    statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content, comment.id AS comment_id FROM yiqi_comment comment LEFT JOIN yiqi_community community ON community.id = comment.community_id ORDER BY community.id"
  }
}

filter {
  aggregate {
    task_id => "%{community_id}"
    code => "
      map['id'] = event.get('community_id')
      map['content'] = event.get('content')
      map['location'] = event.get('location')
      map['images'] = event.get('images')
      map['comment_list'] ||= []
      map['comment'] ||= []
      if (event.get('comment_id') != nil)
        if !(map['comment_list'].include? event.get('comment_id'))
          map['comment_list'] << event.get('comment_id')
          map['comment'] << {
            'comment_id' => event.get('comment_id'),
            'content' => event.get('comment_content')
          }
        end
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
  json {
    source => "message"
    remove_field => ["message"]
    #remove_field => ["message", "type", "@timestamp", "@version"]
  }
  mutate {
    # Drop fields that should not be stored in ES
    remove_field => ["tags", "@timestamp", "@version"]
  }
}

output {
  stdout {
    #codec => json_lines
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test_nested_community_content"
    document_id => "%{id}"
  }
}
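To make the merging step concrete, here is a small Python simulation of what the aggregate filter does with `push_previous_map_as_event`: it collects consecutive rows that share a `community_id` and emits the collected map when the id changes. This is a sketch of the idea, not the plugin's implementation, and the rows are made-up sample data.

```python
# Hypothetical joined rows, one per comment, as the SQL statement would return them.
rows = [
    {"community_id": 1, "content": "post A", "comment_id": 10, "comment_content": "first"},
    {"community_id": 1, "content": "post A", "comment_id": 11, "comment_content": "second"},
    {"community_id": 1, "content": "post A", "comment_id": 11, "comment_content": "second"},
    {"community_id": 2, "content": "post B", "comment_id": 12, "comment_content": "third"},
]


def aggregate(joined_rows):
    """Yield one merged document per run of consecutive rows sharing community_id."""
    current_id, current_map = None, None
    for row in joined_rows:
        if row["community_id"] != current_id:
            if current_map is not None:
                yield current_map  # the previous map is pushed as an event
            current_id = row["community_id"]
            current_map = {"id": current_id, "content": row["content"],
                           "comment_list": [], "comment": []}
        cid = row["comment_id"]
        if cid is not None and cid not in current_map["comment_list"]:
            current_map["comment_list"].append(cid)
            current_map["comment"].append(
                {"comment_id": cid, "content": row["comment_content"]})
    if current_map is not None:
        yield current_map  # in Logstash the last map is flushed by the timeout


merged = list(aggregate(rows))
# Same rows for post 1, but no longer adjacent:
unordered = list(aggregate([rows[0], rows[3], rows[1]]))
print(len(merged), len(unordered))  # 2 3
```

With unordered input, post 1 is emitted twice, each time with only part of its comments. Since both events share the same `document_id`, the later one would overwrite the earlier one in Elasticsearch, which is why the rows of one post need to arrive together.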


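3. Run the sync

The aggregate filter only works correctly when events are processed in order by a single pipeline worker, hence the -w 1 flag.


bin\logstash -f mysql\mysql.conf -w 1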



4. Query
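One way to check the synced data is a nested query on the `comment` field. The sketch below only builds and prints the request body; the helper name `nested_comment_query` and the search text are assumptions, and the query presumes that `comment` was mapped as `nested` when the index was created.

```python
import json


def nested_comment_query(text):
    """Build a search body matching posts that have a comment containing `text`."""
    return {
        "query": {
            "nested": {
                "path": "comment",
                "query": {"match": {"comment.content": text}},
                "inner_hits": {},  # also return the individual comments that matched
            }
        }
    }


# Prints a body that could be sent with: GET test_nested_community_content/_search
body = nested_comment_query("hello")
print(json.dumps(body, indent=2))
```

Each hit is a whole post, and `inner_hits` lists which of its comments satisfied the query.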



