優(yōu)點
1)MapReduce易于編程
它簡單的實現(xiàn)一些接口读第,就可以完成一個分布式程序兜看,這個分布式程序可以分布到大量廉價的PC機器上運行瘤载。也就是說你寫一個分布式程序帅霜,跟寫一個簡單的串行程序是一模一樣的。就是因為這個特點使得MapReduce編程變得非常流行蒜危。
2)良好的擴展性
當(dāng)你的計算資源不能得到滿足的時候虱痕,你可以通過簡單的增加機器來擴展它的計算能力。
3)高容錯性
MapReduce設(shè)計的初衷就是使程序能夠部署在廉價的PC機器上辐赞,這就要求它具有很高的容錯性部翘。比如其中一臺機器掛了,它可以把上面的計算任務(wù)轉(zhuǎn)移到另外一個節(jié)點上運行响委,不至于這個任務(wù)運行失敗新思,而且這個過程不需要人工參與,而完全是由Hadoop內(nèi)部完成的赘风。
4)適合PB級以上海量數(shù)據(jù)的離線處理
可以實現(xiàn)上千臺服務(wù)器集群并發(fā)工作夹囚,提供數(shù)據(jù)處理能力。
缺點
1)不擅長實時計算
MapReduce無法像MySQL一樣邀窃,在毫秒或者秒級內(nèi)返回結(jié)果荸哟。
2)不擅長流式計算
流式計算的輸入數(shù)據(jù)是動態(tài)的,而MapReduce的輸入數(shù)據(jù)集是靜態(tài)的瞬捕,不能動態(tài)變化鞍历。這是因為MapReduce自身的設(shè)計特點決定了數(shù)據(jù)源必須是靜態(tài)的。
3)不擅長DAG(有向無環(huán)圖)計算
多個應(yīng)用程序存在依賴關(guān)系肪虎,后一個應(yīng)用程序的輸入為前一個的輸出劣砍。在這種情況下,MapReduce并不是不能做扇救,而是使用后刑枝,每個MapReduce作業(yè)的輸出結(jié)果都會寫入到磁盤,會造成大量的磁盤IO迅腔,導(dǎo)致性能非常的低下装畅。
普通api
//獲取配置信息和job對象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf)
Hadoop序列化(Writable)
java序列化是一個重量級序列化框架,每個對象被序列化會附帶很多額外信息(各種校驗信息钾挟,Header洁灵,繼承體系等),不便于在網(wǎng)絡(luò)傳輸。
Hadoop序列化特點:
1.緊湊:高效使用存儲空間
2.快速:讀寫數(shù)據(jù)額外開銷少
3.互操作:支持多語言交互
使用:1實現(xiàn)Writable接口
2反序列化會反射空參構(gòu)造方法徽千,創(chuàng)建空參構(gòu)造器
3重寫[反]序列化方法(順序一致)
4bean做key重寫careparable
MapReduce原理
1.一個Job的Map階段并行度由客戶端在提交Job時的切片數(shù)決定
2.一個split切片分配一個MapTask并行處理
3.默認情況下苫费,切片大小=BlockSize
4.切片時不考慮數(shù)據(jù)集整體,而是逐個針對每一個文件單獨切片
MapReduce工作流程
(1)Read階段:MapTask通過InputFormat獲得的RecordReader双抽,從輸入InputSplit中解析出一個個key/value百框。
(2)Map階段:該節(jié)點主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value牍汹。
(3)Collect收集階段:在用戶編寫map()函數(shù)中铐维,當(dāng)數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結(jié)果慎菲。在該函數(shù)內(nèi)部嫁蛇,它會將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中露该。
(4)Spill階段:即“溢寫”睬棚,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上解幼,生成一個臨時文件抑党。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前撵摆,先要對數(shù)據(jù)進行一次本地排序底靠,并在必要時對數(shù)據(jù)進行合并、壓縮等操作特铝。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進行排序暑中,排序方式是,先按照分區(qū)編號Partition進行排序苟呐,然后按照key進行排序痒芝。這樣俐筋,經(jīng)過排序后牵素,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序澄者。
步驟2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時文件output/spillN.out(N表示當(dāng)前溢寫次數(shù))中笆呆。如果用戶設(shè)置了Combiner,則寫入文件之前粱挡,對每個分區(qū)中的數(shù)據(jù)進行一次聚集操作赠幕。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中,其中每個分區(qū)的元信息包括在臨時文件中的偏移量询筏、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小榕堰。如果當(dāng)前內(nèi)存索引大小超過1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中。
(5)Merge階段:當(dāng)所有數(shù)據(jù)處理完成后逆屡,MapTask對所有臨時文件進行一次合并圾旨,以確保最終只會生成一個數(shù)據(jù)文件。
當(dāng)所有數(shù)據(jù)處理完后魏蔗,MapTask會將所有臨時文件合并成一個大文件砍的,并保存到文件output/file.out中,同時生成相應(yīng)的索引文件output/file.out.index莺治。
在進行文件合并過程中廓鞠,MapTask以分區(qū)為單位進行合并。對于某個分區(qū)谣旁,它將采用多輪遞歸合并的方式床佳。每輪合并mapreduce.task.io.sort.factor(默認10)個文件,并將產(chǎn)生的文件重新加入待合并列表中榄审,對文件排序后夕土,重復(fù)以上過程,直到最終得到一個大文件瘟判。
讓每個MapTask最終只生成一個數(shù)據(jù)文件怨绣,可避免同時打開大量文件和同時讀取大量小文件產(chǎn)生的隨機讀取帶來的開銷。
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數(shù)據(jù)拷获,并針對某一片數(shù)據(jù)篮撑,如果其大小超過一定閾值,則寫到磁盤上匆瓜,否則直接放到內(nèi)存中赢笨。
(2)Sort階段:在遠程拷貝數(shù)據(jù)的同時,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并驮吱,以防止內(nèi)存使用過多或磁盤上文件過多茧妒。按照MapReduce語義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進行聚集的一組數(shù)據(jù)左冬。為了將key相同的數(shù)據(jù)聚在一起桐筏,Hadoop采用了基于排序的策略。由于各個MapTask已經(jīng)實現(xiàn)對自己的處理結(jié)果進行了局部排序拇砰,因此梅忌,ReduceTask只需對所有數(shù)據(jù)進行一次歸并排序即可。
(3)Reduce階段:reduce()函數(shù)將計算結(jié)果寫到HDFS上除破。
Shuffle機制
FileInputFormat切片機制
1.按文件內(nèi)容長度切片
2.切片大小默認等于Block大小
3.切片不考慮數(shù)據(jù)集整體牧氮,逐個針對每個文件單獨切片
4.獲取切片信息api
(FileInput)context.getInputSplit.getPath().getName
TextInputFormat
TextInputFormat是FileInputFormat默認的實現(xiàn)類。逐行讀取記錄瑰枫。鍵是該行在整個文件的起始偏移量踱葛,LongWritable類型。值是該行內(nèi)容,不包括任何終止符尸诽,Text類型圾笨。
CombineTextInputFormat
應(yīng)用于小文件過多的場景,它可以將多個小文件從邏輯上規(guī)劃到一個切片中逊谋,這樣擂达,多個小文件就可以交給一個MapTask。
1.虛擬機設(shè)置最大虛擬內(nèi)存
CombineTextInputFormat.setMaxInputSplitSize(job,大小[b]);
2.切片機制
1)切片過程
2maxsize>size>maxsize 切片 分為大小相同的兩塊
size>2maxsize 切片
2)虛擬存儲過程
按字典順序合并切片
3.代碼差異
job.setInputFormatClass(CombineTextInputFormat.class);
CombinTextInput.setMaxInputSplitSize(job, 20971520);
Patition分區(qū)
1.自定義Partitioner繼承Partitioner類胶滋,實現(xiàn)getPartition方法板鬓。
2.驅(qū)動類設(shè)置Partitioner
job.setPartitionerClass()
3.設(shè)置reduceTask數(shù)量
job.setNumReduceTask();
注:getPartition>reduceTask 會生成空的輸出文件;getPartition<reduceTask會拋出異常究恤;reduceTask=1,只有一個輸出文件
Combiner合并
1.Combiner是M和R之外的一種組件
2.Combiner的父類是Reducer
3.Combiner運行在每個MapTask節(jié)點俭令。Reducer是接收全局所有Mapper的輸出結(jié)果
4.Combiner的意義是對每個MapTask的輸出進行局部匯總,減少網(wǎng)絡(luò)傳輸量
5.使用Combiner的前提是不能影響最終業(yè)務(wù)邏輯
6.代碼差異
job.setCombinerClass();
OutputFormat
重寫OutputFormat
重寫RecordWriter
ReduceJoin
map端打標(biāo)簽部宿,reduce端接收合并抄腔。
用(FileSplit)context.getInputSplit()獲取文件信息,進行分類打標(biāo)簽就行了
MapJoin
適合一張表十分小理张,一張表很大的場景赫蛇。
Reduce端處理過多表,容易發(fā)生數(shù)據(jù)傾斜雾叭。在Map端緩存表悟耘,提前處理業(yè)務(wù)邏輯,增加map端業(yè)務(wù)织狐,減少reduce任務(wù)暂幼,盡可能減少數(shù)據(jù)傾斜。
代碼差異:
放入緩存的文件
job.addCacheFile(new URI("hdfs://.../xx.log"))
setup方法里:
//通過緩存文件得到小表數(shù)據(jù)pd.txt
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);
//獲取文件系統(tǒng)對象,并開流
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(path);
//通過包裝流轉(zhuǎn)換為reader,方便按行讀取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
ETL(數(shù)據(jù)清洗)
描述數(shù)據(jù)從來源端經(jīng)過抽取移迫,轉(zhuǎn)換旺嬉,加載至目的端的過程 。