簡單的 MapReduce 作業(yè)属铁,需要一個(gè) map 函數(shù)丁眼,一個(gè) reduce 函數(shù)和一些用來運(yùn)行作業(yè)的代碼
// Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
// Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
橫向擴(kuò)展(Scaling out)
需要把數(shù)據(jù)存儲(chǔ)在分布式文件系統(tǒng)中蚌铜,通過使用 Hadoop 資源管理系統(tǒng) YARN悦即,Hadoop 可以將 MapReduce 計(jì)算轉(zhuǎn)移到存儲(chǔ)有部分?jǐn)?shù)據(jù)的各機(jī)器上
相關(guān)概念
MapReduce 作業(yè)
MapReduce 作業(yè) == 輸入數(shù)據(jù) + MapReduce程序 + 配置信息
任務(wù)分類
Hadoop 將作業(yè)分成若干個(gè)任務(wù)(task)來執(zhí)行遥金,其中包括兩類任務(wù):map 任務(wù)和 reduce 任務(wù)鳄虱,這些任務(wù)運(yùn)行在集群幾點(diǎn)上,并通過 YARN 進(jìn)行調(diào)度凰浮。如果一個(gè)任務(wù)失敗我抠,它將在另一個(gè)不同的節(jié)點(diǎn)上自動(dòng)重新調(diào)度運(yùn)行
分片(input split)
Hadoop 將 MapReduce 的輸入數(shù)據(jù)劃分成等長的小數(shù)據(jù)塊,成為輸入分片(input split)或簡稱“分片”
Hadoop 為每個(gè)分片構(gòu)建一個(gè) map 任務(wù)袜茧,并由該任務(wù)來運(yùn)行用戶自定義的 map 函數(shù)從而處理分片中的每條記錄
分片切分的粒度
相對來說菜拓,分片被切分的越細(xì),作業(yè)的負(fù)載平衡質(zhì)量會(huì)更高笛厦。但是如果分片切分的太細(xì)纳鼎,那么管理分片的總時(shí)間和構(gòu)建 map 任務(wù)的總時(shí)間將決定作業(yè)的整個(gè)執(zhí)行時(shí)間
對于大多數(shù)作業(yè)來說,一個(gè)合理的分片大小趨向于 HDFS 的一個(gè)塊的大猩淹埂(128MB)
數(shù)據(jù)本地化優(yōu)化(data locality optimization)
Hadoop 在存儲(chǔ)輸入數(shù)據(jù)的節(jié)點(diǎn)上運(yùn)行 map 任務(wù)贱鄙,可以獲得最佳性能,而無需使用寶貴的集群帶寬資源
跨機(jī)架的 map 任務(wù)
有時(shí)對于一個(gè) map 任務(wù)的輸入分片來說姨谷,存儲(chǔ)該分片的 HDFS 數(shù)據(jù)塊副本的所有節(jié)點(diǎn)可能正在運(yùn)行其他的 map 任務(wù)逗宁,此時(shí)作業(yè)調(diào)度需要從某一個(gè)數(shù)據(jù)塊所在的機(jī)架中的一個(gè)節(jié)點(diǎn)上尋找一個(gè)空閑的 map 槽(slot)來運(yùn)行該 map 任務(wù),這將導(dǎo)致機(jī)架與機(jī)架之間的網(wǎng)絡(luò)傳輸
為何最佳分片的大小應(yīng)該與塊大小相同梦湘?
如果分片跨越兩個(gè)數(shù)據(jù)塊瞎颗,那么對于任何一個(gè) HDFS 節(jié)點(diǎn),基本上都不可能同時(shí)存儲(chǔ)這兩個(gè)數(shù)據(jù)塊践叠,因此分片中的部分?jǐn)?shù)據(jù)需要通過網(wǎng)絡(luò)傳輸?shù)?map 任務(wù)運(yùn)行的節(jié)點(diǎn)言缤。這與使用本地?cái)?shù)據(jù)運(yùn)行整個(gè) map 任務(wù)相比,顯然效率更低
reduce 任務(wù)并不具備數(shù)據(jù)本地化的優(yōu)勢禁灼,單個(gè) reduce 任務(wù)的輸入通常來自于所有的 mapper 的輸出管挟;多個(gè) reduce 任務(wù),每個(gè) map 任務(wù)針對輸出進(jìn)行分區(qū)
reduce 的輸出通常存儲(chǔ)在 HDFS 中以實(shí)現(xiàn)可靠存儲(chǔ)弄捕。第一個(gè)副本存儲(chǔ)在本地節(jié)點(diǎn)上僻孝,其他的副本處于可靠性考慮存儲(chǔ)在其他機(jī)架的節(jié)點(diǎn)上
reduce 任務(wù)的數(shù)量并非由輸入數(shù)據(jù)的大小決定导帝,反而是獨(dú)立指定的
combiner 函數(shù)
combiner 函數(shù)能夠幫助減少 mapper 和 reducer 之間的數(shù)據(jù)傳輸量
// 通過如下方式調(diào)用來啟用 combiner 函數(shù)
job.setComiberClass(XXXReducer.class)
Hadoop Streaming
Hadoop Streaming 使用 Unix 標(biāo)準(zhǔn)流作為 Hadoop 和應(yīng)用程序之間的接口,所以可以使用任何編程語言通過標(biāo)準(zhǔn)輸入/輸出來寫 MapReduce 程序
Streaming 天生適合用于文本處理穿铆。map 的輸入數(shù)據(jù)通過標(biāo)準(zhǔn)輸入流傳遞給 map 函數(shù)您单,并且是一行一行地傳輸,最后將結(jié)果行寫到標(biāo)準(zhǔn)輸出荞雏。map 輸出的鍵-值對以一個(gè)制表符分隔的行虐秦,reduce 函數(shù)的輸入格式與之相同并通過標(biāo)準(zhǔn)輸入流進(jìn)行傳輸。reduce 函數(shù)從標(biāo)準(zhǔn)輸入流中讀取輸入行凤优,該輸入已由 Hadoop 框架根據(jù)鍵排過序悦陋,最后將結(jié)果寫入標(biāo)準(zhǔn)輸出