Spark Join優(yōu)化-BucketJoin實現(xiàn)

偶然讀取到了字節(jié)跳動關(guān)于Spark做的一些優(yōu)化布轿,發(fā)現(xiàn)其中一項被稱為BuckedtJoin的優(yōu)化項

傳送門:Spark SQL 在字節(jié)跳動數(shù)據(jù)倉庫領(lǐng)域的優(yōu)化實踐

而我曾經(jīng)也實現(xiàn)過一個類似的解決方案哮笆,現(xiàn)在才知道這種方案有一個專業(yè)的名詞BucketJoin。此篇我們來介紹以下汰扭,在不進行Spark源碼修改的情況下疟呐,怎么實現(xiàn)BucketJoin

背景

Spark提供的Join方式主要有HashJoin、BroadcastJoin东且、SortMergeJoin启具;對應(yīng)的使用條件如下:

  • BroadcastJoin:其中一個表非常小,可以被完整放入內(nèi)存中珊泳;
  • HashJoin:兩個表都很大鲁冯,但其中一個表在進行Shuffle之后,在Reduce端的單partition數(shù)據(jù)能被加載導內(nèi)存中色查;
  • SortMergeJoin:兩個表都很大薯演,且在Reduce端無法被加載導內(nèi)存,只能通過兩個有序迭代流進行命中比較秧了;

它們的實現(xiàn)原理不再贅述跨扮,性能表現(xiàn)為:

BroadcastJoin > HashJoin > SortMergeJoin

BucketJoin 適用場景

有這樣一種情況,Table 1 與 Table 2 內(nèi)的數(shù)據(jù)已經(jīng)按照相同的 Key 進行分桶且桶數(shù)都為 n验毡,同時桶內(nèi)按該 Key 排序衡创。對這兩張表進行 Join 時,可以避免 Shuffle晶通,直接啟動 n 個 Task 進行 Join璃氢,我們稱之為BucketJoin。


BucketJoin

性能表現(xiàn)為

BroadcastJoin > BucketJoin > HashJoin > SortMergeJoin

1 BucketJoin 實現(xiàn)思路

解決思路:

BucketJoin在實現(xiàn)的過程中狮辽,根據(jù)數(shù)據(jù)量的不同一也,在拿到兩個相同分區(qū)之后巢寡,可以有以下兩種Join解決方案:

  1. right表可以放在內(nèi)存中:映射為HashMap,進行命中查詢即可椰苟;
  2. right表無法放在內(nèi)存中:保證左右表有序抑月,進行迭代流命中比較(類似SortMergeJoin在Reduce端的實現(xiàn)方案),如何保證有序舆蝴,兩種解法:
    2.1 數(shù)據(jù)生成時進行排序爪幻;
    2.2 讀入數(shù)據(jù)后調(diào)用DataFrame.sortInPartition進行分區(qū)內(nèi)排序;

2 基于Spark-RDD的實現(xiàn)方案

Spark有一個算子zipPartitions可以幫我們實現(xiàn)這個方案

 //preservesPartitioning:是否適用left表的Partitionner來執(zhí)行計算
def zipPartitions[B: ClassTag, V: ClassTag]
  (rdd2: RDD[B], 
  preservesPartitioning: Boolean) 
  (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

這個算子執(zhí)行的前提是兩個RDD的分區(qū)數(shù)一致须误,計算時會將相同partitionId的數(shù)據(jù)放在同一個Task進行計算挨稿,計算規(guī)則就是傳入的f:針對兩個迭代器進行處理的匿名函數(shù)。

但是有一個問題京痢,正常讀入數(shù)據(jù)奶甘,Spark是不能保證HDFS上的數(shù)據(jù)和Partition的Id一致的,比如一個目錄下的part-00000文件祭椰,讀入后的partitionId就可能不是0臭家,導致兩個RDD的partition不能一一對應(yīng)。怎么保證讀入的partition能夠按照我設(shè)想的情況方淤,在HDFS上和RDD的partition是一致的呢钉赁?


靚仔懵逼

我們知道,sc.textFile等讀數(shù)據(jù)方式携茂,是可以按照路徑的通配符你踩,或者多個路徑進行數(shù)據(jù)讀入的。如果直接讀取父目錄讳苦,或者通配符带膜,就不能保證數(shù)據(jù)分片和RDDpartition分片號對應(yīng)的。但是如果這樣讀:

//path1,path0都對應(yīng)文件而不是目錄
val data=sc.textFile("path0,path1,path2,path3....path63")

那么讀入的數(shù)據(jù)是按照路徑的順序生成對應(yīng)partitionid的鸳谜,讀入的RDD的partition和路徑對應(yīng)規(guī)則為:

partition0:path0
partition1:path1
...
partition63:path63

不過別高興太早膝藕,這么做可能得不到預(yù)想的結(jié)果,因為大部分的文件格式咐扭,是支持讀入時進行split的芭挽,比如一個text類型文件有500MB,按照HDFS單個Block256MB的規(guī)則蝗肪,會切分為256MB和246MB兩個Partition屈芜,導致后邊的partition又無法一一對應(yīng)了剃执,解決方案比較麻煩::

  - 用一種無法被split的文件格式或壓縮方式進行存儲庇麦;
  - 通過傳參冈爹,或重寫InputFormat,禁止數(shù)據(jù)塊split逛绵;

解決好上述的問題后怀各,那么我們可以這樣分別讀入兩個表:

val leftRDD = sc.textFile("patha0,patha1,patha2,patha3....patha63")
val rightRDD = sc.textFile("pathb0,pathb1,pathb2,pathb3....pathb63")

3 BucketJoin-代碼實現(xiàn)

示例選取了比較簡單的情況,兩個k-v表的Join:

RDD[(String, String)] + RDD[(String, String)] =  RDD[(String, Option[String], Option[String])]

3.1 BucketJoin-內(nèi)存映射表實現(xiàn)

針對右表的數(shù)據(jù)可以放入內(nèi)存的情況

def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
  val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD
    .zipPartitions(rightRDD) { (leftIter, rightIter) =>
      val rightMemMap = rightIter.toMap
      leftIter.map { case (k, v) => (k, Some(v), rightMemMap.get(k)) }
    }
}

3.2 BucketJoin-內(nèi)存雙流迭代比較實現(xiàn)

針對右表的數(shù)據(jù)無法放入內(nèi)存的情況

def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
  val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD.zipPartitions(rightRDD) { (leftIter, rightIter) =>
    new Iterator[(String, Option[String], Option[String])]() {
      private var currentLeftData: (String, String) = _
      private var currentRightData: (String, String) = _

      override def hasNext: Boolean = {
        if (currentLeftData == null && leftIter.hasNext) currentLeftData = leftIter.next()
        if (currentRightData == null && rightIter.hasNext) currentRightData = rightIter.next()
        currentLeftData != null || currentRightData != null
      }

      override def next(): (String, Option[String], Option[String]) = {
        assert(hasNext)
        if (currentRightData.eq(null)) {
          leftOnly
        } else if (currentLeftData.eq(null)) {
          rightOnly
        } else {
          val compare = currentLeftData._1.compareTo(currentRightData._1)
          if (compare == 0) leftAndRight
          else if (compare < 0) leftOnly
          else rightOnly
        }
      }

      private def leftAndRight(): (String, Option[String], Option[String]) = {
        val currentLeft = currentLeftData
        val currentRight = currentRightData
        currentLeftData = null
        currentRightData = null
        (currentLeft._1, Some(currentLeft._2), Some(currentRight._2))
      }

      private def rightOnly(): (String, Option[String], Option[String]) = {
        val current: (String, String) = currentRightData
        currentRightData = null
        (current._1, Option.empty[String], Some(current._2))
      }

      private def leftOnly(): (String, Option[String], Option[String]) = {
        val current: (String, String) = currentLeftData
        currentLeftData = null
        (current._1, Some(current._2), Option.empty[String])
      }

    }
  }
}

上述就是總體的實現(xiàn)啦术浪,目前的話瓢对,只有RDD是有zipPartitions算子的,所以想利用該加速只能在這一步的時候轉(zhuǎn)換為RDD來進行計算胰苏。

4 注意事項

zipPartitions的preservesPartitioning參數(shù)默認為false硕蛹,建議設(shè)置為true,這樣就可以利用左表的Partition特性進行數(shù)據(jù)本地化的Task分配硕并,配合一些數(shù)據(jù)本地化的參數(shù)調(diào)優(yōu)法焰,理想情況下,只需要較小的右表進行機器的讀取倔毙,與做Shuffle的多次排序+多次落盤+多次序列化范序列化埃仪,不知道要快到哪。

如果需要讀入后進行分片內(nèi)排序陕赃,目前只能使用DateFrame.sortInPartitions卵蛉,不過它是基于unsafeExternalSort進行排序的,要比基于Java對象排序快很多么库。

結(jié)語

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末傻丝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子诉儒,更是在濱河造成了極大的恐慌葡缰,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件忱反,死亡現(xiàn)場離奇詭異运准,居然都是意外死亡,警方通過查閱死者的電腦和手機缭受,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進店門胁澳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人米者,你說我怎么就攤上這事韭畸。” “怎么了蔓搞?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵胰丁,是天一觀的道長。 經(jīng)常有香客問我喂分,道長锦庸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任蒲祈,我火速辦了婚禮甘萧,結(jié)果婚禮上萝嘁,老公的妹妹穿的比我還像新娘。我一直安慰自己扬卷,他們只是感情好牙言,可當我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著怪得,像睡著了一般咱枉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上徒恋,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天蚕断,我揣著相機與錄音,去河邊找鬼入挣。 笑死亿乳,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的财岔。 我是一名探鬼主播风皿,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼匠璧!你這毒婦竟也來了桐款?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤夷恍,失蹤者是張志新(化名)和其女友劉穎魔眨,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酿雪,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡遏暴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了指黎。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片朋凉。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖醋安,靈堂內(nèi)的尸體忽然破棺而出杂彭,到底是詐尸還是另有隱情,我是刑警寧澤吓揪,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布亲怠,位于F島的核電站,受9級特大地震影響柠辞,放射性物質(zhì)發(fā)生泄漏团秽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望习勤。 院中可真熱鬧踪栋,春花似錦、人聲如沸姻报。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吴旋。三九已至,卻和暖如春厢破,著一層夾襖步出監(jiān)牢的瞬間荣瑟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工摩泪, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留笆焰,地道東北人。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓见坑,卻偏偏與公主長得像嚷掠,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子荞驴,可洞房花燭夜當晚...
    茶點故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內(nèi)容