數(shù)據(jù)源類型
- 文件系統(tǒng)中的不同文件格式數(shù)據(jù)源:支持文件系統(tǒng)包括NFS,HDFS,Amazon S3,支持的文件格式包括有:文本文件历葛,JSON恤溶,SequenceFile咒程,protocal buffer
- Spark SQL中的結(jié)構(gòu)化數(shù)據(jù)源: 包括JSON 和 Apache Hive 在內(nèi)的結(jié)構(gòu)化數(shù)據(jù)源
- 數(shù)據(jù)庫與鍵值存儲:可以用來連接 Cassandra稠集、 HBase、Elasticsearch 以及 JDBC 源
文件格式
-
文本文件:當(dāng)我們將一個文本文件讀取為 RDD 時痹籍,輸入的每一行都會成為 RDD 的一個元素蹲缠。也可以將多個完整的文本文件一次性讀取為一個 pair RDD线定,其中鍵是文件名斤讥,值是文件內(nèi)容芭商。
- textFile():返回一個RDD蓉坎,其中文件中的每一行都是一個元素
- wholeTextFile():返回一個 pair RDD蛉艾,其中鍵是輸入文件的文件名勿侯。在每個文件表示一個特定時間段內(nèi)的數(shù)據(jù)時非常有用缴罗。
- saveAsTextFile():將數(shù)據(jù)保存為本地文件:保存之后會生成一個文件夾文件的格式如下所示
averg.saveAsTextFile("averg.out")
其生成的文件內(nèi)容如下:
eversilver@debian:/usr/local/spark2.1/mytest$ ls averg.out/
part-00000 _SUCCESS
eversilver@debian:/usr/local/spark2.1/mytest$ cat averg.out/part-00000
('panda', 2.5)
('coffee', 1.5)
('jxiaolun', 2.0)
('wangcheng', 6.0)
JSON
讀取 JSON 數(shù)據(jù)的最簡單的方式是將數(shù)據(jù)作為文本文件讀取兵钮, 然后使用 JSON 解析器來對 RDD 中的值進(jìn)行映射操作掘譬。
讀取JSON:這種方法假設(shè)文件中的每一行都是一條 JSON 記錄呻拌。如果你有跨行的JSON 數(shù)據(jù), 你就只能讀入整個文件垃喊,然后對每個文件進(jìn)行解析本谜。
#python讀取json數(shù)據(jù)方式
import json
data = input.map(lambda x: json.loads(x))
保存JSON:寫出 JSON 文件比讀取它要簡單得多耕突,因為不需要考慮格式錯誤的數(shù)據(jù)评架,并且也知道要寫出的數(shù)據(jù)的類型。 可以使用之前將字符串 RDD 轉(zhuǎn)為解析好的 JSON 數(shù)據(jù)的庫培遵,將由結(jié)構(gòu)化數(shù)據(jù)組成的 RDD 轉(zhuǎn)為字符串 RDD籽腕,然后使用 Spark 的文本文件 API 寫出去皇耗。
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile))
CSV文件
逗號分隔值(CSV)文件每行都有固定數(shù)目的字段,字段間用逗號隔開万伤。記錄通常是一行一條
讀取CSV:對于python可以使用自帶的庫敌买,不需要導(dǎo)入任何包虹钮。在 Scala 和 Java中則使用 opencsv 庫(http://opencsv.sourceforge.net/)
一個CSV文件的例子如下所示:
eversilver@debian:/usr/local/spark2.1/mytest$ cat persons.csv
wangcheng,24,basketball
jxiaolun,22,dance
tom,22,sell
對其讀取的例子如下所示:
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
import io
import csv
def loadRecords(fileNameContents):
"""讀取文件中的所有記錄"""
input = io.StringIO(fileNameContents[1])
reader = csv.DictReader(input, fieldnames=["name","age","hobby"])
return reader
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
fullFileData = sc.wholeTextFiles("persons.csv").flatMap(loadRecords)
print(fullFileData.first())
執(zhí)行的結(jié)果如下所示:
eversilver@debian:/usr/local/spark2.1/mytest$ ../bin/spark-submit useCSV.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark2.1/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/05/04 15:32:32 WARN Utils: Your hostname, debian resolves to a loopback address: 127.0.1.1; using 192.168.142.133 instead (on interface eth0)
17/05/04 15:32:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/05/04 15:32:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
{'age': '24', 'name': 'wangcheng', 'hobby': 'basketball'}
保存csv:和 JSON 數(shù)據(jù)一樣,寫出 CSV數(shù)據(jù)相當(dāng)簡單宅倒,同樣可以通過重用輸出編碼器來加速拐迁。 CSV 中我們不會在每條記錄中輸出字段名线召,為了使輸出保持一致缓淹,需要創(chuàng)建一種映射關(guān)系。 一種簡單做法是寫一個函數(shù)料仗,用于將各字段轉(zhuǎn)為指定順序的數(shù)組立轧。在Python 中氛改, 如果輸出字典胜卤, CSV 輸出器會根據(jù)創(chuàng)建輸出器時給定的 fieldnames 的順序幫我們完成這一行為葛躏,下面是利用上面讀取csv的結(jié)果進(jìn)行保存:
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
import io
import csv
def loadRecords(fileNameContents):
"""讀取文件中的所有記錄"""
input = io.StringIO(fileNameContents[1])
reader = csv.DictReader(input, fieldnames=["name","age","hobby"])
return reader
def writeRecords(records):
"""寫出csv記錄"""
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=["name", "age", "hobby"])
for record in records:
writer.writerow(record)
return [output.getvalue()]#getValue返回StringIO中的所有數(shù)據(jù)
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
fullFileData = sc.wholeTextFiles("persons.csv").flatMap(loadRecords) # 讀取csv文件中的數(shù)據(jù)
fullFileData.mapPartitions(writeRecords).saveAsTextFile("persons.out.csv")
查看輸出文件為:
eversilver@debian:/usr/local/spark2.1/mytest$ cat ./persons.out.csv/part-00000
wangcheng,24,basketball
jxiaolun,22,dance
tom,22,sell
SequenceFile
SequenceFile 是由沒有相對關(guān)系結(jié)構(gòu)的鍵值對文件組成的常用 Hadoop 格式。 SequenceFile文件有同步標(biāo)記芒率, Spark 可以用它來定位到文件中的某個點篙顺,然后再與記錄的邊界對齊德玫。這可以讓 Spark 使用多個節(jié)點高效地并行讀取 SequenceFile 文件宰僧。 SequenceFile 也是Hadoop MapReduce 作業(yè)中常用的輸入輸出格式。(即經(jīng)常見到的Writeable的鍵值對文件)嘁捷。
讀取SequenceFile:調(diào)用sequenceFile(path,keyClass, valueClass, minPartitions)
。前面提到過缓升, SequenceFile 使用 Writable 類蕴轨,因此 keyClass 和 valueClass 參數(shù)都必須使用正確的 Writable 類尺棋。下面使用python來讀取SequenceFile:
val data = sc.sequenceFile(inFile,
"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
保存SequenceFile:因為 SequenceFile 存儲的是鍵值對膘螟,所以需要創(chuàng)建一個由可以寫出到 SequenceFile 的類型構(gòu)成的 PairRDD荆残。如果你要寫出的是 Scala 的原生類型内斯,可以直接調(diào)用saveSequenceFile(path) 保存你的 PairRDD俘闯,它會幫你寫出數(shù)據(jù)。 如果鍵和值不能自動轉(zhuǎn)為 Writable 類型遮婶,或者想使用變長類型(比如VIntWritable)旗扑,就可以對數(shù)據(jù)進(jìn)行映射操作臀防,在保存之前進(jìn)行類型轉(zhuǎn)換琼锋。下面是Scala的例子:
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
對象文件
對象文件是對 SequenceFile 的簡單封裝缕坎,它允許存儲只包含值的 RDD谜叹。和SequenceFile 不一樣的是荷腊,對象文件是使用 Java 序列化寫出的。
Hadoop輸入輸出文件格式Spark也可以與任何 Hadoop 支持的格式交互疾忍。 Spark 支持新舊兩套Hadoop 文件 API一罩。
讀取其他Hadoop輸入格式:newAPIHadoopFile接收一個路徑以及三個類聂渊。第一個類是“格式”類,代表輸入格式饼暑。相似的函數(shù)hadoopFile() 則用于使用舊的 API 實現(xiàn)的 Hadoop 輸入格式撵孤。第二個類是鍵的類竭望,最后一個類是值的類邪码。如果需要設(shè)定額外的 Hadoop 配置屬性,也可以傳入一個 conf 對象咬清。例如下面這樣讀取KeyValueTextInputFormat (最簡單的 Hadoop 輸入格式):
//這里使用的是老式的API
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{
case (x, y) => (x.toString, y.toString)
}
下面使用了 Spark 中使用新式Hadoop API闭专,一個使用 Lzo JsonInputFormat 讀取 LZO 算法壓縮的 JSON 數(shù)據(jù)的例子
//scala
val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
// "輸入"中的每個MapWritable代表一個JSON對象
讀取其他Hadoop輸入格式: Java API 中沒有易用的保存 pair RDD 的函數(shù)奴潘。我們就把這種情況作為展示如何使用舊式 Hadoop 格式的 API 的例子。下面是如何在Java中保存SequenceFile:
public static class ConvertToWritableTypes implements
PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
SequenceFileOutputFormat.class);
非文件系統(tǒng)數(shù)據(jù)源:除 了 hadoopFile() 和 saveAsHadoopFile() 這 一 大 類 函 數(shù)影钉, 還 可 以 使 用 hadoopDataset/saveAsHadoopDataSet 和newAPIHadoopDataset/saveAsNewAPIHadoopDataset 來訪問 Hadoop 所支持的非文件系統(tǒng)的存儲格式画髓。hadoopDataset() 這一組函數(shù)只接收一個 Configuration 對象,這個對象用來設(shè)置訪問數(shù)據(jù)源所必需的 Hadoop 屬性平委。你要使用與配置 Hadoop MapReduce 作業(yè)相同的方式來配置這個對象。所以你應(yīng)當(dāng)按照在 MapReduce 中訪問這些數(shù)據(jù)源的使用說明來配置碉纳,并把配置對象傳給 Spark琅摩。
讀取Protocol Buffer:
PB 由可選字段、必需字段鳖擒、重復(fù)字段三種字段組成。在解析時,可選字段的缺失不會導(dǎo)致解析失敗,而必需字段的缺失則會導(dǎo)致數(shù)據(jù)解析失敗寺酪。因此陨献,在往 PB 定義中添加新字段時,最好將新字段設(shè)為可選字段,因為不是所有人都會同時更新到新版本。支持許多預(yù)定義類型, 或者另一個 PB 消息。這些類型包括 string喜鼓、 int32隅忿、 enum等。(https://developers.google.com/protocol-buffers有其資料)
下面為從一個簡單的 PB 格式中讀取許多 VenueResponse 對象。 VenueResponse
是只包含一個重復(fù)字段的簡單格式,這個字段包含一條帶有必需字段、可選字段以及枚舉類型字段的 PB 消息。
//PB的定義如下
message Venue {
required int32 id = 1;
required string name = 2;
required VenueType type = 3;
optional string address = 4;
enum VenueType {
COFFEESHOP = 0;
WORKPLACE = 1;
CLUB = 2;
OMNOMNOM = 3;
OTHER = 4;
}
}
message VenueResponse {
repeated Venue results = 1;
}
//在 Scala 中使用 Elephant Bird 寫出 protocol buffer
val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{ pb =>
val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
protoWritable.set(pb)
(null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
classOf[ProtobufWritable[Places.Venue]],
classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
文件壓縮
在大數(shù)據(jù)工作中艳吠,我們經(jīng)常需要對數(shù)據(jù)進(jìn)行壓縮以節(jié)省存儲空間和網(wǎng)絡(luò)傳輸開銷栏渺。對于大多數(shù) Hadoop 輸出格式來說霎终,我們可以指定一種壓縮編解碼器來壓縮數(shù)據(jù)。我們已經(jīng)提過,Spark 原生的輸入方式(textFile 和 sequenceFile)可以自動處理一些類型的壓縮。在讀取壓縮后的數(shù)據(jù)時智末,一些壓縮編解碼器可以推測壓縮類型,下面是不同的壓縮選項帶來的影響:
文件系統(tǒng):
本地文件系統(tǒng):Spark 支持從本地文件系統(tǒng)中讀取文件裳擎,不過它要求文件在集群中所有節(jié)點的相同路徑下
都可以找到抢野。
val rdd = sc.textFile("file:///home/holden/happypandas.gz")
Amazon S3:用 Amazon S3 來存儲大量數(shù)據(jù)正日益流行黎做。當(dāng)計算節(jié)點部署在 Amazon EC2 上的時候,使用 S3 作為存儲尤其快,但是在需要通過公網(wǎng)訪問數(shù)據(jù)時性能會差很多.在 Spark 中訪問 S3 數(shù)據(jù)霞玄,你應(yīng)該首先把你的 S3 訪問憑據(jù)設(shè)置為 AWS_ACCESS_KEY_ID 和AWS_SECRET_ACCESS_KEY 環(huán)境變量咨跌。你可以從 Amazon Web Service 控制臺創(chuàng)建這些憑據(jù)州胳。接下來,將一個以 s3n:// 開頭的路徑以 s3n://bucket/path-within-bucket 的形式傳給Spark 的輸入方法角塑。 和其他所有文件系統(tǒng)一樣欺劳, Spark 也能在 S3 路徑中支持通字符插爹,例如 s3n://bucket/myFiles/*.txt超全。如果你從 Amazon 那里得到 S3 訪問權(quán)限錯誤倘零,請確保你指定了訪問密鑰的賬號對數(shù)據(jù)桶有“read”(讀)和“l(fā)ist”(列表)的權(quán)限。 Spark 需要列出桶內(nèi)的內(nèi)容砌庄,來找到想要讀取的數(shù)據(jù)。
HDFS :在 Spark 中使用 HDFS 只需要將輸入輸出路徑指定為hdfs://master:port/path 就夠了撼玄。注意:HDFS 協(xié)議隨 Hadoop 版本改變而變化斋配,因此如果你使用的 Spark 是依賴于另一個版本的 Hadoop 編譯的狮崩,那么讀取會失敗蘸炸。默認(rèn)情況下铃岔, Spark 基于Hadoop 1.0.4 編譯 7。如果從源代碼編譯丹弱,你可以在環(huán)境變量中指定 SPARK_HADOOP_VERSION= 來基于另一個版本的 Hadoop 進(jìn)行編譯嘿棘;也可以直接下載預(yù)編譯好的 Spark 版本。你可以根據(jù)運行 hadoop version 的結(jié)果來獲得環(huán)境變量要設(shè)置的值汹胃。
Spark SQL結(jié)構(gòu)化數(shù)據(jù)(非數(shù)據(jù)庫)
結(jié)構(gòu)化數(shù)據(jù):結(jié)構(gòu)化數(shù)據(jù)指的是有結(jié)構(gòu)信息的數(shù)據(jù)——也就是所有的數(shù)
據(jù)記錄都具有一致字段結(jié)構(gòu)的集合胯努。在 Java 和 Scala 中, Row 對象的訪問是基于下標(biāo)的坯苹。每個 Row 都有一個get() 方法隆檀,會返回一個一般類型讓我們可以進(jìn)行類型轉(zhuǎn)換。另外還有針對常見基本類型的 專 用 get() 方 法( 例 如 getFloat()、 getInt()恐仑、 getLong()泉坐、 getString()、 getShort()裳仆、getBoolean() 等)腕让。在 Python 中,可以使用 row[column_number] 以及 row.column_name 來訪問元素歧斟。
Hive:Apache Hive 是 Hadoop 上的一種常見的結(jié)構(gòu)化數(shù)據(jù)源纯丸。可以在 HDFS 內(nèi)或者在其他存儲系統(tǒng)上存儲多種格式的表静袖。 這些格式從普通文本到列式存儲格式觉鼻,應(yīng)有盡有。 SparkSQL 可以讀取 Hive 支持的任何表队橙。
JSON:要讀取 JSON 數(shù)據(jù)坠陈,首先需要和使用 Hive 一樣創(chuàng)建一個HiveContext。然后使用 HiveContext.jsonFile 方法來從整個文件中獲取由 Row 對象組成的RDD捐康。除了使用整個 Row 對象仇矾,你也可以將 RDD注冊為一張表,然后從中選出特定的字段吹由。例如若未,假設(shè)有一個包含推文的 JSON 文件,格式如例 5-33 所示倾鲫,每行一條記錄粗合。
數(shù)據(jù)庫
Java數(shù)據(jù)庫連接:Spark 可 以 從 任 何 支 持 Java 數(shù) 據(jù) 庫 連 接(JDBC) 的 關(guān) 系 型 數(shù) 據(jù) 庫 中 讀 取 數(shù) 據(jù), 包括 MySQL乌昔、 Postgre 等系統(tǒng)隙疚。要訪問這些數(shù)據(jù),需要構(gòu)建一個 org.apache.spark.rdd.JdbcRDD磕道,將 SparkContext 和其他參數(shù)一起傳給它供屉。
? 首先,要提供一個用于對數(shù)據(jù)庫創(chuàng)建連接的函數(shù)溺蕉。這個函數(shù)讓每個節(jié)點在連接必要的配
置后創(chuàng)建自己讀取數(shù)據(jù)的連接伶丐。
? 接下來,要提供一個可以讀取一定范圍內(nèi)數(shù)據(jù)的查詢疯特,以及查詢參數(shù)中 lowerBound 和
upperBound 的值哗魂。這些參數(shù)可以讓 Spark 在不同機(jī)器上查詢不同范圍的數(shù)據(jù),這樣就不
會因嘗試在一個節(jié)點上讀取所有數(shù)據(jù)而遭遇性能瓶頸漓雅。 8
? 這個函數(shù)的最后一個參數(shù)是一個可以將輸出結(jié)果從 java.sql.ResultSet(http://docs.
oracle.com/javase/7/docs/api/java/sql/ResultSet.html)轉(zhuǎn)為對操作數(shù)據(jù)有用的格式的函數(shù)录别。
在例 5-37 中朽色, 我們會得到 (Int, String) 對。如果這個參數(shù)空缺组题, Spark 會自動將每行
結(jié)果轉(zhuǎn)為一個對象數(shù)組葫男。
//下面是一個Scala的例子
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc,
createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
Cassandra:Spark 的 Cassandra 連接器目前只能在 Java 和 Scala 中使用。
//配置Maven依賴
<dependency> <!-- Cassandra -->
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector</artifactId>
<version>1.0.0-rc5</version>
</dependency>
<dependency> <!-- Cassandra -->
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java</artifactId>
<version>1.0.0-rc5</version>
</dependency>
//配置Cassandra屬性
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "hostname")
val sc = new SparkContext(conf)
//在 Scala 中將整張鍵值對表讀取為 RDD
// 為SparkContext和RDD提供附加函數(shù)的隱式轉(zhuǎn)換
import com.datastax.spark.connector._
// 將整張表讀為一個RDD崔列。假設(shè)你的表test的創(chuàng)建語句為
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
val data = sc.cassandraTable("test" , "kv")
// 打印出value字段的一些基本統(tǒng)計梢褐。
data.map(row => row.getInt("value")).stats()
HBase: Spark 可 以 通 過Hadoop 輸入格式訪問 HBase。 這個輸入格式會返回鍵值對數(shù)據(jù)赵讯,其中鍵的類型為 org.apache.hadoop.hbase.io.ImmutableBytesWritable利职,而值的類型為org.apache.hadoop.hbase.client.Result。 Result 類包含多種根據(jù)列獲取值的方法瘦癌,在其 API 文檔(https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html)中有所描述。
//從 HBase 讀取數(shù)據(jù)的 Scala 示例
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 掃描哪張表
val rdd = sc.newAPIHadoopRDD(
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])
TableInputFormat 包含多個可以用來優(yōu)化對 HBase 的讀取的設(shè)置項跷敬,比如將掃描限制到
一部分列中讯私, 以及限制掃描的時間范圍。你可以在 TableInputFormat 的 API 文檔(http://
hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html)中找到
這些選項西傀,并在 HBaseConfiguration 中設(shè)置它們斤寇,然后再把它傳給 Spark。
Elasticsearch:Elasticsearch 是一個開源的拥褂、基于 Lucene 的搜索系統(tǒng)娘锁。