從這篇文章開始克握,我會開始系統(tǒng)性地輸出在大數(shù)據(jù)踩坑過程中的積累蕾管,后面會涉及到實戰(zhàn)項目的具體操作,目前的規(guī)劃是按照系列來更新菩暗,力爭做到一個系列在5
篇文章之內(nèi)總結(jié)出最核心的干貨掰曾,如果是涉及到理論方面的文章,會以畫圖的方式來講解停团,如果是涉及到操作方面旷坦,會以實際的代碼來演示。
這篇是MapReduce
系列的第一篇佑稠,初識MapReduce
的應用場景秒梅,在文章后面會有關于代碼的演示。
前言
Hadoop
作為Apache
旗下的一個以Java
語言實現(xiàn)的分布式計算開源框架舌胶,其由兩個部分組成捆蜀,一個是分布式的文件系統(tǒng)HDFS
,另一個是批處理計算框架MapReduce
幔嫂。這篇文章作為MapReduce
系列的第一篇文章辆它,會從MapReduce
的產(chǎn)生背景、框架的計算流程履恩、應用場景和演示Demo
來講解娩井,主要是讓大家對MapReduce
的這個批計算框架有個初步的了解及簡單的部署和使用。
目錄
-
MapReduce
的產(chǎn)生背景 -
MapReduce
的計算流程 -
MapReduce
的框架架構(gòu) -
MapReduce
的生命周期 - 應用場景
- 演示
Demo
MapReduce的產(chǎn)生背景
Google
在2004年的時候在 MapReduce: Simplified Data Processing on Large Clusters 這篇論文中提出了MapReduce
的功能特性和設計理念似袁,設計MapReduce
的出發(fā)點就是為了解決如何把大問題分解成獨立的小問題洞辣,再并行解決。例如昙衅,MapReduce
的經(jīng)典使用場景之一就是對一篇長文進行詞頻統(tǒng)計扬霜,統(tǒng)計過程就是先把文章分為一句一句,然后進行分割而涉,最后進行詞的數(shù)量統(tǒng)計著瓶。
MapReduce的架構(gòu)圖
這里的Client和TaskTracker我都使用一個來簡化了,在實際中是會有很個Client和TaskTracker的啼县。
我們來講解下不同的組件作用
- Client
Client
的含義是指用戶使用MapReduce
程序通過Client
來提交任務到Job Tracker
上材原,同時用戶也可以使用Client
來查看一些作業(yè)的運行狀態(tài)。
- Job Tracker
這個負責的是資源監(jiān)控和作業(yè)調(diào)度季眷。JobTracker
會監(jiān)控著TaskTracker
和作業(yè)的健康狀況余蟹,會把失敗的任務轉(zhuǎn)移到其他節(jié)點上,同時也監(jiān)控著任務的執(zhí)行進度子刮、資源使用量等情況威酒,會把這些消息通知任務調(diào)度器窑睁,而調(diào)度器會在資源空閑的時候選擇合適的任務來使用這些資源。
任務調(diào)度器是一個可插拔的模塊葵孤,用戶可以根據(jù)自己的需要來設計相對應的調(diào)度器担钮。
- TaskTracker
TaskTracker
會周期性地通過Hearbeat
來向Job Tracker
匯報自己的資源使用情況和任務的運行進度。會接受來自于JobTaskcker
的指令來執(zhí)行操作(例如啟動新任務尤仍、殺死任務之類的)箫津。
在TaskTracker
中通過的是slot
來進行等量劃分一個節(jié)點上資源量,只用Task
獲得slot
的時候才有機會去運行宰啦。調(diào)度器的作用就是進行將空閑的slot
分配給Task
使用苏遥,可以配置slot
的數(shù)量來進行限定Task上的并發(fā)度。
- Task
Task分為Map Task
和Reduce Task
绑莺,在MapReduce
中的 split
就是一個 Map Task
,split
的大小可以設置的,由 mapred.max.spilt.size
參數(shù)來設置,默認是 Hadoop
中的block
的大小,在Hadoop 2.x
中默認是128M
,在Hadoop 1.x
中默認是64M
惕耕。
在Task
中的設置可以這么設置纺裁,一般來講,會把一個文件設置為一個split
,如果是小文件司澎,那么就會存在很多的Map Task
,這是特別浪費資源的欺缘,如果split
切割的數(shù)據(jù)塊的量大,那么會導致跨節(jié)點去獲取數(shù)據(jù)挤安,這樣也是消耗很多的系統(tǒng)資源的谚殊。
MapReduce的生命周期
一共分為5個步驟:
- 作業(yè)的提交和初始化
由用戶提交作業(yè)之前,需要先把文件上傳到HDFS
上,JobClient
使用upload
來加載關于打包好的jar
包蛤铜,JobClient
會RPC
創(chuàng)建一個JobInProcess
來進行管理任務嫩絮,并且創(chuàng)建一個TaskProcess
來管理控制關于每一個Task
。
- JobTracker調(diào)度任務
JobTracker
會調(diào)度和管理任務,一發(fā)現(xiàn)有空閑資源围肥,會按照一個策略選擇一個合適的任務來使用該資源剿干。
任務調(diào)度器有兩個點:一個是保證作業(yè)的順利運行,如果有失敗的任務時穆刻,會轉(zhuǎn)移計算任務置尔,另一個是如果某一個Task的計算結(jié)果落后于同一個Task的計算結(jié)果時,會啟動另一個Task來做計算氢伟,最后去計算結(jié)果最塊的那個榜轿。
- 任務運行環(huán)境
TaskTracker會為每一個Task來準備一個獨立的JVM從而避免不同的Task在運行過程中的一些影響,同時也使用了操作系統(tǒng)來實現(xiàn)資源隔離防止Task濫用資源朵锣。
- 執(zhí)行任務
每個Task的任務進度通過RPC來匯報給TaskTracker谬盐,再由TaskTracker匯報給JobTracker。
- 任務結(jié)束诚些,寫入輸出的文件到HDFS中设褐。
MapReduce 的計算流程
先來看一張圖,系統(tǒng)地了解下 MapReduce
的運算流程。
為了方便大家理解助析,重新畫了一張新的圖犀被,演示的是關于如何進行把一個長句進行分割,最后進行詞頻的統(tǒng)計(已忽略掉標點符號)外冀。
整個過程就是先讀取文件寡键,接著進行split
切割,變成一個一個的詞雪隧,然后進行 map task
任務西轩,排列出所有詞的統(tǒng)計量,接著 sorting
排序,按照字典序來排脑沿,接著就是進行 reduce task
,進行了詞頻的匯總藕畔,最后一步就是輸出為文件。例如圖中的 spacedong
就出現(xiàn)了兩次庄拇。
其中對應著的是 Hadoop Mapreduce
對外提供的五個可編程組件注服,分別是InputFormat
锯玛、Mapper
妙色、Partitioner
择诈、Reduce
和OutputFormat
梗醇,后續(xù)的文章會詳細講解這幾個組件牍蜂。
用一句話簡單地總結(jié)就是臼节,Mapreduce
的運算過程就是進行拆解-排序-匯總积锅,解決的就是統(tǒng)計的問題江耀,使用的思想就是分治的思想屈张。
MapReduce的應用場景
MapReduce
的產(chǎn)生是為了把某些大的問題分解成小的問題擒权,然后解決小問題后,大問題也就解決了阁谆。那么一般有什么樣的場景會運用到這個呢菜拓?那可多了去,簡單地列舉幾個經(jīng)典的場景笛厦。
- 計算
URL
的訪問頻率
搜索引擎的使用中纳鼎,會遇到大量的URL的訪問,所以裳凸,可以使用 MapReduce
來進行統(tǒng)計贱鄙,得出(URL
,次數(shù))結(jié)果,在后續(xù)的分析中可以使用姨谷。
- 倒排索引
Map
函數(shù)去分析文件格式是(詞逗宁,文檔號)的列表,Reduce
函數(shù)就分析這個(詞梦湘,文檔號)瞎颗,排序所有的文檔號件甥,輸出(詞,list
(文檔號))哼拔,這個就可以形成一個簡單的倒排索引引有,是一種簡單的算法跟蹤詞在文檔中的位置。
- Top K 問題
在各種的文檔分析倦逐,或者是不同的場景中譬正,經(jīng)常會遇到關于 Top K
的問題,例如輸出這篇文章的出現(xiàn)前5
個最多的詞匯檬姥。這個時候也可以使用 MapReduce
來進行統(tǒng)計曾我。
演示Demo
今天的代碼演示從Python
和Java
兩個版本的演示,Python
版本的話便是不使用封裝的包健民,Java
版本的話則是使用了Hadoop
的封裝包抒巢。接下來便進行演示一個MapReduce
的簡單使用,如何進行詞匯統(tǒng)計秉犹。
Java
版本代碼
- 先是準備一個數(shù)據(jù)集蛉谜,包含著已經(jīng)切割好的詞匯,這里我們設置文件的格式是
txt
格式的凤优。文件名是WordMRDemo.txt
悦陋,內(nèi)容是下面簡短的一句話蜈彼,以空格分割開:
hello my name is spacedong welcome to the spacedong thank you
- 引入
Hadoop
的依賴包
//這里使用的是2.6.5的依賴包筑辨,你可以使用其他版本的
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
- 新建
WordMapper.java
文件,代碼的作用是進行以空格的形式進行分詞。
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString();
//StringTokenizer默認按照空格來切
StringTokenizer st = new StringTokenizer(line);
while (st.hasMoreTokens()) {
String world = st.nextToken();
//map輸出
context.write(new Text(world), new IntWritable(1));
}
}
}
- 新建
WordReduce.java
文件幸逆,作用是進行詞匯的統(tǒng)計棍辕。
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> iterator, Context context)
throws java.io.IOException ,InterruptedException {
int sum = 0 ;
for(IntWritable i:iterator){
sum+=i.get();
}
context.write(key, new IntWritable(sum));
}
}
- 新建
WordMRDemo.java
文件,作用是運行Job
还绘,開始分析句子楚昭。
public class WordMRDemo {
public static void main(String[] args) {
Configuration conf = new Configuration();
//設置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
conf.set("mapred.job.tracker", "hadoop:9000");
try {
//新建一個Job工作
Job job = new Job(conf);
//設置運行類
job.setJarByClass(WordMRDemo.class);
//設置要執(zhí)行的mapper類
job.setMapperClass(WordMapper.class);
//設置要執(zhí)行的reduce類
job.setReducerClass(WordReduce.class);
//設置輸出key的類型
job.setMapOutputKeyClass(Text.class);
//設置輸出value的類型
job.setMapOutputValueClass(IntWritable.class);
//設置ruduce任務的個數(shù)拍顷,默認個數(shù)為一個(一般reduce的個數(shù)越多效率越高)
//job.setNumReduceTasks(2);
//mapreduce 輸入數(shù)據(jù)的文件/目錄,注意抚太,這里可以輸入的是目錄。
FileInputFormat.addInputPath(job, new Path("F:\\BigDataWorkPlace\\data\\input"));
//mapreduce 執(zhí)行后輸出的數(shù)據(jù)目錄昔案,不能預先存在尿贫,否則會報錯。
FileOutputFormat.setOutputPath(job, new Path("F:\\BigDataWorkPlace\\data\\out"));
//執(zhí)行完畢退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 最后執(zhí)行
WordMRDemo.java
文件踏揣,然后得到的結(jié)果是out
文件夾內(nèi)的內(nèi)容庆亡,它長這個樣子:
out的文件目錄
打開part-r-00000
文件的內(nèi)容如下
Python代碼版本
- 新建
map.py
文件,進行詞匯的切割捞稿。
for line in sys.stdin:
time.sleep(1000)
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])
- 新建
red.py
文件又谋,進行詞匯的統(tǒng)計拼缝。
cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
- 新建
run.sh
文件,直接運行即可。
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test.txt"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python red.py" \
-file ./map.py \
-file ./red.py
以上的是演示demo
的核心代碼彰亥,完整的代碼可以上github
的代碼倉庫上獲取咧七。
倉庫地址為:https://github.com/spacedong/bigDataNotes
以上的文章是MapReduce
系列的第一篇,下篇預告是MapReduce的編程模型
剩愧,敬請期待猪叙!
參考資料:
Hadoop的技術內(nèi)幕:深入解析MapReduce架構(gòu)設計及實現(xiàn)原理