在maintain我們的daily spark jobs時据途,發(fā)現(xiàn)有的時候一些spark jobs在insert數(shù)據(jù)到hive table時會在所有tasks完成后hang住很長一段時間后整個job才結(jié)束。經(jīng)過一些調(diào)查分析后,我們發(fā)現(xiàn)這段時間里,spark是在把.hive-staging_hive*/-ext-10000目錄里的文件一個一個地move到hive table的location目錄下,由于我們一些spark job生成的hive表的文件數(shù)據(jù)比較多(數(shù)萬個)。正常情況下,這也不是什么大問題霹抛,這個moving過程總共也就消耗幾分至十幾分鐘。但是卷谈,當(dāng)namenode的負(fù)載特別高時杯拐,這個moving過程可能持續(xù)一個或幾個小時,這就有點(diǎn)接受不了了世蔗。
問題
要解決問題端逼,先要搞清楚問題的來龍去脈, 本文的目的就是為了搞清楚以下問題:
1. spark job是怎么commit每個write task的凸郑?
2. spark job是怎么commit整個write job的裳食?
3.?.hive-staging_hive*目錄是在什么時候被move到hive table的location目錄下的?
為了搞清楚以上問題芙沥,我們查閱了spark源碼(版本為2.3.0)诲祸。
注意,本文的目的是講清楚在我們遇到的scenario下而昨,Spark InsertIntoHiveTable是如何commit結(jié)果數(shù)據(jù)的救氯,對于其他不同配置導(dǎo)致的不同scenario,請讀者自行閱讀源碼(文中會對我們的配置情況稍加說明)歌憨。
對于如何解決上面提到的hang住一個或幾個小時的問題着憨,最好的解決方案還是保證namenode的正常響應(yīng)速度,在正常情況情況下务嫡,以上問題影響不大甲抖。
至于如何通過修改commit過程使得最終數(shù)據(jù)文件的moving耗時更短漆改,讀者可以在了解了commit具體過程后加以思考。
committer的生成
InsertIntoHive的run方法會調(diào)用其processInsert方法進(jìn)行處理准谚,processInsert會做一些validation和準(zhǔn)備工作挫剑,然后會調(diào)用SaveAsHiveFile.saveAsHiveFile方法,saveAsHiveFile也會做一些準(zhǔn)備工作柱衔,然后會生成一個FileCommitProtocol類型的committer對象:
這里使用了反射生成commiter對象:
這里的className就是前面SaveAsHiveFile.saveAsHiveFile中的sparkSession.sessionState.conf.fileCommitProtoclClass, 這個值由參數(shù)spark.sql.sources.commitProtocolClass控制樊破, 默認(rèn)為SQLHadoopMapReduceCommitProtocol :?
SQLHadoopMapReduceCommitProtocol繼承自HadoopMapReduceCommitProtocol, 只是重寫了其setupCommitter方法唆铐,setupCommitter方法的作用是生成HadoopMapReduceCommitProtocol內(nèi)部的committer對象哲戚,而這個對象是OutputCommitter類型,其commitTask和commitJob方法會被用于commit每個task的結(jié)果和整個job的結(jié)果艾岂。
注意顺少,這里其實是有兩層committer,一層是HadoopMapReduceCommitProtocol內(nèi)部的committer澳盐,另一層就是HadoopMapReduceCommitProtocol本身祈纯。在HadoopMapReduceCommitProtocol的commitTask和commitJob方法中都會直接或間接地調(diào)用其內(nèi)部commiter對象的對應(yīng)方法:
因為在我們的case中主要是依靠內(nèi)部committer來進(jìn)行結(jié)果數(shù)據(jù)文件的moving,所以本文主要關(guān)注內(nèi)部committer(也就是OutputCommitter)的commit行為叼耙,對于HadoopMapReduceCommitProtocol層的commit行為,讀者可閱讀HadoopMapReduceCommitProtocol的commitTask, commitJob, newTaskTempFile, newTaskTempFileAbsPath等方法粒没,可以結(jié)合FileFormatWriter中的DynamicPartitionWriteTask和SingleDirectoryWriteTask的newOutputWriter方法一起閱讀筛婉。
task的commit
前面講到,我們主要看OutputCommitter的commit行為癞松,OutputCommitter是一個抽象類爽撒,我們主要來看一下其子類org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter的實現(xiàn)。
FileOutputCommitter的commitTask方法將單個task attempt生成的結(jié)果數(shù)據(jù)文件move到指定的committedTaskPath? 或者 outputPath:
注意响蓉,commitTask這里的行為受變量algorithmVersion控制硕勿,而這個變量的值由參數(shù)mapreduce.fileoutputcommitter.algorithm.version控制,可選值為1和2. 在我們的scenario下枫甲,該值為2源武,所以最終調(diào)用了mergePaths方法把task attempt的輸出目錄中的數(shù)據(jù)文件都move到outputPath下面。
這里的outputPath就是對應(yīng)的.hive-staging_hive*下的目錄, 比如:/path/to/table/location/.hive-staging_hive_2020-05-03_16-14-42_568_1802626970789985228-1/-ext-10000.?而task attempt的輸出目錄是.hive-staging_hive*下的針對每個task attempt創(chuàng)建的臨時目錄想幻,比如:/path/to/table/location/.hive-staging_hive_2020-05-03_16-14-42_568_1802626970789985228-1/-ext-10000/_temporary/0/_temporary/attempt_20200503162201_0005_m_000065_0.
FileOutputCommitter.mergePaths的功能就是把源路徑(可以是目錄粱栖,也可以是文件)的所有文件都move到目標(biāo)路徑下,如果源和目標(biāo)有沖突脏毯,則以源覆蓋目標(biāo)闹究,可以看看這個函數(shù)的code,其實就是個遞歸實現(xiàn)食店。
至此渣淤,我們知道了赏寇,對于InsertIntoHiveTable的每個task,在它執(zhí)行完后都會把自己的結(jié)果文件從task attempt的臨時folder移到.hive-staging_hive*/-ext-10000中來价认。每個task都只負(fù)責(zé)move自己生成的數(shù)據(jù)文件蹋订,這個過程也是各個task并行進(jìn)行的。
job的commit
上面講述了單個task如何commit自己的數(shù)據(jù)文件刻伊,那么當(dāng)一個job的所有task都完成commit后露戒,這個job的commit又做了些什么呢?
主要邏輯在commitJobInternal中實現(xiàn):
對于algorithmVersion為2的情況捶箱,因為在FileOutputCommitter.commitTask方法中已經(jīng)調(diào)用mergePaths將task生成的數(shù)據(jù)文件merge到了.hive-staging_hive_*/-ext-10000下面智什,所以在commitJob中,對于algorithmVersion為2的情況丁屎,只需要清理_temporary目錄并創(chuàng)建_SUCCESS的marker文件荠锭。
.hive-staging_hive目錄中文件的moving
如上文所述,InsertIntoHive的processInsert會調(diào)用SaveAsHiveFile.saveAsHiveFile進(jìn)行hive 文件的寫入晨川,寫入的文件最終都會commit到.hive-staging_hive*/-ext-10000目錄中证九,那么.hive-staging_hive*目錄又是怎么被move到hive table的location目錄下的呢?這個工作是在processInsert方法調(diào)用完SaveAsHiveFile.saveAsHiveFile之后共虑,再通過調(diào)用org.apache.hadoop.hive.ql.metadata.Hive的loadDynamicPartitions方法完成的愧怜。
總結(jié)
通過上述分析,我們知道了:
1. 對于spark的InsertIntoHiveTable妈拌,結(jié)果rdd的每個partition的數(shù)據(jù)都有相應(yīng)的task負(fù)責(zé)數(shù)據(jù)寫入拥坛,而每個task都會在目標(biāo)hive表的location目錄下的.hive-staging_hive*/-ext-10000目錄中創(chuàng)建相應(yīng)的臨時的staging目錄,當(dāng)前task的所有數(shù)據(jù)都會先寫入到這個staging目錄中尘分;
2. 當(dāng)單個task寫入完成后猜惋,會調(diào)用FileOutputCommitter.commitTask把task的staging目錄下的數(shù)據(jù)文件都move到.hive-staging_hive*/-ext-10000下面,這個過程就是單個task的commit
3. 當(dāng)一個spark job的所有task都執(zhí)行完成并commit成功后培愁,spark會調(diào)用FileOutputCommitter.commitJob把臨時的staging目錄都刪除掉著摔,并創(chuàng)建_SUCCESS標(biāo)記文件
4. 當(dāng)spark成功將數(shù)據(jù)都寫入到staging_hive*/-ext-10000中 (也就是commitJob成功后),spark會調(diào)用hive的相應(yīng)API把數(shù)據(jù)文件都move到目標(biāo)hive表的location目錄下定续,并更新hive meta data以enable新的hive partition