MapReduce
1. Why MapReduce?
- Processing the data on a single machine takes too long.
- MapReduce takes a divide-and-conquer approach to the problem.
- To speed up the processing, we need to run parts of the program in parallel.
2. Map and Reduce
MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.
Map and Reduce example
- Find the highest recorded temperature in each year of the NCDC weather dataset.
Analysis
- The input to our map phase is the raw NCDC data.
- Our map function is simple: it merely pulls out the year and the air temperature from each record.
- The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key.
- All the reduce function has to do now is iterate through the list and pick up the maximum reading
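To make this concrete, here is a small illustration (the readings are made up for the sake of the example, not taken from the actual dataset). Suppose the map phase emits (1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78). After the framework has sorted and grouped these pairs, the reduce function sees (1949, [111, 78]) and (1950, [0, 22, -11]) and emits the final output (1949, 111) and (1950, 22).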
Code
Example: Mapper for the maximum temperature example
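A sketch of what the mapper might look like (the character offsets assume the fixed-width NCDC record format; treat the exact positions as illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: extracts the year and air temperature from each NCDC line
// and emits (year, temperature) pairs.
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);            // year field
    int airTemperature;
    if (line.charAt(87) == '+') {                    // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
```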
Rather than using built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).
Example: Reducer for the maximum temperature example
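A sketch of a matching reducer: it iterates over the values for each year and keeps the maximum.

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: for each year, scan the list of temperatures and emit the maximum.
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
```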
- The input types of the reduce function must match the output types of the map function: Text and IntWritable.
Example: Application to find the maximum temperature in the weather dataset
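A sketch of a driver that wires these together (class and job names are illustrative):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch: configures the MaxTemperature job and submits it to the cluster.
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = Job.getInstance();
    job.setJarByClass(MaxTemperature.class);   // Hadoop locates the JAR containing this class
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory must not exist yet

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1); // true = print progress to the console
  }
}
```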
When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class in the Job's setJarByClass() method, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.
The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reduce function are written. The directory shouldn't exist before running the job, because Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with that of another).
The setOutputKeyClass() and setOutputValueClass() methods control the output types for the reduce function, and must match what the Reduce class produces.
The waitForCompletion() method on Job submits the job and waits for it to finish. The single argument to the method is a flag indicating whether verbose output is generated. When true, the job writes information about its progress to the console.
Data Flow (important)
- A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information.
- Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.
- The tasks are scheduled using YARN and run on nodes in the cluster. If a task fails, it will be automatically rescheduled to run on a different node.
- Input splits: Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits.
- Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.
- Hadoop does its best to run the map task on a node where the input data resides in HDFS, because it doesn't use valuable cluster bandwidth. This is called the data locality optimization.
  (In other words, Hadoop performs best when the node running a map task is the same node that stores that task's input data.)
- It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node.
- If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.
  (Making the split size equal to the HDFS block size gives the best performance: a split spanning two blocks would probably not be stored in full on any single node, so part of it would have to be pulled over the network.)
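The split size therefore defaults to the HDFS block size (dfs.blocksize). If it ever needs to be overridden, the new-API FileInputFormat exposes bounds for it; a minimal sketch, with the sizes below chosen arbitrarily for illustration:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // Both bounds are in bytes; the effective split size is
    // max(minSize, min(maxSize, blockSize)), so it normally equals the block size.
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
  }
}
```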
- Map tasks write their output to the local disk, not to HDFS.
- Why is this? Map output is intermediate output: it's processed by reduce tasks to produce the final output, and once the job is complete, the map output can be thrown away.
- If the node running the map task fails before the map output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to re-create the map output.
- The input to a single reduce task is normally the output from all mappers.
- The output of the reduce is normally stored in HDFS for reliability.
- For each HDFS block of the reduce output, the first replica is stored on the local node, with the other replicas being stored on off-rack nodes for reliability.
- When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition.
- It's also possible to have zero reduce tasks. This can be appropriate when you don't need the shuffle because the processing can be carried out entirely in parallel.
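The number of reduce tasks is set explicitly on the job; by default, map output keys are routed to partitions with HashPartitioner. A minimal sketch of these knobs (the class below is illustrative, not part of the weather example):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitioningExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();

    job.setNumReduceTasks(2);    // two reducers: each map task writes two partitions
    // job.setNumReduceTasks(0); // zero reducers: map output is written directly to HDFS

    // HashPartitioner is already the default; it is set here only to make the
    // routing explicit. It places a record in partition
    //   (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,
    // so all records with the same key end up in the same reduce task.
    job.setPartitionerClass(HashPartitioner.class);
  }
}
```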
Combiner Function
Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.
Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function's output forms the input to the reduce function. Because the combiner is an optimization, Hadoop makes no guarantee of how many times it will call it for a particular map output record (if at all), so running the combiner must not change the final result produced by the reducer. This works for finding the maximum temperature because max is commutative and associative. Not all functions possess this property. For example, if we were calculating mean temperatures, we couldn't use the mean as our combiner function.
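To see why, take some illustrative readings (the numbers are made up): suppose the first map produced (1950, 0), (1950, 20), (1950, 10) and the second produced (1950, 25), (1950, 15). With max as the combiner, the reducer computes max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25, the same as the maximum over all five readings. With mean, however, mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15, whereas the true mean of all five readings is 14, so the result would change. In the driver, enabling the combiner is a one-line change, e.g. job.setCombinerClass(MaxTemperatureReducer.class), reusing the reducer class sketched above.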
- The combiner function doesn't replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can help cut down the amount of data shuffled between the mappers and the reducers.
The org.apache.hadoop.mapred.JobConf class
JobConf belongs to the old MapReduce API (org.apache.hadoop.mapred), unlike the Job class used above. It typically specifies the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, and OutputFormat implementations to be used, among other things.
Here is an example of how to configure a job via JobConf:

    // Create a new JobConf
    JobConf job = new JobConf(new Configuration(), MyJob.class);

    // Specify various job-specific parameters
    job.setJobName("myjob");

    FileInputFormat.setInputPaths(job, new Path("in"));
    FileOutputFormat.setOutputPath(job, new Path("out"));

    job.setMapperClass(MyJob.MyMapper.class);
    job.setCombinerClass(MyJob.MyReducer.class);
    job.setReducerClass(MyJob.MyReducer.class);

    job.setInputFormat(SequenceFileInputFormat.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);