分析Elasticsearch的Aggregation有感(一)
Elasticsearch除了全文檢索之外憎茂,引以為傲的便是各種聚合算法芋绸,如kibana中最為常用的以時間軸的刻度為桶bucket進行統(tǒng)計节腐、累加等聚合計算臭挽。
分析Elasticsearch的聚合統(tǒng)計為什么能做到實時響應(yīng)遭笋?其實原理相當簡tong單,主要依賴fielddata(內(nèi)存常駐壶硅,逐漸被doc_values取代)威兜,將需要排序的字段記錄事先排好序記錄在內(nèi)存中或保存在節(jié)點磁盤上;加上集群分布式計算能力庐椒,能做到聚合計算極快椒舵。
桶中桶的聚合計算則是Elasticsearch的噩夢,經(jīng)常在不怎么大的數(shù)據(jù)量下出現(xiàn)OOM约谈;Elasticsearch所有的計算都采用以下方式笔宿;
當請求發(fā)現(xiàn)集群中的任何一個節(jié)點,該節(jié)點必須負責將數(shù)據(jù)進行整理匯聚棱诱,再返回給客戶端泼橘,也就是一個簡單的節(jié)點上進行Map計算,在一個固定的節(jié)點上進行Reduces得到最終Map集合向客戶端返回迈勋。
當進行桶中桶計算時炬灭,Elasticsearch將該過程只進行了簡單拆分,分別計算出兩個聚合的桶的結(jié)果集靡菇,再進行兩個結(jié)果集的Join重归。
也就是Hadoop進行Join計算時的Redece端進行Join計算米愿,這時所匯聚的數(shù)據(jù)量以及Join計算時K(k1-k2)所產(chǎn)生的新的鍵值急速膨脹,最終導致匯聚節(jié)點的OOM提前。
Elasticsearch當前始終堅持采用簡單的查詢發(fā)起節(jié)點負責數(shù)據(jù)匯聚吗货,跟Elasticsearch及搜索引擎技術(shù)的特點有關(guān),即保證TopN的檢索效率狈网,每個計算節(jié)點只返回各自的TopN宙搬,再由匯聚節(jié)點整合計算出TopN,這樣節(jié)點向匯聚節(jié)點所傳輸?shù)臄?shù)據(jù)始終較型夭浮勇垛;這樣設(shè)計帶來的問題是,無法進行復雜的計算士鸥,如桶中桶闲孤,Any Join等這些其他類型的數(shù)據(jù)倉庫所具備的功能。
當前解決這個問題的辦法烤礁,首先能想到的就是與分布式計算引擎來結(jié)合讼积,復雜的計算交給分布式計算引擎來完成,所以自然出現(xiàn)了Elasticsearch-Hadoop的連接組件脚仔。但嘗試使用過Elasticsearch-Hadoop的人最終都放棄了勤众,原因是當前Hadoop與Elasticsearch結(jié)合時,僅僅把Elasticsearch當前類似Txt類型的存儲鲤脏,進行計算時Hadoop的Map任務(wù)通過Elasticsearch-Hadoop提供InputFormat们颜,只是簡單通過Elasticsearch的Scroll對數(shù)據(jù)進行全量的讀取。
這里我們測試過一般硬件配置配置(32核猎醇,128G內(nèi)存窥突,3*4TB硬盤,千兆網(wǎng)卡)組成的4節(jié)點集群硫嘶,最大的Scroll性能只能到20W-30W條/s(1k每條記錄)阻问;簡單計算下,當需要分析10億級別的數(shù)據(jù)時沦疾,光數(shù)據(jù)從Elasticsearch集群加載到Hadoop集群所需要的時間是多少则拷。
所以只是將Elasticsearch當作普通存儲來進行兩個集群的結(jié)合顯然不合適,如何發(fā)揮兩個集群各種的計算特點來適應(yīng)各種不同的計算需求曹鸠,下面來看看我們的研究方向:
修改Elasticsearch的底層計算邏輯,在進行復雜計算時斥铺,不是采用簡單的計算任務(wù)拆分彻桃,下發(fā)計算,再匯聚這樣粗暴的方式晾蜘,而是類似Hadoop上的優(yōu)化邻眷,在進行第一層節(jié)點計算后眠屎,中間在穿插一層shuffle過程,將需要進行Join計算的Maps肆饶,進行相應(yīng)的排序改衩,遷移評估,再執(zhí)行遷移驯镊,保證數(shù)據(jù)在節(jié)點間最小遷移的情況下葫督,再在遷移后的節(jié)點上進行Join,再進行Reduce板惑,是不是已經(jīng)暈了橄镜。所以暫時我們也沒有計算對這部分進行如此徹底的修改。
? ? ? ? 如何將Elastcicearch如何與Hadoop的有機結(jié)合冯乘,但不是如何提高scroll速度或Map任務(wù)直接對Lucese文件進行直接的IO等洽胶,將數(shù)據(jù)全量讀取到Hadoop集群,而接下來的任何分析都與Elasticsearch沒有任何關(guān)系的做法裆馒。
根據(jù)Elasticsearch數(shù)據(jù)shard的分布姊氓,設(shè)置Hadoop的Map任務(wù),保持Map采用Local方式訪問一個或多個分片喷好,將Map操作的數(shù)據(jù)流控制在Local上翔横。
publicList getSplits(JobContext job)throwsIOException {
// getshards splits
List originalSplits =ElasticserchCatShards(job);
// Get active servers
String[] servers = getActiveServersList(job);
if(servers ==null)
returnnull;
// reassign splits to active servers
List splits =newArrayList(originalSplits.size());
intnumSplits = originalSplits.size();
intcurrentServer = 0;
for(inti = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer,?servers.length)){
String server = servers[currentServer]; // Current server
booleanreplaced =false;
// For every remaining split
for(InputSplitsplit : originalSplits){
FileSplit fs = (FileSplit)split;
// For every split location
for(String l : fs.getLocations()){
// If this split is local to the server
if(l.equals(server)){
// Fix split location
splits.add(newFileSplit(fs.getPath(), fs.getStart(),
fs.getLength(),newString[] {server}));
originalSplits.remove(split);
replaced =true;
break;
}
}
if(replaced)
break;
}
// If no local splits are found for this server
if(!replaced){
// Assign first available split to it
FileSplit fs = (FileSplit)splits.get(0);
splits.add(newFileSplit(fs.getPath(), fs.getStart(), fs.getLength(),
newString[] {server}));
originalSplits.remove(0);
}
}
returnsplits;
}
對計算任務(wù)進行拆分,在進行底層數(shù)據(jù)Input時绒窑,采用的scroll+query方式棕孙,指定分片進行查詢。
publicbooleannext(Kkey, Vvalue)throwsIOException {
if(scrollQuery==null) {
if(beat!=null) {
beat.start();
}
#set querey shards and host:127.0.0.1
scrollQuery=queryBuilder.build(client, scrollReader,shards,host);
size=scrollQuery.getSize();
if(log.isTraceEnabled()) {
log.trace(String.format("Received scroll [%s], size [%d] for query [%s]", scrollQuery, size, queryBuilder));
}
}
booleanhasNext=scrollQuery.hasNext();
if(!hasNext) {
returnfalse;
}
Object[] next=scrollQuery.next();
// NB: the left assignment is not needed since method override
// the writable content however for consistency, they are below
currentKey=setCurrentKey(key, next[0]);
currentValue=setCurrentValue(value, next[1]);
// keep on counting
read++;
returntrue;
}
? ? ? ? ? 當然這樣的做法些膨,還是無法徹底解決單個shards數(shù)據(jù)量過大的情況下蟀俊,單個Map任務(wù)加載速度過慢情況的出現(xiàn)。通過Demo測試订雾,性能要較原生的Elasticsearch-Hadoop控件有50倍左右提升肢预。
? ? ? ? ? 研究的另一個方向是對doc_values數(shù)據(jù)文件的分析,doc_values文件的設(shè)計是解決fielddata占用內(nèi)存過大洼哎,通過分析doc_value和fielddata烫映,一個字段的數(shù)據(jù)進行排序存儲在內(nèi)存和磁盤,其不就是天生的列式存儲么噩峦!采用將Map任務(wù)直接對Doc_value文件的讀取加載锭沟,理論上是可以繞過Elasticsearch的計算節(jié)點的,需要我們小伙伴們加快研究步伐识补,解決Elastticsearch無法進行復雜計算的痛病族淮,至少實現(xiàn)桶中桶,在進行soc分析經(jīng)常被提及的需求。
->