目錄
一.Spark SQL簡介
二.Spark SQL的特點(diǎn)
三.基本概念:表:(Datasets或DataFrames)
????1.表 = 表結(jié)構(gòu) + 數(shù)據(jù)
????2.DataFrame
????3.Datasets
四.創(chuàng)建DataFrames
????1.第一種方式:使用case class樣本類創(chuàng)建DataFrames
????2.第二種方式:使用SparkSession
????3.方式三盒发,直接讀取一個帶格式的文件:Json
五.操作DataFrame
????1.DSL語句
????2.SQL語句
????3.多表查詢
六.視圖
????1.視圖是一個虛表,不存儲數(shù)據(jù)
????2.兩種類型視圖:
七.創(chuàng)建Datasets
????1.方式一:使用序列
????2.方式二:使用JSON數(shù)據(jù)
????3.方式三:使用其他數(shù)據(jù)(RDD的操作和DataFrame操作結(jié)合)
八.Datasets的操作案例
????1.使用emp.json 生成DataFrame
????2.多表查詢
一.Spark SQL簡介
????Spark SQL是Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊,它提供了一個編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用堕汞。
????為什么要學(xué)習(xí)Spark SQL?Hive,它將Hive SQL轉(zhuǎn)換成MapReduce然后提交到集群上執(zhí)行,大大簡化了編寫MapReduce的程序的復(fù)雜性,由于MapReduce這種計算模型執(zhí)行效率比較慢腕柜。所以Spark SQL的應(yīng)運(yùn)而生,它是將Spark SQL轉(zhuǎn)換成RDD柳爽,然后提交到集群執(zhí)行媳握,執(zhí)行效率非常快磷脯!同時Spark SQL也支持從Hive中讀取數(shù)據(jù)蛾找。
二.Spark SQL的特點(diǎn):
1.容易整合(集成):
安裝Spark的時候,已經(jīng)集成好了赵誓。不需要單獨(dú)安裝
2.統(tǒng)一的數(shù)據(jù)訪問方式
JDBC打毛、JSON、Hive俩功、parquet文件(一種列式存儲文件幻枉,是SparkSQL默認(rèn)的數(shù)據(jù)源)
3.兼容Hive:
可以將Hive中的數(shù)據(jù),直接讀取到Spark SQL中處理诡蜓。
4.標(biāo)準(zhǔn)的數(shù)據(jù)連接:JDBC
三.基本概念:表:(Datasets或DataFrames)
1.表 = 表結(jié)構(gòu) + 數(shù)據(jù)
????DataFrame = Schema(表結(jié)構(gòu)) + RDD(代表數(shù)據(jù))
2.DataFrame
????DataFrame是組織成命名列的數(shù)據(jù)集熬甫。它在概念上等同于關(guān)系數(shù)據(jù)庫中的表,但在底層具有更豐富的優(yōu)化蔓罚。DataFrames可以從各種來源構(gòu)建椿肩,
例如:
- 結(jié)構(gòu)化數(shù)據(jù)文件
- hive中的表
- 外部數(shù)據(jù)庫或現(xiàn)有RDDs
DataFrame API支持的語言有Scala,Java豺谈,Python和R
????從上圖可以看出郑象,DataFrame多了數(shù)據(jù)的結(jié)構(gòu)信息茬末,即schema厂榛。RDD是分布式的 Java對象的集合击奶。DataFrame是分布式的Row對象的集合辐马。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點(diǎn)是提升執(zhí)行效率、減少數(shù)據(jù)讀取以及執(zhí)行計劃的優(yōu)化
3.Datasets
????Dataset是數(shù)據(jù)的分布式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象溪王。它提供了RDD的優(yōu)點(diǎn)(強(qiáng)類型化腮鞍,使用強(qiáng)大的lambda函數(shù)的能力)以及Spark SQL優(yōu)化后的執(zhí)行引擎的優(yōu)點(diǎn)。一個Dataset 可以從JVM對象構(gòu)造在扰,然后使用函數(shù)轉(zhuǎn)換(map缕减, flatMap,filter等)去操作芒珠。 Dataset API 支持Scala和Java桥狡。 Python不支持Dataset API。
四.創(chuàng)建DataFrames
1.第一種方式:使用case class樣本類創(chuàng)建DataFrames
(1)定義表的Schema
注意:由于mgr和comm列中包含null值,簡單起見裹芝,將對應(yīng)的case class類型定義為String
scala> case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,depno:Int)
(2)讀入數(shù)據(jù)
emp.csv
7369,SMITH,CLERK,7902,1980-12-17,800,0,20
7499,MARIB,SALESMAN,7901,1980-11-17,900,300,30
7450,CLARK,SALESMAN,7900,1980-10-17,900,500,30
7379,JACKE,MANAGER,7903,1980-11-11,400,0,20
7343,LARRL,CLERK,7902,1980-12-13,500,1400,30
7312,NHGJJ,SALESMAN,7904,1980-12-14,100,0,30
7343,SHJOI,MANAGER,7905,1980-12-15,200,0,10
7390,WJKLJ,ANALYST,7906,1980-12-16,300,0,20
7377,UIHKL,SALESMAN,7907,1980-12-18,400,0,10
7388,VHKJK,CLERK,7908,1980-12-19,400,0,20
//從hdfs中讀入
scala> val lines = sc.textFile("hdfs://hadoop1:9000/emp.csv").map(_.split(","))
//從本地讀入
scala> val lines = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))
/opt/module/datas/TestFile
(3)把每行數(shù)據(jù)映射到Emp中部逮。把表結(jié)構(gòu)和數(shù)據(jù),關(guān)聯(lián)嫂易。
scala> val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4)生成DataFrame
scala> val allEmpDF = allEmp.toDF
//展示
scala> allEmpDF.show
2.第二種方式:使用SparkSession
(1)什么是SparkSession
????Apache Spark 2.0引入了SparkSession兄朋,其為用戶提供了一個統(tǒng)一的切入點(diǎn)來使用Spark的各項功能,并且允許用戶通過它調(diào)用DataFrame和Dataset相關(guān)API來編寫Spark程序怜械。最重要的是颅和,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互缕允。
????在2.0版本之前峡扩,與Spark交互之前必須先創(chuàng)建SparkConf和SparkContext。然而在Spark 2.0中障本,我們可以通過SparkSession來實(shí)現(xiàn)同樣的功能教届,而不需要顯式地創(chuàng)建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經(jīng)封裝在SparkSession中驾霜。
(2)使用StructType案训,來創(chuàng)建Schema
import org.apache.spark.sql.types._
val myschema = StructType(
List(
StructField("empno", DataTypes.IntegerType),
StructField("ename", DataTypes.StringType),
StructField("job", DataTypes.StringType),
StructField("mgr", DataTypes.IntegerType),
StructField("hiredate", DataTypes.StringType),
StructField("sal", DataTypes.IntegerType),
StructField("comm", DataTypes.IntegerType),
StructField("deptno", DataTypes.IntegerType)))
注意,需要:import org.apache.spark.sql.types._
(3)讀取文件:
val lines= sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))
(4)數(shù)據(jù)與表結(jié)構(gòu)匹配
import org.apache.spark.sql.Row
val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
注意粪糙,需要:import org.apache.spark.sql.Row
(5)創(chuàng)建DataFrames
val df2 = spark.createDataFrame(allEmp,myschema)
df2.show
3.方式三强霎,直接讀取一個帶格式的文件:Json
(1)讀取文件people.json:
people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
val df3 = spark.read.json("/opt/TestFolder/people.json")
df3.show
(2)另一種方式
val df4 = spark.read.format("json").load("/opt/TestFolder/people.json")
df4.show
五.操作DataFrame
DataFrame操作也稱為無類型的Dataset操作
1.DSL語句
(1)展示表與展示表結(jié)構(gòu)
//展示表
df1.show
//展示表結(jié)構(gòu)
df1.printSchema
(2)查詢
df1.select("ename","sal").show
df1.select($"ename",$"sal",$"sal"+100).show
(3)$代表 取出來以后,再做一些操作
df1.filter($"sal">2000).show
df1.groupBy($"depno").count.show
完整的例子猜旬,請參考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset
2.SQL語句
注意:不能直接執(zhí)行sql脆栋。需要生成一個視圖,再執(zhí)行SQL洒擦。
(1)將DataFrame注冊成表(視圖):
df1.createOrReplaceTempView("emp")
(2)執(zhí)行查詢:
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select * from emp where depno=10").show
spark.sql("select depno,count(1) from emp group by depno").show
spark.sql("select depno,sum(sal) from emp group by depno").show
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.depno from emp12345 e").show
3.多表查詢
dept.csv文件內(nèi)容
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
命令
case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
val df2 = allDept.toDF
df2.create
df2.createOrReplaceTempView("dept")
spark.sql("select dname,ename from emp12345,dept where emp12345.depno=dept.deptno").show
六.視圖
1.視圖是一個虛表椿争,不存儲數(shù)據(jù)
2.兩種類型視圖:
(1)普通視圖(本地視圖):只在當(dāng)前Session有效
(2)全局視圖:在不同Session中都有用。全局視圖創(chuàng)建在命名空間中:global_temp 類似于一個庫熟嫩。
????上面使用的是一個在Session生命周期中的臨時views秦踪。在Spark SQL中,如果你想擁有一個臨時的view掸茅,并想在不同的Session中共享椅邓,而且在application的運(yùn)行周期內(nèi)可用,那么就需要創(chuàng)建一個全局的臨時view昧狮。并記得使用的時候加上global_temp作為前綴來引用它景馁,因為全局的臨時view是綁定到系統(tǒng)保留的數(shù)據(jù)庫global_temp上。
(a)創(chuàng)建一個普通的view和一個全局的view
df1.createOrReplaceTempView("emp1")
df1.createGlobalTempView("emp2")
(b)在當(dāng)前會話中執(zhí)行查詢逗鸣,均可查詢出結(jié)果合住。
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show
(c)開啟一個新的會話绰精,執(zhí)行同樣的查詢
spark.newSession.sql("select * from emp1").show //(運(yùn)行出錯)
spark.newSession.sql("select * from global_temp.emp2").show
七.創(chuàng)建Datasets
????DataFrame的引入,可以讓Spark更好的處理結(jié)構(gòu)數(shù)據(jù)的計算透葛,但其中一個主要的問題是:缺乏編譯時類型安全笨使。為了解決這個問題,Spark采用新的Dataset API (DataFrame API的類型擴(kuò)展)僚害。
????Dataset是一個分布式的數(shù)據(jù)收集器硫椰。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優(yōu)點(diǎn)(強(qiáng)類型萨蚕,可以使用功能強(qiáng)大的lambda)以及Spark SQL的執(zhí)行器高效性的優(yōu)點(diǎn)靶草。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)
1.方式一:使用序列
(1)定義case class
scala >case class MyData(a:Int,b:String)
(2).生成序列岳遥,并創(chuàng)建DataSet
scala >val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
(3).查看結(jié)果
scala >ds.show
ds.collect
2.方式二:使用JSON數(shù)據(jù)
(1)定義case class
case class Person(name: String, age: BigInt)
(2)通過JSON數(shù)據(jù)生成DataFrame
val df = spark.read.format("json").load("/opt/module/datas/TestFile/people.json")
(3)將DataFrame轉(zhuǎn)成DataSet
df.as[Person].show
df.as[Person].collect
3.方式三:使用其他數(shù)據(jù)(RDD的操作和DataFrame操作結(jié)合)
(1)需求:分詞爱致;查詢出長度大于3的單詞
(a)讀取數(shù)據(jù),并創(chuàng)建DataSet
val linesDS = spark.read.text("/opt/module/datas/TestFile/test_WordCount.txt").as[String]
(b)對DataSet進(jìn)行操作:分詞后寒随,查詢長度大于3的單詞
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
(2)需求:執(zhí)行WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
排序:
result.orderBy($"value").show
result.orderBy($"count(1)").show
八.Datasets的操作案例
1.使用emp.json 生成DataFrame
(1)數(shù)據(jù):emp.json
(2)使用emp.json 生成DataFrame
val empDF = spark.read.json("/opt/module/datas/TestFile/emp.json")
emp.show
查詢工資大于3000的員工
empDF.where($"sal" >= 3000).show
(3)創(chuàng)建case class,生成DataSets
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
val empDS = empDF.as[Emp]
(4)查詢數(shù)據(jù)
//查詢工資大于3000的員工
empDS.filter(_.sal > 3000).show
//查看10號部門的員工
empDS.filter(_.deptno == 10).show
2.多表查詢
(1)創(chuàng)建部門表
val deptRDD=sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS.show
(2)創(chuàng)建員工表
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
empDS.show
(3)執(zhí)行多表查詢:等值鏈接
val result = deptDS.join(empDS,"deptno")
result.show
(4)另一種寫法:注意有三個等號
val result1 = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
result1.show
joinWith和join的區(qū)別是連接后的新Dataset的schema會不一樣
(5)多表條件查詢:
val result = deptDS.join(empDS,"deptno").where("deptno==10")
result.show