Summary: Elasticsearch, Java
These are notes on Chapter 7 of 《Elasticsearch搜索引擎构建入门与实战》 (Elasticsearch Search Engine Building: Introduction and Practice).
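Contents
- Computing field statistics over selected documents (sum/mean/max/count)
- Filling missing values in statistics
- Grouped statistics (groupBy)
- Sorting aggregation results (order)
- Top N per group of aggregation results (window functions)
- Post-filtering aggregation results (having)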
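Computing field statistics over selected documents
(1) sum, max, min, value_count, stats
ES aggregations are declared with the aggs keyword, which sits at the top level of the DSL. Using sum as an example, the DSL that computes a statistic over the selected documents is: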
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"sum": {
"field": "model_1"
}
}
}
}
The response:
"hits" : {
"total" : 13,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"my_agg" : {
"value" : 2.240000009536743
}
}
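The request above is a GET to _search with size set to 0, so the response contains no documents. It names a custom aggregation my_agg, and the sum aggregation puts its result in my_agg's value property. Note that the returned sum is slightly off in its decimals even though every addend has exactly two decimal places. The other metrics, avg, max and min, work the same way; for counting, use value_count: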
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"value_count": {
"field": "model_1"
}
}
}
}
The response:
"aggregations" : {
"my_agg" : {
"value" : 4
}
}
Specifying stats in the aggregation returns a preset group of statistics (count, min, max, avg and sum) in one request:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"stats": {
"field": "model_1"
}
}
}
}
The response:
"aggregations" : {
"my_agg" : {
"count" : 4,
"min" : 0.0,
"max" : 0.8299999833106995,
"avg" : 0.5600000023841858,
"sum" : 2.240000009536743
}
}
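The max above, 0.8299999833106995, hints at where the decimal noise in sum comes from: it matches the value 0.83 takes when stored as a 32-bit float and widened to double. Assuming model_1 is mapped as float (the mapping is not shown in these notes), every value already carries that rounding error before it is summed. A minimal plain-Java sketch of the effect, with no Elasticsearch dependency; the class and method names are illustrative:

```java
// Plain-Java sketch. Assumption: model_1 is a "float" field, so each value is
// stored as a 32-bit float and widened to double when it is aggregated.
class FloatWideningDemo {

    // Accumulate float values in a double, as a double-based sum would.
    static double sumAsDouble(float[] values) {
        double sum = 0.0;
        for (float v : values) {
            sum += v; // float -> double widening keeps the float's binary rounding error
        }
        return sum;
    }

    public static void main(String[] args) {
        float stored = 0.83f;
        // Prints a value very close to, but not exactly, 0.83.
        System.out.println((double) stored);
        // The rounding error carries through into the sum.
        System.out.println(sumAsDouble(new float[] {0.83f, 0.83f}));
    }
}
```

If exact two-decimal results matter, mapping such a field as scaled_float or double, or rounding on the client, are common options; which one fits depends on the data.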
(2) Using aggregations with query
Combining an aggregation with a query restricts the aggregation to the documents the query matches:
GET /hotel/_doc/_search
{
"query": {
"term": {
"city": "HK"
}
},
"size": 0,
"aggs": {
"my_agg": {
"sum": {
"field": "model_1"
}
}
}
}
The aggregation above is computed only over the documents returned by the query.
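Conceptually, the query narrows the document set first and the aggregation then runs only over the matches, still skipping documents where the field is missing. A plain-Java sketch of that order of operations; the Hotel class, its fields and the sample values are hypothetical stand-ins for the city and model_1 fields:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of hotel documents (not the real index data).
class QueryThenAggregateDemo {

    static final class Hotel {
        final String city;
        final Double model1; // null models a document without model_1

        Hotel(String city, Double model1) {
            this.city = city;
            this.model1 = model1;
        }
    }

    // Same idea as: term query on city, then a sum aggregation on model_1.
    static double sumModel1ForCity(List<Hotel> hotels, String city) {
        double sum = 0.0;
        for (Hotel h : hotels) {
            // First the query filter, then skip documents missing the field.
            if (city.equals(h.city) && h.model1 != null) {
                sum += h.model1;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Hotel> hotels = Arrays.asList(
                new Hotel("HK", 0.5), new Hotel("HK", null), new Hotel("SZ", 0.75));
        System.out.println(sumModel1ForCity(hotels, "HK")); // 0.5
    }
}
```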
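(3) Using the Java client
The code below runs a query and then computes the sum. The aggregation is built at the same level as the query, with AggregationBuilders setting the aggregation type and field. To read the result, first call getAggregations to get the aggregations, then get the metric object by name, and finally read its value, following the three levels aggregations -> my_agg -> value: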
public Double getHotelSum() {
double res = 0.0;
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.termQuery("city", "HK"))
.aggregation(AggregationBuilders.sum("my_agg").field("model_1"));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Sum sum = aggregations.get("my_agg");
res = sum.getValue();
}
}
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
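Note that if the query matches no documents, or the aggregated field is missing in every matched document, the metric falls back to its initial value: Max returns -Infinity, Min returns Infinity, Sum returns 0 and Avg returns NaN. That is why the code checks the hit count with searchResponse.getHits().getTotalHits() first; this check catches the no-hits case but not a field that is missing in every hit. Next, the value_count example: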
public long getModel1Count() {
long res = 0;
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.count("my_agg").field("model_1"));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
Aggregations aggregations = searchResponse.getAggregations();
ValueCount count = aggregations.get("my_agg");
res = count.getValue();
}
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
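The fallback values listed above follow from how such metrics are typically accumulated: min starts at +Infinity, max at -Infinity, sum at 0, and avg is sum / count, which is 0 / 0 = NaN when nothing was counted. A plain-Java sketch of that behavior (an illustration, not Elasticsearch source code; the class and method names are made up). Guarding on the number of aggregated values, rather than on total hits, also covers documents that match but lack the field:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

// Illustrative accumulator; it mimics, but is not, Elasticsearch's internal code.
class MetricDefaultsDemo {
    long count = 0;
    double sum = 0.0;
    double min = Double.POSITIVE_INFINITY; // start value before any value is seen
    double max = Double.NEGATIVE_INFINITY;

    // Missing (null) values are skipped, as happens when "missing" is not set.
    static MetricDefaultsDemo of(List<Double> values) {
        MetricDefaultsDemo s = new MetricDefaultsDemo();
        for (Double v : values) {
            if (v == null) {
                continue;
            }
            s.count++;
            s.sum += v;
            s.min = Math.min(s.min, v);
            s.max = Math.max(s.max, v);
        }
        return s;
    }

    double avg() {
        return sum / count; // 0.0 / 0 is NaN when count == 0
    }

    // Guard on the number of aggregated values instead of the hit count.
    OptionalDouble safeAvg() {
        return count == 0 ? OptionalDouble.empty() : OptionalDouble.of(avg());
    }

    public static void main(String[] args) {
        MetricDefaultsDemo empty = of(Collections.<Double>emptyList());
        // Infinity -Infinity 0.0 NaN
        System.out.println(empty.min + " " + empty.max + " " + empty.sum + " " + empty.avg());
        MetricDefaultsDemo allMissing = of(Arrays.asList((Double) null, null));
        System.out.println(allMissing.safeAvg().isPresent()); // false
    }
}
```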
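Filling missing values in statistics
Missing values are skipped when statistics are computed; for example, an average does not include missing values in its denominator. ES lets you specify a fill value for missing fields at aggregation time, for example treating every document whose model_1 is missing as 0. First, the average without filling: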
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"avg": {
"field": "model_1"
}
}
}
}
Output:
"aggregations" : {
"my_agg" : {
"value" : 1.9825000315904617
}
}
Now add the fill logic:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"avg": {
"field": "model_1" ,
"missing": 0
}
}
}
}
The average drops because the filled zeros now count toward it:
"aggregations" : {
"my_agg" : {
"value" : 0.49562500789761543
}
}
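The two averages differ only in the denominator: without missing, documents lacking model_1 are left out entirely; with missing set to 0 they count as zeros. A plain-Java sketch with made-up values; the class and method names are illustrative:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of avg with and without a "missing" fill value (values are made up).
class MissingFillDemo {

    // Default behavior: documents without the field are skipped entirely.
    static double avgSkippingMissing(List<Double> values) {
        double sum = 0.0;
        long count = 0;
        for (Double v : values) {
            if (v != null) {
                sum += v;
                count++;
            }
        }
        return sum / count; // NaN if every value is missing
    }

    // With a fill value: each missing value is replaced by `fill` and counted.
    static double avgWithMissing(List<Double> values, double fill) {
        double sum = 0.0;
        for (Double v : values) {
            sum += (v != null) ? v : fill;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        List<Double> model1 = Arrays.asList(1.0, null, 3.0, null);
        System.out.println(avgSkippingMissing(model1));  // 2.0
        System.out.println(avgWithMissing(model1, 0.0)); // 1.0
    }
}
```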
In Java, the fill logic is added by setting the missing property on the AggregationBuilders object:
public Double getHotelAvg() {
double res = 0.0;
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.avg("my_agg").field("model_1").missing(0.0));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Avg avg = aggregations.get("my_agg");
res = avg.getValue();
}
}
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
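Grouped statistics (groupBy)
Besides aggregating over all documents, ES supports grouped statistics, i.e. groupBy on structured data, as well as more complex cross-tabulation (pivot).
The grouping field must take discrete values. keyword fields work directly; a continuous numeric field has to be binned (range) first, and the bins are then used as the grouping column.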
a. Setting the grouping field with terms
Set the field directly with terms, one level below the custom aggregation name. If no metric is specified, the per-group count is returned:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city"
}
}
}
}
The output is below. Again, missing values are not counted, and each group's count is in the doc_count field:
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "HK",
"doc_count" : 2
},
{
"key" : "SZ",
"doc_count" : 1
}
]
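Note that the doc_count values returned by ES are approximate and not guaranteed to be exact, because on a multi-shard index each shard reports only its own top terms. ES therefore also returns doc_count_error_upper_bound and sum_other_doc_count: the first is the maximum document count a term that did not make it into the returned list could have, and the second is the total number of documents in buckets that were not returned.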
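Why can doc_count be approximate? The sketch below is a simplified model (an assumption about the mechanism, not Elasticsearch source code): each shard returns only its top shard_size terms, and the coordinating node adds up whatever it received. With one term per shard, b comes out with a count of 3 although it occurs 5 times, because shard A never reported its two occurrences of b. Class and method names are made up:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Simplified model of shard-level terms counting followed by a merge.
class ShardTermsDemo {

    // Count the terms on one shard and keep the `shardSize` most frequent
    // (ties broken by key, ascending).
    static Map<String, Long> shardTop(List<String> shardDocs, int shardSize) {
        Map<String, Long> counts = new TreeMap<>();
        for (String term : shardDocs) {
            counts.merge(term, 1L, Long::sum);
        }
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(shardSize)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (x, y) -> x, LinkedHashMap::new));
    }

    // The "coordinating node" only sums what the shards reported.
    static Map<String, Long> merge(List<Map<String, Long>> shardResults) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> shard : shardResults) {
            shard.forEach((term, count) -> merged.merge(term, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> shardA = Arrays.asList("a", "a", "a", "b", "b");
        List<String> shardB = Arrays.asList("b", "b", "b", "c", "c");
        Map<String, Long> merged = merge(Arrays.asList(shardTop(shardA, 1), shardTop(shardB, 1)));
        System.out.println(merged); // {a=3, b=3}, although b really occurs 5 times
    }
}
```

In a real cluster, raising shard_size on the terms aggregation is the usual way to reduce this error, at the cost of more work per shard.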
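b. Setting the grouping with range
Bin a numeric field and use each bin as a group. from and to set the start and end of each bin; the start is included and the end is excluded (left-closed, right-open):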
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"range": {
"field": "degree3",
"ranges": [
{
"from": 1, "to":3
},
{
"from": 3
}
]
}
}
}
}
The response confirms the left-closed, right-open intervals, and the keys are generated automatically:
"aggregations" : {
"my_agg" : {
"buckets" : [
{
"key" : "1.0-3.0",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 2
},
{
"key" : "3.0-*",
"from" : 3.0,
"doc_count" : 3
}
]
}
}
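The bucketing rule fits in a few lines of code. A plain-Java sketch, assuming a null bound means unbounded and generating keys in the same from-to / * style as the response above (the key formatting is an imitation, not Elasticsearch code; the values are made up):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range bucketing: each range is [from, to), and null means unbounded.
class RangeBucketDemo {

    static final class Range {
        final Double from; // inclusive lower bound, null = no lower bound
        final Double to;   // exclusive upper bound, null = no upper bound

        Range(Double from, Double to) {
            this.from = from;
            this.to = to;
        }

        boolean contains(double v) {
            return (from == null || v >= from) && (to == null || v < to);
        }

        String key() {
            return (from == null ? "*" : from.toString()) + "-" + (to == null ? "*" : to.toString());
        }
    }

    // Counts values per range in declaration order; null (missing) values are skipped.
    // A value is counted in every range that contains it, so ranges may overlap.
    static Map<String, Long> bucket(List<Double> values, List<Range> ranges) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Range r : ranges) {
            long count = 0;
            for (Double v : values) {
                if (v != null && r.contains(v)) {
                    count++;
                }
            }
            result.put(r.key(), count);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> degree3 = Arrays.asList(1.0, 2.0, 3.0, 3.0, 4.0, null); // made-up values
        List<Range> ranges = Arrays.asList(new Range(1.0, 3.0), new Range(3.0, null));
        System.out.println(bucket(degree3, ranges)); // {1.0-3.0=2, 3.0-*=3}
    }
}
```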
You can also name the group keys yourself:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"range": {
"field": "degree3",
"ranges": [
{
"from": 1, "to":3, "key": "low_degree"
},
{
"from": 3,"key": "high_degree"
}
]
}
}
}
}
The response carries the custom keys:
"aggregations" : {
"my_agg" : {
"buckets" : [
{
"key" : "low_degree",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 2
},
{
"key" : "high_degree",
"from" : 3.0,
"doc_count" : 3
}
]
}
}
c. Setting per-group aggregation metrics
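All of the above count documents with the default doc_count. Grouping can also be combined with other metrics: at the same level as the grouping clause (terms, range), add another aggs block that sets the field, the aggregation type, missing-value handling and so on. Using the average as an example, the DSL is: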
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"range": {
"field": "degree3",
"ranges": [
{
"from": 1, "to":3, "key": "low_degree"
},
{
"from": 3,"key": "high_degree"
}
]
},
"aggs": {
"my_avg": {
"avg": {
"field": "price"
}
}
}
}
}
}
This groups by degree3 and averages price within each group. The response:
"aggregations" : {
"my_agg" : {
"buckets" : [
{
"key" : "low_degree",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 2,
"my_avg" : {
"value" : 1514.0
}
},
{
"key" : "high_degree",
"from" : 3.0,
"doc_count" : 3,
"my_avg" : {
"value" : 1486990.6666666667
}
}
]
}
}
d. Java client code for groupBy statistics
The terms grouping, the range grouping, and grouping with a metric, in turn:
public void getTermsBucket() {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.terms("my_agg").field("degree3"));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("my_agg");
for (Terms.Bucket bucket : terms.getBuckets()) {
String bucketKey = bucket.getKeyAsString();
long docCount = bucket.getDocCount();
System.out.println(bucketKey + ":" + docCount);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
Output:
3:2
1:1
2:1
4:1
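For range grouping in Java, use addRange with RangeAggregator.Range to set each bucket's start and end. The three constructor arguments are the key name, the start and the end; pass null for an open side. The bounds must be Double values: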
public void getRangeBucket() {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.range("my_agg").field("degree3")
.addRange(new RangeAggregator.Range("low_degree", null, 3d))
.addRange(new RangeAggregator.Range("high_degree", 3d, null)));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Range range = aggregations.get("my_agg");
for (Range.Bucket bucket : range.getBuckets()) {
String bucketKey = bucket.getKeyAsString();
long docCount = bucket.getDocCount();
System.out.println(bucketKey + ":" + docCount);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
Output:
low_degree:2
high_degree:3
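When grouping with a metric, call subAggregation after the grouping to create the sub-aggregation, giving its name, field and aggregation type. When reading the response, call getAggregations on each bucket to get the value:
public void getRangeBucketAvg() {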
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.range("my_agg").field("degree3")
.addRange(new RangeAggregator.Range("low_degree", null, 3d))
.addRange(new RangeAggregator.Range("high_degree", 3d, null))
.subAggregation(AggregationBuilders.avg("my_avg").field("price")));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Range range = aggregations.get("my_agg");
for (Range.Bucket bucket : range.getBuckets()) {
String bucketKey = bucket.getKeyAsString();
Avg avg = bucket.getAggregations().get("my_avg");
double value = avg.getValue();
System.out.println(bucketKey + ":" + value);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
Output:
low_degree:1514.0
high_degree:1486990.6666666667
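What the sub-aggregation computes per bucket can be written as a plain grouped average. A minimal sketch, assuming each document's bucket key has already been determined (the range lookup is left out for brevity); the Doc class and the prices are hypothetical:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogue of a bucket aggregation with an avg sub-aggregation.
class BucketAvgDemo {

    static final class Doc {
        final String bucket; // group key, e.g. "low_degree" or "high_degree"
        final Double price;  // null models a document without price

        Doc(String bucket, Double price) {
            this.bucket = bucket;
            this.price = price;
        }
    }

    // Average the non-null prices per bucket; a bucket whose prices are all missing gets NaN.
    static Map<String, Double> avgPerBucket(List<Doc> docs) {
        Map<String, double[]> acc = new LinkedHashMap<>(); // bucket -> {sum, count}
        for (Doc d : docs) {
            double[] sc = acc.computeIfAbsent(d.bucket, k -> new double[2]);
            if (d.price != null) {
                sc[0] += d.price;
                sc[1] += 1;
            }
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(
                new Doc("low_degree", 100.0), new Doc("low_degree", 200.0),
                new Doc("high_degree", 300.0), new Doc("high_degree", null));
        System.out.println(avgPerBucket(docs)); // {low_degree=150.0, high_degree=300.0}
    }
}
```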
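Truncating and sorting aggregation results (order by)
After aggregating, the buckets can be sorted by their key or by a result value. By default they are sorted by doc_count in descending order; setting _count produces the same order explicitly. The order clause goes inside the grouping clause (terms):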
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city",
"order":{
"_count": "desc"
}
}
}
}
}
You can also sort by _key, i.e. by the group key:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city",
"order":{
"_key": "desc"
}
}
}
}
}
The most common need is sorting by a metric value; to do that, put the custom aggregation name in order:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city",
"order":{
"my_avg": "desc"
}
},
"aggs": {
"my_avg": {
"avg": {
"field": "price"
}
}
}
}
}
}
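The three orderings map naturally onto comparators. A plain-Java sketch with a hypothetical Bucket class and made-up numbers; the key-ascending tie-break for _count is a choice made in this sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of ordering buckets by _count, _key, or a sub-aggregation value.
class BucketOrderDemo {

    static final class Bucket {
        final String key;
        final long docCount;
        final double myAvg; // value of the "my_avg" sub-aggregation

        Bucket(String key, long docCount, double myAvg) {
            this.key = key;
            this.docCount = docCount;
            this.myAvg = myAvg;
        }
    }

    // "_count": "desc"; ties broken by key ascending in this sketch.
    static final Comparator<Bucket> BY_COUNT_DESC =
            Comparator.comparingLong((Bucket b) -> b.docCount).reversed()
                    .thenComparing((Bucket b) -> b.key);

    // "_key": "desc"
    static final Comparator<Bucket> BY_KEY_DESC =
            Comparator.comparing((Bucket b) -> b.key).reversed();

    // "my_avg": "desc"
    static final Comparator<Bucket> BY_MY_AVG_DESC =
            Comparator.comparingDouble((Bucket b) -> b.myAvg).reversed();

    // Returns the bucket keys in the requested order without mutating the input.
    static List<String> orderedKeys(List<Bucket> buckets, Comparator<Bucket> order) {
        List<Bucket> sorted = new ArrayList<>(buckets);
        sorted.sort(order);
        List<String> keys = new ArrayList<>();
        for (Bucket b : sorted) {
            keys.add(b.key);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Bucket> buckets = Arrays.asList(
                new Bucket("HK", 2, 300.0), new Bucket("SZ", 1, 500.0), new Bucket("BJ", 2, 100.0));
        System.out.println(orderedKeys(buckets, BY_COUNT_DESC));  // [BJ, HK, SZ]
        System.out.println(orderedKeys(buckets, BY_KEY_DESC));    // [SZ, HK, BJ]
        System.out.println(orderedKeys(buckets, BY_MY_AVG_DESC)); // [SZ, HK, BJ]
    }
}
```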
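Java example: call order on the terms aggregation (not on the sub-aggregation) and sort by the custom metric in descending order with BucketOrder.aggregation("my_avg", false):
public void getTermsBucketOrderByAvg() {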
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.terms("my_agg").field("degree3")
.subAggregation(AggregationBuilders.avg("my_avg").field("price"))
.order(BucketOrder.aggregation("my_avg", false)));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("my_agg");
for (Terms.Bucket bucket : terms.getBuckets()) {
String bucketKey = bucket.getKeyAsString();
Avg avg = bucket.getAggregations().get("my_avg");
double value = avg.getValue();
System.out.println(bucketKey + ":" + value);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
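Top N per group of aggregation results (window functions)
Aggregations can do something similar to a window function, such as taking the top 1 per group. In this example we want, for each security (security_code), the full record with the most recent date. First, insert some data: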
PUT /stock
POST /stock/_doc/_mapping
{
"properties": {
"security_code": {"type": "keyword"},
"stock_price": {"type": "double"},
"date": {"type": "date", "format": "yyyy-MM-dd"}
}
}
POST /_bulk
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300124","stock_price":3.14,"date":"2021-01-01"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300124","stock_price":9.14,"date":"2021-01-02"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300124","stock_price":4.14,"date":"2021-01-03"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"002334","stock_price":2.97,"date":"2021-01-02"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"002334","stock_price":3.54,"date":"2021-01-03"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"002334","stock_price":7.84,"date":"2021-01-04"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300198","stock_price":9.26,"date":"2021-01-02"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300198","stock_price":3.14,"date":"2021-01-01"}
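The query below fetches the latest price record. In the sub-aggregation, give top_hits a sort condition and a size (the number of hits to return); here each group returns its top 1: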
GET /stock/_doc/_search
{
"size": 0,
"aggs": {
"type": {
"terms": {
"field": "security_code"
},"aggs": {
"latest_price": {
"top_hits": {
"sort": [{
"date": {"order": "desc"}
}],
"size": 1
}
}
}
}
}
}
The output:
"buckets" : [
{
"key" : "002334",
"doc_count" : 3,
"latest_price" : {
"hits" : {
"total" : 3,
"max_score" : null,
"hits" : [
{
"_index" : "stock",
"_type" : "_doc",
"_id" : "8-d9HIABDkVv6XsnfJUi",
"_score" : null,
"_source" : {
"security_code" : "002334",
"stock_price" : 7.84,
"date" : "2021-01-04"
},
"sort" : [
1609718400000
]
}
]
}
}
},
{
"key" : "300124",
"doc_count" : 3,
"latest_price" : {
"hits" : {
"total" : 3,
"max_score" : null,
"hits" : [
{
"_index" : "stock",
"_type" : "_doc",
"_id" : "8Od9HIABDkVv6XsnfJUi",
"_score" : null,
"_source" : {
"security_code" : "300124",
"stock_price" : 4.14,
"date" : "2021-01-03"
},
"sort" : [
1609632000000
]
}
]
}
}
},
{
"key" : "300198",
"doc_count" : 2,
"latest_price" : {
"hits" : {
"total" : 2,
"max_score" : null,
"hits" : [
{
"_index" : "stock",
"_type" : "_doc",
"_id" : "9Od9HIABDkVv6XsnfJUi",
"_score" : null,
"_source" : {
"security_code" : "300198",
"stock_price" : 9.26,
"date" : "2021-01-02"
},
"sort" : [
1609545600000
]
}
]
}
}
}
]
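In SQL, the same idea is ROW_NUMBER() OVER (PARTITION BY security_code ORDER BY date DESC) <= N. A plain-Java sketch of that per-group top N over the rows inserted by the _bulk request above; the Quote class and method names are made up, and dates are compared as yyyy-MM-dd strings, which sort in date order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analogue of terms(security_code) + top_hits(sort date desc, size n).
class TopNPerGroupDemo {

    static final class Quote {
        final String securityCode;
        final double stockPrice;
        final String date; // yyyy-MM-dd, so string order equals date order

        Quote(String securityCode, double stockPrice, String date) {
            this.securityCode = securityCode;
            this.stockPrice = stockPrice;
            this.date = date;
        }
    }

    // The same eight rows as the _bulk request.
    static List<Quote> sampleQuotes() {
        return Arrays.asList(
                new Quote("300124", 3.14, "2021-01-01"), new Quote("300124", 9.14, "2021-01-02"),
                new Quote("300124", 4.14, "2021-01-03"), new Quote("002334", 2.97, "2021-01-02"),
                new Quote("002334", 3.54, "2021-01-03"), new Quote("002334", 7.84, "2021-01-04"),
                new Quote("300198", 9.26, "2021-01-02"), new Quote("300198", 3.14, "2021-01-01"));
    }

    // Group by security code, sort each group newest first, keep the first n.
    static Map<String, List<Quote>> topNPerCode(List<Quote> quotes, int n) {
        Map<String, List<Quote>> groups = new TreeMap<>();
        for (Quote q : quotes) {
            groups.computeIfAbsent(q.securityCode, k -> new ArrayList<>()).add(q);
        }
        Map<String, List<Quote>> result = new TreeMap<>();
        for (Map.Entry<String, List<Quote>> e : groups.entrySet()) {
            List<Quote> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.comparing((Quote q) -> q.date).reversed()); // newest first
            result.put(e.getKey(), new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, List<Quote>> e : topNPerCode(sampleQuotes(), 1).entrySet()) {
            Quote q = e.getValue().get(0);
            // 002334 2021-01-04 7.84, then 300124 2021-01-03 4.14, then 300198 2021-01-02 9.26
            System.out.println(e.getKey() + " " + q.date + " " + q.stockPrice);
        }
    }
}
```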
The Java client implementation:
public void getLatestStockPrice() {
SearchRequest searchRequest = new SearchRequest("stock");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.terms("my_agg").field("security_code")
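// sort("date") alone sorts ascending; SortOrder.DESC (org.elasticsearch.search.sort.SortOrder) puts the latest record first
.subAggregation(AggregationBuilders.topHits("latest_price").sort("date", SortOrder.DESC).size(1)));
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (searchResponse.status() == RestStatus.OK) {
if (searchResponse.getHits().getTotalHits() != 0) {
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("my_agg");
for (Terms.Bucket bucket : terms.getBuckets()) {
String bucketKey = bucket.getKeyAsString();
TopHits topHits = bucket.getAggregations().get("latest_price");
topHits.getHits().forEach(s -> System.out.println(s.getSourceAsMap()));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
Printed output. Note that these lines were produced with .sort("date") and no explicit order, which sorts ascending, so each group printed its earliest record instead of the latest:
{date=2021-01-02, security_code=002334, stock_price=2.97}
{date=2021-01-01, security_code=300124, stock_price=3.14}
{date=2021-01-01, security_code=300198, stock_price=3.14}
With SortOrder.DESC, each group should return the same record as the DSL response above: 2021-01-04 for 002334, 2021-01-03 for 300124 and 2021-01-02 for 300198.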