首先,介紹一下scala語言:
Scala 是一種把面向?qū)ο蠛秃瘮?shù)式編程理念加入到靜態(tài)類型語言中的混血兒肚豺。
為什么學(xué)scala?
spark提供了R界拦、Python等語言的接口吸申,為什么還要重新學(xué)一門新的語言呢?
1、spark本身就是用scala寫的截碴,采用與底層框架相同的語言有很多好處梳侨,例如以后你要看源碼......
2、性能開銷小日丹,scala可以直接編譯運行在java的JVM上
3走哺、能用上最新的版本。一般新版本都是最先支持scala哲虾,雖然現(xiàn)在python的接口也在不斷的豐富
4丙躏、到了工作崗位,你的師父(都是有幾年相關(guān)經(jīng)驗的)束凑,前期由于python的支持還沒有像scala那樣完善晒旅,因此會從scala開始使用spark的,你不學(xué)scala還讓你師父轉(zhuǎn)python巴羲摺敢朱!
新手學(xué)習(xí)Spark編程,在熟悉了Scala語言的基礎(chǔ)上摩瞎,首先需要對以下常用的Spark算子或者Scala函數(shù)比較熟悉拴签,才能開始動手寫能解決實際業(yè)務(wù)的代碼。
簡單來說旗们,Spark 算子大致可以分為以下兩類:
-
Transformation 變換/轉(zhuǎn)換算子:這種變換并不觸發(fā)提交作業(yè)蚓哩,完成作業(yè)中間過程處理。
Transformation 操作是延遲計算的上渴,也就是說從一個RDD 轉(zhuǎn)換生成另一個 RDD 的轉(zhuǎn)換操作不是馬上執(zhí)行岸梨,需要等到有 Action 操作的時候才會真正觸發(fā)運算。 -
Action 行動算子:這類算子會觸發(fā) SparkContext 提交 Job 作業(yè)稠氮。
Action 算子會觸發(fā) Spark 提交作業(yè)(Job)曹阔,并將數(shù)據(jù)輸出 Spark系統(tǒng)。
從小方向來說隔披,Spark 算子大致可以分為以下三類:
- Value數(shù)據(jù)類型的Transformation算子赃份,這種變換并不觸發(fā)提交作業(yè),針對處理的數(shù)據(jù)項是Value型的數(shù)據(jù)奢米。
- Key-Value數(shù)據(jù)類型的Transfromation算子抓韩,這種變換并不觸發(fā)提交 作業(yè),針對處理的數(shù)據(jù)項是Key-Value型的數(shù)據(jù)對鬓长。
- Action算子谒拴,這類算子會觸發(fā)SparkContext提交Job作業(yè)
下面是我以前總結(jié)的一些常用的Spark算子以及Scala函數(shù):
map():將原來 RDD 的每個數(shù)據(jù)項通過 map 中的用戶自定義函數(shù) f 映射轉(zhuǎn)變?yōu)橐粋€新的元素。
mapPartitions(function) :map()的輸入函數(shù)是應(yīng)用于RDD中每個元素涉波,而mapPartitions()的輸入函數(shù)是應(yīng)用于每個分區(qū)英上。
mapValues(function) :?該操作只會??改動value
flatMap(function) :并將生成的 RDD 的每個集合中的元素合并為一個集合
flatMapValues(function):通過上面的例子可知炭序,該操作也是只操作value,不改變key苍日。
reduceByKey(func,numPartitions:用于對每個key對應(yīng)的多個value進行merge操作
groupByKey(numPartitions):將元素通過函數(shù)生成相應(yīng)的 Key惭聂,數(shù)據(jù)就轉(zhuǎn)化為 Key-Value 格式,之后將 Key 相同的元素分為一組易遣。
sortByKey(accending彼妻,numPartitions)
cogroup(otherDataSet,numPartitions)
join(otherDataSet,numPartitions):找出左右相同同的記錄
LeftOutJoin(otherDataSet豆茫,numPartitions):以左邊表為準(zhǔn)侨歉,逐條去右邊表找相同字段,如果有多條會依次列出
RightOutJoin(otherDataSet, numPartitions)
lookup():查詢指定的key,u返回其對應(yīng)的value。
filter(): filter 函數(shù)功能是對元素進行過濾娶吞,對每個 元 素 應(yīng) 用 f 函 數(shù), 返 回 值 為 true 的 元 素 在RDD 中保留牵舵,返回值為 false 的元素將被過濾掉。
full outer join()包括兩個表的join結(jié)果倦挂,左邊在右邊中沒找到的結(jié)果(NULL)畸颅,右邊在左邊沒找到的結(jié)果,F(xiàn)ULL OUTER JOIN 關(guān)鍵字結(jié)合了 LEFT JOIN 和 RIGHT JOIN 的結(jié)果方援。
collect():函數(shù)可以提取出所有rdd里的數(shù)據(jù)項:RDD——>數(shù)組(collect用于將一個RDD轉(zhuǎn)換成數(shù)組没炒。)
reduce():根據(jù)映射函數(shù)f,對RDD中的元素進行二元計算犯戏,返回計算結(jié)果送火。
count():返回RDD內(nèi)元素的個數(shù)
first():返回RDD內(nèi)的第一個元素,first相當(dāng)于top(1)
top:top可返回最大的k個元素先匪。
case:匹配种吸,更多用于 PartialFunction(偏函數(shù))中 {case …}
saveAsTextFile:函數(shù)將數(shù)據(jù)輸出,存儲到 HDFS 的指定目錄
cache : cache 將 RDD 元素從磁盤緩存到內(nèi)存呀非,內(nèi)部默認(rèn)會調(diào)用persist(StorageLevel.MEMORY_ONLY)坚俗,也就是說它無法自定義緩存級別的。
persist():與cache一樣都是將一個RDD進行緩存姜钳,在之后的使用過程匯總不需要重新的計算了坦冠。它比cache靈活,可以通過自定義
StorageLevel類型參數(shù)哥桥,來定義緩存的級別。coalesce():對RDD的分區(qū)進行?在分區(qū)激涤,(用于分區(qū)數(shù)據(jù)分布不均勻的情況拟糕,利用HashPartitioner函數(shù)將數(shù)據(jù)重新分區(qū))
reparation:與coalesce功能一樣判呕,它只是coalesce中shuffle設(shè)置為true的簡易實現(xiàn)。(數(shù)據(jù)不經(jīng)過shuffle是無法將RDD的分區(qū)變多的)
distinct(): distinct將RDD中的元素進行去重操作
subtract(): subtract相當(dāng)于進行集合的差操作送滞,RDD 1去除RDD 1和RDD 2交集中的所有元素侠草。
1、map是對RDD中的每個元素都執(zhí)行一個指定的函數(shù)來產(chǎn)生一個新的RDD犁嗅。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應(yīng)边涕。
val a = sc.parallelize(1 to 9, 3)
# x =>*2是一個函數(shù),x是傳入?yún)?shù)即RDD的每個元素褂微,x*2是返回值
val b = a.map(x => x*2)
a.collect
# 結(jié)果Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
b.collect
# 結(jié)果Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
list/key--->key-value
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x, 1))
b.collect.foreach(println(_))
# /*
# (dog,1)
# (tiger,1)
# (lion,1)
# (cat,1)
# (panther,1)
# ( eagle,1)
# */
val l=sc.parallelize(List((1,'a'),(2,'b')))
var ll=l.map(x=>(x._1,"PV:"+x._2)).collect()
ll.foreach(println)
# (1,PVa)
# (2,PVb)
=================================================================
2功蜓、mapPartitions(function)
map()的輸入函數(shù)是應(yīng)用于RDD中每個元素,而mapPartitions()的輸入函數(shù)是應(yīng)用于每個分區(qū)
package test
import scala.Iterator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TestRdd {
def sumOfEveryPartition(input: Iterator[Int]): Int = {
var total = 0
input.foreach { elem =>
total += elem
}
total
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Rdd Test")
val spark = new SparkContext(conf)
val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6個元素宠蚂,分成2個partition
val result = input.mapPartitions(
partition => Iterator(sumOfEveryPartition(partition)))//partition是傳入的參數(shù)式撼,是個list,要求返回也是list求厕,即Iterator(sumOfEveryPartition(partition))
result.collect().foreach {
println(_)
# 6 15,分區(qū)計算和
}
spark.stop()
}
}
=================================================================
3著隆、mapValues(function)
原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素呀癣。因此美浦,該函數(shù)只適用于元素為KV對的RDD
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
# //結(jié)果
# Array(
# (3,xdogx),
# (5,xtigerx),
# (4,xlionx),
# (3,xcatx),
# (7,xpantherx),
# (5,xeaglex)
# )
# val grouped = mds.groupBy(md => md.matched)
# grouped.mapValues(x => x.size).foreach(println)
=================================================================
4、flatMap(function)
與map類似项栏,區(qū)別是原RDD中的元素經(jīng)map處理后只能生成一個元素浦辨,而原RDD中的元素經(jīng)flatmap處理后可生成多個元素
val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x)//每個元素擴展
b.collect
/*
結(jié)果 Array[Int] = Array( 1,
1, 2,
1, 2, 3,
1, 2, 3, 4)
*/
===============================================
5、flatMapValues(function)
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=>1 to x)
b.collect.foreach(println(_))
/*結(jié)果
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
*/
val list = List(("mobin",22),("kpop",20),("lufei",23))
val rdd = sc.parallelize(list)
val mapValuesRDD = rdd.flatMapValues(x => Seq(x,"male"))
mapValuesRDD.foreach(println)
輸出:
(mobin,22)
(mobin,male)
(kpop,20)
(kpop,male)
(lufei,23)
(lufei,male)
如果是mapValues會輸出:【對比區(qū)別】
(mobin,List(22, male))
(kpop,List(20, male))
(lufei,List(23, male))
=================================================================
6忘嫉、reduceByKey(func,numPartitions):按Key進行分組荤牍,使用給定的func函數(shù)聚合value值, numPartitions設(shè)置分區(qū)數(shù),提高作業(yè)并行度
val arr = List(("A",3),("A",2),("B",1),("B",3))
val rdd = sc.parallelize(arr)
val reduceByKeyRDD = rdd.reduceByKey(_ +_)
reduceByKeyRDD.foreach(println)
sc.stop
# (A,5)
# (A,4)
=================================================================
7庆冕、groupByKey(numPartitions):按Key進行分組康吵,返回[K,Iterable[V]],numPartitions設(shè)置分區(qū)數(shù)访递,提高作業(yè)并行度【value并不是累加晦嵌,而是變成一個數(shù)組】
//省略
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val groupByKeyRDD = rdd.groupByKey()
groupByKeyRDD.foreach(println)
sc.stop
# (B,CompactBuffer(2, 3))
# (A,CompactBuffer(1, 2))
# 統(tǒng)計key后面的數(shù)組匯總元素的個數(shù)
scala> groupByKeyRDD.mapValues(x => x.size).foreach(println)
# (A,2)
# (B,2)
=================================================================
8、sortByKey(accending拷姿,numPartitions):返回以Key排序的(K,V)鍵值對組成的RDD惭载,accending為true時表示升序,為false時表示降序响巢,numPartitions設(shè)置分區(qū)數(shù)描滔,提高作業(yè)并行度。
//省略sc
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val sortByKeyRDD = rdd.sortByKey()
sortByKeyRDD.foreach(println)
sc.stop
# (A,1)
# (A,2)
# (B,2)
# (B,3)
# 統(tǒng)計單詞的詞頻
val rdd = sc.textFile("/home/scipio/README.md")
val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
val wcsort = wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
wcsort.saveAsTextFile("/home/scipio/sort.txt")
# 升序的話踪古,sortByKey(true)
=================================================================
9含长、cogroup(otherDataSet券腔,numPartitions):對兩個RDD(如:(K,V)和(K,W))相同Key的元素先分別做聚合,最后返回(K,Iterator<V>,Iterator<W>)形式的RDD,numPartitions設(shè)置分區(qū)數(shù)拘泞,提高作業(yè)并行度
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd1 = sc.parallelize(arr, 3)
val rdd2 = sc.parallelize(arr1, 3)
val groupByKeyRDD = rdd1.cogroup(rdd2)
groupByKeyRDD.foreach(println)
sc.stop
# (B,(CompactBuffer(2, 3),CompactBuffer(B1, B2)))
# (A,(CompactBuffer(1, 2),CompactBuffer(A1, A2)))
=================================================================
10纷纫、join(otherDataSet,numPartitions):對兩個RDD先進行cogroup操作形成新的RDD,再對每個Key下的元素進行笛卡爾積陪腌,numPartitions設(shè)置分區(qū)數(shù)辱魁,提高作業(yè)并行度
//省略
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val groupByKeyRDD = rdd.join(rdd1)
groupByKeyRDD.foreach(println)
# (B,(2,B1))
# (B,(2,B2))
# (B,(3,B1))
# (B,(3,B2))
# (A,(1,A1))
# (A,(1,A2))
# (A,(2,A1))
# (A,(2,A2
=================================================================
11、LeftOutJoin(otherDataSet诗鸭,numPartitions):左外連接染簇,包含左RDD的所有數(shù)據(jù),如果右邊沒有與之匹配的用None表示,numPartitions設(shè)置分區(qū)數(shù)只泼,提高作業(yè)并行度
//省略
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3),("C",1))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val leftOutJoinRDD = rdd.leftOuterJoin(rdd1)
leftOutJoinRDD .foreach(println)
sc.stop
# (B,(2,Some(B1)))
# (B,(2,Some(B2)))
# (B,(3,Some(B1)))
# (B,(3,Some(B2)))
# (C,(1,None))
# (A,(1,Some(A1)))
# (A,(1,Some(A2)))
# (A,(2,Some(A1)))
# (A,(2,Some(A2)))
=================================================================
12剖笙、RightOutJoin(otherDataSet, numPartitions):右外連接,包含右RDD的所有數(shù)據(jù)请唱,如果左邊沒有與之匹配的用None表示,numPartitions設(shè)置分區(qū)數(shù)弥咪,提高作業(yè)并行度
//省略
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"),("C","C1"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val rightOutJoinRDD = rdd.rightOuterJoin(rdd1)
rightOutJoinRDD.foreach(println)
sc.stop
# (B,(Some(2),B1))
# (B,(Some(2),B2))
# (B,(Some(3),B1))
# (B,(Some(3),B2))
# (C,(None,C1))
# (A,(Some(1),A1))
# (A,(Some(1),A2))
# (A,(Some(2),A1))
# (A,(Some(2),A2))
=================================================================
13、lookup()
var rdd1=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
# rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[81] at parallelize at
rdd1.lookup(1)
# res34: Seq[String] = WrappedArray(a)
=================================================================
14十绑、filter()
val filterRdd = sc.parallelize(List(1,2,3,4,5)).map(_*2).filter(_>5)
filterRdd.collect
# res5: Array[Int] = Array(6, 8, 10)
=================================================================
16聚至、collect()
scala> var rdd1 = sc.makeRDD(1 to 10,2)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.collect
# res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
=================================================================
17、reduce()
scala> var rdd1 = sc.makeRDD(1 to 10,2)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
# 求和
scala> rdd1.reduce(_ + _)
# res18: Int = 55
scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
# rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
# 分項求和
scala> rdd2.reduce((x,y) => {
| (x._1 + y._1,x._2 + y._2)
| })
res21: (String, Int) = (CBBAA,6)
=================================================================
18本橙、count()
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
# rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd1.count
# res15: Long = 3
=================================================================
19扳躬、first()
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
# rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
scala> rdd1.first
# res14: (String, String) = (A,1)
=================================================================
21、case
scala> val aa=List(1,2,3,"asa")
# aa: List[Any] = List(1, 2, 3, asa)
scala> aa. map {
| case i: Int => i + 1
| case s: String => s.length
| }
# res16: List[Int] = List(2, 3, 4, 3)
補充:reduceByKeyt與groupByKey的區(qū)別甚亭?
[優(yōu)化代碼的最基本思路]
(1)當(dāng)采用reduceByKeyt時贷币,Spark可以在每個分區(qū)移動數(shù)據(jù)之前將待輸出數(shù)據(jù)與一個共用的key結(jié)合。借助下圖可以理解在reduceByKey里究竟發(fā)生了什么亏狰。 注意在數(shù)據(jù)對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數(shù))役纹。然后lamdba函數(shù)在每個區(qū)上被再次調(diào)用來將所有值reduce成一個最終結(jié)果。整個過程如下:
(2)當(dāng)采用groupByKey時暇唾,由于它不接收函數(shù)促脉,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的后果是集群節(jié)點之間的開銷很大策州,導(dǎo)致傳輸延時瘸味。整個過程如下:
因此,在對大數(shù)據(jù)進行復(fù)雜計算時够挂,reduceByKey優(yōu)于groupByKey旁仿。
另外,如果僅僅是group處理孽糖,那么以下函數(shù)應(yīng)該優(yōu)先于 groupByKey :
《∈拧(1)combineByKey 組合數(shù)據(jù)汁胆,但是組合之后的數(shù)據(jù)類型與輸入時值的類型不一樣梭姓。
∷住(2)foldByKey合并每一個 key 的所有值,在級聯(lián)函數(shù)和“零值”中使用誉尖。