MAPREDUCE概念
Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架劝堪,是用戶開發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架;
Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序揉稚,并發(fā)運(yùn)行在一個(gè)hadoop集群上
為什么要MAPREDUCE
- 海量數(shù)據(jù)在單機(jī)上處理因?yàn)橛布Y源限制秒啦,無(wú)法勝任
- 而一旦將單機(jī)版程序擴(kuò)展到集群來(lái)分布式運(yùn)行,將極大增加程序的復(fù)雜度和開發(fā)難度
- 引入mapreduce框架后搀玖,開發(fā)人員可以將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上余境,而將分布式計(jì)算中的復(fù)雜性交由框架來(lái)處理
設(shè)想一個(gè)海量數(shù)據(jù)場(chǎng)景下的wordcount需求:
單機(jī)版:內(nèi)存受限,磁盤受限灌诅,運(yùn)算能力受限
分布式:
文件分布式存儲(chǔ)(HDFS)
運(yùn)算邏輯需要至少分成2個(gè)階段(一個(gè)階段獨(dú)立并發(fā)芳来,一個(gè)階段匯聚)
運(yùn)算程序如何分發(fā)
程序如何分配運(yùn)算任務(wù)(切片)
兩階段的程序如何啟動(dòng)?如何協(xié)調(diào)猜拾?
整個(gè)程序運(yùn)行過程中的監(jiān)控即舌?容錯(cuò)?重試挎袜?
可見在程序由單機(jī)版擴(kuò)成分布式時(shí)晾浴,會(huì)引入大量的復(fù)雜工作。為了提高開發(fā)效率叶撒,可以將分布式程序中的公共功能封裝成框架,讓開發(fā)人員可以將精力集中于業(yè)務(wù)邏輯务豺。
而mapreduce就是這樣一個(gè)分布式程序的通用運(yùn)算框架
MAPREDUCE框架結(jié)構(gòu)及核心運(yùn)行機(jī)制
結(jié)構(gòu)
一個(gè)完整的mapreduce程序在分布式運(yùn)行時(shí)有三類實(shí)例進(jìn)程:
- MRAppMaster:負(fù)責(zé)整個(gè)程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
- mapTask:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程
- ReduceTask:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程
MR程序運(yùn)行流程
流程解析
一個(gè)mr程序啟動(dòng)的時(shí)候,最先啟動(dòng)的是MRAppMaster嗦明,MRAppMaster啟動(dòng)后根據(jù)本次job的描述信息笼沥,計(jì)算出需要的maptask實(shí)例數(shù)量,然后向集群申請(qǐng)機(jī)器啟動(dòng)相應(yīng)數(shù)量的maptask進(jìn)程
maptask進(jìn)程啟動(dòng)之后娶牌,根據(jù)給定的數(shù)據(jù)切片范圍進(jìn)行數(shù)據(jù)處理奔浅,主體流程為:
a) 利用客戶指定的inputformat來(lái)獲取RecordReader讀取數(shù)據(jù),形成輸入KV對(duì)
b) 將輸入KV對(duì)傳遞給客戶定義的map()方法诗良,做邏輯運(yùn)算汹桦,并將map()方法輸出的KV對(duì)收集到緩存
c) 將緩存中的KV對(duì)按照K分區(qū)排序后不斷溢寫到磁盤文件MRAppMaster監(jiān)控到所有maptask進(jìn)程任務(wù)完成之后,會(huì)根據(jù)客戶指定的參數(shù)啟動(dòng)相應(yīng)數(shù)量的reducetask進(jìn)程鉴裹,并告知reducetask進(jìn)程要處理的數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))
Reducetask進(jìn)程啟動(dòng)之后舞骆,根據(jù)MRAppMaster告知的待處理數(shù)據(jù)所在位置,從若干臺(tái)maptask運(yùn)行所在機(jī)器上獲取到若干個(gè)maptask輸出結(jié)果文件径荔,并在本地進(jìn)行重新歸并排序督禽,然后按照相同key的KV為一個(gè)組,調(diào)用客戶定義的reduce()方法進(jìn)行邏輯運(yùn)算总处,并收集運(yùn)算輸出的結(jié)果KV狈惫,然后調(diào)用客戶指定的outputformat將結(jié)果數(shù)據(jù)輸出到外部存儲(chǔ)
MapTask并行度決定機(jī)制
maptask的并行度決定map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)job的處理速度
那么鹦马,mapTask并行實(shí)例是否越多越好呢胧谈?其并行度又是如何決定呢?
mapTask并行度的決定機(jī)制
一個(gè)job的map階段并行度由客戶端在提交job時(shí)決定
而客戶端對(duì)map階段并行度的規(guī)劃的基本邏輯為:
將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小荸频,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split)菱肖,然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理
這段邏輯及形成的切片規(guī)劃描述文件,由InputFormat實(shí)現(xiàn)類的getSplits()方法完成旭从,其過程如下圖:
FileInputFormat切片機(jī)制
1稳强、切片定義在InputFormat類中的getSplit()方法
2、FileInputFormat中默認(rèn)的切片機(jī)制:
a) 簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片
b) 切片大小遇绞,默認(rèn)等于block大小
c) 切片時(shí)不考慮數(shù)據(jù)集整體键袱,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片
比如待處理數(shù)據(jù)有兩個(gè)文件
file1.txt 320M
file2.txt 10M
經(jīng)過FileInputFormat的切片機(jī)制運(yùn)算后燎窘,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
3摹闽、FileInputFormat中切片的大小的參數(shù)配置
通過分析源碼,在FileInputFormat中褐健,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個(gè)值來(lái)運(yùn)算決定
minsize:默認(rèn)值:1
配置參數(shù): mapreduce.input.fileinputformat.split.minsize
maxsize:默認(rèn)值:Long.MAXValue
配置參數(shù):mapreduce.input.fileinputformat.split.maxsize blocksize
因此付鹿,默認(rèn)情況下澜汤,切片大小=blocksize
maxsize(切片最大值):
參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小舵匾,而且就等于配置的這個(gè)參數(shù)的值
minsize (切片最小值):
參數(shù)調(diào)的比blockSize大俊抵,則可以讓切片變得比blocksize還大
選擇并發(fā)數(shù)的影響因素:
運(yùn)算節(jié)點(diǎn)的硬件配置
運(yùn)算任務(wù)的類型:CPU密集型還是IO密集型
運(yùn)算任務(wù)的數(shù)據(jù)量
map并行度的經(jīng)驗(yàn)之談
如果硬件配置為212core + 64G,恰當(dāng)?shù)膍ap并行度是大約每個(gè)節(jié)點(diǎn)20-100個(gè)map坐梯,最好每個(gè)map的執(zhí)行時(shí)間至少一分鐘徽诲。
如果job的每個(gè)map或者 reduce task的運(yùn)行時(shí)間都只有30-40秒鐘,那么就減少該job的map或者reduce數(shù)吵血,每一個(gè)task(map|reduce)的setup和加入到調(diào)度器中進(jìn)行調(diào)度谎替,這個(gè)中間的過程可能都要花費(fèi)幾秒鐘,所以如果每個(gè)task都非程8ǎ快就跑完了钱贯,就會(huì)在task的開始和結(jié)束的時(shí)候浪費(fèi)太多的時(shí)間。
配置task的JVM重用*可以改善該問題:
(mapred.job.reuse.jvm.num.tasks侦另,默認(rèn)是1秩命,表示一個(gè)JVM上最多可以順序執(zhí)行的task
數(shù)目(屬于同一個(gè)Job)是1。也就是說一個(gè)task啟一個(gè)JVM)
如果input的文件非常的大褒傅,比如1TB弃锐,可以考慮將hdfs上的每個(gè)block size設(shè)大,比如設(shè)成256MB或者512MB
ReduceTask并行度的決定
reducetask的并行度同樣影響整個(gè)job的執(zhí)行并發(fā)度和執(zhí)行效率樊卓,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同拿愧,Reducetask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
默認(rèn)值是1,手動(dòng)設(shè)置為4 job.setNumReduceTasks(4);
如果數(shù)據(jù)分布不均勻碌尔,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜
注意: reducetask數(shù)量并不是任意設(shè)置浇辜,還要考慮業(yè)務(wù)邏輯需求,有些情況下唾戚,需要計(jì)算全局匯總結(jié)果柳洋,就只能有1個(gè)reducetask
盡量不要運(yùn)行太多的reduce task。對(duì)大多數(shù)job來(lái)說叹坦,最好rduce的個(gè)數(shù)最多和集群中的reduce持平熊镣,或者比集群的 reduce slots小。這個(gè)對(duì)于小集群而言募书,尤其重要绪囱。