跟著Leo學(xué)習(xí)PySpark
chapter2——編寫我們的第一個(gè)程序WordCount
上一章我們大致講了一下pyspark的基本理論和重要概念赢织,如果想系統(tǒng)化且更深入地理解spark中的概念妖爷,還請移步官方文檔烁兰,這一章丸冕,將用一個(gè)我們耳熟能詳?shù)腤ordCount小例子嚎研,零距離感受下pyspark的簡單使用
from pyspark import SparkContext, SparkConf
# 編寫Spark程序做的第一件事是創(chuàng)建一個(gè)SparkContext對象筒繁,該對象告訴Spark如何訪問集群许饿。
# 要?jiǎng)?chuàng)建SparkContext阳欲,首先需要?jiǎng)?chuàng)建一個(gè)SparkConf對象,該對象包含有關(guān)您的應(yīng)用程序的信息。
# conf = SparkConf().setAppName(appName).setMaster(master)
# sc = SparkContext(conf=conf)
conf = SparkConf().setAppName("leo-study-spark").setMaster("local")
sc = SparkContext(conf=conf)
# 1. appName參數(shù)是您的應(yīng)用程序在群集UI上顯示的名稱胸完。
# 2. master是一個(gè)Spark书释,Mesos或YARN群集URL,或一個(gè)特殊的“l(fā)ocal”字符串赊窥,以本地模式運(yùn)行爆惧。
# 3. 對于本地測試和單元測試,您可以傳遞“ local”以在內(nèi)部運(yùn)行Spark
# spark程序圍繞RDD的概念展開锨能,創(chuàng)建RDD的方式有兩種:并行化驅(qū)動(dòng)程序中的現(xiàn)有集合扯再,或外部存儲系統(tǒng)
# 1. 所謂并行化驅(qū)動(dòng)程序中的現(xiàn)有集合,說白了址遇,就類似于本地的一個(gè)數(shù)組變量
# 2. 外部存儲系統(tǒng)可以是本地文件系統(tǒng)熄阻、HDFS
# 方式一:從本地?cái)?shù)組變量中創(chuàng)建一個(gè)RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print type(rdd)
# rdd被創(chuàng)建后,就可以并行化處理倔约,列如我們可以調(diào)用map做一步轉(zhuǎn)換操作秃殉,
# 然后調(diào)用reduce聚合計(jì)算我們的數(shù)據(jù)集,最后使用print打印輸出浸剩。
result = rdd.map(lambda x: x+1).reduce(lambda a, b: a + b)
print result
# 可以看到輸出的結(jié)果是20
# 程序首先進(jìn)行了一個(gè)map轉(zhuǎn)換操作钾军,即對數(shù)據(jù)集中的每一個(gè)元素都加上1
# 其次,又對這個(gè)RDD進(jìn)行了reduce的累加操作绢要,最后輸出元素累加后的結(jié)果
<class 'pyspark.rdd.RDD'>
20
# 方式二:從本地文件系統(tǒng)中創(chuàng)建一個(gè)RDD吏恭,并演示我們今天的第一個(gè)入門級小程序,WordCount
# test.txt 文本內(nèi)容如下:
# I love china
# china is my home
# I love yyf
# yyf is a beautiful girl
file_path = "/Users/mac/software/conda-demo/test_data/test.txt"
# 從文件系統(tǒng)中創(chuàng)建RDD重罪,調(diào)用sc.textFile(filePath)
rdd = sc.textFile(file_path)
# textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*.gz")
print 'textFile 將文件映射成RDD樱哼,RDD中的每一個(gè)元素是文件中的一行內(nèi)容'
print rdd.collect()
print '--------------------------------------------------'
# flatMap 其實(shí)是先做了map操作,然后在此基礎(chǔ)上又做了一層合并操作剿配,大家可以看到
rdd = rdd.flatMap(lambda x: x.split(" "))
print rdd.collect()
print 'flatMap 先做了map操作搅幅,把RDD每一行內(nèi)容按空格分隔,映射成為一個(gè)字符串?dāng)?shù)組惨篱,再做了合并操作'
print '--------------------------------------------------'
rdd = rdd.map(lambda x:(x, 1))
print rdd.collect()
print "map 操作對RDD數(shù)據(jù)集中的每一個(gè)單詞進(jìn)行計(jì)數(shù)"
print '--------------------------------------------------'
rdd = rdd.reduceByKey(lambda x, y: x + y)
print 'reduceByKey 對數(shù)據(jù)集中每一個(gè)元組結(jié)構(gòu)的第一個(gè)元素盏筐,分組后進(jìn)行累加計(jì)算围俘,統(tǒng)計(jì)出文章中每個(gè)單詞出現(xiàn)的頻次砸讳,然后把結(jié)果輸出'
print rdd.collect()
textFile 將文件映射成RDD,RDD中的每一個(gè)元素是文件中的一行內(nèi)容
[u'I love china', u'china is my home', u'I love yyf', u'yyf is a beautiful girl']
--------------------------------------------------
[u'I', u'love', u'china', u'china', u'is', u'my', u'home', u'I', u'love', u'yyf', u'yyf', u'is', u'a', u'beautiful', u'girl']
flatMap 先做了map操作界牡,把RDD每一行內(nèi)容按空格分隔簿寂,映射成為一個(gè)字符串?dāng)?shù)組,再做了合并操作
--------------------------------------------------
[(u'I', 1), (u'love', 1), (u'china', 1), (u'china', 1), (u'is', 1), (u'my', 1), (u'home', 1), (u'I', 1), (u'love', 1), (u'yyf', 1), (u'yyf', 1), (u'is', 1), (u'a', 1), (u'beautiful', 1), (u'girl', 1)]
map 操作對RDD數(shù)據(jù)集中的每一個(gè)單詞進(jìn)行計(jì)數(shù)
--------------------------------------------------
reduceByKey 對數(shù)據(jù)集中每一個(gè)元組結(jié)構(gòu)的第一個(gè)元素宿亡,分組后進(jìn)行累加計(jì)算常遂,統(tǒng)計(jì)出文章中每個(gè)單詞出現(xiàn)的頻次,然后把結(jié)果輸出
[(u'a', 1), (u'beautiful', 1), (u'love', 2), (u'I', 2), (u'is', 2), (u'yyf', 2), (u'china', 2), (u'home', 1), (u'girl', 1), (u'my', 1)]
# 同樣演示基于本地集合的WordCount程序挽荠,依舊是同樣的輸入與輸出
data = ["I love china",
"china is my home",
"I love yyf",
"yyf is a beautiful girl"]
rdd = sc.parallelize(data)
print rdd.collect()
rdd = rdd.flatMap(lambda x: x.split(' '))
print rdd.collect()
rdd = rdd.map(lambda x: (x, 1))
print rdd.collect()
rdd = rdd.reduceByKey(lambda x, y: x + y)
print rdd.collect()
['I love china', 'china is my home', 'I love yyf', 'yyf is a beautiful girl']
['I', 'love', 'china', 'china', 'is', 'my', 'home', 'I', 'love', 'yyf', 'yyf', 'is', 'a', 'beautiful', 'girl']
[('I', 1), ('love', 1), ('china', 1), ('china', 1), ('is', 1), ('my', 1), ('home', 1), ('I', 1), ('love', 1), ('yyf', 1), ('yyf', 1), ('is', 1), ('a', 1), ('beautiful', 1), ('girl', 1)]
[('a', 1), ('beautiful', 1), ('love', 2), ('I', 2), ('is', 2), ('yyf', 2), ('china', 2), ('home', 1), ('girl', 1), ('my', 1)]
# 除了上述方式克胳,還有很多創(chuàng)建RDD的方式平绩,從HDFS文件系統(tǒng)中創(chuàng)建RDD,只用把file_path路徑換成我們的HDFS路徑即可
#到這里漠另,我們可以知曉捏雌,spark程序說白了就是對一個(gè)超大數(shù)據(jù)集(一臺機(jī)器跑不動(dòng)),轉(zhuǎn)換成可并行計(jì)算的RDD數(shù)據(jù)集笆搓,然后被分發(fā)到不同的計(jì)算節(jié)點(diǎn)
#去執(zhí)行性湿,經(jīng)過一系列的轉(zhuǎn)換操作,最終觸發(fā)Action操作满败,要么把計(jì)算結(jié)果收集肤频,要么輸出到其他存儲介質(zhì)中
# 顯式地聲明SparkContext的做法,在目前版本的Spark(2.x)中是不被推薦的算墨,pache Spark 2.0引入了SparkSession宵荒,
#為用戶提供了一個(gè)統(tǒng)一的切入點(diǎn)來使用Spark的各項(xiàng)功能,并且允許用戶通過它調(diào)用DataFrame和Dataset相關(guān)API來編寫Spark程序净嘀。最重要的是骇扇,
#它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互面粮,而且少孝,在之后的課程中也將采用這樣的方式去聲明spark操作對象。
#
# 下面將演示通過SparkSession來實(shí)現(xiàn)同樣的WordCount功能熬苍,而不需要顯式地創(chuàng)建SparkConf稍走,SparkContext,
#因?yàn)檫@些對象已經(jīng)封裝在SparkSession中柴底。
#
# 使用生成器的設(shè)計(jì)模式(builder design pattern)婿脸,如果我們沒有創(chuàng)建SparkSession對象,
#則會實(shí)例化出一個(gè)新的SparkSession對象及其相關(guān)的上下文柄驻。
#
# 一樣的輸入與輸出狐树,大家可以自己嘗試一步一步調(diào)試,再次感受spark程序的運(yùn)行規(guī)律鸿脓。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("leo-study-spark").getOrCreate()
rdd = spark.read.text('/Users/mac/software/conda-demo/test_data/test.txt').rdd.map(lambda x: x[0])
rdd = rdd.flatMap(lambda x: x.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
for (word, count) in rdd.collect():
print "%s: %d" %(word, count)
spark.stop()
a: 1
beautiful: 1
love: 2
I: 2
is: 2
yyf: 2
china: 2
home: 1
girl: 1
my: 1