Spark的這些事(三)——spark常用的Transformations 和Actions

Transformations

map笆怠,filter
spark最長用的兩個(gè)Transformations:map,filter辐棒,下面就來介紹一下這兩個(gè)。

先看下面這張圖:

這里寫圖片描述

從上圖中可以清洗的看到 map和filter都是做的什么工作,那我們就代碼演示一下斤斧。

    val input = sc.parallelize(List(1,2,3,4))
    
    val result1 = input.map(x=>x*x)
    val result2 = input.filter(x=>x!=1)
    
    print(result1.collect().mkString(","))
    print("\n")
    print(result2.collect().mkString(","))
    print("\n")

執(zhí)行結(jié)果如下:

16/08/17 18:48:31 INFO DAGScheduler: ResultStage 0 (collect at Map.scala:17) finished in 0.093 s
16/08/17 18:48:31 INFO DAGScheduler: Job 0 finished: collect at Map.scala:17, took 0.268871 s
1,4,9,16
........
16/08/17 18:48:31 INFO DAGScheduler: ResultStage 1 (collect at Map.scala:19) finished in 0.000 s
16/08/17 18:48:31 INFO DAGScheduler: Job 1 finished: collect at Map.scala:19, took 0.018291 s
2,3,4

再回頭看下上面那張圖,是不是明白什么意思了霎烙!

flatMap
另外一個(gè)常用的就是flatMap撬讽,輸入一串字符,分割出每個(gè)字符

map和flatmap的區(qū)別

來用代碼實(shí)踐一下:

    val lines = sc.parallelize(List("hello world","hi"))
    val words = lines.flatMap (lines=>lines.split(" "))
    print(words.first())
    print("\n")

執(zhí)行結(jié)果:

16/08/17 19:23:24 INFO DAGScheduler: Job 2 finished: first at Map.scala:24, took 0.016987 s
hello
16/08/17 19:23:24 INFO SparkContext: Invoking stop() from shutdown hook

分隔符如果改一下的話:

val words = lines.flatMap (lines=>lines.split(","))

結(jié)果會(huì)怎樣呢悬垃?

16/08/17 19:33:14 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
hello world
16/08/17 19:33:14 INFO SparkContext: Invoking stop() from shutdown hook

和想象的一樣吧~

distinct锐秦,distinct,intersection盗忱,subtract
還有幾個(gè)比較常用的:distinct,distinct羊赵,intersection趟佃,subtract

這里寫圖片描述

來看看代碼實(shí)踐:

val rdd1 = sc.parallelize(List("coffee","coffee","panda","monkey","tea"))
    val rdd2 = sc.parallelize(List("coffee","monkey","kitty"))
    
    rdd1.distinct().take(100).foreach(println)

結(jié)果:

16/08/17 19:52:29 INFO DAGScheduler: ResultStage 4 (take at Map.scala:30) finished in 0.047 s
16/08/17 19:52:29 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
16/08/17 19:52:29 INFO DAGScheduler: Job 3 finished: take at Map.scala:30, took 0.152405 s
monkey
coffee
panda
tea
16/08/17 19:52:29 INFO SparkContext: Starting job: take at Map.scala:32

代碼:

 rdd1.union(rdd2).take(100).foreach(println)

結(jié)果:

6/08/17 19:52:29 INFO DAGScheduler: Job 5 finished: take at Map.scala:32, took 0.011825 s
coffee
coffee
panda
monkey
tea
coffee
monkey
kitty
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:34
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 11 (intersection at Map.scala:34)
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 12 (intersection at Map.scala:34)

代碼:

rdd1.intersection(rdd2).take(100).foreach(println)

結(jié)果:

16/08/17 19:52:30 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 9) in 31 ms on localhost (1/1)
16/08/17 19:52:30 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool 
16/08/17 19:52:30 INFO DAGScheduler: ResultStage 9 (take at Map.scala:34) finished in 0.031 s
16/08/17 19:52:30 INFO DAGScheduler: Job 6 finished: take at Map.scala:34, took 0.060785 s
monkey
coffee
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:36

代碼:

rdd1.subtract(rdd2).take(100).foreach(println)

結(jié)果:

16/08/17 19:52:30 INFO DAGScheduler: Job 6 finished: take at Map.scala:34, took 0.060785 s
monkey
coffee
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:36

再看看上面的圖,很容易理解吧

Actions

常用的Transformations就介紹到這里昧捷,下面介紹下常用的Action:
reduce,countByValue,takeOrdered,takeSample,aggregate

首先看一下:reduce

    val rdd5 = sc.parallelize(List(1,2,3,4))
    print("reduce action:"+rdd5.reduce((x,y)=>x+y)+"\n")
16/08/18 11:51:16 INFO DAGScheduler: Job 15 finished: reduce at Function.scala:55, took 0.012698 s
reduce action:10
16/08/18 11:51:16 INFO SparkContext: Starting job: aggregate at Function.scala:57

countByValue

print(rdd1.countByValue() + "\n")
16/08/18 11:51:16 INFO DAGScheduler: Job 11 finished: countByValue at Function.scala:48, took 0.031726 s
Map(monkey -> 1, coffee -> 2, panda -> 1, tea -> 1)
16/08/18 11:51:16 INFO SparkContext: Starting job: takeOrdered at Function.scala:50

takeOrdered

rdd1.takeOrdered(10).take(100).foreach(println)
16/08/18 11:51:16 INFO DAGScheduler: Job 12 finished: takeOrdered at Function.scala:50, took 0.026160 s
coffee
coffee
monkey
panda
tea
16/08/18 11:51:16 INFO SparkContext: Starting job: takeSample at Function.scala:52

aggregate
這個(gè)要重點(diǎn)介紹一下:

Spark文檔中aggregate函數(shù)定義如下
def aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

seqOp操作會(huì)聚合各分區(qū)中的元素闲昭,然后combOp操作把所有分區(qū)的聚合結(jié)果再次聚合,兩個(gè)操作的初始值都是zeroValue. seqOp的操作是遍歷分區(qū)中的所有元素(T)靡挥,第一個(gè)T跟zeroValue做操作序矩,結(jié)果再作為與第二個(gè)T做操作的zeroValue,直到遍歷完整個(gè)分區(qū)跋破。combOp操作是把各分區(qū)聚合的結(jié)果簸淀,再聚合。aggregate函數(shù)返回一個(gè)跟RDD不同類型的值毒返。因此租幕,需要一個(gè)操作seqOp來把分區(qū)中的元素T合并成一個(gè)U,另外一個(gè)操作combOp把所有U聚合拧簸。

val rdd5 = sc.parallelize(List(1,2,3,4))
val rdd6 = rdd5.aggregate((0, 0))  ((x, y) =>(x._1 + y, x._2+1),  (x, y) =>(x._1 + y._1, x._2 + y._2))
    print ("aggregate action : " + rdd6 + "\n"  )

我們看一下結(jié)果:

16/08/18 11:51:16 INFO DAGScheduler: Job 16 finished: aggregate at Function.scala:57, took 0.011686 s
aggregate action : (10,4)
16/08/18 11:51:16 INFO SparkContext: Invoking stop() from shutdown hook

我們可以根據(jù)以上執(zhí)行的例子來理解aggregate 用法:

  • 第一步:將rdd5中的元素與初始值遍歷進(jìn)行聚合操作
    • 第二步:將初始值加1進(jìn)行遍歷聚合
    • 第三步:將結(jié)果進(jìn)行聚合
    • 根據(jù)本次的RDD 背部實(shí)現(xiàn)如下:
    • 第一步:其實(shí)是0+1
    •                1+2
      
    •                3+3
      
    •                6+4
      
    • 然后執(zhí)行:0+1
    •            1+1
      
    •            2+1
      
    •            3+1
      
    • 此時(shí)返回(10,4)
    • 本次執(zhí)行是一個(gè)節(jié)點(diǎn)劲绪,如果在集群中的話,多個(gè)節(jié)點(diǎn),會(huì)先把數(shù)據(jù)打到不同的分區(qū)上贾富,比如(1,2) (3,4)
    • 得到的結(jié)果就會(huì)是(3,2) (7,2)
    • 然后進(jìn)行第二步combine就得到 (10,4)

這樣你應(yīng)該能理解aggregate這個(gè)函數(shù)了吧

以上就是對(duì)常用的Transformations 和Actions介紹歉眷,對(duì)于初學(xué)者來說,動(dòng)手代碼實(shí)踐各個(gè)函數(shù)颤枪,才是明白其功能最好的方法汗捡。

PS :源碼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市汇鞭,隨后出現(xiàn)的幾起案子凉唐,更是在濱河造成了極大的恐慌,老刑警劉巖霍骄,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件台囱,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡读整,警方通過查閱死者的電腦和手機(jī)簿训,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來米间,“玉大人强品,你說我怎么就攤上這事∏” “怎么了的榛?”我有些...
    開封第一講書人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長逻锐。 經(jīng)常有香客問我夫晌,道長,這世上最難降的妖魔是什么昧诱? 我笑而不...
    開封第一講書人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任晓淀,我火速辦了婚禮,結(jié)果婚禮上盏档,老公的妹妹穿的比我還像新娘凶掰。我一直安慰自己,他們只是感情好蜈亩,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開白布懦窘。 她就那樣靜靜地躺著,像睡著了一般勺拣。 火紅的嫁衣襯著肌膚如雪奶赠。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,156評(píng)論 1 308
  • 那天药有,我揣著相機(jī)與錄音毅戈,去河邊找鬼苹丸。 笑死,一個(gè)胖子當(dāng)著我的面吹牛苇经,可吹牛的內(nèi)容都是我干的赘理。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼扇单,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼商模!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蜘澜,我...
    開封第一講書人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤施流,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后鄙信,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瞪醋,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年装诡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了银受。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鸦采,死狀恐怖宾巍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情渔伯,我是刑警寧澤顶霞,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站锣吼,受9級(jí)特大地震影響确丢,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜吐限,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望褂始。 院中可真熱鬧诸典,春花似錦、人聲如沸崎苗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽胆数。三九已至肌蜻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間必尼,已是汗流浹背蒋搜。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來泰國打工篡撵, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人豆挽。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓育谬,卻偏偏與公主長得像,于是被迫代替她去往敵國和親帮哈。 傳聞我的和親對(duì)象是個(gè)殘疾皇子膛檀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容