歡迎關(guān)注公眾號“Tim在路上”
MapReduce是我們再進行離線大數(shù)據(jù)處理的時候經(jīng)常要使用的計算模型跑杭,MapReduce的計算過程被封裝的很好,我們只用使用Map和Reduce函數(shù)澳泵,所以對其整體的計算過程不是太清楚平绩,同時MapReduce1.0和MapReduce2.0在網(wǎng)上有很多人混淆庄撮。
MapReduce1.0運行模型
Input
Input但是輸入文件的存儲位置谆棺,
但是注意這里并一定是一些博客說的當然是HDFS似的分布式文件系統(tǒng)位置嚼贡,默認是HDFS文件系統(tǒng),當然也可以修改似踱。
,它也可以是本機上的文件位置。
我們來仔細分析下input
首先我們知道要和JobTracker打交道是離不開JobClient這個接口的撕蔼,就如上圖所示,
然后JobClient中的Run方法 會讓 JobClient 把所有 Hadoop Job 的信息秽誊,比如 mapper reducer jar path, mapper / reducer class name, 輸入文件的路徑等等鲸沮,告訴給 JobTracker,如下面的代碼所示:
public int run(String[] args) throws Exception {
//create job
Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
// set run jar class
job.setJarByClass(this.getClass());
// set input . output
FileInputFormat.addInputPath(job, new Path(PropReader.Reader("arg1")));
FileOutputFormat.setOutputPath(job, new Path(PropReader.Reader("arg2")));
// set map
job.setMapperClass(HFile2TabMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// set reduce
job.setReducerClass(PutSortReducer.class);
return 0;
}
除此以外锅论,JobClient.runJob() 還會做一件事讼溺,使用 InputFormat類去計算如何把 input 文件 分割成一份一份,然后交給 mapper 處理最易。inputformat.getSplit() 函數(shù)返回一個 InputSplit 的 List, 每一個 InputSplit 就是一個 mapper 需要處理的數(shù)據(jù)怒坯。
一個 Hadoop Job的 input 既可以是一個很大的 file, 也可以是多個 file; 無論怎樣炫狱,getSplit() 都會計算如何分割 input.
如果是HDFS文件系統(tǒng),我們都知道其可以通過將文件分割為block的形式存放在很多臺電腦上剔猿,使其可以存放很大的文件视译。那么Mapper是如何確定一個HDFS文件中的block存放哪幾臺電腦,有什么數(shù)據(jù)归敬?
inputFormat它實際上是個 interface, 需要 類 來繼承酷含,提供分割 input 的邏輯。
Jobclient 有一個方法叫 setInputFormat(), 通過它汪茧,我們可以告訴 JobTracker 想要使用的 InputFormat 類 是什么椅亚。如果我們不設(shè)置,Hadoop默認的是 TextInputFormat, 它默認為文件在 HDFS上的每一個 Block 生成一個對應(yīng)的 InputSplit. 所以大家使用 Hadoop 時舱污,也可以編寫自己的 input format, 這樣可以自由的選擇分割 input 的算法呀舔,甚至處理存儲在 HDFS 之外的數(shù)據(jù)。
JobTracker 盡量把 mapper 安排在離它要處理的數(shù)據(jù)比較近的機器上扩灯,以便 mapper 從本機讀取數(shù)據(jù)媚赖,節(jié)省網(wǎng)絡(luò)傳輸時間。具體實現(xiàn)是如何實現(xiàn)珠插?
對于每個 map任務(wù), 我們知道它的 split 包含的數(shù)據(jù)所在的主機位置省古,我們就把 mapper 安排在那個相應(yīng)的主機上好了,至少是比較近的host. 你可能會問:split 里存儲的 主機位置是 HDFS 存數(shù)據(jù)的主機丧失,和 MapReduce 的主機 有什么相關(guān)呢豺妓?為了達到數(shù)據(jù)本地性,其實通常把MapReduce 和 HDFS 部署在同一組主機上布讹。
既然一個 InputSplit 對應(yīng)一個 map任務(wù), 那么當 map 任務(wù)收到它所處理數(shù)據(jù)的位置信息琳拭,它就可以從 HDFS 讀取這些數(shù)據(jù)了。
接下來我們再從map函數(shù)看Input
map函數(shù)接受的是一個 key value 對描验。
實際上白嘁,Hadoop 會把每個 mapper 的輸入數(shù)據(jù)再次分割,分割成一個個 key-value對, 然后為每一個 key-value對膘流,調(diào)用Map函數(shù)一次. 為了這一步分割絮缅,Hadoop 使用到另一個類: RecordReader. 它主要的方法是 next(), 作用就是從 InputSplit 讀出一條 key-value對.
RecordReader 可以被定義在每個 InputFormat 類中。當我們通過 JobClient.setInputFormat() 告訴 Hadoop inputFormat 類名稱的時候呼股, RecordReader 的定義也一并被傳遞過來耕魄。
所以整個Input,
1.JobClient輸入輸入文件的存儲位置
2.JobClient通過InputFormat接口可以設(shè)置分割的邏輯,默認是按HDFS文件分割彭谁。
3.Hadoop把文件再次分割為key-value對吸奴。
4.JobTracker負責分配對應(yīng)的分割塊由對應(yīng)的maper處理,同時 RecordReader負責讀取key-value對值。
Mapper
JobClient運行后獲得所需的配置文件和客戶端計算所得的輸入劃分信息则奥。并將這些信息都存放在JobTracker專門為該作業(yè)創(chuàng)建的文件夾中考润。文件夾名為該作業(yè)的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制)读处;
然后輸入劃分信息告訴了JobTracker應(yīng)該為這個作業(yè)啟動多少個map任務(wù)等信息糊治。
JobTracker通過TaskTracker 向其匯報的心跳情況和slot(情況),每一個slot可以接受一個map任務(wù)罚舱,這樣為了每一臺機器map任務(wù)的平均分配井辜,JobTracker會接受每一個TaskTracker所監(jiān)控的slot情況。
JobTracker接收到作業(yè)后馆匿,將其放在一個作業(yè)隊列里,等待作業(yè)調(diào)度器對其進行調(diào)度燥滑,當作業(yè)調(diào)度器根據(jù)自己的調(diào)度算法調(diào)度到該作業(yè)時渐北,會根據(jù)輸入劃分信息為每個劃分創(chuàng)建一個map任務(wù),并將map任務(wù)分配給TaskTracker執(zhí)行铭拧,分配時根據(jù)slot的情況作為標準赃蛛。
TaskTracker每隔一段時間會給JobTracker發(fā)送一個心跳,告訴JobTracker它依然在運行搀菩,同時心跳中還攜帶著很多的信息呕臂,比如當前map任務(wù)完成的進度等信息。當JobTracker收到作業(yè)的最后一個任務(wù)完成信息時肪跋,便把該作業(yè)設(shè)置成“成功”歧蒋。當JobClient查詢狀態(tài)時,它將得知任務(wù)已完成州既,便顯示一條消息給用戶谜洽。
Map通過 RecordReader 讀取Input的key/value對,map根據(jù)用戶自定義的任務(wù)吴叶,運行完畢后阐虚,產(chǎn)生另外一系列 key/value,并將其寫入到Hadoop的內(nèi)存緩沖取中蚌卤,在內(nèi)存緩沖區(qū)中的key/value對按key排序实束,此時會按照reduce partition進行,分到不同partition中逊彭,一旦內(nèi)存滿就會被寫入到本地磁盤的文件里咸灿,這個文件叫spill file。
shuffle
Shuffle是我們不需要編寫的模塊侮叮,但卻是十分關(guān)鍵的模塊析显。
在map中,每個 map 函數(shù)會輸出一組 key/value對, Shuffle 階段需要從所有 map主機上把相同的 key 的 key value對組合在一起,(也就是這里省去的Combiner階段)組合后傳給 reduce主機, 作為輸入進入 reduce函數(shù)里谷异。
Partitioner組件 負責計算哪些 key 應(yīng)當被放到同一個 reduce 里
HashPartitioner類分尸,它會把 key 放進一個 hash函數(shù)里,然后得到結(jié)果歹嘹。如果兩個 key 的哈希值 一樣箩绍,他們的 key/value對 就被放到同一個 reduce 函數(shù)里。我們也把分配到同一個 reduce函數(shù)里的 key /value對 叫做一個reduce partition.
我們看到 hash 函數(shù)最終產(chǎn)生多少不同的結(jié)果, 這個 Hadoop job 就會有多少個 reduce partition/reduce 函數(shù)尺上,這些 reduce函數(shù)最終被JobTracker 分配到負責 reduce 的主機上材蛛,進行處理。
我們知道m(xù)ap階段可能會產(chǎn)生多個spill file 當 Map 結(jié)束時怎抛,這些 spill file 會被 merge 起來,不是 merge 成一個 file卑吭,而是也會按 reduce partition 分成多個。
當 Map tasks 成功結(jié)束時马绝,他們會通知負責的 tasktracker, 然后消息通過 jobtracker 的 heartbeat 傳給 jobtracker. 這樣豆赏,對于每一個 job, jobtracker 知道 map output 和 map tasks 的關(guān)聯(lián)。Reducer 內(nèi)部有一個 thread 負責定期向 jobtracker 詢問 map output 的位置富稻,直到 reducer 得到所有它需要處理的 map output 的位置掷邦。
Reducer 的另一個 thread 會把拷貝過來的 map output file merge 成更大的 file. 如果 map task 被 configure 成需要對 map output 進行壓縮,那 reduce 還要對 map 結(jié)果進行解壓縮椭赋。當一個 reduce task 所有的 map output 都被拷貝到一個它的 host上時抚岗,reduce 就要開始對他們排序了。
排序并不是一次把所有 file 都排序哪怔,而是分幾輪宣蔚。每輪過后產(chǎn)生一個結(jié)果,然后再對結(jié)果排序认境。最后一輪就不用產(chǎn)生排序結(jié)果了件已,而是直接向 reduce 提供輸入。這時元暴,用戶提供的 reduce函數(shù) 就可以被調(diào)用了篷扩。輸入就是 map 任務(wù) 產(chǎn)生的 key value對.
同時reduce任務(wù)并不是在map任務(wù)完全結(jié)束后才開始的,Map 任務(wù)有可能在不同時間結(jié)束茉盏,所以 reduce 任務(wù)沒必要等所有 map任務(wù) 都結(jié)束才開始鉴未。事實上,每個 reduce任務(wù)有一些 threads 專門負責從 map主機復(fù)制 map 輸出(默認是5個)鸠姨。
Reduce
reduce() 函數(shù)以 key 及對應(yīng)的 value 列表作為輸入铜秆,按照用戶自己的程序邏輯,經(jīng)合并 key 相同的 value 值后讶迁,產(chǎn) 生另外一系列 key/value 對作為最終輸出寫入 HDFS连茧。