Spark Examples
The two most commonly used transformations are map and filter. map(func) applies func to every element and produces a new RDD; filter(func) keeps only the elements for which func returns true, forming a new RDD. Some of the more common transformations are listed below (a small Scala sketch follows the list):
- map(func): returns a new distributed dataset formed by passing each element of the source through the function func.
- filter(func): returns a new dataset formed by selecting those elements of the source for which func returns true.
- flatMap(func): similar to map, but each input item can be mapped to multiple output items (so func must return a Seq rather than a single item).
- union(otherDataset): returns the union of the two RDDs.
- intersection(otherDataset): returns the intersection of the two RDDs.
- groupByKey(): on a dataset of (K, V) pairs, groups the values by key and returns a dataset of (K, Iterable) pairs.
- reduceByKey(func): on a dataset of (K, V) pairs, aggregates the values of each key with func and returns a new (K, V) dataset.
- sortByKey([ascending]): returns a dataset sorted by key.
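A minimal sketch of these transformations, assuming a SparkContext named sc is already available (for example in spark-shell); the values in the comments are what collect() would return:
val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))
val more = sc.parallelize(Seq(4, 5, 6))
val doubled = nums.map(x => x * 2)          // 2, 4, 6, 8, 10
val evens   = nums.filter(x => x % 2 == 0)  // 2, 4
val merged  = nums.union(more)              // 1, 2, 3, 4, 5, 4, 5, 6
val common  = nums.intersection(more)       // 4, 5
// key/value transformations
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()            // ("a", Iterable(1, 3)), ("b", Iterable(2))
val summed  = pairs.reduceByKey(_ + _)      // ("a", 4), ("b", 2)
val sorted  = summed.sortByKey()            // sorted by key, ascending
// flatMap: one input line can produce many output words
val words = sc.parallelize(Seq("hello spark", "hello rdd")).flatMap(_.split(" "))
Note that transformations are lazy; nothing is computed until an action is called.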
The most commonly used action is reduce, which reduces the dataset to a single result. Some of the more common actions are listed below (see the sketch after the list):
- reduce(func): aggregates the elements of the dataset with func, which takes two arguments and returns one result; func must be associative and commutative so the computation can be distributed.
- count(): returns the number of elements in the dataset.
- first(): returns the first element.
- take(n): returns the first n elements of the dataset as an array.
- saveAsTextFile(path): saves the dataset as text files.
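A minimal sketch of these actions, again assuming sc is available; the output path is only a placeholder:
val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))
val sum  = nums.reduce((a, b) => a + b)   // 15; addition is associative and commutative
val n    = nums.count()                   // 5
val head = nums.first()                   // 1
val top3 = nums.take(3)                   // Array(1, 2, 3)
// unlike the calls above, saveAsTextFile writes one part file per partition
nums.saveAsTextFile("hdfs:///tmp/nums_output")   // hypothetical output path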
Reading and writing files
val lines = sc.textFile("file:///path_to_local/file")
val lines = sc.textFile("hdfs:///path_to_hdfs/file")
rdd.saveAsTextFile("hdfs://")
For Parquet files, the approach below returns a DataFrame; it likewise accepts both local and HDFS paths, and also directories and wildcard patterns:
val parquetFile = sqlContext.read.parquet("people.parquet")  // read Parquet into a DataFrame
df.write.save("temp.parquet")                                // write a DataFrame out; Parquet is the default format
JSON files
val df = sqlContext.read.json("path to json file")            // read JSON into a DataFrame
val df = sqlContext.read.format("json").load("path to file")  // equivalent, using the generic loader
df.write.format("json").save("path to save")                  // write the DataFrame back out as JSON
Counting characters
val lines = sc.textFile("data.txt") //讀文件,得到以行字符串為單位的RDD
val lineLengths = lines.map(s => s.length) //轉(zhuǎn)換勤揩,將字符串元素映射為其長(zhǎng)度
val totalLength = lineLengths.reduce((a, b) => a + b) //動(dòng)作咧党,將所有元素加起來
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCount {
  def FILE_NAME: String = "word_count_results_"

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: SparkWordCount FileName")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(args(0))
    val wordCounts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    wordCounts.saveAsTextFile(FILE_NAME + System.currentTimeMillis())
    println("Word Count program running results are successfully saved.")
  }
}
./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt
Computing the average age
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: AvgAgeCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: Average Age Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5)
    val count = dataFile.count()
    val ageData = dataFile.map(line => line.split(" ")(1))
    val totalAge = ageData.map(age => Integer.parseInt(String.valueOf(age)))
      .collect().reduce((a, b) => a + b)
    println("Total Age:" + totalAge + "; Number of People:" + count)
    val avgAge: Double = totalAge.toDouble / count.toDouble
    println("Average Age is " + avgAge)
  }
}
./spark-submit \
--class com.ibm.spark.exercise.basic.AvgAgeCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt
Highest and lowest height for males/females
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PeopleInfoCalculator {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: PeopleInfoCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: People Info(Gender & Height) Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5)
    val maleData = dataFile.filter(line => line.contains("M")).map(
      line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    val femaleData = dataFile.filter(line => line.contains("F")).map(
      line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
    val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
    val lowestMale = maleHeightData.sortBy(x => x, true).first()
    val lowestFemale = femaleHeightData.sortBy(x => x, true).first()
    val highestMale = maleHeightData.sortBy(x => x, false).first()
    val highestFemale = femaleHeightData.sortBy(x => x, false).first()
    println("Number of Male People:" + maleData.count())
    println("Number of Female People:" + femaleData.count())
    println("Lowest Male:" + lowestMale)
    println("Lowest Female:" + lowestFemale)
    println("Highest Male:" + highestMale)
    println("Highest Female:" + highestFemale)
  }
}
./spark-submit \
--class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 3g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt
Lines with the most occurrences (Top K)
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TopKSearchKeyWords {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: TopKSearchKeyWords KeyWordsFile K")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: Top K Searching Key Words")
    val sc = new SparkContext(conf)
    val srcData = sc.textFile(args(0))
    val countedData = srcData.map(line => (line.toLowerCase(), 1)).reduceByKey((a, b) => a + b)
    val sortedData = countedData.map { case (k, v) => (v, k) }.sortByKey(false)
    val topKData = sortedData.take(args(1).toInt).map { case (v, k) => (k, v) }
    topKData.foreach(println)
  }
}
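This program can be submitted the same way as the earlier examples. A sketch only: the class name assumes the same package as above, and the master URL, resource settings, jar path, input file, and K value are placeholders modeled on the previous submissions:
# input file and K (here 10) are placeholders
./spark-submit \
--class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt 10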