0301 Getting Started

轉(zhuǎn)載請注明出處苏研,謝謝合作~

該篇中的示例暫時只有 Scala 版本~

上手 Spark SQL

入口:SparkSession

Spark 應(yīng)用程序的編程入口是 SparkSession 類砂豌,可以通過 SparkSession.builder() 創(chuàng)建一個基礎(chǔ)的 SparkSession

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」吐辙。

Spark 2.0 中的 SparkSession 內(nèi)置了對 Hive 的支持,包括使用 HiveSQL 編寫查詢語句啥繁,使用 Hive UDF府瞄,以及從 Hive 表中讀取數(shù)據(jù)。這些功能需要首先安裝好 Hive惋嚎。

創(chuàng)建 DataFrame

應(yīng)用程序可以使用 SparkSession 通過一個現(xiàn)有的 RDD(existing RDD)杠氢,通過 Hive 表,或者通過 Spark 數(shù)據(jù)源(Spark data sources)創(chuàng)建 DataFrame另伍。

下面的示例通過一個 JSON 文件創(chuàng)建 DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」鼻百。

類型無關(guān)的 DataFrame 算子

DataFrame 針對操作結(jié)構(gòu)化的數(shù)據(jù)提供了特定的算子(Scala, Java, PythonR)。

上文提到摆尝,對于 Spark 2.0 中的 Scala 和 Java API温艇, DataFrame 只是 Row 類型的 Dataset。相較于強(qiáng)類型相關(guān)的 Dataset堕汞,這些算子是類型無關(guān)的勺爱。

這里我們展示一些使用 Dataset 進(jìn)行 結(jié)構(gòu)化數(shù)據(jù)處理的基礎(chǔ)樣例:

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

此類操作 Dataset 的算子的完整列表詳見 API Documentation讯检。

除了簡單的列引用和表達(dá)式琐鲁,Dataset 還有一個強(qiáng)大的函數(shù)庫卫旱,包括操作字符串,日期計算围段,常見的數(shù)據(jù)計算等等顾翼。完整的函數(shù)列表參見 DataFrame Function Reference

編程中使用 SQL 查詢

SparkSessionsql 方法可以讓應(yīng)用程序通過編程使用 SQL 查詢奈泪,返回值一個 DataFrame适贸。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

全局臨時視圖

Spark SQL 中的臨時視圖在當(dāng)前 SparkSession 存在范圍內(nèi)有效涝桅,一旦 SparkSession 結(jié)束拜姿,臨時視圖就消失了。如果需要在不同的應(yīng)用程序之間共享臨時視圖冯遂,即使 SparkSession 結(jié)束依舊存在蕊肥,可以使用全局臨時視圖。全局臨時視圖與一個系統(tǒng)保留數(shù)據(jù)庫 global_temp 綁定债蜜,必須使用全限定名稱來使用晴埂,比如 SELECT * FROM global_temp.view1

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」寻定。

創(chuàng)建 Dataset

Dataset 跟 RDD 類似儒洛,但是不像 RDD 那樣使用 Java 或者 Kryo 序列化器,在計算以及網(wǎng)絡(luò)傳輸過程中 Dataset 使用一個特定的 Encoder 來序列化對象狼速。盡管 encoder 和標(biāo)準(zhǔn)的序列化器都是用來將一個對象轉(zhuǎn)換為字節(jié)琅锻,encoder 采用的是動態(tài)代碼生成的,并且采用了一種特殊的格式向胡,Spark 可以對這種格式進(jìn)行像過濾恼蓬、排序和哈希運(yùn)行而不用將其反序列化為對象。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」僵芹。

與 RDD 交互

Spark SQL 支持兩種不同的方式把一個現(xiàn)有的 RDD 轉(zhuǎn)換為 Dataset处硬。第一種方式是通過反射推斷一個定義了類型的 RDD 的表結(jié)構(gòu)。這種基于反射的方式可以使代碼簡潔拇派,在已知表結(jié)構(gòu)的場景下工作良好荷辕。

第二種方式是通過編程的方式構(gòu)建一個表結(jié)構(gòu)對象,并把它賦予一個現(xiàn)有的 RDD件豌。盡管這種方式相對復(fù)雜疮方,但是能夠在無法得知運(yùn)行時類型的情況下創(chuàng)建 Dataset。

通過反射推斷表結(jié)構(gòu)

Spark SQL 的 Scala 接口自動支持將一個樣本類類型的 RDD 轉(zhuǎn)換為一個 DataFrame茧彤。樣本類定義了表結(jié)構(gòu)骡显,通過反射獲取類中的字段名稱并將其應(yīng)用為列名。樣本類可以嵌套,還可以包含像 SeqArray 這樣的復(fù)雜類型惫谤。該 RDD 會被隱式轉(zhuǎn)換為一個 DataFrame壁顶,之后可以注冊成一張表,該表可以通過 SQL 進(jìn)行查詢石挂。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」

編程指定表結(jié)構(gòu)

如果樣本類無法被實(shí)現(xiàn)創(chuàng)建(例如博助,一行數(shù)據(jù)以字符串的格式編碼,或者是需要被解析的文本類型的數(shù)據(jù)集痹愚,以及對于不同用戶來說需要抽取的字段不同),可以分三步編程創(chuàng)建一個 DataFrame 蛔糯。

  1. 通過一個 RDD 創(chuàng)建一個 Row 類型的 RDD拯腮;
  2. 創(chuàng)建一個 StructType 類型的表結(jié)構(gòu)對象,需要與第 1 步 Row 中的數(shù)據(jù)相對應(yīng)蚁飒;
  3. 通過 SparkSession 提供的 createDataFrame 方法將第 1 步生成的 RDD 和第 2 步生成的表結(jié)構(gòu)結(jié)合起來动壤。

例如:

import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

標(biāo)量函數(shù)

標(biāo)量函數(shù)通過一行數(shù)據(jù)只返回一個單值淮逻,而不是像聚合函數(shù)那樣接收多行數(shù)據(jù)返回一個單值琼懊。Spark SQL 支持許多內(nèi)置標(biāo)量函數(shù)(Built-in Scalar Functions),同事也支持自定義標(biāo)量函數(shù)(User Defined Scalar Functions)爬早。

聚合函數(shù)

聚合函數(shù)接收多行數(shù)據(jù)返回一個單值哼丈。內(nèi)置的聚合函數(shù)(Built-in Aggregation Functions)提供了常見的聚合函數(shù),比如 count(), countDistinct(), avg(), max(), min() 等等筛严。用戶不用受預(yù)定義聚合函數(shù)的限制醉旦,可以定義自己的聚合函數(shù),詳情參見 User Defined Aggregate Functions桨啃。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末车胡,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子照瘾,更是在濱河造成了極大的恐慌匈棘,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件析命,死亡現(xiàn)場離奇詭異主卫,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)碳却,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進(jìn)店門队秩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人昼浦,你說我怎么就攤上這事馍资。” “怎么了?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵鸟蟹,是天一觀的道長乌妙。 經(jīng)常有香客問我,道長建钥,這世上最難降的妖魔是什么藤韵? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮熊经,結(jié)果婚禮上泽艘,老公的妹妹穿的比我還像新娘。我一直安慰自己镐依,他們只是感情好匹涮,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著槐壳,像睡著了一般然低。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上务唐,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天雳攘,我揣著相機(jī)與錄音,去河邊找鬼枫笛。 笑死吨灭,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的崇堰。 我是一名探鬼主播沃于,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼海诲!你這毒婦竟也來了繁莹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤特幔,失蹤者是張志新(化名)和其女友劉穎咨演,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蚯斯,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡薄风,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拍嵌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遭赂。...
    茶點(diǎn)故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖横辆,靈堂內(nèi)的尸體忽然破棺而出撇他,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布困肩,位于F島的核電站划纽,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏锌畸。R本人自食惡果不足惜勇劣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望潭枣。 院中可真熱鬧比默,春花似錦、人聲如沸盆犁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蚣抗。三九已至,卻和暖如春瓮下,著一層夾襖步出監(jiān)牢的瞬間翰铡,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工讽坏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锭魔,地道東北人。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓路呜,卻偏偏與公主長得像迷捧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子胀葱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,876評論 2 361