pyspark_2_入門篇(編寫我們的第一個(gè)程序WordCount)

跟著Leo學(xué)習(xí)PySpark

chapter2——編寫我們的第一個(gè)程序WordCount

上一章我們大致講了一下pyspark的基本理論和重要概念赢织,如果想系統(tǒng)化且更深入地理解spark中的概念妖爷,還請移步官方文檔烁兰,這一章丸冕,將用一個(gè)我們耳熟能詳?shù)腤ordCount小例子嚎研,零距離感受下pyspark的簡單使用

from pyspark import SparkContext, SparkConf

# 編寫Spark程序做的第一件事是創(chuàng)建一個(gè)SparkContext對象筒繁,該對象告訴Spark如何訪問集群许饿。
# 要?jiǎng)?chuàng)建SparkContext阳欲,首先需要?jiǎng)?chuàng)建一個(gè)SparkConf對象,該對象包含有關(guān)您的應(yīng)用程序的信息。

# conf = SparkConf().setAppName(appName).setMaster(master)
# sc = SparkContext(conf=conf)

conf = SparkConf().setAppName("leo-study-spark").setMaster("local")
sc = SparkContext(conf=conf)

# 1. appName參數(shù)是您的應(yīng)用程序在群集UI上顯示的名稱胸完。
# 2. master是一個(gè)Spark书释,Mesos或YARN群集URL,或一個(gè)特殊的“l(fā)ocal”字符串赊窥,以本地模式運(yùn)行爆惧。
# 3. 對于本地測試和單元測試,您可以傳遞“ local”以在內(nèi)部運(yùn)行Spark

# spark程序圍繞RDD的概念展開锨能,創(chuàng)建RDD的方式有兩種:并行化驅(qū)動(dòng)程序中的現(xiàn)有集合扯再,或外部存儲系統(tǒng)

# 1. 所謂并行化驅(qū)動(dòng)程序中的現(xiàn)有集合,說白了址遇,就類似于本地的一個(gè)數(shù)組變量
# 2. 外部存儲系統(tǒng)可以是本地文件系統(tǒng)熄阻、HDFS
# 方式一:從本地?cái)?shù)組變量中創(chuàng)建一個(gè)RDD

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print type(rdd)

# rdd被創(chuàng)建后,就可以并行化處理倔约,列如我們可以調(diào)用map做一步轉(zhuǎn)換操作秃殉,
# 然后調(diào)用reduce聚合計(jì)算我們的數(shù)據(jù)集,最后使用print打印輸出浸剩。
result = rdd.map(lambda x: x+1).reduce(lambda a, b: a + b)
print result

# 可以看到輸出的結(jié)果是20
# 程序首先進(jìn)行了一個(gè)map轉(zhuǎn)換操作钾军,即對數(shù)據(jù)集中的每一個(gè)元素都加上1
# 其次,又對這個(gè)RDD進(jìn)行了reduce的累加操作绢要,最后輸出元素累加后的結(jié)果
<class 'pyspark.rdd.RDD'>
20
# 方式二:從本地文件系統(tǒng)中創(chuàng)建一個(gè)RDD吏恭,并演示我們今天的第一個(gè)入門級小程序,WordCount
# test.txt 文本內(nèi)容如下:
# I love china
# china is my home
# I love yyf
# yyf is a beautiful girl

file_path = "/Users/mac/software/conda-demo/test_data/test.txt"

# 從文件系統(tǒng)中創(chuàng)建RDD重罪,調(diào)用sc.textFile(filePath)

rdd = sc.textFile(file_path)

# textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*.gz")

print 'textFile 將文件映射成RDD樱哼,RDD中的每一個(gè)元素是文件中的一行內(nèi)容'
print rdd.collect()
print '--------------------------------------------------'

# flatMap 其實(shí)是先做了map操作,然后在此基礎(chǔ)上又做了一層合并操作剿配,大家可以看到

rdd = rdd.flatMap(lambda x: x.split(" "))

print rdd.collect()
print 'flatMap 先做了map操作搅幅,把RDD每一行內(nèi)容按空格分隔,映射成為一個(gè)字符串?dāng)?shù)組惨篱,再做了合并操作'
print '--------------------------------------------------'

rdd = rdd.map(lambda x:(x, 1))

print rdd.collect()
print "map 操作對RDD數(shù)據(jù)集中的每一個(gè)單詞進(jìn)行計(jì)數(shù)"
print '--------------------------------------------------'

rdd = rdd.reduceByKey(lambda x, y: x + y)
print 'reduceByKey 對數(shù)據(jù)集中每一個(gè)元組結(jié)構(gòu)的第一個(gè)元素盏筐,分組后進(jìn)行累加計(jì)算围俘,統(tǒng)計(jì)出文章中每個(gè)單詞出現(xiàn)的頻次砸讳,然后把結(jié)果輸出'

print rdd.collect()

textFile 將文件映射成RDD,RDD中的每一個(gè)元素是文件中的一行內(nèi)容
[u'I love china', u'china is my home', u'I love yyf', u'yyf is a beautiful girl']
--------------------------------------------------
[u'I', u'love', u'china', u'china', u'is', u'my', u'home', u'I', u'love', u'yyf', u'yyf', u'is', u'a', u'beautiful', u'girl']
flatMap 先做了map操作界牡,把RDD每一行內(nèi)容按空格分隔簿寂,映射成為一個(gè)字符串?dāng)?shù)組,再做了合并操作
--------------------------------------------------
[(u'I', 1), (u'love', 1), (u'china', 1), (u'china', 1), (u'is', 1), (u'my', 1), (u'home', 1), (u'I', 1), (u'love', 1), (u'yyf', 1), (u'yyf', 1), (u'is', 1), (u'a', 1), (u'beautiful', 1), (u'girl', 1)]
map 操作對RDD數(shù)據(jù)集中的每一個(gè)單詞進(jìn)行計(jì)數(shù)
--------------------------------------------------
reduceByKey 對數(shù)據(jù)集中每一個(gè)元組結(jié)構(gòu)的第一個(gè)元素宿亡,分組后進(jìn)行累加計(jì)算常遂,統(tǒng)計(jì)出文章中每個(gè)單詞出現(xiàn)的頻次,然后把結(jié)果輸出
[(u'a', 1), (u'beautiful', 1), (u'love', 2), (u'I', 2), (u'is', 2), (u'yyf', 2), (u'china', 2), (u'home', 1), (u'girl', 1), (u'my', 1)]
# 同樣演示基于本地集合的WordCount程序挽荠,依舊是同樣的輸入與輸出
data = ["I love china",
"china is my home",
"I love yyf",
"yyf is a beautiful girl"]

rdd = sc.parallelize(data)
print rdd.collect()

rdd = rdd.flatMap(lambda x: x.split(' '))

print rdd.collect()

rdd = rdd.map(lambda x: (x, 1))

print rdd.collect()

rdd = rdd.reduceByKey(lambda x, y: x + y)

print rdd.collect()
['I love china', 'china is my home', 'I love yyf', 'yyf is a beautiful girl']
['I', 'love', 'china', 'china', 'is', 'my', 'home', 'I', 'love', 'yyf', 'yyf', 'is', 'a', 'beautiful', 'girl']
[('I', 1), ('love', 1), ('china', 1), ('china', 1), ('is', 1), ('my', 1), ('home', 1), ('I', 1), ('love', 1), ('yyf', 1), ('yyf', 1), ('is', 1), ('a', 1), ('beautiful', 1), ('girl', 1)]
[('a', 1), ('beautiful', 1), ('love', 2), ('I', 2), ('is', 2), ('yyf', 2), ('china', 2), ('home', 1), ('girl', 1), ('my', 1)]
#  除了上述方式克胳,還有很多創(chuàng)建RDD的方式平绩,從HDFS文件系統(tǒng)中創(chuàng)建RDD,只用把file_path路徑換成我們的HDFS路徑即可
#到這里漠另,我們可以知曉捏雌,spark程序說白了就是對一個(gè)超大數(shù)據(jù)集(一臺機(jī)器跑不動(dòng)),轉(zhuǎn)換成可并行計(jì)算的RDD數(shù)據(jù)集笆搓,然后被分發(fā)到不同的計(jì)算節(jié)點(diǎn)
#去執(zhí)行性湿,經(jīng)過一系列的轉(zhuǎn)換操作,最終觸發(fā)Action操作满败,要么把計(jì)算結(jié)果收集肤频,要么輸出到其他存儲介質(zhì)中
#  顯式地聲明SparkContext的做法,在目前版本的Spark(2.x)中是不被推薦的算墨,pache Spark 2.0引入了SparkSession宵荒,
#為用戶提供了一個(gè)統(tǒng)一的切入點(diǎn)來使用Spark的各項(xiàng)功能,并且允許用戶通過它調(diào)用DataFrame和Dataset相關(guān)API來編寫Spark程序净嘀。最重要的是骇扇,
#它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互面粮,而且少孝,在之后的課程中也將采用這樣的方式去聲明spark操作對象。
#  
#  下面將演示通過SparkSession來實(shí)現(xiàn)同樣的WordCount功能熬苍,而不需要顯式地創(chuàng)建SparkConf稍走,SparkContext,
#因?yàn)檫@些對象已經(jīng)封裝在SparkSession中柴底。
#
#  使用生成器的設(shè)計(jì)模式(builder design pattern)婿脸,如果我們沒有創(chuàng)建SparkSession對象,
#則會實(shí)例化出一個(gè)新的SparkSession對象及其相關(guān)的上下文柄驻。
#  
#  一樣的輸入與輸出狐树,大家可以自己嘗試一步一步調(diào)試,再次感受spark程序的運(yùn)行規(guī)律鸿脓。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("leo-study-spark").getOrCreate()

rdd = spark.read.text('/Users/mac/software/conda-demo/test_data/test.txt').rdd.map(lambda x: x[0])
rdd = rdd.flatMap(lambda x: x.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

for (word, count) in rdd.collect():
    print "%s: %d" %(word, count)
    
spark.stop()
a: 1
beautiful: 1
love: 2
I: 2
is: 2
yyf: 2
china: 2
home: 1
girl: 1
my: 1
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末抑钟,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子野哭,更是在濱河造成了極大的恐慌在塔,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拨黔,死亡現(xiàn)場離奇詭異蛔溃,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門贺待,熙熙樓的掌柜王于貴愁眉苦臉地迎上來徽曲,“玉大人,你說我怎么就攤上這事麸塞∨蔽唬” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵喘垂,是天一觀的道長甜刻。 經(jīng)常有香客問我,道長正勒,這世上最難降的妖魔是什么得院? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮章贞,結(jié)果婚禮上祥绞,老公的妹妹穿的比我還像新娘。我一直安慰自己鸭限,他們只是感情好蜕径,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著败京,像睡著了一般兜喻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上赡麦,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天朴皆,我揣著相機(jī)與錄音,去河邊找鬼泛粹。 笑死遂铡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晶姊。 我是一名探鬼主播扒接,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼们衙!你這毒婦竟也來了钾怔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤砍艾,失蹤者是張志新(化名)和其女友劉穎蒂教,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體脆荷,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蜓谋。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片梦皮。...
    茶點(diǎn)故事閱讀 39,785評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖桃焕,靈堂內(nèi)的尸體忽然破棺而出剑肯,到底是詐尸還是另有隱情,我是刑警寧澤观堂,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布让网,位于F島的核電站,受9級特大地震影響师痕,放射性物質(zhì)發(fā)生泄漏溃睹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一胰坟、第九天 我趴在偏房一處隱蔽的房頂上張望因篇。 院中可真熱鬧,春花似錦笔横、人聲如沸竞滓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽商佑。三九已至,卻和暖如春厢塘,著一層夾襖步出監(jiān)牢的瞬間莉御,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工俗冻, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留礁叔,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓迄薄,卻偏偏與公主長得像琅关,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子讥蔽,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容