https://blog.csdn.net/u013332124/article/details/92001346
一讲仰、Spark任務(wù)輸出文件的總過程
當(dāng)一個(gè)Job開始執(zhí)行后免姿,輸出文件的相關(guān)過程大概如下:
1、Job啟動(dòng)時(shí)創(chuàng)建一個(gè)目錄: ${output.dir}/_temporary/${appAttemptId} 作為本次運(yùn)行的輸出臨時(shí)目錄
2妒蛇、當(dāng)有task開始運(yùn)行后,會(huì)創(chuàng)建 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId}/${fileName} 文件,后面這個(gè)task的所有輸出都會(huì)被寫到這個(gè)文件中
3狂芋、當(dāng)task運(yùn)行完后,需要檢查是否要commit憨栽,如果需要commit帜矾,會(huì)調(diào)用OutputCommitter#commitTask()方法。commit的細(xì)節(jié)后面說
4屑柔、等整個(gè)Job執(zhí)行完就調(diào)用OutputCommitter#commitJob()方法屡萤。具體的過程也在下面介紹commit時(shí)說。
output.dir表示用戶指定的輸出目錄掸宛,appAttemptId表示任務(wù)的attemptId死陆,一般從1開始一直遞增。taskAttemptId表示task的attemptId唧瘾,比如taskId是0措译,第一次運(yùn)行,這個(gè)id就是0.0饰序。
OutputCommitter 只是一個(gè)抽象類领虹,spark運(yùn)行時(shí)會(huì)從配置中獲取指定的實(shí)現(xiàn)類,如果配置中沒指定菌羽,spark默認(rèn)會(huì)使用 org.apache.hadoop.mapred.FileOutputCommitter 的實(shí)現(xiàn)掠械。
二、Commit細(xì)節(jié)分析
1注祖、commitTask 介紹
1.1猾蒂、判斷是否需要commit
當(dāng)task執(zhí)行完后,會(huì)去檢查以下狀態(tài)是晨,如果下面的條件達(dá)成肚菠,就不會(huì)執(zhí)行commit
- ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttempt} 目錄不存在 (說明這個(gè)task的臨時(shí)輸出目錄不存在,明顯是有問題的)
- 如果開啟了Output commit coordination罩缴,就需要通過rpc詢問Driver是否可以commit (根據(jù)spark.hadoop.outputCommitCoordination.enabled參數(shù)蚊逢,默認(rèn)為true.如果開啟了推測(cè)執(zhí)行,這個(gè)一定要設(shè)置為true)
- Driver的CommitCoordinator判斷task運(yùn)行失敗 (task運(yùn)行失敗就沒必要commit了)
- Driver的CommitCoordinator判斷該task的其他attempt已經(jīng)commit過了 (如果commit的taskAttemptId和當(dāng)前一樣箫章,那么可以再次commit烙荷,說明task commit是一個(gè)冪等的操作)
1.2、task的commit細(xì)節(jié)
因?yàn)槲覀兇蟛糠智闆r下用的都是FileOutputCommitter檬寂,所以下面主要介紹一下這個(gè)類的commitTask實(shí)現(xiàn)终抽。
FileOutputCommitter的實(shí)際commitTask細(xì)節(jié)和參數(shù) mapreduce.fileoutputcommitter.algorithm.version 有關(guān)(默認(rèn)值是1)。
當(dāng)mapreduce.fileoutputcommitter.algorithm.version=1時(shí):
commit的操作是將 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} 重命名為 ${output.dir}/_temporary/${appAttemptId}/${taskId}
當(dāng)mapreduce.fileoutputcommitter.algorithm.version=2時(shí):
commit的操作是將 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} 下的文件移動(dòng)到 ${output.dir} 目錄下 (也就是最終的輸出目錄)
spark任務(wù)可以通過設(shè)置spark配置 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2來開啟版本2的commit邏輯
在hadoop 2.7.0之前桶至,F(xiàn)ileOutputCommitter的實(shí)現(xiàn)沒有區(qū)分版本昼伴,統(tǒng)一都是使用version=1的commit邏輯。因此如果spark的hadoop依賴包版本如果低于2.7.0,設(shè)置mapreduce.fileoutputcommitter.algorithm.version=2是沒有用的
2镣屹、commitJob 介紹
Job執(zhí)行完后圃郊,會(huì)調(diào)用commitJob方法,我們還是看一下FileOutputCommitter的實(shí)現(xiàn):
commitJob的細(xì)節(jié)也和mapreduce.fileoutputcommitter.algorithm.version 參數(shù)有關(guān)(默認(rèn)值是1)
當(dāng)mapreduce.fileoutputcommitter.algorithm.version=1時(shí):
由 Driver 單線程遍歷所有 committedTaskPath女蜈,也就是${output.dir}/_temporary/${appAttemptId} 下的所有文件持舆,然后移動(dòng)到 ${output.dir} 目錄下。然后創(chuàng)建_SUCCESS表示任務(wù)結(jié)束
當(dāng)mapreduce.fileoutputcommitter.algorithm.version=2時(shí):
只需要?jiǎng)?chuàng)建_SUCCESS文件伪窖,因?yàn)檩敵鑫募趖ask執(zhí)行完后就已經(jīng)移動(dòng)到輸出目錄了
在commitJob完后吏廉,spark還會(huì)執(zhí)行cleanupJob將${output.dir}/_temporary 目錄刪除
三、V1和V2 commiter版本比較
mapreduce.fileoutputcommitter.algorithm.version 參數(shù)對(duì)文件輸出有很大的影響惰许,下面總結(jié)一下兩種版本在各方面的優(yōu)缺點(diǎn)席覆。
1、性能方面
v1在task結(jié)束后只是將輸出文件拷到臨時(shí)目錄汹买,然后在job結(jié)束后才由Driver把這些文件再拷到輸出目錄佩伤。如果文件數(shù)量很多,Driver就需要不斷的和NameNode做交互晦毙,而且這個(gè)過程是單線程的生巡,因此勢(shì)必會(huì)增加耗時(shí)。如果我們碰到有spark任務(wù)所有task結(jié)束了但是任務(wù)還沒結(jié)束见妒,很可能就是Driver還在不斷的拷文件孤荣。
v2在task結(jié)束后立馬將輸出文件拷貝到輸出目錄,后面Job結(jié)束后Driver就不用再去拷貝了。
因此盐股,在性能方面钱豁,v2完勝v1。
2疯汁、數(shù)據(jù)一致性方面
v1在Job結(jié)束后才批量拷文件牲尺,其實(shí)就是兩階段提交,它可以保證數(shù)據(jù)要么全部展示給用戶幌蚊,要么都沒展示(當(dāng)然谤碳,在拷貝過程中也無法保證完全的數(shù)據(jù)一致性,但是這個(gè)時(shí)間一般來說不會(huì)太長)溢豆。如果任務(wù)失敗蜒简,也可以直接刪了_temporary目錄,可以較好的保證數(shù)據(jù)一致性漩仙。
v2在task結(jié)束后就拷文件臭蚁,就會(huì)造成spark任務(wù)還未完成就讓用戶看到一部分輸出,這樣就完全沒辦法保證數(shù)據(jù)一致性了讯赏。另外垮兑,如果任務(wù)在輸出過程中失敗,就會(huì)有一部分?jǐn)?shù)據(jù)成功輸出漱挎,一部分沒輸出的情況系枪。
因此在數(shù)據(jù)一致性方面,v1完勝v2
3磕谅、總結(jié)
很明顯私爷,如果我們執(zhí)著于性能,不在乎數(shù)據(jù)輸出時(shí)的一致性膊夹,完全可以將mapreduce.fileoutputcommitter.algorithm.version設(shè)置為2來提高性能衬浑。
但是如果我們對(duì)輸出要求很高的數(shù)據(jù)一致性,那么最好不要為了性能將mapreduce.fileoutputcommitter.algorithm.version設(shè)置為2放刨。
參考資料
https://issues.apache.org/jira/browse/MAPREDUCE-4815