Elasticsearch分组后,根据分组后的数量排序,并查询分组后的组数量,通过DSL和java API两种方式
解决方案:
示例:在单据表中,查询2022-01-19当天每个人提交的单据数量,从高到低排序,并查询提交过单据的总人数。
期望实现的SQL
select createUser, count(Id) as c from userbill where type='bill' and createTime >='2022-01-19'
and createTime <= '2022-01-19' group by createUser order by c desc
1、DSL方式 :
GET /userbill/_search
{
"from": 0,
"size": 0,
"query": {
"bool": {
"filter": [
{
"bool": {
"must": [
{
"term": {
"type": {
"value": "bill",
"boost": 1
}
}
},
{
"range": {
"createTime": {
"from": "2022-01-19",
"to": "2022-01-19",
"include_lower": true,
"include_upper": true,
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
]
}
},
"_source": false,
"stored_fields": "_none_",
"aggs": {
"group_name": {
"terms": {
"field": "createUser",
"size": 999999
},
"aggs": {
"bucket_field": {
"bucket_sort": {
"sort": [
{
"_count": {
"order": "desc"
}
}
]
}
}
}
},
"stats_monthly_sales": {
"stats_bucket": {
"buckets_path": "group_name>_count"
}
}
}
}
查询结果
buckets 中的 key 为分组字段 createUser 的值(用户ID),doc_count 为该用户当天提交的单据数量;stats_monthly_sales 中的 count 为分组总数(即提交过单据的总人数),sum 为所有分组的单据总数。
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 32,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"group_name" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : 115944,
"doc_count" : 8
},
{
"key" : 967145,
"doc_count" : 7
},
{
"key" : 917175,
"doc_count" : 4
},
{
"key" : 937800,
"doc_count" : 3
},
{
"key" : 888831,
"doc_count" : 2
},
{
"key" : 963198,
"doc_count" : 2
},
{
"key" : 88896565,
"doc_count" : 2
},
{
"key" : 381480,
"doc_count" : 1
},
{
"key" : 918555,
"doc_count" : 1
},
{
"key" : 1002454,
"doc_count" : 1
},
{
"key" : 88895739,
"doc_count" : 1
}
]
},
"stats_monthly_sales" : {
"count" : 11,
"min" : 1.0,
"max" : 8.0,
"avg" : 2.909090909090909,
"sum" : 32.0
}
}
}
2、java API方式
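查询后解析结果,封装到List集合中。terms 聚合默认按 doc_count 降序返回分组,因此 Java 代码中没有再添加 bucket_sort;分组总数在名为 stats_bucket 的聚合结果的 count 中(下面的示例代码只解析了分组列表,未读取该值)。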
/**
 * Counts the documents of type {@code bill} created on 2022-01-19, grouped by
 * {@code createUser}, and returns one {@link StatisticsData} per group.
 *
 * <p>The request asks for no hits ({@code size = 0}, no {@code _source}); only the
 * aggregations are evaluated. Two aggregations are sent:
 * <ul>
 *   <li>{@code group_name} - a terms aggregation on {@code createUser}. No explicit order
 *       is set, so the buckets come back in the terms aggregation's default order
 *       (document count, descending).</li>
 *   <li>{@code stats_bucket} - a stats-bucket pipeline over {@code group_name>_count}.
 *       It is part of the request, but this method does not read its result.</li>
 * </ul>
 *
 * @return a list with the key and document count of every {@code group_name} bucket, in the
 *         order returned by Elasticsearch; an empty list when the response carries no
 *         {@code group_name} aggregation
 * @throws java.io.UncheckedIOException if the search request fails with an I/O error
 */
public List<StatisticsData> test() {
    // Both conditions are wrapped in a filter clause, so they restrict the result set
    // without contributing to scoring.
    BoolQueryBuilder conditions = QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("type", "bill"))
            .must(QueryBuilders.rangeQuery("createTime").gte("2022-01-19").lte("2022-01-19"));

    TermsAggregationBuilder groupByUser = AggregationBuilders
            .terms("group_name")
            .field("createUser")
            // Large bucket limit so that every user is returned, not only the top few.
            .size(999999);

    SearchSourceBuilder source = new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery().filter(conditions))
            .aggregation(new StatsBucketPipelineAggregationBuilder("stats_bucket", "group_name>_count"))
            .aggregation(groupByUser)
            .from(0)
            .size(0)
            .fetchSource(false);

    SearchRequest request = new SearchRequest(ElasticsearchConstant.EMSUSERBILLPROCESS_INDEX);
    request.source(source);

    SearchResponse response;
    try {
        response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    } catch (java.io.IOException e) {
        // search() declares the checked IOException; rethrow unchecked with the cause
        // preserved so the method signature stays unchanged.
        throw new java.io.UncheckedIOException("Elasticsearch search for per-user bill counts failed", e);
    }

    // Guard against a response without aggregations instead of failing with an NPE.
    Terms groups = response.getAggregations() == null
            ? null
            : response.getAggregations().get("group_name");
    if (groups == null) {
        return new java.util.ArrayList<>();
    }

    List<? extends Terms.Bucket> buckets = groups.getBuckets();
    return buckets.stream().map(bucket -> {
        StatisticsData item = new StatisticsData();
        item.setKey(String.valueOf(bucket.getKey()));
        item.setDoc_count(String.valueOf(bucket.getDocCount()));
        return item;
    }).collect(Collectors.toList());
}