Master Elasticsearch Aggregations in Just Five Minutes
Author: 程序员万金游
2023-08-30, Hunan
Aggregating (grouping) by the values of a specified field
REST API example
GET http://139.198.152.90:9200/elasticsearch-client/_search
{
"aggs": {
"my-agg-name": {
"terms": {
"field": "name"
}
}
}
}
// ====== Response (only the aggregations part is shown) ======
"aggregations": {
"my-agg-name": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 6,
"buckets": [
{
"key": "lisi1",
"doc_count": 6
},
{
"key": "ii1",
"doc_count": 1
},
{
"key": "lisi0",
"doc_count": 1
}
]
}
}
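As the response body shows, the documents were grouped by the name field: each distinct value of that field becomes a bucket key, and doc_count gives the number of documents in that bucket.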
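To make the bucket semantics concrete, here is a minimal in-memory sketch in plain Java of what a terms aggregation computes. It is not Elasticsearch code: TermsAggSketch and termsAgg are hypothetical names, and the ordering (document count descending, then key ascending) is an assumption chosen to mirror the response shown above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration of terms-style bucketing; hypothetical helper, not Elasticsearch code. */
final class TermsAggSketch {

    /** Returns value -> document count, ordered by count (descending) and then by key. */
    static Map<String, Long> termsAgg(List<String> fieldValues) {
        // Count how many "documents" carry each distinct value
        Map<String, Long> counts = new HashMap<>();
        for (String value : fieldValues) {
            counts.merge(value, 1L, Long::sum);
        }
        // Order the buckets: largest count first, ties broken by key
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        // LinkedHashMap keeps the sorted order for iteration
        Map<String, Long> buckets = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            buckets.put(entry.getKey(), entry.getValue());
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> names = List.of("lisi1", "ii1", "lisi1", "lisi0", "lisi1");
        termsAgg(names).forEach((key, docCount) ->
                System.out.println("key: " + key + ", docCount: " + docCount));
    }
}
```

Each map entry plays the role of one element of the buckets array: the map key corresponds to key and the map value to doc_count.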
Using the Java High Level REST Client
// Aggregate on a specified field
@Test
public void testAggregations() throws IOException {
    // Search source builder
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Only the aggregation matters here, not the documents, so return 0 hits
    searchSourceBuilder.size(0);
    // Add the aggregation to the search source
    String bucketName = "terms-agg-name";
    searchSourceBuilder.aggregation(AggregationBuilders.terms(bucketName).field("name"));
    // Build the search request
    SearchRequest searchRequest = new SearchRequest(INDEX_NAME).source(searchSourceBuilder);
    // Execute the search
    try {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (!RestStatus.OK.equals(searchResponse.status())) {
            log.info("Request failed");
        } else {
            // Read the result with the type that matches the builder: an aggregation built with TermsAggregationBuilder is read back as Terms
            Terms terms = searchResponse.getAggregations().get(bucketName);
            List<? extends Terms.Bucket> buckets = terms.getBuckets();
            for (Terms.Bucket bucket : buckets) {
                log.info("== bucket: key: {}, docCount: {}", bucket.getKeyAsString(), bucket.getDocCount());
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Console output:
2021-12-29 15:15:11.651 INFO 16009 --- [ main] a.e.RestHighLevelClientAggregationsTests : == bucket: key: lisi1, docCount: 6
2021-12-29 15:15:11.654 INFO 16009 --- [ main] a.e.RestHighLevelClientAggregationsTests : == bucket: key: ii1, docCount: 1
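The documents have been grouped by name.
Changing the scope of an aggregation
REST API example
First use query to select the matching documents; the aggregation then runs only on those documents.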
GET http://139.198.152.90:9200/elasticsearch-client/_search
{
"query": {
"wildcard": {
"name": "*lisi*"
}
},
"aggs": {
"my-agg-name": {
"terms": {
"field": "name"
}
}
}
}
// ====== Response (only the aggregations part is shown) ======
"aggregations": {
"my-agg-name": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 6,
"buckets": [
{
"key": "lisi1",
"doc_count": 6
},
{
"key": "lisi0",
"doc_count": 1
}
]
}
}
Because the query only matches documents whose name contains lisi, the following bucket no longer appears:
{
"key": "ii1",
"doc_count": 1
}
It has been filtered out, because ii1 does not match the wildcard.
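The filter-then-aggregate order can be illustrated with a small plain-Java sketch. This is only a simulation under stated assumptions: ScopedAggSketch, wildcardToRegex, and filterThenCount are hypothetical names, the wildcard is matched against the whole field value, and only the * and ? wildcard characters are handled.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** In-memory illustration of "query first, aggregate second"; not Elasticsearch code. */
final class ScopedAggSketch {

    /** Translates a wildcard pattern (* = any run of characters, ? = one character) into a regex. */
    static Pattern wildcardToRegex(String wildcard) {
        StringBuilder regex = new StringBuilder();
        for (char c : wildcard.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                // Quote every literal character so regex metacharacters stay literal
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Step 1: keep only matching values (the query). Step 2: count per value (the aggregation). */
    static Map<String, Long> filterThenCount(List<String> names, String wildcard) {
        Pattern pattern = wildcardToRegex(wildcard);
        return names.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.groupingBy(name -> name, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> names = List.of("lisi1", "lisi1", "ii1", "lisi0");
        System.out.println(filterThenCount(names, "*lisi*"));
    }
}
```

Values that fail the filter never reach the counting step, which is why a bucket for ii1 cannot appear in the aggregation result.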
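Tip:
If you only care about the aggregation results and not the documents themselves, set size to 0. This reduces network overhead and keeps unneeded hits out of the response.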
Using the Java High Level REST Client
// Filter with a query first, then aggregate on a specified field
@Test
public void testAggregationsWithQuery() throws IOException {
    // Search source builder
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Only the aggregation matters here, not the documents, so return 0 hits
    searchSourceBuilder.size(0);
    // Build the query
    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
    // Only documents whose name contains lisi take part in the aggregation
    boolQueryBuilder.must(QueryBuilders.wildcardQuery("name", "*lisi*"));
    // Add the query to the search source
    searchSourceBuilder.query(boolQueryBuilder);
    // Add the aggregation to the search source; size(2) returns at most two buckets
    String bucketName = "terms-agg-name";
    searchSourceBuilder.aggregation(AggregationBuilders.terms(bucketName).field("name").size(2));
    // Build the search request
    SearchRequest searchRequest = new SearchRequest(INDEX_NAME).source(searchSourceBuilder);
    // Execute the search
    try {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (!RestStatus.OK.equals(searchResponse.status())) {
            log.info("Request failed");
        } else {
            // Read the result with the type that matches the builder: an aggregation built with TermsAggregationBuilder is read back as Terms
            Terms terms = searchResponse.getAggregations().get(bucketName);
            List<? extends Terms.Bucket> buckets = terms.getBuckets();
            for (Terms.Bucket bucket : buckets) {
                log.info("== bucket: key: {}, docCount: {}", bucket.getKeyAsString(), bucket.getDocCount());
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Console output:
2021-12-29 15:16:17.551 INFO 16060 --- [ main] a.e.RestHighLevelClientAggregationsTests : == bucket: key: lisi1, docCount: 6
2021-12-29 15:16:17.552 INFO 16060 --- [ main] a.e.RestHighLevelClientAggregationsTests : == bucket: key: lisi3, docCount: 1
As you can see, the following entry has been excluded:
2021-12-29 14:29:56.601 INFO 14168 --- [ main] a.e.RestHighLevelClientAggregationsTests : == bucket: key: ii1, docCount: 1
Running multiple aggregations
GET http://139.198.152.90:9200/elasticsearch-client/_search
{
"size": 0,
"query": {
"wildcard": {
"name": "*lisi*"
}
},
"aggs": {
"my-first-agg-name": {
"terms": {
"field": "name"
}
},
"my-second-agg-name": {
"terms": {
"field": "age"
}
}
}
}
// === Response ===
"aggregations": {
"my-second-agg-name": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": 22,
"doc_count": 2
},
{
"key": 23,
"doc_count": 2
}
]
},
"my-first-agg-name": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "lisi1",
"doc_count": 6
},
{
"key": "lisi0",
"doc_count": 1
}
]
}
}
Running a sub-aggregation
GET http://139.198.152.90:9200/elasticsearch-client/_search
// This request groups the documents by name, then computes the average age within each name bucket
{
"size": 0,
"query": {
"wildcard": {
"name": "*lisi*"
}
},
"aggs": {
"my-first-agg-name": {
"terms": {
"field": "name"
},
"aggs": {
"my-sub-agg-name": {
"avg": {
"field": "age"
}
}
}
}
}
}
// === Example response body ===
{
"took": 12,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 15,
"relation": "eq"
},
"max_score": null,
"hits": []
},
"aggregations": {
"my-first-agg-name": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "lisi1",
"doc_count": 6,
"my-sub-agg-name": {
"value": 24.0
}
},
{
"key": "lisi0",
"doc_count": 1,
"my-sub-agg-name": {
"value": 20.0
}
}
]
}
}
}
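The two-level computation (bucket by name, then average age per bucket) can be sketched in plain Java. This is an in-memory illustration, not Elasticsearch code: SubAggSketch, Person, and averageAgeByName are hypothetical names, and the sample ages are made up for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** In-memory illustration of a terms aggregation with an avg sub-aggregation. */
final class SubAggSketch {

    /** Minimal stand-in for an indexed document with the name and age fields. */
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Outer step: bucket by name. Inner step: average the ages inside each bucket. */
    static Map<String, Double> averageAgeByName(List<Person> people) {
        return people.stream().collect(Collectors.groupingBy(
                (Person p) -> p.name,
                TreeMap::new,
                Collectors.averagingInt((Person p) -> p.age)));
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("lisi1", 22), new Person("lisi1", 26), new Person("lisi0", 20));
        System.out.println(averageAgeByName(people));
    }
}
```

The downstream collector passed to groupingBy plays the role of the nested aggs block: it runs once per bucket and sees only the documents in that bucket.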
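Showing the aggregation type in the response body
By default, the response body contains only aggregation names, not their types. To include the type as well, add the typed_keys query parameter, for example: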
http://139.198.152.90:9200/elasticsearch-client/_search?typed_keys
// === What changes in the response body ===
Each aggregation name is prefixed with type#, for example:
1. sterms#my-first-agg-name
2. avg#my-sub-agg-name
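If client code reads such a response as raw JSON, the prefixed key has to be split back into type and name. A minimal sketch, assuming the type is everything before the first # character; TypedKeySketch and split are hypothetical helpers, not part of any Elasticsearch client.

```java
/** Splits keys of the form type#name as returned when typed_keys is set. */
final class TypedKeySketch {

    /** Returns {type, name}; type is null when the key carries no type prefix. */
    static String[] split(String key) {
        int hash = key.indexOf('#');
        if (hash < 0) {
            return new String[] {null, key};
        }
        return new String[] {key.substring(0, hash), key.substring(hash + 1)};
    }

    public static void main(String[] args) {
        String[] parts = split("sterms#my-first-agg-name");
        System.out.println("type: " + parts[0] + ", name: " + parts[1]);
    }
}
```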
Using scripts in aggregations
GET /my-index-000001/_search?size=0
{
"runtime_mappings": {
"message.length": {
"type": "long",
"script": "emit(doc['message.keyword'].value.length())"
}
},
"aggs": {
"message_length": {
"histogram": {
"interval": 10,
"field": "message.length"
}
}
}
}
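In the request above, runtime_mappings defines a field message.length whose value is computed by a script, and the histogram aggregation buckets documents on that value with an interval of 10. The plain-Java sketch below imitates both steps in memory. HistogramSketch and its methods are hypothetical names; the bucket key follows the rounding rule from the histogram aggregation documentation, with offset 0 because the request sets none. Unlike a real histogram response, the sketch returns only non-empty buckets.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory illustration of a histogram over a derived (script-like) value. */
final class HistogramSketch {

    /** Bucket key: floor((value - offset) / interval) * interval + offset. */
    static double bucketKey(double value, double interval, double offset) {
        return Math.floor((value - offset) / interval) * interval + offset;
    }

    /** Derives each message's length (the runtime field), then counts messages per bucket. */
    static Map<Double, Long> lengthHistogram(List<String> messages, double interval) {
        Map<Double, Long> buckets = new TreeMap<>();
        for (String message : messages) {
            double key = bucketKey(message.length(), interval, 0.0);
            buckets.merge(key, 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("hello", "hello world", "hello wonderful world!");
        System.out.println(lengthHistogram(messages, 10));
    }
}
```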
Aggregating by time
REST API example
GET http://ip:9200/<index-name>/_search
{
"timeout": "1s",
"aggs": {
"datetime-aggs": {
"date_histogram": {
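// Field to aggregate on
"field": "DateTime",
// Bucket interval; calendar_interval replaces the plain interval parameter, which has been deprecated since Elasticsearch 7.2
"calendar_interval": "1d"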
}
}
},
"from": 0,
"size": 0
}
Using the Java High Level REST Client
// Aggregate by date
@Test
public void test() throws IOException {
    // Search source builder
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Only the aggregation matters here, not the documents, so return 0 hits
    searchSourceBuilder.size(0);
    // Define the aggregation
    searchSourceBuilder.aggregation(AggregationBuilders.dateHistogram("datetime_bucket")
            // Field to aggregate on
            .field("DateTime")
            .format("yyyy-MM-dd")
            .minDocCount(0)
            .calendarInterval(DateHistogramInterval.DAY)
            // Sort buckets by key in descending order
            .order(BucketOrder.key(false))
    );
    // Execute the search and check the response before reading the buckets
    SearchRequest request = new SearchRequest(INDEX_NAME).source(searchSourceBuilder);
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    if (!RestStatus.OK.equals(response.status()) || response.getAggregations() == null) {
        log.info("Request failed");
    } else {
        Histogram datetimeBucket = response.getAggregations().get("datetime_bucket");
        List<? extends Histogram.Bucket> buckets = datetimeBucket.getBuckets();
        for (Histogram.Bucket bucket : buckets) {
            // Read the date key and the document count
            String date = bucket.getKeyAsString();
            long number = bucket.getDocCount();
            log.info("== bucket: key: {}, docCount: {}", date, number);
        }
    }
}
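The options used in the builder above (one bucket per calendar day, yyyy-MM-dd keys, minDocCount(0) so that empty days are kept, keys in descending order) can be imitated in memory with java.time. This is a sketch under assumptions, not Elasticsearch code: DateHistogramSketch and perDayDescending are hypothetical names, and time zones are ignored by working on LocalDateTime values.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory illustration of a daily date histogram with empty buckets, newest day first. */
final class DateHistogramSketch {

    static Map<String, Long> perDayDescending(List<LocalDateTime> timestamps) {
        // Count the timestamps that fall on each calendar day
        TreeMap<LocalDate, Long> counts = new TreeMap<>();
        for (LocalDateTime timestamp : timestamps) {
            counts.merge(timestamp.toLocalDate(), 1L, Long::sum);
        }
        Map<String, Long> buckets = new LinkedHashMap<>();
        if (counts.isEmpty()) {
            return buckets;
        }
        // Walk from the newest day back to the oldest, emitting 0 for days without data
        LocalDate oldest = counts.firstKey();
        for (LocalDate day = counts.lastKey(); !day.isBefore(oldest); day = day.minusDays(1)) {
            // LocalDate.toString() yields the ISO form, which matches yyyy-MM-dd here
            buckets.put(day.toString(), counts.getOrDefault(day, 0L));
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<LocalDateTime> timestamps = List.of(
                LocalDateTime.of(2021, 12, 29, 15, 15),
                LocalDateTime.of(2021, 12, 29, 9, 0),
                LocalDateTime.of(2021, 12, 27, 23, 59));
        perDayDescending(timestamps).forEach((day, docCount) ->
                System.out.println("key: " + day + ", docCount: " + docCount));
    }
}
```

The loop that fills in missing days corresponds to minDocCount(0): without it, only days that contain at least one document would produce a bucket.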
Author: AHA_WT
Link: https://juejin.cn/post/7047104634355187742