> hello,大家好,我是 Jackpop,硕士毕业于哈尔滨工业大学,曾在华为、阿里等大厂工作,如果你对升学、就业、技术提升等有疑惑,不妨交个朋友:
[我是 Jackpop,我们交个朋友吧!](https://mp.weixin.qq.com/s/fCHn8JpLQDH-M_QkVxwR1w)
在第二部分中,我们学习了如何在 ElasticSearch 中执行搜索。但是,我们无法使用其批量 API 将.json 数据文件导入 ElasticSearch。
在这部分中,我们将进行一些编程,并学习一些有关如何将.json 飞行数据文件导入 ElasticSearch 的方法:
数据转换
ElasticSearch 对数据格式有特定的格式要求:
{``"index"``:{``"_id"``:4800770}}
{``"Rcvr"``:1,``"HasSig"``:``false``,``"Icao"``:``"494102"``, ``"Bad"``:``false``,``"Reg"``:``"CS-PHB"``, ...}
...
复制代码
这就意味着,你需要把下载的每一份 json 数据按照上述格式进行转换。主要满足如下 2 点:
我们可以通过编写简单的 Java 程序,快速把 json 文件转换成对应格式:
package com.jgc;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* Converts a flight data json file to a format that can be imported to
* ElasticSearch using its bulk API.
*/
public class JsonFlightFileConverter {
private static final Path flightDataJsonFile =
Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
public static void main(String[] args) {
List<String> list = new ArrayList<>();
try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
list = stream
.map(line -> line.split("\{"))
.flatMap(Arrays::stream)
.collect(toList());
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(list);
}
}
复制代码
最后,通过简单的拼接,输出我们想要的结果:
final String result = list.stream().skip(3)
.map(s -> "{" + s + "\n")
.collect(Collectors.joining());
System.out.println(result);
复制代码
现在,可以看到输出已经非常接近我们想要的结果:
{"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...
复制代码
实际上,我们可以将最后一个代码片段添加到原始流中,如下所示:
String result = "";
try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
result = stream
.map(line -> line.split("\{"))
.flatMap(Arrays::stream)
.skip(3)
.map(s -> "{" + s + "\n")
.collect(Collectors.joining());
} catch (IOException e) {
e.printStackTrace();
}
复制代码
现在,我们需要在每行的上方插入新行,其中包含文档的索引,如下所示:
{"index":{"_id":4800770}}
复制代码
我们可以创建一个函数,这样处理会更加简洁明了:
private static String insertIndex(String s) {
final String[] keyValues = s.split(",");
final String[] idKeyValue = keyValues[0].split(":");
return "{"index":{"_id":"+ idKeyValue[1] +"}}\n";
}
复制代码
这样,就可以对每个输入进行转换,给出我们需要的输出。
我们还需要解决的更多细节,从每个文档中删除最后一个逗号。
private static String removeLastComma(String s) {
return s.charAt(s.length() - 1) == ',' ? s.substring(0, s.length() - 1) : s;
}
复制代码
这时候,数据处理代码就变成了下面这个样子:
public class JsonFlightFileConverter {
public static void main(String[] args) {
if (args.length == 1) {
Path inDirectoryPath = Paths.get(args[0]);
if (inDirectoryPath != null) {
Path outDirectoryPath = Paths.get(inDirectoryPath.toString(), "out");
try {
if (Files.exists(outDirectoryPath)) {
Files.walk(outDirectoryPath)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
Files.createDirectory(Paths.get(inDirectoryPath.toString(), "out"));
} catch (IOException e) {
e.printStackTrace();
}
try (DirectoryStream ds = Files.newDirectoryStream(inDirectoryPath, "*.json")) {
for (Path inFlightDataJsonFile : ds) {
String result = "";
try (Stream stream =
Files.lines(inFlightDataJsonFile.toAbsolutePath())) {
result = stream
.parallel()
.map(line -> line.split("\{"))
.flatMap(Arrays::stream)
.skip(3)
.map(s -> createResult(s))
.collect(Collectors.joining());
Path outFlightDataJsonFile =
Paths.get(outDirectoryPath.toString(),
inFlightDataJsonFile.getFileName().toString());
Files.createFile(outFlightDataJsonFile);
Files.writeString(outFlightDataJsonFile, result);
}
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
System.out.println("Usage: java JsonFlightFileConverter ");
}
...
复制代码
使用 ElasticSearch 的批量 API 导入数据
需要再次强调,文件必须以空行结尾。 如果不是,则添加一个(实际上前面的程序已经在文件末尾添加了换行符)。
在产生新的.json 文件的目录(输出目录)内,执行以下命令:
curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"
复制代码
请注意,内容类型是 application / x-ndjson,而不是 application / x-json。
还要注意,我们将数据表示为二进制以便保留换行符。 文件名为 2016-07-01-1300Z.json。
ElasticSearch 中任何具有相同 ID 的现有文档都将被.json 文件中的文档替换。
最后,可以发现有 7679 文件被导入:
"hits" : {
"total" : {
"value" : 7679,
"relation" : "eq"
},
GET /_cat/shards?v
复制代码
返回结果:
index shard prirep state docs store ip node
flight 0 p STARTED 7679 71mb 127.0.0.1 MacBook-Pro.local
flight 0 r UNASSIGNED
复制代码
解析 JSON 数据
将这些文档导入 ElasticSearch 的另一种方法是将 JSON 数据文件解析到内存中,并使用 ElasticSearch 的 REST API 将其导入 ElasticSearch。
有许多库可用于解析 Java 中的 JSON 文件:
GSon
Jackson
mJson
JSON-Simple
JSON-P
我们将使用 Google 的 GSon 库,但其他任何 JSON 库都可以完成此工作。
GSon 提供了多种表示 JSON 数据的方法,具体使用哪一种,则取决于下一步,即如何将数据导入到 ElasticSearch。
ElasticSearch API 要求数据的格式为:Map<String, Object>,这是我们将解析后的 JSON 数据存储到的位置。
首先,将下面依赖加入到 pom.xml 中:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
复制代码
使用下方代码解析 json 数据:
package com.jcg;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
public class JsonFlightFileReader {
private static final String flightDataJsonFile = "src/main/resources/flightdata/2016-07-01-1300Z.json";
private static final Gson gson = new Gson();
public static void main(String[] args) {
parseJsonFile(flightDataJsonFile);
}
private static void parseJsonFile(String file) {
try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {
Map<String, Object> map = gson.fromJson(reader,
new TypeToken<Map<String, Object>>() { }.getType());
List<Object> acList = (List<Object>) (map.get("acList"));
for (Object item : acList) {
LinkedTreeMap<String, Object> flight =
(LinkedTreeMap<String, Object>) item;
for (Map.Entry<String, Object> entry : flight.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
String outEntry = (key.equals("Id") ? "{" + key : key) + " : " + value + ", ";
System.out.print(outEntry);
}
System.out.println("}");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
通过下述方法可以使用数据:
Map<String, Object> map = gson.fromJson(reader, new TypeToken<Map<String, Object>>() {}.getType());
List<Object> acList = (List<Object>) (map.get("acList"));
复制代码
使用 ElasticSearch REST API 导入数据
首先,在 pom.xml 中加入下方依赖:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.10.0</version>
</dependency>
复制代码
我们可以通过 RestClient 与 ElasticSearch 进行交互:
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
.setDefaultHeaders(new Header[]{
new BasicHeader("accept", "application/json"),
new BasicHeader("content-type", "application/json")})
.setFailureListener(new RestClient.FailureListener() {
public void onFailure(Node node) {
System.err.println("Low level Rest Client Failure on node " +
node.getName());
}
}).build();
复制代码
创建好 RestClient 之后,下一步就是创建一个 Request,并将 json 数据传递给它:
Request request = new Request("POST", "/flight/_doc/4800770");
String jsonDoc = "{"Rcvr":1,"HasSig":false,"Icao":"494102",...]}";
request.setJsonEntity(jsonDoc);
复制代码
最后,我们发送请求。
有两种方式,同步:
Response response = restClient.performRequest(request);
if (response.getStatusLine().getStatusCode() != 200) {
System.err.println("Could not add document with Id: " + id + " to index /flight");
}
复制代码
异步:
Cancellable cancellable = restClient.performRequestAsync(request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
System.out.println("Document with Id: " + id + " was successfully added to index /flight");
}
@Override
public void onFailure(Exception exception) {
System.err.println("Could not add document with Id: " + id + " to index /flight");
}
});
复制代码
最后,不要忘记关闭 restClient 连接:
} finally {
try {
restClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
复制代码
这部分,我们重点介绍了如何将.json 数据批处理文件导入到 ElasticSearch。
我们看到了如何通过两种方式做到这一点:
你可以根据自己的情况自行选择其中一种方法。
评论