RDD 的轉(zhuǎn)換可以產(chǎn)生新的 RDD。
如上圖,外圈是 RDD 的轉(zhuǎn)換成洗,內(nèi)圈紅色 RDD 是轉(zhuǎn)換產(chǎn)生的新 RDD五督。
按顏色區(qū)分轉(zhuǎn)換:
- 綠色是單 RDD 窄依賴轉(zhuǎn)換
- 黑色是多 RDD 窄依賴轉(zhuǎn)換
- 紫色是 KV 洗牌型轉(zhuǎn)換
- 黃色是重分區(qū)轉(zhuǎn)換
- 藍(lán)色是特例的轉(zhuǎn)換
單 RDD 窄依賴轉(zhuǎn)換
MapPartitionRDD
這個(gè) RDD 在第一次分析中已經(jīng)分析過。簡(jiǎn)單復(fù)述一下:
- 依賴列表:一個(gè)窄依賴瓶殃,依賴上游 RDD
- 分區(qū)列表:上游 RDD 的分區(qū)列表
- 計(jì)算流程:映射關(guān)系(輸入一個(gè)分區(qū)充包,返回一個(gè)迭代器)
- 分區(qū)器 :上游 RDD 的分區(qū)器
- 存儲(chǔ)位置:上游 RDD 的優(yōu)先位置
可見除了計(jì)算流程,其他都是上游 RDD 的內(nèi)容遥椿。
- map 傳入一個(gè)帶“值到值”轉(zhuǎn)化函數(shù)的迭代器(例如字符串到字符串長(zhǎng)度)
- mapPartitions 傳入一個(gè)“迭代器到迭代器”的轉(zhuǎn)化函數(shù)基矮,如果需要按分區(qū)做一些比較重的過程(例如數(shù)據(jù)庫連接等)
- flatMap 傳入一個(gè)“迭代器到迭代器的迭代器”的轉(zhuǎn)化函數(shù)(例如押逼,統(tǒng)計(jì)字母策肝,“字符串的迭代器”到“‘字符的迭代器’的迭代器”)
- filter 傳入了一個(gè)帶“值到布爾值”篩選函數(shù)的迭代器
PartitionwiseSampledRDD
在分區(qū)中采樣的RDD
- 分區(qū)列表:在上游的分區(qū)的基礎(chǔ)上包裝一個(gè)采樣過程内颗,形成一個(gè)新的分區(qū)
PartitionwiseSampledRDDPartition
- 計(jì)算流程:采樣器返回的迭代器
- 其他成分:與上游 RDD 相同
PartitionwiseSampledRDD升薯,有放回的采樣用泊松采樣器立润,無放回的采樣用伯努利采樣器屉来,傳給分區(qū)器焕阿。
多 RDD 窄依賴
UnionRDD
- 依賴列表:每個(gè)上游 RDD 一個(gè)
RangeDependency
钾挟,每個(gè)RangeDependency
依賴上游 RDD 的所有分區(qū) - 分區(qū)列表:每個(gè)上游 RDD 一個(gè)
UnionPartition
舔株,構(gòu)成列表 - 計(jì)算流程:獲得目標(biāo)分區(qū)的迭代器
- 分區(qū)器 :None
- 存儲(chǔ)位置:每個(gè)上游 RDD 的優(yōu)先位置
CartesianRDD
笛卡爾積莺琳,是兩個(gè) RDD 每個(gè)數(shù)據(jù)都進(jìn)行一次關(guān)聯(lián)。下文中兩個(gè) RDD 的關(guān)聯(lián)中载慈,兩個(gè) RDD 分別稱為 rdd1惭等、rdd2。
- 依賴列表:兩個(gè)窄依賴組成的數(shù)組娃肿,分別依賴 rdd1咕缎、rdd2
- 分區(qū)列表:“rdd1的分區(qū)數(shù) 乘以 rdd2的分區(qū)數(shù)”個(gè)分區(qū)
- 計(jì)算流程:rdd1的一條記錄與 rdd2的一條記錄合成元組
- 分區(qū)器 :None
- 存儲(chǔ)位置:rdd1、rdd2的存儲(chǔ)位置的積
洗牌型轉(zhuǎn)換
洗牌型轉(zhuǎn)換料扰,是多個(gè) RDD 關(guān)聯(lián)的的轉(zhuǎn)換凭豪。
CoGroupedRDD
多個(gè)源 RDD 依據(jù) key 關(guān)聯(lián),key 相同的合并晒杈,形成最終的目標(biāo) RDD嫂伞。
- 依賴列表:每個(gè)源 RDD 一個(gè)依賴,構(gòu)成列表拯钻。如果源 RDD 的分區(qū)器與目標(biāo)的分區(qū)器相同帖努,則是1-to-1依賴,如果不同粪般,則是洗牌依賴
- 分區(qū)列表:目標(biāo) RDD 分區(qū)器指定的分區(qū)數(shù)量個(gè)
CoGroupPartition
拼余,每個(gè)分區(qū)記錄了數(shù)據(jù)來源分區(qū)。其中如果是洗牌依賴的數(shù)據(jù)源亩歹,需要洗牌過程匙监,具體洗牌過程以后再分析 - 計(jì)算流程:返回一個(gè)迭代器凡橱,迭代對(duì)象是 key 和 key 對(duì)應(yīng)源分區(qū)迭代器的數(shù)組 組成的元祖
- 分區(qū)器 :目標(biāo) RDD 的分區(qū)器
- 存儲(chǔ)位置:None
ShuffledRDD
同樣是多個(gè)源 RDD 依據(jù) key 關(guān)聯(lián),key 相同的做排序或聚合運(yùn)算亭姥,形成最終的目標(biāo) RDD稼钩。
- 依賴列表:一個(gè)洗牌依賴,依賴所有上游 RDD
- 分區(qū)列表:目標(biāo) RDD 分區(qū)器指定的分區(qū)數(shù)量個(gè)
ShuffledRDDPartition
达罗,每個(gè)分區(qū)只有一個(gè)編號(hào)(因?yàn)槊總€(gè)上游分區(qū)) - 計(jì)算流程:洗牌過程坝撑,具體洗牌過程以后再分析
- 分區(qū)器 :目標(biāo) RDD 的分區(qū)器
- 存儲(chǔ)位置:None
除了這五個(gè)成員以外,還有另外幾個(gè)重要的成員:序列化器粮揉、key 排序器巡李、聚合器、map 端合并器滔蝉,他們都將用于洗牌
其他
- coalesce击儡,是減少分區(qū)數(shù)量塔沃,可以在過濾之后蝠引,使數(shù)據(jù)更集中,以提高效率
- repartition蛀柴,是重新分區(qū)螃概,增加或減少分區(qū)數(shù)量,數(shù)據(jù)隨機(jī)重新分配鸽疾,可以消除分區(qū)間的數(shù)據(jù)量差異
- pipe吊洼,是與外部程序管道關(guān)聯(lián),從外部程序中獲取數(shù)據(jù)制肮。
Scala語法
在 RDD.scala中冒窍,幾乎每一個(gè)轉(zhuǎn)換和操作函數(shù)都會(huì)有一個(gè)withScope
,例如:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
withScope
是一個(gè)函數(shù)豺鼻,調(diào)用了RDDOperationScope.withScope
方法:
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
withScope
就像是一個(gè) AOP(面向切面編程)综液,嵌入到所有RDD 的轉(zhuǎn)換和操作的函數(shù)中,RDDOperationScope
會(huì)把調(diào)用棧記錄下來儒飒,用于繪制Spark UI的 DAG(有向無環(huán)圖谬莹,可以理解為 Spark 的執(zhí)行計(jì)劃)。
我們用下面的代碼簡(jiǎn)單演示一下 Scala 用函數(shù)做 AOP:
object Day1 {
def main(args: Array[String]) = {
Range(1,5).foreach(twice)
println()
Array("China", "Beijing", "HelloWorld").foreach(length)
}
def twice(i: Int): Int = aopPrint {
i * 2
}
def length(s: String): Int = aopPrint {
s.length
}
def aopPrint[U](i: => U): U = {
print(i + " ")
i
}
}
aopPrint
的 入?yún)⑹恰耙粋€(gè)返回類型為U的函數(shù)”桩了。這段程序中aopPrint
就是一個(gè)模擬的切面附帽,作用是把所有的函數(shù)返回值打印出來。結(jié)果是:
2 4 6 8
5 7 10
從代碼上看井誉,aopPrint
并沒有降低代碼的可讀性蕉扮。讀者依然能很清楚地讀懂twice
和length
函數(shù)。打印返回結(jié)果這個(gè)流程是獨(dú)立于函數(shù)之外的切面颗圣。
結(jié)論
- RDD 的轉(zhuǎn)換分圖上幾種
- RDD 的轉(zhuǎn)換可以看成是產(chǎn)生新的 RDD喳钟,而新的 RDD 記錄了每一個(gè)分區(qū)依賴上游的哪些分區(qū)爪模、每個(gè)分區(qū)如何用上游分區(qū)計(jì)算而來
本文源碼
spark/core/rdd包下的部分 RDD 類spark/core/src/main/scala/org/apache/spark/rdd at master · apache/spark · GitHub
@ Kangying Village, Beijing, China