背景
????????此次需要將10張表[A、B辆它、C誊薄、D、E锰茉、F呢蔫、G、H飒筑、I片吊、J]的數(shù)據(jù)union與表?中的一個字段進(jìn)行join,以達(dá)到篩選數(shù)據(jù)的目的协屡;其中表A的數(shù)據(jù)量約為320億俏脊,其余9個表各自數(shù)據(jù)量約為20億,表?的數(shù)據(jù)量約為900萬肤晓;
????????資源使用情況上升至100 executor-nums * 3 executor-core * 20G executor-memory爷贫,但還是卡在join的stage上认然,重復(fù)失敗多次后,整個job失斅选卷员;
實現(xiàn)方法
方法1
val tableArr = Array("table_a_name", "table_b_name", "table_c_name", "", "table_d_name", "table_e_name", "table_f_name", "table_g_name", "table_h_name", "table_i_name", "table_j_name")
val data_df = tableArr.map(table_name =>
spark.sql(
s"""
|SELECT a, b, c
|FROM ${table_name}
|""".stripMarigin)
).reduce(_.union(_))
val filter_table_tmp = spark.sql(
s"""
|SELECT a
|FROM ?
|""".stripMarigin)
val result = data_df.join(filter_table_tmp, Seq("a"), "inner")
????????采用上面這種代碼,通過查看作業(yè)的DAG圖腾务,發(fā)現(xiàn)程序?qū)⑶笆畟€表的數(shù)據(jù)union到一起后毕骡,再與表?進(jìn)行join;相當(dāng)于將400多億與900萬直接進(jìn)行join窑睁;
????????老辦法挺峡,1.將spark.sql.shuffle.partition的值調(diào)整到程序總executor-core的2~3倍;2.檢查join的字段在兩個表中的類型是否相同担钮;但是最終還是執(zhí)行失敵髟;
????????一直報org.apache.spark.shuffle.FetchFailedException
這類錯誤箫津;果然狭姨,還是shuffle的時候,數(shù)據(jù)量太大/task多/一個task執(zhí)行的數(shù)據(jù)太多的問題苏遥;
????????如果饼拍,不想采用本次標(biāo)題的方法解決問題的同學(xué),可以看看我之前的博客:Spark踩坑vlog——join時shuffle的大坑田炭,可能會幫助你解決問題师抄;
方法2[失敗]
????????最終目的還是想將數(shù)據(jù)打散、分開計算教硫,所以叨吮,就想把這幾個表一個一個進(jìn)行join,再將join的結(jié)果union到一起瞬矩,代碼:
val filter_table_tmp = spark.sql(
s"""
|SELECT a
|FROM ?
|""".stripMarigin).cache()
val table_a_df = spark.sql(
s"""
|SELECT a, b, c
|FROM table_a_name
|""".stripMarigin)
val table_a_result_df = table_a_df.join(filter_table_tmp, Seq("a"), "inner")
val table_b_df = spark.sql(
s"""
|SELECT a, b, c
|FROM table_b_name
|""".stripMarigin)
val table_b_result_df = table_b_df.join(filter_table_tmp, Seq("a"), "inner")
......
val result = table_a_result_df.union(table_b_result_df)......
????????本以為采用以上代碼茶鉴,可以把每個表的join分別計算,結(jié)果誰承想景用,通過查看DAG圖發(fā)現(xiàn)涵叮,Spark在執(zhí)行的時候,發(fā)現(xiàn)了我們的join都拿filter_table_tmp進(jìn)行的伞插,結(jié)果跟方法1一樣割粮,先把所有數(shù)據(jù)合并到一起,然后再進(jìn)行join媚污;
????????因為無知穆刻,方法2失敗。杠步。氢伟。榜轿。。朵锣。
方法3
????????那么谬盐,最終如果想達(dá)到每個表分開join的目的,只能每次join用觸發(fā)為一個job诚些,為了方便之后的數(shù)據(jù)校驗飞傀,我選擇直接將中間數(shù)據(jù)寫入外部存儲。不太懂Spark的job诬烹、task啥劃分的同學(xué)砸烦,可以看看我之前的博客Spark的組件們[Application、Job绞吁、Stage幢痘、TaskSet、Task]家破。
val filter_table_tmp = spark.sql(
s"""
|SELECT a
|FROM ?
|""".stripMarigin).cache()
val table_a_df = spark.sql(
s"""
|SELECT a, b, c
|FROM table_a_name
|""".stripMarigin)
table_a_df.join(filter_table_tmp, Seq("a"), "inner").createOrReplaceTempView("table_a_result_tmp")
spark.sql(
s"""
|INSERT OVERWRITE TABLE temporary_storage_table_name PARTITION(event = "a")
|SELECT *
|FROM table_a_result_tmp
|""".stripMarigin)
......
val result = spark.sql(
s"""
|SELECT *
|FROM temporary_storage_table_name
|""".stripMarigin)
????????最后颜说,一個表join生成一個job,除了表A執(zhí)行時數(shù)據(jù)大汰聋,耗時比較久一點外门粪,數(shù)據(jù)可算是跑出來了。
????????因為程序按照job順序執(zhí)行烹困,所以在每次對一個表進(jìn)行join時玄妈,程序所有的資源都會用來執(zhí)行一個表的數(shù)據(jù),降低了數(shù)據(jù)量髓梅,完成了"少量多次"的理念拟蜻。缺點就是,job太多女淑,采用了外部存儲,增加了IO寫入讀取時間 & 增加了job調(diào)度時間辜御;但是相比于同資源下數(shù)據(jù)跑不出來鸭你,這些延遲都是可以接受的。
????????此坑完結(jié)擒权,撒花??~