Spark SQL的簡介
1. 簡介
Spark SQL是用于處理結(jié)構(gòu)化數(shù)據(jù)的模塊肛真。與Spark RDD不同的是写妥,Spark SQL提供數(shù)據(jù)的結(jié)構(gòu)信息(源數(shù)據(jù))和性能更好媚污,可以通過SQL和DataSet API與Spark SQL進(jìn)行交互断医。
2. 特點(diǎn)
2.1 Spark程序和SQL可以無縫對接
2.2 統(tǒng)一數(shù)據(jù)訪問:使用相同方式鏈接到數(shù)據(jù)
2.3 集成hive:在hive數(shù)據(jù)倉庫中可以執(zhí)行HQL泰鸡,也可以執(zhí)行SQL
2.4 標(biāo)準(zhǔn)連接:通過JDBC或者ODBC連接到數(shù)據(jù)
Spark SQL應(yīng)用場景
1.執(zhí)行SQL:從hive中讀取數(shù)據(jù)
Hive Table
- Hive擁有很多依賴债蓝,但是不再Spark中,如果能從classpath中讀取這些依賴盛龄,Spark會自動加載這些以來饰迹,如果是集群,在每個節(jié)點(diǎn)上都要有這些依賴余舶,以便訪問到Hive中的數(shù)據(jù)啊鸭。
- 需要將Hive的配置文件hive-site.xml(用來訪問源數(shù)據(jù)),拷貝到Spark的conf目錄下匿值。
- 在讀取Hive的數(shù)據(jù)時赠制,要實(shí)例化一個SparkSession對象,用戶不需要部署hive。如果沒有配置hive-site.xml,Spark會自動在spark.sql.warehouse.dir(如果沒有指定挟憔,在當(dāng)前目錄)指定的文件下創(chuàng)建metastore_db文件钟些,在Spark2.0.0之后,spark.sql.warehouse.dir取代hive.metastore.warehouse.dir指定源數(shù)據(jù)的存放位置绊谭。
- 如果使用Hive默認(rèn)的數(shù)據(jù)庫(derby),只能使用一個連接厘唾,因?yàn)閐erby是單session的。
- 需要將相應(yīng)的連接數(shù)據(jù)庫的jar包龙誊,放到Hive和Spark的目錄下,如果不這樣做的話喷楣,可以在spark配置文件中指定的驅(qū)動程序的位置趟大,在spark-defaults.conf文件鹤树,也可以使用--jars來引入驅(qū)動程序。
spark.driver.extraClassPath ../jars/mysql-connector-java-bin-5.1.27.jar # jar包的位置
spark.executor.extraClassPath ../jars/mysql-connector-java-bin-5.1.27.jar # jar包的位置
- 配置驅(qū)動程序的方式:a)直接將驅(qū)動程序放入hive和spark的目錄中逊朽;b)使用參數(shù)--jars;c)配置到spark-defaults.conf的文件中
Spark SQL執(zhí)行過程
- 從Hive MetaStore中查找指定的數(shù)據(jù)庫中是否有查找的數(shù)據(jù)表罕伯,如果有,執(zhí)行下面的操作叽讳,如果沒有返回錯誤追他。
- Hive中的數(shù)據(jù)是存儲在HDFS上,通過SparkContext來啟動Job岛蚤。
- DAGScheduler 將DAG切分成為stage(多個task邑狸,taskSet),并且提交stage
- TaskScheduler 在cluster manager啟動task涤妒,如果task失敗進(jìn)行重試
- Worker 運(yùn)行task
- WebUI 查看結(jié)果
2.DataSet
- DataSet是一個分布式數(shù)據(jù)集合单雾,從Spark1.6開始添加的,可以從JVM或者通過transform操作獲取DataSet她紫。Python不支持DataSet API硅堆,Java和Scala支持。
- DataFrame是對DataSet數(shù)據(jù)進(jìn)行組織贿讹,類似一個二維表渐逃,具有字段名稱和數(shù)據(jù)∶窆樱可以通過結(jié)構(gòu)化數(shù)據(jù)文件茄菊、Hive的數(shù)據(jù)表、外部數(shù)據(jù)源和已經(jīng)存在的RDD中獲取助赞。Python买羞、Java和Scala都支持DataFrame API,DataFrame借鑒自Python雹食。
- DataFrame=DataSet[Row],SchemaRDD(Spark1.2)==>DataFrame(Spark1.6)==>DataSet(Spark2.0.0)
Spark SQL操作
簡介
- 創(chuàng)建SparkSession對象:在創(chuàng)建的過程中會設(shè)置一些屬性
var sparkSession=SparkSession.builder()
.appName("SparkSessionApp")
.master("local[2]")
.getOrCreate()
2.獲取DataFrame
2.1外部數(shù)據(jù)源
//Spark Sql支持的文件格式為:json, parquet, jdbc, orc, libsvm, csv, text
//如果是text的讀取方式畜普,會返回一個字符串列名字為value,其他的返回dataframe。
sparkSession.read.format("json").option("path","/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load().show()
sparkSession.read.json("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show()
//如果讀取的是csv文件群叶,需要添加頭信息
sparkSession.read.format("csv").option("sep", ";").option("header",true).option("inferSchema", "true").load("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.csv")
2.2從HiveTable中獲取DataFrame
sparkSession.table("people").show()
需要將hive-site.xml文件拷貝到項(xiàng)目resources文件下吃挑,然后在pom文件中添加:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
在創(chuàng)建SparkSession對象時,需要添加參數(shù)街立,用來支持Hive:
var sparkSession=SparkSession.builder()
.appName("SparkSessionApp")
.master("local[2]")
.enableHiveSupport() //使用hive一定要開啟這個
.getOrCreate()
2.3從現(xiàn)有的RDD轉(zhuǎn)換為DataFrame
import sparkSession.implicits._
sparkSession.sparkContext.parallelize(Array(("1", "xiaoyao"), ("2", "yisheng"),( "3", "bruce"))).toDF("id","name").show()
2.4spark讀取的默認(rèn)格式為parquet
println(sparkSession.conf.get("spark.sql.sources.default")) //輸出為parquet,默認(rèn)的數(shù)據(jù)源格式為parquet舶衬。
2.5輸出schema信息
val df=sparkSession.read.format("json").option("path","/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.printSchema()
//printSchema()==>println(schema.treeString)
Spark1.5之后,Spark自己來管理內(nèi)存而不是使用JVM赎离,這樣可以避免JVM GC帶來的性能損失逛犹。內(nèi)存中的Java對象被存儲成Spark自己的二進(jìn)制格式,計算直接繁盛在二進(jìn)制格式上,省去了序列化和發(fā)序列化時間虽画。同時這種格式也更加緊湊舞蔽,節(jié)省內(nèi)存空間,而且能更好地估計數(shù)據(jù)量大小和內(nèi)存使用情況码撰。默認(rèn)情況下為開啟狀態(tài):spark.sql.tungsten.enabled=true渗柿。
3.獲取指定的數(shù)據(jù)
3.1 獲取一列數(shù)據(jù)
val df=sparkSession.read.format("json").option("path","/Users/Download/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.select("name").show()
//下面兩種方式需要將sparkSession定義為val類型,需要導(dǎo)入隱士轉(zhuǎn)換:import sparkSession.implicits._
df.select($"name").show()
df.select('name).show() //需要進(jìn)行隱士轉(zhuǎn)換脖岛,在spark-shell中可以直接操作
df.select(col("domain")).show() //需要導(dǎo)入以來朵栖,import org.apache.spark.sql.functions.col
3.2對某些列進(jìn)行操作
df.select('name,'age+1).show() //需要進(jìn)行隱士轉(zhuǎn)換,在spark-shell中可以直接操作
3.3對數(shù)據(jù)進(jìn)行過濾
df.filter("age>21").show() //age>20
df.filter('age>21).show() //age>20
df.filter($"age">21).show() //age>20
df.filter('age===19).show() //age==19
df.filter($"age"===19).show() //age==19
df.filter("age=19").show() //age==19
df.filter("age==19").show() //age==19
在上面的參數(shù)中show(),默認(rèn)展示20條數(shù)據(jù)柴梆,可以指定設(shè)置展示的個陨溅,第二個參數(shù)為是否對字符串進(jìn)行裁剪(如果字符串的長度超過20個,就需要這個參數(shù)truncate)轩性。
3.4 聚合函數(shù)
df.groupBy("domain").agg(sum("responseSize").as("rs"))
.select("domain","rs").show() //后面的agg表示執(zhí)行聚合操作
4.創(chuàng)建數(shù)據(jù)的視圖
由于數(shù)據(jù)的視圖是與SparkSession的同生命周期(SparkSession結(jié)束声登,視圖失效),可以創(chuàng)建一個全局的視圖揣苏,可以在全局使用悯嗓,然后就可以使用sql來操作數(shù)據(jù)
df.createOrReplaceGlobalTempView("people")
sparkSession.sql("SELECT * FROM global_temp.people").show() #不要同時在Spark-shell與idea同時運(yùn)行
5.與RDD進(jìn)行交互
有兩種方式將RDD轉(zhuǎn)換為DataSet:
a)使用反射來推斷出來對象的類型,這種方式可以減少代碼量卸察,如果知道數(shù)據(jù)類型之后脯厨,可以提高性能;
def inferReflection(sparkSession: SparkSession): Unit ={
//get the origin data
val info=sparkSession.sparkContext.textFile("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
//split the data and convert to an object
import sparkSession.implicits._
//簡單寫法
//val df=info.map(_.split(",")).map(x=>People(x(0),x(1).toLong)).toDF()
//復(fù)雜寫法,在使用這種方式時坑质,需要表明每一列的名稱
val df=info.map(x=>{
val temp=x.split(",")
(temp(0),temp(1).toLong)
}).toDF("name","age")
//Before Spark2.0,we can operate directly.Since Spark2.0,we need to convert the dataframe to rdd.
//df.map(x=>x(0)).collect().foreach(println) //before spark2.0
//df.rdd.map(x=>x(0)).collect().foreach(println) //since spark2.0
//df.rdd.map(x=>x.get(0)).collect().foreach(println)
//df.rdd.map(x=>x.getAs[String](0)).collect().foreach(println)
//df.rdd.map(x=>x.getString(0)).collect().foreach(println)
//df.rdd.map(x=>x.getAs[String]("name")).collect().foreach(println)
//df.select(col("name")).show()
/**
* Compute the average for all numeric columns grouped by department.
* ds.groupBy("department").avg()
* // Compute the max age and average salary, grouped by department and gender.
* ds.groupBy($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
*/
df.groupBy("name").agg(sum("age").as("age"))
.select("name","age").show()
}
//scala2.10之前合武,字段個數(shù)有限制:最大值為22個,在scala2.10之后,就沒有這個限制涡扼,如果超過這個限制稼跳,修改設(shè)計或者使用nested case classes。
case class People(name:String,age:Long)
b)通過編程方式吃沪,構(gòu)建schema結(jié)構(gòu)汤善,然后作用與現(xiàn)有的RDD,這種方式在運(yùn)行時才會知道字段的類型票彪。
def programmatically(sparkSession: SparkSession): Unit ={
/**
* 1.Create an RDD of Rows from the original RDD;
* 2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
* 3.Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
*/
//1.Create an RDD of Rows from the original RDD;
val info=sparkSession.sparkContext.textFile("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
val rdd=info.map(_.split(",")).map(x=>Row(x (0),x(1).toLong))
//2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
val struct= StructType(
StructField("name", StringType, false) ::
StructField("age", LongType, false) :: Nil)
//第二中寫法
/*val struct= StructType(
Array(StructField("name", StringType, false),
StructField("age", LongType, false)))*/
//Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
val df=sparkSession.createDataFrame(rdd,struct)
//df.show()
df.groupBy("name").agg(sum("age").as("age"))
.select("name","age").show()
}
c)聚合函數(shù)
df.groupBy("name").agg(Map("age"->"max")).show()
d)自定義函數(shù)
自定義函數(shù)的步驟:1.defined the udf红淡;2.register the udf;3.use the udf
1.defined the udf
def getStringLength(name: String)={
name.length()
}
2.register the udf
sparkSession.udf.register("strlen",(name:String)=>{
getStringLength(name)
})
3.use the udf
//Registers this Dataset as a temporary table using the given name,
// registerTempTable is deprecated since 2.0.0,we can use createOrReplaceTempView instead.
df.registerTempTable("people")
df.createOrReplaceTempView("people")
sparkSession.sql("select name,strlen(name) from people").show()
e)增加字段
//使用sql的方式
sparkSession.udf.register("strlen",(name:String,age:Long)=>{
getStringLength(name,age)
})
df.registerTempTable("people")
df.createOrReplaceTempView("people")
sparkSession.sql("select name,strlen(name) as length from people").show()
//udf第二種方式
val code=(args0:String)=>{
(args0.length())
}
val addCol=udf(code)
df.withColumn("other",addCol(df("name"))).show(false)
//udf 第三種寫法
sparkSession.udf.register("strlen",(name:String,age:Long)=>{
getStringLength(name,age)
})
import sparkSession.implicits._
df.withColumn("length",callUDF("strlen",$"name",$"age")).show()
f)創(chuàng)建DataSet
DataSet使用具體的編碼器而不是用java和kryo的序列化方式降铸。
編碼器和序列化都是將對象變成字節(jié)在旱,編碼器是動態(tài)產(chǎn)生并且可以執(zhí)行過濾、排序和hash等操作而不用反序列化為對象推掸。
import sparkSession.implicits._
//讀取數(shù)據(jù)桶蝎,直接對應(yīng)到類
//val peolpleDs = sparkSession.read.json("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").as[People]
//peolpleDs.show()
//將數(shù)據(jù)直接序列化
val ds = Seq(People("Andy", 32)).toDS()
ds.show()
6.將數(shù)據(jù)寫入到文件中
//在寫文件時驻仅,只能指定文件的路徑,不能指定文件的名稱
val df=sparkSession.read.format("json").option("path","/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.select("name").write.format("json").mode(SaveMode.Overwrite).save("/Users/Downloads/test/")
//也可以指定不同的格式
df.select("name").write.format("text").mode(SaveMode.Overwrite).save("/Users/Downloads/test/")
//寫入orc文件:
df.write.format("orc").option("orc.bloom.filter.columns", "favorite_color").option("orc.dictionary.key.threshold", "1.0").save("users_with_options.orc")
//保存為HiveTable,也可以指定路徑
val df=sparkSession.read.format("json").option("path","/Users//Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.select("name").write.saveAsTable("people")
//sparkSession.sql("select * from people").show()
sparkSession.table("people").show()
6.1 SaveMode
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists(默認(rèn)) | "error" or "errorifexists" | 如果文件存在俊嗽,就報錯 |
SaveMode.Append | "append" | 如果文件存在雾家,直接追加,如果不存在绍豁,創(chuàng)建 |
SaveMode.Overwrite | "overwrite" | 如果存在直接重寫,如果不存在牙捉,創(chuàng)建 |
SaveMode.Ignore | "ignore" | 如果存在竹揍,不寫 |
6.2 Saving to Persistent Tables
DatFrame使用saveAsTable將數(shù)據(jù)存儲到HiveMetaStore中(不需要安裝Hive)。如果Spark應(yīng)用程序重啟之后可以連接到之前HiveMetaStore邪铲,數(shù)據(jù)依然存在芬位。
使用df.write.option("path", "/some/path").saveAsTable("t"),如果自定義路徑之后带到,刪除表之后昧碉,路徑和數(shù)據(jù)依然存在。如果沒有自定路徑揽惹,刪除表之后被饿,表和數(shù)據(jù)都沒有拉。
從Spark2.1之后搪搏,可以存儲分區(qū)表狭握,有以下好處:
- 只返回查詢的分區(qū),不用掃描所有的分區(qū)
- 在DataSource 中疯溺,可以直接使用Hive DDLs
在默認(rèn)情況论颅,創(chuàng)建外部表的時候,不會存儲分區(qū)信息囱嫩,需要使用命令MSCK REPAIR TABLE來將這些信息加上恃疯。
6.3 Bucketing, Sorting and Partitioning
Bucketing和分區(qū)只能在持久化表的時候,才能使用
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在使用DataSet API時墨闲,可以使用save和saveAsTable今妄。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
在操作單表的時候,可以使用partitioning和bucketing
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
7.直接運(yùn)行SQL
sparkSession.sql("select * from parquet.`/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet`").show()
數(shù)據(jù)源操作
1.parquet
1.1讀取數(shù)據(jù)保存為parquet格式
val df=sparkSession.read.json("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
df.write.parquet("people.parquet")
val peopleDF=sparkSession.read.parquet("people.parquet")
peopleDF.createOrReplaceTempView("parquetFile")
val namesDF=sparkSession.sql("select name from parquetFile where age between 13 and 19")
namesDF.show()
1.2發(fā)現(xiàn)分區(qū)
內(nèi)置文件數(shù)據(jù)源Text/CSV/JSON/ORC/Parquet可以自動發(fā)現(xiàn)分區(qū)和獲取分區(qū)信息损俭。
SparkSql可以從路徑中自動發(fā)現(xiàn)分區(qū)信息和分區(qū)字段的類型蛙奖,目前支持類型為:numeric data types、date杆兵、timestamp雁仲、string。
如果不需要字段的類型琐脏,設(shè)置參數(shù)spark.sql.sources.partitionColumnTypeInference.enabled=false攒砖。
Spark1.6開始支持發(fā)現(xiàn)分區(qū)缸兔,只查找指定路徑下的分區(qū)。如果需要更深層次的分區(qū)吹艇,需要指定basePath惰蜜。
1.3Schema Merging
從SparkSQL1.5開始該功能默認(rèn)關(guān)閉,因?yàn)榉浅:膬?nèi)存受神。如果需要的話抛猖,可以開啟,設(shè)置spark.sql.parquet.mergeSchema=true鼻听。
Parquet數(shù)據(jù)可以自動對數(shù)據(jù)的schema信息進(jìn)行合并财著。
1.4Hive metastore Parquet table conversion
Hive與Parquet在處理表schema信息的區(qū)別:
a)Hive不區(qū)分大小寫,Parquet區(qū)分大小寫;
b)Hive需要考慮列是否為空,Parquet不需要考慮凌停;
在將Hive的Parquet數(shù)據(jù)表轉(zhuǎn)換為SparkSQL的Parquet表時,需要將Hive中的源數(shù)據(jù)與Parquet的源數(shù)據(jù)進(jìn)行轉(zhuǎn)換伟姐。
轉(zhuǎn)換規(guī)則為:僅出現(xiàn)在Parquet中出現(xiàn)的源數(shù)據(jù)需要考慮,僅出現(xiàn)在Hive源數(shù)據(jù)中亿卤,可以添加為null值列愤兵。
1.5刷新MetaData
為了提高性能,需要對Parquet數(shù)據(jù)進(jìn)行緩存怠噪,Hive metastore Parquet table conversion時恐似,會造成一些源數(shù)據(jù)發(fā)生變化,需要刷新代碼中緩存的源數(shù)據(jù)傍念,使用
sparkSession.catalog.refreshTable("my_table")
1.6配置
常用的配置有:
參數(shù) | 解釋 |
---|---|
spark.sql.parquet.binaryAsString | 為了兼容之前的版本和其他的源數(shù)據(jù)矫夷,將二進(jìn)制變成字符串,默認(rèn)為不開啟 |
spark.sql.parquet.int96AsTimestamp | 將其他系統(tǒng)中的INT96轉(zhuǎn)換為SparkSQL的timestamp類型憋槐,默認(rèn)為轉(zhuǎn)換 |
spark.sql.parquet.compression.codec | 對Parquet數(shù)據(jù)進(jìn)行壓縮 |
spark.sql.parquet.filterPushdown | 用于過濾優(yōu)化 |
1.7 Parquet文件的優(yōu)點(diǎn)
- 高效双藕,Parquet采取列示存儲避免讀入不需要的數(shù)據(jù),具有極好的性能和GC阳仔。
- 方便的壓縮和解壓縮忧陪,并具有極好的壓縮比例。
- 可以直接固化為parquet文件近范,可以直接讀取parquet文件嘶摊,具有比磁盤更好的緩存效果。
2.ORC文件
從Spark2.3支持矢量化讀取ORC文件评矩,需要開啟一下配置:
參數(shù) | 解釋 |
---|---|
spark.sql.orc.impl | native和Hive二選一叶堆,native是基于ORC1.4,Hive是基于Hive的ORC1.2.1 |
spark.sql.orc.enableVectorizedReader | 默認(rèn)為true,在本地可以矢量化讀取斥杜,否則虱颗,不可以沥匈,如果讀取hive數(shù)據(jù),忽略這個配置 |
3.JSON
在讀取JSON文件時忘渔,分單行和多行讀取高帖,如果使用多行讀取,需要將multiline配置設(shè)置為true畦粮。
4.HiveTable
在從Hive總讀取數(shù)據(jù)時散址,需要指定spark.sql.warehouse.dir的地址。
可以創(chuàng)建Hive表宣赔、加載數(shù)據(jù)爪飘、分區(qū)
//指定warehouse的路徑,需要導(dǎo)入import java.io.File
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val sparkSession = SparkSession
.builder()
.master("local[2]")
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport() //連接到Hive必須要有的配置
.getOrCreate()
import sparkSession.implicits._
import sparkSession.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") //創(chuàng)建數(shù)據(jù)表
sql("LOAD DATA LOCAL INPATH '/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/kv1.txt' INTO TABLE src")
//查詢數(shù)據(jù)
sql("SELECT * FROM src").show()
//聚合操作
sql("SELECT COUNT(*) FROM src").show()
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
//遍歷每一行數(shù)據(jù)
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
//創(chuàng)建表并設(shè)置存儲格式
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
val df = sparkSession.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
//指定路徑
val dataDir = "/tmp/parquet_data"
sparkSession.range(10).write.parquet(dataDir)
//指定外部表
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
//開啟動態(tài)分區(qū)
sparkSession.sqlContext.setConf("hive.exec.dynamic.partition", "true")
sparkSession.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
//創(chuàng)建分區(qū)表
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
sparkSession.sql("SELECT * FROM hive_part_tbl").show()
sparkSession.stop()
case class Record(key: Int, value: String)
4.1設(shè)置文件的存儲格式
默認(rèn)情況下拉背,讀取Hive Table都是以文本文件讀取。Hive storage handler不支持創(chuàng)建表(spark sql中)默终,在hive中創(chuàng)建表椅棺,使用spark sql讀取。
參數(shù) | 解釋 |
---|---|
fileFormat | 存儲文件格式:sequencefile齐蔽、rcfile两疚、orc、parquet含滴、textfile诱渤、avro。 |
inputFormat, outputFormat | 設(shè)置輸入格式和輸出格式 |
serde | 進(jìn)行序列化的方式 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim | 指定分隔符 |
4.2與Hive MetaStore進(jìn)行交互
從SparkSQL1.4可以訪問各種Hive MetaStore谈况。
參數(shù) | 解釋 |
---|---|
spark.sql.hive.metastore.version | 默認(rèn)Hive版本為1.2.1,支持0.12.0到2.3.3 |
spark.sql.hive.metastore.jars | 實(shí)例化HiveMetastoreClient的三種方式:內(nèi)置(必須要定義spark.sql.hive.metastore.version)勺美、maven(不推薦使用)、classpath(必須要包含所有以來) |
spark.sql.hive.metastore.sharedPrefixes | 連接數(shù)據(jù)庫驅(qū)動程序 |
spark.sql.hive.metastore.barrierPrefixes | 連接不同版本的Hive |
5.JDBC
參數(shù) | 解釋 |
---|---|
url | 連接字符串 |
dbtable | 連接的表 |
query | 查詢語句碑韵,dbtable不能與query同時使用赡茸,query不能與partitionColumn同時使用 |
driver | 連接驅(qū)動類 |
numPartitions | 同時讀取分區(qū)的數(shù)量,同時最大連接數(shù)祝闻,如果數(shù)量超過限制占卧,會對數(shù)據(jù)進(jìn)行重新分配 |
partitionColumn, lowerBound, upperBound | 查詢條件限制,按照那個字段進(jìn)行分區(qū)联喘,掃描數(shù)據(jù)的范圍 |
fetchsize | 返回數(shù)據(jù)的大小 |
batchsize | 一次獲取數(shù)據(jù)的大小 |
createTableOptions | 創(chuàng)建表時指定分區(qū)等一系列參數(shù) |
pushDownPredicate | 是否使用謂詞下壓华蜒,默認(rèn)為true |
設(shè)置連接相關(guān)參數(shù)的方式:1.使用option來指定;2.通過Properties來指定豁遭。
val sparkSession=SparkSession
.builder()
.master("local[2]")
.appName("Spark Jdbc Example")
.getOrCreate()
//讀取數(shù)據(jù)
val jdbcDF = sparkSession.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/xiaoyao")
.option("dbtable", "select * from xiaoyao.TBLS")
.option("user", "root")
.option("password", "123456")
.option("driver","com.mysql.jdbc.Driver")
.load()
jdbcDF.show()
val connectProperties=new Properties()
connectProperties.put("user","root")
connectProperties.put("password","123456")
connectProperties.put("driver","com.mysql.jdbc.Driver")
val jdbcDF2=sparkSession.read.jdbc("jdbc:mysql://localhost:3306/xiaoyao","people",connectProperties)
jdbcDF2.show()
// 寫入數(shù)據(jù)
val schema= StructType(StructField("namge", StringType, false) :: StructField("age", LongType, false) :: Nil)
val info=sparkSession.sparkContext.textFile("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
val rdd=info.map(_.split(",")).map(x=>Row(x (0),x(1).toLong))
val peopleDF=sparkSession.createDataFrame(rdd,schema)
peopleDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/xiaoyao","people",connectProperties)
jdbcDF.show()
6.Troubleshooting
- 原始class loader必須要對JDBC Driver可見叭喜,因?yàn)镴ava有安全檢查;修改classpath.sh堤框,使得classpath.sh包含driver的jar域滥。
- 一些數(shù)據(jù)庫會將名字變成大寫纵柿。在Spark SQL需要將名字變成大寫。
- 用戶可以通過指定具體的JDBC來連接到相應(yīng)的數(shù)據(jù)庫启绰。
自定義外部數(shù)據(jù)源
名詞解釋
1.BaseRelation
Schema是一個KV的數(shù)據(jù)組合昂儒,繼承BaseRelation的類,必須要設(shè)置自己的Schema(StructType形式)委可,實(shí)現(xiàn)Scan類的相關(guān)方法渊跋。主要功能是定義Schema信息。
2.TableScan
將元祖數(shù)據(jù)變成Rows對象中的數(shù)據(jù)着倾。
3.PrunedScan
將不需要的列過濾掉拾酝。
4.PrunedFilteredScan
先進(jìn)行數(shù)據(jù)過濾然后將不需要的列過濾掉。
5.InsertableRelation
將數(shù)據(jù)寫回去卡者。
有三種假設(shè):a)插入字段與原先的字段一致蒿囤;b)Schema細(xì)膩不能變化;c)插入的數(shù)據(jù)可以為空崇决。
6.RelationProvider
指定數(shù)據(jù)源材诽,如果沒有找到指定的數(shù)據(jù)源,就找DefaultSource附加到路徑中恒傻,每次調(diào)用DDL都會創(chuàng)建實(shí)例對象脸侥。
7.DataSourceRegister
注冊自定義的外部數(shù)據(jù)源
關(guān)于自定義數(shù)據(jù)源請參考后續(xù)文章。