Spark SQL解決了什么問題
這個(gè)之前,先說下Hive,Hive有自己的語言Hive SQL(HQL),利用sql語句查詢,然后走的是MapReduce程序,提交到集群上運(yùn)行.這樣的話有個(gè)很大的優(yōu)勢,那就是它相比MapReduce節(jié)省了很多的代碼,很多..
但是也有個(gè)致命的缺陷,那就是MapReduce.(后面我想寫一篇MapReduce從仰望到失望,從失望到絕望...)? 前面也說過,MR相比Spark的RDD,性能速度正如官方所說的,有百倍之差..? 既然Spark這么強(qiáng),那為何不出一個(gè)Spark SQL直接對(duì)應(yīng)Hive呢,底層走的是Spark呢?? 于是就有了Spark SQL.它將Spark SQL轉(zhuǎn)換成RDD杖狼,然后提交到集群執(zhí)行赛蔫,執(zhí)行效率也是MR和spark的差距幔睬。
Spark sql宏觀了解
Spark sql是Spark的一個(gè)組件,Spark sql自己也有兩個(gè)組件:DataFrame / DataSet.
Spark SQL是Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊谢澈,它提供了一個(gè)編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用.
從上圖可以看出,SparkSQL可以看做是一個(gè)轉(zhuǎn)換層,向下對(duì)接各種不同的結(jié)構(gòu)化數(shù)據(jù)源葵硕,向上提供不同的數(shù)據(jù)訪問方式竞穷。
DataFrame / DataSet / RDD的關(guān)系
之前我們講,RDD是Spark的基石,因?yàn)槠渌膕park框架都是運(yùn)行在Spark core上的.但是在我們Spark sql里面,就有點(diǎn)區(qū)別了.
在Spark sql中,DataSet是核心,沒有之一.但是DataSet僅限于Spark sql中,不能在其他框架中使用,所以RDD依舊還是spark的基石,依舊是核心.而DataFrame已經(jīng)被DataSet替換了,DataFrame能實(shí)現(xiàn)的功能,DataSet都能實(shí)現(xiàn),相反,DataFrame卻不能.
三者的關(guān)系如下:
RDD + schema(數(shù)據(jù)的結(jié)構(gòu)信息) = DataFrame = DataSet[Row]
RDD 0.x版本發(fā)布,? DataFrame1.3版本發(fā)布, DataSet1.6版本發(fā)布.
RDD: 提供了很多實(shí)用簡單的API, www.reibang.com/p/a3a64f51ddf4 ,這是我之前寫的RDD的
DataFrame: DataFrame可以理解為一個(gè)傳統(tǒng)數(shù)據(jù)庫的二維表格,除了數(shù)據(jù)以外,還記錄著數(shù)據(jù)的結(jié)構(gòu)信息,即schema.DataFrame API提供的是一套高層的關(guān)系操作,比函數(shù)式的RDD API要更加友好
DataSet: DataSet[Row] = Dataframe ;? 它是Dataframe API的一個(gè)擴(kuò)展,是spark最新的數(shù)據(jù)抽象,也是Spark SQL中最核心的組件,它基本代替了Dataframe,也有完全代替Dataframe的趨勢.
注: RDD不支持spark sql的操作
RDD / DataSet / Dataframe之間的轉(zhuǎn)換
上面說到RDD不支持Spark sql的操作,但是Spark生態(tài)圈只提供了Spark core一個(gè)計(jì)算框架,且Spark生態(tài)圈都是基于Spark core進(jìn)行計(jì)算的,所以Spark core對(duì)接Spark sql的方式就是:將RDD轉(zhuǎn)換為DataSet? /? Dataframe,且三者之間支持互相轉(zhuǎn)換!
轉(zhuǎn)換之前先聊一下DataFrame支持兩種查詢方式:一種是DSL風(fēng)格,另外一種是SQL風(fēng)格暖途,dataFrame支持兩者查詢風(fēng)格
DSL: 你需要引入? import spark.implicit._? 這個(gè)隱式轉(zhuǎn)換卑惜,可以將DataFrame隱式轉(zhuǎn)換成RDD。
SQL: 你需要將DataFrame注冊成一張表格,且需要通過sparkSession.sql 方法來運(yùn)行你的SQL語句
用Spark-shell來操作Spark SQL驻售,spark作為SparkSession的變量名露久,sc作為SparkContext的變量名.
將文件中的數(shù)據(jù)轉(zhuǎn)換成DataSet? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //先case一個(gè)類? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? case class person(name:String,age:Int)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //將文件里面的數(shù)據(jù)轉(zhuǎn)換成DataSet? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? val peopleDF2 =spark.sparkContext.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(para=> Person(para(0).trim,para(1).trim.toInt)).toDF? ? ? ? ? ? ? ? ? ? //制成一張person的表? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? peopleDF2.createOrReplaceTempView("persons")? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //查詢? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? val teen =spark.sql("select * from persons where age between 13 and 30")? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //因?yàn)镈ataFrame=DataSet[row],所以要訪問row對(duì)象中的每一個(gè)元素,可以通過這種row[0]角標(biāo)的方式來訪問,上面是通過反射獲取schema????????????????????????????????????????????????????????????????????????????????????? teen.map(row => "name:" + row(0)).show
//將RDD轉(zhuǎn)換成DataFrame? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? var peopleRDD = sc.textFile("../examples/src/main/resources/people.txt")? ? ? ? ? ? ? ? ? ? ? ? peopleRDD.collect? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //以”,”切割,得到一個(gè)Array,然后再用map對(duì)里面的每一個(gè)元素都進(jìn)行轉(zhuǎn)換,最后用toDF方法給這兩個(gè)起名字? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? val peopleDF =peopleRDD.map(_.split(",")).map(para=>(para(0).trim(),para(1).trim().toInt)).toDF("name","age")
//將 DataFrame轉(zhuǎn)換成RDD? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? val aa = peopleDF.rdd??????????? //對(duì),沒錯(cuò)欺栗,就是一行毫痕!一個(gè)方法搞定?????????????????????
//將RDD轉(zhuǎn)換成DataSet? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? case class person(name:String,age:Int)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //先定義一個(gè)case實(shí)例,最后是直接toDS就ok了? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? val peopleDF= peopleRDD.map(_.split(",")).map(para=>person(para(0).trim(),para(1).trim().toInt)).toDS
//將DataSet轉(zhuǎn)換成RDD??????????????????????????????????????????????????????????????????????????????????????????????????????????????????? val bb = peopleDF.rdd ? ? ?? //和上面一樣,一個(gè)方法搞定
//將DataFrame轉(zhuǎn)換成DataSet? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? peopleDF.as[person]?????????? as那個(gè)case的類
將DataSet轉(zhuǎn)換成DataFrame? ? ? ? ? ? ? ???????????????????????????????????????????????????????????????????????????????? PeopleDS.toDF?????????????????? 用的toDF方法
Spark SQL鏈接Hive
Spark SQL和Hive的連接有兩種,一種是Spark內(nèi)置的Hive,一種是Spark連接外部的Hive.
內(nèi)置Hive: 內(nèi)置Hive和Spark會(huì)完美地兼容,但是我用的都是外置的Hive
外置Hive: 這是Spark連接Hive的主要模式; 實(shí)現(xiàn)方式:
1. 需要將hive-site.xml 拷貝到spark的conf目錄下纸巷。
2. 如果hive的metestore使用的是mysql數(shù)據(jù)庫镇草,那么需要將mysql的jdbc驅(qū)動(dòng)包放到spark的jars目錄下。
3. 用bin/spark-shell打開時(shí)候瘤旨,第一次需要在后面加--confspark.sql.warehouse.dir=hdfs:hadoop101:9000/spark_warehouse