Transformations
map, filter
map and filter are the two most commonly used Transformations in Spark; let's walk through them first.
First, take a look at the following diagram:
The diagram shows clearly what map and filter each do, so let's demonstrate them in code.
val input = sc.parallelize(List(1, 2, 3, 4))
val result1 = input.map(x => x * x)      // square every element
val result2 = input.filter(x => x != 1)  // keep everything except 1
print(result1.collect().mkString(","))
print("\n")
print(result2.collect().mkString(","))
print("\n")
The output looks like this:
16/08/17 18:48:31 INFO DAGScheduler: ResultStage 0 (collect at Map.scala:17) finished in 0.093 s
16/08/17 18:48:31 INFO DAGScheduler: Job 0 finished: collect at Map.scala:17, took 0.268871 s
1,4,9,16
........
16/08/17 18:48:31 INFO DAGScheduler: ResultStage 1 (collect at Map.scala:19) finished in 0.000 s
16/08/17 18:48:31 INFO DAGScheduler: Job 1 finished: collect at Map.scala:19, took 0.018291 s
2,3,4
Now look back at the diagram above; what map and filter do should be clear!
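Since map and filter are both lazy transformations, they can also be chained; nothing actually executes until an action such as collect is called. A minimal sketch reusing the input RDD from above:
val chained = input.map(x => x * x).filter(x => x != 1) // still lazy: no job has run yet
print(chained.collect().mkString(",") + "\n") // collect triggers the job and prints 4,9,16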
flatMap
Another commonly used transformation is flatMap: given lines of text, it splits each line into individual words.
Let's try it in code:
val lines = sc.parallelize(List("hello world","hi"))
val words = lines.flatMap(line => line.split(" "))
print(words.first())
print("\n")
Output:
16/08/17 19:23:24 INFO DAGScheduler: Job 2 finished: first at Map.scala:24, took 0.016987 s
hello
16/08/17 19:23:24 INFO SparkContext: Invoking stop() from shutdown hook
What happens if we change the delimiter?
val words = lines.flatMap(line => line.split(","))
What will the result be?
16/08/17 19:33:14 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
hello world
16/08/17 19:33:14 INFO SparkContext: Invoking stop() from shutdown hook
Just as you would expect: neither line contains a comma, so split(",") leaves each line intact, and first() returns the whole string "hello world".
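To make the difference from map concrete, here is a minimal sketch (the variable names are my own) running both on the same input:
val mapped = lines.map(line => line.split(" "))        // RDD[Array[String]]: one array per line
val flattened = lines.flatMap(line => line.split(" ")) // RDD[String]: one element per word
print(mapped.collect().map(_.mkString("[", ",", "]")).mkString(" ") + "\n") // [hello,world] [hi]
print(flattened.collect().mkString(",") + "\n")                            // hello,world,hi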
distinct, union, intersection, subtract
A few more commonly used transformations: distinct, union, intersection, and subtract.
Let's look at them in code:
val rdd1 = sc.parallelize(List("coffee","coffee","panda","monkey","tea"))
val rdd2 = sc.parallelize(List("coffee","monkey","kitty"))
rdd1.distinct().take(100).foreach(println)
Result (distinct shuffles the data to deduplicate, so the output order differs from the input order):
16/08/17 19:52:29 INFO DAGScheduler: ResultStage 4 (take at Map.scala:30) finished in 0.047 s
16/08/17 19:52:29 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/08/17 19:52:29 INFO DAGScheduler: Job 3 finished: take at Map.scala:30, took 0.152405 s
monkey
coffee
panda
tea
16/08/17 19:52:29 INFO SparkContext: Starting job: take at Map.scala:32
Code:
rdd1.union(rdd2).take(100).foreach(println)
Result:
16/08/17 19:52:29 INFO DAGScheduler: Job 5 finished: take at Map.scala:32, took 0.011825 s
coffee
coffee
panda
monkey
tea
coffee
monkey
kitty
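Note that union keeps duplicates (coffee appears three times above). If you want a deduplicated union, chain distinct after it:
rdd1.union(rdd2).distinct().take(100).foreach(println)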
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:34
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 11 (intersection at Map.scala:34)
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 12 (intersection at Map.scala:34)
Code:
rdd1.intersection(rdd2).take(100).foreach(println)
Result:
16/08/17 19:52:30 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 9) in 31 ms on localhost (1/1)
16/08/17 19:52:30 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
16/08/17 19:52:30 INFO DAGScheduler: ResultStage 9 (take at Map.scala:34) finished in 0.031 s
16/08/17 19:52:30 INFO DAGScheduler: Job 6 finished: take at Map.scala:34, took 0.060785 s
monkey
coffee
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:36
Code:
rdd1.subtract(rdd2).take(100).foreach(println)
Result (subtract keeps the elements of rdd1 that do not appear in rdd2):
panda
tea
Looking back at the diagram above, these should all be easy to understand.
Actions
That covers the commonly used Transformations. Next up are the commonly used Actions:
reduce, countByValue, takeOrdered, takeSample, aggregate
First, let's look at reduce:
val rdd5 = sc.parallelize(List(1, 2, 3, 4))
print("reduce action:" + rdd5.reduce((x, y) => x + y) + "\n") // sums the elements: 1+2+3+4 = 10
16/08/18 11:51:16 INFO DAGScheduler: Job 15 finished: reduce at Function.scala:55, took 0.012698 s
reduce action:10
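One limitation worth noting: the function passed to reduce must return the same type as the RDD's elements. When you need a result of a different type, such as a (sum, count) pair, use aggregate, which is introduced at the end of this section.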
16/08/18 11:51:16 INFO SparkContext: Starting job: aggregate at Function.scala:57
countByValue
print(rdd1.countByValue() + "\n") // rdd1 is the coffee/panda RDD from the distinct example
16/08/18 11:51:16 INFO DAGScheduler: Job 11 finished: countByValue at Function.scala:48, took 0.031726 s
Map(monkey -> 1, coffee -> 2, panda -> 1, tea -> 1)
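countByValue returns its counts to the driver as a local Map, so it is only appropriate when the number of distinct values is small. For large key spaces, the usual pattern is map plus reduceByKey (a minimal sketch):
rdd1.map(x => (x, 1)).reduceByKey(_ + _).collect().foreach(println)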
16/08/18 11:51:16 INFO SparkContext: Starting job: takeOrdered at Function.scala:50
takeOrdered
rdd1.takeOrdered(10).foreach(println) // takeOrdered already returns a local array of at most 10 elements
16/08/18 11:51:16 INFO DAGScheduler: Job 12 finished: takeOrdered at Function.scala:50, took 0.026160 s
coffee
coffee
monkey
panda
tea
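takeOrdered also accepts an explicit Ordering, so you can take the largest elements instead; a minimal sketch using Scala's standard Ordering:
rdd1.takeOrdered(3)(Ordering[String].reverse).foreach(println) // tea, panda, monkey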
16/08/18 11:51:16 INFO SparkContext: Starting job: takeSample at Function.scala:52
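takeSample
takeSample appeared in the list above but was not demonstrated. It returns a random sample of the RDD as a local array, so the exact elements vary from run to run; a minimal sketch:
rdd1.takeSample(withReplacement = false, num = 2).foreach(println) // e.g. panda, coffee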
aggregate
This one deserves a closer look.
The Spark documentation defines the aggregate function as follows:
def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
In other words: seqOp aggregates the elements within each partition, and combOp then aggregates the per-partition results; both operations start from zeroValue. seqOp traverses every element (T) in a partition: the first T is combined with zeroValue, and that result becomes the accumulator for the next T, and so on until the whole partition has been traversed. combOp then merges the per-partition results. Because aggregate can return a value of a different type U than the RDD's element type T, it needs one operation (seqOp) to merge a T into a U and another (combOp) to merge two U's.
val rdd5 = sc.parallelize(List(1, 2, 3, 4))
val rdd6 = rdd5.aggregate((0, 0))(
  (x, y) => (x._1 + y, x._2 + 1),       // seqOp: accumulate (sum, count) within a partition
  (x, y) => (x._1 + y._1, x._2 + y._2)  // combOp: merge per-partition (sum, count) pairs
)
print("aggregate action : " + rdd6 + "\n")
Let's look at the result:
16/08/18 11:51:16 INFO DAGScheduler: Job 16 finished: aggregate at Function.scala:57, took 0.011686 s
aggregate action : (10,4)
16/08/18 11:51:16 INFO SparkContext: Invoking stop() from shutdown hook
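The resulting (sum, count) pair is the classic building block for computing an average:
print("average: " + rdd6._1.toDouble / rdd6._2 + "\n") // 10 / 4 = 2.5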
We can use the run above to understand how aggregate works:
- Step 1: traverse the elements of rdd5, folding each one into the accumulator starting from the initial value (0, 0).
- Step 2: for every element traversed, add 1 to the second component of the accumulator.
- Step 3: combine the results.
- For this RDD, the internal computation works out as follows:
- Step 1 is really the running sum: 0+1, 1+2, 3+3, 6+4
- while the running count goes: 0+1, 1+1, 2+1, 3+1
- which returns (10,4).
- This run used a single node. On a cluster, the data would first be spread across partitions, say (1,2) and (3,4);
- the per-partition results would then be (3,2) and (7,2),
- and the second step, combine, would merge them into (10,4). A sketch of this two-partition case follows the list.
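You can reproduce the two-partition case locally by passing an explicit partition count to parallelize (a minimal sketch; rdd7 and agg are my own names):
val rdd7 = sc.parallelize(List(1, 2, 3, 4), 2) // 2 partitions: (1,2) and (3,4)
val agg = rdd7.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp per partition: yields (3,2) and (7,2)
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: (3,2) + (7,2) = (10,4)
)
print("aggregate with 2 partitions: " + agg + "\n")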
With that, you should understand the aggregate function.
This concludes the introduction to the commonly used Transformations and Actions. For beginners, writing and running code with each function is the best way to understand what it does.
PS: source code