轉(zhuǎn)自:http://hbasefly.com/2017/03/19/sparksql-basic-join/
Join背景介紹
Join是數(shù)據(jù)庫查詢永遠繞不開的話題,傳統(tǒng)查詢SQL技術總體可以分為簡單操作(過濾操作-where筹燕、排序操作-limit等)扰法,聚合操作-groupBy等以及Join操作等蛹含。其中Join操作是其中最復雜、代價最大的操作類型塞颁,也是OLAP場景中使用相對較多的操作浦箱。因此很有必要聊聊這個話題。
另外祠锣,從業(yè)務層面來講酷窥,用戶在數(shù)倉建設的時候也會涉及Join使用的問題。通常情況下伴网,數(shù)據(jù)倉庫中的表一般會分為”低層次表”和“高層次表”蓬推。
所謂”低層次表”,就是數(shù)據(jù)源導入數(shù)倉之后直接生成的表澡腾,單表列值較少沸伏,一般可以明顯歸為維度表或者事實表糕珊,表和表之間大多存在外健依賴,所以查詢起來會遇到大量Join運算毅糟,查詢效率相對比較差红选。而“高層次表”是在”低層次表”的基礎上加工轉(zhuǎn)換而來,通常做法是使用SQL語句將需要Join的表預先進行合并形成“寬表”姆另,在寬表上的查詢因為不需要執(zhí)行大量Join因而效率相對較高喇肋,很明顯,寬表缺點是數(shù)據(jù)會有大量冗余迹辐,而且生成相對比較滯后蝶防,查詢結(jié)果可能并不及時。
因此右核,為了獲得實效性更高的查詢結(jié)果慧脱,大多數(shù)場景還是需要進行復雜的Join操作。Join操作之所以復雜贺喝,不僅僅因為通常情況下其時間空間復雜度高菱鸥,更重要的是它有很多算法,在不同場景下需要選擇特定算法才能獲得最好的優(yōu)化效果躏鱼。關系型數(shù)據(jù)庫也有關于Join的各種用法氮采,姜承堯大神之前由淺入深地介紹過MySQL Join的各種算法以及調(diào)優(yōu)方案(關注公眾號InsideMySQL并回復join可以查看相關文章)。本文接下來會介紹SparkSQL所支持的幾種常見的Join算法以及其適用場景染苛。
Join常見分類以及基本實現(xiàn)機制
當前SparkSQL支持三種Join算法-shuffle hash join鹊漠、broadcast hash join以及sort merge join。其中前兩者歸根到底都屬于hash join茶行,只不過在hash join之前需要先shuffle還是先broadcast躯概。其實,這些算法并不是什么新鮮玩意畔师,都是數(shù)據(jù)庫幾十年前的老古董了(參考)娶靡,只不過換上了分布式的皮而已。不過話說回來看锉,SparkSQL/Hive…等等姿锭,所有這些大數(shù)據(jù)技術哪一樣不是來自于傳統(tǒng)數(shù)據(jù)庫技術,什么語法解析AST伯铣、基于規(guī)則優(yōu)化(CRO)呻此、基于代價優(yōu)化(CBO)、列存腔寡,都來自于傳統(tǒng)數(shù)據(jù)庫焚鲜。就拿shuffle hash join和broadcast hash join來說,hash join算法就來自于傳統(tǒng)數(shù)據(jù)庫,而shuffle和broadcast是大數(shù)據(jù)的皮恃泪,兩者一結(jié)合就成了大數(shù)據(jù)的算法了郑兴。因此可以這樣說,大數(shù)據(jù)的根就是傳統(tǒng)數(shù)據(jù)庫贝乎,傳統(tǒng)數(shù)據(jù)庫人才可以很快的轉(zhuǎn)型到大數(shù)據(jù)情连。好吧,這些都是閑篇览效。
繼續(xù)來看技術却舀,既然hash join是’內(nèi)核’,那就刨出來看看锤灿,看完把’皮’再分析一下挽拔。
Hash Join
先來看看這樣一條SQL語句:select * from order,item where item.id = order.i_id,很簡單一個Join節(jié)點但校,參與join的兩張表是item和order螃诅,join key分別是item.id以及order.i_id。現(xiàn)在假設這個Join采用的是hash join算法状囱,整個過程會經(jīng)歷三步:
1. 確定Build Table以及Probe Table:這個概念比較重要术裸,Build Table使用join key構(gòu)建Hash Table,而Probe Table使用join key進行探測亭枷,探測成功就可以join在一起袭艺。通常情況下,小表會作為Build Table叨粘,大表作為Probe Table猾编。此事例中item為Build Table,order為Probe Table升敲。
2. 構(gòu)建Hash Table:依次讀取Build Table(item)的數(shù)據(jù)答倡,對于每一行數(shù)據(jù)根據(jù)join key(item.id)進行hash,hash到對應的Bucket驴党,生成hash table中的一條記錄苇羡。數(shù)據(jù)緩存在內(nèi)存中,如果內(nèi)存放不下需要dump到外存鼻弧。
3. 探測:再依次掃描Probe Table(order)的數(shù)據(jù),使用相同的hash函數(shù)映射Hash Table中的記錄锦茁,映射成功之后再檢查join條件(item.id = order.i_id)攘轩,如果匹配成功就可以將兩者join在一起。
基本流程可以參考上圖码俩,這里有兩個小問題需要關注:
1. hash join性能如何度帮?很顯然,hash join基本都只掃描兩表一次,可以認為o(a+b)笨篷,較之最極端的笛卡爾集運算a*b瞳秽,不知甩了多少條街
2. 為什么Build Table選擇小表?道理很簡單率翅,因為構(gòu)建的Hash Table最好能全部加載在內(nèi)存练俐,效率最高;這也決定了hash join算法只適合至少一個小表的join場景冕臭,對于兩個大表的join場景并不適用腺晾;
上文說過,hash join是傳統(tǒng)數(shù)據(jù)庫中的單機join算法辜贵,在分布式環(huán)境下需要經(jīng)過一定的分布式改造悯蝉,說到底就是盡可能利用分布式計算資源進行并行化計算,提高總體效率托慨。hash join分布式改造一般有兩種經(jīng)典方案:
1. broadcast hash join:將其中一張小表廣播分發(fā)到另一張大表所在的分區(qū)節(jié)點上鼻由,分別并發(fā)地與其上的分區(qū)記錄進行hash join。broadcast適用于小表很小厚棵,可以直接廣播的場景蕉世。
2. shuffler hash join:一旦小表數(shù)據(jù)量較大,此時就不再適合進行廣播分發(fā)窟感。這種情況下讨彼,可以根據(jù)join key相同必然分區(qū)相同的原理,將兩張表分別按照join key進行重新組織分區(qū)柿祈,這樣就可以將join分而治之哈误,劃分為很多小join,充分利用集群資源并行化躏嚎。
Broadcast Hash Join
如下圖所示蜜自,broadcast hash join可以分為兩步:
1. broadcast階段:將小表廣播分發(fā)到大表所在的所有主機。廣播算法可以有很多卢佣,最簡單的是先發(fā)給driver重荠,driver再統(tǒng)一分發(fā)給所有executor;要不就是基于bittorrete的p2p思路虚茶;
2. hash join階段:在每個executor上執(zhí)行單機版hash join戈鲁,小表映射,大表試探嘹叫;
SparkSQL規(guī)定broadcast hash join執(zhí)行的基本條件為被廣播小表必須小于參數(shù)spark.sql.autoBroadcastJoinThreshold婆殿,默認為10M。
Shuffle Hash Join
在大數(shù)據(jù)條件下如果一張表很小罩扇,執(zhí)行join操作最優(yōu)的選擇無疑是broadcast hash join婆芦,效率最高怕磨。但是一旦小表數(shù)據(jù)量增大,廣播所需內(nèi)存消约、帶寬等資源必然就會太大肠鲫,broadcast hash join就不再是最優(yōu)方案。此時可以按照join key進行分區(qū),根據(jù)key相同必然分區(qū)相同的原理,就可以將大表join分而治之鹰晨,劃分為很多小表的join子巾,充分利用集群資源并行化。如下圖所示,shuffle hash join也可以分為兩步:
1. shuffle階段:分別將兩個表按照join key進行分區(qū),將相同join key的記錄重分布到同一節(jié)點,兩張表的數(shù)據(jù)會被重分布到集群中所有節(jié)點泡挺。這個過程稱為shuffle
2. hash join階段:每個分區(qū)節(jié)點上的數(shù)據(jù)單獨執(zhí)行單機hash join算法。
看到這里命浴,可以初步總結(jié)出來如果兩張小表join可以直接使用單機版hash join娄猫;如果一張大表join一張極小表,可以選擇broadcast hash join算法生闲;而如果是一張大表join一張小表媳溺,則可以選擇shuffle hash join算法;那如果是兩張大表進行join呢碍讯?
Sort-Merge Join
SparkSQL對兩張大表join采用了全新的算法-sort-merge join悬蔽,如下圖所示,整個過程分為三個步驟:
1. shuffle階段:將兩張大表根據(jù)join key進行重新分區(qū)捉兴,兩張表數(shù)據(jù)會分布到整個集群蝎困,以便分布式并行處理
2. sort階段:對單個分區(qū)節(jié)點的兩表數(shù)據(jù),分別進行排序
3. merge階段:對排好序的兩張分區(qū)表數(shù)據(jù)執(zhí)行join操作倍啥。join操作很簡單禾乘,分別遍歷兩個有序序列,碰到相同join key就merge輸出虽缕,否則取更小一邊始藕,見下圖示意:
仔細分析的話會發(fā)現(xiàn),sort-merge join的代價并不比shuffle hash join小氮趋,反而是多了很多伍派。那為什么SparkSQL還會在兩張大表的場景下選擇使用sort-merge join算法呢?這和Spark的shuffle實現(xiàn)有關剩胁,目前spark的shuffle實現(xiàn)都適用sort-based shuffle算法拙已,因此在經(jīng)過shuffle之后partition數(shù)據(jù)都是按照key排序的。因此理論上可以認為數(shù)據(jù)經(jīng)過shuffle之后是不需要sort的摧冀,可以直接merge。
經(jīng)過上文的分析,可以明確每種Join算法都有自己的適用場景索昂,數(shù)據(jù)倉庫設計時最好避免大表與大表的join查詢建车,SparkSQL也可以根據(jù)內(nèi)存資源、帶寬資源適量將參數(shù)spark.sql.autoBroadcastJoinThreshold調(diào)大椒惨,讓更多join實際執(zhí)行為broadcast hash join缤至。
總結(jié)
Join操作是傳統(tǒng)數(shù)據(jù)庫中的一個高級特性,尤其對于當前MySQL數(shù)據(jù)庫更是如此康谆,原因很簡單领斥,MySQL對Join的支持目前還比較有限,只支持Nested-Loop Join算法沃暗,因此在OLAP場景下MySQL是很難吃的消的月洛,不要去用MySQL去跑任何OLAP業(yè)務,結(jié)果真的很難看孽锥。不過好消息是MySQL在新版本要開始支持Hash Join了嚼黔,這樣也許在將來也可以用MySQL來處理一些小規(guī)模的OLAP業(yè)務。
和MySQL相比惜辑,PostgreSQL唬涧、SQLServer、Oracle等這些數(shù)據(jù)庫對Join支持更加全面一些盛撑,都支持Hash Join算法碎节。由PostgreSQL作為內(nèi)核構(gòu)建的分布式系統(tǒng)Greenplum更是在數(shù)據(jù)倉庫中占有一席之地,這和PostgreSQL對Join算法的支持其實有很大關系抵卫。
總體而言狮荔,傳統(tǒng)數(shù)據(jù)庫單機模式做Join的場景畢竟有限,也建議盡量減少使用Join陌僵。然而大數(shù)據(jù)領域就完全不同轴合,Join是標配,OLAP業(yè)務根本無法離開表與表之間的關聯(lián)碗短,對Join的支持成熟度一定程度上決定了系統(tǒng)的性能受葛,夸張點說,’得Join者得天下’偎谁。本文只是試圖帶大家真正走進Join的世界总滩,了解常用的幾種Join算法以及各自的適用場景。后面兩篇文章還會涉及Join的方方面面巡雨,敬請期待闰渔!