Spark SQL:基礎(chǔ)

目錄
一.Spark SQL簡介
二.Spark SQL的特點(diǎn)
三.基本概念:表:(Datasets或DataFrames)
????1.表 = 表結(jié)構(gòu) + 數(shù)據(jù)
????2.DataFrame
????3.Datasets
四.創(chuàng)建DataFrames
????1.第一種方式:使用case class樣本類創(chuàng)建DataFrames
????2.第二種方式:使用SparkSession
????3.方式三盒发,直接讀取一個帶格式的文件:Json
五.操作DataFrame
????1.DSL語句
????2.SQL語句
????3.多表查詢
六.視圖
????1.視圖是一個虛表,不存儲數(shù)據(jù)
????2.兩種類型視圖:
七.創(chuàng)建Datasets
????1.方式一:使用序列
????2.方式二:使用JSON數(shù)據(jù)
????3.方式三:使用其他數(shù)據(jù)(RDD的操作和DataFrame操作結(jié)合)
八.Datasets的操作案例
????1.使用emp.json 生成DataFrame
????2.多表查詢

一.Spark SQL簡介

????Spark SQL是Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊,它提供了一個編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用堕汞。
????為什么要學(xué)習(xí)Spark SQL?Hive,它將Hive SQL轉(zhuǎn)換成MapReduce然后提交到集群上執(zhí)行,大大簡化了編寫MapReduce的程序的復(fù)雜性,由于MapReduce這種計算模型執(zhí)行效率比較慢腕柜。所以Spark SQL的應(yīng)運(yùn)而生,它是將Spark SQL轉(zhuǎn)換成RDD柳爽,然后提交到集群執(zhí)行媳握,執(zhí)行效率非常快磷脯!同時Spark SQL也支持從Hive中讀取數(shù)據(jù)蛾找。

二.Spark SQL的特點(diǎn):

1.容易整合(集成):

安裝Spark的時候,已經(jīng)集成好了赵誓。不需要單獨(dú)安裝

2.統(tǒng)一的數(shù)據(jù)訪問方式

JDBC打毛、JSON、Hive俩功、parquet文件(一種列式存儲文件幻枉,是SparkSQL默認(rèn)的數(shù)據(jù)源)

3.兼容Hive:

可以將Hive中的數(shù)據(jù),直接讀取到Spark SQL中處理诡蜓。

4.標(biāo)準(zhǔn)的數(shù)據(jù)連接:JDBC

三.基本概念:表:(Datasets或DataFrames)

1.表 = 表結(jié)構(gòu) + 數(shù)據(jù)

????DataFrame = Schema(表結(jié)構(gòu)) + RDD(代表數(shù)據(jù))

2.DataFrame

????DataFrame是組織成命名列的數(shù)據(jù)集熬甫。它在概念上等同于關(guān)系數(shù)據(jù)庫中的表,但在底層具有更豐富的優(yōu)化蔓罚。DataFrames可以從各種來源構(gòu)建椿肩,

例如:

  • 結(jié)構(gòu)化數(shù)據(jù)文件
  • hive中的表
  • 外部數(shù)據(jù)庫或現(xiàn)有RDDs

DataFrame API支持的語言有Scala,Java豺谈,Python和R

????從上圖可以看出郑象,DataFrame多了數(shù)據(jù)的結(jié)構(gòu)信息茬末,即schema厂榛。RDD是分布式的 Java對象的集合击奶。DataFrame是分布式的Row對象的集合辐马。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點(diǎn)是提升執(zhí)行效率、減少數(shù)據(jù)讀取以及執(zhí)行計劃的優(yōu)化

3.Datasets

????Dataset是數(shù)據(jù)的分布式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象溪王。它提供了RDD的優(yōu)點(diǎn)(強(qiáng)類型化腮鞍,使用強(qiáng)大的lambda函數(shù)的能力)以及Spark SQL優(yōu)化后的執(zhí)行引擎的優(yōu)點(diǎn)。一個Dataset 可以從JVM對象構(gòu)造在扰,然后使用函數(shù)轉(zhuǎn)換(map缕减, flatMap,filter等)去操作芒珠。 Dataset API 支持Scala和Java桥狡。 Python不支持Dataset API。

四.創(chuàng)建DataFrames

1.第一種方式:使用case class樣本類創(chuàng)建DataFrames

(1)定義表的Schema
注意:由于mgr和comm列中包含null值,簡單起見裹芝,將對應(yīng)的case class類型定義為String

scala> case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,depno:Int)

(2)讀入數(shù)據(jù)
emp.csv

7369,SMITH,CLERK,7902,1980-12-17,800,0,20
7499,MARIB,SALESMAN,7901,1980-11-17,900,300,30
7450,CLARK,SALESMAN,7900,1980-10-17,900,500,30
7379,JACKE,MANAGER,7903,1980-11-11,400,0,20
7343,LARRL,CLERK,7902,1980-12-13,500,1400,30
7312,NHGJJ,SALESMAN,7904,1980-12-14,100,0,30
7343,SHJOI,MANAGER,7905,1980-12-15,200,0,10
7390,WJKLJ,ANALYST,7906,1980-12-16,300,0,20
7377,UIHKL,SALESMAN,7907,1980-12-18,400,0,10
7388,VHKJK,CLERK,7908,1980-12-19,400,0,20
//從hdfs中讀入
scala> val lines = sc.textFile("hdfs://hadoop1:9000/emp.csv").map(_.split(","))

//從本地讀入
scala> val lines = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

/opt/module/datas/TestFile

(3)把每行數(shù)據(jù)映射到Emp中部逮。把表結(jié)構(gòu)和數(shù)據(jù),關(guān)聯(lián)嫂易。

scala> val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))

(4)生成DataFrame

scala> val allEmpDF = allEmp.toDF

//展示 
scala> allEmpDF.show
2.第二種方式:使用SparkSession

(1)什么是SparkSession
????Apache Spark 2.0引入了SparkSession兄朋,其為用戶提供了一個統(tǒng)一的切入點(diǎn)來使用Spark的各項功能,并且允許用戶通過它調(diào)用DataFrame和Dataset相關(guān)API來編寫Spark程序怜械。最重要的是颅和,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互缕允。
????在2.0版本之前峡扩,與Spark交互之前必須先創(chuàng)建SparkConf和SparkContext。然而在Spark 2.0中障本,我們可以通過SparkSession來實(shí)現(xiàn)同樣的功能教届,而不需要顯式地創(chuàng)建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經(jīng)封裝在SparkSession中驾霜。

(2)使用StructType案训,來創(chuàng)建Schema

import org.apache.spark.sql.types._

val myschema = StructType(
                List(
                StructField("empno", DataTypes.IntegerType), 
                StructField("ename", DataTypes.StringType),
                StructField("job", DataTypes.StringType),
                StructField("mgr", DataTypes.IntegerType),
                StructField("hiredate", DataTypes.StringType),
                StructField("sal", DataTypes.IntegerType),
                StructField("comm", DataTypes.IntegerType),
                StructField("deptno", DataTypes.IntegerType)))

注意,需要:import org.apache.spark.sql.types._

(3)讀取文件:

val lines= sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

(4)數(shù)據(jù)與表結(jié)構(gòu)匹配

import org.apache.spark.sql.Row

val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

注意粪糙,需要:import org.apache.spark.sql.Row
(5)創(chuàng)建DataFrames

val df2 = spark.createDataFrame(allEmp,myschema)

df2.show
3.方式三强霎,直接讀取一個帶格式的文件:Json

(1)讀取文件people.json:
people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
val df3 = spark.read.json("/opt/TestFolder/people.json")

df3.show
image.png

(2)另一種方式

val df4 = spark.read.format("json").load("/opt/TestFolder/people.json")

df4.show

五.操作DataFrame

DataFrame操作也稱為無類型的Dataset操作

1.DSL語句

(1)展示表與展示表結(jié)構(gòu)

//展示表
df1.show
//展示表結(jié)構(gòu)
df1.printSchema

(2)查詢

df1.select("ename","sal").show

df1.select($"ename",$"sal",$"sal"+100).show

(3)$代表 取出來以后,再做一些操作

df1.filter($"sal">2000).show

df1.groupBy($"depno").count.show

完整的例子猜旬,請參考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset

2.SQL語句

注意:不能直接執(zhí)行sql脆栋。需要生成一個視圖,再執(zhí)行SQL洒擦。

(1)將DataFrame注冊成表(視圖):

df1.createOrReplaceTempView("emp")

(2)執(zhí)行查詢:

spark.sql("select * from emp").show

spark.sql("select * from emp where sal > 2000").show
spark.sql("select * from emp where depno=10").show

spark.sql("select depno,count(1) from emp group by depno").show 

spark.sql("select depno,sum(sal) from emp group by depno").show
image.png
df1.createOrReplaceTempView("emp12345")

spark.sql("select e.depno from emp12345 e").show
3.多表查詢

dept.csv文件內(nèi)容

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

命令

case class Dept(deptno:Int,dname:String,loc:String)

val lines = sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))

val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

val df2 = allDept.toDF

df2.create

df2.createOrReplaceTempView("dept")

spark.sql("select dname,ename from emp12345,dept where emp12345.depno=dept.deptno").show

六.視圖

1.視圖是一個虛表椿争,不存儲數(shù)據(jù)
2.兩種類型視圖:

(1)普通視圖(本地視圖):只在當(dāng)前Session有效

(2)全局視圖:在不同Session中都有用。全局視圖創(chuàng)建在命名空間中:global_temp 類似于一個庫熟嫩。
????上面使用的是一個在Session生命周期中的臨時views秦踪。在Spark SQL中,如果你想擁有一個臨時的view掸茅,并想在不同的Session中共享椅邓,而且在application的運(yùn)行周期內(nèi)可用,那么就需要創(chuàng)建一個全局的臨時view昧狮。并記得使用的時候加上global_temp作為前綴來引用它景馁,因為全局的臨時view是綁定到系統(tǒng)保留的數(shù)據(jù)庫global_temp上。
(a)創(chuàng)建一個普通的view和一個全局的view

df1.createOrReplaceTempView("emp1")

df1.createGlobalTempView("emp2")

(b)在當(dāng)前會話中執(zhí)行查詢逗鸣,均可查詢出結(jié)果合住。

spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show

(c)開啟一個新的會話绰精,執(zhí)行同樣的查詢

spark.newSession.sql("select * from emp1").show     //(運(yùn)行出錯)
spark.newSession.sql("select * from global_temp.emp2").show

七.創(chuàng)建Datasets

????DataFrame的引入,可以讓Spark更好的處理結(jié)構(gòu)數(shù)據(jù)的計算透葛,但其中一個主要的問題是:缺乏編譯時類型安全笨使。為了解決這個問題,Spark采用新的Dataset API (DataFrame API的類型擴(kuò)展)僚害。

????Dataset是一個分布式的數(shù)據(jù)收集器硫椰。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優(yōu)點(diǎn)(強(qiáng)類型萨蚕,可以使用功能強(qiáng)大的lambda)以及Spark SQL的執(zhí)行器高效性的優(yōu)點(diǎn)靶草。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)

1.方式一:使用序列

(1)定義case class

scala >case class MyData(a:Int,b:String)

(2).生成序列岳遥,并創(chuàng)建DataSet

scala >val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

(3).查看結(jié)果

scala >ds.show

ds.collect
2.方式二:使用JSON數(shù)據(jù)

(1)定義case class

case class Person(name: String, age: BigInt)

(2)通過JSON數(shù)據(jù)生成DataFrame

val df = spark.read.format("json").load("/opt/module/datas/TestFile/people.json")

(3)將DataFrame轉(zhuǎn)成DataSet

df.as[Person].show
df.as[Person].collect
3.方式三:使用其他數(shù)據(jù)(RDD的操作和DataFrame操作結(jié)合)

(1)需求:分詞爱致;查詢出長度大于3的單詞
(a)讀取數(shù)據(jù),并創(chuàng)建DataSet

val linesDS = spark.read.text("/opt/module/datas/TestFile/test_WordCount.txt").as[String]

(b)對DataSet進(jìn)行操作:分詞后寒随,查詢長度大于3的單詞

val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

words.show

words.collect

(2)需求:執(zhí)行WordCount程序

val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

result.show

排序:

result.orderBy($"value").show

result.orderBy($"count(1)").show

八.Datasets的操作案例

1.使用emp.json 生成DataFrame

(1)數(shù)據(jù):emp.json

(2)使用emp.json 生成DataFrame

val empDF = spark.read.json("/opt/module/datas/TestFile/emp.json")

emp.show

查詢工資大于3000的員工

empDF.where($"sal" >= 3000).show

(3)創(chuàng)建case class,生成DataSets

case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)

val empDS = empDF.as[Emp]

(4)查詢數(shù)據(jù)

//查詢工資大于3000的員工
empDS.filter(_.sal > 3000).show

//查看10號部門的員工 
empDS.filter(_.deptno == 10).show
2.多表查詢

(1)創(chuàng)建部門表

val deptRDD=sc.textFile("/opt/module/datas/TestFile/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

deptDS.show

(2)創(chuàng)建員工表

case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("/opt/module/datas/TestFile/emp.csv").map(_.split(","))

val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS

empDS.show

(3)執(zhí)行多表查詢:等值鏈接

val result = deptDS.join(empDS,"deptno")

result.show

(4)另一種寫法:注意有三個等號

val result1 = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))

result1.show

joinWith和join的區(qū)別是連接后的新Dataset的schema會不一樣

(5)多表條件查詢:

val result = deptDS.join(empDS,"deptno").where("deptno==10") 

result.show
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市帮坚,隨后出現(xiàn)的幾起案子妻往,更是在濱河造成了極大的恐慌,老刑警劉巖试和,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件讯泣,死亡現(xiàn)場離奇詭異,居然都是意外死亡阅悍,警方通過查閱死者的電腦和手機(jī)好渠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來节视,“玉大人拳锚,你說我怎么就攤上這事⊙靶校” “怎么了霍掺?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長拌蜘。 經(jīng)常有香客問我杆烁,道長,這世上最難降的妖魔是什么简卧? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任兔魂,我火速辦了婚禮,結(jié)果婚禮上举娩,老公的妹妹穿的比我還像新娘析校。我一直安慰自己构罗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布勺良。 她就那樣靜靜地躺著绰播,像睡著了一般。 火紅的嫁衣襯著肌膚如雪尚困。 梳的紋絲不亂的頭發(fā)上蠢箩,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機(jī)與錄音事甜,去河邊找鬼谬泌。 笑死,一個胖子當(dāng)著我的面吹牛逻谦,可吹牛的內(nèi)容都是我干的掌实。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼邦马,長吁一口氣:“原來是場噩夢啊……” “哼贱鼻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起滋将,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤邻悬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后随闽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體父丰,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年掘宪,在試婚紗的時候發(fā)現(xiàn)自己被綠了蛾扇。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡魏滚,死狀恐怖镀首,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鼠次,我是刑警寧澤蘑斧,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站须眷,受9級特大地震影響竖瘾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜花颗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一捕传、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扩劝,春花似錦庸论、人聲如沸职辅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽域携。三九已至,卻和暖如春鱼喉,著一層夾襖步出監(jiān)牢的瞬間秀鞭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工扛禽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锋边,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓编曼,卻偏偏與公主長得像豆巨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子掐场,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評論 2 354