MapReduce Overview
A distributed computing framework.
A MapReduce job consists of a number of:
– Map tasks (a map task performs data transformation)
– Reduce tasks (a reduce task combines the results of map tasks)
– (Internally) shuffle tasks (a shuffle task sends the output of map tasks to the right reduce tasks)
A fairly simple workflow (detailed later):
input data -> blocks in HDFS -> map function in parallel -> reduce -> output
MapReduce 1.x workflow (the original MapReduce processing flow; from 2.x onward, YARN is used)
JobTracker ===> cluster resource management and job scheduling + communication with clients
- Only one JobTracker in a Hadoop cluster
- Takes requests from clients (MapReduce programs)
- Asks the NameNode for the location of the data
- Assigns tasks to TaskTrackers near the data
- Reassigns tasks if they fail
TaskTracker ===> two duties: reporting heartbeats and executing tasks
- Accepts (map, reduce, shuffle) tasks from the JobTracker
- Sends heartbeats to the JobTracker: "I am alive"
- Monitors the status of tasks and notifies the JobTracker
Starting from the map function and the reduce function
Using map and reduce in Python
https://www.runoob.com/python/python-func-map.html
The map function: map(function, iterable, ...) (in Python 3, map returns an iterator, hence the list() wrapper below)
>>> list(map(lambda x: x ** 2, [1, 2, 3, 4, 5]))
[1, 4, 9, 16, 25]
# Given two lists, elements at the same positions are added together
>>> list(map(lambda x, y: x + y, [1, 3, 5, 7, 9], [2, 4, 6, 8, 10]))
[3, 7, 11, 15, 19]
The reduce function: reduce(function, iterable[, initializer])
>>> from functools import reduce
>>> reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])
15
Using map and reduce in MapReduce
- The input and output of both map and reduce are <k, v> key-value pairs
- The output of map becomes the input of reduce (the map output actually goes through a group-by step first)
- System groups the intermediate key‐value pairs from map tasks by key
E.g., <hello, 1> <hello, 1> <hello, 1> <this, 1> => <hello, [1, 1, 1]>, <this, [1]>
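To make this grouping step concrete, here is a minimal, framework-free Java sketch (the class name and data are illustrative, not part of the Hadoop API):

import java.util.*;

public class GroupByKeyDemo {
    public static void main(String[] args) {
        // Intermediate key-value pairs as emitted by map tasks
        List<Map.Entry<String, Integer>> pairs = List.of(
                Map.entry("hello", 1), Map.entry("hello", 1),
                Map.entry("hello", 1), Map.entry("this", 1));

        // Group the values by key, as the system does between map and reduce:
        // <hello,1> <hello,1> <hello,1> <this,1> => <hello,[1,1,1]>, <this,[1]>
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs)
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());

        System.out.println(grouped); // {hello=[1, 1, 1], this=[1]}
    }
}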
In Hadoop
- A node may run multiple map/reduce tasks
- Typically, one map task per input split (chunk of data)
- One reduce task per partition of the map output, e.g., partitioned by key range or by hashing
WordCount example:
word.txt
hello
hello world
Map function:
- Input: <offset of line, line> // line = a line of text in a document => {0: "hello", 6: "hello world"}
- Output: for each word in the line, emit <word, 1> => ["hello": 1, "hello": 1, "world": 1]
Reduce function:
– Input: <word, list of 1's> => {"hello": [1, 1], "world": [1]} (so strictly speaking, the input of reduce is not exactly the raw output of map)
– Output: <word, count> where count is the number of 1's in the input list =>
{"hello": 2, "world": 1}
The Java implementation of Map and Reduce
The MapReduce framework is rather formulaic: we only need to write the map and reduce functions, and the framework takes care of all the other details. In Java these are encapsulated by the Mapper and Reducer classes, so we simply write subclasses that extend them and override the corresponding methods.
Each map task runs an instance of Mapper
– Mapper has a map function
– Map task invokes the map function of the Mapper once for each input key‐value pair
Each reduce task runs an instance of Reducer
– Reducer has a reduce function
– Reduce task invokes the reduce function of the Reducer once for every different intermediate key
– For the reduce function, values are NOT in any particular order
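Putting the two together, below is a minimal sketch of the classic WordCount Mapper and Reducer written against the org.apache.hadoop.mapreduce API (the class names are our own; the types and methods are the standard API):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// The map task calls map() once per input pair: <offset of line, line>
public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE); // emit <word, 1>
        }
    }
}

// The reduce task calls reduce() once per distinct intermediate key: <word, [1, 1, ...]>
class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> ones, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable one : ones) sum += one.get();
        context.write(word, new IntWritable(sum)); // emit <word, count>
    }
}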
Details after Map and before Reduce --- Shuffling
Shuffle
- The process of distributing intermediate key-values to the right reduce tasks
- It is the only communication between map and reduce tasks
- Individual map tasks do not exchange data directly with other map tasks
- They are not even aware of the existence of their peers
Map tasks may run in parallel on different machines, so shuffling is needed to distribute data with the same key to the same node for merging; only then can the final result be computed. This step is also what turns the <k, v> pairs produced by Map into the <k, [v1, v2, ...]> pairs consumed by the subsequent Reduce.
Image source:
https://xinze.fun/2020/01/10/Spark-Shuffle-%E5%92%8C-Spill-%E7%9A%84%E5%8C%BA%E5%88%AB/
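How does shuffling decide which reduce task receives a given key? In Hadoop this is the Partitioner's job, and the default HashPartitioner behaves essentially like this sketch (the class name is illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Mirrors the default HashPartitioner: pairs with the same key always go
// to the same reduce task, which is what makes grouping by key possible.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so the index is non-negative, then bucket by hash
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}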
Internals of shuffling
1. Map side: partition, sort, spill & merge
- Partition the data in the buffer into R parts (R = number of reduce tasks; # partitions = # reduce tasks)
- Sort the data in each partition by key
- Spill/write the data in the buffer to disk
- Merge the spills
- Notify the JobTracker: output complete
2. Reduce side: fetch & merge
- The TaskTracker is notified by the JobTracker: data ready
- Fetch/copy the data from the map side
- Merge the data (some data may sit on disk once fetched, depending on the buffer size)
- Figure out the groups from the sorted data
Back to the beginning: input and output data formats
Input
InputFormat
- Determines how input files are split and read
- Defined in the Java interface InputFormat
- Its job:
- Split input files into chunks called InputSplits (a logical concept)
- Implement a RecordReader to read data from the splits
In Java, InputFormat is an abstract class with many subclasses that read specific kinds of input:
- FileInputFormat (input from files in given dirs) --- the most commonly used
- DBInputFormat (input data from a database)
- CombineFileInputFormat (input data by combining multiple files)
FileInputFormat
– Takes paths to files
– Reads all files in the paths
– Divides each file into one or more InputSplits
FileInputFormat is itself an abstract class, with subclasses that implement the corresponding kinds of input:
– TextInputFormat
– KeyValueTextInputFormat
– SequenceFileInputFormat
The first thing InputFormat does: split input files into InputSplits
A note on InputSplit: it is a logical division of the data, not a physical one
- If a file is big, multiple splits may be created; typical split size = 128MB
- A map task is created for each split (a chunk of some input file)
The second thing InputFormat does: implement a RecordReader to read data from the InputSplits
- InputFormat defines an instance of a RecordReader (RR); e.g., TextInputFormat provides LineRecordReader
- LineRecordReader
- Forms a key-value pair for every line of the file
- Data type of the key: LongWritable; of the value: Text
- The reader is called repeatedly until all data in the split are processed
Output
OutputFormat
- Defines the format of the output from Reducers (the output is stored in a file)
- Defined in the Java interface OutputFormat
- Implementation: FileOutputFormat
- Subclasses: TextOutputFormat, SequenceFileOutputFormat
- All Reducers write to the same directory
- Set by the FileOutputFormat.setOutputPath() method
- OutputFormat defines a RecordWriter which handles the write
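To tie the input and output sides together, here is a minimal driver sketch; the paths come from command-line arguments, and TokenizerMapper/IntSumReducer refer to the classes sketched earlier (all calls are the standard mapreduce API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount");
        job.setJarByClass(WordCountDriver.class);

        // Input: TextInputFormat splits the files and reads each split
        // with a LineRecordReader, producing <LongWritable, Text> pairs
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // The Mapper and Reducer sketched earlier
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Output: all Reducers write into the same directory via a RecordWriter
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}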
Optional step: the Combiner
My understanding: a combiner is roughly a localized reducer
- Run on the node running the Mapper
- Perform local (or mini‐) reduction
- Combine Mapper results
- Before they are sent to the Reducers
- Reduce communication costs
- E.g., WordCount may use a combiner: (cat, 1), (cat, 1), (cat, 1) => (cat, 3)
Note that the combiner exists to save space and speed up the computation, but it must not affect the final result; something like computing an average, for example, cannot reuse the reducer as a combiner.
More formally:
The reduce function may be used directly as the combiner
- If it is commutative and associative
- Meaning the operations can be grouped & performed in any order
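As a concrete check of this rule, the sketch below notes the one-line driver change for WordCount and shows numerically why averaging fails (the class name is illustrative):

public class CombinerCaveat {
    public static void main(String[] args) {
        // WordCount can reuse its reducer as the combiner with one extra
        // line in the driver above, because integer addition is commutative
        // and associative:
        //     job.setCombinerClass(IntSumReducer.class);

        // Averaging is not associative: combining partial averages changes
        // the result, so a mean-computing reducer must not be reused as a
        // combiner unchanged.
        double avgAll = (1 + 2 + 3) / 3.0;            // 2.0
        double avgOfAvgs = ((1 + 2) / 2.0 + 3) / 2.0; // 2.25 != 2.0
        System.out.println(avgAll + " vs " + avgOfAvgs);
    }
}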