作者:陳華勇
評(píng)審人:宋雪菲,孔慶振
近些年來,隨著互聯(lián)網(wǎng)技術(shù)的高速發(fā)展众旗,數(shù)據(jù)量也在指數(shù)級(jí)增長(zhǎng)埠戳,繼而產(chǎn)生了大數(shù)據(jù)。大數(shù)據(jù)數(shù)據(jù)規(guī)模巨大,數(shù)據(jù)類型多樣怨绣,產(chǎn)生和處理速度極快角溃,價(jià)值巨大但是密度較低。如何使用這些大數(shù)據(jù)是近些年研究的重要內(nèi)容篮撑。spark就是處理大數(shù)據(jù)的一個(gè)重要的技術(shù)减细。本系列文章主要由淺入深,從基礎(chǔ)到復(fù)雜來介紹spark技術(shù)的各個(gè)方面赢笨。
本文簡(jiǎn)要介紹spark的基本組件未蝌,并從spark對(duì)數(shù)據(jù)的核心抽象——彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset)簡(jiǎn)稱RDD,來開始spark技術(shù)的學(xué)習(xí)茧妒。
一萧吠、spark及其組件簡(jiǎn)介
spark是一個(gè)用來實(shí)現(xiàn)快速而通用的集群計(jì)算的平臺(tái)。其擴(kuò)展了在大數(shù)據(jù)中廣泛使用的MapReduce計(jì)算模型桐筏,并且高效地支持更多的計(jì)算模式(例如:交互式查詢和流處理)纸型。適用于各種各樣原先需要多種不同的分布式平臺(tái)的場(chǎng)景,包括批處理九昧、迭代算法绊袋、流處理。此外铸鹰,spark提供了豐富的接口癌别,除了提供基于python、java蹋笼、Scala和SQL的api以及內(nèi)建的豐富的程序庫(kù)以外展姐,還能和其他大數(shù)據(jù)工具配合使用。
總的來說剖毯,spark是一個(gè)大一統(tǒng)的軟件棧圾笨,包含了多個(gè)緊密集成的組件。如圖1-1所示逊谋。接下里簡(jiǎn)單介紹部分重要組件的基本功能擂达。
1、Spark Core
spark core實(shí)現(xiàn)了spark的基本功能胶滋,包括任務(wù)調(diào)度板鬓、內(nèi)存管理、錯(cuò)誤恢復(fù)究恤、與存儲(chǔ)系統(tǒng)交互等模塊俭令。
2、Spark SQL
Spark SQL是spark用來操作結(jié)構(gòu)化數(shù)據(jù)的程序包部宿。通過該組件抄腔,我們可以使用SQL或者Apache Hive的SQL方言(HQL)來查詢數(shù)據(jù)。
3、Spark Streaming
Spark Streaming是用來對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行流式計(jì)算的組件赫蛇。
4绵患、MLlib
MLlib是spark為機(jī)器學(xué)習(xí)而生成的一個(gè)程序庫(kù)。提供了多種機(jī)器學(xué)習(xí)的算法棍掐,包括分類藏雏、回歸、聚類作煌,還提供了模型評(píng)估掘殴、數(shù)據(jù)導(dǎo)入等額外的支持功能。
5粟誓、GraphX
GraphX是用來處理圖數(shù)據(jù)的程序庫(kù)奏寨。
二、RDD編程
RDD(彈性分布式數(shù)據(jù)集)是spark對(duì)數(shù)據(jù)的抽象核心鹰服。在spark中病瞳,所有對(duì)數(shù)據(jù)的操作都是通過RDD來實(shí)現(xiàn)的。包括創(chuàng)建RDD悲酷、轉(zhuǎn)換已有RDD和調(diào)用RDD操作進(jìn)行求值套菜。在此過程中,用戶不需要考慮數(shù)據(jù)的集群?jiǎn)栴}设易,因?yàn)閟park會(huì)自動(dòng)地將RDD中的數(shù)據(jù)分發(fā)到集群上逗柴,并將操作并行化執(zhí)行。下面主要以python3的API為例介紹RDD的創(chuàng)建以及基本的操作顿肺。
1戏溺、RDD創(chuàng)建
RDD的創(chuàng)建有兩種方式:在驅(qū)動(dòng)程序中對(duì)一個(gè)集合進(jìn)行并行化;讀取外部數(shù)據(jù)集屠尊。
將一個(gè)已有的集合傳給SparkContext的parallelize()方法就可以創(chuàng)建一個(gè)簡(jiǎn)單的RDD旷祸。由于創(chuàng)建方式比較簡(jiǎn)單,這種方式在學(xué)習(xí)spark的時(shí)候用的比較多讼昆。但是在開發(fā)和測(cè)試的時(shí)候使用的并不是很多托享。具體創(chuàng)建方式如例2-1所示。
** 例2-1 >>>lines=sc.parallelize(["pandas","i like pandas"])**
開發(fā)過程中使用比較多的是從外部存儲(chǔ)中讀取數(shù)據(jù)來創(chuàng)建RDD浸赫。其中讀取文本文件的方法是SparkContext的textFile()方法闰围。創(chuàng)建方式如例2-2所示。
** 例2-2 >>>liens=sc.textFile("/path/example.txt")**
2掺炭、RDD操作
RDD支持兩種類型的操作:轉(zhuǎn)化操作和行動(dòng)操作。其中凭戴,轉(zhuǎn)化操作是返回一個(gè)新的RDD涧狮,而行動(dòng)操作是向驅(qū)動(dòng)器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng)的操作,會(huì)觸發(fā)實(shí)際的計(jì)算。
轉(zhuǎn)化操作
轉(zhuǎn)化操作轉(zhuǎn)化出來的RDD都是惰性求值的(惰性求值會(huì)在后面詳細(xì)介紹)者冤。只有在行動(dòng)操作中的這些轉(zhuǎn)化操作才會(huì)被真正執(zhí)行肤视。注意的是,許多的轉(zhuǎn)化操作都是各個(gè)元素的涉枫,每次只會(huì)操作RDD中的一個(gè)元素邢滑。當(dāng)然這得看轉(zhuǎn)化操作的具體實(shí)現(xiàn)。比較常見的轉(zhuǎn)化操作有:map()和filter()愿汰。下面以filter()為例介紹轉(zhuǎn)化操作困后。
filter()是一種篩選操作,可以將RDD中符合條件的元素提取出來生成一個(gè)新的RDD衬廷。例2-3中errorsRDD中就是從inputRDD中的篩選出有“error”字符串的行從而生成的新的RDD摇予。注意的是,filter()操作不會(huì)改變?cè)械膇nputRDD內(nèi)容吗跋,只是重新生成了一個(gè)RDD侧戴。
例2-3
>>>inputRDD=sc.textFile("log.txt")
>>>errorsRDD=inputRDD.filter(lambda x: "error" in x)
行動(dòng)操作
行動(dòng)操作需要生成實(shí)際的輸出,它會(huì)強(qiáng)制執(zhí)行那些求值必須用到的RDD的轉(zhuǎn)化操作跌宛。其中比較典型的就是count()方法酗宋。在例2-4中,errorsRDD為轉(zhuǎn)化操作中生成的RDD疆拘,調(diào)用count()時(shí)會(huì)返回該RDD中的元素的個(gè)數(shù)蜕猫。此時(shí),例2-3中的filter()才會(huì)被真正地執(zhí)行入问。這就是我們所說的惰性求值丹锹。在該例中還使用了take()方法獲取到RDD中的前10個(gè)元素,需要注意的是take()返回的結(jié)果是一個(gè)list芬失。
例2-4
>>>print("errors number is :"+str(errorsRDD.count()))
>>>for line in errorsRDD.take(10):
>>> print(line)
參考文獻(xiàn)
[1] Holden Karau , Andy Konwinski , Patrick Wendell , Matei Zaharia .Spark快速大數(shù)據(jù)分析[M].北京:人民郵電出版社,2015.9;