写点什么

ElasticSearch 从入门到精通:数据导入

作者:Jackpop
  • 2022 年 6 月 30 日
  • 本文字数:5186 字

    阅读完需:约 17 分钟

> hello,大家好,我是 Jackpop,硕士毕业于哈尔滨工业大学,曾在华为、阿里等大厂工作,如果你对升学、就业、技术提升等有疑惑,不妨交个朋友:

[我是 Jackpop,我们交个朋友吧!](https://mp.weixin.qq.com/s/fCHn8JpLQDH-M_QkVxwR1w)


在第二部分中,我们学习了如何在 ElasticSearch 中执行搜索。但是,我们无法使用其批量 API 将.json 数据文件导入 ElasticSearch。


在这部分中,我们将进行一些编程,并学习一些有关如何将.json 飞行数据文件导入 ElasticSearch 的方法:


  • 通过将.json 数据文件转换为 ElasticSearch 的 API 需要的格式

  • 通过解析.json 数据文件,使用 JSON 库(例如 gson)提取其值,然后使用 ElasticSearch 的 REST API 导入数据

数据转换

ElasticSearch 对数据格式有特定的格式要求:


{``"index"``:{``"_id"``:4800770}}{``"Rcvr"``:1,``"HasSig"``:``false``,``"Icao"``:``"494102"``, ``"Bad"``:``false``,``"Reg"``:``"CS-PHB"``, ...}...
复制代码


这就意味着,你需要把下载的每一份 json 数据按照上述格式进行转换。主要满足如下 2 点:


  • 在每个数据文档前面加入一行以 index 开头的数据

  • 把"Id":<value>修改为{"_id":<value>}


我们可以通过编写简单的 Java 程序,快速把 json 文件转换成对应格式:


package com.jgc;import java.io.IOException;import java.nio.file.Files;import java.nio.file.Path;import java.nio.file.Paths;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import static java.util.stream.Collectors.toList;/** * Converts a flight data json file to a format that can be imported to  * ElasticSearch using its bulk API. */public class JsonFlightFileConverter {    private static final Path flightDataJsonFile =         Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");    public static void main(String[] args) {        List<String> list = new ArrayList<>();        try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {            list = stream                    .map(line -> line.split("\{"))                    .flatMap(Arrays::stream)                    .collect(toList());        } catch (IOException e) {            e.printStackTrace();        }        System.out.println(list);    }}
复制代码


最后,通过简单的拼接,输出我们想要的结果:


final String result = list.stream().skip(3)                .map(s -> "{" + s + "\n")                .collect(Collectors.joining());System.out.println(result);
复制代码


现在,可以看到输出已经非常接近我们想要的结果:


{"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...
复制代码


实际上,我们可以将最后一个代码片段添加到原始流中,如下所示:


String result = "";try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {     result = stream            .map(line -> line.split("\{"))            .flatMap(Arrays::stream)            .skip(3)            .map(s -> "{" + s + "\n")            .collect(Collectors.joining());} catch (IOException e) {    e.printStackTrace();}
复制代码


现在,我们需要在每行的上方插入新行,其中包含文档的索引,如下所示:


{"index":{"_id":4800770}}
复制代码


我们可以创建一个函数,这样处理会更加简洁明了:


private static String insertIndex(String s) {    final String[] keyValues = s.split(",");    final String[] idKeyValue = keyValues[0].split(":");    return "{"index":{"_id":"+ idKeyValue[1] +"}}\n";}
复制代码


这样,就可以对每个输入进行转换,给出我们需要的输出。


我们还需要解决的更多细节,从每个文档中删除最后一个逗号。


private static String removeLastComma(String s) {    return s.charAt(s.length() - 1) == ',' ? s.substring(0, s.length() - 1) : s;}
复制代码


这时候,数据处理代码就变成了下面这个样子:


public class JsonFlightFileConverter {  public static void main(String[] args) {  if (args.length == 1) {    Path inDirectoryPath = Paths.get(args[0]);    if (inDirectoryPath != null) {        Path outDirectoryPath = Paths.get(inDirectoryPath.toString(), "out");        try {            if (Files.exists(outDirectoryPath)) {                Files.walk(outDirectoryPath)                        .sorted(Comparator.reverseOrder())                        .map(Path::toFile)                        .forEach(File::delete);            }            Files.createDirectory(Paths.get(inDirectoryPath.toString(), "out"));        } catch (IOException e) {            e.printStackTrace();        }        try (DirectoryStream ds = Files.newDirectoryStream(inDirectoryPath, "*.json")) {            for (Path inFlightDataJsonFile : ds) {                String result = "";                try (Stream stream =                      Files.lines(inFlightDataJsonFile.toAbsolutePath())) {            result = stream                      .parallel()                      .map(line -> line.split("\{"))                      .flatMap(Arrays::stream)                      .skip(3)                      .map(s -> createResult(s))                      .collect(Collectors.joining());                Path outFlightDataJsonFile =                      Paths.get(outDirectoryPath.toString(),                                inFlightDataJsonFile.getFileName().toString());                Files.createFile(outFlightDataJsonFile);                Files.writeString(outFlightDataJsonFile, result);            }        } catch (IOException e) {            e.printStackTrace();        }    } } else {    System.out.println("Usage: java JsonFlightFileConverter "); }...
复制代码

使用 ElasticSearch 的批量 API 导入数据

需要再次强调,文件必须以空行结尾。 如果不是,则添加一个(实际上前面的程序已经在文件末尾添加了换行符)。


在产生新的.json 文件的目录(输出目录)内,执行以下命令:


curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"
复制代码


请注意,内容类型是 application / x-ndjson,而不是 application / x-json。


还要注意,我们将数据表示为二进制以便保留换行符。 文件名为 2016-07-01-1300Z.json。


ElasticSearch 中任何具有相同 ID 的现有文档都将被.json 文件中的文档替换。


最后,可以发现有 7679 文件被导入:


"hits" : {    "total" : {      "value" : 7679,      "relation" : "eq"    },GET /_cat/shards?v
复制代码


返回结果:


index   shard prirep state      docs   store ip        nodeflight  0     p      STARTED    7679   71mb 127.0.0.1 MacBook-Pro.localflight  0     r      UNASSIGNED
复制代码

解析 JSON 数据

将这些文档导入 ElasticSearch 的另一种方法是将 JSON 数据文件解析到内存中,并使用 ElasticSearch 的 REST API 将其导入 ElasticSearch。


有许多库可用于解析 Java 中的 JSON 文件:


  • GSon

  • Jackson

  • mJson

  • JSON-Simple

  • JSON-P


我们将使用 Google 的 GSon 库,但其他任何 JSON 库都可以完成此工作。


GSon 提供了多种表示 JSON 数据的方法,具体使用哪一种,则取决于下一步,即如何将数据导入到 ElasticSearch。


ElasticSearch API 要求数据的格式为:Map<String, Object>,这是我们将解析后的 JSON 数据存储到的位置。


首先,将下面依赖加入到 pom.xml 中:


<dependency>    <groupId>com.google.code.gson</groupId>    <artifactId>gson</artifactId>    <version>2.8.6</version></dependency>
复制代码


使用下方代码解析 json 数据:


package com.jcg; import com.google.gson.Gson;import com.google.gson.internal.LinkedTreeMap;import com.google.gson.reflect.TypeToken;import java.io.BufferedReader;import java.io.IOException;import java.nio.file.Files;import java.nio.file.Paths;import java.util.List;import java.util.Map; public class JsonFlightFileReader {    private static final String flightDataJsonFile = "src/main/resources/flightdata/2016-07-01-1300Z.json";    private static final Gson gson = new Gson();    public static void main(String[] args) {        parseJsonFile(flightDataJsonFile);    }    private static void parseJsonFile(String file) {        try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {            Map<String, Object> map = gson.fromJson(reader,                        new TypeToken<Map<String, Object>>() { }.getType());            List<Object> acList = (List<Object>) (map.get("acList"));            for (Object item : acList) {                LinkedTreeMap<String, Object> flight =                         (LinkedTreeMap<String, Object>) item;                for (Map.Entry<String, Object> entry : flight.entrySet()) {                    String key = entry.getKey();                    Object value = entry.getValue();                    String outEntry = (key.equals("Id") ? "{" + key : key) + " : " + value + ", ";                    System.out.print(outEntry);                }                System.out.println("}");            }        } catch (IOException e) {            e.printStackTrace();        }    }}
复制代码


通过下述方法可以使用数据:


Map<String, Object> map = gson.fromJson(reader, new TypeToken<Map<String, Object>>() {}.getType());List<Object> acList = (List<Object>) (map.get("acList"));
复制代码

使用 ElasticSearch REST API 导入数据

首先,在 pom.xml 中加入下方依赖:


<dependency>    <groupId>org.elasticsearch.client</groupId>    <artifactId>elasticsearch-rest-client</artifactId>    <version>7.10.0</version></dependency>
复制代码


我们可以通过 RestClient 与 ElasticSearch 进行交互:


RestClient restClient = RestClient.builder(    new HttpHost("localhost", 9200, "http"));.setDefaultHeaders(new Header[]{        new BasicHeader("accept", "application/json"),        new BasicHeader("content-type", "application/json")}).setFailureListener(new RestClient.FailureListener() {    public void onFailure(Node node) {        System.err.println("Low level Rest Client Failure on node " +                node.getName());    }}).build();
复制代码


创建好 RestClient 之后,下一步就是创建一个 Request,并将 json 数据传递给它:


Request request = new Request("POST", "/flight/_doc/4800770");String jsonDoc = "{"Rcvr":1,"HasSig":false,"Icao":"494102",...]}";request.setJsonEntity(jsonDoc);
复制代码


最后,我们发送请求。


有两种方式,同步:


Response response = restClient.performRequest(request);if (response.getStatusLine().getStatusCode() != 200) {    System.err.println("Could not add document with Id: " + id + " to index /flight");}
复制代码


异步:


Cancellable cancellable = restClient.performRequestAsync(request,    new ResponseListener() {        @Override        public void onSuccess(Response response) {            System.out.println("Document with Id: " + id + " was successfully added to index /flight");        }         @Override        public void onFailure(Exception exception) {            System.err.println("Could not add document with Id: " + id + " to index /flight");        }});
复制代码


最后,不要忘记关闭 restClient 连接:


} finally {    try {        restClient.close();    } catch (IOException e) {        e.printStackTrace();    }}
复制代码


这部分,我们重点介绍了如何将.json 数据批处理文件导入到 ElasticSearch。


我们看到了如何通过两种方式做到这一点:


  • 使用 ElasticSearch 的批量 API,

  • 使用 JSON 库解析.json 文件


你可以根据自己的情况自行选择其中一种方法。

发布于: 刚刚阅读数: 4
用户头像

Jackpop

关注

还未添加个人签名 2020.09.16 加入

公众号:平凡而诗意,微信:code_7steps,全网粉丝超20万,技术进阶、优质资源、实用工具,欢迎关注!

评论

发布
暂无评论
ElasticSearch从入门到精通:数据导入_Jackpop_InfoQ写作社区