1. Common Functions
- takeWhile
# takeWhile takes elements from the start of the collection while they satisfy the predicate, stopping at the first element that does not
val s1 = List(1,2,3,4,10,20,30,40,5,6,7,8,50,60,70,80)
val r1 = s1.takeWhile( _ < 10)
r1: List[Int] = List(1, 2, 3, 4)
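For comparison, filter scans the whole list, while takeWhile stops at the first failing element:
val r2 = s1.filter( _ < 10)
r2: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8)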
- drop (on Iterator and other collections)
val it = List.range(0, 10, 2).map {i => i.toString}
it.drop(1).zip(it.dropRight(1))
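Each element gets paired with its predecessor; the result, sketched:
// it                              = List(0, 2, 4, 6, 8)   (elements are Strings)
// it.drop(1).zip(it.dropRight(1)) = List((2,0), (4,2), (6,4), (8,6))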
- List
# Adding elements to a List
it3 :+ (1000,2000)    # append to the end
(1000,2000) :: it3    # prepend to the head (:: is right-associative, so the new element goes on the left)
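A quick REPL check (it3 is assumed here to be a small list of tuples):
scala> val it3 = List((1,2), (3,4))
it3: List[(Int, Int)] = List((1,2), (3,4))
scala> it3 :+ (1000,2000)
res0: List[(Int, Int)] = List((1,2), (3,4), (1000,2000))
scala> (1000,2000) :: it3
res1: List[(Int, Int)] = List((1000,2000), (1,2), (3,4))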
- reduceByKey
# reduce does not combine values in the order produced by the preceding map; groupBy / groupByKey can be used instead when order matters
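A minimal sketch (assuming a running SparkContext sc): reduceByKey merges the values of each key in no guaranteed order, so the merge function should be commutative and associative; groupByKey keeps all values so they can be ordered explicitly:
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("a", 4)))
val sums   = pairs.reduceByKey(_ + _)                       // order-insensitive aggregation
val sorted = pairs.groupByKey().mapValues(_.toList.sorted)  // collect values, then order explicitly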
- Differences: cogroup | join | groupByKey
Join() returns a dataset of [key, leftValue, rightValue], where [key, leftValue] comes from one dataset and [key, rightValue] from the other.
CoGroup() returns a dataset of [key, leftValues, rightValues], where the [key, leftValue] entries from one dataset are grouped together into [key, leftValues], the [key, rightValue] entries from the other dataset are grouped into [key, rightValues], and the two grouped entries are combined into [key, leftValues, rightValues].
GroupByKey() returns a dataset of [key, values], where the [key, value] entries of a single dataset are grouped together.
Join(), GroupByKey() and CoGroup() all depend on Partition(). Both input datasets should be partitioned by the same key and into the same number of shards; otherwise a relatively costly partitioning will be performed.
A join is implemented as a cogroup followed by a flatMap (see the diagram in the referenced article and the sketch below).
Source: join(otherRDD, numPartitions)
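A minimal sketch of the three operations (assuming a SparkContext sc; outputs written schematically, Spark actually prints CompactBuffer for grouped values):
val left  = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
val right = sc.parallelize(Seq((1, "x"), (3, "y")))
left.join(right).collect()     // Array((1,(a,x)), (1,(b,x)))                          -- only matching keys, one row per pair
left.cogroup(right).collect()  // Array((1,([a, b],[x])), (2,([c],[])), (3,([],[y])))  -- every key, both value groups
left.groupByKey().collect()    // Array((1,[a, b]), (2,[c]))                           -- single dataset, values grouped per key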
- broadcast
# After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once.
# In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable
# (e.g. if the variable is shipped to a new node later).
# In other words: broadcast variables let the programmer cache a read-only variable on each machine, rather than shipping a copy with every task.
# They can be used to give every node a copy of a large input dataset efficiently.
# Spark also tries to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
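A minimal usage sketch (with a hypothetical lookup table): reference .value inside the closure instead of closing over the raw local variable, so the data is shipped to each executor only once:
scala> val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))
scala> sc.parallelize(Seq(1, 2, 3)).map(i => lookup.value.getOrElse(i, "unknown")).collect()
res1: Array[String] = Array(one, two, three)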
- foldLeft
Accumulates from the left.
0 is the initial value (numbers is a List[Int]); m acts as the accumulator.
Watch the execution directly:
scala> val numbers = (1 to 10).toList
numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> numbers.foldLeft(0) { (m: Int, n: Int) => println("m: " + m + " n: " + n); m + n }
m: 0 n: 1
m: 1 n: 2
m: 3 n: 3
m: 6 n: 4
m: 10 n: 5
m: 15 n: 6
m: 21 n: 7
m: 28 n: 8
m: 36 n: 9
m: 45 n: 10
res0: Int = 55
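Because the accumulator is threaded from left to right, foldLeft with a non-commutative operation (here, building a list by prepending) reverses the input; a quick sketch:
scala> numbers.foldLeft(List.empty[Int])((acc, n) => n :: acc)
res1: List[Int] = List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)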
- Option
scala> val myMap: Map[String, (String, Boolean)] = Map("key1" -> ("value", true))
myMap: Map[String,(String, Boolean)] = Map(key1 -> (value,true))
scala> val vs = myMap.get("key1")
vs: Option[(String, Boolean)] = Some((value,true))
# The value above is wrapped in a tuple; ways to extract the tuple's contents are shown below
# Method 1:
val (v2, s2) = vs match {
case Some((v,s)) => (v, s)
case _ => ("null", "null")
}
# Method 2:
# If the Option is empty, map is simply not executed, but the call chained after map (getOrElse) still runs:
val (v2, s2) = vs.map { case (s, b) => (s, b.toString)}.getOrElse((null, null))
# val (v2, s2) = vs.map { case (s, b) => (s, b.toString)}.getOrElse(("null", "null"))
# Note: in Method 2, null is not an actual String value, so s2 cannot later call String methods. The following examples on null and its type conversions help clarify this.
# null cannot call toString, but None can
scala> null.toString
java.lang.NullPointerException
scala> None.toString
res42: String = None
# The type of null, and how it behaves:
scala> "null"
res38: String = null
scala> null
res39: Null = null
scala> null.asInstanceOf[String]
res40: String = null
scala> Array("a",null).mkString(",")
res41: String = a,null
- Option[Boolean]
scala> val myMap: Map[String, Boolean] = Map("key1" -> true)
myMap: Map[String,Boolean] = Map(key1 -> true)
scala> val myMap2 = myMap + ("k2" -> false)
// Note the difference in the return types below
scala> myMap2.get("k8").map(_.toString).getOrElse(null)
res160: String = null
scala> myMap2.get("k8").getOrElse(null)
res161: Any = null
- HashMap
scala> import scala.collection.mutable
import scala.collection.mutable
scala> val map1 = mutable.HashMap[String, String]()
map1: scala.collection.mutable.HashMap[String,String] = Map()
scala> map1.put("a1","aa1")
res104: Option[String] = None
scala> map1
res105: scala.collection.mutable.HashMap[String,String] = Map(a1 -> aa1)
scala> map1("a2") = "aa2"
scala> map1
res108: scala.collection.mutable.HashMap[String,String] = Map(a1 -> aa1, a2 -> aa2)
- immutable.Map
// myMap is immutable, i.e. a Map that cannot be modified in place; elements cannot be added to it
scala> val myMap = Map("k1" -> true)
myMap: scala.collection.immutable.Map[String,Boolean] = Map(k1 -> true)
// but an immutable Map can be combined with other entries via +, which returns a new Map
scala> val myMap2 = myMap + ("k2" -> false)
myMap2: scala.collection.immutable.Map[String,Boolean] = Map(k1 -> true, k2 -> false)
scala> myMap2.get("k8").isEmpty
res147: Boolean = true
- sortBy | sortByKey | top
Reference: Spark: sortBy和sortByKey函數(shù)詳解
// sortBy
// create a small RDD locally to test the function
scala> val data = List(3,1,90,3,5,12)
data: List[Int] = List(3, 1, 90, 3, 5, 12)
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> rdd.collect
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
scala> rdd.sortBy(x => x).collect
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
scala> rdd.sortBy(x => x, false).collect
res3: Array[Int] = Array(90, 12, 5, 3, 3, 1)
scala> val result = rdd.sortBy(x => x, false)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
// the default number of partitions (6 here)
scala> result.partitions.size
res9: Int = 6
// the number of partitions can also be set explicitly
scala> val result = rdd.sortBy(x => x, false, 1)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at <console>:16
scala> result.partitions.size
res10: Int = 1
// sortByKey
scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:25
scala> val b = sc.parallelize(1 to a.count.toInt, 2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[86] at parallelize at <console>:27
scala> b.collect
res60: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[92] at zip at <console>:29
scala> a.zip(b).sortByKey().collect
res61: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))
// top returns the top N elements in descending key order; note that top does not need a prior sortBy, it sorts internally
scala> c.top(3)
res63: Array[(Int, String)] = Array((5,test), (4,397090770), (3,com))
// sortByKey sorts in ascending order by default
scala> c.sortByKey().collect
res64: Array[(Int, String)] = Array((1,wyp), (2,iteblog), (3,com), (4,397090770), (5,test))
scala> c.sortByKey(false).collect
res66: Array[(Int, String)] = Array((5,test), (4,397090770), (3,com), (2,iteblog), (1,wyp))
// Note: for an RDD of (k, v) pairs, top cannot be used when v contains an Array (no implicit Ordering is defined for it), but a plain numeric value is fine
scala> val rdd2 = sc.parallelize(List((10, ("a", Array(1,2))), (9, ("b", Array(3,5))), (1, ("c", Array(6,0)))))
rdd2: org.apache.spark.rdd.RDD[(Int, (String, Array[Int]))] = ParallelCollectionRDD[134] at parallelize at <console>:26
scala> rdd2.top(1)
<console>:29: error: No implicit Ordering defined for (Int, (String, Array[Int])).
rdd2.top(1)
// but a plain numeric value is fine
scala> val rdd2 = sc.parallelize(List((10, ("a", 11)), (9, ("b", 10)), (100, ("c", 20))))
rdd2: org.apache.spark.rdd.RDD[(Int, (String, Int))] = ParallelCollectionRDD[138] at parallelize at <console>:26
scala> rdd2.top(2)
res209: Array[(Int, (String, Int))] = Array((100,(c,20)), (10,(a,11)))
- Array.grouped
// split an array into fixed-size groups:
scala> val a = (1 to 9).toArray
a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> a.grouped(3).toArray
res178: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
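When the length is not a multiple of the group size, the last group is simply shorter:
scala> (1 to 10).toArray.grouped(3).toArray
res179: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9), Array(10))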
- zip
// Before (repeated string splitting and manual indexing):
reduceByKey{case ((s1, c1), (s2, c2)) =>
  val n1 = s1.split("\t")(0).toLong + s2.split("\t")(0).toLong
  val n2 = s1.split("\t")(1).toLong + s2.split("\t")(1).toLong
  val n3 = s1.split("\t")(2).toLong + s2.split("\t")(2).toLong
  val n4 = s1.split("\t")(3).toLong + s2.split("\t")(3).toLong
  val n5 = s1.split("\t")(4).toLong + s2.split("\t")(4).toLong
  val statusTrueNumStr = Array(n1, n2, n3, n4, n5).mkString("\t")
  val count = c1 + c2
  (statusTrueNumStr, count)
}
// After switching to zip:
val rddLastOneWeek2 = rddLastOneWeek.map{case (_, bigVersion, arrStatusTrueNum, isStable, count) =>
  ((bigVersion, isStable), (arrStatusTrueNum, count))
}.reduceByKey{case ((arr1, count1), (arr2, count2)) =>
  val arr = arr1.zip(arr2).map{case (x, y) => x + y}
  val count = count1 + count2
  (arr, count)
}
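The core trick in isolation, as a minimal local sketch (no Spark needed): zip pairs the two arrays element by element, then map sums each pair:
scala> val arr1 = Array(1L, 2L, 3L)
scala> val arr2 = Array(10L, 20L, 30L)
scala> arr1.zip(arr2).map { case (x, y) => x + y }
res0: Array[Long] = Array(11, 22, 33)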
- zipWithIndex
scala> val l = List(1, 2, 3, 4)
l: List[Int] = List(1, 2, 3, 4)
scala> l.zipWithIndex
res22: List[(Int, Int)] = List((1,0), (2,1), (3,2), (4,3))
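A common use is filtering by position; a small sketch keeping only the even-indexed elements:
scala> l.zipWithIndex.collect { case (x, i) if i % 2 == 0 => x }
res23: List[Int] = List(1, 3)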