自定義Spark Partitioner提升es-hadoop Bulk效率

前言

之前寫(xiě)過(guò)一篇文章吧趣,如何提高ElasticSearch 索引速度。除了對(duì)ES本身的優(yōu)化以外,我現(xiàn)在大體思路是盡量將邏輯外移到Spark上,Spark的分布式計(jì)算能力強(qiáng),cpu密集型的很適合淑掌。這篇文章涉及的調(diào)整也是對(duì)SparkES 多維分析引擎設(shè)計(jì) 中提及的一個(gè)重要概念“shard to partition ,partition to shard ” 的實(shí)現(xiàn)。不過(guò)目前只涉及到構(gòu)建索引那塊蝶念。

問(wèn)題描述

當(dāng)你bulk數(shù)據(jù)到集群抛腕,按照ElasticSearch Bulk 源碼解析所描述的:

接著通過(guò)executeBulk方法進(jìn)入原來(lái)的流程。在該方法中媒殉,對(duì)bulkRequest.requests 進(jìn)行了兩次for循環(huán)担敌。

第一次判定如果是IndexRequest就調(diào)用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段廷蓉。

第二次是為了對(duì)數(shù)據(jù)進(jìn)行分揀柄错。大致是為了形成這么一種結(jié)構(gòu):

第二次就是對(duì)提交的數(shù)據(jù)進(jìn)行分揀,然后根據(jù)route/_id 等值找到每個(gè)數(shù)據(jù)所屬的Shard苦酱,最后將數(shù)據(jù)發(fā)送到對(duì)應(yīng)Shard所在的Node節(jié)點(diǎn)上。

然而這導(dǎo)致了兩個(gè)問(wèn)題:

  1. ES Node之間會(huì)形成N*N個(gè)連接给猾,消耗掉過(guò)多的bulk線程
  2. 出現(xiàn)了很多并不需要的網(wǎng)絡(luò)IO

所以我們希望能夠避免這種情況疫萤。

Spark Partition to ES Shard

我們希望能夠?qū)⒎謷倪壿嫹诺絊park端,保證Spark 的Partition 和ES的Shard 一一對(duì)應(yīng)敢伸,并且實(shí)現(xiàn)特定的Partitoner 保證數(shù)據(jù)到達(dá)ES都會(huì)被對(duì)應(yīng)的Shard所在的節(jié)點(diǎn)直接消費(fèi)扯饶,而不會(huì)再被轉(zhuǎn)發(fā)到其他節(jié)點(diǎn)。
經(jīng)過(guò)我的實(shí)際測(cè)試池颈,做了該調(diào)整后尾序,寫(xiě)入QPS有兩倍以上的提升

理論基礎(chǔ)

這里的理論基礎(chǔ)自然是es-hadoop項(xiàng)目。

類(lèi)的調(diào)用路徑關(guān)系為:

EsSpark -> 
     EsRDDWriter -> 
           RestService -> 
                  RestRepository -> 
                            RestClient ->
                                NetworkClient -> 
                                        CommonsHttpTransport

簡(jiǎn)單介紹下他們的作用:

  • EsSpark, 讀取ES和存儲(chǔ)ES的入口躯砰。通過(guò)隱式轉(zhuǎn)換每币,會(huì)顯得更Spark.
  • EsRDDWriter ,調(diào)用RestService創(chuàng)建PartitionWriter,對(duì)ES進(jìn)行數(shù)據(jù)寫(xiě)入
  • RestService,負(fù)責(zé)創(chuàng)建 RestRepository琢歇,PartitionWriter
  • RestRepository兰怠,bulk高層抽象,底層利用NetworkClient做真實(shí)的http請(qǐng)求李茫,另外也維護(hù)Buffer相關(guān)的揭保,典型比如積攢了多少條,多少M(fèi)之后進(jìn)行flush等魄宏。
  • NetworkClient 對(duì) CommonsHttpTransport的封裝秸侣,主要添加了一些節(jié)點(diǎn)校驗(yàn)功能。
  • CommonsHttpTransport 你可以認(rèn)為是對(duì)HttpClient的一個(gè)封裝

原來(lái)我以為需要對(duì)es-hadoop項(xiàng)目的源碼進(jìn)行修改才能實(shí)現(xiàn)前面提到的邏輯。事實(shí)上基于es-hadoop很容易實(shí)現(xiàn)上面提到的需求味榛。

我們現(xiàn)在解釋下為什么不需要修改源碼椭坚。

在RestService類(lèi)里,構(gòu)建RestRepository的時(shí)候励负,會(huì)判定是多索引還是單索引藕溅。對(duì)應(yīng)代碼如下:

RestRepository repository = (iformat.hasPattern() ?
 initMultiIndices(settings, currentSplit, resource, log) : 
initSingleIndex(settings, currentSplit, resource, log));

這里我們只解析單索引部分代碼,在對(duì)應(yīng)的initSingleIndex方法里有如下代碼:

int bucket = currentInstance % targetShards.size();
Shard chosenShard = orderedShards.get(bucket);
Node targetNode = targetShards.get(chosenShard);

先簡(jiǎn)要說(shuō)明下幾個(gè)參數(shù)變量继榆。

  • targetShards 是索引所有的主分片到對(duì)應(yīng)Node節(jié)點(diǎn)的映射巾表。
  • orderedShards 則是根據(jù)shardId 順序排序Shard集合
  • currentInstance 是partitionId

因?yàn)槲覀円呀?jīng)通過(guò)partitioner 將partitionId 轉(zhuǎn)化為shardId,
,也就是partitionId X 里的數(shù)據(jù)略吨,都是屬于shardId 為X集币。 也就是說(shuō)currentInstance == partitionId == shardId。
下面是我們推導(dǎo)出來(lái)的關(guān)系:

  • currentInstance < targetShards.size()
  • bucket == currentInstance == partitionId == shardId
  • targetNode 持有ShardId=currentInstance 的Primary Shard

所以這段代碼實(shí)際完成了partitionId 到 targetNode的映射關(guān)系翠忠。

ESShardPartitioner 實(shí)現(xiàn)

涉及到這塊的主要有 es-hadoop 的mr以及 spark模塊鞠苟。在mr模塊里包含了ES的分片規(guī)則實(shí)現(xiàn)。 spark 模塊則包含ESShardPartitioner類(lèi)秽之。

代碼如下:

package org.elasticsearch.spark
import ....
class ESShardPartitioner(settings:String) extends Partitioner {
      protected val log = LogFactory.getLog(this.getClass())
      
      protected var _numPartitions = -1 
      
      override def numPartitions: Int = {   
        val newSettings = new PropertiesSettings().load(settings)
        val repository = new RestRepository(newSettings)
        val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
        repository.close()
        _numPartitions = targetShards.size()
        _numPartitions
      }

      override def getPartition(key: Any): Int = {
        val shardId = ShardAlg.shard(key.toString(), _numPartitions)
        shardId
      }
}

public class ShardAlg {
    public static int shard(String id, int shardNum) {
        int hash = Murmur3HashFunction.hash(id);
        return mod(hash, shardNum);
    }

    public static int mod(int v, int m) {
        int r = v % m;
        if (r < 0) {
            r += m;
        }
        return r;
    }
}

使用方式如下:


......partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter =>
      try {
        val newSettings = new PropertiesSettings().load(settings)
        //創(chuàng)建EsRDDWriter
        val writer = EsRDDCreator.createWriter(newSettings.save())
        writer.write(TaskContext.get(), iter.map(f => f._2))        
      }

不過(guò)這種方式也是有一點(diǎn)問(wèn)題当娱,經(jīng)過(guò)partition 后,Spark Partition Num==ES Primary Shard Num考榨,這樣會(huì)使得Spark寫(xiě)入并發(fā)性會(huì)受到影響跨细。

這個(gè)和Spark Streaming 里KafkaRDD 的partition數(shù)受限于Kafka Partition Num 非常類(lèi)似。我之前也對(duì)這個(gè)做了擴(kuò)展河质,是的多個(gè)Spark Partition 可以映射到同一個(gè)Kafka Partition.

所以這里有第二套方案:

  1. 修改ESShardPartitioner冀惭,可以讓多個(gè)分區(qū)對(duì)應(yīng)一個(gè)Shard,并且通過(guò)一個(gè)Map維護(hù)這個(gè)關(guān)系
  2. 每個(gè)分區(qū)通過(guò)EsRDDWriter指定shardId進(jìn)行寫(xiě)入。

第二點(diǎn)可能需要修改es-hadoop源碼了掀鹅,不過(guò)修改也很簡(jiǎn)單散休,通過(guò)settings傳遞shardId,然后在RestService.initSingleIndex添加如下代碼:

if(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null){       
            targetNode = targetShards.get(orderedShards.get(Integer.parseInt(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID))));
        }

在創(chuàng)建EsRDDWriter時(shí)拷貝settings的副本并且加入對(duì)應(yīng)的ConfigurationOptions.ES_BULK_SHARDID.

使用時(shí)類(lèi)似下面這個(gè)例子:

//val settings = new SparkSettings(conf).save()
.partitionBy(new ESShardPartitioner(settings)).mapPartitionsWithIndex { (partitionIndex, iter) =>
      try {
      val writer = EsSpark.createEsRDDWriter[Map[String,String]](settings, resource)
       //shardToPartitions個(gè) Spark partition 對(duì)應(yīng)一個(gè)ES Shard
        val shardId = ESShardPartitioner.shardIdFromPartitionId(partionId, shardToPartitions)
       //強(qiáng)制該分片寫(xiě)入到特定的Shard里
        val stats = writer.writeToSpecificPrimaryShard(TaskContext.get(), shardId, iter.map(f => f._2))
        List(NewStats(stats.bulkTotalTime, stats.docsSent)).iterator
      } catch {

這樣可以把一份數(shù)據(jù)切成多分,并發(fā)寫(xiě)入ES的某個(gè)Shard.

總結(jié)

將ES的計(jì)算外移到Spark在這個(gè)場(chǎng)景中還是比較容易的乐尊。下次我還會(huì)專(zhuān)門(mén)寫(xiě)篇文章戚丸,剖析es-hadoop的實(shí)現(xiàn),以及一些關(guān)鍵參數(shù)科吭,尤其是一些類(lèi)的使用昏滴。方便我們對(duì)es-hadoop實(shí)現(xiàn)定制化修改。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末对人,一起剝皮案震驚了整個(gè)濱河市谣殊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌牺弄,老刑警劉巖姻几,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蛇捌,警方通過(guò)查閱死者的電腦和手機(jī)抚恒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)络拌,“玉大人俭驮,你說(shuō)我怎么就攤上這事〈好常” “怎么了混萝?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)萍恕。 經(jīng)常有香客問(wèn)我逸嘀,道長(zhǎng),這世上最難降的妖魔是什么允粤? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任崭倘,我火速辦了婚禮,結(jié)果婚禮上类垫,老公的妹妹穿的比我還像新娘司光。我一直安慰自己,他們只是感情好悉患,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布飘庄。 她就那樣靜靜地躺著,像睡著了一般购撼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谴仙,一...
    開(kāi)封第一講書(shū)人閱讀 51,624評(píng)論 1 305
  • 那天迂求,我揣著相機(jī)與錄音,去河邊找鬼晃跺。 笑死揩局,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的掀虎。 我是一名探鬼主播凌盯,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼烹玉!你這毒婦竟也來(lái)了驰怎?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤二打,失蹤者是張志新(化名)和其女友劉穎县忌,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡症杏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年装获,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片厉颤。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡穴豫,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出逼友,到底是詐尸還是另有隱情精肃,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布翁逞,位于F島的核電站肋杖,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏挖函。R本人自食惡果不足惜状植,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望怨喘。 院中可真熱鬧津畸,春花似錦、人聲如沸必怜。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)梳庆。三九已至暖途,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間膏执,已是汗流浹背驻售。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留更米,地道東北人欺栗。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像征峦,于是被迫代替她去往敵國(guó)和親迟几。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容