Spark SQL, DataFrames and Datasets Guide
無類型的Dataset操作 (aka DataFrame 操作)
Running SQL Queries Programmatically
Untyped User-Defined Aggregate Functions
Type-Safe User-Defined Aggregate Functions
Generic Load/Save Functions (通用 加載/保存 功能)
Manually Specifying Options (手動指定選項)
Run SQL on files directly (直接在文件上運行 SQL)
Saving to Persistent Tables (保存到持久表)
Bucketing, Sorting and Partitioning (分桶, 排序和分區(qū))
Loading Data Programmatically (以編程的方式加載數(shù)據(jù))
Partition Discovery (分區(qū)發(fā)現(xiàn))
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉(zhuǎn)換)
Hive/Parquet Schema Reconciliation
Metadata Refreshing (元數(shù)據(jù)刷新)
JSON Datasets (JSON 數(shù)據(jù)集)
DataFrame data reader/writer interface
DataFrame.groupBy 保留 grouping columns(分組的列)
隔離隱式轉(zhuǎn)換和刪除 dsl 包(僅Scala)
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限于 Scala)
UDF 注冊遷移到sqlContext.udf中 (Java & Scala)
Python DataTypes 不再是 Singletons(單例的)
在現(xiàn)有的 Hive Warehouses 中部署
Spark SQL 是 Spark 處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊.與基礎的 Spark RDD API 不同, Spark SQL 提供了查詢結(jié)構(gòu)化數(shù)據(jù)及計算結(jié)果等信息的接口.在內(nèi)部, Spark SQL 使用這個額外的信息去執(zhí)行額外的優(yōu)化.有幾種方式可以跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執(zhí)行引擎進行計算時, 無論使用哪種 API / 語言都可以快速的計算.這種統(tǒng)一意味著開發(fā)人員能夠在基于提供最自然的方式來表達一個給定的 transformation API 之間實現(xiàn)輕松的來回切換不同的 .
該頁面所有例子使用的示例數(shù)據(jù)都包含在 Spark 的發(fā)布中, 并且可以使用spark-shell,pysparkshell, 或者sparkRshell來運行.
Spark SQL 的功能之一是執(zhí)行 SQL 查詢.Spark SQL 也能夠被用于從已存在的 Hive 環(huán)境中讀取數(shù)據(jù).更多關于如何配置這個特性的信息, 請參考Hive 表這部分. 當以另外的編程語言運行SQL 時, 查詢結(jié)果將以Dataset/DataFrame的形式返回.您也可以使用命令行或者通過JDBC/ODBC與 SQL 接口交互.
一個 Dataset 是一個分布式的數(shù)據(jù)集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優(yōu)點(強類型化, 能夠使用強大的 lambda 函數(shù))與Spark SQL執(zhí)行引擎的優(yōu)點.一個 Dataset 可以從 JVM 對象來構(gòu)造并且使用轉(zhuǎn)換功能(map, flatMap, filter, 等等). Dataset API 在Scala和Java是可用的.Python 不支持 Dataset API.但是由于 Python 的動態(tài)特性, 許多 Dataset API 的優(yōu)點已經(jīng)可用了 (也就是說, 你可能通過 name 天生的row.columnName屬性訪問一行中的字段).這種情況和 R 相似.
一個 DataFrame 是一個Dataset組成的指定列.它的概念與一個在關系型數(shù)據(jù)庫或者在 R/Python 中的表是相等的, 但是有很多優(yōu)化. DataFrames 可以從大量的sources中構(gòu)造出來, 比如: 結(jié)構(gòu)化的文本文件, Hive中的表, 外部數(shù)據(jù)庫, 或者已經(jīng)存在的 RDDs. DataFrame API 可以在 Scala, Java,Python, 和R中實現(xiàn). 在 Scala 和 Java中, 一個 DataFrame 所代表的是一個多個Row(行)的的 Dataset(數(shù)據(jù)集合). 在the Scala API中,DataFrame僅僅是一個Dataset[Row]類型的別名. 然而, 在Java API中, 用戶需要去使用Dataset去代表一個DataFrame.
在此文檔中, 我們將常常會引用 Scala/Java Datasets 的Rows 作為 DataFrames.
Spark SQL中所有功能的入口點是SparkSession類. 要創(chuàng)建一個SparkSession, 僅使用SparkSession.builder()就可以了:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option","some-value").getOrCreate()// For implicit conversions like converting RDDs to DataFramesimportspark.implicits._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark 2.0 中的SparkSession為 Hive 特性提供了內(nèi)嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數(shù)據(jù)的能力.為了使用這些特性, 你不需要去有一個已存在的 Hive 設置.
在一個SparkSession中, 應用程序可以從一個已經(jīng)存在的RDD, 從hive表, 或者從Spark數(shù)據(jù)源中創(chuàng)建一個DataFrames.
舉個例子, 下面就是基于一個JSON文件創(chuàng)建一個DataFrame:
valdf=spark.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdoutdf.show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
無類型的Dataset操作 (aka DataFrame 操作)
DataFrames 提供了一個特定的語法用在Scala,Java,PythonandR中機構(gòu)化數(shù)據(jù)的操作.
正如上面提到的一樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個Rows的Dataset. 這些操作也參考了與強類型的Scala/Java Datasets中的”類型轉(zhuǎn)換” 對應的”無類型轉(zhuǎn)換” .
這里包括一些使用 Dataset 進行結(jié)構(gòu)化數(shù)據(jù)處理的示例 :
// This import is needed to use the $-notationimportspark.implicits._// Print the schema in a tree formatdf.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Select only the "name" columndf.select("name").show()// +-------+// |? name|// +-------+// |Michael|// |? Andy|// | Justin|// +-------+// Select everybody, but increment the age by 1df.select($"name",$"age"+1).show()// +-------+---------+// |? name|(age + 1)|// +-------+---------+// |Michael|? ? null|// |? Andy|? ? ? 31|// | Justin|? ? ? 20|// +-------+---------+// Select people older than 21df.filter($"age">21).show()// +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+// Count people by agedf.groupBy("age").count().show()// +----+-----+// | age|count|// +----+-----+// |? 19|? ? 1|// |null|? ? 1|// |? 30|? ? 1|// +----+-----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
能夠在 DataFrame 上被執(zhí)行的操作類型的完整列表請參考API 文檔.
除了簡單的列引用和表達式之外, DataFrame 也有豐富的函數(shù)庫, 包括 string 操作, date 算術, 常見的 math 操作以及更多.可用的完整列表請參考DataFrame 函數(shù)指南.
Running SQL Queries Programmatically
SparkSession的sql函數(shù)可以讓應用程序以編程的方式運行 SQL 查詢, 并將結(jié)果作為一個DataFrame返回.
// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView("people")valsqlDF=spark.sql("SELECT * FROM people")sqlDF.show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark SQL中的臨時視圖是session級別的, 也就是會隨著session的消失而消失. 如果你想讓一個臨時視圖在所有session中相互傳遞并且可用, 直到Spark 應用退出, 你可以建立一個全局的臨時視圖.全局的臨時視圖存在于系統(tǒng)數(shù)據(jù)庫global_temp中, 我們必須加上庫名去引用它, 比如.SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary viewdf.createGlobalTempView("people")// Global temporary view is tied to a system preserved database `global_temp`spark.sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+// Global temporary view is cross-sessionspark.newSession().sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Dataset 與 RDD 相似, 然而, 并不是使用 Java 序列化或者 Kryo編碼器來序列化用于處理或者通過網(wǎng)絡進行傳輸?shù)膶ο? 雖然編碼器和標準的序列化都負責將一個對象序列化成字節(jié), 編碼器是動態(tài)生成的代碼, 并且使用了一種允許 Spark 去執(zhí)行許多像 filtering, sorting 以及 hashing 這樣的操作, 不需要將字節(jié)反序列化成對象的格式.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interfacecaseclassPerson(name:String,age:Long)// Encoders are created for case classesvalcaseClassDS=Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// Encoders for most common types are automatically provided by importing spark.implicits._valprimitiveDS=Seq(1,2,3).toDS()primitiveDS.map(_+1).collect()// Returns: Array(2, 3, 4)// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by namevalpath="examples/src/main/resources/people.json"valpeopleDS=spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark SQL 支持兩種不同的方法用于轉(zhuǎn)換已存在的 RDD 成為 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基于方法的反射可以讓你的代碼更簡潔.
第二種用于創(chuàng)建 Dataset 的方法是通過一個允許你構(gòu)造一個 Schema 然后把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它允許你去構(gòu)造 Dataset.
Spark SQL 的 Scala 接口支持自動轉(zhuǎn)換一個包含 case classes 的 RDD 為 DataFrame.Case class 定義了表的 Schema.Case class 的參數(shù)名使用反射讀取并且成為了列名.Case class 也可以是嵌套的或者包含像Seq或者Array這樣的復雜類型.這個 RDD 能夠被隱式轉(zhuǎn)換成一個 DataFrame 然后被注冊為一個表.表可以用于后續(xù)的 SQL 語句.
// For implicit conversions from RDDs to DataFramesimportspark.implicits._// Create an RDD of Person objects from a text file, convert it to a DataframevalpeopleDF=spark.sparkContext.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(attributes=>Person(attributes(0),attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by SparkvalteenagersDF=spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager=>"Name: "+teenager(0)).show()// +------------+// |? ? ? value|// +------------+// |Name: Justin|// +------------+// or by field nameteenagersDF.map(teenager=>"Name: "+teenager.getAs[String]("name")).show()// +------------+// |? ? ? value|// +------------+// |Name: Justin|// +------------+// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicitvalmapEncoder=org.apache.spark.sql.Encoders.kryo[Map[String,Any]]// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager=>teenager.getValuesMap[Any](List("name","age"))).collect()// Array(Map("name" -> "Justin", "age" -> 19))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
當 case class 不能夠在執(zhí)行之前被定義(例如, records 記錄的結(jié)構(gòu)在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析并且不同的用戶投影的字段是不一樣的).一個DataFrame可以使用下面的三步以編程的方式來創(chuàng)建.
從原始的 RDD 創(chuàng)建 RDD 的Row(行);
Step 1 被創(chuàng)建后, 創(chuàng)建 Schema 表示一個StructType匹配 RDD 中的Row(行)的結(jié)構(gòu).
通過SparkSession提供的createDataFrame方法應用 Schema 到 RDD 的 RowS(行).
例如:
importorg.apache.spark.sql.types._// Create an RDDvalpeopleRDD=spark.sparkContext.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a stringvalschemaString="name age"http:// Generate the schema based on the string of schemavalfields=schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))valschema=StructType(fields)// Convert records of the RDD (people) to RowsvalrowRDD=peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0),attributes(1).trim))// Apply the schema to the RDDvalpeopleDF=spark.createDataFrame(rowRDD,schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesvalresults=spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes=>"Name: "+attributes(0)).show()// +-------------+// |? ? ? ? value|// +-------------+// |Name: Michael|// |? Name: Andy|// | Name: Justin|// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Thebuilt-in DataFrames functionsprovide common aggregations such ascount(),countDistinct(),avg(),max(),min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them inScalaandJavato work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
Untyped User-Defined Aggregate Functions
Users have to extend theUserDefinedAggregateFunctionabstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
importorg.apache.spark.sql.expressions.MutableAggregationBufferimportorg.apache.spark.sql.expressions.UserDefinedAggregateFunctionimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessionobjectMyAverageextendsUserDefinedAggregateFunction{// Data types of input arguments of this aggregate functiondefinputSchema:StructType=StructType(StructField("inputColumn",LongType)::Nil)// Data types of values in the aggregation bufferdefbufferSchema:StructType={StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil)}// The data type of the returned valuedefdataType:DataType=DoubleType// Whether this function always returns the same output on the identical inputdefdeterministic:Boolean=true// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.definitialize(buffer:MutableAggregationBuffer):Unit={buffer(0)=0Lbuffer(1)=0L}// Updates the given aggregation buffer `buffer` with new input data from `input`defupdate(buffer:MutableAggregationBuffer,input:Row):Unit={if(!input.isNullAt(0)){buffer(0)=buffer.getLong(0)+input.getLong(0)buffer(1)=buffer.getLong(1)+1}}// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`defmerge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit={buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}// Calculates the final resultdefevaluate(buffer:Row):Double=buffer.getLong(0).toDouble/buffer.getLong(1)}// Register the function to access itspark.udf.register("myAverage",MyAverage)valdf=spark.read.json("examples/src/main/resources/employees.json")df.createOrReplaceTempView("employees")df.show()// +-------+------+// |? name|salary|// +-------+------+// |Michael|? 3000|// |? Andy|? 4500|// | Justin|? 3500|// |? Berta|? 4000|// +-------+------+valresult=spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result.show()// +--------------+// |average_salary|// +--------------+// |? ? ? ? 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around theAggregatorabstract class. For example, a type-safe user-defined average can look like:
importorg.apache.spark.sql.expressions.Aggregatorimportorg.apache.spark.sql.Encoderimportorg.apache.spark.sql.Encodersimportorg.apache.spark.sql.SparkSessioncaseclassEmployee(name:String,salary:Long)caseclassAverage(varsum:Long,varcount:Long)objectMyAverageextendsAggregator[Employee,Average,Double]{// A zero value for this aggregation. Should satisfy the property that any b + zero = bdefzero:Average=Average(0L,0L)// Combine two values to produce a new value. For performance, the function may modify `buffer`// and return it instead of constructing a new objectdefreduce(buffer:Average,employee:Employee):Average={buffer.sum+=employee.salarybuffer.count+=1buffer}// Merge two intermediate valuesdefmerge(b1:Average,b2:Average):Average={b1.sum+=b2.sumb1.count+=b2.countb1}// Transform the output of the reductiondeffinish(reduction:Average):Double=reduction.sum.toDouble/reduction.count// Specifies the Encoder for the intermediate value typedefbufferEncoder:Encoder[Average]=Encoders.product// Specifies the Encoder for the final output value typedefoutputEncoder:Encoder[Double]=Encoders.scalaDouble}valds=spark.read.json("examples/src/main/resources/employees.json").as[Employee]ds.show()// +-------+------+// |? name|salary|// +-------+------+// |Michael|? 3000|// |? Andy|? 4500|// | Justin|? 3500|// |? Berta|? 4000|// +-------+------+// Convert the function to a `TypedColumn` and give it a namevalaverageSalary=MyAverage.toColumn.name("average_salary")valresult=ds.select(averageSalary)result.show()// +--------------+// |average_salary|// +--------------+// |? ? ? ? 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.
Spark SQL 支持通過 DataFrame 接口對各種 data sources (數(shù)據(jù)源)進行操作. DataFrame 可以使用 relational transformations (關系轉(zhuǎn)換)操作, 也可用于創(chuàng)建 temporary view (臨時視圖). 將 DataFrame 注冊為 temporary view (臨時視圖)允許您對其數(shù)據(jù)運行 SQL 查詢. 本節(jié) 描述了使用 Spark Data Sources 加載和保存數(shù)據(jù)的一般方法, 然后涉及可用于 built-in data sources (內(nèi)置數(shù)據(jù)源)的 specific options (特定選項).
Generic Load/Save Functions (通用 加載/保存 功能)
在最簡單的形式中, 默認數(shù)據(jù)源(parquet, 除非另有配置spark.sql.sources.default)將用于所有操作.
valusersDF=spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Manually Specifying Options (手動指定選項)
您還可以 manually specify (手動指定)將與任何你想傳遞給 data source 的其他選項一起使用的 data source . Data sources 由其 fully qualified name (完全限定名稱)(即org.apache.spark.sql.parquet), 但是對于 built-in sources (內(nèi)置的源), 你也可以使用它們的 shortnames (短名稱)(json,parquet,jdbc,orc,libsvm,csv,text).從任何 data source type (數(shù)據(jù)源類型)加載 DataFrames 可以使用此 syntax (語法)轉(zhuǎn)換為其他類型.
valpeopleDF=spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name","age").write.format("parquet").save("namesAndAges.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Run SQL on files directly (直接在文件上運行 SQL)
不使用讀取 API 將文件加載到 DataFrame 并進行查詢, 也可以直接用 SQL 查詢該文件.
valsqlDF=spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Save operations (保存操作)可以選擇使用SaveMode, 它指定如何處理現(xiàn)有數(shù)據(jù)如果存在的話. 重要的是要意識到, 這些 save modes (保存模式)不使用任何 locking (鎖定)并且不是 atomic (原子). 另外, 當執(zhí)行Overwrite時, 數(shù)據(jù)將在新數(shù)據(jù)寫出之前被刪除.
Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果數(shù)據(jù)已經(jīng)存在, 則會拋出異常.
SaveMode.Append"append"將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果 data/table 已存在, 則 DataFrame 的內(nèi)容將被 append (附加)到現(xiàn)有數(shù)據(jù)中.
SaveMode.Overwrite"overwrite"Overwrite mode (覆蓋模式)意味著將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果 data/table 已經(jīng)存在, 則預期 DataFrame 的內(nèi)容將 overwritten (覆蓋)現(xiàn)有數(shù)據(jù).
SaveMode.Ignore"ignore"Ignore mode (忽略模式)意味著當將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果數(shù)據(jù)已經(jīng)存在, 則保存操作預期不會保存 DataFrame 的內(nèi)容, 并且不更改現(xiàn)有數(shù)據(jù). 這與 SQL 中的CREATE TABLE IF NOT EXISTS類似.
Saving to Persistent Tables (保存到持久表)
DataFrames也可以使用saveAsTable命令作為 persistent tables (持久表)保存到 Hive metastore 中. 請注意, existing Hive deployment (現(xiàn)有的 Hive 部署)不需要使用此功能. Spark 將為您創(chuàng)建默認的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 與createOrReplaceTempView命令不同,saveAsTable將 materialize (實現(xiàn)) DataFrame 的內(nèi)容, 并創(chuàng)建一個指向 Hive metastore 中數(shù)據(jù)的指針. 即使您的 Spark 程序重新啟動, Persistent tables (持久性表)仍然存在, 因為您保持與同一個 metastore 的連接. 可以通過使用表的名稱在SparkSession上調(diào)用table方法來創(chuàng)建 persistent tabl (持久表)的 DataFrame .
對于 file-based (基于文件)的 data source (數(shù)據(jù)源), 例如 text, parquet, json等, 您可以通過path選項指定 custom table path (自定義表路徑), 例如df.write.option("path", "/some/path").saveAsTable("t"). 當表被 dropped (刪除)時, custom table path (自定義表路徑)將不會被刪除, 并且表數(shù)據(jù)仍然存在. 如果未指定自定義表路徑, Spark 將把數(shù)據(jù)寫入 warehouse directory (倉庫目錄)下的默認表路徑. 當表被刪除時, 默認的表路徑也將被刪除.
從 Spark 2.1 開始, persistent datasource tables (持久性數(shù)據(jù)源表)將 per-partition metadata (每個分區(qū)元數(shù)據(jù))存儲在 Hive metastore 中. 這帶來了幾個好處:
由于 metastore 只能返回查詢的必要 partitions (分區(qū)), 因此不再需要將第一個查詢上的所有 partitions discovering 到表中.
Hive DDLs 如ALTER TABLE PARTITION ... SET LOCATION現(xiàn)在可用于使用 Datasource API 創(chuàng)建的表.
請注意, 創(chuàng)建 external datasource tables (外部數(shù)據(jù)源表)(帶有path選項)的表時, 默認情況下不會收集 partition information (分區(qū)信息). 要 sync (同步) metastore 中的分區(qū)信息, 可以調(diào)用MSCK REPAIR TABLE.
Bucketing, Sorting and Partitioning (分桶, 排序和分區(qū))
對于 file-based data source (基于文件的數(shù)據(jù)源), 也可以對 output (輸出)進行 bucket 和 sort 或者 partition . Bucketing 和 sorting 僅適用于 persistent tables :
peopleDF.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
在使用 Dataset API 時, partitioning 可以同時與save和saveAsTable一起使用.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
可以為 single table (單個表)使用 partitioning 和 bucketing:
peopleDF.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
partitionBy創(chuàng)建一個 directory structure (目錄結(jié)構(gòu)), 如Partition Discovery部分所述. 因此, 對 cardinality (基數(shù))較高的 columns 的適用性有限. 相反,bucketBy可以在固定數(shù)量的 buckets 中分配數(shù)據(jù), 并且可以在 a number of unique values is unbounded (多個唯一值無界時)使用數(shù)據(jù).
Parquet是許多其他數(shù)據(jù)處理系統(tǒng)支持的 columnar format (柱狀格式). Spark SQL 支持讀寫 Parquet 文件, 可自動保留 schema of the original data (原始數(shù)據(jù)的模式). 當編寫 Parquet 文件時, 出于兼容性原因, 所有 columns 都將自動轉(zhuǎn)換為可空.
Loading Data Programmatically (以編程的方式加載數(shù)據(jù))
使用上面例子中的數(shù)據(jù):
// Encoders for most common types are automatically provided by importing spark.implicits._importspark.implicits._valpeopleDF=spark.read.json("examples/src/main/resources/people.json")// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFramevalparquetFileDF=spark.read.parquet("people.parquet")// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")valnamesDF=spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes=>"Name: "+attributes(0)).show()// +------------+// |? ? ? value|// +------------+// |Name: Justin|// +------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Partition Discovery (分區(qū)發(fā)現(xiàn))
Table partitioning (表分區(qū))是在像 Hive 這樣的系統(tǒng)中使用的常見的優(yōu)化方法. 在 partitioned table (分區(qū)表)中, 數(shù)據(jù)通常存儲在不同的目錄中, partitioning column values encoded (分區(qū)列值編碼)在每個 partition directory (分區(qū)目錄)的路徑中. Parquet data source (Parquet 數(shù)據(jù)源)現(xiàn)在可以自動 discover (發(fā)現(xiàn))和 infer (推斷)分區(qū)信息. 例如, 我們可以使用以下 directory structure (目錄結(jié)構(gòu))將所有以前使用的 population data (人口數(shù)據(jù))存儲到 partitioned table (分區(qū)表)中, 其中有兩個額外的列gender和country作為 partitioning columns (分區(qū)列):
path└── to? ? └── table? ? ? ? ├── gender=male? ? ? ? │?? ├── ...? ? ? ? │?? │? ? ? ? │?? ├── country=US? ? ? ? │?? │?? └── data.parquet? ? ? ? │?? ├── country=CN? ? ? ? │?? │?? └── data.parquet? ? ? ? │?? └── ...? ? ? ? └── gender=female? ? ? ? ?? ├── ...? ? ? ? ?? │? ? ? ? ?? ├── country=US? ? ? ? ?? │?? └── data.parquet? ? ? ? ?? ├── country=CN? ? ? ? ?? │?? └── data.parquet? ? ? ? ?? └── ...
通過將path/to/table傳遞給SparkSession.read.parquet或SparkSession.read.load, Spark SQL 將自動從路徑中提取 partitioning information (分區(qū)信息). 現(xiàn)在返回的 DataFrame 的 schema (模式)變成:
root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)
請注意, 會自動 inferred (推斷) partitioning columns (分區(qū)列)的 data types (數(shù)據(jù)類型).目前, 支持 numeric data types (數(shù)字數(shù)據(jù)類型)和 string type (字符串類型).有些用戶可能不想自動推斷 partitioning columns (分區(qū)列)的數(shù)據(jù)類型.對于這些用例, automatic type inference (自動類型推斷)可以由spark.sql.sources.partitionColumnTypeInference.enabled配置, 默認為true.當禁用 type inference (類型推斷)時, string type (字符串類型)將用于 partitioning columns (分區(qū)列).
從 Spark 1.6.0 開始, 默認情況下, partition discovery (分區(qū)發(fā)現(xiàn))只能找到給定路徑下的 partitions (分區(qū)).對于上述示例, 如果用戶將path/to/table/gender=male傳遞給SparkSession.read.parquet或SparkSession.read.load, 則gender將不被視為 partitioning column (分區(qū)列).如果用戶需要指定 partition discovery (分區(qū)發(fā)現(xiàn))應該開始的基本路徑, 則可以在數(shù)據(jù)源選項中設置basePath.例如, 當path/to/table/gender=male是數(shù)據(jù)的路徑并且用戶將basePath設置為path/to/table/,gender將是一個 partitioning column (分區(qū)列).
像 ProtocolBuffer , Avro 和 Thrift 一樣, Parquet 也支持 schema evolution (模式演進). 用戶可以從一個 simple schema (簡單的架構(gòu))開始, 并根據(jù)需要逐漸向 schema 添加更多的 columns (列). 以這種方式, 用戶可能會使用不同但相互兼容的 schemas 的 multiple Parquet files (多個 Parquet 文件). Parquet data source (Parquet 數(shù)據(jù)源)現(xiàn)在能夠自動檢測這種情況并 merge (合并)所有這些文件的 schemas .
由于 schema merging (模式合并)是一個 expensive operation (相對昂貴的操作), 并且在大多數(shù)情況下不是必需的, 所以默認情況下從 1.5.0 開始. 你可以按照如下的方式啟用它:
讀取 Parquet 文件時, 將 data source option (數(shù)據(jù)源選項)mergeSchema設置為true(如下面的例子所示), 或
將 global SQL option (全局 SQL 選項)spark.sql.parquet.mergeSchema設置為true.
// This is used to implicitly convert an RDD to a DataFrame.importspark.implicits._// Create a simple DataFrame, store into a partition directoryvalsquaresDF=spark.sparkContext.makeRDD(1to5).map(i=>(i,i*i)).toDF("value","square")squaresDF.write.parquet("data/test_table/key=1")// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnvalcubesDF=spark.sparkContext.makeRDD(6to10).map(i=>(i,i*i*i)).toDF("value","cube")cubesDF.write.parquet("data/test_table/key=2")// Read the partitioned tablevalmergedDF=spark.read.option("mergeSchema","true").parquet("data/test_table")mergedDF.printSchema()// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root//? |-- value: int (nullable = true)//? |-- square: int (nullable = true)//? |-- cube: int (nullable = true)//? |-- key: int (nullable = true)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉(zhuǎn)換)
當讀取和寫入 Hive metastore Parquet 表時, Spark SQL 將嘗試使用自己的 Parquet support (Parquet 支持), 而不是 Hive SerDe 來獲得更好的性能. 此 behavior (行為)由spark.sql.hive.convertMetastoreParquet配置控制, 默認情況下 turned on (打開).
Hive/Parquet Schema Reconciliation
從 table schema processing (表格模式處理)的角度來說, Hive 和 Parquet 之間有兩個關鍵的區(qū)別.
Hive 不區(qū)分大小寫, 而 Parquet 不是
Hive 認為所有 columns (列)都可以為空, 而 Parquet 中的可空性是 significant (重要)的.
由于這個原因, 當將 Hive metastore Parquet 表轉(zhuǎn)換為 Spark SQL Parquet 表時, 我們必須調(diào)整 metastore schema 與 Parquet schema. reconciliation 規(guī)則是:
在兩個 schema 中具有 same name (相同名稱)的 Fields (字段)必須具有 same data type (相同的數(shù)據(jù)類型), 而不管 nullability (可空性). reconciled field 應具有 Parquet 的數(shù)據(jù)類型, 以便 nullability (可空性)得到尊重.
reconciled schema (調(diào)和模式)正好包含 Hive metastore schema 中定義的那些字段.
只出現(xiàn)在 Parquet schema 中的任何字段將被 dropped (刪除)在 reconciled schema 中.
僅在 Hive metastore schema 中出現(xiàn)的任何字段在 reconciled schema 中作為 nullable field (可空字段)添加.
Metadata Refreshing (元數(shù)據(jù)刷新)
Spark SQL 緩存 Parquet metadata 以獲得更好的性能. 當啟用 Hive metastore Parquet table conversion (轉(zhuǎn)換)時, 這些 converted tables (轉(zhuǎn)換表)的 metadata (元數(shù)據(jù))也被 cached (緩存). 如果這些表由 Hive 或其他外部工具更新, 則需要手動刷新以確保 consistent metadata (一致的元數(shù)據(jù)).
// spark is an existing SparkSessionspark.catalog.refreshTable("my_table")
可以使用SparkSession上的setConf方法或使用 SQL 運行SET key = value命令來完成 Parquet 的配置.
Property Name (參數(shù)名稱)Default(默認)Meaning(含義)
spark.sql.parquet.binaryAsStringfalse一些其他 Parquet-producing systems (Parquet 生產(chǎn)系統(tǒng)), 特別是 Impala, Hive 和舊版本的 Spark SQL , 在 writing out (寫出) Parquet schema 時, 不區(qū)分 binary data (二進制數(shù)據(jù))和 strings (字符串). 該 flag 告訴 Spark SQL 將 binary data (二進制數(shù)據(jù))解釋為 string (字符串)以提供與這些系統(tǒng)的兼容性.
spark.sql.parquet.int96AsTimestamptrue一些 Parquet-producing systems , 特別是 Impala 和 Hive , 將 Timestamp 存入INT96 . 該 flag 告訴 Spark SQL 將 INT96 數(shù)據(jù)解析為 timestamp 以提供與這些系統(tǒng)的兼容性.
spark.sql.parquet.cacheMetadatatrue打開 Parquet schema metadata 的緩存. 可以加快查詢靜態(tài)數(shù)據(jù).
spark.sql.parquet.compression.codecsnappy在編寫 Parquet 文件時設置 compression codec (壓縮編解碼器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdowntrue設置為 true 時啟用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquettrue當設置為 false 時, Spark SQL 將使用 Hive SerDe 作為 parquet tables , 而不是內(nèi)置的支持.
spark.sql.parquet.mergeSchemafalse當為 true 時, Parquet data source (Parquet 數(shù)據(jù)源) merges (合并)從所有 data files (數(shù)據(jù)文件)收集的 schemas , 否則如果沒有可用的 summary file , 則從 summary file 或 random data file 中挑選 schema .
spark.sql.optimizer.metadataOnlytrue如果為 true , 則啟用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns (分區(qū)列)而不是 table scans (表掃描). 當 scanned (掃描)的所有 columns (列)都是 partition columns (分區(qū)列)并且 query (查詢)具有滿足 distinct semantics (不同語義)的 aggregate operator (聚合運算符)時, 它將適用.
JSON Datasets (JSON 數(shù)據(jù)集)
Spark SQL 可以 automatically infer (自動推斷)JSON dataset 的 schema, 并將其作為Dataset[Row]加載. 這個 conversion (轉(zhuǎn)換)可以在Dataset[String]上使用SparkSession.read.json()來完成, 或 JSON 文件.
請注意, 以a json file提供的文件不是典型的 JSON 文件. 每行必須包含一個 separate (單獨的), self-contained valid (獨立的有效的)JSON 對象. 有關更多信息, 請參閱JSON Lines text format, also called newline-delimited JSON.
對于 regular multi-line JSON file (常規(guī)的多行 JSON 文件), 將multiLine選項設置為true.
// Primitive types (Int, String, etc) and Product types (case classes) encoders are// supported by importing this when creating a Dataset.importspark.implicits._// A JSON dataset is pointed to by path.// The path can be either a single text file or a directory storing text filesvalpath="examples/src/main/resources/people.json"valpeopleDF=spark.read.json(path)// The inferred schema can be visualized using the printSchema() methodpeopleDF.printSchema()// root//? |-- age: long (nullable = true)//? |-- name: string (nullable = true)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by sparkvalteenagerNamesDF=spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")teenagerNamesDF.show()// +------+// |? name|// +------+// |Justin|// +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset[String] storing one JSON object per stringvalotherPeopleDataset=spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""::Nil)valotherPeople=spark.read.json(otherPeopleDataset)otherPeople.show()// +---------------+----+// |? ? ? ? address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Spark SQL 還支持讀取和寫入存儲在Apache Hive中的數(shù)據(jù)。 但是,由于 Hive 具有大量依賴關系仁烹,因此這些依賴關系不包含在默認 Spark 分發(fā)中悼沿。 如果在類路徑中找到 Hive 依賴項,Spark 將自動加載它們骡送。 請注意,這些 Hive 依賴關系也必須存在于所有工作節(jié)點上,因為它們將需要訪問 Hive 序列化和反序列化庫 (SerDes)搀绣,以訪問存儲在 Hive 中的數(shù)據(jù)。
通過將hive-site.xml,core-site.xml(用于安全配置)和hdfs-site.xml(用于 HDFS 配置)文件放在conf/中來完成配置戳气。
當使用 Hive 時链患,必須用 Hive 支持實例化SparkSession,包括連接到持續(xù)的 Hive 轉(zhuǎn)移瓶您,支持 Hive serdes 和 Hive 用戶定義的功能麻捻。 沒有現(xiàn)有 Hive 部署的用戶仍然可以啟用 Hive 支持纲仍。 當hive-site.xml未配置時,上下文會自動在當前目錄中創(chuàng)建metastore_db贸毕,并創(chuàng)建由spark.sql.warehouse.dir配置的目錄郑叠,該目錄默認為Spark應用程序當前目錄中的spark-warehouse目錄 開始了 請注意,自從2.0.0以來崖咨,hive-site.xml中的hive.metastore.warehouse.dir屬性已被棄用锻拘。 而是使用spark.sql.warehouse.dir來指定倉庫中數(shù)據(jù)庫的默認位置。 您可能需要向啟動 Spark 應用程序的用戶授予寫權(quán)限击蹲。?
importjava.io.Fileimportorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessioncaseclassRecord(key:Int,value:String)// warehouseLocation points to the default location for managed databases and tablesvalwarehouseLocation=newFile("spark-warehouse").getAbsolutePathvalspark=SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir",warehouseLocation).enableHiveSupport().getOrCreate()importspark.implicits._importspark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsql("SELECT * FROM src").show()// +---+-------+// |key|? value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...// Aggregation queries are also supported.sql("SELECT COUNT(*) FROM src").show()// +--------+// |count(1)|// +--------+// |? ? 500 |// +--------+// The results of SQL queries are themselves DataFrames and support all normal functions.valsqlDF=sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")// The items in DataFrames are of type Row, which allows you to access each column by ordinal.valstringsDS=sqlDF.map{caseRow(key:Int,value:String)=>s"Key:$key, Value:$value"}stringsDS.show()// +--------------------+// |? ? ? ? ? ? ? value|// +--------------------+// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// ...// You can also use DataFrames to create temporary views within a SparkSession.valrecordsDF=spark.createDataFrame((1to100).map(i=>Record(i,s"val_$i")))recordsDF.createOrReplaceTempView("records")// Queries can then join DataFrame data with data stored in Hive.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()// +---+------+---+------+// |key| value|key| value|// +---+------+---+------+// |? 2| val_2|? 2| val_2|// |? 4| val_4|? 4| val_4|// |? 5| val_5|? 5| val_5|// ...
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.
創(chuàng)建 Hive 表時署拟,需要定義如何 從/向 文件系統(tǒng) read/write 數(shù)據(jù),即 “輸入格式” 和 “輸出格式”歌豺。 您還需要定義該表如何將數(shù)據(jù)反序列化為行推穷,或?qū)⑿行蛄谢癁閿?shù)據(jù),即 “serde”类咧。 以下選項可用于指定存儲格式 (“serde”, “input format”, “output format”)馒铃,例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')痕惋。 默認情況下区宇,我們將以純文本形式讀取表格文件。 請注意值戳,Hive 存儲處理程序在創(chuàng)建表時不受支持议谷,您可以使用 Hive 端的存儲處理程序創(chuàng)建一個表,并使用 Spark SQL 來讀取它堕虹。
Property NameMeaning
fileFormatfileFormat是一種存儲格式規(guī)范的包卧晓,包括 "serde","input format" 和 "output format"赴捞。 目前我們支持6個文件格式:'sequencefile'逼裆,'rcfile','orc'赦政,'parquet'胜宇,'textfile'和'avro'。
inputFormat, outputFormat這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定為字符串文字昼钻,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`掸屡。 這兩個選項必須成對出現(xiàn),如果您已經(jīng)指定了 "fileFormat" 選項然评,則無法指定它們仅财。
serde此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時碗淌,如果給定的 `fileFormat` 已經(jīng)包含 serde 的信息盏求,那么不要指定這個選項抖锥。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可以使用這3個文件格式的這個選項碎罚。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim這些選項只能與 "textfile" 文件格式一起使用磅废。它們定義如何將分隔的文件讀入行。
使用OPTIONS定義的所有其他屬性將被視為 Hive serde 屬性荆烈。
Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互拯勉,這使得 Spark SQL 能夠訪問 Hive 表的元數(shù)據(jù)。 從 Spark 1.4.0 開始憔购,使用 Spark SQL 的單一二進制構(gòu)建可以使用下面所述的配置來查詢不同版本的 Hive 轉(zhuǎn)移宫峦。 請注意,獨立于用于與轉(zhuǎn)移點通信的 Hive 版本玫鸟,內(nèi)部 Spark SQL 將針對 Hive 1.2.1 進行編譯导绷,并使用這些類進行內(nèi)部執(zhí)行(serdes,UDF屎飘,UDAF等)妥曲。
以下選項可用于配置用于檢索元數(shù)據(jù)的 Hive 版本:
屬性名稱默認值含義
spark.sql.hive.metastore.version1.2.1Hive metastore 版本。 可用選項為0.12.0至1.2.1钦购。
spark.sql.hive.metastore.jarsbuiltin當啟用-Phive時檐盟,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一起押桃。選擇此選項時遵堵,spark.sql.hive.metastore.version 必須為1.2.1或未定義。 行家 使用從Maven存儲庫下載的指定版本的Hive jar怨规。 通常不建議在生產(chǎn)部署中使用此配置。 ***** 應用于實例化 HiveMetastoreClient 的 jar 的位置锡足。該屬性可以是三個選項之一:
builtin當啟用-Phive時波丰,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一起舶得。選擇此選項時,spark.sql.hive.metastore.version必須為1.2.1或未定義九孩。
maven使用從 Maven 存儲庫下載的指定版本的 Hive jar先馆。通常不建議在生產(chǎn)部署中使用此配置。
JVM 的標準格式的 classpath躺彬。 該類路徑必須包含所有 Hive 及其依賴項煤墙,包括正確版本的 Hadoop梅惯。這些罐只需要存在于 driver 程序中,但如果您正在運行在 yarn 集群模式仿野,那么您必須確保它們與應用程序一起打包铣减。
spark.sql.hive.metastore.sharedPrefixescom.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc使用逗號分隔的類前綴列表,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載脚作。 一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver篮灼。 其它需要共享的類,是需要與已經(jīng)共享的類進行交互的暇务。 例如亡脸,log4j 使用的自定義 appender。
spark.sql.hive.metastore.barrierPrefixes(empty)一個逗號分隔的類前綴列表宾符,應該明確地為 Spark SQL 正在通信的 Hive 的每個版本重新加載酿秸。 例如,在通常將被共享的前綴中聲明的 Hive UDF (即: ?org.apache.spark.*)魏烫。
Spark SQL 還包括可以使用 JDBC 從其他數(shù)據(jù)庫讀取數(shù)據(jù)的數(shù)據(jù)源辣苏。此功能應優(yōu)于使用JdbcRDD。 這是因為結(jié)果作為 DataFrame 返回哄褒,并且可以輕松地在 Spark SQL 中處理或與其他數(shù)據(jù)源連接稀蟋。 JDBC 數(shù)據(jù)源也更容易從 Java 或 Python 使用,因為它不需要用戶提供 ClassTag呐赡。(請注意退客,這不同于 Spark SQL JDBC 服務器,允許其他應用程序使用 Spark SQL 運行查詢)链嘀。
要開始使用萌狂,您需要在 Spark 類路徑中包含特定數(shù)據(jù)庫的 JDBC driver 程序。 例如怀泊,要從 Spark Shell 連接到 postgres茫藏,您將運行以下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
可以使用 Data Sources API 將來自遠程數(shù)據(jù)庫的表作為 DataFrame 或 Spark SQL 臨時視圖進行加載。 用戶可以在數(shù)據(jù)源選項中指定 JDBC 連接屬性霹琼。用戶和密碼通常作為登錄數(shù)據(jù)源的連接屬性提供务傲。 除了連接屬性外,Spark 還支持以下不區(qū)分大小寫的選項:
?屬性名稱含義
url要連接的JDBC URL枣申。 源特定的連接屬性可以在URL中指定售葡。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable應該讀取的 JDBC 表。請注意忠藤,可以使用在SQL查詢的FROM子句中有效的任何內(nèi)容挟伙。 例如,您可以使用括號中的子查詢代替完整表模孩。
driver用于連接到此 URL 的 JDBC driver 程序的類名像寒。
partitionColumn, lowerBound, upperBound如果指定了這些選項烘豹,則必須指定這些選項。 另外诺祸,必須指定numPartitions. 他們描述如何從多個 worker 并行讀取數(shù)據(jù)時將表給分區(qū)携悯。partitionColumn必須是有問題的表中的數(shù)字列。 請注意筷笨,lowerBound和upperBound僅用于決定分區(qū)的大小憔鬼,而不是用于過濾表中的行。 因此胃夏,表中的所有行將被分區(qū)并返回轴或。此選項僅適用于讀操作。
numPartitions在表讀寫中可以用于并行度的最大分區(qū)數(shù)仰禀。這也確定并發(fā)JDBC連接的最大數(shù)量照雁。 如果要寫入的分區(qū)數(shù)超過此限制,則在寫入之前通過調(diào)用coalesce(numPartitions)將其減少到此限制答恶。
fetchsizeJDBC 抓取的大小饺蚊,用于確定每次數(shù)據(jù)往返傳遞的行數(shù)。 這有利于提升 JDBC driver 的性能悬嗓,它們的默認值較形酆簟(例如: Oracle 是 10 行)。 該選項僅適用于讀取操作包竹。
batchsizeJDBC 批處理的大小燕酷,用于確定每次數(shù)據(jù)往返傳遞的行數(shù)。 這有利于提升 JDBC driver 的性能周瞎。 該選項僅適用于寫操作苗缩。默認值為1000.
isolationLevel事務隔離級別,適用于當前連接声诸。 它可以是NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ, 或SERIALIZABLE之一挤渐,對應于 JDBC 連接對象定義的標準事務隔離級別,默認為READ_UNCOMMITTED双絮。 此選項僅適用于寫操作。請參考java.sql.Connection中的文檔得问。
truncate這是一個與 JDBC 相關的選項囤攀。 啟用SaveMode.Overwrite時,此選項會導致 Spark 截斷現(xiàn)有表宫纬,而不是刪除并重新創(chuàng)建焚挠。 這可以更有效,并且防止表元數(shù)據(jù)(例如漓骚,索引)被移除蝌衔。 但是榛泛,在某些情況下,例如當新數(shù)據(jù)具有不同的模式時噩斟,它將無法工作曹锨。 它默認為false。 此選項僅適用于寫操作剃允。
createTableOptions這是一個與JDBC相關的選項沛简。 如果指定,此選項允許在創(chuàng)建表時設置特定于數(shù)據(jù)庫的表和分區(qū)選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.)斥废。此選項僅適用于寫操作椒楣。
createTableColumnTypes使用數(shù)據(jù)庫列數(shù)據(jù)類型而不是默認值,創(chuàng)建表時牡肉。 數(shù)據(jù)類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")捧灰。 指定的類型應該是有效的 spark sql 數(shù)據(jù)類型。此選項僅適用于寫操作统锤。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods// Loading data from a JDBC sourcevaljdbcDF=spark.read.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").load()valconnectionProperties=newProperties()connectionProperties.put("user","username")connectionProperties.put("password","password")valjdbcDF2=spark.read.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Saving data to a JDBC sourcejdbcDF.write.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").save()jdbcDF2.write.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Specifying create table column data types on writejdbcDF.write.option("createTableColumnTypes","name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
JDBC driver 程序類必須對客戶端會話和所有執(zhí)行程序上的原始類加載器可見毛俏。 這是因為 Java 的 DriverManager 類執(zhí)行安全檢查,導致它忽略原始類加載器不可見的所有 driver 程序跪另,當打開連接時辑奈。一個方便的方法是修改所有工作節(jié)點上的compute_classpath.sh 以包含您的 driver 程序 JAR。
一些數(shù)據(jù)庫咪笑,例如 H2零渐,將所有名稱轉(zhuǎn)換為大寫。 您需要使用大寫字母來引用 Spark SQL 中的這些名稱嘲驾。
對于某些工作負載淌哟,可以通過緩存內(nèi)存中的數(shù)據(jù)或打開一些實驗選項來提高性能。
Spark SQL 可以通過調(diào)用spark.catalog.cacheTable("tableName")或dataFrame.cache()來使用內(nèi)存中的列格式來緩存表辽故。 然后徒仓,Spark SQL 將只掃描所需的列,并將自動調(diào)整壓縮以最小化內(nèi)存使用量和 GC 壓力誊垢。 您可以調(diào)用spark.catalog.uncacheTable("tableName")從內(nèi)存中刪除該表掉弛。
內(nèi)存緩存的配置可以使用SparkSession上的setConf方法或使用 SQL 運行SET key=value命令來完成。
屬性名稱默認含義
spark.sql.inMemoryColumnarStorage.compressedtrue當設置為 true 時喂走,Spark SQL 將根據(jù)數(shù)據(jù)的統(tǒng)計信息為每個列自動選擇一個壓縮編解碼器殃饿。
spark.sql.inMemoryColumnarStorage.batchSize10000控制批量的柱狀緩存的大小。更大的批量大小可以提高內(nèi)存利用率和壓縮率芋肠,但是在緩存數(shù)據(jù)時會冒出 OOM 風險乎芳。
以下選項也可用于調(diào)整查詢執(zhí)行的性能。這些選項可能會在將來的版本中被廢棄,因為更多的優(yōu)化是自動執(zhí)行的奈惑。
屬性名稱默認值含義
spark.sql.files.maxPartitionBytes134217728 (128 MB)在讀取文件時吭净,將單個分區(qū)打包的最大字節(jié)數(shù)。
spark.sql.files.openCostInBytes4194304 (4 MB)按照字節(jié)數(shù)來衡量的打開文件的估計費用可以在同一時間進行掃描肴甸。 將多個文件放入分區(qū)時使用寂殉。最好過度估計,那么具有小文件的分區(qū)將比具有較大文件的分區(qū)(首先計劃的)更快雷滋。
spark.sql.broadcastTimeout300廣播連接中的廣播等待時間超時(秒)
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)配置執(zhí)行連接時將廣播給所有工作節(jié)點的表的最大大胁怀拧(以字節(jié)為單位)。 通過將此值設置為-1可以禁用廣播晤斩。 請注意焕檬,目前的統(tǒng)計信息僅支持 Hive Metastore 表,其中已運行命令ANALYZE TABLE COMPUTE STATISTICS noscan澳泵。
spark.sql.shuffle.partitions200Configures the number of partitions to use when shuffling data for joins or aggregations.
Spark SQL 也可以充當使用其 JDBC/ODBC 或命令行界面的分布式查詢引擎实愚。 在這種模式下,最終用戶或應用程序可以直接與 Spark SQL 交互運行 SQL 查詢兔辅,而不需要編寫任何代碼腊敲。
這里實現(xiàn)的 Thrift JDBC/ODBC 服務器對應于 Hive 1.2 中的HiveServer2。 您可以使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器维苔。
要啟動 JDBC/ODBC 服務器碰辅,請在 Spark 目錄中運行以下命令:
./sbin/start-thriftserver.sh
此腳本接受所有bin/spark-submit命令行選項,以及--hiveconf選項來指定 Hive 屬性介时。 您可以運行./sbin/start-thriftserver.sh --help查看所有可用選項的完整列表没宾。 默認情況下,服務器監(jiān)聽 localhost:10000. 您可以通過環(huán)境變量覆蓋此行為沸柔,即:
exportHIVE_SERVER2_THRIFT_PORT=exportHIVE_SERVER2_THRIFT_BIND_HOST=./sbin/start-thriftserver.sh\--master \...
or system properties:
./sbin/start-thriftserver.sh\--hiveconf hive.server2.thrift.port=\--hiveconf hive.server2.thrift.bind.host=\--master ? ...
現(xiàn)在循衰,您可以使用 beeline 來測試 Thrift JDBC/ODBC 服務器:
./bin/beeline
使用 beeline 方式連接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://localhost:10000
Beeline 將要求您輸入用戶名和密碼。 在非安全模式下褐澎,只需輸入機器上的用戶名和空白密碼即可会钝。 對于安全模式,請按照beeline 文檔中的說明進行操作工三。
配置Hive是通過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的迁酸。
您也可以使用 Hive 附帶的 beeline 腳本。
Thrift JDBC 服務器還支持通過 HTTP 傳輸發(fā)送 thrift RPC 消息俭正。 使用以下設置啟用 HTTP 模式作為系統(tǒng)屬性或在conf/中的hive-site.xml文件中啟用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
要測試奸鬓,請使用 beeline 以 http 模式連接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://:/?hive.server2.transport.mode=http;hive.server2.thrift.http.path=
Spark SQL CLI 是在本地模式下運行 Hive 轉(zhuǎn)移服務并執(zhí)行從命令行輸入的查詢的方便工具。 請注意段审,Spark SQL CLI 不能與 Thrift JDBC 服務器通信。
要啟動 Spark SQL CLI,請在 Spark 目錄中運行以下命令:
./bin/spark-sql
配置 Hive 是通過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的寺枉。 您可以運行./bin/spark-sql --help獲取所有可用選項的完整列表抑淫。
Spark 2.1.1 介紹了一個新的配置 key:spark.sql.hive.caseSensitiveInferenceMode. 它的默認設置是NEVER_INFER, 其行為與 2.1.0 保持一致. 但是,Spark 2.2.0 將此設置的默認值更改為 “INFER_AND_SAVE”姥闪,以恢復與底層文件 schema(模式)具有大小寫混合的列名稱的 Hive metastore 表的兼容性始苇。使用INFER_AND_SAVE配置的 value, 在第一次訪問 Spark 將對其尚未保存推測 schema(模式)的任何 Hive metastore 表執(zhí)行 schema inference(模式推斷). 請注意,對于具有數(shù)千個 partitions(分區(qū))的表筐喳,模式推斷可能是非常耗時的操作催式。如果不兼容大小寫混合的列名,您可以安全地將spark.sql.hive.caseSensitiveInferenceMode設置為NEVER_INFER避归,以避免模式推斷的初始開銷荣月。請注意,使用新的默認INFER_AND_SAVE設置梳毙,模式推理的結(jié)果被保存為 metastore key 以供將來使用哺窄。因此,初始模式推斷僅發(fā)生在表的第一次訪問账锹。
Datasource tables(數(shù)據(jù)源表)現(xiàn)在存儲了 Hive metastore 中的 partition metadata(分區(qū)元數(shù)據(jù)). 這意味著諸如ALTER TABLE PARTITION ... SET LOCATION這樣的 Hive DDLs 現(xiàn)在使用 Datasource API 可用于創(chuàng)建 tables(表).
遺留的數(shù)據(jù)源表可以通過MSCK REPAIR TABLE命令遷移到這種格式萌业。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。
要確定表是否已遷移奸柬,當在表上發(fā)出DESCRIBE FORMATTED命令時請查找PartitionProvider: Catalog屬性.
Datasource tables(數(shù)據(jù)源表)的INSERT OVERWRITE TABLE ... PARTITION ...行為的更改生年。
在以前的 Spark 版本中,INSERT OVERWRITE覆蓋了整個 Datasource table廓奕,即使給出一個指定的 partition. 現(xiàn)在只有匹配規(guī)范的 partition 被覆蓋抱婉。
請注意幔托,這仍然與 Hive 表的行為不同静暂,Hive 表僅覆蓋與新插入數(shù)據(jù)重疊的分區(qū)榛了。
SparkSession現(xiàn)在是 Spark 新的切入點, 它替代了老的SQLContext和HiveContext此洲。注意 : 為了向下兼容隐绵,老的 SQLContext 和 HiveContext 仍然保留咆蒿∑咏裕可以從SparkSession獲取一個新的catalog接口 — 現(xiàn)有的訪問數(shù)據(jù)庫和表的 API贴彼,如listTables缘薛,createExternalTable窍育,dropTempView,cacheTable都被移到該接口宴胧。
Dataset API 和 DataFrame API 進行了統(tǒng)一漱抓。在 Scala 中,DataFrame變成了Dataset[Row]類型的一個別名恕齐,而 Java API 使用者必須將DataFrame替換成Dataset乞娄。Dataset 類既提供了強類型轉(zhuǎn)換操作(如map,filter以及groupByKey)也提供了非強類型轉(zhuǎn)換操作(如select和groupBy)。由于編譯期的類型安全不是 Python 和 R 語言的一個特性仪或,Dataset 的概念并不適用于這些語言的 API确镊。相反,DataFrame仍然是最基本的編程抽象, 就類似于這些語言中單節(jié)點 data frame 的概念范删。
Dataset 和 DataFrame API 中 unionAll 已經(jīng)過時并且由union替代蕾域。
Dataset 和 DataFrame API 中 explode 已經(jīng)過時,作為選擇到旦,可以結(jié)合 select 或 flatMap 使用functions.explode()旨巷。
Dataset 和 DataFrame API 中registerTempTable已經(jīng)過時并且由createOrReplaceTempView替代。
對 Hive tablesCREATE TABLE ... LOCATION行為的更改.
從 Spark 2.0 開始添忘,CREATE TABLE ... LOCATION與CREATE EXTERNAL TABLE ... LOCATION是相同的采呐,以防止意外丟棄用戶提供的 locations(位置)中的現(xiàn)有數(shù)據(jù)。這意味著昔汉,在用戶指定位置的 Spark SQL 中創(chuàng)建的 Hive 表始終是 Hive 外部表懈万。刪除外部表將不會刪除數(shù)據(jù)。 用戶不能指定 Hive managed tables(管理表)的位置. 請注意靶病,這與Hive行為不同会通。
因此,這些表上的 “DROP TABLE” 語句不會刪除數(shù)據(jù)娄周。
從 Spark 1.6 開始涕侈,默認情況下服務器在多 session(會話)模式下運行。這意味著每個 JDBC/ODBC 連接擁有一份自己的 SQL 配置和臨時函數(shù)注冊煤辨。緩存表仍在并共享裳涛。如果您希望以舊的單會話模式運行 Thrift server,請設置選項spark.sql.hive.thriftServer.singleSession為true众辨。您既可以將此選項添加到spark-defaults.conf端三,或者通過--conf將它傳遞給start-thriftserver.sh。
./sbin/start-thriftserver.sh\--conf spark.sql.hive.thriftServer.singleSession=true\...
從 1.6.1 開始鹃彻,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現(xiàn)有列郊闯。
從 Spark 1.6 開始,LongType 強制轉(zhuǎn)換為 TimestampType 期望是秒蛛株,而不是微秒团赁。這種更改是為了匹配 Hive 1.2 的行為,以便從 numeric(數(shù)值)類型進行更一致的類型轉(zhuǎn)換到 TimestampType谨履。更多詳情請參閱SPARK-11724欢摄。
使用手動管理的內(nèi)存優(yōu)化執(zhí)行,現(xiàn)在是默認啟用的笋粟,以及代碼生成表達式求值怀挠。這些功能既可以通過設置spark.sql.tungsten.enabled為false來禁止使用析蝴。
Parquet 的模式合并默認情況下不再啟用。它可以通過設置spark.sql.parquet.mergeSchema到true以重新啟用绿淋。
字符串在 Python 列的 columns(列)現(xiàn)在支持使用點(.)來限定列或訪問嵌套值嫌变。例如df['table.column.nestedField']。但是躬它,這意味著如果你的列名中包含任何圓點,你現(xiàn)在必須避免使用反引號(如table.column.with.dots.nested)东涡。
在內(nèi)存中的列存儲分區(qū)修剪默認是開啟的冯吓。它可以通過設置spark.sql.inMemoryColumnarStorage.partitionPruning為false來禁用。
無限精度的小數(shù)列不再支持疮跑,而不是 Spark SQL 最大精度為 38 组贺。當從BigDecimal對象推斷模式時,現(xiàn)在使用(38祖娘,18)失尖。在 DDL 沒有指定精度時,則默認保留Decimal(10, 0)渐苏。
時間戳現(xiàn)在存儲在 1 微秒的精度掀潮,而不是 1 納秒的。
在 sql 語句中琼富,floating point(浮點數(shù))現(xiàn)在解析為 decimal仪吧。HiveQL 解析保持不變。
SQL / DataFrame 函數(shù)的規(guī)范名稱現(xiàn)在是小寫(例如 sum vs SUM)鞠眉。
JSON 數(shù)據(jù)源不會自動加載由其他應用程序(未通過 Spark SQL 插入到數(shù)據(jù)集的文件)創(chuàng)建的新文件薯鼠。對于 JSON 持久表(即表的元數(shù)據(jù)存儲在 Hive Metastore),用戶可以使用REFRESH TABLESQL 命令或HiveContext的refreshTable方法械蹋,把那些新文件列入到表中出皇。對于代表一個 JSON dataset 的 DataFrame,用戶需要重新創(chuàng)建 DataFrame哗戈,同時 DataFrame 中將包括新的文件郊艘。
PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現(xiàn)有的同名列。
DataFrame data reader/writer interface
基于用戶反饋谱醇,我們創(chuàng)建了一個新的更流暢的 API暇仲,用于讀取 (SQLContext.read) 中的數(shù)據(jù)并寫入數(shù)據(jù) (DataFrame.write), 并且舊的 API 將過時(例如,SQLContext.parquetFile,SQLContext.jsonFile).
針對SQLContext.read(Scala,Java,Python) 和DataFrame.write(Scala,Java,Python) 的更多細節(jié)副渴,請看 API 文檔.
DataFrame.groupBy 保留 grouping columns(分組的列)
根據(jù)用戶的反饋奈附, 我們更改了DataFrame.groupBy().agg()的默認行為以保留DataFrame結(jié)果中的 grouping columns(分組列). 為了在 1.3 中保持該行為,請設置spark.sql.retainGroupColumns為false.
// In 1.3.x, in order for the grouping column "department" to show up,// it must be included explicitly as part of the agg function call.df.groupBy("department").agg($"department",max("age"),sum("expense"))// In 1.4+, grouping column "department" is included automatically.df.groupBy("department").agg(max("age"),sum("expense"))// Revert to 1.3 behavior (not retaining grouping column) by:sqlContext.setConf("spark.sql.retainGroupColumns","false")
之前 1.4 版本中煮剧,DataFrame.withColumn() 只支持添加列罐韩。該列將始終在 DateFrame 結(jié)果中被加入作為新的列台谊,即使現(xiàn)有的列可能存在相同的名稱界赔。從 1.4 版本開始,DataFrame.withColumn() 支持添加與所有現(xiàn)有列的名稱不同的列或替換現(xiàn)有的同名列顶掉。
請注意,這一變化僅適用于 Scala API挑胸,并不適用于 PySpark 和 SparkR痒筒。
在 Spark 1.3 中,我們從 Spark SQL 中刪除了 “Alpha” 的標簽茬贵,作為一部分已經(jīng)清理過的可用的 API 簿透。從 Spark 1.3 版本以上,Spark SQL 將提供在 1.X 系列的其他版本的二進制兼容性解藻。這種兼容性保證不包括被明確標記為不穩(wěn)定的(即 DeveloperApi 類或 Experimental) API老充。
升級到 Spark SQL 1.3 版本時,用戶會發(fā)現(xiàn)最大的變化是螟左,SchemaRDD已更名為DataFrame啡浊。這主要是因為 DataFrames 不再從 RDD 直接繼承,而是由 RDDS 自己來實現(xiàn)這些功能胶背。DataFrames 仍然可以通過調(diào)用.rdd方法轉(zhuǎn)換為 RDDS 巷嚣。
在 Scala 中,有一個從SchemaRDD到DataFrame類型別名钳吟,可以為一些情況提供源代碼兼容性涂籽。它仍然建議用戶更新他們的代碼以使用DataFrame來代替。Java 和 Python 用戶需要更新他們的代碼砸抛。
此前 Spark 1.3 有單獨的Java兼容類(JavaSQLContext和JavaSchemaRDD)评雌,借鑒于 Scala API。在 Spark 1.3 中直焙,Java API 和 Scala API 已經(jīng)統(tǒng)一景东。兩種語言的用戶可以使用SQLContext和DataFrame。一般來說論文類嘗試使用兩種語言的共有類型(如Array替代了一些特定集合)奔誓。在某些情況下不通用的類型情況下斤吐,(例如,passing in closures 或 Maps)使用函數(shù)重載代替厨喂。
此外和措,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可以使用存在于org.apache.spark.sql.types類來描述編程模式蜕煌。
隔離隱式轉(zhuǎn)換和刪除 dsl 包(僅Scala)
許多 Spark 1.3 版本以前的代碼示例都以import sqlContext._開始派阱,這提供了從 sqlContext 范圍的所有功能。在 Spark 1.3 中斜纪,我們移除了從RDDs 到DateFrame再到SQLContext內(nèi)部對象的隱式轉(zhuǎn)換贫母。用戶現(xiàn)在應該寫成import sqlContext.implicits._.
此外文兑,隱式轉(zhuǎn)換現(xiàn)在只能使用方法toDF來增加由Product(即 case classes or tuples)構(gòu)成的RDD,而不是自動應用腺劣。
當使用 DSL 內(nèi)部的函數(shù)時(現(xiàn)在使用DataFrameAPI 來替換), 用戶習慣導入org.apache.spark.sql.catalyst.dsl. 相反绿贞,應該使用公共的 dataframe 函數(shù) API:import org.apache.spark.sql.functions._.
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限于 Scala)
Spark 1.3 移除存在于基本 SQL 包的DataType類型別名。開發(fā)人員應改為導入類org.apache.spark.sql.types橘原。
UDF 注冊遷移到sqlContext.udf中 (Java & Scala)
用于注冊 UDF 的函數(shù)籍铁,不管是 DataFrame DSL 還是 SQL 中用到的,都被遷移到SQLContext中的 udf 對象中趾断。
sqlContext.udf.register("strLen",(s:String)=>s.length())
Python UDF 注冊保持不變寨辩。
Python DataTypes 不再是 Singletons(單例的)
在 Python 中使用 DataTypes 時,你需要先構(gòu)造它們(如:StringType())歼冰,而不是引用一個單例對象。
Spark SQL 在設計時就考慮到了和 Hive metastore耻警,SerDes 以及 UDF 之間的兼容性隔嫡。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本,并且Spark SQL 可以連接到不同版本的Hive metastore(從 0.12.0 到 1.2.1甘穿,可以參考與不同版本的 Hive Metastore 交互)
在現(xiàn)有的 Hive Warehouses 中部署
Spark SQL Thrift JDBC server 采用了開箱即用的設計以兼容已有的 Hive 安裝版本腮恩。你不需要修改現(xiàn)有的 Hive Metastore , 或者改變數(shù)據(jù)的位置和表的分區(qū)。
Spark SQL 支持絕大部分的 Hive 功能温兼,如:
Hive query(查詢)語句, 包括:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
所有 Hive 操作, 包括:
關系運算符 (=,?,==,<>,<,>,>=,<=, 等等)
算術運算符 (+,-,*,/,%, 等等)
邏輯運算符 (AND,&&,OR,||, 等等)
復雜類型的構(gòu)造
數(shù)學函數(shù) (sign,ln,cos, 等等)
String 函數(shù) (instr,length,printf, 等等)
用戶定義函數(shù) (UDF)
用戶定義聚合函數(shù) (UDAF)
用戶定義 serialization formats (SerDes)
窗口函數(shù)
Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
Unions
Sub-queries(子查詢)
SELECT col FROM ( SELECT a + b AS col from t1) t2
Sampling
Explain
Partitioned tables including dynamic partition insertion
View
所有的 Hive DDL 函數(shù), 包括:
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
大部分的 Hive Data types(數(shù)據(jù)類型), 包括:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
以下是目前還不支持的 Hive 函數(shù)列表秸滴。在 Hive 部署中這些功能大部分都用不到。
主要的 Hive 功能
Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 還不支持 buckets.
Esoteric Hive 功能
UNION類型
Unique join
Column 統(tǒng)計信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.
Hive Input/Output Formats
File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
Hadoop archive
Hive 優(yōu)化
有少數(shù) Hive 優(yōu)化還沒有包含在 Spark 中募判。其中一些(比如 indexes 索引)由于 Spark SQL 的這種內(nèi)存計算模型而顯得不那么重要荡含。另外一些在 Spark SQL 未來的版本中會持續(xù)跟蹤。
Block 級別的 bitmap indexes 和虛擬 columns (用于構(gòu)建 indexes)
自動為 join 和 groupBy 計算 reducer 個數(shù) : 目前在 Spark SQL 中, 你需要使用 “SET spark.sql.shuffle.partitions=[num_tasks];” 來控制 post-shuffle 的并行度.
僅 Meta-data 的 query: 對于只使用 metadata 就能回答的查詢届垫,Spark SQL 仍然會啟動計算結(jié)果的任務.
Skew data flag: Spark SQL 不遵循 Hive 中 skew 數(shù)據(jù)的標記.
STREAMTABLEhint in join: Spark SQL 不遵循STREAMTABLEhint.
對于查詢結(jié)果合并多個小文件: 如果輸出的結(jié)果包括多個小文件, Hive 可以可選的合并小文件到一些大文件中去释液,以避免溢出 HDFS metadata. Spark SQL 還不支持這樣.
Spark SQL 和 DataFrames 支持下面的數(shù)據(jù)類型:
Numeric types
ByteType: Represents 1-byte signed integer numbers. The range of numbers is from-128to127.
ShortType: Represents 2-byte signed integer numbers. The range of numbers is from-32768to32767.
IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from-2147483648to2147483647.
LongType: Represents 8-byte signed integer numbers. The range of numbers is from-9223372036854775808to9223372036854775807.
FloatType: Represents 4-byte single-precision floating point numbers.
DoubleType: Represents 8-byte double-precision floating point numbers.
DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally byjava.math.BigDecimal. ABigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
String type
StringType: Represents character string values.
Binary type
BinaryType: Represents byte sequence values.
Boolean type
BooleanType: Represents boolean values.
Datetime type
TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
DateType: Represents values comprising values of fields year, month, day.
Complex types
ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type ofelementType.containsNullis used to indicate if elements in aArrayTypevalue can havenullvalues.
MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described bykeyTypeand the data type of values are described byvalueType. For aMapTypevalue, keys are not allowed to havenullvalues.valueContainsNullis used to indicate if values of aMapTypevalue can havenullvalues.
StructType(fields): Represents values with the structure described by a sequence ofStructFields (fields).
StructField(name, dataType, nullable): Represents a field in aStructType. The name of a field is indicated byname. The data type of a field is indicated bydataType.nullableis used to indicate if values of this fields can havenullvalues.
Spark SQL 的所有數(shù)據(jù)類型都在包org.apache.spark.sql.types中. 你可以用下示例示例來訪問它們.
importorg.apache.spark.sql.types._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(數(shù)據(jù)類型)Scala 中的 Value 類型訪問或創(chuàng)建數(shù)據(jù)類型的 API
ByteTypeByteByteType
ShortTypeShortShortType
IntegerTypeIntIntegerType
LongTypeLongLongType
FloatTypeFloatFloatType
DoubleTypeDoubleDoubleType
DecimalTypejava.math.BigDecimalDecimalType
StringTypeStringStringType
BinaryTypeArray[Byte]BinaryType
BooleanTypeBooleanBooleanType
TimestampTypejava.sql.TimestampTimestampType
DateTypejava.sql.DateDateType
ArrayTypescala.collection.SeqArrayType(elementType, [containsNull])
Note(注意):containsNull的默認值是true.
MapTypescala.collection.MapMapType(keyType,valueType, [valueContainsNull])
Note(注意):valueContainsNull的默認值是true.
StructTypeorg.apache.spark.sql.RowStructType(fields)
Note(注意):fields是 StructFields 的 Seq. 所有, 兩個 fields 擁有相同的名稱是不被允許的.
StructField該 field(字段)數(shù)據(jù)類型的 Scala 中的 value 類型 (例如, 數(shù)據(jù)類型為 IntegerType 的 StructField 是 Int)StructField(name,dataType, [nullable])
Note:nullable的默認值是true.
當處理一些不符合標準浮點數(shù)語義的float或double類型時,對于 Not-a-Number(NaN) 需要做一些特殊處理. 具體如下:
NaN = NaN 返回 true.
在 aggregations(聚合)操作中装处,所有的 NaN values 將被分到同一個組中.
在 join key 中 NaN 可以當做一個普通的值.
NaN 值在升序排序中排到最后误债,比任何其他數(shù)值都大.
Spark SQL, DataFrames and Datasets Guide
無類型的Dataset操作 (aka DataFrame 操作)
Running SQL Queries Programmatically
Untyped User-Defined Aggregate Functions
Type-Safe User-Defined Aggregate Functions
Generic Load/Save Functions (通用 加載/保存 功能)
Manually Specifying Options (手動指定選項)
Run SQL on files directly (直接在文件上運行 SQL)
Saving to Persistent Tables (保存到持久表)
Bucketing, Sorting and Partitioning (分桶, 排序和分區(qū))
Loading Data Programmatically (以編程的方式加載數(shù)據(jù))
Partition Discovery (分區(qū)發(fā)現(xiàn))
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉(zhuǎn)換)
Hive/Parquet Schema Reconciliation
Metadata Refreshing (元數(shù)據(jù)刷新)
JSON Datasets (JSON 數(shù)據(jù)集)
DataFrame data reader/writer interface
DataFrame.groupBy 保留 grouping columns(分組的列)
隔離隱式轉(zhuǎn)換和刪除 dsl 包(僅Scala)
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限于 Scala)
UDF 注冊遷移到sqlContext.udf中 (Java & Scala)
Python DataTypes 不再是 Singletons(單例的)
在現(xiàn)有的 Hive Warehouses 中部署
Spark SQL 是 Spark 處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊.與基礎的 Spark RDD API 不同, Spark SQL 提供了查詢結(jié)構(gòu)化數(shù)據(jù)及計算結(jié)果等信息的接口.在內(nèi)部, Spark SQL 使用這個額外的信息去執(zhí)行額外的優(yōu)化.有幾種方式可以跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執(zhí)行引擎進行計算時, 無論使用哪種 API / 語言都可以快速的計算.這種統(tǒng)一意味著開發(fā)人員能夠在基于提供最自然的方式來表達一個給定的 transformation API 之間實現(xiàn)輕松的來回切換不同的 .
該頁面所有例子使用的示例數(shù)據(jù)都包含在 Spark 的發(fā)布中, 并且可以使用spark-shell,pysparkshell, 或者sparkRshell來運行.
Spark SQL 的功能之一是執(zhí)行 SQL 查詢.Spark SQL 也能夠被用于從已存在的 Hive 環(huán)境中讀取數(shù)據(jù).更多關于如何配置這個特性的信息, 請參考Hive 表這部分. 當以另外的編程語言運行SQL 時, 查詢結(jié)果將以Dataset/DataFrame的形式返回.您也可以使用命令行或者通過JDBC/ODBC與 SQL 接口交互.
一個 Dataset 是一個分布式的數(shù)據(jù)集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優(yōu)點(強類型化, 能夠使用強大的 lambda 函數(shù))與Spark SQL執(zhí)行引擎的優(yōu)點.一個 Dataset 可以從 JVM 對象來構(gòu)造并且使用轉(zhuǎn)換功能(map, flatMap, filter, 等等). Dataset API 在Scala和Java是可用的.Python 不支持 Dataset API.但是由于 Python 的動態(tài)特性, 許多 Dataset API 的優(yōu)點已經(jīng)可用了 (也就是說, 你可能通過 name 天生的row.columnName屬性訪問一行中的字段).這種情況和 R 相似.
一個 DataFrame 是一個Dataset組成的指定列.它的概念與一個在關系型數(shù)據(jù)庫或者在 R/Python 中的表是相等的, 但是有很多優(yōu)化. DataFrames 可以從大量的sources中構(gòu)造出來, 比如: 結(jié)構(gòu)化的文本文件, Hive中的表, 外部數(shù)據(jù)庫, 或者已經(jīng)存在的 RDDs. DataFrame API 可以在 Scala, Java,Python, 和R中實現(xiàn). 在 Scala 和 Java中, 一個 DataFrame 所代表的是一個多個Row(行)的的 Dataset(數(shù)據(jù)集合). 在the Scala API中,DataFrame僅僅是一個Dataset[Row]類型的別名. 然而, 在Java API中, 用戶需要去使用Dataset去代表一個DataFrame.
在此文檔中, 我們將常常會引用 Scala/Java Datasets 的Rows 作為 DataFrames.
Spark SQL中所有功能的入口點是SparkSession類. 要創(chuàng)建一個SparkSession, 僅使用SparkSession.builder()就可以了:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option","some-value").getOrCreate()// For implicit conversions like converting RDDs to DataFramesimportspark.implicits._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark 2.0 中的SparkSession為 Hive 特性提供了內(nèi)嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數(shù)據(jù)的能力.為了使用這些特性, 你不需要去有一個已存在的 Hive 設置.
在一個SparkSession中, 應用程序可以從一個已經(jīng)存在的RDD, 從hive表, 或者從Spark數(shù)據(jù)源中創(chuàng)建一個DataFrames.
舉個例子, 下面就是基于一個JSON文件創(chuàng)建一個DataFrame:
valdf=spark.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdoutdf.show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
無類型的Dataset操作 (aka DataFrame 操作)
DataFrames 提供了一個特定的語法用在Scala,Java,PythonandR中機構(gòu)化數(shù)據(jù)的操作.
正如上面提到的一樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個Rows的Dataset. 這些操作也參考了與強類型的Scala/Java Datasets中的”類型轉(zhuǎn)換” 對應的”無類型轉(zhuǎn)換” .
這里包括一些使用 Dataset 進行結(jié)構(gòu)化數(shù)據(jù)處理的示例 :
// This import is needed to use the $-notationimportspark.implicits._// Print the schema in a tree formatdf.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Select only the "name" columndf.select("name").show()// +-------+// |? name|// +-------+// |Michael|// |? Andy|// | Justin|// +-------+// Select everybody, but increment the age by 1df.select($"name",$"age"+1).show()// +-------+---------+// |? name|(age + 1)|// +-------+---------+// |Michael|? ? null|// |? Andy|? ? ? 31|// | Justin|? ? ? 20|// +-------+---------+// Select people older than 21df.filter($"age">21).show()// +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+// Count people by agedf.groupBy("age").count().show()// +----+-----+// | age|count|// +----+-----+// |? 19|? ? 1|// |null|? ? 1|// |? 30|? ? 1|// +----+-----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
能夠在 DataFrame 上被執(zhí)行的操作類型的完整列表請參考API 文檔.
除了簡單的列引用和表達式之外, DataFrame 也有豐富的函數(shù)庫, 包括 string 操作, date 算術, 常見的 math 操作以及更多.可用的完整列表請參考DataFrame 函數(shù)指南.
Running SQL Queries Programmatically
SparkSession的sql函數(shù)可以讓應用程序以編程的方式運行 SQL 查詢, 并將結(jié)果作為一個DataFrame返回.
// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView("people")valsqlDF=spark.sql("SELECT * FROM people")sqlDF.show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark SQL中的臨時視圖是session級別的, 也就是會隨著session的消失而消失. 如果你想讓一個臨時視圖在所有session中相互傳遞并且可用, 直到Spark 應用退出, 你可以建立一個全局的臨時視圖.全局的臨時視圖存在于系統(tǒng)數(shù)據(jù)庫global_temp中, 我們必須加上庫名去引用它, 比如.SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary viewdf.createGlobalTempView("people")// Global temporary view is tied to a system preserved database `global_temp`spark.sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+// Global temporary view is cross-sessionspark.newSession().sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Dataset 與 RDD 相似, 然而, 并不是使用 Java 序列化或者 Kryo編碼器來序列化用于處理或者通過網(wǎng)絡進行傳輸?shù)膶ο? 雖然編碼器和標準的序列化都負責將一個對象序列化成字節(jié), 編碼器是動態(tài)生成的代碼, 并且使用了一種允許 Spark 去執(zhí)行許多像 filtering, sorting 以及 hashing 這樣的操作, 不需要將字節(jié)反序列化成對象的格式.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interfacecaseclassPerson(name:String,age:Long)// Encoders are created for case classesvalcaseClassDS=Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// Encoders for most common types are automatically provided by importing spark.implicits._valprimitiveDS=Seq(1,2,3).toDS()primitiveDS.map(_+1).collect()// Returns: Array(2, 3, 4)// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by namevalpath="examples/src/main/resources/people.json"valpeopleDS=spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age|? name|// +----+-------+// |null|Michael|// |? 30|? Andy|// |? 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark SQL 支持兩種不同的方法用于轉(zhuǎn)換已存在的 RDD 成為 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基于方法的反射可以讓你的代碼更簡潔.
第二種用于創(chuàng)建 Dataset 的方法是通過一個允許你構(gòu)造一個 Schema 然后把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它允許你去構(gòu)造 Dataset.
Spark SQL 的 Scala 接口支持自動轉(zhuǎn)換一個包含 case classes 的 RDD 為 DataFrame.Case class 定義了表的 Schema.Case class 的參數(shù)名使用反射讀取并且成為了列名.Case class 也可以是嵌套的或者包含像Seq或者Array這樣的復雜類型.這個 RDD 能夠被隱式轉(zhuǎn)換成一個 DataFrame 然后被注冊為一個表.表可以用于后續(xù)的 SQL 語句.
// For implicit conversions from RDDs to DataFramesimportspark.implicits._// Create an RDD of Person objects from a text file, convert it to a DataframevalpeopleDF=spark.sparkContext.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(attributes=>Person(attributes(0),attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by SparkvalteenagersDF=spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager=>"Name: "+teenager(0)).show()// +------------+// |? ? ? value|// +------------+// |Name: Justin|// +------------+// or by field nameteenagersDF.map(teenager=>"Name: "+teenager.getAs[String]("name")).show()// +------------+// |? ? ? value|// +------------+// |Name: Justin|// +------------+// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicitvalmapEncoder=org.apache.spark.sql.Encoders.kryo[Map[String,Any]]// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager=>teenager.getValuesMap[Any](List("name","age"))).collect()// Array(Map("name" -> "Justin", "age" -> 19))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
當 case class 不能夠在執(zhí)行之前被定義(例如, records 記錄的結(jié)構(gòu)在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析并且不同的用戶投影的字段是不一樣的).一個DataFrame可以使用下面的三步以編程的方式來創(chuàng)建.
從原始的 RDD 創(chuàng)建 RDD 的Row(行);
Step 1 被創(chuàng)建后, 創(chuàng)建 Schema 表示一個StructType匹配 RDD 中的Row(行)的結(jié)構(gòu).
通過SparkSession提供的createDataFrame方法應用 Schema 到 RDD 的 RowS(行).
例如:
importorg.apache.spark.sql.types._// Create an RDDvalpeopleRDD=spark.sparkContext.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a stringvalschemaString="name age"http:// Generate the schema based on the string of schemavalfields=schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))valschema=StructType(fields)// Convert records of the RDD (people) to RowsvalrowRDD=peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0),attributes(1).trim))// Apply the schema to the RDDvalpeopleDF=spark.createDataFrame(rowRDD,schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesvalresults=spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes=>"Name: "+attributes(0)).show()// +-------------+// |? ? ? ? value|// +-------------+// |Name: Michael|// |? Name: Andy|// | Name: Justin|// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Thebuilt-in DataFrames functionsprovide common aggregations such ascount(),countDistinct(),avg(),max(),min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them inScalaandJavato work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
Untyped User-Defined Aggregate Functions
Users have to extend theUserDefinedAggregateFunctionabstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
importorg.apache.spark.sql.expressions.MutableAggregationBufferimportorg.apache.spark.sql.expressions.UserDefinedAggregateFunctionimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessionobjectMyAverageextendsUserDefinedAggregateFunction{// Data types of input arguments of this aggregate functiondefinputSchema:StructType=StructType(StructField("inputColumn",LongType)::Nil)// Data types of values in the aggregation bufferdefbufferSchema:StructType={StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil)}// The data type of the returned valuedefdataType:DataType=DoubleType// Whether this function always returns the same output on the identical inputdefdeterministic:Boolean=true// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.definitialize(buffer:MutableAggregationBuffer):Unit={buffer(0)=0Lbuffer(1)=0L}// Updates the given aggregation buffer `buffer` with new input data from `input`defupdate(buffer:MutableAggregationBuffer,input:Row):Unit={if(!input.isNullAt(0)){buffer(0)=buffer.getLong(0)+input.getLong(0)buffer(1)=buffer.getLong(1)+1}}// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`defmerge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit={buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}// Calculates the final resultdefevaluate(buffer:Row):Double=buffer.getLong(0).toDouble/buffer.getLong(1)}// Register the function to access itspark.udf.register("myAverage",MyAverage)valdf=spark.read.json("examples/src/main/resources/employees.json")df.createOrReplaceTempView("employees")df.show()// +-------+------+// |? name|salary|// +-------+------+// |Michael|? 3000|// |? Andy|? 4500|// | Justin|? 3500|// |? Berta|? 4000|// +-------+------+valresult=spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result.show()// +--------------+// |average_salary|// +--------------+// |? ? ? ? 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around theAggregatorabstract class. For example, a type-safe user-defined average can look like:
importorg.apache.spark.sql.expressions.Aggregatorimportorg.apache.spark.sql.Encoderimportorg.apache.spark.sql.Encodersimportorg.apache.spark.sql.SparkSessioncaseclassEmployee(name:String,salary:Long)caseclassAverage(varsum:Long,varcount:Long)objectMyAverageextendsAggregator[Employee,Average,Double]{// A zero value for this aggregation. Should satisfy the property that any b + zero = bdefzero:Average=Average(0L,0L)// Combine two values to produce a new value. For performance, the function may modify `buffer`// and return it instead of constructing a new objectdefreduce(buffer:Average,employee:Employee):Average={buffer.sum+=employee.salarybuffer.count+=1buffer}// Merge two intermediate valuesdefmerge(b1:Average,b2:Average):Average={b1.sum+=b2.sumb1.count+=b2.countb1}// Transform the output of the reductiondeffinish(reduction:Average):Double=reduction.sum.toDouble/reduction.count// Specifies the Encoder for the intermediate value typedefbufferEncoder:Encoder[Average]=Encoders.product// Specifies the Encoder for the final output value typedefoutputEncoder:Encoder[Double]=Encoders.scalaDouble}valds=spark.read.json("examples/src/main/resources/employees.json").as[Employee]ds.show()// +-------+------+// |? name|salary|// +-------+------+// |Michael|? 3000|// |? Andy|? 4500|// | Justin|? 3500|// |? Berta|? 4000|// +-------+------+// Convert the function to a `TypedColumn` and give it a namevalaverageSalary=MyAverage.toColumn.name("average_salary")valresult=ds.select(averageSalary)result.show()// +--------------+// |average_salary|// +--------------+// |? ? ? ? 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.
Spark SQL 支持通過 DataFrame 接口對各種 data sources (數(shù)據(jù)源)進行操作. DataFrame 可以使用 relational transformations (關系轉(zhuǎn)換)操作, 也可用于創(chuàng)建 temporary view (臨時視圖). 將 DataFrame 注冊為 temporary view (臨時視圖)允許您對其數(shù)據(jù)運行 SQL 查詢. 本節(jié) 描述了使用 Spark Data Sources 加載和保存數(shù)據(jù)的一般方法, 然后涉及可用于 built-in data sources (內(nèi)置數(shù)據(jù)源)的 specific options (特定選項).
Generic Load/Save Functions (通用 加載/保存 功能)
在最簡單的形式中, 默認數(shù)據(jù)源(parquet, 除非另有配置spark.sql.sources.default)將用于所有操作.
valusersDF=spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Manually Specifying Options (手動指定選項)
您還可以 manually specify (手動指定)將與任何你想傳遞給 data source 的其他選項一起使用的 data source . Data sources 由其 fully qualified name (完全限定名稱)(即org.apache.spark.sql.parquet), 但是對于 built-in sources (內(nèi)置的源), 你也可以使用它們的 shortnames (短名稱)(json,parquet,jdbc,orc,libsvm,csv,text).從任何 data source type (數(shù)據(jù)源類型)加載 DataFrames 可以使用此 syntax (語法)轉(zhuǎn)換為其他類型.
valpeopleDF=spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name","age").write.format("parquet").save("namesAndAges.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Run SQL on files directly (直接在文件上運行 SQL)
不使用讀取 API 將文件加載到 DataFrame 并進行查詢, 也可以直接用 SQL 查詢該文件.
valsqlDF=spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Save operations (保存操作)可以選擇使用SaveMode, 它指定如何處理現(xiàn)有數(shù)據(jù)如果存在的話. 重要的是要意識到, 這些 save modes (保存模式)不使用任何 locking (鎖定)并且不是 atomic (原子). 另外, 當執(zhí)行Overwrite時, 數(shù)據(jù)將在新數(shù)據(jù)寫出之前被刪除.
Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果數(shù)據(jù)已經(jīng)存在, 則會拋出異常.
SaveMode.Append"append"將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果 data/table 已存在, 則 DataFrame 的內(nèi)容將被 append (附加)到現(xiàn)有數(shù)據(jù)中.
SaveMode.Overwrite"overwrite"Overwrite mode (覆蓋模式)意味著將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果 data/table 已經(jīng)存在, 則預期 DataFrame 的內(nèi)容將 overwritten (覆蓋)現(xiàn)有數(shù)據(jù).
SaveMode.Ignore"ignore"Ignore mode (忽略模式)意味著當將 DataFrame 保存到 data source (數(shù)據(jù)源)時, 如果數(shù)據(jù)已經(jīng)存在, 則保存操作預期不會保存 DataFrame 的內(nèi)容, 并且不更改現(xiàn)有數(shù)據(jù). 這與 SQL 中的CREATE TABLE IF NOT EXISTS類似.
Saving to Persistent Tables (保存到持久表)
DataFrames也可以使用saveAsTable命令作為 persistent tables (持久表)保存到 Hive metastore 中. 請注意, existing Hive deployment (現(xiàn)有的 Hive 部署)不需要使用此功能. Spark 將為您創(chuàng)建默認的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 與createOrReplaceTempView命令不同,saveAsTable將 materialize (實現(xiàn)) DataFrame 的內(nèi)容, 并創(chuàng)建一個指向 Hive metastore 中數(shù)據(jù)的指針. 即使您的 Spark 程序重新啟動, Persistent tables (持久性表)仍然存在, 因為您保持與同一個 metastore 的連接. 可以通過使用表的名稱在SparkSession上調(diào)用table方法來創(chuàng)建 persistent tabl (持久表)的 DataFrame .
對于 file-based (基于文件)的 data source (數(shù)據(jù)源), 例如 text, parquet, json等, 您可以通過path選項指定 custom table path (自定義表路徑), 例如df.write.option("path", "/some/path").saveAsTable("t"). 當表被 dropped (刪除)時, custom table path (自定義表路徑)將不會被刪除, 并且表數(shù)據(jù)仍然存在. 如果未指定自定義表路徑, Spark 將把數(shù)據(jù)寫入 warehouse directory (倉庫目錄)下的默認表路徑. 當表被刪除時, 默認的表路徑也將被刪除.
從 Spark 2.1 開始, persistent datasource tables (持久性數(shù)據(jù)源表)將 per-partition metadata (每個分區(qū)元數(shù)據(jù))存儲在 Hive metastore 中. 這帶來了幾個好處:
由于 metastore 只能返回查詢的必要 partitions (分區(qū)), 因此不再需要將第一個查詢上的所有 partitions discovering 到表中.
Hive DDLs 如ALTER TABLE PARTITION ... SET LOCATION現(xiàn)在可用于使用 Datasource API 創(chuàng)建的表.
請注意, 創(chuàng)建 external datasource tables (外部數(shù)據(jù)源表)(帶有path選項)的表時, 默認情況下不會收集 partition information (分區(qū)信息). 要 sync (同步) metastore 中的分區(qū)信息, 可以調(diào)用MSCK REPAIR TABLE.
Bucketing, Sorting and Partitioning (分桶, 排序和分區(qū))
對于 file-based data source (基于文件的數(shù)據(jù)源), 也可以對 output (輸出)進行 bucket 和 sort 或者 partition . Bucketing 和 sorting 僅適用于 persistent tables :
peopleDF.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
在使用 Dataset API 時, partitioning 可以同時與save和saveAsTable一起使用.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
可以為 single table (單個表)使用 partitioning 和 bucketing:
peopleDF.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
partitionBy創(chuàng)建一個 directory structure (目錄結(jié)構(gòu)), 如Partition Discovery部分所述. 因此, 對 cardinality (基數(shù))較高的 columns 的適用性有限. 相反,bucketBy可以在固定數(shù)量的 buckets 中分配數(shù)據(jù), 并且可以在 a number of unique values is unbounded (多個唯一值無界時)使用數(shù)據(jù).
Parquet是許多其他數(shù)據(jù)處理系統(tǒng)支持的 columnar format (柱狀格式). Spark SQL 支持讀寫 Parquet 文件, 可自動保留 schema of the original data (原始數(shù)據(jù)的模式). 當編寫 Parquet 文件時, 出于兼容性原因, 所有 columns 都將自動轉(zhuǎn)換為可空.
Loading Data Programmatically (以編程的方式加載數(shù)據(jù))
使用上面例子中的數(shù)據(jù):
// Encoders for most common types are automatically provided by importing spark.implicits._importspark.implicits._valpeopleDF=spark.read.json("examples/src/main/resources/people.json")// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFramevalparquetFileDF=spark.read.parquet("people.parquet")// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")valnamesDF=spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes=>"Name: "+attributes(0)).show()// +------------+// |? ? ? value|// +------------+// |Name: Justin|// +------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Partition Discovery (分區(qū)發(fā)現(xiàn))
Table partitioning (表分區(qū))是在像 Hive 這樣的系統(tǒng)中使用的常見的優(yōu)化方法. 在 partitioned table (分區(qū)表)中, 數(shù)據(jù)通常存儲在不同的目錄中, partitioning column values encoded (分區(qū)列值編碼)在每個 partition directory (分區(qū)目錄)的路徑中. Parquet data source (Parquet 數(shù)據(jù)源)現(xiàn)在可以自動 discover (發(fā)現(xiàn))和 infer (推斷)分區(qū)信息. 例如, 我們可以使用以下 directory structure (目錄結(jié)構(gòu))將所有以前使用的 population data (人口數(shù)據(jù))存儲到 partitioned table (分區(qū)表)中, 其中有兩個額外的列gender和country作為 partitioning columns (分區(qū)列):
path└── to? ? └── table? ? ? ? ├── gender=male? ? ? ? │?? ├── ...? ? ? ? │?? │? ? ? ? │?? ├── country=US? ? ? ? │?? │?? └── data.parquet? ? ? ? │?? ├── country=CN? ? ? ? │?? │?? └── data.parquet? ? ? ? │?? └── ...? ? ? ? └── gender=female? ? ? ? ?? ├── ...? ? ? ? ?? │? ? ? ? ?? ├── country=US? ? ? ? ?? │?? └── data.parquet? ? ? ? ?? ├── country=CN? ? ? ? ?? │?? └── data.parquet? ? ? ? ?? └── ...
通過將path/to/table傳遞給SparkSession.read.parquet或SparkSession.read.load, Spark SQL 將自動從路徑中提取 partitioning information (分區(qū)信息). 現(xiàn)在返回的 DataFrame 的 schema (模式)變成:
root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)
請注意, 會自動 inferred (推斷) partitioning columns (分區(qū)列)的 data types (數(shù)據(jù)類型).目前, 支持 numeric data types (數(shù)字數(shù)據(jù)類型)和 string type (字符串類型).有些用戶可能不想自動推斷 partitioning columns (分區(qū)列)的數(shù)據(jù)類型.對于這些用例, automatic type inference (自動類型推斷)可以由spark.sql.sources.partitionColumnTypeInference.enabled配置, 默認為true.當禁用 type inference (類型推斷)時, string type (字符串類型)將用于 partitioning columns (分區(qū)列).
從 Spark 1.6.0 開始, 默認情況下, partition discovery (分區(qū)發(fā)現(xiàn))只能找到給定路徑下的 partitions (分區(qū)).對于上述示例, 如果用戶將path/to/table/gender=male傳遞給SparkSession.read.parquet或SparkSession.read.load, 則gender將不被視為 partitioning column (分區(qū)列).如果用戶需要指定 partition discovery (分區(qū)發(fā)現(xiàn))應該開始的基本路徑, 則可以在數(shù)據(jù)源選項中設置basePath.例如, 當path/to/table/gender=male是數(shù)據(jù)的路徑并且用戶將basePath設置為path/to/table/,gender將是一個 partitioning column (分區(qū)列).
像 ProtocolBuffer , Avro 和 Thrift 一樣, Parquet 也支持 schema evolution (模式演進). 用戶可以從一個 simple schema (簡單的架構(gòu))開始, 并根據(jù)需要逐漸向 schema 添加更多的 columns (列). 以這種方式, 用戶可能會使用不同但相互兼容的 schemas 的 multiple Parquet files (多個 Parquet 文件). Parquet data source (Parquet 數(shù)據(jù)源)現(xiàn)在能夠自動檢測這種情況并 merge (合并)所有這些文件的 schemas .
由于 schema merging (模式合并)是一個 expensive operation (相對昂貴的操作), 并且在大多數(shù)情況下不是必需的, 所以默認情況下從 1.5.0 開始. 你可以按照如下的方式啟用它:
讀取 Parquet 文件時, 將 data source option (數(shù)據(jù)源選項)mergeSchema設置為true(如下面的例子所示), 或
將 global SQL option (全局 SQL 選項)spark.sql.parquet.mergeSchema設置為true.
// This is used to implicitly convert an RDD to a DataFrame.importspark.implicits._// Create a simple DataFrame, store into a partition directoryvalsquaresDF=spark.sparkContext.makeRDD(1to5).map(i=>(i,i*i)).toDF("value","square")squaresDF.write.parquet("data/test_table/key=1")// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnvalcubesDF=spark.sparkContext.makeRDD(6to10).map(i=>(i,i*i*i)).toDF("value","cube")cubesDF.write.parquet("data/test_table/key=2")// Read the partitioned tablevalmergedDF=spark.read.option("mergeSchema","true").parquet("data/test_table")mergedDF.printSchema()// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root//? |-- value: int (nullable = true)//? |-- square: int (nullable = true)//? |-- cube: int (nullable = true)//? |-- key: int (nullable = true)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉(zhuǎn)換)
當讀取和寫入 Hive metastore Parquet 表時, Spark SQL 將嘗試使用自己的 Parquet support (Parquet 支持), 而不是 Hive SerDe 來獲得更好的性能. 此 behavior (行為)由spark.sql.hive.convertMetastoreParquet配置控制, 默認情況下 turned on (打開).
Hive/Parquet Schema Reconciliation
從 table schema processing (表格模式處理)的角度來說, Hive 和 Parquet 之間有兩個關鍵的區(qū)別.
Hive 不區(qū)分大小寫, 而 Parquet 不是
Hive 認為所有 columns (列)都可以為空, 而 Parquet 中的可空性是 significant (重要)的.
由于這個原因, 當將 Hive metastore Parquet 表轉(zhuǎn)換為 Spark SQL Parquet 表時, 我們必須調(diào)整 metastore schema 與 Parquet schema. reconciliation 規(guī)則是:
在兩個 schema 中具有 same name (相同名稱)的 Fields (字段)必須具有 same data type (相同的數(shù)據(jù)類型), 而不管 nullability (可空性). reconciled field 應具有 Parquet 的數(shù)據(jù)類型, 以便 nullability (可空性)得到尊重.
reconciled schema (調(diào)和模式)正好包含 Hive metastore schema 中定義的那些字段.
只出現(xiàn)在 Parquet schema 中的任何字段將被 dropped (刪除)在 reconciled schema 中.
僅在 Hive metastore schema 中出現(xiàn)的任何字段在 reconciled schema 中作為 nullable field (可空字段)添加.
Metadata Refreshing (元數(shù)據(jù)刷新)
Spark SQL 緩存 Parquet metadata 以獲得更好的性能. 當啟用 Hive metastore Parquet table conversion (轉(zhuǎn)換)時, 這些 converted tables (轉(zhuǎn)換表)的 metadata (元數(shù)據(jù))也被 cached (緩存). 如果這些表由 Hive 或其他外部工具更新, 則需要手動刷新以確保 consistent metadata (一致的元數(shù)據(jù)).
// spark is an existing SparkSessionspark.catalog.refreshTable("my_table")
可以使用SparkSession上的setConf方法或使用 SQL 運行SET key = value命令來完成 Parquet 的配置.
Property Name (參數(shù)名稱)Default(默認)Meaning(含義)
spark.sql.parquet.binaryAsStringfalse一些其他 Parquet-producing systems (Parquet 生產(chǎn)系統(tǒng)), 特別是 Impala, Hive 和舊版本的 Spark SQL , 在 writing out (寫出) Parquet schema 時, 不區(qū)分 binary data (二進制數(shù)據(jù))和 strings (字符串). 該 flag 告訴 Spark SQL 將 binary data (二進制數(shù)據(jù))解釋為 string (字符串)以提供與這些系統(tǒng)的兼容性.
spark.sql.parquet.int96AsTimestamptrue一些 Parquet-producing systems , 特別是 Impala 和 Hive , 將 Timestamp 存入INT96 . 該 flag 告訴 Spark SQL 將 INT96 數(shù)據(jù)解析為 timestamp 以提供與這些系統(tǒng)的兼容性.
spark.sql.parquet.cacheMetadatatrue打開 Parquet schema metadata 的緩存. 可以加快查詢靜態(tài)數(shù)據(jù).
spark.sql.parquet.compression.codecsnappy在編寫 Parquet 文件時設置 compression codec (壓縮編解碼器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdowntrue設置為 true 時啟用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquettrue當設置為 false 時, Spark SQL 將使用 Hive SerDe 作為 parquet tables , 而不是內(nèi)置的支持.
spark.sql.parquet.mergeSchemafalse當為 true 時, Parquet data source (Parquet 數(shù)據(jù)源) merges (合并)從所有 data files (數(shù)據(jù)文件)收集的 schemas , 否則如果沒有可用的 summary file , 則從 summary file 或 random data file 中挑選 schema .
spark.sql.optimizer.metadataOnlytrue如果為 true , 則啟用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns (分區(qū)列)而不是 table scans (表掃描). 當 scanned (掃描)的所有 columns (列)都是 partition columns (分區(qū)列)并且 query (查詢)具有滿足 distinct semantics (不同語義)的 aggregate operator (聚合運算符)時, 它將適用.
JSON Datasets (JSON 數(shù)據(jù)集)
Spark SQL 可以 automatically infer (自動推斷)JSON dataset 的 schema, 并將其作為Dataset[Row]加載. 這個 conversion (轉(zhuǎn)換)可以在Dataset[String]上使用SparkSession.read.json()來完成, 或 JSON 文件.
請注意, 以a json file提供的文件不是典型的 JSON 文件. 每行必須包含一個 separate (單獨的), self-contained valid (獨立的有效的)JSON 對象. 有關更多信息, 請參閱JSON Lines text format, also called newline-delimited JSON.
對于 regular multi-line JSON file (常規(guī)的多行 JSON 文件), 將multiLine選項設置為true.
// Primitive types (Int, String, etc) and Product types (case classes) encoders are// supported by importing this when creating a Dataset.importspark.implicits._// A JSON dataset is pointed to by path.// The path can be either a single text file or a directory storing text filesvalpath="examples/src/main/resources/people.json"valpeopleDF=spark.read.json(path)// The inferred schema can be visualized using the printSchema() methodpeopleDF.printSchema()// root//? |-- age: long (nullable = true)//? |-- name: string (nullable = true)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by sparkvalteenagerNamesDF=spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")teenagerNamesDF.show()// +------+// |? name|// +------+// |Justin|// +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset[String] storing one JSON object per stringvalotherPeopleDataset=spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""::Nil)valotherPeople=spark.read.json(otherPeopleDataset)otherPeople.show()// +---------------+----+// |? ? ? ? address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Spark SQL 還支持讀取和寫入存儲在Apache Hive中的數(shù)據(jù)。 但是妄迁,由于 Hive 具有大量依賴關系寝蹈,因此這些依賴關系不包含在默認 Spark 分發(fā)中。 如果在類路徑中找到 Hive 依賴項登淘,Spark 將自動加載它們箫老。 請注意,這些 Hive 依賴關系也必須存在于所有工作節(jié)點上黔州,因為它們將需要訪問 Hive 序列化和反序列化庫 (SerDes)槽惫,以訪問存儲在 Hive 中的數(shù)據(jù)周叮。
通過將hive-site.xml,core-site.xml(用于安全配置)和hdfs-site.xml(用于 HDFS 配置)文件放在conf/中來完成配置。
當使用 Hive 時界斜,必須用 Hive 支持實例化SparkSession仿耽,包括連接到持續(xù)的 Hive 轉(zhuǎn)移,支持 Hive serdes 和 Hive 用戶定義的功能各薇。 沒有現(xiàn)有 Hive 部署的用戶仍然可以啟用 Hive 支持项贺。 當hive-site.xml未配置時,上下文會自動在當前目錄中創(chuàng)建metastore_db峭判,并創(chuàng)建由spark.sql.warehouse.dir配置的目錄开缎,該目錄默認為Spark應用程序當前目錄中的spark-warehouse目錄 開始了 請注意,自從2.0.0以來林螃,hive-site.xml中的hive.metastore.warehouse.dir屬性已被棄用奕删。 而是使用spark.sql.warehouse.dir來指定倉庫中數(shù)據(jù)庫的默認位置。 您可能需要向啟動 Spark 應用程序的用戶授予寫權(quán)限疗认。?
importjava.io.Fileimportorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessioncaseclassRecord(key:Int,value:String)// warehouseLocation points to the default location for managed databases and tablesvalwarehouseLocation=newFile("spark-warehouse").getAbsolutePathvalspark=SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir",warehouseLocation).enableHiveSupport().getOrCreate()importspark.implicits._importspark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsql("SELECT * FROM src").show()// +---+-------+// |key|? value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...// Aggregation queries are also supported.sql("SELECT COUNT(*) FROM src").show()// +--------+// |count(1)|// +--------+// |? ? 500 |// +--------+// The results of SQL queries are themselves DataFrames and support all normal functions.valsqlDF=sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")// The items in DataFrames are of type Row, which allows you to access each column by ordinal.valstringsDS=sqlDF.map{caseRow(key:Int,value:String)=>s"Key:$key, Value:$value"}stringsDS.show()// +--------------------+// |? ? ? ? ? ? ? value|// +--------------------+// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// ...// You can also use DataFrames to create temporary views within a SparkSession.valrecordsDF=spark.createDataFrame((1to100).map(i=>Record(i,s"val_$i")))recordsDF.createOrReplaceTempView("records")// Queries can then join DataFrame data with data stored in Hive.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()// +---+------+---+------+// |key| value|key| value|// +---+------+---+------+// |? 2| val_2|? 2| val_2|// |? 4| val_4|? 4| val_4|// |? 5| val_5|? 5| val_5|// ...
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.
創(chuàng)建 Hive 表時眠蚂,需要定義如何 從/向 文件系統(tǒng) read/write 數(shù)據(jù)赤兴,即 “輸入格式” 和 “輸出格式”钩杰。 您還需要定義該表如何將數(shù)據(jù)反序列化為行低散,或?qū)⑿行蛄谢癁閿?shù)據(jù),即 “serde”缎浇。 以下選項可用于指定存儲格式 (“serde”, “input format”, “output format”)扎拣,例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')素跺。 默認情況下二蓝,我們將以純文本形式讀取表格文件。 請注意指厌,Hive 存儲處理程序在創(chuàng)建表時不受支持侣夷,您可以使用 Hive 端的存儲處理程序創(chuàng)建一個表,并使用 Spark SQL 來讀取它仑乌。
Property NameMeaning
fileFormatfileFormat是一種存儲格式規(guī)范的包百拓,包括 "serde","input format" 和 "output format"晰甚。 目前我們支持6個文件格式:'sequencefile'衙传,'rcfile','orc'厕九,'parquet'蓖捶,'textfile'和'avro'。
inputFormat, outputFormat這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定為字符串文字扁远,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`俊鱼。 這兩個選項必須成對出現(xiàn)刻像,如果您已經(jīng)指定了 "fileFormat" 選項,則無法指定它們并闲。
serde此選項指定 serde 類的名稱细睡。 當指定 `fileFormat` 選項時,如果給定的 `fileFormat` 已經(jīng)包含 serde 的信息帝火,那么不要指定這個選項溜徙。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可以使用這3個文件格式的這個選項犀填。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim這些選項只能與 "textfile" 文件格式一起使用蠢壹。它們定義如何將分隔的文件讀入行。
使用OPTIONS定義的所有其他屬性將被視為 Hive serde 屬性九巡。
Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互图贸,這使得 Spark SQL 能夠訪問 Hive 表的元數(shù)據(jù)。 從 Spark 1.4.0 開始冕广,使用 Spark SQL 的單一二進制構(gòu)建可以使用下面所述的配置來查詢不同版本的 Hive 轉(zhuǎn)移疏日。 請注意,獨立于用于與轉(zhuǎn)移點通信的 Hive 版本佳窑,內(nèi)部 Spark SQL 將針對 Hive 1.2.1 進行編譯,并使用這些類進行內(nèi)部執(zhí)行(serdes父能,UDF神凑,UDAF等)。
以下選項可用于配置用于檢索元數(shù)據(jù)的 Hive 版本:
屬性名稱默認值含義
spark.sql.hive.metastore.version1.2.1Hive metastore 版本何吝。 可用選項為0.12.0至1.2.1溉委。
spark.sql.hive.metastore.jarsbuiltin當啟用-Phive時,使用 Hive 1.2.1爱榕,它與 Spark 程序集捆綁在一起瓣喊。選擇此選項時,spark.sql.hive.metastore.version 必須為1.2.1或未定義黔酥。 行家 使用從Maven存儲庫下載的指定版本的Hive jar藻三。 通常不建議在生產(chǎn)部署中使用此配置。 ***** 應用于實例化 HiveMetastoreClient 的 jar 的位置跪者。該屬性可以是三個選項之一:
builtin當啟用-Phive時棵帽,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一起渣玲。選擇此選項時逗概,spark.sql.hive.metastore.version必須為1.2.1或未定義。
maven使用從 Maven 存儲庫下載的指定版本的 Hive jar忘衍。通常不建議在生產(chǎn)部署中使用此配置逾苫。
JVM 的標準格式的 classpath卿城。 該類路徑必須包含所有 Hive 及其依賴項,包括正確版本的 Hadoop铅搓。這些罐只需要存在于 driver 程序中瑟押,但如果您正在運行在 yarn 集群模式,那么您必須確保它們與應用程序一起打包狸吞。
spark.sql.hive.metastore.sharedPrefixescom.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc使用逗號分隔的類前綴列表勉耀,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。 一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver蹋偏。 其它需要共享的類便斥,是需要與已經(jīng)共享的類進行交互的。 例如威始,log4j 使用的自定義 appender枢纠。
spark.sql.hive.metastore.barrierPrefixes(empty)一個逗號分隔的類前綴列表,應該明確地為 Spark SQL 正在通信的 Hive 的每個版本重新加載黎棠。 例如晋渺,在通常將被共享的前綴中聲明的 Hive UDF (即: ?org.apache.spark.*)。
Spark SQL 還包括可以使用 JDBC 從其他數(shù)據(jù)庫讀取數(shù)據(jù)的數(shù)據(jù)源脓斩。此功能應優(yōu)于使用JdbcRDD木西。 這是因為結(jié)果作為 DataFrame 返回,并且可以輕松地在 Spark SQL 中處理或與其他數(shù)據(jù)源連接随静。 JDBC 數(shù)據(jù)源也更容易從 Java 或 Python 使用八千,因為它不需要用戶提供 ClassTag。(請注意燎猛,這不同于 Spark SQL JDBC 服務器恋捆,允許其他應用程序使用 Spark SQL 運行查詢)。
要開始使用重绷,您需要在 Spark 類路徑中包含特定數(shù)據(jù)庫的 JDBC driver 程序沸停。 例如,要從 Spark Shell 連接到 postgres昭卓,您將運行以下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
可以使用 Data Sources API 將來自遠程數(shù)據(jù)庫的表作為 DataFrame 或 Spark SQL 臨時視圖進行加載愤钾。 用戶可以在數(shù)據(jù)源選項中指定 JDBC 連接屬性。用戶和密碼通常作為登錄數(shù)據(jù)源的連接屬性提供候醒。 除了連接屬性外绰垂,Spark 還支持以下不區(qū)分大小寫的選項:
?屬性名稱含義
url要連接的JDBC URL。 源特定的連接屬性可以在URL中指定火焰。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable應該讀取的 JDBC 表劲装。請注意,可以使用在SQL查詢的FROM子句中有效的任何內(nèi)容。 例如占业,您可以使用括號中的子查詢代替完整表绒怨。
driver用于連接到此 URL 的 JDBC driver 程序的類名。
partitionColumn, lowerBound, upperBound如果指定了這些選項谦疾,則必須指定這些選項南蹂。 另外,必須指定numPartitions. 他們描述如何從多個 worker 并行讀取數(shù)據(jù)時將表給分區(qū)念恍。partitionColumn必須是有問題的表中的數(shù)字列六剥。 請注意,lowerBound和upperBound僅用于決定分區(qū)的大小峰伙,而不是用于過濾表中的行疗疟。 因此栈雳,表中的所有行將被分區(qū)并返回冠息。此選項僅適用于讀操作枣耀。
numPartitions在表讀寫中可以用于并行度的最大分區(qū)數(shù)凿渊。這也確定并發(fā)JDBC連接的最大數(shù)量。 如果要寫入的分區(qū)數(shù)超過此限制蕊梧,則在寫入之前通過調(diào)用coalesce(numPartitions)將其減少到此限制炉抒。
fetchsizeJDBC 抓取的大小盏混,用于確定每次數(shù)據(jù)往返傳遞的行數(shù)音榜。 這有利于提升 JDBC driver 的性能庞瘸,它們的默認值較小(例如: Oracle 是 10 行)赠叼。 該選項僅適用于讀取操作擦囊。
batchsizeJDBC 批處理的大小,用于確定每次數(shù)據(jù)往返傳遞的行數(shù)梅割。 這有利于提升 JDBC driver 的性能霜第。 該選項僅適用于寫操作葛家。默認值為1000.
isolationLevel事務隔離級別户辞,適用于當前連接。 它可以是NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ, 或SERIALIZABLE之一癞谒,對應于 JDBC 連接對象定義的標準事務隔離級別底燎,默認為READ_UNCOMMITTED。 此選項僅適用于寫操作弹砚。請參考java.sql.Connection中的文檔双仍。
truncate這是一個與 JDBC 相關的選項。 啟用SaveMode.Overwrite時桌吃,此選項會導致 Spark 截斷現(xiàn)有表朱沃,而不是刪除并重新創(chuàng)建。 這可以更有效,并且防止表元數(shù)據(jù)(例如逗物,索引)被移除搬卒。 但是,在某些情況下翎卓,例如當新數(shù)據(jù)具有不同的模式時契邀,它將無法工作。 它默認為false失暴。 此選項僅適用于寫操作坯门。
createTableOptions這是一個與JDBC相關的選項。 如果指定逗扒,此選項允許在創(chuàng)建表時設置特定于數(shù)據(jù)庫的表和分區(qū)選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.)古戴。此選項僅適用于寫操作。
createTableColumnTypes使用數(shù)據(jù)庫列數(shù)據(jù)類型而不是默認值缴阎,創(chuàng)建表時允瞧。 數(shù)據(jù)類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的類型應該是有效的 spark sql 數(shù)據(jù)類型蛮拔。此選項僅適用于寫操作述暂。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods// Loading data from a JDBC sourcevaljdbcDF=spark.read.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").load()valconnectionProperties=newProperties()connectionProperties.put("user","username")connectionProperties.put("password","password")valjdbcDF2=spark.read.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Saving data to a JDBC sourcejdbcDF.write.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").save()jdbcDF2.write.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Specifying create table column data types on writejdbcDF.write.option("createTableColumnTypes","name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
JDBC driver 程序類必須對客戶端會話和所有執(zhí)行程序上的原始類加載器可見。 這是因為 Java 的 DriverManager 類執(zhí)行安全檢查建炫,導致它忽略原始類加載器不可見的所有 driver 程序畦韭,當打開連接時。一個方便的方法是修改所有工作節(jié)點上的compute_classpath.sh 以包含您的 driver 程序 JAR肛跌。
一些數(shù)據(jù)庫艺配,例如 H2,將所有名稱轉(zhuǎn)換為大寫衍慎。 您需要使用大寫字母來引用 Spark SQL 中的這些名稱转唉。
對于某些工作負載,可以通過緩存內(nèi)存中的數(shù)據(jù)或打開一些實驗選項來提高性能稳捆。
Spark SQL 可以通過調(diào)用spark.catalog.cacheTable("tableName")或dataFrame.cache()來使用內(nèi)存中的列格式來緩存表赠法。 然后,Spark SQL 將只掃描所需的列乔夯,并將自動調(diào)整壓縮以最小化內(nèi)存使用量和 GC 壓力砖织。 您可以調(diào)用spark.catalog.uncacheTable("tableName")從內(nèi)存中刪除該表。
內(nèi)存緩存的配置可以使用SparkSession上的setConf方法或使用 SQL 運行SET key=value命令來完成末荐。
屬性名稱默認含義
spark.sql.inMemoryColumnarStorage.compressedtrue當設置為 true 時侧纯,Spark SQL 將根據(jù)數(shù)據(jù)的統(tǒng)計信息為每個列自動選擇一個壓縮編解碼器。
spark.sql.inMemoryColumnarStorage.batchSize10000控制批量的柱狀緩存的大小甲脏。更大的批量大小可以提高內(nèi)存利用率和壓縮率眶熬,但是在緩存數(shù)據(jù)時會冒出 OOM 風險妹笆。
以下選項也可用于調(diào)整查詢執(zhí)行的性能。這些選項可能會在將來的版本中被廢棄娜氏,因為更多的優(yōu)化是自動執(zhí)行的晾浴。
屬性名稱默認值含義
spark.sql.files.maxPartitionBytes134217728 (128 MB)在讀取文件時,將單個分區(qū)打包的最大字節(jié)數(shù)牍白。
spark.sql.files.openCostInBytes4194304 (4 MB)按照字節(jié)數(shù)來衡量的打開文件的估計費用可以在同一時間進行掃描脊凰。 將多個文件放入分區(qū)時使用。最好過度估計茂腥,那么具有小文件的分區(qū)將比具有較大文件的分區(qū)(首先計劃的)更快狸涌。
spark.sql.broadcastTimeout300廣播連接中的廣播等待時間超時(秒)
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)配置執(zhí)行連接時將廣播給所有工作節(jié)點的表的最大大小(以字節(jié)為單位)。 通過將此值設置為-1可以禁用廣播懒豹。 請注意脸秽,目前的統(tǒng)計信息僅支持 Hive Metastore 表蝴乔,其中已運行命令ANALYZE TABLE COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions200Configures the number of partitions to use when shuffling data for joins or aggregations.
Spark SQL 也可以充當使用其 JDBC/ODBC 或命令行界面的分布式查詢引擎。 在這種模式下,最終用戶或應用程序可以直接與 Spark SQL 交互運行 SQL 查詢榆苞,而不需要編寫任何代碼。
這里實現(xiàn)的 Thrift JDBC/ODBC 服務器對應于 Hive 1.2 中的HiveServer2。 您可以使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器。
要啟動 JDBC/ODBC 服務器趁尼,請在 Spark 目錄中運行以下命令:
./sbin/start-thriftserver.sh
此腳本接受所有bin/spark-submit命令行選項,以及--hiveconf選項來指定 Hive 屬性似炎。 您可以運行./sbin/start-thriftserver.sh --help查看所有可用選項的完整列表传睹。 默認情況下邢隧,服務器監(jiān)聽 localhost:10000. 您可以通過環(huán)境變量覆蓋此行為包券,即:
exportHIVE_SERVER2_THRIFT_PORT=exportHIVE_SERVER2_THRIFT_BIND_HOST=./sbin/start-thriftserver.sh\--master \...
or system properties:
./sbin/start-thriftserver.sh\--hiveconf hive.server2.thrift.port=\--hiveconf hive.server2.thrift.bind.host=\--master ? ...
現(xiàn)在侍郭,您可以使用 beeline 來測試 Thrift JDBC/ODBC 服務器:
./bin/beeline
使用 beeline 方式連接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://localhost:10000
Beeline 將要求您輸入用戶名和密碼。 在非安全模式下,只需輸入機器上的用戶名和空白密碼即可肌访。 對于安全模式蟹演,請按照beeline 文檔中的說明進行操作羞反。
配置Hive是通過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的澄惊。
您也可以使用 Hive 附帶的 beeline 腳本毕贼。
Thrift JDBC 服務器還支持通過 HTTP 傳輸發(fā)送 thrift RPC 消息。 使用以下設置啟用 HTTP 模式作為系統(tǒng)屬性或在conf/中的hive-site.xml文件中啟用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
要測試锥余,請使用 beeline 以 http 模式連接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://:/?hive.server2.transport.mode=http;hive.server2.thrift.http.path=
Spark SQL CLI 是在本地模式下運行 Hive 轉(zhuǎn)移服務并執(zhí)行從命令行輸入的查詢的方便工具佃牛。 請注意爷速,Spark SQL CLI 不能與 Thrift JDBC 服務器通信毙石。
要啟動 Spark SQL CLI漂洋,請在 Spark 目錄中運行以下命令:
./bin/spark-sql
配置 Hive 是通過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的弟孟。 您可以運行./bin/spark-sql --help獲取所有可用選項的完整列表蔼水。
Spark 2.1.1 介紹了一個新的配置 key:spark.sql.hive.caseSensitiveInferenceMode. 它的默認設置是NEVER_INFER, 其行為與 2.1.0 保持一致. 但是优炬,Spark 2.2.0 將此設置的默認值更改為 “INFER_AND_SAVE”葵硕,以恢復與底層文件 schema(模式)具有大小寫混合的列名稱的 Hive metastore 表的兼容性蘸劈。使用INFER_AND_SAVE配置的 value, 在第一次訪問 Spark 將對其尚未保存推測 schema(模式)的任何 Hive metastore 表執(zhí)行 schema inference(模式推斷). 請注意棒掠,對于具有數(shù)千個 partitions(分區(qū))的表,模式推斷可能是非常耗時的操作。如果不兼容大小寫混合的列名煎殷,您可以安全地將spark.sql.hive.caseSensitiveInferenceMode設置為NEVER_INFER饵撑,以避免模式推斷的初始開銷。請注意钮孵,使用新的默認INFER_AND_SAVE設置漾唉,模式推理的結(jié)果被保存為 metastore key 以供將來使用蚪战。因此概漱,初始模式推斷僅發(fā)生在表的第一次訪問腻异。
Datasource tables(數(shù)據(jù)源表)現(xiàn)在存儲了 Hive metastore 中的 partition metadata(分區(qū)元數(shù)據(jù)). 這意味著諸如ALTER TABLE PARTITION ... SET LOCATION這樣的 Hive DDLs 現(xiàn)在使用 Datasource API 可用于創(chuàng)建 tables(表).
遺留的數(shù)據(jù)源表可以通過MSCK REPAIR TABLE命令遷移到這種格式。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。
要確定表是否已遷移耻台,當在表上發(fā)出DESCRIBE FORMATTED命令時請查找PartitionProvider: Catalog屬性.
Datasource tables(數(shù)據(jù)源表)的INSERT OVERWRITE TABLE ... PARTITION ...行為的更改都弹。
在以前的 Spark 版本中框杜,INSERT OVERWRITE覆蓋了整個 Datasource table,即使給出一個指定的 partition. 現(xiàn)在只有匹配規(guī)范的 partition 被覆蓋。
請注意味咳,這仍然與 Hive 表的行為不同再层,Hive 表僅覆蓋與新插入數(shù)據(jù)重疊的分區(qū)棍鳖。
SparkSession現(xiàn)在是 Spark 新的切入點, 它替代了老的SQLContext和HiveContext侣肄。注意 : 為了向下兼容,老的 SQLContext 和 HiveContext 仍然保留醇份。可以從SparkSession獲取一個新的catalog接口 — 現(xiàn)有的訪問數(shù)據(jù)庫和表的 API僚纷,如listTables矩距,createExternalTable,dropTempView怖竭,cacheTable都被移到該接口锥债。
Dataset API 和 DataFrame API 進行了統(tǒng)一。在 Scala 中,DataFrame變成了Dataset[Row]類型的一個別名哮肚,而 Java API 使用者必須將DataFrame替換成Dataset毅整。Dataset 類既提供了強類型轉(zhuǎn)換操作(如map,filter以及groupByKey)也提供了非強類型轉(zhuǎn)換操作(如select和groupBy)绽左。由于編譯期的類型安全不是 Python 和 R 語言的一個特性悼嫉,Dataset 的概念并不適用于這些語言的 API。相反拼窥,DataFrame仍然是最基本的編程抽象, 就類似于這些語言中單節(jié)點 data frame 的概念戏蔑。
Dataset 和 DataFrame API 中 unionAll 已經(jīng)過時并且由union替代。
Dataset 和 DataFrame API 中 explode 已經(jīng)過時鲁纠,作為選擇总棵,可以結(jié)合 select 或 flatMap 使用functions.explode()。
Dataset 和 DataFrame API 中registerTempTable已經(jīng)過時并且由createOrReplaceTempView替代改含。
對 Hive tablesCREATE TABLE ... LOCATION行為的更改.
從 Spark 2.0 開始情龄,CREATE TABLE ... LOCATION與CREATE EXTERNAL TABLE ... LOCATION是相同的,以防止意外丟棄用戶提供的 locations(位置)中的現(xiàn)有數(shù)據(jù)捍壤。這意味著骤视,在用戶指定位置的 Spark SQL 中創(chuàng)建的 Hive 表始終是 Hive 外部表。刪除外部表將不會刪除數(shù)據(jù)鹃觉。 用戶不能指定 Hive managed tables(管理表)的位置. 請注意专酗,這與Hive行為不同。
因此盗扇,這些表上的 “DROP TABLE” 語句不會刪除數(shù)據(jù)祷肯。
從 Spark 1.6 開始,默認情況下服務器在多 session(會話)模式下運行疗隶。這意味著每個 JDBC/ODBC 連接擁有一份自己的 SQL 配置和臨時函數(shù)注冊佑笋。緩存表仍在并共享。如果您希望以舊的單會話模式運行 Thrift server斑鼻,請設置選項spark.sql.hive.thriftServer.singleSession為true蒋纬。您既可以將此選項添加到spark-defaults.conf,或者通過--conf將它傳遞給start-thriftserver.sh卵沉。
./sbin/start-thriftserver.sh\--conf spark.sql.hive.thriftServer.singleSession=true\...
從 1.6.1 開始颠锉,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現(xiàn)有列。
從 Spark 1.6 開始史汗,LongType 強制轉(zhuǎn)換為 TimestampType 期望是秒琼掠,而不是微秒。這種更改是為了匹配 Hive 1.2 的行為停撞,以便從 numeric(數(shù)值)類型進行更一致的類型轉(zhuǎn)換到 TimestampType瓷蛙。更多詳情請參閱SPARK-11724悼瓮。
使用手動管理的內(nèi)存優(yōu)化執(zhí)行,現(xiàn)在是默認啟用的艰猬,以及代碼生成表達式求值横堡。這些功能既可以通過設置spark.sql.tungsten.enabled為false來禁止使用。
Parquet 的模式合并默認情況下不再啟用冠桃。它可以通過設置spark.sql.parquet.mergeSchema到true以重新啟用命贴。
字符串在 Python 列的 columns(列)現(xiàn)在支持使用點(.)來限定列或訪問嵌套值。例如df['table.column.nestedField']食听。但是胸蛛,這意味著如果你的列名中包含任何圓點,你現(xiàn)在必須避免使用反引號(如table.column.with.dots.nested)樱报。
在內(nèi)存中的列存儲分區(qū)修剪默認是開啟的葬项。它可以通過設置spark.sql.inMemoryColumnarStorage.partitionPruning為false來禁用。
無限精度的小數(shù)列不再支持迹蛤,而不是 Spark SQL 最大精度為 38 民珍。當從BigDecimal對象推斷模式時,現(xiàn)在使用(38盗飒,18)嚷量。在 DDL 沒有指定精度時,則默認保留Decimal(10, 0)箩兽。
時間戳現(xiàn)在存儲在 1 微秒的精度津肛,而不是 1 納秒的章喉。
在 sql 語句中汗贫,floating point(浮點數(shù))現(xiàn)在解析為 decimal。HiveQL 解析保持不變秸脱。
SQL / DataFrame 函數(shù)的規(guī)范名稱現(xiàn)在是小寫(例如 sum vs SUM)落包。
JSON 數(shù)據(jù)源不會自動加載由其他應用程序(未通過 Spark SQL 插入到數(shù)據(jù)集的文件)創(chuàng)建的新文件。對于 JSON 持久表(即表的元數(shù)據(jù)存儲在 Hive Metastore)摊唇,用戶可以使用REFRESH TABLESQL 命令或HiveContext的refreshTable方法咐蝇,把那些新文件列入到表中。對于代表一個 JSON dataset 的 DataFrame巷查,用戶需要重新創(chuàng)建 DataFrame有序,同時 DataFrame 中將包括新的文件。
PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現(xiàn)有的同名列岛请。
DataFrame data reader/writer interface
基于用戶反饋旭寿,我們創(chuàng)建了一個新的更流暢的 API,用于讀取 (SQLContext.read) 中的數(shù)據(jù)并寫入數(shù)據(jù) (DataFrame.write), 并且舊的 API 將過時(例如崇败,SQLContext.parquetFile,SQLContext.jsonFile).
針對SQLContext.read(Scala,Java,Python) 和DataFrame.write(Scala,Java,Python) 的更多細節(jié)盅称,請看 API 文檔.
DataFrame.groupBy 保留 grouping columns(分組的列)
根據(jù)用戶的反饋肩祥, 我們更改了DataFrame.groupBy().agg()的默認行為以保留DataFrame結(jié)果中的 grouping columns(分組列). 為了在 1.3 中保持該行為,請設置spark.sql.retainGroupColumns為false.
// In 1.3.x, in order for the grouping column "department" to show up,// it must be included explicitly as part of the agg function call.df.groupBy("department").agg($"department",max("age"),sum("expense"))// In 1.4+, grouping column "department" is included automatically.df.groupBy("department").agg(max("age"),sum("expense"))// Revert to 1.3 behavior (not retaining grouping column) by:sqlContext.setConf("spark.sql.retainGroupColumns","false")
之前 1.4 版本中缩膝,DataFrame.withColumn() 只支持添加列混狠。該列將始終在 DateFrame 結(jié)果中被加入作為新的列,即使現(xiàn)有的列可能存在相同的名稱疾层。從 1.4 版本開始将饺,DataFrame.withColumn() 支持添加與所有現(xiàn)有列的名稱不同的列或替換現(xiàn)有的同名列。
請注意痛黎,這一變化僅適用于 Scala API俯逾,并不適用于 PySpark 和 SparkR。
在 Spark 1.3 中舅逸,我們從 Spark SQL 中刪除了 “Alpha” 的標簽桌肴,作為一部分已經(jīng)清理過的可用的 API 庶橱。從 Spark 1.3 版本以上氛琢,Spark SQL 將提供在 1.X 系列的其他版本的二進制兼容性糕非。這種兼容性保證不包括被明確標記為不穩(wěn)定的(即 DeveloperApi 類或 Experimental) API蹬竖。
升級到 Spark SQL 1.3 版本時红碑,用戶會發(fā)現(xiàn)最大的變化是尊浓,SchemaRDD已更名為DataFrame撑螺。這主要是因為 DataFrames 不再從 RDD 直接繼承吏砂,而是由 RDDS 自己來實現(xiàn)這些功能蝇恶。DataFrames 仍然可以通過調(diào)用.rdd方法轉(zhuǎn)換為 RDDS 拳魁。
在 Scala 中,有一個從SchemaRDD到DataFrame類型別名撮弧,可以為一些情況提供源代碼兼容性潘懊。它仍然建議用戶更新他們的代碼以使用DataFrame來代替。Java 和 Python 用戶需要更新他們的代碼贿衍。
此前 Spark 1.3 有單獨的Java兼容類(JavaSQLContext和JavaSchemaRDD)授舟,借鑒于 Scala API。在 Spark 1.3 中贸辈,Java API 和 Scala API 已經(jīng)統(tǒng)一释树。兩種語言的用戶可以使用SQLContext和DataFrame。一般來說論文類嘗試使用兩種語言的共有類型(如Array替代了一些特定集合)擎淤。在某些情況下不通用的類型情況下奢啥,(例如,passing in closures 或 Maps)使用函數(shù)重載代替嘴拢。
此外桩盲,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可以使用存在于org.apache.spark.sql.types類來描述編程模式炊汤。
隔離隱式轉(zhuǎn)換和刪除 dsl 包(僅Scala)
許多 Spark 1.3 版本以前的代碼示例都以import sqlContext._開始正驻,這提供了從 sqlContext 范圍的所有功能弊攘。在 Spark 1.3 中,我們移除了從RDDs 到DateFrame再到SQLContext內(nèi)部對象的隱式轉(zhuǎn)換姑曙。用戶現(xiàn)在應該寫成import sqlContext.implicits._.
此外襟交,隱式轉(zhuǎn)換現(xiàn)在只能使用方法toDF來增加由Product(即 case classes or tuples)構(gòu)成的RDD,而不是自動應用伤靠。
當使用 DSL 內(nèi)部的函數(shù)時(現(xiàn)在使用DataFrameAPI 來替換), 用戶習慣導入org.apache.spark.sql.catalyst.dsl. 相反捣域,應該使用公共的 dataframe 函數(shù) API:import org.apache.spark.sql.functions._.
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限于 Scala)
Spark 1.3 移除存在于基本 SQL 包的DataType類型別名。開發(fā)人員應改為導入類org.apache.spark.sql.types宴合。
UDF 注冊遷移到sqlContext.udf中 (Java & Scala)
用于注冊 UDF 的函數(shù)焕梅,不管是 DataFrame DSL 還是 SQL 中用到的,都被遷移到SQLContext中的 udf 對象中卦洽。
sqlContext.udf.register("strLen",(s:String)=>s.length())
Python UDF 注冊保持不變贞言。
Python DataTypes 不再是 Singletons(單例的)
在 Python 中使用 DataTypes 時,你需要先構(gòu)造它們(如:StringType())阀蒂,而不是引用一個單例對象该窗。
Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性蚤霞。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本酗失,并且Spark SQL 可以連接到不同版本的Hive metastore(從 0.12.0 到 1.2.1,可以參考與不同版本的 Hive Metastore 交互)
在現(xiàn)有的 Hive Warehouses 中部署
Spark SQL Thrift JDBC server 采用了開箱即用的設計以兼容已有的 Hive 安裝版本昧绣。你不需要修改現(xiàn)有的 Hive Metastore , 或者改變數(shù)據(jù)的位置和表的分區(qū)规肴。
Spark SQL 支持絕大部分的 Hive 功能,如:
Hive query(查詢)語句, 包括:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
所有 Hive 操作, 包括:
關系運算符 (=,?,==,<>,<,>,>=,<=, 等等)
算術運算符 (+,-,*,/,%, 等等)
邏輯運算符 (AND,&&,OR,||, 等等)
復雜類型的構(gòu)造
數(shù)學函數(shù) (sign,ln,cos, 等等)
String 函數(shù) (instr,length,printf, 等等)
用戶定義函數(shù) (UDF)
用戶定義聚合函數(shù) (UDAF)
用戶定義 serialization formats (SerDes)
窗口函數(shù)
Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
Unions
Sub-queries(子查詢)
SELECT col FROM ( SELECT a + b AS col from t1) t2
Sampling
Explain
Partitioned tables including dynamic partition insertion
View
所有的 Hive DDL 函數(shù), 包括:
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
大部分的 Hive Data types(數(shù)據(jù)類型), 包括:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
以下是目前還不支持的 Hive 函數(shù)列表夜畴。在 Hive 部署中這些功能大部分都用不到拖刃。
主要的 Hive 功能
Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 還不支持 buckets.
Esoteric Hive 功能
UNION類型
Unique join
Column 統(tǒng)計信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.
Hive Input/Output Formats
File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
Hadoop archive
Hive 優(yōu)化
有少數(shù) Hive 優(yōu)化還沒有包含在 Spark 中。其中一些(比如 indexes 索引)由于 Spark SQL 的這種內(nèi)存計算模型而顯得不那么重要斩启。另外一些在 Spark SQL 未來的版本中會持續(xù)跟蹤序调。
Block 級別的 bitmap indexes 和虛擬 columns (用于構(gòu)建 indexes)
自動為 join 和 groupBy 計算 reducer 個數(shù) : 目前在 Spark SQL 中, 你需要使用 “SET spark.sql.shuffle.partitions=[num_tasks];” 來控制 post-shuffle 的并行度.
僅 Meta-data 的 query: 對于只使用 metadata 就能回答的查詢,Spark SQL 仍然會啟動計算結(jié)果的任務.
Skew data flag: Spark SQL 不遵循 Hive 中 skew 數(shù)據(jù)的標記.
STREAMTABLEhint in join: Spark SQL 不遵循STREAMTABLEhint.
對于查詢結(jié)果合并多個小文件: 如果輸出的結(jié)果包括多個小文件, Hive 可以可選的合并小文件到一些大文件中去兔簇,以避免溢出 HDFS metadata. Spark SQL 還不支持這樣.
Spark SQL 和 DataFrames 支持下面的數(shù)據(jù)類型:
Numeric types
ByteType: Represents 1-byte signed integer numbers. The range of numbers is from-128to127.
ShortType: Represents 2-byte signed integer numbers. The range of numbers is from-32768to32767.
IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from-2147483648to2147483647.
LongType: Represents 8-byte signed integer numbers. The range of numbers is from-9223372036854775808to9223372036854775807.
FloatType: Represents 4-byte single-precision floating point numbers.
DoubleType: Represents 8-byte double-precision floating point numbers.
DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally byjava.math.BigDecimal. ABigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
String type
StringType: Represents character string values.
Binary type
BinaryType: Represents byte sequence values.
Boolean type
BooleanType: Represents boolean values.
Datetime type
TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
DateType: Represents values comprising values of fields year, month, day.
Complex types
ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type ofelementType.containsNullis used to indicate if elements in aArrayTypevalue can havenullvalues.
MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described bykeyTypeand the data type of values are described byvalueType. For aMapTypevalue, keys are not allowed to havenullvalues.valueContainsNullis used to indicate if values of aMapTypevalue can havenullvalues.
StructType(fields): Represents values with the structure described by a sequence ofStructFields (fields).
StructField(name, dataType, nullable): Represents a field in aStructType. The name of a field is indicated byname. The data type of a field is indicated bydataType.nullableis used to indicate if values of this fields can havenullvalues.
Spark SQL 的所有數(shù)據(jù)類型都在包org.apache.spark.sql.types中. 你可以用下示例示例來訪問它們.
importorg.apache.spark.sql.types._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(數(shù)據(jù)類型)Scala 中的 Value 類型訪問或創(chuàng)建數(shù)據(jù)類型的 API
ByteTypeByteByteType
ShortTypeShortShortType
IntegerTypeIntIntegerType
LongTypeLongLongType
FloatTypeFloatFloatType
DoubleTypeDoubleDoubleType
DecimalTypejava.math.BigDecimalDecimalType
StringTypeStringStringType
BinaryTypeArray[Byte]BinaryType
BooleanTypeBooleanBooleanType
TimestampTypejava.sql.TimestampTimestampType
DateTypejava.sql.DateDateType
ArrayTypescala.collection.SeqArrayType(elementType, [containsNull])
Note(注意):containsNull的默認值是true.
MapTypescala.collection.MapMapType(keyType,valueType, [valueContainsNull])
Note(注意):valueContainsNull的默認值是true.
StructTypeorg.apache.spark.sql.RowStructType(fields)
Note(注意):fields是 StructFields 的 Seq. 所有, 兩個 fields 擁有相同的名稱是不被允許的.
StructField該 field(字段)數(shù)據(jù)類型的 Scala 中的 value 類型 (例如, 數(shù)據(jù)類型為 IntegerType 的 StructField 是 Int)StructField(name,dataType, [nullable])
Note:nullable的默認值是true.
當處理一些不符合標準浮點數(shù)語義的float或double類型時,對于 Not-a-Number(NaN) 需要做一些特殊處理. 具體如下:
NaN = NaN 返回 true.
在 aggregations(聚合)操作中硬耍,所有的 NaN values 將被分到同一個組中.
在 join key 中 NaN 可以當做一個普通的值.
NaN 值在升序排序中排到最后垄琐,比任何其他數(shù)值都大.
原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sql-programming-guide.html
網(wǎng)頁地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(覺得不錯麻煩給個 Star,謝謝经柴!~)