背景
我們小組主要負(fù)責(zé)Alpha機(jī)器學(xué)習(xí)平臺(以下簡稱Alpha)的設(shè)計與實現(xiàn)工作芥吟,前段時間算法同學(xué)提出一個需求嘱蛋,希望能夠按照小時為單位蘑辑,看到每個實驗中各種特征(單個或組合)的覆蓋率条获、正樣本占比合搅、負(fù)樣本占比牌柄。我簡單解釋一下這三種指標(biāo)的定義:
- 覆蓋率:所有樣本中出現(xiàn)某一特征的樣本的比例
- 正樣本占比:所有出現(xiàn)該特征的樣本中畸悬,正樣本的比例
- 負(fù)樣本占比:所有出現(xiàn)該特征的樣本中,負(fù)樣本的比例
光看這三個指標(biāo)珊佣,大家可能會覺得這個需求很簡單蹋宦,無非就是一個簡單的篩選、聚合而已咒锻。
如果真的這么簡單冷冗,我也沒必要寫這篇文章單獨(dú)記錄了。問題的關(guān)鍵就在于惑艇,每小時有將近1億的數(shù)據(jù)量蒿辙,而我們需要保存7天的數(shù)據(jù),數(shù)據(jù)總量預(yù)計超過了100億。
技術(shù)方案
在了解清楚需求后思灌,我們小組馬上對技術(shù)方案展開討論碰镜,討論過程中出現(xiàn)了3種方案:
- 第一種:用Spark流式計算,計算每一種可能單個或組合特征的相關(guān)指標(biāo)
- 第二種:收到客戶端請求后习瑰,遍歷HDFS中相關(guān)數(shù)據(jù)绪颖,進(jìn)行離線計算
- 第三種:將數(shù)據(jù)按照實驗+小時分索引存入ES,收到客戶端請求后甜奄,實時計算返回
首先柠横,第一種方案直接被diss,原因是一個實驗一般會出現(xiàn)幾百课兄、上千個特征牍氛,而這些特征的組合何止幾億種,全部計算的話烟阐,可行性暫且不論搬俊,光是對資源的消耗就無法承受
第二種方案,雖然技術(shù)上是可行的蜒茄,但離線計算所需時間較長唉擂,對用戶來說,體驗并不理想檀葛。并且玩祟,為了計算目標(biāo)1%的數(shù)據(jù)而要遍歷所有數(shù)據(jù),對資源也存在很大浪費(fèi)
第三種方案屿聋,將數(shù)據(jù)按照實驗+小時分索引后空扎,可以將每個索引包含的數(shù)據(jù)量降到1000萬以下,再借助ES在查詢润讥、聚合方面高效的能力转锈,應(yīng)該可以實現(xiàn)秒級響應(yīng),并且用戶體驗也會非常好
技術(shù)方案由此確定楚殿。
技術(shù)架構(gòu)
1.用Spark從Kafka中接入原始數(shù)據(jù)撮慨,之后對數(shù)據(jù)進(jìn)行解析,轉(zhuǎn)換成我們的目標(biāo)格式
2.將數(shù)據(jù)按照實驗+小時分索引存入ES中
3.接受到用戶請求后勒魔,將請求按照實驗+特征+小時組合甫煞,創(chuàng)建多個異步任務(wù)菇曲,由這些異步任務(wù)并行從ES中過濾并聚合相關(guān)數(shù)據(jù)冠绢,得到結(jié)果
4.將異步任務(wù)的結(jié)果進(jìn)行合并,返回給前端進(jìn)行展示
代碼實現(xiàn)
異步任務(wù)
// 啟動并行任務(wù)
final Map<String,List<Future<GetCoverageTask.Result>>> futures = Maps.newHashMap();
for(String metric : metrics) { // 遍歷要計算的指標(biāo)
final SampleRatio sampleRatio = getSampleRatio(metric);
for (String exptId : expts) { // 遍歷目標(biāo)實驗列表
for (String id : features) { // 遍歷要分析的特征
final String name = getMetricsName(exptId, sampleRatio, id);
final List<Future<GetCoverageTask.Result>> resultList = Lists.newArrayList();
for (Date hour : coveredHours) { // 將時間按照小時進(jìn)行拆分
final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);
final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);
// 啟動并行任務(wù)
final Future<GetCoverageTask.Result> future = TaskExecutor.submit(task);
resultList.add(future);
}
futures.put(name, resultList);
}
}
}
final QueryRes queryRes = new QueryRes();
final Iterator<Map.Entry<String, List<Future<GetCoverageTask.Result>>>> it = futures.entrySet().iterator();
while (it.hasNext()){
// 省略結(jié)果處理流程
}
指標(biāo)計算
// 1\. 對文檔進(jìn)行聚合運(yùn)行常潮,分別得到基礎(chǔ)文檔的數(shù)量弟胀,以及目標(biāo)文檔數(shù)量
final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);
final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);
// 2\. 得到覆蓋率
final String indexName = getIndexName(exptId, hour);
final Search search = new Search.Builder(searchBuilder.toString())
.addIndex(indexName).addType(getType()).build();
final SearchResult result = jestClient.execute(search);
if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){
// 請求出錯
log.warn(result.getErrorMessage());
return 0f;
}
final MetricAggregation aggregations = result.getAggregations();
// 3\. 解析結(jié)果
final long dividend ;
if(SampleRatio.ALL == sampleRatio){
dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();
}else {
dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();
}
// 防止出現(xiàn)被除數(shù)為0時程序異常
if(dividend <= 0){
return 0f;
}
long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();
return divisor / (float)dividend;
聚合
int label = 0;
final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);
// 包含指定特征的正樣本數(shù)量
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
final List<QueryBuilder> must = boolQuery.must();
// 計算樣本數(shù)量
TermQueryBuilder labelQuery = null;
if(SampleRatio.POSITIVE == sampleRatio) {
// 計算正樣本數(shù)量
label = 1;
labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);
must.add(labelQuery);
}else if(SampleRatio.NEGATIVE == sampleRatio) {
// 計算負(fù)樣本數(shù)量
labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);
must.add(labelQuery);
}
must.add(existsQuery);
final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());
existsCountAgg.field(fieldName);
final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);
filterAgg.subAggregation(existsCountAgg);
return filterAgg;
上線效果
上線后表現(xiàn)完全滿足預(yù)期,平均請求耗時在3秒左右,用戶體驗良好孵户。感謝各位小伙伴的辛苦付出~~
下圖是ES中部分索引的信息: