1 簡(jiǎn)介
SparkSql 可以從各種結(jié)構(gòu)化數(shù)據(jù)源讀取數(shù)據(jù)(JSON Hive Parquet等)中讀取數(shù)據(jù)贪绘。而且SparkSql還可以通過JDBC去讀去數(shù)據(jù)稠歉。
操作Spark SQL的方式有兩種:SQL API, Dataset API
2 SQL
SparkSql的用途之一就是執(zhí)行SQL查詢嚎杨。Spark sql 可以用來從已安裝的Hive里面讀取數(shù)據(jù)蹬耘。當(dāng)執(zhí)行SQL語句時(shí)返回的結(jié)果就是DataSet/DataFrame造烁。DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs灰羽。根據(jù)官方英文我們可以知道DataFrames可以由結(jié)構(gòu)化文件(json),Hive 數(shù)據(jù)表渣刷,外在數(shù)據(jù)庫(kù)如mysql,oracle, 再或者已存在的RDD數(shù)據(jù)集構(gòu)造鹦肿。
3 使用方法
目前有兩套方式實(shí)現(xiàn)Spark Sql,其中一套就是老的辅柴,使用的是HiveContext以及 SchemaRDD 箩溃。 SchemaRDD 是一種特殊的RDD: SchemaRDD 是存放Row的RDD瞭吃,每個(gè)ROW對(duì)象代表一行記錄。這個(gè)很類似我們?cè)袛?shù)據(jù)庫(kù)的Schema涣旨。就像是表的結(jié)構(gòu)歪架。在后面的Spark版本SchemaRDD被DataFrame取代了。
3.1 數(shù)據(jù)準(zhǔn)備
這里我們先準(zhǔn)備一份json文件开泽,里面的數(shù)據(jù)很簡(jiǎn)單,命名為demo.json穆律,其文件內(nèi)容如下:
{"user":{"name":"Kason","location":"Beijing"},"text":"這個(gè)世界就是這樣"}
{"user":{"name":"Lucy","location":"Nanjing"},"text":"我也覺得樓上說得對(duì)"}
3.2 舊的API實(shí)現(xiàn):
package com.scala.action.spark_sql
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by kason_zhang on 4/14/2017.
*/
object SparkSqlDemo extends App{
val conf = new SparkConf().setAppName("SparkSqlDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
val hiveCtx = new HiveContext(sc)
val tweets = hiveCtx.jsonFile("D:\\work\\cloud\\demo.json")
//注冊(cè)輸入的SchemaRDD
tweets.registerTempTable("demo")
//根據(jù)json文件選出名字與text
val re = hiveCtx.sql("select user.name,text from demo")
println(re.collect().foreach(println))
}
其輸出結(jié)果如下:
17/04/14 17:31:35 INFO CodeGenerator: Code generated in 14.157012 ms
[Kason,這個(gè)世界就是這樣]
[Lucy,我也覺得樓上說得對(duì)]
上面的流程很簡(jiǎn)單:就是構(gòu)造HiveContext,然后通過registerTempTable方法注冊(cè)SchemaRDD(你可以理解為注冊(cè)為數(shù)據(jù)庫(kù)表)峦耘,之后就可以執(zhí)行相關(guān)sql語句了,可以看到老方法還是很簡(jiǎn)單的辅髓。
3.3 DataFrame API實(shí)現(xiàn)
分為如下幾部
- 創(chuàng)建SparkSession
- 創(chuàng)建DataFrame
- 注冊(cè)數(shù)據(jù)表DataFrame執(zhí)行數(shù)據(jù)表操作
代碼code如下所示:
//創(chuàng)建SparkSession
val spark_session = SparkSession.builder()
.appName("Spark SQL basic")
.config("spark.some.config.option","some-value")
.getOrCreate()
//讀取數(shù)據(jù)集
val dataFrame = spark_session.read.json("D:\\work\\cloud\\demo.json")
//展示數(shù)據(jù)表信息
dataFrame.show()
//把dataFrame注冊(cè)成為global temporary 視圖泣崩。
dataFrame.createGlobalTempView("test")
//執(zhí)行sql查詢
spark_session.sql("select user.name,text from global_temp.test").show()
輸出的結(jié)果是:
dataFrame可以注冊(cè)成temporary view一種臨時(shí)性view,他是回話級(jí)別的洛口,基本上回話結(jié)束他生命周期也就結(jié)束了矫付。global temporary view與Spark Application的生命周期一致。
3.4 使用RDD
SparkSQL 支持兩種不同的方法轉(zhuǎn)換RDD為Dataset第焰。
- 第一種方法
通過反射推斷schema买优。當(dāng)我們已經(jīng)知道這個(gè)schema時(shí)此法很好用
Scala接口支持自動(dòng)將case class的RDD轉(zhuǎn)化成DataFrame。case class定義了表結(jié)構(gòu)挺举,case class的參數(shù)會(huì)被反射成表結(jié)構(gòu)的列名杀赢。case class可以包括一些復(fù)雜類型如Seqs或者ArrayS。這種RDD可以隱含的轉(zhuǎn)成DataFrame然后注冊(cè)成數(shù)據(jù)表湘纵,即可執(zhí)行sql語句脂崔。
private def infer_schema_use_reflection(sparkSession: SparkSession): Unit ={
//文件內(nèi)容這樣:
/**
* kason,12
* lili,25
* sime,30
*/
//通過文件創(chuàng)建Person對(duì)象的RDD,并轉(zhuǎn)成DataFrame
import sparkSession.implicits._
val peopleDataFrame: DataFrame = sparkSession.sparkContext.textFile("D:\\data\\people.txt")
.map(str => str.split(","))
.map(attrs => Person(attrs(0),attrs(1).toInt))
.toDF()
peopleDataFrame.show()
//注冊(cè)為臨時(shí)表
peopleDataFrame.createOrReplaceTempView("people")
//執(zhí)行sql查詢
val selectRe: DataFrame = sparkSession.sql("select * from people where age > 10 and age < 30")
//展示結(jié)果
selectRe.show()
selectRe.map(row_people => "Name: " + row_people(0)).show()
selectRe.map(row_people => "Age: " + row_people.getAs[Int]("age")).show()
}
- 第二種方法
通過可編程接口梧喷,此方法比較冗余砌左,但是它允許你構(gòu)造Dataset即使你不知到列以及他的類型。
此方法適用于不能提前預(yù)知case class铺敌。步驟:
1绊困,根據(jù)軟式的RDD創(chuàng)建每行的RDD
2,創(chuàng)建匹配Row RDD的結(jié)構(gòu)schem
3适刀,通過createDataFrame方法應(yīng)用Row RDDde schema
代碼如下:
private def Programmatically_specify_schema(sparkSession: SparkSession): Unit ={
//Create RDD
val peopleRDD = sparkSession.sparkContext.textFile("D:\\data\\people.txt")
//The Schema is encoded in a string
val schemaString = "name age"
//Generate the real Schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName,StringType,nullable = true))
val schema = StructType(fields)
//將原始的RDD轉(zhuǎn)化成ROW RDD
val rowRdd = peopleRDD.map(str => str.split(","))
.map(attrs => Row(attrs(0),attrs(1).trim))
//Apply schema to the ROW RDD
val peopleDataFrame = sparkSession.createDataFrame(rowRdd,schema)
//創(chuàng)建臨時(shí)表
peopleDataFrame.createOrReplaceTempView("people")
sparkSession.sql("select age from people").show()
}