Spark算子總結(jié)版

Spark的算子的分類

從大方向來說,Spark 算子大致可以分為以下兩類:

1)Transformation 變換/轉(zhuǎn)換算子:這種變換并不觸發(fā)提交作業(yè),完成作業(yè)中間過程處理。

 Transformation 操作是延遲計(jì)算的,也就是說從一個(gè)RDD 轉(zhuǎn)換生成另一個(gè) RDD 的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有 Action 操作的時(shí)候才會(huì)真正觸發(fā)運(yùn)算鱼鼓。

2)Action 行動(dòng)算子:這類算子會(huì)觸發(fā) SparkContext 提交 Job 作業(yè)。

 Action 算子會(huì)觸發(fā) Spark 提交作業(yè)(Job)该编,并將數(shù)據(jù)輸出 Spark系統(tǒng)迄本。


從小方向來說,Spark 算子大致可以分為以下三類:

1)Value數(shù)據(jù)類型的Transformation算子课竣,這種變換并不觸發(fā)提交作業(yè)嘉赎,針對(duì)處理的數(shù)據(jù)項(xiàng)是Value型的數(shù)據(jù)。

2)Key-Value數(shù)據(jù)類型的Transfromation算子于樟,這種變換并不觸發(fā)提交作業(yè)公条,針對(duì)處理的數(shù)據(jù)項(xiàng)是Key-Value型的數(shù)據(jù)對(duì)。

3)Action算子迂曲,這類算子會(huì)觸發(fā)SparkContext提交Job作業(yè)靶橱。



1)Value數(shù)據(jù)類型的Transformation算子

  一、輸入分區(qū)與輸出分區(qū)一對(duì)一型

    1路捧、map算子

    2关霸、flatMap算子

    3、mapPartitions算子

    4杰扫、glom算子

  二队寇、輸入分區(qū)與輸出分區(qū)多對(duì)一型

    5、union算子

    6章姓、cartesian算子

  三佳遣、輸入分區(qū)與輸出分區(qū)多對(duì)多型

    7、grouBy算子

  四凡伊、輸出分區(qū)為輸入分區(qū)子集型

    8零渐、filter算子

    9、distinct算子

    10窗声、subtract算子

    11相恃、sample算子

? ? ?   12辜纲、takeSample算子

?  五笨觅、Cache型

    13拦耐、cache算子  

    14、persist算子


2)Key-Value數(shù)據(jù)類型的Transfromation算子

  一见剩、輸入分區(qū)與輸出分區(qū)一對(duì)一

    15杀糯、mapValues算子

  二、對(duì)單個(gè)RDD或兩個(gè)RDD聚集

   單個(gè)RDD聚集

    16苍苞、combineByKey算子

    17固翰、reduceByKey算子

    18、partitionBy算子

?  兩個(gè)RDD聚集

? ? ? ? ? ? ??19羹呵、Cogroup算子

  三骂际、連接

    20、join算子

    21冈欢、leftOutJoin和 rightOutJoin算子


?3)Action算子

  一歉铝、無輸出

? ? ? ? ? ? ? ?22、foreach算子

  二凑耻、HDFS

    23太示、saveAsTextFile算子

    24、saveAsObjectFile算子

  三香浩、Scala集合和數(shù)據(jù)類型

    25类缤、collect算子

    26、collectAsMap算子

? ? ? ? ? ? ? 27邻吭、reduceByKeyLocally算子

 ? ? ? ? ? 28餐弱、lookup算子

    29、count算子

    30镜盯、top算子

    31岸裙、reduce算子

    32、fold算子

    33速缆、aggregate算子


1. Transformations 算子

(1)?map

將原來 RDD 的每個(gè)數(shù)據(jù)項(xiàng)通過?map 中的用戶自定義函數(shù) f?映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素降允。源碼中 map 算子相當(dāng)于初始化一個(gè) RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))艺糜。

圖 1中每個(gè)方框表示一個(gè) RDD 分區(qū)剧董,左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù) f:T->U?映射為右側(cè)的新 RDD 分區(qū)。但是破停,實(shí)際只有等到 Action算子觸發(fā)后翅楼,這個(gè) f 函數(shù)才會(huì)和其他函數(shù)在一個(gè)stage 中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。在圖 1 中的第一個(gè)分區(qū)真慢,數(shù)據(jù)記錄 V1 輸入 f毅臊,通過 f 轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄 V’1。

  圖1 ? ?map 算子對(duì) RDD 轉(zhuǎn)換

(2)?flatMap

將原來 RDD 中的每個(gè)元素通過函數(shù) f 轉(zhuǎn)換為新的元素黑界,并將生成的 RDD 的每個(gè)集合中的元素合并為一個(gè)集合管嬉,內(nèi)部創(chuàng)建 FlatMappedRDD(this细诸,sc.clean(f))几颜。

圖 2 表 示 RDD 的 一 個(gè) 分 區(qū) 婿脸,進(jìn) 行 flatMap函 數(shù) 操 作咽弦, flatMap 中 傳 入 的 函 數(shù) 為 f:T->U,?T和 U 可以是任意的數(shù)據(jù)類型胎挎。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù) f 轉(zhuǎn)換為新的數(shù)據(jù)沟启。外部大方框可以認(rèn)為是一個(gè) RDD 分區(qū),小方框代表一個(gè)集合犹菇。 V1德迹、 V2、 V3 在一個(gè)集合作為 RDD 的一個(gè)數(shù)據(jù)項(xiàng)揭芍,可能存儲(chǔ)為數(shù)組或其他容器浦辨,轉(zhuǎn)換為V’1、 V’2沼沈、 V’3 后流酬,將原來的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成為 RDD 中的數(shù)據(jù)項(xiàng)列另。

圖2 ? ? flapMap 算子對(duì) RDD 轉(zhuǎn)換

(3)?mapPartitions

mapPartitions 函 數(shù) 獲 取 到 每 個(gè) 分 區(qū) 的 迭 代器芽腾,在 函 數(shù) 中 通 過 這 個(gè) 分 區(qū) 整 體 的 迭 代 器 對(duì)整 個(gè) 分 區(qū) 的 元 素 進(jìn) 行 操 作。 內(nèi) 部 實(shí) 現(xiàn) 是 生 成

MapPartitionsRDD页衙。圖 3 中的方框代表一個(gè) RDD 分區(qū)摊滔。圖 3 中,用戶通過函數(shù) f (iter)=>iter.f ilter(_>=3) 對(duì)分區(qū)中所有數(shù)據(jù)進(jìn)行過濾店乐,大于和等于 3 的數(shù)據(jù)保留艰躺。一個(gè)方塊代表一個(gè) RDD 分區(qū),含有 1眨八、 2腺兴、 3 的分區(qū)過濾只剩下元素 3。

圖3 ?mapPartitions 算子對(duì) RDD 轉(zhuǎn)換

(4)glom

glom函數(shù)將每個(gè)分區(qū)形成一個(gè)數(shù)組廉侧,內(nèi)部實(shí)現(xiàn)是返回的GlommedRDD页响。 圖4中的每個(gè)方框代表一個(gè)RDD分區(qū)。圖4中的方框代表一個(gè)分區(qū)段誊。 該圖表示含有V1闰蚕、 V2、 V3的分區(qū)通過函數(shù)glom形成一數(shù)組Array[(V1)连舍,(V2)没陡,(V3)]。

圖 4 ??glom算子對(duì)RDD轉(zhuǎn)換

(5)?union

使用 union 函數(shù)時(shí)需要保證兩個(gè) RDD 元素的數(shù)據(jù)類型相同,返回的 RDD 數(shù)據(jù)類型和被合并的 RDD 元素?cái)?shù)據(jù)類型相同盼玄,并不進(jìn)行去重操作染簇,保存所有元素。如果想去重

可以使用 distinct()强岸。同時(shí) Spark 還提供更為簡(jiǎn)潔的使用 union 的 API,通過 ++ 符號(hào)相當(dāng)于 union 函數(shù)操作砾赔。

圖 5 中左側(cè)大方框代表兩個(gè) RDD蝌箍,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD暴心,大方框內(nèi)的小方框代表分區(qū)妓盲。

含有V1、V2专普、U1悯衬、U2、U3檀夹、U4的RDD和含有V1筋粗、V8、U5炸渡、U6娜亿、U7、U8的RDD合并所有元素形成一個(gè)RDD蚌堵。V1买决、V1、V2吼畏、V8形成一個(gè)分區(qū)督赤,U1、U2泻蚊、U3躲舌、U4、U5性雄、U6孽糖、U7、U8形成一個(gè)分區(qū)毅贮。

圖 5 ?union 算子對(duì) RDD 轉(zhuǎn)換

(6)?cartesian

對(duì) 兩 個(gè) RDD 內(nèi) 的 所 有 元 素?進(jìn) 行 笛 卡 爾 積 操 作办悟。 操 作 后, 內(nèi) 部 實(shí) 現(xiàn) 返 回CartesianRDD滩褥。圖6中左側(cè)大方框代表兩個(gè) RDD病蛉,大方框內(nèi)的小方框代表 RDD 的分區(qū)。右側(cè)大方框代表合并后的 RDD,大方框內(nèi)的小方框代表分區(qū)铺然。圖6中的大方框代表RDD俗孝,大方框中的小方框代表RDD分區(qū)。

例 如: V1 和 另 一 個(gè) RDD 中 的 W1魄健、 W2赋铝、 Q5 進(jìn) 行 笛 卡 爾 積 運(yùn) 算 形 成 (V1,W1)、(V1,W2)沽瘦、 (V1,Q5)革骨。

?圖 6 ?cartesian 算子對(duì) RDD 轉(zhuǎn)換

(7)?groupBy

groupBy :將元素通過函數(shù)生成相應(yīng)的 Key,數(shù)據(jù)就轉(zhuǎn)化為 Key-Value 格式析恋,之后將 Key 相同的元素分為一組良哲。

函數(shù)實(shí)現(xiàn)如下:

1)將用戶函數(shù)預(yù)處理:

val cleanF = sc.clean(f)

2)對(duì)數(shù)據(jù) map 進(jìn)行函數(shù)操作,最后再進(jìn)行 groupByKey 分組操作助隧。

this.map(t => (cleanF(t), t)).groupByKey(p)

其中筑凫, p 確定了分區(qū)個(gè)數(shù)和分區(qū)函數(shù),也就決定了并行化的程度并村。

圖7 中方框代表一個(gè) RDD 分區(qū)巍实,相同key 的元素合并到一個(gè)組。例如 V1 和 V2 合并為 V哩牍, Value 為 V1,V2蔫浆。形成 V,Seq(V1,V2)。

圖 7?groupBy 算子對(duì) RDD 轉(zhuǎn)換


(8)?filter

filter 函數(shù)功能是對(duì)元素進(jìn)行過濾姐叁,對(duì)每個(gè) 元 素 應(yīng) 用 f 函 數(shù)瓦盛, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉外潜。 內(nèi) 部 實(shí) 現(xiàn) 相 當(dāng) 于 生 成 FilteredRDD(this原环,sc.clean(f))。

下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):

deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

圖 8 中每個(gè)方框代表一個(gè) RDD 分區(qū)处窥, T 可以是任意的類型嘱吗。通過用戶自定義的過濾函數(shù) f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作滔驾,將滿足條件谒麦、返回結(jié)果為 true 的數(shù)據(jù)項(xiàng)保留。例如哆致,過濾掉 V2 和 V3 保留了 V1绕德,為區(qū)分命名為 V’1。

圖 8 ?filter 算子對(duì) RDD 轉(zhuǎn)換

(9)distinct

distinct將RDD中的元素進(jìn)行去重操作摊阀。圖9中的每個(gè)方框代表一個(gè)RDD分區(qū)耻蛇,通過distinct函數(shù)踪蹬,將數(shù)據(jù)去重。 例如臣咖,重復(fù)數(shù)據(jù)V1跃捣、 V1去重后只保留一份V1。

    圖9 ?distinct算子對(duì)RDD轉(zhuǎn)換


(10)subtract

subtract相當(dāng)于進(jìn)行集合的差操作夺蛇,RDD 1去除RDD 1和RDD 2交集中的所有元素疚漆。圖10中左側(cè)的大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)刁赦。 右側(cè)大方框

代表合并后的RDD娶聘,大方框內(nèi)的小方框代表分區(qū)。 V1在兩個(gè)RDD中均有截型,根據(jù)差集運(yùn)算規(guī)則,新RDD不保留儒溉,V2在第一個(gè)RDD有宦焦,第二個(gè)RDD沒有,則在新RDD元素中包含V2顿涣。

          圖10 ? subtract算子對(duì)RDD轉(zhuǎn)換


(11)?sample

sample 將 RDD 這個(gè)集合內(nèi)的元素進(jìn)行采樣波闹,獲取所有元素的子集。用戶可以設(shè)定是否有放回的抽樣涛碑、百分比精堕、隨機(jī)種子,進(jìn)而決定采樣方式蒲障。內(nèi)部實(shí)現(xiàn)是生成 SampledRDD(withReplacement歹篓, fraction, seed)揉阎。

函數(shù)參數(shù)設(shè)置:

‰   withReplacement=true庄撮,表示有放回的抽樣。

‰   withReplacement=false毙籽,表示無放回的抽樣洞斯。

圖 11中 的 每 個(gè) 方 框 是 一 個(gè) RDD 分 區(qū)。 通 過 sample 函 數(shù)坑赡, 采 樣 50% 的 數(shù) 據(jù)烙如。V1、 V2毅否、 U1亚铁、 U2、U3螟加、U4 采樣出數(shù)據(jù) V1 和 U1刀闷、 U2 形成新的 RDD熊泵。

       圖11 ?sample 算子對(duì) RDD 轉(zhuǎn)換


(12)takeSample

takeSample()函數(shù)和上面的sample函數(shù)是一個(gè)原理,但是不使用相對(duì)比例采樣甸昏,而是按設(shè)定的采樣個(gè)數(shù)進(jìn)行采樣顽分,同時(shí)返回結(jié)果不再是RDD,而是相當(dāng)于對(duì)采樣后的數(shù)據(jù)進(jìn)行

Collect()施蜜,返回結(jié)果的集合為單機(jī)的數(shù)組卒蘸。

圖12中左側(cè)的方框代表分布式的各個(gè)節(jié)點(diǎn)上的分區(qū),右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組翻默。 通過takeSample對(duì)數(shù)據(jù)采樣缸沃,設(shè)置為采樣一份數(shù)據(jù),返回結(jié)果為V1修械。

    圖12 ?  takeSample算子對(duì)RDD轉(zhuǎn)換


(13)?cache

cache?將 RDD 元素從磁盤緩存到內(nèi)存趾牧。 相當(dāng)于 persist(MEMORY_ONLY) 函數(shù)的功能。

圖13 中每個(gè)方框代表一個(gè) RDD 分區(qū)肯污,左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤翘单,通過 cache 算子將數(shù)據(jù)緩存在內(nèi)存。

      圖 13 Cache 算子對(duì) RDD 轉(zhuǎn)換


(14)?persist

persist 函數(shù)對(duì)?RDD 進(jìn)行緩存操作蹦渣。數(shù)據(jù)緩存在哪里依據(jù) StorageLevel 這個(gè)枚舉類型進(jìn)行確定哄芜。 有以下幾種類型的組合(見10), DISK 代表磁盤柬唯,MEMORY 代表內(nèi)存认臊, SER 代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)。

下面為函數(shù)定義锄奢, StorageLevel 是枚舉類型失晴,代表存儲(chǔ)模式,用戶可以通過圖 14-1 按需進(jìn)行選擇拘央。

persist(newLevel:StorageLevel)

圖 14-1 中列出persist 函數(shù)可以進(jìn)行緩存的模式师坎。例如,MEMORY_AND_DISK_SER 代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤堪滨,并且以序列化的方式存儲(chǔ)胯陋,其他同理。

            圖 14-1 ?persist 算子對(duì) RDD 轉(zhuǎn)換

  圖 14-2 中方框代表 RDD 分區(qū)袱箱。 disk 代表存儲(chǔ)在磁盤遏乔, mem 代表存儲(chǔ)在內(nèi)存。數(shù)據(jù)最初全部存儲(chǔ)在磁盤发笔,通過 persist(MEMORY_AND_DISK) 將數(shù)據(jù)緩存到內(nèi)存盟萨,但是有的分區(qū)無法容納在內(nèi)存,將含有 V1了讨、 V2捻激、 V3 的RDD存儲(chǔ)到磁盤制轰,將含有U1,U2的RDD仍舊存儲(chǔ)在內(nèi)存胞谭。

? ? ? 圖 14-2 ? Persist 算子對(duì) RDD 轉(zhuǎn)換


(15)?mapValues

mapValues :針對(duì)(Key垃杖, Value)型數(shù)據(jù)中的 Value 進(jìn)行 Map 操作,而不對(duì) Key 進(jìn)行處理丈屹。

? ? 圖 15 中的方框代表 RDD 分區(qū)调俘。 a=>a+2 代表對(duì) (V1,1) 這樣的 Key Value 數(shù)據(jù)對(duì),數(shù)據(jù)只對(duì) Value 中的 1 進(jìn)行加 2 操作旺垒,返回結(jié)果為 3彩库。

      圖 15 ? mapValues 算子 RDD 對(duì)轉(zhuǎn)換


(16)?combineByKey

下面代碼為 combineByKey 函數(shù)的定義:

combineByKey[C](createCombiner:(V) C,

mergeValue:(C, V) C,

mergeCombiners:(C, C) C,

partitioner:Partitioner,

mapSideCombine:Boolean=true,

serializer:Serializer=null):RDD[(K,C)]

說明:

‰   createCombiner: V => C, C 不存在的情況下先蒋,比如通過 V 創(chuàng)建 seq C骇钦。

‰   mergeValue: (C, V) => C竞漾,當(dāng) C 已經(jīng)存在的情況下眯搭,需要 merge,比如把 item V

加到 seq C 中畴蹭,或者疊加坦仍。

mergeCombiners: (C鳍烁, C) => C叨襟,合并兩個(gè) C。

‰   partitioner: Partitioner, Shuff le 時(shí)需要的 Partitioner幔荒。

‰   mapSideCombine : Boolean = true糊闽,為了減小傳輸量,很多 combine 可以在 map

端先做爹梁,比如疊加右犹,可以先在一個(gè) partition 中把所有相同的 key 的 value 疊加,

再 shuff le姚垃。

‰   serializerClass: String = null念链,傳輸需要序列化,用戶可以自定義序列化類:

例如积糯,相當(dāng)于將元素為 (Int掂墓, Int) 的 RDD 轉(zhuǎn)變?yōu)榱?(Int, Seq[Int]) 類型元素的 RDD看成。圖 16中的方框代表 RDD 分區(qū)君编。如圖,通過 combineByKey川慌, 將 (V1,2)吃嘿, (V1,1)數(shù)據(jù)合并為( V1,Seq(2,1))祠乃。

      圖 16? comBineByKey 算子對(duì) RDD 轉(zhuǎn)換


(17)?reduceByKey

reduceByKey 是比 combineByKey 更簡(jiǎn)單的一種情況,只是兩個(gè)值合并成一個(gè)值兑燥,( Int亮瓷, Int V)to (Int, Int C)贪嫂,比如疊加寺庄。所以 createCombiner reduceBykey 很簡(jiǎn)單,就是直接返回 v力崇,而 mergeValue和 mergeCombiners 邏輯是相同的斗塘,沒有區(qū)別。

函數(shù)實(shí)現(xiàn):

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

= {

combineByKey[V]((v: V) => v, func, func, partitioner)

}

圖17中的方框代表 RDD 分區(qū)亮靴。通過用戶自定義函數(shù) (A,B) => (A + B) 函數(shù)馍盟,將相同 key 的數(shù)據(jù) (V1,2) 和 (V1,1) 的 value 相加運(yùn)算,結(jié)果為( V1,3)茧吊。

        圖 17?reduceByKey 算子對(duì) RDD 轉(zhuǎn)換


(18)partitionBy

partitionBy函數(shù)對(duì)RDD進(jìn)行分區(qū)操作贞岭。

函數(shù)定義如下。

partitionBy(partitioner:Partitioner)

如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致搓侄,則不重分區(qū)瞄桨,如果不一致,則相當(dāng)于根據(jù)分區(qū)器生成一個(gè)新的ShuffledRDD讶踪。

圖18中的方框代表RDD分區(qū)芯侥。 通過新的分區(qū)策略將原來在不同分區(qū)的V1、 V2數(shù)據(jù)都合并到了一個(gè)分區(qū)乳讥。


    圖18  partitionBy算子對(duì)RDD轉(zhuǎn)換


(19)Cogroup

cogroup函數(shù)將兩個(gè)RDD進(jìn)行協(xié)同劃分柱查,cogroup函數(shù)的定義如下。

cogroup[W](other: RDD[(K云石, W)]唉工, numPartitions: Int): RDD[(K, (Iterable[V]汹忠, Iterable[W]))]

對(duì)在兩個(gè)RDD中的Key-Value類型的元素淋硝,每個(gè)RDD相同Key的元素分別聚合為一個(gè)集合,并且返回兩個(gè)RDD中對(duì)應(yīng)Key的元素集合的迭代器宽菜。

(K谣膳, (Iterable[V], Iterable[W]))

其中赋焕,Key和Value参歹,Value是兩個(gè)RDD下相同Key的兩個(gè)數(shù)據(jù)集合的迭代器所構(gòu)成的元組。

圖19中的大方框代表RDD隆判,大方框內(nèi)的小方框代表RDD中的分區(qū)犬庇。 將RDD1中的數(shù)據(jù)(U1僧界,1)、 (U1臭挽,2)和RDD2中的數(shù)據(jù)(U1捂襟,2)合并為(U1,((1欢峰,2)葬荷,(2)))。

        圖19 ?Cogroup算子對(duì)RDD轉(zhuǎn)換


(20)?join

join 對(duì)兩個(gè)需要連接的 RDD 進(jìn)行 cogroup函數(shù)操作纽帖,將相同 key 的數(shù)據(jù)能夠放到一個(gè)分區(qū)宠漩,在 cogroup 操作之后形成的新 RDD 對(duì)每個(gè)key 下的元素進(jìn)行笛卡爾積的操作,返回的結(jié)果再展平懊直,對(duì)應(yīng) key 下的所有元組形成一個(gè)集合扒吁。最后返回 RDD[(K, (V室囊, W))]雕崩。

下 面 代 碼 為 join 的 函 數(shù) 實(shí) 現(xiàn), 本 質(zhì) 是通 過 cogroup 算 子 先 進(jìn) 行 協(xié) 同 劃 分融撞, 再 通 過flatMapValues 將合并的數(shù)據(jù)打散盼铁。

this.cogroup(other,partitioner).f latMapValues{case(vs,ws) =>?for(v<-vs;w<-ws)yield(v,w) }

圖 20是對(duì)兩個(gè) RDD 的 join 操作示意圖。大方框代表 RDD尝偎,小方框代表 RDD 中的分區(qū)饶火。函數(shù)對(duì)相同 key 的元素,如 V1 為 key 做連接后結(jié)果為 (V1,(1,1)) 和 (V1,(1,2))冬念。

                    圖 20 ? join 算子對(duì) RDD 轉(zhuǎn)換


(21)eftOutJoinrightOutJoin

LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空趁窃,如果為空牧挣,則填充為空急前。 如果不為空,則將數(shù)據(jù)進(jìn)行連接運(yùn)算瀑构,并

返回結(jié)果裆针。

下面代碼是leftOutJoin的實(shí)現(xiàn)。

if (ws.isEmpty) {

vs.map(v => (v寺晌, None))

} else {

for (v <- vs世吨; w <- ws) yield (v, Some(w))

}


2. Actions 算子

本質(zhì)上在 Action 算子中通過 SparkContext 進(jìn)行了提交作業(yè)的 runJob 操作呻征,觸發(fā)了RDD DAG 的執(zhí)行耘婚。

例如, Action 算子 collect 函數(shù)的代碼如下陆赋,感興趣的讀者可以順著這個(gè)入口進(jìn)行源碼剖析:

/**

* Return an array that contains all of the elements in this RDD.

*/

def collect(): Array[T] = {

/* 提交 Job*/

val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

Array.concat(results: _*)

}

(22)?foreach

foreach 對(duì) RDD 中的每個(gè)元素都應(yīng)用 f 函數(shù)操作沐祷,不返回 RDD 和 Array嚷闭, 而是返回Uint。圖22表示 foreach 算子通過用戶自定義函數(shù)對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作赖临。本例中自定義函數(shù)為 println()胞锰,控制臺(tái)打印所有數(shù)據(jù)項(xiàng)。

      圖 22 foreach 算子對(duì) RDD 轉(zhuǎn)換


(23)?saveAsTextFile

函數(shù)將數(shù)據(jù)輸出兢榨,存儲(chǔ)到 HDFS 的指定目錄嗅榕。

下面為 saveAsTextFile 函數(shù)的內(nèi)部實(shí)現(xiàn),其內(nèi)部

通過調(diào)用 saveAsHadoopFile 進(jìn)行實(shí)現(xiàn):

this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

將 RDD 中的每個(gè)元素映射轉(zhuǎn)變?yōu)?(null吵聪, x.toString)凌那,然后再將其寫入 HDFS。

圖 23中左側(cè)方框代表 RDD 分區(qū)吟逝,右側(cè)方框代表 HDFS 的 Block案怯。通過函數(shù)將RDD 的每個(gè)分區(qū)存儲(chǔ)為 HDFS 中的一個(gè) Block。

            圖 23 ? saveAsHadoopFile 算子對(duì) RDD 轉(zhuǎn)換


(24)saveAsObjectFile

saveAsObjectFile將分區(qū)中的每10個(gè)元素組成一個(gè)Array澎办,然后將這個(gè)Array序列化嘲碱,映射為(Null,BytesWritable(Y))的元素局蚀,寫入HDFS為SequenceFile的格式麦锯。

下面代碼為函數(shù)內(nèi)部實(shí)現(xiàn)。

map(x=>(NullWritable.get()琅绅,new BytesWritable(Utils.serialize(x))))

圖24中的左側(cè)方框代表RDD分區(qū)扶欣,右側(cè)方框代表HDFS的Block。 通過函數(shù)將RDD的每個(gè)分區(qū)存儲(chǔ)為HDFS上的一個(gè)Block千扶。

            圖24 saveAsObjectFile算子對(duì)RDD轉(zhuǎn)換


(25)?collect

collect 相當(dāng)于 toArray料祠, toArray 已經(jīng)過時(shí)不推薦使用, collect 將分布式的 RDD 返回為一個(gè)單機(jī)的 scala Array 數(shù)組澎羞。在這個(gè)數(shù)組上運(yùn)用 scala 的函數(shù)式操作髓绽。

圖 25中左側(cè)方框代表 RDD 分區(qū),右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組妆绞。通過函數(shù)操作顺呕,將結(jié)果返回到 Driver 程序所在的節(jié)點(diǎn),以數(shù)組形式存儲(chǔ)括饶。

  圖 25 ? Collect 算子對(duì) RDD 轉(zhuǎn)換


(26)collectAsMap

collectAsMap對(duì)(K株茶,V)型的RDD數(shù)據(jù)返回一個(gè)單機(jī)HashMap。 對(duì)于重復(fù)K的RDD元素图焰,后面的元素覆蓋前面的元素启盛。

圖26中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)數(shù)組。 數(shù)據(jù)通過collectAsMap函數(shù)返回給Driver程序計(jì)算結(jié)果僵闯,結(jié)果以HashMap形式存儲(chǔ)笤闯。


          圖26 CollectAsMap算子對(duì)RDD轉(zhuǎn)換


(27)reduceByKeyLocally

  實(shí)現(xiàn)的是先reduce再collectAsMap的功能,先對(duì)RDD的整體進(jìn)行reduce操作棍厂,然后再收集所有結(jié)果返回為一個(gè)HashMap颗味。


(28)lookup

下面代碼為lookup的聲明。

lookup(key:K):Seq[V]

Lookup函數(shù)對(duì)(Key牺弹,Value)型的RDD操作浦马,返回指定Key對(duì)應(yīng)的元素形成的Seq。 這個(gè)函數(shù)處理優(yōu)化的部分在于张漂,如果這個(gè)RDD包含分區(qū)器晶默,則只會(huì)對(duì)應(yīng)處理K所在的分區(qū),然后返回由(K航攒,V)形成的Seq磺陡。 如果RDD不包含分區(qū)器,則需要對(duì)全RDD元素進(jìn)行暴力掃描處理漠畜,搜索指定K對(duì)應(yīng)的元素币他。

圖28中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表Seq憔狞,最后結(jié)果返回到Driver所在節(jié)點(diǎn)的應(yīng)用中蝴悉。

      圖28 ?lookup對(duì)RDD轉(zhuǎn)換


(29)?count

count 返回整個(gè) RDD 的元素個(gè)數(shù)。

內(nèi)部函數(shù)實(shí)現(xiàn)為:

defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum

圖 29中瘾敢,返回?cái)?shù)據(jù)的個(gè)數(shù)為 5拍冠。一個(gè)方塊代表一個(gè) RDD 分區(qū)。

    ?圖29 count 對(duì) RDD 算子轉(zhuǎn)換


(30)top

top可返回最大的k個(gè)元素簇抵。 函數(shù)定義如下庆杜。

top(num:Int)(implicit ord:Ordering[T]):Array[T]

相近函數(shù)說明如下。

·top返回最大的k個(gè)元素碟摆。

·take返回最小的k個(gè)元素晃财。

·takeOrdered返回最小的k個(gè)元素,并且在返回的數(shù)組中保持元素的順序焦履。

·first相當(dāng)于top(1)返回整個(gè)RDD中的前k個(gè)元素拓劝,可以定義排序的方式Ordering[T]雏逾。

返回的是一個(gè)含前k個(gè)元素的數(shù)組嘉裤。


(31)reduce

reduce函數(shù)相當(dāng)于對(duì)RDD中的元素進(jìn)行reduceLeft函數(shù)的操作。 函數(shù)實(shí)現(xiàn)如下栖博。

Some(iter.reduceLeft(cleanF))

reduceLeft先對(duì)兩個(gè)元素進(jìn)行reduce函數(shù)操作屑宠,然后將結(jié)果和迭代器取出的下一個(gè)元素進(jìn)行reduce函數(shù)操作,直到迭代器遍歷完所有元素仇让,得到最后結(jié)果典奉。在RDD中躺翻,先對(duì)每個(gè)分區(qū)中的所有元素的集合分別進(jìn)行reduceLeft。 每個(gè)分區(qū)形成的結(jié)果相當(dāng)于一個(gè)元素卫玖,再對(duì)這個(gè)結(jié)果集合進(jìn)行reduceleft操作公你。

例如:用戶自定義函數(shù)如下。

f:(A假瞬,B)=>(A._1+”@”+B._1陕靠,A._2+B._2)

圖31中的方框代表一個(gè)RDD分區(qū),通過用戶自定函數(shù)f將數(shù)據(jù)進(jìn)行reduce運(yùn)算脱茉。 示例

最后的返回結(jié)果為V1@[1]V2U剪芥!@U2@U3@U4,12琴许。


圖31 reduce算子對(duì)RDD轉(zhuǎn)換


(32)fold

fold和reduce的原理相同税肪,但是與reduce不同,相當(dāng)于每個(gè)reduce時(shí)榜田,迭代器取的第一個(gè)元素是zeroValue益兄。

圖32中通過下面的用戶自定義函數(shù)進(jìn)行fold運(yùn)算,圖中的一個(gè)方框代表一個(gè)RDD分區(qū)箭券。 讀者可以參照reduce函數(shù)理解偏塞。

fold((”V0@”,2))( (A邦鲫,B)=>(A._1+”@”+B._1灸叼,A._2+B._2))

          圖32 ?fold算子對(duì)RDD轉(zhuǎn)換


(33)aggregate

aggregate先對(duì)每個(gè)分區(qū)的所有元素進(jìn)行aggregate操作,再對(duì)分區(qū)的結(jié)果進(jìn)行fold操作庆捺。

aggreagate與fold和reduce的不同之處在于古今,aggregate相當(dāng)于采用歸并的方式進(jìn)行數(shù)據(jù)聚集,這種聚集是并行化的滔以。 而在fold和reduce函數(shù)的運(yùn)算過程中捉腥,每個(gè)分區(qū)中需要進(jìn)行串行處理,每個(gè)分區(qū)串行計(jì)算完結(jié)果你画,結(jié)果再按之前的方式進(jìn)行聚集抵碟,并返回最終聚集結(jié)果。

函數(shù)的定義如下坏匪。

aggregate[B](z: B)(seqop: (B拟逮,A) => B,combop: (B适滓,B) => B): B

圖33通過用戶自定義函數(shù)對(duì)RDD 進(jìn)行aggregate的聚集操作敦迄,圖中的每個(gè)方框代表一個(gè)RDD分區(qū)。

rdd.aggregate(”V0@”,2)((A罚屋,B)=>(A._1+”@”+B._1苦囱,A._2+B._2)),(A脾猛,B)=>(A._1+”@”+B_1撕彤,A._@+B_.2))

最后,介紹兩個(gè)計(jì)算模型中的兩個(gè)特殊變量猛拴。

廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表喉刘,以及廣播大變量等場(chǎng)景。 這些數(shù)據(jù)集合在單節(jié)點(diǎn)內(nèi)存能夠容納漆弄,不需要像RDD那樣在節(jié)點(diǎn)之間打散存儲(chǔ)睦裳。

Spark運(yùn)行時(shí)把廣播變量數(shù)據(jù)發(fā)到各個(gè)節(jié)點(diǎn),并保存下來撼唾,后續(xù)計(jì)算可以復(fù)用廉邑。 相比Hadoo的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享倒谷。 Broadcast的底層實(shí)現(xiàn)采用了BT機(jī)制蛛蒙。

圖33 ?aggregate算子對(duì)RDD轉(zhuǎn)換

②代表V。

③代表U渤愁。

accumulator變量:允許做全局累加操作牵祟,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運(yùn)行指標(biāo)的情景。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末抖格,一起剝皮案震驚了整個(gè)濱河市诺苹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌雹拄,老刑警劉巖丽惭,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔚携,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡徽龟,警方通過查閱死者的電腦和手機(jī)担映,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門诲侮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來饶辙,“玉大人耳幢,你說我怎么就攤上這事〗疲” “怎么了念祭?”我有些...
    開封第一講書人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)绷蹲。 經(jīng)常有香客問我棒卷,道長(zhǎng)顾孽,這世上最難降的妖魔是什么祝钢? 我笑而不...
    開封第一講書人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任比规,我火速辦了婚禮,結(jié)果婚禮上拦英,老公的妹妹穿的比我還像新娘蜒什。我一直安慰自己,他們只是感情好疤估,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開白布灾常。 她就那樣靜靜地躺著,像睡著了一般铃拇。 火紅的嫁衣襯著肌膚如雪钞瀑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,196評(píng)論 1 308
  • 那天慷荔,我揣著相機(jī)與錄音雕什,去河邊找鬼。 笑死显晶,一個(gè)胖子當(dāng)著我的面吹牛贷岸,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播磷雇,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼偿警,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了唯笙?” 一聲冷哼從身側(cè)響起螟蒸,我...
    開封第一講書人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎崩掘,沒想到半個(gè)月后尿庐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡呢堰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年抄瑟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片枉疼。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡皮假,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出骂维,到底是詐尸還是另有隱情惹资,我是刑警寧澤,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布航闺,位于F島的核電站褪测,受9級(jí)特大地震影響猴誊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜侮措,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一懈叹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧分扎,春花似錦澄成、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至菲饼,卻和暖如春肾砂,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宏悦。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工镐确, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人肛根。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓辫塌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親派哲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子臼氨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容