(轉(zhuǎn))Spark常用算子講解

Spark的算子的分類

   從大方向來說四苇,Spark 算子大致可以分為以下兩類:

1)Transformation 變換/轉(zhuǎn)換算子:這種變換并不觸發(fā)提交作業(yè)厅缺,完成作業(yè)中間過程處理鞋囊。

     Transformation 操作是延遲計算的甸陌,也就是說從一個RDD 轉(zhuǎn)換生成另一個 RDD 的轉(zhuǎn)換操作不是馬上執(zhí)行峡碉,需要等到有 Action 操作的時候才會真正觸發(fā)運算近哟。

2)Action 行動算子:這類算子會觸發(fā) SparkContext 提交 Job 作業(yè)。

   ?  Action 算子會觸發(fā) Spark 提交作業(yè)(Job)鲫寄,并將數(shù)據(jù)輸出 Spark系統(tǒng)吉执。


  從小方向來說,Spark 算子大致可以分為以下三類:

1)Value數(shù)據(jù)類型的Transformation算子地来,這種變換并不觸發(fā)提交作業(yè)戳玫,針對處理的數(shù)據(jù)項是Value型的數(shù)據(jù)。

2)Key-Value數(shù)據(jù)類型的Transfromation算子未斑,這種變換并不觸發(fā)提交作業(yè)咕宿,針對處理的數(shù)據(jù)項是Key-Value型的數(shù)據(jù)對。

  3)Action算子颂碧,這類算子會觸發(fā)SparkContext提交Job作業(yè)荠列。



1)Value數(shù)據(jù)類型的Transformation算子

  一、輸入分區(qū)與輸出分區(qū)一對一型

    1载城、map算子

    2肌似、flatMap算子

    3、mapPartitions算子

    4诉瓦、glom算子

  二川队、輸入分區(qū)與輸出分區(qū)多對一型

    5力细、union算子

    6、cartesian算子

  三固额、輸入分區(qū)與輸出分區(qū)多對多型

    7眠蚂、grouBy算子

  四、輸出分區(qū)為輸入分區(qū)子集型

    8斗躏、filter算子

    9逝慧、distinct算子

    10、subtract算子

    11啄糙、sample算子

? ? ?   12笛臣、takeSample算子

?  五、Cache型

    13隧饼、cache算子  

    14沈堡、persist算子


2)Key-Value數(shù)據(jù)類型的Transfromation算子

  一、輸入分區(qū)與輸出分區(qū)一對一

    15燕雁、mapValues算子

  二诞丽、對單個RDD或兩個RDD聚集

   單個RDD聚集

    16、combineByKey算子

    17拐格、reduceByKey算子

    18僧免、partitionBy算子

?  兩個RDD聚集

 19、Cogroup算子

  三禁荒、連接

    20猬膨、join算子

    21、leftOutJoin和 rightOutJoin算子



?3)Action算子

  一呛伴、無輸出

22勃痴、foreach算子

  二、HDFS

    23热康、saveAsTextFile算子

    24沛申、saveAsObjectFile算子

  三、Scala集合和數(shù)據(jù)類型

    25姐军、collect算子

    26铁材、collectAsMap算子

27、reduceByKeyLocally算子

  ?28奕锌、lookup算子

    29著觉、count算子

    30、top算子

    31惊暴、reduce算子

    32饼丘、fold算子

    33、aggregate算子



??1. Transformations 算子

(1)?map

將原來 RDD 的每個數(shù)據(jù)項通過?map 中的用戶自定義函數(shù) f?映射轉(zhuǎn)變?yōu)橐粋€新的元素辽话。源碼中 map 算子相當(dāng)于初始化一個 RDD肄鸽, 新 RDD 叫做 MappedRDD(this, sc.clean(f))卫病。

圖 1中每個方框表示一個 RDD 分區(qū),左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù) f:T->U?映射為右側(cè)的新 RDD 分區(qū)典徘。但是蟀苛,實際只有等到 Action算子觸發(fā)后,這個 f 函數(shù)才會和其他函數(shù)在一個stage 中對數(shù)據(jù)進(jìn)行運算逮诲。在圖 1 中的第一個分區(qū)帜平,數(shù)據(jù)記錄 V1 輸入 f,通過 f 轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄 V’1汛骂。

      圖1 ? ?map 算子對 RDD 轉(zhuǎn)換


(2)?flatMap

將原來 RDD 中的每個元素通過函數(shù) f 轉(zhuǎn)換為新的元素罕模,并將生成的 RDD 的每個集合中的元素合并為一個集合,內(nèi)部創(chuàng)建 FlatMappedRDD(this帘瞭,sc.clean(f))。

圖 2 表 示 RDD 的 一 個 分 區(qū) 蒿讥,進(jìn) 行 flatMap函 數(shù) 操 作蝶念, flatMap 中 傳 入 的 函 數(shù) 為 f:T->U,?T和 U 可以是任意的數(shù)據(jù)類型芋绸。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù) f 轉(zhuǎn)換為新的數(shù)據(jù)媒殉。外部大方框可以認(rèn)為是一個 RDD 分區(qū),小方框代表一個集合摔敛。 V1廷蓉、 V2、 V3 在一個集合作為 RDD 的一個數(shù)據(jù)項马昙,可能存儲為數(shù)組或其他容器桃犬,轉(zhuǎn)換為V’1、 V’2行楞、 V’3 后攒暇,將原來的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成為 RDD 中的數(shù)據(jù)項子房。

        圖2 ? ? flapMap 算子對 RDD 轉(zhuǎn)換

(3)?mapPartitions

mapPartitions 函 數(shù) 獲 取 到 每 個 分 區(qū) 的 迭 代器形用,在 函 數(shù) 中 通 過 這 個 分 區(qū) 整 體 的 迭 代 器 對整 個 分 區(qū) 的 元 素 進(jìn) 行 操 作。 內(nèi) 部 實 現(xiàn) 是 生 成

MapPartitionsRDD证杭。圖 3 中的方框代表一個 RDD 分區(qū)田度。圖 3 中,用戶通過函數(shù) f (iter)=>iter.f ilter(_>=3) 對分區(qū)中所有數(shù)據(jù)進(jìn)行過濾解愤,大于和等于 3 的數(shù)據(jù)保留镇饺。一個方塊代表一個 RDD 分區(qū),含有 1琢歇、 2兰怠、 3 的分區(qū)過濾只剩下元素 3梦鉴。

    圖3 ?mapPartitions 算子對 RDD 轉(zhuǎn)換


(4)glom

glom函數(shù)將每個分區(qū)形成一個數(shù)組,內(nèi)部實現(xiàn)是返回的GlommedRDD揭保。 圖4中的每個方框代表一個RDD分區(qū)肥橙。圖4中的方框代表一個分區(qū)。 該圖表示含有V1秸侣、 V2存筏、 V3的分區(qū)通過函數(shù)glom形成一數(shù)組Array[(V1),(V2)味榛,(V3)]椭坚。

      圖 4 ??glom算子對RDD轉(zhuǎn)換


(5)?union

使用 union 函數(shù)時需要保證兩個 RDD 元素的數(shù)據(jù)類型相同,返回的 RDD 數(shù)據(jù)類型和被合并的 RDD 元素數(shù)據(jù)類型相同搏色,并不進(jìn)行去重操作善茎,保存所有元素。如果想去重

可以使用 distinct()频轿。同時 Spark 還提供更為簡潔的使用 union 的 API垂涯,通過 ++ 符號相當(dāng)于 union 函數(shù)操作。

圖 5 中左側(cè)大方框代表兩個 RDD航邢,大方框內(nèi)的小方框代表 RDD 的分區(qū)耕赘。右側(cè)大方框代表合并后的 RDD,大方框內(nèi)的小方框代表分區(qū)膳殷。

含有V1操骡、V2、U1赚窃、U2册招、U3、U4的RDD和含有V1考榨、V8跨细、U5、U6河质、U7冀惭、U8的RDD合并所有元素形成一個RDD。V1掀鹅、V1散休、V2、V8形成一個分區(qū)乐尊,U1戚丸、U2、U3、U4限府、U5夺颤、U6、U7胁勺、U8形成一個分區(qū)世澜。

圖 5 ?union 算子對 RDD 轉(zhuǎn)換

(6)?cartesian

對 兩 個 RDD 內(nèi) 的 所 有 元 素?進(jìn) 行 笛 卡 爾 積 操 作。 操 作 后署穗, 內(nèi) 部 實 現(xiàn) 返 回CartesianRDD寥裂。圖6中左側(cè)大方框代表兩個 RDD,大方框內(nèi)的小方框代表 RDD 的分區(qū)案疲。右側(cè)大方框代表合并后的 RDD封恰,大方框內(nèi)的小方框代表分區(qū)。圖6中的大方框代表RDD褐啡,大方框中的小方框代表RDD分區(qū)诺舔。

例 如: V1 和 另 一 個 RDD 中 的 W1、 W2春贸、 Q5 進(jìn) 行 笛 卡 爾 積 運 算 形 成 (V1,W1)混萝、(V1,W2)、 (V1,Q5)萍恕。

? ? ? ?圖 6 ?cartesian 算子對 RDD 轉(zhuǎn)換


(7)?groupBy

groupBy :將元素通過函數(shù)生成相應(yīng)的 Key,數(shù)據(jù)就轉(zhuǎn)化為 Key-Value 格式车要,之后將 Key 相同的元素分為一組允粤。

函數(shù)實現(xiàn)如下:

1)將用戶函數(shù)預(yù)處理:

val cleanF = sc.clean(f)

2)對數(shù)據(jù) map 進(jìn)行函數(shù)操作,最后再進(jìn)行 groupByKey 分組操作翼岁。

this.map(t => (cleanF(t), t)).groupByKey(p)

其中类垫, p 確定了分區(qū)個數(shù)和分區(qū)函數(shù),也就決定了并行化的程度琅坡。

圖7 中方框代表一個 RDD 分區(qū)悉患,相同key 的元素合并到一個組。例如 V1 和 V2 合并為 V榆俺, Value 為 V1,V2售躁。形成 V,Seq(V1,V2)。

圖 7?groupBy 算子對 RDD 轉(zhuǎn)換


(8)?filter

filter 函數(shù)功能是對元素進(jìn)行過濾茴晋,對每個 元 素 應(yīng) 用 f 函 數(shù)陪捷, 返 回 值 為 true 的 元 素 在RDD 中保留,返回值為 false 的元素將被過濾掉诺擅。 內(nèi) 部 實 現(xiàn) 相 當(dāng) 于 生 成 FilteredRDD(this市袖,sc.clean(f))。

下面代碼為函數(shù)的本質(zhì)實現(xiàn):

deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

圖 8 中每個方框代表一個 RDD 分區(qū)烁涌, T 可以是任意的類型苍碟。通過用戶自定義的過濾函數(shù) f酒觅,對每個數(shù)據(jù)項操作,將滿足條件微峰、返回結(jié)果為 true 的數(shù)據(jù)項保留舷丹。例如,過濾掉 V2 和 V3 保留了 V1县忌,為區(qū)分命名為 V’1掂榔。

圖 8 ?filter 算子對 RDD 轉(zhuǎn)換

(9)distinct

distinct將RDD中的元素進(jìn)行去重操作。圖9中的每個方框代表一個RDD分區(qū)症杏,通過distinct函數(shù)装获,將數(shù)據(jù)去重。 例如厉颤,重復(fù)數(shù)據(jù)V1穴豫、 V1去重后只保留一份V1。

    圖9 ?distinct算子對RDD轉(zhuǎn)換


(10)subtract

subtract相當(dāng)于進(jìn)行集合的差操作逼友,RDD 1去除RDD 1和RDD 2交集中的所有元素精肃。圖10中左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)帜乞。 右側(cè)大方框

代表合并后的RDD司抱,大方框內(nèi)的小方框代表分區(qū)。 V1在兩個RDD中均有黎烈,根據(jù)差集運算規(guī)則习柠,新RDD不保留,V2在第一個RDD有照棋,第二個RDD沒有资溃,則在新RDD元素中包含V2。

          圖10 ? subtract算子對RDD轉(zhuǎn)換


(11)?sample

sample 將 RDD 這個集合內(nèi)的元素進(jìn)行采樣烈炭,獲取所有元素的子集溶锭。用戶可以設(shè)定是否有放回的抽樣、百分比符隙、隨機(jī)種子趴捅,進(jìn)而決定采樣方式。內(nèi)部實現(xiàn)是生成 SampledRDD(withReplacement膏执, fraction驻售, seed)。

函數(shù)參數(shù)設(shè)置:

‰   withReplacement=true更米,表示有放回的抽樣欺栗。

‰   withReplacement=false,表示無放回的抽樣。

圖 11中 的 每 個 方 框 是 一 個 RDD 分 區(qū)迟几。 通 過 sample 函 數(shù)消请, 采 樣 50% 的 數(shù) 據(jù)。V1类腮、 V2臊泰、 U1、 U2蚜枢、U3缸逃、U4 采樣出數(shù)據(jù) V1 和 U1、 U2 形成新的 RDD厂抽。

       圖11 ?sample 算子對 RDD 轉(zhuǎn)換


(12)takeSample

takeSample()函數(shù)和上面的sample函數(shù)是一個原理需频,但是不使用相對比例采樣,而是按設(shè)定的采樣個數(shù)進(jìn)行采樣筷凤,同時返回結(jié)果不再是RDD昭殉,而是相當(dāng)于對采樣后的數(shù)據(jù)進(jìn)行

Collect(),返回結(jié)果的集合為單機(jī)的數(shù)組藐守。

圖12中左側(cè)的方框代表分布式的各個節(jié)點上的分區(qū)挪丢,右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組。 通過takeSample對數(shù)據(jù)采樣卢厂,設(shè)置為采樣一份數(shù)據(jù)乾蓬,返回結(jié)果為V1。

    圖12 ?  takeSample算子對RDD轉(zhuǎn)換


(13)?cache

cache?將 RDD 元素從磁盤緩存到內(nèi)存慎恒。 相當(dāng)于 persist(MEMORY_ONLY) 函數(shù)的功能巢块。

圖13 中每個方框代表一個 RDD 分區(qū),左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲在磁盤巧号,通過 cache 算子將數(shù)據(jù)緩存在內(nèi)存。

      圖 13 Cache 算子對 RDD 轉(zhuǎn)換


(14)?persist

persist 函數(shù)對?RDD 進(jìn)行緩存操作姥闭。數(shù)據(jù)緩存在哪里依據(jù) StorageLevel 這個枚舉類型進(jìn)行確定丹鸿。 有以下幾種類型的組合(見10), DISK 代表磁盤棚品,MEMORY 代表內(nèi)存靠欢, SER 代表數(shù)據(jù)是否進(jìn)行序列化存儲。

下面為函數(shù)定義铜跑, StorageLevel 是枚舉類型门怪,代表存儲模式,用戶可以通過圖 14-1 按需進(jìn)行選擇锅纺。

persist(newLevel:StorageLevel)

圖 14-1 中列出persist 函數(shù)可以進(jìn)行緩存的模式掷空。例如,MEMORY_AND_DISK_SER 代表數(shù)據(jù)可以存儲在內(nèi)存和磁盤,并且以序列化的方式存儲坦弟,其他同理护锤。

            圖 14-1 ?persist 算子對 RDD 轉(zhuǎn)換

  圖 14-2 中方框代表 RDD 分區(qū)。 disk 代表存儲在磁盤酿傍, mem 代表存儲在內(nèi)存烙懦。數(shù)據(jù)最初全部存儲在磁盤,通過 persist(MEMORY_AND_DISK) 將數(shù)據(jù)緩存到內(nèi)存赤炒,但是有的分區(qū)無法容納在內(nèi)存氯析,將含有 V1、 V2莺褒、 V3 的RDD存儲到磁盤掩缓,將含有U1,U2的RDD仍舊存儲在內(nèi)存癣朗。

? ? ? 圖 14-2 ? Persist 算子對 RDD 轉(zhuǎn)換


(15)?mapValues

mapValues :針對(Key拾因, Value)型數(shù)據(jù)中的 Value 進(jìn)行 Map 操作,而不對 Key 進(jìn)行處理旷余。

? ? 圖 15 中的方框代表 RDD 分區(qū)绢记。 a=>a+2 代表對 (V1,1) 這樣的 Key Value 數(shù)據(jù)對版仔,數(shù)據(jù)只對 Value 中的 1 進(jìn)行加 2 操作傻挂,返回結(jié)果為 3。

      圖 15 ? mapValues 算子 RDD 對轉(zhuǎn)換


(16)?combineByKey

下面代碼為 combineByKey 函數(shù)的定義:

combineByKey[C](createCombiner:(V) C,

mergeValue:(C, V) C,

mergeCombiners:(C, C) C,

partitioner:Partitioner,

mapSideCombine:Boolean=true,

serializer:Serializer=null):RDD[(K,C)]

說明:

‰   createCombiner: V => C奔害, C 不存在的情況下炉旷,比如通過 V 創(chuàng)建 seq C签孔。

‰   mergeValue: (C, V) => C窘行,當(dāng) C 已經(jīng)存在的情況下饥追,需要 merge,比如把 item V

加到 seq C 中罐盔,或者疊加但绕。

mergeCombiners: (C, C) => C惶看,合并兩個 C捏顺。

‰   partitioner: Partitioner, Shuff le 時需要的 Partitioner。

‰   mapSideCombine : Boolean = true纬黎,為了減小傳輸量幅骄,很多 combine 可以在 map

端先做,比如疊加本今,可以先在一個 partition 中把所有相同的 key 的 value 疊加拆座,

再 shuff le主巍。

‰   serializerClass: String = null,傳輸需要序列化懂拾,用戶可以自定義序列化類:

例如煤禽,相當(dāng)于將元素為 (Int, Int) 的 RDD 轉(zhuǎn)變?yōu)榱?(Int岖赋, Seq[Int]) 類型元素的 RDD檬果。圖 16中的方框代表 RDD 分區(qū)。如圖唐断,通過 combineByKey选脊, 將 (V1,2), (V1,1)數(shù)據(jù)合并為( V1,Seq(2,1))脸甘。

      圖 16? comBineByKey 算子對 RDD 轉(zhuǎn)換


(17)?reduceByKey

reduceByKey 是比 combineByKey 更簡單的一種情況恳啥,只是兩個值合并成一個值,( Int丹诀, Int V)to (Int钝的, Int C),比如疊加铆遭。所以 createCombiner reduceBykey 很簡單硝桩,就是直接返回 v,而 mergeValue和 mergeCombiners 邏輯是相同的枚荣,沒有區(qū)別碗脊。

函數(shù)實現(xiàn):

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

= {

combineByKey[V]((v: V) => v, func, func, partitioner)

}

圖17中的方框代表 RDD 分區(qū)。通過用戶自定義函數(shù) (A,B) => (A + B) 函數(shù)橄妆,將相同 key 的數(shù)據(jù) (V1,2) 和 (V1,1) 的 value 相加運算衙伶,結(jié)果為( V1,3)。

        圖 17?reduceByKey 算子對 RDD 轉(zhuǎn)換


(18)partitionBy

partitionBy函數(shù)對RDD進(jìn)行分區(qū)操作害碾。

函數(shù)定義如下矢劲。

partitionBy(partitioner:Partitioner)

如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致,則不重分區(qū)慌随,如果不一致卧须,則相當(dāng)于根據(jù)分區(qū)器生成一個新的ShuffledRDD。

圖18中的方框代表RDD分區(qū)儒陨。 通過新的分區(qū)策略將原來在不同分區(qū)的V1、 V2數(shù)據(jù)都合并到了一個分區(qū)笋籽。


    圖18  partitionBy算子對RDD轉(zhuǎn)換


(19)Cogroup

cogroup函數(shù)將兩個RDD進(jìn)行協(xié)同劃分蹦漠,cogroup函數(shù)的定義如下。

cogroup[W](other: RDD[(K车海, W)]笛园, numPartitions: Int): RDD[(K隘击, (Iterable[V], Iterable[W]))]

對在兩個RDD中的Key-Value類型的元素研铆,每個RDD相同Key的元素分別聚合為一個集合埋同,并且返回兩個RDD中對應(yīng)Key的元素集合的迭代器。

(K棵红, (Iterable[V]凶赁, Iterable[W]))

其中,Key和Value逆甜,Value是兩個RDD下相同Key的兩個數(shù)據(jù)集合的迭代器所構(gòu)成的元組虱肄。

圖19中的大方框代表RDD,大方框內(nèi)的小方框代表RDD中的分區(qū)交煞。 將RDD1中的數(shù)據(jù)(U1咏窿,1)、 (U1素征,2)和RDD2中的數(shù)據(jù)(U1集嵌,2)合并為(U1,((1御毅,2)根欧,(2)))。

        圖19 ?Cogroup算子對RDD轉(zhuǎn)換


(20)?join

join 對兩個需要連接的 RDD 進(jìn)行 cogroup函數(shù)操作亚享,將相同 key 的數(shù)據(jù)能夠放到一個分區(qū)咽块,在 cogroup 操作之后形成的新 RDD 對每個key 下的元素進(jìn)行笛卡爾積的操作,返回的結(jié)果再展平欺税,對應(yīng) key 下的所有元組形成一個集合侈沪。最后返回 RDD[(K, (V晚凿, W))]亭罪。

下 面 代 碼 為 join 的 函 數(shù) 實 現(xiàn), 本 質(zhì) 是通 過 cogroup 算 子 先 進(jìn) 行 協(xié) 同 劃 分歼秽, 再 通 過flatMapValues 將合并的數(shù)據(jù)打散应役。

this.cogroup(other,partitioner).f latMapValues{case(vs,ws) =>?for(v<-vs;w<-ws)yield(v,w) }

圖 20是對兩個 RDD 的 join 操作示意圖。大方框代表 RDD燥筷,小方框代表 RDD 中的分區(qū)箩祥。函數(shù)對相同 key 的元素,如 V1 為 key 做連接后結(jié)果為 (V1,(1,1)) 和 (V1,(1,2))肆氓。

                    圖 20 ? join 算子對 RDD 轉(zhuǎn)換


(21)eftOutJoinrightOutJoin

LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空袍祖,如果為空,則填充為空谢揪。 如果不為空蕉陋,則將數(shù)據(jù)進(jìn)行連接運算捐凭,并

返回結(jié)果。

下面代碼是leftOutJoin的實現(xiàn)凳鬓。

if (ws.isEmpty) {

vs.map(v => (v茁肠, None))

} else {

for (v <- vs; w <- ws) yield (v缩举, Some(w))

}


2. Actions 算子

本質(zhì)上在 Action 算子中通過 SparkContext 進(jìn)行了提交作業(yè)的 runJob 操作垦梆,觸發(fā)了RDD DAG 的執(zhí)行。

例如蚁孔, Action 算子 collect 函數(shù)的代碼如下奶赔,感興趣的讀者可以順著這個入口進(jìn)行源碼剖析:

/**

* Return an array that contains all of the elements in this RDD.

*/

def collect(): Array[T] = {

/* 提交 Job*/

val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

Array.concat(results: _*)

}

(22)?foreach

foreach 對 RDD 中的每個元素都應(yīng)用 f 函數(shù)操作,不返回 RDD 和 Array杠氢, 而是返回Uint站刑。圖22表示 foreach 算子通過用戶自定義函數(shù)對每個數(shù)據(jù)項進(jìn)行操作。本例中自定義函數(shù)為 println()鼻百,控制臺打印所有數(shù)據(jù)項绞旅。

      圖 22 foreach 算子對 RDD 轉(zhuǎn)換


(23)?saveAsTextFile

函數(shù)將數(shù)據(jù)輸出,存儲到 HDFS 的指定目錄温艇。

下面為 saveAsTextFile 函數(shù)的內(nèi)部實現(xiàn)因悲,其內(nèi)部

通過調(diào)用 saveAsHadoopFile 進(jìn)行實現(xiàn):

this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

將 RDD 中的每個元素映射轉(zhuǎn)變?yōu)?(null, x.toString)勺爱,然后再將其寫入 HDFS晃琳。

圖 23中左側(cè)方框代表 RDD 分區(qū),右側(cè)方框代表 HDFS 的 Block琐鲁。通過函數(shù)將RDD 的每個分區(qū)存儲為 HDFS 中的一個 Block卫旱。

            圖 23 ? saveAsHadoopFile 算子對 RDD 轉(zhuǎn)換


(24)saveAsObjectFile

saveAsObjectFile將分區(qū)中的每10個元素組成一個Array,然后將這個Array序列化围段,映射為(Null顾翼,BytesWritable(Y))的元素,寫入HDFS為SequenceFile的格式奈泪。

下面代碼為函數(shù)內(nèi)部實現(xiàn)适贸。

map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))

圖24中的左側(cè)方框代表RDD分區(qū)涝桅,右側(cè)方框代表HDFS的Block拜姿。 通過函數(shù)將RDD的每個分區(qū)存儲為HDFS上的一個Block。

            圖24 saveAsObjectFile算子對RDD轉(zhuǎn)換


(25)?collect

collect 相當(dāng)于 toArray冯遂, toArray 已經(jīng)過時不推薦使用砾隅, collect 將分布式的 RDD 返回為一個單機(jī)的 scala Array 數(shù)組。在這個數(shù)組上運用 scala 的函數(shù)式操作债蜜。

圖 25中左側(cè)方框代表 RDD 分區(qū)晴埂,右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組。通過函數(shù)操作寻定,將結(jié)果返回到 Driver 程序所在的節(jié)點儒洛,以數(shù)組形式存儲。

  圖 25 ? Collect 算子對 RDD 轉(zhuǎn)換


(26)collectAsMap

collectAsMap對(K狼速,V)型的RDD數(shù)據(jù)返回一個單機(jī)HashMap琅锻。 對于重復(fù)K的RDD元素,后面的元素覆蓋前面的元素向胡。

圖26中的左側(cè)方框代表RDD分區(qū)恼蓬,右側(cè)方框代表單機(jī)數(shù)組。 數(shù)據(jù)通過collectAsMap函數(shù)返回給Driver程序計算結(jié)果僵芹,結(jié)果以HashMap形式存儲处硬。


          圖26 CollectAsMap算子對RDD轉(zhuǎn)換


(27)reduceByKeyLocally

  實現(xiàn)的是先reduce再collectAsMap的功能,先對RDD的整體進(jìn)行reduce操作拇派,然后再收集所有結(jié)果返回為一個HashMap荷辕。


(28)lookup

下面代碼為lookup的聲明。

lookup(key:K):Seq[V]

Lookup函數(shù)對(Key件豌,Value)型的RDD操作疮方,返回指定Key對應(yīng)的元素形成的Seq。 這個函數(shù)處理優(yōu)化的部分在于茧彤,如果這個RDD包含分區(qū)器骡显,則只會對應(yīng)處理K所在的分區(qū),然后返回由(K曾掂,V)形成的Seq惫谤。 如果RDD不包含分區(qū)器,則需要對全RDD元素進(jìn)行暴力掃描處理遭殉,搜索指定K對應(yīng)的元素石挂。

圖28中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表Seq险污,最后結(jié)果返回到Driver所在節(jié)點的應(yīng)用中痹愚。

      圖28 ?lookup對RDD轉(zhuǎn)換


(29)?count

count 返回整個 RDD 的元素個數(shù)。

內(nèi)部函數(shù)實現(xiàn)為:

defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum

圖 29中蛔糯,返回數(shù)據(jù)的個數(shù)為 5拯腮。一個方塊代表一個 RDD 分區(qū)。

    ?圖29 count 對 RDD 算子轉(zhuǎn)換


(30)top

top可返回最大的k個元素蚁飒。 函數(shù)定義如下动壤。

top(num:Int)(implicit ord:Ordering[T]):Array[T]

相近函數(shù)說明如下。

·top返回最大的k個元素淮逻。

·take返回最小的k個元素琼懊。

·takeOrdered返回最小的k個元素阁簸,并且在返回的數(shù)組中保持元素的順序。

·first相當(dāng)于top(1)返回整個RDD中的前k個元素哼丈,可以定義排序的方式Ordering[T]启妹。

返回的是一個含前k個元素的數(shù)組。


(31)reduce

reduce函數(shù)相當(dāng)于對RDD中的元素進(jìn)行reduceLeft函數(shù)的操作醉旦。 函數(shù)實現(xiàn)如下饶米。

Some(iter.reduceLeft(cleanF))

reduceLeft先對兩個元素進(jìn)行reduce函數(shù)操作,然后將結(jié)果和迭代器取出的下一個元素進(jìn)行reduce函數(shù)操作车胡,直到迭代器遍歷完所有元素檬输,得到最后結(jié)果。在RDD中匈棘,先對每個分區(qū)中的所有元素的集合分別進(jìn)行reduceLeft丧慈。 每個分區(qū)形成的結(jié)果相當(dāng)于一個元素,再對這個結(jié)果集合進(jìn)行reduceleft操作羹饰。

例如:用戶自定義函數(shù)如下伊滋。

f:(A,B)=>(A._1+”@”+B._1队秩,A._2+B._2)

圖31中的方框代表一個RDD分區(qū)笑旺,通過用戶自定函數(shù)f將數(shù)據(jù)進(jìn)行reduce運算。 示例

最后的返回結(jié)果為V1@[1]V2U馍资!@U2@U3@U4筒主,12。


圖31 reduce算子對RDD轉(zhuǎn)換


(32)fold

fold和reduce的原理相同鸟蟹,但是與reduce不同乌妙,相當(dāng)于每個reduce時,迭代器取的第一個元素是zeroValue建钥。

圖32中通過下面的用戶自定義函數(shù)進(jìn)行fold運算藤韵,圖中的一個方框代表一個RDD分區(qū)。 讀者可以參照reduce函數(shù)理解熊经。

fold((”V0@”泽艘,2))( (A,B)=>(A._1+”@”+B._1镐依,A._2+B._2))

          圖32 ?fold算子對RDD轉(zhuǎn)換


(33)aggregate

aggregate先對每個分區(qū)的所有元素進(jìn)行aggregate操作匹涮,再對分區(qū)的結(jié)果進(jìn)行fold操作。

aggreagate與fold和reduce的不同之處在于槐壳,aggregate相當(dāng)于采用歸并的方式進(jìn)行數(shù)據(jù)聚集然低,這種聚集是并行化的。 而在fold和reduce函數(shù)的運算過程中,每個分區(qū)中需要進(jìn)行串行處理雳攘,每個分區(qū)串行計算完結(jié)果带兜,結(jié)果再按之前的方式進(jìn)行聚集,并返回最終聚集結(jié)果吨灭。

函數(shù)的定義如下鞋真。

aggregate[B](z: B)(seqop: (B,A) => B沃于,combop: (B,B) => B): B

圖33通過用戶自定義函數(shù)對RDD 進(jìn)行aggregate的聚集操作海诲,圖中的每個方框代表一個RDD分區(qū)繁莹。

rdd.aggregate(”V0@”,2)((A特幔,B)=>(A._1+”@”+B._1咨演,A._2+B._2)),(A蚯斯,B)=>(A._1+”@”+B_1薄风,A._@+B_.2))

最后,介紹兩個計算模型中的兩個特殊變量拍嵌。

廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表遭赂,以及廣播大變量等場景。 這些數(shù)據(jù)集合在單節(jié)點內(nèi)存能夠容納横辆,不需要像RDD那樣在節(jié)點之間打散存儲撇他。

Spark運行時把廣播變量數(shù)據(jù)發(fā)到各個節(jié)點,并保存下來狈蚤,后續(xù)計算可以復(fù)用困肩。 相比Hadoo的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享脆侮。 Broadcast的底層實現(xiàn)采用了BT機(jī)制锌畸。

        圖33 ?aggregate算子對RDD轉(zhuǎn)換

②代表V。

③代表U靖避。

accumulator變量:允許做全局累加操作潭枣,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運行指標(biāo)的情景。


轉(zhuǎn)載:http://www.cnblogs.com/zlslch/p/5723857.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末筋蓖,一起剝皮案震驚了整個濱河市卸耘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌粘咖,老刑警劉巖蚣抗,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡翰铡,警方通過查閱死者的電腦和手機(jī)钝域,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來锭魔,“玉大人例证,你說我怎么就攤上這事∶耘酰” “怎么了织咧?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長漠秋。 經(jīng)常有香客問我笙蒙,道長,這世上最難降的妖魔是什么庆锦? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任捅位,我火速辦了婚禮,結(jié)果婚禮上搂抒,老公的妹妹穿的比我還像新娘艇搀。我一直安慰自己,他們只是感情好求晶,可當(dāng)我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布焰雕。 她就那樣靜靜地躺著,像睡著了一般誉帅。 火紅的嫁衣襯著肌膚如雪淀散。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天蚜锨,我揣著相機(jī)與錄音档插,去河邊找鬼。 笑死亚再,一個胖子當(dāng)著我的面吹牛郭膛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播氛悬,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼则剃,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了如捅?” 一聲冷哼從身側(cè)響起棍现,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎镜遣,沒想到半個月后己肮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年谎僻,在試婚紗的時候發(fā)現(xiàn)自己被綠了娄柳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡艘绍,死狀恐怖赤拒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情诱鞠,我是刑警寧澤挎挖,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站航夺,受9級特大地震影響肋乍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜敷存,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望堪伍。 院中可真熱鬧锚烦,春花似錦、人聲如沸帝雇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽尸闸。三九已至彻亲,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吮廉,已是汗流浹背苞尝。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留宦芦,地道東北人宙址。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像调卑,于是被迫代替她去往敵國和親抡砂。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容