Spark SQL
Spark SQL的概述
Hive的誕生,主要是因為開發(fā)MapReduce程序對 Java 要求比較高肥缔,為了讓他們能夠操作HDFS上的數據,推出了Hive只锭。Hive與RDBMS的SQL模型比較類似谢澈,容易掌握。 Hive的主要缺陷在于它的底層是基于MapReduce的粗俱,執(zhí)行比較慢说榆。
在Spark 0.x版的時候推出了Shark,Shark與Hive是緊密關聯的,Shark底層很多東西還是依賴于Hive签财,修改了內存管理串慰、物理計劃、執(zhí)行三個模塊荠卷,底層使用Spark的基于內存的計算模型模庐,性能上比Hive提升了很多倍。
Shark更多是對Hive的改造油宜,替換了Hive的物理執(zhí)行引擎掂碱,提高了執(zhí)行速度。但Shark繼承了大量的Hive代碼慎冤,因此給優(yōu)化和維護帶來了大量的麻煩疼燥。
在Spark 1.x的時候Shark被淘汰。在2014 年7月1日的Spark Summit 上蚁堤, Databricks宣布終止對Shark的開發(fā)醉者,將重點放到 Spark SQL 上。
Shark終止以后披诗,產生了兩個分支:
- Hive on Spark
hive社區(qū)的撬即,源碼在hive中
- Spark SQL(Spark on Hive)
Spark社區(qū),源碼在Spark中呈队,支持多種數據源剥槐,多種優(yōu)化技術,擴展性好很多;Apache Spark 3.0.0解決超過3400個Jira問題被解決宪摧,這些問題在Spark各個核心組件中分布情況如下圖:
Spark SQL特點
Spark SQL自從面世以來不僅接過了shark的接力棒粒竖,為spark用戶提供高性能的SQL on hadoop的解決方案,還為spark帶來了通用的高效的几于,多元一體的結構化的數據 處理能力蕊苗。
Spark SQL 的優(yōu)勢
- 寫更少的代碼
- 讀更少的數據(SparkSQL的表數據在內存中存儲,不使用原生態(tài)JVM對象存儲方法沿彭,而是采用內存列存儲)
- 提供更好的性能(字節(jié)碼生成技術朽砰、SQL優(yōu)化)
Spark SQL數據抽象
Spark SQL提供了兩個新的抽象,分別是DataFrame 和Dataset膝蜈;同樣的數據都給到這三個數據結構锅移,經過系統的計算邏輯,都得到相同的結果饱搏。不同的是他們的執(zhí)行效率和執(zhí)行方式
在后期的Spark版本中非剃,Dataset會逐步取代RDD和DataFrame成為唯一的API接口
DataFrame
DataFrame的前身是SchemaRDD。Spark1.3更名為DataFrame推沸。不繼承RDD备绽,自己實現RDD的大部分功能券坞。
與RDD類似,DataFrame也是一個分布式數據集
- DataFrame可以看做分布式Row對象的集合肺素,提供了由列組成的詳細模式信息恨锚,使其可以得到優(yōu)化,DataFrame不僅有比RDD更多的算子倍靡,還可以進行執(zhí)行計劃的優(yōu)化
- DataFrame更像傳統數據庫的二維表格猴伶,除了數據以外,還記錄數據的結構信息塌西,即schema
- DataFrame也支持嵌套數據類型(struct他挎、array和Map)
- DataFrame API提供的是一套高層的關系操作,比函數式RDD API更加優(yōu)化捡需,門檻低
-
DataFrame的劣勢在于在編譯期缺少類型安全檢查办桨,導致運行時出錯。
Dataset
Dataset時在Spark1.6中添加的新接口站辉;與RDD相比呢撞,可以保存更多的描述信息,概念上等同于關系型數據庫中的二維表饰剥。與DataFrame相比殊霞,保存了類型信息,是強類型汰蓉,提供了編譯時檢查脓鹃。
調用Dataset的方法會生成邏輯計劃,然后Spark的優(yōu)化器進行優(yōu)化古沥,最終勝出無力計劃,然后提交到集群中運行娇跟。
Dataset包含了DataFrame的功能岩齿,在Spark2.0中兩者得到了統一,DataFrame表示為Dataset[Row]苞俘,即Dataset的子集
Row & Shcema
DataFrame = RDD[Row] + Schema; DataFrame 的前身的SchemaRDD
Row是一個泛化的無類型JVM對象
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val row1 = Row(1,"abc",1.2)
row1: org.apache.spark.sql.Row = [1,abc,1.2]
scala> row1(0)
res0: Any = 1
scala> row1(1)
res1: Any = abc
scala> row1(2)
res2: Any = 1.2
scala> row1.getInt(0)
res3: Int = 1
scala> row1.getString(1)
res4: String = abc
scala> row1.getDouble(2)
res6: Double = 1.2
scala> row1.getDouble(3)
java.lang.ArrayIndexOutOfBoundsException: 3
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row.isNullAt(Row.scala:191)
at org.apache.spark.sql.Row.isNullAt$(Row.scala:191)
at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
at org.apache.spark.sql.Row.getAnyValAs(Row.scala:472)
at org.apache.spark.sql.Row.getDouble(Row.scala:248)
at org.apache.spark.sql.Row.getDouble$(Row.scala:248)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getDouble(rows.scala:166)
... 49 elided
scala> row1.getAs[Int](0)
res7: Int = 1
scala> row1.getAs[String](1)
res8: String = abc
scala> row1.getAs[Double](2)
res9: Double = 1.2
DataFrame(即帶有Schema信息的RDD)盹沈,Spark通過Schema就能夠讀懂數據。
什么是schema吃谣?
DataFrame 中提供了詳細的數據結構信息乞封,從而使得SparkSQl可以清晰地知道該數據集中包含哪些列,每列的名稱和類型各是什么岗憋,DataFrame中的數據結構信息肃晚,即為schema。
[圖片上傳失敗...(image-64ce3e-1616052741093)]
import org.apache.spark.sql.types._
val schema = (new StructType).
add("id", "int", false).
add("name", "string", false).
add("height", "double", false)
參考源碼StructType.scala
import org.apache.spark.sql.types._
val schema1 = StructType( StructField("name", StringType, false)
:: StructField("age", IntegerType, false)
:: StructField("height", IntegerType,false) :: Nil)
val schema2 = StructType( Seq(StructField("name", StringType,false),
StructField("age", IntegerType,false),
StructField("height", IntegerType,false)))
val schema3 = StructType( List(StructField("name", StringType,false),
StructField("age", IntegerType,false),
StructField("height", IntegerType,false)))
// 來自源碼
val schema4 = (new StructType).
add(StructField("name", StringType, false)).
add(StructField("age", IntegerType, false)).
add(StructField("height", IntegerType, false))
val schema5 = (new StructType).
add("name", StringType, true, "comment1").
add("age", IntegerType, false, "comment2").
add("height", IntegerType, true, "comment3")
三者的共性
- RDD仔戈、DataFrame关串、Dataset都是Spark平臺下的分布式彈性數據集拧廊,為處理海量數據提供便利
- 三者都有許多相同的概念,如分區(qū)晋修、持久化吧碾、容錯多難過,有許多共同的函數墓卦,如map倦春、filter、sortBy等
- 三者都有惰性機制落剪,只有遇到Action算子時睁本,才會開始真正的計算
- 對DataFrame和Dataset進行操作,許多操作都是通過這個包來進行支持著榴,
import spark.implicits._
三者的區(qū)別
DataFrame(DataFrame = RDD[Row] + Schema):
- 與RDD和Dataset不同添履,DataFrame每一行的固定為Row,只有通過解析才能獲取各個字段的值脑又。
- DataFrame與Dataset均支持SparkSQL的操作
Dataset(Dataset = RDD[case class].toDS):
- Dataset和DataFrame擁有完全相同的成員函數暮胧,區(qū)別只是每一行的數據類型不同
- DataFrame第一位Dataset[Row]。每一行的類型是Row问麸。每一行究竟有哪些字段往衷,各個字段又是什么類型都無從得知,只能用前面提到的getAs方法或者模式匹配拿出特定字段
- Dataset每一行的數據類型都是一個case class严卖,在自定義了case class之后席舍,可以很自由的獲取每一行的信息
數據類型
Spark SQL 編程
SparkSession
DataFrame & Dataset的創(chuàng)建
不用刻意區(qū)分:DF和DS,因為DF是一種特殊的DS哮笆,而且當DS執(zhí)行某些transformation的時候會返回DF
有range生成DS
//創(chuàng)建一個DS
scala> val numDs = spark.range(5,1000,5)
numDs: org.apache.spark.sql.Dataset[Long] = [id: bigint]
//輸出DS的schema信息
scala> numDs.printSchema
root
|-- id: long (nullable = false)
//統計信息
scala> numDs.describe().show
+-------+------------------+
|summary| id|
+-------+------------------+
| count| 199|
| mean| 500.0|
| stddev|287.95254238618327|
| min| 5|
| max| 995|
+-------+------------------+
//與上面的統計信息一致
scala> numDs.rdd.map(_.toInt).stats
res14: org.apache.spark.util.StatCounter = (count: 199, mean: 500.000000, stdev: 287.228132, max: 995.000000, min: 5.000000)
// 按照id字段排序后顯示前五個
scala> numDs.orderBy(desc("id")).show(5)
+---+
| id|
+---+
|995|
|990|
|985|
|980|
|975|
+---+
only showing top 5 rows
//按照ID排序来颤,顯示20個,默認20個
scala> numDs.orderBy(desc("id")).show()
+---+
| id|
+---+
|995|
|990|
|985|
|980|
|975|
|970|
|965|
|960|
|955|
|950|
|945|
|940|
|935|
|930|
|925|
|920|
|915|
|910|
|905|
|900|
+---+
only showing top 20 rows
// 檢查分區(qū)數
scala> numDs.rdd.getNumPartitions
res17: Int = 5
由集合生成DS
Dataset = RDD[case class]
//
case class Person(name:String,age:Int,height:Int)
// 創(chuàng)建一個Person的集合
scala> val seq1 =Seq(Person("掌聲",12,180), Person("歷史",20,178),Person("wangw",30,160))
seq1: Seq[Person] = List(Person(掌聲,12,180), Person(歷史,20,178), Person(wangw,30,160))
//創(chuàng)建一個ds
scala> val ds1 = spark.createDataset(seq1)
ds1: org.apache.spark.sql.Dataset[Person] = [name: string, age: int ... 1 more field]
//顯示字段信息
scala> ds1.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- height: integer (nullable = false)
//顯示具體內容
scala> ds1.show
+-----+---+------+
| name|age|height|
+-----+---+------+
| 掌聲| 12| 180|
| 歷史| 20| 178|
|wangw| 30| 160|
+-----+---+------+
scala> val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
seq2: Seq[(String, Int, Int)] = List((Jack,28,184), (Tom,10,144), (Andy,16,165))
scala> val ds2 = spark.createDataset(seq2)
ds2: org.apache.spark.sql.Dataset[(String, Int, Int)] = [_1: string, _2: int ... 1 more field]
scala> ds2.show
+----+---+---+
| _1| _2| _3|
+----+---+---+
|Jack| 28|184|
| Tom| 10|144|
|Andy| 16|165|
+----+---+---+
由集合生成DataFrame
DataFrame = RDD[Row] + Schema
scala> val lst = List(("zhangsan",12,180.0),("lisi",15,177.9),("wangwu",20,180.1))
lst: List[(String, Int, Double)] = List((zhangsan,12,180.0), (lisi,15,177.9), (wangwu,20,180.1))
scala> val df1 = spark.createDataFrame(lst)
df1: org.apache.spark.sql.DataFrame = [_1: string, _2: int ... 1 more field]
scala> df1.printSchema
root
|-- _1: string (nullable = true)
|-- _2: integer (nullable = false)
|-- _3: double (nullable = false)
//修改單個字段
scala> df1.withColumnRenamed("_1","name").
| withColumnRenamed("_2","age")
res5: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> df1.printSchema
root
|-- _1: string (nullable = true)
|-- _2: integer (nullable = false)
|-- _3: double (nullable = false)
scala> res5.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- _3: double (nullable = false)
// 在spark-shell中不需要處理稠肘,但是在IDEA中福铅,desc是函數,在IDEA中使用是需要導包
import org.apache.spark.sql.functions._
scala> res5.orderBy(desc("age")).show()
[Stage 0:> (0 + 0) / 2]20/11/02 22:25:58 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
+--------+---+-----+
| name|age| _3|
+--------+---+-----+
| wangwu| 20|180.1|
| lisi| 15|177.9|
|zhangsan| 12|180.0|
+--------+---+-----+
//修改整個DF的列名
scala> val df2 = spark.createDataFrame(lst).toDF("name","age","height")
df2: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> df2.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- height: double (nullable = false)
使用idea项阴,引入POM
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
代碼:
package com.hhb.spark.sql
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-02 22:34
**/
case class Person(name: String, age: Int, height: Int)
object Demo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("warn")
import spark.implicits._
val arr1 = Array(("zhangsan", 18, 185), ("lisi", 10, 178), ("wangwu", 29, 177))
val rdd1 = sc.makeRDD(arr1).map(f => Row(f._1, f._2, f._3))
val schema1 = StructType(StructField("name", StringType, false)
:: StructField("age", IntegerType, false)
:: StructField("height", IntegerType, false)
:: Nil)
val schema2 = (new StructType).
add("name", StringType, false, "姓名").
add("age", IntegerType, false, "年齡").
add("height", IntegerType, false, "身高")
val dataFrame = spark.createDataFrame(rdd1, schema2)
//desc 是一個function滑黔,需要引入包
dataFrame.orderBy(desc("age")).show(1)
dataFrame.printSchema()
println("***" * 15)
val arr2 = Array(("zhangsan", 18, null), ("lisi", 10, 178), ("wangwu", 29, 177))
val rdd2 = sc.makeRDD(arr2).map(f => Row(f._1, f._2, f._3))
val df2 = spark.createDataFrame(rdd2, schema2)
df2.printSchema()
//執(zhí)行到這報錯,因為height不可以為空环揽,但是這里面有空值
// df2.show()
println("***" * 15)
val schema3 = (new StructType).
add("name", StringType, false, "姓名").
add("age", IntegerType, false, "年齡").
add("height", IntegerType, true, "身高")
val df3 = spark.createDataFrame(rdd2, schema3)
df3.printSchema()
df3.show()
println("***" * 15)
val rdd3 = sc.makeRDD(arr1).map(f => Person(f._1, f._2, f._3))
//反射推斷略荡,spark通過反射從case class 的定義得到類名
val df = rdd3.toDF()
//發(fā)射推斷
val ds = rdd3.toDS()
df.printSchema()
df.show()
println("***" * 15)
ds.printSchema()
ds.show()
}
}
RDD轉Dataset
Dataset = RDD[case class]
DataFrame= RDD[Row] + schema
val ds2 = spark.createDataset(rdd3)
ds2.printSchema()
從文件創(chuàng)建DataFrame(以csv文件為例)
//讀取文件創(chuàng)建DF
val df4 = spark.read.csv("/Users/baiwang/myproject/spark/data/people1.csv")
df4.printSchema()
df4.show()
println("***" * 15)
val df5 = spark.read.csv("/Users/baiwang/myproject/spark/data/people2.csv")
df5.printSchema()
df5.show()
println("****" * 15)
//inferschema 自動類型推斷,適用于簡單的數據類型
//header:表頭
val df6 = spark.read
.options(Map(("header", "true"), ("inferschema", "true")))
.csv("/Users/baiwang/myproject/spark/data/people1.csv")
df6.printSchema()
df6.show()
println("***" * 15)
//復雜類型是歉胶,不能使用自動類型推斷
val schema = "name string,age int,job string"
val df7 = spark.read
.option("header", "true")
//設置csv的分隔符
.option("delimiter", ";")
.schema(schema)
.csv("/Users/baiwang/myproject/spark/data/people2.csv")
df7.printSchema()
df7.show()
三者的轉換
SparkSQL提供了一個領域特定語言(DSL)以方便操作結構化數據汛兜。核心思想還是SQL,僅僅是一個語法的問題
Action操作
與RDD類似的操作:show跨扮、collect序无、collectAsList验毡、head、first帝嗡、count晶通、take、takeAsList哟玷、reduce
與結構類似的操作:printSchema狮辽、explain、columns巢寡、dtypes必孤、col
package com.hhb.spark.sql
import java.util
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* @description:
* @date: 2020-11-03 13:41
**/
object ActionDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("warn")
import spark.implicits._
val df = spark.read
//表頭
.option("header", "true")
//自動類型推斷
.option("inferschema", "true")
.csv("/Users/baiwang/myproject/spark/data/emp.dat")
df.printSchema()
df.show()
println(s"df.count() => ${df.count()}")
//并集此再,顯示前20個
df.union(df).show()
println(s"df.union(df) => ${df.union(df).count()}")
//顯示前兩個
df.show(2)
//將df轉換成json肠牲,并顯示前十條豫喧,且不截斷字符
df.toJSON.show(10, false)
//顯示前十個,不截斷
spark.catalog.listFunctions().show(10, false)
//返回的是數組
val rows: Array[Row] = df.collect()
println(s"row:${rows}")
//f返回的是list
val rowList: util.List[Row] = df.collectAsList()
println(s"rowList:${rowList}")
//返回第一行谦絮,返回值是row類型
val rowHead: Row = df.head()
val rowFirst: Row = df.first()
println(s"rowHead:$rowHead,rowFirst:$rowFirst")
//返回前三行题诵,返回值類型是Array
val rowsHead: Array[Row] = df.head(3)
val rowsTake: Array[Row] = df.take(3)
println(s"rowsHead:$rowsHead,rowsTake:$rowsTake")
//返回前兩行,返回值類型是list
val rowsTakeList: util.List[Row] = df.takeAsList(2)
println(s"rowsTakeList:$rowsTakeList")
//結構屬性:
println("查看列名:" + df.columns)
println("查看列名和類型:" + df.dtypes)
println("查看執(zhí)行計劃:" + df.explain())
println("獲取某個列:" + df.col("EMPNO"))
df.printSchema()
}
}
Transformation 操作
select * from tab where ... group by ... having... order by...
RDD類似的操作
持久化层皱、緩存與checkpoint
select
where
group by /聚合
order by
join
集合操作
空值操作(函數)
函數
與RDD類似的操作
map性锭、filter、flatMap叫胖、mapPartitions草冈、sample、 randomSplit瓮增、 limit怎棱、 distinct、dropDuplicates绷跑、describe
package com.hhb.spark.sql
import org.apache.spark.sql.{Dataset, Row, SparkSession}
/**
* @description:
* @date: 2020-11-03 14:09
**/
object TransformationDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("warn")
import spark.implicits._
val df = spark.read
//表頭
.option("header", "true")
//自動類型推斷
.option("inferschema", "true")
.csv("/Users/baiwang/myproject/spark/data/emp.dat")
df.printSchema()
//輸出第一列
df.map(row => row.getAs[Int](0)).show()
df.map(_.getInt(0)).show()
df.map(_.get(0).toString.toInt).show()
//randomSplit(與RDD類似蹄殃,將DF、DS切割成傳入的比例你踩,該比例為一個大概值)
val arrayDS: Array[Dataset[Row]] = df.randomSplit(Array(0.5, 0.6, 0.7))
println(s"${arrayDS(0).count()} ,,,${arrayDS(0).show()}")
println(s"${arrayDS(1).count()} ,,,${arrayDS(1).show()}")
println(s"${arrayDS(2).count()} ,,,${arrayDS(2).show()}")
println("***" * 15)
//取出10行數據,生成新的Dataset
val ds: Dataset[Row] = df.limit(10)
ds.show()
println("***" * 15)
//distinct 去重
val dfDis = df.union(df)
dfDis.distinct().show()
//dropDuplicates讳苦,按列值去重
println("***" * 15)
dfDis.dropDuplicates.show()
//按照這兩個列去重
dfDis.dropDuplicates("mgr", "deptno").show()
//返回全部列的統計信息(count带膜、mean、stddev鸳谜、min膝藕、max)
ds.describe().show()
//返回指定列的統計
ds.describe("sal").show()
ds.describe("sal", "comm").show()
sc.stop()
spark.close()
}
}
存儲相關
cacheTable、persist咐扭、checkpoint芭挽、unpersist滑废、cache
備注:Dataset默認存儲級別MEMORY_AND_DISK
package com.hhb.spark.sql
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* @description:
* @date: 2020-11-03 14:09
**/
object TransformationCacheDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("warn")
sc.setCheckpointDir("hdfs://linux121:9000/spark-test/checkpoint")
import spark.implicits._
val df = spark.read
//表頭
.option("header", "true")
//自動類型推斷
.option("inferschema", "true")
.csv("/Users/baiwang/myproject/spark/data/emp.dat")
df.show()
df.checkpoint()
df.cache()
df.persist(StorageLevel.MEMORY_ONLY)
println(df.count())
df.unpersist(true)
df.createOrReplaceTempView("t1")
spark.sql("select * from t1").show()
spark.catalog.cacheTable("t1")
spark.catalog.uncacheTable("t1")
sc.stop()
spark.close()
}
}
select 相關
列的多種表示、select袜爪、selectExpr蠕趁、drop、withColumn辛馆、withColumnRenamed俺陋、cast(內置函數)
val df = spark.read.
option("header", "true").
option("inferschema", "true").
csv("/spark-test/data/emp.dat")
//查詢ename、sal兩列昙篙,有下面五種寫法腊状。但是不能混用
scala> df.select($"ename",$"sal").show
scala> df.select("ename","sal").show
scala> df.select('ename,'sal).show
scala> df.select(col("ename"),col("sal")).show
scala> df.select(df("ename"),df("sal")).show
//查詢ename、sal兩列,并對sal列+1000苔可,下面兩種無效
scala> df.select("ename","sal+1000").show
scala> df.select("ename","sal"+1000).show
//查詢ename缴挖、sal兩列,并對sal列+1000
scala> df.select($"ename",$"sal" + 1000).show
scala> df.select('ename,'sal + 1000).show
// 可使用expr表達式(expr里面只能使用引號)
scala> df.select(expr("comm+100"),expr("sal + 100"),expr("ename")).show
scala> df.select(expr("nvl(comm,0)+100"),expr("sal + 100"),expr("ename")).show
scala> df.selectExpr("ename as name").show
scala> df.selectExpr("power(sal,2) as newSal","sal").show
scala> df.selectExpr("round(sal, -3) as newsal", "sal", "ename").show
//drop 刪除一列或多列,得到新的DF
scala> df.drop("mgr").show
scala> df.drop("empno","mgr").show
//withColumn,修改列值
scala> df.withColumn("sal",$"sal" + 1000).show
scala> df.withColumnRenamed("sal","newSal").show
// 備注:drop焚辅、withColumn映屋、withColumnRenamed返回的是DF
//cast 類型轉換
scala> df.selectExpr("cast(empno as string)").printSchema
scala> import org.apache.spark.sql.types._
scala> df.select('empno.cast(StringType)).printSchema
where相關
where == filter
// filter操作
scala> df.filter("sal > 2000").show
scala> df.filter("sal > 2000 and job = 'MANAGER'").show
scala> df.filter("sal > 2000 and job == 'MANAGER'").show
//where 操作
scala> df.where("sal > 2000").show
scala> df.where("sal > 2000 and job == 'MANAGER'").show
scala> df.where("sal > 2000 and job = 'MANAGER'").show
groupBy相關
groupBy、agg法焰、max秧荆、min、avg埃仪、sum乙濒、count(后面5個為內置函數)
// groupBy、max卵蛉、min颁股、mean、sum傻丝、count(與df1.count不同)
scala> df.groupBy("job").sum("sal").show
scala> df.groupBy("job").max("sal").show
scala> df.groupBy("job").min("sal").show
scala> df.groupBy("job").avg("sal").show
scala> df.groupBy("job").count.show
//類似having字句
scala> df.groupBy("job").avg("sal").where("avg(sal) > 2000").show
scala> df.groupBy("job").avg("sal").where($"avg(sal)" > 2000).show
//agg
scala> df.groupBy("job").agg("sal" -> "max","sal" -> "min","sal" -> "avg").show
scala> df.groupBy("job").agg(max("sal"),min("sal"),avg("sal")).show
//給列取別名
scala> df.groupBy("job").agg("sal" -> "max","sal" -> "min","sal" -> "avg").withColumnRenamed("min(sal)","minSal").show
scala> df.groupBy("job").agg(max("sal"),min("sal").as("min1"),avg("sal")).show
orderBy相關
orderBy == sort
//升序
scala> df.orderBy("sal").show
scala> df.orderBy($"sal").show
scala> df.orderBy($"sal".asc).show
scala> df.orderBy('sal).show
scala> df.orderBy(col("sal")).show
scala> df.orderBy(df("sal")).show
//降序
scala> df.orderBy(-$"sal").show
scala> df.orderBy($"sal".desc).show
scala> df.orderBy($"sal".desc,-'deptno).show
//sort
scala> df.sort("sal").show
scala> df.sort($"sal").show
scala> df.sort($"sal".asc).show
scala> df.sort('sal).show
scala> df.sort(col("sal")).show
scala> df.sort(df("sal")).show
//降序
scala> df.sort(-$"sal").show
scala> df.sort($"sal".desc).show
scala> df.sort($"sal".desc,-'deptno).show
join相關
//笛卡爾積
scala> df.crossJoin(df).show
//等值鏈接甘有,單字段,鏈接的字段為empno葡缰,在結果中亏掀,只顯示一次
scala> df.join(df,"empno").show
//等值鏈接,
scala> df.join(df,Seq("empno","ename")).show
scala> df.join(df,List("empno","ename")).show
// 定義第一個數據集
case class StudentAge(sno: Int, name: String, age: Int)
val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19), StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20))
val ds1 = spark.createDataset(lst)
ds1.show()
// 定義第二個數據集
case class StudentHeight(sname: String, height: Int)
val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))
val ds2 = rdd.toDS
//使用非相同字段名稱作為鏈接條件
scala> ds1.join(ds2,$"name" === $"sname").show
scala> ds1.join(ds2,$"sname" === $"name").show
scala> ds1.join(ds2,'name === 'sname).show
scala> ds1.join(ds2,ds1("name") === ds2("sname")).show
//內鏈接泛释,默認即內鏈接
scala> ds1.join(ds2,ds1("name") === ds2("sname"),"inner").show
//左外鏈接
scala> ds1.join(ds2,ds1("name") === ds2("sname"),"left").show
scala> ds1.join(ds2,ds1("name") === ds2("sname"),"left_outer").show
//右外鏈接
scala> ds1.join(ds2,$"name" === $"sname","right").show
scala> ds1.join(ds2,$"name" === $"sname","right_outer").show
//全外鏈接
scala> ds1.join(ds2,$"name" === $"sname","full").show
scala> ds1.join(ds2,$"name" === $"sname","full_outer").show
備注:DS在join操作之后變成的DF
集合相關
union滤愕、unionAll(過期)、intersect怜校、except
val ds3 = ds1.select("name")
ds3: org.apache.spark.sql.DataFrame = [name: string]
scala> val ds4 = ds2.select("sname")
ds4: org.apache.spark.sql.DataFrame = [sname: string]
//并集间影。不去重
scala> ds3.union(ds3).show
//并集(過期)不去重,不建議使用
scala> ds3.unionAll(ds3).show
warning: there was one deprecation warning (since 2.0.0); for details, enable `:setting -deprecation' or `:replay -deprecation'
//求交
scala> ds3.intersect(ds4).show
//求差
scala> ds3.except(ds4).show
空值處理
na.fill 填充茄茁、na.drop 刪除
//NaN(not a number)
scala> math.sqrt(-1.0)
res12: Double = NaN
scala> math.sqrt(-1.0).isNaN()
res14: Boolean = true
//顯示所有列不含空的行
scala> df.na.drop.show
//顯示mgr列不為空的行
scala> df.na.drop(Array("mgr")).show
//對全部的列進行填充
scala> df.na.fill(100).show
//對單列進行填充
scala> df.na.fill(100,Array("comm")).show
//對多列進行填充
scala> df.na.fill(Map("mgr" -> -1,"comm" -> -2)).show
//對指定列的指定值進行替換
scala> df.na.replace(List("comm","deptno"),Map(0 -> 9999,10 -> 10000)).show
// 查詢空值列或非空值列魂贬。isNull巩割、isNotNull為內置函數
df.filter("comm is null").show
df.filter($"comm".isNull).show
df.filter(col("comm").isNull).show
df.filter("comm is not null").show
df.filter(col("comm").isNotNull).show
窗口函數
一半情況下,窗口函數不用DSL處理付燥,直接使用SQL更方便宣谈,參考源碼Window.scala、WindowSpec.scala(主要)
import org.apache.spark.sql.expressions.Window
val w1 = Window.partitionBy("cookieid").orderBy("createtime")
val w2 = Window.partitionBy("cookieid").orderBy("pv")
val w3 = w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w4 = w1.rowsBetween(-1, 1)
// 聚組函數【用分析函數的數據集】
df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show df.select($"cookieid", $"pv", sum("pv").over(w3).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w4).as("pv1")).show
// 排名
df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show df.select($"cookieid", $"pv", dense_rank().over(w2).alias("denserank")).show df.select($"cookieid", $"pv", row_number().over(w2).alias("rownumber")).show
// lag机蔗、lead
df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("rownumber")).show df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("rownumber")).show
內建函數
http://spark.apache.org/docs/latest/api/sql/index.html
SQL語句
總體而言:SparkSQL與HQL兼容;與HQL相比蒲祈,SparkSQL更簡潔。createTempView萝嘁、createOrReplaceTempView梆掸、spark.sql("SQL")
package com.hhb.spark.sql
import org.apache.spark.sql.SparkSession
/**
* @description:
* @date: 2020-11-04 16:46
**/
case class Info(id: String, tags: String)
object SQLDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getCanonicalName.init)
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
import spark.implicits._
val arr = Array("1 1,2,3", "2 2,3", "3 1,3")
val rdd = spark.sparkContext.makeRDD(arr)
.map {
x =>
Info(x.split("\\s+")(0), x.split("\\s+")(1))
}
val ds = rdd.toDS()
ds.createOrReplaceTempView("info")
spark.sql(
"""
|select * from info
|""".stripMargin).show()
println("**" * 15)
spark.sql(
"""
|select
| id,tag
|from
| info lateral view explode(split(tags,",")) t as tag
|
|""".stripMargin).show()
println("**" * 15)
spark.sql(
"""
|select
| id,explode(split(tags,",")) as tag
|from
| info
|""".stripMargin).show()
spark.close()
}
}
輸入與輸出
SparkSQL內建支持的數據源包括:Parquet、JSON牙言、CSV酸钦、Avro、Images咱枉、BinaryFiles(Spark3.0)卑硫,其中Parquet是默認數據源。
// 內部使用
DataFrameReader.format(args).option("key", "value").schema(args).load()
// 開發(fā)API
SparkSession.read
Parquet文件
package com.hhb.spark.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-04 20:06
**/
object InputOutputDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
//直接使用命令加載文件
val parquetDF: DataFrame = spark.read.load("data/users.parquet")
parquetDF.show()
//加載文件并創(chuàng)建成users
spark.sql(
"""
|create or replace temporary view users
|using parquet
|options(path "data/users.parquet")
|""".stripMargin)
spark.sql(
"""
|select * from users
|""".stripMargin).show()
//輸出文件
parquetDF.write.format("parquet")
.mode("overwrite") // 寫文件的方式
.option("compression", "none") // 是否壓縮
.save("data/parquet") // 保存的路徑
spark.sql(
"""
|select * from users
|""".stripMargin).write.format("parquet")
.mode("append")
.option("compression", "none")
.save("data/parquet")
spark.close()
}
}
json文件
package com.hhb.spark.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* @description:
* @date: 2020-11-04 20:15
**/
object JsonFileDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
//直接使用命令加載文件
val jsonDF: DataFrame = spark.read.format("json").load("data/emp.json")
jsonDF.show(false)
//使用SQL加載文件
spark.sql(
"""
|create or replace temporary view emp
|using json
|options(path "data/emp.json")
|""".stripMargin)
spark.sql(
"""
|select * from emp
|""".stripMargin).show()
//使用命令導出文件
jsonDF.write.format("json")
.mode("overwrite")
.save("data/json")
//使用SQL導出文件
spark.sql(
"""
|select * from emp
|""".stripMargin).write.format("json")
.mode("overwrite")
.save("data/json1")
spark.close()
}
}
CSV文件
package com.hhb.spark.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-04 20:22
**/
object CSVFileDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
//直接使用命令加載文件
val csvDF = spark.read.format("csv")
.options(Map("header" -> "true", "inferschema" -> "true", "delimiter" -> ";"))
.load("/Users/baiwang/myproject/spark/data/people2.csv")
csvDF.show()
csvDF.printSchema()
//使用SQL加載文件
spark.sql(
"""
|create or replace temporary view people
|using csv
|options(path "/Users/baiwang/myproject/spark/data/people2.csv",
| header "true",
| inferschema "true",
| delimiter ";"
| )
|""".stripMargin)
spark.sql(
"""
|select * from people
|""".stripMargin).show()
//使用命令導出文件
csvDF.write.format("csv")
.option("delimiter", "|")
.option("header", "true")
.mode("overwrite")
.save("data/csv")
//使用SQL導出文件
spark.sql(
"""
|select * from people
|""".stripMargin).write.format("csv")
.option("delimiter", "|")
.option("header", "true")
.mode("append")
.save("data/csv")
spark.close()
}
}
JDBC
package com.hhb.spark.sql
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-05 09:21
**/
object JdbcDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getCanonicalName.init)
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
//加載
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
.option("user", "hive")
.option("password", "12345678")
// .option("dbtable", "test")
.option("driver", "com.mysql.jdbc.Driver")
.option("query", "select * from test where total >= 10")
.load()
jdbcDF.show()
//導出
jdbcDF.write.format("jdbc")
.option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
.option("user", "hive")
.option("password", "12345678")
.option("dbtable", "test_bak")
.option("driver", "com.mysql.jdbc.Driver")
.mode(SaveMode.Append)
.save()
//加載
spark.read.format("jdbc")
.option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
.option("user", "hive")
.option("password", "12345678")
.option("dbtable", "test_bak")
.option("driver", "com.mysql.jdbc.Driver")
// .option("query", "select * from test_bak")
.load().show()
spark.close()
}
}
備注:如果有中文蚕断,需要注意表的字符集欢伏,否則會亂嗎
- SaveMode.ErrorIfExists(默認)。若表存在亿乳,則會直接報異常硝拧,數據不能存入數據庫
- SaveMode.Append。若表存在葛假,則追加在該表中;若該表不存在障陶,則會先創(chuàng)建表,再插入數據
- SaveMode.Overwrite聊训。先將已有的表及其數據全都刪除抱究,再重新創(chuàng)建該表,最后插入新的數據
- SaveMode.Ignore带斑。若表不存在鼓寺,則創(chuàng)建表并存入數據;若表存在,直接跳過數據的存儲勋磕,不會報錯
-- 創(chuàng)建表
create table lagou_product_info_back as select * from lagou_product_info;
-- 檢查表的字符集
show create table lagou_product_info_back;
show create table lagou_product_info;
-- 修改表的字符集
alter table lagou_product_info_back convert to character set utf8;
UDF & UDAF
UDF
UDF(User Defined Function),自定義函數侄刽。函數的輸入輸出都是一條記錄,類似于Spark-SQL中普通的數學或字符串函數朋凉。實現上看就是一個普通的Scala函數;
UDAF(User Defined Aggregation Function),用戶自定義聚合函數醋安。函數本身作用于數據集合杂彭,能夠在聚合操作的基礎上進行自定義操作(多條數據輸入墓毒,一條數據輸出)類似于group by之后使用的sum、avg等函數
用Scala編寫的UDF與普通的scala函數幾乎沒有任何區(qū)別亲怠,唯一需要多執(zhí)行的一個步驟是要在SQLContext注冊所计。
def len(bookTitle: String):Int = bookTitle.length
spark.udf.register("len", len _)
val booksWithLongTitle = spark.sql("select title, author from books where len(title) > 10")
編寫的UDF可以放到SQL語句的fields部分,也可以作為where团秽、groupBy或者 having子句的一部分主胧。也可以在使用UDF時,傳入常量而非表的列名习勤。稍稍修改一下前面的函數踪栋,讓長度 10作為函數的參數傳入:
def lengthLongerThan(bookTitle: String, length: Int): Boolean = bookTitle.length > length
spark.udf.register("longLength", lengthLongerThan _)
val booksWithLongTitle = spark.sql("select title, author from books where longLength(title, 10)")
若使用DataFrame的API,則以字符串的形式將UDF傳入:
val booksWithLongTitle = dataFrame.filter("longLength(title, 10)")
DataFrame的API也可以接收Column對象图毕,可以用是定義在 SQLImplicits 對象中的一個隱式轉換。此時予颤,UDF的定義也不 相同囤官,不能直接定義Scala函數,而是要用定義在org.apache.spark.sql.functions中 的 udf 方法來接收一個函數蛤虐。這種方式無需register:
import org.apache.spark.sql.functions._
val longLength = udf((bookTitle: String, length: Int) => bookTitle.length > length)
import spark.implicits._
val booksWithLongTitle = dataFrame.filter(longLength($"title", lit(10)))
完整示例:
package com.hhb.spark.sql
import org.apache.spark.sql.{Row, SparkSession}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-05 10:24
**/
object UDFDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getCanonicalName.init)
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
//準備數據
val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6"))
val df = spark.createDataFrame(data).toDF("title", "author")
df.createTempView("book")
//定義函數并注冊
def len(title: String) = title.length
spark.udf.register("len", len _)
spark.sql(
"""
|select title,author,len(title) as l from book
|""".stripMargin).show()
spark.sql(
"""
|select * from book where len(title) > 5
|""".stripMargin).show()
import spark.implicits._
df.filter("len(title) > 5").show()
//無法編譯
// df.where(len($"title") > 5)
// 如果要在DSL語法中使用$符號包裹字符串表示一個Column党饮,需要用udf方法來接 收函數。這種函數無需注冊
import org.apache.spark.sql.functions._
val len2 = udf(len _)
df.where(len2($"title") > 5).show()
//不能使用udf
df.map { case Row(title: String, author: String) =>
(title, author, title.length)
}.show()
spark.close()
}
}
UDAF
數據如下:
id, name, sales, discount, state, saleDate
1, "Widget Co",1000.00,0.00, "AZ","2019-01-01"
2, "Acme Widgets",2000.00,500.00,"CA","2019-02-01"
3, "Widgetry",1000.00,200.00, "CA","2020-01-11"
4, "Widgets R Us",2000.00,0.0,"CA","2020-02-19"
5, "Ye Olde Widgete",3000.00,0.0,"MA","2020-02-28"
最后要得到的結果為:
(2020年的合計值 – 2019年的合計值) / 2019年的合計值 (6000 - 3000) / 3000 = 1
執(zhí)行以下SQL得到最終的結果:
select userFunc(sales, saleDate) from table1;
即計算邏輯在userFunc中實現
普通的UDF不支持數據的聚合運算驳庭。如當要對銷售數據執(zhí)行年度同比計算刑顺,就需要對當年和上一年的銷量分別求和,然后再利用公式進行計算嚷掠。此時需要使用UDAF捏检,Spark為所有的UDAF定義一個父類, UserDefinedAffregateFunction不皆。要繼承這個類贯城,需要實現父類的幾個抽象方法:
- inputSchema用于定義與DataFrame列有關的輸入樣式
- bufferSchema用于定義存儲聚合運算時產生的中間數據結果的Schema
- dataType標明了UDAF函數的返回值類型
- deterministic是一個布爾值,用以標記針對給定的一組輸入霹娄,UDAF是否總是生 成相同的結果
- initialize對聚合運算中間結果的初始化
- update函數的第一個參數為bufferSchema中兩個Field的索引能犯,默認以0開始; UDAF的核心計算都發(fā)生在update函數中;update函數的第二個參數input: Row對應的并非DataFrame的行,而是被inputSchema投影了的行
- merge函數負責合并兩個聚合運算的buffer犬耻,再將其存儲到 MutableAggregationBuffer中
- evaluate函數完成對聚合Buffer值的運算踩晶,得到最終的結果
UDAF-類型不安全
package com.hhb.spark.sql
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-05 10:55
**/
class TypeUnsafeUDAF extends UserDefinedAggregateFunction {
//輸入類型
override def inputSchema: StructType = new StructType().add("salDate", StringType).add("sal", DoubleType)
//緩存的數據類型
override def bufferSchema: StructType = new StructType().add("year2019", DoubleType).add("year2020", DoubleType)
//返回值類型
override def dataType: DataType = DoubleType
// 布爾值,用以標記針對給定的一組輸入枕磁,UDAF是否總是生成相同的結果渡蜻。通常用 true
override def deterministic: Boolean = true
// 初始化的初值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.00
buffer(1) = 0.00
}
//分區(qū)內合并
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val year = input.getString(0).take(4)
val sal = input.getAs[Double](1)
year match {
case "2019" => buffer(0) = buffer.getDouble(0) + sal
case "2020" => buffer(1) = buffer.getDouble(1) + sal
case _ => println("ERROR")
}
}
//分區(qū)間合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
}
//最終結果 (2020年的合計值 – 2019年的合計值) / 2019年的合計值 (6000 - 3000) / 3000
override def evaluate(buffer: Row): Any = {
if (buffer.getDouble(0) < 0.0000000001) 0.00
else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0)
}
}
object TypeUnsafeUDAFDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getCanonicalName.init)
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
val sales = Seq(
(1, "Widget Co", 1000.00, 0.00, "AZ", "2019-01-01"),
(2, "Acme Widgets", 2000.00, 500.00, "CA", "2019-02-01"),
(3, "Widgetry", 1000.00, 200.00, "CA", "2020-01-11"),
(4, "Widgets R Us", 2000.00, 0.0, "CA", "2020-02-19"),
(5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2020-02-28")
)
val df = spark.createDataFrame(sales).toDF("id", "name", "sal", "total", "state", "salDate")
df.createTempView("sales")
spark.udf.register("userFunc", new TypeUnsafeUDAF())
spark.sql(
"""
|select userFunc(salDate,sal) from sales
|""".stripMargin).show()
spark.close()
}
}
UDAF-類型安全
package com.hhb.spark.sql
import org.apache.spark.sql.{Column, Encoder, Encoders, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator
/**
* @description:
* @author: huanghongbo
* @date: 2020-11-05 11:30
**/
case class Sales(id: Int, name: String, sal: Double, total: Double, state: String, salDate: String)
case class SaleBuffer(var sal2019: Double, var sal2020: Double)
class TypeSafeUDAF extends Aggregator[Sales, SaleBuffer, Double] {
//初始值
override def zero: SaleBuffer = SaleBuffer(0.00, 0.00)
//分區(qū)內計算
override def reduce(b: SaleBuffer, a: Sales): SaleBuffer = {
val year = a.salDate.take(4)
val sal = a.sal
year match {
case "2019" => b.sal2019 = b.sal2019 + sal
case "2020" => b.sal2020 = b.sal2020 + sal
case _ => println("ERROR")
}
b
}
//分區(qū)間計算
override def merge(b1: SaleBuffer, b2: SaleBuffer): SaleBuffer = {
b1.sal2019 = b1.sal2019 + b2.sal2019
b1.sal2020 = b2.sal2020 + b1.sal2020
b1
}
//最終結果
override def finish(reduction: SaleBuffer): Double = {
if (reduction.sal2019 < 0.000000001) 0.00
else (reduction.sal2020 - reduction.sal2019) / reduction.sal2019
}
//編碼器,對象編碼器是固定的
override def bufferEncoder: Encoder[SaleBuffer] = Encoders.product
//編碼器
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
object TypeSafeUDAFDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getCanonicalName.init)
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
val sales = Seq(
Sales(1, "Widget Co", 1000.00, 0.00, "AZ", "2019-01-01"),
Sales(2, "Acme Widgets", 2000.00, 500.00, "CA", "2019-02-01"),
Sales(3, "Widgetry", 1000.00, 200.00, "CA", "2020-01-11"),
Sales(4, "Widgets R Us", 2000.00, 0.0, "CA", "2020-02-19"),
Sales(5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2020-02-28")
)
import spark.implicits._
val ds = spark.createDataset(sales)
ds.show()
val rate: TypedColumn[Sales, Double] = new TypeSafeUDAF().toColumn.name("rate")
ds.select(rate).show()
spark.close()
}
}
訪問Hive
在 pom 文件中增加依賴:
<!-- spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.1</version>
</dependency>
在resources中增加hive-site.xml文件,在文件中增加內容:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- hive元數據的存儲位置 -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://linux123:3306/hivemetadata?createDatabaseIfNotExist=true&useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<!-- 指定驅動程序 -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<!-- 連接數據庫的用戶名 -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore database</description>
</property>
<!-- 連接數據庫的口令 -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>12345678</value>
<description>password to use against metastore database</description>
</property>
<!-- 數據默認的存儲位置(HDFS) -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<!-- 在命令行中,顯示當前操作的數據庫 -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
<description>Whether to include the current database in the Hive prompt.</description>
</property>
<!-- 在命令行中茸苇,顯示數據的表頭 -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<!-- 操作小規(guī)模數據時排苍,使用本地模式,提高效率 -->
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
<description>Let Hive determine whether to run in local mode automatically</description>
</property>
<!--指定metastore地址 -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://linux121:9083,thrift://linux123:9083</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>3600</value>
</property>
</configuration>
備注:最好使用 metastore service 連接Hive;使用直連 metastore 的方式時学密, SparkSQL程序會修改 Hive 的版本信息;默認Spark使用 Hive 1.2.1進行編譯淘衙,包含對應的serde, udf, udaf等。
package com.hhb.spark.sql
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* @description:
* @date: 2020-11-05 13:37
**/
object AccessHive {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Demo1").master("local[2]")
.enableHiveSupport()
// Spark使用與Hive相同的約定寫parquet數據
.config("spark.sql.parquet.writeLegacyFormat", "true")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("warn")
spark.sql("show databases").show
spark.sql(
"""
| use ads
|""".stripMargin)
spark.sql("show tables").show()
val df: DataFrame = spark.sql(
"""
|select * from ads_ad_show
|""".stripMargin)
df.write
.mode(SaveMode.Append)
.saveAsTable("")
spark.close()
}
}
Spark SQL原理
SparkSQL中的join
數據分析中將兩個數據集進行join操作是很常見的場景腻暮。在Spark 物理計劃階段彤守,Spark的Join Selection 類會根據Join hints策略、join表的大小哭靖、Join是等值join還是不等值以及參與Join的key是否可以排序等條件來選擇最終的Join的策略具垫,最后Spark會利用選擇好的Join策略執(zhí)行最終的計算,當前Spark一共支持五種Join策略
- Broadcast hash join (BHJ)
- Shuffle hash join(SHJ)
- Shuffle sort merge join (SMJ)
- Shuffle-and-replicate nested loop join款青,又稱笛卡爾積(Cartesian product join)
- Broadcast nested loop join (BNLJ)
其中 BHJ 和 SMJ 這兩種 Join 策略是我們運行 Spark 作業(yè)最常見的做修。JoinSelection 會先根據 Join 的 Key 為等值 Join 來選擇 Broadcast hash join、Shuffle hash join 以及 Shuffle sort merge join 中的一個;如果 Join 的 Key 為不等值 Join 或者沒有指定 Join 條件抡草,則會選擇 Broadcast nested loop join 或 Shuffle-and-replicate nested loop join饰及。
不同的 Join 策略在執(zhí)行上效率差別很大,了解每種 Join 策略的執(zhí)行過程和適用條件是很有必要的康震。
Broadcast Hash Join
Broadcast Hash Join 的實現是將小表的數據廣播到 Spark 所有的 Executor 端燎含,這個廣播過程和我們自己去廣播數據沒什么區(qū)別:
利用 collect 算子將小表的數據從 Executor 端拉到 Driver 端
在 Driver 端調用 sparkContext.broadcast 廣播到所有 Executor 端
在 Executor 端使用廣播的數據與大表進行 Join 操作(實際上是執(zhí)行map操作)
這種 Join 策略避免了 Shuffle 操作。一般而言腿短,Broadcast Hash Join 會比其他 Join 策略執(zhí)行的要快屏箍。
使用這種Join策略必須滿足以下條件:
- 小表的數據必須很小,可以通過spark.sql.autoBroadcastJoinThrdshold參數來配置橘忱,默認是10M
- 如果內存比較大赴魁,可以將閾值適當加大
- 將spark.sql.autoBroadcastJoinThrdshold參數設置為-1,可以關閉這種鏈接方式
- 只能用于等值join钝诚,不要求參與join的keys可排序
Shuffle Hash Join
當表中的數據比較大颖御,又不適合廣播,這個時候就可以考慮使用Shuffle Hash Join凝颇。
Shuffle Hash Join 同樣是在大表和小表進行join的時候選擇的一種策略潘拱,他的計算思想是:把大表和小表按照相同的分區(qū)算法和分區(qū)數進行分區(qū)(根據參與Join的keys進行分區(qū))這樣就保證了Hash值一樣的數據分發(fā)到同一個分區(qū)中,然后在同一個Executor中兩張表hash值一樣的分區(qū)就可以在本地進行hash join了拧略。在進行Join之前芦岂,還會對小表的分區(qū)構建Hash Map,Shuffle hash Join 利用了分治思想垫蛆,把大問題拆解成小問題去解決禽最。
要啟用Shuffle Hash join 必須滿足以下條件:
- 僅支持等值Join腺怯,不要求參與Join的keys可排序
- spark.sql.join.preferSortMergeJoin參數必須設置為false,參數是從spark2.0.0版本引入的川无,默認是true瓢喉,也就是默認情況下選擇Sort Merge Join
- 小表的大小(plan.stats.sizeInBytes)必須小于 spark.sql.autoBroadcastJoinThreshold * Spark.sql.shuffle.partitions(默認值 200)
- 而且小表大幸ㄍ浮(stats.sizeInBytes)的三倍必須小于等于大表的大小(stats.sizeInBytes),也就是a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
Shuffle Sort Merge Join
前面兩種join策略對表的大小都是有條件的决左,如果參與的join的表都很大愕够,這時候就得考慮Shuffle Sort Merge Join了。
Shuffle Sort Merge Join的實現思想:
- 將兩張表按照Join Key進行shuffle佛猛,保證join key 值相同的記錄回被分為相應的分區(qū)惑芭。
- 對每個分區(qū)內的數據進行排序
- 排序后在對對應的分區(qū)內的記錄進行連接
無論分區(qū)有多大,Sort Merge Join都不用把一側的數據全部加載到內存中继找,而是即用即丟遂跟;因為兩個序列都有序。從頭遍歷婴渡,遇到key相同的就輸出幻锁,如果不同,左邊小就繼續(xù)取左邊边臼,反之取右邊哄尔。從而大大提高了大數據量下sql join的穩(wěn)定性。
要啟用Shuffle Sort Merge Join 必須滿足以下條件:
- 僅支持等值Join柠并,而且要求參與Join的keys可排序
Cartesian Product Join
如果 Spark 中兩張參與 Join 的表沒指定連接條件岭接,那么會產生Cartesian product join,這個 Join 得到的結果其實就是兩張表行數的乘積臼予。
Broadcast Nested loop Join
可以把 Broadcast nested loop join 的執(zhí)行看做下面的計算:
for record_1 in relation_1:
for record_2 in relation_2:
# join condition is executed
可以看出 Broadcast nested loop join 在某些情況會對某張表重復掃描多次鸣戴,效率非常低下。從名字可以看出粘拾,這種 join 會根據相關條件對小表進行廣播窄锅,以減少表的掃描次數。
Broadcast nested loop join 支持等值和不等值 Join半哟,支持所有的 Join 類型酬滤。
SQL解析過程
Spark SQL 可以說是 Spark 中的精華部分。原來基于 RDD 構建大數據計算任務寓涨,重心在向 DataSet 轉移盯串,原來基于 RDD 寫的代碼也在遷移。使用 Spark SQL 編碼好處是非常大的戒良,尤其是在性能方面体捏,有很大提升。Spark SQL 中各 種內嵌的性能優(yōu)化比寫 RDD 遵守各種最佳實踐更靠譜的,尤其對新手來說几缭。如先 filter 操作再 map 操作河泳,Spark SQL 中會自動進行謂詞下推;Spark SQL中會自動使用 broadcast join 來廣播小表,把 shuffle join 轉化為 map join 等等年栓。
Spark SQL對SQL語句的處理和關系型數據庫類似拆挥,即詞法/語法解析、綁定某抓、優(yōu)化纸兔、執(zhí)行。Spark SQL會先將SQL語 句解析成一棵樹否副,然后使用規(guī)則(Rule)對Tree進行綁定汉矿、優(yōu)化等處理過程。Spark SQL由Core备禀、Catalyst洲拇、Hive、 Hive-ThriftServer四部分構成:
- Core: 負責處理數據的輸入和輸出曲尸,如獲取數據赋续,查詢結果輸出成DataFrame等
- Catalyst: 負責處理整個查詢過程,包括解析队腐、綁定蚕捉、優(yōu)化等
- Hive: 負責對Hive數據進行處理
- Hive-ThriftServer: 主要用于對Hive的訪問
Spark SQL的代碼復雜度是問題的本質復雜度帶來的,Spark SQL中的 Catalyst 框架大部分邏輯是在一個 Tree 類型 的數據結構上做各種折騰柴淘,基于 Scala 來實現還是很優(yōu)雅的迫淹,Scala 的偏函數和強大的 Case 正則匹配,讓整個代碼 看起來非常優(yōu)雅为严。
SparkSession 是編寫 Spark 應用代碼的入口敛熬,啟動一個 spark-shell 會提供給你一個創(chuàng)建 SparkSession, 這個對象 是整個 Spark 應用的起始點第股。以下是 SparkSession 的一些重要的變量和方法:
object Plan {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Demo1")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
import spark.implicits._
Seq((0, "zhansan", 10),
(1, "lisi", 11),
(2, "wangwu", 12)).toDF("id", "name", "age").createOrReplaceTempView("stu")
Seq((0, "chinese", 80), (0, "math", 100), (0, "english", 98),
(1, "chinese", 86), (1, "math", 97), (1, "english", 90),
(2, "chinese", 90), (2, "math", 94), (2, "english", 88)
).toDF("id", "subject", "score").createOrReplaceTempView("score")
val df: DataFrame = spark.sql(
"""
|select sum(v), name
| from (select stu.id, 100 + 10 + score.score as v, name
| from stu join score
| where stu.id = score.id and stu.age >= 11) tmp
|group by name
|""".stripMargin)
df.show()
// 打印執(zhí)行計劃
println(df.queryExecution)
println(df.queryExecution.optimizedPlan)
spark.close()
}
}
queryExecution 就是整個執(zhí)行計劃的執(zhí)行引擎应民,里面有執(zhí)行過程中各個中間過程變量,整個執(zhí)行流程如下:
上面例子中的 SQL 語句經過 Parser 解析后就會變成一個抽象語法樹夕吻,對應解析后的邏輯計劃 AST 為:
== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias `tmp`
+- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#26, 'name]
+- 'Filter (('stu.id = 'score.id) && ('stu.age >= 11))
+- 'Join Inner
:- 'UnresolvedRelation `stu`
+- 'UnresolvedRelation `score`
備注:在執(zhí)行計劃中 Project/Projection 代表的意思是投影
選投連三種最基本的操作
其中過濾條件變?yōu)榱?Filter 節(jié)點诲锹,這個節(jié)點是 UnaryNode(一元節(jié)點) 類型, 只有一個孩子涉馅。兩個表中的數據變?yōu)榱?UnresolvedRelation 節(jié)點达址,節(jié)點類型為 LeafNode 蒋腮,即葉子節(jié)點, JOIN 操作為節(jié)點构拳, 這個是一個 BinaryNode 節(jié) 點篇亭,有兩個孩子。
以上節(jié)點都是 LogicalPlan 類型的, 可以理解為進行各種操作的 Operator, SparkSQL 對各種操作定義了各種 Operator朱灿。
這些 operator 組成的抽象語法樹就是整個 Catatyst 優(yōu)化的基礎,Catatyst 優(yōu)化器會在這個樹上面進行各種折騰钠四,把 樹上面的節(jié)點挪來挪去來進行優(yōu)化盗扒。
經過 Parser 有了抽象語法樹,但是并不知道 score缀去,sum 這些東西是啥环疼,所以就需要 analyer 來定位。
analyzer 會把 AST 上所有 Unresolved 的東西都轉變?yōu)?resolved 狀態(tài)朵耕,SparkSQL 有很多resolve 規(guī)則:
- ResolverRelations。解析表(列)的基本類型等信息
- ResolveFuncions淋叶。解析出來函數的基本信息
- ResolveReferences阎曹。解析引用,通常是解析列名
== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS sum(v)#28L, name#8]
+- SubqueryAlias `tmp`
+- Project [id#7, ((100 + 10) + score#22) AS v#26, name#8]
+- Filter ((id#7 = id#20) && (age#9 >= 11))
+- Join Inner
:- SubqueryAlias `stu`
: +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS age#9]
: +- LocalRelation [_1#3, _2#4, _3#5]
+- SubqueryAlias `score`
+- Project [_1#16 AS id#20, _2#17 AS subject#21, _3#18 AS score#22]
+- LocalRelation [_1#16, _2#17, _3#18]
下面要進行邏輯優(yōu)化了煞檩,常見的邏輯優(yōu)化有:
== Optimized Logical Plan ==
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS sum(v)#28L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]
+- Join Inner, (id#7 = id#20)
:- LocalRelation [id#7, name#8]
+- LocalRelation [id#20, score#22]
這里用到的優(yōu)化有:謂詞下推(Push Down Predicate)处嫌、常量折疊(Constant Folding)、字段裁剪(Columning Pruning)
做完邏輯優(yōu)化斟湃,還需要先轉換為物理執(zhí)行計劃熏迹,將邏輯上可行的執(zhí)行計劃變?yōu)?Spark 可以真正執(zhí)行的計劃:
SparkSQL 把邏輯節(jié)點轉換為了相應的物理節(jié)點, 比如 Join 算子凝赛,Spark 根據不同場景為該算子制定了不同的算法 策略注暗。
== Physical Plan ==
*(2) HashAggregate(keys=[name#8], functions=[sum(cast(v#26 as bigint))], output=[sum(v)#28L, name#8])
+- Exchange hashpartitioning(name#8, 200)
+- *(1) HashAggregate(keys=[name#8], functions=[partial_sum(cast(v#26 as bigint))], output=[name#8, sum#38L])
+- *(1) Project [(110 + score#22) AS v#26, name#8]
+- *(1) BroadcastHashJoin [id#7], [id#20], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- LocalTableScan [id#7, name#8]
+- LocalTableScan [id#20, score#22]
Aggregate [name#8], [sum(cast(v#26 as bigint)) AS sum(v)#28L, name#8]
+- Project [(110 + score#22) AS v#26, name#8]
+- Join Inner, (id#7 = id#20)
:- LocalRelation [id#7, name#8]
+- LocalRelation [id#20, score#22]
數據在一個一個的 plan 中流轉,然后每個 plan 里面表達式都會對數據進行處理墓猎,就相當于經過了一個個小函數的調 用處理捆昏,這里面有大量的函數調用開銷。是不是可以把這些小函數內聯一下毙沾,當成一個大函數骗卜, WholeStageCodegen 就是干這事的∽蟀可以看到最終執(zhí)行計劃每個節(jié)點前面有個 * 號寇仓,說明整段代碼生成被啟用, Project烤宙、BroadcastHashJoin遍烦、HashAggregate 這一段都啟用了整段代碼生成,級聯為了大函數门烂。