MapReduce

1. Why MapReduce?

  • Processing the data on a single machine takes too long.

  • MapReduce tackles this with a divide-and-conquer approach.

    • To speed up the processing, we need to run parts of the program in parallel.

2. Map and Reduce

  1. MapReduce works by breaking the processing into two phases: the map phase and the reduce phase.
    (the processing is split into a map phase and a reduce phase)

  2. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer.
    (both the input and the output of each phase are key-value pairs)

  3. The programmer also specifies two functions: the map function and the reduce function.
    (the programmer writes the map and reduce functions)
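
  In type terms, this contract is often written as follows, where K1/V1, K2/V2 and K3/V3 stand for whatever key and value types the programmer chooses (in the example below, K2 is the year and V2 the temperature):

     map:    (K1, V1)       -> list(K2, V2)
     reduce: (K2, list(V2)) -> list(K3, V3)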

MapReduce example

  • Find the highest recorded temperature for each year.


  • Analysis

    1. The input to our map phase is the raw NCDC data.
    2. The map function is simple: it merely extracts the year and the air temperature from each record.
    3. The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key (the sort/shuffle step).
    4. All the reduce function has to do now is iterate through the list and pick up the maximum reading (a small illustrative run follows).
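
    To make this concrete, here is a tiny made-up run; the years and readings are illustrative values chosen for this sketch, not actual NCDC data:

      map output (year, temperature):  (1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78)
      after sorting/grouping by key:   (1949, [111, 78]), (1950, [0, 22, -11])
      reduce output (maximum):         (1949, 111), (1950, 22)
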
  • Code

    Example: Mapper for the maximum temperature example
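    A sketch of what the Mapper for this job looks like, following the well-known max-temperature example; the NCDC field offsets (year in columns 15-19, temperature in 87-92, quality flag in 92-93) and the MISSING sentinel are assumptions carried over from that example:

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class MaxTemperatureMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 9999;  // sentinel for an absent reading (assumed)

        @Override
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          String line = value.toString();
          String year = line.substring(15, 19);
          int airTemperature;
          if (line.charAt(87) == '+') {  // parseInt does not like a leading plus sign
            airTemperature = Integer.parseInt(line.substring(88, 92));
          } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
          }
          String quality = line.substring(92, 93);
          if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
          }
        }
      }
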
    • Rather than using built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization.

    • These are found in the org.apache.hadoop.io package.
      (Hadoop defines its own data types in the org.apache.hadoop.io package; they correspond one-to-one to the standard Java types)

    • Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).

    Example: Reducer for the maximum temperature example
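    A sketch of the corresponding Reducer, again modelled on the standard max-temperature example and using the Text/IntWritable types noted below:

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      public class MaxTemperatureReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          // The framework has already grouped the readings by year, so we only
          // need to scan the values for the maximum.
          int maxValue = Integer.MIN_VALUE;
          for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
          }
          context.write(key, new IntWritable(maxValue));
        }
      }
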
    • The input types of the reduce function must match the
      output types of the map function: Text and IntWritable.
    Example: Application to find the maximum temperature in the weather dataset
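    A sketch of the driver application, written against the newer org.apache.hadoop.mapreduce API; it ties the Mapper and Reducer together and exercises the setJarByClass(), setOutputPath(), setOutputKeyClass()/setOutputValueClass() and waitForCompletion() calls discussed in the points below:

      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

      public class MaxTemperature {

        public static void main(String[] args) throws Exception {
          if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
          }

          Job job = Job.getInstance();
          job.setJarByClass(MaxTemperature.class);  // Hadoop locates the JAR containing this class
          job.setJobName("Max temperature");

          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory must not already exist

          job.setMapperClass(MaxTemperatureMapper.class);
          job.setReducerClass(MaxTemperatureReducer.class);

          job.setOutputKeyClass(Text.class);          // must match what the reducer emits
          job.setOutputValueClass(IntWritable.class);

          System.exit(job.waitForCompletion(true) ? 0 : 1);  // true: print progress to the console
        }
      }
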
    • When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying
      the name of the JAR file, we can pass a class in the Job’s setJarByClass() method, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.

    • The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files
      from the reduce function are written. The directory shouldn’t exist before running the job because Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with that of another).

    • The setOutputKeyClass() and setOutputValueClass() methods control the output types for the reduce function, and must match what the Reduce class produces.

    • The waitForCompletion() method on Job submits the job and waits for it to finish. The single argument to the method is a flag indicating whether verbose output is generated. When true, the job writes information about its progress to the console.

Data Flow (key points)

  1. A MapReduce job is a unit of work that the client wants to be
    performed: it consists of the input data, the MapReduce program, and configuration information.
    (the MapReduce program itself is code written by the user)

  2. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.

    • The tasks are scheduled using YARN and run on nodes in the cluster. If a task fails, it will be automatically rescheduled to run on a different node.
    • (internally, YARN handles resource scheduling; if a task dies, the work is automatically rerun on another node)
  3. Input splits: Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits.

    • Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.
  4. Hadoop does its best to run the map task on a node where the input data resides in HDFS, because it doesn’t use valuable cluster bandwidth. This is called the data locality optimization.

     (In other words, Hadoop performs best when the map task runs on the same node that stores its input data, because no data has to be moved across the network first.)

  5. It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node.

    • If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.
      (Making the split size equal to the HDFS block size keeps performance optimal: a split covering two blocks is unlikely to live entirely on one node, so part of it would have to be fetched over the network before the map task could run.)
  6. Map tasks write their output to the local disk, not to HDFS.

    • Why is this? Map output is intermediate output: it’s processed by reduce tasks to produce the final output, and once the job is complete, the map output can be thrown away.
      (The map output is written to local disk rather than HDFS because it is only intermediate data.)
    • If the node running the map task fails before the map
      output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to re-create the map output.
      (If the node fails before its map output has been consumed by the reduce task, Hadoop reruns the map task on another node to recreate the output.)
  7. The input to a single reduce task is normally the output from all mappers.

    • The output of the reduce is normally stored in HDFS for reliability.
    • For each HDFS block of the reduce output, the first replica is stored on the local node, with other
      replicas being stored on off-rack nodes for reliability.
      (The reduce output is stored in HDFS: the first replica on the local node, the remaining replicas on off-rack nodes.)
  8. When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition.


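    How a record ends up in a particular partition is decided by the job's partitioner. The sketch below mirrors the rule used by Hadoop's default HashPartitioner, specialised to the (Text, IntWritable) records of the running example; the class name YearPartitioner is made up for illustration:

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Partitioner;

      // Records that share a key always hash to the same partition number, so
      // every occurrence of a given year is sent to the same reduce task.
      public class YearPartitioner extends Partitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
          // Mask off the sign bit so the result lies in [0, numReduceTasks).
          return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
      }

    A custom partitioner like this would be registered with job.setPartitionerClass(); for plain hash partitioning the default already behaves this way, so nothing needs to be configured.
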
  9. It’s also possible to have zero reduce tasks. This can be appropriate when you don’t need the shuffle because the processing can be carried out entirely in parallel.


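    A minimal fragment showing how a map-only job is requested, assuming a Job object configured as in the driver sketch above (the default number of reduce tasks is 1, so this has to be set explicitly):

      // With zero reduce tasks there is no sort/shuffle phase; each map task's
      // output is written by the output format directly to HDFS.
      job.setNumReduceTasks(0);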

Combiner Function

  1. Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.

  2. Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function’s output forms the input to the reduce function.

  3. Not all functions possess this property (the final result must be unchanged when the function is first applied to partial map outputs, as it is for max). For example, if we were calculating mean temperatures, we couldn’t use the mean as our combiner function: mean(0, 20, 10, 25, 15) = 14, whereas mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15.

  4. The combiner function doesn’t replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can help cut down the amount of data shuffled between the mappers and the reducers.
    (The combiner cannot replace the reducer; it only helps reduce the amount of data transferred between the map and reduce tasks.)

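  A fragment of the driver, under the same assumptions as the MaxTemperature sketch above, showing where the combiner is declared; because taking the maximum of partial maxima gives the same final answer, the reducer class can simply be reused as the combiner:

     job.setMapperClass(MaxTemperatureMapper.class);
     job.setCombinerClass(MaxTemperatureReducer.class);  // runs on each map task's output
     job.setReducerClass(MaxTemperatureReducer.class);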

The org.apache.hadoop.mapred.JobConf class

  1. JobConf typically specifies the Mapper, combiner (if any), Partitioner, Reducer, InputFormat and OutputFormat implementations to be used. (JobConf belongs to the older org.apache.hadoop.mapred API; the Job class used in the examples above is its counterpart in the newer org.apache.hadoop.mapreduce API.)

  2. Here is an example of how to configure a job via JobConf:

     // Create a new JobConf
     JobConf job = new JobConf(new Configuration(), MyJob.class);

     // Specify various job-specific parameters
     job.setJobName("myjob");

     FileInputFormat.setInputPaths(job, new Path("in"));
     FileOutputFormat.setOutputPath(job, new Path("out"));

     job.setMapperClass(MyJob.MyMapper.class);
     job.setCombinerClass(MyJob.MyReducer.class);
     job.setReducerClass(MyJob.MyReducer.class);

     job.setInputFormat(SequenceFileInputFormat.class);
     job.setOutputFormat(SequenceFileOutputFormat.class);