Spark的算子的分類
從大方向來說,Spark 算子大致可以分為以下兩類:
1)Transformation 變換/轉(zhuǎn)換算子:這種變換并不觸發(fā)提交作業(yè),完成作業(yè)中間過程處理。
Transformation 操作是延遲計(jì)算的,也就是說從一個(gè)RDD 轉(zhuǎn)換生成另一個(gè) RDD 的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有 Action 操作的時(shí)候才會(huì)真正觸發(fā)運(yùn)算鱼鼓。
2)Action 行動(dòng)算子:這類算子會(huì)觸發(fā) SparkContext 提交 Job 作業(yè)。
Action 算子會(huì)觸發(fā) Spark 提交作業(yè)(Job)该编,并將數(shù)據(jù)輸出 Spark系統(tǒng)迄本。
從小方向來說,Spark 算子大致可以分為以下三類:
1)Value數(shù)據(jù)類型的Transformation算子课竣,這種變換并不觸發(fā)提交作業(yè)嘉赎,針對(duì)處理的數(shù)據(jù)項(xiàng)是Value型的數(shù)據(jù)。
2)Key-Value數(shù)據(jù)類型的Transfromation算子于樟,這種變換并不觸發(fā)提交作業(yè)公条,針對(duì)處理的數(shù)據(jù)項(xiàng)是Key-Value型的數(shù)據(jù)對(duì)。
3)Action算子迂曲,這類算子會(huì)觸發(fā)SparkContext提交Job作業(yè)靶橱。
1)Value數(shù)據(jù)類型的Transformation算子
一、輸入分區(qū)與輸出分區(qū)一對(duì)一型
1路捧、map算子
2关霸、flatMap算子
3、mapPartitions算子
4杰扫、glom算子
二队寇、輸入分區(qū)與輸出分區(qū)多對(duì)一型
5、union算子
6章姓、cartesian算子
三佳遣、輸入分區(qū)與輸出分區(qū)多對(duì)多型
7、grouBy算子
四凡伊、輸出分區(qū)為輸入分區(qū)子集型
8零渐、filter算子
9、distinct算子
10窗声、subtract算子
11相恃、sample算子
? ? ? 12辜纲、takeSample算子
? 五笨觅、Cache型
13拦耐、cache算子
14、persist算子
2)Key-Value數(shù)據(jù)類型的Transfromation算子
一见剩、輸入分區(qū)與輸出分區(qū)一對(duì)一
15杀糯、mapValues算子
二、對(duì)單個(gè)RDD或兩個(gè)RDD聚集
單個(gè)RDD聚集
16苍苞、combineByKey算子
17固翰、reduceByKey算子
18、partitionBy算子
? 兩個(gè)RDD聚集
? ? ? ? ? ? ??19羹呵、Cogroup算子
三骂际、連接
20、join算子
21冈欢、leftOutJoin和 rightOutJoin算子
?3)Action算子
一歉铝、無輸出
? ? ? ? ? ? ? ?22、foreach算子
二凑耻、HDFS
23太示、saveAsTextFile算子
24、saveAsObjectFile算子
三香浩、Scala集合和數(shù)據(jù)類型
25类缤、collect算子
26、collectAsMap算子
? ? ? ? ? ? ? 27邻吭、reduceByKeyLocally算子
? ? ? ? ? 28餐弱、lookup算子
29、count算子
30镜盯、top算子
31岸裙、reduce算子
32、fold算子
33速缆、aggregate算子
1. Transformations 算子
(1)?map
將原來 RDD 的每個(gè)數(shù)據(jù)項(xiàng)通過?map 中的用戶自定義函數(shù) f?映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素降允。源碼中 map 算子相當(dāng)于初始化一個(gè) RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))艺糜。
圖 1中每個(gè)方框表示一個(gè) RDD 分區(qū)剧董,左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù) f:T->U?映射為右側(cè)的新 RDD 分區(qū)。但是破停,實(shí)際只有等到 Action算子觸發(fā)后翅楼,這個(gè) f 函數(shù)才會(huì)和其他函數(shù)在一個(gè)stage 中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。在圖 1 中的第一個(gè)分區(qū)真慢,數(shù)據(jù)記錄 V1 輸入 f毅臊,通過 f 轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄 V’1。
(2)?flatMap
將原來 RDD 中的每個(gè)元素通過函數(shù) f 轉(zhuǎn)換為新的元素黑界,并將生成的 RDD 的每個(gè)集合中的元素合并為一個(gè)集合管嬉,內(nèi)部創(chuàng)建 FlatMappedRDD(this细诸,sc.clean(f))几颜。
圖 2 表 示 RDD 的 一 個(gè) 分 區(qū) 婿脸,進(jìn) 行 flatMap函 數(shù) 操 作咽弦, flatMap 中 傳 入 的 函 數(shù) 為 f:T->U,?T和 U 可以是任意的數(shù)據(jù)類型胎挎。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù) f 轉(zhuǎn)換為新的數(shù)據(jù)沟启。外部大方框可以認(rèn)為是一個(gè) RDD 分區(qū),小方框代表一個(gè)集合犹菇。 V1德迹、 V2、 V3 在一個(gè)集合作為 RDD 的一個(gè)數(shù)據(jù)項(xiàng)揭芍,可能存儲(chǔ)為數(shù)組或其他容器浦辨,轉(zhuǎn)換為V’1、 V’2沼沈、 V’3 后流酬,將原來的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成為 RDD 中的數(shù)據(jù)項(xiàng)列另。
(3)?mapPartitions
mapPartitions 函 數(shù) 獲 取 到 每 個(gè) 分 區(qū) 的 迭 代器芽腾,在 函 數(shù) 中 通 過 這 個(gè) 分 區(qū) 整 體 的 迭 代 器 對(duì)整 個(gè) 分 區(qū) 的 元 素 進(jìn) 行 操 作。 內(nèi) 部 實(shí) 現(xiàn) 是 生 成
MapPartitionsRDD页衙。圖 3 中的方框代表一個(gè) RDD 分區(qū)摊滔。圖 3 中,用戶通過函數(shù) f (iter)=>iter.f ilter(_>=3) 對(duì)分區(qū)中所有數(shù)據(jù)進(jìn)行過濾店乐,大于和等于 3 的數(shù)據(jù)保留艰躺。一個(gè)方塊代表一個(gè) RDD 分區(qū),含有 1眨八、 2腺兴、 3 的分區(qū)過濾只剩下元素 3。
(4)glom
glom函數(shù)將每個(gè)分區(qū)形成一個(gè)數(shù)組廉侧,內(nèi)部實(shí)現(xiàn)是返回的GlommedRDD页响。 圖4中的每個(gè)方框代表一個(gè)RDD分區(qū)。圖4中的方框代表一個(gè)分區(qū)段誊。 該圖表示含有V1闰蚕、 V2、 V3的分區(qū)通過函數(shù)glom形成一數(shù)組Array[(V1)连舍,(V2)没陡,(V3)]。
(5)?union
使用 union 函數(shù)時(shí)需要保證兩個(gè) RDD 元素的數(shù)據(jù)類型相同,返回的 RDD 數(shù)據(jù)類型和被合并的 RDD 元素?cái)?shù)據(jù)類型相同盼玄,并不進(jìn)行去重操作染簇,保存所有元素。如果想去重
可以使用 distinct()强岸。同時(shí) Spark 還提供更為簡(jiǎn)潔的使用 union 的 API,通過 ++ 符號(hào)相當(dāng)于 union 函數(shù)操作砾赔。
圖 5 中左側(cè)大方框代表兩個(gè) RDD蝌箍,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD暴心,大方框內(nèi)的小方框代表分區(qū)妓盲。
含有V1、V2专普、U1悯衬、U2、U3檀夹、U4的RDD和含有V1筋粗、V8、U5炸渡、U6娜亿、U7、U8的RDD合并所有元素形成一個(gè)RDD蚌堵。V1买决、V1、V2吼畏、V8形成一個(gè)分區(qū)督赤,U1、U2泻蚊、U3躲舌、U4、U5性雄、U6孽糖、U7、U8形成一個(gè)分區(qū)毅贮。
(6)?cartesian
對(duì) 兩 個(gè) RDD 內(nèi) 的 所 有 元 素?進(jìn) 行 笛 卡 爾 積 操 作办悟。 操 作 后, 內(nèi) 部 實(shí) 現(xiàn) 返 回CartesianRDD滩褥。圖6中左側(cè)大方框代表兩個(gè) RDD病蛉,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD,大方框內(nèi)的小方框代表分區(qū)铺然。圖6中的大方框代表RDD俗孝,大方框中的小方框代表RDD分區(qū)。
例 如: V1 和 另 一 個(gè) RDD 中 的 W1魄健、 W2赋铝、 Q5 進(jìn) 行 笛 卡 爾 積 運(yùn) 算 形 成 (V1,W1)、(V1,W2)沽瘦、 (V1,Q5)革骨。
(7)?groupBy
groupBy :將元素通過函數(shù)生成相應(yīng)的 Key,數(shù)據(jù)就轉(zhuǎn)化為 Key-Value 格式析恋,之后將 Key 相同的元素分為一組良哲。
函數(shù)實(shí)現(xiàn)如下:
1)將用戶函數(shù)預(yù)處理:
val cleanF = sc.clean(f)
2)對(duì)數(shù)據(jù) map 進(jìn)行函數(shù)操作,最后再進(jìn)行 groupByKey 分組操作助隧。
this.map(t => (cleanF(t), t)).groupByKey(p)
其中筑凫, p 確定了分區(qū)個(gè)數(shù)和分區(qū)函數(shù),也就決定了并行化的程度并村。
圖7 中方框代表一個(gè) RDD 分區(qū)巍实,相同key 的元素合并到一個(gè)組。例如 V1 和 V2 合并為 V哩牍, Value 為 V1,V2蔫浆。形成 V,Seq(V1,V2)。
圖 7?groupBy 算子對(duì) RDD 轉(zhuǎn)換
(8)?filter
filter 函數(shù)功能是對(duì)元素進(jìn)行過濾姐叁,對(duì)每個(gè) 元 素 應(yīng) 用 f 函 數(shù)瓦盛, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉外潜。 內(nèi) 部 實(shí) 現(xiàn) 相 當(dāng) 于 生 成 FilteredRDD(this原环,sc.clean(f))。
下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
圖 8 中每個(gè)方框代表一個(gè) RDD 分區(qū)处窥, T 可以是任意的類型嘱吗。通過用戶自定義的過濾函數(shù) f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作滔驾,將滿足條件谒麦、返回結(jié)果為 true 的數(shù)據(jù)項(xiàng)保留。例如哆致,過濾掉 V2 和 V3 保留了 V1绕德,為區(qū)分命名為 V’1。
圖 8 ?filter 算子對(duì) RDD 轉(zhuǎn)換
(9)distinct
distinct將RDD中的元素進(jìn)行去重操作摊阀。圖9中的每個(gè)方框代表一個(gè)RDD分區(qū)耻蛇,通過distinct函數(shù)踪蹬,將數(shù)據(jù)去重。 例如臣咖,重復(fù)數(shù)據(jù)V1跃捣、 V1去重后只保留一份V1。
圖9 ?distinct算子對(duì)RDD轉(zhuǎn)換
(10)subtract
subtract相當(dāng)于進(jìn)行集合的差操作夺蛇,RDD 1去除RDD 1和RDD 2交集中的所有元素疚漆。圖10中左側(cè)的大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)刁赦。 右側(cè)大方框
代表合并后的RDD娶聘,大方框內(nèi)的小方框代表分區(qū)。 V1在兩個(gè)RDD中均有截型,根據(jù)差集運(yùn)算規(guī)則,新RDD不保留儒溉,V2在第一個(gè)RDD有宦焦,第二個(gè)RDD沒有,則在新RDD元素中包含V2顿涣。
圖10 ? subtract算子對(duì)RDD轉(zhuǎn)換
(11)?sample
sample 將 RDD 這個(gè)集合內(nèi)的元素進(jìn)行采樣波闹,獲取所有元素的子集。用戶可以設(shè)定是否有放回的抽樣涛碑、百分比精堕、隨機(jī)種子,進(jìn)而決定采樣方式蒲障。內(nèi)部實(shí)現(xiàn)是生成 SampledRDD(withReplacement歹篓, fraction, seed)揉阎。
函數(shù)參數(shù)設(shè)置:
‰ withReplacement=true庄撮,表示有放回的抽樣。
‰ withReplacement=false毙籽,表示無放回的抽樣洞斯。
圖 11中 的 每 個(gè) 方 框 是 一 個(gè) RDD 分 區(qū)。 通 過 sample 函 數(shù)坑赡, 采 樣 50% 的 數(shù) 據(jù)烙如。V1、 V2毅否、 U1亚铁、 U2、U3螟加、U4 采樣出數(shù)據(jù) V1 和 U1刀闷、 U2 形成新的 RDD熊泵。
圖11 ?sample 算子對(duì) RDD 轉(zhuǎn)換
(12)takeSample
takeSample()函數(shù)和上面的sample函數(shù)是一個(gè)原理,但是不使用相對(duì)比例采樣甸昏,而是按設(shè)定的采樣個(gè)數(shù)進(jìn)行采樣顽分,同時(shí)返回結(jié)果不再是RDD,而是相當(dāng)于對(duì)采樣后的數(shù)據(jù)進(jìn)行
Collect()施蜜,返回結(jié)果的集合為單機(jī)的數(shù)組卒蘸。
圖12中左側(cè)的方框代表分布式的各個(gè)節(jié)點(diǎn)上的分區(qū),右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組翻默。 通過takeSample對(duì)數(shù)據(jù)采樣缸沃,設(shè)置為采樣一份數(shù)據(jù),返回結(jié)果為V1修械。
圖12 ? takeSample算子對(duì)RDD轉(zhuǎn)換
(13)?cache
cache?將 RDD 元素從磁盤緩存到內(nèi)存趾牧。 相當(dāng)于 persist(MEMORY_ONLY) 函數(shù)的功能。
圖13 中每個(gè)方框代表一個(gè) RDD 分區(qū)肯污,左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤翘单,通過 cache 算子將數(shù)據(jù)緩存在內(nèi)存。
圖 13 Cache 算子對(duì) RDD 轉(zhuǎn)換
(14)?persist
persist 函數(shù)對(duì)?RDD 進(jìn)行緩存操作蹦渣。數(shù)據(jù)緩存在哪里依據(jù) StorageLevel 這個(gè)枚舉類型進(jìn)行確定哄芜。 有以下幾種類型的組合(見10), DISK 代表磁盤柬唯,MEMORY 代表內(nèi)存认臊, SER 代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)。
下面為函數(shù)定義锄奢, StorageLevel 是枚舉類型失晴,代表存儲(chǔ)模式,用戶可以通過圖 14-1 按需進(jìn)行選擇拘央。
persist(newLevel:StorageLevel)
圖 14-1 中列出persist 函數(shù)可以進(jìn)行緩存的模式师坎。例如,MEMORY_AND_DISK_SER 代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤堪滨,并且以序列化的方式存儲(chǔ)胯陋,其他同理。
圖 14-1 ?persist 算子對(duì) RDD 轉(zhuǎn)換
圖 14-2 中方框代表 RDD 分區(qū)袱箱。 disk 代表存儲(chǔ)在磁盤遏乔, mem 代表存儲(chǔ)在內(nèi)存。數(shù)據(jù)最初全部存儲(chǔ)在磁盤发笔,通過 persist(MEMORY_AND_DISK) 將數(shù)據(jù)緩存到內(nèi)存盟萨,但是有的分區(qū)無法容納在內(nèi)存,將含有 V1了讨、 V2捻激、 V3 的RDD存儲(chǔ)到磁盤制轰,將含有U1,U2的RDD仍舊存儲(chǔ)在內(nèi)存胞谭。
? ? ? 圖 14-2 ? Persist 算子對(duì) RDD 轉(zhuǎn)換
(15)?mapValues
mapValues :針對(duì)(Key垃杖, Value)型數(shù)據(jù)中的 Value 進(jìn)行 Map 操作,而不對(duì) Key 進(jìn)行處理丈屹。
? ? 圖 15 中的方框代表 RDD 分區(qū)调俘。 a=>a+2 代表對(duì) (V1,1) 這樣的 Key Value 數(shù)據(jù)對(duì),數(shù)據(jù)只對(duì) Value 中的 1 進(jìn)行加 2 操作旺垒,返回結(jié)果為 3彩库。
圖 15 ? mapValues 算子 RDD 對(duì)轉(zhuǎn)換
(16)?combineByKey
下面代碼為 combineByKey 函數(shù)的定義:
combineByKey[C](createCombiner:(V) C,
mergeValue:(C, V) C,
mergeCombiners:(C, C) C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
說明:
‰ createCombiner: V => C, C 不存在的情況下先蒋,比如通過 V 創(chuàng)建 seq C骇钦。
‰ mergeValue: (C, V) => C竞漾,當(dāng) C 已經(jīng)存在的情況下眯搭,需要 merge,比如把 item V
加到 seq C 中畴蹭,或者疊加坦仍。
mergeCombiners: (C鳍烁, C) => C叨襟,合并兩個(gè) C。
‰ partitioner: Partitioner, Shuff le 時(shí)需要的 Partitioner幔荒。
‰ mapSideCombine : Boolean = true糊闽,為了減小傳輸量,很多 combine 可以在 map
端先做爹梁,比如疊加右犹,可以先在一個(gè) partition 中把所有相同的 key 的 value 疊加,
再 shuff le姚垃。
‰ serializerClass: String = null念链,傳輸需要序列化,用戶可以自定義序列化類:
例如积糯,相當(dāng)于將元素為 (Int掂墓, Int) 的 RDD 轉(zhuǎn)變?yōu)榱?(Int, Seq[Int]) 類型元素的 RDD看成。圖 16中的方框代表 RDD 分區(qū)君编。如圖,通過 combineByKey川慌, 將 (V1,2)吃嘿, (V1,1)數(shù)據(jù)合并為( V1,Seq(2,1))祠乃。
圖 16? comBineByKey 算子對(duì) RDD 轉(zhuǎn)換
(17)?reduceByKey
reduceByKey 是比 combineByKey 更簡(jiǎn)單的一種情況,只是兩個(gè)值合并成一個(gè)值兑燥,( Int亮瓷, Int V)to (Int, Int C)贪嫂,比如疊加寺庄。所以 createCombiner reduceBykey 很簡(jiǎn)單,就是直接返回 v力崇,而 mergeValue和 mergeCombiners 邏輯是相同的斗塘,沒有區(qū)別。
函數(shù)實(shí)現(xiàn):
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
圖17中的方框代表 RDD 分區(qū)亮靴。通過用戶自定義函數(shù) (A,B) => (A + B) 函數(shù)馍盟,將相同 key 的數(shù)據(jù) (V1,2) 和 (V1,1) 的 value 相加運(yùn)算,結(jié)果為( V1,3)茧吊。
圖 17?reduceByKey 算子對(duì) RDD 轉(zhuǎn)換
(18)partitionBy
partitionBy函數(shù)對(duì)RDD進(jìn)行分區(qū)操作贞岭。
函數(shù)定義如下。
partitionBy(partitioner:Partitioner)
如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致搓侄,則不重分區(qū)瞄桨,如果不一致,則相當(dāng)于根據(jù)分區(qū)器生成一個(gè)新的ShuffledRDD讶踪。
圖18中的方框代表RDD分區(qū)芯侥。 通過新的分區(qū)策略將原來在不同分區(qū)的V1、 V2數(shù)據(jù)都合并到了一個(gè)分區(qū)乳讥。
圖18 partitionBy算子對(duì)RDD轉(zhuǎn)換
(19)Cogroup
cogroup函數(shù)將兩個(gè)RDD進(jìn)行協(xié)同劃分柱查,cogroup函數(shù)的定義如下。
cogroup[W](other: RDD[(K云石, W)]唉工, numPartitions: Int): RDD[(K, (Iterable[V]汹忠, Iterable[W]))]
對(duì)在兩個(gè)RDD中的Key-Value類型的元素淋硝,每個(gè)RDD相同Key的元素分別聚合為一個(gè)集合,并且返回兩個(gè)RDD中對(duì)應(yīng)Key的元素集合的迭代器宽菜。
(K谣膳, (Iterable[V], Iterable[W]))
其中赋焕,Key和Value参歹,Value是兩個(gè)RDD下相同Key的兩個(gè)數(shù)據(jù)集合的迭代器所構(gòu)成的元組。
圖19中的大方框代表RDD隆判,大方框內(nèi)的小方框代表RDD中的分區(qū)犬庇。 將RDD1中的數(shù)據(jù)(U1僧界,1)、 (U1臭挽,2)和RDD2中的數(shù)據(jù)(U1捂襟,2)合并為(U1,((1欢峰,2)葬荷,(2)))。
圖19 ?Cogroup算子對(duì)RDD轉(zhuǎn)換
(20)?join
join 對(duì)兩個(gè)需要連接的 RDD 進(jìn)行 cogroup函數(shù)操作纽帖,將相同 key 的數(shù)據(jù)能夠放到一個(gè)分區(qū)宠漩,在 cogroup 操作之后形成的新 RDD 對(duì)每個(gè)key 下的元素進(jìn)行笛卡爾積的操作,返回的結(jié)果再展平懊直,對(duì)應(yīng) key 下的所有元組形成一個(gè)集合扒吁。最后返回 RDD[(K, (V室囊, W))]雕崩。
下 面 代 碼 為 join 的 函 數(shù) 實(shí) 現(xiàn), 本 質(zhì) 是通 過 cogroup 算 子 先 進(jìn) 行 協(xié) 同 劃 分融撞, 再 通 過flatMapValues 將合并的數(shù)據(jù)打散盼铁。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) =>?for(v<-vs;w<-ws)yield(v,w) }
圖 20是對(duì)兩個(gè) RDD 的 join 操作示意圖。大方框代表 RDD尝偎,小方框代表 RDD 中的分區(qū)饶火。函數(shù)對(duì)相同 key 的元素,如 V1 為 key 做連接后結(jié)果為 (V1,(1,1)) 和 (V1,(1,2))冬念。
圖 20 ? join 算子對(duì) RDD 轉(zhuǎn)換
(21)eftOutJoin和rightOutJoin
LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空趁窃,如果為空牧挣,則填充為空急前。 如果不為空,則將數(shù)據(jù)進(jìn)行連接運(yùn)算瀑构,并
返回結(jié)果裆针。
下面代碼是leftOutJoin的實(shí)現(xiàn)。
if (ws.isEmpty) {
vs.map(v => (v寺晌, None))
} else {
for (v <- vs世吨; w <- ws) yield (v, Some(w))
}
2. Actions 算子
本質(zhì)上在 Action 算子中通過 SparkContext 進(jìn)行了提交作業(yè)的 runJob 操作呻征,觸發(fā)了RDD DAG 的執(zhí)行耘婚。
例如, Action 算子 collect 函數(shù)的代碼如下陆赋,感興趣的讀者可以順著這個(gè)入口進(jìn)行源碼剖析:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
/* 提交 Job*/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
(22)?foreach
foreach 對(duì) RDD 中的每個(gè)元素都應(yīng)用 f 函數(shù)操作沐祷,不返回 RDD 和 Array嚷闭, 而是返回Uint。圖22表示 foreach 算子通過用戶自定義函數(shù)對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作赖临。本例中自定義函數(shù)為 println()胞锰,控制臺(tái)打印所有數(shù)據(jù)項(xiàng)。
圖 22 foreach 算子對(duì) RDD 轉(zhuǎn)換
(23)?saveAsTextFile
函數(shù)將數(shù)據(jù)輸出兢榨,存儲(chǔ)到 HDFS 的指定目錄嗅榕。
下面為 saveAsTextFile 函數(shù)的內(nèi)部實(shí)現(xiàn),其內(nèi)部
通過調(diào)用 saveAsHadoopFile 進(jìn)行實(shí)現(xiàn):
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
將 RDD 中的每個(gè)元素映射轉(zhuǎn)變?yōu)?(null吵聪, x.toString)凌那,然后再將其寫入 HDFS。
圖 23中左側(cè)方框代表 RDD 分區(qū)吟逝,右側(cè)方框代表 HDFS 的 Block案怯。通過函數(shù)將RDD 的每個(gè)分區(qū)存儲(chǔ)為 HDFS 中的一個(gè) Block。
圖 23 ? saveAsHadoopFile 算子對(duì) RDD 轉(zhuǎn)換
(24)saveAsObjectFile
saveAsObjectFile將分區(qū)中的每10個(gè)元素組成一個(gè)Array澎办,然后將這個(gè)Array序列化嘲碱,映射為(Null,BytesWritable(Y))的元素局蚀,寫入HDFS為SequenceFile的格式麦锯。
下面代碼為函數(shù)內(nèi)部實(shí)現(xiàn)。
map(x=>(NullWritable.get()琅绅,new BytesWritable(Utils.serialize(x))))
圖24中的左側(cè)方框代表RDD分區(qū)扶欣,右側(cè)方框代表HDFS的Block。 通過函數(shù)將RDD的每個(gè)分區(qū)存儲(chǔ)為HDFS上的一個(gè)Block千扶。
圖24 saveAsObjectFile算子對(duì)RDD轉(zhuǎn)換
(25)?collect
collect 相當(dāng)于 toArray料祠, toArray 已經(jīng)過時(shí)不推薦使用, collect 將分布式的 RDD 返回為一個(gè)單機(jī)的 scala Array 數(shù)組澎羞。在這個(gè)數(shù)組上運(yùn)用 scala 的函數(shù)式操作髓绽。
圖 25中左側(cè)方框代表 RDD 分區(qū),右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組妆绞。通過函數(shù)操作顺呕,將結(jié)果返回到 Driver 程序所在的節(jié)點(diǎn),以數(shù)組形式存儲(chǔ)括饶。
圖 25 ? Collect 算子對(duì) RDD 轉(zhuǎn)換
(26)collectAsMap
collectAsMap對(duì)(K株茶,V)型的RDD數(shù)據(jù)返回一個(gè)單機(jī)HashMap。 對(duì)于重復(fù)K的RDD元素图焰,后面的元素覆蓋前面的元素启盛。
圖26中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)數(shù)組。 數(shù)據(jù)通過collectAsMap函數(shù)返回給Driver程序計(jì)算結(jié)果僵闯,結(jié)果以HashMap形式存儲(chǔ)笤闯。
圖26 CollectAsMap算子對(duì)RDD轉(zhuǎn)換
(27)reduceByKeyLocally
實(shí)現(xiàn)的是先reduce再collectAsMap的功能,先對(duì)RDD的整體進(jìn)行reduce操作棍厂,然后再收集所有結(jié)果返回為一個(gè)HashMap颗味。
(28)lookup
下面代碼為lookup的聲明。
lookup(key:K):Seq[V]
Lookup函數(shù)對(duì)(Key牺弹,Value)型的RDD操作浦马,返回指定Key對(duì)應(yīng)的元素形成的Seq。 這個(gè)函數(shù)處理優(yōu)化的部分在于张漂,如果這個(gè)RDD包含分區(qū)器晶默,則只會(huì)對(duì)應(yīng)處理K所在的分區(qū),然后返回由(K航攒,V)形成的Seq磺陡。 如果RDD不包含分區(qū)器,則需要對(duì)全RDD元素進(jìn)行暴力掃描處理漠畜,搜索指定K對(duì)應(yīng)的元素币他。
圖28中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表Seq憔狞,最后結(jié)果返回到Driver所在節(jié)點(diǎn)的應(yīng)用中蝴悉。
圖28 ?lookup對(duì)RDD轉(zhuǎn)換
(29)?count
count 返回整個(gè) RDD 的元素個(gè)數(shù)。
內(nèi)部函數(shù)實(shí)現(xiàn)為:
defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
圖 29中瘾敢,返回?cái)?shù)據(jù)的個(gè)數(shù)為 5拍冠。一個(gè)方塊代表一個(gè) RDD 分區(qū)。
?圖29 count 對(duì) RDD 算子轉(zhuǎn)換
(30)top
top可返回最大的k個(gè)元素簇抵。 函數(shù)定義如下庆杜。
top(num:Int)(implicit ord:Ordering[T]):Array[T]
相近函數(shù)說明如下。
·top返回最大的k個(gè)元素碟摆。
·take返回最小的k個(gè)元素晃财。
·takeOrdered返回最小的k個(gè)元素,并且在返回的數(shù)組中保持元素的順序焦履。
·first相當(dāng)于top(1)返回整個(gè)RDD中的前k個(gè)元素拓劝,可以定義排序的方式Ordering[T]雏逾。
返回的是一個(gè)含前k個(gè)元素的數(shù)組嘉裤。
(31)reduce
reduce函數(shù)相當(dāng)于對(duì)RDD中的元素進(jìn)行reduceLeft函數(shù)的操作。 函數(shù)實(shí)現(xiàn)如下栖博。
Some(iter.reduceLeft(cleanF))
reduceLeft先對(duì)兩個(gè)元素進(jìn)行reduce函數(shù)操作屑宠,然后將結(jié)果和迭代器取出的下一個(gè)元素進(jìn)行reduce函數(shù)操作,直到迭代器遍歷完所有元素仇让,得到最后結(jié)果典奉。在RDD中躺翻,先對(duì)每個(gè)分區(qū)中的所有元素的集合分別進(jìn)行reduceLeft。 每個(gè)分區(qū)形成的結(jié)果相當(dāng)于一個(gè)元素卫玖,再對(duì)這個(gè)結(jié)果集合進(jìn)行reduceleft操作公你。
例如:用戶自定義函數(shù)如下。
f:(A假瞬,B)=>(A._1+”@”+B._1陕靠,A._2+B._2)
圖31中的方框代表一個(gè)RDD分區(qū),通過用戶自定函數(shù)f將數(shù)據(jù)進(jìn)行reduce運(yùn)算脱茉。 示例
最后的返回結(jié)果為V1@[1]V2U剪芥!@U2@U3@U4,12琴许。
圖31 reduce算子對(duì)RDD轉(zhuǎn)換
(32)fold
fold和reduce的原理相同税肪,但是與reduce不同,相當(dāng)于每個(gè)reduce時(shí)榜田,迭代器取的第一個(gè)元素是zeroValue益兄。
圖32中通過下面的用戶自定義函數(shù)進(jìn)行fold運(yùn)算,圖中的一個(gè)方框代表一個(gè)RDD分區(qū)箭券。 讀者可以參照reduce函數(shù)理解偏塞。
fold((”V0@”,2))( (A邦鲫,B)=>(A._1+”@”+B._1灸叼,A._2+B._2))
圖32 ?fold算子對(duì)RDD轉(zhuǎn)換
(33)aggregate
aggregate先對(duì)每個(gè)分區(qū)的所有元素進(jìn)行aggregate操作,再對(duì)分區(qū)的結(jié)果進(jìn)行fold操作庆捺。
aggreagate與fold和reduce的不同之處在于古今,aggregate相當(dāng)于采用歸并的方式進(jìn)行數(shù)據(jù)聚集,這種聚集是并行化的滔以。 而在fold和reduce函數(shù)的運(yùn)算過程中捉腥,每個(gè)分區(qū)中需要進(jìn)行串行處理,每個(gè)分區(qū)串行計(jì)算完結(jié)果你画,結(jié)果再按之前的方式進(jìn)行聚集抵碟,并返回最終聚集結(jié)果。
函數(shù)的定義如下坏匪。
aggregate[B](z: B)(seqop: (B拟逮,A) => B,combop: (B适滓,B) => B): B
圖33通過用戶自定義函數(shù)對(duì)RDD 進(jìn)行aggregate的聚集操作敦迄,圖中的每個(gè)方框代表一個(gè)RDD分區(qū)。
rdd.aggregate(”V0@”,2)((A罚屋,B)=>(A._1+”@”+B._1苦囱,A._2+B._2)),(A脾猛,B)=>(A._1+”@”+B_1撕彤,A._@+B_.2))
最后,介紹兩個(gè)計(jì)算模型中的兩個(gè)特殊變量猛拴。
廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表喉刘,以及廣播大變量等場(chǎng)景。 這些數(shù)據(jù)集合在單節(jié)點(diǎn)內(nèi)存能夠容納漆弄,不需要像RDD那樣在節(jié)點(diǎn)之間打散存儲(chǔ)睦裳。
Spark運(yùn)行時(shí)把廣播變量數(shù)據(jù)發(fā)到各個(gè)節(jié)點(diǎn),并保存下來撼唾,后續(xù)計(jì)算可以復(fù)用廉邑。 相比Hadoo的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享倒谷。 Broadcast的底層實(shí)現(xiàn)采用了BT機(jī)制蛛蒙。
②代表V。
③代表U渤愁。
accumulator變量:允許做全局累加操作牵祟,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運(yùn)行指標(biāo)的情景。