Spark的join什么情況下可以避免shuffle孤钦?

Spark的join操作可能觸發(fā)shuffle操作。shuffle操作要經(jīng)過磁盤IO肴茄,網(wǎng)絡傳輸晌畅,對性能影響比較大。本文聊一聊Spark的join在哪些情況下可以避免shuffle過程寡痰。

1 DataFrame/Dataset的join如何避免shuffle

針對Spark DataFrame/DataSet的join抗楔,可以通過broadcast join和bucket join來避免shuffle操作。

1.1 Broadcast Join

Broadcast join很好理解拦坠,小表被分發(fā)到所有executors连躏,所以不需要做shuffle就可以完成join. Spark SQL控制自動broadcast join的參數(shù)是:spark.sql.autoBroadcastJoinThreshold , 默認為10MB. 就是說當join中的一張表的size小于10MB時贞滨,spark會自動將其封裝為broadcast發(fā)送到所有結(jié)點入热,然后進行broadcast join. 當然也可以手動將join中的某張表轉(zhuǎn)化成broadcast :?

????????????????sparkSession.sparkContext.broadcast(df)

1.2 Bucket Join

Bucket join其實就是將要join的兩張表按照join columns(或join columns的子集)根據(jù)相同的partitioner預先做好分區(qū),并將這些分區(qū)信息存儲到catalog中(比如HiveExternalCatalog)晓铆;然后在讀取這兩張表并做join時才顿,spark根據(jù)bucket信息將兩張表的相同partition進行join即可,從而避免了shuffle的過程尤蒿。注意郑气,這里是避免了shuffle過程,并沒有完全避免網(wǎng)絡傳輸腰池,由于兩張表的相同partition不一定在同一臺機器上尾组,所以這里仍需要對其中一張表的partition進行網(wǎng)絡傳輸。關于spark bucketing的原理和使用細節(jié)可以參見這個視頻示弓。

2 RDD的join什么情況下可以避免shuffle

筆者這里想討論的是PairRDDFunctions類的join方法讳侨。在RDD對象中有一個隱式轉(zhuǎn)換可以將rdd轉(zhuǎn)換成PairRDDFunctions對象,這樣就可以直接在rdd對象上調(diào)用join方法:

RDD.scala : implicit conversion from RDD to PairRDDFunctions

2.1?PairRDDFunctions.join和PairRDDFunctions.cogroup

先來看看PairRDDFunctions的join方法:

PairRDDFunctions.join

PairRDDFunctions有多個重載的join方法奏属,上面這個只帶一個RDD對象的參數(shù)跨跨,我們接著看它調(diào)用的另一個重載的join方法:

PairRDDFunctions.join

可以看到,RDD的join實現(xiàn)是由cogroup方法完成的,cogroup完后得到的是類型為RDD[(K, (Iterable[V], Iterable[W]))]的rdd對象勇婴,其中K為key的類型忱嘹,V為第一張join表的value類型,W為第二張join表的value類型耕渴;然后拘悦,調(diào)用flatMapValues將其轉(zhuǎn)換成RDD[(K, V, W)]的rdd對象。

下面來看看PairRDDFunctions.cogroup方法的實現(xiàn):

PairRDDFunctions.cogroup

cogroup中生成了CoGroupedRDD對象橱脸,所以關鍵是這個RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency.

2.2?CoGroupedRDD

看看這個RDD的getDependencies方法:

CoGroupedRDD.getDependencies

其中的rdds就是進行cogroup的rdd序列础米,也就是PairRDDFunctions.cogroup方法中傳入的Seq(self, other)?.

重點來了,對于所有參與cogroup的rdd添诉,如果它的partitioner和結(jié)果CoGroupedRDD的partitioner相同屁桑,則該rdd會成為CoGroupedRDD的一個oneToOne窄依賴,否則就是一個shuffle依賴栏赴,即寬依賴蘑斧。

我們知道,只有寬依賴才會觸發(fā)shuffle艾帐,所以RDD的join可以避免shuffle的條件是:參與join的所有rdd的partitioner都和結(jié)果rdd的partitioner相同。

那么盆偿,結(jié)果rdd的partitioner是怎么確定的呢柒爸?上文講到PairRDDFunctions.join方法有多個重載,其中就有可以指定partitioner的重載事扭,如果沒有指定捎稚,則使用默認的partitioner,看看默認的partitioner是怎么確定的:

Partitioner.defaultPartitioner

簡單地說就是:

1. 如果父rdds中有可用的合格的partitioner求橄,則直接使用其中分區(qū)數(shù)最大的那個partitioner今野;

2. 如果沒有,則根據(jù)默認分區(qū)數(shù)生成HashPartitioner.

至于怎樣的partitioner是合格的罐农,請讀者閱讀上面的Partitioner.defaultPartitioner方法和Partitioner.isEligiblePartitioner方法条霜。

RDD的compute方法是真正計算得到數(shù)據(jù)的方法,我們來看看CoGroupedRDD的compute方法是怎么實現(xiàn)的:

CoGroupedRDD.compute

可以看到涵亏,CoGroupedRDD的數(shù)據(jù)是根據(jù)不同的依賴從父rdd中獲取的:

1. 對于窄依賴宰睡,直接調(diào)用父rdd的iterator方法獲取對應partition的數(shù)據(jù)

2. 對于寬依賴,從shuffleManager獲取shuffleReader對象進行讀取气筋,這里就是shuffle read了

還有一個重點是讀取多個父rdds的數(shù)據(jù)后拆内,怎么將這些數(shù)據(jù)根據(jù)key進行cogroup?

這里用到了ExternalAppendOnlyMap來構(gòu)建key和grouped values的映射宠默。先來看看createExternalMap的實現(xiàn):

CoGroupedRDD.createExternalMap

相關類型定義如下:

type definition in CoGroupedRDD

可以看到麸恍,ExternalAppendOnlyMap的構(gòu)造函數(shù)的參數(shù)是是三個方法參數(shù):

1. createCombiner : 對每個key創(chuàng)建用于合并values的combiner數(shù)據(jù)結(jié)構(gòu),在這里就是一個CoGroup的數(shù)據(jù)搀矫,數(shù)組大小就是dependencies的數(shù)量

2. mergeValue : 將每個value合并到對應key的combiner數(shù)據(jù)結(jié)構(gòu)中抹沪,在這里就是將一個CoGroupValue對象添加到其所在rdd對應的CoGroup中

3. mergeCombiners : 合并相同key的兩個combiner數(shù)據(jù)結(jié)構(gòu)刻肄,在這里就是合并兩個CoGroupCombiner對象

CoGroupedRDD.compute會調(diào)用ExternalAppendOnlyMap.insertAll方法將從父rdds得到的數(shù)據(jù)一個一個地插入到ExternalAppendOnlyMap對象中進行合并。

最后采够,以這個ExternalAppendOnlyMap對象作為參數(shù)構(gòu)造InterruptibleIterator肄方,這個iterator會被調(diào)用者用于訪問CoGroupedRDD的單個partition的所有數(shù)據(jù)。

3 總結(jié)

本文簡單地介紹了DataFrame/DataSet如何避免join中的shuffle過程蹬癌,并根據(jù)源碼詳述了RDD的join操作的具體實現(xiàn)細節(jié)权她,分析了RDD的join在什么情況下可以避免shuffle過程。

4 說明

1. 源碼版本:2.4.0

2. 水平有限逝薪,如有錯誤隅要,望讀者指出

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市董济,隨后出現(xiàn)的幾起案子步清,更是在濱河造成了極大的恐慌,老刑警劉巖虏肾,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件廓啊,死亡現(xiàn)場離奇詭異,居然都是意外死亡封豪,警方通過查閱死者的電腦和手機谴轮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吹埠,“玉大人第步,你說我怎么就攤上這事≡道牛” “怎么了粘都?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長刷袍。 經(jīng)常有香客問我翩隧,道長,這世上最難降的妖魔是什么呻纹? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任鸽心,我火速辦了婚禮,結(jié)果婚禮上居暖,老公的妹妹穿的比我還像新娘顽频。我一直安慰自己,他們只是感情好太闺,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布糯景。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蟀淮。 梳的紋絲不亂的頭發(fā)上最住,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機與錄音怠惶,去河邊找鬼涨缚。 笑死,一個胖子當著我的面吹牛策治,可吹牛的內(nèi)容都是我干的脓魏。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼通惫,長吁一口氣:“原來是場噩夢啊……” “哼茂翔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起履腋,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤珊燎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后遵湖,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體悔政,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年延旧,在試婚紗的時候發(fā)現(xiàn)自己被綠了谋国。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡垄潮,死狀恐怖烹卒,靈堂內(nèi)的尸體忽然破棺而出闷盔,到底是詐尸還是另有隱情弯洗,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布逢勾,位于F島的核電站牡整,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏溺拱。R本人自食惡果不足惜逃贝,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望迫摔。 院中可真熱鬧沐扳,春花似錦、人聲如沸句占。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至杨拐,卻和暖如春祈餐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背哄陶。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工帆阳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人屋吨。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓蜒谤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親离赫。 傳聞我的和親對象是個殘疾皇子芭逝,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345