Join背景介紹
SQL的所有操作侦讨,可以分為簡(jiǎn)單操作(如過(guò)濾where吟策、限制次數(shù)limit等)和聚合操作(groupBy儒士,join等)。
其中檩坚,join操作是最復(fù)雜着撩、代價(jià)最大的操作類型,是大部分業(yè)務(wù)場(chǎng)景的性能瓶頸所在匾委;所以睹酌,今天我們基于SparkSQL,來(lái)簡(jiǎn)要的聊一下SparkSQL所支持的幾種常見(jiàn)的Join算法以及其適用場(chǎng)景剩檀。
首先,我們需要知道數(shù)倉(cāng)中表格的分類:按照是否會(huì)經(jīng)常涉及到Join操作旺芽,可以簡(jiǎn)單分為低層次表和高層次表沪猴。
低層次表:直接導(dǎo)入數(shù)倉(cāng)的表,列數(shù)少采章,與其他表存在外鍵依賴运嗜,查詢起來(lái)經(jīng)常會(huì)用到大量Join算法,查詢效率較低
高層次表:由低層次表加工而來(lái)悯舟,使用SQL將需要join的表預(yù)先合并担租,形成“寬表”。寬表上查詢不需要大量Join抵怎,因此效率較高奋救。但是,相對(duì)的是反惕,寬表的數(shù)據(jù)存在大量冗余尝艘,同時(shí)生成滯后,查詢不及時(shí)姿染。
Join使用的結(jié)論
Join常見(jiàn)分類&實(shí)現(xiàn)機(jī)制
當(dāng)前SparkSQL支持三種Join算法-shuffle hash join背亥、broadcast hash join以及sort merge join。其中前兩者歸根到底都屬于hash join,只不過(guò)在hash join之前需要先shuffle還是先broadcast狡汉。所以娄徊,首先我們來(lái)看一下內(nèi)核hash join的機(jī)制。
Hash Join
先來(lái)看一個(gè)簡(jiǎn)單的SQL:select * from order,item where?item.id?= order.id
參與join的兩張表是item和order盾戴,join key分別是item.id以及order.id寄锐,假設(shè)這個(gè)Join采用的是hash join算法,整個(gè)過(guò)程會(huì)經(jīng)歷三步:
1. 確定Build Table(映射表捻脖、小表)以及Probe Table(探查表锐峭、大表)。其中Build Table用于構(gòu)建Hash Table可婶,而Probe會(huì)遍歷自身所有key沿癞,映射到所生成的Hash Table上去匹配。
2. Build Table構(gòu)建Hash Table矛渴。依次讀取Build Table(item)的數(shù)據(jù)椎扬,對(duì)于每一行數(shù)據(jù)根據(jù)join key(item.id)進(jìn)行hash,hash到對(duì)應(yīng)的Bucket具温,生成hash table中的一條記錄蚕涤。數(shù)據(jù)緩存在內(nèi)存中,如果內(nèi)存放不下需要dump到外存铣猩。
3. Probe Table探測(cè)揖铜。依次掃描Probe Table(order)的數(shù)據(jù),使用相同的hash函數(shù)映射Hash Table中的記錄达皿,映射成功之后再檢查join條件(item.id= order.i_id)天吓,如果匹配成功就可以將兩者join在一起。
兩點(diǎn)補(bǔ)充:
1 hash join的性能峦椰。從上面的原理圖可以看出龄寞,hash join對(duì)兩張表基本只掃描一次,算法效率是o(a+b)汤功,比起蠻力的笛卡爾積算法的a*b快了很多數(shù)量級(jí)物邑。
2 為什么說(shuō)Build Table要盡量選擇小表呢?從原理上也看到了滔金,構(gòu)建的Hash Table是需要被頻繁訪問(wèn)的色解,所以Hash Table最好能全部加載到內(nèi)存里,這也決定了hash join只適合至少一個(gè)小表join的場(chǎng)景餐茵。
看完了hash join的內(nèi)核冒签,我們來(lái)看一下這種單機(jī)的算法,在大數(shù)據(jù)分布式情況下钟病,應(yīng)該如何去做萧恕。目前成熟的有兩套算法:broadcast hash join和shuffler hash join刚梭。
Broadcast Hash Join
broadcast hash join是將其中一張小表廣播分發(fā)到另一張大表所在的分區(qū)節(jié)點(diǎn)上,分別并發(fā)地與其上的分區(qū)記錄進(jìn)行hash join票唆。broadcast適用于小表很小织狐,可以直接廣播的場(chǎng)景翎蹈。
在執(zhí)行上墓臭,主要可以分為以下兩步:
1. broadcast階段:將小表廣播分發(fā)到大表所在的所有主機(jī)森逮。分發(fā)方式可以有driver分發(fā),或者采用p2p方式簿煌。
2. hash join階段:在每個(gè)executor上執(zhí)行單機(jī)版hash join氮唯,小表映射,大表試探姨伟;
需要注意的是惩琉,Spark中對(duì)于可以廣播的小表,默認(rèn)限制是10M以下夺荒。(參數(shù)是spark.sql.autoBroadcastJoinThreshold)
Shuffle Hash Join
當(dāng)join的一張表很小的時(shí)候瞒渠,使用broadcast hash join,無(wú)疑效率最高技扼。但是隨著小表逐漸變大伍玖,廣播所需內(nèi)存、帶寬等資源必然就會(huì)太大剿吻,所以才會(huì)有默認(rèn)10M的資源限制窍箍。
所以,當(dāng)小表逐漸變大時(shí)丽旅,就需要采用另一種Hash Join來(lái)處理:Shuffle Hash Join仔燕。
Shuffle Hash Join按照join key進(jìn)行分區(qū),根據(jù)key相同必然分區(qū)相同的原理魔招,將大表join分而治之,劃分為小表的join五辽,充分利用集群資源并行化執(zhí)行办斑。
在執(zhí)行上,主要可以分為以下兩步:
1. shuffle階段:分別將兩個(gè)表按照join key進(jìn)行分區(qū)杆逗,將相同join key的記錄重分布到同一節(jié)點(diǎn)乡翅,兩張表的數(shù)據(jù)會(huì)被重分布到集群中所有節(jié)點(diǎn)。
2. hash join階段:每個(gè)分區(qū)節(jié)點(diǎn)上的數(shù)據(jù)單獨(dú)執(zhí)行單機(jī)hash join算法罪郊。
剛才也說(shuō)過(guò)蠕蚜,Hash Join適合至少有一個(gè)小表的情況,那如果兩個(gè)大表需要Join呢悔橄?這時(shí)候就需要Sort-Merge Join了靶累。
Sort-Merge Join
SparkSQL對(duì)兩張大表join采用了全新的算法-sort-merge join腺毫,整個(gè)過(guò)程分為三個(gè)步驟:
1. shuffle階段:將兩張大表根據(jù)join key進(jìn)行重新分區(qū),兩張表數(shù)據(jù)會(huì)分布到整個(gè)集群挣柬,以便分布式并行處理
2. sort階段:對(duì)單個(gè)分區(qū)節(jié)點(diǎn)的兩表數(shù)據(jù)潮酒,分別進(jìn)行排序
3. merge階段:對(duì)排好序的兩張分區(qū)表數(shù)據(jù)執(zhí)行join操作。join操作很簡(jiǎn)單邪蛔,分別遍歷兩個(gè)有序序列急黎,碰到相同join key就merge輸出,否則繼續(xù)取更小一邊的key侧到。
仔細(xì)分析的話會(huì)發(fā)現(xiàn)勃教,sort-merge join的代價(jià)并不比shuffle hash join小,反而是多了很多匠抗。那為什么SparkSQL還會(huì)在兩張大表的場(chǎng)景下選擇使用sort-merge join算法呢故源?
這和Spark的shuffle實(shí)現(xiàn)有關(guān),目前spark的shuffle實(shí)現(xiàn)都適用sort-based shuffle算法戈咳,因此在經(jīng)過(guò)shuffle之后partition數(shù)據(jù)都是按照key排序的心软。因此理論上可以認(rèn)為數(shù)據(jù)經(jīng)過(guò)shuffle之后是不需要sort的,可以直接merge著蛙。
結(jié)論:如何優(yōu)化
經(jīng)過(guò)上文的分析删铃,可以明確每種Join算法都有自己的適用場(chǎng)景。在優(yōu)化的時(shí)候踏堡,除了要根據(jù)業(yè)務(wù)場(chǎng)景選擇合適的join算法之外猎唁,還要注意以下幾點(diǎn):
1 數(shù)據(jù)倉(cāng)庫(kù)設(shè)計(jì)時(shí)最好避免大表與大表的join查詢。
2 SparkSQL也可以根據(jù)內(nèi)存資源顷蟆、帶寬資源適量將參數(shù)spark.sql.autoBroadcastJoinThreshold調(diào)大诫隅,讓更多join實(shí)際執(zhí)行為broadcast hash join。
文集
文章
Spark難點(diǎn)解析:Join實(shí)現(xiàn)原理
可視化發(fā)現(xiàn)Spark數(shù)據(jù)傾斜
參考鏈接:
SparkSQL – 有必要坐下來(lái)聊聊Join:http://hbasefly.com/2017/03/19/sparksql-basic-join/