問題描述
存在一些文件名類似 2016-07-16_161.res 2016-07-18_161.res
的文件弥鹦,文件內(nèi)容如下:
less 2016-07-16_161.res
1.34.0.68
1.34.1.37
1.34.1.121
1.34.5.87
1.34.5.182
1.34.6.72
1.34.6.245
1.34.9.149
1.34.11.74
1.34.13.161
...
希望通過 Spark 統(tǒng)計(jì)出存活超過 n 天的 IP落君,并可以同時(shí)看到該 IP 分別在哪天存活。
因此需要先將這些文件轉(zhuǎn)換成 Pair RDD,其中的每一項(xiàng)以 IP 為 key脖咐,以日期為 value断箫。
編寫 Spark 腳本如下:
rdds = [sc.textFile(f).map(lambda x: (x, f.split('_')[0])) for f in glob('*.res')]
預(yù)期得到結(jié)果:
rdds[0].first()
(u'210.240.117.126', '2016-07-16')
rdds[1].first()
(u'210.240.117.126', '2016-07-18')
但實(shí)際得到的結(jié)果為:
rdds[0].first()
(u'210.240.117.126', '2016-07-18')
rdds[1].first()
(u'210.240.117.126', '2016-07-18')
即所有的 RDD 中每一項(xiàng)的 value 都為同一值瓜浸。
問題定位
看到這個(gè)現(xiàn)象基本把可能出問題的點(diǎn)鎖定在了 map(lambda x: (x, f.split('_')[0]))
附近。
查找了 pyspark 中 map
函數(shù)的實(shí)現(xiàn)比原,也沒發(fā)現(xiàn)有什么不妥的地方插佛。rdd.py 中 RDD::map()
的實(shí)現(xiàn)如下:
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
"""
def func(_, iterator):
return map(f, iterator)
return self.mapPartitionsWithIndex(func, preservesPartitioning)
另一個(gè)值得懷疑的點(diǎn)就是 lambda 的行為是不是真的符合預(yù)期,但是一直只是懷疑量窘,沒有找到有效的方法來驗(yàn)證自己的猜測(cè)雇寇。直到看了 @張通 轉(zhuǎn)發(fā)的 《Python 中的 lambda 和「真正的」lambda 有什么區(qū)別?》[1]蚌铜,才確認(rèn)并搞清楚問題發(fā)生的根本原因锨侯。
問題發(fā)生的根本原因在于 Python 中的 lambda
在實(shí)現(xiàn)上存在缺陷[2],導(dǎo)致 Spark 腳本中傳入 map
函數(shù)的 lambda
表達(dá)式共享了同一個(gè)變量 f
冬殃,從而導(dǎo)致了上述問題的發(fā)生囚痴。
舉一反三
目前大概可以確定,Python 實(shí)現(xiàn)的 lambda
表達(dá)式中的變量可能并不像正常的函數(shù)那樣具有獨(dú)立的作用域审葬。
以下述代碼為例:
test = [lambda x: x+i for i in range(10)]
print test[0](1)
print test[9](1)
這段代碼并不會(huì)如預(yù)期的輸出 1
和 10
深滚,而是會(huì)輸出 10
和 10
。使用 dis
模塊分析列表中的任意一個(gè) lambda
表達(dá)式得到如下結(jié)果涣觉。
In [3]: dis.dis(test[0])
1 0 LOAD_FAST 0 (x)
3 LOAD_GLOBAL 0 (i)
6 BINARY_ADD
7 RETURN_VALUE
從上述 Python bytecode 中可以看出 lambda
表達(dá)式中的變量 i
的確沒有一個(gè)獨(dú)立的作用域痴荐,而是使用了相對(duì)全局的作用域,而此時(shí)該作用域中的變量 i
已經(jīng)變成了 9
官册,因此得到了上述結(jié)果生兆。
更近一步,list comprehension 中的變量的作用域又是怎樣的呢攀隔?是僅僅作用于 list comprehension 內(nèi)部皂贩,還是也會(huì)影響到外部呢?
實(shí)測(cè)代碼如下:
In [13]: p = 100
In [14]: a = [p for p in range(10)]
In [15]: print a
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [16]: print p
9
可見 list comprehension 中的變量也會(huì)對(duì)其外的變量產(chǎn)生影響昆汹,即 list comprehension 中的變量也不具有獨(dú)立的作用域明刷。所以,雖然 list comprehension 具有執(zhí)行效率高和可讀性強(qiáng)等優(yōu)點(diǎn)满粗,在實(shí)際的編碼中也需要多注意這些副作用辈末,防止被坑。
問題解決
下面兩個(gè)方法均可解決該問題:
def gen_rdd(f):
return sc.textFile(f).map(lambda x: (x, f.split('_')[0]))
rdds = [gen_rdd(f) for f in glob('*.res')]
rdds = map(
lambda f: sc.textFile(f).map(lambda x: (x, f.split('_')[0])), glob('*.res')
)
推薦使用第二種方式映皆。