Spark大數(shù)據(jù)分析實(shí)戰(zhàn)
1.4 彈性分布式數(shù)據(jù)集
本節(jié)將介紹彈性分布式數(shù)據(jù)集RDD狂打。Spark是一個(gè)分布式計(jì)算框架,而RDD是其對(duì)分布式內(nèi)存數(shù)據(jù)的抽象揽涮,可以認(rèn)為RDD就是Spark分布式算法的數(shù)據(jù)結(jié)構(gòu)丈探,而RDD之上的操作是Spark分布式算法的核心原語,由數(shù)據(jù)結(jié)構(gòu)和原語設(shè)計(jì)上層算法寄纵。Spark最終會(huì)將算法(RDD上的一連串操作)翻譯為DAG形式的工作流進(jìn)行調(diào)度,并進(jìn)行分布式任務(wù)的分發(fā)脖苏。
1.4.1 RDD簡(jiǎn)介
在集群背后程拭,有一個(gè)非常重要的分布式數(shù)據(jù)架構(gòu),即彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset棍潘,RDD)恃鞋。它在集群中的多臺(tái)機(jī)器上進(jìn)行了數(shù)據(jù)分區(qū),邏輯上可以認(rèn)為是一個(gè)分布式的數(shù)組亦歉,而數(shù)組中每個(gè)記錄可以是用戶自定義的任意數(shù)據(jù)結(jié)構(gòu)恤浪。RDD是Spark的核心數(shù)據(jù)結(jié)構(gòu),通過RDD的依賴關(guān)系形成Spark的調(diào)度順序肴楷,通過對(duì)RDD的操作形成整個(gè)Spark程序水由。
(1)RDD創(chuàng)建方式
1)從Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲(chǔ)系統(tǒng),如Hive赛蔫、Cassandra砂客、HBase)輸入(例如HDFS)創(chuàng)建泥张。
2)從父RDD轉(zhuǎn)換得到新RDD。
3)通過parallelize或makeRDD將單機(jī)數(shù)據(jù)創(chuàng)建為分布式RDD鞠值。
(2)RDD的兩種操作算子
對(duì)于RDD可以有兩種操作算子:轉(zhuǎn)換(Transformation)與行動(dòng)(Action)媚创。
1)轉(zhuǎn)換(Transformation):Transformation操作是延遲計(jì)算的,也就是說從一個(gè)RDD轉(zhuǎn)換生成另一個(gè)RDD的轉(zhuǎn)換操作不是馬上執(zhí)行彤恶,需要等到有Action操作的時(shí)候才會(huì)真正觸發(fā)運(yùn)算钞钙。
2)行動(dòng)(Action):Action算子會(huì)觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出Spark系統(tǒng)声离。
(3)RDD的重要內(nèi)部屬性
通過RDD的內(nèi)部屬性芒炼,用戶可以獲取相應(yīng)的元數(shù)據(jù)信息。通過這些信息可以支持更復(fù)雜的算法或優(yōu)化抵恋。
1)分區(qū)列表:通過分區(qū)列表可以找到一個(gè)RDD中包含的所有分區(qū)及其所在地址焕议。
2)計(jì)算每個(gè)分片的函數(shù):通過函數(shù)可以對(duì)每個(gè)數(shù)據(jù)塊進(jìn)行RDD需要進(jìn)行的用戶自定義函數(shù)運(yùn)算。
3)對(duì)父RDD的依賴列表:為了能夠回溯到父RDD弧关,為容錯(cuò)等提供支持盅安。
4)對(duì)key-value pair數(shù)據(jù)類型RDD的分區(qū)器,控制分區(qū)策略和分區(qū)數(shù)世囊。通過分區(qū)函數(shù)可以確定數(shù)據(jù)記錄在各個(gè)分區(qū)和節(jié)點(diǎn)上的分配别瞭,減少分布不平衡。
5)每個(gè)數(shù)據(jù)分區(qū)的地址列表(如HDFS上的數(shù)據(jù)塊的地址)株憾。
如果數(shù)據(jù)有副本蝙寨,則通過地址列表可以獲知單個(gè)數(shù)據(jù)塊的所有副本地址,為負(fù)載均衡和容錯(cuò)提供支持嗤瞎。
(4)Spark計(jì)算工作流
圖1-5中描述了Spark的輸入墙歪、運(yùn)行轉(zhuǎn)換、輸出贝奇。在運(yùn)行轉(zhuǎn)換中通過算子對(duì)RDD進(jìn)行轉(zhuǎn)換虹菲。算子是RDD中定義的函數(shù),可以對(duì)RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換和操作掉瞳。
·輸入:在Spark程序運(yùn)行中毕源,數(shù)據(jù)從外部數(shù)據(jù)空間(例如,HDFS陕习、Scala集合或數(shù)據(jù))輸入到Spark霎褐,數(shù)據(jù)就進(jìn)入了Spark運(yùn)行時(shí)數(shù)據(jù)空間,會(huì)轉(zhuǎn)化為Spark中的數(shù)據(jù)塊该镣,通過BlockManager進(jìn)行管理冻璃。
·運(yùn)行:在Spark數(shù)據(jù)輸入形成RDD后,便可以通過變換算子fliter等,對(duì)數(shù)據(jù)操作并將RDD轉(zhuǎn)化為新的RDD俱饿,通過行動(dòng)(Action)算子歌粥,觸發(fā)Spark提交作業(yè)。如果數(shù)據(jù)需要復(fù)用拍埠,可以通過Cache算子,將數(shù)據(jù)緩存到內(nèi)存土居。
·輸出:程序運(yùn)行結(jié)束數(shù)據(jù)會(huì)輸出Spark運(yùn)行時(shí)空間枣购,存儲(chǔ)到分布式存儲(chǔ)中(如saveAsTextFile輸出到HDFS)或Scala數(shù)據(jù)或集合中(collect輸出到Scala集合,count返回Scala Int型數(shù)據(jù))擦耀。
[插圖]
圖1-5 Spark算子和數(shù)據(jù)空間
Spark的核心數(shù)據(jù)模型是RDD棉圈,但RDD是個(gè)抽象類,具體由各子類實(shí)現(xiàn)眷蜓,如MappedRDD分瘾、ShuffledRDD等子類。Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)化成為RDD的子類吁系。
1.4.2 RDD算子分類
本節(jié)將主要介紹Spark算子的作用德召,以及算子的分類。
Spark算子大致可以分為以下兩類汽纤。
1)Transformation變換算子:這種變換并不觸發(fā)提交作業(yè)上岗,完成作業(yè)中間過程處理。
2)Action行動(dòng)算子:這類算子會(huì)觸發(fā)SparkContext提交Job作業(yè)蕴坪。
下面分別對(duì)兩類算子進(jìn)行詳細(xì)介紹肴掷。
1.Transformations算子
下文將介紹常用和較為重要的Transformation算子。
(1)map
將原來RDD的每個(gè)數(shù)據(jù)項(xiàng)通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素背传。源碼中map算子相當(dāng)于初始化一個(gè)RDD呆瞻,新RDD叫做MappedRDD(this,sc.clean(f))径玖。
圖1-7中每個(gè)方框表示一個(gè)RDD分區(qū)痴脾,左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù)f:T->U映射為右側(cè)的新RDD分區(qū)。但是挺狰,實(shí)際只有等到Action算子觸發(fā)后這個(gè)f函數(shù)才會(huì)和其他函數(shù)在一個(gè)stage中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算明郭。在圖1-6中的第一個(gè)分區(qū),數(shù)據(jù)記錄V1輸入f丰泊,通過f轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄V'1薯定。
(2)flatMap
將原來RDD中的每個(gè)元素通過函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD的每個(gè)集合中的元素合并為一個(gè)集合瞳购,內(nèi)部創(chuàng)建FlatMappedRDD(this话侄,sc.clean(f))。
[插圖]
圖1-6 map算子對(duì)RDD轉(zhuǎn)換
圖1-7表示RDD的一個(gè)分區(qū)進(jìn)行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->U年堆,T和U可以是任意的數(shù)據(jù)類型吞杭。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個(gè)RDD分區(qū)变丧,小方框代表一個(gè)集合芽狗。V1、V2痒蓬、V3在一個(gè)集合作為RDD的一個(gè)數(shù)據(jù)項(xiàng)童擎,可能存儲(chǔ)為數(shù)組或其他容器,轉(zhuǎn)換為V'1攻晒、V'2顾复、V'3后,將原來的數(shù)組或容器結(jié)合拆散鲁捏,拆散的數(shù)據(jù)形成為RDD中的數(shù)據(jù)項(xiàng)芯砸。
[插圖]
圖1-7 flapMap算子對(duì)RDD轉(zhuǎn)換
(3)mapPartitions
mapPartitions函數(shù)獲取到每個(gè)分區(qū)的迭代器,在函數(shù)中通過這個(gè)分區(qū)整體的迭代器對(duì)整個(gè)分區(qū)的元素進(jìn)行操作给梅。內(nèi)部實(shí)現(xiàn)是生成MapPartitionsRDD假丧。圖1-8中的方框代表一個(gè)RDD分區(qū)。
圖1-8中破喻,用戶通過函數(shù)f(iter)=>iter.filter(_>=3)對(duì)分區(qū)中所有數(shù)據(jù)進(jìn)行過濾虎谢,大于和等于3的數(shù)據(jù)保留。一個(gè)方塊代表一個(gè)RDD分區(qū)曹质,含有1婴噩、2、3的分區(qū)過濾只剩下元素3羽德。
[插圖]
圖1-8 mapPartitions算子對(duì)RDD轉(zhuǎn)換
(4)union
使用union函數(shù)時(shí)需要保證兩個(gè)RDD元素的數(shù)據(jù)類型相同几莽,返回的RDD數(shù)據(jù)類型和被合并的RDD元素?cái)?shù)據(jù)類型相同。并不進(jìn)行去重操作宅静,保存所有元素章蚣,如果想去重可以使用distinct()。同時(shí)Spark還提供更為簡(jiǎn)潔的使用union的API姨夹,通過++符號(hào)相當(dāng)于union函數(shù)操作纤垂。
圖1-9中左側(cè)大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)磷账。右側(cè)大方框代表合并后的RDD峭沦,大方框內(nèi)的小方框代表分區(qū)。合并后逃糟,V1吼鱼、V2蓬豁、V3……V8形成一個(gè)分區(qū),其他元素同理進(jìn)行合并菇肃。
(5)cartesian
對(duì)兩個(gè)RDD內(nèi)的所有元素進(jìn)行笛卡爾積操作地粪。操作后,內(nèi)部實(shí)現(xiàn)返回CartesianRDD琐谤。圖1-10中左側(cè)大方框代表兩個(gè)RDD蟆技,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD笑跛,大方框內(nèi)的小方框代表分區(qū)付魔。
例如:V1和另一個(gè)RDD中的W1、W2飞蹂、Q5進(jìn)行笛卡爾積運(yùn)算形成(V1,W1)翻屈、(V1陈哑,W2)、(V1伸眶,Q5)惊窖。
[插圖]
圖1-9 union算子對(duì)RDD轉(zhuǎn)換
[插圖]
圖1-10 cartesian算子對(duì)RDD轉(zhuǎn)換
(6)groupBy
groupBy:將元素通過函數(shù)生成相應(yīng)的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value格式厘贼,之后將Key相同的元素分為一組界酒。
函數(shù)實(shí)現(xiàn)如下:
1)將用戶函數(shù)預(yù)處理:
val cleanF = sc.clean(f)
2)對(duì)數(shù)據(jù)map進(jìn)行函數(shù)操作,最后再進(jìn)行g(shù)roupByKey分組操作嘴秸。
this.map(t =>(cleanF(t), t)).groupByKey(p)
其中毁欣,p確定了分區(qū)個(gè)數(shù)和分區(qū)函數(shù),也就決定了并行化的程度岳掐。
圖1-11中方框代表一個(gè)RDD分區(qū)凭疮,相同key的元素合并到一個(gè)組。例如V1和V2合并為V串述,Value為V1执解,V2。形成V纲酗,Seq(V1衰腌,V2)。
[插圖]
圖1-11 groupBy算子對(duì)RDD轉(zhuǎn)換
(7)filter
filter函數(shù)功能是對(duì)元素進(jìn)行過濾觅赊,對(duì)每個(gè)元素應(yīng)用f函數(shù)右蕊,返回值為true的元素在RDD中保留,返回值為false的元素將被過濾掉茉兰。內(nèi)部實(shí)現(xiàn)相當(dāng)于生成FilteredRDD(this尤泽,sc.clean(f))。
下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
圖1-12中每個(gè)方框代表一個(gè)RDD分區(qū),T可以是任意的類型坯约。通過用戶自定義的過濾函數(shù)f熊咽,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作,將滿足條件闹丐、返回結(jié)果為true的數(shù)據(jù)項(xiàng)保留横殴。例如,過濾掉V2和V3保留了V1卿拴,為區(qū)分命名為V'1衫仑。
(8)sample
sample將RDD這個(gè)集合內(nèi)的元素進(jìn)行采樣,獲取所有元素的子集堕花。用戶可以設(shè)定是否有放回的抽樣文狱、百分比、隨機(jī)種子缘挽,進(jìn)而決定采樣方式瞄崇。
內(nèi)部實(shí)現(xiàn)是生成SampledRDD(withReplacement,fraction壕曼,seed)苏研。
函數(shù)參數(shù)設(shè)置:
·withReplacement=true,表示有放回的抽樣腮郊。
·withReplacement=false摹蘑,表示無放回的抽樣。
圖1-13中的每個(gè)方框是一個(gè)RDD分區(qū)轧飞。通過sample函數(shù)衅鹿,采樣50%的數(shù)據(jù)。V1踪少、V2塘安、U1、U2……U4采樣出數(shù)據(jù)V1和U1援奢、U2形成新的RDD兼犯。
[插圖]
圖1-12 filter算子對(duì)RDD轉(zhuǎn)換
[插圖]
圖1-13 sample算子對(duì)RDD轉(zhuǎn)換
(9)cache
cache將RDD元素從磁盤緩存到內(nèi)存。相當(dāng)于persist(MEMORY_ONLY)函數(shù)的功能集漾。
[插圖]
圖1-14 Cache算子對(duì)RDD轉(zhuǎn)換
圖1-14中每個(gè)方框代表一個(gè)RDD分區(qū)切黔,左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤,通過cache算子將數(shù)據(jù)緩存在內(nèi)存具篇。
(10)persist
persist函數(shù)對(duì)RDD進(jìn)行緩存操作纬霞。數(shù)據(jù)緩存在哪里依據(jù)StorageLevel這個(gè)枚舉類型進(jìn)行確定。有以下幾種類型的組合(見圖1-14)驱显,DISK代表磁盤诗芜,MEMORY代表內(nèi)存瞳抓,SER代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)。
下面為函數(shù)定義伏恐,StorageLevel是枚舉類型孩哑,代表存儲(chǔ)模式,用戶可以通過圖1-14按需進(jìn)行選擇翠桦。
persist(newLevel:StorageLevel)
圖1-15中列出persist函數(shù)可以進(jìn)行緩存的模式横蜒。例如,MEMORY_AND_DISK_SER代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤销凑,并且以序列化的方式存儲(chǔ)丛晌,其他同理。
[插圖]
圖1-15 persist算子對(duì)RDD轉(zhuǎn)換
圖1-16中方框代表RDD分區(qū)斗幼。disk代表存儲(chǔ)在磁盤澎蛛,mem代表存儲(chǔ)在內(nèi)存。數(shù)據(jù)最初全部存儲(chǔ)在磁盤蜕窿,通過persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存瓶竭,但是有的分區(qū)無法容納在內(nèi)存,將含有V1渠羞、V2、V3的分區(qū)存儲(chǔ)到磁盤智哀。
(11)mapValues
mapValues:針對(duì)(Key次询,Value)型數(shù)據(jù)中的Value進(jìn)行Map操作,而不對(duì)Key進(jìn)行處理瓷叫。
圖1-17中的方框代表RDD分區(qū)屯吊。a=>a+2代表對(duì)(V1,1)這樣的Key Value數(shù)據(jù)對(duì)摹菠,數(shù)據(jù)只對(duì)Value中的1進(jìn)行加2操作盒卸,返回結(jié)果為3。
[插圖]
圖1-16 Persist算子對(duì)RDD轉(zhuǎn)換
[插圖]
圖1-17 mapValues算子RDD對(duì)轉(zhuǎn)換
(12)combineByKey
下面代碼為combineByKey函數(shù)的定義:
combineByKey[C](createCombiner:(V)C,
mergeValue:(C, V)C,
mergeCombiners:(C, C)C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
說明:
·createCombiner:V=>C次氨,C不存在的情況下蔽介,比如通過V創(chuàng)建seq C。
·mergeValue:(C煮寡,V)=>C虹蓄,當(dāng)C已經(jīng)存在的情況下,需要merge幸撕,比如把item V加到seq C中薇组,或者疊加。
·mergeCombiners:(C坐儿,C)=>C律胀,合并兩個(gè)C宋光。
·partitioner:Partitioner,Shuffle時(shí)需要的Partitioner炭菌。
·mapSideCombine:Boolean=true罪佳,為了減小傳輸量,很多combine可以在map端先做娃兽,比如疊加菇民,可以先在一個(gè)partition中把所有相同的key的value疊加,再shuffle投储。
·serializerClass:String=null第练,傳輸需要序列化,用戶可以自定義序列化類:
例如玛荞,相當(dāng)于將元素為(Int娇掏,Int)的RDD轉(zhuǎn)變?yōu)榱耍↖nt,Seq[Int])類型元素的RDD勋眯。
圖1-18中的方框代表RDD分區(qū)婴梧。如圖,通過combineByKey客蹋,將(V1塞蹭,2),(V1讶坯,1)數(shù)據(jù)合并為(V1番电,Seq(2,1))辆琅。
(13)reduceByKey
reduceByKey是比combineByKey更簡(jiǎn)單的一種情況漱办,只是兩個(gè)值合并成一個(gè)值,(Int婉烟,Int V)to(Int娩井,Int C),比如疊加似袁。所以createCombiner reduceBykey很簡(jiǎn)單洞辣,就是直接返回v,而mergeValue和mergeCombiners邏輯是相同的叔营,沒有區(qū)別屋彪。
[插圖]
圖1-18 comBineByKey算子對(duì)RDD轉(zhuǎn)換
函數(shù)實(shí)現(xiàn):
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
圖1-19中的方框代表RDD分區(qū)。通過用戶自定義函數(shù)(A绒尊,B)=>(A+B)函數(shù)畜挥,將相同key的數(shù)據(jù)(V1,2)和(V1婴谱,1)的value相加運(yùn)算蟹但,結(jié)果為(V1躯泰,3)。
[插圖]
圖1-19 reduceByKey算子對(duì)RDD轉(zhuǎn)換
(14)join
join對(duì)兩個(gè)需要連接的RDD進(jìn)行cogroup函數(shù)操作华糖,將相同key的數(shù)據(jù)能夠放到一個(gè)分區(qū)麦向,在cogroup操作之后形成的新RDD對(duì)每個(gè)key下的元素進(jìn)行笛卡爾積的操作,返回的結(jié)果再展平客叉,對(duì)應(yīng)key下的所有元組形成一個(gè)集合诵竭。最后返回RDD[(K,(V兼搏,W))]卵慰。
下面代碼為join的函數(shù)實(shí)現(xiàn),本質(zhì)是通過cogroup算子先進(jìn)行協(xié)同劃分佛呻,再通過flatMapValues將合并的數(shù)據(jù)打散裳朋。
this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=>
for(v<-vs;w>-ws)yield(v,w) }
圖1-20是對(duì)兩個(gè)RDD的join操作示意圖。大方框代表RDD吓著,小方框代表RDD中的分區(qū)鲤嫡。函數(shù)對(duì)相同key的元素,如V1為key做連接后結(jié)果為(V1绑莺,(1暖眼,1))和(V1,(1纺裁,2))罢荡。
2.Actions算子
本質(zhì)上在Action算子中通過SparkContext進(jìn)行了提交作業(yè)的runJob操作,觸發(fā)了RDD DAG的執(zhí)行对扶。
[插圖]
圖1-20 join算子對(duì)RDD轉(zhuǎn)換
例如,Action算子collect函數(shù)的代碼如下惭缰,感興趣的讀者可以順著這個(gè)入口進(jìn)行源碼剖析:
- /**
- Return an array that contains all of the elements in this RDD.
/
def collect(): Array[T] = {
/提交Job/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _)
}
- Return an array that contains all of the elements in this RDD.
下面將介紹常用和較為重要的Action算子浪南。
(1)foreach
foreach對(duì)RDD中的每個(gè)元素都應(yīng)用f函數(shù)操作,不返回RDD和Array漱受,而是返回Uint络凿。
圖1-21表示foreach算子通過用戶自定義函數(shù)對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作。本例中自定義函數(shù)為println()昂羡,控制臺(tái)打印所有數(shù)據(jù)項(xiàng)絮记。
[插圖]
圖1-21 foreach算子對(duì)RDD轉(zhuǎn)換
(2)saveAsTextFile
函數(shù)將數(shù)據(jù)輸出,存儲(chǔ)到HDFS的指定目錄虐先。
下面為saveAsTextFile函數(shù)的內(nèi)部實(shí)現(xiàn)怨愤,其內(nèi)部通過調(diào)用saveAsHadoopFile進(jìn)行實(shí)現(xiàn):
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFileTextOutputFormat[NullWritable, Text]
將RDD中的每個(gè)元素映射轉(zhuǎn)變?yōu)椋╪ull,x.toString)蛹批,然后再將其寫入HDFS撰洗。
圖1-22中左側(cè)方框代表RDD分區(qū)篮愉,右側(cè)方框代表HDFS的Block。通過函數(shù)將RDD的每個(gè)分區(qū)存儲(chǔ)為HDFS中的一個(gè)Block差导。
(3)collect
collect相當(dāng)于toArray试躏,toArray已經(jīng)過時(shí)不推薦使用,collect將分布式的RDD返回為一個(gè)單機(jī)的scala Array數(shù)組设褐。在這個(gè)數(shù)組上運(yùn)用scala的函數(shù)式操作颠蕴。
圖1-23中左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組助析。通過函數(shù)操作犀被,將結(jié)果返回到Driver程序所在的節(jié)點(diǎn),以數(shù)組形式存儲(chǔ)貌笨。
圖1-23中左側(cè)方框代表RDD分區(qū)弱判,右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組。通過函數(shù)操作锥惋,將結(jié)果返回到Driver程序所在的節(jié)點(diǎn)昌腰,以數(shù)組形式存儲(chǔ)。
[插圖]
圖1-22 saveAsHadoopFile算子對(duì)RDD轉(zhuǎn)換
[插圖]
圖1-23 Collect算子對(duì)RDD轉(zhuǎn)換
(4)count
count返回整個(gè)RDD的元素個(gè)數(shù)膀跌。
內(nèi)部函數(shù)實(shí)現(xiàn)為:
defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
圖1-24中遭商,返回?cái)?shù)據(jù)的個(gè)數(shù)為5。一個(gè)方塊代表一個(gè)RDD分區(qū)捅伤。
[插圖]
圖1-24 count對(duì)RDD算子轉(zhuǎn)換
1.5 本章小結(jié)
本章首先介紹了Spark分布式計(jì)算平臺(tái)的基本概念劫流、原理以及Spark生態(tài)系統(tǒng)BDAS之上的典型組件。Spark為用戶提供了系統(tǒng)底層細(xì)節(jié)透明丛忆、編程接口簡(jiǎn)潔的分布式計(jì)算平臺(tái)祠汇。Spark具有內(nèi)存計(jì)算、實(shí)時(shí)性高熄诡、容錯(cuò)性好等突出特點(diǎn)可很。同時(shí)本章介紹了Spark的計(jì)算模型,Spark會(huì)將應(yīng)用程序整體翻譯為一個(gè)有向無環(huán)圖進(jìn)行調(diào)度和執(zhí)行凰浮。相比MapReduce我抠,Spark提供了更加優(yōu)化和復(fù)雜的執(zhí)行流。讀者還可以深入了解Spark的運(yùn)行機(jī)制與Spark算子袜茧,這樣能更加直觀地了解API的使用菜拓。Spark提供了更加豐富的函數(shù)式算子,這樣就為Spark上層組件的開發(fā)奠定了堅(jiān)實(shí)的基礎(chǔ)笛厦。
相信讀者已經(jīng)想了解如何開發(fā)Spark程序纳鼎,接下來將就Spark的開發(fā)環(huán)境配置進(jìn)行闡述。