MapReduce是一個用于處理海量數(shù)據(jù)的分布式計算框架菩掏。
- 這個框架解決了 ? 數(shù)據(jù)分布式存儲
?作業(yè)調(diào)度霎烙、
? 容錯募寨、
? 機器間通信等復(fù)雜問題
MapReduce的核心思想叁怪,分而治之
分:map
? 把復(fù)雜的問題分解為若干“簡單的 任務(wù)”
合:reduce
上面這幅圖就是mapreduce的工作原理
以詞頻統(tǒng)計為例审葬。
詞頻統(tǒng)計就是統(tǒng)計一個單詞在所有文本中出現(xiàn)的次數(shù),在Hadoop中的事例程序就是wordcount奕谭,俗稱hadoop編程的"hello world".因為我們有多個文本涣觉,所以可以并行的統(tǒng)計每個文本中單詞出現(xiàn)的個數(shù),然后最后進行合計血柳。
所以這個可以很好地體現(xiàn)map官册,reduce的過程。
1)首先文檔的數(shù)據(jù)記錄(如文本中的行难捌,或數(shù)據(jù)表格中的行)是以“鍵值對”的形式傳入map 函數(shù)膝宁,然后map函數(shù)對這些鍵值對進行處理(如統(tǒng)計詞頻),然后輸出到中間結(jié)果根吁。
2)在鍵值對進入reduce進行處理之前员淫,必須等到所有的map函數(shù)都做完,所以既為了達到這種同步又提高運行效率击敌,在mapreduce中間的過程引入了barrier(同步障)
在負(fù)責(zé)同步的同時完成對map的中間結(jié)果的統(tǒng)計介返,包括 a. 對同一個map節(jié)點的相同key的value值進行合并,b. 之后將來自不同map的具有相同的key的鍵值對送到同一個reduce進行處理。
3)在reduce階段圣蝎,每個reduce節(jié)點得到的是從所有map節(jié)點傳過來的具有相同的key的鍵值對刃宵。reduce節(jié)點對這些鍵值進行合并。
4)Combiner 節(jié)點負(fù)責(zé)完成上面提到的將同一個map中相同的key進行合并徘公,避免重復(fù)傳輸组去,從而減少傳輸中的通信開銷。
5)Partitioner節(jié)點負(fù)責(zé)將map產(chǎn)生的中間結(jié)果進行劃分步淹,確保相同的key到達同一個reduce節(jié)點.
編程模型
? 借鑒函數(shù)式的編程方式
? 用戶只需要實現(xiàn)兩個函數(shù)接口:
? Map(in_key, in_value)-> (out_key, intermediate_value) list
? Reduce (out_key, intermediate_value list) ->out_value list
兩個重要的進程
-
JobTracker
主進程从隆,負(fù)責(zé)接收客戶作業(yè)提交,調(diào)度任務(wù)到作節(jié)點上運行缭裆,并提供諸如監(jiān)控工作節(jié)點狀態(tài)及任務(wù)進度等 管理功能键闺,一個MapReduce集群有一個jobtracker,一般運行在可靠的硬件上澈驼。
? tasktracker是通過周期性的心跳來通知jobtracker其當(dāng)前的健康狀態(tài)辛燥,每一次心跳包含了可用的map和 reduce任務(wù)數(shù)目、占用的數(shù)目以及運行中的任務(wù)詳細(xì)信息缝其。Jobtracker利用一個線程池來同時處理心跳和 客戶請求挎塌。 -
TaskTracker
? 由jobtracker指派任務(wù),實例化用戶程序内边,在本地執(zhí)行任務(wù)并周期性地向jobtracker匯報 狀態(tài)榴都。在每一個工 作節(jié)點上永遠只會有一個tasktracker
? JobTracker一直在等待JobClient提交作業(yè)
? TaskTracker每隔3秒向JobTracker發(fā)送心跳詢問有沒有任務(wù)可做,如果有漠其,讓
其派發(fā)任務(wù)給它執(zhí)行
? Slave主動向master拉生意
wordCount實例代碼(python實現(xiàn))
案例文件結(jié)構(gòu)
run.sh文件
HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar" //python代碼實現(xiàn)嘴高,引入streaming
#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \ //設(shè)置通過streaming方式提交
-input $INPUT_FILE_PATH_1 \ //上面定義的,指向/1.data和屎,原始文件(數(shù)據(jù)源)
-output $OUTPUT_PATH \ //輸出路徑
-mapper "python map_new.py" \ //指定如何執(zhí)行map
-reducer "python red_new.py" \ //指定如何執(zhí)行reducer
-file ./map_new.py \ //通過下面兩個配置文件拴驮,把本地的代碼分發(fā)到集群的map和Reducer上去
-file ./red_new.py
a.txt
111
222
333
map_new.py
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])
cat a.txt | python map_new.py。通過管道的方式柴信,a.txt 是數(shù)據(jù)源套啤,標(biāo)準(zhǔn)輸入到map。
red_new.py
import sys
cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
head -2 The_Man_of_Property.txt | python map_new.py | sort -k1 | python red_new.py > result.local
這段代碼模擬了數(shù)據(jù)進入map随常,中間sort之后再進入red潜沦,最后輸出result.local
啟動集群。進入hadoop/bin 目錄下
./start-all.sh
然后hadoop fs -out 1.data / 把數(shù)據(jù)源上傳到HDFS
bash run.sh執(zhí)行streaming方式提交的腳本线罕。
MapReduce計算框架-執(zhí)行流程
File:
文件要存儲在HDFS中止潮,每個文件切分成多個一定大小(默認(rèn)64M)的Block(默認(rèn)3個備份)存儲在多個節(jié)點(DataNode)上
文件數(shù)據(jù)內(nèi)容:
We are studying at badou.\n
We are studyPinagrtiatitonbeardou.\n
......
InputFormat:
MR框架基礎(chǔ)類之一
? 數(shù)據(jù)分割(Data Splits)
? S記pli錄t讀取Sp器lit(RSepcliotrdReader)
例子:
數(shù)據(jù)格定義,如果以“\n”分割每條記錄钞楼,以空格區(qū)分一個目標(biāo)單詞
Shuffle “we are studying at badou.”為一條記錄
“are”“at”等為一個目標(biāo)單詞
Split:
實際上每個split包含后一個Block中開頭部分的數(shù)據(jù)(解決記錄跨Block問題)
例子:
比如記錄 “we are studing at badou./n"
跨越存儲在兩個Block中,那么這條記錄屬于前一個Block對應(yīng)的split
RecordReader:(RR)
每讀取一條記錄袄琳,調(diào)用一次map函數(shù)
例子
比如询件,記錄“we are studying at badou." 作為參數(shù)v燃乍,調(diào)用map(v)
然后繼續(xù)這個過程,讀取議案一條記錄知道split尾部宛琅。
Map:
比如記錄”we are studying at badou"
調(diào)用執(zhí)行一次map("we are studying at badou")
在內(nèi)存中增加數(shù)據(jù):
{"we":1}
{"are":1}
......
Shuffle:
Partion,Sort,Spill,Meger,Combiner.......
神奇發(fā)生的地方刻蟹,性能優(yōu)化大有可為的地方!
Partitioner:
決定數(shù)據(jù)由哪個Reducer處理嘿辟,從而分區(qū)
比如采用Hash法舆瘪。
MemoryBuffer
內(nèi)存緩沖區(qū),每個map的結(jié)果和partition處理的key value結(jié)果都保存在緩存中
緩沖區(qū)大泻炻住:默認(rèn)100M
溢寫閾值:100M*0.8 = 80M
緩沖區(qū)中的數(shù)據(jù):partition key value 三元組數(shù)據(jù)
{“1”, “are” : 1}
{“2”, “at” : 1}
{“1”, “we” : 1}
Spill:
內(nèi)存緩沖區(qū)達到閾值時英古,溢寫spill線程鎖住這80M 的緩沖區(qū),開始將數(shù)據(jù)寫出到本地磁盤中昙读,然后釋 放內(nèi)存召调。
每次溢寫都生成一個數(shù)據(jù)文件。 溢出的數(shù)據(jù)到磁盤前會對數(shù)據(jù)進行key排序sort蛮浑, 以及合并combiner
發(fā)送相同Reduce的key數(shù)量唠叛,會拼接到一起,減少 partition的索引數(shù)量沮稚。
Sort:
緩沖區(qū)數(shù)據(jù)按照key進行排序
Combiner:
數(shù)據(jù)合并艺沼,相同的key的數(shù)據(jù),value值合并蕴掏,減少輸 出傳輸量 Combiner函數(shù)事實上是reducer函數(shù)澳厢,滿足 combiner處理不影響{sum,max等}最終reduce的 結(jié)果時囚似,可以極大提升性能
{“1”剩拢, “are”, 1} {“1”饶唤, “are”徐伐, 1} {“1”, “we”募狂, 1}==》
{“1”办素, “are”, 2} {“1”祸穷, “we”性穿, 1}
Reducer
多個reduce任務(wù)輸入的數(shù)據(jù)都屬于不同的partition,因此結(jié)果數(shù)據(jù)的key不會重復(fù)雷滚。
合并reduce的輸出文件即可得到最終的結(jié)果需曾。
MapReduce物理配置
? 文件句柄個數(shù) – ulimit
? cpu
– 多核
? 內(nèi)存
– 8G以上
? 合適的slot
– 單機map、reduce個數(shù)
– mapred.tasktracker.map.tasks.maximum(默認(rèn)2)
– mapreduce.tasktracker.tasks.reduce.maximum(默認(rèn)2) – 內(nèi)存限制
– cpu核數(shù)-1
– 多機集群分離
? 磁盤情況
– 合適單機多磁盤
– mapred.local.dir和dfs.data.dir
? 確定map任務(wù)數(shù)時依次優(yōu)先參考如下幾個原則:
– 每個map任務(wù)使用的內(nèi)存不超過800M,盡量在500M以下
– 每個map任務(wù)運行時間控制在大約20分鐘呆万,最好1-3分鐘
– 每個map任務(wù)處理的最大數(shù)據(jù)量為一個HDFS塊大小商源,一個map任務(wù)處理的輸入不能跨文件
– map任務(wù)總數(shù)不超過平臺可用的任務(wù)槽位
? 配置加載的問題
– 簡單配置通過提交作業(yè)時-file分發(fā)
– 復(fù)雜較大配置
? 傳入hdfs
? map中打開文件讀取
? 建立內(nèi)存結(jié)構(gòu)
? map個數(shù)為split的份數(shù)
? 壓縮文件不可切分
? 非壓縮文件和sequence文件可以切分
? dfs.block.size決定block大小
? 確定reduce任務(wù)數(shù)時依次優(yōu)先參考如下幾個方面:
– 每個reduce任務(wù)使用的內(nèi)存不超過800M,盡量在500M以下
– 每個reduce任務(wù)運行時間控制在大約20分鐘谋减,最好1-3分鐘 – 整個reduce階段的輸入數(shù)據(jù)總量
– 每個reduce任務(wù)處理的數(shù)據(jù)量控制在500MB以內(nèi)
– map任務(wù)數(shù)與reduce任務(wù)數(shù)的乘積
– 輸出數(shù)據(jù)要求
? reduce個數(shù)設(shè)置
– mapred.reduce.tasks – 默認(rèn)為1
? reduce個數(shù)太少 – 單次執(zhí)行慢
– 出錯再試成本高
? reduce個數(shù)太多 – shuffle開銷大
– 輸出大量小文件