Aggregation function: combineByKey
combineByKey turns an RDD[K,V] into an RDD[K,C]. With it you can reproduce what reduceByKey does, and you can also build join-like operations.
Parameter overview
- createCombiner: V => C
While processing a partition, the first time a key is seen, createCombiner builds the initial accumulator value for that key; each partition does this independently.
- mergeValue: (C, V) => C
While processing a partition, if the key has already been seen, mergeValue folds the new value into that key's existing accumulator.
- mergeCombiners: (C, C) => C
After all partitions have been processed, mergeCombiners merges the per-partition accumulators of the same key into the final result.
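For reference, the basic shape of combineByKey on a pair RDD looks roughly like the following (a simplified sketch; the actual PairRDDFunctions method also has overloads that take a partitioner or a number of partitions):
def combineByKey[C](
  createCombiner: V => C,      // build the initial accumulator from a key's first value in a partition
  mergeValue: (C, V) => C,     // fold further values of that key into the partition-local accumulator
  mergeCombiners: (C, C) => C  // merge the accumulators of the same key across partitions
): RDD[(K, C)]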
Implementing reduceByKey
Given List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)), sum the values for each key and, along the way, count how many values each key has.
val rdd = sc.parallelize(List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)))
type MVType = (Int, Double) // tuple type: (counter, score total)
val combResult = rdd.combineByKey(
  score => (1, score),                                       // createCombiner
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),   // mergeValue
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) // mergeCombiners
)
// print the result
combResult.collect().foreach(println)
// output
(A,(2,101.0))
(B,(2,94.0))
(C,(1,91.0))
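For comparison, a minimal sketch: the plain per-key sum can be written directly with reduceByKey, and the per-key average falls out of the (count, sum) accumulators computed above.
val sumOnly = rdd.reduceByKey(_ + _)                            // (A,101.0), (B,94.0), (C,91.0)
val avg = combResult.mapValues { case (cnt, sum) => sum / cnt } // e.g. (A,50.5)
avg.collect().foreach(println)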
Implementing a join
Implementing a join in Spark is very simple: rddA.join(rddB) does it.
def joinTest(sc: SparkContext): Unit = {
  val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
    (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
  val rddB = sc.parallelize(List((1, "songshifan"), (2, "haiyang"), (3, "home")))
  rddA.join(rddB).collect().foreach(println)
}
// output
(1,(www,songshifan))
(1,(iteblog,songshifan))
(1,(com,songshifan))
(2,(bbs,haiyang))
(2,(iteblog,haiyang))
(2,(com,haiyang))
(3,(good,home))
The result is similar to a SQL join. Strictly speaking, RDD join is an inner join; with this data it matches a left join as well, because every key in rddA also appears in rddB.
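If you really want left-join semantics on RDDs, the pair-RDD API also provides leftOuterJoin, which keeps every key of the left side; a minimal sketch on the same rddA and rddB:
rddA.leftOuterJoin(rddB).collect().foreach(println)
// e.g. (1,(www,Some(songshifan))); a key missing from rddB would appear as (k,(v,None))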
- Next, let's implement the same join with Spark SQL.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* Created by songsf on 2017/7/4.
*/
object SparkSqlTest {
def main(args: Array[String]) {
val spark = SparkSession
.builder().master("local[*]")
.appName("Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val sc = spark.sparkContext
val rddA = sc.parallelize(List(("1", "www"), ("1", "iteblog"), ("1", "com"),
("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good"))).map(attributes => Row(attributes._1, attributes._2))
val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3", "home"))).map(attributes => Row(attributes._1, attributes._2))
val schemaString = "key name"
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val dataA = spark.createDataFrame(rddA, schema)
dataA.createOrReplaceTempView("dataA")
val dataB = spark.createDataFrame(rddB, schema)
dataB.createOrReplaceTempView("dataB")
dataA.show()
dataB.show()
spark.sql("select * from dataA where key = '1'").show()
spark.sql("select a.*, b.name name2 from dataA a left join dataB b on a.key = b.key").show()
spark.stop()
}
}
// output
+---+-------+----------+
|key| name| name2|
+---+-------+----------+
| 3| good| home|
| 1| www|songshifan|
| 1|iteblog|songshifan|
| 1| com|songshifan|
| 2| bbs| haiyang|
| 2|iteblog| haiyang|
| 2| com| haiyang|
+---+-------+----------+
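The same left join can also be expressed with the DataFrame API instead of SQL; a minimal sketch using the dataA and dataB defined above (renaming the right-hand name column to avoid a duplicate):
val byApi = dataA.join(dataB.withColumnRenamed("name", "name2"), Seq("key"), "left")
byApi.show()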
Note: when using SparkSession I kept hitting a "SparkSession class not found" error. The reason is that the code runs in a local environment and Maven did not package the SparkSession-related classes into our jar; you can confirm this by unpacking the built jar and looking for them in the corresponding directories.
Workaround: when running from the IDE, explicitly add the required dependency jars to the runtime classpath.
Open question: when I tested version 1.4, a jar built without its dependencies failed when submitted to the cluster, so the options were (1) package all dependencies into the jar, or (2) use the --jars option at submit time to pick up jars on the submitting machine. There is now a claim that the runtime environment already places these dependencies on the executors it creates, so they no longer need to be bundled; this still needs further study and testing.