Spark的join操作可能觸發(fā)shuffle操作。shuffle操作要經(jīng)過磁盤IO肴茄,網(wǎng)絡傳輸晌畅,對性能影響比較大。本文聊一聊Spark的join在哪些情況下可以避免shuffle過程寡痰。
1 DataFrame/Dataset的join如何避免shuffle
針對Spark DataFrame/DataSet的join抗楔,可以通過broadcast join和bucket join來避免shuffle操作。
1.1 Broadcast Join
Broadcast join很好理解拦坠,小表被分發(fā)到所有executors连躏,所以不需要做shuffle就可以完成join. Spark SQL控制自動broadcast join的參數(shù)是:spark.sql.autoBroadcastJoinThreshold , 默認為10MB. 就是說當join中的一張表的size小于10MB時贞滨,spark會自動將其封裝為broadcast發(fā)送到所有結(jié)點入热,然后進行broadcast join. 當然也可以手動將join中的某張表轉(zhuǎn)化成broadcast :?
????????????????sparkSession.sparkContext.broadcast(df)
1.2 Bucket Join
Bucket join其實就是將要join的兩張表按照join columns(或join columns的子集)根據(jù)相同的partitioner預先做好分區(qū),并將這些分區(qū)信息存儲到catalog中(比如HiveExternalCatalog)晓铆;然后在讀取這兩張表并做join時才顿,spark根據(jù)bucket信息將兩張表的相同partition進行join即可,從而避免了shuffle的過程尤蒿。注意郑气,這里是避免了shuffle過程,并沒有完全避免網(wǎng)絡傳輸腰池,由于兩張表的相同partition不一定在同一臺機器上尾组,所以這里仍需要對其中一張表的partition進行網(wǎng)絡傳輸。關于spark bucketing的原理和使用細節(jié)可以參見這個視頻示弓。
2 RDD的join什么情況下可以避免shuffle
筆者這里想討論的是PairRDDFunctions類的join方法讳侨。在RDD對象中有一個隱式轉(zhuǎn)換可以將rdd轉(zhuǎn)換成PairRDDFunctions對象,這樣就可以直接在rdd對象上調(diào)用join方法:
2.1?PairRDDFunctions.join和PairRDDFunctions.cogroup
先來看看PairRDDFunctions的join方法:
PairRDDFunctions有多個重載的join方法奏属,上面這個只帶一個RDD對象的參數(shù)跨跨,我們接著看它調(diào)用的另一個重載的join方法:
可以看到,RDD的join實現(xiàn)是由cogroup方法完成的,cogroup完后得到的是類型為RDD[(K, (Iterable[V], Iterable[W]))]的rdd對象勇婴,其中K為key的類型忱嘹,V為第一張join表的value類型,W為第二張join表的value類型耕渴;然后拘悦,調(diào)用flatMapValues將其轉(zhuǎn)換成RDD[(K, V, W)]的rdd對象。
下面來看看PairRDDFunctions.cogroup方法的實現(xiàn):
cogroup中生成了CoGroupedRDD對象橱脸,所以關鍵是這個RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency.
2.2?CoGroupedRDD
看看這個RDD的getDependencies方法:
其中的rdds就是進行cogroup的rdd序列础米,也就是PairRDDFunctions.cogroup方法中傳入的Seq(self, other)?.
重點來了,對于所有參與cogroup的rdd添诉,如果它的partitioner和結(jié)果CoGroupedRDD的partitioner相同屁桑,則該rdd會成為CoGroupedRDD的一個oneToOne窄依賴,否則就是一個shuffle依賴栏赴,即寬依賴蘑斧。
我們知道,只有寬依賴才會觸發(fā)shuffle艾帐,所以RDD的join可以避免shuffle的條件是:參與join的所有rdd的partitioner都和結(jié)果rdd的partitioner相同。
那么盆偿,結(jié)果rdd的partitioner是怎么確定的呢柒爸?上文講到PairRDDFunctions.join方法有多個重載,其中就有可以指定partitioner的重載事扭,如果沒有指定捎稚,則使用默認的partitioner,看看默認的partitioner是怎么確定的:
簡單地說就是:
1. 如果父rdds中有可用的合格的partitioner求橄,則直接使用其中分區(qū)數(shù)最大的那個partitioner今野;
2. 如果沒有,則根據(jù)默認分區(qū)數(shù)生成HashPartitioner.
至于怎樣的partitioner是合格的罐农,請讀者閱讀上面的Partitioner.defaultPartitioner方法和Partitioner.isEligiblePartitioner方法条霜。
RDD的compute方法是真正計算得到數(shù)據(jù)的方法,我們來看看CoGroupedRDD的compute方法是怎么實現(xiàn)的:
可以看到涵亏,CoGroupedRDD的數(shù)據(jù)是根據(jù)不同的依賴從父rdd中獲取的:
1. 對于窄依賴宰睡,直接調(diào)用父rdd的iterator方法獲取對應partition的數(shù)據(jù)
2. 對于寬依賴,從shuffleManager獲取shuffleReader對象進行讀取气筋,這里就是shuffle read了
還有一個重點是讀取多個父rdds的數(shù)據(jù)后拆内,怎么將這些數(shù)據(jù)根據(jù)key進行cogroup?
這里用到了ExternalAppendOnlyMap來構(gòu)建key和grouped values的映射宠默。先來看看createExternalMap的實現(xiàn):
相關類型定義如下:
可以看到麸恍,ExternalAppendOnlyMap的構(gòu)造函數(shù)的參數(shù)是是三個方法參數(shù):
1. createCombiner : 對每個key創(chuàng)建用于合并values的combiner數(shù)據(jù)結(jié)構(gòu),在這里就是一個CoGroup的數(shù)據(jù)搀矫,數(shù)組大小就是dependencies的數(shù)量
2. mergeValue : 將每個value合并到對應key的combiner數(shù)據(jù)結(jié)構(gòu)中抹沪,在這里就是將一個CoGroupValue對象添加到其所在rdd對應的CoGroup中
3. mergeCombiners : 合并相同key的兩個combiner數(shù)據(jù)結(jié)構(gòu)刻肄,在這里就是合并兩個CoGroupCombiner對象
CoGroupedRDD.compute會調(diào)用ExternalAppendOnlyMap.insertAll方法將從父rdds得到的數(shù)據(jù)一個一個地插入到ExternalAppendOnlyMap對象中進行合并。
最后采够,以這個ExternalAppendOnlyMap對象作為參數(shù)構(gòu)造InterruptibleIterator肄方,這個iterator會被調(diào)用者用于訪問CoGroupedRDD的單個partition的所有數(shù)據(jù)。
3 總結(jié)
本文簡單地介紹了DataFrame/DataSet如何避免join中的shuffle過程蹬癌,并根據(jù)源碼詳述了RDD的join操作的具體實現(xiàn)細節(jié)权她,分析了RDD的join在什么情況下可以避免shuffle過程。
4 說明
1. 源碼版本:2.4.0
2. 水平有限逝薪,如有錯誤隅要,望讀者指出