spark combineByKey常用的數(shù)據(jù)操作

聚合函數(shù)combineByKey

將RDD[k,v]轉(zhuǎn)化為RDD[k,c],利用該函數(shù)可以實現(xiàn)reduceByKey函數(shù)的功能饭望。也可以實現(xiàn)類似于join的操作

參數(shù)簡介

  • createCombiner: V => C

處理每個分區(qū)數(shù)據(jù)時荤胁,如果遇到key沒有出現(xiàn)的仰美,就會創(chuàng)建一個該鍵對應(yīng)的累加器初始值,每個分區(qū)相互獨立。

  • mergeValue: (C, V) => C

處理每個分區(qū)數(shù)據(jù)時萍诱,如果遇到key已經(jīng)出現(xiàn)三圆,則利用mergeValue進(jìn)行合并處理狞换。

  • mergeCombiners: (C, C) => C

所有分區(qū)數(shù)據(jù)處理完成后,利用mergeCombiners對各個分區(qū)的累加器進(jìn)行再次合并

實現(xiàn)reduceByKey函數(shù)

將List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0))中的數(shù)據(jù)按照key舟肉,對value做求和計算修噪,順帶統(tǒng)計次數(shù)

val rdd = sc.parallelize(List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)))
type MVType = (Int, Double) //定義一個元組類型(科目計數(shù)器,分?jǐn)?shù))
val combReault = rdd.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
)
//打印計算結(jié)果
combReault.collect().foreach(println)
//結(jié)果
(A,(2,101.0))
(B,(2,94.0))
(C,(1,91.0))

實現(xiàn)join操作

spark實現(xiàn)join操作非常簡單 rddA.join(rddB)即可實現(xiàn)

def joinTest(sc:SparkContext): Unit ={
val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
val rddB = sc.parallelize(List((1,"songshifan"),(2,"haiyang"),(3,"home")))
rddA.join(rddB).collect().foreach(println)}
//結(jié)果
(1,(www,songshifan))
(1,(iteblog,songshifan))
(1,(com,songshifan))
(2,(bbs,haiyang))
(2,(iteblog,haiyang))
(2,(com,haiyang))
(3,(good,home))

跟sql的left join類似

  • 下面我們嘗試使用spark sql來實現(xiàn)join操作
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* Created by songsf on 2017/7/4.
*/
object SparkSqlTest {
    def main(args: Array[String]) {
    val spark = SparkSession
                .builder().master("local[*]")
                .appName("Spark SQL data sources example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate()
    val sc = spark.sparkContext
    val rddA = sc.parallelize((List(("1", "www"), ("1", "iteblog"), ("1", "com"),
    ("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good")))).map(attributes =>         Row(attributes._1, attributes._2))
    val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3",              "home"))).map(attributes => Row(attributes._1, attributes._2))
    val schemaString = "key name"
    val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val dataA = spark.createDataFrame(rddA, schema)
    dataA.createOrReplaceTempView("dataA")
    val dataB = spark.createDataFrame(rddB, schema)
    dataB.createOrReplaceTempView("dataB")
    dataA.show()
    dataB.show()
    val dataA_1 = spark.sql("select * from dataA where key = '1'").show()
    val BLeftJoinA = spark.sql("select a.*,b.name name2 from dataA a left join dataB b on a.key = b.key").show()
    spark.stop()
    }
}
//結(jié)果
+---+-------+----------+
|key|   name|     name2|
+---+-------+----------+
|  3|   good|      home|
|  1|    www|songshifan|
|  1|iteblog|songshifan|
|  1|    com|songshifan|
|  2|    bbs|   haiyang|
|  2|iteblog|   haiyang|
|  2|    com|   haiyang|
+---+-------+----------+
  • 注意:在使用spark-session時,總是會報SparkSession類找不到的錯誤路媚,這是因為我們的代碼是運行在本地環(huán)境中割按,maven在打包的時候沒有把Spark-session相關(guān)的內(nèi)容打到我們的package中,這一點可以將編譯好的jar包解壓到相應(yīng)的目錄下找找看磷籍。

  • 解決辦法:在編輯器運行時适荣,強(qiáng)制指定依賴的jar包。

  • 疑問:之前測試過1.4版本的院领,寫好的代碼不把依賴jar包打入我們的jar包中弛矛,提交集群時會報錯,所以1把所有依賴包都打入jar包中比然,2 在執(zhí)行時用--jars參數(shù)去提交機(jī)器上找jar包≌擅ィ現(xiàn)在有一種說法是運行環(huán)境已經(jīng)把依賴包都放在創(chuàng)建的執(zhí)行器中,不必再加入依賴jar包。這個需要繼續(xù)研究万俗、測試湾笛。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市闰歪,隨后出現(xiàn)的幾起案子嚎研,更是在濱河造成了極大的恐慌,老刑警劉巖库倘,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件临扮,死亡現(xiàn)場離奇詭異,居然都是意外死亡教翩,警方通過查閱死者的電腦和手機(jī)杆勇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來饱亿,“玉大人蚜退,你說我怎么就攤上這事”肓” “怎么了关霸?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長杰扫。 經(jīng)常有香客問我队寇,道長,這世上最難降的妖魔是什么章姓? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任佳遣,我火速辦了婚禮,結(jié)果婚禮上凡伊,老公的妹妹穿的比我還像新娘零渐。我一直安慰自己,他們只是感情好系忙,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布诵盼。 她就那樣靜靜地躺著,像睡著了一般银还。 火紅的嫁衣襯著肌膚如雪风宁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天蛹疯,我揣著相機(jī)與錄音戒财,去河邊找鬼。 笑死捺弦,一個胖子當(dāng)著我的面吹牛饮寞,可吹牛的內(nèi)容都是我干的孝扛。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼幽崩,長吁一口氣:“原來是場噩夢啊……” “哼苦始!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起慌申,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤陌选,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后太示,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡香浩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年类缤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片邻吭。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡餐弱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出囱晴,到底是詐尸還是另有隱情膏蚓,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布畸写,位于F島的核電站驮瞧,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏枯芬。R本人自食惡果不足惜论笔,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望千所。 院中可真熱鬧狂魔,春花似錦、人聲如沸淫痰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽待错。三九已至籽孙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間火俄,已是汗流浹背蚯撩。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留烛占,地道東北人胎挎。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓沟启,卻偏偏與公主長得像,于是被迫代替她去往敵國和親犹菇。 傳聞我的和親對象是個殘疾皇子德迹,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容