0302 Data Sources

轉(zhuǎn)載請(qǐng)注明出處霜医,謝謝合作~

該篇中的示例暫時(shí)只有 Scala 版本~

數(shù)據(jù)源

Spark SQL 支持通過(guò) DataFrame 接口操作多種數(shù)據(jù)源枷踏。一個(gè) DataFrame 可以通過(guò)關(guān)系型轉(zhuǎn)換算子操作繁仁,還可以用來(lái)創(chuàng)建一個(gè)臨時(shí)視圖。將 DataFrame 注冊(cè)成一個(gè)臨時(shí)視圖之后就可以通過(guò) SQL 語(yǔ)句進(jìn)行查詢桶雀。本章節(jié)介紹了使用 Spark 數(shù)據(jù)源加載和存儲(chǔ)數(shù)據(jù)的常用方法矿酵,之后給出內(nèi)置數(shù)據(jù)源需要的不同的參數(shù)。

常用讀寫(xiě)函數(shù)

最簡(jiǎn)單的情況全肮,默認(rèn)的數(shù)據(jù)源(parquet 除非配置了參數(shù) spark.sql.sources.default)將會(huì)被使用。

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」棘捣。

手動(dòng)指定參數(shù)

可以通過(guò)手動(dòng)指定數(shù)據(jù)源參數(shù)和相關(guān)的額外選項(xiàng)來(lái)創(chuàng)建 DataFrame辜腺。數(shù)據(jù)源通過(guò)全限定類名來(lái)指定(比如 org.apache.spark.sql.parquet),但是對(duì)于內(nèi)置的數(shù)據(jù)源可以使用簡(jiǎn)稱(json, parquet, jdbc, orc, libsvm, csv, text)。從任意數(shù)據(jù)源類型中加載的數(shù)據(jù)可以輕松的轉(zhuǎn)換成其他支持的格式评疗。

可以通過(guò) API 文檔了解內(nèi)置數(shù)據(jù)源的所有選項(xiàng)测砂,例如 org.apache.spark.sql.DataFrameReaderorg.apache.spark.sql.DataFrameWriter。文檔中的選項(xiàng)對(duì)于非 Scala 語(yǔ)言的 Spark API(比如 PySpark)也適用百匆。對(duì)于其他的格式砌些,參見(jiàn)相應(yīng)的 API 文檔。

加載 JSON 文件:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」加匈。

加載 CSV 文件:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」存璃。

額外的選項(xiàng)對(duì)于寫(xiě)出操作也適用。例如雕拼,對(duì)于 ORC 文件可以通過(guò)選項(xiàng)控制布隆過(guò)濾器和字典編碼纵东。下面的 ORC 樣例將會(huì)創(chuàng)建布隆過(guò)濾器并對(duì) favorite_color 列使用字典編碼。對(duì)于 Parquet 文件啥寇,也有一個(gè) parquet.enable.dictionary 的選項(xiàng)偎球。有關(guān) ORC/Parquet 數(shù)據(jù)格式選項(xiàng)的更多詳情請(qǐng)移步 Apache ORC/Parquet 官方網(wǎng)站。

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」辑甜。

直接通過(guò) SQL 操作文件

除了使用 read API 加載文件成為一個(gè) DataFrame衰絮,還可以通過(guò) SQL 直接查詢。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」栈戳。

存儲(chǔ)模式

存儲(chǔ)操作有一個(gè)可選的 SaveMode 類型的參數(shù)岂傲,該參數(shù)指定如何處理存儲(chǔ)路徑中已經(jīng)存在的數(shù)據(jù)难裆。需要注意的是存儲(chǔ)模式并不加鎖子檀,也不是原子性的。此外乃戈,當(dāng)存儲(chǔ)模式設(shè)置為 Overwrite 時(shí)褂痰,在寫(xiě)入新數(shù)據(jù)之前會(huì)刪除舊數(shù)據(jù)。

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) 如果輸出路徑中的數(shù)據(jù)已存在症虑,會(huì)拋出異常缩歪。
SaveMode.Append "append" 如果輸出路徑中的數(shù)據(jù)已存在,寫(xiě)出的數(shù)據(jù)會(huì)被追加到現(xiàn)有數(shù)據(jù)之后谍憔。
SaveMode.Overwrite "overwrite" 如果輸出路徑中的數(shù)據(jù)已存在匪蝙,會(huì)被新寫(xiě)出的數(shù)據(jù)替換。
SaveMode.Ignore "ignore" 如果輸出路徑中的數(shù)據(jù)已存在习贫,新寫(xiě)出的數(shù)據(jù)不會(huì)被存儲(chǔ)逛球,現(xiàn)有的數(shù)據(jù)也不會(huì)被更新。類似于 SQL 中的 CREATE TABLE IF NOT EXISTS苫昌。

持久化數(shù)據(jù)表

DataFrames 還可以通過(guò) saveAsTable 方法存儲(chǔ)到 Hive 中颤绕。注意,并不需要事先安裝好 Hive,Spark 會(huì)通過(guò) Derby 創(chuàng)建一個(gè)默認(rèn)的本地 Hive 元數(shù)據(jù)庫(kù)奥务。跟 createOrReplaceTempView 方法不同的是物独,saveAsTable 方法會(huì)將 DataFrame 中的數(shù)據(jù)落盤(pán)并在 Hive 元數(shù)據(jù)庫(kù)中創(chuàng)建一條描述存儲(chǔ)信息的記錄。只要能夠訪問(wèn)元數(shù)據(jù)庫(kù)氯葬,持久化后的數(shù)據(jù)表在 Spark 應(yīng)用程序重啟之后依舊可以訪問(wèn)挡篓。在持久化數(shù)據(jù)表時(shí)可以通過(guò)調(diào)用 SparkSessiontable 方法指定表名稱。

基于文件的數(shù)據(jù)源帚称,比如 text瞻凤,parquet,json 等等世杀,可以通過(guò) path 選項(xiàng)指定存儲(chǔ)路徑阀参,例如 df.write.option("path", "/some/path").saveAsTable("t")。當(dāng)數(shù)據(jù)表被刪除后瞻坝,自定義存儲(chǔ)路徑中的數(shù)據(jù)不會(huì)被刪除蛛壳。如果不指定存儲(chǔ)路徑,Spark 會(huì)將數(shù)據(jù)寫(xiě)出到 warehouse 文件夾所刀,這種情況下如果數(shù)據(jù)表被刪除衙荐,數(shù)據(jù)也會(huì)被刪除。

從 Spark 2.1 開(kāi)始浮创,持久化的數(shù)據(jù)表的每個(gè)分區(qū)都會(huì)在 Hive 元數(shù)據(jù)庫(kù)中有相關(guān)記錄忧吟,這種方式帶來(lái)一些優(yōu)化:

  • 對(duì)于查詢?cè)獢?shù)據(jù)庫(kù)可以只返回所需要分區(qū)的信息,在第一次查詢時(shí)不再需要加載所有的分區(qū)斩披。
  • Datasource API 支持了像 ALTER TABLE PARTITION ... SET LOCATION 這樣的 Hive DDL 語(yǔ)句溜族。

注意,在創(chuàng)建外部數(shù)據(jù)表(自定義了 path 選項(xiàng))時(shí)垦沉,默認(rèn)情況下分區(qū)信息并沒(méi)有被收集煌抒,同步元數(shù)據(jù)庫(kù)中的分區(qū)信息需要執(zhí)行 MSCK REPAIR TABLE 語(yǔ)句。

分桶厕倍,排序和分區(qū)

對(duì)于基于文件的數(shù)據(jù)源寡壮,還可以對(duì)輸出進(jìn)行分桶凹联,排序和分區(qū)采桃。分桶和排序只適用于通過(guò) saveAsTable 持久化數(shù)據(jù)表:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

但是分區(qū)操作對(duì)于 savesaveAsTable 都適用:

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」属瓣。

可以同時(shí)在一張表中適用分桶和分區(qū):

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」组民。

就像 Partition Discovery 章節(jié)描述的那樣棒仍,partitionBy 操作會(huì)創(chuàng)建一個(gè)目錄結(jié)構(gòu)。所以邪乍,以很大基數(shù)的字段分區(qū)不會(huì)是一個(gè)明智的選擇降狠。相反对竣,bucketBy 操作將數(shù)據(jù)按照固定數(shù)量分桶,適用于數(shù)據(jù)技基數(shù)很大的場(chǎng)景榜配。

常用的文件數(shù)據(jù)源選項(xiàng)

這些常用的選項(xiàng)只在適用文件類型的數(shù)據(jù)源時(shí)有效:parquet, orc, avro, json, csv, text否纬。

注意在示例中使用的目錄結(jié)構(gòu)如下:

dir1/
 ├── dir2/
 │    └── file2.parquet (schema: <file: string>, content: "file2.parquet")
 └── file1.parquet (schema: <file, string>, content: "file1.parquet")
 └── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")

忽略損壞的文件

在從文件讀取數(shù)據(jù)的過(guò)程中可以通過(guò)設(shè)置參數(shù) spark.sql.files.ignoreCorruptFiles 忽略損壞的文件,當(dāng)設(shè)置為 true 時(shí)蛋褥,Spark 作業(yè)在遇到損壞的文件的時(shí)候會(huì)繼續(xù)運(yùn)行临燃,已經(jīng)被讀取的部分依舊有效。

// enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF = spark.read.parquet(
  "examples/src/main/resources/dir1/",
  "examples/src/main/resources/dir1/dir2/")
testCorruptDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」烙心。

忽略丟失的文件

在從文件讀取數(shù)據(jù)的過(guò)程中可以通過(guò)設(shè)置參數(shù) spark.sql.files.ignoreMissingFiles 忽略丟失的文件膜廊。在這里丟失的文件是指在創(chuàng)建好 DataFrame 之后刪除了的文件。當(dāng)設(shè)置為 true 時(shí)淫茵,Spark 作業(yè)在遇到丟失的文件的時(shí)候會(huì)繼續(xù)運(yùn)行爪瓜,已經(jīng)被讀取的部分依舊有效。

全局文件過(guò)濾器

選項(xiàng) pathGlobFilter 用來(lái)只讀取匹配目標(biāo)模式的文件匙瘪,語(yǔ)法與 org.apache.hadoop.fs.GlobFilter 相同铆铆,過(guò)濾操作不會(huì)改變分區(qū)發(fā)現(xiàn)的行為。

val testGlobFilterDF = spark.read.format("parquet")
  .option("pathGlobFilter", "*.parquet") // json file should be filtered out
  .load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」丹喻。

遞歸文件搜索

選項(xiàng) recursiveFileLookup 用來(lái)遞歸查找目標(biāo)路徑下的匹配文件薄货,但是會(huì)關(guān)閉分區(qū)推斷機(jī)制,默認(rèn)值為 false碍论。如果在選項(xiàng) recursiveFileLookup 為 true 時(shí)顯示指定了 partitionSpec 選項(xiàng)谅猾,會(huì)拋出異常。

val recursiveLoadedDF = spark.read.format("parquet")
  .option("recursiveFileLookup", "true")
  .load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」鳍悠。

Parquet 文件

Parquet 是一種被許多數(shù)據(jù)處理系統(tǒng)支持的列式存儲(chǔ)格式税娜。Spark SQL 提供了對(duì) Parquet 文件讀寫(xiě)的支持,文件中會(huì)保存原始數(shù)據(jù)的模式贼涩。當(dāng)讀取 Parquet 文件時(shí)巧涧,處于兼容性的考慮所有的列被自動(dòng)轉(zhuǎn)換為 nullable 類型。

通過(guò)程序加載數(shù)據(jù)

采用上述示例中的數(shù)據(jù):

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」遥倦。

分區(qū)發(fā)現(xiàn)

數(shù)據(jù)分區(qū)是一種常見(jiàn)的優(yōu)化方式,比如在 Hive 中就是如此占锯。對(duì)于分區(qū)表袒哥,數(shù)據(jù)通常存儲(chǔ)在不同的目錄,分區(qū)鍵的值會(huì)被編碼到分區(qū)目錄的名稱中消略。所有內(nèi)置的基于文件的數(shù)據(jù)源(Text/CSV/JSON/ORC/Parquet)都能夠自動(dòng)的發(fā)現(xiàn)和推斷分區(qū)信息堡称。例如,可以將之前用到的人口數(shù)據(jù)采用下面的目錄結(jié)構(gòu)存儲(chǔ)到分區(qū)表中艺演,分區(qū)鍵是 gendercountry 兩列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

將目錄 path/to/table 傳遞給 SparkSession.read.parquet 或者 SparkSession.read.load 方法却紧,Spark SQL 會(huì)從目錄中自動(dòng)提取分區(qū)信息桐臊,于是 DataFrame 的模式變成了:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分區(qū)鍵的數(shù)據(jù)類型是自動(dòng)推斷的晓殊,目前支持的有數(shù)字類型断凶,日期類型,時(shí)間戳類型和字符串類型巫俺。有事用戶不希望自動(dòng)推斷分區(qū)鍵的數(shù)據(jù)類型认烁,對(duì)于此類需求,分區(qū)鍵類型推斷可以由參數(shù) spark.sql.sources.partitionColumnTypeInference.enabled 控制介汹,默認(rèn)值為 true却嗡。當(dāng)類型推薦被禁用時(shí),分區(qū)鍵會(huì)被指定為字符串類型嘹承。

從 Spark 1.6.0 開(kāi)始窗价,默認(rèn)情況下分區(qū)發(fā)現(xiàn)功能只會(huì)尋找指定路徑下的分區(qū)。對(duì)于上述示例叹卷,如果用戶將路徑 path/to/table/gender=male 傳遞給 SparkSession.read.parquet 或者 SparkSession.read.load 方法舌镶,gender 不會(huì)被當(dāng)做一個(gè)分區(qū)鍵。如果用戶需要指定發(fā)現(xiàn)分區(qū)的根目錄豪娜,可以設(shè)置 basePath 選項(xiàng)餐胀。例如,當(dāng)給定路徑為 path/to/table/gender=male 而選項(xiàng) basePath 設(shè)置為 path/to/table/ 時(shí)瘤载,gender 也會(huì)被當(dāng)做一個(gè)分區(qū)鍵否灾。

模式融合

像 Protocol Buffer,Avro 和 Thrift 一樣鸣奔,Parquet 也支持模式演化墨技。用戶可以初始定義一個(gè)簡(jiǎn)單的模式,之后在需要的時(shí)候增加列挎狸。如此一來(lái)扣汪,可能最終會(huì)有多個(gè)模式不同但又相互兼容的 Parquet 文件。Parquet 數(shù)據(jù)源現(xiàn)在已經(jīng)能夠自動(dòng)檢測(cè)這種情況锨匆,并在需要時(shí)融合多個(gè)文件的模式崭别。

由于模式融合是一個(gè)開(kāi)銷較大的操作,而且在大多數(shù)情況下是不需要的恐锣,自 Spark 1.5.0 以來(lái)該功能默認(rèn)情況下時(shí)關(guān)閉的茅主,可以通過(guò)以下兩種方式開(kāi)啟:

  1. 在讀取 Parquet 文件時(shí)設(shè)置數(shù)據(jù)源選項(xiàng) mergeSchematrue
  2. 設(shè)置全局的 SQL 參數(shù) spark.sql.parquet.mergeSchematrue土榴。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」诀姚。

Hive 元數(shù)據(jù) Parquet 表轉(zhuǎn)換

當(dāng)從 Hive 中讀取 Parquet 格式的表和向 Hive 中寫(xiě)入不分區(qū)的 Parquet 表時(shí),處于性能的考量 Spark SQL 會(huì)嘗試使用自己的 Parquet 解析器而不是 Hive 的 SerDe 類庫(kù)玷禽。這個(gè)機(jī)制由參數(shù)spark.sql.hive.convertMetastoreParquet` 控制赫段,默認(rèn)開(kāi)啟呀打。

Hive/Parquet 模式調(diào)和

從模式解析的角度來(lái)看,Hive 和 Parquet 之間有兩個(gè)重要的不同之處糯笙。

  1. Hive 是大小寫(xiě)不敏感的贬丛,而 Parquet 大小寫(xiě)敏感。
  2. Hive 中的所有列都默認(rèn)是 nullable炬丸,但是空值在 Parquet 中是有意義的瘫寝。

因此,當(dāng)把一個(gè) Hive 中的 Parquet 表轉(zhuǎn)換為 Spark SQL 中的 Parquet 表時(shí)必須調(diào)和兩者模式之間的差異稠炬。規(guī)則如下:

  1. 無(wú)論是否可為空值焕阿,兩者模式中相同名稱的字段必須擁有相同的數(shù)據(jù)類型。為了滿足可為空值的條件首启,被調(diào)和的字段應(yīng)該具有 Parquet 端的數(shù)據(jù)類型暮屡。
  2. 被調(diào)和的模式需要完全包含 Hive 模式中的字段。
    • 在被調(diào)和的模式中毅桃,只出現(xiàn)在 Parquet 端的字段會(huì)被刪除褒纲。Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
    • 在被調(diào)和的模式中,只出現(xiàn)在 Hive 模式中的字段會(huì)被添加為一個(gè) nullable 字段钥飞。

刷新元數(shù)據(jù)

為了更好的性能莺掠,Spark SQL 會(huì)緩存 Parquet 表的元數(shù)據(jù)。開(kāi)啟 Hive 元數(shù)據(jù) Parquet 表轉(zhuǎn)換之后读宙,被轉(zhuǎn)換的的表的元數(shù)據(jù)也會(huì)被緩存彻秆。如果這些表被 Hive 或者外部工具更新,需要手動(dòng)刷新來(lái)保持元數(shù)據(jù)的一致性结闸。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

配置項(xiàng)

Parquet 的配置項(xiàng)可以通過(guò) SparkSessionsetConf 方法或者使用 SQL 語(yǔ)法 SET key=value 來(lái)設(shè)置唇兑。

Property Name Default Meaning Since Version
spark.sql.parquet.binaryAsString false 一些處理 Parquet 文件的計(jì)算引擎,尤其是 Impala桦锄,HIve 和早期版本的 Spark SQL扎附,在寫(xiě)出數(shù)據(jù)時(shí)并不區(qū)分二進(jìn)制類型和自負(fù)串類型。該參數(shù)告訴 Spark SQL 將二進(jìn)制數(shù)據(jù)解析為字符串來(lái)與那些系統(tǒng)保持兼容结耀。 1.1.1
spark.sql.parquet.int96AsTimestamp true 一些處理 Parquet 文件的計(jì)算引擎留夜,尤其是 Impala 和 HIve,把時(shí)間戳存儲(chǔ)為 INT96 類型饼记。該參數(shù)告訴 Spark SQL 將 INT96 數(shù)據(jù)解析為時(shí)間戳來(lái)與那些系統(tǒng)保持兼容香伴。 1.3.0
spark.sql.parquet.compression.codec snappy 寫(xiě)出 Parquet 文件時(shí)使用的壓縮格式。如果在表屬性中同時(shí)設(shè)置了 compressionparquet.compression 屬性具则,則配置生效的優(yōu)先級(jí)從高到低依次為 compression, parquet.compression, spark.sql.parquet.compression.codec【甙铮可選的壓縮格式有:snappy, gzip, lzo, brotli, lz4, zstd博肋。注意低斋,在 Hadoop 2.9.0 之前 zstd 格式需要事先安裝 ZStandardCodecbrotli 格式需要事先安裝 BrotliCodec匪凡。 1.1.1
spark.sql.parquet.filterPushdown true 是否開(kāi)啟 Parquet 謂詞下推的優(yōu)化機(jī)制膊畴。 1.2.0
spark.sql.hive.convertMetastoreParquet true 設(shè)置為 false 時(shí),Spark SQL 會(huì)使用 Hive SerDe 解析 Parquet 文件而不是使用內(nèi)置的解析器病游。 1.1.1
spark.sql.parquet.mergeSchema false 設(shè)置為 true 時(shí)唇跨,Parquet 數(shù)據(jù)源會(huì)融合所有數(shù)據(jù)文件的模式,否則模式將會(huì)從概要文件中推斷衬衬,如果概要文件不存在买猖,就從隨機(jī)數(shù)據(jù)文件中推斷。 1.5.0
spark.sql.parquet.writeLegacyFormat false 如果設(shè)置為 true滋尉,數(shù)據(jù)將會(huì)以 Spark 1.4 之前的格式寫(xiě)出玉控。例如,小數(shù)格式的數(shù)據(jù)將會(huì)以定長(zhǎng)字節(jié)數(shù)組的格式寫(xiě)出狮惜,也是 Hive 和 Impala 的方式高诺。如果寫(xiě)出失敗,那么將會(huì)使用新的數(shù)據(jù)格式碾篡。例如虱而,小數(shù)格式的數(shù)據(jù)會(huì)以 int 類型的格式寫(xiě)出。如果寫(xiě)出的 Parquet 文件是給不支持新格式的系統(tǒng)所用开泽,請(qǐng)將該參數(shù)設(shè)置為 true牡拇。 1.6.0

ORC 文件

從 Spark 2.3 開(kāi)始,Spark 支持以向量化的方式讀取 ORC 文件眼姐,不過(guò)需要新增幾個(gè)配置項(xiàng)诅迷。對(duì)于原生 ORC 數(shù)據(jù)表(使用 USING ORC 語(yǔ)句創(chuàng)建的數(shù)據(jù)表),需要設(shè)置參數(shù) spark.sql.orc.implnative 众旗,設(shè)置參數(shù) spark.sql.orc.enableVectorizedReadertrue 來(lái)開(kāi)啟向量化讀取罢杉。對(duì)于 Hive ORC serde 數(shù)據(jù)表(使用 USING HIVE OPTIONS (fileFormat 'ORC') 語(yǔ)句創(chuàng)建的數(shù)據(jù)表),需要設(shè)置參數(shù) spark.sql.hive.convertMetastoreOrctrue 來(lái)開(kāi)啟向量化讀取

Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause USING ORC) when spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true. For the Hive ORC serde tables (e.g., the ones created using the clause USING HIVE OPTIONS (fileFormat 'ORC')), the vectorized reader is used when spark.sql.hive.convertMetastoreOrc is also set to true.

Property Name Default Meaning Since Version
spark.sql.orc.impl native ORC 的實(shí)現(xiàn)模式贡歧√沧猓可以是 nativehivenative 表示原生 ORC利朵, hive 表示 Hive 中的 ORC 類庫(kù)律想。 2.3.0
spark.sql.orc.enableVectorizedReader true native 模式下開(kāi)啟向量化讀取 ORC 文件。如果設(shè)置為 false绍弟,會(huì)使用非向量化的方式讀取 ORC 文件技即。對(duì)于 hive 模式,該配置被忽略樟遣。 2.3.0

JSON 文件

Spark SQL 可以自動(dòng)推斷 JSON 文件的模式并將其加載為一個(gè) Dataset[Row]而叼∩眢裕可以使用方法讀取一個(gè) Dataset[String] 或是 JSON 文件。

注意所葵陵,讀取的 JSON 文件不是經(jīng)典的格式 JSON 文件液荸。文件中每一行必須是一個(gè)獨(dú)立的、自包含的脱篙、有效的 JSON 對(duì)象娇钱。詳情參見(jiàn) JSON Lines text format, also called newline-delimited JSON

對(duì)于常見(jiàn)的多行 JSON 文件绊困,請(qǐng)?jiān)O(shè)置選項(xiàng) multiLinetrue文搂。

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

Hive 表

Spark SQL 也支持讀寫(xiě)存儲(chǔ)在 Hive(Apache Hive)中的數(shù)據(jù)考抄。然而细疚,由于 Hive 有很多很多依賴,這些依賴默認(rèn)沒(méi)有包含在 Spark 發(fā)布的版本中川梅。如果 Hive 依賴可以再 classpath 中找到疯兼,Spark 會(huì)自動(dòng)加載它們。注意這些 Hive 依賴需要同時(shí)在所有的的 worker 節(jié)點(diǎn)上贫途,為了方位存儲(chǔ)在 Hive 中的數(shù)據(jù)需要用到 Hive 的序列化和反序列化類庫(kù)(SerDes)吧彪。

需要將 Hive 的配置文件 hive-site.xmlcore-site.xml(其中的安全配置)和 hdfs-site.xml(HDFS 配置) 放到 conf/ 目錄下丢早。

集成 Hive 功能姨裸,必須在初始化 SparkSession 時(shí)配置開(kāi)啟 Hive 支持,包括一個(gè)跟 Hive metastore 的長(zhǎng)連接怨酝,對(duì) Hive serdes 的支持傀缩,和 Hive UDF。如果沒(méi)有安裝 Hive 也可以開(kāi)啟 Hive 支持农猬,若沒(méi)有配置 hive-site.xml赡艰,Spark 會(huì)在當(dāng)前目錄自動(dòng)創(chuàng)建一個(gè) metastore_db 文件和一個(gè)由參數(shù) spark.sql.warehouse.dir 指定的目錄,默認(rèn)值是當(dāng)前目錄下的 spark-warehouse斤葱,這里所說(shuō)的當(dāng)前目錄是指 Spark 應(yīng)用程序啟動(dòng)的目錄慷垮。注意從 Spark 2.0.0 開(kāi)始,hive-site.xml 文件中的 hive.metastore.warehouse.dir 配置項(xiàng)已經(jīng)被標(biāo)記為棄用揍堕,目前使用的是參數(shù) spark.sql.warehouse.dir 指定數(shù)據(jù)倉(cāng)庫(kù)的位置料身,啟動(dòng)程序的用戶必須對(duì)該目錄擁有寫(xiě)權(quán)限。

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala」衩茸。

指定 Hive 表的存儲(chǔ)格式

創(chuàng)建一個(gè) Hive 表時(shí)芹血,需要定義這個(gè)表該如何從文件系統(tǒng)中讀取以及寫(xiě)入,即「input format」和「output format」。還需要定義這個(gè)表應(yīng)該如何進(jìn)行正反序列化的操作祟牲,即「serde」隙畜。下面的選項(xiàng)可以用來(lái)指定存儲(chǔ)格式(「serde」抖部,「input format」说贝,「output format」),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')慎颗。默認(rèn)情況下乡恕,表數(shù)據(jù)將會(huì)以文本的格式讀取。注意俯萎,目前在創(chuàng)建表時(shí)還不支持 Hive storage handler傲宜,你可以在 Hive 中適用 Hive storage handler 創(chuàng)建表,然后通過(guò) Spark SQL 來(lái)讀取夫啊。

Property Name Meaning
fileFormat 指定數(shù)據(jù)存儲(chǔ)的文件格式函卒,包括「serde」,「input format」撇眯,「output format」报嵌。目前支持 6 種文件格式:'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'。
inputFormat, outputFormat 這兩個(gè)選項(xiàng)指定 InputFormatOutputFormat 相應(yīng)的全限定類名熊榛,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat锚国。這兩個(gè)選項(xiàng)必須成對(duì)出現(xiàn),而且不能在設(shè)置了 fileFormat 選項(xiàng)的情況下再設(shè)置玄坦。
serde 該選項(xiàng)指定 serde 的全限定類名血筑。在選項(xiàng) fileFormat 被指定的情況下,如果其中已經(jīng)包含了 serde 信息煎楣,那么請(qǐng)不要設(shè)置該選項(xiàng)豺总。目前「sequencefile」,「textfile」择懂,「rcfile」格式并不包含 serde 信息喻喳,所以可以在這 3 種數(shù)據(jù)格式的情況下使用該選項(xiàng)。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這項(xiàng)選項(xiàng)只適用于「textfile」文件格式休蟹,它們定義了如何將文本分割成數(shù)據(jù)行沸枯。

所有其他的在 OPTIONS 中定義的屬性都會(huì)被當(dāng)做 Hive serde 屬性。

與不同版本的 Hive Metastore 交互

Spark SQL 對(duì) Hive 支持的一個(gè)很重要的點(diǎn)就是與 Hive metastore 進(jìn)行交互赂弓,這可以讓 Spark SQL 能夠訪問(wèn) Hive 表的元信息绑榴。從 Spark 1.4.0 開(kāi)始,一個(gè)單獨(dú)的 Spark SQL 構(gòu)建版本可以用來(lái)跟不同版本的 Hive metastore 進(jìn)行交互盈魁,請(qǐng)使用下面的配置翔怎。注意,與 Hive 的版本無(wú)關(guān),這里說(shuō)的的是 metastore 的版本赤套,在 Spark SQL 內(nèi)部會(huì)編譯內(nèi)建的 Hive 并使用那些類來(lái)執(zhí)行內(nèi)部流程(serdes, UDFs, UDAFs 等等)飘痛。

下列選項(xiàng)用來(lái)配置 Hive 版本,以獲取元數(shù)據(jù):

Property Name Default Meaning Since Version
spark.sql.hive.metastore.version 2.3.7 Hive metastore 的版本容握,可選項(xiàng)從 0.12.02.3.7宣脉,從 3.0.03.1.2漆枚。 1.4.0
spark.sql.hive.metastore.jars builtin 初始化 HiveMetastoreClient 所需要的 Jar 包的位置呻袭。該選項(xiàng)有三種可選值:1蚀同、builtin 使用 Hive 2.3.7阻课,跟 Spark 的集成構(gòu)建綁定颗祝,編譯時(shí)使用 -Phive 參數(shù)生效习寸,此時(shí)參數(shù) spark.sql.hive.metastore.version 的值必須是 2.3.7 或者未定義寂曹;2爱态、maven 使用從 Maven 倉(cāng)庫(kù)下載的指定版本的 Hive 依賴感憾,這種配置在生產(chǎn)環(huán)境不建議使用蜡励。3、一個(gè)適用于 JVM 的標(biāo)準(zhǔn)的 classpath 路徑阻桅,該路徑中必須包含所有 Hive 相關(guān)的類庫(kù)及其依賴凉倚,包括正確版本的 Hadoop。只有 driver 程序需要這些依賴鳍刷,但是如果使用 yarn cluster 模式啟動(dòng)應(yīng)用程序占遥,就需要把這些依賴裝進(jìn)你的應(yīng)用程序 Jar 文件。 1.4.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 以逗號(hào)分隔的全限定類名前綴列表输瓜,相應(yīng)的類需要通過(guò) Spark SQL 和指定版本的 Hive 共享的類加載器進(jìn)行加載瓦胎。例如其中一個(gè)需要被共享的依賴就是連接 metastore 需要用到的 JDBC 驅(qū)動(dòng)包。其他需要被共享的依賴就是那些已經(jīng)被共享的依賴尤揣,例如 log4j搔啊。 1.4.0
spark.sql.hive.metastore.barrierPrefixes (empty) 以逗號(hào)分隔的全限定類名前綴列表,這些依賴需要被 Spark SQL 連接的 Hive 的相應(yīng)版本重新加載北戏。例如负芋,定義了 Hive UDF 的依賴應(yīng)該被共享(即 org.apache.spark.*)。 1.4.0

JDBC 連接其他數(shù)據(jù)庫(kù)

Spark SQL 還提供了一個(gè)可以通過(guò) JDBC 連接其他數(shù)據(jù)庫(kù)的數(shù)據(jù)源嗜愈,該功能需要通過(guò) JdbcRDD 來(lái)實(shí)現(xiàn)旧蛾。結(jié)果將會(huì)以 DataFrame 的形式返回,之后就可以通過(guò) Spark SQL 進(jìn)行處理或者與其他數(shù)據(jù)源進(jìn)行連接蠕嫁。JDBC 數(shù)據(jù)源在 Java 和 Python 中用起來(lái)也很簡(jiǎn)單锨天,并不要求提供一個(gè) ClassTag。(注意 JDBC 數(shù)據(jù)源和 Spark SQL JDBC server 不是一回事剃毒,后者可以讓?xiě)?yīng)用程序通過(guò) Spark SQL 執(zhí)行查詢)

要使用 JDBC 數(shù)據(jù)源病袄,需要將相應(yīng)的 JDBC 驅(qū)動(dòng)包放到 spark 的 classpath 中搂赋。例如,如果想從 Spark Shell 中連接 PostGRE 數(shù)據(jù)庫(kù)需要執(zhí)行以下命令:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

遠(yuǎn)端數(shù)據(jù)庫(kù)中的表可以通過(guò)數(shù)據(jù)源 API 被加載成為一個(gè) DataFrame 或者 Spark SQL 臨時(shí)視圖益缠。用戶可以通過(guò)數(shù)據(jù)源選項(xiàng)指定 JDBC 連接的參數(shù)脑奠。userpassword 選項(xiàng)通常用來(lái)登錄數(shù)據(jù)庫(kù),Spark 還支持下列大小寫(xiě)不敏感的選項(xiàng):

Property Name Meaning
url 連接 JDBC 所用的 URL幅慌。有些選項(xiàng)可以包含在 URL 中宋欺。例如,jdbc:postgresql://localhost/test?user=fred&password=secret欠痴。
dbtable 需要讀寫(xiě)的表迄靠。當(dāng)該選項(xiàng)被用作讀取時(shí)的參數(shù)時(shí),任何 SQL 中有效的 FROM 語(yǔ)句都可以被使用喇辽。例如,可以不填寫(xiě)完整的表名而是一個(gè)用小括號(hào)括起來(lái)的子查詢雨席。不允許同時(shí)指定 dbtablequery 選項(xiàng)菩咨。
query 一個(gè)從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)的查詢語(yǔ)句。被指定的查詢會(huì)被小括號(hào)括起來(lái)作為 FROM 語(yǔ)句的子查詢陡厘,Spark 會(huì)為該子查詢分配一個(gè)別名抽米。例如,Spark 會(huì)組織一個(gè)這樣的語(yǔ)句來(lái)訪問(wèn) JDBC:SELECT <columns> FROM (<user_specified_query>) spark_gen_alias糙置。下面是一些使用該選項(xiàng)的限制云茸。不允許同時(shí)指定 dbtablequery 選項(xiàng)。不允許同時(shí)指定 partitionColumnquery 選項(xiàng)谤饭;當(dāng)需要指定 partitionColumn 選項(xiàng)時(shí)标捺,請(qǐng)使用 dbtable 來(lái)指定子查詢,分區(qū)列可以通過(guò)子查詢中的別名來(lái)指定揉抵。示例:spark.read.format("jdbc").option("url", jdbcUrl).option("query", "select c1, c2 from t1").load()亡容。
driver JDBC 驅(qū)動(dòng)的全限定類名。
partitionColumn, lowerBound, upperBound 這些選項(xiàng)必須同時(shí)指定冤今,此外闺兢,選項(xiàng) numPartitions 也必須指定。這些選項(xiàng)一同定義了如何從數(shù)據(jù)庫(kù)并行讀取數(shù)據(jù)戏罢。partitionColumn 選項(xiàng)必須是一個(gè)數(shù)字屋谭,日期或者時(shí)間戳類型。注意 lowerBoundupperBound 選項(xiàng)只影響分區(qū)步距龟糕,并不過(guò)濾表中的數(shù)據(jù)桐磁。所以表中的所有數(shù)據(jù)會(huì)被分區(qū)之后讀取,這些選項(xiàng)只在讀取時(shí)有效翩蘸。
numPartitions 讀寫(xiě)數(shù)據(jù)時(shí)可以被用到的最大分區(qū)數(shù)所意,該選項(xiàng)也決定了最大并發(fā) JDBC 連接數(shù)。如果寫(xiě)出時(shí)的分區(qū)數(shù)超過(guò)了這個(gè)限制,會(huì)在寫(xiě)出之前調(diào)用 coalesce(numPartitions) 方法削減分區(qū)扶踊。
queryTimeout JDBC 查詢超時(shí)時(shí)間泄鹏,單位為秒,零表示無(wú)限制秧耗。在寫(xiě)出時(shí)备籽,該選項(xiàng)取決于 JDBC 驅(qū)動(dòng)實(shí)現(xiàn) setQueryTimeout API 的方式,例如分井,h2 JDBC 驅(qū)動(dòng)每次查詢都會(huì)檢查超時(shí)時(shí)間车猬,而不是對(duì)于整個(gè) JDBC 批次檢查。默認(rèn)值為 0尺锚。
fetchsize JDBC 拉取數(shù)據(jù)的數(shù)量珠闰,決定了一次拉取多少行數(shù)據(jù)。該選項(xiàng)可以幫助提升那些默認(rèn)拉取值很小 JDBC 的性能(例如 Oracle 是一次 10 行)瘫辩。該選項(xiàng)只在讀取數(shù)據(jù)時(shí)有效伏嗜。
batchsize JDBC 批次數(shù)據(jù)的數(shù)量,決定了一次寫(xiě)出多少數(shù)據(jù)伐厌。該選項(xiàng)可以幫助提升 JDBC 的性能承绸。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效。默認(rèn)值為 1000挣轨。
isolationLevel 事務(wù)隔離級(jí)別军熏,應(yīng)用于當(dāng)前連接【戆纾可以是 NONE荡澎,READ_COMMITTEDREAD_UNCOMMITTED画饥,REPEATABLE_READ衔瓮,或者 SERIALIZABLE,與 JDBC 連接定義的標(biāo)準(zhǔn)事務(wù)隔離級(jí)別相對(duì)應(yīng)抖甘,默認(rèn)值是 READ_UNCOMMITTED热鞍。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效。詳情參見(jiàn) java.sql.Connection衔彻。
sessionInitStatement 在連接到遠(yuǎn)端數(shù)據(jù)庫(kù)的會(huì)話開(kāi)啟之后薇宠,在讀取數(shù)據(jù)之前,執(zhí)行該選項(xiàng)所定義的 SQL 語(yǔ)句(或是 PL/SQL 塊)艰额,可以通過(guò)該選項(xiàng)做一些會(huì)話初始化工作澄港。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate 當(dāng)存儲(chǔ)模式設(shè)置為 SaveMode.Overwrite 時(shí)柄沮,該選項(xiàng)會(huì)在寫(xiě)出時(shí)清空表而不是刪除后再創(chuàng)建回梧。這樣會(huì)更高效一些废岂,還避免了刪除表的元數(shù)據(jù)(比如說(shuō)索引)。然而在某些情況下該選項(xiàng)不會(huì)生效狱意,比如在新數(shù)據(jù)跟原始表的模式不同的時(shí)候湖苞。默認(rèn)值為 false。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效详囤。
cascadeTruncate 這是一個(gè) JDBC 寫(xiě)出時(shí)選項(xiàng)财骨。如果開(kāi)啟,同時(shí)數(shù)據(jù)庫(kù)也支持(目前只有 PostgreSQL 和 Oracle)藏姐,該選項(xiàng)在寫(xiě)出前會(huì)執(zhí)行 TRUNCATE TABLE t CASCADE 語(yǔ)句(在 PostgreSQL 中是 TRUNCATE TABLE ONLY t CASCADE隆箩,來(lái)避免無(wú)意間清空了衍生表)。該選項(xiàng)會(huì)影響其他的表羔杨,需要謹(jǐn)慎使用捌臊。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效。默認(rèn)值為相應(yīng)數(shù)據(jù)庫(kù)默認(rèn)的 cascading truncate 行為问畅,通過(guò) JDBC 會(huì)話中的 isCascadeTruncate 參數(shù)指定娃属。
createTableOptions 這是一個(gè) JDBC 寫(xiě)出時(shí)選項(xiàng)真慢。該選項(xiàng)可以指定寫(xiě)出數(shù)據(jù)時(shí)的建表語(yǔ)句(例如 CREATE TABLE t (name string) ENGINE=InnoDB.)缨睡。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效汇荐。
createTableColumnTypes 在建表時(shí)指定每列的數(shù)據(jù)類型而不是采用默認(rèn)值。數(shù)據(jù)類型信息應(yīng)該和 CREATE TABLE 語(yǔ)句中指定的一致(例如:"name CHAR(64), comments VARCHAR(1024)"))卵皂。被指定的類型應(yīng)該是有效的 Spark SQL 數(shù)據(jù)類型。該選項(xiàng)只在寫(xiě)出數(shù)據(jù)時(shí)有效砚亭。
customSchema 從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)時(shí)自定義數(shù)據(jù)模式灯变,例如:"id DECIMAL(38, 0), name STRING"⊥北欤可以只指定部分列添祸,其他列使用默認(rèn)類型映射,例如:"id DECIMAL(38, 0)"寻仗。列名應(yīng)該和數(shù)據(jù)庫(kù)表中的字段名相同刃泌。用戶可以通過(guò)該選項(xiàng)指定 Spark SQL 中的相應(yīng)類型而不是使用默認(rèn)映射。該選項(xiàng)只在讀取數(shù)據(jù)時(shí)有效署尤。
pushDownPredicate 是否開(kāi)啟謂詞下推機(jī)制耙替,默認(rèn)值是 true,Spark 會(huì)盡力將過(guò)濾語(yǔ)句下推到數(shù)據(jù)庫(kù)執(zhí)行曹体。否則俗扇,如果被設(shè)置為 false,不會(huì)有謂詞下推箕别,所有的過(guò)濾操作都會(huì)由 Spark 來(lái)執(zhí)行铜幽。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」滞谢。

Avro 文件

從 Spark 2.4 開(kāi)始, Spark SQL 對(duì) Avro 數(shù)據(jù)的讀寫(xiě)提供了原生支持除抛。

部署

spark-avro 是額外的模塊狮杨,默認(rèn)情況下并不包含在 spark-submit 或者 spark-shell 的 classpath 中。

對(duì)于 Spark 應(yīng)用程序來(lái)說(shuō)镶殷, spark-submit 腳本用來(lái)啟動(dòng)應(yīng)用程序禾酱。spark-avro_2.12 和它的依賴可以通過(guò)添加到 spark-submit 腳本的參數(shù)中,就像:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.0.0 ...

對(duì)于 spark-shell 腳本绘趋,也可以通過(guò) --packages 參數(shù)直接添加 org.apache.spark:spark-avro_2.12 和它的依賴:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 ...

更多有關(guān)啟動(dòng)應(yīng)用程序添加額外依賴的詳情參見(jiàn) Application Submission Guide颤陶。

讀寫(xiě)函數(shù)

由于 spark-avro 模塊是外部的,在 DataFrameReader 或者 DataFrameWriter 中并不存在 .avro API陷遮。

讀寫(xiě) Avro 格式的數(shù)據(jù)滓走,需要指定數(shù)據(jù)源選項(xiàng) formatavro(或者 org.apache.spark.sql.avro)。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

to_avro() 和 from_avro()

Avro 依賴提供了 to_avro() 函數(shù)來(lái)編碼一列數(shù)據(jù)為 Avro 格式帽馋,以及 from_avro() 函數(shù)來(lái)解碼 Avro 數(shù)據(jù)成為一列對(duì)象搅方。兩個(gè)函數(shù)都將一列轉(zhuǎn)換為另外一列,輸入輸出的 SQL 數(shù)據(jù)類型可以是復(fù)雜類型或者基礎(chǔ)類型绽族。

當(dāng)從一個(gè)像 Kafka 一樣的流失數(shù)據(jù)源讀寫(xiě)數(shù)據(jù)時(shí)姨涡,使用 Avro record 作為一列很方便。每一個(gè) Kafka 鍵值對(duì)記錄會(huì)跟一些元信息一起被讀取進(jìn)來(lái)吧慢,比如 Kafka 收錄這條記錄時(shí)的時(shí)間戳涛漂,記錄偏移量等等。

  • 如果字段中包含的數(shù)據(jù)是 Avro 格式检诗,可以使用 from_avro() 函數(shù)來(lái)抽取數(shù)據(jù)匈仗,改進(jìn),清洗逢慌,然后再輸出到 Kafka 中去 或者寫(xiě)入到文件悠轩。
  • 函數(shù)可以用來(lái)將結(jié)構(gòu)化數(shù)據(jù)轉(zhuǎn)換為 Avro record。對(duì)于需要將多列融合為一列寫(xiě)出到 Kafka 的場(chǎng)景很適用攻泼。

兩個(gè)函數(shù)目前已支持 Scala 和 Java 語(yǔ)言火架。

import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

數(shù)據(jù)源選項(xiàng)

Avro 數(shù)據(jù)源選項(xiàng)可以通過(guò)以下兩種方式設(shè)置:

  • DataFrameReader 或者 DataFrameWriter 中的 .option 方法。
  • 函數(shù) from_avro 中的 options 參數(shù)坠韩。
Property Name Default Meaning Scope
avroSchema None 可選距潘,用戶提供的 JSON 格式的模式。在讀取 Avro 數(shù)據(jù)時(shí)只搁,此選項(xiàng)可以用來(lái)定義一個(gè)演化的模式音比,可以和實(shí)際的 Avro 模式不同,但必須兼容氢惋。反序列化的模式會(huì)和演化模式一致洞翩。 例如稽犁,如果設(shè)置一個(gè)擁有一個(gè)額外字段(有默認(rèn)值)的演化模式,Spark 讀取的結(jié)果中也會(huì)包含這個(gè)新字段骚亿。當(dāng)寫(xiě)出數(shù)據(jù)為 Avro 時(shí)已亥,該選項(xiàng)可以調(diào)和目標(biāo)模式與 Spark 轉(zhuǎn)換的模式不匹配的情況。例如来屠,目標(biāo)模式中有一個(gè)列式枚舉類型虑椎,而 Spark 生成的模式中該字段是字符串類型。 read, write and function from_avro
recordName topLevelRecord 寫(xiě)出數(shù)據(jù)時(shí)的 record 名稱俱笛,Avro 需要指定捆姜。 write
recordNamespace "" 寫(xiě)出數(shù)據(jù)時(shí)的 record 命名空間。 write
ignoreExtension true 該選項(xiàng)控制在讀取數(shù)據(jù)時(shí)是否忽略文件的 .avro 后綴迎膜。如果開(kāi)啟泥技,所有的文件都會(huì)被加載(有或者沒(méi)有 .avro 后綴)。該項(xiàng)選已被棄用磕仅,會(huì)在將來(lái)的版本中移除珊豹。請(qǐng)使用更通用的數(shù)據(jù)源選項(xiàng) pathGlobFilter 來(lái)過(guò)濾文件。 read
compression snappy compression 選項(xiàng)指定在寫(xiě)出數(shù)據(jù)時(shí)的壓縮格式榕订,目前支持的選項(xiàng)有 uncompressed, snappy, deflate, bzip2xz店茶。如果沒(méi)有指定,會(huì)使用配置中的 spark.sql.avro.compression.codec 參數(shù)劫恒。 write
mode FAILFAST mode 選項(xiàng)指定 from_avro 函數(shù)的解析模式忽妒,目前支持的模式有 FAILFAST:在處理?yè)p壞文件時(shí)拋出異常;PERMISSIVE:損壞的記錄被當(dāng)做空值處理兼贸。所以,數(shù)據(jù)模式會(huì)被強(qiáng)制改變?yōu)榭梢詾榭罩党越Γ@有可能與用戶提供的模式不同溶诞。 function from_avro

配置項(xiàng)

可以通過(guò) SparkSession 對(duì)象的 setConf 方法或者執(zhí)行 SQL 中的 SET key=value 命令來(lái)改變的配置項(xiàng)。

Property Name Default Meaning Since Version
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果設(shè)置為 true决侈,為了向后兼容螺垢,數(shù)據(jù)源 provider com.databricks.spark.avro 會(huì)被作為外部數(shù)據(jù)源模塊映射為內(nèi)置模式。 2.4.0
spark.sql.avro.compression.codec snappy 寫(xiě)出 Avro 文件時(shí)使用的壓縮格式赖歌。支持的壓縮格式有:uncompressed, deflate, snappy, bzip2 和 xz枉圃。默認(rèn)值為 snappy。 2.4.0
spark.sql.avro.deflate.level -1 寫(xiě)出 Avro 文件時(shí)石筍的壓縮級(jí)別庐冯,有效值必須在 1 到9 之間或者 -1孽亲。默認(rèn)值為 -1,在目前的版本中相當(dāng)于 6展父。 2.4.0

與 Databricks spark-avro 的兼容

該 Avro 數(shù)據(jù)源模塊衍生于 Databricks 的開(kāi)源版本 spark-avro返劲,并與之保持兼容玲昧。

默認(rèn)情況下 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 配置是開(kāi)啟的,數(shù)據(jù)源 provider com.databricks.spark.avro 會(huì)被映射為內(nèi)置的 Avro 模塊篮绿。對(duì)于通過(guò) com.databricks.spark.avro 中的 Provider 創(chuàng)建的表孵延,其元數(shù)據(jù)可以被內(nèi)置 Avro 模塊讀取。

注意亲配,在 Databricks 的 spark-avro 中尘应,隱式類 AvroDataFrameWriterAvroDataFrameReader 可以通過(guò)函數(shù) .avro() 創(chuàng)建。而在內(nèi)置的模塊中吼虎,兩個(gè)隱式類都被移除了犬钢。請(qǐng)?jiān)?DataFrameWriterDataFrameReader 對(duì)象中使用 .format("avro") 方法,足夠簡(jiǎn)潔明了鲸睛。

如果你更想使用自己構(gòu)建的 spark-avro jar 文件娜饵,可以將配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 設(shè)置為 false,通過(guò) --jars 參數(shù)來(lái)部署應(yīng)用程序官辈。詳情參見(jiàn) Advanced Dependency Management箱舞。

支持的數(shù)據(jù)類型 Avro -> Spark SQL

目前 Spark 支持讀取 Avro record 中所有的基礎(chǔ)數(shù)據(jù)類型(primitive types)和復(fù)雜數(shù)據(jù)類型(complex types)。

Avro type Spark SQL type
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union See below

除了上面列出的類型之外拳亿,還支持讀取 union 類型晴股,下面三種類型被解析為 union 類型:

  1. union(int, long) 會(huì)被映射為 LongType。
  2. union(float, double) 會(huì)被映射為 DoubleType肺魁。
  3. union(something, null)电湘,其中 something 是任意支持的 Avro 類型。該類型會(huì)被映射為與 something 相關(guān)的 Spark SQL 類型(nullable 設(shè)置為 true)鹅经。所有其他的類型會(huì)被當(dāng)做復(fù)雜數(shù)據(jù)類型寂呛,會(huì)被映射為 StructType,其中的字段名稱分別為 member0瘾晃,member1 等等贷痪,數(shù)量與 union 中的類型數(shù)量一致。在 Avro 和 Parquet 之間相互轉(zhuǎn)換時(shí)也遵循同樣的原則蹦误。

還支持讀取下列 Avro 邏輯類型(logical types):

Avro logical type Avro type Spark SQL type
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前劫拢,Spark SQL 會(huì)忽略 Avro 文件的 docs,aliases 和其他屬性强胰。

支持的數(shù)據(jù)類型 Spark SQL -> Avro conversion

Spark 支持寫(xiě)入 Spark SQL 數(shù)據(jù)類型的數(shù)據(jù)到 Avro舱沧。對(duì)于大多數(shù)數(shù)據(jù)類型,從 Spark SQL 數(shù)據(jù)類型到 Avro 類型的映射是很直接的(比如說(shuō) IntegerType 映射為 int)偶洋;然而還有一些特殊的情況:

Spark SQL type Avro type Avro logical type
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

還可以通過(guò)選項(xiàng) avroSchema 指定完整的 Avro 模式波闹,這樣 Spark SQL 數(shù)據(jù)類型會(huì)被轉(zhuǎn)換為另一種 Avro 數(shù)據(jù)類型摇幻。下列轉(zhuǎn)換不能通過(guò)默認(rèn)映射執(zhí)行肤京,需要用戶指定 Avro 模式:

Spark SQL type Avro type Avro logical type
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal

二進(jìn)制文件

從 Spark 3.0 開(kāi)始,Spark 支持二進(jìn)制文件數(shù)據(jù)源肾筐,可以讀取二進(jìn)制文件,將每個(gè)文件轉(zhuǎn)換為一條記錄缸剪,記錄中包含了完整的文件數(shù)據(jù)吗铐。生成的 DataFrame 會(huì)包含以下字段以及可能的分區(qū)字段:

  • path: StringType
  • modificationTime: TimestampType
  • length: LongType
  • content: BinaryType

讀取完整的二進(jìn)制文件需要指定數(shù)據(jù)源選項(xiàng) formatbinaryFile,可以通過(guò)數(shù)據(jù)源選項(xiàng) pathGlobFilter 來(lái)過(guò)濾需要加載的文件同時(shí)不影響分區(qū)發(fā)現(xiàn)的功能杏节。例如下面的代碼讀取指定路徑中所有的 PNG 文件:

spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")

二進(jìn)制文件數(shù)據(jù)源不支持將一個(gè) DataFrame 寫(xiě)回到原來(lái)的文件唬渗。

疑難解答

  • JDBC 驅(qū)動(dòng)包必須可以被 client 和所有 executor 的 Application ClassLoader 加載。這是因?yàn)?Java 的 DriverManager 類會(huì)進(jìn)行安全檢查奋渔,會(huì)在打開(kāi)數(shù)據(jù)庫(kù)連接時(shí)忽略所有無(wú)法被 Application ClassLoader 加載的類镊逝。將相關(guān)的 Jar 文件加入到所有節(jié)點(diǎn)上的 compute_classpath.sh 腳本中是一種簡(jiǎn)便的解決方案。
  • 某些數(shù)據(jù)庫(kù)嫉鲸,比如說(shuō) H2撑蒜,將所有的名稱轉(zhuǎn)換為大寫(xiě),此時(shí)在 Spark SQL 中引用那些名稱時(shí)也需要用大寫(xiě)形式玄渗。
  • 用戶可以在數(shù)據(jù)源選項(xiàng)中指定 JDBC 驅(qū)動(dòng)包供應(yīng)商提供的特殊屬性座菠。例如 spark.read.format("jdbc").option("url", oracleJdbcUrl).option("oracle.jdbc.mapDateToTimestamp", "false")oracle.jdbc.mapDateToTimestamp 默認(rèn)為 true藤树,用戶通常會(huì)關(guān)閉該選項(xiàng)來(lái)避免 Oracle 中的 date 類型轉(zhuǎn)換為 timestamp浴滴。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市岁钓,隨后出現(xiàn)的幾起案子升略,更是在濱河造成了極大的恐慌,老刑警劉巖屡限,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件品嚣,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡钧大,警方通過(guò)查閱死者的電腦和手機(jī)腰根,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拓型,“玉大人,你說(shuō)我怎么就攤上這事瘸恼×哟欤” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵东帅,是天一觀的道長(zhǎng)压固。 經(jīng)常有香客問(wèn)我,道長(zhǎng)靠闭,這世上最難降的妖魔是什么帐我? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任坎炼,我火速辦了婚禮,結(jié)果婚禮上拦键,老公的妹妹穿的比我還像新娘谣光。我一直安慰自己,他們只是感情好芬为,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布萄金。 她就那樣靜靜地躺著,像睡著了一般媚朦。 火紅的嫁衣襯著肌膚如雪氧敢。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天询张,我揣著相機(jī)與錄音孙乖,去河邊找鬼。 笑死份氧,一個(gè)胖子當(dāng)著我的面吹牛唯袄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播半火,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼越妈,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了钮糖?” 一聲冷哼從身側(cè)響起梅掠,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎店归,沒(méi)想到半個(gè)月后阎抒,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡消痛,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年且叁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片秩伞。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡逞带,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出纱新,到底是詐尸還是另有隱情展氓,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布脸爱,位于F島的核電站遇汞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜空入,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一络它、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧歪赢,春花似錦化戳、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至递鹉,卻和暖如春盟步,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背躏结。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工却盘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人媳拴。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓黄橘,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親屈溉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子塞关,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359