Hadoop - MapReduce
MapReduce是一個(gè)框架爽茴,我們可以編寫(xiě)應(yīng)用程序荞胡,以可靠的方式并行處理大量商品硬件的大量數(shù)據(jù)。
原文鏈接:http://blogxinxiucan.sh1.newtouch.com/2017/07/17/Hadoop-MapReduce/
什么是MapReduce砰盐?
MapReduce
是基于java的分布式計(jì)算的處理技術(shù)和程序模型肿仑。MapReduce算法包含兩個(gè)重要任務(wù),即Map和Reduce腥泥。地圖獲取一組數(shù)據(jù)并將其轉(zhuǎn)換為另一組數(shù)據(jù)匾南,其中單個(gè)元素分解為元組(鍵/值對(duì))。其次蛔外,減少任務(wù)蛆楞,將地圖的輸出作為輸入,并將這些數(shù)據(jù)元組合并成一組較小的元組夹厌。按照MapReduce名稱(chēng)的順序臊岸,reduce任務(wù)總是在地圖作業(yè)之后執(zhí)行。
MapReduce的主要優(yōu)點(diǎn)是可以輕松地在多個(gè)計(jì)算節(jié)點(diǎn)上進(jìn)行數(shù)據(jù)處理尊流。在MapReduce模型下帅戒,數(shù)據(jù)處理原語(yǔ)稱(chēng)為映射器和還原器。將數(shù)據(jù)處理應(yīng)用程序分解成映射器和還原器有時(shí)是不重要的崖技。但是逻住,一旦我們?cè)贛apReduce表單中編寫(xiě)應(yīng)用程序,將應(yīng)用程序擴(kuò)展到集群中數(shù)以百計(jì)迎献,甚至數(shù)萬(wàn)臺(tái)計(jì)算機(jī)上的計(jì)算機(jī)只是配置更改瞎访。這種簡(jiǎn)單的可擴(kuò)展性是吸引了許多程序員使用MapReduce模型。
算法
- 一般MapReduce范例通常是將計(jì)算機(jī)發(fā)送到數(shù)據(jù)所在的地方吁恍!
- MapReduce程序分三個(gè)階段執(zhí)行扒秸,分別是地圖階段,洗牌階段和減少階段冀瓦。
地圖階段:地圖或映射器的作業(yè)是處理輸入數(shù)據(jù)伴奥。通常,輸入數(shù)據(jù)是以文件或目錄的形式存儲(chǔ)在Hadoop文件系統(tǒng)(HDFS)中翼闽。輸入文件逐行傳遞給映射器函數(shù)拾徙。映射器處理數(shù)據(jù)并創(chuàng)建幾個(gè)小塊數(shù)據(jù)。
減少階段:這個(gè)階段是洗牌階段和減少階段的組合感局。Reducer的工作是處理來(lái)自映射器的數(shù)據(jù)尼啡。在處理之后,它產(chǎn)生一組新的輸出询微,將其存儲(chǔ)在HDFS中崖瞭。
- 在MapReduce作業(yè)期間,Hadoop將Map和Reduce任務(wù)發(fā)送到集群中的相應(yīng)服務(wù)器撑毛。
- 該框架管理數(shù)據(jù)傳遞的所有細(xì)節(jié)书聚,如發(fā)布任務(wù),驗(yàn)證任務(wù)完成,以及在節(jié)點(diǎn)之間圍繞集群復(fù)制數(shù)據(jù)寺惫。
- 大多數(shù)計(jì)算發(fā)生在具有本地磁盤(pán)上的數(shù)據(jù)的節(jié)點(diǎn)上疹吃,從而減少網(wǎng)絡(luò)流量。
- 在完成給定任務(wù)后西雀,集群收集并減少數(shù)據(jù)以形成適當(dāng)?shù)慕Y(jié)果萨驶,并將其發(fā)送回Hadoop服務(wù)器。
MapReduce算法
輸入和輸出(Java Perspective)
MapReduce框架在<key艇肴,value>
對(duì)上運(yùn)行腔呜,即框架將作業(yè)的輸入視為一組<key,value>
對(duì)再悼,并生成一組<key核畴,value>
對(duì)作為作業(yè)的輸出,可以想象不同的類(lèi)型冲九。
關(guān)鍵和值類(lèi)應(yīng)該由框架以序列化的方式谤草,因此需要實(shí)現(xiàn)Writable接口。另外莺奸,關(guān)鍵類(lèi)必須實(shí)現(xiàn)Writable-Comparable接口丑孩,以便于框架進(jìn)行排序。MapReduce作業(yè)的輸入和輸出類(lèi)型:(輸入)<k1灭贷,v1> - > map - > <k2温学,v2> - > reduce - > <k3,v3>(輸出)
甚疟。
0 | 輸入 | 產(chǎn)量 |
---|---|---|
地圖 | <k1??仗岖,v1> |
列表(<k2,v2>)
|
減少 | <k2览妖,list(v2)> |
列表(<k3轧拄,v3> ) |
術(shù)語(yǔ)
- PayLoad - 應(yīng)用程序?qū)崿F(xiàn)Map和Reduce功能,并構(gòu)成工作的核心黄痪。
- Mapper - Mapper將輸入鍵/值對(duì)映射到一組中間鍵/值對(duì)紧帕。
- NamedNode - 管理Hadoop分布式文件系統(tǒng)(HDFS)的節(jié)點(diǎn)盔然。
- DataNode - 數(shù)據(jù)在進(jìn)行任何處理之前提前呈現(xiàn)的節(jié)點(diǎn)桅打。
- MasterNode - JobTracker運(yùn)行的節(jié)點(diǎn),并接收來(lái)自客戶(hù)端的作業(yè)請(qǐng)求愈案。
- SlaveNode - 運(yùn)行Map和Reduce程序的節(jié)點(diǎn)挺尾。
- JobTracker - 調(diào)度作業(yè)并跟蹤分配作業(yè)到任務(wù)跟蹤器。
- Task Tracker - 跟蹤任務(wù)并向JobTracker報(bào)告狀態(tài)站绪。
- Job - 程序是跨數(shù)據(jù)集執(zhí)行映射器和還原器遭铺。
- Task - 在一片數(shù)據(jù)上執(zhí)行Mapper或Reducer。
- Task Attempt - 嘗試在SlaveNode上執(zhí)行任務(wù)的特定實(shí)例。
示例場(chǎng)景
以下是有關(guān)組織的電力消耗的數(shù)據(jù)魂挂。它包含每年的電力消耗和各年的年平均值甫题。
如果以上數(shù)據(jù)作為輸入,我們必須編寫(xiě)應(yīng)用程序來(lái)處理它涂召,并產(chǎn)生結(jié)果坠非,例如查找最大使用年份,最小使用年數(shù)等果正。這是一個(gè)有限數(shù)量記錄的程序員的過(guò)程炎码。他們只需編寫(xiě)邏輯來(lái)產(chǎn)生所需的輸出,并將數(shù)據(jù)傳遞給寫(xiě)入的應(yīng)用程序秋泳。
但是潦闲,考慮到自形成以來(lái),特定國(guó)家所有大型工業(yè)的電力消耗量的數(shù)據(jù)迫皱。
當(dāng)我們編寫(xiě)應(yīng)用程序來(lái)處理這樣的批量數(shù)據(jù)時(shí)歉闰,
- 他們需要很多時(shí)間來(lái)執(zhí)行。
- 當(dāng)我們從數(shù)據(jù)源到網(wǎng)絡(luò)服務(wù)器等移動(dòng)數(shù)據(jù)時(shí)卓起,會(huì)有一個(gè)沉重的網(wǎng)絡(luò)流量新娜。 為了解決這些問(wèn)題,我們提供了MapReduce框架既绩。
輸入數(shù)據(jù)
上述數(shù)據(jù)保存為sample.txt并作為輸入給出概龄。輸入文件如下圖所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
示例程序
以下是使用MapReduce框架的樣本數(shù)據(jù)的程序饲握。
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens())
{
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
將上述程序另存為ProcessUnits.java
私杜。下面說(shuō)明程序的編譯和執(zhí)行。
流程單位編制和執(zhí)行計(jì)劃
讓我們假設(shè)我們?cè)贖adoop用戶(hù)的主目錄(例如/ home / hadoop)中救欧。
按照以下步驟編譯并執(zhí)行上述程序衰粹。
步驟1
以下命令是創(chuàng)建一個(gè)目錄來(lái)存儲(chǔ)編譯的java類(lèi)。
$ mkdir units
第2步
下載Hadoop-core-1.2.1.jar笆怠,用于編譯和執(zhí)行MapReduce程序铝耻。訪(fǎng)問(wèn)以下鏈接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1
下載jar。讓我們假設(shè)下載的文件夾是/ home / hadoop /蹬刷。
步驟3
以下命令用于編譯ProcessUnits.java
程序并為程序創(chuàng)建一個(gè)jar瓢捉。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
步驟4
以下命令用于在HDFS中創(chuàng)建輸入目錄。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步驟5
以下命令用于在HDFS的輸入目錄中復(fù)制名為sample.txt的輸入文件办成。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
步驟6
以下命令用于驗(yàn)證輸入目錄中的文件泡态。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步驟7
以下命令用于通過(guò)從輸入目錄中獲取輸入文件來(lái)運(yùn)行Eleunit_max應(yīng)用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段時(shí)間迂卢,直到文件被執(zhí)行某弦。執(zhí)行后桐汤,如下圖所示,輸出將包含輸入分割數(shù)靶壮,Map任務(wù)數(shù)怔毛,reducer任務(wù)數(shù)等。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
步驟8
以下命令用于驗(yàn)證輸出文件夾中的結(jié)果文件腾降。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
步驟9
以下命令用于查看Part-00000文件中的輸出馆截。此文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是MapReduce程序生成的輸出蜂莉。
1981 34
1984 40
1985 45
步驟10
以下命令用于將輸出文件夾從HDFS復(fù)制到本地文件系統(tǒng)進(jìn)行分析蜡娶。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要命令
所有Hadoop命令都由$ HADOOP_HOME / bin / hadoop命令調(diào)用。沒(méi)有任何參數(shù)運(yùn)行Hadoop腳本會(huì)打印所有命令的描述映穗。
用法:hadoop [--config confdir]命令
下表列出了可用的選項(xiàng)及其說(shuō)明窖张。
選項(xiàng) | 描述 |
---|---|
namenode -format | 格式化DFS文件系統(tǒng)。 |
secondarynamenode | 運(yùn)行DFS輔助節(jié)點(diǎn)蚁滋。 |
namenode | 運(yùn)行DFS namenode宿接。 |
datanode | 運(yùn)行DFS datanode。 |
dfsadmin | 運(yùn)行DFS管理客戶(hù)機(jī)辕录。 |
mradmin | 運(yùn)行Map-Reduce管理客戶(hù)端睦霎。 |
fsck | 運(yùn)行DFS文件系統(tǒng)檢查實(shí)用程序。 |
fs | 運(yùn)行通用文件系統(tǒng)用戶(hù)客戶(hù)機(jī)走诞。 |
balancer | 運(yùn)行集群平衡實(shí)用程序副女。 |
oiv | 將離線(xiàn)fsimage查看器應(yīng)用到fsimage。 |
fetchdt | 從NameNode獲取一個(gè)委托令牌蚣旱。 |
jobtracker | 運(yùn)行MapReduce作業(yè)跟蹤器節(jié)點(diǎn)碑幅。 |
pipes | 運(yùn)行管道工作。 |
tasktracker | 運(yùn)行MapReduce任務(wù)跟蹤器節(jié)點(diǎn)塞绿。 |
historyserver | 將作業(yè)歷史記錄服務(wù)器作為獨(dú)立守護(hù)程序運(yùn)行沟涨。 |
job | 操縱MapReduce作業(yè)。 |
queue | 獲取有關(guān)JobQueues的信息异吻。 |
version | 打印版本裹赴。 |
jar <jar> | 運(yùn)行一個(gè)jar文件。 |
distcp <srcurl> <desturl> | 遞歸復(fù)制文件或目錄诀浪。 |
distcp2 <srcurl> <desturl> | DistCp版本2棋返。 |
archive -archiveName NAME -p | 創(chuàng)建hadoop存檔。 |
<parent path> <src>* <dest> | |
classpath | 打印獲取Hadoop jar和所需庫(kù)所需的類(lèi)路徑笋妥。 |
daemonlog | 獲取/設(shè)置每個(gè)守護(hù)進(jìn)程的日志級(jí)別 |
如何與MapReduce工作進(jìn)行交互
用法:hadoop作業(yè)[GENERIC_OPTIONS]
以下是Hadoop作業(yè)中可用的通用選項(xiàng)懊昨。
GENERIC_OPTIONS | 描述 |
---|---|
-submit <job-file> | 提交作業(yè) |
-status <job-id> | 打印地圖并減少完成百分比和所有作業(yè)計(jì)數(shù)器。 |
-counter <job-id> <group-name> <countername> | 打印計(jì)數(shù)器值春宣。 |
-kill <job-id> | 殺死了這個(gè)工作酵颁。 |
-events <job-id> <fromevent-#> <#-of-events> | 打印給定范圍的jobtracker收到的事件的詳細(xì)信息。 |
-history [all] <jobOutputDir> - history < jobOutputDir> | 打印作業(yè)詳細(xì)信息月帝,失敗并殺死提示詳細(xì)信息躏惋。可以通過(guò)指定[all]選項(xiàng)來(lái)查看有關(guān)作業(yè)的更多詳細(xì)信息嚷辅,例如為每個(gè)任務(wù)執(zhí)行的成功任務(wù)和任務(wù)嘗試簿姨。 |
-list[all] | 顯示所有作業(yè)。-list僅顯示尚未完成的作業(yè)簸搞。 |
-kill-task <task-id> | 殺死任務(wù)扁位。殺死的任務(wù)不計(jì)入失敗的嘗試。 |
-fail-task <task-id> | 無(wú)法完成任務(wù) 失敗的任務(wù)會(huì)計(jì)入失敗的嘗試次數(shù)趁俊。 |
-set-priority <job-id> <priority> | 更改作業(yè)的優(yōu)先級(jí)域仇。允許的優(yōu)先級(jí)值為VERY_HIGH,HIGH寺擂,NORMAL暇务,LOW,VERY_LOW |
查看工作狀態(tài)
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
查看工作輸出歷史
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
殺死工作
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004