第九篇|Spark的五種JOIN策略解析

JOIN操作是非常常見(jiàn)的數(shù)據(jù)處理操作蛔六,Spark作為一個(gè)統(tǒng)一的大數(shù)據(jù)處理引擎朝捆,提供了非常豐富的JOIN場(chǎng)景。本文分享將介紹Spark所提供的5種JOIN策略懒豹,希望對(duì)你有所幫助。本文主要包括以下內(nèi)容:

  • 影響JOIN操作的因素
  • Spark中JOIN執(zhí)行的5種策略
  • Spark是如何選擇JOIN策略的

影響JOIN操作的因素

數(shù)據(jù)集的大小

參與JOIN的數(shù)據(jù)集的大小會(huì)直接影響Join操作的執(zhí)行效率脸秽。同樣,也會(huì)影響JOIN機(jī)制的選擇和JOIN的執(zhí)行效率记餐。

JOIN的條件

JOIN的條件會(huì)涉及字段之間的邏輯比較驮樊。根據(jù)JOIN的條件,JOIN可分為兩大類:等值連接非等值連接片酝。等值連接會(huì)涉及一個(gè)或多個(gè)需要同時(shí)滿足的相等條件。在兩個(gè)輸入數(shù)據(jù)集的屬性之間應(yīng)用每個(gè)等值條件练湿。當(dāng)使用其他運(yùn)算符(運(yùn)算連接符不為=)時(shí)审轮,稱之為非等值連接。

JOIN的類型

在輸入數(shù)據(jù)集的記錄之間應(yīng)用連接條件之后疾渣,JOIN類型會(huì)影響JOIN操作的結(jié)果。主要有以下幾種JOIN類型:

  • 內(nèi)連接(Inner Join):僅從輸入數(shù)據(jù)集中輸出匹配連接條件的記錄榴捡。
  • 外連接(Outer Join):又分為左外連接、右外鏈接和全外連接碧信。
  • 半連接(Semi Join):右表只用于過(guò)濾左表的數(shù)據(jù)而不出現(xiàn)在結(jié)果集中街夭。
  • 交叉連接(Cross Join):交叉聯(lián)接返回左表中的所有行,左表中的每一行與右表中的所有行組合呈枉。交叉聯(lián)接也稱作笛卡爾積。

Spark中JOIN執(zhí)行的5種策略

Spark提供了5種JOIN機(jī)制來(lái)執(zhí)行具體的JOIN操作猖辫。該5種JOIN機(jī)制如下所示:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Shuffle Hash Join

簡(jiǎn)介

當(dāng)要JOIN的表數(shù)據(jù)量比較大時(shí)啃憎,可以選擇Shuffle Hash Join。這樣可以將大表進(jìn)行按照J(rèn)OIN的key進(jìn)行重分區(qū)辛萍,保證每個(gè)相同的JOIN key都發(fā)送到同一個(gè)分區(qū)中。如下圖示:

image

如上圖所示:Shuffle Hash Join的基本步驟主要有以下兩點(diǎn):

  • 首先悯许,對(duì)于兩張參與JOIN的表辉阶,分別按照join key進(jìn)行重分區(qū),該過(guò)程會(huì)涉及Shuffle垃僚,其目的是將相同join key的數(shù)據(jù)發(fā)送到同一個(gè)分區(qū)店印,方便分區(qū)內(nèi)進(jìn)行join冈在。
  • 其次,對(duì)于每個(gè)Shuffle之后的分區(qū)按摘,會(huì)將小表的分區(qū)數(shù)據(jù)構(gòu)建成一個(gè)Hash table,然后根據(jù)join key與大表的分區(qū)數(shù)據(jù)記錄進(jìn)行匹配炫贤。

條件與特點(diǎn)

  • 僅支持等值連接,join key不需要排序
  • 支持除了全外連接(full outer joins)之外的所有join類型
  • 需要對(duì)小表構(gòu)建Hash map侍郭,屬于內(nèi)存密集型的操作掠河,如果構(gòu)建Hash表的一側(cè)數(shù)據(jù)比較大,可能會(huì)造成OOM
  • 將參數(shù)spark.sql.join.prefersortmergeJoin (default true)置為false

Broadcast Hash Join

簡(jiǎn)介

也稱之為Map端JOIN爆捞。當(dāng)有一張表較小時(shí)勾拉,我們通常選擇Broadcast Hash Join盗温,這樣可以避免Shuffle帶來(lái)的開(kāi)銷成肘,從而提高性能。比如事實(shí)表與維表進(jìn)行JOIN時(shí)砚偶,由于維表的數(shù)據(jù)通常會(huì)很小店煞,所以可以使用Broadcast Hash Join將維表進(jìn)行Broadcast风钻。這樣可以避免數(shù)據(jù)的Shuffle(在Spark中Shuffle操作是很耗時(shí)的)骡技,從而提高JOIN的效率。在進(jìn)行 Broadcast Join 之前布朦,Spark 需要把處于 Executor 端的數(shù)據(jù)先發(fā)送到 Driver 端,然后 Driver 端再把數(shù)據(jù)廣播到 Executor 端涛舍。如果我們需要廣播的數(shù)據(jù)比較多唆途,會(huì)造成 Driver 端出現(xiàn) OOM。具體如下圖示:

image

Broadcast Hash Join主要包括兩個(gè)階段:

  • Broadcast階段 :小表被緩存在executor中
  • Hash Join階段:在每個(gè) executor中執(zhí)行Hash Join

條件與特點(diǎn)

  • 僅支持等值連接没佑,join key不需要排序
  • 支持除了全外連接(full outer joins)之外的所有join類型
  • Broadcast Hash Join相比其他的JOIN機(jī)制而言蛤奢,效率更高陶贼。但是,Broadcast Hash Join屬于網(wǎng)絡(luò)密集型的操作(數(shù)據(jù)冗余傳輸)拜秧,除此之外,需要在Driver端緩存數(shù)據(jù)痢掠,所以當(dāng)小表的數(shù)據(jù)量較大時(shí),會(huì)出現(xiàn)OOM的情況
  • 被廣播的小表的數(shù)據(jù)量要小于spark.sql.autoBroadcastJoinThreshold值雄驹,默認(rèn)是10MB(10485760)
  • 被廣播表的大小閾值不能超過(guò)8GB淹辞,spark2.4源碼如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }
  • 基表不能被broadcast,比如左連接時(shí)蔬将,只能將右表進(jìn)行廣播央星。形如:fact_table.join(broadcast(dimension_table),可以不使用broadcast提示毙石,當(dāng)滿足條件時(shí)會(huì)自動(dòng)轉(zhuǎn)為該JOIN方式颓遏。

Sort Merge Join

簡(jiǎn)介

該JOIN機(jī)制是Spark默認(rèn)的,可以通過(guò)參數(shù)spark.sql.join.preferSortMergeJoin進(jìn)行配置滤灯,默認(rèn)是true曼玩,即優(yōu)先使用Sort Merge Join。一般在兩張大表進(jìn)行JOIN時(shí)弟孟,使用該方式样悟。Sort Merge Join可以減少集群中的數(shù)據(jù)傳輸,該方式不會(huì)先加載所有數(shù)據(jù)的到內(nèi)存陈症,然后進(jìn)行hashjoin震糖,但是在JOIN之前需要對(duì)join key進(jìn)行排序。具體圖示:

image

Sort Merge Join主要包括三個(gè)階段:

  • Shuffle Phase : 兩張大表根據(jù)Join key進(jìn)行Shuffle重分區(qū)
  • Sort Phase: 每個(gè)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序
  • Merge Phase: 對(duì)來(lái)自不同表的排序好的分區(qū)數(shù)據(jù)進(jìn)行JOIN论咏,通過(guò)遍歷元素,連接具有相同Join key值的行來(lái)合并數(shù)據(jù)集

條件與特點(diǎn)

  • 僅支持等值連接
  • 支持所有join類型
  • Join Keys是排序的
  • 參數(shù)spark.sql.join.prefersortmergeJoin (默認(rèn)true)設(shè)定為true

Cartesian Join

簡(jiǎn)介

如果 Spark 中兩張參與 Join 的表沒(méi)指定join key(ON 條件)那么會(huì)產(chǎn)生 Cartesian product join蠢护,這個(gè) Join 得到的結(jié)果其實(shí)就是兩張行數(shù)的乘積养涮。

條件

  • 僅支持內(nèi)連接
  • 支持等值和不等值連接
  • 開(kāi)啟參數(shù)spark.sql.crossJoin.enabled=true

Broadcast Nested Loop Join

簡(jiǎn)介

該方式是在沒(méi)有合適的JOIN機(jī)制可供選擇時(shí),最終會(huì)選擇該種join策略懈凹。優(yōu)先級(jí)為:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

在Cartesian 與Broadcast Nested Loop Join之間悄谐,如果是內(nèi)連接,或者非等值連接威沫,則優(yōu)先選擇Broadcast Nested Loop策略洼专,當(dāng)時(shí)非等值連接并且一張表可以被廣播時(shí)孵构,會(huì)選擇Cartesian Join。

條件與特點(diǎn)

  • 支持等值和非等值連接
  • 支持所有的JOIN類型蜡镶,主要優(yōu)化點(diǎn)如下:
    • 當(dāng)右外連接時(shí)要廣播左表
    • 當(dāng)左外連接時(shí)要廣播右表
    • 當(dāng)內(nèi)連接時(shí)恤筛,要廣播左右兩張表

Spark是如何選擇JOIN策略的

等值連接的情況

有join提示(hints)的情況,按照下面的順序

  • 1.Broadcast Hint:如果join類型支持望伦,則選擇broadcast hash join
  • 2.Sort merge hint:如果join key是排序的煎殷,則選擇 sort-merge join
  • 3.shuffle hash hint:如果join類型支持, 選擇 shuffle hash join
  • 4.shuffle replicate NL hint: 如果是內(nèi)連接劣摇,選擇笛卡爾積方式

沒(méi)有join提示(hints)的情況弓乙,則逐個(gè)對(duì)照下面的規(guī)則

  • 1.如果join類型支持钧惧,并且其中一張表能夠被廣播(spark.sql.autoBroadcastJoinThreshold值勾习,默認(rèn)是10MB),則選擇 broadcast hash join
  • 2.如果參數(shù)spark.sql.join.preferSortMergeJoin設(shè)定為false追逮,且一張表足夠小(可以構(gòu)建一個(gè)hash map) 粹舵,則選擇shuffle hash join
  • 3.如果join keys 是排序的,則選擇sort-merge join
  • 4.如果是內(nèi)連接巴席,選擇 cartesian join
  • 5.如果可能會(huì)發(fā)生OOM或者沒(méi)有可以選擇的執(zhí)行策略诅需,則最終選擇broadcast nested loop join

非等值連接情況

有join提示(hints),按照下面的順序

  • 1.broadcast hint:選擇broadcast nested loop join.
  • 2.shuffle replicate NL hint: 如果是內(nèi)連接赵刑,則選擇cartesian product join

沒(méi)有join提示(hints)场刑,則逐個(gè)對(duì)照下面的規(guī)則

  • 1.如果一張表足夠小(可以被廣播),則選擇 broadcast nested loop join
  • 2.如果是內(nèi)連接铐懊,則選擇cartesian product join
  • 3.如果可能會(huì)發(fā)生OOM或者沒(méi)有可以選擇的執(zhí)行策略瞎疼,則最終選擇broadcast nested loop join

join策略選擇的源碼片段

  object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
          getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.BroadcastHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
          getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.ShuffledHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createSortMergeJoin() = {
          if (RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {
            None
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

        createBroadcastHashJoin(true)
          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
          .orElse(createShuffleHashJoin(true))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())

    
          if (canBuildLeft(joinType)) BuildLeft else BuildRight
        }

        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
          val maybeBuildSide = if (buildLeft && buildRight) {
            Some(desiredBuildSide)
          } else if (buildLeft) {
            Some(BuildLeft)
          } else if (buildRight) {
            Some(BuildRight)
          } else {
            None
          }

          maybeBuildSide.map { buildSide =>
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, condition))
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }

        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())
      case _ => Nil
    }
  }

總結(jié)

本文主要介紹了Spark提供的5種JOIN策略,并對(duì)三種比較重要的JOIN策略進(jìn)行了圖示解析竿裂。首先對(duì)影響JOIN的因素進(jìn)行了梳理腻异,然后介紹了5種Spark的JOIN策略,并對(duì)每種JOIN策略的具體含義和觸發(fā)條件進(jìn)行了闡述,最后給出了JOIN策略選擇對(duì)應(yīng)的源碼片段给赞。希望本文能夠?qū)δ阌兴鶐椭?/p>

『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末矫户,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子柑蛇,更是在濱河造成了極大的恐慌驱闷,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盆耽,死亡現(xiàn)場(chǎng)離奇詭異扼菠,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)析恢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門冯痢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人,你說(shuō)我怎么就攤上這事振劳∮涂瘢” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵弱贼,是天一觀的道長(zhǎng)磷蛹。 經(jīng)常有香客問(wèn)我,道長(zhǎng)味咳,這世上最難降的妖魔是什么檬嘀? 我笑而不...
    開(kāi)封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任鸳兽,我火速辦了婚禮罕拂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘衷掷。我一直安慰自己蛋济,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布渡处。 她就那樣靜靜地躺著祟辟,像睡著了一般。 火紅的嫁衣襯著肌膚如雪旧困。 梳的紋絲不亂的頭發(fā)上吼具,一...
    開(kāi)封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音怖竭,去河邊找鬼陡蝇。 笑死,一個(gè)胖子當(dāng)著我的面吹牛登夫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播鸦致,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼鲁纠!你這毒婦竟也來(lái)了鳍寂?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤捍壤,失蹤者是張志新(化名)和其女友劉穎鞍爱,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體盗扇,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沉填,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年翼闹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猎荠。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡关摇,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出输虱,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布横堡,位于F島的核電站冠桃,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜污茵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一葬项、第九天 我趴在偏房一處隱蔽的房頂上張望民珍。 院中可真熱鬧,春花似錦嚷量、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至部蛇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間巷查,已是汗流浹背抹腿。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留崇败,地道東北人肩祥。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像岸霹,于是被迫代替她去往敵國(guó)和親将饺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子痛黎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345