轉(zhuǎn)載請(qǐng)注明出處霜医,謝謝合作~
該篇中的示例暫時(shí)只有 Scala 版本~
數(shù)據(jù)源
Spark SQL 支持通過(guò) DataFrame 接口操作多種數(shù)據(jù)源枷踏。一個(gè) DataFrame 可以通過(guò)關(guān)系型轉(zhuǎn)換算子操作繁仁,還可以用來(lái)創(chuàng)建一個(gè)臨時(shí)視圖。將 DataFrame 注冊(cè)成一個(gè)臨時(shí)視圖之后就可以通過(guò) SQL 語(yǔ)句進(jìn)行查詢桶雀。本章節(jié)介紹了使用 Spark 數(shù)據(jù)源加載和存儲(chǔ)數(shù)據(jù)的常用方法矿酵,之后給出內(nèi)置數(shù)據(jù)源需要的不同的參數(shù)。
- 常用讀寫(xiě)函數(shù)(Generic Load/Save Functions)
- 手動(dòng)指定參數(shù)(Manually Specifying Options)
- 直接通過(guò) SQL 操作文件(Run SQL on files directly)
- 存儲(chǔ)模式(Save Modes)
- 持久化數(shù)據(jù)表(Saving to Persistent Tables)
- 分桶矗积,排序和分區(qū)(Bucketing, Sorting and Partitioning)
- 常用的文件數(shù)據(jù)源選項(xiàng)(Generic File Source Options)
- 忽略損壞的文件(Ignore Corrupt Files)
- 忽略丟失的文件(Ignore Missing Files)
- 全局文件過(guò)濾器(Path Global Filter)
- 遞歸文件搜索(Recursive File Lookup)
- Parquet 文件(Parquet Files)
- 通過(guò)程序加載數(shù)據(jù)(Loading Data Programmatically)
- 分區(qū)發(fā)現(xiàn)(Partition Discovery)
- 模式融合(Schema Merging)
- Hive 元數(shù)據(jù) Parquet 表轉(zhuǎn)換(Hive metastore Parquet table conversion)
- 配置項(xiàng)(Configuration)
- ORC 文件(ORC Files)
- JSON 文件(JSON Files)
- Hive 表(Hive Tables)
- 指定 Hive 表的存儲(chǔ)格式(Specifying storage format for Hive tables)
- 與不同版本的 Hive Metastore 交互(Interacting with Different Versions of Hive Metastore)
- JDBC 連接其他數(shù)據(jù)庫(kù)(JDBC To Other Databases)
- Avro 文件(Avro Files)
- 部署(Deploying)
- 讀寫(xiě)函數(shù)(Load and Save Functions)
- to_avro 和 from_avro(to_avro() and from_avro())
- 數(shù)據(jù)源選項(xiàng)(Data Source Option)
- 配置項(xiàng)(Configuration)
- 與 Databricks spark-avro 的兼容(Compatibility with Databricks spark-avro)
- 支持的數(shù)據(jù)類型 Avro -> Spark SQL(Supported types for Avro -> Spark SQL conversion)
- 支持的數(shù)據(jù)類型 Spark SQL -> Avro(Supported types for Spark SQL -> Avro conversion)
- 二進(jìn)制文件(Whole Binary Files)
- 疑難解答(Troubleshooting)
常用讀寫(xiě)函數(shù)
最簡(jiǎn)單的情況全肮,默認(rèn)的數(shù)據(jù)源(parquet
除非配置了參數(shù) spark.sql.sources.default
)將會(huì)被使用。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」棘捣。
手動(dòng)指定參數(shù)
可以通過(guò)手動(dòng)指定數(shù)據(jù)源參數(shù)和相關(guān)的額外選項(xiàng)來(lái)創(chuàng)建 DataFrame辜腺。數(shù)據(jù)源通過(guò)全限定類名來(lái)指定(比如 org.apache.spark.sql.parquet
),但是對(duì)于內(nèi)置的數(shù)據(jù)源可以使用簡(jiǎn)稱(json
, parquet
, jdbc
, orc
, libsvm
, csv
, text
)。從任意數(shù)據(jù)源類型中加載的數(shù)據(jù)可以輕松的轉(zhuǎn)換成其他支持的格式评疗。
可以通過(guò) API 文檔了解內(nèi)置數(shù)據(jù)源的所有選項(xiàng)测砂,例如 org.apache.spark.sql.DataFrameReader
和 org.apache.spark.sql.DataFrameWriter
。文檔中的選項(xiàng)對(duì)于非 Scala 語(yǔ)言的 Spark API(比如 PySpark)也適用百匆。對(duì)于其他的格式砌些,參見(jiàn)相應(yīng)的 API 文檔。
加載 JSON 文件:
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」加匈。
加載 CSV 文件:
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」存璃。
額外的選項(xiàng)對(duì)于寫(xiě)出操作也適用。例如雕拼,對(duì)于 ORC 文件可以通過(guò)選項(xiàng)控制布隆過(guò)濾器和字典編碼纵东。下面的 ORC 樣例將會(huì)創(chuàng)建布隆過(guò)濾器并對(duì) favorite_color
列使用字典編碼。對(duì)于 Parquet 文件啥寇,也有一個(gè) parquet.enable.dictionary
的選項(xiàng)偎球。有關(guān) ORC/Parquet 數(shù)據(jù)格式選項(xiàng)的更多詳情請(qǐng)移步 Apache ORC/Parquet 官方網(wǎng)站。
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」辑甜。
直接通過(guò) SQL 操作文件
除了使用 read API 加載文件成為一個(gè) DataFrame衰絮,還可以通過(guò) SQL 直接查詢。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」栈戳。
存儲(chǔ)模式
存儲(chǔ)操作有一個(gè)可選的 SaveMode
類型的參數(shù)岂傲,該參數(shù)指定如何處理存儲(chǔ)路徑中已經(jīng)存在的數(shù)據(jù)难裆。需要注意的是存儲(chǔ)模式并不加鎖子檀,也不是原子性的。此外乃戈,當(dāng)存儲(chǔ)模式設(shè)置為 Overwrite
時(shí)褂痰,在寫(xiě)入新數(shù)據(jù)之前會(huì)刪除舊數(shù)據(jù)。
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" or "errorifexists" (default) |
如果輸出路徑中的數(shù)據(jù)已存在症虑,會(huì)拋出異常缩歪。 |
SaveMode.Append |
"append" |
如果輸出路徑中的數(shù)據(jù)已存在,寫(xiě)出的數(shù)據(jù)會(huì)被追加到現(xiàn)有數(shù)據(jù)之后谍憔。 |
SaveMode.Overwrite |
"overwrite" |
如果輸出路徑中的數(shù)據(jù)已存在匪蝙,會(huì)被新寫(xiě)出的數(shù)據(jù)替換。 |
SaveMode.Ignore |
"ignore" |
如果輸出路徑中的數(shù)據(jù)已存在习贫,新寫(xiě)出的數(shù)據(jù)不會(huì)被存儲(chǔ)逛球,現(xiàn)有的數(shù)據(jù)也不會(huì)被更新。類似于 SQL 中的 CREATE TABLE IF NOT EXISTS 苫昌。 |
持久化數(shù)據(jù)表
DataFrames
還可以通過(guò) saveAsTable
方法存儲(chǔ)到 Hive 中颤绕。注意,并不需要事先安裝好 Hive,Spark 會(huì)通過(guò) Derby 創(chuàng)建一個(gè)默認(rèn)的本地 Hive 元數(shù)據(jù)庫(kù)奥务。跟 createOrReplaceTempView
方法不同的是物独,saveAsTable
方法會(huì)將 DataFrame 中的數(shù)據(jù)落盤(pán)并在 Hive 元數(shù)據(jù)庫(kù)中創(chuàng)建一條描述存儲(chǔ)信息的記錄。只要能夠訪問(wèn)元數(shù)據(jù)庫(kù)氯葬,持久化后的數(shù)據(jù)表在 Spark 應(yīng)用程序重啟之后依舊可以訪問(wèn)挡篓。在持久化數(shù)據(jù)表時(shí)可以通過(guò)調(diào)用 SparkSession
的 table
方法指定表名稱。
基于文件的數(shù)據(jù)源帚称,比如 text瞻凤,parquet,json 等等世杀,可以通過(guò) path
選項(xiàng)指定存儲(chǔ)路徑阀参,例如 df.write.option("path", "/some/path").saveAsTable("t")
。當(dāng)數(shù)據(jù)表被刪除后瞻坝,自定義存儲(chǔ)路徑中的數(shù)據(jù)不會(huì)被刪除蛛壳。如果不指定存儲(chǔ)路徑,Spark 會(huì)將數(shù)據(jù)寫(xiě)出到 warehouse 文件夾所刀,這種情況下如果數(shù)據(jù)表被刪除衙荐,數(shù)據(jù)也會(huì)被刪除。
從 Spark 2.1 開(kāi)始浮创,持久化的數(shù)據(jù)表的每個(gè)分區(qū)都會(huì)在 Hive 元數(shù)據(jù)庫(kù)中有相關(guān)記錄忧吟,這種方式帶來(lái)一些優(yōu)化:
- 對(duì)于查詢?cè)獢?shù)據(jù)庫(kù)可以只返回所需要分區(qū)的信息,在第一次查詢時(shí)不再需要加載所有的分區(qū)斩披。
- Datasource API 支持了像
ALTER TABLE PARTITION ... SET LOCATION
這樣的 Hive DDL 語(yǔ)句溜族。
注意,在創(chuàng)建外部數(shù)據(jù)表(自定義了 path
選項(xiàng))時(shí)垦沉,默認(rèn)情況下分區(qū)信息并沒(méi)有被收集煌抒,同步元數(shù)據(jù)庫(kù)中的分區(qū)信息需要執(zhí)行 MSCK REPAIR TABLE
語(yǔ)句。
分桶厕倍,排序和分區(qū)
對(duì)于基于文件的數(shù)據(jù)源寡壮,還可以對(duì)輸出進(jìn)行分桶凹联,排序和分區(qū)采桃。分桶和排序只適用于通過(guò) saveAsTable
持久化數(shù)據(jù)表:
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。
但是分區(qū)操作對(duì)于 save
和 saveAsTable
都適用:
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」属瓣。
可以同時(shí)在一張表中適用分桶和分區(qū):
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」组民。
就像 Partition Discovery 章節(jié)描述的那樣棒仍,partitionBy
操作會(huì)創(chuàng)建一個(gè)目錄結(jié)構(gòu)。所以邪乍,以很大基數(shù)的字段分區(qū)不會(huì)是一個(gè)明智的選擇降狠。相反对竣,bucketBy
操作將數(shù)據(jù)按照固定數(shù)量分桶,適用于數(shù)據(jù)技基數(shù)很大的場(chǎng)景榜配。
常用的文件數(shù)據(jù)源選項(xiàng)
這些常用的選項(xiàng)只在適用文件類型的數(shù)據(jù)源時(shí)有效:parquet, orc, avro, json, csv, text否纬。
注意在示例中使用的目錄結(jié)構(gòu)如下:
dir1/
├── dir2/
│ └── file2.parquet (schema: <file: string>, content: "file2.parquet")
└── file1.parquet (schema: <file, string>, content: "file1.parquet")
└── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")
忽略損壞的文件
在從文件讀取數(shù)據(jù)的過(guò)程中可以通過(guò)設(shè)置參數(shù) spark.sql.files.ignoreCorruptFiles
忽略損壞的文件,當(dāng)設(shè)置為 true 時(shí)蛋褥,Spark 作業(yè)在遇到損壞的文件的時(shí)候會(huì)繼續(xù)運(yùn)行临燃,已經(jīng)被讀取的部分依舊有效。
// enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」烙心。
忽略丟失的文件
在從文件讀取數(shù)據(jù)的過(guò)程中可以通過(guò)設(shè)置參數(shù) spark.sql.files.ignoreMissingFiles
忽略丟失的文件膜廊。在這里丟失的文件是指在創(chuàng)建好 DataFrame 之后刪除了的文件。當(dāng)設(shè)置為 true 時(shí)淫茵,Spark 作業(yè)在遇到丟失的文件的時(shí)候會(huì)繼續(xù)運(yùn)行爪瓜,已經(jīng)被讀取的部分依舊有效。
全局文件過(guò)濾器
選項(xiàng) pathGlobFilter
用來(lái)只讀取匹配目標(biāo)模式的文件匙瘪,語(yǔ)法與 org.apache.hadoop.fs.GlobFilter
相同铆铆,過(guò)濾操作不會(huì)改變分區(qū)發(fā)現(xiàn)的行為。
val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」丹喻。
遞歸文件搜索
選項(xiàng) recursiveFileLookup
用來(lái)遞歸查找目標(biāo)路徑下的匹配文件薄货,但是會(huì)關(guān)閉分區(qū)推斷機(jī)制,默認(rèn)值為 false
碍论。如果在選項(xiàng) recursiveFileLookup
為 true 時(shí)顯示指定了 partitionSpec
選項(xiàng)谅猾,會(huì)拋出異常。
val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」鳍悠。
Parquet 文件
Parquet 是一種被許多數(shù)據(jù)處理系統(tǒng)支持的列式存儲(chǔ)格式税娜。Spark SQL 提供了對(duì) Parquet 文件讀寫(xiě)的支持,文件中會(huì)保存原始數(shù)據(jù)的模式贼涩。當(dāng)讀取 Parquet 文件時(shí)巧涧,處于兼容性的考慮所有的列被自動(dòng)轉(zhuǎn)換為 nullable 類型。
通過(guò)程序加載數(shù)據(jù)
采用上述示例中的數(shù)據(jù):
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」遥倦。
分區(qū)發(fā)現(xiàn)
數(shù)據(jù)分區(qū)是一種常見(jiàn)的優(yōu)化方式,比如在 Hive 中就是如此占锯。對(duì)于分區(qū)表袒哥,數(shù)據(jù)通常存儲(chǔ)在不同的目錄,分區(qū)鍵的值會(huì)被編碼到分區(qū)目錄的名稱中消略。所有內(nèi)置的基于文件的數(shù)據(jù)源(Text/CSV/JSON/ORC/Parquet)都能夠自動(dòng)的發(fā)現(xiàn)和推斷分區(qū)信息堡称。例如,可以將之前用到的人口數(shù)據(jù)采用下面的目錄結(jié)構(gòu)存儲(chǔ)到分區(qū)表中艺演,分區(qū)鍵是 gender
和 country
兩列:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
將目錄 path/to/table
傳遞給 SparkSession.read.parquet
或者 SparkSession.read.load
方法却紧,Spark SQL 會(huì)從目錄中自動(dòng)提取分區(qū)信息桐臊,于是 DataFrame 的模式變成了:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
注意,分區(qū)鍵的數(shù)據(jù)類型是自動(dòng)推斷的晓殊,目前支持的有數(shù)字類型断凶,日期類型,時(shí)間戳類型和字符串類型巫俺。有事用戶不希望自動(dòng)推斷分區(qū)鍵的數(shù)據(jù)類型认烁,對(duì)于此類需求,分區(qū)鍵類型推斷可以由參數(shù) spark.sql.sources.partitionColumnTypeInference.enabled
控制介汹,默認(rèn)值為 true
却嗡。當(dāng)類型推薦被禁用時(shí),分區(qū)鍵會(huì)被指定為字符串類型嘹承。
從 Spark 1.6.0 開(kāi)始窗价,默認(rèn)情況下分區(qū)發(fā)現(xiàn)功能只會(huì)尋找指定路徑下的分區(qū)。對(duì)于上述示例叹卷,如果用戶將路徑 path/to/table/gender=male
傳遞給 SparkSession.read.parquet
或者 SparkSession.read.load
方法舌镶,gender
不會(huì)被當(dāng)做一個(gè)分區(qū)鍵。如果用戶需要指定發(fā)現(xiàn)分區(qū)的根目錄豪娜,可以設(shè)置 basePath
選項(xiàng)餐胀。例如,當(dāng)給定路徑為 path/to/table/gender=male
而選項(xiàng) basePath
設(shè)置為 path/to/table/
時(shí)瘤载,gender
也會(huì)被當(dāng)做一個(gè)分區(qū)鍵否灾。
模式融合
像 Protocol Buffer,Avro 和 Thrift 一樣鸣奔,Parquet 也支持模式演化墨技。用戶可以初始定義一個(gè)簡(jiǎn)單的模式,之后在需要的時(shí)候增加列挎狸。如此一來(lái)扣汪,可能最終會(huì)有多個(gè)模式不同但又相互兼容的 Parquet 文件。Parquet 數(shù)據(jù)源現(xiàn)在已經(jīng)能夠自動(dòng)檢測(cè)這種情況锨匆,并在需要時(shí)融合多個(gè)文件的模式崭别。
由于模式融合是一個(gè)開(kāi)銷較大的操作,而且在大多數(shù)情況下是不需要的恐锣,自 Spark 1.5.0 以來(lái)該功能默認(rèn)情況下時(shí)關(guān)閉的茅主,可以通過(guò)以下兩種方式開(kāi)啟:
- 在讀取 Parquet 文件時(shí)設(shè)置數(shù)據(jù)源選項(xiàng)
mergeSchema
為true
。 - 設(shè)置全局的 SQL 參數(shù)
spark.sql.parquet.mergeSchema
為true
土榴。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」诀姚。
Hive 元數(shù)據(jù) Parquet 表轉(zhuǎn)換
當(dāng)從 Hive 中讀取 Parquet 格式的表和向 Hive 中寫(xiě)入不分區(qū)的 Parquet 表時(shí),處于性能的考量 Spark SQL 會(huì)嘗試使用自己的 Parquet 解析器而不是 Hive 的 SerDe 類庫(kù)玷禽。這個(gè)機(jī)制由參數(shù)spark.sql.hive.convertMetastoreParquet` 控制赫段,默認(rèn)開(kāi)啟呀打。
Hive/Parquet 模式調(diào)和
從模式解析的角度來(lái)看,Hive 和 Parquet 之間有兩個(gè)重要的不同之處糯笙。
- Hive 是大小寫(xiě)不敏感的贬丛,而 Parquet 大小寫(xiě)敏感。
- Hive 中的所有列都默認(rèn)是 nullable炬丸,但是空值在 Parquet 中是有意義的瘫寝。
因此,當(dāng)把一個(gè) Hive 中的 Parquet 表轉(zhuǎn)換為 Spark SQL 中的 Parquet 表時(shí)必須調(diào)和兩者模式之間的差異稠炬。規(guī)則如下:
- 無(wú)論是否可為空值焕阿,兩者模式中相同名稱的字段必須擁有相同的數(shù)據(jù)類型。為了滿足可為空值的條件首启,被調(diào)和的字段應(yīng)該具有 Parquet 端的數(shù)據(jù)類型暮屡。
- 被調(diào)和的模式需要完全包含 Hive 模式中的字段。
- 在被調(diào)和的模式中毅桃,只出現(xiàn)在 Parquet 端的字段會(huì)被刪除褒纲。Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
- 在被調(diào)和的模式中,只出現(xiàn)在 Hive 模式中的字段會(huì)被添加為一個(gè) nullable 字段钥飞。
刷新元數(shù)據(jù)
為了更好的性能莺掠,Spark SQL 會(huì)緩存 Parquet 表的元數(shù)據(jù)。開(kāi)啟 Hive 元數(shù)據(jù) Parquet 表轉(zhuǎn)換之后读宙,被轉(zhuǎn)換的的表的元數(shù)據(jù)也會(huì)被緩存彻秆。如果這些表被 Hive 或者外部工具更新,需要手動(dòng)刷新來(lái)保持元數(shù)據(jù)的一致性结闸。
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
配置項(xiàng)
Parquet 的配置項(xiàng)可以通過(guò) SparkSession
的 setConf
方法或者使用 SQL 語(yǔ)法 SET key=value
來(lái)設(shè)置唇兑。
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.sql.parquet.binaryAsString |
false | 一些處理 Parquet 文件的計(jì)算引擎,尤其是 Impala桦锄,HIve 和早期版本的 Spark SQL扎附,在寫(xiě)出數(shù)據(jù)時(shí)并不區(qū)分二進(jìn)制類型和自負(fù)串類型。該參數(shù)告訴 Spark SQL 將二進(jìn)制數(shù)據(jù)解析為字符串來(lái)與那些系統(tǒng)保持兼容结耀。 | 1.1.1 |
spark.sql.parquet.int96AsTimestamp |
true | 一些處理 Parquet 文件的計(jì)算引擎留夜,尤其是 Impala 和 HIve,把時(shí)間戳存儲(chǔ)為 INT96 類型饼记。該參數(shù)告訴 Spark SQL 將 INT96 數(shù)據(jù)解析為時(shí)間戳來(lái)與那些系統(tǒng)保持兼容香伴。 | 1.3.0 |
spark.sql.parquet.compression.codec |
snappy | 寫(xiě)出 Parquet 文件時(shí)使用的壓縮格式。如果在表屬性中同時(shí)設(shè)置了 compression 和 parquet.compression 屬性具则,則配置生效的優(yōu)先級(jí)從高到低依次為 compression , parquet.compression , spark.sql.parquet.compression.codec 【甙铮可選的壓縮格式有:snappy, gzip, lzo, brotli, lz4, zstd博肋。注意低斋,在 Hadoop 2.9.0 之前 zstd 格式需要事先安裝 ZStandardCodec ,brotli 格式需要事先安裝 BrotliCodec 匪凡。 |
1.1.1 |
spark.sql.parquet.filterPushdown |
true | 是否開(kāi)啟 Parquet 謂詞下推的優(yōu)化機(jī)制膊畴。 | 1.2.0 |
spark.sql.hive.convertMetastoreParquet |
true | 設(shè)置為 false 時(shí),Spark SQL 會(huì)使用 Hive SerDe 解析 Parquet 文件而不是使用內(nèi)置的解析器病游。 | 1.1.1 |
spark.sql.parquet.mergeSchema |
false | 設(shè)置為 true 時(shí)唇跨,Parquet 數(shù)據(jù)源會(huì)融合所有數(shù)據(jù)文件的模式,否則模式將會(huì)從概要文件中推斷衬衬,如果概要文件不存在买猖,就從隨機(jī)數(shù)據(jù)文件中推斷。 | 1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | 如果設(shè)置為 true滋尉,數(shù)據(jù)將會(huì)以 Spark 1.4 之前的格式寫(xiě)出玉控。例如,小數(shù)格式的數(shù)據(jù)將會(huì)以定長(zhǎng)字節(jié)數(shù)組的格式寫(xiě)出狮惜,也是 Hive 和 Impala 的方式高诺。如果寫(xiě)出失敗,那么將會(huì)使用新的數(shù)據(jù)格式碾篡。例如虱而,小數(shù)格式的數(shù)據(jù)會(huì)以 int 類型的格式寫(xiě)出。如果寫(xiě)出的 Parquet 文件是給不支持新格式的系統(tǒng)所用开泽,請(qǐng)將該參數(shù)設(shè)置為 true牡拇。 | 1.6.0 |
ORC 文件
從 Spark 2.3 開(kāi)始,Spark 支持以向量化的方式讀取 ORC 文件眼姐,不過(guò)需要新增幾個(gè)配置項(xiàng)诅迷。對(duì)于原生 ORC 數(shù)據(jù)表(使用 USING ORC
語(yǔ)句創(chuàng)建的數(shù)據(jù)表),需要設(shè)置參數(shù) spark.sql.orc.impl
為 native
众旗,設(shè)置參數(shù) spark.sql.orc.enableVectorizedReader
為 true
來(lái)開(kāi)啟向量化讀取罢杉。對(duì)于 Hive ORC serde 數(shù)據(jù)表(使用 USING HIVE OPTIONS (fileFormat 'ORC')
語(yǔ)句創(chuàng)建的數(shù)據(jù)表),需要設(shè)置參數(shù) spark.sql.hive.convertMetastoreOrc
為 true
來(lái)開(kāi)啟向量化讀取
Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause USING ORC
) when spark.sql.orc.impl
is set to native
and spark.sql.orc.enableVectorizedReader
is set to true
. For the Hive ORC serde tables (e.g., the ones created using the clause USING HIVE OPTIONS (fileFormat 'ORC')
), the vectorized reader is used when spark.sql.hive.convertMetastoreOrc
is also set to true
.
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.sql.orc.impl |
native |
ORC 的實(shí)現(xiàn)模式贡歧√沧猓可以是 native 和 hive 。native 表示原生 ORC利朵, hive 表示 Hive 中的 ORC 類庫(kù)律想。 |
2.3.0 |
spark.sql.orc.enableVectorizedReader |
true |
在 native 模式下開(kāi)啟向量化讀取 ORC 文件。如果設(shè)置為 false 绍弟,會(huì)使用非向量化的方式讀取 ORC 文件技即。對(duì)于 hive 模式,該配置被忽略樟遣。 |
2.3.0 |
JSON 文件
Spark SQL 可以自動(dòng)推斷 JSON 文件的模式并將其加載為一個(gè) Dataset[Row]
而叼∩眢裕可以使用方法讀取一個(gè) Dataset[String]
或是 JSON 文件。
注意所葵陵,讀取的 JSON 文件不是經(jīng)典的格式 JSON 文件液荸。文件中每一行必須是一個(gè)獨(dú)立的、自包含的脱篙、有效的 JSON 對(duì)象娇钱。詳情參見(jiàn) JSON Lines text format, also called newline-delimited JSON。
對(duì)于常見(jiàn)的多行 JSON 文件绊困,請(qǐng)?jiān)O(shè)置選項(xiàng) multiLine
為 true
文搂。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。
Hive 表
Spark SQL 也支持讀寫(xiě)存儲(chǔ)在 Hive(Apache Hive)中的數(shù)據(jù)考抄。然而细疚,由于 Hive 有很多很多依賴,這些依賴默認(rèn)沒(méi)有包含在 Spark 發(fā)布的版本中川梅。如果 Hive 依賴可以再 classpath 中找到疯兼,Spark 會(huì)自動(dòng)加載它們。注意這些 Hive 依賴需要同時(shí)在所有的的 worker 節(jié)點(diǎn)上贫途,為了方位存儲(chǔ)在 Hive 中的數(shù)據(jù)需要用到 Hive 的序列化和反序列化類庫(kù)(SerDes)吧彪。
需要將 Hive 的配置文件 hive-site.xml
, core-site.xml
(其中的安全配置)和 hdfs-site.xml
(HDFS 配置) 放到 conf/
目錄下丢早。
集成 Hive 功能姨裸,必須在初始化 SparkSession
時(shí)配置開(kāi)啟 Hive 支持,包括一個(gè)跟 Hive metastore 的長(zhǎng)連接怨酝,對(duì) Hive serdes 的支持傀缩,和 Hive UDF。如果沒(méi)有安裝 Hive 也可以開(kāi)啟 Hive 支持农猬,若沒(méi)有配置 hive-site.xml
赡艰,Spark 會(huì)在當(dāng)前目錄自動(dòng)創(chuàng)建一個(gè) metastore_db
文件和一個(gè)由參數(shù) spark.sql.warehouse.dir
指定的目錄,默認(rèn)值是當(dāng)前目錄下的 spark-warehouse
斤葱,這里所說(shuō)的當(dāng)前目錄是指 Spark 應(yīng)用程序啟動(dòng)的目錄慷垮。注意從 Spark 2.0.0 開(kāi)始,hive-site.xml
文件中的 hive.metastore.warehouse.dir
配置項(xiàng)已經(jīng)被標(biāo)記為棄用揍堕,目前使用的是參數(shù) spark.sql.warehouse.dir
指定數(shù)據(jù)倉(cāng)庫(kù)的位置料身,啟動(dòng)程序的用戶必須對(duì)該目錄擁有寫(xiě)權(quán)限。
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala」衩茸。
指定 Hive 表的存儲(chǔ)格式
創(chuàng)建一個(gè) Hive 表時(shí)芹血,需要定義這個(gè)表該如何從文件系統(tǒng)中讀取以及寫(xiě)入,即「input format」和「output format」。還需要定義這個(gè)表應(yīng)該如何進(jìn)行正反序列化的操作祟牲,即「serde」隙畜。下面的選項(xiàng)可以用來(lái)指定存儲(chǔ)格式(「serde」抖部,「input format」说贝,「output format」),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
慎颗。默認(rèn)情況下乡恕,表數(shù)據(jù)將會(huì)以文本的格式讀取。注意俯萎,目前在創(chuàng)建表時(shí)還不支持 Hive storage handler傲宜,你可以在 Hive 中適用 Hive storage handler 創(chuàng)建表,然后通過(guò) Spark SQL 來(lái)讀取夫啊。
Property Name | Meaning |
---|---|
fileFormat |
指定數(shù)據(jù)存儲(chǔ)的文件格式函卒,包括「serde」,「input format」撇眯,「output format」报嵌。目前支持 6 種文件格式:'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'。 |
inputFormat, outputFormat |
這兩個(gè)選項(xiàng)指定 InputFormat 和 OutputFormat 相應(yīng)的全限定類名熊榛,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 锚国。這兩個(gè)選項(xiàng)必須成對(duì)出現(xiàn),而且不能在設(shè)置了 fileFormat 選項(xiàng)的情況下再設(shè)置玄坦。 |
serde |
該選項(xiàng)指定 serde 的全限定類名血筑。在選項(xiàng) fileFormat 被指定的情況下,如果其中已經(jīng)包含了 serde 信息煎楣,那么請(qǐng)不要設(shè)置該選項(xiàng)豺总。目前「sequencefile」,「textfile」择懂,「rcfile」格式并不包含 serde 信息喻喳,所以可以在這 3 種數(shù)據(jù)格式的情況下使用該選項(xiàng)。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
這項(xiàng)選項(xiàng)只適用于「textfile」文件格式休蟹,它們定義了如何將文本分割成數(shù)據(jù)行沸枯。 |
所有其他的在 OPTIONS
中定義的屬性都會(huì)被當(dāng)做 Hive serde 屬性。
與不同版本的 Hive Metastore 交互
Spark SQL 對(duì) Hive 支持的一個(gè)很重要的點(diǎn)就是與 Hive metastore 進(jìn)行交互赂弓,這可以讓 Spark SQL 能夠訪問(wèn) Hive 表的元信息绑榴。從 Spark 1.4.0 開(kāi)始,一個(gè)單獨(dú)的 Spark SQL 構(gòu)建版本可以用來(lái)跟不同版本的 Hive metastore 進(jìn)行交互盈魁,請(qǐng)使用下面的配置翔怎。注意,與 Hive 的版本無(wú)關(guān),這里說(shuō)的的是 metastore 的版本赤套,在 Spark SQL 內(nèi)部會(huì)編譯內(nèi)建的 Hive 并使用那些類來(lái)執(zhí)行內(nèi)部流程(serdes, UDFs, UDAFs 等等)飘痛。
下列選項(xiàng)用來(lái)配置 Hive 版本,以獲取元數(shù)據(jù):
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.sql.hive.metastore.version |
2.3.7 |
Hive metastore 的版本容握,可選項(xiàng)從 0.12.0 到 2.3.7 宣脉,從 3.0.0 到 3.1.2 漆枚。 |
1.4.0 |
spark.sql.hive.metastore.jars |
builtin |
初始化 HiveMetastoreClient 所需要的 Jar 包的位置呻袭。該選項(xiàng)有三種可選值:1蚀同、builtin 使用 Hive 2.3.7阻课,跟 Spark 的集成構(gòu)建綁定颗祝,編譯時(shí)使用 -Phive 參數(shù)生效习寸,此時(shí)參數(shù) spark.sql.hive.metastore.version 的值必須是 2.3.7 或者未定義寂曹;2爱态、maven 使用從 Maven 倉(cāng)庫(kù)下載的指定版本的 Hive 依賴感憾,這種配置在生產(chǎn)環(huán)境不建議使用蜡励。3、一個(gè)適用于 JVM 的標(biāo)準(zhǔn)的 classpath 路徑阻桅,該路徑中必須包含所有 Hive 相關(guān)的類庫(kù)及其依賴凉倚,包括正確版本的 Hadoop。只有 driver 程序需要這些依賴鳍刷,但是如果使用 yarn cluster 模式啟動(dòng)應(yīng)用程序占遥,就需要把這些依賴裝進(jìn)你的應(yīng)用程序 Jar 文件。 |
1.4.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc |
以逗號(hào)分隔的全限定類名前綴列表输瓜,相應(yīng)的類需要通過(guò) Spark SQL 和指定版本的 Hive 共享的類加載器進(jìn)行加載瓦胎。例如其中一個(gè)需要被共享的依賴就是連接 metastore 需要用到的 JDBC 驅(qū)動(dòng)包。其他需要被共享的依賴就是那些已經(jīng)被共享的依賴尤揣,例如 log4j搔啊。 | 1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
以逗號(hào)分隔的全限定類名前綴列表,這些依賴需要被 Spark SQL 連接的 Hive 的相應(yīng)版本重新加載北戏。例如负芋,定義了 Hive UDF 的依賴應(yīng)該被共享(即 org.apache.spark.* )。 |
1.4.0 |
JDBC 連接其他數(shù)據(jù)庫(kù)
Spark SQL 還提供了一個(gè)可以通過(guò) JDBC 連接其他數(shù)據(jù)庫(kù)的數(shù)據(jù)源嗜愈,該功能需要通過(guò) JdbcRDD 來(lái)實(shí)現(xiàn)旧蛾。結(jié)果將會(huì)以 DataFrame 的形式返回,之后就可以通過(guò) Spark SQL 進(jìn)行處理或者與其他數(shù)據(jù)源進(jìn)行連接蠕嫁。JDBC 數(shù)據(jù)源在 Java 和 Python 中用起來(lái)也很簡(jiǎn)單锨天,并不要求提供一個(gè) ClassTag。(注意 JDBC 數(shù)據(jù)源和 Spark SQL JDBC server 不是一回事剃毒,后者可以讓?xiě)?yīng)用程序通過(guò) Spark SQL 執(zhí)行查詢)
要使用 JDBC 數(shù)據(jù)源病袄,需要將相應(yīng)的 JDBC 驅(qū)動(dòng)包放到 spark 的 classpath 中搂赋。例如,如果想從 Spark Shell 中連接 PostGRE 數(shù)據(jù)庫(kù)需要執(zhí)行以下命令:
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
遠(yuǎn)端數(shù)據(jù)庫(kù)中的表可以通過(guò)數(shù)據(jù)源 API 被加載成為一個(gè) DataFrame 或者 Spark SQL 臨時(shí)視圖益缠。用戶可以通過(guò)數(shù)據(jù)源選項(xiàng)指定 JDBC 連接的參數(shù)脑奠。user
和 password
選項(xiàng)通常用來(lái)登錄數(shù)據(jù)庫(kù),Spark 還支持下列大小寫(xiě)不敏感的選項(xiàng):
Property Name | Meaning |
---|---|
url |
連接 JDBC 所用的 URL幅慌。有些選項(xiàng)可以包含在 URL 中宋欺。例如,jdbc:postgresql://localhost/test?user=fred&password=secret 欠痴。 |
dbtable |
需要讀寫(xiě)的表迄靠。當(dāng)該選項(xiàng)被用作讀取時(shí)的參數(shù)時(shí),任何 SQL 中有效的 FROM 語(yǔ)句都可以被使用喇辽。例如,可以不填寫(xiě)完整的表名而是一個(gè)用小括號(hào)括起來(lái)的子查詢雨席。不允許同時(shí)指定 dbtable 和 query 選項(xiàng)菩咨。 |
query |
一個(gè)從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)的查詢語(yǔ)句。被指定的查詢會(huì)被小括號(hào)括起來(lái)作為 FROM 語(yǔ)句的子查詢陡厘,Spark 會(huì)為該子查詢分配一個(gè)別名抽米。例如,Spark 會(huì)組織一個(gè)這樣的語(yǔ)句來(lái)訪問(wèn) JDBC:SELECT <columns> FROM (<user_specified_query>) spark_gen_alias 糙置。下面是一些使用該選項(xiàng)的限制云茸。不允許同時(shí)指定 dbtable 和 query 選項(xiàng)。不允許同時(shí)指定 partitionColumn 和 query 選項(xiàng)谤饭;當(dāng)需要指定 partitionColumn 選項(xiàng)時(shí)标捺,請(qǐng)使用 dbtable 來(lái)指定子查詢,分區(qū)列可以通過(guò)子查詢中的別名來(lái)指定揉抵。示例:spark.read.format("jdbc").option("url", jdbcUrl).option("query", "select c1, c2 from t1").load() 亡容。 |
driver |
JDBC 驅(qū)動(dòng)的全限定類名。 |
partitionColumn, lowerBound, upperBound |
這些選項(xiàng)必須同時(shí)指定冤今,此外闺兢,選項(xiàng) numPartitions 也必須指定。這些選項(xiàng)一同定義了如何從數(shù)據(jù)庫(kù)并行讀取數(shù)據(jù)戏罢。partitionColumn 選項(xiàng)必須是一個(gè)數(shù)字屋谭,日期或者時(shí)間戳類型。注意 lowerBound 和 upperBound 選項(xiàng)只影響分區(qū)步距龟糕,并不過(guò)濾表中的數(shù)據(jù)桐磁。所以表中的所有數(shù)據(jù)會(huì)被分區(qū)之后讀取,這些選項(xiàng)只在讀取時(shí)有效翩蘸。 |
numPartitions |
讀寫(xiě)數(shù)據(jù)時(shí)可以被用到的最大分區(qū)數(shù)所意,該選項(xiàng)也決定了最大并發(fā) JDBC 連接數(shù)。如果寫(xiě)出時(shí)的分區(qū)數(shù)超過(guò)了這個(gè)限制,會(huì)在寫(xiě)出之前調(diào)用 coalesce(numPartitions) 方法削減分區(qū)扶踊。 |
queryTimeout |
JDBC 查詢超時(shí)時(shí)間泄鹏,單位為秒,零表示無(wú)限制秧耗。在寫(xiě)出時(shí)备籽,該選項(xiàng)取決于 JDBC 驅(qū)動(dòng)實(shí)現(xiàn) setQueryTimeout API 的方式,例如分井,h2 JDBC 驅(qū)動(dòng)每次查詢都會(huì)檢查超時(shí)時(shí)間车猬,而不是對(duì)于整個(gè) JDBC 批次檢查。默認(rèn)值為 0 尺锚。 |
fetchsize |
JDBC 拉取數(shù)據(jù)的數(shù)量珠闰,決定了一次拉取多少行數(shù)據(jù)。該選項(xiàng)可以幫助提升那些默認(rèn)拉取值很小 JDBC 的性能(例如 Oracle 是一次 10 行)瘫辩。該選項(xiàng)只在讀取數(shù)據(jù)時(shí)有效伏嗜。 |
batchsize |
JDBC 批次數(shù)據(jù)的數(shù)量,決定了一次寫(xiě)出多少數(shù)據(jù)伐厌。該選項(xiàng)可以幫助提升 JDBC 的性能承绸。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效。默認(rèn)值為 1000 挣轨。 |
isolationLevel |
事務(wù)隔離級(jí)別军熏,應(yīng)用于當(dāng)前連接【戆纾可以是 NONE 荡澎,READ_COMMITTED ,READ_UNCOMMITTED 画饥,REPEATABLE_READ 衔瓮,或者 SERIALIZABLE ,與 JDBC 連接定義的標(biāo)準(zhǔn)事務(wù)隔離級(jí)別相對(duì)應(yīng)抖甘,默認(rèn)值是 READ_UNCOMMITTED 热鞍。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效。詳情參見(jiàn) java.sql.Connection 衔彻。 |
sessionInitStatement |
在連接到遠(yuǎn)端數(shù)據(jù)庫(kù)的會(huì)話開(kāi)啟之后薇宠,在讀取數(shù)據(jù)之前,執(zhí)行該選項(xiàng)所定義的 SQL 語(yǔ)句(或是 PL/SQL 塊)艰额,可以通過(guò)該選項(xiàng)做一些會(huì)話初始化工作澄港。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") 。 |
truncate |
當(dāng)存儲(chǔ)模式設(shè)置為 SaveMode.Overwrite 時(shí)柄沮,該選項(xiàng)會(huì)在寫(xiě)出時(shí)清空表而不是刪除后再創(chuàng)建回梧。這樣會(huì)更高效一些废岂,還避免了刪除表的元數(shù)據(jù)(比如說(shuō)索引)。然而在某些情況下該選項(xiàng)不會(huì)生效狱意,比如在新數(shù)據(jù)跟原始表的模式不同的時(shí)候湖苞。默認(rèn)值為 false 。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效详囤。 |
cascadeTruncate |
這是一個(gè) JDBC 寫(xiě)出時(shí)選項(xiàng)财骨。如果開(kāi)啟,同時(shí)數(shù)據(jù)庫(kù)也支持(目前只有 PostgreSQL 和 Oracle)藏姐,該選項(xiàng)在寫(xiě)出前會(huì)執(zhí)行 TRUNCATE TABLE t CASCADE 語(yǔ)句(在 PostgreSQL 中是 TRUNCATE TABLE ONLY t CASCADE 隆箩,來(lái)避免無(wú)意間清空了衍生表)。該選項(xiàng)會(huì)影響其他的表羔杨,需要謹(jǐn)慎使用捌臊。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效。默認(rèn)值為相應(yīng)數(shù)據(jù)庫(kù)默認(rèn)的 cascading truncate 行為问畅,通過(guò) JDBC 會(huì)話中的 isCascadeTruncate 參數(shù)指定娃属。 |
createTableOptions |
這是一個(gè) JDBC 寫(xiě)出時(shí)選項(xiàng)真慢。該選項(xiàng)可以指定寫(xiě)出數(shù)據(jù)時(shí)的建表語(yǔ)句(例如 CREATE TABLE t (name string) ENGINE=InnoDB. )缨睡。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效汇荐。 |
createTableColumnTypes |
在建表時(shí)指定每列的數(shù)據(jù)類型而不是采用默認(rèn)值。數(shù)據(jù)類型信息應(yīng)該和 CREATE TABLE 語(yǔ)句中指定的一致(例如:"name CHAR(64), comments VARCHAR(1024)") )卵皂。被指定的類型應(yīng)該是有效的 Spark SQL 數(shù)據(jù)類型。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效砚亭。 |
customSchema |
從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)時(shí)自定義數(shù)據(jù)模式灯变,例如:"id DECIMAL(38, 0), name STRING" ⊥北欤可以只指定部分列添祸,其他列使用默認(rèn)類型映射,例如:"id DECIMAL(38, 0)" 寻仗。列名應(yīng)該和數(shù)據(jù)庫(kù)表中的字段名相同刃泌。用戶可以通過(guò)該選項(xiàng)指定 Spark SQL 中的相應(yīng)類型而不是使用默認(rèn)映射。該選項(xiàng)只在讀取數(shù)據(jù)時(shí)有效署尤。 |
pushDownPredicate |
是否開(kāi)啟謂詞下推機(jī)制耙替,默認(rèn)值是 true,Spark 會(huì)盡力將過(guò)濾語(yǔ)句下推到數(shù)據(jù)庫(kù)執(zhí)行曹体。否則俗扇,如果被設(shè)置為 false,不會(huì)有謂詞下推箕别,所有的過(guò)濾操作都會(huì)由 Spark 來(lái)執(zhí)行铜幽。 |
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」滞谢。
Avro 文件
從 Spark 2.4 開(kāi)始, Spark SQL 對(duì) Avro 數(shù)據(jù)的讀寫(xiě)提供了原生支持除抛。
部署
spark-avro
是額外的模塊狮杨,默認(rèn)情況下并不包含在 spark-submit
或者 spark-shell
的 classpath 中。
對(duì)于 Spark 應(yīng)用程序來(lái)說(shuō)镶殷, spark-submit
腳本用來(lái)啟動(dòng)應(yīng)用程序禾酱。spark-avro_2.12
和它的依賴可以通過(guò)添加到 spark-submit
腳本的參數(shù)中,就像:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.0.0 ...
對(duì)于 spark-shell
腳本绘趋,也可以通過(guò) --packages
參數(shù)直接添加 org.apache.spark:spark-avro_2.12
和它的依賴:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 ...
更多有關(guān)啟動(dòng)應(yīng)用程序添加額外依賴的詳情參見(jiàn) Application Submission Guide颤陶。
讀寫(xiě)函數(shù)
由于 spark-avro
模塊是外部的,在 DataFrameReader
或者 DataFrameWriter
中并不存在 .avro
API陷遮。
讀寫(xiě) Avro 格式的數(shù)據(jù)滓走,需要指定數(shù)據(jù)源選項(xiàng) format
為 avro
(或者 org.apache.spark.sql.avro
)。
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
to_avro() 和 from_avro()
Avro 依賴提供了 to_avro()
函數(shù)來(lái)編碼一列數(shù)據(jù)為 Avro 格式帽馋,以及 from_avro()
函數(shù)來(lái)解碼 Avro 數(shù)據(jù)成為一列對(duì)象搅方。兩個(gè)函數(shù)都將一列轉(zhuǎn)換為另外一列,輸入輸出的 SQL 數(shù)據(jù)類型可以是復(fù)雜類型或者基礎(chǔ)類型绽族。
當(dāng)從一個(gè)像 Kafka 一樣的流失數(shù)據(jù)源讀寫(xiě)數(shù)據(jù)時(shí)姨涡,使用 Avro record 作為一列很方便。每一個(gè) Kafka 鍵值對(duì)記錄會(huì)跟一些元信息一起被讀取進(jìn)來(lái)吧慢,比如 Kafka 收錄這條記錄時(shí)的時(shí)間戳涛漂,記錄偏移量等等。
- 如果字段中包含的數(shù)據(jù)是 Avro 格式检诗,可以使用
from_avro()
函數(shù)來(lái)抽取數(shù)據(jù)匈仗,改進(jìn),清洗逢慌,然后再輸出到 Kafka 中去 或者寫(xiě)入到文件悠轩。 - 函數(shù)可以用來(lái)將結(jié)構(gòu)化數(shù)據(jù)轉(zhuǎn)換為 Avro record。對(duì)于需要將多列融合為一列寫(xiě)出到 Kafka 的場(chǎng)景很適用攻泼。
兩個(gè)函數(shù)目前已支持 Scala 和 Java 語(yǔ)言火架。
import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro('value, jsonFormatSchema) as 'user)
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as 'value)
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
數(shù)據(jù)源選項(xiàng)
Avro 數(shù)據(jù)源選項(xiàng)可以通過(guò)以下兩種方式設(shè)置:
-
DataFrameReader
或者DataFrameWriter
中的.option
方法。 - 函數(shù)
from_avro
中的options
參數(shù)坠韩。
Property Name | Default | Meaning | Scope |
---|---|---|---|
avroSchema |
None | 可選距潘,用戶提供的 JSON 格式的模式。在讀取 Avro 數(shù)據(jù)時(shí)只搁,此選項(xiàng)可以用來(lái)定義一個(gè)演化的模式音比,可以和實(shí)際的 Avro 模式不同,但必須兼容氢惋。反序列化的模式會(huì)和演化模式一致洞翩。 例如稽犁,如果設(shè)置一個(gè)擁有一個(gè)額外字段(有默認(rèn)值)的演化模式,Spark 讀取的結(jié)果中也會(huì)包含這個(gè)新字段骚亿。當(dāng)寫(xiě)出數(shù)據(jù)為 Avro 時(shí)已亥,該選項(xiàng)可以調(diào)和目標(biāo)模式與 Spark 轉(zhuǎn)換的模式不匹配的情況。例如来屠,目標(biāo)模式中有一個(gè)列式枚舉類型虑椎,而 Spark 生成的模式中該字段是字符串類型。 | read, write and function from_avro
|
recordName |
topLevelRecord | 寫(xiě)出數(shù)據(jù)時(shí)的 record 名稱俱笛,Avro 需要指定捆姜。 | write |
recordNamespace |
"" | 寫(xiě)出數(shù)據(jù)時(shí)的 record 命名空間。 | write |
ignoreExtension |
true | 該選項(xiàng)控制在讀取數(shù)據(jù)時(shí)是否忽略文件的 .avro 后綴迎膜。如果開(kāi)啟泥技,所有的文件都會(huì)被加載(有或者沒(méi)有 .avro 后綴)。該項(xiàng)選已被棄用磕仅,會(huì)在將來(lái)的版本中移除珊豹。請(qǐng)使用更通用的數(shù)據(jù)源選項(xiàng) pathGlobFilter 來(lái)過(guò)濾文件。 |
read |
compression |
snappy |
compression 選項(xiàng)指定在寫(xiě)出數(shù)據(jù)時(shí)的壓縮格式榕订,目前支持的選項(xiàng)有 uncompressed , snappy , deflate , bzip2 和 xz 店茶。如果沒(méi)有指定,會(huì)使用配置中的 spark.sql.avro.compression.codec 參數(shù)劫恒。 |
write |
mode |
FAILFAST |
mode 選項(xiàng)指定 from_avro 函數(shù)的解析模式忽妒,目前支持的模式有 FAILFAST :在處理?yè)p壞文件時(shí)拋出異常;PERMISSIVE :損壞的記錄被當(dāng)做空值處理兼贸。所以,數(shù)據(jù)模式會(huì)被強(qiáng)制改變?yōu)榭梢詾榭罩党越Γ@有可能與用戶提供的模式不同溶诞。 |
function from_avro
|
配置項(xiàng)
可以通過(guò) SparkSession 對(duì)象的 setConf
方法或者執(zhí)行 SQL 中的 SET key=value
命令來(lái)改變的配置項(xiàng)。
Property Name | Default | Meaning | Since Version |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | 如果設(shè)置為 true决侈,為了向后兼容螺垢,數(shù)據(jù)源 provider com.databricks.spark.avro 會(huì)被作為外部數(shù)據(jù)源模塊映射為內(nèi)置模式。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | 寫(xiě)出 Avro 文件時(shí)使用的壓縮格式赖歌。支持的壓縮格式有:uncompressed, deflate, snappy, bzip2 和 xz枉圃。默認(rèn)值為 snappy。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | 寫(xiě)出 Avro 文件時(shí)石筍的壓縮級(jí)別庐冯,有效值必須在 1 到9 之間或者 -1孽亲。默認(rèn)值為 -1,在目前的版本中相當(dāng)于 6展父。 | 2.4.0 |
與 Databricks spark-avro 的兼容
該 Avro 數(shù)據(jù)源模塊衍生于 Databricks 的開(kāi)源版本 spark-avro返劲,并與之保持兼容玲昧。
默認(rèn)情況下 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
配置是開(kāi)啟的,數(shù)據(jù)源 provider com.databricks.spark.avro
會(huì)被映射為內(nèi)置的 Avro 模塊篮绿。對(duì)于通過(guò) com.databricks.spark.avro
中的 Provider
創(chuàng)建的表孵延,其元數(shù)據(jù)可以被內(nèi)置 Avro 模塊讀取。
注意亲配,在 Databricks 的 spark-avro 中尘应,隱式類 AvroDataFrameWriter
和 AvroDataFrameReader
可以通過(guò)函數(shù) .avro()
創(chuàng)建。而在內(nèi)置的模塊中吼虎,兩個(gè)隱式類都被移除了犬钢。請(qǐng)?jiān)?DataFrameWriter
和 DataFrameReader
對(duì)象中使用 .format("avro")
方法,足夠簡(jiǎn)潔明了鲸睛。
如果你更想使用自己構(gòu)建的 spark-avro
jar 文件娜饵,可以將配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
設(shè)置為 false,通過(guò) --jars
參數(shù)來(lái)部署應(yīng)用程序官辈。詳情參見(jiàn) Advanced Dependency Management箱舞。
支持的數(shù)據(jù)類型 Avro -> Spark SQL
目前 Spark 支持讀取 Avro record 中所有的基礎(chǔ)數(shù)據(jù)類型(primitive types)和復(fù)雜數(shù)據(jù)類型(complex types)。
Avro type | Spark SQL type |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | See below |
除了上面列出的類型之外拳亿,還支持讀取 union
類型晴股,下面三種類型被解析為 union
類型:
-
union(int, long)
會(huì)被映射為 LongType。 -
union(float, double)
會(huì)被映射為 DoubleType肺魁。 -
union(something, null)
电湘,其中 something 是任意支持的 Avro 類型。該類型會(huì)被映射為與 something 相關(guān)的 Spark SQL 類型(nullable 設(shè)置為 true)鹅经。所有其他的類型會(huì)被當(dāng)做復(fù)雜數(shù)據(jù)類型寂呛,會(huì)被映射為 StructType,其中的字段名稱分別為 member0瘾晃,member1 等等贷痪,數(shù)量與 union 中的類型數(shù)量一致。在 Avro 和 Parquet 之間相互轉(zhuǎn)換時(shí)也遵循同樣的原則蹦误。
還支持讀取下列 Avro 邏輯類型(logical types):
Avro logical type | Avro type | Spark SQL type |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
目前劫拢,Spark SQL 會(huì)忽略 Avro 文件的 docs,aliases 和其他屬性强胰。
支持的數(shù)據(jù)類型 Spark SQL -> Avro conversion
Spark 支持寫(xiě)入 Spark SQL 數(shù)據(jù)類型的數(shù)據(jù)到 Avro舱沧。對(duì)于大多數(shù)數(shù)據(jù)類型,從 Spark SQL 數(shù)據(jù)類型到 Avro 類型的映射是很直接的(比如說(shuō) IntegerType 映射為 int)偶洋;然而還有一些特殊的情況:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
還可以通過(guò)選項(xiàng) avroSchema
指定完整的 Avro 模式波闹,這樣 Spark SQL 數(shù)據(jù)類型會(huì)被轉(zhuǎn)換為另一種 Avro 數(shù)據(jù)類型摇幻。下列轉(zhuǎn)換不能通過(guò)默認(rèn)映射執(zhí)行肤京,需要用戶指定 Avro 模式:
Spark SQL type | Avro type | Avro logical type |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |
二進(jìn)制文件
從 Spark 3.0 開(kāi)始,Spark 支持二進(jìn)制文件數(shù)據(jù)源肾筐,可以讀取二進(jìn)制文件,將每個(gè)文件轉(zhuǎn)換為一條記錄缸剪,記錄中包含了完整的文件數(shù)據(jù)吗铐。生成的 DataFrame 會(huì)包含以下字段以及可能的分區(qū)字段:
-
path
: StringType -
modificationTime
: TimestampType -
length
: LongType -
content
: BinaryType
讀取完整的二進(jìn)制文件需要指定數(shù)據(jù)源選項(xiàng) format
為 binaryFile
,可以通過(guò)數(shù)據(jù)源選項(xiàng) pathGlobFilter
來(lái)過(guò)濾需要加載的文件同時(shí)不影響分區(qū)發(fā)現(xiàn)的功能杏节。例如下面的代碼讀取指定路徑中所有的 PNG 文件:
spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")
二進(jìn)制文件數(shù)據(jù)源不支持將一個(gè) DataFrame 寫(xiě)回到原來(lái)的文件唬渗。
疑難解答
- JDBC 驅(qū)動(dòng)包必須可以被 client 和所有 executor 的 Application ClassLoader 加載。這是因?yàn)?Java 的 DriverManager 類會(huì)進(jìn)行安全檢查奋渔,會(huì)在打開(kāi)數(shù)據(jù)庫(kù)連接時(shí)忽略所有無(wú)法被 Application ClassLoader 加載的類镊逝。將相關(guān)的 Jar 文件加入到所有節(jié)點(diǎn)上的 compute_classpath.sh 腳本中是一種簡(jiǎn)便的解決方案。
- 某些數(shù)據(jù)庫(kù)嫉鲸,比如說(shuō) H2撑蒜,將所有的名稱轉(zhuǎn)換為大寫(xiě),此時(shí)在 Spark SQL 中引用那些名稱時(shí)也需要用大寫(xiě)形式玄渗。
- 用戶可以在數(shù)據(jù)源選項(xiàng)中指定 JDBC 驅(qū)動(dòng)包供應(yīng)商提供的特殊屬性座菠。例如
spark.read.format("jdbc").option("url", oracleJdbcUrl).option("oracle.jdbc.mapDateToTimestamp", "false")
。oracle.jdbc.mapDateToTimestamp
默認(rèn)為 true藤树,用戶通常會(huì)關(guān)閉該選項(xiàng)來(lái)避免 Oracle 中的 date 類型轉(zhuǎn)換為 timestamp浴滴。