大數(shù)據(jù)學(xué)習(xí)day_6

思考問題

Mapper類

Mapper類

org.apache.hadoop.mapreduce.Mapper<KEYIN犀变、VALUEIN惠桃、KEYOUT弟灼、VALUEOUT>

四個泛型,分別是KEYIN嗽上、VALUEIN办绝、KEYOUT伊约、VALUEOUT,
前面兩個KEYIN、VALUEIN 指的是map 函數(shù)輸入的參數(shù)key孕蝉、value 的類型屡律;
后面兩個KEYOUT、VALUEOUT 指的是map 函數(shù)輸出的key降淮、value 的類型超埋。

Mapper有setup(),map(),cleanup()和run()四個方法霍殴。

其中setup()一般是用來進(jìn)行一些map()前的準(zhǔn)備工作媒惕,
map()則一般承擔(dān)主要的處理工作,
cleanup()則是收尾工作如關(guān)閉文件或者執(zhí)行map()后的K-V分發(fā)等来庭。run()方法提供了setup->map->cleanup()的執(zhí)行模板妒蔚。

在MapReduce中,Mapper從一個輸入分片中讀取數(shù)據(jù)月弛,然后經(jīng)過Shuffle and Sort階段肴盏,分發(fā)數(shù)據(jù)給Reducer,在Map端和Reduce端我們可能使用設(shè)置的Combiner進(jìn)行合并帽衙,這在Reduce前進(jìn)行菜皂。Partitioner控制每個K-V(鍵值)對應(yīng)該被分發(fā)到哪個reducer(我們的Job可能有多個reducer),Hadoop默認(rèn)使用HashPartitioner佛寿,HashPartitioner使用key的hashCode對reducer的數(shù)量取模得來幌墓。

run()方法

public void run(Context context) throws IOException, InterruptedException {  
  setup(context);  
  while (context.nextKeyValue()) {  
    map(context.getCurrentKey(), context.getCurrentValue(), context);  
  }  
  cleanup(context);  
}  

可以得出,K/V對是從傳入的Context(上下文)獲取的冀泻。

map()方法

@SuppressWarnings("unchecked")  
protected void map(KEYIN key, VALUEIN value,   
                   Context context) throws IOException, InterruptedException {  
  context.write((KEYOUT) key, (VALUEOUT) value);  
}  

也看得出輸出結(jié)果K/V對也是通過Context來完成的
作為map方法輸入的鍵值對常侣,其value值存儲的是文本文件中的一行(以回車符為行結(jié)束標(biāo)記),而key值為該行的首字母相對于文本文件的首地址的偏移量弹渔。將<K1,V1>作為map方法的結(jié)果輸出胳施,其余的工作都交有 MapReduce框架 處理。
這里輸入?yún)?shù)key肢专、value 的類型就是KEYIN舞肆、VALUEIN,每一個鍵值對都會調(diào)用一次map 函數(shù)博杖。在這里,map 函數(shù)沒有處理輸入的key椿胯、value,直接通過context.write(…)方法輸出了剃根,輸出的key哩盲、value 的類型就是KEYOUT、VALUEOUT狈醉。這是默認(rèn)實現(xiàn)廉油,通常是需要我們根據(jù)業(yè)務(wù)邏輯覆蓋的。

當(dāng)調(diào)用到map時苗傅,通常會先執(zhí)行一個setup函數(shù)抒线,最后會執(zhí)行一個cleanup函數(shù)。而默認(rèn)情況下渣慕,這兩個函數(shù)的內(nèi)容都是nothing嘶炭。因此抱慌,當(dāng)map方法不符合應(yīng)用要求時,可以試著通過增加setup和cleanup的內(nèi)容來滿足應(yīng)用的需求旱物。

Reducer類

Reducer類

org.apache.hadoop.mapreduce.Reducer<KEYIN遥缕、VALUEIN、KEYOUT宵呛、VALUEOUT>

四個泛型,分別是KEYIN单匣、VALUEIN、KEYOUT宝穗、VALUEOUT,
前面兩個KEYIN户秤、VALUEIN 指的是map 函數(shù)輸出的參數(shù),即reduce 函數(shù)輸入的key逮矛、value 的類型鸡号;
后面兩個KEYOUT、VALUEOUT 指的是reduce 函數(shù)輸出的key须鼎、value 的類型鲸伴。

Reducer有3個主要的函數(shù),分別是:setup()晋控,clearup()汞窗,reduce(),run()赡译。

reducer()

@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, 
Context context ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

run()

 @SuppressWarnings("unchecked")
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      ((ReduceContext.ValueIterator)
          (context.getValues().iterator())).resetBackupStore();
    }
    cleanup(context);
  }
}

當(dāng)調(diào)用到reduce時仲吏,通常會先執(zhí)行一個setup函數(shù),最后會執(zhí)行一個cleanup函數(shù)蝌焚。而默認(rèn)情況下裹唆,這兩個函數(shù)的內(nèi)容都是nothing。因此只洒,當(dāng)reduce不符合應(yīng)用要求時许帐,可以試著通過增加setup和cleanup的內(nèi)容來滿足應(yīng)用的需求。

InputFormat類

平時我們寫MapReduce程序的時候毕谴,在設(shè)置輸入格式的時候成畦,總會調(diào)用形如job.setInputFormatClass(KeyValueTextInputFormat.class);來保證輸入文件按照我們想要的格式被讀取。

所有的輸入格式都繼承于InputFormat析珊,這是一個抽象類,其子類有專門用于讀取普通文件的FileInputFormat蔑穴,用來讀取數(shù)據(jù)庫的DBInputFormat等等忠寻。
其實,一個輸入格式InputFormat存和,主要無非就是要解決如何將數(shù)據(jù)分割成分片(比如多少行為一個分片)奕剃,以及如何讀取分片中的數(shù)據(jù)(比如按行讀戎月谩)。前者由getSplits()完成纵朋,后者由RecordReader完成柿顶。這些方法的實現(xiàn)都在子類中。

1 public abstract class InputFormat<K, V> {
2     
3 public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
4 
5 public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,  InterruptedException;
6 
7 }
InputFormat類圖

類InputFomat 是負(fù)責(zé)把HDFS 中的文件經(jīng)過一系列處理變成map 函數(shù)的輸入部分的操软。這個類做了三件事情:
  第一嘁锯, 驗證輸入信息的合法性,包括輸入路徑是否存在等聂薪;
  第二家乘,把HDFS 中的文件按照一定規(guī)則拆分成InputSplit,每個InputSplit 由一個Mapper執(zhí)行藏澳;
  第三仁锯,提供RecordReader,把InputSplit 中的每一行解析出來供map 函數(shù)處理翔悠;

MapReduce應(yīng)用開發(fā)人員并不需要直接處理InputSplit业崖,因為它是由InputFormat創(chuàng)建。InputFormat負(fù)責(zé)產(chǎn)生輸入分片并將它們分隔成記錄蓄愁。

InputSplit

我們知道Mappers的輸入是一個一個的輸入分片双炕,稱InputSplit。InputSplit是一個抽象類涝登,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對雄家。

getLength()用來獲取InputSplit的大小,以支持對InputSplits進(jìn)行排序胀滚,而getLocations()則用來獲取存儲分片的位置列表趟济。

public abstract class InputSplit {  
  public abstract long getLength() throws IOException, InterruptedException;  
  
  public abstract   
    String[] getLocations() throws IOException, InterruptedException;  
}  

InputSplit是hadoop定義的用來傳送給每個單獨的map的數(shù)據(jù),InputSplit存儲的并非數(shù)據(jù)本身咽笼,而是一個分片長度和一個記錄數(shù)據(jù)位置的數(shù)組顷编。生成InputSplit的方法可以通過InputFormat()來設(shè)置。
當(dāng)數(shù)據(jù)傳送給map時剑刑,map會將輸入分片傳送到InputFormat媳纬,InputFormat則調(diào)用方法getRecordReader()生成RecordReader,RecordReader再通過creatKey()施掏、creatValue()方法創(chuàng)建可供map處理的一個一個的<key,value>對钮惠。
簡而言之,InputFormat()方法是用來生成可供map處理的<key,value>對的七芭。

FileinputFormat類

FileinputFormat類是所有使用文件作為其數(shù)據(jù)源的InputFormat實現(xiàn)的基類素挽。
它提供了兩個功能:

  • 定義哪些文件包含在一個作業(yè)的輸出中
  • 輸入文件生成分片的實現(xiàn)。

并把分片分隔成記錄的作業(yè)由其子類來完成狸驳。

FileinputFormat類的輸入路徑

作業(yè)的輸入被設(shè)定為一組路徑预明,這對限定作業(yè)輸入提供了很大的靈活性缩赛。

<small>一條路徑可以表示一個文件、一個目錄或是一個glob撰糠,即一個文件和目錄的集合酥馍。值得注意的是,被值得為輸入的路徑的目錄中的內(nèi)容不會被遞歸進(jìn)行處理阅酪!</small>

如果需要排除特定文件可以使用FileInPutFormat的SetInputPathFilter()方法設(shè)置一個過濾器:


FileInPutFormat類的輸入分片

給定一組文件旨袒,F(xiàn)ileInPutFormat是如何把它們轉(zhuǎn)換為輸入分片的?
FileInPutFormat只分割大文件遮斥。這里的大是值超過HDFS塊的大小峦失。而分片通常與HDFS塊大小一樣,也可以設(shè)置不同的Hadoop屬性來改變术吗。


下面是該類對getSplits 方法的實現(xiàn)
利用FileInputFormat 的getSplits方法尉辑,我們就計算出了我們的作業(yè)的所有輸入分片了
注意:每一個輸入分片啟動一個Mapper 任務(wù)。

public List<InputSplit> getSplits(JobContext job
 2                                     ) throws IOException {
 3     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 4     long maxSize = getMaxSplitSize(job);
 5 
 6     // generate splits
 7     List<InputSplit> splits = new ArrayList<InputSplit>();
 8     List<FileStatus>files = listStatus(job);
 9     for (FileStatus file: files) {
10       Path path = file.getPath();
11       FileSystem fs = path.getFileSystem(job.getConfiguration());
12       long length = file.getLen();
13       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
14       if ((length != 0) && isSplitable(job, path)) { 
15         long blockSize = file.getBlockSize();
16         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
17 
18         long bytesRemaining = length;
19         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
20           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
21           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
22                                    blkLocations[blkIndex].getHosts()));
23           bytesRemaining -= splitSize;
24         }
25         
26         if (bytesRemaining != 0) {
27           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
28                      blkLocations[blkLocations.length-1].getHosts()));
29         }
30       } else if (length != 0) {
31         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
32       } else { 
33         //Create empty hosts array for zero length files
34         splits.add(new FileSplit(path, 0, length, new String[0]));
35       }
36     }
37     
38     // Save the number of input files in the job-conf
39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
40 
41     LOG.debug("Total # of splits: " + splits.size());
42     return splits;
43   }

那這些計算出來的分片是怎么被map讀取出來的呢较屿?就是InputFormat中的另一個方法createRecordReader(),FileInputFormat并沒有對這個方法做具體的要求隧魄,而是交給子類自行去實現(xiàn)它。

  • RecordReader:
    RecordReader是用來從一個輸入分片中讀取一個一個的K -V 對的抽象類隘蝎,我們可以將其看作是在InputSplit上的迭代器购啄。我們從類圖中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法嘱么,由它獲取分片上的下一個K-V 對狮含。
  • 我們再深入看看上面提到的RecordReader的一個子類: Lin eRecordReader。
    LineRecordReader由一個FileSplit構(gòu)造出來曼振,start是這個FileSplit的起始位置几迄,pos是當(dāng)前讀取分片的位置,end是分片結(jié)束位置冰评,in是打開的一個讀取這個分片的輸入流映胁,它是使用這個FileSplit對應(yīng)的文件名來打開的。key和value則分別是每次讀取的K-V對甲雅。然后我們還看到可以利用getProgress()來跟蹤讀取分片的進(jìn)度解孙,這個函數(shù)就是根據(jù)已經(jīng)讀取的K-V對占總K-V對的比例來顯示進(jìn)度的。

其他輸入類

  • CombineFileInputFormat類 能夠很好的處理小文件
    WholeFileInputFormat類 使用RecordReader將整個文件讀為一條記錄抛人。

  • TestInputFormat類 Hadoop默認(rèn)的輸入方法弛姜,每條記錄是一行輸入。
    鍵是LongWritable類型妖枚,存儲該行在整個文件中的字節(jié)偏移量廷臼。
    值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符),是Text類型的中剩。

SequenceFileInputFormat類 順序文件格式存儲二進(jìn)制的鍵值對的序列作為MapReduce的輸入時使用。

MultipleInputs類 能妥善處理多種格式輸入問題抒寂。

DBInputFormat 這種輸入格式用于使用JDBC從關(guān)系數(shù)據(jù)庫中讀取數(shù)據(jù)结啼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市屈芜,隨后出現(xiàn)的幾起案子郊愧,更是在濱河造成了極大的恐慌,老刑警劉巖井佑,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件属铁,死亡現(xiàn)場離奇詭異,居然都是意外死亡躬翁,警方通過查閱死者的電腦和手機(jī)焦蘑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來盒发,“玉大人例嘱,你說我怎么就攤上這事∧ⅲ” “怎么了拼卵?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蛮艰。 經(jīng)常有香客問我腋腮,道長,這世上最難降的妖魔是什么壤蚜? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任即寡,我火速辦了婚禮,結(jié)果婚禮上仍律,老公的妹妹穿的比我還像新娘嘿悬。我一直安慰自己,他們只是感情好水泉,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布善涨。 她就那樣靜靜地躺著,像睡著了一般草则。 火紅的嫁衣襯著肌膚如雪钢拧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天炕横,我揣著相機(jī)與錄音源内,去河邊找鬼。 笑死份殿,一個胖子當(dāng)著我的面吹牛膜钓,可吹牛的內(nèi)容都是我干的嗽交。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼颂斜,長吁一口氣:“原來是場噩夢啊……” “哼夫壁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起沃疮,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤盒让,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后司蔬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體邑茄,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年俊啼,在試婚紗的時候發(fā)現(xiàn)自己被綠了肺缕。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡授帕,死狀恐怖搓谆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情豪墅,我是刑警寧澤泉手,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站偶器,受9級特大地震影響斩萌,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜屏轰,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一颊郎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧霎苗,春花似錦姆吭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至厘擂,卻和暖如春昆淡,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背刽严。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工昂灵, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓眨补,卻偏偏與公主長得像管削,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子撑螺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容