在前一章中,已經(jīng)對workcount.py 做了代碼注釋稀颁,但是對于初學(xué)者而言,難點(diǎn)在于能將RDD等抽象地數(shù)據(jù)結(jié)構(gòu)在大腦中呈現(xiàn)棱烂,對應(yīng)map粘昨, flatMap的轉(zhuǎn)換過程更是難于通過單步調(diào)試等手段觀察到程序內(nèi)部變量的賦值過程张肾。
在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內(nèi)部各種概念的面紗馁启。我們再次回顧wordcount.py代碼來回答如下問題
對于大多數(shù)語言的Hello Word示例惯疙,都有main()函數(shù)妖啥, wordcount.py的main函數(shù),或者說調(diào)用Spark的main() 在哪里
數(shù)據(jù)的讀入蒿偎,各個RDD數(shù)據(jù)如何轉(zhuǎn)換
map與flatMap的工作機(jī)制诉位,以及區(qū)別
reduceByKey的作用
WordCount.py 的代碼如下:
from __future__ import print_function
import sys
from operator import add
# SparkSession:是一個對Spark的編程入口菜枷,取代了原本的SQLContext與HiveContext,方便調(diào)用Dataset和DataFrame API
# SparkSession可用于創(chuàng)建DataFrame岳瞭,將DataFrame注冊為表瞳筏,在表上執(zhí)行SQL枫耳,緩存表和讀取parquet文件。
from pyspark.sql import SparkSession
if __name__ == "__main__":
# Python 常用的簡單參數(shù)傳入
if len(sys.argv) != 2:
print("Usage: wordcount <file>", file=sys.stderr)
exit(-1)
# appName 為 Spark 應(yīng)用設(shè)定一個應(yīng)用名凄硼,改名會顯示在 Spark Web UI 上
# 假如SparkSession 已經(jīng)存在就取得已存在的SparkSession捷沸,否則創(chuàng)建一個新的。
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()
# 讀取傳入的文件內(nèi)容说墨,并寫入一個新的RDD實例lines中尼斧,此條語句所做工作有些多棺棵,不適合初學(xué)者熄捍,可以截成兩條語句以便理解。
# map是一種轉(zhuǎn)換函數(shù)缚柏,將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素币喧。原始RDD中的數(shù)據(jù)項與新RDD中的數(shù)據(jù)項是一一對應(yīng)的關(guān)系缕陕。
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
# flatMap與map類似扛邑,但每個元素輸入項都可以被映射到0個或多個的輸出項铐然,最終將結(jié)果”扁平化“后輸出
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
# collect() 在驅(qū)動程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回。 這在返回足夠小的數(shù)據(jù)子集的過濾器或其他操作之后通常是有用的沥阳。由于collect 是將整個RDD匯聚到一臺機(jī)子上自点,所以通常需要預(yù)估返回數(shù)據(jù)集的大小以免溢出。
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()
Spark 入口 SparkSession
Spark2.0中引入了SparkSession的概念溅潜,它為用戶提供了一個統(tǒng)一的切入點(diǎn)來使用Spark的各項功能滚澜,這邊不妨對照Http Session嫁怀, 在此Spark就在充當(dāng)Web service的角色,程序調(diào)用Spark功能的時候需要先建立一個Session萝招。因此看到getOrCreate()就很容易理解了即寒, 表明可以視情況新建session或利用已有的session召噩。
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()
既然將Spark 想象成一個Web server具滴, 也就意味著可能用多個訪問在進(jìn)行,為了便于監(jiān)控管理周蹭, 對應(yīng)用命名一個恰當(dāng)?shù)拿Q是個好辦法疲恢。Web UI并不是本文的重點(diǎn),有興趣的同學(xué)可以參考 ?Spark Application’s Web Console
加載數(shù)據(jù)
在建立SparkSession之后棚愤, 就是讀入數(shù)據(jù)并寫入到Dateset中杂数。
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
為了更好的分解執(zhí)行過程揍移,是時候借助PySpark了, PySpark是python調(diào)用Spark的 API那伐,它可以啟動一個交互式Python Shell。為了方便腳本調(diào)試畅形,暫時切換到Linux執(zhí)行
# pyspark
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.
>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')
>>> type(ds)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> print ds
DataFrame[value: string]
>>> lines = ds.rdd
交互式Shell的好處是可以方便的查看變量內(nèi)容和類型束亏。此刻文件a.txt已經(jīng)加載到lines中碍遍,它是RDD(Resilient Distributed Datasets)彈性分布式數(shù)據(jù)集的實例。
RDD操作
RDD在內(nèi)存中的結(jié)構(gòu)可以參考論文揣炕, 理解RDD有兩點(diǎn)比較重要:
一是RDD一種只讀东跪、只能由已存在的RDD變換而來的共享內(nèi)存,然后將所有數(shù)據(jù)都加載到內(nèi)存中丁恭,方便進(jìn)行多次重用牲览。
二是RDD的數(shù)據(jù)默認(rèn)情況下存放在集群中不同節(jié)點(diǎn)的內(nèi)存中恶守,本身提供了容錯性,可以自動從節(jié)點(diǎn)失敗中恢復(fù)過來庸毫。即如果某個節(jié)點(diǎn)上的RDD partition飒赃,因為節(jié)點(diǎn)故障橡伞,導(dǎo)致數(shù)據(jù)丟了晋被,那么RDD會自動通過自己的數(shù)據(jù)來源重新計算該partition。
為了探究RDD內(nèi)部的數(shù)據(jù)內(nèi)容挂脑,可以利用collect()函數(shù), 它能夠以數(shù)組的形式肋联,返回RDD數(shù)據(jù)集的所有元素刁俭。
>>> lines = ds.rdd
>>> for i in lines.collect():
... print i
...
Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')
lines存儲的是Row object類型牍戚,而我們希望的是對String類型進(jìn)行處理,所以需要利用map api進(jìn)一步轉(zhuǎn)換RDD
>>> lines_map = lines.map(lambda x: x[0])
>>> for i in lines_map.collect():
... print i
...
These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.
為了統(tǒng)計每個單詞的出現(xiàn)頻率宪哩,需要對每個單詞分別統(tǒng)計锁孟,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來茁瘦,并為每個詞設(shè)置一個計數(shù)器。比如 These出現(xiàn)次數(shù)是1, 我們期望的數(shù)據(jù)結(jié)構(gòu)是['There', 1]桑包。但是如何將包含字符串的RDD轉(zhuǎn)換成元素為類似 ['There', 1] 的RDD呢哑了?
>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))
>>> rdd_map = flat_map.map(lambda x: [x, 1])
>>> for i in rdd_map.collect():
... print i
...
[u'These', 1]
[u'examples', 1]
[u'give', 1]
[u'a', 1]
[u'quick', 1]
下圖簡要的講述了flatMap 和 map的轉(zhuǎn)換過程烧颖。
不難看出炕淮,map api只是為所有出現(xiàn)的單詞初始化了計數(shù)器為1,并沒有統(tǒng)計相同詞们镜,接下來這個任務(wù)由reduceByKey()來完成。在rdd_map 中模狭,所有的詞被視為一個key嚼鹉,而key相同的value則執(zhí)行reduceByKey內(nèi)的算子操作,因為統(tǒng)計相同key是累加操作锚赤,所以可以直接add操作线脚。
>>> from operator import add
>>> add_map = rdd_map.reduceByKey(add)
>>> for i in add_map.collect():
... print i
...
(u'a', 1)
(u'on', 1)
(u'of', 2)
(u'arbitrary', 1)
(u'quick', 1)
(u'the', 2)
(u'or', 1)
>>> print rdd_map.count()
26
>>> print add_map.count()
23
根據(jù)a.txt 的內(nèi)容浑侥,可知只有 of 和 the 兩個單詞出現(xiàn)了兩次,符合預(yù)期蠢莺。
總結(jié)
以上的分解步驟零如,可以幫我們理解RDD的操作考蕾,需要提示的是,RDD將操作分為兩類:transformation與action蚯窥。無論執(zhí)行了多少次transformation操作塞帐,RDD都不會真正執(zhí)行運(yùn)算葵姥,只有當(dāng)action操作被執(zhí)行時,運(yùn)算才會觸發(fā)允乐。也就是說牍疏,上面所有的RDD都是通過collect()觸發(fā)的拨齐, 那么如果將上述的transformation放入一條簡練語句中, 則展現(xiàn)為原始wordcount.py的書寫形式奏黑。
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
而真正的action 則是由collect()完成编矾。
output = counts.collect()
至此,已經(jīng)完成了對wordcount.py的深入剖析蹂匹,但是有意的忽略了一些更底層的執(zhí)行過程,比如DAG, stage, 以及Driver程序忍啸。在下一章繼續(xù)講解计雌。