PySpark進(jìn)階--深入剖析wordcount.py

前一章中,已經(jīng)對workcount.py 做了代碼注釋稀颁,但是對于初學(xué)者而言,難點(diǎn)在于能將RDD等抽象地數(shù)據(jù)結(jié)構(gòu)在大腦中呈現(xiàn)棱烂,對應(yīng)map粘昨, flatMap的轉(zhuǎn)換過程更是難于通過單步調(diào)試等手段觀察到程序內(nèi)部變量的賦值過程张肾。

在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內(nèi)部各種概念的面紗馁启。我們再次回顧wordcount.py代碼來回答如下問題

  1. 對于大多數(shù)語言的Hello Word示例惯疙,都有main()函數(shù)妖啥, wordcount.py的main函數(shù),或者說調(diào)用Spark的main() 在哪里

  2. 數(shù)據(jù)的讀入蒿偎,各個RDD數(shù)據(jù)如何轉(zhuǎn)換

  3. map與flatMap的工作機(jī)制诉位,以及區(qū)別

  4. reduceByKey的作用

WordCount.py 的代碼如下:

from __future__ import print_function

import sys
from operator import add

# SparkSession:是一個對Spark的編程入口菜枷,取代了原本的SQLContext與HiveContext,方便調(diào)用Dataset和DataFrame API
# SparkSession可用于創(chuàng)建DataFrame岳瞭,將DataFrame注冊為表瞳筏,在表上執(zhí)行SQL枫耳,緩存表和讀取parquet文件。
from pyspark.sql import SparkSession


if __name__ == "__main__":

    # Python 常用的簡單參數(shù)傳入
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)
        
    # appName 為 Spark 應(yīng)用設(shè)定一個應(yīng)用名凄硼,改名會顯示在 Spark Web UI 上
    # 假如SparkSession 已經(jīng)存在就取得已存在的SparkSession捷沸,否則創(chuàng)建一個新的。
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()
        
    # 讀取傳入的文件內(nèi)容说墨,并寫入一個新的RDD實例lines中尼斧,此條語句所做工作有些多棺棵,不適合初學(xué)者熄捍,可以截成兩條語句以便理解。
    # map是一種轉(zhuǎn)換函數(shù)缚柏,將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素币喧。原始RDD中的數(shù)據(jù)項與新RDD中的數(shù)據(jù)項是一一對應(yīng)的關(guān)系缕陕。
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
   
    # flatMap與map類似扛邑,但每個元素輸入項都可以被映射到0個或多個的輸出項铐然,最終將結(jié)果”扁平化“后輸出 
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
                
    # collect() 在驅(qū)動程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回。 這在返回足夠小的數(shù)據(jù)子集的過濾器或其他操作之后通常是有用的沥阳。由于collect 是將整個RDD匯聚到一臺機(jī)子上自点,所以通常需要預(yù)估返回數(shù)據(jù)集的大小以免溢出。             
    output = counts.collect()
    
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

Spark 入口 SparkSession

Spark2.0中引入了SparkSession的概念溅潜,它為用戶提供了一個統(tǒng)一的切入點(diǎn)來使用Spark的各項功能滚澜,這邊不妨對照Http Session嫁怀, 在此Spark就在充當(dāng)Web service的角色,程序調(diào)用Spark功能的時候需要先建立一個Session萝招。因此看到getOrCreate()就很容易理解了即寒, 表明可以視情況新建session或利用已有的session召噩。

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

既然將Spark 想象成一個Web server具滴, 也就意味著可能用多個訪問在進(jìn)行,為了便于監(jiān)控管理周蹭, 對應(yīng)用命名一個恰當(dāng)?shù)拿Q是個好辦法疲恢。Web UI并不是本文的重點(diǎn),有興趣的同學(xué)可以參考 ?Spark Application’s Web Console

加載數(shù)據(jù)

在建立SparkSession之后棚愤, 就是讀入數(shù)據(jù)并寫入到Dateset中杂数。

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

為了更好的分解執(zhí)行過程揍移,是時候借助PySpark了, PySpark是python調(diào)用Spark的 API那伐,它可以啟動一個交互式Python Shell。為了方便腳本調(diào)試畅形,暫時切換到Linux執(zhí)行

# pyspark
Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.
>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')
>>> type(ds)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> print ds
DataFrame[value: string]
>>> lines = ds.rdd

交互式Shell的好處是可以方便的查看變量內(nèi)容和類型束亏。此刻文件a.txt已經(jīng)加載到lines中碍遍,它是RDD(Resilient Distributed Datasets)彈性分布式數(shù)據(jù)集的實例。

RDD操作

RDD在內(nèi)存中的結(jié)構(gòu)可以參考論文揣炕, 理解RDD有兩點(diǎn)比較重要:

一是RDD一種只讀东跪、只能由已存在的RDD變換而來的共享內(nèi)存,然后將所有數(shù)據(jù)都加載到內(nèi)存中丁恭,方便進(jìn)行多次重用牲览。

二是RDD的數(shù)據(jù)默認(rèn)情況下存放在集群中不同節(jié)點(diǎn)的內(nèi)存中恶守,本身提供了容錯性,可以自動從節(jié)點(diǎn)失敗中恢復(fù)過來庸毫。即如果某個節(jié)點(diǎn)上的RDD partition飒赃,因為節(jié)點(diǎn)故障橡伞,導(dǎo)致數(shù)據(jù)丟了晋被,那么RDD會自動通過自己的數(shù)據(jù)來源重新計算該partition。

為了探究RDD內(nèi)部的數(shù)據(jù)內(nèi)容挂脑,可以利用collect()函數(shù), 它能夠以數(shù)組的形式肋联,返回RDD數(shù)據(jù)集的所有元素刁俭。

>>> lines = ds.rdd
>>> for i in lines.collect():
...     print i
... 
Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')

lines存儲的是Row object類型牍戚,而我們希望的是對String類型進(jìn)行處理,所以需要利用map api進(jìn)一步轉(zhuǎn)換RDD

>>> lines_map = lines.map(lambda x: x[0])
>>> for i in lines_map.collect():
...     print i
... 
These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.

為了統(tǒng)計每個單詞的出現(xiàn)頻率宪哩,需要對每個單詞分別統(tǒng)計锁孟,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來茁瘦,并為每個詞設(shè)置一個計數(shù)器。比如 These出現(xiàn)次數(shù)是1, 我們期望的數(shù)據(jù)結(jié)構(gòu)是['There', 1]桑包。但是如何將包含字符串的RDD轉(zhuǎn)換成元素為類似 ['There', 1] 的RDD呢哑了?

>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))
>>> rdd_map = flat_map.map(lambda x: [x, 1])
>>> for i in rdd_map.collect():
...     print i
... 
[u'These', 1]
[u'examples', 1]
[u'give', 1]
[u'a', 1]
[u'quick', 1]

下圖簡要的講述了flatMap 和 map的轉(zhuǎn)換過程烧颖。

transfrom.png

不難看出炕淮,map api只是為所有出現(xiàn)的單詞初始化了計數(shù)器為1,并沒有統(tǒng)計相同詞们镜,接下來這個任務(wù)由reduceByKey()來完成。在rdd_map 中模狭,所有的詞被視為一個key嚼鹉,而key相同的value則執(zhí)行reduceByKey內(nèi)的算子操作,因為統(tǒng)計相同key是累加操作锚赤,所以可以直接add操作线脚。

>>> from operator import add
>>> add_map = rdd_map.reduceByKey(add)
>>> for i in add_map.collect():
...     print i
... 
(u'a', 1)
(u'on', 1)
(u'of', 2)
(u'arbitrary', 1)
(u'quick', 1)
(u'the', 2)
(u'or', 1)

>>> print rdd_map.count()
26
>>> print add_map.count()
23

根據(jù)a.txt 的內(nèi)容浑侥,可知只有 of 和 the 兩個單詞出現(xiàn)了兩次,符合預(yù)期蠢莺。

總結(jié)

以上的分解步驟零如,可以幫我們理解RDD的操作考蕾,需要提示的是,RDD將操作分為兩類:transformation與action蚯窥。無論執(zhí)行了多少次transformation操作塞帐,RDD都不會真正執(zhí)行運(yùn)算葵姥,只有當(dāng)action操作被執(zhí)行時,運(yùn)算才會觸發(fā)允乐。也就是說牍疏,上面所有的RDD都是通過collect()觸發(fā)的拨齐, 那么如果將上述的transformation放入一條簡練語句中, 則展現(xiàn)為原始wordcount.py的書寫形式奏黑。

counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)

而真正的action 則是由collect()完成编矾。

output = counts.collect()

至此,已經(jīng)完成了對wordcount.py的深入剖析蹂匹,但是有意的忽略了一些更底層的執(zhí)行過程,比如DAG, stage, 以及Driver程序忍啸。在下一章繼續(xù)講解计雌。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末玫霎,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子翁脆,更是在濱河造成了極大的恐慌鼻种,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件罢缸,死亡現(xiàn)場離奇詭異祖能,居然都是意外死亡养铸,警方通過查閱死者的電腦和手機(jī)轧膘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門鳞滨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拯啦,“玉大人,你說我怎么就攤上這事唁情〉槟瘢” “怎么了兵迅?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵刻恭,是天一觀的道長鳍贾。 經(jīng)常有香客問我勉抓,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任暇藏,我火速辦了婚禮盐碱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己收擦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布酣藻。 她就那樣靜靜地躺著,像睡著了一般送淆。 火紅的嫁衣襯著肌膚如雪偷崩。 梳的紋絲不亂的頭發(fā)上撞羽,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天诀紊,我揣著相機(jī)與錄音,去河邊找鬼笤喳。 笑死碌宴,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的贰镣。 我是一名探鬼主播碑隆,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼子姜!你這毒婦竟也來了哥捕?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤愧薛,失蹤者是張志新(化名)和其女友劉穎费奸,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡拒贱,死狀恐怖暖呕,靈堂內(nèi)的尸體忽然破棺而出葱淳,到底是詐尸還是另有隱情艳狐,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布刮便,位于F島的核電站,受9級特大地震影響搜贤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望掏湾。 院中可真熱鬧,春花似錦尊浪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瓦侮。三九已至狭魂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間魁莉,已是汗流浹背痹束。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留晒来,地道東北人接箫。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓废累,卻偏偏與公主長得像,于是被迫代替她去往敵國和親掖看。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容