spark數(shù)據(jù)傾斜以及解決方案

對(duì) Spark/Hadoop 這樣的分布式大數(shù)據(jù)系統(tǒng)來(lái)講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜憎蛤。

對(duì)于分布式系統(tǒng)而言,理想情況下纪吮,隨著系統(tǒng)規(guī)模(節(jié)點(diǎn)數(shù)量)的增加俩檬,應(yīng)用整體耗時(shí)線性下降。如果一臺(tái)機(jī)器處理一批大量數(shù)據(jù)需要120分鐘碾盟,當(dāng)機(jī)器數(shù)量增加到3臺(tái)時(shí)棚辽,理想的耗時(shí)為120 / 3 = 40分鐘。但是冰肴,想做到分布式情況下每臺(tái)機(jī)器執(zhí)行時(shí)間是單機(jī)時(shí)的1 / N屈藐,就必須保證每臺(tái)機(jī)器的任務(wù)量相等。不幸的是熙尉,很多時(shí)候联逻,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個(gè)別機(jī)器上检痰,其它大部分機(jī)器所分配的任務(wù)量只占總得的小部分包归。比如一臺(tái)機(jī)器負(fù)責(zé)處理 80% 的任務(wù)兄墅,另外兩臺(tái)機(jī)器各處理 10% 的任務(wù)牌废。

『不患多而患不均』效扫,這是分布式環(huán)境下最大的問(wèn)題在张。意味著計(jì)算能力不是線性擴(kuò)展的副签,而是存在短板效應(yīng): 一個(gè) Stage 所耗費(fèi)的時(shí)間咙冗,是由最慢的那個(gè) Task 決定植酥。

由于同一個(gè) Stage 內(nèi)的所有 task 執(zhí)行相同的計(jì)算阻桅,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下俭识,不同 task 之間耗時(shí)的差異主要由該 task 所處理的數(shù)據(jù)量決定慨削。所以,要想發(fā)揮分布式系統(tǒng)并行計(jì)算的優(yōu)勢(shì),就必須解決數(shù)據(jù)傾斜問(wèn)題缚态。


二:數(shù)據(jù)傾斜的危害

當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí)磁椒,小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過(guò)大玫芦,未能充分發(fā)揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢(shì)浆熔。

另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時(shí)桥帆,部分任務(wù)處理的數(shù)據(jù)量過(guò)大医增,可能造成內(nèi)存不足使得任務(wù)失敗,并進(jìn)而引進(jìn)整個(gè)應(yīng)用失敗老虫。


三:數(shù)據(jù)傾斜的現(xiàn)象

當(dāng)發(fā)現(xiàn)如下現(xiàn)象時(shí)叶骨,十有八九是發(fā)生數(shù)據(jù)傾斜了:

絕大多數(shù) task 執(zhí)行得都非常快祈匙,但個(gè)別 task 執(zhí)行極慢忽刽,整體任務(wù)卡在某個(gè)階段不能結(jié)束。

原本能夠正常執(zhí)行的 Spark 作業(yè)夺欲,某天突然報(bào)出 OOM(內(nèi)存溢出)異常跪帝,觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的些阅。這種情況比較少見(jiàn)伞剑。

TIPS

在 Spark streaming 程序中,數(shù)據(jù)傾斜更容易出現(xiàn)市埋,特別是在程序中包含一些類似 sql 的 join纸泄、group 這種操作的時(shí)候。因?yàn)?Spark Streaming 程序在運(yùn)行的時(shí)候腰素,我們一般不會(huì)分配特別多的內(nèi)存聘裁,因此一旦在這個(gè)過(guò)程中出現(xiàn)一些數(shù)據(jù)傾斜,就十分容易造成 OOM弓千。


四:數(shù)據(jù)傾斜的原因

在進(jìn)行 shuffle 的時(shí)候衡便,必須將各個(gè)節(jié)點(diǎn)上相同的 key 拉取到某個(gè)節(jié)點(diǎn)上的一個(gè) task 來(lái)進(jìn)行處理,比如按照 key 進(jìn)行聚合或 join 等操作洋访。此時(shí)如果某個(gè) key 對(duì)應(yīng)的數(shù)據(jù)量特別大的話镣陕,就會(huì)發(fā)生數(shù)據(jù)傾斜。比如大部分 key 對(duì)應(yīng)10條數(shù)據(jù)姻政,但是個(gè)別 key 卻對(duì)應(yīng)了100萬(wàn)條數(shù)據(jù)呆抑,那么大部分 task 可能就只會(huì)分配到10條數(shù)據(jù),然后1秒鐘就運(yùn)行完了汁展;但是個(gè)別 task 可能分配到了100萬(wàn)數(shù)據(jù)鹊碍,要運(yùn)行一兩個(gè)小時(shí)厌殉。

因此出現(xiàn)數(shù)據(jù)傾斜的時(shí)候,Spark 作業(yè)看起來(lái)會(huì)運(yùn)行得非常緩慢侈咕,甚至可能因?yàn)槟硞€(gè) task 處理的數(shù)據(jù)量過(guò)大導(dǎo)致內(nèi)存溢出公罕。


五:?jiǎn)栴}發(fā)現(xiàn)與定位

1,通過(guò) Spark Web UI

通過(guò) Spark Web UI 來(lái)查看當(dāng)前運(yùn)行的 stage 各個(gè) task 分配的數(shù)據(jù)量(Shuffle Read Size/Records)耀销,從而進(jìn)一步確定是不是 task 分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜楼眷。

知道數(shù)據(jù)傾斜發(fā)生在哪一個(gè) stage 之后,接著我們就需要根據(jù) stage 劃分原理熊尉,推算出來(lái)發(fā)生傾斜的那個(gè) stage 對(duì)應(yīng)代碼中的哪一部分罐柳,這部分代碼中肯定會(huì)有一個(gè) shuffle 類算子≌。可以通過(guò) countByKey 查看各個(gè) key 的分布张吉。

TIPS

數(shù)據(jù)傾斜只會(huì)發(fā)生在 shuffle 過(guò)程中。這里給大家羅列一些常用的并且可能會(huì)觸發(fā) shuffle 操作的算子: distinct转晰、groupByKey、reduceByKey士飒、aggregateByKey查邢、join、cogroup酵幕、repartition 等扰藕。出現(xiàn)數(shù)據(jù)傾斜時(shí),可能就是你的代碼中使用了這些算子中的某一個(gè)所導(dǎo)致的芳撒。

2邓深、通過(guò) key 統(tǒng)計(jì)

也可以通過(guò)抽樣統(tǒng)計(jì) key 的出現(xiàn)次數(shù)驗(yàn)證。

由于數(shù)據(jù)量巨大笔刹,可以采用抽樣的方式芥备,對(duì)數(shù)據(jù)進(jìn)行抽樣,統(tǒng)計(jì)出現(xiàn)的次數(shù)舌菜,根據(jù)出現(xiàn)次數(shù)大小排序取出前幾個(gè):

df.select("key").sample(false, 0.1) // 數(shù)據(jù)采樣

? ? .(k => (k, 1)).reduceBykey(_ + _)? ? ? ? // 統(tǒng)計(jì) key 出現(xiàn)的次數(shù)

? ? .map(k => (k._2, k._1)).sortByKey(false)? // 根據(jù) key 出現(xiàn)次數(shù)進(jìn)行排序

? ? .take(10)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 取前 10 個(gè)萌壳。

如果發(fā)現(xiàn)多數(shù)數(shù)據(jù)分布都較為平均,而個(gè)別數(shù)據(jù)比其他數(shù)據(jù)大上若干個(gè)數(shù)量級(jí)日月,則說(shuō)明發(fā)生了數(shù)據(jù)傾斜袱瓮。


六、如何緩解數(shù)據(jù)傾斜

基本思路

業(yè)務(wù)邏輯: 我們從業(yè)務(wù)邏輯的層面上來(lái)優(yōu)化數(shù)據(jù)傾斜爱咬,比如要統(tǒng)計(jì)不同城市的訂單情況尺借,那么我們單獨(dú)對(duì)這一線城市來(lái)做 count,最后和其它城市做整合精拟。

程序?qū)崿F(xiàn): 比如說(shuō)在 Hive 中燎斩,經(jīng)常遇到 count(distinct)操作虱歪,這樣會(huì)導(dǎo)致最終只有一個(gè) reduce,我們可以先 group 再在外面包一層 count瘫里,就可以了实蔽;在 Spark 中使用 reduceByKey 替代 groupByKey 等。

參數(shù)調(diào)優(yōu): Hadoop 和 Spark 都自帶了很多的參數(shù)和機(jī)制來(lái)調(diào)節(jié)數(shù)據(jù)傾斜谨读,合理利用它們就能解決大部分問(wèn)題局装。

思路1. 過(guò)濾異常數(shù)據(jù)

如果導(dǎo)致數(shù)據(jù)傾斜的 key 是異常數(shù)據(jù),那么簡(jiǎn)單的過(guò)濾掉就可以了劳殖。

首先要對(duì) key 進(jìn)行分析铐尚,判斷是哪些 key 造成數(shù)據(jù)傾斜。具體方法上面已經(jīng)介紹過(guò)了哆姻,這里不贅述宣增。

然后對(duì)這些 key 對(duì)應(yīng)的記錄進(jìn)行分析:

空值或者異常值之類的,大多是這個(gè)原因引起

無(wú)效數(shù)據(jù)矛缨,大量重復(fù)的測(cè)試數(shù)據(jù)或是對(duì)結(jié)果影響不大的有效數(shù)據(jù)

有效數(shù)據(jù)爹脾,業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布

解決方案

對(duì)于第 1,2 種情況箕昭,直接對(duì)數(shù)據(jù)進(jìn)行過(guò)濾即可灵妨。

第3種情況則需要特殊的處理,具體我們下面詳細(xì)介紹落竹。


思路2 :

Spark 在做 Shuffle 時(shí)泌霍,默認(rèn)使用 HashPartitioner(非 Hash Shuffle)對(duì)數(shù)據(jù)進(jìn)行分區(qū)。如果并行度設(shè)置的不合適述召,可能造成大量不相同的 Key 對(duì)應(yīng)的數(shù)據(jù)被分配到了同一個(gè) Task 上朱转,造成該 Task 所處理的數(shù)據(jù)遠(yuǎn)大于其它 Task,從而造成數(shù)據(jù)傾斜积暖。

如果調(diào)整 Shuffle 時(shí)的并行度藤为,使得原本被分配到同一 Task 的不同 Key 發(fā)配到不同 Task 上處理,則可降低原 Task 所需處理的數(shù)據(jù)量夺刑,從而緩解數(shù)據(jù)傾斜問(wèn)題造成的短板效應(yīng)凉蜂。

(1)操作流程

RDD 操作 可在需要 Shuffle 的操作算子上直接設(shè)置并行度或者使用 spark.default.parallelism 設(shè)置。如果是 Spark SQL性誉,還可通過(guò) SET spark.sql.shuffle.partitions=[num_tasks]?設(shè)置并行度窿吩。默認(rèn)參數(shù)由不同的 Cluster Manager 控制。

dataFrame 和 sparkSql 可以設(shè)置 spark.sql.shuffle.partitions=[num_tasks]?參數(shù)控制 shuffle 的并發(fā)度错览,默認(rèn)為200纫雁。

(2)適用場(chǎng)景

大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過(guò)大。

(3)解決方案

調(diào)整并行度倾哺。一般是增大并行度轧邪,但有時(shí)如減小并行度也可達(dá)到效果刽脖。

(4)優(yōu)勢(shì)

實(shí)現(xiàn)簡(jiǎn)單,只需要參數(shù)調(diào)優(yōu)忌愚∏埽可用最小的代價(jià)解決問(wèn)題。一般如果出現(xiàn)數(shù)據(jù)傾斜硕糊,都可以通過(guò)這種方法先試驗(yàn)幾次院水,如果問(wèn)題未解決,再嘗試其它方法简十。

(5)劣勢(shì)

適用場(chǎng)景少檬某,只是讓每個(gè) task 執(zhí)行更少的不同的key。無(wú)法解決個(gè)別key特別大的情況造成的傾斜螟蝙,如果某些 key 的大小非常大恢恼,即使一個(gè) task 單獨(dú)執(zhí)行它,也會(huì)受到數(shù)據(jù)傾斜的困擾胰默。并且該方法一般只能緩解數(shù)據(jù)傾斜场斑,沒(méi)有徹底消除問(wèn)題。從實(shí)踐經(jīng)驗(yàn)來(lái)看牵署,其效果一般漏隐。

TIPS 可以把數(shù)據(jù)傾斜類比為 hash 沖突。提高并行度就類似于 提高 hash 表的大小碟刺。


思路3. 自定義 Partitioner

(1)原理

使用自定義的 Partitioner(默認(rèn)為 HashPartitioner)锁保,將原本被分配到同一個(gè) Task 的不同 Key 分配到不同 Task薯酝。

例如半沽,我們?cè)?groupByKey?算子上,使用自定義的 Partitioner:

.groupByKey(new?Partitioner()?{

? @Override

? public int numPartitions() {

? ? return 12;

? }

? @Override

? public int getPartition(Object key) {

? ? int id = Integer.parseInt(key.toString());

? ? if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {

? ? ? return (id - 9500000) / 12;

? ? } else {

? ? ? return id % 12;

? ? }

? }

})

TIPS 這個(gè)做法相當(dāng)于自定義 hash 表的 哈希函數(shù)吴菠。

(2)適用場(chǎng)景

大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過(guò)大者填。

(3)解決方案

使用自定義的 Partitioner 實(shí)現(xiàn)類代替默認(rèn)的 HashPartitioner,盡量將所有不同的 Key 均勻分配到不同的 Task 中做葵。

(4)優(yōu)勢(shì)

不影響原有的并行度設(shè)計(jì)占哟。如果改變并行度,后續(xù) Stage 的并行度也會(huì)默認(rèn)改變酿矢,可能會(huì)影響后續(xù) Stage榨乎。

(5)劣勢(shì)

適用場(chǎng)景有限,只能將不同 Key 分散開(kāi)瘫筐,對(duì)于同一 Key 對(duì)應(yīng)數(shù)據(jù)集非常大的場(chǎng)景不適用蜜暑。效果與調(diào)整并行度類似,只能緩解數(shù)據(jù)傾斜而不能完全消除數(shù)據(jù)傾斜策肝。而且需要根據(jù)數(shù)據(jù)特點(diǎn)自定義專用的 Partitioner肛捍,不夠靈活隐绵。


思路4. Reduce 端 Join 轉(zhuǎn)化為 Map 端 Join

通過(guò) Spark 的 Broadcast 機(jī)制,將 Reduce 端 Join 轉(zhuǎn)化為 Map 端 Join拙毫,這意味著 Spark 現(xiàn)在不需要跨節(jié)點(diǎn)做 shuffle 而是直接通過(guò)本地文件進(jìn)行 join依许,從而完全消除 Shuffle 帶來(lái)的數(shù)據(jù)傾斜。

from?pyspark.sql.functions?import?broadcast

result = broadcast(A).join(B, ["join_col"], "left")

其中 A 是比較小的 dataframe 并且能夠整個(gè)存放在 executor 內(nèi)存中缀蹄。

(1)適用場(chǎng)景

參與Join的一邊數(shù)據(jù)集足夠小峭跳,可被加載進(jìn) Driver 并通過(guò) Broadcast 方法廣播到各個(gè) Executor 中。

(2)解決方案

在 Java/Scala 代碼中將小數(shù)據(jù)集數(shù)據(jù)拉取到 Driver袍患,然后通過(guò) Broadcast 方案將小數(shù)據(jù)集的數(shù)據(jù)廣播到各 Executor坦康。或者在使用 SQL 前诡延,將 Broadcast 的閾值調(diào)整得足夠大滞欠,從而使 Broadcast 生效。進(jìn)而將 Reduce Join 替換為 Map Join肆良。

(3)優(yōu)勢(shì)

避免了 Shuffle筛璧,徹底消除了數(shù)據(jù)傾斜產(chǎn)生的條件,可極大提升性能惹恃。

(4)劣勢(shì)

因?yàn)槭窍葘⑿?shù)據(jù)通過(guò) Broadcase 發(fā)送到每個(gè) executor 上夭谤,所以需要參與 Join 的一方數(shù)據(jù)集足夠小,并且主要適用于 Join 的場(chǎng)景巫糙,不適合聚合的場(chǎng)景朗儒,適用條件有限。

NOTES

使用Spark SQL時(shí)需要通過(guò) SET spark.sql.autoBroadcastJoinThreshold=104857600?將 Broadcast 的閾值設(shè)置得足夠大参淹,才會(huì)生效醉锄。

思路5. 拆分 join 再 union

思路很簡(jiǎn)單,就是將一個(gè) join 拆分成 傾斜數(shù)據(jù)集 Join 和 非傾斜數(shù)據(jù)集 Join浙值,最后進(jìn)行 union:

對(duì)包含少數(shù)幾個(gè)數(shù)據(jù)量過(guò)大的 key 的那個(gè) RDD (假設(shè)是 leftRDD)恳不,通過(guò) sample 算子采樣出一份樣本來(lái),然后統(tǒng)計(jì)一下每個(gè) key 的數(shù)量开呐,計(jì)算出來(lái)數(shù)據(jù)量最大的是哪幾個(gè) key烟勋。具體方法上面已經(jīng)介紹過(guò)了,這里不贅述筐付。

然后將這 k 個(gè) key 對(duì)應(yīng)的數(shù)據(jù)從 leftRDD 中單獨(dú)過(guò)濾出來(lái)卵惦,并給每個(gè) key 都打上 1~n 以內(nèi)的隨機(jī)數(shù)作為前綴,形成一個(gè)單獨(dú)的 leftSkewRDD瓦戚;而不會(huì)導(dǎo)致傾斜的大部分 key 形成另外一個(gè) leftUnSkewRDD沮尿。

接著將需要 join 的另一個(gè) rightRDD,也過(guò)濾出來(lái)那幾個(gè)傾斜 key 并通過(guò) flatMap 操作將該數(shù)據(jù)集中每條數(shù)據(jù)均轉(zhuǎn)換為 n 條數(shù)據(jù)(這 n 條數(shù)據(jù)都按順序附加一個(gè) 0~n 的前綴)伤极,形成單獨(dú)的 rightSkewRDD蛹找;不會(huì)導(dǎo)致傾斜的大部分 key 也形成另外一個(gè) rightUnSkewRDD姨伤。

現(xiàn)在將 leftSkewRDD 與 膨脹 n 倍的 rightSkewRDD 進(jìn)行 join,且在 Join 過(guò)程中將隨機(jī)前綴去掉庸疾,得到傾斜數(shù)據(jù)集的 Join 結(jié)果 skewedJoinRDD乍楚。注意到此時(shí)我們已經(jīng)成功將原先相同的 key 打散成 n 份,分散到多個(gè) task 中去進(jìn)行 join 了届慈。

對(duì) leftUnSkewRDD 與 rightUnRDD 進(jìn)行Join徒溪,得到 Join 結(jié)果 unskewedJoinRDD。

通過(guò) union 算子將 skewedJoinRDD 與 unskewedJoinRDD 進(jìn)行合并金顿,從而得到完整的 Join 結(jié)果集臊泌。

TIPS

rightRDD 與傾斜 Key 對(duì)應(yīng)的部分?jǐn)?shù)據(jù),需要與隨機(jī)前綴集 (1~n) 作笛卡爾乘積 (即將數(shù)據(jù)量擴(kuò)大 n 倍)揍拆,從而保證無(wú)論數(shù)據(jù)傾斜側(cè)傾斜 Key 如何加前綴渠概,都能與之正常 Join。

skewRDD 的 join 并行度可以設(shè)置為 n * k (k 為 topSkewkey 的個(gè)數(shù))嫂拴。

由于傾斜Key與非傾斜Key的操作完全獨(dú)立播揪,可并行進(jìn)行。

(1)適用場(chǎng)景

兩張表都比較大筒狠,無(wú)法使用 Map 端 Join猪狈。其中一個(gè) RDD 有少數(shù)幾個(gè) Key 的數(shù)據(jù)量過(guò)大,另外一個(gè) RDD 的 Key 分布較為均勻辩恼。

(2)解決方案

將有數(shù)據(jù)傾斜的 RDD 中傾斜 Key 對(duì)應(yīng)的數(shù)據(jù)集單獨(dú)抽取出來(lái)加上隨機(jī)前綴雇庙,另外一個(gè) RDD 每條數(shù)據(jù)分別與隨機(jī)前綴結(jié)合形成新的RDD(相當(dāng)于將其數(shù)據(jù)增到到原來(lái)的N倍,N即為隨機(jī)前綴的總個(gè)數(shù))灶伊,然后將二者Join并去掉前綴疆前。然后將不包含傾斜Key的剩余數(shù)據(jù)進(jìn)行Join。最后將兩次Join的結(jié)果集通過(guò)union合并谁帕,即可得到全部Join結(jié)果峡继。

(3)優(yōu)勢(shì)

相對(duì)于 Map 則 Join冯袍,更能適應(yīng)大數(shù)據(jù)集的 Join匈挖。如果資源充足,傾斜部分?jǐn)?shù)據(jù)集與非傾斜部分?jǐn)?shù)據(jù)集可并行進(jìn)行康愤,效率提升明顯儡循。且只針對(duì)傾斜部分的數(shù)據(jù)做數(shù)據(jù)擴(kuò)展,增加的資源消耗有限征冷。

(4)劣勢(shì)

如果傾斜 Key 非常多择膝,則另一側(cè)數(shù)據(jù)膨脹非常大,此方案不適用检激。而且此時(shí)對(duì)傾斜 Key 與非傾斜 Key 分開(kāi)處理肴捉,需要掃描數(shù)據(jù)集兩遍腹侣,增加了開(kāi)銷。


思路6. 大表 key 加鹽齿穗,小表擴(kuò)大 N 倍 jion

如果出現(xiàn)數(shù)據(jù)傾斜的 Key 比較多傲隶,上一種方法將這些大量的傾斜 Key 分拆出來(lái),意義不大窃页。此時(shí)更適合直接對(duì)存在數(shù)據(jù)傾斜的數(shù)據(jù)集全部加上隨機(jī)前綴跺株,然后對(duì)另外一個(gè)不存在嚴(yán)重?cái)?shù)據(jù)傾斜的數(shù)據(jù)集整體與隨機(jī)前綴集作笛卡爾乘積(即將數(shù)據(jù)量擴(kuò)大N倍)。

其實(shí)就是上一個(gè)方法的特例或者簡(jiǎn)化脖卖。少了拆分乒省,也就沒(méi)有 union。

(1)適用場(chǎng)景

一個(gè)數(shù)據(jù)集存在的傾斜 Key 比較多畦木,另外一個(gè)數(shù)據(jù)集數(shù)據(jù)分布比較均勻袖扛。

(2)優(yōu)勢(shì)

對(duì)大部分場(chǎng)景都適用,效果不錯(cuò)十籍。

(3)劣勢(shì)

需要將一個(gè)數(shù)據(jù)集整體擴(kuò)大 N 倍攻锰,會(huì)增加資源消耗。

思路7. map 端先局部聚合

在 map 端加個(gè) combiner 函數(shù)進(jìn)行局部聚合妓雾。加上 combiner 相當(dāng)于提前進(jìn)行 reduce ,就會(huì)把一個(gè) mapper 中的相同 key 進(jìn)行聚合娶吞,減少 shuffle 過(guò)程中數(shù)據(jù)量 以及 reduce 端的計(jì)算量。這種方法可以有效的緩解數(shù)據(jù)傾斜問(wèn)題械姻,但是如果導(dǎo)致數(shù)據(jù)傾斜的 key 大量分布在不同的 mapper 的時(shí)候妒蛇,這種方法就不是很有效了。

TIPS 使用 reduceByKey 而不是 groupByKey楷拳。

思路8. 加鹽局部聚合 + 去鹽全局聚合

這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合绣夺。第一次是局部聚合,先給每個(gè) key 都打上一個(gè) 1~n 的隨機(jī)數(shù)欢揖,比如 3 以內(nèi)的隨機(jī)數(shù)陶耍,此時(shí)原先一樣的 key 就變成不一樣的了,比如?(hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1)她混,就會(huì)變成?(1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)烈钞。接著對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行 reduceByKey 等聚合操作坤按,進(jìn)行局部聚合毯欣,那么局部聚合結(jié)果,就會(huì)變成了?(1_hello, 2) (2_hello, 2) (3_hello, 1)臭脓。然后將各個(gè) key 的前綴給去掉酗钞,就會(huì)變成?(hello, 2) (hello, 2) (hello, 1),再次進(jìn)行全局聚合操作,就可以得到最終結(jié)果了砚作,比如?(hello, 5)窘奏。

defantiSkew():RDD[(String,Int)] = {valSPLIT="-"valprefix =newRandom().nextInt(10)? ? pairs.map(t => ( prefix +SPLIT+ t._1,1))? ? ? ? .reduceByKey((v1, v2) => v1 + v2)? ? ? ? .map(t => (t._1.split(SPLIT)(1), t2._2))? ? ? ? .reduceByKey((v1, v2) => v1 + v2)}

不過(guò)進(jìn)行兩次 mapreduce,性能稍微比一次的差些葫录。

七蔼夜、Hadoop 中的數(shù)據(jù)傾斜

Hadoop 中直接貼近用戶使用的是 Mapreduce 程序和 Hive 程序,雖說(shuō) Hive 最后也是用 MR 來(lái)執(zhí)行(至少目前 Hive 內(nèi)存計(jì)算并不普及)压昼,但是畢竟寫的內(nèi)容邏輯區(qū)別很大求冷,一個(gè)是程序,一個(gè)是Sql窍霞,因此這里稍作區(qū)分匠题。

Hadoop 中的數(shù)據(jù)傾斜主要表現(xiàn)在 ruduce 階段卡在99.99%,一直99.99%不能結(jié)束但金。

這里如果詳細(xì)的看日志或者和監(jiān)控界面的話會(huì)發(fā)現(xiàn):

有一個(gè)多幾個(gè) reduce 卡住

各種 container報(bào)錯(cuò) OOM

讀寫的數(shù)據(jù)量極大韭山,至少遠(yuǎn)遠(yuǎn)超過(guò)其它正常的 reduce

伴隨著數(shù)據(jù)傾斜,會(huì)出現(xiàn)任務(wù)被 kill 等各種詭異的表現(xiàn)冷溃。

經(jīng)驗(yàn):Hive的數(shù)據(jù)傾斜钱磅,一般都發(fā)生在 Sql 中 Group 和 On 上,而且和數(shù)據(jù)邏輯綁定比較深似枕。

優(yōu)化方法

這里列出來(lái)一些方法和思路盖淡,具體的參數(shù)和用法在官網(wǎng)看就行了。

map join 方式

count distinct 的操作凿歼,先轉(zhuǎn)成 group褪迟,再 count

參數(shù)調(diào)優(yōu)

set hive.map.aggr=true

set hive.groupby.skewindata=true

left semi jion 的使用

設(shè)置 map 端輸出、中間結(jié)果壓縮答憔。(不完全是解決數(shù)據(jù)傾斜的問(wèn)題味赃,但是減少了 IO 讀寫和網(wǎng)絡(luò)傳輸,能提高很多效率)

說(shuō)明

hive.map.aggr=true: 在map中會(huì)做部分聚集操作虐拓,效率更高但需要更多的內(nèi)存心俗。

hive.groupby.skewindata=true: 數(shù)據(jù)傾斜時(shí)負(fù)載均衡,當(dāng)選項(xiàng)設(shè)定為true蓉驹,生成的查詢計(jì)劃會(huì)有兩個(gè)MRJob城榛。第一個(gè)MRJob 中,Map的輸出結(jié)果集合會(huì)隨機(jī)分布到Reduce中戒幔,每個(gè)Reduce做部分聚合操作吠谢,并輸出結(jié)果土童,這樣處理的結(jié)果是相同的GroupBy Key有可能被分發(fā)到不同的Reduce中诗茎,從而達(dá)到負(fù)載均衡的目的;第二個(gè)MRJob再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照GroupBy Key分布到Reduce中(這個(gè)過(guò)程可以保證相同的GroupBy Key被分布到同一個(gè)Reduce中),最后完成最終的聚合操作敢订。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末王污,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子楚午,更是在濱河造成了極大的恐慌昭齐,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件矾柜,死亡現(xiàn)場(chǎng)離奇詭異阱驾,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)怪蔑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門里覆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人缆瓣,你說(shuō)我怎么就攤上這事喧枷。” “怎么了弓坞?”我有些...
    開(kāi)封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵隧甚,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我渡冻,道長(zhǎng)戚扳,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任族吻,我火速辦了婚禮咖城,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘呼奢。我一直安慰自己宜雀,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布握础。 她就那樣靜靜地躺著辐董,像睡著了一般。 火紅的嫁衣襯著肌膚如雪禀综。 梳的紋絲不亂的頭發(fā)上简烘,一...
    開(kāi)封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音定枷,去河邊找鬼孤澎。 笑死,一個(gè)胖子當(dāng)著我的面吹牛欠窒,可吹牛的內(nèi)容都是我干的覆旭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼型将!你這毒婦竟也來(lái)了寂祥?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤七兜,失蹤者是張志新(化名)和其女友劉穎丸凭,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體腕铸,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡惜犀,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了狠裹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片向拆。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖酪耳,靈堂內(nèi)的尸體忽然破棺而出浓恳,到底是詐尸還是另有隱情,我是刑警寧澤碗暗,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布颈将,位于F島的核電站,受9級(jí)特大地震影響言疗,放射性物質(zhì)發(fā)生泄漏晴圾。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一噪奄、第九天 我趴在偏房一處隱蔽的房頂上張望死姚。 院中可真熱鬧,春花似錦勤篮、人聲如沸都毒。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)账劲。三九已至,卻和暖如春金抡,著一層夾襖步出監(jiān)牢的瞬間瀑焦,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工梗肝, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留榛瓮,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓巫击,卻偏偏與公主長(zhǎng)得像禀晓,于是被迫代替她去往敵國(guó)和親精续。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容