Hadoop大數(shù)據(jù)技術(shù)體系
框架
參考:http://www.reibang.com/p/17bee8316848
MapReduce
從wordcount開始
- Map: for each (k,v) ---> produce new set of (k,v) pairs
-
Reduce: produce one (k,v) for each distinct key
比如wordcount中:
map過程
reduce過程
代碼:
package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
*
* 描述:WordCount explains by Felix
* @author Hadoop Dev Group
*/
public class WordCount
{
/**
* MapReduceBase類:實(shí)現(xiàn)了Mapper和Reducer接口的基類(其中的方法只是實(shí)現(xiàn)接口忙迁,而未作任何事情)
* Mapper接口:
* WritableComparable接口:實(shí)現(xiàn)WritableComparable的類可以相互比較。所有被用作key的類應(yīng)該實(shí)現(xiàn)此接口秽荤。
* Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用控漠。
*
*/
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable>
{
/**
* LongWritable, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類无拗,這些類實(shí)現(xiàn)了WritableComparable接口,
* 都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換遣总,你可以將它們分別視為long,int,String 的替代品昌抠。
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* Mapper接口中的map方法:
* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
* 映射一個(gè)單個(gè)的輸入k/v對(duì)到一個(gè)中間的k/v對(duì)
* 輸出對(duì)不需要和輸入對(duì)是相同的類型患朱,輸入對(duì)可以映射到0個(gè)或多個(gè)輸出對(duì)。
* OutputCollector接口:收集Mapper和Reducer輸出的<k,v>對(duì)扰魂。
* OutputCollector接口的collect(k, v)方法:增加一個(gè)(k,v)對(duì)到output
*/
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens())
{
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext())
{
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception
{
/**
* JobConf:map/reduce的job配置類麦乞,向hadoop框架描述map-reduce執(zhí)行的工作
* 構(gòu)造方法:JobConf()蕴茴、JobConf(Class exampleClass)、JobConf(Configuration conf)等
*/
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount"); //設(shè)置一個(gè)用戶定義的job名稱
conf.setOutputKeyClass(Text.class); //為job的輸出數(shù)據(jù)設(shè)置Key類
conf.setOutputValueClass(IntWritable.class); //為job輸出設(shè)置value類
conf.setMapperClass(Map.class); //為job設(shè)置Mapper類
conf.setCombinerClass(Reduce.class); //為job設(shè)置Combiner類
conf.setReducerClass(Reduce.class); //為job設(shè)置Reduce類
conf.setInputFormat(TextInputFormat.class); //為map-reduce任務(wù)設(shè)置InputFormat實(shí)現(xiàn)類
conf.setOutputFormat(TextOutputFormat.class); //為map-reduce任務(wù)設(shè)置OutputFormat實(shí)現(xiàn)類
/**
* InputFormat描述map-reduce中對(duì)job的輸入定義
* setInputPaths():為map-reduce job設(shè)置路徑數(shù)組作為輸入列表
* setInputPath():為map-reduce job設(shè)置路徑數(shù)組作為輸出列表
*/
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf); //運(yùn)行一個(gè)job
}
}
MapReduce框架組成
MapReduce主要包括JobClient姐直、JobTracker倦淀、TaskTracker、HDFS四個(gè)部分声畏。
mapreduce框架
- JobClient:配置參數(shù)Configuration撞叽,并打包成jar文件存儲(chǔ)在HDFS上,將文件路徑提交給JobTracker的master服務(wù)插龄,然后由master創(chuàng)建每個(gè)task將它們分發(fā)到各個(gè)TaskTracker服務(wù)中去執(zhí)行愿棋。
- JobTracker:這是一個(gè)master服務(wù),程序啟動(dòng)后均牢,JobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度糠雨。JobTracker監(jiān)控所有的TaskTracker和job的健康狀況,一旦發(fā)生失敗徘跪,即將之轉(zhuǎn)移到其他節(jié)點(diǎn)上甘邀,同時(shí)JobTracker會(huì)跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息垮庐,并將這些信息告訴任務(wù)調(diào)度器松邪,而調(diào)度器會(huì)在資源出現(xiàn)空閑時(shí),選擇合適的任務(wù)使用這些資源哨查。在Hadoop 中逗抑,任務(wù)調(diào)度器是一個(gè)可插拔的模塊,用戶可以根據(jù)自己的需要設(shè)計(jì)相應(yīng)的調(diào)度器寒亥。
- TaskTracker:運(yùn)行在多個(gè)節(jié)點(diǎn)上的slaver服務(wù)邮府。TaskTracker主動(dòng)與JobTracker通信接受作業(yè),并負(fù)責(zé)直接執(zhí)行每個(gè)任務(wù)护盈。TaskTracker 會(huì)周期性地通過Heartbeat 將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)行進(jìn)度匯報(bào)給JobTracker挟纱,同時(shí)接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動(dòng)新任務(wù)、殺死任務(wù)等)腐宋。TaskTracker 使用“slot”等量劃分本節(jié)點(diǎn)上的資源量√垂欤“slot”代表計(jì)算資源(CPU胸竞、內(nèi)存等)。一個(gè)Task 獲取到一個(gè)slot 后才有機(jī)會(huì)運(yùn)行参萄,而Hadoop 調(diào)度器的作用就是將各個(gè)TaskTracker 上的空閑slot 分配給Task 使用卫枝。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用讹挎。TaskTracker 通過slot 數(shù)目(可配置參數(shù))限定Task 的并發(fā)度校赤。
Task分為Map Task和Reduce Task兩種吆玖,均由TaskTracker啟動(dòng)。
邏輯角度分析作業(yè)運(yùn)行順序:輸入分片(input split)马篮、map階段沾乘、combiner階段、shuffle階段浑测、reduce階段翅阵。
邏輯流程
- input split:在map計(jì)算之前,程序會(huì)根據(jù)輸入文件計(jì)算split迁央,每個(gè)input split針對(duì)一個(gè)map任務(wù)掷匠。input split存儲(chǔ)的并非是數(shù)據(jù)本身,而是一個(gè)分片長度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組岖圈。
- map階段:即執(zhí)行map函數(shù)讹语。
- combiner階段:這是一個(gè)可選擇的函數(shù),實(shí)質(zhì)上是一種reduce操作蜂科。combiner是map的后續(xù)操作顽决,主要是在map計(jì)算出中間文件前做一個(gè)簡單的合并重復(fù)key值的操作。
- shuffle階段:指從map輸出開始崇摄,包括系統(tǒng)執(zhí)行排序即傳送map輸出到reduce作為輸入的過程擎值。另外針對(duì)map輸出的key進(jìn)行排序又叫sort階段。map端shuffle逐抑,簡單來說就是利用combiner對(duì)數(shù)據(jù)進(jìn)行預(yù)排序鸠儿,利用內(nèi)存緩沖區(qū)來完成。reduce端的shuffle包括復(fù)制數(shù)據(jù)和歸并數(shù)據(jù)厕氨,最終產(chǎn)生一個(gè)reduce輸入文件进每。shuffle過程有許多可調(diào)優(yōu)的參數(shù)來提高M(jìn)apReduce的性能,其總原則就是給shuffle過程盡量多的內(nèi)存空間命斧。
- reduce階段:即執(zhí)行reduce函數(shù)并存到hdfs文件系統(tǒng)中田晚。