上一篇:083-BigData-11HDFS目錄結(jié)構(gòu)
一、MapReduce入門(mén)
1、MapReduce定義
Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架看尼,是用戶開(kāi)發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架淑倾。
Mapreduce核心功能是將用戶編寫(xiě)的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序河狐,并發(fā)運(yùn)行在一個(gè)hadoop集群上当犯。
2、MapReduce優(yōu)點(diǎn)
1)MapReduce 易于編程割疾。它簡(jiǎn)單的實(shí)現(xiàn)一些接口嚎卫,就可以完成一個(gè)分布式程序,這個(gè)分布式程序可以分布到大量廉價(jià)的PC機(jī)器上運(yùn)行宏榕。也就是說(shuō)你寫(xiě)一個(gè)分布式程序拓诸,跟寫(xiě)一個(gè)簡(jiǎn)單的串行程序是一模一樣的。就是因?yàn)檫@個(gè)特點(diǎn)使得MapReduce編程變得非常流行麻昼。
2)良好的擴(kuò)展性奠支。當(dāng)你的計(jì)算資源不能得到滿足的時(shí)候,你可以通過(guò)簡(jiǎn)單的增加機(jī)器來(lái)擴(kuò)展它的計(jì)算能力抚芦。
3)高容錯(cuò)性倍谜。MapReduce設(shè)計(jì)的初衷就是使程序能夠部署在廉價(jià)的PC機(jī)器上迈螟,這就要求它具有很高的容錯(cuò)性。比如其中一臺(tái)機(jī)器掛了尔崔,它可以把上面的計(jì)算任務(wù)轉(zhuǎn)移到另外一個(gè)節(jié)點(diǎn)上運(yùn)行答毫,不至于這個(gè)任務(wù)運(yùn)行失敗,而且這個(gè)過(guò)程不需要人工參與季春,而完全是由 Hadoop內(nèi)部完成的洗搂。
4)適合PB級(jí)以上海量數(shù)據(jù)的離線處理。它適合離線處理而不適合在線處理载弄。比如像毫秒級(jí)別的返回一個(gè)結(jié)果耘拇,MapReduce很難做到。
3宇攻、MapReduce缺點(diǎn)
MapReduce不擅長(zhǎng)做實(shí)時(shí)計(jì)算惫叛、流式計(jì)算、DAG(有向無(wú)環(huán)圖)計(jì)算尺碰。
1)實(shí)時(shí)計(jì)算挣棕。MapReduce無(wú)法像Mysql一樣,在毫秒或者秒級(jí)內(nèi)返回結(jié)果亲桥。
2)流式計(jì)算洛心。流式計(jì)算的輸入數(shù)據(jù)是動(dòng)態(tài)的,而MapReduce的輸入數(shù)據(jù)集是靜態(tài)的题篷,不能動(dòng)態(tài)變化词身。這是因?yàn)镸apReduce自身的設(shè)計(jì)特點(diǎn)決定了數(shù)據(jù)源必須是靜態(tài)的。
3)DAG(有向無(wú)環(huán)圖)計(jì)算番枚。多個(gè)應(yīng)用程序存在依賴關(guān)系法严,后一個(gè)應(yīng)用程序的輸入為前一個(gè)的輸出。在這種情況下葫笼,MapReduce并不是不能做深啤,而是使用后,每個(gè)MapReduce作業(yè)的輸出結(jié)果都會(huì)寫(xiě)入到磁盤(pán)路星,會(huì)造成大量的磁盤(pán)IO溯街,導(dǎo)致性能非常的低下。
4洋丐、MapReduce核心思想
1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段呈昔。
2)第一個(gè)階段的maptask并發(fā)實(shí)例,完全并行運(yùn)行友绝,互不相干橄唬。
3)第二個(gè)階段的reduce task并發(fā)實(shí)例互不相干疾棵,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有maptask并發(fā)實(shí)例的輸出。
4)MapReduce編程模型只能包含一個(gè)map階段和一個(gè)reduce階段料皇,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個(gè)mapreduce程序,串行運(yùn)行。
5、MapReduce進(jìn)程
一個(gè)完整的mapreduce程序在分布式運(yùn)行時(shí)有三類(lèi)實(shí)例進(jìn)程:
1)MrAppMaster:負(fù)責(zé)整個(gè)程序的過(guò)程調(diào)度及狀態(tài)協(xié)調(diào)催蝗。
2)MapTask:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程。
3)ReduceTask:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程育特。
6丙号、MapReduce編程規(guī)范
用戶編寫(xiě)的程序分成三個(gè)部分:Mapper,Reducer缰冤,Driver(提交運(yùn)行mr程序的客戶端)
1)Mapper階段
(1)用戶自定義的Mapper要繼承自己的父類(lèi)
(2)Mapper的輸入數(shù)據(jù)是KV對(duì)的形式(KV的類(lèi)型可自定義)
(3)Mapper中的業(yè)務(wù)邏輯寫(xiě)在map()方法中
(4)Mapper的輸出數(shù)據(jù)是KV對(duì)的形式(KV的類(lèi)型可自定義)
(5)map()方法(maptask進(jìn)程)對(duì)每一個(gè)<K,V>調(diào)用一次
2)Reducer階段
(1)用戶自定義的Reducer要繼承自己的父類(lèi)
(2)Reducer的輸入數(shù)據(jù)類(lèi)型對(duì)應(yīng)Mapper的輸出數(shù)據(jù)類(lèi)型犬缨,也是KV
(3)Reducer的業(yè)務(wù)邏輯寫(xiě)在reduce()方法中
(4)Reducetask進(jìn)程對(duì)每一組相同k的<k,v>組調(diào)用一次reduce()方法
3)Driver階段
整個(gè)程序需要一個(gè)Drvier來(lái)進(jìn)行提交,提交的是一個(gè)描述了各種必要信息的job對(duì)象
二棉浸、Hadoop序列化
1怀薛、為什么要序列化?
一般來(lái)說(shuō)迷郑,“活的”對(duì)象只生存在內(nèi)存里枝恋,關(guān)機(jī)斷電就沒(méi)有了。而且“活的”對(duì)象只能由本地的進(jìn)程使用嗡害,不能被發(fā)送到網(wǎng)絡(luò)上的另外一臺(tái)計(jì)算機(jī)焚碌。 然而序列化可以存儲(chǔ)“活的”對(duì)象,可以將“活的”對(duì)象發(fā)送到遠(yuǎn)程計(jì)算機(jī)霸妹。
2十电、什么是序列化?
序列化就是把內(nèi)存中的對(duì)象叹螟,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)(持久化)和網(wǎng)絡(luò)傳輸鹃骂。
反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤(pán)的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對(duì)象罢绽。
3畏线、為什么不用Java的序列化?
Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable)良价,一個(gè)對(duì)象被序列化后寝殴,會(huì)附帶很多額外的信息(各種校驗(yàn)信息,header棚壁,繼承體系等),不便于在網(wǎng)絡(luò)中高效傳輸栈虚。所以袖外,hadoop自己開(kāi)發(fā)了一套序列化機(jī)制(Writable),精簡(jiǎn)魂务、高效曼验。
4泌射、為什么序列化對(duì)Hadoop很重要?
因?yàn)镠adoop在集群之間進(jìn)行通訊或者RPC調(diào)用的時(shí)候鬓照,需要序列化熔酷,而且要求序列化要快,且體積要小豺裆,占用帶寬要小拒秘。所以必須理解Hadoop的序列化機(jī)制。
序列化和反序列化在分布式數(shù)據(jù)處理領(lǐng)域經(jīng)常出現(xiàn):進(jìn)程通信和永久存儲(chǔ)臭猜。然而Hadoop中各個(gè)節(jié)點(diǎn)的通信是通過(guò)遠(yuǎn)程調(diào)用(RPC)實(shí)現(xiàn)的躺酒,那么RPC序列化要求具有以下特點(diǎn):
1)緊湊:緊湊的格式能讓我們充分利用網(wǎng)絡(luò)帶寬,而帶寬是數(shù)據(jù)中心最稀缺的資
2)快速:進(jìn)程通信形成了分布式系統(tǒng)的骨架蔑歌,所以需要盡量減少序列化和反序列化的性能開(kāi)銷(xiāo)羹应,這是基本的;
3)可擴(kuò)展:協(xié)議為了滿足新的需求變化次屠,所以控制客戶端和服務(wù)器過(guò)程中园匹,需要直接引進(jìn)相應(yīng)的協(xié)議,這些是新協(xié)議劫灶,原序列化方式能支持新的協(xié)議報(bào)文裸违;
4)互操作:能支持不同語(yǔ)言寫(xiě)的客戶端和服務(wù)端進(jìn)行交互;
5浑此、常用數(shù)據(jù)序列化類(lèi)型
常用的數(shù)據(jù)類(lèi)型對(duì)應(yīng)的hadoop數(shù)據(jù)序列化類(lèi)型
6累颂、自定義bean對(duì)象實(shí)現(xiàn)序列化接口(Writable)
1)自定義bean對(duì)象要想序列化傳輸,必須實(shí)現(xiàn)序列化接口凛俱,需要注意以下7項(xiàng)紊馏。
(1)必須實(shí)現(xiàn)Writable接口
(2)反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù)蒲犬,所以必須有空參構(gòu)造
public FlowBean() {
super();
}
(3)重寫(xiě)序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重寫(xiě)反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結(jié)果顯示在文件中朱监,需要重寫(xiě)toString(),可用”\t”分開(kāi)原叮,方便后續(xù)用赫编。
(7)如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口奋隶,因?yàn)閙apreduce框中的shuffle過(guò)程一定會(huì)對(duì)key進(jìn)行排序擂送。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,從大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
三唯欣、MapReduce框架原理
1嘹吨、MapReduce工作流程
1)流程示意圖
2)流程詳解
上面的流程是整個(gè)mapreduce最全工作流程,但是shuffle過(guò)程只是從第7步開(kāi)始到第16步結(jié)束境氢,具體shuffle過(guò)程詳解蟀拷,如下:
1)maptask收集我們的map()方法輸出的kv對(duì)碰纬,放到內(nèi)存緩沖區(qū)中
2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤(pán)文件,可能會(huì)溢出多個(gè)文件
3)多個(gè)溢出文件會(huì)被合并成大的溢出文件
4)在溢出過(guò)程中问芬,及合并的過(guò)程中悦析,都要調(diào)用partitioner進(jìn)行分區(qū)和針對(duì)key進(jìn)行排序
5)reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
6)reducetask會(huì)取到同一個(gè)分區(qū)的來(lái)自不同maptask的結(jié)果文件此衅,reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)
7)合并成大文件后强戴,shuffle的過(guò)程也就結(jié)束了,后面進(jìn)入reducetask的邏輯運(yùn)算過(guò)程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)group炕柔,調(diào)用用戶自定義的reduce()方法)
3)注意
Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率酌泰,原則上說(shuō),緩沖區(qū)越大匕累,磁盤(pán)io的次數(shù)越少陵刹,執(zhí)行速度就越快。
緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整欢嘿,參數(shù):io.sort.mb 默認(rèn)100M衰琐。
2、InputFormat數(shù)據(jù)輸入
- Job提交流程和切片源碼詳解
1)job提交流程源碼詳解
waitForCompletion()
submit();
// 1建立連接
connect();
// 1)創(chuàng)建提交job的代理
new Cluster(getConfiguration());
// (1)判斷是本地yarn還是遠(yuǎn)程
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)獲取jobid 炼蹦,并創(chuàng)建job路徑
JobID jobId = submitClient.getNewJobID();
// 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)計(jì)算切片羡宙,生成切片規(guī)劃文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路徑寫(xiě)xml配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交job,返回提交狀態(tài)
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2)FileInputFormat源碼解析(input.getSplits(job))
(1)找到你數(shù)據(jù)存儲(chǔ)的目錄。
(2)開(kāi)始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件
(3)遍歷第一個(gè)文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計(jì)算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默認(rèn)情況下掐隐,切片大小=blocksize
d)開(kāi)始切狗热,形成第1個(gè)切片:ss.txt—0:128M 第2個(gè)切片ss.txt—128:256M 第3個(gè)切片ss.txt—256M:300M(每次切片時(shí),都要判斷切完剩下的部分是否大于塊的1.1倍虑省,不大于1.1倍就劃分一塊切片)
e)將切片信息寫(xiě)到一個(gè)切片規(guī)劃文件中
f)整個(gè)切片的核心過(guò)程在getSplit()方法中完成匿刮。
g)數(shù)據(jù)切片只是在邏輯上對(duì)輸入數(shù)據(jù)進(jìn)行分片,并不會(huì)再磁盤(pán)上將其切分成分片進(jìn)行存儲(chǔ)探颈。InputSplit只記錄了分片的元數(shù)據(jù)信息熟丸,比如起始位置、長(zhǎng)度以及所在的節(jié)點(diǎn)列表等伪节。
h)注意:block是HDFS物理上存儲(chǔ)的數(shù)據(jù)光羞,切片是對(duì)數(shù)據(jù)邏輯上的劃分。
(4)提交切片規(guī)劃文件到y(tǒng)arn上怀大,yarn上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)算開(kāi)啟maptask個(gè)數(shù)纱兑。
3、FileInputFormat切片機(jī)制
1)FileInputFormat中默認(rèn)的切片機(jī)制:
(1)簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片
(2)切片大小化借,默認(rèn)等于block大小
(3)切片時(shí)不考慮數(shù)據(jù)集整體潜慎,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片
比如待處理數(shù)據(jù)有兩個(gè)文件:
file1.txt 320M
file2.txt 10M
經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
2)FileInputFormat切片大小的參數(shù)配置
通過(guò)分析源碼,在FileInputFormat的280行中勘纯,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個(gè)值來(lái)運(yùn)算決定
mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認(rèn)值Long.MAXValue
因此,默認(rèn)情況下钓瞭,切片大小=blocksize驳遵。
maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小山涡,而且就等于配置的這個(gè)參數(shù)的值堤结。
minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大鸭丛。
3)獲取切片信息API
// 根據(jù)文件類(lèi)型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 獲取切片的文件名稱(chēng)
String name = inputSplit.getPath().getName();
4竞穷、CombineTextInputFormat切片機(jī)制
關(guān)于大量小文件的優(yōu)化策略
1)默認(rèn)情況下TextInputformat對(duì)任務(wù)的切片機(jī)制是按文件規(guī)劃切片,不管文件多小鳞溉,都會(huì)是一個(gè)單獨(dú)的切片瘾带,都會(huì)交給一個(gè)maptask,這樣如果有大量小文件熟菲,就會(huì)產(chǎn)生大量的maptask看政,處理效率極其低下。
2)優(yōu)化策略
(1)最好的辦法抄罕,在數(shù)據(jù)處理系統(tǒng)的最前端(預(yù)處理/采集)允蚣,將小文件先合并成大文件,再上傳到HDFS做后續(xù)分析呆贿。
(2)補(bǔ)救措施:如果已經(jīng)是大量小文件在HDFS中了嚷兔,可以使用另一種InputFormat來(lái)做切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不同:它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中做入,這樣冒晰,多個(gè)小文件就可以交給一個(gè)maptask。
(3)優(yōu)先滿足最小切片大小母蛛,不超過(guò)最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實(shí)現(xiàn)步驟
// 如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
5翩剪、InputFormat接口實(shí)現(xiàn)類(lèi)
MapReduce任務(wù)的輸入文件一般是存儲(chǔ)在HDFS里面。輸入的文件格式包括:基于行的日志文件彩郊、二進(jìn)制格式文件等前弯。這些文件一般會(huì)很大,達(dá)到數(shù)十GB秫逝,甚至更大恕出。那么MapReduce是如何讀取這些數(shù)據(jù)的呢?下面我們首先學(xué)習(xí)InputFormat接口违帆。
InputFormat常見(jiàn)的接口實(shí)現(xiàn)類(lèi)包括:TextInputFormat浙巫、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等的畴。
1)TextInputFormat
TextInputFormat是默認(rèn)的InputFormat渊抄。每條記錄是一行輸入。鍵K是LongWritable類(lèi)型丧裁,存儲(chǔ)該行在整個(gè)文件中的字節(jié)偏移量护桦。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車(chē)符)煎娇。
以下是一個(gè)示例二庵,比如,一個(gè)分片包含了如下4條文本記錄缓呛。
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
每條記錄表示為以下鍵/值對(duì):
(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
很明顯催享,鍵并不是行號(hào)。一般情況下哟绊,很難取得行號(hào)因妙,因?yàn)槲募醋止?jié)而不是按行切分為分片。
2)KeyValueTextInputFormat
每一行均為一條記錄票髓,被分隔符分割為key兰迫,value【娉疲可以通過(guò)在驅(qū)動(dòng)類(lèi)中設(shè)置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來(lái)設(shè)定分隔符汁果。默認(rèn)分隔符是tab(\t)。
以下是一個(gè)示例玲躯,輸入是一個(gè)包含4條記錄的分片据德。其中——>表示一個(gè)(水平方向的)制表符。
line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise
每條記錄表示為以下鍵/值對(duì):
(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)
此時(shí)的鍵是每行排在制表符之前的Text序列跷车。
3)NLineInputFormat
如果使用NlineInputFormat棘利,代表每個(gè)map進(jìn)程處理的InputSplit不再按block塊去劃分,而是按NlineInputFormat指定的行數(shù)N來(lái)劃分朽缴。即輸入文件的總行數(shù)/N=切片數(shù)(20)善玫,如果不整除,切片數(shù)=商+1密强。
以下是一個(gè)示例茅郎,仍然以上面的4行輸入為例。
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
例如或渤,如果N是2系冗,則每個(gè)輸入分片包含兩行。開(kāi)啟2個(gè)maptask薪鹦。
(0,Rich learning form)
(19,Intelligent learning engine)
另一個(gè) mapper 則收到后兩行:
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
這里的鍵和值與TextInputFormat生成的一樣掌敬。
6惯豆、自定義InputFormat
1)概述
(1)自定義一個(gè)類(lèi)繼承FileInputFormat。
(2)改寫(xiě)RecordReader奔害,實(shí)現(xiàn)一次讀取一個(gè)完整文件封裝為KV楷兽。
(3)在輸出時(shí)使用SequenceFileOutPutFormat輸出合并文件。
7华临、并行度決定機(jī)制
1)問(wèn)題引出
maptask的并行度決定map階段的任務(wù)處理并發(fā)度拄养,進(jìn)而影響到整個(gè)job的處理速度。那么银舱,mapTask并行任務(wù)是否越多越好呢?
2)MapTask并行度決定機(jī)制
一個(gè)job的map階段MapTask并行度(個(gè)數(shù))跛梗,由客戶端提交job時(shí)的切片個(gè)數(shù)決定寻馏。
8、MapTask工作機(jī)制
(1)Read階段:Map Task通過(guò)用戶編寫(xiě)的RecordReader核偿,從輸入InputSplit中解析出一個(gè)個(gè)key/value诚欠。
(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫(xiě)map()函數(shù)處理,并產(chǎn)生一系列新的key/value漾岳。
(3)Collect收集階段:在用戶編寫(xiě)map()函數(shù)中轰绵,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果尼荆。在該函數(shù)內(nèi)部左腔,它會(huì)將生成的key/value分區(qū)(調(diào)用Partitioner),并寫(xiě)入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中捅儒。
(4)Spill階段:即“溢寫(xiě)”液样,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫(xiě)到本地磁盤(pán)上巧还,生成一個(gè)臨時(shí)文件鞭莽。需要注意的是,將數(shù)據(jù)寫(xiě)入本地磁盤(pán)之前麸祷,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序澎怒,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并、壓縮等操作阶牍。
溢寫(xiě)階段詳情:
步驟1:利用快速排序算法對(duì)緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序喷面,排序方式是,先按照分區(qū)編號(hào)partition進(jìn)行排序走孽,然后按照key進(jìn)行排序乖酬。這樣,經(jīng)過(guò)排序后融求,數(shù)據(jù)以分區(qū)為單位聚集在一起咬像,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫(xiě)入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N表示當(dāng)前溢寫(xiě)次數(shù))中。如果用戶設(shè)置了Combiner县昂,則寫(xiě)入文件之前肮柜,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫(xiě)到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中倒彰,其中每個(gè)分區(qū)的元信息包括在臨時(shí)文件中的偏移量审洞、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)前內(nèi)存索引大小超過(guò)1MB待讳,則將內(nèi)存索引寫(xiě)到文件output/spillN.out.index中芒澜。
(5)Combine階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask對(duì)所有臨時(shí)文件進(jìn)行一次合并创淡,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件痴晦。
當(dāng)所有數(shù)據(jù)處理完后,MapTask會(huì)將所有臨時(shí)文件合并成一個(gè)大文件琳彩,并保存到文件output/file.out中誊酌,同時(shí)生成相應(yīng)的索引文件output/file.out.index。
在進(jìn)行文件合并過(guò)程中露乏,MapTask以分區(qū)為單位進(jìn)行合并碧浊。對(duì)于某個(gè)分區(qū),它將采用多輪遞歸合并的方式瘟仿。每輪合并io.sort.factor(默認(rèn)100)個(gè)文件箱锐,并將產(chǎn)生的文件重新加入待合并列表中,對(duì)文件排序后劳较,重復(fù)以上過(guò)程瑞躺,直到最終得到一個(gè)大文件。
讓每個(gè)MapTask最終只生成一個(gè)數(shù)據(jù)文件兴想,可避免同時(shí)打開(kāi)大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來(lái)的開(kāi)銷(xiāo)幢哨。
9、Shuffle機(jī)制
Mapreduce確保每個(gè)reducer的輸入都是按鍵排序的嫂便。系統(tǒng)執(zhí)行排序的過(guò)程(即將map輸出作為輸入傳給reducer)稱(chēng)為shuffle捞镰。
10、Partition分區(qū)
0)問(wèn)題引出:要求將統(tǒng)計(jì)結(jié)果按照條件輸出到不同文件中(分區(qū))毙替。比如:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(分區(qū))
1)默認(rèn)partition分區(qū)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默認(rèn)分區(qū)是根據(jù)key的hashCode對(duì)reduceTasks個(gè)數(shù)取模得到的岸售。用戶沒(méi)法控制哪個(gè)key存儲(chǔ)到哪個(gè)分區(qū)。
&是為了不出現(xiàn)負(fù)數(shù)厂画,也可以用 hashcode()%num + num來(lái)代替凸丸,但是位運(yùn)算性能比較高。
2)自定義Partitioner步驟
(1)自定義類(lèi)繼承Partitioner袱院,重寫(xiě)getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 獲取電話號(hào)碼的前三位
String preNum = key.toString().substring(0, 3);
partition = 4;
// 2 判斷是哪個(gè)省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
(2)在job驅(qū)動(dòng)中屎慢,設(shè)置自定義partitioner:
job.setPartitionerClass(CustomPartitioner.class);
(3)自定義partition后瞭稼,要根據(jù)自定義partitioner的邏輯設(shè)置相應(yīng)數(shù)量的reduce task
job.setNumReduceTasks(5);
3)注意:
如果reduceTask的數(shù)量> getPartition的結(jié)果數(shù),則會(huì)多產(chǎn)生幾個(gè)空的輸出文件part-r-000xx腻惠;
如果1<reduceTask的數(shù)量<getPartition的結(jié)果數(shù)环肘,則有一部分分區(qū)數(shù)據(jù)無(wú)處安放,會(huì)Exception集灌;
如果reduceTask的數(shù)量=1悔雹,則不管mapTask端輸出多少個(gè)分區(qū)文件,最終結(jié)果都交給這一個(gè)reduceTask欣喧,最終也就只會(huì)產(chǎn)生一個(gè)結(jié)果文件 part-r-00000腌零;
例如:假設(shè)自定義分區(qū)數(shù)為5,則
(1)job.setNumReduceTasks(1);會(huì)正常運(yùn)行唆阿,只不過(guò)會(huì)產(chǎn)生一個(gè)輸出文件
(2)job.setNumReduceTasks(2);會(huì)報(bào)錯(cuò)
(3)job.setNumReduceTasks(6);大于5益涧,程序會(huì)正常運(yùn)行,會(huì)產(chǎn)生空文件
4)案例實(shí)操
詳見(jiàn)7.2.2 需求2:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(Partitioner)
詳見(jiàn)7.1.2 需求2:把單詞按照ASCII碼奇偶分區(qū)(Partitioner)
11酷鸦、WritableComparable排序
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均會(huì)對(duì)數(shù)據(jù)(按照key)進(jìn)行排序牙咏。該操作屬于Hadoop的默認(rèn)行為臼隔。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要妄壶。默認(rèn)排序是按照字典順序排序摔握,且實(shí)現(xiàn)該排序的方法是快速排序。
對(duì)于Map Task丁寄,它會(huì)將處理的結(jié)果暫時(shí)放到一個(gè)緩沖區(qū)中氨淌,當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后,再對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次排序伊磺,并將這些有序數(shù)據(jù)寫(xiě)到磁盤(pán)上盛正,而當(dāng)數(shù)據(jù)處理完畢后,它會(huì)對(duì)磁盤(pán)上所有文件進(jìn)行一次合并屑埋,以將這些文件合并成一個(gè)大的有序文件豪筝。
對(duì)于Reduce Task,它從每個(gè)Map Task上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件摘能,如果文件大小超過(guò)一定閾值续崖,則放到磁盤(pán)上,否則放到內(nèi)存中团搞。如果磁盤(pán)上文件數(shù)目達(dá)到一定閾值严望,則進(jìn)行一次合并以生成一個(gè)更大文件;如果內(nèi)存中文件大小或者數(shù)目超過(guò)一定閾值逻恐,則進(jìn)行一次合并后將數(shù)據(jù)寫(xiě)到磁盤(pán)上像吻。當(dāng)所有數(shù)據(jù)拷貝完畢后峻黍,Reduce Task統(tǒng)一對(duì)內(nèi)存和磁盤(pán)上的所有數(shù)據(jù)進(jìn)行一次合并。
每個(gè)階段的默認(rèn)排序
1)排序的分類(lèi):
(1)部分排序:
MapReduce根據(jù)輸入記錄的鍵對(duì)數(shù)據(jù)集排序萧豆。保證輸出的每個(gè)文件內(nèi)部排序奸披。
(2)全排序:
如何用Hadoop產(chǎn)生一個(gè)全局排序的文件?最簡(jiǎn)單的方法是使用一個(gè)分區(qū)涮雷。但該方法在處理大型文件時(shí)效率極低阵面,因?yàn)橐慌_(tái)機(jī)器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的并行架構(gòu)洪鸭。
替代方案:首先創(chuàng)建一系列排好序的文件样刷;其次,串聯(lián)這些文件览爵;最后置鼻,生成一個(gè)全局排序的文件。主要思路是使用一個(gè)分區(qū)來(lái)描述輸出的全局排序蜓竹。例如:可以為上述文件創(chuàng)建3個(gè)分區(qū)箕母,在第一分區(qū)中,記錄的單詞首字母a-g俱济,第二分區(qū)記錄單詞首字母h-n, 第三分區(qū)記錄單詞首字母o-z嘶是。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達(dá)reducer之前按鍵對(duì)記錄排序,但鍵所對(duì)應(yīng)的值并沒(méi)有被排序蛛碌。甚至在不同的執(zhí)行輪次中聂喇,這些值的排序也不固定,因?yàn)樗鼈儊?lái)自不同的map任務(wù)且這些map任務(wù)在不同輪次中完成時(shí)間各不相同蔚携。一般來(lái)說(shuō)希太,大多數(shù)MapReduce程序會(huì)避免讓reduce函數(shù)依賴于值的排序。但是酝蜒,有時(shí)也需要通過(guò)特定的方法對(duì)鍵進(jìn)行排序和分組等以實(shí)現(xiàn)對(duì)值的排序誊辉。
(4)二次排序:
在自定義排序過(guò)程中,如果compareTo中的判斷條件為兩個(gè)即為二次排序亡脑。
2)自定義排序WritableComparable
(1)原理分析
bean對(duì)象實(shí)現(xiàn)WritableComparable接口重寫(xiě)compareTo方法芥映,就可以實(shí)現(xiàn)排序
@Override
public int compareTo(FlowBean o) {
// 倒序排列,從大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
12远豺、GroupingComparator分組(輔助排序)
1)對(duì)reduce階段的數(shù)據(jù)根據(jù)某一個(gè)或幾個(gè)字段進(jìn)行分組奈偏。
13、Combiner合并
0)在分布式的架構(gòu)中躯护,分布式文件系統(tǒng)HDFS惊来,和分布式運(yùn)算程序編程框架mapreduce。
HDFS:不怕大文件棺滞,怕很多小文件
mapreduce :怕數(shù)據(jù)傾斜
那么mapreduce是如果解決多個(gè)小文件的問(wèn)題呢裁蚁?
mapreduce關(guān)于大量小文件的優(yōu)化策略
(1) 默認(rèn)情況下矢渊,TextInputFormat對(duì)任務(wù)的切片機(jī)制是按照文件規(guī)劃切片,不管有多少個(gè)小文件枉证,都會(huì)是單獨(dú)的切片矮男,都會(huì)交給一個(gè)maptask,這樣室谚,如果有大量的小文件
就會(huì)產(chǎn)生大量的maptask毡鉴,處理效率極端底下
(2)優(yōu)化策略
最好的方法:在數(shù)據(jù)處理的最前端(預(yù)處理、采集)秒赤,就將小文件合并成大文件猪瞬,在上傳到HDFS做后續(xù)的分析
補(bǔ)救措施:如果已經(jīng)是大量的小文件在HDFS中了,可以使用另一種inputformat來(lái)做切片(CombineFileInputformat)入篮,它的切片邏輯跟TextInputformat
注:CombineTextInputFormat是CombineFileInputformat的子類(lèi)
不同:
它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中陈瘦,這樣,多個(gè)小文件就可以交給一個(gè)maptask了
//如果不設(shè)置InputFormat潮售,它默認(rèn)的用的是TextInputFormat.class
/*CombineTextInputFormat為系統(tǒng)自帶的組件類(lèi)
* setMinInputSplitSize 中的2048是表示n個(gè)小文件之和不能大于2048
* setMaxInputSplitSize 中的4096是 當(dāng)滿足setMinInputSplitSize中的2048情況下 在滿足n+1個(gè)小文件之和不能大于4096
*/
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMinInputSplitSize(job, 2048);
CombineTextInputFormat.setMaxInputSplitSize(job, 4096);
1)輸入數(shù)據(jù):準(zhǔn)備5個(gè)小文件
2)實(shí)現(xiàn)過(guò)程
(1)不做任何處理痊项,運(yùn)行需求1中的wordcount程序,觀察切片個(gè)數(shù)為5
(2)在WordcountDriver中增加如下代碼酥诽,運(yùn)行程序鞍泉,并觀察運(yùn)行的切片個(gè)數(shù)為1
// 如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4*1024*1024);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m
注:在看number of splits時(shí)盆均,和最大值(MaxSplitSize)有關(guān)塞弊、總體規(guī)律就是和低于最大值是一片漱逸、高于最大值1.5倍+泪姨,則為兩片;高于最大值2倍以上則向下取整饰抒,比如文件大小65MB肮砾,切片最大值為4MB,那么切片為16個(gè).總體來(lái)說(shuō),切片差值不超過(guò)1個(gè)袋坑,不影響整體性能
6)自定義Combiner實(shí)現(xiàn)步驟:
(1)自定義一個(gè)combiner繼承Reducer仗处,重寫(xiě)reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 1 匯總操作
int count = 0;
for(IntWritable v :values){
count = v.get();
}
// 2 寫(xiě)出
context.write(key, new IntWritable(count));
}
}
(2)在job驅(qū)動(dòng)類(lèi)中設(shè)置:
job.setCombinerClass(WordcountCombiner.class);
13、ReduceTask工作機(jī)制
1)設(shè)置ReduceTask并行度(個(gè)數(shù))
reducetask的并行度同樣影響整個(gè)job的執(zhí)行并發(fā)度和執(zhí)行效率枣宫,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同婆誓,Reducetask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
//默認(rèn)值是1,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);
2)注意
(1)reducetask=0 也颤,表示沒(méi)有reduce階段洋幻,輸出文件個(gè)數(shù)和map個(gè)數(shù)一致。
(2)reducetask默認(rèn)值就是1翅娶,所以輸出文件個(gè)數(shù)為一個(gè)文留。
(3)如果數(shù)據(jù)分布不均勻好唯,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜
(4)reducetask數(shù)量并不是任意設(shè)置,還要考慮業(yè)務(wù)邏輯需求燥翅,有些情況下骑篙,需要計(jì)算全局匯總結(jié)果,就只能有1個(gè)reducetask森书。
(5)具體多少個(gè)reducetask靶端,需要根據(jù)集群性能而定。
(6)如果分區(qū)數(shù)不是1拄氯,但是reducetask為1躲查,是否執(zhí)行分區(qū)過(guò)程。答案是:不執(zhí)行分區(qū)過(guò)程译柏。因?yàn)樵趍aptask的源碼中镣煮,執(zhí)行分區(qū)的前提是先判斷reduceNum個(gè)數(shù)是否大于1。不大于1肯定不執(zhí)行鄙麦。
3)實(shí)驗(yàn):測(cè)試reducetask多少合適典唇。
(1)實(shí)驗(yàn)環(huán)境:1個(gè)master節(jié)點(diǎn),16個(gè)slave節(jié)點(diǎn):CPU:8GHZ胯府,內(nèi)存: 2G
(2)實(shí)驗(yàn)結(jié)論:
表1 改變r(jià)educe task (數(shù)據(jù)量為1GB)
4)ReduceTask工作機(jī)制
(1)Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù)介衔,并針對(duì)某一片數(shù)據(jù),如果其大小超過(guò)一定閾值骂因,則寫(xiě)到磁盤(pán)上炎咖,否則直接放到內(nèi)存中。
(2)Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí)寒波,ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤(pán)上的文件進(jìn)行合并乘盼,以防止內(nèi)存使用過(guò)多或磁盤(pán)上文件過(guò)多。
(3)Sort階段:按照MapReduce語(yǔ)義俄烁,用戶編寫(xiě)reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)绸栅。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略页屠。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序粹胯,因此,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可辰企。
(4)Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫(xiě)到HDFS上风纠。
14、OutputFormat接口實(shí)現(xiàn)類(lèi)
OutputFormat是MapReduce輸出的基類(lèi)牢贸,所有實(shí)現(xiàn)MapReduce輸出都實(shí)現(xiàn)了 OutputFormat接口竹观。下面我們介紹幾種常見(jiàn)的OutputFormat實(shí)現(xiàn)類(lèi)。
1)文本輸出TextOutputFormat
默認(rèn)的輸出格式是TextOutputFormat十减,它把每條記錄寫(xiě)為文本行栈幸。它的鍵和值可以是任意類(lèi)型愤估,因?yàn)門(mén)extOutputFormat調(diào)用toString()方法把它們轉(zhuǎn)換為字符串。
2)SequenceFileOutputFormat
SequenceFileOutputFormat將它的輸出寫(xiě)為一個(gè)順序文件速址。如果輸出需要作為后續(xù) MapReduce任務(wù)的輸入玩焰,這便是一種好的輸出格式,因?yàn)樗母袷骄o湊芍锚,很容易被壓縮昔园。
3)自定義OutputFormat
根據(jù)用戶需求,自定義實(shí)現(xiàn)輸出并炮。
15默刚、自定義OutputFormat
為了實(shí)現(xiàn)控制最終文件的輸出路徑,可以自定義OutputFormat逃魄。
要在一個(gè)mapreduce程序中根據(jù)數(shù)據(jù)的不同輸出兩類(lèi)結(jié)果到不同目錄荤西,這類(lèi)靈活的輸出需求可以通過(guò)自定義outputformat來(lái)實(shí)現(xiàn)。
1)自定義OutputFormat步驟
(1)自定義一個(gè)類(lèi)繼承FileOutputFormat伍俘。
(2)改寫(xiě)recordwriter邪锌,具體改寫(xiě)輸出數(shù)據(jù)的方法write()。
2)實(shí)操案例:癌瘾?
Join多種應(yīng)用
16觅丰、Map join(Distributedcache分布式緩存)
1)使用場(chǎng)景:一張表十分小、一張表很大妨退。
2)解決方案
在map端緩存多張表妇萄,提前處理業(yè)務(wù)邏輯,這樣增加map端業(yè)務(wù)咬荷,減少reduce端數(shù)據(jù)的壓力冠句,盡可能的減少數(shù)據(jù)傾斜。
3)具體辦法:采用distributedcache
(1)在mapper的setup階段萍丐,將文件讀取到緩存集合中轩端。
(2)在驅(qū)動(dòng)函數(shù)中加載緩存放典。
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運(yùn)行節(jié)點(diǎn)
4)實(shí)操案例:
17逝变、Reduce join
1)原理:
Map端的主要工作:為來(lái)自不同表(文件)的key/value對(duì)打標(biāo)簽以區(qū)別不同來(lái)源的記錄。然后用連接字段作為key奋构,其余部分和新加的標(biāo)志作為value壳影,最后進(jìn)行輸出。
Reduce端的主要工作:在reduce端以連接字段作為key的分組已經(jīng)完成弥臼,我們只需要在每一個(gè)分組當(dāng)中將那些來(lái)源于不同文件的記錄(在map階段已經(jīng)打標(biāo)志)分開(kāi)宴咧,最后進(jìn)行合并就ok了。
2)該方法的缺點(diǎn)
這種方式的缺點(diǎn)很明顯就是會(huì)造成map和reduce端也就是shuffle階段出現(xiàn)大量的數(shù)據(jù)傳輸径缅,效率很低掺栅。
3)案例實(shí)操
18烙肺、數(shù)據(jù)清洗(ETL)
3.8 數(shù)據(jù)清洗(ETL)
1)概述
在運(yùn)行核心業(yè)務(wù)Mapreduce程序之前,往往要先對(duì)數(shù)據(jù)進(jìn)行清洗氧卧,清理掉不符合用戶要求的數(shù)據(jù)桃笙。清理的過(guò)程往往只需要運(yùn)行mapper程序,不需要運(yùn)行reduce程序沙绝。
2)實(shí)操案例
19搏明、計(jì)數(shù)器應(yīng)用
Hadoop為每個(gè)作業(yè)維護(hù)若干內(nèi)置計(jì)數(shù)器,以描述多項(xiàng)指標(biāo)闪檬。例如星著,某些計(jì)數(shù)器記錄已處理的字節(jié)數(shù)和記錄數(shù),使用戶可監(jiān)控已處理的輸入數(shù)據(jù)量和已產(chǎn)生的輸出數(shù)據(jù)量粗悯。
1)API
(1)采用枚舉的方式統(tǒng)計(jì)計(jì)數(shù)
enum MyCounter{MALFORORMED,NORMAL}
//對(duì)枚舉定義的自定義計(jì)數(shù)器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)采用計(jì)數(shù)器組虚循、計(jì)數(shù)器名稱(chēng)的方式統(tǒng)計(jì)
context.getCounter("counterGroup", "countera").increment(1);
組名和計(jì)數(shù)器名稱(chēng)隨便起,但最好有意義样傍。
(3)計(jì)數(shù)結(jié)果在程序運(yùn)行后的控制臺(tái)上查看邮丰。
2)案例實(shí)操
20、MapReduce開(kāi)發(fā)總結(jié)
在編寫(xiě)mapreduce程序時(shí),需要考慮的幾個(gè)方面:
1)輸入數(shù)據(jù)接口:InputFormat
默認(rèn)使用的實(shí)現(xiàn)類(lèi)是:TextInputFormat
TextInputFormat的功能邏輯是:一次讀一行文本缭受,然后將該行的起始偏移量作為key烈疚,行內(nèi)容作為value返回。
KeyValueTextInputFormat每一行均為一條記錄斗蒋,被分隔符分割為key,value笛质。默認(rèn)分隔符是tab(\t)泉沾。
NlineInputFormat按照指定的行數(shù)N來(lái)劃分切片。
CombineTextInputFormat可以把多個(gè)小文件合并成一個(gè)切片處理妇押,提高處理效率跷究。
用戶還可以自定義InputFormat。
2)邏輯處理接口:Mapper
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:map() setup() cleanup ()
3)Partitioner分區(qū)
有默認(rèn)實(shí)現(xiàn) HashPartitioner敲霍,邏輯是根據(jù)key的哈希值和numReduces來(lái)返回一個(gè)分區(qū)號(hào)俊马;key.hashCode()&Integer.MAXVALUE % numReduces
如果業(yè)務(wù)上有特別的需求,可以自定義分區(qū)肩杈。
4)Comparable排序
當(dāng)我們用自定義的對(duì)象作為key來(lái)輸出時(shí)柴我,就必須要實(shí)現(xiàn)WritableComparable接口,重寫(xiě)其中的compareTo()方法扩然。
部分排序:對(duì)最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序艘儒。
全排序:對(duì)所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè)Reduce。
二次排序:排序的條件有兩個(gè)界睁。
5)Combiner合并
Combiner合并可以提高程序執(zhí)行效率觉增,減少io傳輸。但是使用時(shí)必須不能影響原有的業(yè)務(wù)處理結(jié)果翻斟。
6)reduce端分組:Groupingcomparator
reduceTask拿到輸入數(shù)據(jù)(一個(gè)partition的所有數(shù)據(jù))后抑片,首先需要對(duì)數(shù)據(jù)進(jìn)行分組,其分組的默認(rèn)原則是key相同杨赤,然后對(duì)每一組kv數(shù)據(jù)調(diào)用一次reduce()方法敞斋,并且將這一組kv中的第一個(gè)kv的key作為參數(shù)傳給reduce的key,將這一組數(shù)據(jù)的value的迭代器傳給reduce()的values參數(shù)疾牲。
利用上述這個(gè)機(jī)制植捎,我們可以實(shí)現(xiàn)一個(gè)高效的分組取最大值的邏輯。
自定義一個(gè)bean對(duì)象用來(lái)封裝我們的數(shù)據(jù)阳柔,然后改寫(xiě)其compareTo方法產(chǎn)生倒序排序的效果焰枢。然后自定義一個(gè)Groupingcomparator,將bean對(duì)象的分組邏輯改成按照我們的業(yè)務(wù)分組id來(lái)分組(比如訂單號(hào))舌剂。這樣济锄,我們要取的最大值就是reduce()方法中傳進(jìn)來(lái)key。
7)邏輯處理接口:Reducer
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:reduce() setup() cleanup ()
8)輸出數(shù)據(jù)接口:OutputFormat
默認(rèn)實(shí)現(xiàn)類(lèi)是TextOutputFormat霍转,功能邏輯是:將每一個(gè)KV對(duì)向目標(biāo)文本文件中輸出為一行荐绝。
SequenceFileOutputFormat將它的輸出寫(xiě)為一個(gè)順序文件。如果輸出需要作為后續(xù) MapReduce任務(wù)的輸入避消,這便是一種好的輸出格式低滩,因?yàn)樗母袷骄o湊,很容易被壓縮岩喷。
用戶還可以自定義OutputFormat恕沫。