偶然讀取到了字節(jié)跳動關(guān)于Spark做的一些優(yōu)化布轿,發(fā)現(xiàn)其中一項被稱為BuckedtJoin的優(yōu)化項
傳送門:Spark SQL 在字節(jié)跳動數(shù)據(jù)倉庫領(lǐng)域的優(yōu)化實踐
而我曾經(jīng)也實現(xiàn)過一個類似的解決方案哮笆,現(xiàn)在才知道這種方案有一個專業(yè)的名詞BucketJoin。此篇我們來介紹以下汰扭,在不進行Spark源碼修改的情況下疟呐,怎么實現(xiàn)BucketJoin
背景
Spark提供的Join方式主要有HashJoin、BroadcastJoin东且、SortMergeJoin启具;對應(yīng)的使用條件如下:
- BroadcastJoin:其中一個表非常小,可以被完整放入內(nèi)存中珊泳;
- HashJoin:兩個表都很大鲁冯,但其中一個表在進行Shuffle之后,在Reduce端的單partition數(shù)據(jù)能被加載導內(nèi)存中色查;
- SortMergeJoin:兩個表都很大薯演,且在Reduce端無法被加載導內(nèi)存,只能通過兩個有序迭代流進行命中比較秧了;
它們的實現(xiàn)原理不再贅述跨扮,性能表現(xiàn)為:
BroadcastJoin > HashJoin > SortMergeJoin
BucketJoin 適用場景
有這樣一種情況,Table 1 與 Table 2 內(nèi)的數(shù)據(jù)已經(jīng)按照相同的 Key 進行分桶且桶數(shù)都為 n验毡,同時桶內(nèi)按該 Key 排序衡创。對這兩張表進行 Join 時,可以避免 Shuffle晶通,直接啟動 n 個 Task 進行 Join璃氢,我們稱之為BucketJoin。
性能表現(xiàn)為
BroadcastJoin > BucketJoin > HashJoin > SortMergeJoin
1 BucketJoin 實現(xiàn)思路
解決思路:
BucketJoin在實現(xiàn)的過程中狮辽,根據(jù)數(shù)據(jù)量的不同一也,在拿到兩個相同分區(qū)之后巢寡,可以有以下兩種Join解決方案:
- right表可以放在內(nèi)存中:映射為HashMap,進行命中查詢即可椰苟;
- right表無法放在內(nèi)存中:保證左右表有序抑月,進行迭代流命中比較(類似SortMergeJoin在Reduce端的實現(xiàn)方案),如何保證有序舆蝴,兩種解法:
2.1 數(shù)據(jù)生成時進行排序爪幻;
2.2 讀入數(shù)據(jù)后調(diào)用DataFrame.sortInPartition進行分區(qū)內(nèi)排序;
2 基于Spark-RDD的實現(xiàn)方案
Spark有一個算子zipPartitions可以幫我們實現(xiàn)這個方案
//preservesPartitioning:是否適用left表的Partitionner來執(zhí)行計算
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B],
preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
這個算子執(zhí)行的前提是兩個RDD的分區(qū)數(shù)一致须误,計算時會將相同partitionId的數(shù)據(jù)放在同一個Task進行計算挨稿,計算規(guī)則就是傳入的f:針對兩個迭代器進行處理的匿名函數(shù)。
但是有一個問題京痢,正常讀入數(shù)據(jù)奶甘,Spark是不能保證HDFS上的數(shù)據(jù)和Partition的Id一致的,比如一個目錄下的part-00000文件祭椰,讀入后的partitionId就可能不是0臭家,導致兩個RDD的partition不能一一對應(yīng)。怎么保證讀入的partition能夠按照我設(shè)想的情況方淤,在HDFS上和RDD的partition是一致的呢钉赁?
我們知道,sc.textFile等讀數(shù)據(jù)方式携茂,是可以按照路徑的通配符你踩,或者多個路徑進行數(shù)據(jù)讀入的。如果直接讀取父目錄讳苦,或者通配符带膜,就不能保證數(shù)據(jù)分片和RDDpartition分片號對應(yīng)的。但是如果這樣讀:
//path1,path0都對應(yīng)文件而不是目錄
val data=sc.textFile("path0,path1,path2,path3....path63")
那么讀入的數(shù)據(jù)是按照路徑的順序生成對應(yīng)partitionid的鸳谜,讀入的RDD的partition和路徑對應(yīng)規(guī)則為:
partition0:path0
partition1:path1
...
partition63:path63
不過別高興太早膝藕,這么做可能得不到預(yù)想的結(jié)果,因為大部分的文件格式咐扭,是支持讀入時進行split的芭挽,比如一個text類型文件有500MB,按照HDFS單個Block256MB的規(guī)則蝗肪,會切分為256MB和246MB兩個Partition屈芜,導致后邊的partition又無法一一對應(yīng)了剃执,解決方案比較麻煩::
- 用一種無法被split的文件格式或壓縮方式進行存儲庇麦;
- 通過傳參冈爹,或重寫InputFormat,禁止數(shù)據(jù)塊split逛绵;
解決好上述的問題后怀各,那么我們可以這樣分別讀入兩個表:
val leftRDD = sc.textFile("patha0,patha1,patha2,patha3....patha63")
val rightRDD = sc.textFile("pathb0,pathb1,pathb2,pathb3....pathb63")
3 BucketJoin-代碼實現(xiàn)
示例選取了比較簡單的情況,兩個k-v表的Join:
RDD[(String, String)] + RDD[(String, String)] = RDD[(String, Option[String], Option[String])]
3.1 BucketJoin-內(nèi)存映射表實現(xiàn)
針對右表的數(shù)據(jù)可以放入內(nèi)存的情況
def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD
.zipPartitions(rightRDD) { (leftIter, rightIter) =>
val rightMemMap = rightIter.toMap
leftIter.map { case (k, v) => (k, Some(v), rightMemMap.get(k)) }
}
}
3.2 BucketJoin-內(nèi)存雙流迭代比較實現(xiàn)
針對右表的數(shù)據(jù)無法放入內(nèi)存的情況
def join(leftRDD: RDD[(String, String)], rightRDD: RDD[(String, String)]) {
val joinedRDD: RDD[(String, Option[String], Option[String])] = leftRDD.zipPartitions(rightRDD) { (leftIter, rightIter) =>
new Iterator[(String, Option[String], Option[String])]() {
private var currentLeftData: (String, String) = _
private var currentRightData: (String, String) = _
override def hasNext: Boolean = {
if (currentLeftData == null && leftIter.hasNext) currentLeftData = leftIter.next()
if (currentRightData == null && rightIter.hasNext) currentRightData = rightIter.next()
currentLeftData != null || currentRightData != null
}
override def next(): (String, Option[String], Option[String]) = {
assert(hasNext)
if (currentRightData.eq(null)) {
leftOnly
} else if (currentLeftData.eq(null)) {
rightOnly
} else {
val compare = currentLeftData._1.compareTo(currentRightData._1)
if (compare == 0) leftAndRight
else if (compare < 0) leftOnly
else rightOnly
}
}
private def leftAndRight(): (String, Option[String], Option[String]) = {
val currentLeft = currentLeftData
val currentRight = currentRightData
currentLeftData = null
currentRightData = null
(currentLeft._1, Some(currentLeft._2), Some(currentRight._2))
}
private def rightOnly(): (String, Option[String], Option[String]) = {
val current: (String, String) = currentRightData
currentRightData = null
(current._1, Option.empty[String], Some(current._2))
}
private def leftOnly(): (String, Option[String], Option[String]) = {
val current: (String, String) = currentLeftData
currentLeftData = null
(current._1, Some(current._2), Option.empty[String])
}
}
}
}
上述就是總體的實現(xiàn)啦术浪,目前的話瓢对,只有RDD是有zipPartitions算子的,所以想利用該加速只能在這一步的時候轉(zhuǎn)換為RDD來進行計算胰苏。
4 注意事項
zipPartitions的preservesPartitioning參數(shù)默認為false硕蛹,建議設(shè)置為true,這樣就可以利用左表的Partition特性進行數(shù)據(jù)本地化的Task分配硕并,配合一些數(shù)據(jù)本地化的參數(shù)調(diào)優(yōu)法焰,理想情況下,只需要較小的右表進行機器的讀取倔毙,與做Shuffle的多次排序+多次落盤+多次序列化范序列化埃仪,不知道要快到哪。
如果需要讀入后進行分片內(nèi)排序陕赃,目前只能使用DateFrame.sortInPartitions卵蛉,不過它是基于unsafeExternalSort進行排序的,要比基于Java對象排序快很多么库。