本人在一個(gè)偽實(shí)時(shí)項(xiàng)目中用到了SparkStreaming技術(shù)杯道,雖然解決了垛膝,不過網(wǎng)上查閱相關(guān)資料锤灿,以下文章進(jìn)行了更好的解讀赘艳,特此引用酌毡!本人用的?SparkStreaming外部來處理。
轉(zhuǎn)自:https://cloud.tencent.com/developer/article/1150845
使用sparkstreaming時(shí)蕾管,如果實(shí)時(shí)計(jì)算結(jié)果要寫入到HDFS枷踏,那么不可避免的會(huì)遇到一個(gè)問題,那就是在默認(rèn)情況下會(huì)產(chǎn)生非常多的小文件掰曾,這是由sparkstreaming的微批處理模式和DStream(RDD)的分布式(partition)特性導(dǎo)致的旭蠕,sparkstreaming為每個(gè)partition啟動(dòng)一個(gè)獨(dú)立的線程來處理數(shù)據(jù),一旦文件輸出到HDFS旷坦,那么這個(gè)文件流就關(guān)閉了掏熬,再來一個(gè)batch的parttition任務(wù),就再使用一個(gè)新的文件流秒梅,那么假設(shè)旗芬,一個(gè)batch為10s,每個(gè)輸出的DStream有32個(gè)partition捆蜀,那么一個(gè)小時(shí)產(chǎn)生的文件數(shù)將會(huì)達(dá)到(3600/10)*32=11520個(gè)之多疮丛。眾多小文件帶來的結(jié)果是有大量的文件元信息,比如文件的location辆它、文件大小这刷、block number等需要NameNode來維護(hù),NameNode會(huì)因此鴨梨山大娩井。不管是什么格式的文件,parquet似袁、text,洞辣、JSON或者 Avro,都會(huì)遇到這種小文件問題昙衅,這里討論幾種處理Sparkstreaming小文件的典型方法扬霜。
增加batch大小
這種方法很容易理解,batch越大而涉,從外部接收的event就越多著瓶,內(nèi)存積累的數(shù)據(jù)也就越多,那么輸出的文件數(shù)也就回變少啼县,比如上邊的時(shí)間從10s增加為100s材原,那么一個(gè)小時(shí)的文件數(shù)量就會(huì)減少到1152個(gè)沸久。但別高興太早,實(shí)時(shí)業(yè)務(wù)能等那么久嗎余蟹,本來人家10s看到結(jié)果更新一次卷胯,現(xiàn)在要等快兩分鐘,是人都會(huì)罵娘威酒。所以這種方法適用的場(chǎng)景是消息實(shí)時(shí)到達(dá)窑睁,但不想擠壓在一起處理,因?yàn)閿D壓在一起處理的話葵孤,批處理任務(wù)在干等担钮,這時(shí)就可以采用這種方法(是不是很像spark內(nèi)部的pipeline模式,但是要注意區(qū)別哦)尤仍。
Coalesce大法好箫津?
文章開頭講了,小文件的基數(shù)是:batch_number*partition_number吓著,而第一種方法是減少batch_number鲤嫡,那么這種方法就是減少partition_number了,這個(gè)api不細(xì)說绑莺,就是減少初始的分區(qū)個(gè)數(shù)暖眼。看過spark源碼的童鞋都知道纺裁,對(duì)于窄依賴诫肠,一個(gè)子RDD的partition規(guī)則繼承父RDD,對(duì)于寬依賴(就是那些個(gè)叉叉叉ByKey操作)欺缘,如果沒有特殊指定分區(qū)個(gè)數(shù)栋豫,也繼承自父rdd。那么初始的SourceDstream是幾個(gè)partiion谚殊,最終的輸出就是幾個(gè)partition丧鸯。所以Coalesce大法的好處就是,可以在最終要輸出的時(shí)候嫩絮,來減少一把partition個(gè)數(shù)丛肢。但是這個(gè)方法的缺點(diǎn)也很明顯,本來是32個(gè)線程在寫256M數(shù)據(jù)剿干,現(xiàn)在可能變成了4個(gè)線程在寫256M數(shù)據(jù)蜂怎,而沒有寫完成這256M數(shù)據(jù),這個(gè)batch是不算做結(jié)束的置尔。那么一個(gè)batch的處理時(shí)延必定增長(zhǎng)杠步,batch擠壓會(huì)逐漸增大。這種方法也要慎用,切雞切雞坝募摺朵锣!
SparkStreaming外部來處理
我們既然把數(shù)據(jù)輸出到hdfs,那么說明肯定是要用hive或者sparksql這樣的“sql on hadoop”系統(tǒng)類進(jìn)一步進(jìn)行數(shù)據(jù)分析试躏,而這些表一般都是按照半小時(shí)或者一小時(shí)猪勇、一天,這樣來分區(qū)的(注意不要和sparkStreaming的分區(qū)混淆颠蕴,這里的分區(qū)泣刹,是用來做分區(qū)裁剪優(yōu)化的),那么我們可以考慮在SparkStreaming外再啟動(dòng)定時(shí)的批處理任務(wù)來合并SparkStreaming產(chǎn)生的小文件犀被。這種方法不是很直接椅您,但是卻比較有用,“性價(jià)比”較高寡键,唯一要注意的是掀泳,批處理的合并任務(wù)在時(shí)間切割上要把握好,搞不好就可能回去合并一個(gè)還在寫入的SparkStreaming小文件西轩。
自己調(diào)用foreach去append
SparkStreaming提供的foreach這個(gè)outout類api员舵,可以讓我們自定義輸出計(jì)算結(jié)果的方法。那么我們其實(shí)也可以利用這個(gè)特性藕畔,那就是每個(gè)batch在要寫文件時(shí)马僻,并不是去生成一個(gè)新的文件流,而是把之前的文件打開注服【碌耍考慮這種方法的可行性,首先溶弟,HDFS上的文件不支持修改女淑,但是很多都支持追加,那么每個(gè)batch的每個(gè)partition就對(duì)應(yīng)一個(gè)輸出文件辜御,每次都去追加這個(gè)partition對(duì)應(yīng)的輸出文件鸭你,這樣也可以實(shí)現(xiàn)減少文件數(shù)量的目的。這種方法要注意的就是不能無限制的追加擒权,當(dāng)判斷一個(gè)文件已經(jīng)達(dá)到某一個(gè)閾值時(shí)苇本,就要產(chǎn)生一個(gè)新的文件進(jìn)行追加了。