MapReduce 原理與 Python 實踐

備份自:http://blog.rainy.im/2016/03/13/python-on-hadoop-mapreduce/

繼上一篇Hadoop 入門實踐之后,接下來應該是 MapReduce 的原理與實踐操作竭宰。

MapReduce 原理

Hadoop 的 MapReduce 是基于 Google - MapReduce: Simplified Data Processing on Large Clusters 的一種實現(xiàn)哨啃。對 MapReduce 的基本介紹如下:

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.

MapReduce 是一種編程模型逝慧,用于處理大規(guī)模的數(shù)據(jù)熟掂。用戶主要通過指定一個 map 函數(shù)和一個 reduce 函數(shù)來完成數(shù)據(jù)的處理缰泡∧扛龋看到 map/reduce 很容易就聯(lián)想到函數(shù)式編程近迁,而實際上論文中也提到確實受到 Lisp 和其它函數(shù)式編程語言的啟發(fā)艺普。以 Python 為例,map/reduce 的用法如下:

from functools import reduce
from operator import add
ls = map(lambda x: len(x), ["ana", "bob", "catty", "dogge"])
# print(list(ls))
# => [3, 3, 5, 5]
reduce(add, ls)
# => 16

MapReduce 的優(yōu)勢在于對大規(guī)模數(shù)據(jù)進行切分(split)鉴竭,并在分布式集群上分別運行 map/reduce 并行加工歧譬,而用戶只需要針對數(shù)據(jù)處理邏輯編寫簡單的 map/reduce 函數(shù),MapReduce 則負責保證分布式運行和容錯機制搏存。Hadoop 的 MapReduce 雖然由 Java 實現(xiàn)瑰步,但同時提供 Streaming API 可以通過標準化輸入/輸出允許我們使用任何編程語言來實現(xiàn) map/reduce

MapReduce 在處理數(shù)據(jù)時璧眠,首先生成一個 job 將輸入文件切分成獨立的塊(chunk)缩焦,切塊的大小是根據(jù)配置設定的。然后每個獨立的文件塊交給 map task 并行加工责静,得到一組 <k1, v1> 列表袁滥,MapReduce 再將 map 輸出的結果按 k1 進行重新組合,再將結果傳遞給 reduce task灾螃,最后 reduce 計算得出結果题翻。

以官方提供的 WordCount 為例,輸入為兩個文件:

hadoop fs -cat file0
# Hello World Bye World

hadoop fs -cat file1
# Hello Hadoop Goodbye Hadoop

利用 MapReduce 來計算所有文件中單詞出現(xiàn)數(shù)量的統(tǒng)計腰鬼。MapReduce 的運行過程如下圖所示:

MapReduce

Python map/reduce

Hadoop 的 Streaming API 通過 STDIN/STDOUT 傳遞數(shù)據(jù)嵌赠,因此 Python 版本的 map 可以寫作:

#!/usr/bin/env python3
import sys

def read_inputs(file):
  for line in file:
    line = line.strip()
    yield line.split()
def main():
  file = sys.stdin
  lines = read_inputs(file)
  for words in lines:
    for word in words:
      print("{}\t{}".format(word, 1))
if __name__ == "__main__":
  main()

運行一下:

chmod +x map.py
echo "Hello World Bye World" | ./map.py

# Hello   1
# World   1
# Bye     1
# World   1

reduce 函數(shù)以此讀取經(jīng)過排序之后的 map 函數(shù)的輸出,并統(tǒng)計單詞的次數(shù):

#!/usr/bin/env python3
import sys

def read_map_outputs(file):
  for line in file:
    yield line.strip().split("\t", 1)
def main():
  current_word = None
  word_count   = 0
  lines = read_map_outputs(sys.stdin)
  for word, count in lines:
    try:
      count = int(count)
    except ValueError:
      continue
    if current_word == word:
      word_count += count
    else:
      if current_word:
        print("{}\t{}".format(current_word, word_count))
      current_word = word
      word_count = count
  if current_word:
    print("{}\t{}".format(current_word, word_count))
if __name__ == "__main__":
  main()

reduce 的輸入是排序后的 map 輸出:

chmod +x reduce.py
echo "Hello World Bye World" | ./map.py | sort | ./reduce.py

# Bye     1
# Hello   1
# World   2

這其實與 MapReduce 的執(zhí)行流程是一致的熄赡,下面我們通過 MapReduce 來執(zhí)行(已啟動 Hadoop)姜挺,需要用到 hadoop-streaming-2.6.4.jar,不同的 Hadoop 版本位置可能不同:

cd $HADOOP_INSTALL && find ./ -name "hadoop-streaming*.jar"
# ./share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar
 
mkdir wordcount -p wordcount/input
cd wordcount
echo "Hello World Bye World" >> input/file0
echo "Hello Hadoop Goodbye Hadoop" >> input/file1

hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar \
-input $(pwd)/input \
-output output \
-mapper $(pwd)/map.py \
-reducer $(pwd)/reduce.py

執(zhí)行完成之后會在 output 目錄產(chǎn)生結果:

hadoop fs -ls output
# Found 2 items
# -rw-r--r--   1 rainy rainy          0 2016-03-13 02:15 output/_SUCCESS
# -rw-r--r--   1 rainy rainy         41 2016-03-13 02:15 output/part-00000
hadoop fs -cat output/part-00000
# Bye     1
# Goodbye 1
# Hadoop  2
# Hello   2
# World   2

總結

Hadoop 的架構讓 MapReduce 的實際執(zhí)行過程簡化了許多彼硫,但這里省略了很多細節(jié)的內(nèi)容炊豪,尤其是針對完全分布式模式,并且要在輸入文件足夠大的情況下才能體現(xiàn)出優(yōu)勢乌助。這里處理純文本文檔作為示例溜在,但我想要做的是通過連接 MongoDB 直接讀取數(shù)據(jù)到 HDFS 然后進行 MapReduce 處理,但考慮到數(shù)據(jù)量仍然不是很大(700,000條記錄)的情況他托,不知道是否會比直接 Python + MongoDB 更快。

下一步目標:

  1. MongoDB and Hadoop仆葡。
歡迎關注公眾號 PyHub赏参!

參考

  1. MapReduce
  2. MapReduce Tutorial
  3. Writing an Hadoop MapReduce Program in Python
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末志笼,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子把篓,更是在濱河造成了極大的恐慌纫溃,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件韧掩,死亡現(xiàn)場離奇詭異紊浩,居然都是意外死亡,警方通過查閱死者的電腦和手機疗锐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門坊谁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人滑臊,你說我怎么就攤上這事口芍。” “怎么了雇卷?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵鬓椭,是天一觀的道長。 經(jīng)常有香客問我关划,道長小染,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任贮折,我火速辦了婚禮裤翩,結果婚禮上,老公的妹妹穿的比我還像新娘脱货。我一直安慰自己岛都,他們只是感情好,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布振峻。 她就那樣靜靜地躺著臼疫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪扣孟。 梳的紋絲不亂的頭發(fā)上烫堤,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機與錄音凤价,去河邊找鬼鸽斟。 笑死,一個胖子當著我的面吹牛利诺,可吹牛的內(nèi)容都是我干的富蓄。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼慢逾,長吁一口氣:“原來是場噩夢啊……” “哼立倍!你這毒婦竟也來了灭红?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤口注,失蹤者是張志新(化名)和其女友劉穎变擒,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寝志,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡娇斑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了材部。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毫缆。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖败富,靈堂內(nèi)的尸體忽然破棺而出悔醋,到底是詐尸還是另有隱情,我是刑警寧澤兽叮,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布芬骄,位于F島的核電站,受9級特大地震影響鹦聪,放射性物質發(fā)生泄漏账阻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一泽本、第九天 我趴在偏房一處隱蔽的房頂上張望淘太。 院中可真熱鬧,春花似錦规丽、人聲如沸蒲牧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冰抢。三九已至,卻和暖如春艘狭,著一層夾襖步出監(jiān)牢的瞬間挎扰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工巢音, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留遵倦,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓官撼,卻偏偏與公主長得像梧躺,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子傲绣,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容