背景:
???????? 前些天筆者面臨這樣一個(gè)問(wèn)題猾编,在hdfs上有一個(gè)目錄存放著一些文件夷恍,定期要通過(guò)mr的api將這些文件轉(zhuǎn)換為HBase的HFile蚓峦。但是文件中可能會(huì)存在一些可以檢測(cè)出來(lái)的臟數(shù)據(jù)卵迂,現(xiàn)在希望能夠在生成HFile的同時(shí)察蹲,統(tǒng)計(jì)每次任務(wù)臟數(shù)據(jù)的比例铭若,超過(guò)一定閾值的時(shí)候就發(fā)告警。
現(xiàn)有框架的處理方法與問(wèn)題:
MR:
???????? 將生成HFile與統(tǒng)計(jì)臟數(shù)據(jù)視為兩個(gè)MR任務(wù)递览,分別計(jì)算叼屠,兩次提交。
Spark:
???????? 按照如下RDD血緣圖绞铃,cache RDD1镜雨,并進(jìn)行兩次計(jì)算分別得到RDD2與RDD3,然后在將兩個(gè)衍生出來(lái)的子RDD持久化到HDFS之類的存儲(chǔ)系統(tǒng)上儿捧。
通過(guò)MR處理荚坞,HDFS上的文件將被讀取兩次,雖然在Spark的計(jì)算模型中菲盾,可以通過(guò)cache方法颓影,將數(shù)據(jù)盡可能的放在內(nèi)存中,但是在轉(zhuǎn)化為RDD2與RDD3的過(guò)程中仍然會(huì)有兩次內(nèi)存IO(當(dāng)然很有可能因?yàn)閮?nèi)存存不下懒鉴,成為了磁盤(pán)IO)诡挂。哪種IO相對(duì)于CPU而言都慢了不止一個(gè)級(jí)別,因此能不能有一種方法像下面的圖這樣临谱,將兩種運(yùn)算在上游放在一起(在一個(gè)map中同時(shí)統(tǒng)計(jì)臟數(shù)據(jù)與生成HBase的Cell)璃俗,將不同的結(jié)果發(fā)送給不同的下游呢?
這樣一來(lái)悉默,兩次不同的下游計(jì)算(往往是Reduce或者子RDD)城豁,可以綁定同一個(gè)上游計(jì)算(往往是Map或者父RDD),而上游計(jì)算又只會(huì)有一次IO抄课。但是現(xiàn)有的計(jì)算框架唱星,好像都不支持一個(gè)上游運(yùn)算與多個(gè)下游運(yùn)算綁定雳旅。
?
自己想到的解決辦法:
下面以MR運(yùn)算框架為例,談?wù)勛约旱慕鉀Q辦法〖淞模現(xiàn)在的MR框架中岭辣,input dir、output dir與shuffle context是與一個(gè)job綁定的甸饱;我們可以將input dir與map綁定沦童,output dir 和shuffle context與reduce綁定.
???????? 將現(xiàn)有map端的api修改為如下形式:
修改前(對(duì)應(yīng)現(xiàn)在的Mapper類):
void map(KEYIN key, VALUEIN value, Context context);
void run(Context context);
void cleanup(Context context);
void setup(Context context)
修改后(不妨叫這個(gè)類為NewMapper):
void map(KEYIN key, VALUEIN value, List reduceContexts);
void run(Context context);
void cleanup(Context context);
void setup(Context context)
像筆者提到的問(wèn)題可以用如下偽代碼解決
public class CombinedMapper extendsNewMapper {
???????? privateint dirtyrows = 0;
???????? privateint totalrows = 0;
???????? privateList reduceContexts = null;
???????? void map(KEYIN key, VALUEIN value, List reduceContexts ) {
?????????????????? totalrows++;
?????????????????? if(dirtyrow(key)) {
??????????????????????????? dirtyrows++;
? ? ? ? ? ? ? ? ? ? ? ? ? ? //臟數(shù)據(jù)就直接過(guò)濾了
??????????????????????????? return;
????????????????????}
?????????????????? contextForHFile.write(*******);
????????}
? ? ? ? ?void setup(Context context) {
?????????????????? reduceContexts= context.getReduceContexts();
?????????????????? contextForHFile? = reduceContexts.getContextForHFile();
?????????????????? contextForCounter= reduceContexts.getContextForCounter();
? ? ? ? ?}
? ? ? ? ?void run(Context context) {
??? ???????? setup(context);
??? ???????? while (context.nextKeyValue()) {
?????????????????? map(context.getCurrentKey(),context.getCurrentValue(), reduceContexts);
??? ???????? }
? ? ? ? ? ? ?contextForCounter.write(****);
??? ???????? cleanup(context);
? ???? }
}
Reducer端的代碼無(wú)需任何改動(dòng),只是在初始化job的時(shí)候可能需要按照如下方法初始化job
job.addReducer(ReducerClass1.class).addReducer(ReducerClass2.class)叹话;
OutputFormat.setOutputdir(Reducer1.class,outputdir1);
OutputFormat.setOutputdir(Reducer2.class,outputdir2);
表示上游計(jì)算綁定多個(gè)下游計(jì)算偷遗。這樣一來(lái),可以在一次IO中完成兩種不同的運(yùn)算驼壶。
缺點(diǎn):
???????? 筆者設(shè)計(jì)的對(duì)現(xiàn)有計(jì)算框架的補(bǔ)充氏豌,雖然可以減少IO,比如現(xiàn)在的場(chǎng)景是要對(duì)一個(gè)很大的數(shù)據(jù)集用兩種完全不同的方法做分析热凹,肯定是大有裨益的泵喘。但是缺點(diǎn)也是很明顯的,那就是耦合度變大般妙,上游的一個(gè)子模塊失敗可能影響整體計(jì)算纪铺,比如上面生成HFile的任務(wù)如果導(dǎo)致Map程序不能跑通就會(huì)導(dǎo)致統(tǒng)計(jì)臟數(shù)據(jù)的任務(wù)也失敗。
可是耦合這種東西可能真的是“過(guò)猶不及”吧碟渺,一個(gè)零耦合的東西既沒(méi)有存在的必要也沒(méi)有存在的可能鲜锚。而且筆者的意思并不是修改現(xiàn)有的計(jì)算框架api,而是增加一種api來(lái)支持想減少IO的場(chǎng)景苫拍,以前的代碼是完全不用修改的芜繁。
后續(xù):
???????? 從MR的觀點(diǎn)來(lái)看,筆者做的補(bǔ)充可以說(shuō)是讓一個(gè)任務(wù)支持多種Reduce绒极,但是其實(shí)MR計(jì)算框架對(duì)多種Map的支持也不是很好骏令,比如我現(xiàn)在想同時(shí)處理TXT文件與parquet文件再生成HFile。但是spark可以通過(guò)像下圖的做法垄提,對(duì)不同的RDD做不同的transformation然后再將新的RDD做union來(lái)支持“一個(gè)任務(wù)榔袋,多種Map”。也許將來(lái)也可以通過(guò)類似的辦法讓一個(gè)MR任務(wù)支持多個(gè)Map輸入吧塔淤。