MapReduce是一個數(shù)據(jù)處理的編程模型屈梁。這個模型很簡單嗤练,但也不是簡單到不能夠支持一些有用的語言榛了。Hadoop能夠運行以多種語言寫成的MapReduce程序。在這一章中煞抬,我們將看看怎樣用Java,Ruby,Python語言來寫同一個例子霜大。更重要的是,MapReduce程序天生并發(fā)運行革答,這就相當(dāng)于把能夠進(jìn)行大數(shù)據(jù)分析的工具交到了某個擁有足夠多機器的人手里战坤。
氣候數(shù)據(jù)集
在我們的例子中,將會寫一個程序來挖掘天氣數(shù)據(jù)残拐。天氣傳感器每一個小時都會在全球的許多地方收集數(shù)據(jù)途茫,并且也收集了大量的日志數(shù)據(jù)。這些數(shù)據(jù)非常適合于用MapReduce分析溪食。因為我們想要處理所有數(shù)據(jù)慈省,并且這些數(shù)據(jù)是半結(jié)構(gòu)化的和面向記錄的。
數(shù)據(jù)格式
我們所使用的數(shù)據(jù)來自于國家氣候數(shù)據(jù)中心或稱為NCDC眠菇。數(shù)據(jù)以行形式ASCII格式存儲,每一行一條記錄袱衷。這種格式支持豐富的氣象屬性集合捎废,其中許多屬性是可選的,長度可變的致燥。簡便起見登疗,我們僅僅關(guān)注基本的屬性,如溫度嫌蚤。溫度總是有值并且長度固定辐益。
示例2-1顯示了一行記錄,并且將主要的屬性進(jìn)行了注釋脱吱。這一行記錄被分成了多行智政,每個屬性一行。真實文件中箱蝠,這些屬性都會被放進(jìn)一行续捂,并且沒有分隔符。
示例:2-1
0057
332130 # USAF 天氣基站標(biāo)識
99999 # WBAN 天氣基站標(biāo)識
19500101 # 觀察日期
0300 # 觀察時間
4
+51317 # 緯度 (角度 x 1000)
+028783 # 經(jīng)度 (角度 x 1000)
FM-12
+0171 # 海拔 (米)
99999
V020
320 # 風(fēng)向 (角度)
1 # 質(zhì)量碼
N
0072
1
00450 # 天空最高高度 (米)
1 # 質(zhì)量碼
C
N
010000 # 可見距離 (米)
1 # 質(zhì)量碼
N
9
-0128 # 空氣溫度 (攝氏度 x 10)
1 # 質(zhì)量碼
-0139 # 露點溫度 (攝氏度 x 10)
1 # 質(zhì)量碼
10268 # 大氣壓 (百帕 x 10)
1 # 質(zhì)量碼
數(shù)據(jù)文件按照日期和天氣基站整理宦搬。從1901到2001牙瓢,每一年都有一個目錄文件。每一個目錄文件中包括每一個天氣基站收集到的當(dāng)年氣候數(shù)據(jù)的壓縮文件间校。例如1990年部分文件:
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
由于有成千上萬個天氣基站矾克,所以每一年都由大量的相關(guān)小文件組成。通常處理少量的大文件更容易和有效憔足。所以這些數(shù)據(jù)需要被預(yù)處理胁附,使每一年的所有記錄都被放到一個文件中(附錄C中有詳細(xì)的方法說明)酒繁。
使用Unix工具分析
如何獲取每一年的全球最高溫度呢?我們首先不使用Hadoop工具來回答這個問題汉嗽。
這將會為我們提供一個性能基準(zhǔn)線和檢查我們往后的結(jié)果是否準(zhǔn)確的方法欲逃。
經(jīng)典的處理行結(jié)構(gòu)數(shù)據(jù)的工具是awk。示例2-2向我們展示了如何獲取每一年全球最高溫度饼暑。
示例2-2
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done
這個腳本循環(huán)處理已經(jīng)壓縮的年文件稳析,首先輸出年度值,然后使用awk處理每一個文件弓叛。awk腳本從這些數(shù)據(jù)中提取出空氣溫度和質(zhì)量碼彰居。空氣溫度通過加0轉(zhuǎn)換成整數(shù)撰筷,下一步陈惰,判斷溫度(溫度9999在NCDC中表示沒檢測到溫度)和質(zhì)量碼是否有效。質(zhì)量碼表示此溫度值是否準(zhǔn)確或者錯誤毕籽。如果溫度值沒有問題抬闯,則與目前為止最高溫度相比較,如果比目前最高溫度高关筒,則更新最高溫度溶握。當(dāng)文件中所有行被處理之后,END塊被執(zhí)行蒸播,打印出最高溫度睡榆。下面看看部分運行結(jié)果:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
源文件中的溫度值被擴(kuò)大了10倍,所以1901年的最高溫度是31.7攝氏度袍榆,由于在20世紀(jì)初讀取到的氣候值非常有限胀屿,所以這個結(jié)果只能是近似真實。在硬件是單個超大型高CPU EC2實例計算中跑完整個世紀(jì)的數(shù)據(jù)花了42分鐘包雀。
為了提高處理速度宿崭,我們需要并行運行部分程序。理論上馏艾,我們很容易想到可以使用計算機中所有可用的線程并行處理不同的年份數(shù)據(jù)劳曹。但是這樣仍然存在一些問題。
首先琅摩,將整個處理工作進(jìn)程等分為相同的部分并不簡單或明顯铁孵。在這個例子中,不同的年份的文件大小不一樣房资,并且有的差別很大蜕劝。所有一些處理進(jìn)程將會完成地早一些,一些將會晚一些。即時完成早的進(jìn)程再處理其它工作岖沛,整個運行時間仍然被最大的文件限制暑始。一個更好的途徑是將輸入數(shù)據(jù)分成大小相等的塊,并且處理每一個數(shù)據(jù)塊婴削。雖然這樣可能造成更多的工作量廊镜。
第二,將每一個獨立的處理結(jié)果合并在一起需要額外處理工作唉俗。在這個例子中嗤朴,每一年的處理結(jié)果都是相互獨立的。這些結(jié)果會被連接在一起虫溜,并且按年排序雹姊。如果通過數(shù)據(jù)量大小數(shù)據(jù)塊途徑,合并將更加容易出錯衡楞。就這個例子而言吱雏,某一年的數(shù)據(jù)可能被分成多個數(shù)據(jù)塊,每一個數(shù)據(jù)塊都單獨處理瘾境,并得到每一塊的最高溫度歧杏。最后,我們還需要找到某年中這些塊中最高溫度中的最高溫度作為這一年的最高溫度迷守。
第三得滤,你仍然會被單個計算機的處理能力限制。如果用單個計算機中所有的處理器盒犹,最快的處理時間是20分鐘,那么眨业,你不可能更快急膀。而且有的數(shù)據(jù)集超過單個計算機的處理能力。當(dāng)使用多臺計算機一起處理時龄捡,一些其它的因素又會影響性性能卓嫂,主要有協(xié)調(diào)性和可靠性兩類。誰來執(zhí)行所有的作業(yè)聘殖?我們將怎么處理失敗的進(jìn)程晨雳?
所以,雖然并行處理是可行的奸腺,但卻是不那么容易控制的餐禁,是復(fù)雜的。使用像Hadoop這樣的框架來處理這些問題極大地幫助了我們突照。
使用Hadoop分析數(shù)據(jù)
為了充分利用Hadoop提供的并行處理優(yōu)勢帮非,我們需要將我們的查詢寫在一個MapReduce作業(yè)中。在本地的,小數(shù)據(jù)量地測試后末盔,我們將能夠在集群中運行它筑舅。
Map和Reduce
MapReduce將處理過程分成兩階段,map階段和reduce階段陨舱。每階段將key-value鍵值對做為輸入和輸出翠拣。開發(fā)者可以選擇輸入輸出參數(shù)類型,也能指定兩個函數(shù):map函數(shù)和reduce函數(shù)游盲。
map階段的輸入數(shù)據(jù)是原始的NCDC數(shù)據(jù)误墓。我們選擇文本格式。文本中的每一行表示一條文本記錄背桐。key值是行開頭距離當(dāng)前文件開頭的位移优烧,但是我們不需要它,忽略即可链峭。
map函數(shù)很簡單畦娄。因為我們僅關(guān)心年份和溫度,所以獲取每行的年度和溫度即可弊仪,其它屬性不需要熙卡。這個例子中,僅僅是一個數(shù)據(jù)準(zhǔn)備階段励饵,以某種方法準(zhǔn)備reduce函數(shù)能夠處理的數(shù)據(jù)驳癌。map函數(shù)還是一個丟棄壞記錄的地方,例如那些沒有測量到的役听,不準(zhǔn)備的或錯誤的溫度颓鲜。
為了展現(xiàn)map怎么樣工作的,選取少量的輸入數(shù)據(jù)進(jìn)行說明(為了適應(yīng)頁面寬度典予,一些沒有使用到的列用省略號表示)
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
這些行以key-value的形式提供給map函數(shù):
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
關(guān)鍵值是行的位移甜滨,在map函數(shù)中我們可以忽略它。map函數(shù)僅僅需要獲取到年度和溫度值(以粗體表示的數(shù)據(jù))瘤袖,然后輸出衣摩。輸出的時候?qū)囟戎缔D(zhuǎn)換成整數(shù)。
(1950, 0)
(1950, 22)
(1950, ?11)
(1949, 111)
(1949, 78)
map的輸出結(jié)果在被送往reduce函數(shù)之前被MapReduce框架按照關(guān)鍵字排序合并處理捂敌。所以在進(jìn)行下一步之前艾扮,reduce函數(shù)會接收到如下數(shù)據(jù):
(1949, [111, 78])
(1950, [0, 22, ?11])
如上所示,每一年的所有溫度值都合并到一個列表中占婉。reduce函數(shù)所要做的就是遍歷每一年的溫度泡嘴,然后找到最高溫度。
(1949, 111)
(1950, 22)
以上就是最終的輸出:每一年的最高溫度逆济。
JAVA MapReduce
在知道了MapReduce程序怎么樣工作了之后滞磺,下一步是用代碼實現(xiàn)它。我們需要做三件事情:map函數(shù)莱褒,reduce函數(shù)击困,運行作業(yè)的代碼。map功能以Mapper抽象類表示
广凸,它申明了一個map()抽象方法阅茶。示例2-3顯示了map函數(shù)的實現(xiàn)。
示例2-3
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
Mapper類是一個泛型谅海,有四個形參脸哀,分別表示輸入key,輸入值,輸出key,和map函數(shù)輸出值類型扭吁。就當(dāng)前的例子來說撞蜂,輸入key是一個長整型的位移,輸入值是一行文本侥袜,輸出key是年份蝌诡,輸出會是是空氣溫度(整數(shù))。Hadoop使用它自己的基本類型集而不使用JAVA內(nèi)建的基本類型枫吧。因為Hadoop自己的基本類型對網(wǎng)絡(luò)序列化進(jìn)行了優(yōu)化浦旱。這些基本類型可以在 org.apache.hadoop.io pack‐
age中找到。這里我們使用 LongWritable類型九杂,它表示長文本類型颁湖,對應(yīng)了Java的String類型,又使用了 IntWritable類型例隆,對應(yīng)于Java的Integer類型爷狈。
map函數(shù)被傳了一個key值和一個value值,我們把包含輸入的一行文本轉(zhuǎn)換成Java String類型數(shù)據(jù)裳擎,并使用String的SubString方法取到我們感興趣的列值。
map函數(shù)也提供了一個Context實例思币,以便將輸出結(jié)果寫入其中鹿响。在我們的這個例子中,我們把年份作為文本類型Key值寫到Context中谷饿,把溫度封閉成IntWritable類型也寫入Context.并且只有溫度有效并且質(zhì)量碼顯示當(dāng)前溫度的獲取是正常的時候才寫入惶我。
reduce功能類似地用Reduce抽象類表示,實例類見示例2-4
示例2-4
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
Reduce抽象類也是一個泛型類博投,也具有四個形參绸贡。reduce函數(shù)的輸入類型必須匹配map的輸出類型,即Text和IntWritable.此例子中,reduce函數(shù)的輸出是Text和IntWritable類型听怕,分別表示年份與當(dāng)前年份最高溫度捧挺。通過遍歷溫度值,將當(dāng)前溫度值與最高溫度比較來找到當(dāng)前年份的最高溫度尿瞭。
第三部分是運行MapReduce作業(yè)的代碼闽烙,見示例2-5.
示例2-5
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Job對象指明運行一個作業(yè)所需要的所有設(shè)置以及讓你控制作業(yè)如何執(zhí)行。當(dāng)我們在一個Hadoop集群上運行這個作業(yè)的時候声搁,我們需要將代碼打包成JAR文件黑竞,Hadoop會把JAR文件在集群中分發(fā)。我們可以通過setJarByClass方法指定類文件疏旨,而不需要顯示指明JAR文件的名字很魂。Hadoop會搜索包含setJarByClass指定的類的相關(guān)JAR文件。
創(chuàng)建了一個實例Job后檐涝,指定輸入和輸出文件路徑遏匆。通過調(diào)用 FileInputFormat 的靜態(tài)方法addInputPath()指定輸入路徑,此路徑可以是一個文件骤铃,也可以是一個目錄拉岁。如果是一個目錄,輸入的數(shù)據(jù)包含此目錄下所有文件惰爬。還可以是文件類型喊暖。就像方法名所表示的那樣,addInputPath()可以被調(diào)用多次以便添加多個輸入路徑撕瞧。
輸出路徑通過FileOutputFormat 的靜態(tài)方法setOutputPath()指定陵叽。輸出路徑僅可以指定一次。它指定了一個目錄丛版。reduce會把它的輸出結(jié)果的文件放到這個目錄下巩掺。這個目錄在運行Hadoop之前不應(yīng)該存在。因為如果存在Hadoop將會報錯并不會執(zhí)行作業(yè)页畦。這是為了預(yù)防數(shù)據(jù)丟失胖替。因為如果不小心覆蓋了同一目錄下其它作業(yè)的輸出結(jié)果將是非常令人懊惱的。
下一步使用 setMapperClass() 和setReducerClass()方法指定map和reduce類豫缨。setOutputKeyClass()和 setOutputValueClass()方法控制reduce函數(shù)輸出參數(shù)的類型独令。必須和Reduce抽象類中參數(shù)的一致。map輸出參數(shù)的類型默認(rèn)是相同的類型好芭。所以如果map和reduce函數(shù)有相同的輸出參數(shù)類型時就不需要特別指定了燃箭。就像我們這個例子這樣。然而舍败,如果它們不相同招狸,就需要通過 setMapOutputKeyClass() 和setMapOutputValueClass()函數(shù)來指定map的輸出參數(shù)類型敬拓。
map函數(shù)的輸入?yún)?shù)類型通過輸入格式指定。我們沒有顯示地設(shè)置裙戏,因為我們使用了默認(rèn)的TextInputFormat格式乘凸。
在指定了自定義的map和reduce函數(shù)之后,就可以準(zhǔn)備執(zhí)行作業(yè)了。Job類的waitForCompletion()方法用于提交作業(yè),并用等待作業(yè)完成战惊。這個方法需要一個參數(shù),用以表示是否將作業(yè)日志詳細(xì)信息輸出到控制臺冀偶。如果為true,就輸出渔嚷。這個方法的返回值是一個布爾類型进鸠,用于表示作業(yè)的執(zhí)行成功與否。成功返回true,失敗返回false形病。這里我們將成功與否轉(zhuǎn)換成了0或1客年。
這部分使用的Java MapReduce API以及這本書所使用的所有API被稱為"New API"。
它代替了功能相同的老的API漠吻。這兩種API區(qū)別請查看附錄D量瓜,并且附錄D有如何在這兩種API轉(zhuǎn)換的相關(guān)建議。當(dāng)然你也能在這兒用舊的API完成相同功能的獲取每年最高溫度的應(yīng)用途乃。
測試運行
在完成MapReduce作業(yè)編寫之后绍傲,正常情況下使用少量數(shù)據(jù)集測試運行,方便立即檢測出代碼問題耍共。首先以脫機模式安裝Hadoop(附錄A中有說明),這個模式下Hadoop使用本地文件生成本地作業(yè)運行烫饼。可以在這本書的網(wǎng)站上找到安裝和編譯這個示例的說明试读。
讓我們使用上面五行數(shù)據(jù)運行這個作業(yè)杠纵,輸出結(jié)果稍微調(diào)整了一下以便適應(yīng)頁面,并且有一些行被刪除了钩骇。
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your application
with ToolRunner to remedy this.
14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job:
job_local26392882_0001
14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job:
http://localhost:8080/
14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner:
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0
28 | Chapter 2: MapReduce
is allowed to commit now
14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task
'attempt...local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/
hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber
mode : false
14/09/16 09:48:41 INFO mapreduce.Job: map 100% reduce 100%
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed
successfully
14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=377168
FILE: Number of bytes written=828464
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=61
Input split bytes=129
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=61
Reduce input records=5
Reduce output records=2
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=39
Total committed heap usage (bytes)=226754560
File Input Format Counters
Bytes Read=529
File Output Format Counters
Bytes Written=29
當(dāng)我們在hadoop命令第一個參數(shù)填寫一個類名的時候比藻,會啟動一個JVM(JAVA虛擬機),并執(zhí)行這個類。hadoop命令添加hadoop庫和庫所依賴的其它庫文件到Classpath變量倘屹,并且加載hadoop配置银亲。為了將應(yīng)用中的類文件添加到classpath中,我們定義了一個 HADOOP_CLASSPATH環(huán)境變量唐瀑,來加載我們所寫的hadoop腳本。
當(dāng)以本地(脫機)模式運行時插爹,這本書中所有程序都假設(shè)你已經(jīng)以這種方法設(shè)置了 HADOOP_CLASSPATH環(huán)境變量哄辣。這條命令應(yīng)該在示例代碼所在目錄運行请梢。
作業(yè)運行日志提供了一些有用的信息。例如力穗,我們能看到這個作業(yè)被給了一個作業(yè)ID:job_local26392882_0001.運行了一個map任務(wù)和一個reduce任務(wù)(ID分別是:attempt_local26392882_0001_m_000000_0 和attempt_local26392882_0001_r_000000_0)毅弧。知道作業(yè)和任務(wù)ID在調(diào)用MapReduce作業(yè)時將很有用。
最后還有一部分名為"Counters"的數(shù)據(jù)当窗,這部分?jǐn)?shù)據(jù)是Hadoop為每一個作業(yè)生成的統(tǒng)計信息够坐。這些信息將對于檢查處理的數(shù)據(jù)與預(yù)期的數(shù)據(jù)是否一樣非常有用。例如崖面,我們能知道通過系統(tǒng)各部分的記錄數(shù)元咙,5條map輸入記錄,5條map輸出記錄(可以看出map對于每一條有效的輸入記錄都有對應(yīng)的一條輸出記錄)巫员。還能看出以key值分成2組的5條reduce輸入記錄庶香,以及2條輸出記錄。
輸出結(jié)果寫入輸出目錄简识。每一個reduce函數(shù)生成一個輸出文件赶掖。這個作業(yè)只有一個reduce函數(shù),所以只產(chǎn)生一個文件七扰。名稱是part-r-00000:
% cat output/part-r-00000
1949 111
1950 22
這個結(jié)果跟之前手工計算的一致奢赂。這個結(jié)果表示1949年最高溫度是11.1攝氏度,1950是2.2度颈走。
擴(kuò)展
你已經(jīng)知道了MapReduce怎么樣處理少量數(shù)據(jù)∩旁睿現(xiàn)在是時候全局看系統(tǒng),并且對于大數(shù)據(jù)處理的數(shù)據(jù)流疫鹊。簡單來說袖瞻,到目前為止,我們所舉的例子都用的本地計算機的文件拆吆。更進(jìn)一步聋迎,我們將要在分布計算機(特別是HDFS,我們將在下一節(jié)中學(xué)到)中存儲文件數(shù)據(jù)枣耀。使用Hadoop的資源管理系統(tǒng)YARN(第4節(jié)),Hadoop會將MapReduce計算過程分發(fā)到各個計算機中計算霉晕,而這些計算機每一臺都保存著一部分?jǐn)?shù)據(jù)。讓我們來看看這些是如何發(fā)生的捞奕。
工作流
首先牺堰,MapReduce作業(yè)是客戶端需要去執(zhí)行的工作單元。它包括輸入數(shù)據(jù)颅围,MapReduce程序以及一些配置信息伟葫。Hadoop會把這個作業(yè)分成多個任務(wù)步驟執(zhí)行。有兩種類型:map任務(wù)和reduce任務(wù)院促。這些任務(wù)通過YARN計劃調(diào)度并在分布式系統(tǒng)節(jié)點上運行筏养。如果一個任務(wù)失敗了斧抱,YARN會把它放到另外一個節(jié)點上重新運行。
Hadoop會把輸入數(shù)據(jù)化分成大小相同的數(shù)據(jù)片斷(被稱為輸入片或均片),Hadoop會為每一個片創(chuàng)建一個map任務(wù)渐溶。map任務(wù)會一條條記錄地循環(huán)執(zhí)行用戶自定義的map函數(shù)辉浦,直到這個片斷中所有記錄處理完畢。
很多片斷意味著處理每一個片斷的時間比一次處理整個輸入數(shù)據(jù)的時間少茎辐。所以當(dāng)我們并發(fā)地處理這些片斷宪郊,而這些片斷很小時,能夠更好地負(fù)載均衡拖陆。所以一個性能好的機器比一個性能差些的機器能夠相應(yīng)在處理更多地片斷弛槐。即使這些機器性能完全一樣,失敗的處理進(jìn)程或者同時運行的作業(yè)使負(fù)載均衡成為可能(Even if the machines are identical, failed processes or other jobs running
concurrently make load balancing desirable)慕蔚。并且當(dāng)片斷細(xì)粒度越高丐黄,負(fù)載均衡的質(zhì)量也會越高。
別外一方面孔飒,如果片斷過于小灌闺,管理片斷和創(chuàng)建Map任務(wù)所花費的時候則會成為整個作業(yè)執(zhí)行時間的瓶頸。對于大多數(shù)作業(yè)來說坏瞄,一個好的片斷大小趨向于一個HDFS塊的大小桂对,默認(rèn)是128M。這個大小可以被集群(Cluster)改變(集群的改為會影響在機群中新創(chuàng)建的所有文件),或者文件新建時就指定鸠匀。
Hadoop盡量會在輸入數(shù)據(jù)存放的HDFS那個節(jié)點運行Map任務(wù)蕉斜,因為這樣不會占用寶貴的集群帶寬資源。這被稱為本地優(yōu)化缀棍。然后宅此,有時候擁有HDFS數(shù)據(jù)的節(jié)點上正運行著其它Map任務(wù),作業(yè)調(diào)試器會嘗試著在當(dāng)前集群其它空閑的節(jié)點上創(chuàng)建一個Map任務(wù)爬范。極少情況下父腕,會到其它集群中的某個節(jié)點中創(chuàng)建一個Map任務(wù),這樣就需要集群間網(wǎng)絡(luò)傳輸青瀑。這三種可能性在圖表2-2中展示:現(xiàn)在清楚了為什么最優(yōu)的片斷大小是設(shè)置成HDFS塊大小璧亮。因為這樣做是數(shù)據(jù)能被存儲在一個節(jié)點上的最大數(shù)據(jù)量。如果一個片斷跨兩個塊大小斥难,任何一個HDFS節(jié)點都不太可能儲存兩個塊大小的數(shù)據(jù)量枝嘶,這個勢必會造成片斷的部分?jǐn)?shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)秸谶\行Map任務(wù)的節(jié)點上。這明顯的比直接在本地運行Map任務(wù)的性能差一些哑诊。
Map任務(wù)會將它的輸出結(jié)果寫入本地硬盤中群扶,而不是HDFS,為什么要這樣做?因為Map的輸出只是中間的輸出竞阐,后續(xù)它將會被Reduce任務(wù)處理產(chǎn)生最終輸出結(jié)果提茁。一旦作業(yè)完成了,Map的輸出結(jié)果可以被丟棄馁菜,所以將Map的輸出結(jié)果復(fù)制到HDFS中不必要的。如果在Reduce利用Map的輸入結(jié)果前铃岔,節(jié)點運行失敗了汪疮。Hadoop將在自動的在另外一個節(jié)點中重新執(zhí)行這個Map任務(wù),重新產(chǎn)生輸入結(jié)果毁习。
Reduce任務(wù)沒有像Map任務(wù)那樣利用數(shù)據(jù)本地化的優(yōu)勢智嚷,一個Reduce任務(wù)的輸入往往來自所有Map任務(wù)的輸出。就拿目前的例子來說纺且,我們有一個Reduce任務(wù)盏道,其輸入數(shù)據(jù)來自所有的Map任務(wù)。因此存儲的Map結(jié)果必須通過網(wǎng)絡(luò)傳輸?shù)竭\行Reduce的節(jié)點上载碌。之后這些傳過來的數(shù)據(jù)會被合并猜嘱,并傳到用戶自定義的reduce函數(shù)中執(zhí)行。Reduce的輸出結(jié)果正常都會存儲在HDFS中嫁艇。就像第三節(jié)說明的朗伶,對于存儲Reduce輸出結(jié)果的每一個HDFS塊,第一份復(fù)制的數(shù)據(jù)會存儲在本地步咪,其它復(fù)制的數(shù)據(jù)會存儲在其它集群可靠的HDFS塊中论皆。因此存儲Reduce的輸出結(jié)果確定需要消耗網(wǎng)絡(luò)帶寬,但也僅僅和一個正常的HDFS輸出通道消耗的一樣多猾漫。
擁有一個Reduce任務(wù)的數(shù)據(jù)流在圖表2-3中展示点晴。虛線框表示節(jié)點,虛線箭頭表示節(jié)點內(nèi)的數(shù)據(jù)傳輸悯周。實線的箭頭表示節(jié)點間的數(shù)據(jù)傳輸粒督。Reduce任務(wù)的個數(shù)不是由輸入數(shù)據(jù)量的大小決定,而是單獨指定的队橙。在"默認(rèn)的MapReduce作業(yè)"那一節(jié)坠陈,你將會看到對于給定的作業(yè),如何選擇Reduce任務(wù)的個數(shù)捐康。
當(dāng)有多個reduce時仇矾,map任務(wù)會將它們的結(jié)果分區(qū),每一個map任務(wù)會為每一個reduce任務(wù)創(chuàng)建一個分區(qū)解总。每一個分區(qū)里可以用很多個key和ke關(guān)聯(lián)的值贮匕,但某一個key的所有記錄必須在同一個分區(qū)里。分區(qū)這個過程能夠被用戶自定義的函數(shù)控制花枫,但一般來講刻盐,默認(rèn)的分區(qū)函數(shù)已經(jīng)能夠工作地很好了掏膏。它使用哈希函數(shù)來將key分類。
多個reduce的一般數(shù)據(jù)流程圖在圖表2-4顯示敦锌。這張圖表清楚地顯示了map和reduce之間的數(shù)據(jù)流為什么被通俗地叫做"洗牌"馒疹。"洗牌"的過程比這個圖表顯示的更復(fù)雜。你將會在"洗牌和排序"這一節(jié)中看到乙墙,調(diào)整它可以對作業(yè)的運行時間有很大影響颖变。最后,也可以有零個reduce任務(wù)听想。這種情況發(fā)生在僅并發(fā)執(zhí)行map任務(wù)就能夠輸出結(jié)果的時候腥刹。此時數(shù)據(jù)的傳輸僅發(fā)生在map的輸出結(jié)果寫入HDFS的時候(如圖2-5)。
組合函數(shù)(Combiner Function)
許多MapReduce作業(yè)執(zhí)行時間被集群的帶寬資源限制汉买。所以值得我們?nèi)ケM量減少map與reduce之間傳輸?shù)臄?shù)據(jù)量衔峰。Hadoop允許用戶指定一個組合函數(shù),以便在map輸出結(jié)果后執(zhí)行蛙粘。這個組合函數(shù)的輸出形成了reduce任務(wù)的輸入垫卤。由于組合函數(shù)是優(yōu)化函數(shù),所以Hadoop不能確保為每一個map輸出記錄調(diào)用多少次組合函數(shù)出牧。也就是說葫男,零次,一次或多次調(diào)用組合函數(shù)崔列,reduce最終都應(yīng)該輸出相同的結(jié)果梢褐。
組合函數(shù)的這種特性限制了它能被使用的業(yè)務(wù)情形。用一個例子能更好說明赵讯。假設(shè)最大的溫度盈咳,例如1950的,被兩個map任務(wù)處理,因為1950年數(shù)據(jù)分布在不同的片斷中边翼。假如第一個map任務(wù)輸出如下結(jié)果:
(1950,0)
(1950,20)
(1950,10)
第二個map輸出如下結(jié)果:
(1950,25)
(1950,15)
Hadoop將會用以上所有值組成列表傳給reduce
(1950,[0,20,10,25,15])
輸出:
(1950,25)
既然25是當(dāng)前列表最大的值鱼响。我們就像使用reduce函數(shù)一樣用一個組合函數(shù)找出每一個map結(jié)果中的最大溫度值。這樣的話组底,reduce得到以下值:
(1950,[20,25])
并且產(chǎn)生與之前相同的結(jié)果丈积。我們可以用一種更簡潔的方式表示上面的過程:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
然而,并不是所有這樣的處理都是合適的债鸡,例如江滨,我們要計算平均溫度,就不能在組合函數(shù)中計算平均溫度厌均,因為:mean(0, 20, 10, 25, 15) = 14,但是mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15唬滑。
組合函數(shù)不能代替Reduce函數(shù)(Reduce函數(shù)仍然需要用來處理來自不同map的含有相同key值的記錄),但是它能幫助減少在map與reduce之間傳遞的數(shù)據(jù)量。因此晶密,在你的MapReduce作業(yè)中擒悬,總是值得我們考慮是否使用組合函數(shù)。
指定組合函數(shù)
回到之前JAVA MapReduce程序稻艰,組合函數(shù)使用Reduce類定義懂牧,在這個應(yīng)用中,它與Reduce功能一樣尊勿。我們唯一要做的就是在作業(yè)中設(shè)定組合類(示例2-6)归苍。
示例2-6
public class MaxTemperatureWithCombiner {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCombiner
<input path> " +"<output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperatureWithCombiner.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
運行一個分布式的MapReduce作業(yè)
相同的程序?qū)⒃谌繑?shù)據(jù)庫執(zhí)行。MapReduce特性是無形中擴(kuò)大了能處理的數(shù)據(jù)量大小和硬件體積运怖,運行在10個節(jié)點的EC2集群上,這個程序跑了6分鐘夏伊。在第6節(jié)中我們將會看看在集群中運行程序具體的一些技術(shù)特性摇展。
Hadoop Streaming
Hadoop給MapReduce提供了API允許你用除了JAVA語言之外的其它語言寫map和reduce函數(shù)。Hadoop流使用Unix系統(tǒng)標(biāo)準(zhǔn)流作業(yè)Hadoop和你的程序之間的接口溺忧,所以你能使用任意其它的能夠讀取Unix系統(tǒng)標(biāo)準(zhǔn)流輸入數(shù)據(jù)并能夠?qū)?shù)據(jù)寫到標(biāo)準(zhǔn)輸出的語言來寫MapReduce程序咏连。
流天生地就適用于文本處理。Map的輸入數(shù)據(jù)通過標(biāo)準(zhǔn)的輸入流輸入到你自定義的map函數(shù)中鲁森。在map函數(shù)中祟滴,你將會一行一行的處理數(shù)據(jù),然后將這些數(shù)據(jù)寫入到輸出流中歌溉。map會用Tab分隔key和value垄懂,并將它們做為鍵值對單獨一行輸出。這些數(shù)據(jù)將會以相同的格式做為reduce函數(shù)的輸入痛垛。在輸入之間草慧,框架將會把它們按照鍵值排序,然后reduce會處理這些行匙头,然后將結(jié)果輸出到標(biāo)準(zhǔn)的輸出流漫谷。
讓我們以流的方式重寫查找每一年最高溫度的MapReduce程序來說明。
Ruby
map函數(shù)以Ruby語言編寫蹂析,見示例2-7
示例2-7
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
在示例2-7代碼塊中舔示,Ruby從標(biāo)準(zhǔn)全局IO常量類型STDIN中讀取輸入數(shù)據(jù),然后遍歷每一行數(shù)據(jù)电抚,找到行中相關(guān)的字段惕稻,如果有效,則輸出到標(biāo)準(zhǔn)的輸出流蝙叛。
有必要看一下Streaming與Java MapReduce API之間的區(qū)別缩宜。 Java API會一條條記錄地調(diào)用map函數(shù),然后如果使用Streaming形式,map函數(shù)可以自己決定怎么樣處理輸入數(shù)據(jù)锻煌,可以多行一起處理也可以單行處理妓布。JAVA map實現(xiàn)的函數(shù)是被推數(shù)據(jù),但是它仍然可以考慮通過將多條記錄放到一個實例變量中來實現(xiàn)一次處理多行的操作宋梧。這種情況下匣沼,你需要實現(xiàn)cleanup()方法,以便知道最后一條記錄處理完的時候捂龄,能夠結(jié)束處理释涛。
由于示例2-7基于標(biāo)準(zhǔn)的輸入輸出操作,可以不通過Hadoop測試倦沧,直接通過Unix命令唇撬。
% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078
reduce函數(shù)稍微有點復(fù)雜,如示例2-8
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key
同map函數(shù)一樣展融,reduce函數(shù)也會從標(biāo)準(zhǔn)輸入中遍歷行窖认,但不一樣的是,當(dāng)處理每一個key組的時候告希,需要存儲某個狀態(tài)扑浸。在這個示例中,關(guān)鍵字是年燕偶,我們存儲最后一次遍歷的key,并保存每一個key組中最大的溫度喝噪。MapReduce框架會確保輸入數(shù)據(jù)會按照關(guān)鍵值排序,所以我們知道如果當(dāng)前key值不同于上一次遍歷的key值時指么,我們就進(jìn)入了新的key組酝惧。當(dāng)使用JAVA API時,reduce函數(shù)輸入的數(shù)據(jù)就已經(jīng)按照key值分好了組伯诬,而不像Streaming一樣需要人為地去判斷key組邊界系奉。
對于每一行,我們?nèi)〉胟ey和value值姑廉,然后看看是否到達(dá)了一組的最后( last_key && last_key != key), 如果到達(dá)了缺亮,我們記錄下這組的Key和最高溫度,以Tab制表符分隔桥言,然后初始化最高溫度萌踱,如果沒有到達(dá)組的最后,則更新當(dāng)前Key值的最高溫度号阿。最后一行作用是確保最后一個Key組的最高溫度能夠被記錄并鸵。
我們現(xiàn)在能夠用Unix命令來模擬整個的MapReduce傳輸通道(等效于圖2-1中所示的Unix通道)。
% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \
sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22
輸出結(jié)果與Java程序的一樣扔涧。下一步使用Hadoop來運行园担。
Hadoop命令不支持流選項届谈,不過,你可以在jar選項中指定Streaming JAR文件弯汰,然后指定輸入和輸出文件路徑艰山,以及map和redeuce腳本文件,看起來如下:
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
當(dāng)在一個集群中基于大數(shù)據(jù)執(zhí)行時咏闪,我們需要使用-combiner選項來指定組合函數(shù)曙搬。
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-input input/ncdc/all \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
注意我們使用了-files選項,當(dāng)我們在集群上運行流程序時鸽嫂,需要將map和reduce腳本文件復(fù)制到集群中纵装。
Python
流程序支持任意能夠從標(biāo)準(zhǔn)輸入讀取數(shù)據(jù)并將數(shù)據(jù)寫入標(biāo)準(zhǔn)輸出的語言。所以使用讀者更熟悉的Python据某,再寫一遍以上例子橡娄。map腳本如示例2-9,reduce腳本如示例2-10.
示例2-9:map script
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
示例:2-10 reduce script
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
我們能像Ruby中一樣以相同的方法來運行這個作業(yè)癣籽。
% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949 111
1950 22
本文是筆者翻譯自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第2節(jié)挽唉,后續(xù)將繼續(xù)翻譯其它章節(jié)。雖盡力翻譯才避,但奈何水平有限,錯誤再所難免氨距,如果有問題桑逝,請不吝指出!希望本文對你有所幫助俏让。