Spark InsertIntoHiveTable如何commit結(jié)果數(shù)據(jù)

在maintain我們的daily spark jobs時据途,發(fā)現(xiàn)有的時候一些spark jobs在insert數(shù)據(jù)到hive table時會在所有tasks完成后hang住很長一段時間后整個job才結(jié)束。經(jīng)過一些調(diào)查分析后,我們發(fā)現(xiàn)這段時間里,spark是在把.hive-staging_hive*/-ext-10000目錄里的文件一個一個地move到hive table的location目錄下,由于我們一些spark job生成的hive表的文件數(shù)據(jù)比較多(數(shù)萬個)。正常情況下,這也不是什么大問題霹抛,這個moving過程總共也就消耗幾分至十幾分鐘。但是卷谈,當(dāng)namenode的負(fù)載特別高時杯拐,這個moving過程可能持續(xù)一個或幾個小時,這就有點(diǎn)接受不了了世蔗。

問題

要解決問題端逼,先要搞清楚問題的來龍去脈, 本文的目的就是為了搞清楚以下問題:

1. spark job是怎么commit每個write task的凸郑?

2. spark job是怎么commit整個write job的裳食?

3.?.hive-staging_hive*目錄是在什么時候被move到hive table的location目錄下的?

為了搞清楚以上問題芙沥,我們查閱了spark源碼(版本為2.3.0)诲祸。

注意,本文的目的是講清楚在我們遇到的scenario下而昨,Spark InsertIntoHiveTable是如何commit結(jié)果數(shù)據(jù)的救氯,對于其他不同配置導(dǎo)致的不同scenario,請讀者自行閱讀源碼(文中會對我們的配置情況稍加說明)歌憨。

對于如何解決上面提到的hang住一個或幾個小時的問題着憨,最好的解決方案還是保證namenode的正常響應(yīng)速度,在正常情況情況下务嫡,以上問題影響不大甲抖。

至于如何通過修改commit過程使得最終數(shù)據(jù)文件的moving耗時更短漆改,讀者可以在了解了commit具體過程后加以思考。

committer的生成

InsertIntoHive的run方法會調(diào)用其processInsert方法進(jìn)行處理准谚,processInsert會做一些validation和準(zhǔn)備工作挫剑,然后會調(diào)用SaveAsHiveFile.saveAsHiveFile方法,saveAsHiveFile也會做一些準(zhǔn)備工作柱衔,然后會生成一個FileCommitProtocol類型的committer對象:

SaveAsHiveFile.saveAsHiveFile :生成committer對象

這里使用了反射生成commiter對象:

instantiate a FileCommitProtocol instance

這里的className就是前面SaveAsHiveFile.saveAsHiveFile中的sparkSession.sessionState.conf.fileCommitProtoclClass, 這個值由參數(shù)spark.sql.sources.commitProtocolClass控制樊破, 默認(rèn)為SQLHadoopMapReduceCommitProtocol :?

spark.sql.sources.commitProtocolClass

SQLHadoopMapReduceCommitProtocol繼承自HadoopMapReduceCommitProtocol, 只是重寫了其setupCommitter方法唆铐,setupCommitter方法的作用是生成HadoopMapReduceCommitProtocol內(nèi)部的committer對象哲戚,而這個對象是OutputCommitter類型,其commitTask和commitJob方法會被用于commit每個task的結(jié)果和整個job的結(jié)果艾岂。

注意顺少,這里其實是有兩層committer,一層是HadoopMapReduceCommitProtocol內(nèi)部的committer澳盐,另一層就是HadoopMapReduceCommitProtocol本身祈纯。在HadoopMapReduceCommitProtocol的commitTask和commitJob方法中都會直接或間接地調(diào)用其內(nèi)部commiter對象的對應(yīng)方法:

Call directly inner committer.commitJob in?HadoopMapReduceCommitProtocol.commitJob
Call indirectly inner committer.commitTask in?HadoopMapReduceCommitProtocol.commitTask

因為在我們的case中主要是依靠內(nèi)部committer來進(jìn)行結(jié)果數(shù)據(jù)文件的moving,所以本文主要關(guān)注內(nèi)部committer(也就是OutputCommitter)的commit行為叼耙,對于HadoopMapReduceCommitProtocol層的commit行為,讀者可閱讀HadoopMapReduceCommitProtocol的commitTask, commitJob, newTaskTempFile, newTaskTempFileAbsPath等方法粒没,可以結(jié)合FileFormatWriter中的DynamicPartitionWriteTask和SingleDirectoryWriteTask的newOutputWriter方法一起閱讀筛婉。

task的commit

前面講到,我們主要看OutputCommitter的commit行為癞松,OutputCommitter是一個抽象類爽撒,我們主要來看一下其子類org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter的實現(xiàn)。

FileOutputCommitter的commitTask方法將單個task attempt生成的結(jié)果數(shù)據(jù)文件move到指定的committedTaskPath? 或者 outputPath

FileOutputCommitter.commitTask

注意响蓉,commitTask這里的行為受變量algorithmVersion控制硕勿,而這個變量的值由參數(shù)mapreduce.fileoutputcommitter.algorithm.version控制,可選值為1和2. 在我們的scenario下枫甲,該值為2源武,所以最終調(diào)用了mergePaths方法把task attempt的輸出目錄中的數(shù)據(jù)文件都move到outputPath下面。

這里的outputPath就是對應(yīng)的.hive-staging_hive*下的目錄, 比如:/path/to/table/location/.hive-staging_hive_2020-05-03_16-14-42_568_1802626970789985228-1/-ext-10000.?而task attempt的輸出目錄是.hive-staging_hive*下的針對每個task attempt創(chuàng)建的臨時目錄想幻,比如:/path/to/table/location/.hive-staging_hive_2020-05-03_16-14-42_568_1802626970789985228-1/-ext-10000/_temporary/0/_temporary/attempt_20200503162201_0005_m_000065_0.

FileOutputCommitter.mergePaths的功能就是把源路徑(可以是目錄粱栖,也可以是文件)的所有文件都move到目標(biāo)路徑下,如果源和目標(biāo)有沖突脏毯,則以源覆蓋目標(biāo)闹究,可以看看這個函數(shù)的code,其實就是個遞歸實現(xiàn)食店。

至此渣淤,我們知道了赏寇,對于InsertIntoHiveTable的每個task,在它執(zhí)行完后都會把自己的結(jié)果文件從task attempt的臨時folder移到.hive-staging_hive*/-ext-10000中來价认。每個task都只負(fù)責(zé)move自己生成的數(shù)據(jù)文件蹋订,這個過程也是各個task并行進(jìn)行的。

job的commit

上面講述了單個task如何commit自己的數(shù)據(jù)文件刻伊,那么當(dāng)一個job的所有task都完成commit后露戒,這個job的commit又做了些什么呢?

FileOutputCommitter.commitJob

主要邏輯在commitJobInternal中實現(xiàn):

FileOutputCommitter.commitJobInternal

對于algorithmVersion為2的情況捶箱,因為在FileOutputCommitter.commitTask方法中已經(jīng)調(diào)用mergePaths將task生成的數(shù)據(jù)文件merge到了.hive-staging_hive_*/-ext-10000下面智什,所以在commitJob中,對于algorithmVersion為2的情況丁屎,只需要清理_temporary目錄并創(chuàng)建_SUCCESS的marker文件荠锭。

.hive-staging_hive目錄中文件的moving

如上文所述,InsertIntoHive的processInsert會調(diào)用SaveAsHiveFile.saveAsHiveFile進(jìn)行hive 文件的寫入晨川,寫入的文件最終都會commit到.hive-staging_hive*/-ext-10000目錄中证九,那么.hive-staging_hive*目錄又是怎么被move到hive table的location目錄下的呢?這個工作是在processInsert方法調(diào)用完SaveAsHiveFile.saveAsHiveFile之后共虑,再通過調(diào)用org.apache.hadoop.hive.ql.metadata.Hive的loadDynamicPartitions方法完成的愧怜。

總結(jié)

通過上述分析,我們知道了:

1. 對于spark的InsertIntoHiveTable妈拌,結(jié)果rdd的每個partition的數(shù)據(jù)都有相應(yīng)的task負(fù)責(zé)數(shù)據(jù)寫入拥坛,而每個task都會在目標(biāo)hive表的location目錄下的.hive-staging_hive*/-ext-10000目錄中創(chuàng)建相應(yīng)的臨時的staging目錄,當(dāng)前task的所有數(shù)據(jù)都會先寫入到這個staging目錄中尘分;

2. 當(dāng)單個task寫入完成后猜惋,會調(diào)用FileOutputCommitter.commitTask把task的staging目錄下的數(shù)據(jù)文件都move到.hive-staging_hive*/-ext-10000下面,這個過程就是單個task的commit

3. 當(dāng)一個spark job的所有task都執(zhí)行完成并commit成功后培愁,spark會調(diào)用FileOutputCommitter.commitJob把臨時的staging目錄都刪除掉著摔,并創(chuàng)建_SUCCESS標(biāo)記文件

4. 當(dāng)spark成功將數(shù)據(jù)都寫入到staging_hive*/-ext-10000中 (也就是commitJob成功后),spark會調(diào)用hive的相應(yīng)API把數(shù)據(jù)文件都move到目標(biāo)hive表的location目錄下定续,并更新hive meta data以enable新的hive partition

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末谍咆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子香罐,更是在濱河造成了極大的恐慌卧波,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件庇茫,死亡現(xiàn)場離奇詭異港粱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門查坪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來寸宏,“玉大人,你說我怎么就攤上這事偿曙〉” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵望忆,是天一觀的道長罩阵。 經(jīng)常有香客問我,道長启摄,這世上最難降的妖魔是什么稿壁? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮歉备,結(jié)果婚禮上傅是,老公的妹妹穿的比我還像新娘。我一直安慰自己蕾羊,他們只是感情好喧笔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著龟再,像睡著了一般书闸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上吸申,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天梗劫,我揣著相機(jī)與錄音,去河邊找鬼截碴。 笑死,一個胖子當(dāng)著我的面吹牛蛉威,可吹牛的內(nèi)容都是我干的日丹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蚯嫌,長吁一口氣:“原來是場噩夢啊……” “哼哲虾!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起择示,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤束凑,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后栅盲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體汪诉,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了扒寄。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鱼鼓。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖该编,靈堂內(nèi)的尸體忽然破棺而出迄本,到底是詐尸還是另有隱情,我是刑警寧澤课竣,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布嘉赎,位于F島的核電站,受9級特大地震影響于樟,放射性物質(zhì)發(fā)生泄漏公条。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一隔披、第九天 我趴在偏房一處隱蔽的房頂上張望赃份。 院中可真熱鬧,春花似錦奢米、人聲如沸抓韩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谒拴。三九已至,卻和暖如春涉波,著一層夾襖步出監(jiān)牢的瞬間英上,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工啤覆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留苍日,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓窗声,卻偏偏與公主長得像相恃,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子笨觅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354