??spark支持多種數據源渤闷,從總體來分分為兩大部分:文件系統(tǒng)和數據庫。
文件系統(tǒng)
??文件系統(tǒng)主要有本地文件系統(tǒng)脖镀、Amazon S3飒箭、HDFS等。
??文件系統(tǒng)中存儲的文件有多種存儲格式蜒灰。spark支持的一些常見格式有:
格式名稱 | 結構化 | 說明 |
---|---|---|
文件文件 | 否 | 普通文件文件弦蹂,每行一條記錄 |
JSON | 半結構化 | 常見的基于文本的半結構化數據 |
CSV | 是 | 常見的基于文本的格式,在電子表格應用中使用 |
SequenceFiles | 是 | 一種用于鍵值對數據的常見Hadoop文件格式 |
文本文件
-
讀取
-
讀取單個文件卷员,參數為文件全路徑盈匾,輸入的每一行都會成為RDD的一個元素。
- python
input = sc.textFile("file://opt/module/spark/README.md")
- scala
val input = sc.textFile("file://opt/module/spark/README.md")
- java
JavaRDD<String> input = sc.textFile("file://opt/module/spark/README.md")
讀取多個文件時毕骡,可以使用textFile將參數改為目錄或以逗號文件的多個文件名即可削饵。如果是小文件,也可以使用wholeTextFiles讀取為一個Pair RDD(鍵是文件名未巫,值是文件內容)窿撬。
val input = sc.wholeTextFiles("file://opt/module/spark/datas") val result = input.mapValues{ y => { val nums = y.split(" ").map(x => x.toDouble) nums.sum / nums.size.toDouble } }
-
- 寫入
??輸出文本文件時,可使用saveAsTextFile()方法接收一個目錄叙凡,將RDD中的內容輸出到目錄中的多個文件中劈伴。
```
result.saveAsTextFile(outputFile)
```
JSON
-
讀取
- 將數據作為文本文件讀取,然后使用JSON解析器對數據進行解析握爷。
- python使用內置庫讀取JSON
import json ... input = sc.textFile("file.json") data = input.map(lambda x: json.loads(x))
- scala使用Jackson讀取JSON
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule ... case class Person(name: String, lovesPandas: Boolean) ... val input = sc.textFile("file.json") val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) val result = input.flatMap(record => { try { Some(mapper.readValue(record, classOf[Person])) } catch { case e: Exception => None } })
- java使用Jackson讀取JSON
class ParseJson implements FlatMapFunction<Iterator<String>, Person> { public Iterable<Person> call(Iterator<String> lines) throws Exception { ArrayList<Person> people = new ArrayList<Person>(); ObjectMapper mapper = new ObjectMapper(); while(lines.hasNext()) { String line = lines.next(); try { people.add(mapper.readValue(line, Person.class)); } catch(Exception e) { //跳過失敗的數據 } } return people; } } JavaRDD<String> input = sc.textFile("file.json"); JavaRDD<Person> result = input.mapPartitions(new ParseJson());
-
寫入
- 使用JSON解析器將結構化的RDD轉為字符串RDD跛璧,然后使用文本文件API輸出。
- python
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)
- scala
result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
- java
class WriteJson implements FlatMapFunction<Iterator<Person>, String> { public Iterable<String> call(Iterator<Person> people) throws Exception { ArrayList<String> text = new ArrayList<String>(); ObjectMapper mapper = new ObjectMapper(); while(people.hasNext()) { Person person = people.next(); text.add(mapper.writeValueAsString(person)); } return text; } } JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas()); JavaRDD<String> formatted = result.mapPartitions(new WriteJson()); formatted.saveAsTextFile(outfile);
CSV與TSV
??CSV與TSV文件每行都有固定的字段新啼,字段之間使用分隔符(CSV使用逗號追城;tsv使用制表符)分隔。
-
讀取
將csv或tsv文件當作普通文本文件讀取燥撞,然后使用響應的解析器進行解析座柱,同json處理方式迷帜。
-
python使用內置庫讀取csv
- 文件中所有字段沒有包含換行符
import csv import StringIO ... def loadRecord(line): input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name","favouriteAnimal"]) return reader.next() """讀取每行記錄""" input = sc.textFile(inputFile).map(loadRecord)
- 文件中的字段包含換行符
def loadRecords(fileNameContents): input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name","favoriteAnimal"]) return reader """讀取整個文件""" fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
-
scala使用opencsv庫讀取csv
- 文件中所有字段沒有包含換行符
import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... val input = sc.textFile(inputFile) val result = input.map{ line => { val reader = new CSVReader(new StringReader(line)) reader.readNext() } }
- 文件中的字段包含換行符
case class Person(name: String, favoriteAnimal: String) val input = sc.wholeTextFiles(inputFile) val result = input.flatMap( case(_, txt) => { val reader = new CSVReader(new StringReader(txt)) reader.readAll().map(x => Person(x(0), x(1))) }
-
java使用opencsv庫讀取csv
- 文件中所有字段沒有包含換行符
import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... public static class ParseLine implements Function<String, String[]> { public String[] call(String line) throws Exception { CSVReader reader = new CSVReader(new StringReader(line)); return reader.readNext(); } } JavaPairRDD<String[]> csvData = sc.textFile(inputFile).map(new ParseLine());
- 文件中的字段包含換行符
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> { public Iterable<String[]> call(Tuple2<String, String> file) throws Exception { CSVReader reader = new CSVReader(new StringReader(file._2); return reader.readAll(); } } JavaRDD<String[]> keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine());
-
寫入
- csv或tsv文件輸出時,將個字段轉為指定順序的數組色洞,然后采用普通文本文件的方式進行輸出戏锹。
- python
def writeRecords(records): output = StringIO.StringIO() writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) for record in records: writer.writerow(record) return [output.getValue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
- scala
pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{ people => { val stringWriter = new StringWriter() val csvWriter = new CSVWriter(stringWriter) csvWriter.writeAll(people.toList) Iterator(stringWriter.toString) } }.saveAsTextFile(outFile)
SequenceFile
??SequenceFile是鍵值對形式的常用Hadoop數據格式。由于Hadoop使用一套自定義的序列化框架火诸,因此SequenceFile的鍵值對類型需實現(xiàn)Hadoop的Writable接口锦针。
-
讀取
- python
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
- scala
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}
- java
public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> { public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) { return new Tuple2(record._1.toString(), record._2.get()); } } JavaPairRDD<String, Integer> result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes());
-
寫入
- python
data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)]) data.saveAsSequeceFile(outputFile)
- scala
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2))) data.saveAsSequenceFile(outputFile)
- java(java中沒有saveAsSequenceFile方法,用自定義hadoop格式的方式實現(xiàn))
public static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> { public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) { return new Tuple2(new Text(record._1), new IntWritable(record._2)); } } JavaPairRDD<Text, IntWritable> result = sc.parallelizePairs(input).mapToPair(new ConvertToNativeTypes()); result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
數據庫
??數據庫主要分為關系型數據庫(MySQL惭蹂、PostgreSQL等)和非關系型數據庫(HBase伞插、ElasticSearch等)。
JDBC數據庫連接
??spark使用JDBC訪問關系型數據庫(MySQL盾碗、PostgreSQL等),只需要構建一個org.apache.spark.rdd.JdbcRDD即可舀瓢。
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc, createConnection,
"SELECT * FROM panda WHERE id >= ? AND id <= ?"),
lowerBound = 1, upperBound = 3,
numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
HBase
??spark通過Hadoop輸入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)訪問HBase廷雅。這種格式返回鍵值對數據,鍵類型為org.apache.hadoop.hbase.io.ImmutableBytesWritable京髓,值類型為org.apache.hadoop.hbase.client.Result航缀。
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], ClassOf[Result])
ElasticSearch
??spark使用ElasticSearch-Hadoop連接器從ElasticSearch中讀寫數據。ElasticSearch連接器依賴于SparkContext設置的配置項堰怨。ElasticSearch連接器也沒有用到Spark封裝的類型芥玉,而使用saveAsHadoopDataSet。
- 讀取
def mapWritableToInput(in: MapWritable): Map[String, String] = {
in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args[1])
jobConf.set(ConfigurationOptions.ES_NODES, args[2])
val currentTweets = sc.hadoopRDD(jobConf, classOf[EsInputFormat[Object, MapWritable]], classOf[Object], ClassOf[MapWritable])
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
- 寫入
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)