-
業(yè)務背景
為了對訂單進行統(tǒng)計分析汹来,需要將訂單表中的數(shù)據(jù)查詢并導出來辫继。
但是,問題來了遣耍,訂單表動輒成百上千萬數(shù)據(jù)舵变,加上字段繁多,使用sql來查詢纪隙,很容易就連接超時了。那么碘饼,換一種手段悲伶,將訂單數(shù)據(jù)同步到定位為搜索的Elasticsearch(es)中去,再從es中查詢出來麸锉。可是柳爽,問題又來了磷脯,從官方文檔中介紹娩脾,從es中查詢出來的數(shù)據(jù)默認限制了一萬條,這個將直接影響查詢的深度分頁以及查詢總數(shù)。官方文檔介紹如下:
- 解決方式
- 調(diào)大index.max_result_window的參數(shù)隘冲,如果太大會影響性能和效率展辞,并且很有可能導致OOM
- 如果是進行深度分頁的話万牺,可以使用滾動查詢Scroll。如果是使用from+size覆旱,假設現(xiàn)在有5個節(jié)點核无,那么當一個請求分發(fā)到A節(jié)點,那么A節(jié)點將作為協(xié)調(diào)節(jié)點噪沙,然后將請求分發(fā)到其它節(jié)點吐根,其它節(jié)點都需返回from+size條數(shù)據(jù)給A節(jié)點(可以由主分片或者副分片處理),然后再由協(xié)調(diào)節(jié)點進行排序局义,截取出第from頁的size條數(shù)據(jù)冗疮,返回客戶端,數(shù)據(jù)量大的話性能比較低穷绵。
-
如果是查詢訂單特愿,可以采用切割查詢的策略,流程如下:
核心代碼如下:
//先從數(shù)據(jù)庫中查出一部分訂單id
List<Integer> orderIds = wholesaleOrderDAO.getOrderIds(stime, etime, providerId, timeType);
if (CollectionUtils.isEmpty(orderIds)){
return Pair.of(Collections.EMPTY_LIST,BigDecimal.ZERO);
}
//按照9000的容量對訂單進行切割
List<List<Integer>> partition = Lists.partition(orderIds, 9000);
List<ExportOrderDataItem> orders = new ArrayList<>();
//使用多線程從es中進行訂單查詢
List<List<ExportOrderDataItem>> resultsAsync = taskCommonService.getResultsAsync(partition, e -> {
try {
return orderIndexService.getOrderDetail(e,drugStore,phone,providerId);
} catch (Exception exception) {
log.error("從es中查詢訂單數(shù)據(jù)異常",exception);
}
return new ArrayList<>();
});
for(List<ExportOrderDataItem> orderList : resultsAsync){
orders.addAll(orderList);
}
if (CollectionUtils.isEmpty(orders)){
return Pair.of(Collections.EMPTY_LIST,BigDecimal.ZERO);
}
/**
* 處理異步結(jié)果20s超時
*/
private static final long DEFAULT_TIME_OUT = 20000;
private final ThreadPoolTaskExecutor fileCommonExecutor;
/**
* 異步獲取結(jié)果
* @param items 需要異步分頁的參數(shù)
* @param <T> 方法參數(shù)
* @param <R> 結(jié)果
*/
public <R, T> List<R> getResultsAsync(Collection<T> items, Function<T, R> function) throws Exception {
List<R> results = new ArrayList<>(items.size());
List<Future<R>> futureList = new ArrayList<>();
for (T param : items) {
futureList.add(fileCommonExecutor.submit(() -> function.apply(param)));
}
long remainingTime = DEFAULT_TIME_OUT;
long cos = 0;
for (Future<R> future : futureList) {
remainingTime -= cos;
long beginTime = System.currentTimeMillis();
R result = future.get(remainingTime, TimeUnit.MILLISECONDS);
cos = System.currentTimeMillis() - beginTime;
results.add(result);
}
return results;
}