詳解 Spark 中的 Bucketing

什么是 Bucketing

Bucketing 就是利用 buckets(按列進(jìn)行分桶)來(lái)決定數(shù)據(jù)分區(qū)(partition)的一種優(yōu)化技術(shù)校读,它可以幫助在計(jì)算中避免數(shù)據(jù)交換(avoid data shuffle)。并行計(jì)算的時(shí)候shuffle常常會(huì)耗費(fèi)非常多的時(shí)間和資源.

Bucketing 的基本原理比較好理解祖能,它會(huì)根據(jù)你指定的列(可以是一個(gè)也可以是多個(gè))計(jì)算哈希值歉秫,然后具有相同哈希值的數(shù)據(jù)將會(huì)被分到相同的分區(qū)。

bucket

Bucket和Partition的區(qū)別

Bucket的最終目的也是實(shí)現(xiàn)分區(qū)养铸,但是和Partition的原理不同雁芙,當(dāng)我們根據(jù)指定列進(jìn)行Partition的時(shí)候,Spark會(huì)根據(jù)列的名字對(duì)數(shù)據(jù)進(jìn)行分區(qū)(如果沒(méi)有指定列名則會(huì)根據(jù)一個(gè)隨機(jī)信息對(duì)數(shù)據(jù)進(jìn)行分區(qū))钞螟。Bucketing的最大不同在于它使用了指定列的哈希值却特,這樣可以保證具有相同列值的數(shù)據(jù)被分到相同的分區(qū)。

怎么用 Bucket

按Bucket保存

目前在使用 bucketBy 的時(shí)候筛圆,必須和 sortBy,saveAsTable 一起使用椿浓,如下太援。這個(gè)操作其實(shí)是將數(shù)據(jù)保存到了文件中(如果不指定path,也會(huì)保存到一個(gè)臨時(shí)目錄中)扳碍。

df.write
  .bucketBy(10, "name")
  .sortBy("name")
  .mode(SaveMode.Overwrite)
  .option("path","/path/to")
  .saveAsTable("bucketed")

數(shù)據(jù)分桶保存之后提岔,我們才能使用它。

直接從table讀取

在一個(gè)SparkSession內(nèi)笋敞,保存之后你可以通過(guò)如下命令通過(guò)表名獲取其對(duì)應(yīng)的DataFrame.

val df = spark.table("bucketed")

其中spark是一個(gè)SparkSession對(duì)象碱蒙。獲取之后就可以使用DataFrame或者在SQL中使用表。

從已經(jīng)保存的Parquet文件讀取

如果你要使用歷史保存的數(shù)據(jù)夯巷,那么就不能用上述方法了赛惩,也不能像讀取常規(guī)文件一樣使用 spark.read.parquet() ,這種方式讀進(jìn)來(lái)的數(shù)據(jù)是不帶bucket信息的趁餐。正確的方法是利用CREATE TABLE 語(yǔ)句喷兼,詳情可用參考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
  [(col_name1 col_type1 [COMMENT col_comment1], ...)]
  USING data_source
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]
  [AS select_statement]

示例如下:

spark.sql(
  """
    |CREATE TABLE bucketed
    | (name string)
    |  USING PARQUET
    |  CLUSTERED BY (name) INTO 10 BUCKETS
    |  LOCATION '/path/to'
    |""".stripMargin)

用Buckets的好處

在我們join兩個(gè)表的時(shí)候,如果兩個(gè)表最好按照相同的列劃分成相同的buckets后雷,就可以完全避免shuffle季惯。根據(jù)前面所述的hash值計(jì)算方法吠各,兩個(gè)表具有相同列值的數(shù)據(jù)會(huì)存放在相同的機(jī)器上,這樣在進(jìn)行join操作時(shí)就不需要再去和其他機(jī)器通訊勉抓,直接在本地完成計(jì)算即可贾漏。假設(shè)你有左右兩個(gè)表,各有兩個(gè)分區(qū)藕筋,那么join的時(shí)候?qū)嶋H計(jì)算就是下圖的樣子纵散,兩個(gè)機(jī)器進(jìn)行計(jì)算,并且計(jì)算后分區(qū)還是2.

with bucket

而當(dāng)需要shuffle的時(shí)候念逞,會(huì)是這樣的困食,


without bucket

細(xì)心的你可能發(fā)現(xiàn)了,上面兩個(gè)分區(qū)對(duì)應(yīng)兩個(gè)Executor翎承,下面shuffle之后對(duì)應(yīng)的怎么成了三個(gè)Executor了硕盹?沒(méi)錯(cuò),當(dāng)數(shù)據(jù)進(jìn)行shuffle之后叨咖,分區(qū)數(shù)就不再保持和輸入的數(shù)據(jù)相同了瘩例,實(shí)際上也沒(méi)有必要保持相同。

本地測(cè)試

我們考慮的是大數(shù)據(jù)表的連接甸各,本地測(cè)試的時(shí)候一般使用小的表垛贤,所以逆序需要將小表自動(dòng)廣播的配置關(guān)掉。如果開啟小表廣播趣倾,那么兩個(gè)小表的join之后分區(qū)數(shù)是不會(huì)變的聘惦,例如:

左表分區(qū)數(shù) 右表分區(qū)數(shù)數(shù) Join之后的分區(qū)數(shù)
3 3 3

關(guān)閉配置的命令如下:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

正常情況下join之后分區(qū)數(shù)會(huì)發(fā)生變化:

左表分區(qū)數(shù) 右表分區(qū)數(shù)數(shù) Join之后的分區(qū)數(shù)
3 3 200

這個(gè)200其實(shí)就是 "spark.sql.shuffle.partitions" 配置的值,默認(rèn)就是200. 所以如果在Join過(guò)程中出現(xiàn)了shuffle儒恋,join之后的分區(qū)一定會(huì)變善绎,并且變成spark.sql.shuffle.partitions的值。通常你需要根據(jù)自己的集群資源修改這個(gè)值诫尽,從而優(yōu)化并行度禀酱,但是shuffle是不可避免的。

左右兩個(gè)表Bucket數(shù)目不一致時(shí)

實(shí)際測(cè)試結(jié)果如下:

左表Bucket數(shù) 右表Bucekt數(shù) Join之后的分區(qū)數(shù)
8 4 8
4 4 4

Spark依然會(huì)利用一些Bucekt的信息牧嫉,但具體怎么執(zhí)行目前還不太清楚剂跟,還是保持一致的好。

另外酣藻,如果你spark job的可用計(jì)算核心數(shù)小于Bucket值曹洽,那么從文件中讀取之后Bucekt值會(huì)變,就是說(shuō)bucket的數(shù)目不會(huì)超過(guò)你能使用的最大計(jì)算核數(shù)辽剧。

不要使用的 <=> 符號(hào)R陆唷!抖仅!

在處理null值的時(shí)候坊夫,我們可能會(huì)用到一些特殊的函數(shù)或者符號(hào)砖第,如下表所示。但是在使用bucket的時(shí)候這里有個(gè)坑环凿,一定要躲過(guò)梧兼。join的時(shí)候千萬(wàn)不要使用 <=> 符號(hào),使用之后spark就會(huì)忽略bucket信息智听,繼續(xù)shuffle數(shù)據(jù)羽杰,原因可能和hash計(jì)算有關(guān)。

null

原文連接

如果你喜歡我的文章到推,可以在任一平臺(tái)搜索【黑客悟理】關(guān)注我考赛,非常感謝!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末莉测,一起剝皮案震驚了整個(gè)濱河市颜骤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捣卤,老刑警劉巖忍抽,帶你破解...
    沈念sama閱讀 211,561評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異董朝,居然都是意外死亡鸠项,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門子姜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)祟绊,“玉大人,你說(shuō)我怎么就攤上這事哥捕∧脸椋” “怎么了?”我有些...
    開封第一講書人閱讀 157,162評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵扭弧,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我记舆,道長(zhǎng)鸽捻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,470評(píng)論 1 283
  • 正文 為了忘掉前任泽腮,我火速辦了婚禮御蒲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘诊赊。我一直安慰自己厚满,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,550評(píng)論 6 385
  • 文/花漫 我一把揭開白布碧磅。 她就那樣靜靜地躺著碘箍,像睡著了一般遵馆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上丰榴,一...
    開封第一講書人閱讀 49,806評(píng)論 1 290
  • 那天货邓,我揣著相機(jī)與錄音,去河邊找鬼四濒。 笑死换况,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的盗蟆。 我是一名探鬼主播戈二,決...
    沈念sama閱讀 38,951評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼喳资!你這毒婦竟也來(lái)了觉吭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,712評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤骨饿,失蹤者是張志新(化名)和其女友劉穎烂瘫,沒(méi)想到半個(gè)月后随闽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,166評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,510評(píng)論 2 327
  • 正文 我和宋清朗相戀三年雇逞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片期升。...
    茶點(diǎn)故事閱讀 38,643評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡拜姿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贴汪,到底是詐尸還是另有隱情脐往,我是刑警寧澤,帶...
    沈念sama閱讀 34,306評(píng)論 4 330
  • 正文 年R本政府宣布扳埂,位于F島的核電站业簿,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏阳懂。R本人自食惡果不足惜梅尤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,930評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望岩调。 院中可真熱鬧巷燥,春花似錦、人聲如沸号枕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)葱淳。三九已至钝腺,卻和暖如春抛姑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拍屑。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工途戒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人僵驰。 一個(gè)月前我還...
    沈念sama閱讀 46,351評(píng)論 2 360
  • 正文 我出身青樓喷斋,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蒜茴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子星爪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,509評(píng)論 2 348