RDD分區(qū)
在分布式程序中,通信的代價(jià)是很大的睁枕,因此控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡(luò)傳輸可以極大地提升整體性能搜变。所以對(duì)RDD進(jìn)行分區(qū)的目的就是減少網(wǎng)絡(luò)傳輸?shù)拇鷥r(jià)以提高系統(tǒng)的性能凤巨。
RDD的特性
在講RDD分區(qū)之前,先說一下RDD的特性男摧。
RDD宾舅,全稱為Resilient Distributed Datasets,是一個(gè)容錯(cuò)的彩倚、并行的數(shù)據(jù)結(jié)構(gòu)筹我,可以讓用戶顯式地將數(shù)據(jù)存儲(chǔ)到磁盤和內(nèi)存中,并能控制數(shù)據(jù)的分區(qū)帆离。同時(shí)蔬蕊,RDD還提供了一組豐富的操作來操作這些數(shù)據(jù)。在這些操作中哥谷,諸如map岸夯、flatMap、filter等轉(zhuǎn)換操作實(shí)現(xiàn)了monad模式们妥,很好地契合了Scala的集合操作猜扮。除此之外,RDD還提供了諸如join监婶、groupBy旅赢、reduceByKey等更為方便的操作(注意齿桃,reduceByKey是action,而非transformation)煮盼,以支持常見的數(shù)據(jù)運(yùn)算短纵。
通常來講,針對(duì)數(shù)據(jù)處理有幾種常見模型僵控,包括:Iterative Algorithms香到,Relational Queries,MapReduce报破,Stream Processing悠就。例如Hadoop MapReduce采用了MapReduces模型,Storm則采用了Stream Processing模型充易。RDD混合了這四種模型理卑,使得Spark可以應(yīng)用于各種大數(shù)據(jù)處理場(chǎng)景。
RDD作為數(shù)據(jù)結(jié)構(gòu)蔽氨,本質(zhì)上是一個(gè)只讀的分區(qū)記錄集合藐唠。一個(gè)RDD可以包含多個(gè)分區(qū),每個(gè)分區(qū)就是一個(gè)dataset片段鹉究。RDD可以相互依賴宇立。如果RDD的每個(gè)分區(qū)最多只能被一個(gè)Child RDD的一個(gè)分區(qū)使用,則稱之為narrow dependency自赔;若多個(gè)Child RDD分區(qū)都可以依賴妈嘹,則稱之為wide dependency。不同的操作依據(jù)其特性绍妨,可能會(huì)產(chǎn)生不同的依賴润脸。例如map操作會(huì)產(chǎn)生narrow dependency,而join操作則產(chǎn)生wide dependency他去。
Spark之所以將依賴分為narrow與wide毙驯,基于兩點(diǎn)原因。
首先灾测,narrow dependencies可以支持在同一個(gè)cluster node上以管道形式執(zhí)行多條命令爆价,例如在執(zhí)行了map后,緊接著執(zhí)行filter媳搪。相反铭段,wide dependencies需要所有的父分區(qū)都是可用的,可能還需要調(diào)用類似MapReduce之類的操作進(jìn)行跨節(jié)點(diǎn)傳遞秦爆。
其次序愚,則是從失敗恢復(fù)的角度考慮。narrow dependencies的失敗恢復(fù)更有效等限,因?yàn)樗恍枰匦掠?jì)算丟失的parent partition即可爸吮,而且可以并行地在不同節(jié)點(diǎn)進(jìn)行重計(jì)算芬膝。而wide dependencies牽涉到RDD各級(jí)的多個(gè)Parent Partitions。下圖說明了narrow dependencies與wide dependencies之間的區(qū)別:
本圖來自Matei Zaharia撰寫的論文An Architecture for Fast and General Data Processing on Large Clusters拗胜。圖中,一個(gè)box代表一個(gè)RDD怒允,一個(gè)帶陰影的矩形框代表一個(gè)partition埂软。
另外給一張join的操作過程圖片:
RDD分區(qū)
我們分析這樣一個(gè)應(yīng)用,它在內(nèi)存中保存著一張很大的用戶信息表(UserData)——也就是一個(gè)由(UserId, UserInfo)對(duì)組成的RDD纫事,其中UserInfo包含一個(gè)該用戶所訂閱的主題列表勘畔。該應(yīng)用會(huì)周期性性地將這張表與一個(gè)小文件進(jìn)行組合,這個(gè)小文件中存著過去五分鐘內(nèi)發(fā)生的事件(events)——其實(shí)就是一個(gè)由(UserID, LinkInfo)對(duì)組成的表丽惶。如果我們要進(jìn)行對(duì)用戶訪問情況的統(tǒng)計(jì)炫七,就需要對(duì)這兩個(gè)表進(jìn)行join操作,以獲得(UserID,UserInfo,LinkInfo)信息钾唬。
如圖pic-2默認(rèn)情況下万哪,join操作會(huì)將兩個(gè)數(shù)據(jù)集中的所有的鍵的哈希值都求出來,將哈希值相同的記錄傳送到同一臺(tái)機(jī)器上抡秆,之后在該機(jī)器上對(duì)所有鍵相同的記錄進(jìn)行join操作奕巍。
所以這種情況之下,每次進(jìn)行join都會(huì)有數(shù)據(jù)混洗的問題儒士。造成了很大的網(wǎng)絡(luò)傳輸開銷的止。
這種情況之下由于UserData表比events表要大得多,所以選擇將UserData進(jìn)行分區(qū)着撩。如果對(duì)UserData進(jìn)行分區(qū)诅福,之后Spark就會(huì)知曉該RDD是根據(jù)鍵的哈希值來分區(qū)的,這樣在調(diào)用join()時(shí)拖叙,Spark就會(huì)利用這一點(diǎn)氓润。當(dāng)調(diào)用UserData.join(events)時(shí),Spark只會(huì)對(duì)events進(jìn)行數(shù)據(jù)混洗操作薯鳍,將events中特定的UserID的記錄發(fā)送到userData的對(duì)應(yīng)分區(qū)所在的那臺(tái)機(jī)器上旺芽。如下圖:
自定義分區(qū)
我們都知道Spark內(nèi)部提供了HashPartitioner和RangePartitioner兩種分區(qū)策略,這兩種分區(qū)策略在很多情況下都適合我們的場(chǎng)景辐啄。但是有些情況下采章,Spark內(nèi)部不能符合咱們的需求,這時(shí)候我們就可以自定義分區(qū)策略壶辜。為此悯舟,Spark提供了相應(yīng)的接口,我們只需要擴(kuò)展Partitioner抽象類砸民,然后實(shí)現(xiàn)里面的三個(gè)方法:
def numPartitions: Int:這個(gè)方法需要返回你想要?jiǎng)?chuàng)建分區(qū)的個(gè)數(shù)抵怎;
def getPartition(key: Any): Int:這個(gè)函數(shù)需要對(duì)輸入的key做計(jì)算奋救,然后返回該key的分區(qū)ID,范圍一定是0到numPartitions-1反惕;
equals():這個(gè)是Java標(biāo)準(zhǔn)的判斷相等的函數(shù)尝艘,之所以要求用戶實(shí)現(xiàn)這個(gè)函數(shù)是因?yàn)镾park內(nèi)部會(huì)比較兩個(gè)RDD的分區(qū)是否一樣。
假如我們想把來自同一個(gè)域名的URL放到一臺(tái)節(jié)點(diǎn)上姿染,比如:http://www.csdn.net和http://www.csdn.net/bolg/背亥,如果你使用HashPartitioner,這兩個(gè)URL的Hash值可能不一樣悬赏,這就使得這兩個(gè)URL被放到不同的節(jié)點(diǎn)上狡汉。所以這種情況下我們就需要自定義我們的分區(qū)策略,可以如下實(shí)現(xiàn):
因?yàn)閔ashCode值可能為負(fù)數(shù)闽颇,所以我們需要對(duì)他進(jìn)行處理盾戴。然后我們就可以在partitionBy()方法里面使用我們的分區(qū):
類似的,在Java中定義自己的分區(qū)策略和Scala類似兵多,只需要繼承org.apache.spark.Partitioner尖啡,并實(shí)現(xiàn)其中的方法即可。
在Python中剩膘,你不需要擴(kuò)展Partitioner類可婶,我們只需要對(duì)iteblog.partitionBy()加上一個(gè)額外的hash函數(shù),如下: