Spark常用算子詳解
Spark的算子的分類(lèi)
從大方向來(lái)說(shuō),Spark 算子大致可以分為以下兩類(lèi):
1)Transformation 變換/轉(zhuǎn)換算子:這種變換并不觸發(fā)提交作業(yè)编整,完成作業(yè)中間過(guò)程處理诈铛。
Transformation 操作是延遲計(jì)算的宫纬,也就是說(shuō)從一個(gè)RDD 轉(zhuǎn)換生成另一個(gè) RDD 的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有 Action 操作的時(shí)候才會(huì)真正觸發(fā)運(yùn)算闺兢。
2)Action 行動(dòng)算子:這類(lèi)算子會(huì)觸發(fā) SparkContext 提交 Job 作業(yè)雇初。
Action 算子會(huì)觸發(fā) Spark 提交作業(yè)(Job),并將數(shù)據(jù)輸出 Spark系統(tǒng)烁兰。
從小方向來(lái)說(shuō)耐亏,Spark 算子大致可以分為以下三類(lèi):
1)Value數(shù)據(jù)類(lèi)型的Transformation算子,這種變換并不觸發(fā)提交作業(yè)沪斟,針對(duì)處理的數(shù)據(jù)項(xiàng)是Value型的數(shù)據(jù)广辰。
2)Key-Value數(shù)據(jù)類(lèi)型的Transfromation算子,這種變換并不觸發(fā)提交作業(yè)主之,針對(duì)處理的數(shù)據(jù)項(xiàng)是Key-Value型的數(shù)據(jù)對(duì)择吊。
3)Action算子,這類(lèi)算子會(huì)觸發(fā)SparkContext提交Job作業(yè)槽奕。
1)Value數(shù)據(jù)類(lèi)型的Transformation算子
一几睛、輸入分區(qū)與輸出分區(qū)一對(duì)一型
1痴颊、map算子
2儿普、flatMap算子
3、mapPartitions算子
4、glom算子
二突想、輸入分區(qū)與輸出分區(qū)多對(duì)一型
5煤篙、union算子
6萌壳、cartesian算子
三场梆、輸入分區(qū)與輸出分區(qū)多對(duì)多型
7、grouBy算子
四晴弃、輸出分區(qū)為輸入分區(qū)子集型
8掩幢、filter算子
9、distinct算子
10上鞠、subtract算子
11际邻、sample算子
12、takeSample算子
五芍阎、Cache型
13枯怖、cache算子
14、persist算子
2)Key-Value數(shù)據(jù)類(lèi)型的Transfromation算子
一能曾、輸入分區(qū)與輸出分區(qū)一對(duì)一
15、mapValues算子
二肿轨、對(duì)單個(gè)RDD或兩個(gè)RDD聚集
單個(gè)RDD聚集
16寿冕、combineByKey算子
17、reduceByKey算子
18椒袍、partitionBy算子
兩個(gè)RDD聚集
19驼唱、Cogroup算子
三、連接
20驹暑、join算子
21玫恳、leftOutJoin和 rightOutJoin算子
3)Action算子
一、無(wú)輸出
22优俘、foreach算子
二京办、HDFS
23、saveAsTextFile算子
24帆焕、saveAsObjectFile算子
三惭婿、Scala集合和數(shù)據(jù)類(lèi)型
25、collect算子
26叶雹、collectAsMap算子
27财饥、reduceByKeyLocally算子
28、lookup算子
29折晦、count算子
30钥星、top算子
31、reduce算子
32满着、fold算子
33谦炒、aggregate算子
1. Transformations 算子
(1) map
將原來(lái) RDD 的每個(gè)數(shù)據(jù)項(xiàng)通過(guò) map 中的用戶自定義函數(shù) f 映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素贯莺。源碼中 map 算子相當(dāng)于初始化一個(gè) RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))编饺。
圖 1中每個(gè)方框表示一個(gè) RDD 分區(qū)乖篷,左側(cè)的分區(qū)經(jīng)過(guò)用戶自定義函數(shù) f:T->U 映射為右側(cè)的新 RDD 分區(qū)。但是透且,實(shí)際只有等到 Action算子觸發(fā)后撕蔼,這個(gè) f 函數(shù)才會(huì)和其他函數(shù)在一個(gè)stage 中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。在圖 1 中的第一個(gè)分區(qū)秽誊,數(shù)據(jù)記錄 V1 輸入 f鲸沮,通過(guò) f 轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄 V’1。
圖1 map 算子對(duì) RDD 轉(zhuǎn)換
(2) flatMap
將原來(lái) RDD 中的每個(gè)元素通過(guò)函數(shù) f 轉(zhuǎn)換為新的元素锅论,并將生成的 RDD 的每個(gè)集合中的元素合并為一個(gè)集合讼溺,內(nèi)部創(chuàng)建 FlatMappedRDD(this,sc.clean(f))最易。
圖 2 表 示 RDD 的 一 個(gè) 分 區(qū) 怒坯,進(jìn) 行 flatMap函 數(shù) 操 作, flatMap 中 傳 入 的 函 數(shù) 為 f:T->U藻懒, T和 U 可以是任意的數(shù)據(jù)類(lèi)型剔猿。將分區(qū)中的數(shù)據(jù)通過(guò)用戶自定義函數(shù) f 轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個(gè) RDD 分區(qū)嬉荆,小方框代表一個(gè)集合归敬。 V1、 V2鄙早、 V3 在一個(gè)集合作為 RDD 的一個(gè)數(shù)據(jù)項(xiàng)汪茧,可能存儲(chǔ)為數(shù)組或其他容器,轉(zhuǎn)換為V’1限番、 V’2舱污、 V’3 后,將原來(lái)的數(shù)組或容器結(jié)合拆散弥虐,拆散的數(shù)據(jù)形成為 RDD 中的數(shù)據(jù)項(xiàng)慌闭。
[圖片上傳失敗...(image-ab6902-1588818548418)]
圖2 flapMap 算子對(duì) RDD 轉(zhuǎn)換
(3) mapPartitions
mapPartitions 函 數(shù) 獲 取 到 每 個(gè) 分 區(qū) 的 迭 代器,在 函 數(shù) 中 通 過(guò) 這 個(gè) 分 區(qū) 整 體 的 迭 代 器 對(duì)整 個(gè) 分 區(qū) 的 元 素 進(jìn) 行 操 作躯舔。 內(nèi) 部 實(shí) 現(xiàn) 是 生 成
MapPartitionsRDD驴剔。圖 3 中的方框代表一個(gè) RDD 分區(qū)。圖 3 中粥庄,用戶通過(guò)函數(shù) f (iter)=>iter.f ilter(_>=3) 對(duì)分區(qū)中所有數(shù)據(jù)進(jìn)行過(guò)濾丧失,大于和等于 3 的數(shù)據(jù)保留。一個(gè)方塊代表一個(gè) RDD 分區(qū)惜互,含有 1布讹、 2琳拭、 3 的分區(qū)過(guò)濾只剩下元素 3。
圖3 mapPartitions 算子對(duì) RDD 轉(zhuǎn)換
(4)glom
glom函數(shù)將每個(gè)分區(qū)形成一個(gè)數(shù)組描验,內(nèi)部實(shí)現(xiàn)是返回的GlommedRDD白嘁。 圖4中的每個(gè)方框代表一個(gè)RDD分區(qū)。圖4中的方框代表一個(gè)分區(qū)膘流。 該圖表示含有V1絮缅、 V2、 V3的分區(qū)通過(guò)函數(shù)glom形成一數(shù)組Array[(V1)呼股,(V2)耕魄,(V3)]。
圖 4 glom算子對(duì)RDD轉(zhuǎn)換
(5) union
使用 union 函數(shù)時(shí)需要保證兩個(gè) RDD 元素的數(shù)據(jù)類(lèi)型相同彭谁,返回的 RDD 數(shù)據(jù)類(lèi)型和被合并的 RDD 元素?cái)?shù)據(jù)類(lèi)型相同吸奴,并不進(jìn)行去重操作,保存所有元素缠局。如果想去重
可以使用 distinct()则奥。同時(shí) Spark 還提供更為簡(jiǎn)潔的使用 union 的 API,通過(guò) ++ 符號(hào)相當(dāng)于 union 函數(shù)操作狭园。
圖 5 中左側(cè)大方框代表兩個(gè) RDD读处,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD妙啃,大方框內(nèi)的小方框代表分區(qū)。
含有V1俊戳、V2揖赴、U1、U2抑胎、U3燥滑、U4的RDD和含有V1、V8阿逃、U5铭拧、U6、U7恃锉、U8的RDD合并所有元素形成一個(gè)RDD搀菩。V1、V1破托、V2肪跋、V8形成一個(gè)分區(qū),U1土砂、U2州既、U3谜洽、U4、U5吴叶、U6阐虚、U7、U8形成一個(gè)分區(qū)蚌卤。
圖 5 union 算子對(duì) RDD 轉(zhuǎn)換
(6) cartesian
? 對(duì) 兩 個(gè) RDD 內(nèi) 的 所 有 元 素 進(jìn) 行 笛 卡 爾 積 操 作实束。 操 作 后, 內(nèi) 部 實(shí) 現(xiàn) 返 回CartesianRDD造寝。圖6中左側(cè)大方框代表兩個(gè) RDD磕洪,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD诫龙,大方框內(nèi)的小方框代表分區(qū)析显。圖6中的大方框代表RDD,大方框中的小方框代表RDD分區(qū)签赃。
例 如: V1 和 另 一 個(gè) RDD 中 的 W1谷异、 W2、 Q5 進(jìn) 行 笛 卡 爾 積 運(yùn) 算 形 成 (V1,W1)锦聊、(V1,W2)歹嘹、 (V1,Q5)。
? 圖 6 cartesian 算子對(duì) RDD 轉(zhuǎn)換
(7) groupBy
groupBy :將元素通過(guò)函數(shù)生成相應(yīng)的 Key孔庭,數(shù)據(jù)就轉(zhuǎn)化為 Key-Value 格式尺上,之后將 Key 相同的元素分為一組。
函數(shù)實(shí)現(xiàn)如下:
1)將用戶函數(shù)預(yù)處理:
val cleanF = sc.clean(f)
2)對(duì)數(shù)據(jù) map 進(jìn)行函數(shù)操作圆到,最后再進(jìn)行 groupByKey 分組操作怎抛。
this.map(t => (cleanF(t), t)).groupByKey(p)
其中, p 確定了分區(qū)個(gè)數(shù)和分區(qū)函數(shù)芽淡,也就決定了并行化的程度马绝。
圖7 中方框代表一個(gè) RDD 分區(qū),相同key 的元素合并到一個(gè)組挣菲。例如 V1 和 V2 合并為 V富稻, Value 為 V1,V2。形成 V,Seq(V1,V2)白胀。
圖 7 groupBy 算子對(duì) RDD 轉(zhuǎn)換
(8) filter
filter 函數(shù)功能是對(duì)元素進(jìn)行過(guò)濾椭赋,對(duì)每個(gè) 元 素 應(yīng) 用 f 函 數(shù), 返 回 值 為 true 的 元 素 在RDD 中保留或杠,返回值為 false 的元素將被過(guò)濾掉纹份。 內(nèi) 部 實(shí) 現(xiàn) 相 當(dāng) 于 生 成 FilteredRDD(this,sc.clean(f))。
下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
圖 8 中每個(gè)方框代表一個(gè) RDD 分區(qū)蔓涧, T 可以是任意的類(lèi)型件已。通過(guò)用戶自定義的過(guò)濾函數(shù) f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作元暴,將滿足條件篷扩、返回結(jié)果為 true 的數(shù)據(jù)項(xiàng)保留。例如茉盏,過(guò)濾掉 V2 和 V3 保留了 V1鉴未,為區(qū)分命名為 V’1。
圖 8 filter 算子對(duì) RDD 轉(zhuǎn)換
(9)distinct
distinct將RDD中的元素進(jìn)行去重操作鸠姨。圖9中的每個(gè)方框代表一個(gè)RDD分區(qū)铜秆,通過(guò)distinct函數(shù),將數(shù)據(jù)去重讶迁。 例如连茧,重復(fù)數(shù)據(jù)V1、 V1去重后只保留一份V1巍糯。
圖9 distinct算子對(duì)RDD轉(zhuǎn)換
(10)subtract
subtract相當(dāng)于進(jìn)行集合的差操作啸驯,RDD 1去除RDD 1和RDD 2交集中的所有元素。圖10中左側(cè)的大方框代表兩個(gè)RDD祟峦,大方框內(nèi)的小方框代表RDD的分區(qū)罚斗。 右側(cè)大方框
代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)宅楞。 V1在兩個(gè)RDD中均有针姿,根據(jù)差集運(yùn)算規(guī)則,新RDD不保留厌衙,V2在第一個(gè)RDD有距淫,第二個(gè)RDD沒(méi)有,則在新RDD元素中包含V2迅箩。
圖10 subtract算子對(duì)RDD轉(zhuǎn)換
(11) sample
sample 將 RDD 這個(gè)集合內(nèi)的元素進(jìn)行采樣溉愁,獲取所有元素的子集处铛。用戶可以設(shè)定是否有放回的抽樣饲趋、百分比、隨機(jī)種子撤蟆,進(jìn)而決定采樣方式奕塑。內(nèi)部實(shí)現(xiàn)是生成 SampledRDD(withReplacement, fraction家肯, seed)龄砰。
函數(shù)參數(shù)設(shè)置:
‰ withReplacement=true,表示有放回的抽樣。
‰ withReplacement=false换棚,表示無(wú)放回的抽樣式镐。
圖 11中 的 每 個(gè) 方 框 是 一 個(gè) RDD 分 區(qū)。 通 過(guò) sample 函 數(shù)固蚤, 采 樣 50% 的 數(shù) 據(jù)娘汞。V1、 V2夕玩、 U1你弦、 U2、U3燎孟、U4 采樣出數(shù)據(jù) V1 和 U1禽作、 U2 形成新的 RDD。
圖11 sample 算子對(duì) RDD 轉(zhuǎn)換
(12)takeSample
takeSample()函數(shù)和上面的sample函數(shù)是一個(gè)原理揩页,但是不使用相對(duì)比例采樣旷偿,而是按設(shè)定的采樣個(gè)數(shù)進(jìn)行采樣,同時(shí)返回結(jié)果不再是RDD碍沐,而是相當(dāng)于對(duì)采樣后的數(shù)據(jù)進(jìn)行
Collect()狸捅,返回結(jié)果的集合為單機(jī)的數(shù)組。
圖12中左側(cè)的方框代表分布式的各個(gè)節(jié)點(diǎn)上的分區(qū)累提,右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組尘喝。 通過(guò)takeSample對(duì)數(shù)據(jù)采樣,設(shè)置為采樣一份數(shù)據(jù)斋陪,返回結(jié)果為V1朽褪。
圖12 takeSample算子對(duì)RDD轉(zhuǎn)換
(13) cache
cache 將 RDD 元素從磁盤(pán)緩存到內(nèi)存。 相當(dāng)于 persist(MEMORY_ONLY) 函數(shù)的功能无虚。
圖13 中每個(gè)方框代表一個(gè) RDD 分區(qū)缔赠,左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤(pán),通過(guò) cache 算子將數(shù)據(jù)緩存在內(nèi)存友题。
圖 13 Cache 算子對(duì) RDD 轉(zhuǎn)換
(14) persist
persist 函數(shù)對(duì) RDD 進(jìn)行緩存操作嗤堰。數(shù)據(jù)緩存在哪里依據(jù) StorageLevel 這個(gè)枚舉類(lèi)型進(jìn)行確定。 有以下幾種類(lèi)型的組合(見(jiàn)10)度宦, DISK 代表磁盤(pán)踢匣,MEMORY 代表內(nèi)存, SER 代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)戈抄。
下面為函數(shù)定義离唬, StorageLevel 是枚舉類(lèi)型,代表存儲(chǔ)模式划鸽,用戶可以通過(guò)圖 14-1 按需進(jìn)行選擇输莺。
persist(newLevel:StorageLevel)
圖 14-1 中列出persist 函數(shù)可以進(jìn)行緩存的模式戚哎。例如,MEMORY_AND_DISK_SER 代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤(pán)嫂用,并且以序列化的方式存儲(chǔ)型凳,其他同理。
圖 14-1 persist 算子對(duì) RDD 轉(zhuǎn)換
圖 14-2 中方框代表 RDD 分區(qū)嘱函。 disk 代表存儲(chǔ)在磁盤(pán)啰脚, mem 代表存儲(chǔ)在內(nèi)存。數(shù)據(jù)最初全部存儲(chǔ)在磁盤(pán)实夹,通過(guò) persist(MEMORY_AND_DISK) 將數(shù)據(jù)緩存到內(nèi)存橄浓,但是有的分區(qū)無(wú)法容納在內(nèi)存,將含有 V1亮航、 V2荸实、 V3 的RDD存儲(chǔ)到磁盤(pán),將含有U1缴淋,U2的RDD仍舊存儲(chǔ)在內(nèi)存准给。
圖 14-2 Persist 算子對(duì) RDD 轉(zhuǎn)換
(15) mapValues
mapValues :針對(duì)(Key, Value)型數(shù)據(jù)中的 Value 進(jìn)行 Map 操作重抖,而不對(duì) Key 進(jìn)行處理露氮。
圖 15 中的方框代表 RDD 分區(qū)。 a=>a+2 代表對(duì) (V1,1) 這樣的 Key Value 數(shù)據(jù)對(duì)钟沛,數(shù)據(jù)只對(duì) Value 中的 1 進(jìn)行加 2 操作畔规,返回結(jié)果為 3。
圖 15 mapValues 算子 RDD 對(duì)轉(zhuǎn)換
(16) combineByKey
下面代碼為 combineByKey 函數(shù)的定義:
combineByKey[C](createCombiner:(V) C,
mergeValue:(C, V) C,
mergeCombiners:(C, C) C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
說(shuō)明:
‰ createCombiner: V => C恨统, C 不存在的情況下叁扫,比如通過(guò) V 創(chuàng)建 seq C。
‰ mergeValue: (C畜埋, V) => C莫绣,當(dāng) C 已經(jīng)存在的情況下,需要 merge悠鞍,比如把 item V
加到 seq C 中对室,或者疊加。
mergeCombiners: (C咖祭, C) => C掩宜,合并兩個(gè) C。
‰ partitioner: Partitioner, Shuff le 時(shí)需要的 Partitioner心肪。
‰ mapSideCombine : Boolean = true锭亏,為了減小傳輸量纠吴,很多 combine 可以在 map
端先做硬鞍,比如疊加,可以先在一個(gè) partition 中把所有相同的 key 的 value 疊加,
再 shuff le固该。
‰ serializerClass: String = null锅减,傳輸需要序列化,用戶可以自定義序列化類(lèi):
例如伐坏,相當(dāng)于將元素為 (Int怔匣, Int) 的 RDD 轉(zhuǎn)變?yōu)榱?(Int, Seq[Int]) 類(lèi)型元素的 RDD桦沉。圖 16中的方框代表 RDD 分區(qū)每瞒。如圖,通過(guò) combineByKey纯露, 將 (V1,2)剿骨, (V1,1)數(shù)據(jù)合并為( V1,Seq(2,1))。
圖 16 comBineByKey 算子對(duì) RDD 轉(zhuǎn)換
(17) reduceByKey
reduceByKey 是比 combineByKey 更簡(jiǎn)單的一種情況埠褪,只是兩個(gè)值合并成一個(gè)值浓利,( Int, Int V)to (Int钞速, Int C)贷掖,比如疊加。所以 createCombiner reduceBykey 很簡(jiǎn)單渴语,就是直接返回 v苹威,而 mergeValue和 mergeCombiners 邏輯是相同的,沒(méi)有區(qū)別驾凶。
函數(shù)實(shí)現(xiàn):
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
圖17中的方框代表 RDD 分區(qū)屠升。通過(guò)用戶自定義函數(shù) (A,B) => (A + B) 函數(shù),將相同 key 的數(shù)據(jù) (V1,2) 和 (V1,1) 的 value 相加運(yùn)算狭郑,結(jié)果為( V1,3)腹暖。
圖 17 reduceByKey 算子對(duì) RDD 轉(zhuǎn)換
(18)partitionBy
partitionBy函數(shù)對(duì)RDD進(jìn)行分區(qū)操作。
函數(shù)定義如下翰萨。
partitionBy(partitioner:Partitioner)
如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致脏答,則不重分區(qū),如果不一致亩鬼,則相當(dāng)于根據(jù)分區(qū)器生成一個(gè)新的ShuffledRDD殖告。
圖18中的方框代表RDD分區(qū)。 通過(guò)新的分區(qū)策略將原來(lái)在不同分區(qū)的V1雳锋、 V2數(shù)據(jù)都合并到了一個(gè)分區(qū)黄绩。
圖18 partitionBy算子對(duì)RDD轉(zhuǎn)換
(19)Cogroup
cogroup函數(shù)將兩個(gè)RDD進(jìn)行協(xié)同劃分,cogroup函數(shù)的定義如下玷过。
cogroup[W](other: RDD[(K爽丹, W)]筑煮, numPartitions: Int): RDD[(K, (Iterable[V]粤蝎, Iterable[W]))]
對(duì)在兩個(gè)RDD中的Key-Value類(lèi)型的元素真仲,每個(gè)RDD相同Key的元素分別聚合為一個(gè)集合,并且返回兩個(gè)RDD中對(duì)應(yīng)Key的元素集合的迭代器初澎。
〗沼Α(K, (Iterable[V]碑宴, Iterable[W]))
其中软啼,Key和Value,Value是兩個(gè)RDD下相同Key的兩個(gè)數(shù)據(jù)集合的迭代器所構(gòu)成的元組延柠。
圖19中的大方框代表RDD焰宣,大方框內(nèi)的小方框代表RDD中的分區(qū)。 將RDD1中的數(shù)據(jù)(U1捕仔,1)匕积、 (U1,2)和RDD2中的數(shù)據(jù)(U1榜跌,2)合并為(U1闪唆,((1,2)钓葫,(2)))悄蕾。
圖19 Cogroup算子對(duì)RDD轉(zhuǎn)換
(20) join
join 對(duì)兩個(gè)需要連接的 RDD 進(jìn)行 cogroup函數(shù)操作,將相同 key 的數(shù)據(jù)能夠放到一個(gè)分區(qū)础浮,在 cogroup 操作之后形成的新 RDD 對(duì)每個(gè)key 下的元素進(jìn)行笛卡爾積的操作帆调,返回的結(jié)果再展平,對(duì)應(yīng) key 下的所有元組形成一個(gè)集合豆同。最后返回 RDD[(K番刊, (V, W))]影锈。
下 面 代 碼 為 join 的 函 數(shù) 實(shí) 現(xiàn)芹务, 本 質(zhì) 是通 過(guò) cogroup 算 子 先 進(jìn) 行 協(xié) 同 劃 分, 再 通 過(guò)flatMapValues 將合并的數(shù)據(jù)打散鸭廷。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }
圖 20是對(duì)兩個(gè) RDD 的 join 操作示意圖枣抱。大方框代表 RDD,小方框代表 RDD 中的分區(qū)辆床。函數(shù)對(duì)相同 key 的元素佳晶,如 V1 為 key 做連接后結(jié)果為 (V1,(1,1)) 和 (V1,(1,2))。
圖 20 join 算子對(duì) RDD 轉(zhuǎn)換
(21)eftOutJoin和rightOutJoin
LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空讼载,如果為空轿秧,則填充為空中跌。 如果不為空,則將數(shù)據(jù)進(jìn)行連接運(yùn)算淤刃,并
返回結(jié)果。
下面代碼是leftOutJoin的實(shí)現(xiàn)吱型。
if (ws.isEmpty) {
vs.map(v => (v逸贾, None))
} else {
for (v <- vs; w <- ws) yield (v津滞, Some(w))
}
2. Actions 算子
本質(zhì)上在 Action 算子中通過(guò) SparkContext 進(jìn)行了提交作業(yè)的 runJob 操作铝侵,觸發(fā)了RDD DAG 的執(zhí)行。
例如触徐, Action 算子 collect 函數(shù)的代碼如下咪鲜,感興趣的讀者可以順著這個(gè)入口進(jìn)行源碼剖析:
/**
* Return an array that contains all of the elements in this RDD.
/
def collect(): Array[T] = {
/ 提交 Job/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _)
}
(22) foreach
foreach 對(duì) RDD 中的每個(gè)元素都應(yīng)用 f 函數(shù)操作,不返回 RDD 和 Array撞鹉, 而是返回Uint疟丙。圖22表示 foreach 算子通過(guò)用戶自定義函數(shù)對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作。本例中自定義函數(shù)為 println()鸟雏,控制臺(tái)打印所有數(shù)據(jù)項(xiàng)享郊。
圖 22 foreach 算子對(duì) RDD 轉(zhuǎn)換
(23) saveAsTextFile
函數(shù)將數(shù)據(jù)輸出,存儲(chǔ)到 HDFS 的指定目錄孝鹊。
下面為 saveAsTextFile 函數(shù)的內(nèi)部實(shí)現(xiàn)炊琉,其內(nèi)部
通過(guò)調(diào)用 saveAsHadoopFile 進(jìn)行實(shí)現(xiàn):
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFileTextOutputFormat[NullWritable, Text]
將 RDD 中的每個(gè)元素映射轉(zhuǎn)變?yōu)?(null, x.toString)又活,然后再將其寫(xiě)入 HDFS苔咪。
圖 23中左側(cè)方框代表 RDD 分區(qū),右側(cè)方框代表 HDFS 的 Block柳骄。通過(guò)函數(shù)將RDD 的每個(gè)分區(qū)存儲(chǔ)為 HDFS 中的一個(gè) Block团赏。
圖 23 saveAsHadoopFile 算子對(duì) RDD 轉(zhuǎn)換
(24)saveAsObjectFile
saveAsObjectFile將分區(qū)中的每10個(gè)元素組成一個(gè)Array,然后將這個(gè)Array序列化耐薯,映射為(Null馆里,BytesWritable(Y))的元素,寫(xiě)入HDFS為SequenceFile的格式可柿。
下面代碼為函數(shù)內(nèi)部實(shí)現(xiàn)鸠踪。
map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
圖24中的左側(cè)方框代表RDD分區(qū)复斥,右側(cè)方框代表HDFS的Block营密。 通過(guò)函數(shù)將RDD的每個(gè)分區(qū)存儲(chǔ)為HDFS上的一個(gè)Block。
圖24 saveAsObjectFile算子對(duì)RDD轉(zhuǎn)換
(25) collect
collect 相當(dāng)于 toArray目锭, toArray 已經(jīng)過(guò)時(shí)不推薦使用评汰, collect 將分布式的 RDD 返回為一個(gè)單機(jī)的 scala Array 數(shù)組纷捞。在這個(gè)數(shù)組上運(yùn)用 scala 的函數(shù)式操作。
圖 25中左側(cè)方框代表 RDD 分區(qū)被去,右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組主儡。通過(guò)函數(shù)操作,將結(jié)果返回到 Driver 程序所在的節(jié)點(diǎn)惨缆,以數(shù)組形式存儲(chǔ)糜值。
圖 25 Collect 算子對(duì) RDD 轉(zhuǎn)換
(26)collectAsMap
collectAsMap對(duì)(K,V)型的RDD數(shù)據(jù)返回一個(gè)單機(jī)HashMap坯墨。 對(duì)于重復(fù)K的RDD元素寂汇,后面的元素覆蓋前面的元素。
圖26中的左側(cè)方框代表RDD分區(qū)捣染,右側(cè)方框代表單機(jī)數(shù)組骄瓣。 數(shù)據(jù)通過(guò)collectAsMap函數(shù)返回給Driver程序計(jì)算結(jié)果,結(jié)果以HashMap形式存儲(chǔ)耍攘。
圖26 CollectAsMap算子對(duì)RDD轉(zhuǎn)換
(27)reduceByKeyLocally
實(shí)現(xiàn)的是先reduce再collectAsMap的功能榕栏,先對(duì)RDD的整體進(jìn)行reduce操作,然后再收集所有結(jié)果返回為一個(gè)HashMap蕾各。
(28)lookup
下面代碼為lookup的聲明臼膏。
lookup(key:K):Seq[V]
Lookup函數(shù)對(duì)(Key,Value)型的RDD操作示损,返回指定Key對(duì)應(yīng)的元素形成的Seq渗磅。 這個(gè)函數(shù)處理優(yōu)化的部分在于,如果這個(gè)RDD包含分區(qū)器检访,則只會(huì)對(duì)應(yīng)處理K所在的分區(qū)始鱼,然后返回由(K,V)形成的Seq脆贵。 如果RDD不包含分區(qū)器医清,則需要對(duì)全RDD元素進(jìn)行暴力掃描處理,搜索指定K對(duì)應(yīng)的元素卖氨。
圖28中的左側(cè)方框代表RDD分區(qū)会烙,右側(cè)方框代表Seq,最后結(jié)果返回到Driver所在節(jié)點(diǎn)的應(yīng)用中筒捺。
圖28 lookup對(duì)RDD轉(zhuǎn)換
(29) count
count 返回整個(gè) RDD 的元素個(gè)數(shù)柏腻。
內(nèi)部函數(shù)實(shí)現(xiàn)為:
defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
圖 29中,返回?cái)?shù)據(jù)的個(gè)數(shù)為 5系吭。一個(gè)方塊代表一個(gè) RDD 分區(qū)五嫂。
圖29 count 對(duì) RDD 算子轉(zhuǎn)換
(30)top
top可返回最大的k個(gè)元素。 函數(shù)定義如下。
top(num:Int)(implicit ord:Ordering[T]):Array[T]
相近函數(shù)說(shuō)明如下沃缘。
·top返回最大的k個(gè)元素躯枢。
·take返回最小的k個(gè)元素。
·takeOrdered返回最小的k個(gè)元素槐臀,并且在返回的數(shù)組中保持元素的順序锄蹂。
·first相當(dāng)于top(1)返回整個(gè)RDD中的前k個(gè)元素,可以定義排序的方式Ordering[T]水慨。
返回的是一個(gè)含前k個(gè)元素的數(shù)組得糜。
(31)reduce
reduce函數(shù)相當(dāng)于對(duì)RDD中的元素進(jìn)行reduceLeft函數(shù)的操作。 函數(shù)實(shí)現(xiàn)如下讥巡。
Some(iter.reduceLeft(cleanF))
reduceLeft先對(duì)兩個(gè)元素<K掀亩,V>進(jìn)行reduce函數(shù)操作舔哪,然后將結(jié)果和迭代器取出的下一個(gè)元素<k欢顷,V>進(jìn)行reduce函數(shù)操作,直到迭代器遍歷完所有元素捉蚤,得到最后結(jié)果抬驴。在RDD中,先對(duì)每個(gè)分區(qū)中的所有元素<K缆巧,V>的集合分別進(jìn)行reduceLeft布持。 每個(gè)分區(qū)形成的結(jié)果相當(dāng)于一個(gè)元素<K,V>陕悬,再對(duì)這個(gè)結(jié)果集合進(jìn)行reduceleft操作题暖。
例如:用戶自定義函數(shù)如下。
f:(A捉超,B)=>(A._1+”@”+B._1胧卤,A._2+B._2)
圖31中的方框代表一個(gè)RDD分區(qū),通過(guò)用戶自定函數(shù)f將數(shù)據(jù)進(jìn)行reduce運(yùn)算拼岳。 示例
最后的返回結(jié)果為V1@[1]V2U枝誊!@U2@U3@U4,12惜纸。
圖31 reduce算子對(duì)RDD轉(zhuǎn)換
(32)fold
fold和reduce的原理相同叶撒,但是與reduce不同,相當(dāng)于每個(gè)reduce時(shí)耐版,迭代器取的第一個(gè)元素是zeroValue祠够。
圖32中通過(guò)下面的用戶自定義函數(shù)進(jìn)行fold運(yùn)算,圖中的一個(gè)方框代表一個(gè)RDD分區(qū)粪牲。 讀者可以參照reduce函數(shù)理解哪审。
fold((”V0@”,2))( (A虑瀑,B)=>(A._1+”@”+B._1湿滓,A._2+B._2))
圖32 fold算子對(duì)RDD轉(zhuǎn)換
(33)aggregate
aggregate先對(duì)每個(gè)分區(qū)的所有元素進(jìn)行aggregate操作滴须,再對(duì)分區(qū)的結(jié)果進(jìn)行fold操作。
aggreagate與fold和reduce的不同之處在于叽奥,aggregate相當(dāng)于采用歸并的方式進(jìn)行數(shù)據(jù)聚集扔水,這種聚集是并行化的。 而在fold和reduce函數(shù)的運(yùn)算過(guò)程中朝氓,每個(gè)分區(qū)中需要進(jìn)行串行處理魔市,每個(gè)分區(qū)串行計(jì)算完結(jié)果,結(jié)果再按之前的方式進(jìn)行聚集赵哲,并返回最終聚集結(jié)果待德。
函數(shù)的定義如下。
aggregate[B](z: B)(seqop: (B枫夺,A) => B将宪,combop: (B,B) => B): B
圖33通過(guò)用戶自定義函數(shù)對(duì)RDD 進(jìn)行aggregate的聚集操作橡庞,圖中的每個(gè)方框代表一個(gè)RDD分區(qū)较坛。
rdd.aggregate(”V0@”,2)((A扒最,B)=>(A._1+”@”+B._1丑勤,A._2+B.2)),(A吧趣,B)=>(A.1+”@”+B_1法竞,A.@+B.2))
最后,介紹兩個(gè)計(jì)算模型中的兩個(gè)特殊變量强挫。
廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表岔霸,以及廣播大變量等場(chǎng)景。 這些數(shù)據(jù)集合在單節(jié)點(diǎn)內(nèi)存能夠容納纠拔,不需要像RDD那樣在節(jié)點(diǎn)之間打散存儲(chǔ)秉剑。
Spark運(yùn)行時(shí)把廣播變量數(shù)據(jù)發(fā)到各個(gè)節(jié)點(diǎn),并保存下來(lái)稠诲,后續(xù)計(jì)算可以復(fù)用侦鹏。 相比Hadoo的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享臀叙。 Broadcast的底層實(shí)現(xiàn)采用了BT機(jī)制略水。
圖33 aggregate算子對(duì)RDD轉(zhuǎn)換
②代表V。
∪坝③代表U渊涝。
accumulator變量:允許做全局累加操作,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運(yùn)行指標(biāo)的情景。