3.2 彈性分布式數(shù)據(jù)集

3.2 彈性分布式數(shù)據(jù)集

本節(jié)簡單介紹RDD拜鹤,并介紹RDD與分布式共享內(nèi)存的異同徽千。

3.2.1 RDD簡介

在集群背后,有一個非常重要的分布式數(shù)據(jù)架構(gòu)被环,即彈性分布式數(shù)據(jù)集(resilient distributed dataset真竖,RDD)脐雪,它是邏輯集中的實體,在集群中的多臺機(jī)器上進(jìn)行了數(shù)據(jù)分區(qū)恢共。通過對多臺機(jī)器上不同RDD分區(qū)的控制战秋,就能夠減少機(jī)器之間的數(shù)據(jù)重排(data shuffling)。Spark提供了“partitionBy”運(yùn)算符讨韭,能夠通過集群中多臺機(jī)器之間對原始RDD進(jìn)行數(shù)據(jù)再分配來創(chuàng)建一個新的RDD脂信。RDD是Spark的核心數(shù)據(jù)結(jié)構(gòu),通過RDD的依賴關(guān)系形成Spark的調(diào)度順序透硝。通過對RDD的操作形成整個Spark程序狰闪。

(1)RDD的兩種創(chuàng)建方式

1)從Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲系統(tǒng),如Hive濒生、Cassandra埋泵、Hbase)輸入(如HDFS)創(chuàng)建。

2)從父RDD轉(zhuǎn)換得到新的RDD罪治。

(2)RDD的兩種操作算子

對于RDD可以有兩種計算操作算子:Transformation(變換)與Action(行動)丽声。

1)Transformation(變換)。

Transformation操作是延遲計算的规阀,也就是說從一個RDD轉(zhuǎn)換生成另一個RDD的轉(zhuǎn)換操作不是馬上執(zhí)行恒序,需要等到有Actions操作時瘦麸,才真正觸發(fā)運(yùn)算谁撼。

2)Action(行動)

Action算子會觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出到Spark系統(tǒng)滋饲。

(3)RDD的重要內(nèi)部屬性

1)分區(qū)列表厉碟。

2)計算每個分片的函數(shù)。

3)對父RDD的依賴列表屠缭。

4)對Key-Value 對數(shù)據(jù)類型RDD的分區(qū)器箍鼓,控制分區(qū)策略和分區(qū)數(shù)。

5)每個數(shù)據(jù)分區(qū)的地址列表(如HDFS上的數(shù)據(jù)塊的地址)呵曹。

3.2.2 RDD與分布式共享內(nèi)存的異同

RDD是一種分布式的內(nèi)存抽象款咖,表3-1列出了RDD與分布式共享內(nèi)存(Distributed Shared Memory何暮,DSM)的對比。在DSM系統(tǒng)[插圖]中铐殃,應(yīng)用可以向全局地址空間的任意位置進(jìn)行讀寫操作海洼。DSM是一種通用的內(nèi)存數(shù)據(jù)抽象,但這種通用性同時也使其在商用集群上實現(xiàn)有效的容錯性和一致性更加困難富腊。

RDD與DSM主要區(qū)別在于[插圖]坏逢,不僅可以通過批量轉(zhuǎn)換創(chuàng)建(即“寫”)RDD,還可以對任意內(nèi)存位置讀寫赘被。RDD限制應(yīng)用執(zhí)行批量寫操作是整,這樣有利于實現(xiàn)有效的容錯。特別是民假,由于RDD可以使用Lineage(血統(tǒng))來恢復(fù)分區(qū)浮入,基本沒有檢查點開銷。失效時只需要重新計算丟失的那些RDD分區(qū)阳欲,就可以在不同節(jié)點上并行執(zhí)行舵盈,而不需要回滾(Roll Back)整個程序。

表3-1 RDD與DSM的對比

[插圖]

通過備份任務(wù)的復(fù)制球化,RDD還可以處理落后任務(wù)(即運(yùn)行很慢的節(jié)點)秽晚,這點與MapReduce類似,DSM則難以實現(xiàn)備份任務(wù)筒愚,因為任務(wù)及其副本均需讀寫同一個內(nèi)存位置的數(shù)據(jù)赴蝇。

與DSM相比,RDD模型有兩個優(yōu)勢巢掺。第一句伶,對于RDD中的批量操作,運(yùn)行時將根據(jù)數(shù)據(jù)存放的位置來調(diào)度任務(wù)陆淀,從而提高性能考余。第二,對于掃描類型操作轧苫,如果內(nèi)存不足以緩存整個RDD楚堤,就進(jìn)行部分緩存,將內(nèi)存容納不下的分區(qū)存儲到磁盤上含懊。

另外身冬,RDD支持粗粒度和細(xì)粒度的讀操作。RDD上的很多函數(shù)操作(如count和collect等)都是批量讀操作岔乔,即掃描整個數(shù)據(jù)集酥筝,可以將任務(wù)分配到距離數(shù)據(jù)最近的節(jié)點上。同時雏门,RDD也支持細(xì)粒度操作嘿歌,即在哈系停或范圍分區(qū)的RDD上執(zhí)行關(guān)鍵字查找。

后續(xù)將算子從兩個維度結(jié)合在3.3節(jié)對RDD算子進(jìn)行詳細(xì)介紹宙帝。

1)Transformations(變換)和Action(行動)算子維度阅束。

2)在Transformations算子中再將數(shù)據(jù)類型維度細(xì)分為:Value數(shù)據(jù)類型和Key-Value對數(shù)據(jù)類型的Transformations算子。Value型數(shù)據(jù)的算子封裝在RDD類中可以直接使用茄唐,Key-Value 對數(shù)據(jù)類型的算子封裝于PairRDDFunctions類中息裸,用戶需要引入import org.apache.spark.SparkContext._才能夠使用。進(jìn)行這樣的細(xì)分是由于不同的數(shù)據(jù)類型處理思想不太一樣沪编,同時有些算子是不同的呼盆。

3.2.3 Spark的數(shù)據(jù)存儲

Spark數(shù)據(jù)存儲的核心是彈性分布式數(shù)據(jù)集(RDD)。RDD可以被抽象地理解為一個大的數(shù)組(Array)蚁廓,但是這個數(shù)組是分布在集群上的访圃。邏輯上RDD的每個分區(qū)叫一個Partition。

在Spark的執(zhí)行過程中相嵌,RDD經(jīng)歷一個個的Transfomation算子之后腿时,最后通過Action算子進(jìn)行觸發(fā)操作。邏輯上每經(jīng)歷一次變換饭宾,就會將RDD轉(zhuǎn)換為一個新的RDD批糟,RDD之間通過Lineage產(chǎn)生依賴關(guān)系,這個關(guān)系在容錯中有很重要的作用看铆。變換的輸入和輸出都是RDD徽鼎。RDD會被劃分成很多的分區(qū)分布到集群的多個節(jié)點中。分區(qū)是個邏輯概念弹惦,變換前后的新舊分區(qū)在物理上可能是同一塊內(nèi)存存儲否淤。這是很重要的優(yōu)化,以防止函數(shù)式數(shù)據(jù)不變性(immutable)導(dǎo)致的內(nèi)存需求無限擴(kuò)張棠隐。有些RDD是計算的中間結(jié)果石抡,其分區(qū)并不一定有相應(yīng)的內(nèi)存或磁盤數(shù)據(jù)與之對應(yīng),如果要迭代使用數(shù)據(jù)助泽,可以調(diào)cache()函數(shù)緩存數(shù)據(jù)啰扛。

圖3-2為RDD的數(shù)據(jù)存儲模型。

[插圖]

圖3-2 RDD數(shù)據(jù)管理模型

圖3-2中的RDD_1含有5個分區(qū)(p1报咳、p2侠讯、p3挖藏、p4暑刃、p5),分別存儲在4個節(jié)點(Node1膜眠、node2岩臣、Node3溜嗜、Node4)中。RDD_2含有3個分區(qū)(p1架谎、p2炸宵、p3),分布在3個節(jié)點(Node1谷扣、Node2土全、Node3)中。

在物理上会涎,RDD對象實質(zhì)上是一個元數(shù)據(jù)結(jié)構(gòu)裹匙,存儲著Block、Node等的映射關(guān)系末秃,以及其他的元數(shù)據(jù)信息。一個RDD就是一組分區(qū),在物理數(shù)據(jù)存儲上留搔,RDD的每個分區(qū)對應(yīng)的就是一個Block译红,Block可以存儲在內(nèi)存,當(dāng)內(nèi)存不夠時可以存儲到磁盤上铃将。

每個Block中存儲著RDD所有數(shù)據(jù)項的一個子集项鬼,暴露給用戶的可以是一個Block的迭代器(例如,用戶可以通過mapPartitions獲得分區(qū)迭代器進(jìn)行操作)劲阎,也可以就是一個數(shù)據(jù)項(例如秃臣,通過map函數(shù)對每個數(shù)據(jù)項并行計算)。本書會在后面章節(jié)具體介紹數(shù)據(jù)管理的底層實現(xiàn)細(xì)節(jié)哪工。

如果是從HDFS等外部存儲作為輸入數(shù)據(jù)源奥此,數(shù)據(jù)按照HDFS中的數(shù)據(jù)分布策略進(jìn)行數(shù)據(jù)分區(qū),HDFS中的一個Block對應(yīng)Spark的一個分區(qū)雁比。同時Spark支持重分區(qū)稚虎,數(shù)據(jù)通過Spark默認(rèn)的或者用戶自定義的分區(qū)器決定數(shù)據(jù)塊分布在哪些節(jié)點。例如偎捎,支持Hash分區(qū)(按照數(shù)據(jù)項的Key值取Hash值蠢终,Hash值相同的元素放入同一個分區(qū)之內(nèi))和Range分區(qū)(將屬于同一數(shù)據(jù)范圍的數(shù)據(jù)放入同一分區(qū))等分區(qū)策略。

下面具體介紹這些算子的功能茴她。

3.3 Spark算子分類及功能

本節(jié)將主要介紹Spark算子的作用寻拂,以及算子的分類。

1.Saprk算子的作用

圖3-3描述了Spark的輸入丈牢、運(yùn)行轉(zhuǎn)換祭钉、輸出。在運(yùn)行轉(zhuǎn)換中通過算子對RDD進(jìn)行轉(zhuǎn)換己沛。算子是RDD中定義的函數(shù)慌核,可以對RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換和操作距境。

[插圖]

圖3-3 Spark算子和數(shù)據(jù)空間

1)輸入:在Spark程序運(yùn)行中,數(shù)據(jù)從外部數(shù)據(jù)空間(如分布式存儲:textFile讀取HDFS等垮卓,parallelize方法輸入Scala集合或數(shù)據(jù))輸入Spark垫桂,數(shù)據(jù)進(jìn)入Spark運(yùn)行時數(shù)據(jù)空間,轉(zhuǎn)化為Spark中的數(shù)據(jù)塊粟按,通過BlockManager進(jìn)行管理诬滩。

2)運(yùn)行:在Spark數(shù)據(jù)輸入形成RDD后便可以通過變換算子,如fliter等灭将,對數(shù)據(jù)進(jìn)行操作并將RDD轉(zhuǎn)化為新的RDD碱呼,通過Action算子,觸發(fā)Spark提交作業(yè)宗侦。如果數(shù)據(jù)需要復(fù)用愚臀,可以通過Cache算子,將數(shù)據(jù)緩存到內(nèi)存矾利。

3)輸出:程序運(yùn)行結(jié)束數(shù)據(jù)會輸出Spark運(yùn)行時空間姑裂,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS),或Scala數(shù)據(jù)或集合中(collect輸出到Scala集合男旗,count返回Scala int型數(shù)據(jù))舶斧。

Spark的核心數(shù)據(jù)模型是RDD,但RDD是個抽象類察皇,具體由各子類實現(xiàn)茴厉,如MappedRDD、ShuffledRDD等子類什荣。Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)化成為RDD的子類矾缓。

2.算子的分類

大致可以分為三大類算子。

1)Value數(shù)據(jù)類型的Transformation算子稻爬,這種變換并不觸發(fā)提交作業(yè)嗜闻,針對處理的數(shù)據(jù)項是Value型的數(shù)據(jù)。

2)Key-Value數(shù)據(jù)類型的Transfromation算子桅锄,這種變換并不觸發(fā)提交作業(yè)琉雳,針對處理的數(shù)據(jù)項是Key-Value型的數(shù)據(jù)對。

3)Action算子友瘤,這類算子會觸發(fā)SparkContext提交Job作業(yè)翠肘。

下面分別對這3類算子進(jìn)行詳細(xì)介紹。

3.3.1 Value型Transformation算子

處理數(shù)據(jù)類型為Value型的Transformation算子可以根據(jù)RDD變換算子的輸入分區(qū)與輸出分區(qū)關(guān)系分為以下幾種類型辫秧。

1)輸入分區(qū)與輸出分區(qū)一對一型束倍。

2)輸入分區(qū)與輸出分區(qū)多對一型。

3)輸入分區(qū)與輸出分區(qū)多對多型。

4)輸出分區(qū)為輸入分區(qū)子集型肌幽。

5)還有一種特殊的輸入與輸出分區(qū)一對一的算子類型:Cache型。Cache算子對RDD分區(qū)進(jìn)行緩存抓半。

1.輸入分區(qū)與輸出分區(qū)一對一型

(1)map

將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素喂急。源碼中的map算子相當(dāng)于初始化一個RDD,新RDD叫作MappedRDD(this,sc.clean(f))笛求。

圖3-4中的每個方框表示一個RDD分區(qū)廊移,左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù)f:T->U映射為右側(cè)的新的RDD分區(qū)。但是實際只有等到Action算子觸發(fā)后探入,這個f函數(shù)才會和其他函數(shù)在一個Stage中對數(shù)據(jù)進(jìn)行運(yùn)算狡孔。V1輸入f轉(zhuǎn)換輸出V’1。

[插圖]

圖3-4 map算子對RDD轉(zhuǎn)換

(2)flatMap

將原來RDD中的每個元素通過函數(shù)f轉(zhuǎn)換為新的元素蜂嗽,并將生成的RDD的每個集合中的元素合并為一個集合苗膝。內(nèi)部創(chuàng)建 FlatMappedRDD(this,sc.clean(f))。

圖3-5中小方框表示RDD的一個分區(qū)植旧,對分區(qū)進(jìn)行flatMap函數(shù)操作辱揭,flatMap中傳入的函數(shù)為f:T->U,T和U可以是任意的數(shù)據(jù)類型病附。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)问窃。外部大方框可以認(rèn)為是一個RDD分區(qū),小方框代表一個集合完沪。V1域庇、V2、V3在一個集合作為RDD的一個數(shù)據(jù)項覆积,轉(zhuǎn)換為V’1听皿、V’2、V’3后宽档,將結(jié)合拆散写穴,形成為RDD中的數(shù)據(jù)項。

[插圖]

圖3-5 flapMap算子對RDD轉(zhuǎn)換

(3)mapPartitions

mapPartitions函數(shù)獲取到每個分區(qū)的迭代器雌贱,在函數(shù)中通過這個分區(qū)整體的迭代器對整個分區(qū)的元素進(jìn)行操作啊送。內(nèi)部實現(xiàn)是生成MapPartitionsRDD。圖3-6中的方框代表一個RDD分區(qū)欣孤。

圖3-6中馋没,用戶通過函數(shù)f(iter)=>iter.filter(_>=3)對分區(qū)中的所有數(shù)據(jù)進(jìn)行過濾,>=3的數(shù)據(jù)保留降传。一個方塊代表一個RDD分區(qū)篷朵,含有1、2、3的分區(qū)過濾只剩下元素3声旺。

[插圖]

圖3-6 mapPartitions算子對RDD轉(zhuǎn)換

(4)glom

glom函數(shù)將每個分區(qū)形成一個數(shù)組笔链,內(nèi)部實現(xiàn)是返回的GlommedRDD。圖3-7中的每個方框代表一個RDD分區(qū)腮猖。

圖3-7中的方框代表一個分區(qū)鉴扫。該圖表示含有V1、V2澈缺、V3的分區(qū)通過函數(shù)glom形成一個數(shù)組Array[(V1),(V2),(V3)]坪创。

[插圖]

圖3-7 glom算子對RDD轉(zhuǎn)換

2.輸入分區(qū)與輸出分區(qū)多對一型

(1)union

使用union函數(shù)時需要保證兩個RDD元素的數(shù)據(jù)類型相同,返回的RDD數(shù)據(jù)類型和被合并的RDD元素數(shù)據(jù)類型相同姐赡,并不進(jìn)行去重操作莱预,保存所有元素。如果想去重项滑,可以使用distinct()依沮。++符號相當(dāng)于uion函數(shù)操作。

圖3-8中左側(cè)的大方框代表兩個RDD枪狂,大方框內(nèi)的小方框代表RDD的分區(qū)悉抵。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)摘完。含有V1姥饰,V2…U4的RDD和含有V1,V8…U8的RDD合并所有元素形成一個RDD孝治。V1列粪、V1、V2谈飒、V8形成一個分區(qū)岂座,其他元素同理進(jìn)行合并。

(2)cartesian

對兩個RDD內(nèi)的所有元素進(jìn)行笛卡爾積操作杭措。操作后费什,內(nèi)部實現(xiàn)返回CartesianRDD。圖3-9中左側(cè)的大方框代表兩個RDD手素,大方框內(nèi)的小方框代表RDD的分區(qū)鸳址。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)泉懦。

[插圖]

圖3-8 union算子對RDD轉(zhuǎn)換

圖3-9中的大方框代表RDD稿黍,大方框中的小方框代表RDD分區(qū)。例如崩哩,V1和另一個RDD中的W1巡球、W2言沐、Q5進(jìn)行笛卡爾積運(yùn)算形成(V1,W1)、(V1,W2)酣栈、(V1,Q5)险胰。

[插圖]

圖3-9 cartesian算子對RDD轉(zhuǎn)換

3.輸入分區(qū)與輸出分區(qū)多對多型

groupBy:將元素通過函數(shù)生成相應(yīng)的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value 格式矿筝,之后將Key相同的元素分為一組起便。

函數(shù)實現(xiàn)如下。

①sc.clean()函數(shù)將用戶函數(shù)預(yù)處理:

val cleanF=sc.clean(f)

②對數(shù)據(jù)map進(jìn)行函數(shù)操作跋涣,最后再對groupByKey進(jìn)行分組操作缨睡。

this.map(t=>(cleanF(t),t)).groupByKey(p)

其中鸟悴,p中確定了分區(qū)個數(shù)和分區(qū)函數(shù)陈辱,也就決定了并行化的程度。圖3-10中的方框代表RDD分區(qū)细诸。

圖3-10中的方框代表一個RDD分區(qū)沛贪,相同key的元素合并到一個組。例如震贵,V1利赋,V2合并為一個Key-Value對,其中key為“V”猩系,Value為“V1,V2”媚送,形成V,Seq(V1,V2)。

[插圖]

圖3-10 groupBy算子對RDD轉(zhuǎn)換

4.輸出分區(qū)為輸入分區(qū)子集型

(1)filter

filter的功能是對元素進(jìn)行過濾寇甸,對每個元素應(yīng)用f函數(shù)塘偎,返回值為true的元素在RDD中保留,返回為false的將過濾掉拿霉。內(nèi)部實現(xiàn)相當(dāng)于生成FilteredRDD(this吟秩,sc.clean(f))。

下面代碼為函數(shù)的本質(zhì)實現(xiàn)绽淘。

def filter(f:T=>Boolean):RDD[T]=new FilteredRDD(this,sc.clean(f))

圖3-11中的每個方框代表一個RDD分區(qū)涵防。T可以是任意的類型。通過用戶自定義的過濾函數(shù)f沪铭,對每個數(shù)據(jù)項進(jìn)行操作壮池,將滿足條件,返回結(jié)果為true的數(shù)據(jù)項保留杀怠。例如火窒,過濾掉V2、V3保留了V1驮肉,將區(qū)分命名為V1'熏矿。

[插圖]

圖3-11 filter算子對RDD轉(zhuǎn)換

(2)distinct

distinct將RDD中的元素進(jìn)行去重操作。圖3-12中的方框代表RDD分區(qū)。

圖3-12中的每個方框代表一個分區(qū)票编,通過distinct函數(shù)褪储,將數(shù)據(jù)去重。例如慧域,重復(fù)數(shù)據(jù)V1鲤竹、V1去重后只保留一份V1。

[插圖]

圖3-12 distinct算子對RDD轉(zhuǎn)換

(3)subtract

subtract相當(dāng)于進(jìn)行集合的差操作昔榴,RDD 1去除RDD 1和RDD 2交集中的所有元素辛藻。

圖3-13中左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)互订。右側(cè)大方框代表合并后的RDD吱肌,大方框內(nèi)的小方框代表分區(qū)。V1在兩個RDD中均有仰禽,根據(jù)差集運(yùn)算規(guī)則氮墨,新RDD不保留,V2在第一個RDD有吐葵,第二個RDD沒有规揪,則在新RDD元素中包含V2。

[插圖]

圖3-13 subtract算子對RDD轉(zhuǎn)換

(4)sample

sample將RDD這個集合內(nèi)的元素進(jìn)行采樣温峭,獲取所有元素的子集猛铅。用戶可以設(shè)定是否有放回的抽樣、百分比凤藏、隨機(jī)種子奸忽,進(jìn)而決定采樣方式。

內(nèi)部實現(xiàn)是生成SampledRDD(withReplacement,fraction,seed)清笨。

函數(shù)參數(shù)設(shè)置如下月杉。

□withReplacement=true,表示有放回的抽樣抠艾;

□withReplacement=false苛萎,表示無放回的抽樣。

圖3-14中的每個方框是一個RDD分區(qū)检号。通過sample函數(shù)腌歉,采樣50%的數(shù)據(jù)。V1齐苛、V2翘盖、U1、U2凹蜂、U3馍驯、U4采樣出數(shù)據(jù)V1和U1阁危、U2,形成新的RDD汰瘫。

(5)takeSample

takeSample()函數(shù)和上面的sample函數(shù)是一個原理狂打,但是不使用相對比例采樣,而是按設(shè)定的采樣個數(shù)進(jìn)行采樣混弥,同時返回結(jié)果不再是RDD趴乡,而是相當(dāng)于對采樣后的數(shù)據(jù)進(jìn)行Collect(),返回結(jié)果的集合為單機(jī)的數(shù)組蝗拿。

圖3-15中左側(cè)的方框代表分布式的各個節(jié)點上的分區(qū)晾捏,右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組。通過takeSample對數(shù)據(jù)采樣哀托,設(shè)置為采樣一份數(shù)據(jù)惦辛,返回結(jié)果為V1。

[插圖]

圖3-14 sample算子對RDD轉(zhuǎn)換

5.Cache型

(1)cache

cache將RDD元素從磁盤緩存到內(nèi)存萤捆,相當(dāng)于persist(MEMORY_ONLY)函數(shù)的功能裙品。圖3-14中的方框代表RDD分區(qū)俗批。

圖3-16中的每個方框代表一個RDD分區(qū)俗或,左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲在磁盤,通過cache算子將數(shù)據(jù)緩存在內(nèi)存岁忘。

[插圖]

圖3-15 takeSample算子對RDD轉(zhuǎn)換

[插圖]

圖3-16 cache算子對RDD轉(zhuǎn)換

(2)persist

persist函數(shù)對RDD進(jìn)行緩存操作辛慰。數(shù)據(jù)緩存在哪里由StorageLevel枚舉類型確定。有以下幾種類型的組合(見圖3-15)干像,DISK代表磁盤帅腌,MEMORY代表內(nèi)存,SER代表數(shù)據(jù)是否進(jìn)行序列化存儲麻汰。

下面為函數(shù)定義速客,StorageLevel是枚舉類型,代表存儲模式五鲫,用戶可以通過圖3-17按需選擇溺职。

persist(newLevel:Stor ageLevel)

圖3-17中列出persist函數(shù)可以緩存的模式。例如位喂,MEMORY_AND_DISK_SER代表數(shù)據(jù)可以存儲在內(nèi)存和磁盤浪耘,并且以序列化的方式存儲。其他同理塑崖。

[插圖]

圖3-17 persist算子對RDD轉(zhuǎn)換

圖3-18中的方框代表RDD分區(qū)七冲。disk代表存儲在磁盤,mem代表存儲在內(nèi)存规婆。數(shù)據(jù)最初全部存儲在磁盤澜躺,通過persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存蝉稳,但是有的分區(qū)無法容納在內(nèi)存,例如:圖3-18中將含有V1,V2,V3的RDD存儲到磁盤掘鄙,將含有U1颠区,U2的RDD仍舊存儲在內(nèi)存。

[插圖]

圖3-18 Persist算子對RDD轉(zhuǎn)換

3.3.2 Key-Value型Transformation算子

Transformation處理的數(shù)據(jù)為Key-Value形式的算子通铲,大致可以分為3種類型:輸入分區(qū)與輸出分區(qū)一對一毕莱、聚集、連接操作颅夺。

1.輸入分區(qū)與輸出分區(qū)一對一

mapValues:針對(Key,Value)型數(shù)據(jù)中的 Value進(jìn)行Map操作朋截,而不對Key進(jìn)行處理。

圖3-19中的方框代表RDD分區(qū)吧黄。a=>a+2代表只對(V1,1)數(shù)據(jù)中的1進(jìn)行加2操作部服,返回結(jié)果為3。

[插圖]

圖3-19 mapValues算子RDD對轉(zhuǎn)換

2.對單個RDD或兩個RDD聚集

(1)單個RDD聚集

1)combineByKey拗慨。

定義combineByKey算子的代碼如下廓八。

combineByKey[C](createCombiner:(V)? C,

mergeValue:(C赵抢,V)? C剧蹂,

mergeCombiners:(C,C)? C烦却,

partitioner:Partitioner

mapSideCombine:Boolean=true宠叼,

serializer:Serializer=null):RDD[(K,C)]

說明:

□createCombiner:V=>C,在C不存在的情況下其爵,如通過V創(chuàng)建seq C冒冬。

□mergeValue:(C,V)=>C,當(dāng)C已經(jīng)存在的情況下摩渺,需要merge简烤,如把item V加到seq C中,或者疊加摇幻。

□mergeCombiners:(C,C)=>C横侦,合并兩個C。

□partitioner:Partitioner(分區(qū)器)囚企,Shuffle時需要通過Partitioner的分區(qū)策略進(jìn)行分區(qū)丈咐。

□mapSideCombine:Boolean=true,為了減小傳輸量龙宏,很多combine可以在map端先做棵逊。例如,疊加可以先在一個partition中把所有相同的Key的Value疊加银酗,再shuffle辆影。

□serializerClass:String=null徒像,傳輸需要序列化,用戶可以自定義序列化類蛙讥。

例如锯蛀,相當(dāng)于將元素為(Int,Int)的RDD轉(zhuǎn)變?yōu)榱耍↖nt,Seq[Int])類型元素的RDD次慢。

圖3-20中的方框代表RDD分區(qū)旁涤。通過combineByKey,將(V1,2)迫像、(V1,1)數(shù)據(jù)合并為(V1,Seq(2,1))劈愚。

[插圖]

圖3-20 comBineByKey算子對RDD轉(zhuǎn)換

2)reduceByKey。

reduceByKey是更簡單的一種情況闻妓,只是兩個值合并成一個值菌羽,所以createCombiner很簡單,就是直接返回v由缆,而mergeValue和mergeCombiners的邏輯相同注祖,沒有區(qū)別。

函數(shù)實現(xiàn)代碼如下均唉。

def reduceByKey(partitioner:Partitioner是晨,func:(V,V)=>V):RDD[(K浸卦,V)]={

combineByKey[V]((v:V)=>v署鸡,func案糙,func限嫌,partitioner)

}

圖3-21中的方框代表RDD分區(qū)。通過用戶自定義函數(shù)(A,B)=>(A+B)时捌,將相同Key的數(shù)據(jù)(V1,2)怒医、(V1,1)的value相加,結(jié)果為(V1,3)奢讨。

[插圖]

圖3-21 reduceByKey算子對RDD轉(zhuǎn)換

3)partitionBy稚叹。

partitionBy函數(shù)對RDD進(jìn)行分區(qū)操作。

函數(shù)定義如下拿诸。

partitionBy(partitioner:Partitioner)

如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致扒袖,則不重分區(qū),如果不一致亩码,則相當(dāng)于根據(jù)分區(qū)器生成一個新的ShuffledRDD季率。

圖3-22中的方框代表RDD分區(qū)。通過新的分區(qū)策略將原來在不同分區(qū)的V1描沟、V2數(shù)據(jù)都合并到了一個分區(qū)飒泻。

[插圖]

圖3-22 partitionBy算子對RDD轉(zhuǎn)換

(2)對兩個RDD進(jìn)行聚集

cogroup函數(shù)將兩個RDD進(jìn)行協(xié)同劃分鞭光,cogroup函數(shù)的定義如下。

cogroup[W](other:RDD[(K,W)],numPartitions:Int):RDD[(K,(Iterable[V],Iterable[W]))]

對在兩個RDD中的Key-Value類型的元素泞遗,每個RDD相同Key的元素分別聚合為一個集合惰许,并且返回兩個RDD中對應(yīng)Key的元素集合的迭代器。

(K,(Iterable[V],Iterable[W]))

其中史辙,Key和Value汹买,Value是兩個RDD下相同Key的兩個數(shù)據(jù)集合的迭代器所構(gòu)成的元組。

圖3-23中的大方框代表RDD聊倔,大方框內(nèi)的小方框代表RDD中的分區(qū)卦睹。將RDD1中的數(shù)據(jù)(U1,1)、(U1,2)和RDD2中的數(shù)據(jù)(U1,2)合并為(U1,((1,2),(2)))方库。

[插圖]

圖3-23 Cogroup算子對RDD轉(zhuǎn)換

3.連接

(1)join

□oin對兩個需要連接的RDD進(jìn)行cogroup函數(shù)操作结序,cogroup原理請見上文。cogroup操作之后形成的新RDD纵潦,對每個key下的元素進(jìn)行笛卡爾積操作徐鹤,返回的結(jié)果再展平,對應(yīng)Key下的所有元組形成一個集合邀层,最后返回RDD[(K,(V,W))]

下面代碼為join的函數(shù)實現(xiàn)返敬,本質(zhì)是通過cogroup算子先進(jìn)行協(xié)同劃分,再通過flatMapValues將合并的數(shù)據(jù)打散寥院。

this.cogroup(other,partitioner).flatMapValues { case(vs,ws)=>

for(v <- vs;w <- ws)yield(v,w)}

圖3-24是對兩個RDD的join操作示意圖劲赠。大方框代表RDD,小方框代表RDD中的分區(qū)秸谢。函數(shù)對擁有相同Key的元素(例如V1)為Key凛澎,以做連接后的數(shù)據(jù)結(jié)果為(V1,(1,1))和(V1,(1,2))。

[插圖]

圖3-24 join算子對RDD轉(zhuǎn)換

(2)leftOutJoin和rightOutJoin

LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空估蹄,如果為空塑煎,則填充為空。如果不為空臭蚁,則將數(shù)據(jù)進(jìn)行連接運(yùn)算最铁,并返回結(jié)果。

下面代碼是leftOutJoin的實現(xiàn)垮兑。

if(ws.isEmpty){

vs.map(v=>(v,None))

}else {

for(v <- vs;w <- ws)yield(v,Some(w))

}

3.3.3 Actions算子

本質(zhì)上在Actions算子中通過SparkContext執(zhí)行提交作業(yè)的runJob操作冷尉,觸發(fā)了RDD DAG的執(zhí)行。

例如系枪,Actions算子collect函數(shù)的代碼如下雀哨,感興趣的讀者可以順著這個入口進(jìn)行源碼剖析。

/*返回這個RDD的所有數(shù)據(jù)嗤无,結(jié)果以數(shù)組形式存儲*/

def collect():Array[T]={

/*提交Job*/

val results=sc.runJob(this,(iter:Iterator[T])=>iter.toArray)

Array.concat(results:_*)

}

下面根據(jù)Action算子的輸出空間將Action算子進(jìn)行分類:無輸出震束、HDFS怜庸、Scala集合和數(shù)據(jù)類型。

1.無輸出

(1)foreach

對RDD中的每個元素都應(yīng)用f函數(shù)操作垢村,不返回RDD和Array割疾,而是返回Uint。

圖3-25表示foreach算子通過用戶自定義函數(shù)對每個數(shù)據(jù)項進(jìn)行操作嘉栓。本例中自定義函數(shù)為println()宏榕,控制臺打印所有數(shù)據(jù)項。

2.HDFS

(1)saveAsTextFile

函數(shù)將數(shù)據(jù)輸出侵佃,存儲到HDFS的指定目錄麻昼。

下面為函數(shù)的內(nèi)部實現(xiàn)。

this.map(x=>(NullWritable.get(),new Text(x.toString)))

.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

將RDD中的每個元素映射轉(zhuǎn)變?yōu)?Null,x.toString)馋辈,然后再將其寫入HDFS抚芦。

圖3-26中左側(cè)的方框代表RDD分區(qū),右側(cè)方框代表HDFS的Block迈螟。通過函數(shù)將RDD的每個分區(qū)存儲為HDFS中的一個Block叉抡。

[插圖]

圖3-25 foreach算子對RDD轉(zhuǎn)換

[插圖]

圖3-26 saveAsHadoopFile算子對RDD轉(zhuǎn)換

(2)saveAsObjectFile

saveAsObjectFile將分區(qū)中的每10個元素組成一個Array,然后將這個Array序列化答毫,映射為(Null,BytesWritable(Y))的元素褥民,寫入HDFS為SequenceFile的格式。

下面代碼為函數(shù)內(nèi)部實現(xiàn)洗搂。

map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))

圖3-27中的左側(cè)方框代表RDD分區(qū)消返,右側(cè)方框代表HDFS的Block。通過函數(shù)將RDD的每個分區(qū)存儲為HDFS上的一個Block耘拇。

[插圖]

圖3-27 saveAsObjectFile算子對RDD轉(zhuǎn)換

3.Scala集合和數(shù)據(jù)類型

(1)collect

collect相當(dāng)于toArray撵颊,toArray已經(jīng)過時不推薦使用,collect將分布式的RDD返回為一個單機(jī)的scala Array數(shù)組驼鞭。在這個數(shù)組上運(yùn)用scala的函數(shù)式操作秦驯。

圖3-28中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組挣棕。通過函數(shù)操作,將結(jié)果返回到Driver程序所在的節(jié)點亲桥,以數(shù)組形式存儲洛心。

(2)collectAsMap

collectAsMap對(K,V)型的RDD數(shù)據(jù)返回一個單機(jī)HashMap。對于重復(fù)K的RDD元素题篷,后面的元素覆蓋前面的元素词身。

圖3-29中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)數(shù)組番枚。數(shù)據(jù)通過collectAsMap函數(shù)返回給Driver程序計算結(jié)果法严,結(jié)果以HashMap形式存儲损敷。

[插圖]

圖3-28 Collect算子對RDD轉(zhuǎn)換

[插圖]

圖3-29 collectAsMap算子對RDD轉(zhuǎn)換

(3)reduceByKeyLocally

實現(xiàn)的是先reduce再collectAsMap的功能,先對RDD的整體進(jìn)行reduce操作深啤,然

后再收集所有結(jié)果返回為一個HashMap拗馒。

(4)lookup

下面代碼為lookup的聲明。

lookup(key:K):Seq[V]

Lookup函數(shù)對(Key,Value)型的RDD操作溯街,返回指定Key對應(yīng)的元素形成的Seq诱桂。這個函數(shù)處理優(yōu)化的部分在于,如果這個RDD包含分區(qū)器呈昔,則只會對應(yīng)處理K所在的分區(qū)挥等,然后返回由(K,V)形成的Seq。如果RDD不包含分區(qū)器堤尾,則需要對全RDD元素進(jìn)行暴力掃描處理肝劲,搜索指定K對應(yīng)的元素。

圖3-30中的左側(cè)方框代表RDD分區(qū)郭宝,右側(cè)方框代表Seq涡相,最后結(jié)果返回到Driver所在節(jié)點的應(yīng)用中。

(5)count

count返回整個RDD的元素個數(shù)剩蟀。內(nèi)部函數(shù)實現(xiàn)如下催蝗。

Def count():Long=sc.runJob(this,Utils.getIteratorSize_).sum

在圖3-31中,返回數(shù)據(jù)的個數(shù)為5育特。一個方塊代表一個RDD分區(qū)丙号。

[插圖]

圖3-30 lookup對RDD轉(zhuǎn)換

[插圖]

圖3-31 count對RDD轉(zhuǎn)換

(6)top

top可返回最大的k個元素。函數(shù)定義如下缰冤。

top(num:Int)(implicit ord:Ordering[T]):Array[T]

相近函數(shù)說明如下犬缨。

□top返回最大的k個元素。

□take返回最小的k個元素棉浸。

□takeOrdered返回最小的k個元素怀薛,并且在返回的數(shù)組中保持元素的順序。

□first相當(dāng)于top(1)返回整個RDD中的前k個元素迷郑,可以定義排序的方式Ordering[T]枝恋。返回的是一個含前k個元素的數(shù)組。

(7)reduce

reduce函數(shù)相當(dāng)于對RDD中的元素進(jìn)行reduceLeft函數(shù)的操作嗡害。函數(shù)實現(xiàn)如下焚碌。

Some(iter.reduceLeft(cleanF))

reduceLeft先對兩個元素<K,V>進(jìn)行reduce函數(shù)操作霸妹,然后將結(jié)果和迭代器取出的下一個元素<k十电,V>進(jìn)行reduce函數(shù)操作,直到迭代器遍歷完所有元素,得到最后結(jié)果鹃骂。

在RDD中台盯,先對每個分區(qū)中的所有元素<K,V>的集合分別進(jìn)行reduceLeft。每個分區(qū)形成的結(jié)果相當(dāng)于一個元素<K,V>畏线,再對這個結(jié)果集合進(jìn)行reduceleft操作静盅。

例如:用戶自定義函數(shù)如下。

f:(A象踊,B)=>(A._1+"@"+B._1温亲,A._2+B._2)

圖3-32中的方框代表一個RDD分區(qū),通過用戶自定函數(shù)f將數(shù)據(jù)進(jìn)行reduce運(yùn)算杯矩。示例最后的返回結(jié)果為V1@[插圖]V2U!@U2@U3@U4,12栈虚。

[插圖]

圖3-32 reduce算子對RDD轉(zhuǎn)換

(8)fold

fold和reduce的原理相同,但是與reduce不同史隆,相當(dāng)于每個reduce時魂务,迭代器取的第一個元素是zeroValue。

圖3-33中通過下面的用戶自定義函數(shù)進(jìn)行fold運(yùn)算泌射,圖中的一個方框代表一個RDD分區(qū)粘姜。讀者可以參照(7)reduce函數(shù)理解。

fold(("V0@"熔酷,2))((A孤紧,B)=>(A._1+"@"+B._1,A._2+B._2))

[插圖]

圖3-33 fold算子對RDD轉(zhuǎn)換

(9)aggregate

aggregate先對每個分區(qū)的所有元素進(jìn)行aggregate操作拒秘,再對分區(qū)的結(jié)果進(jìn)行fold操作号显。

aggreagate與fold和reduce的不同之處在于,aggregate相當(dāng)于采用歸并的方式進(jìn)行數(shù)據(jù)聚集躺酒,這種聚集是并行化的色乾。而在fold和reduce函數(shù)的運(yùn)算過程中剧包,每個分區(qū)中需要進(jìn)行串行處理,每個分區(qū)串行計算完結(jié)果尊流,結(jié)果再按之前的方式進(jìn)行聚集麦锯,并返回最終聚集結(jié)果唱较。

函數(shù)的定義如下天梧。

aggregate[B](z:B)(seqop:(B,A)? B,combop:(B,B)? B):B

圖3-34通過用戶自定義函數(shù)對RDD 進(jìn)行aggregate的聚集操作戚丸,圖中的每個方框代表一個RDD分區(qū)。

rdd.aggregate("V0@",2)((A偎肃,B)=>(A._1+"@"+B._1煞烫,A._2+B._2)),

(A,B)=>(A._1+"@"+B_1,A._@+B_.2))

最后,介紹兩個計算模型中的兩個特殊變量累颂。

廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表,以及廣播大變量等場景。這些數(shù)據(jù)集合在單節(jié)點內(nèi)存能夠容納紊馏,不需要像RDD那樣在節(jié)點之間打散存儲料饥。Spark運(yùn)行時把廣播變量數(shù)據(jù)發(fā)到各個節(jié)點,并保存下來朱监,后續(xù)計算可以復(fù)用岸啡。相比Hadoop的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享赫编。Broadcast的底層實現(xiàn)采用了BT機(jī)制巡蘸。有興趣的讀者可以參考論文[插圖]。

[插圖]

圖3-34 aggregate算子對RDD轉(zhuǎn)換

㈡代表V擂送。㈢代表U悦荒。

accumulator變量:允許做全局累加操作,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運(yùn)行指標(biāo)的情景嘹吨。

3.4 本章小結(jié)

本章主要介紹了Spark的計算模型搬味,Spark將應(yīng)用程序整體翻譯為一個有向無環(huán)圖進(jìn)行調(diào)度和執(zhí)行。相比MapReduce蟀拷,Spark提供了更加優(yōu)化和復(fù)雜的執(zhí)行流碰纬。

讀者還可以深入了解Spark的運(yùn)行機(jī)制與Spark算子,這樣能更加直觀地了解API的使用问芬。Spark提供了更加豐富的函數(shù)式算子悦析,這樣就為Spark上層組件的開發(fā)奠定了堅實的基礎(chǔ)。

通過閱讀本章此衅,讀者可以對Spark計算模型進(jìn)行更為宏觀的把握强戴。相信讀者還想對Spark內(nèi)部執(zhí)行機(jī)制進(jìn)行更深入的了解,下面章節(jié)就對Spark的內(nèi)核進(jìn)行更深入的剖析炕柔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末酌泰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子匕累,更是在濱河造成了極大的恐慌陵刹,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件欢嘿,死亡現(xiàn)場離奇詭異衰琐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)炼蹦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進(jìn)店門羡宙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人掐隐,你說我怎么就攤上這事狗热〕伲” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵匿刮,是天一觀的道長僧凰。 經(jīng)常有香客問我,道長熟丸,這世上最難降的妖魔是什么训措? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮光羞,結(jié)果婚禮上绩鸣,老公的妹妹穿的比我還像新娘。我一直安慰自己纱兑,他們只是感情好呀闻,可當(dāng)我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著萍启,像睡著了一般总珠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上勘纯,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天局服,我揣著相機(jī)與錄音,去河邊找鬼驳遵。 笑死淫奔,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的堤结。 我是一名探鬼主播唆迁,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼竞穷!你這毒婦竟也來了唐责?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤瘾带,失蹤者是張志新(化名)和其女友劉穎鼠哥,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體看政,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡朴恳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了允蚣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片于颖。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖嚷兔,靈堂內(nèi)的尸體忽然破棺而出森渐,到底是詐尸還是另有隱情做入,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布章母,位于F島的核電站母蛛,受9級特大地震影響翩剪,放射性物質(zhì)發(fā)生泄漏乳怎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一前弯、第九天 我趴在偏房一處隱蔽的房頂上張望蚪缀。 院中可真熱鬧,春花似錦恕出、人聲如沸询枚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽金蜀。三九已至,卻和暖如春的畴,著一層夾襖步出監(jiān)牢的瞬間渊抄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工丧裁, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留护桦,地道東北人。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓煎娇,卻偏偏與公主長得像二庵,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子缓呛,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容