一明刷、前言
前面已經(jīng)介紹了ElasticSearch的寫入流程拙泽,了解了ElasticSearch寫入時的分布式特性的相關(guān)原理。ElasticSearch作為一款具有強(qiáng)大搜索功能的存儲引擎屯碴,它的讀取是什么樣的呢践叠?讀取相比寫入簡單的多言缤,但是在使用過程中有哪些需要我們注意的呢?本篇文章會進(jìn)行詳細(xì)的分析禁灼。
在前面的文章我們已經(jīng)知道ElasticSearch的讀取分為兩種GET和SEARCH管挟。這兩種操作是有一定的差異的,下面我們先對這兩種核心的數(shù)據(jù)讀取方式進(jìn)行一一分析弄捕。
二僻孝、GET的流程
2.1 整體流程
(圖片來自官網(wǎng))
以下是從主分片或者副本分片檢索文檔的步驟順序:
客戶端向 Node 1 發(fā)送獲取請求
節(jié)點使用文檔的 _id 來確定文檔屬于分片 0 。分片 0 的副本分片存在于所有的三個節(jié)點上守谓。在這種情況下穿铆,它將請求轉(zhuǎn)發(fā)到 Node 2
Node 2 將文檔返回給 Node 1,然后將文檔返回給客戶端斋荞。
注意:
在處理讀取請求時荞雏,協(xié)調(diào)節(jié)點在每次請求的時候都會通過輪詢所有的副本分片來達(dá)到負(fù)載均衡。
在文檔被檢索時,已經(jīng)被索引的文檔可能已經(jīng)存在于主分片上但是還沒有復(fù)制到副本分片凤优。在這種情況下悦陋,副本分片可能會報告文檔不存在,但是主分片可能成功返回文檔筑辨。一旦索引請求成功返回給用戶俺驶,文檔在主分片和副本分片都是可用的
2.2 GET詳細(xì)流程
2.2.1 協(xié)調(diào)節(jié)點處理過程
在協(xié)調(diào)節(jié)點有個http_server_worker線程池。收到讀請求后它的具體過程為:
收到請求挖垛,先獲取集群的狀態(tài)信息
根據(jù)路由信息計算id是在哪一個分片上
因為一個分片可能有多個副本分片痒钝,所以上述的計算結(jié)果是一個列表
調(diào)用transportServer的sendRequest方法向目標(biāo)發(fā)送請求
上一步的方法內(nèi)部會檢查是否為本地node秉颗,如果是的話就不會發(fā)送到網(wǎng)絡(luò)痢毒,否則會異步發(fā)送
等待數(shù)據(jù)節(jié)點回復(fù),如果成功則返回數(shù)據(jù)給客戶端蚕甥,否則會重試
重試會發(fā)送上述列表的下一個哪替。
2.2.2 數(shù)據(jù)節(jié)點處理過程
數(shù)據(jù)節(jié)點上有一個get線程池。收到了請求后菇怀,處理過程為:
- 在數(shù)據(jù)節(jié)點有個shardTransporthander的messageReceived的入口專門接收協(xié)調(diào)節(jié)點發(fā)送的請求
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) {
asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
}
}
- shardOperation方法會先檢查是否需要refresh凭舶,然后調(diào)用indexShard.getService().get()讀取數(shù)據(jù)并存儲到GetResult中。
if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}
GetResult result = indexShard.getService().get(
request.type(), request.id(),
request.storedFields(), request.realtime(),
request.version(), request.versionType(),
request.fetchSourceContext());
- indexShard.getService().get()最終會調(diào)用GetResult getResult = innerGet(……)用來獲取結(jié)果爱沟。即ShardGetService#innerGet
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
................
Engine.GetResult get = null;
............
get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm).version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
..........
if (get == null || get.exists() == false) {
return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
}
try {
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
} finally {
get.close();
}
-
上面代碼的indexShard.get讀取真正的數(shù)據(jù)時會最終調(diào)用:
org.elasticsearch.index.engine.InternalEngine#gett
public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
if (get.realtime()) {
VersionValue versionValue = null;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
帅霜。。呼伸。身冀。。括享。
//刷盤操作
refreshIfNeeded("realtime_get", versionValue.seqNo);
注意:
get過程會加讀鎖搂根。處理realtime選項,如果為true铃辖,則先判斷是否有數(shù)據(jù)可以刷盤剩愧,然后調(diào)用Searcher進(jìn)行讀取。Searcher是對IndexSearcher的封裝在早期realtime為true則會從tranlog中讀取娇斩,后面只會從index的lucene讀取了仁卷。即實時的數(shù)據(jù)只在lucene之中。
- innerGetLoadFromStoredFields根據(jù)type犬第,id锦积,filed,source等信息過濾瓶殃,并將結(jié)果放到getresult之中返回
2.3 小結(jié)
GET是根據(jù)doc id 哈希找到對應(yīng)的shard的
get請求默認(rèn)是實時的充包,但是不同版本有差異,在5.x以前,讀不到寫的doc會從translog中去讀取基矮,之后改為讀取不到會進(jìn)行refresh到lucene中淆储,因此現(xiàn)在的實時讀取需要復(fù)制一定的性能損耗的代價。如果對實時性要求不高家浇,可以請求是手動帶上realtime為false
三本砰、search流程
3.1 search整體流程
對于Search類請求,ElasticSearch請求是查詢lucene的Segment钢悲,前面的寫入詳情流程也分析了点额,新增的文檔會定時的refresh到磁盤中,所以搜索是屬于近實時的莺琳。而且因為沒有文檔id还棱,你不知道你要檢索的文檔在哪個分配上,需要將索引的所有的分片都去搜索下惭等,然后匯總珍手。ElasticSearch的search一般有兩個搜索類型
dfs_query_and_fetch,流程復(fù)雜一些辞做,但是算分的時候使用了全局的一些指標(biāo)琳要,這樣獲取的結(jié)果可能更加精確一些。
query_then_fetch秤茅,默認(rèn)的搜索類型稚补。
所有的搜索系統(tǒng)一般都是兩階段查詢:
第一階段查詢到匹配的docID,第二階段再查詢DocID對應(yīng)的完整文檔框喳。這種在ElasticSearch中稱為query_then_fetch课幕,另一種就是一階段查詢的時候就返回完整Doc,在ElasticSearch中叫query_and_fetch帖努,一般第二種適用于只需要查詢一個Shard的請求撰豺。因為這種一次請求就能將數(shù)據(jù)請求到,減少交互次數(shù)拼余,二階段的原因是需要多個分片聚合匯總污桦,如果數(shù)據(jù)量太大那么會影響網(wǎng)絡(luò)傳輸效率,所以第一階段會先返回id匙监。
除了上述的這兩種查詢外凡橱,還有一種三階段查詢的情況。
搜索里面有一種算分邏輯是根據(jù)TF和DF來計算score的亭姥,而在普通的查詢中稼钩,第一階段去每個Shard中獨立查詢時攜帶條件算分都是獨立的,即Shard中的TF和DF也是獨立的达罗。雖然從統(tǒng)計學(xué)的基礎(chǔ)上數(shù)據(jù)量多的情況下坝撑,每一個分片的TF和DF在整體上會趨向于準(zhǔn)確静秆。但是總會有情況導(dǎo)致局部的TF和DF不準(zhǔn)的情況出現(xiàn)。
ElasticSearch為了解決這個問題引入了DFS查詢巡李。
比如DFS_query_then_fetch抚笔,它在每次查詢時會先收集所有Shard中的TF和DF值,然后將這些值帶入請求中侨拦,再次執(zhí)行query_then_fetch殊橙,這樣算分的時候TF和DF就是準(zhǔn)確的,類似的有DFS_query_and_fetch狱从。這種查詢的優(yōu)勢是算分更加精準(zhǔn)膨蛮,但是效率會變差。
另一種選擇是用BM25代替TF/DF模型季研。
在ElasticSearch7.x敞葛,用戶沒法指定以下兩種方式:DFS_query_and_fetch和query_and_fetch。
注:這兩種算分的算法模型在《ElasticSearch實戰(zhàn)篇》有介紹:
這里query_then_fetch具體的搜索的流程圖如下:
(圖片來自官網(wǎng))
查詢階段包含以下四個步驟:
客戶端發(fā)送一個 search 請求到 Node 3 训貌, Node 3 會創(chuàng)建一個大小為 from + size 的空優(yōu)先隊列制肮。
Node 3 將查詢請求轉(zhuǎn)發(fā)到索引的每個主分片或副本分片中冒窍。每個分片在本地執(zhí)行查詢并添加結(jié)果到大小為 from + size 的本地有序優(yōu)先隊列中递沪。
每個分片返回各自優(yōu)先隊列中所有文檔的 ID 和排序值給協(xié)調(diào)節(jié)點,也就是 Node 3 综液,它合并這些值到自己的優(yōu)先隊列中來產(chǎn)生一個全局排序后的結(jié)果列表款慨。
當(dāng)一個搜索請求被發(fā)送到某個節(jié)點時,這個節(jié)點就變成了協(xié)調(diào)節(jié)點谬莹。這個節(jié)點的任務(wù)是廣播查詢請求到所有相關(guān)分片并將它們的響應(yīng)整合成全局排序后的結(jié)果集合,這個結(jié)果集合會返回給客戶端。
3.2 search詳細(xì)流程
以上就是ElasticSearch的search的詳細(xì)流程踱侣,下面會對每一步進(jìn)行進(jìn)一步的說明馆衔。
3.2.1 協(xié)調(diào)節(jié)點
3.2.1.1 query階段
協(xié)調(diào)節(jié)點處理query請求的線程池為:
http_server_work
- 負(fù)責(zé)解析請求
負(fù)責(zé)該解析功能的類為:
org.elasticsearch.rest.action.search.RestSearchAction
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));
。蕉扮。整胃。。喳钟。屁使。。奔则。蛮寂。。易茬。酬蹋。
};
}
主要將restquest的參數(shù)封裝成SearchRequest
這樣SearchRequest請求發(fā)送給TransportSearchAction處理
- 生成目的分片列表
將索引涉及到的shard列表或者有跨集群訪問相關(guān)的shard列表合并
private void executeSearch(...........) {
........
//本集群的列表分片列表
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(
searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
.collect(Collectors.toList());
.......
//遠(yuǎn)程集群的分片列表
final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
.......
}
- 遍歷分片發(fā)送請求
如果有多個分片位于同一個節(jié)點,仍然會發(fā)送多次請求
public final void run() {
......
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
}
......
......
if (shardsIts.size() > 0) {
//遍歷分片發(fā)送請求
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
assert shardItIndexMap.containsKey(shardRoutings);
int shardIndex = shardItIndexMap.get(shardRoutings);
//執(zhí)行shard請求
performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());
}
......
shardsIts為搜索涉及的所有分片,而shardRoutings.nextOrNull()會從分片的所有副本分片選出一個分片來請求范抓。
- 收集和合并請求
onShardSuccess對收集到的結(jié)果進(jìn)行合并写半,這里需要檢查所有的請求是否都已經(jīng)有了回復(fù)。
然后才會判斷要不要進(jìn)行executeNextPhase
private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
successfulOps.incrementAndGet();
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
if (shardFailures != null) {
shardFailures.set(result.getShardIndex(), null);
}
successfulShardExecution(shardIt);
}
private void successfulShardExecution(SearchShardIterator shardsIt) {
......
//計數(shù)器累加
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
//是不是所有分都已經(jīng)回復(fù)尉咕,然后調(diào)用onPhaseDone();
if (xTotalOps == expectedTotalOps) {
onPhaseDone();
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]",
new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures()));
}
}
當(dāng)返回結(jié)果的分片數(shù)等于預(yù)期的總分片數(shù)時叠蝇,協(xié)調(diào)節(jié)點會進(jìn)入當(dāng)前Phase的結(jié)束處理,啟動下一個階段Fetch Phase的執(zhí)行年缎。onPhaseDone()會executeNextPhase來執(zhí)行下一個階段悔捶。
3.2.1.2 fetch階段
當(dāng)觸發(fā)了executeNextPhase方法將觸發(fā)fetch階段
- 發(fā)送fetch請求
上一步的executeNextPhase方法觸發(fā)Fetch階段,F(xiàn)etch階段的起點為FetchSearchPhase#innerRun函數(shù)单芜,從查詢階段的shard列表中遍歷蜕该,跳過查詢結(jié)果為空的 shard。其中也會封裝一些分頁信息的數(shù)據(jù)洲鸠。
private void executeFetch(....){
//發(fā)送請求
context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
//處理成功的消息
@Override
public void innerOnResponse(FetchSearchResult result) {
try {
progressListener.notifyFetchResult(shardIndex);
counter.onResult(result);
} catch (Exception e) {
context.onPhaseFailure(FetchSearchPhase.this, "", e);
}
}
//處理失敗的消息
@Override
public void onFailure(Exception e) {
........
}
});
}
- 收集結(jié)果
使用了countDown多線程工具堂淡,fetchResults存儲某個分片的結(jié)果,每收到一個shard的數(shù)據(jù)就countDoun一下扒腕,當(dāng)都完畢后绢淀,觸發(fā)finishPhase。接著會進(jìn)行下一步:
CountedCollector:
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults, docIdsToLoad.length, finishPhase, context);
finishPhase:
final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, queryResults, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults.getAtomicArray());
- 執(zhí)行字段聚合
執(zhí)行字段折疊功能瘾腰,有興趣可以研究下皆的。即ExpandSearchPhase模塊。ES 5.3版本以后支持的Field Collapsing查詢蹋盆。通過該類查詢可以輕松實現(xiàn)按Field值進(jìn)行分類费薄,每個分類獲取排名前N的文檔。如在菜單行為日志中按菜單名稱(用戶管理栖雾、角色管理等)分類楞抡,獲取每個菜單排名點擊數(shù)前十的員工。用戶也可以按Field進(jìn)行Aggregation實現(xiàn)類似功能析藕,但Field Collapsing會更易用召廷、高效。
- 回復(fù)客戶端
ExpandSearchPhase執(zhí)行完了噪径,就返回給客戶端結(jié)果了柱恤。
context.sendSearchResponse(searchResponse, queryResults);
3.2.2 數(shù)據(jù)節(jié)點
處理數(shù)據(jù)節(jié)點請求的線程池為:search
根據(jù)前面的兩個階段,數(shù)據(jù)節(jié)點主要處理協(xié)調(diào)節(jié)點的兩類請求:query和fetch
- 響應(yīng)query請求
這里響應(yīng)的請求就是第一階段的query請求
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,
(request, channel, task) -> {
//執(zhí)行查詢
searchService.executeQueryPhase(request, keepStatesInContext(channel.getVersion()), (SearchShardTask) task,
//注冊結(jié)果監(jiān)聽器
new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));
});
executeQueryPhase:
public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInContext,
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
...........
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
.......
//執(zhí)行真正的請求
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
}
@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
});
}
executeQueryPhase會執(zhí)行l(wèi)oadOrExecuteQueryPhase方法
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
final boolean canCache = indicesService.canCache(request, context);
context.getQueryShardContext().freezeContext();
if (canCache) {
indicesService.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
}
這里判斷是否從緩存查詢找爱,默認(rèn)啟用緩存梗顺,緩存的算法默認(rèn)為LRU,即刪除最近最少使用的數(shù)據(jù)车摄。如果不啟用緩存則會執(zhí)行queryPhase.execute(context);底層調(diào)用lucene進(jìn)行檢索寺谤,并且進(jìn)行聚合仑鸥。
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
.......
//聚合預(yù)處理
aggregationPhase.preProcess(searchContext);
.......
//全文檢索并打分
rescorePhase.execute(searchContext);
.......
//自動補全和糾錯
suggestPhase.execute(searchContext);
//實現(xiàn)聚合
aggregationPhase.execute(searchContext);
.......
}
關(guān)鍵點:
慢查詢?nèi)罩局械膓uery日志統(tǒng)計時間就是該步驟的時間;
聚合lucene的操作也是在本階段完成变屁;
查詢的時候會使用lRU緩存眼俊,緩存為節(jié)點級別的;
響應(yīng)fetch請求粟关;
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
(request, channel, task) -> {
searchService.executeFetchPhase(request, (SearchShardTask) task,
new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));
});
執(zhí)行fetch疮胖;
調(diào)用fetchPhase的execute方法獲取doc;
將結(jié)果封裝到FetchSearchResult,調(diào)用網(wǎng)絡(luò)組件發(fā)送到response闷板。
3.3 小結(jié)
search是比較耗費資源的澎灸,它需要遍歷相關(guān)的所有分片,每個分片可能有多個lucene段遮晚,那么每個段都會遍歷一下性昭,因此ElasticSearch的常見優(yōu)化策略就是將段進(jìn)行合并;
分頁查詢的時候县遣,即使是查后面幾頁糜颠,也會將前幾頁的數(shù)據(jù)聚合進(jìn)行分頁,因此非常耗費內(nèi)存萧求,對于這種有深度分頁的需求可能要尋找其它的解決方式其兴。
四、總結(jié)
ElasticSearch查詢分為兩類饭聚,一類為GET忌警,另一類為SEARCH。它們使用場景不同秒梳。
如果對是實時性要求不高,可以GET查詢時不要刷新來提升性能箕速。
GET讀取一個分片失敗后酪碘,會嘗試從其它分片讀取。
慢query日志是統(tǒng)計數(shù)據(jù)節(jié)點接收到了query請求后的耗時日志盐茎。
每次分頁的請求都是一次重新搜索的過程兴垦,而不是從第一次搜索的結(jié)果中獲取,這樣深度分頁會比較耗費內(nèi)存字柠。這樣也符合常見使用場景探越,因為基本只看前幾頁,很少深度分頁窑业;如果確實有需要钦幔,可以采用scoll根據(jù)_scroll_id查詢的方式。
搜索需要遍歷分片所有的Lucene分段常柄,段的合并會對查詢性能有好處鲤氢。
聚會操作在lucene檢索完畢后ElasticSearch實現(xiàn)的搀擂。
本文主要分析了ElasticSearch分布式查詢主體流程,并未對lucene部分進(jìn)行分析卷玉,有興趣的可以自行查找相關(guān)資料哨颂。
程序員的核心競爭力其實還是技術(shù),因此對技術(shù)還是要不斷的學(xué)習(xí)相种,關(guān)注 “IT巔峰技術(shù)” 公眾號 威恼,該公眾號內(nèi)容定位:中高級開發(fā)、架構(gòu)師寝并、中層管理人員等中高端崗位服務(wù)的沃测,除了技術(shù)交流外還有很多架構(gòu)思想和實戰(zhàn)案例。
作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書作者食茎,同時也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人蒂破,曾就職于拼多多、德邦等公司别渔,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人附迷,主要負(fù)責(zé)開發(fā)框架的搭建、中間件相關(guān)技術(shù)的二次開發(fā)和運維管理哎媚、混合云及基礎(chǔ)服務(wù)平臺的建設(shè)喇伯。