轉(zhuǎn)載請注明出處苏研,謝謝合作~
該篇中的示例暫時只有 Scala 版本~
上手 Spark SQL
- 入口:SparkSession(Starting Point: SparkSession)
- 創(chuàng)建 DataFrame(Creating DataFrames)
- 類型無關(guān)的 DataFrame 算子(Untyped Dataset Operations (aka DataFrame Operations))
- 編程中使用 SQL 查詢(Running SQL Queries Programmatically)
- 全局臨時視圖(Global Temporary View)
- 創(chuàng)建 Dataset(Creating Datasets)
- 與 RDD 交互(Interoperating with RDDs)
- 通過反射推斷表結(jié)構(gòu)(Inferring the Schema Using Reflection)
- 編程指定表結(jié)構(gòu)(Programmatically Specifying the Schema)
- 標(biāo)量函數(shù)(Scalar Functions)
- 聚合函數(shù)(Aggregate Functions)
入口:SparkSession
Spark 應(yīng)用程序的編程入口是 SparkSession
類砂豌,可以通過 SparkSession.builder()
創(chuàng)建一個基礎(chǔ)的 SparkSession
:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」吐辙。
Spark 2.0 中的 SparkSession
內(nèi)置了對 Hive 的支持,包括使用 HiveSQL 編寫查詢語句啥繁,使用 Hive UDF府瞄,以及從 Hive 表中讀取數(shù)據(jù)。這些功能需要首先安裝好 Hive惋嚎。
創(chuàng)建 DataFrame
應(yīng)用程序可以使用 SparkSession
通過一個現(xiàn)有的 RDD(existing RDD
)杠氢,通過 Hive 表,或者通過 Spark 數(shù)據(jù)源(Spark data sources)創(chuàng)建 DataFrame另伍。
下面的示例通過一個 JSON 文件創(chuàng)建 DataFrame:
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」鼻百。
類型無關(guān)的 DataFrame 算子
DataFrame 針對操作結(jié)構(gòu)化的數(shù)據(jù)提供了特定的算子(Scala, Java, Python 和 R)。
上文提到摆尝,對于 Spark 2.0 中的 Scala 和 Java API温艇, DataFrame 只是 Row
類型的 Dataset。相較于強(qiáng)類型相關(guān)的 Dataset堕汞,這些算子是類型無關(guān)的勺爱。
這里我們展示一些使用 Dataset 進(jìn)行 結(jié)構(gòu)化數(shù)據(jù)處理的基礎(chǔ)樣例:
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
此類操作 Dataset 的算子的完整列表詳見 API Documentation讯检。
除了簡單的列引用和表達(dá)式琐鲁,Dataset 還有一個強(qiáng)大的函數(shù)庫卫旱,包括操作字符串,日期計算围段,常見的數(shù)據(jù)計算等等顾翼。完整的函數(shù)列表參見 DataFrame Function Reference。
編程中使用 SQL 查詢
SparkSession
的 sql
方法可以讓應(yīng)用程序通過編程使用 SQL 查詢奈泪,返回值一個 DataFrame
适贸。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
全局臨時視圖
Spark SQL 中的臨時視圖在當(dāng)前 SparkSession
存在范圍內(nèi)有效涝桅,一旦 SparkSession
結(jié)束拜姿,臨時視圖就消失了。如果需要在不同的應(yīng)用程序之間共享臨時視圖冯遂,即使 SparkSession
結(jié)束依舊存在蕊肥,可以使用全局臨時視圖。全局臨時視圖與一個系統(tǒng)保留數(shù)據(jù)庫 global_temp
綁定债蜜,必須使用全限定名稱來使用晴埂,比如 SELECT * FROM global_temp.view1
。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」寻定。
創(chuàng)建 Dataset
Dataset 跟 RDD 類似儒洛,但是不像 RDD 那樣使用 Java 或者 Kryo 序列化器,在計算以及網(wǎng)絡(luò)傳輸過程中 Dataset 使用一個特定的 Encoder 來序列化對象狼速。盡管 encoder 和標(biāo)準(zhǔn)的序列化器都是用來將一個對象轉(zhuǎn)換為字節(jié)琅锻,encoder 采用的是動態(tài)代碼生成的,并且采用了一種特殊的格式向胡,Spark 可以對這種格式進(jìn)行像過濾恼蓬、排序和哈希運(yùn)行而不用將其反序列化為對象。
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」僵芹。
與 RDD 交互
Spark SQL 支持兩種不同的方式把一個現(xiàn)有的 RDD 轉(zhuǎn)換為 Dataset处硬。第一種方式是通過反射推斷一個定義了類型的 RDD 的表結(jié)構(gòu)。這種基于反射的方式可以使代碼簡潔拇派,在已知表結(jié)構(gòu)的場景下工作良好荷辕。
第二種方式是通過編程的方式構(gòu)建一個表結(jié)構(gòu)對象,并把它賦予一個現(xiàn)有的 RDD件豌。盡管這種方式相對復(fù)雜疮方,但是能夠在無法得知運(yùn)行時類型的情況下創(chuàng)建 Dataset。
通過反射推斷表結(jié)構(gòu)
Spark SQL 的 Scala 接口自動支持將一個樣本類類型的 RDD 轉(zhuǎn)換為一個 DataFrame茧彤。樣本類定義了表結(jié)構(gòu)骡显,通過反射獲取類中的字段名稱并將其應(yīng)用為列名。樣本類可以嵌套,還可以包含像 Seq
和 Array
這樣的復(fù)雜類型惫谤。該 RDD 會被隱式轉(zhuǎn)換為一個 DataFrame壁顶,之后可以注冊成一張表,該表可以通過 SQL 進(jìn)行查詢石挂。
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」
編程指定表結(jié)構(gòu)
如果樣本類無法被實(shí)現(xiàn)創(chuàng)建(例如博助,一行數(shù)據(jù)以字符串的格式編碼,或者是需要被解析的文本類型的數(shù)據(jù)集痹愚,以及對于不同用戶來說需要抽取的字段不同),可以分三步編程創(chuàng)建一個 DataFrame
蛔糯。
- 通過一個 RDD 創(chuàng)建一個
Row
類型的 RDD拯腮; - 創(chuàng)建一個
StructType
類型的表結(jié)構(gòu)對象,需要與第 1 步Row
中的數(shù)據(jù)相對應(yīng)蚁飒; - 通過
SparkSession
提供的createDataFrame
方法將第 1 步生成的 RDD 和第 2 步生成的表結(jié)構(gòu)結(jié)合起來动壤。
例如:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。
標(biāo)量函數(shù)
標(biāo)量函數(shù)通過一行數(shù)據(jù)只返回一個單值淮逻,而不是像聚合函數(shù)那樣接收多行數(shù)據(jù)返回一個單值琼懊。Spark SQL 支持許多內(nèi)置標(biāo)量函數(shù)(Built-in Scalar Functions),同事也支持自定義標(biāo)量函數(shù)(User Defined Scalar Functions)爬早。
聚合函數(shù)
聚合函數(shù)接收多行數(shù)據(jù)返回一個單值哼丈。內(nèi)置的聚合函數(shù)(Built-in Aggregation Functions)提供了常見的聚合函數(shù),比如 count()
, countDistinct()
, avg()
, max()
, min()
等等筛严。用戶不用受預(yù)定義聚合函數(shù)的限制醉旦,可以定義自己的聚合函數(shù),詳情參見 User Defined Aggregate Functions桨啃。