1. 常見數(shù)據(jù)源
文件格式與文件系統(tǒng)
對于存儲在本地文件系統(tǒng)或分布式文件系統(tǒng)(比如NFS脑漫、HDFS街望、Amazon S3 等)中的數(shù)據(jù)飞蚓,Spark 可以訪問很多種不同的文件格式缩擂,包括文本文件鼠冕、JSON、SequenceFile胯盯,以及protocol buffer懈费。Spark SQL中的結(jié)構(gòu)化數(shù)據(jù)源
Spark SQL 模塊,它針對包括JSON 和Apache Hive 在內(nèi)的結(jié)構(gòu)化數(shù)據(jù)源博脑,為我們提供了一套更加簡潔高效的API憎乙。數(shù)據(jù)庫與鍵值存儲
Spark 自帶的庫和一些第三方庫票罐,可以用來連接Cassandra、HBase泞边、Elasticsearch 以及JDBC 源该押。
2.文件格式
2.1 文本文件
將一個文本文件讀取為RDD 時,輸入的每一行都會成為RDD 的一個元素阵谚。
將多個完整的文本文件一次性讀取為一個pair RDD蚕礼,其中鍵是文件名,值是文件內(nèi)容梢什。
2.1.1 讀取本地文件
- 讀取單個文件:textFile(fileName奠蹬,minPartitions) ,如果要控制分區(qū)數(shù)的話绳矩,可以指定minPartitions罩润。
注意:textFile()方法也可以讀取文件夾,將目錄作為參數(shù)翼馆,會將目錄中的數(shù)據(jù)都讀入到RDD中割以。
//在Scala中讀取一個文件
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
//在Java中讀取一個文件
JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
- 讀取一個目錄中的文件:wholeTextFiles(),方法會返回一個pair RDD应媚,其中鍵是輸入文件的文件名严沥。
Spark 支持讀取給定目錄中的所有文件,以及在輸入路徑中使用通配字符(如part-*.txt)中姜。
//使用wholeTextFiles讀取文件夾
val input = sc.wholeTextFiles("/user/admin/mrwinter/chapter05/txt/")
input.collect().foreach(println)
使用textFile讀取文件夾
val input = sc.textFile("/user/admin/mrwinter/chapter05/txt/")
input.collect().foreach(println)
//在Scala 中求每個文件的平均值
val input = sc.wholeTextFiles("file://home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
}
2.1.2 保存本地文件
saveAsTextFile() 方法接收一個路徑消玄,并將RDD 中的內(nèi)容都輸入到路徑對應(yīng)的文件中。Spark 將傳入的路徑作為目錄對待丢胚,會在那個目錄下輸出多個文件翩瓜。
//在Scala中將數(shù)據(jù)保存為文本文件
inputRDD.saveAsTextFile(outputFile)
2.2 JSON文件
- 讀取JSON 數(shù)據(jù)的最簡單的方式是將數(shù)據(jù)作為文本文件讀取,然后使用JSON 解析器來對RDD 中的值進行映射操作携龟。
- 也可以使用JSON 序列化庫來將數(shù)據(jù)轉(zhuǎn)為字符串兔跌,然后將其寫出去。
- 在Java 和Scala 中也可以使用一個自定義Hadoop 格式來操作JSON 數(shù)據(jù)峡蟋。
- 還可以使用Spark SQL 讀取JSON 數(shù)據(jù)坟桅。
測試json文件:
{"name":"王陽","age":32}
{"name":"李偉","age":22}
{"name":"劉涵","age":41}
{"name":"張麗","age":23}
{"name":"楊梅","age":34}
讀取JSON文件
使用Jackson來解析JSON文件
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
case class Person(name:String,age:Int)
implicit val formats = Serialization.formats(ShortTypeHints(List()))
val input = sc.textFile("/user/admin/mrwinter/chapter05/person.json")
val result = input.collect().map(x => parse(x).extract[Person])
//保存JSON文件用saveASTextFile(outputFile)即可
val save_json = sc.parallelize(result)
save_json.saveAsTextFile("/user/admin/mrwinter/chapter05/save_person.json")
2.3 逗號分隔值與制表符分隔值
逗號分隔值(CSV)文件每行都有固定數(shù)目的字段,字段間用逗號隔開(在制表符分隔值文件蕊蝗,即TSV 文件中用制表符隔開)仅乓。
2.3.1 讀取CSV
讀取CSV/TSV 數(shù)據(jù)和讀取JSON 數(shù)據(jù)相似,都需要先把文件當(dāng)作普通文本文件來讀取數(shù)據(jù)蓬戚,再對數(shù)據(jù)進行處理夸楣。
//CSV文件
holden,panda
hotholden,notpanda
spark,bear
//在Scala 中使用textFile() 讀取CSV
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile("/user/admin/mrwinter/chapter05/animal.csv")
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line))
reader.readNext()
}
result.collect().foreach(x => println(x(0), x(1)))
//在Java 中使用textFile() 讀取CSV
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
JavaRDD<String> csvFile1 = sc.textFile("/user/admin/mrwinter/chapter05/animal.csv");
JavaPairRDD<String[]> csvData = csvFile1.map(line ->
new CSVReader(new StringReader(line)).readNext();
);
如果在字段中嵌有換行符,就需要完整讀入每個文件,然后解析各段裕偿。如果每個文件都很大洞慎,讀取和解析的過程可能會很不幸地成為性能瓶頸。
2.3.2 保存CSV文件
saveAsTextFile(outputFile)