Contents
1. Integrating ES with Spring
2. How to use normal queries
3. How to use aggregation queries
4. Differences between normal queries and aggregation queries
Integrating ES with Spring
1. Service-layer interface
public interface EsClient {
public TransportClient getClient();
public QueryBuilder getQueryCondition(final String name, final String value);
}
2. Implementation class (serviceimpl layer)
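// imports assume the Elasticsearch 5.x transport client and commons-lang3
import java.net.InetAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service("esClient")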
public class EsClientImpl implements EsClient{
@Value("${es.clustername}")
private String clusterName;
@Value("${es.port}")
private String port;
@Value("${es.hosts}")
private String hosts;
private TransportClient transportClient = null;
/**
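* Condition splitter: multiple values are separated by commas and combined with OR
*
* @param name field name
* @param value query value (or comma-separated values)
* @return a term query, or a bool query whose should-clauses require at least one match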
*/
public QueryBuilder getQueryCondition(final String name, final String value) {
if (value.contains(",")) {
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
for (final String v : value.split(",")) {
boolQuery.should(QueryBuilders.termQuery(name, v));
}
return boolQuery;
} else {
return QueryBuilders.termQuery(name, value);
}
}
@PostConstruct
public void init() throws Exception {
connectEs();
}
@PreDestroy
public void destroy() {
disConnectEs();
}
private void disConnectEs() {
if (null != transportClient) {
transportClient.close();
}
}
private void connectEs() throws Exception {
Settings settings = Settings.builder().put("cluster.name", clusterName).build();
transportClient = new PreBuiltTransportClient(settings);
if (StringUtils.isNotBlank(hosts)) {
String[] hostArray = hosts.split(",");
for (String host : hostArray) {
InetSocketTransportAddress ist = new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port));
transportClient.addTransportAddress(ist);
}
}
}
public TransportClient getClient() {
return this.transportClient;
}
}
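To make the semantics of getQueryCondition concrete, here is a minimal in-memory sketch in plain Java (no Elasticsearch dependency). The class ConditionSplitterModel and its matches method are hypothetical names for illustration only; the sketch assumes the queried field is a keyword (not analyzed) field, so a term query means an exact match.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical in-memory model, not Elasticsearch code: it mimics what
// getQueryCondition asks ES to do, assuming `name` is a keyword field.
final class ConditionSplitterModel {
    private ConditionSplitterModel() {}

    static boolean matches(Map<String, String> doc, String name, String value) {
        String actual = doc.get(name);
        if (actual == null) {
            return false; // a term query does not match a document without the field
        }
        if (value.contains(",")) {
            // bool + should + minimumShouldMatch(1): any one of the values may match (OR)
            return Arrays.stream(value.split(",")).anyMatch(actual::equals);
        }
        // single termQuery: exact match on the whole value
        return actual.equals(value);
    }
}
```

So passing "10,11,12" as the value behaves like `province IN ('10','11','12')`, while a single value behaves like `province = '11'`.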
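3. Property injection (properties can live in several kinds of files; storing them in the pom, as below, is just one option). Note that the @Value keys above (es.clustername, es.port, es.hosts) differ from these Maven property names, so presumably a filtered properties file maps one to the other.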
<!--es-->
<sink.es.clustername>dolphin</sink.es.clustername>
<sink.es.port>9300</sink.es.port>
<sink.es.hosts>172.20.78.56,172.20.78.57,172.20.78.58</sink.es.hosts>
<sink.es.index>sinkreport</sink.es.index>
<sink.es.type>mg_sink_user_d</sink.es.type>
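connectEs splits the comma-separated host list and pairs every host with the single configured port. The following stdlib-only sketch shows that parsing step with the sample values above; EsHostParser is a hypothetical name, it uses java.net.InetSocketAddress.createUnresolved instead of the ES InetSocketTransportAddress so it runs without DNS or an ES client, and, unlike the original, it trims whitespace around each host.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the host/port loop in connectEs (stdlib only).
final class EsHostParser {
    private EsHostParser() {}

    static List<InetSocketAddress> parse(String hosts, String port) {
        List<InetSocketAddress> result = new ArrayList<>();
        if (hosts == null || hosts.trim().isEmpty()) {
            return result; // mirrors the StringUtils.isNotBlank(hosts) guard
        }
        int p = Integer.parseInt(port.trim()); // one port shared by all hosts
        for (String host : hosts.split(",")) {
            String h = host.trim();
            if (!h.isEmpty()) {
                // createUnresolved skips the DNS lookup, so this runs offline
                result.add(InetSocketAddress.createUnresolved(h, p));
            }
        }
        return result;
    }
}
```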
Normal queries
1. The query in the ES module
public SearchResponse queryFlowInspireActiveChart(CommonQueryParam commonQueryParam) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (StringUtils.isNotEmpty(commonQueryParam.getProvince())) {
boolQueryBuilder.must(QueryBuilders.termQuery("province", commonQueryParam.getProvince()));
}
if (StringUtils.isNotEmpty(commonQueryParam.getCity())) {
boolQueryBuilder.must(QueryBuilders.termQuery("city", commonQueryParam.getCity()));
}
boolQueryBuilder.must(QueryBuilders.termQuery("content_id", "c0001"));
if (StringUtils.isNotEmpty(commonQueryParam.getStart()) && StringUtils.isNotEmpty(commonQueryParam.getEnd())) {
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("dayid");
rangeQuery.from(commonQueryParam.getStart());
rangeQuery.to(commonQueryParam.getEnd());
boolQueryBuilder.must(rangeQuery);
}
SearchRequestBuilder searchRequestBuilder = esClient.getClient().prepareSearch(index)
.setTypes(typeName).setSize(10000)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(boolQueryBuilder)
.addSort("dayid",SortOrder.ASC);//normal query: sort by dayid in ascending order
//print the ES query (DSL) for debugging
LOGGER.debug(searchRequestBuilder.toString());
SearchResponse response = searchRequestBuilder.execute().actionGet();
return response;
}
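The bool query above combines its must clauses with AND, applies an inclusive date range only when both bounds are present, sorts by dayid ascending and caps the result at size hits. A minimal in-memory model of those semantics, assuming dayid is a yyyyMMdd string so that string comparison matches date order, could look like this (NormalQueryModel is a hypothetical name; this is not how ES executes the query):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of the bool/must + range + sort + size query above.
final class NormalQueryModel {
    private NormalQueryModel() {}

    static List<Map<String, String>> search(List<Map<String, String>> docs,
                                            Map<String, String> mustTerms,
                                            String startDay, String endDay, int size) {
        return docs.stream()
                // every must clause has to match: logical AND
                .filter(d -> mustTerms.entrySet().stream()
                        .allMatch(e -> e.getValue().equals(d.get(e.getKey()))))
                // range applied only when both bounds are given; from/to are inclusive
                .filter(d -> {
                    if (startDay == null || endDay == null) {
                        return true;
                    }
                    String day = d.get("dayid");
                    return day != null && day.compareTo(startDay) >= 0 && day.compareTo(endDay) <= 0;
                })
                // addSort("dayid", SortOrder.ASC); documents without dayid go last
                .sorted(Comparator.comparing((Map<String, String> d) -> d.get("dayid"),
                        Comparator.nullsLast(Comparator.<String>naturalOrder())))
                // setSize(size): at most `size` hits
                .limit(size)
                .collect(Collectors.toList());
    }
}
```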
2. The business layer takes the data returned by the ES layer and applies the business logic
SearchHits hits = response.getHits();
Iterator<SearchHit> it = hits.iterator();
while(it.hasNext()){
Map<String,Object> map = it.next().getSource();
//date
String dayid = map.get("dayid").toString();
//daily new mobile users
String userNewActive = map.get("user_new_active").toString();
//daily existing mobile users = daily active mobile users - daily new mobile users
int resultNumber = Integer.parseInt(map.get("user_active").toString()) - Integer.parseInt(userNewActive);
}
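The existing-user calculation depends on reading numeric values out of a hit's _source map, where the values may arrive as numbers or as strings depending on how the documents were indexed. A small helper, assuming the fields hold integer values (HitMetrics is a hypothetical name), keeps that conversion in one place and uses long to avoid int overflow:

```java
import java.util.Map;

// Hypothetical helper for numeric fields of a hit's _source map.
final class HitMetrics {
    private HitMetrics() {}

    /** Reads an integer-valued field that may be stored as a number or a string. */
    static long longField(Map<String, Object> source, String field) {
        Object v = source.get(field);
        if (v == null) {
            throw new IllegalArgumentException("missing field: " + field);
        }
        // go through the string form so Integer, Long and "123" all work
        return Long.parseLong(v.toString());
    }

    /** Daily existing users = daily active users - daily new users. */
    static long stockUsers(Map<String, Object> source) {
        return longField(source, "user_active") - longField(source, "user_new_active");
    }
}
```

Inside the loop this would be called as HitMetrics.stockUsers(it.next().getSource()).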
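3. Brief summary
Normal queries are simple: every condition, such as a date range or a province/city filter, is just added to boolQueryBuilder with must.
To limit the number of returned documents (size/paging), chain the setting directly onto esClient.getClient().prepareSearch(index)
.setTypes(typeName).setSize(10000); here the query returns at most 10000 documents. For paging, add setFrom(offset) as well. By default, from + size may not exceed the index setting index.max_result_window (10000).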
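For page-based access, from and size can be derived from a 1-based page number. This sketch (PageWindow is a hypothetical name) assumes the index keeps the default index.max_result_window of 10000 and rejects pages that would exceed it instead of letting ES fail the request:

```java
// Hypothetical helper computing from/size for 1-based paging.
final class PageWindow {
    /** Default value of the index setting index.max_result_window. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    final int from;
    final int size;

    private PageWindow(int from, int size) {
        this.from = from;
        this.size = size;
    }

    static PageWindow of(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        long from = (long) (page - 1) * pageSize; // long avoids int overflow
        if (from + pageSize > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds index.max_result_window");
        }
        return new PageWindow((int) from, pageSize);
    }
}
```

The result would then be applied as searchRequestBuilder.setFrom(w.from).setSize(w.size).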
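Aggregation queries
In ES, aggregations and normal queries are separate things, and their results are read in different ways (from the aggregations section vs. the hits section of the response), so one cannot stand in for the other. When you limit the data for an aggregation, put the limit inside the aggregation itself and turn off the normal size.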
1. ES aggregation query
public SearchResponse queryFlowInspireTopTenDataByProvince(
CommonQueryParam commonQueryParam) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (StringUtils.isNotEmpty(commonQueryParam.getProvince())) {
boolQueryBuilder.must(QueryBuilders.termQuery("province", commonQueryParam.getProvince()));
}
//province-level content: exclude city=0001
if (StringUtils.isNotEmpty(commonQueryParam.getCity())) {
if("0001".equals(commonQueryParam.getCity())){
boolQueryBuilder.mustNot(QueryBuilders.termQuery("city", "0001"));
}
}
boolQueryBuilder.mustNot(QueryBuilders.termQuery("content_id", "c0001"));
boolQueryBuilder.mustNot(QueryBuilders.termQuery("content_name", "ignore"));
boolQueryBuilder.mustNot(QueryBuilders.termQuery("level1_name", "ignore"));
//exclude abnormal data marked -2
boolQueryBuilder.mustNot(QueryBuilders.termQuery("flow", "-2"));
boolQueryBuilder.mustNot(QueryBuilders.termQuery("flow_mobile", "-2"));
boolQueryBuilder.mustNot(QueryBuilders.termQuery("flow_wifi", "-2"));
boolQueryBuilder.mustNot(QueryBuilders.termQuery("avg_flow_mobile", "-2"));
if (StringUtils.isNotEmpty(commonQueryParam.getStart()) && StringUtils.isNotEmpty(commonQueryParam.getEnd())) {
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("dayid");
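rangeQuery.from(commonQueryParam.getEnd());//both bounds use getEnd(), so only the end day is queried; use getStart() here if the whole range is intended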
rangeQuery.to(commonQueryParam.getEnd());
boolQueryBuilder.must(rangeQuery);
}
SearchRequestBuilder searchRequestBuilder = esClient.getClient().prepareSearch(index)
.setTypes(typeName).setSize(0)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(boolQueryBuilder);
//order: sort the buckets of the terms aggregation named contentName by their summed flow, descending
//to sort only by the bucket key (contentName) instead of the sum, replace the order argument with Terms.Order.term(true)
searchRequestBuilder.addAggregation(AggregationBuilders.terms("contentName").field("content_name").size(2147483647).order(Terms.Order.aggregation("flow", false))
.subAggregation(AggregationBuilders.sum("flow").field("flow"))
.subAggregation(AggregationBuilders.sum("flowMobile").field("flow_mobile"))
.subAggregation(AggregationBuilders.sum("flowWifi").field("flow_wifi"))
.subAggregation(AggregationBuilders.sum("avgFlowMobile").field("avg_flow_mobile"))
);
LOGGER.debug(searchRequestBuilder.toString());
SearchResponse response = searchRequestBuilder.execute().actionGet();
return response;
}
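Conceptually, the aggregation above is a GROUP BY content_name with four SUM columns, ordered by SUM(flow) descending. The in-memory sketch below (TermsSumModel is a hypothetical name, and this is a model of the result, not of how ES computes it across shards) shows what each bucket ends up holding:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of terms("contentName") with sum(...) sub-aggregations.
final class TermsSumModel {
    /** One bucket: the group-by key plus its summed metrics. */
    static final class Bucket {
        final String key;
        double flow, flowMobile, flowWifi, avgFlowMobile;

        Bucket(String key) {
            this.key = key;
        }
    }

    static List<Bucket> aggregate(List<Map<String, Object>> docs) {
        Map<String, Bucket> byKey = new LinkedHashMap<>();
        for (Map<String, Object> d : docs) {
            String key = (String) d.get("content_name");
            if (key == null) {
                continue; // documents without the field fall into no bucket
            }
            // one bucket per distinct content_name value
            Bucket bucket = byKey.computeIfAbsent(key, Bucket::new);
            // sum sub-aggregations accumulate inside the bucket
            bucket.flow += num(d, "flow");
            bucket.flowMobile += num(d, "flow_mobile");
            bucket.flowWifi += num(d, "flow_wifi");
            bucket.avgFlowMobile += num(d, "avg_flow_mobile");
        }
        List<Bucket> buckets = new ArrayList<>(byKey.values());
        // Terms.Order.aggregation("flow", false): descending by summed flow
        buckets.sort((a, b) -> Double.compare(b.flow, a.flow));
        return buckets;
    }

    private static double num(Map<String, Object> d, String field) {
        Object v = d.get(field);
        return v == null ? 0.0 : Double.parseDouble(v.toString()); // missing value adds nothing
    }
}
```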
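Because the query above uses an aggregation, the normal (hits) size is turned off by setting it to 0; conversely, the size the aggregation needs is opened up on the terms aggregation itself.
Note: pay close attention to the aggregation hierarchy. Once you understand how ES buckets work, you will see that sorting can only be done on buckets. Usually you sort the data within a single bucket level. Multiple bucket levels can also be sorted, as long as metric functions such as sum and avg are placed in the last (innermost) bucket level. ES's support for this is awkward, though, and data returned from multiple bucket levels may well not be ordered as you expect: each terms aggregation orders only its own buckets, and an order path cannot pass through another multi-bucket aggregation such as a nested terms, so an outer level cannot be ordered by a metric that sits below an inner terms level.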
2. The business layer reads the ES aggregation data and applies the business logic
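//hypothetical wrapper: the original excerpt starts inside a method body, so this method name and signature are assumed
public List<TableData> queryFlowInspireTopTenTable(CommonQueryParam commonQueryParam) {
SearchResponse response = queryFlowInspireTopTenDataByProvince(commonQueryParam);
List<TableData> tableDataList = new ArrayList<>();
//rank counter; buckets arrive already sorted by summed flow, so the first 10 are the top ten
int number = 0;
Aggregations firstAggs = response.getAggregations();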
if (null == firstAggs) {
return null;
}
Terms contentNameTerms = firstAggs.get("contentName");
for (Terms.Bucket contentNameBucket : contentNameTerms.getBuckets()) {
number++;
if(number >= 11){
break;
}
String contentName = contentNameBucket.getKeyAsString();
TableData td = new TableData();
td.setRank(String.valueOf(number));
td.setName(contentName);
td.setKind(this.queryLevel1NameByContentName(commonQueryParam,contentName));
Sum flow = contentNameBucket.getAggregations().get("flow");
td.setTotalFlow(String.format("%.0f", flow.getValue()));
Sum flowMobile = contentNameBucket.getAggregations().get("flowMobile");
td.setMobileFlow(String.format("%.0f", flowMobile.getValue()));
Sum flowWifi = contentNameBucket.getAggregations().get("flowWifi");
td.setWifiFlow(String.format("%.0f", flowWifi.getValue()));
Sum avgFlowMobile = contentNameBucket.getAggregations().get("avgFlowMobile");
td.setAvgFlow(String.format("%.2f", avgFlowMobile.getValue()*1024));
tableDataList.add(td);
}
return tableDataList;
}
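To understand the structure of the aggregation response you need to understand how ES buckets work. Put simply, you need one for loop per terms aggregation, and what each loop iterates over are buckets. A bucket is essentially one group of a GROUP BY: from the bucket you get the group-by value (for example the contentName value via getKeyAsString()), and then the values aggregated inside that bucket, for example contentNameBucket.getAggregations().get("flow").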
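The loop above walks buckets that ES has already sorted, numbers them, stops after ten and formats the sums. This stdlib-only sketch isolates that logic; BucketView and Row are hypothetical stand-ins for Terms.Bucket and the article's TableData. It passes Locale.ROOT to String.format so the decimal separator does not depend on the JVM's default locale, which the original code does not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical model of the top-ten loop over already-ordered buckets.
final class TopTenRows {
    /** Stand-in for one Terms.Bucket and two of its sum sub-aggregations. */
    static final class BucketView {
        final String key;
        final double flow;
        final double avgFlowMobile;

        BucketView(String key, double flow, double avgFlowMobile) {
            this.key = key;
            this.flow = flow;
            this.avgFlowMobile = avgFlowMobile;
        }
    }

    /** Stand-in for the article's TableData row. */
    static final class Row {
        final String rank, name, totalFlow, avgFlow;

        Row(String rank, String name, String totalFlow, String avgFlow) {
            this.rank = rank;
            this.name = name;
            this.totalFlow = totalFlow;
            this.avgFlow = avgFlow;
        }
    }

    static List<Row> topTen(List<BucketView> orderedBuckets) {
        List<Row> rows = new ArrayList<>();
        int number = 0;
        for (BucketView b : orderedBuckets) {
            number++;
            if (number > 10) {
                break; // same effect as the original number >= 11 check
            }
            rows.add(new Row(String.valueOf(number), b.key,
                    String.format(Locale.ROOT, "%.0f", b.flow),                  // whole number
                    String.format(Locale.ROOT, "%.2f", b.avgFlowMobile * 1024))); // scaled by 1024, 2 decimals
        }
        return rows;
    }
}
```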
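Differences between normal queries and aggregation queries
1. In the Java API, the results of a normal query can only be read the normal-query way (from the hits).
2. In the Java API, the results of an aggregation query can only be read the aggregation way (from the aggregations).
3. Because the two are independent, their sizes are set separately. If you only need aggregations, the normal query's size is meaningless and should be set to 0, while the aggregation's size should be set fairly large, e.g. .size(2147483647) (Integer.MAX_VALUE). The larger the aggregation size, the more accurate the per-bucket sum(value) results, because each shard returns only its own top terms (shard_size, which is derived from size by default), and a term that misses the cut on some shard is merged with incomplete counts and sums.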
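Why a larger size makes the sums more accurate can be shown with a toy model of two shards. In this simplified sketch (ShardMergeModel is a hypothetical name; real ES also applies shard_size defaults and tie-breaking rules not modeled here), each shard keeps only its top shardSize terms by local sum, and the coordinating node adds up whatever it receives:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical toy model of per-shard top-N terms followed by a merge.
final class ShardMergeModel {
    private ShardMergeModel() {}

    /** Each shard returns only its top shardSize terms by local sum. */
    static Map<String, Long> shardTop(Map<String, Long> shard, int shardSize) {
        return shard.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(shardSize)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** The coordinating node sums whatever the shards returned. */
    static Map<String, Long> merge(List<Map<String, Long>> shards, int shardSize) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> shard : shards) {
            shardTop(shard, shardSize).forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return merged;
    }
}
```

With shard 1 = {A=50, B=40, C=10} and shard 2 = {B=45, C=30, A=5}, a shard size of 1 yields A=50 and B=45 (B's true total is 85, so even the order is wrong), while a shard size of 3 yields the exact totals A=55, B=85, C=40.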
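Supplement: a multi-field aggregation example showing how the levels of multiple fields nest
Note: you need as many terms aggregations as there are GROUP BY fields, and the metric aggregations such as sum go inside the last (innermost) terms aggregation only, not at an arbitrary terms level!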
//set up the aggregation for the user-operations metrics
private void setMemberServiceAgg(SearchRequestBuilder searchRequestBuilder) {
searchRequestBuilder.addAggregation(AggregationBuilders.terms("dayid").field("dayid").size(2147483647)
.subAggregation(AggregationBuilders.terms("provinceName").field("province_name").size(2147483647)
.subAggregation(AggregationBuilders.terms("cityName").field("city_name").size(2147483647).order(Terms.Order.term(true))
.subAggregation(AggregationBuilders.sum("sumNewPayMember").field("new_pay_member"))
.subAggregation(AggregationBuilders.sum("sumCancelMember").field("cancel_member"))
.subAggregation(AggregationBuilders.sum("sumMembers").field("members"))
.subAggregation(AggregationBuilders.sum("sumMemberActive").field("member_active"))
)
)
);
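The nesting above corresponds to GROUP BY dayid, province_name, city_name with the four sums computed only at the innermost level. This in-memory sketch models that shape as nested maps (MemberAggModel is a hypothetical name). The city level uses a TreeMap to reflect Terms.Order.term(true), i.e. key ascending; the outer levels here simply keep first-seen order, whereas ES would order them by doc_count descending by default, which this sketch does not model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of terms(dayid) > terms(provinceName) > terms(cityName) > sums.
final class MemberAggModel {
    /** Innermost bucket: the four sum(...) metrics. */
    static final class Sums {
        long newPayMember, cancelMember, members, memberActive;
    }

    static Map<String, Map<String, Map<String, Sums>>> aggregate(List<Map<String, Object>> docs) {
        Map<String, Map<String, Map<String, Sums>>> byDay = new LinkedHashMap<>();
        for (Map<String, Object> d : docs) {
            String day = str(d, "dayid");
            String prov = str(d, "province_name");
            String city = str(d, "city_name");
            if (day == null || prov == null || city == null) {
                continue; // a terms level skips documents without its field
            }
            Sums s = byDay
                    .computeIfAbsent(day, k -> new LinkedHashMap<>())
                    // the city map is a TreeMap: Terms.Order.term(true) = key ascending
                    .computeIfAbsent(prov, k -> new TreeMap<>())
                    .computeIfAbsent(city, k -> new Sums());
            // metrics live only in the innermost (city) bucket
            s.newPayMember += num(d, "new_pay_member");
            s.cancelMember += num(d, "cancel_member");
            s.members += num(d, "members");
            s.memberActive += num(d, "member_active");
        }
        return byDay;
    }

    private static String str(Map<String, Object> d, String field) {
        Object v = d.get(field);
        return v == null ? null : v.toString();
    }

    private static long num(Map<String, Object> d, String field) {
        Object v = d.get(field);
        return v == null ? 0L : Long.parseLong(v.toString());
    }
}
```

Reading this structure takes three nested loops, one per terms level, which matches the rule above: one for loop per terms aggregation.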
}