Spark 例子

Spark 例子

最常用的轉(zhuǎn)換操作有兩個(gè): mapfilter ,map(func)是將func應(yīng)用到所有元素怔匣,得到一個(gè)新的RDD。filter是將func返回為true的元素過濾出來,組成一個(gè)新的RDD。一些比較常用的轉(zhuǎn)換如下:

  • map(func) 返回一個(gè)新的分布式數(shù)據(jù)集护戳,將數(shù)據(jù)源的每一個(gè)元素傳遞給函數(shù) func 映射組成翎冲。
  • filter(func) 返回一個(gè)新的數(shù)據(jù)集垂睬,從數(shù)據(jù)源中選中一些元素通過函數(shù) func 返回 true。
  • flatMap(func) 類似于 map抗悍,但是每個(gè)輸入項(xiàng)能被映射成多個(gè)輸出項(xiàng)(所以 func 必須返回一個(gè) Seq驹饺,而不是單個(gè) item)。
  • union(otherDataset) 兩個(gè)RDD求并集
  • intersection(otherDataset) 兩個(gè)RDD求交集
  • groupByKey() 作用于(K,V)的數(shù)據(jù)集缴渊,依據(jù)K對(duì)值進(jìn)行歸并赏壹,返回一個(gè)(K, Iterable)
  • reduceByKey(func) 作用于(K,V)的數(shù)據(jù)集,依據(jù)K對(duì)值使用func進(jìn)行歸約衔沼,返回一個(gè)(K,V)數(shù)據(jù)集
  • sortByKey([asending]) 返回一個(gè)依據(jù)K進(jìn)行排序的數(shù)據(jù)集

最常用的動(dòng)作就是reduce蝌借,將數(shù)據(jù)集歸約為一個(gè)結(jié)果。一些比較常用的動(dòng)作如下:

  • reduce(func) 按照func函數(shù)對(duì)數(shù)據(jù)集進(jìn)行歸約指蚁,func接受兩個(gè)參數(shù)菩佑,返回一個(gè)結(jié)果,須滿足結(jié)合律和交換律凝化,以便于分布式計(jì)算稍坯。
  • count() 返回?cái)?shù)據(jù)集的元素個(gè)數(shù)
  • first() 返回第一個(gè)元素
  • take(n) 以數(shù)組形式返回集合的前n個(gè)元素
  • saveAsTextFile(path) 將數(shù)據(jù)集保存為文本文件

讀寫文件

val lines = sc.textFile("file:///path_to_local/file")  
val lines = sc.textFile("hdfs:///path_to_hdfs/file")  
rdd.saveAsTextFile("hdfs://")

如果是parquet格式文件,可以用下面的辦法搓劫,得到一個(gè)DataFrame瞧哟,同樣可以識(shí)別本地及hdfs文件混巧,也可以識(shí)別目錄及正則

val parquetFile = sqlContext.read.parquet("people.parquet")  
df.write.save("temp.parquet")  

JSON格式文件

val df = sqlContext.read.json("path to json file")  
val df = sqlContext.read.format("json").load("path to file")  
df.write.format("json").save("path to save")

統(tǒng)計(jì)字符數(shù)

val lines = sc.textFile("data.txt")     //讀文件,得到以行字符串為單位的RDD  
val lineLengths = lines.map(s => s.length)    //轉(zhuǎn)換勤揩,將字符串元素映射為其長(zhǎng)度   
val totalLength = lineLengths.reduce((a, b) => a + b)   //動(dòng)作咧党,將所有元素加起來 

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
import org.apache.spark.SparkContext._  
  
object SparkWordCount {  
 def FILE_NAME:String = "word_count_results_";  
   
 def main(args:Array[String]) {  
 if (args.length < 1) {  
 println("Usage:SparkWordCount FileName");  
 System.exit(1);  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");  
 val sc = new SparkContext(conf);  
 val textFile = sc.textFile(args(0));  
 val wordCounts = textFile.flatMap(line => line.split(" ")).map(  
                                        word => (word, 1)).reduceByKey((a, b) => a + b)  
  
                                          
 wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());  
 println("Word Count program running results are successfully saved.");  
 }  
}  

./spark-submit \  
--class com.ibm.spark.exercise.basic.SparkWordCount \  
--master spark://hadoop036166:7077 \  
--num-executors 3 \  
--driver-memory 6g --executor-memory 2g \  
--executor-cores 2 \  
/home/fams/sparkexercise.jar \  
hdfs://hadoop036166:9000/user/fams/*.txt  

求平均值

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
object AvgAgeCalculator {  
 def main(args:Array[String]) {  
 if (args.length < 1){  
 println("Usage:AvgAgeCalculator datafile")  
 System.exit(1)  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")  
 val sc = new SparkContext(conf)  
 val dataFile = sc.textFile(args(0), 5);  
 val count = dataFile.count()  
 val ageData = dataFile.map(line => line.split(" ")(1))  
 val totalAge = ageData.map(age => Integer.parseInt(  
                                String.valueOf(age))).collect().reduce((a,b) => a+b)  
 println("Total Age:" + totalAge + ";Number of People:" + count )  
 val avgAge : Double = totalAge.toDouble / count.toDouble  
 println("Average Age is " + avgAge)  
 }  
}  

./spark-submit \  
 --class com.ibm.spark.exercise.basic.AvgAgeCalculator \  
 --master spark://hadoop036166:7077 \  
 --num-executors 3 \  
 --driver-memory 6g \  
 --executor-memory 2g \  
 --executor-cores 2 \  
 /home/fams/sparkexercise.jar \  
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt  

求男性/女性 最高 最低身高

object PeopleInfoCalculator {  
 def main(args:Array[String]) {  
 if (args.length < 1){  
 println("Usage:PeopleInfoCalculator datafile")  
 System.exit(1)  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")  
 val sc = new SparkContext(conf)  
 val dataFile = sc.textFile(args(0), 5);  
 val maleData = dataFile.filter(line => line.contains("M")).map(  
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))  
 val femaleData = dataFile.filter(line => line.contains("F")).map(  
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))  
  
 val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)  
 val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)  
  
 val lowestMale = maleHeightData.sortBy(x => x,true).first()  
 val lowestFemale = femaleHeightData.sortBy(x => x,true).first()  
  
 val highestMale = maleHeightData.sortBy(x => x, false).first()  
 val highestFemale = femaleHeightData.sortBy(x => x, false).first()  
 println("Number of Male Peole:" + maleData.count())  
 println("Number of Female Peole:" + femaleData.count())  
 println("Lowest Male:" + lowestMale)  
 println("Lowest Female:" + lowestFemale)  
 println("Highest Male:" + highestMale)  
 println("Highest Female:" + highestFemale)  
 }  
}  
./spark-submit \  
 --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \  
 --master spark://hadoop036166:7077 \  
 --num-executors 3 \  
 --driver-memory 6g \  
 --executor-memory 3g \  
 --executor-cores 2 \  
 /home/fams/sparkexercise.jar \  
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt  

每行數(shù)據(jù)出現(xiàn)的次數(shù)最高的

=============

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
  
object TopKSearchKeyWords {  
 def main(args:Array[String]){  
 if (args.length < 2) {  
 println("Usage:TopKSearchKeyWords KeyWordsFile K");  
 System.exit(1)  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")  
 val sc = new SparkContext(conf)  
 val srcData = sc.textFile(args(0))  
 val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)  
  
 val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)  
 val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }  
 topKData.foreach(println)  
 }  
}  

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市雄可,隨后出現(xiàn)的幾起案子凿傅,更是在濱河造成了極大的恐慌,老刑警劉巖数苫,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件聪舒,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡虐急,警方通過查閱死者的電腦和手機(jī)箱残,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來止吁,“玉大人被辑,你說我怎么就攤上這事【吹耄” “怎么了盼理?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)俄删。 經(jīng)常有香客問我宏怔,道長(zhǎng),這世上最難降的妖魔是什么畴椰? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任臊诊,我火速辦了婚禮,結(jié)果婚禮上斜脂,老公的妹妹穿的比我還像新娘抓艳。我一直安慰自己,他們只是感情好帚戳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布玷或。 她就那樣靜靜地躺著,像睡著了一般片任。 火紅的嫁衣襯著肌膚如雪偏友。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天蚂踊,我揣著相機(jī)與錄音约谈,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛棱诱,可吹牛的內(nèi)容都是我干的泼橘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼迈勋,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼炬灭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起靡菇,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤重归,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后厦凤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鼻吮,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年较鼓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了椎木。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡博烂,死狀恐怖香椎,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情禽篱,我是刑警寧澤畜伐,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站躺率,受9級(jí)特大地震影響玛界,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜肥照,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一脚仔、第九天 我趴在偏房一處隱蔽的房頂上張望勤众。 院中可真熱鬧舆绎,春花似錦、人聲如沸们颜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窥突。三九已至努溃,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間阻问,已是汗流浹背梧税。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人第队。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓哮塞,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親凳谦。 傳聞我的和親對(duì)象是個(gè)殘疾皇子忆畅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容