Elasticsearch 獲取寫入Doc shardID的源碼分析

前言

平時在研究ES的分布式Doc(文檔)寫入操作時泛领,我們已經(jīng)知道對將要寫入的Doc,ES首先會計算其應(yīng)該寫入到索引的哪個分片敛惊,然后在根據(jù)集群metaData中的路由信息判斷此分片所在的ES節(jié)點(diǎn)渊鞋,最后將寫入請求發(fā)送到這個節(jié)點(diǎn)并完成最終的寫入操作。寫入流程說明如下:

接下來我們主要研究步驟2中談到的(節(jié)點(diǎn)使用文檔的_id確定文檔屬于分片0)這個地方的源碼實現(xiàn)瞧挤,看看ES內(nèi)部確定一個Doc應(yīng)該被寫入的Shard的具體實現(xiàn)邏輯

源碼分析

當(dāng)前ES版本為5.6.16锡宋,確定待寫入Doc的Shard編號的主要代碼部分如下:

1. # TransportBulkAction.java

protected void doRun() throws Exception {
    final ClusterState clusterState = observer.setAndGetObservedState();
    if (handleBlockExceptions(clusterState)) {
        return;
    }
    final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
    MetaData metaData = clusterState.metaData();
 
   Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
   for (int i = 0; i < bulkRequest.requests.size(); i++) {
    DocWriteRequest request = bulkRequest.requests.get(i);
    if (request == null) {
        continue;
    }
    String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
    // 根據(jù)路由,找出doc寫入的目標(biāo)shard id
    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
    shardRequests.add(new BulkItemRequest(i, request));
    }   
}

    
1. # OperationRouting.java
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
    return shards(clusterState, index, id, routing).shardsIt();
}

protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
    int shardId = generateShardId(indexMetaData(clusterState, index), id, routing);
    return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}

static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
} 

TransportBulkAction類的doRun()方法中特恬,ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); 這行代碼獲取最終Doc的ShardID信息执俩。clusterService.operationRouting()方法返回OperationRouting對象,然后緊接著調(diào)用其indexShards(...)方法癌刽,接著進(jìn)入shards(...)方法役首,最后可看到int shardId = generateShardId(indexMetaData(clusterState, index), id, routing); 這行代碼。這里最終得到分片編號shardId显拜,所以我們重點(diǎn)關(guān)注的邏輯就在generateShardId(...)方法中衡奥。generateShardId(...)方法接受indexMetaData(索引元數(shù)據(jù))、id(文檔Doc的id號远荠,即為此次寫入請求的id號)矮固、routing(寫入時自定義的routing信息)。下面我們重點(diǎn)看下generateShardId(...)方法內(nèi)部的邏輯譬淳。

static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

首先方法內(nèi)聲明了String類型的effectiveRouting與int類型的partitionOffset档址。ES在Doc寫入操作時通常有兩種方式:一種是指定routing盹兢,另一種是不指定routing。

指定routing的寫入方式類似為:
POST my_index/doc?routing=tony
{
"name": "tony",
"age": 10
}
routing的設(shè)置可以使得寫入的數(shù)據(jù)分布到當(dāng)前索引下的具體的某些分片中守伸,引入routing機(jī)制也是為了更好的搜索性能绎秒,使得遍歷的分片范圍可以進(jìn)一步的縮小尼摹;當(dāng)然同時要面臨著數(shù)據(jù)分布傾斜的風(fēng)險替裆。在routing機(jī)制下ES提供了一個有意義的設(shè)置項index.routing_partition_size,此參數(shù)在索引創(chuàng)建時結(jié)合著routing一起使用窘问。其意義是使得寫入的數(shù)據(jù)能夠集中的落入到routing_partition_size個分片集合中。比如索引my_index包含3個分片宜咒,若此時routing_partition_size的值設(shè)為2惠赫,那經(jīng)過routing寫入到my_index的數(shù)據(jù)只會落入其中的兩個分片,而另一個會處于閑置狀態(tài)故黑。ES官網(wǎng)指出routing_partition_size的值通常設(shè)置為大于1且小于number_of_shards儿咱。

當(dāng)寫入時不帶有routing機(jī)制(對應(yīng)到代碼routing==null, effectiveRouting=id),此時數(shù)據(jù)會經(jīng)過hash(doc_id) % number_primary_shards的方式均勻的寫入到各個主分片中场晶;通過routing機(jī)制寫入混埠,想要達(dá)到數(shù)據(jù)分布均勻,則上一種計算公式就不能滿足條件了诗轻,需要結(jié)合doc_id以及routing值重新計算钳宪。只是平時大部分的時候我們在寫入ES時并沒有指定routing,在ES內(nèi)部處理上默認(rèn)會把doc_id當(dāng)做_routing扳炬,因此我們對hash(doc_id) % number_primary_shards這個公式比較熟悉吏颖。帶有routing的寫入,effectiveRouting被賦予routing值恨樟。接下來代碼中會判斷當(dāng)前索引是否設(shè)置了routing_partition_size選項半醉,若存在則partitionOffset = hash(doc_id) % routing_partition_size值,否則partitionOffset=0劝术。接著到了calculateScaledShardId(...)方法缩多,方法如下:

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}

方法中兩行代碼,本質(zhì)上對應(yīng)著ES官網(wǎng)上的公式shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards养晋;hash(_routing) + hash(_id) % routing_partition_size等價于Murmur3HashFunction.hash(effectiveRouting) + partitionOffset衬吆,然后再對num_primary_shards做取模運(yùn)算。整個公式的意思即_routing字段用于計算索引內(nèi)的分片子集匙握,然后_id用于選擇該分片子集內(nèi)的一個分片咆槽。這樣就完整的結(jié)合了routing與doc_id信息計算出具體的分片編號。認(rèn)真分析代碼我們會發(fā)現(xiàn)以下兩點(diǎn)可疑的地方:

  • 取模計算使用的是indexMetaData.getRoutingNumShards這個值圈纺,而不是number_of_primary對應(yīng)的值
  • 取模計算后接著又做了除以indexMetaData.getRoutingFactor的除法運(yùn)算

為啥此處要這樣做呢秦忿?經(jīng)過代碼注釋與實踐發(fā)現(xiàn)麦射,這個其實是包含索引shrink功能的計算方法。索引shrink允許我們將一個索引由比如原來的8個分片灯谣,shrink成為4潜秋、2、1三種數(shù)量的分片索引胎许,是一個比較有用的功能峻呛。關(guān)于factor,這里做個簡單的說明辜窑,比如數(shù)字6钩述,存在6、3穆碎、2牙勘、1四個因子(Factor)。從索引shrink的角度看factor所禀,比如8個shards同時存在4方面、2、1三個factor(8意義不大)色徘,所以indexMetaData.getRoutingFactor的值獲取的就是這個因子數(shù)恭金。另外這里一個重要的知識點(diǎn)是假定一個包含m(偶數(shù))個分片的索引A,經(jīng)過shrink之后(假定shrink為m/2個分片褂策,自然factor=2)變?yōu)樗饕鼴横腿,但此時索引B的getRoutingNumShards值依然為m,而非m/2辙培。有了這個知識點(diǎn)作鋪墊之后蔑水,我們就理解了為啥整個公式的計算結(jié)果后面要除以indexMetaData.getRoutingFactor的值了。因為公式中除數(shù)getRoutingNumShards沒有做同步的減小扬蕊,因此中間的計算結(jié)果需要同步除以getRoutingFactor的值搀别。通常索引的getRoutingFactor的值默認(rèn)為1,這個能夠理解尾抑,因為通常索引都是沒有做shrink操作的歇父。到此,我們就分析完了shardID的整個計算過程了再愈,計算的本質(zhì)沒有變化榜苫,因為要考慮routing以及shrink的功能,所以計算公式稍微變得復(fù)雜了些翎冲。

小結(jié)

到此結(jié)合著代碼垂睬,我們分析完了ES內(nèi)部計算一個將要寫入的Doc對應(yīng)的分片編號的整個過程。計算的本質(zhì)當(dāng)然是為了使得數(shù)據(jù)能夠均勻的分布在滿足條件的每個分片上。為了友好的支持其他的功能驹饺,計算會綜合考慮到其他的一些影響因素钳枕,比如shrink,routing赏壹。但計算的本質(zhì)沒有發(fā)生變化鱼炒。對于routing與shrink功能,文章中沒有貼出具體的詳細(xì)的實踐步驟蝌借,這塊希望大家后面動手實踐起來昔瞧,同時也結(jié)合著代碼一起研究起來,一起學(xué)習(xí)ES菩佑,一起進(jìn)步自晰。

引用
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市稍坯,隨后出現(xiàn)的幾起案子缀磕,更是在濱河造成了極大的恐慌,老刑警劉巖劣光,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異糟把,居然都是意外死亡绢涡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門遣疯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雄可,“玉大人,你說我怎么就攤上這事缠犀∈唬” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵辨液,是天一觀的道長虐急。 經(jīng)常有香客問我,道長滔迈,這世上最難降的妖魔是什么止吁? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮燎悍,結(jié)果婚禮上敬惦,老公的妹妹穿的比我還像新娘。我一直安慰自己谈山,他們只是感情好俄删,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般畴椰。 火紅的嫁衣襯著肌膚如雪臊诊。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天迅矛,我揣著相機(jī)與錄音妨猩,去河邊找鬼。 笑死秽褒,一個胖子當(dāng)著我的面吹牛壶硅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播销斟,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼庐椒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蚂踊?” 一聲冷哼從身側(cè)響起约谈,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎犁钟,沒想到半個月后棱诱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡涝动,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年迈勋,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片醋粟。...
    茶點(diǎn)故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡靡菇,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出米愿,到底是詐尸還是另有隱情厦凤,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布育苟,位于F島的核電站较鼓,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏违柏。R本人自食惡果不足惜笨腥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望勇垛。 院中可真熱鬧脖母,春花似錦、人聲如沸闲孤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至肥照,卻和暖如春脚仔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背舆绎。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工鲤脏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吕朵。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓猎醇,卻偏偏與公主長得像,于是被迫代替她去往敵國和親努溃。 傳聞我的和親對象是個殘疾皇子硫嘶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評論 2 349