Spark SQL 教程

一乾蓬、什么是Spark SQL

Spark SQL是Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊仪缸,它提供了兩個編程抽象分別叫做DataFrame和DataSet,它們用于作為分布式SQL查詢引擎。從下圖可以查看RDD纳鼎、DataFrames與DataSet的關系建车。


image.png

二扩借、為什么要學習Spark SQL?

我們已經(jīng)學習了Hive缤至,它是將Hive SQL轉(zhuǎn)換成MapReduce然后提交到集群上執(zhí)行潮罪,大大簡化了編寫MapReduce的程序的復雜性,由于MapReduce這種計算模型執(zhí)行效率比較慢领斥。所以Spark SQL的應運而生嫉到,它是將Spark SQL轉(zhuǎn)換成RDD,然后提交到集群執(zhí)行月洛,執(zhí)行效率非澈味瘢快!所以我們類比的理解:Hive---SQL-->MapReduce嚼黔,Spark SQL---SQL-->RDD细层。都是一種解析傳統(tǒng)SQL到大數(shù)據(jù)運算模型的引擎,屬于數(shù)據(jù)分析的范圍隔崎。

三今艺、什么是DataFrame和DataSet?

首先,最簡單的理解我們可以認為DataFrame就是Spark中的數(shù)據(jù)表(類比傳統(tǒng)數(shù)據(jù)庫)爵卒,DataFrame的結(jié)構(gòu)如下:

DataFrame(表)= Schema(表結(jié)構(gòu)) + Data(表數(shù)據(jù))

總結(jié):DataFrame(表)是Spark SQL對結(jié)構(gòu)化數(shù)據(jù)的抽象虚缎。可以將DataFrame看做RDD钓株。

DataFrame

DataFrame是組織成命名列的數(shù)據(jù)集实牡。它在概念上等同于關系數(shù)據(jù)庫中的,但在底層具有更豐富的優(yōu)化轴合。DataFrames可以從各種來源構(gòu)建创坞,

例如:

  • 結(jié)構(gòu)化數(shù)據(jù)文件(JSON)
  • 外部數(shù)據(jù)庫或現(xiàn)有RDDs

DataFrame API支持的語言有Scala,Java受葛,Python和R题涨。

image

從上圖可以看出,DataFrame相比RDD多了數(shù)據(jù)的結(jié)構(gòu)信息总滩,即schema纲堵。RDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合闰渔。DataFrame除了提供了比RDD更豐富的算子以外席函,更重要的特點是提升執(zhí)行效率、減少數(shù)據(jù)讀取以及執(zhí)行計劃的優(yōu)化冈涧。

DataSet

Dataset是數(shù)據(jù)的分布式集合茂附。Dataset是在Spark 1.6中添加的一個新接口正蛙,是DataFrame之上更高一級的抽象。它提供了RDD的優(yōu)點(強類型化)以及Spark SQL優(yōu)化后的執(zhí)行引擎的優(yōu)點营曼。一個Dataset 可以從JVM對象構(gòu)造乒验,然后使用函數(shù)轉(zhuǎn)換(map, flatMap溶推,filter等)去操作徊件。 Dataset API 支持Scala和Java。 Python不支持Dataset API蒜危。

四虱痕、測試數(shù)據(jù)

我們使用2個csv文件作為部分測試數(shù)據(jù):

dept.csv信息:

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

emp.csv信息:

7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

將這2個csv文件put到HDFS的hdfs://bigdata111:9000/input/csvFiles/目錄以便后面使用

[root@bigdata111 ~]# hdfs dfs -ls /input/csvFiles
Found 2 items
-rw-r--r--   1 root supergroup         84 2018-06-15 13:40 /input/csvFiles/dept.csv
-rw-r--r--   1 root supergroup        617 2018-06-15 13:40 /input/csvFiles/emp.csv

五、創(chuàng)建DataFrame

前提:在集群模式下啟動spark-shell:bin/spark-shell --master spark://bigdata111:7077

image

方式1:使用case class定義表

(1) 定義case class代表表的結(jié)構(gòu)schema
scala>case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
        
(2) 導入emp.csv文件(導入數(shù)據(jù))
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))//讀取Linux本地數(shù)據(jù)
或者
scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/csvFiles/emp.csv").map(_.split(","))//讀取HDFS數(shù)據(jù)

(3) 生成表: DataFrame
scala>val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))

(4)由allEmp直接生成表
scala>val empDF = allEmp.toDF

(4) 操作: DSL語句
scala>empDF.show         ---->  select * from emp
scala>empDF.printSchema  ---->  desc emp

操作結(jié)果:


image

方式2:使用SparkSession對象創(chuàng)建DataFrame

什么是SparkSession?

Apache Spark 2.0引入了SparkSession辐赞,其為用戶提供了一個統(tǒng)一的切入點來使用Spark的各項功能部翘,并且允許用戶通過它調(diào)用DataFrame和Dataset相關API來編寫Spark程序。最重要的是响委,它減少了用戶需要了解的一些概念新思,使得我們可以很容易地與Spark交互。
在2.0版本之前赘风,與Spark交互之前必須先創(chuàng)建SparkConf和SparkContext夹囚。然而在Spark 2.0中,我們可以通過SparkSession來實現(xiàn)同樣的功能邀窃,而不需要顯式地創(chuàng)建SparkConf, SparkContext 以及 SQLContext荸哟,因為這些對象已經(jīng)封裝在SparkSession中。

通過SparkSession可以訪問Spark所有的模塊!

image

使用Sparksession創(chuàng)建DataFrame過程:

    (2)加載結(jié)構(gòu)化數(shù)據(jù)
        scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))//讀取Linux數(shù)據(jù)
        或者
        scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/emp.csv").map(_.split(","))//讀取HDFS數(shù)據(jù)

        (3) 定義schema:StructType
        scala>import org.apache.spark.sql._
        scala>import org.apache.spark.sql.types._
        scala>val myschema = StructType(List(StructField("empno", DataTypes.IntegerType)
        , StructField("ename", DataTypes.StringType)
        ,StructField("job", DataTypes.StringType)
        ,StructField("mgr", DataTypes.StringType)
        ,StructField("hiredate", DataTypes.StringType)
        ,StructField("sal", DataTypes.IntegerType)
        ,StructField("comm", DataTypes.StringType)
        ,StructField("deptno", DataTypes.IntegerType)))

        (4)把讀入的每一行數(shù)據(jù)映射成一個個Row
        scala>val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))

        (5) 使用SparkSession.createDataFrame創(chuàng)建表
        scala>val df = spark.createDataFrame(rowRDD,myschema)


可以看到df支持的函數(shù)很多瞬捕,其實就是RDD的算子鞍历。這里也可以看出DF很像一個RDD。

image.png

方式3:直接讀取格式化的文件(json,csv)等-最簡單

前提:數(shù)據(jù)文件本身一定具有格式,這里我們選取json格式的數(shù)據(jù)肪虎,json文件可以使用spark例子中提供的people.json劣砍。你也可以使用任意json文件進行操作。
測試數(shù)據(jù)如下:
[root@bigdata111 resources]# pwd
/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
[root@bigdata111 resources]# ls
full_user.avsc  kv1.txt  people.json  people.txt  user.avsc  users.avro  users.parquet
[root@bigdata111 resources]# more people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

使用SparkSession對象直接讀取Json文件
spark>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
創(chuàng)建完畢DF之后就可以直接查看表的信息扇救,十分的簡單:

六刑枝、操作DataFrame(DSL+SQL)

DataFrame操作也稱為無類型的Dataset操作.操作的DataFrame是方法1創(chuàng)建的empDF.

>1.DSL(domain-specific language)操作DataFrame

1.查看所有的員工信息===selec * from empDF;
scala>empDF.show


image

2.查詢所有的員工姓名 ($符號添加不加功能一樣)===select ename,deptno from empDF;
scala>empDF.select("ename","deptno").show
scala>empDF.select("ename","deptno").show

image

3.查詢所有的員工姓名和薪水,并給薪水加100塊錢===select ename,sal,sal+100 from empDF;
scala>empDF.select("ename","sal",$"sal"+100).show

image

4.查詢工資大于2000的員工===select * from empDF where sal>2000;
scala>empDF.filter($"sal" > 2000).show

image

5.分組===select deptno,count(*) from empDF group by deptno;
scala>empDF.groupBy("deptno").count.show scala>empDF.groupBy("deptno").avg().show
scala>empDF.groupBy($"deptno").max().show

image

2.SQL操作DataFrame

(1)前提條件:需要把DataFrame注冊成是一個Table或者View
scala>empDF.createOrReplaceTempView("emp")

(2)使用SparkSession執(zhí)行從查詢
scala>spark.sql("select * from emp").show
scala>spark.sql("select * from emp where deptno=10").show


image

(3)求每個部門的工資總額
scala>spark.sql("select deptno,sum(sal) from emp group by deptno").show


image

七迅腔、視圖(臨時和全局視圖)

在使用SQL操作DataFrame的時候仅讽,有一個前提就是必須通過DF創(chuàng)建一個表或者視圖:empDF.createOrReplaceTempView("emp")

在SparkSQL中,如果你想擁有一個臨時的view钾挟,并想在不同的Session中共享,而且在application的運行周期內(nèi)可用饱岸,那么就需要創(chuàng)建一個全局的臨時view掺出。并記得使用的時候加上global_temp作為前綴來引用它徽千,因為全局的臨時view是綁定到系統(tǒng)保留的數(shù)據(jù)庫global_temp上。

① 創(chuàng)建一個普通的view和一個全局的view
scala>empDF.createOrReplaceTempView("emp1")
scala>empDF.createGlobalTempView("emp2")

image

② 在當前會話中執(zhí)行查詢汤锨,均可查詢出結(jié)果双抽。
scala>spark.sql("select * from emp1").show
scala>spark.sql("select * from global_temp.emp2").show

image

③ 開啟一個新的會話,執(zhí)行同樣的查詢
scala>spark.newSession.sql("select * from emp1").show (運行出錯)
scala>spark.newSession.sql("select * from global_temp.emp2").show

image

八闲礼、使用數(shù)據(jù)源

在介紹parquet文件的時候我們使用的是Spark例子文件夾中提供的users.parquet文件:

[root@bigdata111 resources]# pwd
/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources
[root@bigdata111 resources]# ls
full_user.avsc kv1.txt people.json people.txt temp user.avsc users.avro users.parquet

1牍汹、通用的Load/Save函數(shù)

(*)什么是parquet文件?
Parquet是列式存儲格式的一種文件類型柬泽,列式存儲有以下的核心:

  • 可以跳過不符合條件的數(shù)據(jù)慎菲,只讀取需要的數(shù)據(jù),降低IO數(shù)據(jù)量锨并。
  • 壓縮編碼可以降低磁盤存儲空間露该。由于同一列的數(shù)據(jù)類型是一樣的,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節(jié)約存儲空間第煮。
  • 只讀取需要的列解幼,支持向量運算,能夠獲取更好的掃描性能包警。

Parquet格式是Spark SQL的默認數(shù)據(jù)源撵摆,可通過spark.sql.sources.default配置

(*)通用的Load/Save函數(shù)

對比如下語句:

scala>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
scala>val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")

查詢Schema和數(shù)據(jù):scala>userDF.show

image
  • save函數(shù)保存數(shù)據(jù),默認的文件格式:Parquet文件(列式存儲文件)

scala>userDF.select("name","favorite_color").write.save("/root/temp/result1")
scala>userDF.select("name","favorite_color").write.format("csv").save("/root/temp/result2")
scala>userDF.select("name","favorite_color").write.csv("/root/temp/result3")

image
image

(*)顯式指定文件格式:加載json格式
直接加載:val usersDF = spark.read.load("/root/resources/people.json")
會出錯
val usersDF = spark.read.format("json").load("/root/resources/people.json")

(*)存儲模式(Save Modes)
可以采用SaveMode執(zhí)行存儲操作害晦,SaveMode定義了對數(shù)據(jù)的處理模式特铝。需要注意的是,這些保存模式不使用任何鎖定篱瞎,不是原子操作苟呐。此外,當使用Overwrite方式執(zhí)行時俐筋,在輸出新數(shù)據(jù)之前原數(shù)據(jù)就已經(jīng)被刪除牵素。SaveMode詳細介紹如下:
默認為SaveMode.ErrorIfExists模式,該模式下澄者,如果數(shù)據(jù)庫中已經(jīng)存在該表笆呆,則會直接報異常,導致數(shù)據(jù)不能存入數(shù)據(jù)庫.另外三種模式如下:
SaveMode.Append 如果表已經(jīng)存在粱挡,則追加在該表中赠幕;若該表不存在,則會先創(chuàng)建表询筏,再插入數(shù)據(jù)榕堰;
SaveMode.Overwrite 重寫模式,其實質(zhì)是先將已有的表及其數(shù)據(jù)全都刪除,再重新創(chuàng)建該表逆屡,最后插入新的數(shù)據(jù)圾旨;
SaveMode.Ignore 若表不存在,則創(chuàng)建表魏蔗,并存入數(shù)據(jù)砍的;在表存在的情況下,直接跳過數(shù)據(jù)的存儲莺治,不會報錯廓鞠。

Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> 出錯:因為/root/result/parquet1已經(jīng)存在

usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

5 讀寫mysql

5.1 JDBC

Spark SQL可以通過JDBC從關系型數(shù)據(jù)庫中讀取數(shù)據(jù)的方式創(chuàng)建DataFrame,通過對DataFrame一系列的計算后谣旁,還可以將數(shù)據(jù)再寫回關系型數(shù)據(jù)庫中床佳。

5.1.1 從Mysql中加載數(shù)據(jù)庫(Spark Shell 方式)

  1. 啟動Spark Shell,必須指定mysql連接驅(qū)動jar包
spark-shell --master spark://hadoop1:7077 --jars mysql-connector-java-5.1.35-bin.jar --driver-class-path mysql-connector-java-5.1.35-bin.jar
  1. 從mysql中加載數(shù)據(jù)
val jdbcDF = sqlContext.read.format("jdbc").options(
     Map("url"->"jdbc:mysql://hadoop1:3306/bigdata",
            "driver"->"com.mysql.jdbc.Driver", 
            "dbtable"->"person", //  "dbtable"->"(select * from person where id = 12) as person", 
            "user"->"root",
            "password"->"123456")
     ).load()
  1. 執(zhí)行查詢
jdbcDF.show()

5.1.2 將數(shù)據(jù)寫入到MySQL中(打jar包方式)

  1. 編寫Spark SQL程序
import java.util.Properties
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author y15079
  * @create 2018-05-12 2:50
  * @desc
  **/
object JdbcDFDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MysqlDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //創(chuàng)建SQLContext spark1.6.1以下的寫法
    //val sqlContext = new SQLContext(sc)

    //spark2.0 以上的寫法
    val sqlContext = SparkSession.builder().config(conf).getOrCreate()

    //通過并行化創(chuàng)建RDD
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    //通過StructType直接指定每個字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    //將RDD映射到rowRDD
    val rowRDD = personRDD.map(p=>Row(p(0).toInt, p(1).trim, p(2).toInt))
    //將schema信息應用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //創(chuàng)建Properties存儲數(shù)據(jù)庫相關屬性
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    //將數(shù)據(jù)追加到數(shù)據(jù)庫
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata","bigdata.person", prop)

    sc.stop()
  }
}
  1. 用maven-shade-plugin插件將程序打包

  2. 將jar包提交到spark集群

spark-submit 
--class cn.itcast.spark.sql.jdbcDF 
--master spark://hadoop1:7077 
--jars mysql-connector-java-5.1.35-bin.jar 
--driver-class-path mysql-connector-java-5.1.35-bin.jar 
/root/demo.jar
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蔓挖,一起剝皮案震驚了整個濱河市夕土,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瘟判,老刑警劉巖怨绣,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異拷获,居然都是意外死亡篮撑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門匆瓜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赢笨,“玉大人,你說我怎么就攤上這事驮吱〖攵剩” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵左冬,是天一觀的道長桐筏。 經(jīng)常有香客問我,道長拇砰,這世上最難降的妖魔是什么梅忌? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮除破,結(jié)果婚禮上牧氮,老公的妹妹穿的比我還像新娘。我一直安慰自己瑰枫,他們只是感情好踱葛,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般尸诽。 火紅的嫁衣襯著肌膚如雪圾笨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天逊谋,我揣著相機與錄音,去河邊找鬼土铺。 笑死胶滋,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的悲敷。 我是一名探鬼主播究恤,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼后德!你這毒婦竟也來了部宿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤瓢湃,失蹤者是張志新(化名)和其女友劉穎理张,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绵患,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡雾叭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了落蝙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片织狐。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖筏勒,靈堂內(nèi)的尸體忽然破棺而出移迫,到底是詐尸還是另有隱情,我是刑警寧澤管行,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布厨埋,位于F島的核電站,受9級特大地震影響病瞳,放射性物質(zhì)發(fā)生泄漏揽咕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一套菜、第九天 我趴在偏房一處隱蔽的房頂上張望亲善。 院中可真熱鬧,春花似錦逗柴、人聲如沸蛹头。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽渣蜗。三九已至屠尊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間耕拷,已是汗流浹背讼昆。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留骚烧,地道東北人浸赫。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像赃绊,于是被迫代替她去往敵國和親既峡。 傳聞我的和親對象是個殘疾皇子魔招,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容