ES實現(xiàn)百億級數(shù)據(jù)實時分析-實戰(zhàn)案例

背景

我們小組主要負(fù)責(zé)Alpha機(jī)器學(xué)習(xí)平臺(以下簡稱Alpha)的設(shè)計與實現(xiàn)工作芥吟,前段時間算法同學(xué)提出一個需求嘱蛋,希望能夠按照小時為單位蘑辑,看到每個實驗中各種特征(單個或組合)的覆蓋率条获、正樣本占比合搅、負(fù)樣本占比牌柄。我簡單解釋一下這三種指標(biāo)的定義:

  • 覆蓋率:所有樣本中出現(xiàn)某一特征的樣本的比例
  • 正樣本占比:所有出現(xiàn)該特征的樣本中畸悬,正樣本的比例
  • 負(fù)樣本占比:所有出現(xiàn)該特征的樣本中,負(fù)樣本的比例

光看這三個指標(biāo)珊佣,大家可能會覺得這個需求很簡單蹋宦,無非就是一個簡單的篩選、聚合而已咒锻。

如果真的這么簡單冷冗,我也沒必要寫這篇文章單獨(dú)記錄了。問題的關(guān)鍵就在于惑艇,每小時有將近1億的數(shù)據(jù)量蒿辙,而我們需要保存7天的數(shù)據(jù),數(shù)據(jù)總量預(yù)計超過了100億。

技術(shù)方案

在了解清楚需求后思灌,我們小組馬上對技術(shù)方案展開討論碰镜,討論過程中出現(xiàn)了3種方案:

  • 第一種:用Spark流式計算,計算每一種可能單個或組合特征的相關(guān)指標(biāo)
  • 第二種:收到客戶端請求后习瑰,遍歷HDFS中相關(guān)數(shù)據(jù)绪颖,進(jìn)行離線計算
  • 第三種:將數(shù)據(jù)按照實驗+小時分索引存入ES,收到客戶端請求后甜奄,實時計算返回

首先柠横,第一種方案直接被diss,原因是一個實驗一般會出現(xiàn)幾百课兄、上千個特征牍氛,而這些特征的組合何止幾億種,全部計算的話烟阐,可行性暫且不論搬俊,光是對資源的消耗就無法承受

第二種方案,雖然技術(shù)上是可行的蜒茄,但離線計算所需時間較長唉擂,對用戶來說,體驗并不理想檀葛。并且玩祟,為了計算目標(biāo)1%的數(shù)據(jù)而要遍歷所有數(shù)據(jù),對資源也存在很大浪費(fèi)

第三種方案屿聋,將數(shù)據(jù)按照實驗+小時分索引后空扎,可以將每個索引包含的數(shù)據(jù)量降到1000萬以下,再借助ES在查詢润讥、聚合方面高效的能力转锈,應(yīng)該可以實現(xiàn)秒級響應(yīng),并且用戶體驗也會非常好

技術(shù)方案由此確定楚殿。

技術(shù)架構(gòu)

技術(shù)架構(gòu)

1.用Spark從Kafka中接入原始數(shù)據(jù)撮慨,之后對數(shù)據(jù)進(jìn)行解析,轉(zhuǎn)換成我們的目標(biāo)格式

2.將數(shù)據(jù)按照實驗+小時分索引存入ES中

3.接受到用戶請求后勒魔,將請求按照實驗+特征+小時組合甫煞,創(chuàng)建多個異步任務(wù)菇曲,由這些異步任務(wù)并行從ES中過濾并聚合相關(guān)數(shù)據(jù)冠绢,得到結(jié)果

4.將異步任務(wù)的結(jié)果進(jìn)行合并,返回給前端進(jìn)行展示

代碼實現(xiàn)

異步任務(wù)

// 啟動并行任務(wù)

final Map<String,List<Future<GetCoverageTask.Result>>> futures = Maps.newHashMap();

for(String metric : metrics) { // 遍歷要計算的指標(biāo)

final SampleRatio sampleRatio = getSampleRatio(metric);

for (String exptId : expts) { // 遍歷目標(biāo)實驗列表

for (String id : features) { // 遍歷要分析的特征

final String name = getMetricsName(exptId, sampleRatio, id);

final List<Future<GetCoverageTask.Result>> resultList = Lists.newArrayList();

for (Date hour : coveredHours) { // 將時間按照小時進(jìn)行拆分

final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);

final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);

// 啟動并行任務(wù)

final Future<GetCoverageTask.Result> future = TaskExecutor.submit(task);

 resultList.add(future);

 }

futures.put(name, resultList);

 }

}

}

final QueryRes queryRes = new QueryRes();

final Iterator<Map.Entry<String, List<Future<GetCoverageTask.Result>>>> it = futures.entrySet().iterator();

while (it.hasNext()){

// 省略結(jié)果處理流程

 }

指標(biāo)計算

// 1\. 對文檔進(jìn)行聚合運(yùn)行常潮,分別得到基礎(chǔ)文檔的數(shù)量弟胀,以及目標(biāo)文檔數(shù)量

final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);

final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();

searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);

// 2\. 得到覆蓋率

final String indexName = getIndexName(exptId, hour);

final Search search = new Search.Builder(searchBuilder.toString())

.addIndex(indexName).addType(getType()).build();

final SearchResult result = jestClient.execute(search);

if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){

// 請求出錯

 log.warn(result.getErrorMessage());

return 0f;

}

final MetricAggregation aggregations = result.getAggregations();

// 3\. 解析結(jié)果

final long dividend ;

if(SampleRatio.ALL == sampleRatio){

dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();

}else {

dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();

}

// 防止出現(xiàn)被除數(shù)為0時程序異常

if(dividend <= 0){

return 0f;

}

long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();

return divisor / (float)dividend;

聚合

int label = 0;

final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);

// 包含指定特征的正樣本數(shù)量

final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

final List<QueryBuilder> must = boolQuery.must();

// 計算樣本數(shù)量

TermQueryBuilder labelQuery = null;

if(SampleRatio.POSITIVE == sampleRatio) {

// 計算正樣本數(shù)量

 label = 1;

 labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);

 must.add(labelQuery);

}else if(SampleRatio.NEGATIVE == sampleRatio) {

// 計算負(fù)樣本數(shù)量

 labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);

 must.add(labelQuery);

}

must.add(existsQuery);

final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());

existsCountAgg.field(fieldName);

final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);

filterAgg.subAggregation(existsCountAgg);

return filterAgg;

上線效果

上線后表現(xiàn)完全滿足預(yù)期,平均請求耗時在3秒左右,用戶體驗良好孵户。感謝各位小伙伴的辛苦付出~~

下圖是ES中部分索引的信息:


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末萧朝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子夏哭,更是在濱河造成了極大的恐慌检柬,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件竖配,死亡現(xiàn)場離奇詭異何址,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)进胯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門用爪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胁镐,你說我怎么就攤上這事偎血。” “怎么了盯漂?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵颇玷,是天一觀的道長。 經(jīng)常有香客問我就缆,道長亚隙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任违崇,我火速辦了婚禮阿弃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘羞延。我一直安慰自己渣淳,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布伴箩。 她就那樣靜靜地躺著入愧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嗤谚。 梳的紋絲不亂的頭發(fā)上棺蛛,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機(jī)與錄音巩步,去河邊找鬼旁赊。 笑死,一個胖子當(dāng)著我的面吹牛椅野,可吹牛的內(nèi)容都是我干的终畅。 我是一名探鬼主播籍胯,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼离福!你這毒婦竟也來了杖狼?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤妖爷,失蹤者是張志新(化名)和其女友劉穎蝶涩,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體絮识,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡子寓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了笋除。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斜友。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖垃它,靈堂內(nèi)的尸體忽然破棺而出鲜屏,到底是詐尸還是另有隱情,我是刑警寧澤国拇,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布洛史,位于F島的核電站,受9級特大地震影響酱吝,放射性物質(zhì)發(fā)生泄漏也殖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一务热、第九天 我趴在偏房一處隱蔽的房頂上張望忆嗜。 院中可真熱鬧,春花似錦崎岂、人聲如沸捆毫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽绩卤。三九已至,卻和暖如春江醇,著一層夾襖步出監(jiān)牢的瞬間濒憋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工陶夜, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留凛驮,地道東北人。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓律适,卻偏偏與公主長得像辐烂,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子捂贿,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容