前言
平時在研究ES的分布式Doc(文檔)寫入操作時泛领,我們已經(jīng)知道對將要寫入的Doc,ES首先會計算其應(yīng)該寫入到索引的哪個分片敛惊,然后在根據(jù)集群metaData中的路由信息判斷此分片所在的ES節(jié)點(diǎn)渊鞋,最后將寫入請求發(fā)送到這個節(jié)點(diǎn)并完成最終的寫入操作。寫入流程說明如下:
源碼分析
當(dāng)前ES版本為5.6.16锡宋,確定待寫入Doc的Shard編號的主要代碼部分如下:
1. # TransportBulkAction.java
protected void doRun() throws Exception {
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
MetaData metaData = clusterState.metaData();
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
// 根據(jù)路由,找出doc寫入的目標(biāo)shard id
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
}
1. # OperationRouting.java
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
return shards(clusterState, index, id, routing).shardsIt();
}
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
int shardId = generateShardId(indexMetaData(clusterState, index), id, routing);
return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
TransportBulkAction類的doRun()方法中特恬,ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); 這行代碼獲取最終Doc的ShardID信息执俩。clusterService.operationRouting()方法返回OperationRouting對象,然后緊接著調(diào)用其indexShards(...)方法癌刽,接著進(jìn)入shards(...)方法役首,最后可看到int shardId = generateShardId(indexMetaData(clusterState, index), id, routing); 這行代碼。這里最終得到分片編號shardId显拜,所以我們重點(diǎn)關(guān)注的邏輯就在generateShardId(...)方法中衡奥。generateShardId(...)方法接受indexMetaData(索引元數(shù)據(jù))、id(文檔Doc的id號远荠,即為此次寫入請求的id號)矮固、routing(寫入時自定義的routing信息)。下面我們重點(diǎn)看下generateShardId(...)方法內(nèi)部的邏輯譬淳。
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
首先方法內(nèi)聲明了String類型的effectiveRouting與int類型的partitionOffset档址。ES在Doc寫入操作時通常有兩種方式:一種是指定routing盹兢,另一種是不指定routing。
指定routing的寫入方式類似為:
POST my_index/doc?routing=tony
{
"name": "tony",
"age": 10
}
routing的設(shè)置可以使得寫入的數(shù)據(jù)分布到當(dāng)前索引下的具體的某些分片中守伸,引入routing機(jī)制也是為了更好的搜索性能绎秒,使得遍歷的分片范圍可以進(jìn)一步的縮小尼摹;當(dāng)然同時要面臨著數(shù)據(jù)分布傾斜的風(fēng)險替裆。在routing機(jī)制下ES提供了一個有意義的設(shè)置項index.routing_partition_size,此參數(shù)在索引創(chuàng)建時結(jié)合著routing一起使用窘问。其意義是使得寫入的數(shù)據(jù)能夠集中的落入到routing_partition_size個分片集合中。比如索引my_index包含3個分片宜咒,若此時routing_partition_size的值設(shè)為2惠赫,那經(jīng)過routing寫入到my_index的數(shù)據(jù)只會落入其中的兩個分片,而另一個會處于閑置狀態(tài)故黑。ES官網(wǎng)指出routing_partition_size的值通常設(shè)置為大于1且小于number_of_shards儿咱。
當(dāng)寫入時不帶有routing機(jī)制(對應(yīng)到代碼routing==null, effectiveRouting=id),此時數(shù)據(jù)會經(jīng)過hash(doc_id) % number_primary_shards的方式均勻的寫入到各個主分片中场晶;通過routing機(jī)制寫入混埠,想要達(dá)到數(shù)據(jù)分布均勻,則上一種計算公式就不能滿足條件了诗轻,需要結(jié)合doc_id以及routing值重新計算钳宪。只是平時大部分的時候我們在寫入ES時并沒有指定routing,在ES內(nèi)部處理上默認(rèn)會把doc_id當(dāng)做_routing扳炬,因此我們對hash(doc_id) % number_primary_shards這個公式比較熟悉吏颖。帶有routing的寫入,effectiveRouting被賦予routing值恨樟。接下來代碼中會判斷當(dāng)前索引是否設(shè)置了routing_partition_size選項半醉,若存在則partitionOffset = hash(doc_id) % routing_partition_size值,否則partitionOffset=0劝术。接著到了calculateScaledShardId(...)方法缩多,方法如下:
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
方法中兩行代碼,本質(zhì)上對應(yīng)著ES官網(wǎng)上的公式shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards养晋;hash(_routing) + hash(_id) % routing_partition_size等價于Murmur3HashFunction.hash(effectiveRouting) + partitionOffset衬吆,然后再對num_primary_shards做取模運(yùn)算。整個公式的意思即_routing字段用于計算索引內(nèi)的分片子集匙握,然后_id用于選擇該分片子集內(nèi)的一個分片咆槽。這樣就完整的結(jié)合了routing與doc_id信息計算出具體的分片編號。認(rèn)真分析代碼我們會發(fā)現(xiàn)以下兩點(diǎn)可疑的地方:
- 取模計算使用的是indexMetaData.getRoutingNumShards這個值圈纺,而不是number_of_primary對應(yīng)的值
- 取模計算后接著又做了除以indexMetaData.getRoutingFactor的除法運(yùn)算
為啥此處要這樣做呢秦忿?經(jīng)過代碼注釋與實踐發(fā)現(xiàn)麦射,這個其實是包含索引shrink功能的計算方法。索引shrink允許我們將一個索引由比如原來的8個分片灯谣,shrink成為4潜秋、2、1三種數(shù)量的分片索引胎许,是一個比較有用的功能峻呛。關(guān)于factor,這里做個簡單的說明辜窑,比如數(shù)字6钩述,存在6、3穆碎、2牙勘、1四個因子(Factor)。從索引shrink的角度看factor所禀,比如8個shards同時存在4方面、2、1三個factor(8意義不大)色徘,所以indexMetaData.getRoutingFactor的值獲取的就是這個因子數(shù)恭金。另外這里一個重要的知識點(diǎn)是假定一個包含m(偶數(shù))個分片的索引A,經(jīng)過shrink之后(假定shrink為m/2個分片褂策,自然factor=2)變?yōu)樗饕鼴横腿,但此時索引B的getRoutingNumShards值依然為m,而非m/2辙培。有了這個知識點(diǎn)作鋪墊之后蔑水,我們就理解了為啥整個公式的計算結(jié)果后面要除以indexMetaData.getRoutingFactor的值了。因為公式中除數(shù)getRoutingNumShards沒有做同步的減小扬蕊,因此中間的計算結(jié)果需要同步除以getRoutingFactor的值搀别。通常索引的getRoutingFactor的值默認(rèn)為1,這個能夠理解尾抑,因為通常索引都是沒有做shrink操作的歇父。到此,我們就分析完了shardID的整個計算過程了再愈,計算的本質(zhì)沒有變化榜苫,因為要考慮routing以及shrink的功能,所以計算公式稍微變得復(fù)雜了些翎冲。
小結(jié)
到此結(jié)合著代碼垂睬,我們分析完了ES內(nèi)部計算一個將要寫入的Doc對應(yīng)的分片編號的整個過程。計算的本質(zhì)當(dāng)然是為了使得數(shù)據(jù)能夠均勻的分布在滿足條件的每個分片上。為了友好的支持其他的功能驹饺,計算會綜合考慮到其他的一些影響因素钳枕,比如shrink,routing赏壹。但計算的本質(zhì)沒有發(fā)生變化鱼炒。對于routing與shrink功能,文章中沒有貼出具體的詳細(xì)的實踐步驟蝌借,這塊希望大家后面動手實踐起來昔瞧,同時也結(jié)合著代碼一起研究起來,一起學(xué)習(xí)ES菩佑,一起進(jìn)步自晰。