Spark常用算子詳解 2020-05-07

Spark常用算子詳解

Spark的算子的分類(lèi)

從大方向來(lái)說(shuō),Spark 算子大致可以分為以下兩類(lèi):

1)Transformation 變換/轉(zhuǎn)換算子:這種變換并不觸發(fā)提交作業(yè)编整,完成作業(yè)中間過(guò)程處理诈铛。

Transformation 操作是延遲計(jì)算的宫纬,也就是說(shuō)從一個(gè)RDD 轉(zhuǎn)換生成另一個(gè) RDD 的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有 Action 操作的時(shí)候才會(huì)真正觸發(fā)運(yùn)算闺兢。

2)Action 行動(dòng)算子:這類(lèi)算子會(huì)觸發(fā) SparkContext 提交 Job 作業(yè)雇初。

Action 算子會(huì)觸發(fā) Spark 提交作業(yè)(Job),并將數(shù)據(jù)輸出 Spark系統(tǒng)烁兰。

從小方向來(lái)說(shuō)耐亏,Spark 算子大致可以分為以下三類(lèi):

1)Value數(shù)據(jù)類(lèi)型的Transformation算子,這種變換并不觸發(fā)提交作業(yè)沪斟,針對(duì)處理的數(shù)據(jù)項(xiàng)是Value型的數(shù)據(jù)广辰。
  2)Key-Value數(shù)據(jù)類(lèi)型的Transfromation算子,這種變換并不觸發(fā)提交作業(yè)主之,針對(duì)處理的數(shù)據(jù)項(xiàng)是Key-Value型的數(shù)據(jù)對(duì)择吊。

3)Action算子,這類(lèi)算子會(huì)觸發(fā)SparkContext提交Job作業(yè)槽奕。

1)Value數(shù)據(jù)類(lèi)型的Transformation算子

一几睛、輸入分區(qū)與輸出分區(qū)一對(duì)一型

1痴颊、map算子

2儿普、flatMap算子

3、mapPartitions算子

4、glom算子

二突想、輸入分區(qū)與輸出分區(qū)多對(duì)一型

5煤篙、union算子

6萌壳、cartesian算子

三场梆、輸入分區(qū)與輸出分區(qū)多對(duì)多型

7、grouBy算子

四晴弃、輸出分區(qū)為輸入分區(qū)子集型

8掩幢、filter算子

9、distinct算子

10上鞠、subtract算子

11际邻、sample算子

12、takeSample算子

五芍阎、Cache型

13枯怖、cache算子

14、persist算子

2)Key-Value數(shù)據(jù)類(lèi)型的Transfromation算子

一能曾、輸入分區(qū)與輸出分區(qū)一對(duì)一

15、mapValues算子

二肿轨、對(duì)單個(gè)RDD或兩個(gè)RDD聚集

單個(gè)RDD聚集

16寿冕、combineByKey算子

17、reduceByKey算子

18椒袍、partitionBy算子

兩個(gè)RDD聚集

19驼唱、Cogroup算子

三、連接

20驹暑、join算子

21玫恳、leftOutJoin和 rightOutJoin算子

3)Action算子

一、無(wú)輸出

22优俘、foreach算子

二京办、HDFS

23、saveAsTextFile算子

24帆焕、saveAsObjectFile算子

三惭婿、Scala集合和數(shù)據(jù)類(lèi)型

25、collect算子

26叶雹、collectAsMap算子

27财饥、reduceByKeyLocally算子

28、lookup算子

29折晦、count算子

30钥星、top算子

31、reduce算子

32满着、fold算子

33谦炒、aggregate算子

1. Transformations 算子

(1) map

將原來(lái) RDD 的每個(gè)數(shù)據(jù)項(xiàng)通過(guò) map 中的用戶自定義函數(shù) f 映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素贯莺。源碼中 map 算子相當(dāng)于初始化一個(gè) RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))编饺。

圖 1中每個(gè)方框表示一個(gè) RDD 分區(qū)乖篷,左側(cè)的分區(qū)經(jīng)過(guò)用戶自定義函數(shù) f:T->U 映射為右側(cè)的新 RDD 分區(qū)。但是透且,實(shí)際只有等到 Action算子觸發(fā)后撕蔼,這個(gè) f 函數(shù)才會(huì)和其他函數(shù)在一個(gè)stage 中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。在圖 1 中的第一個(gè)分區(qū)秽誊,數(shù)據(jù)記錄 V1 輸入 f鲸沮,通過(guò) f 轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄 V’1。

img

圖1 map 算子對(duì) RDD 轉(zhuǎn)換

(2) flatMap

將原來(lái) RDD 中的每個(gè)元素通過(guò)函數(shù) f 轉(zhuǎn)換為新的元素锅论,并將生成的 RDD 的每個(gè)集合中的元素合并為一個(gè)集合讼溺,內(nèi)部創(chuàng)建 FlatMappedRDD(this,sc.clean(f))最易。
  圖 2 表 示 RDD 的 一 個(gè) 分 區(qū) 怒坯,進(jìn) 行 flatMap函 數(shù) 操 作, flatMap 中 傳 入 的 函 數(shù) 為 f:T->U藻懒, T和 U 可以是任意的數(shù)據(jù)類(lèi)型剔猿。將分區(qū)中的數(shù)據(jù)通過(guò)用戶自定義函數(shù) f 轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個(gè) RDD 分區(qū)嬉荆,小方框代表一個(gè)集合归敬。 V1、 V2鄙早、 V3 在一個(gè)集合作為 RDD 的一個(gè)數(shù)據(jù)項(xiàng)汪茧,可能存儲(chǔ)為數(shù)組或其他容器,轉(zhuǎn)換為V’1限番、 V’2舱污、 V’3 后,將原來(lái)的數(shù)組或容器結(jié)合拆散弥虐,拆散的數(shù)據(jù)形成為 RDD 中的數(shù)據(jù)項(xiàng)慌闭。

[圖片上傳失敗...(image-ab6902-1588818548418)]

圖2 flapMap 算子對(duì) RDD 轉(zhuǎn)換

(3) mapPartitions

mapPartitions 函 數(shù) 獲 取 到 每 個(gè) 分 區(qū) 的 迭 代器,在 函 數(shù) 中 通 過(guò) 這 個(gè) 分 區(qū) 整 體 的 迭 代 器 對(duì)整 個(gè) 分 區(qū) 的 元 素 進(jìn) 行 操 作躯舔。 內(nèi) 部 實(shí) 現(xiàn) 是 生 成
MapPartitionsRDD驴剔。圖 3 中的方框代表一個(gè) RDD 分區(qū)。圖 3 中粥庄,用戶通過(guò)函數(shù) f (iter)=>iter.f ilter(_>=3) 對(duì)分區(qū)中所有數(shù)據(jù)進(jìn)行過(guò)濾丧失,大于和等于 3 的數(shù)據(jù)保留。一個(gè)方塊代表一個(gè) RDD 分區(qū)惜互,含有 1布讹、 2琳拭、 3 的分區(qū)過(guò)濾只剩下元素 3。

img

圖3 mapPartitions 算子對(duì) RDD 轉(zhuǎn)換

(4)glom

glom函數(shù)將每個(gè)分區(qū)形成一個(gè)數(shù)組描验,內(nèi)部實(shí)現(xiàn)是返回的GlommedRDD白嘁。 圖4中的每個(gè)方框代表一個(gè)RDD分區(qū)。圖4中的方框代表一個(gè)分區(qū)膘流。 該圖表示含有V1絮缅、 V2、 V3的分區(qū)通過(guò)函數(shù)glom形成一數(shù)組Array[(V1)呼股,(V2)耕魄,(V3)]。

img

圖 4 glom算子對(duì)RDD轉(zhuǎn)換

(5) union

使用 union 函數(shù)時(shí)需要保證兩個(gè) RDD 元素的數(shù)據(jù)類(lèi)型相同彭谁,返回的 RDD 數(shù)據(jù)類(lèi)型和被合并的 RDD 元素?cái)?shù)據(jù)類(lèi)型相同吸奴,并不進(jìn)行去重操作,保存所有元素缠局。如果想去重
可以使用 distinct()则奥。同時(shí) Spark 還提供更為簡(jiǎn)潔的使用 union 的 API,通過(guò) ++ 符號(hào)相當(dāng)于 union 函數(shù)操作狭园。
圖 5 中左側(cè)大方框代表兩個(gè) RDD读处,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD妙啃,大方框內(nèi)的小方框代表分區(qū)。

含有V1俊戳、V2揖赴、U1、U2抑胎、U3燥滑、U4的RDD和含有V1、V8阿逃、U5铭拧、U6、U7恃锉、U8的RDD合并所有元素形成一個(gè)RDD搀菩。V1、V1破托、V2肪跋、V8形成一個(gè)分區(qū),U1土砂、U2州既、U3谜洽、U4、U5吴叶、U6阐虚、U7、U8形成一個(gè)分區(qū)蚌卤。

img

圖 5 union 算子對(duì) RDD 轉(zhuǎn)換

(6) cartesian

? 對(duì) 兩 個(gè) RDD 內(nèi) 的 所 有 元 素 進(jìn) 行 笛 卡 爾 積 操 作实束。 操 作 后, 內(nèi) 部 實(shí) 現(xiàn) 返 回CartesianRDD造寝。圖6中左側(cè)大方框代表兩個(gè) RDD磕洪,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD诫龙,大方框內(nèi)的小方框代表分區(qū)析显。圖6中的大方框代表RDD,大方框中的小方框代表RDD分區(qū)签赃。
例 如: V1 和 另 一 個(gè) RDD 中 的 W1谷异、 W2、 Q5 進(jìn) 行 笛 卡 爾 積 運(yùn) 算 形 成 (V1,W1)锦聊、(V1,W2)歹嘹、 (V1,Q5)。


img

? 圖 6 cartesian 算子對(duì) RDD 轉(zhuǎn)換

(7) groupBy

groupBy :將元素通過(guò)函數(shù)生成相應(yīng)的 Key孔庭,數(shù)據(jù)就轉(zhuǎn)化為 Key-Value 格式尺上,之后將 Key 相同的元素分為一組。
  函數(shù)實(shí)現(xiàn)如下:
  1)將用戶函數(shù)預(yù)處理:
  val cleanF = sc.clean(f)
  2)對(duì)數(shù)據(jù) map 進(jìn)行函數(shù)操作圆到,最后再進(jìn)行 groupByKey 分組操作怎抛。

this.map(t => (cleanF(t), t)).groupByKey(p)
  其中, p 確定了分區(qū)個(gè)數(shù)和分區(qū)函數(shù)芽淡,也就決定了并行化的程度马绝。

圖7 中方框代表一個(gè) RDD 分區(qū),相同key 的元素合并到一個(gè)組挣菲。例如 V1 和 V2 合并為 V富稻, Value 為 V1,V2。形成 V,Seq(V1,V2)白胀。

img

圖 7 groupBy 算子對(duì) RDD 轉(zhuǎn)換

(8) filter

filter 函數(shù)功能是對(duì)元素進(jìn)行過(guò)濾椭赋,對(duì)每個(gè) 元 素 應(yīng) 用 f 函 數(shù), 返 回 值 為 true 的 元 素 在RDD 中保留或杠,返回值為 false 的元素將被過(guò)濾掉纹份。 內(nèi) 部 實(shí) 現(xiàn) 相 當(dāng) 于 生 成 FilteredRDD(this,sc.clean(f))。
下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
  圖 8 中每個(gè)方框代表一個(gè) RDD 分區(qū)蔓涧, T 可以是任意的類(lèi)型件已。通過(guò)用戶自定義的過(guò)濾函數(shù) f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作元暴,將滿足條件篷扩、返回結(jié)果為 true 的數(shù)據(jù)項(xiàng)保留。例如茉盏,過(guò)濾掉 V2 和 V3 保留了 V1鉴未,為區(qū)分命名為 V’1。

img

圖 8 filter 算子對(duì) RDD 轉(zhuǎn)換

(9)distinct

distinct將RDD中的元素進(jìn)行去重操作鸠姨。圖9中的每個(gè)方框代表一個(gè)RDD分區(qū)铜秆,通過(guò)distinct函數(shù),將數(shù)據(jù)去重讶迁。 例如连茧,重復(fù)數(shù)據(jù)V1、 V1去重后只保留一份V1巍糯。

img

圖9 distinct算子對(duì)RDD轉(zhuǎn)換

(10)subtract

subtract相當(dāng)于進(jìn)行集合的差操作啸驯,RDD 1去除RDD 1和RDD 2交集中的所有元素。圖10中左側(cè)的大方框代表兩個(gè)RDD祟峦,大方框內(nèi)的小方框代表RDD的分區(qū)罚斗。 右側(cè)大方框
代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)宅楞。 V1在兩個(gè)RDD中均有针姿,根據(jù)差集運(yùn)算規(guī)則,新RDD不保留厌衙,V2在第一個(gè)RDD有距淫,第二個(gè)RDD沒(méi)有,則在新RDD元素中包含V2迅箩。

 
img

圖10 subtract算子對(duì)RDD轉(zhuǎn)換

(11) sample

sample 將 RDD 這個(gè)集合內(nèi)的元素進(jìn)行采樣溉愁,獲取所有元素的子集处铛。用戶可以設(shè)定是否有放回的抽樣饲趋、百分比、隨機(jī)種子撤蟆,進(jìn)而決定采樣方式奕塑。內(nèi)部實(shí)現(xiàn)是生成 SampledRDD(withReplacement, fraction家肯, seed)龄砰。

函數(shù)參數(shù)設(shè)置:
‰   withReplacement=true,表示有放回的抽樣。
‰   withReplacement=false换棚,表示無(wú)放回的抽樣式镐。
  圖 11中 的 每 個(gè) 方 框 是 一 個(gè) RDD 分 區(qū)。 通 過(guò) sample 函 數(shù)固蚤, 采 樣 50% 的 數(shù) 據(jù)娘汞。V1、 V2夕玩、 U1你弦、 U2、U3燎孟、U4 采樣出數(shù)據(jù) V1 和 U1禽作、 U2 形成新的 RDD。

img

圖11 sample 算子對(duì) RDD 轉(zhuǎn)換

(12)takeSample

takeSample()函數(shù)和上面的sample函數(shù)是一個(gè)原理揩页,但是不使用相對(duì)比例采樣旷偿,而是按設(shè)定的采樣個(gè)數(shù)進(jìn)行采樣,同時(shí)返回結(jié)果不再是RDD碍沐,而是相當(dāng)于對(duì)采樣后的數(shù)據(jù)進(jìn)行
Collect()狸捅,返回結(jié)果的集合為單機(jī)的數(shù)組。
  圖12中左側(cè)的方框代表分布式的各個(gè)節(jié)點(diǎn)上的分區(qū)累提,右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組尘喝。 通過(guò)takeSample對(duì)數(shù)據(jù)采樣,設(shè)置為采樣一份數(shù)據(jù)斋陪,返回結(jié)果為V1朽褪。

img

圖12  takeSample算子對(duì)RDD轉(zhuǎn)換

(13) cache

cache 將 RDD 元素從磁盤(pán)緩存到內(nèi)存。 相當(dāng)于 persist(MEMORY_ONLY) 函數(shù)的功能无虚。
圖13 中每個(gè)方框代表一個(gè) RDD 分區(qū)缔赠,左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤(pán),通過(guò) cache 算子將數(shù)據(jù)緩存在內(nèi)存友题。


img

圖 13 Cache 算子對(duì) RDD 轉(zhuǎn)換

(14) persist

persist 函數(shù)對(duì) RDD 進(jìn)行緩存操作嗤堰。數(shù)據(jù)緩存在哪里依據(jù) StorageLevel 這個(gè)枚舉類(lèi)型進(jìn)行確定。 有以下幾種類(lèi)型的組合(見(jiàn)10)度宦, DISK 代表磁盤(pán)踢匣,MEMORY 代表內(nèi)存, SER 代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)戈抄。

下面為函數(shù)定義离唬, StorageLevel 是枚舉類(lèi)型,代表存儲(chǔ)模式划鸽,用戶可以通過(guò)圖 14-1 按需進(jìn)行選擇输莺。
  persist(newLevel:StorageLevel)
  圖 14-1 中列出persist 函數(shù)可以進(jìn)行緩存的模式戚哎。例如,MEMORY_AND_DISK_SER 代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤(pán)嫂用,并且以序列化的方式存儲(chǔ)型凳,其他同理。

img

圖 14-1 persist 算子對(duì) RDD 轉(zhuǎn)換

圖 14-2 中方框代表 RDD 分區(qū)嘱函。 disk 代表存儲(chǔ)在磁盤(pán)啰脚, mem 代表存儲(chǔ)在內(nèi)存。數(shù)據(jù)最初全部存儲(chǔ)在磁盤(pán)实夹,通過(guò) persist(MEMORY_AND_DISK) 將數(shù)據(jù)緩存到內(nèi)存橄浓,但是有的分區(qū)無(wú)法容納在內(nèi)存,將含有 V1亮航、 V2荸实、 V3 的RDD存儲(chǔ)到磁盤(pán),將含有U1缴淋,U2的RDD仍舊存儲(chǔ)在內(nèi)存准给。

img

圖 14-2 Persist 算子對(duì) RDD 轉(zhuǎn)換

(15) mapValues

mapValues :針對(duì)(Key, Value)型數(shù)據(jù)中的 Value 進(jìn)行 Map 操作重抖,而不對(duì) Key 進(jìn)行處理露氮。

圖 15 中的方框代表 RDD 分區(qū)。 a=>a+2 代表對(duì) (V1,1) 這樣的 Key Value 數(shù)據(jù)對(duì)钟沛,數(shù)據(jù)只對(duì) Value 中的 1 進(jìn)行加 2 操作畔规,返回結(jié)果為 3。

img

圖 15 mapValues 算子 RDD 對(duì)轉(zhuǎn)換

(16) combineByKey

下面代碼為 combineByKey 函數(shù)的定義:
  combineByKey[C](createCombiner:(V) C,
  mergeValue:(C, V) C,
  mergeCombiners:(C, C) C,
  partitioner:Partitioner,
  mapSideCombine:Boolean=true,
  serializer:Serializer=null):RDD[(K,C)]

說(shuō)明:
‰   createCombiner: V => C恨统, C 不存在的情況下叁扫,比如通過(guò) V 創(chuàng)建 seq C。
‰   mergeValue: (C畜埋, V) => C莫绣,當(dāng) C 已經(jīng)存在的情況下,需要 merge悠鞍,比如把 item V
加到 seq C 中对室,或者疊加。
   mergeCombiners: (C咖祭, C) => C掩宜,合并兩個(gè) C。
‰   partitioner: Partitioner, Shuff le 時(shí)需要的 Partitioner心肪。
‰   mapSideCombine : Boolean = true锭亏,為了減小傳輸量纠吴,很多 combine 可以在 map
端先做硬鞍,比如疊加,可以先在一個(gè) partition 中把所有相同的 key 的 value 疊加,
再 shuff le固该。
‰   serializerClass: String = null锅减,傳輸需要序列化,用戶可以自定義序列化類(lèi):

例如伐坏,相當(dāng)于將元素為 (Int怔匣, Int) 的 RDD 轉(zhuǎn)變?yōu)榱?(Int, Seq[Int]) 類(lèi)型元素的 RDD桦沉。圖 16中的方框代表 RDD 分區(qū)每瞒。如圖,通過(guò) combineByKey纯露, 將 (V1,2)剿骨, (V1,1)數(shù)據(jù)合并為( V1,Seq(2,1))。

  
img

圖 16 comBineByKey 算子對(duì) RDD 轉(zhuǎn)換

(17) reduceByKey

reduceByKey 是比 combineByKey 更簡(jiǎn)單的一種情況埠褪,只是兩個(gè)值合并成一個(gè)值浓利,( Int, Int V)to (Int钞速, Int C)贷掖,比如疊加。所以 createCombiner reduceBykey 很簡(jiǎn)單渴语,就是直接返回 v苹威,而 mergeValue和 mergeCombiners 邏輯是相同的,沒(méi)有區(qū)別驾凶。
函數(shù)實(shí)現(xiàn):
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
  圖17中的方框代表 RDD 分區(qū)屠升。通過(guò)用戶自定義函數(shù) (A,B) => (A + B) 函數(shù),將相同 key 的數(shù)據(jù) (V1,2) 和 (V1,1) 的 value 相加運(yùn)算狭郑,結(jié)果為( V1,3)腹暖。


img

圖 17 reduceByKey 算子對(duì) RDD 轉(zhuǎn)換

(18)partitionBy

partitionBy函數(shù)對(duì)RDD進(jìn)行分區(qū)操作。
  函數(shù)定義如下翰萨。
  partitionBy(partitioner:Partitioner)
  如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致脏答,則不重分區(qū),如果不一致亩鬼,則相當(dāng)于根據(jù)分區(qū)器生成一個(gè)新的ShuffledRDD殖告。
  圖18中的方框代表RDD分區(qū)。 通過(guò)新的分區(qū)策略將原來(lái)在不同分區(qū)的V1雳锋、 V2數(shù)據(jù)都合并到了一個(gè)分區(qū)黄绩。

img

圖18  partitionBy算子對(duì)RDD轉(zhuǎn)換

(19)Cogroup

cogroup函數(shù)將兩個(gè)RDD進(jìn)行協(xié)同劃分,cogroup函數(shù)的定義如下玷过。
  cogroup[W](other: RDD[(K爽丹, W)]筑煮, numPartitions: Int): RDD[(K, (Iterable[V]粤蝎, Iterable[W]))]
  對(duì)在兩個(gè)RDD中的Key-Value類(lèi)型的元素真仲,每個(gè)RDD相同Key的元素分別聚合為一個(gè)集合,并且返回兩個(gè)RDD中對(duì)應(yīng)Key的元素集合的迭代器初澎。
 〗沼Α(K, (Iterable[V]碑宴, Iterable[W]))
  其中软啼,Key和Value,Value是兩個(gè)RDD下相同Key的兩個(gè)數(shù)據(jù)集合的迭代器所構(gòu)成的元組延柠。
  圖19中的大方框代表RDD焰宣,大方框內(nèi)的小方框代表RDD中的分區(qū)。 將RDD1中的數(shù)據(jù)(U1捕仔,1)匕积、 (U1,2)和RDD2中的數(shù)據(jù)(U1榜跌,2)合并為(U1闪唆,((1,2)钓葫,(2)))悄蕾。

img

圖19 Cogroup算子對(duì)RDD轉(zhuǎn)換

(20) join

join 對(duì)兩個(gè)需要連接的 RDD 進(jìn)行 cogroup函數(shù)操作,將相同 key 的數(shù)據(jù)能夠放到一個(gè)分區(qū)础浮,在 cogroup 操作之后形成的新 RDD 對(duì)每個(gè)key 下的元素進(jìn)行笛卡爾積的操作帆调,返回的結(jié)果再展平,對(duì)應(yīng) key 下的所有元組形成一個(gè)集合豆同。最后返回 RDD[(K番刊, (V, W))]影锈。

下 面 代 碼 為 join 的 函 數(shù) 實(shí) 現(xiàn)芹务, 本 質(zhì) 是通 過(guò) cogroup 算 子 先 進(jìn) 行 協(xié) 同 劃 分, 再 通 過(guò)flatMapValues 將合并的數(shù)據(jù)打散鸭廷。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }
圖 20是對(duì)兩個(gè) RDD 的 join 操作示意圖枣抱。大方框代表 RDD,小方框代表 RDD 中的分區(qū)辆床。函數(shù)對(duì)相同 key 的元素佳晶,如 V1 為 key 做連接后結(jié)果為 (V1,(1,1)) 和 (V1,(1,2))。

img

圖 20 join 算子對(duì) RDD 轉(zhuǎn)換

(21)eftOutJoin和rightOutJoin

LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空讼载,如果為空轿秧,則填充為空中跌。 如果不為空,則將數(shù)據(jù)進(jìn)行連接運(yùn)算淤刃,并
返回結(jié)果。
下面代碼是leftOutJoin的實(shí)現(xiàn)吱型。
if (ws.isEmpty) {
vs.map(v => (v逸贾, None))
} else {
for (v <- vs; w <- ws) yield (v津滞, Some(w))
}

2. Actions 算子

本質(zhì)上在 Action 算子中通過(guò) SparkContext 進(jìn)行了提交作業(yè)的 runJob 操作铝侵,觸發(fā)了RDD DAG 的執(zhí)行。
例如触徐, Action 算子 collect 函數(shù)的代碼如下咪鲜,感興趣的讀者可以順著這個(gè)入口進(jìn)行源碼剖析:

/**
* Return an array that contains all of the elements in this RDD.
/
def collect(): Array[T] = {
/
提交 Job/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _
)
}

(22) foreach

foreach 對(duì) RDD 中的每個(gè)元素都應(yīng)用 f 函數(shù)操作,不返回 RDD 和 Array撞鹉, 而是返回Uint疟丙。圖22表示 foreach 算子通過(guò)用戶自定義函數(shù)對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作。本例中自定義函數(shù)為 println()鸟雏,控制臺(tái)打印所有數(shù)據(jù)項(xiàng)享郊。

  
img

圖 22 foreach 算子對(duì) RDD 轉(zhuǎn)換

(23) saveAsTextFile

函數(shù)將數(shù)據(jù)輸出,存儲(chǔ)到 HDFS 的指定目錄孝鹊。

下面為 saveAsTextFile 函數(shù)的內(nèi)部實(shí)現(xiàn)炊琉,其內(nèi)部
  通過(guò)調(diào)用 saveAsHadoopFile 進(jìn)行實(shí)現(xiàn):
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFileTextOutputFormat[NullWritable, Text]
將 RDD 中的每個(gè)元素映射轉(zhuǎn)變?yōu)?(null, x.toString)又活,然后再將其寫(xiě)入 HDFS苔咪。
  圖 23中左側(cè)方框代表 RDD 分區(qū),右側(cè)方框代表 HDFS 的 Block柳骄。通過(guò)函數(shù)將RDD 的每個(gè)分區(qū)存儲(chǔ)為 HDFS 中的一個(gè) Block团赏。

img

圖 23 saveAsHadoopFile 算子對(duì) RDD 轉(zhuǎn)換

(24)saveAsObjectFile

saveAsObjectFile將分區(qū)中的每10個(gè)元素組成一個(gè)Array,然后將這個(gè)Array序列化耐薯,映射為(Null馆里,BytesWritable(Y))的元素,寫(xiě)入HDFS為SequenceFile的格式可柿。
  下面代碼為函數(shù)內(nèi)部實(shí)現(xiàn)鸠踪。
  map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
  圖24中的左側(cè)方框代表RDD分區(qū)复斥,右側(cè)方框代表HDFS的Block营密。 通過(guò)函數(shù)將RDD的每個(gè)分區(qū)存儲(chǔ)為HDFS上的一個(gè)Block。

img

圖24 saveAsObjectFile算子對(duì)RDD轉(zhuǎn)換

(25) collect

collect 相當(dāng)于 toArray目锭, toArray 已經(jīng)過(guò)時(shí)不推薦使用评汰, collect 將分布式的 RDD 返回為一個(gè)單機(jī)的 scala Array 數(shù)組纷捞。在這個(gè)數(shù)組上運(yùn)用 scala 的函數(shù)式操作。
  圖 25中左側(cè)方框代表 RDD 分區(qū)被去,右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組主儡。通過(guò)函數(shù)操作,將結(jié)果返回到 Driver 程序所在的節(jié)點(diǎn)惨缆,以數(shù)組形式存儲(chǔ)糜值。

img

圖 25 Collect 算子對(duì) RDD 轉(zhuǎn)換

(26)collectAsMap

collectAsMap對(duì)(K,V)型的RDD數(shù)據(jù)返回一個(gè)單機(jī)HashMap坯墨。 對(duì)于重復(fù)K的RDD元素寂汇,后面的元素覆蓋前面的元素。
  圖26中的左側(cè)方框代表RDD分區(qū)捣染,右側(cè)方框代表單機(jī)數(shù)組骄瓣。 數(shù)據(jù)通過(guò)collectAsMap函數(shù)返回給Driver程序計(jì)算結(jié)果,結(jié)果以HashMap形式存儲(chǔ)耍攘。

img

圖26 CollectAsMap算子對(duì)RDD轉(zhuǎn)換

(27)reduceByKeyLocally

實(shí)現(xiàn)的是先reduce再collectAsMap的功能榕栏,先對(duì)RDD的整體進(jìn)行reduce操作,然后再收集所有結(jié)果返回為一個(gè)HashMap蕾各。

(28)lookup

下面代碼為lookup的聲明臼膏。
lookup(key:K):Seq[V]
Lookup函數(shù)對(duì)(Key,Value)型的RDD操作示损,返回指定Key對(duì)應(yīng)的元素形成的Seq渗磅。 這個(gè)函數(shù)處理優(yōu)化的部分在于,如果這個(gè)RDD包含分區(qū)器检访,則只會(huì)對(duì)應(yīng)處理K所在的分區(qū)始鱼,然后返回由(K,V)形成的Seq脆贵。 如果RDD不包含分區(qū)器医清,則需要對(duì)全RDD元素進(jìn)行暴力掃描處理,搜索指定K對(duì)應(yīng)的元素卖氨。
  圖28中的左側(cè)方框代表RDD分區(qū)会烙,右側(cè)方框代表Seq,最后結(jié)果返回到Driver所在節(jié)點(diǎn)的應(yīng)用中筒捺。

img

圖28 lookup對(duì)RDD轉(zhuǎn)換

(29) count

count 返回整個(gè) RDD 的元素個(gè)數(shù)柏腻。
  內(nèi)部函數(shù)實(shí)現(xiàn)為:
  defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
  圖 29中,返回?cái)?shù)據(jù)的個(gè)數(shù)為 5系吭。一個(gè)方塊代表一個(gè) RDD 分區(qū)五嫂。

img

圖29 count 對(duì) RDD 算子轉(zhuǎn)換

(30)top

top可返回最大的k個(gè)元素。 函數(shù)定義如下。
top(num:Int)(implicit ord:Ordering[T]):Array[T]

相近函數(shù)說(shuō)明如下沃缘。
·top返回最大的k個(gè)元素躯枢。
·take返回最小的k個(gè)元素。
·takeOrdered返回最小的k個(gè)元素槐臀,并且在返回的數(shù)組中保持元素的順序锄蹂。
·first相當(dāng)于top(1)返回整個(gè)RDD中的前k個(gè)元素,可以定義排序的方式Ordering[T]水慨。
返回的是一個(gè)含前k個(gè)元素的數(shù)組得糜。

(31)reduce

reduce函數(shù)相當(dāng)于對(duì)RDD中的元素進(jìn)行reduceLeft函數(shù)的操作。 函數(shù)實(shí)現(xiàn)如下讥巡。
  Some(iter.reduceLeft(cleanF))
  reduceLeft先對(duì)兩個(gè)元素<K掀亩,V>進(jìn)行reduce函數(shù)操作舔哪,然后將結(jié)果和迭代器取出的下一個(gè)元素<k欢顷,V>進(jìn)行reduce函數(shù)操作,直到迭代器遍歷完所有元素捉蚤,得到最后結(jié)果抬驴。在RDD中,先對(duì)每個(gè)分區(qū)中的所有元素<K缆巧,V>的集合分別進(jìn)行reduceLeft布持。 每個(gè)分區(qū)形成的結(jié)果相當(dāng)于一個(gè)元素<K,V>陕悬,再對(duì)這個(gè)結(jié)果集合進(jìn)行reduceleft操作题暖。
  例如:用戶自定義函數(shù)如下。
  f:(A捉超,B)=>(A._1+”@”+B._1胧卤,A._2+B._2)
  圖31中的方框代表一個(gè)RDD分區(qū),通過(guò)用戶自定函數(shù)f將數(shù)據(jù)進(jìn)行reduce運(yùn)算拼岳。 示例
最后的返回結(jié)果為V1@[1]V2U枝誊!@U2@U3@U4,12惜纸。

img

圖31 reduce算子對(duì)RDD轉(zhuǎn)換

(32)fold

fold和reduce的原理相同叶撒,但是與reduce不同,相當(dāng)于每個(gè)reduce時(shí)耐版,迭代器取的第一個(gè)元素是zeroValue祠够。
  圖32中通過(guò)下面的用戶自定義函數(shù)進(jìn)行fold運(yùn)算,圖中的一個(gè)方框代表一個(gè)RDD分區(qū)粪牲。 讀者可以參照reduce函數(shù)理解哪审。
  fold((”V0@”,2))( (A虑瀑,B)=>(A._1+”@”+B._1湿滓,A._2+B._2))

img

圖32 fold算子對(duì)RDD轉(zhuǎn)換

(33)aggregate

aggregate先對(duì)每個(gè)分區(qū)的所有元素進(jìn)行aggregate操作滴须,再對(duì)分區(qū)的結(jié)果進(jìn)行fold操作。
  aggreagate與fold和reduce的不同之處在于叽奥,aggregate相當(dāng)于采用歸并的方式進(jìn)行數(shù)據(jù)聚集扔水,這種聚集是并行化的。 而在fold和reduce函數(shù)的運(yùn)算過(guò)程中朝氓,每個(gè)分區(qū)中需要進(jìn)行串行處理魔市,每個(gè)分區(qū)串行計(jì)算完結(jié)果,結(jié)果再按之前的方式進(jìn)行聚集赵哲,并返回最終聚集結(jié)果待德。
  函數(shù)的定義如下。
aggregate[B](z: B)(seqop: (B枫夺,A) => B将宪,combop: (B,B) => B): B
  圖33通過(guò)用戶自定義函數(shù)對(duì)RDD 進(jìn)行aggregate的聚集操作橡庞,圖中的每個(gè)方框代表一個(gè)RDD分區(qū)较坛。
  rdd.aggregate(”V0@”,2)((A扒最,B)=>(A._1+”@”+B._1丑勤,A._2+B.2)),(A吧趣,B)=>(A.1+”@”+B_1法竞,A.@+B.2))
  最后,介紹兩個(gè)計(jì)算模型中的兩個(gè)特殊變量强挫。
  廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表岔霸,以及廣播大變量等場(chǎng)景。 這些數(shù)據(jù)集合在單節(jié)點(diǎn)內(nèi)存能夠容納纠拔,不需要像RDD那樣在節(jié)點(diǎn)之間打散存儲(chǔ)秉剑。
Spark運(yùn)行時(shí)把廣播變量數(shù)據(jù)發(fā)到各個(gè)節(jié)點(diǎn),并保存下來(lái)稠诲,后續(xù)計(jì)算可以復(fù)用侦鹏。 相比Hadoo的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享臀叙。 Broadcast的底層實(shí)現(xiàn)采用了BT機(jī)制略水。

img

圖33 aggregate算子對(duì)RDD轉(zhuǎn)換

②代表V。
 ∪坝③代表U渊涝。
  accumulator變量:允許做全局累加操作,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運(yùn)行指標(biāo)的情景。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末跨释,一起剝皮案震驚了整個(gè)濱河市胸私,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鳖谈,老刑警劉巖岁疼,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異缆娃,居然都是意外死亡捷绒,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)贯要,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)暖侨,“玉大人,你說(shuō)我怎么就攤上這事崇渗∽侄海” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵显押,是天一觀的道長(zhǎng)扳肛。 經(jīng)常有香客問(wèn)我傻挂,道長(zhǎng)乘碑,這世上最難降的妖魔是什么塔次? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任吓肋,我火速辦了婚禮,結(jié)果婚禮上另假,老公的妹妹穿的比我還像新娘绪抛。我一直安慰自己资铡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布幢码。 她就那樣靜靜地躺著笤休,像睡著了一般。 火紅的嫁衣襯著肌膚如雪症副。 梳的紋絲不亂的頭發(fā)上店雅,一...
    開(kāi)封第一講書(shū)人閱讀 52,441評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音贞铣,去河邊找鬼闹啦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛辕坝,可吹牛的內(nèi)容都是我干的窍奋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼琳袄!你這毒婦竟也來(lái)了江场?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤窖逗,失蹤者是張志新(化名)和其女友劉穎扛稽,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體滑负,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡在张,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了矮慕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帮匾。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖痴鳄,靈堂內(nèi)的尸體忽然破棺而出瘟斜,到底是詐尸還是另有隱情,我是刑警寧澤痪寻,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布螺句,位于F島的核電站,受9級(jí)特大地震影響橡类,放射性物質(zhì)發(fā)生泄漏蛇尚。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一顾画、第九天 我趴在偏房一處隱蔽的房頂上張望取劫。 院中可真熱鬧,春花似錦研侣、人聲如沸谱邪。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)惦银。三九已至,卻和暖如春末誓,著一層夾襖步出監(jiān)牢的瞬間扯俱,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工基显, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蘸吓,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓撩幽,卻偏偏與公主長(zhǎng)得像库继,于是被迫代替她去往敵國(guó)和親箩艺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359