一肄程、什么是Spark SQL
Spark SQL是Spark用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊咐蚯,它提供了兩個(gè)編程抽象分別叫做DataFrame和DataSet,它們用于作為分布式SQL查詢引擎。從下圖可以查看RDD潭苞、DataFrames與DataSet的關(guān)系。
二痴腌、為什么要學(xué)習(xí)Spark SQL雌团?
我們已經(jīng)學(xué)習(xí)了Hive,它是將Hive SQL轉(zhuǎn)換成MapReduce然后提交到集群上執(zhí)行士聪,大大簡(jiǎn)化了編寫(xiě)MapReduce的程序的復(fù)雜性锦援,由于MapReduce這種計(jì)算模型執(zhí)行效率比較慢。所以Spark SQL的應(yīng)運(yùn)而生剥悟,它是將Spark SQL轉(zhuǎn)換成RDD灵寺,然后提交到集群執(zhí)行,執(zhí)行效率非城冢快略板!所以我們類比的理解:Hive---SQL-->MapReduce,Spark SQL---SQL-->RDD慈缔。都是一種解析傳統(tǒng)SQL到大數(shù)據(jù)運(yùn)算模型的引擎叮称,屬于數(shù)據(jù)分析的范圍。
三藐鹤、什么是DataFrame和DataSet?
首先瓤檐,最簡(jiǎn)單的理解我們可以認(rèn)為DataFrame就是Spark中的數(shù)據(jù)表(類比傳統(tǒng)數(shù)據(jù)庫(kù)),DataFrame的結(jié)構(gòu)如下:
DataFrame(表)= Schema(表結(jié)構(gòu)) + Data(表數(shù)據(jù))
總結(jié):DataFrame(表)是Spark SQL對(duì)結(jié)構(gòu)化數(shù)據(jù)的抽象娱节【嗨В可以將DataFrame看做RDD。
DataFrame
DataFrame是組織成命名列的數(shù)據(jù)集括堤。它在概念上等同于關(guān)系數(shù)據(jù)庫(kù)中的表碌秸,但在底層具有更豐富的優(yōu)化。DataFrames可以從各種來(lái)源構(gòu)建悄窃,
例如:
- 結(jié)構(gòu)化數(shù)據(jù)文件(JSON)
- 外部數(shù)據(jù)庫(kù)或現(xiàn)有RDDs
DataFrame API支持的語(yǔ)言有Scala讥电,Java,Python和R轧抗。
從上圖可以看出恩敌,DataFrame相比RDD多了數(shù)據(jù)的結(jié)構(gòu)信息,即schema横媚。RDD是分布式的 Java對(duì)象的集合纠炮。DataFrame是分布式的Row對(duì)象的集合。DataFrame除了提供了比RDD更豐富的算子以外灯蝴,更重要的特點(diǎn)是提升執(zhí)行效率恢口、減少數(shù)據(jù)讀取以及執(zhí)行計(jì)劃的優(yōu)化。
DataSet
Dataset是數(shù)據(jù)的分布式集合穷躁。Dataset是在Spark 1.6中添加的一個(gè)新接口耕肩,是DataFrame之上更高一級(jí)的抽象。它提供了RDD的優(yōu)點(diǎn)(強(qiáng)類型化)以及Spark SQL優(yōu)化后的執(zhí)行引擎的優(yōu)點(diǎn)。一個(gè)Dataset 可以從JVM對(duì)象構(gòu)造猿诸,然后使用函數(shù)轉(zhuǎn)換(map婚被, flatMap,filter等)去操作梳虽。 Dataset API 支持Scala和Java址芯。 Python不支持Dataset API。
四窜觉、測(cè)試數(shù)據(jù)
我們使用2個(gè)csv文件作為部分測(cè)試數(shù)據(jù):
dept.csv信息:
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp.csv信息:
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
將這2個(gè)csv文件put到HDFS的hdfs://bigdata111:9000/input/csvFiles/目錄以便后面使用
[root@bigdata111 ~]# hdfs dfs -ls /input/csvFiles
Found 2 items
-rw-r--r-- 1 root supergroup 84 2018-06-15 13:40 /input/csvFiles/dept.csv
-rw-r--r-- 1 root supergroup 617 2018-06-15 13:40 /input/csvFiles/emp.csv
五谷炸、創(chuàng)建DataFrame
前提:在集群模式下啟動(dòng)spark-shell:bin/spark-shell --master spark://bigdata111:7077
方式1:使用case class定義表
(1) 定義case class代表表的結(jié)構(gòu)schema
scala>case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
(2) 導(dǎo)入emp.csv文件(導(dǎo)入數(shù)據(jù))
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))//讀取Linux本地?cái)?shù)據(jù)
或者
scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/csvFiles/emp.csv").map(_.split(","))//讀取HDFS數(shù)據(jù)
(3) 生成表: DataFrame
scala>val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(4)由allEmp直接生成表
scala>val empDF = allEmp.toDF
(4) 操作: DSL語(yǔ)句
scala>empDF.show ----> select * from emp
scala>empDF.printSchema ----> desc emp
操作結(jié)果:
方式2:使用SparkSession對(duì)象創(chuàng)建DataFrame
什么是SparkSession?
Apache Spark 2.0引入了SparkSession,其為用戶提供了一個(gè)統(tǒng)一的切入點(diǎn)來(lái)使用Spark的各項(xiàng)功能竖螃,并且允許用戶通過(guò)它調(diào)用DataFrame和Dataset相關(guān)API來(lái)編寫(xiě)Spark程序淑廊。最重要的是逗余,它減少了用戶需要了解的一些概念特咆,使得我們可以很容易地與Spark交互。
在2.0版本之前录粱,與Spark交互之前必須先創(chuàng)建SparkConf和SparkContext腻格。然而在Spark 2.0中,我們可以通過(guò)SparkSession來(lái)實(shí)現(xiàn)同樣的功能啥繁,而不需要顯式地創(chuàng)建SparkConf, SparkContext 以及 SQLContext菜职,因?yàn)檫@些對(duì)象已經(jīng)封裝在SparkSession中。
通過(guò)SparkSession可以訪問(wèn)Spark所有的模塊!
使用Sparksession創(chuàng)建DataFrame過(guò)程:
(2)加載結(jié)構(gòu)化數(shù)據(jù)
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))//讀取Linux數(shù)據(jù)
或者
scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/emp.csv").map(_.split(","))//讀取HDFS數(shù)據(jù)
(3) 定義schema:StructType
scala>import org.apache.spark.sql._
scala>import org.apache.spark.sql.types._
scala>val myschema = StructType(List(StructField("empno", DataTypes.IntegerType)
, StructField("ename", DataTypes.StringType)
,StructField("job", DataTypes.StringType)
,StructField("mgr", DataTypes.StringType)
,StructField("hiredate", DataTypes.StringType)
,StructField("sal", DataTypes.IntegerType)
,StructField("comm", DataTypes.StringType)
,StructField("deptno", DataTypes.IntegerType)))
(4)把讀入的每一行數(shù)據(jù)映射成一個(gè)個(gè)Row
scala>val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
(5) 使用SparkSession.createDataFrame創(chuàng)建表
scala>val df = spark.createDataFrame(rowRDD,myschema)
可以看到df支持的函數(shù)很多旗闽,其實(shí)就是RDD的算子酬核。這里也可以看出DF很像一個(gè)RDD。
方式3:直接讀取格式化的文件(json,csv)等-最簡(jiǎn)單
前提:數(shù)據(jù)文件本身一定具有格式,這里我們選取json格式的數(shù)據(jù)适室,json文件可以使用spark例子中提供的people.json嫡意。你也可以使用任意json文件進(jìn)行操作。
測(cè)試數(shù)據(jù)如下:
[root@bigdata111 resources]# pwd
/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
[root@bigdata111 resources]# ls
full_user.avsc kv1.txt people.json people.txt user.avsc users.avro users.parquet
[root@bigdata111 resources]# more people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
使用SparkSession對(duì)象直接讀取Json文件
spark>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
創(chuàng)建完畢DF之后就可以直接查看表的信息捣辆,十分的簡(jiǎn)單:
六蔬螟、操作DataFrame(DSL+SQL)
DataFrame操作也稱為無(wú)類型的Dataset操作.操作的DataFrame是方法1創(chuàng)建的empDF.
>1.DSL(domain-specific language)操作DataFrame
1.查看所有的員工信息===selec * from empDF;
scala>empDF.show
2.查詢所有的員工姓名 ($符號(hào)添加不加功能一樣)===select ename,deptno from empDF;
scala>empDF.select("ename","deptno").show
scala>empDF.select([圖片上傳失敗...(image-583b02-1552188416500)]
"deptno").show
3.查詢所有的員工姓名和薪水,并給薪水加100塊錢(qián)===select ename,sal,sal+100 from empDF;
scala>empDF.select([圖片上傳失敗...(image-596cc8-1552188416500)]
"sal",$"sal"+100).show
4.查詢工資大于2000的員工===select * from empDF where sal>2000;
scala>empDF.filter($"sal" > 2000).show
5.分組===select deptno,count(*) from empDF group by deptno;
scala>empDF.groupBy([圖片上傳失敗...(image-6d049f-1552188416500)]
"deptno").avg().show
scala>empDF.groupBy($"deptno").max().show
2.SQL操作DataFrame
(1)前提條件:需要把DataFrame注冊(cè)成是一個(gè)Table或者View
scala>empDF.createOrReplaceTempView("emp")
(2)使用SparkSession執(zhí)行從查詢
scala>spark.sql("select * from emp").show
scala>spark.sql("select * from emp where deptno=10").show
(3)求每個(gè)部門(mén)的工資總額
scala>spark.sql("select deptno,sum(sal) from emp group by deptno").show
七汽畴、視圖(臨時(shí)和全局視圖)
在使用SQL操作DataFrame的時(shí)候旧巾,有一個(gè)前提就是必須通過(guò)DF創(chuàng)建一個(gè)表或者視圖:empDF.createOrReplaceTempView("emp")
在SparkSQL中,如果你想擁有一個(gè)臨時(shí)的view忍些,并想在不同的Session中共享鲁猩,而且在application的運(yùn)行周期內(nèi)可用,那么就需要?jiǎng)?chuàng)建一個(gè)全局的臨時(shí)view罢坝。并記得使用的時(shí)候加上global_temp作為前綴來(lái)引用它绳匀,因?yàn)槿值呐R時(shí)view是綁定到系統(tǒng)保留的數(shù)據(jù)庫(kù)global_temp上。
① 創(chuàng)建一個(gè)普通的view和一個(gè)全局的view
scala>empDF.createOrReplaceTempView("emp1")
scala>empDF.createGlobalTempView("emp2")
② 在當(dāng)前會(huì)話中執(zhí)行查詢,均可查詢出結(jié)果疾棵。
scala>spark.sql("select * from emp1").show
scala>spark.sql("select * from global_temp.emp2").show
③ 開(kāi)啟一個(gè)新的會(huì)話戈钢,執(zhí)行同樣的查詢
scala>spark.newSession.sql("select * from emp1").show (運(yùn)行出錯(cuò))
scala>spark.newSession.sql("select * from global_temp.emp2").show
八、使用數(shù)據(jù)源
在介紹parquet文件的時(shí)候我們使用的是Spark例子文件夾中提供的users.parquet文件:
[root@bigdata111 resources]# pwd
/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
[root@bigdata111 resources]# ls
full_user.avsc kv1.txt people.json people.txt temp user.avsc users.avro users.parquet
1是尔、通用的Load/Save函數(shù)
(*)什么是parquet文件殉了?
Parquet是列式存儲(chǔ)格式的一種文件類型,列式存儲(chǔ)有以下的核心:
- 可以跳過(guò)不符合條件的數(shù)據(jù)拟枚,只讀取需要的數(shù)據(jù)薪铜,降低IO數(shù)據(jù)量。
- 壓縮編碼可以降低磁盤(pán)存儲(chǔ)空間恩溅。由于同一列的數(shù)據(jù)類型是一樣的隔箍,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進(jìn)一步節(jié)約存儲(chǔ)空間。
- 只讀取需要的列脚乡,支持向量運(yùn)算蜒滩,能夠獲取更好的掃描性能。
Parquet格式是Spark SQL的默認(rèn)數(shù)據(jù)源奶稠,可通過(guò)spark.sql.sources.default配置
(*)通用的Load/Save函數(shù)
- load函數(shù)讀取Parquet文件:scala>val userDF = spark.read.load("hdfs://bigdata111:9000/input/users.parquet")
對(duì)比如下語(yǔ)句:
scala>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
scala>val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")
查詢Schema和數(shù)據(jù):scala>userDF.show
- save函數(shù)保存數(shù)據(jù)俯艰,默認(rèn)的文件格式:Parquet文件(列式存儲(chǔ)文件)
scala>userDF.select([圖片上傳失敗...(image-7ea1b0-1552188416500)]
"favorite_color").write.save("/root/temp/result1")
scala>userDF.select([圖片上傳失敗...(image-667695-1552188416500)]
"favorite_color").write.format("csv").save("/root/temp/result2")
scala>userDF.select([圖片上傳失敗...(image-d13a97-1552188416500)]
"favorite_color").write.csv("/root/temp/result3")
(*)顯式指定文件格式:加載json格式
直接加載:val usersDF = spark.read.load("/root/resources/people.json")
會(huì)出錯(cuò)
val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*)存儲(chǔ)模式(Save Modes)
可以采用SaveMode執(zhí)行存儲(chǔ)操作,SaveMode定義了對(duì)數(shù)據(jù)的處理模式锌订。需要注意的是竹握,這些保存模式不使用任何鎖定,不是原子操作辆飘。此外啦辐,當(dāng)使用Overwrite方式執(zhí)行時(shí),在輸出新數(shù)據(jù)之前原數(shù)據(jù)就已經(jīng)被刪除蜈项。SaveMode詳細(xì)介紹如下:
默認(rèn)為SaveMode.ErrorIfExists模式芹关,該模式下,如果數(shù)據(jù)庫(kù)中已經(jīng)存在該表战得,則會(huì)直接報(bào)異常充边,導(dǎo)致數(shù)據(jù)不能存入數(shù)據(jù)庫(kù).另外三種模式如下:
SaveMode.Append 如果表已經(jīng)存在,則追加在該表中常侦;若該表不存在浇冰,則會(huì)先創(chuàng)建表,再插入數(shù)據(jù)聋亡;
SaveMode.Overwrite 重寫(xiě)模式肘习,其實(shí)質(zhì)是先將已有的表及其數(shù)據(jù)全都刪除,再重新創(chuàng)建該表坡倔,最后插入新的數(shù)據(jù)漂佩;
SaveMode.Ignore 若表不存在脖含,則創(chuàng)建表,并存入數(shù)據(jù)投蝉;在表存在的情況下养葵,直接跳過(guò)數(shù)據(jù)的存儲(chǔ),不會(huì)報(bào)錯(cuò)瘩缆。
Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> 出錯(cuò):因?yàn)?root/result/parquet1已經(jīng)存在
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
5 讀寫(xiě)mysql
5.1 JDBC
Spark SQL可以通過(guò)JDBC從關(guān)系型數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)的方式創(chuàng)建DataFrame关拒,通過(guò)對(duì)DataFrame一系列的計(jì)算后,還可以將數(shù)據(jù)再寫(xiě)回關(guān)系型數(shù)據(jù)庫(kù)中庸娱。
5.1.1 從Mysql中加載數(shù)據(jù)庫(kù)(Spark Shell 方式)
- 啟動(dòng)Spark Shell着绊,必須指定mysql連接驅(qū)動(dòng)jar包
spark-shell --master spark://hadoop1:7077 --jars mysql-connector-java-5.1.35-bin.jar --driver-class-path mysql-connector-java-5.1.35-bin.jar
- 從mysql中加載數(shù)據(jù)
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url"->"jdbc:mysql://hadoop1:3306/bigdata",
"driver"->"com.mysql.jdbc.Driver",
"dbtable"->"person", // "dbtable"->"(select * from person where id = 12) as person",
"user"->"root",
"password"->"123456")
).load()
- 執(zhí)行查詢
jdbcDF.show()
5.1.2 將數(shù)據(jù)寫(xiě)入到MySQL中(打jar包方式)
- 編寫(xiě)Spark SQL程序
import java.util.Properties
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author y15079
* @create 2018-05-12 2:50
* @desc
**/
object JdbcDFDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MysqlDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
//創(chuàng)建SQLContext spark1.6.1以下的寫(xiě)法
//val sqlContext = new SQLContext(sc)
//spark2.0 以上的寫(xiě)法
val sqlContext = SparkSession.builder().config(conf).getOrCreate()
//通過(guò)并行化創(chuàng)建RDD
val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
//通過(guò)StructType直接指定每個(gè)字段的schema
val schema = StructType(
List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
)
)
//將RDD映射到rowRDD
val rowRDD = personRDD.map(p=>Row(p(0).toInt, p(1).trim, p(2).toInt))
//將schema信息應(yīng)用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//創(chuàng)建Properties存儲(chǔ)數(shù)據(jù)庫(kù)相關(guān)屬性
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "123456")
//將數(shù)據(jù)追加到數(shù)據(jù)庫(kù)
personDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata","bigdata.person", prop)
sc.stop()
}
}
用maven-shade-plugin插件將程序打包
將jar包提交到spark集群
spark-submit
--class cn.itcast.spark.sql.jdbcDF
--master spark://hadoop1:7077
--jars mysql-connector-java-5.1.35-bin.jar
--driver-class-path mysql-connector-java-5.1.35-bin.jar
/root/demo.jar