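Elasticsearch: group documents, sort the groups by document count, and get the number of groups, using both the DSL and the Java API
Solution:
Example: in a bill table, count the bills each person submitted on 2022-01-19, sort the counts from high to low, and also get the total number of people who submitted at least one bill.
The SQL we want to reproduce: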
select createUser, count(Id) as c
from userbill
where type = 'bill'
  and createTime >= '2022-01-19'
  and createTime <= '2022-01-19'
group by createUser
order by c desc
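To make the target semantics concrete before turning to Elasticsearch, here is a minimal plain-Java sketch of what the query is meant to compute: count per createUser, sorted from high to low, plus the number of distinct users. The class name GroupCountSketch, the method countByUser, and the sample user IDs are made up for illustration and are not part of the original solution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration: in-memory equivalent of
// "group by createUser order by count desc", plus the number of groups.
class GroupCountSketch {

    // Counts bills per user and returns the groups sorted by count, highest first.
    // Ties keep first-seen order because the sort is stable.
    static List<Map.Entry<String, Long>> countByUser(List<String> createUsers) {
        Map<String, Long> counts = createUsers.stream()
                .collect(Collectors.groupingBy(u -> u, LinkedHashMap::new, Collectors.counting()));
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return sorted;
    }

    public static void main(String[] args) {
        // Made-up sample: one createUser value per bill submitted that day.
        List<String> createUsers = Arrays.asList("u1", "u2", "u1", "u3", "u2", "u1");
        List<Map.Entry<String, Long>> groups = countByUser(createUsers);
        groups.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
        // The number of groups is the number of people who submitted a bill.
        System.out.println("distinct users: " + groups.size());
    }
}
```

The two approaches below push this work to Elasticsearch: the terms aggregation does the grouping and counting, and a pipeline aggregation reports how many groups there are.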
1. DSL approach:
GET /userbill/_search
{
  "from": 0,
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "bool": {
            "must": [
              { "term": { "type": { "value": "bill", "boost": 1 } } },
              {
                "range": {
                  "createTime": {
                    "from": "2022-01-19",
                    "to": "2022-01-19",
                    "include_lower": true,
                    "include_upper": true,
                    "boost": 1
                  }
                }
              }
            ],
            "adjust_pure_negative": true,
            "boost": 1
          }
        }
      ]
    }
  },
  "_source": false,
  "stored_fields": "_none_",
  "aggs": {
    "group_name": {
      "terms": { "field": "createUser", "size": 999999 },
      "aggs": {
        "bucket_field": {
          "bucket_sort": { "sort": [ { "_count": { "order": "desc" } } ] }
        }
      }
    },
    "stats_monthly_sales": {
      "stats_bucket": { "buckets_path": "group_name>_count" }
    }
  }
}
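Query result
In each bucket, key is the user ID of the group and doc_count is the number of bills for that user. The count field inside stats_monthly_sales is the number of groups.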
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 },
  "hits" : { "total" : 32, "max_score" : 0.0, "hits" : [ ] },
  "aggregations" : {
    "group_name" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        { "key" : 115944, "doc_count" : 8 },
        { "key" : 967145, "doc_count" : 7 },
        { "key" : 917175, "doc_count" : 4 },
        { "key" : 937800, "doc_count" : 3 },
        { "key" : 888831, "doc_count" : 2 },
        { "key" : 963198, "doc_count" : 2 },
        { "key" : 88896565, "doc_count" : 2 },
        { "key" : 381480, "doc_count" : 1 },
        { "key" : 918555, "doc_count" : 1 },
        { "key" : 1002454, "doc_count" : 1 },
        { "key" : 88895739, "doc_count" : 1 }
      ]
    },
    "stats_monthly_sales" : {
      "count" : 11,
      "min" : 1.0,
      "max" : 8.0,
      "avg" : 2.909090909090909,
      "sum" : 32.0
    }
  }
}
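As a sanity check on how to read stats_monthly_sales, the figures can be recomputed from the doc_count values in the buckets. The following is only a sketch in plain Java (the class name BucketStatsSketch and the method statsOf are hypothetical); it shows that count is the number of buckets, that is the number of users, while sum is the total number of matching bills.

```java
import java.util.LongSummaryStatistics;
import java.util.stream.LongStream;

// Hypothetical check: recompute the stats_bucket figures from the
// doc_count values listed in the sample response.
class BucketStatsSketch {

    // Summarises a list of per-bucket document counts.
    static LongSummaryStatistics statsOf(long... docCounts) {
        return LongStream.of(docCounts).summaryStatistics();
    }

    public static void main(String[] args) {
        // doc_count values copied from the 11 buckets in the response.
        LongSummaryStatistics s = statsOf(8, 7, 4, 3, 2, 2, 2, 1, 1, 1, 1);
        System.out.println("count = " + s.getCount());   // number of groups: 11
        System.out.println("sum   = " + s.getSum());     // total bills: 32, same as hits.total
        System.out.println("min   = " + s.getMin());
        System.out.println("max   = " + s.getMax());
        System.out.println("avg   = " + s.getAverage());
    }
}
```

Because stats_bucket only sees the buckets that the terms aggregation returns, its count equals the number of distinct users only when the terms size is large enough to return every bucket, as in the request above.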
2. Java API approach
Run the query, parse the result, and wrap the buckets in a List.
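import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
// Package as in the 7.x client; this class may live in a different package in other client versions.
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// restHighLevelClient, ElasticsearchConstant and StatisticsData come from the surrounding project.
public List<StatisticsData> test() throws IOException {
    SearchSourceBuilder builder = new SearchSourceBuilder();

    // Filter: type = 'bill' and createTime on 2022-01-19.
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    boolQueryBuilder.must(QueryBuilders.termQuery("type", "bill"));
    boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte("2022-01-19").lte("2022-01-19"));
    queryBuilder.filter(boolQueryBuilder);
    builder.query(queryBuilder);

    // Group by createUser. Terms buckets are ordered by doc_count descending by default,
    // so no explicit bucket_sort is added here.
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders
            .terms("group_name")
            .field("createUser")
            .size(999999);

    // Sibling pipeline aggregation over the bucket counts; its count is the number of groups.
    // It is returned under the name "stats_bucket" in response.getAggregations().
    builder.aggregation(new StatsBucketPipelineAggregationBuilder("stats_bucket", "group_name>_count"));
    builder.aggregation(termsAggregationBuilder);

    // Only aggregations are needed, so return no hits and no _source.
    builder.from(0);
    builder.size(0);
    builder.fetchSource(false);

    SearchRequest request = new SearchRequest(ElasticsearchConstant.EMSUSERBILLPROCESS_INDEX);
    request.source(builder);
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

    // Convert each terms bucket into a StatisticsData item.
    ParsedTerms parsedTerms = (ParsedTerms) response.getAggregations().asMap().get("group_name");
    List<Terms.Bucket> buckets = (List<Terms.Bucket>) parsedTerms.getBuckets();
    List<StatisticsData> li = buckets.stream().map(u -> {
        StatisticsData s = new StatisticsData();
        s.setKey(String.valueOf(u.getKey()));
        s.setDoc_count(String.valueOf(u.getDocCount()));
        return s;
    }).collect(Collectors.toList());
    return li;
}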
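The method above relies on a StatisticsData class that the original does not show. A minimal sketch, assuming it is a plain holder with the two setters used above; the getters, the toString method, and the small main method are assumptions added for illustration.

```java
// Hypothetical holder matching the setters called in the method above.
class StatisticsData {
    private String key;        // bucket key, i.e. the createUser value
    private String doc_count;  // number of bills in the bucket, kept as a String as in the original

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public String getDoc_count() { return doc_count; }
    public void setDoc_count(String doc_count) { this.doc_count = doc_count; }

    @Override
    public String toString() { return key + "=" + doc_count; }

    public static void main(String[] args) {
        // Values taken from the first bucket of the sample response.
        StatisticsData s = new StatisticsData();
        s.setKey("115944");
        s.setDoc_count("8");
        System.out.println(s);
    }
}
```

Since the terms aggregation is asked for up to 999999 buckets, the size of the returned list also gives the number of groups whenever there are fewer distinct users than that.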