什么是MapReduce
摘自wiki中關(guān)于MapReduce的說明
MapReduce是Google提出的一個軟件架構(gòu)栈源,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算孵滞。概念“Map(映射)”和“Reduce(歸納)”,及他們的主要思想履肃,都是從函數(shù)式編程語言借來的查描,還有從矢量編程語言借來的特性。[1]
當(dāng)前的軟件實現(xiàn)是指定一個Map(映射)函數(shù)昆著,用來把一組鍵值對映射成一組新的鍵值對,指定并發(fā)的Reduce(歸納)函數(shù)术陶,用來保證所有映射的鍵值對中的每一個共享相同的鍵組宣吱。
簡單來說,一個映射函數(shù)就是對一些獨立元素組成的概念上的列表(例如瞳别,一個測試成績的列表)的每一個元素進(jìn)行指定的操作(比如,有人發(fā)現(xiàn)所有學(xué)生的成績都被高估了一分杭攻,他可以定義一個“減一”的映射函數(shù)祟敛,用來修正這個錯誤。)兆解。事實上馆铁,每個元素都是被獨立操作的,而原始列表沒有被更改锅睛,因為這里創(chuàng)建了一個新的列表來保存新的答案埠巨。這就是說,Map操作是可以高度并行的现拒,這對高性能要求的應(yīng)用以及并行計算領(lǐng)域的需求非常有用辣垒。
而歸納操作指的是對一個列表的元素進(jìn)行適當(dāng)?shù)暮喜ⅲɡ^續(xù)看前面的例子,如果有人想知道班級的平均分該怎么做印蔬?他可以定義一個歸納函數(shù)勋桶,通過讓列表中的奇數(shù)(odd)或偶數(shù)(even)元素跟自己的相鄰的元素相加的方式把列表減半,如此遞歸運算直到列表只剩下一個元素侥猬,然后用這個元素除以人數(shù)例驹,就得到了平均分)。雖然他不如映射函數(shù)那么并行退唠,但是因為歸納總是有一個簡單的答案鹃锈,大規(guī)模的運算相對獨立,所以歸納函數(shù)在高度并行環(huán)境下也很有用瞧预。
python中的map和reduce
python中內(nèi)置支持map和reduce操作
map和reduce的原型
map函數(shù)原型為
map(*function*, *iterable*, *...*) -> list
意思是map函數(shù)對第二個參數(shù)(或者后面更多的參數(shù))進(jìn)行迭代屎债,將迭代的元素作為參數(shù)傳遞給function仅政,function將處理過的結(jié)果保存在一個list里面并返回這個list
reduce(*function*, *iterable*[, *initializer*]) -> value
實現(xiàn)差不多等同于下面的代碼
def reduce(function, iterable, initializer=None):
it = iter(iterable)
if initializer is None:
try:
initializer = next(it)
except StopIteration:
raise TypeError('reduce() of empty sequence with no initial value')
accum_value = initializer
for x in it:
accum_value = function(accum_value, x)
return accum_value
舉例,假設(shè)現(xiàn)在有幾個list扔茅,想要統(tǒng)計它們總的元素個數(shù)已旧,利用map-reduce的思想可以這樣實現(xiàn)
a = [1, 2, 3]
b = [4, 5, 6, 7]
c = [8, 9, 1, 2, 3]
L = map(lambda x: len(x), [a, b, c])
N = reduce(lambda x, y: x + y, L)
可以看到,上面的代碼
- 沒有寫出一個循環(huán)
- 沒有臨時變量的狀態(tài)被改變
卻簡潔有力地描述了問題的解決辦法召娜,因此可讀性是很高的运褪。這也是函數(shù)式編程的特性。
但是上面的寫法和下面的方法解決問題的效率幾乎是一樣的玖瘸。
result = sum([len(item) for item in [a, b, c]])
在面對非常大的數(shù)據(jù)量的時候秸讹,這樣的處理方式效率并不理想。
并行的解法
提到并行雅倒,首先想到的是多線程璃诀。但是,python中有GIL蔑匣,并不能很好地利用多處理器的進(jìn)行并發(fā)的計算劣欢。
所以想到python中的multiprocessing模塊,這個模塊提供了Pool這個類來管理任務(wù)的進(jìn)程池裁良,并且這個類提供了并行的map方法凿将。這個map方法和之前提到的概念是很類似的,但是并不是說它處理的是MapReduce中的map步驟价脾。
以經(jīng)典的wordcount問題為例牧抵,直接上代碼。
def my_map(l):
results = []
for w in l:
# True if w contains non-alphanumeric characters
if not w.isalnum():
w = sanitize(w)
# True if w is a title-cased token
results.append((w.lower(), 1))
return results
def my_partition(l):
tf = {}
for sublist in l:
for p in sublist:
# Append the tuple to the list in the map
tf[p[0]] = tf.get(p[0], []) + [p]
return tf
def my_reduce(mapping):
return (mapping[0], sum(pair[1] for pair in mapping[1]))
整個計算流程被拆成了Map, Partition, Reduce三個步驟
- my_map方法
傳入一個token的list侨把,去掉token首尾的標(biāo)點符號犀变,并且返回(token.lower(), 1)的一個list - my_partition方法
傳入上面my_map處理的結(jié)果,返回一個dict秋柄,key為token获枝,value為所有(token, 1)的一個list - my_reduce方法
統(tǒng)計各個單詞出現(xiàn)的次數(shù)
def sanitize(w):
# 去除字符串首尾的標(biāo)點符號
while len(w) > 0 and not w[0].isalnum():
w = w[1:] # String punctuation from the back
while len(w) > 0 and not w[-1].isalnum():
w = w[:-1]
return w
def load(path):
word_list = []
f = open(path, "r")
for line in f:
word_list.append(line)
return (''.join(word_list)).split()
def chunks(l, n):
for i in xrange(0, len(l), n):
yield l[i:i + n]
def tuple_sort(a, b):
if a[1] < b[1]:
return 1
elif a[1] > b[1]:
return -1
else:
return cmp(a[0], b[0])
if __name__ == '__main__':
if len(sys.argv) != 2:
print "Program requires path to file for reading!"
sys.exit(1)
text = load(sys.argv[1])
pool = Pool(processes=8, )
partitioned_text = list(chunks(text, len(text) / 8))
single_count_tuples = pool.map(my_map, partitioned_text)
token_to_tuples = my_partition(single_count_tuples)
term_frequencies = pool.map(my_reduce, token_to_tuples.items())
term_frequencies.sort(tuple_sort)
這里利用了multiprocess的map方法,對map和reduce方法進(jìn)行了多進(jìn)程的處理华匾。共設(shè)立了8個進(jìn)程映琳,把讀取到的文件分成8塊進(jìn)行處理。
需要說明的是蜘拉,這里完全是為了仿照hadoop的流程進(jìn)行的計算萨西。效率可能并不是最優(yōu)的。