Spark SQL 相關(guān)

Spark SQL的簡介

1. 簡介

Spark SQL是用于處理結(jié)構(gòu)化數(shù)據(jù)的模塊肛真。與Spark RDD不同的是写妥,Spark SQL提供數(shù)據(jù)的結(jié)構(gòu)信息(源數(shù)據(jù))和性能更好媚污,可以通過SQL和DataSet API與Spark SQL進(jìn)行交互断医。

2. 特點(diǎn)

2.1 Spark程序和SQL可以無縫對接

2.2 統(tǒng)一數(shù)據(jù)訪問:使用相同方式鏈接到數(shù)據(jù)

2.3 集成hive:在hive數(shù)據(jù)倉庫中可以執(zhí)行HQL泰鸡,也可以執(zhí)行SQL

2.4 標(biāo)準(zhǔn)連接:通過JDBC或者ODBC連接到數(shù)據(jù)

Spark SQL應(yīng)用場景

1.執(zhí)行SQL:從hive中讀取數(shù)據(jù)

Hive Table

  • Hive擁有很多依賴债蓝,但是不再Spark中,如果能從classpath中讀取這些依賴盛龄,Spark會自動加載這些以來饰迹,如果是集群,在每個節(jié)點(diǎn)上都要有這些依賴余舶,以便訪問到Hive中的數(shù)據(jù)啊鸭。
  • 需要將Hive的配置文件hive-site.xml(用來訪問源數(shù)據(jù)),拷貝到Spark的conf目錄下匿值。
  • 在讀取Hive的數(shù)據(jù)時赠制,要實(shí)例化一個SparkSession對象,用戶不需要部署hive。如果沒有配置hive-site.xml,Spark會自動在spark.sql.warehouse.dir(如果沒有指定挟憔,在當(dāng)前目錄)指定的文件下創(chuàng)建metastore_db文件钟些,在Spark2.0.0之后,spark.sql.warehouse.dir取代hive.metastore.warehouse.dir指定源數(shù)據(jù)的存放位置绊谭。
  • 如果使用Hive默認(rèn)的數(shù)據(jù)庫(derby),只能使用一個連接厘唾,因?yàn)閐erby是單session的。
  • 需要將相應(yīng)的連接數(shù)據(jù)庫的jar包龙誊,放到Hive和Spark的目錄下,如果不這樣做的話喷楣,可以在spark配置文件中指定的驅(qū)動程序的位置趟大,在spark-defaults.conf文件鹤树,也可以使用--jars來引入驅(qū)動程序。
spark.driver.extraClassPath  ../jars/mysql-connector-java-bin-5.1.27.jar # jar包的位置
spark.executor.extraClassPath  ../jars/mysql-connector-java-bin-5.1.27.jar # jar包的位置
  • 配置驅(qū)動程序的方式:a)直接將驅(qū)動程序放入hive和spark的目錄中逊朽;b)使用參數(shù)--jars;c)配置到spark-defaults.conf的文件中

Spark SQL執(zhí)行過程

  • 從Hive MetaStore中查找指定的數(shù)據(jù)庫中是否有查找的數(shù)據(jù)表罕伯,如果有,執(zhí)行下面的操作叽讳,如果沒有返回錯誤追他。
  • Hive中的數(shù)據(jù)是存儲在HDFS上,通過SparkContext來啟動Job岛蚤。
  • DAGScheduler 將DAG切分成為stage(多個task邑狸,taskSet),并且提交stage
  • TaskScheduler 在cluster manager啟動task涤妒,如果task失敗進(jìn)行重試
  • Worker 運(yùn)行task
  • WebUI 查看結(jié)果

2.DataSet

  • DataSet是一個分布式數(shù)據(jù)集合单雾,從Spark1.6開始添加的,可以從JVM或者通過transform操作獲取DataSet她紫。Python不支持DataSet API硅堆,Java和Scala支持。
  • DataFrame是對DataSet數(shù)據(jù)進(jìn)行組織贿讹,類似一個二維表渐逃,具有字段名稱和數(shù)據(jù)∶窆樱可以通過結(jié)構(gòu)化數(shù)據(jù)文件茄菊、Hive的數(shù)據(jù)表、外部數(shù)據(jù)源和已經(jīng)存在的RDD中獲取助赞。Python买羞、Java和Scala都支持DataFrame API,DataFrame借鑒自Python雹食。
  • DataFrame=DataSet[Row],SchemaRDD(Spark1.2)==>DataFrame(Spark1.6)==>DataSet(Spark2.0.0)

Spark SQL操作

簡介

  1. 創(chuàng)建SparkSession對象:在創(chuàng)建的過程中會設(shè)置一些屬性
var sparkSession=SparkSession.builder()
      .appName("SparkSessionApp")
      .master("local[2]")
      .getOrCreate()

2.獲取DataFrame

2.1外部數(shù)據(jù)源

//Spark Sql支持的文件格式為:json, parquet, jdbc, orc, libsvm, csv, text
//如果是text的讀取方式畜普,會返回一個字符串列名字為value,其他的返回dataframe。
sparkSession.read.format("json").option("path","/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load().show()
sparkSession.read.json("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show()
//如果讀取的是csv文件群叶,需要添加頭信息
sparkSession.read.format("csv").option("sep", ";").option("header",true).option("inferSchema", "true").load("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.csv")

2.2從HiveTable中獲取DataFrame

sparkSession.table("people").show()

需要將hive-site.xml文件拷貝到項(xiàng)目resources文件下吃挑,然后在pom文件中添加:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

在創(chuàng)建SparkSession對象時,需要添加參數(shù)街立,用來支持Hive:

var sparkSession=SparkSession.builder()
      .appName("SparkSessionApp")
      .master("local[2]")
      .enableHiveSupport() //使用hive一定要開啟這個
      .getOrCreate()

2.3從現(xiàn)有的RDD轉(zhuǎn)換為DataFrame

import sparkSession.implicits._
sparkSession.sparkContext.parallelize(Array(("1", "xiaoyao"), ("2", "yisheng"),( "3", "bruce"))).toDF("id","name").show()

2.4spark讀取的默認(rèn)格式為parquet

println(sparkSession.conf.get("spark.sql.sources.default")) //輸出為parquet,默認(rèn)的數(shù)據(jù)源格式為parquet舶衬。

2.5輸出schema信息

val df=sparkSession.read.format("json").option("path","/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.printSchema()
//printSchema()==>println(schema.treeString)

Spark1.5之后,Spark自己來管理內(nèi)存而不是使用JVM赎离,這樣可以避免JVM GC帶來的性能損失逛犹。內(nèi)存中的Java對象被存儲成Spark自己的二進(jìn)制格式,計算直接繁盛在二進(jìn)制格式上,省去了序列化和發(fā)序列化時間虽画。同時這種格式也更加緊湊舞蔽,節(jié)省內(nèi)存空間,而且能更好地估計數(shù)據(jù)量大小和內(nèi)存使用情況码撰。默認(rèn)情況下為開啟狀態(tài):spark.sql.tungsten.enabled=true渗柿。

3.獲取指定的數(shù)據(jù)

3.1 獲取一列數(shù)據(jù)

val df=sparkSession.read.format("json").option("path","/Users/Download/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.select("name").show()
//下面兩種方式需要將sparkSession定義為val類型,需要導(dǎo)入隱士轉(zhuǎn)換:import sparkSession.implicits._
df.select($"name").show() 
df.select('name).show() //需要進(jìn)行隱士轉(zhuǎn)換脖岛,在spark-shell中可以直接操作
df.select(col("domain")).show() //需要導(dǎo)入以來朵栖,import org.apache.spark.sql.functions.col

3.2對某些列進(jìn)行操作

df.select('name,'age+1).show() //需要進(jìn)行隱士轉(zhuǎn)換,在spark-shell中可以直接操作

3.3對數(shù)據(jù)進(jìn)行過濾

df.filter("age>21").show() //age>20
df.filter('age>21).show()  //age>20
df.filter($"age">21).show() //age>20
df.filter('age===19).show() //age==19
df.filter($"age"===19).show() //age==19
df.filter("age=19").show() //age==19
df.filter("age==19").show() //age==19

在上面的參數(shù)中show(),默認(rèn)展示20條數(shù)據(jù)柴梆,可以指定設(shè)置展示的個陨溅,第二個參數(shù)為是否對字符串進(jìn)行裁剪(如果字符串的長度超過20個,就需要這個參數(shù)truncate)轩性。
3.4 聚合函數(shù)

df.groupBy("domain").agg(sum("responseSize").as("rs"))
      .select("domain","rs").show() //后面的agg表示執(zhí)行聚合操作

4.創(chuàng)建數(shù)據(jù)的視圖
由于數(shù)據(jù)的視圖是與SparkSession的同生命周期(SparkSession結(jié)束声登,視圖失效),可以創(chuàng)建一個全局的視圖揣苏,可以在全局使用悯嗓,然后就可以使用sql來操作數(shù)據(jù)

df.createOrReplaceGlobalTempView("people")
    sparkSession.sql("SELECT * FROM global_temp.people").show() #不要同時在Spark-shell與idea同時運(yùn)行

5.與RDD進(jìn)行交互
有兩種方式將RDD轉(zhuǎn)換為DataSet:

a)使用反射來推斷出來對象的類型,這種方式可以減少代碼量卸察,如果知道數(shù)據(jù)類型之后脯厨,可以提高性能;

def inferReflection(sparkSession: SparkSession): Unit ={
    //get the origin data
    val info=sparkSession.sparkContext.textFile("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
    //split the data and convert to an object
    import sparkSession.implicits._
    //簡單寫法
    //val df=info.map(_.split(",")).map(x=>People(x(0),x(1).toLong)).toDF()
    //復(fù)雜寫法,在使用這種方式時坑质,需要表明每一列的名稱
    val df=info.map(x=>{
      val temp=x.split(",")
      (temp(0),temp(1).toLong)
    }).toDF("name","age")

    //Before Spark2.0,we can operate directly.Since Spark2.0,we need to convert the dataframe to rdd.
    //df.map(x=>x(0)).collect().foreach(println) //before spark2.0
    //df.rdd.map(x=>x(0)).collect().foreach(println) //since spark2.0
    //df.rdd.map(x=>x.get(0)).collect().foreach(println)
    //df.rdd.map(x=>x.getAs[String](0)).collect().foreach(println)
    //df.rdd.map(x=>x.getString(0)).collect().foreach(println)
    //df.rdd.map(x=>x.getAs[String]("name")).collect().foreach(println)
    //df.select(col("name")).show()
    /**
      * Compute the average for all numeric columns grouped by department.
      * ds.groupBy("department").avg()
      * // Compute the max age and average salary, grouped by department and gender.
      * ds.groupBy($"department", $"gender").agg(Map(
      * "salary" -> "avg",
      * "age" -> "max"
      * ))
      */
    df.groupBy("name").agg(sum("age").as("age"))
     .select("name","age").show()
  }
  //scala2.10之前合武,字段個數(shù)有限制:最大值為22個,在scala2.10之后,就沒有這個限制涡扼,如果超過這個限制稼跳,修改設(shè)計或者使用nested case classes。
case class People(name:String,age:Long) 

b)通過編程方式吃沪,構(gòu)建schema結(jié)構(gòu)汤善,然后作用與現(xiàn)有的RDD,這種方式在運(yùn)行時才會知道字段的類型票彪。

def programmatically(sparkSession: SparkSession): Unit ={
    /**
      * 1.Create an RDD of Rows from the original RDD;
      * 2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
      * 3.Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
      */
    //1.Create an RDD of Rows from the original RDD;
    val info=sparkSession.sparkContext.textFile("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
    val rdd=info.map(_.split(",")).map(x=>Row(x (0),x(1).toLong))
    //2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
    val struct= StructType(
      StructField("name", StringType, false) ::
        StructField("age", LongType, false) :: Nil)
    //第二中寫法
    /*val struct= StructType(
                Array(StructField("name", StringType, false),
                  StructField("age", LongType, false)))*/
    //Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
    val df=sparkSession.createDataFrame(rdd,struct)
    //df.show()
    df.groupBy("name").agg(sum("age").as("age"))
      .select("name","age").show()
  }

c)聚合函數(shù)

df.groupBy("name").agg(Map("age"->"max")).show()

d)自定義函數(shù)

自定義函數(shù)的步驟:1.defined the udf红淡;2.register the udf;3.use the udf
1.defined the udf
def getStringLength(name: String)={
    name.length()
}
2.register the udf
sparkSession.udf.register("strlen",(name:String)=>{
      getStringLength(name)
})
3.use the udf
//Registers this Dataset as a temporary table using the given name,
// registerTempTable is deprecated since 2.0.0,we can use createOrReplaceTempView instead.
df.registerTempTable("people")
df.createOrReplaceTempView("people")
sparkSession.sql("select name,strlen(name) from people").show()

e)增加字段

//使用sql的方式
sparkSession.udf.register("strlen",(name:String,age:Long)=>{
  getStringLength(name,age)
})
df.registerTempTable("people")
df.createOrReplaceTempView("people")
sparkSession.sql("select name,strlen(name) as length from people").show()
//udf第二種方式
val code=(args0:String)=>{
  (args0.length())
}
val addCol=udf(code)
df.withColumn("other",addCol(df("name"))).show(false)
//udf 第三種寫法
sparkSession.udf.register("strlen",(name:String,age:Long)=>{
  getStringLength(name,age)
})
import sparkSession.implicits._
df.withColumn("length",callUDF("strlen",$"name",$"age")).show()

f)創(chuàng)建DataSet

DataSet使用具體的編碼器而不是用java和kryo的序列化方式降铸。
編碼器和序列化都是將對象變成字節(jié)在旱,編碼器是動態(tài)產(chǎn)生并且可以執(zhí)行過濾、排序和hash等操作而不用反序列化為對象推掸。
import sparkSession.implicits._
//讀取數(shù)據(jù)桶蝎,直接對應(yīng)到類
//val peolpleDs = sparkSession.read.json("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").as[People]
//peolpleDs.show()
//將數(shù)據(jù)直接序列化
val ds = Seq(People("Andy", 32)).toDS()
ds.show()

6.將數(shù)據(jù)寫入到文件中

//在寫文件時驻仅,只能指定文件的路徑,不能指定文件的名稱
val df=sparkSession.read.format("json").option("path","/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.select("name").write.format("json").mode(SaveMode.Overwrite).save("/Users/Downloads/test/")
//也可以指定不同的格式    
df.select("name").write.format("text").mode(SaveMode.Overwrite).save("/Users/Downloads/test/")
//寫入orc文件:
df.write.format("orc").option("orc.bloom.filter.columns", "favorite_color").option("orc.dictionary.key.threshold", "1.0").save("users_with_options.orc")
//保存為HiveTable,也可以指定路徑
val df=sparkSession.read.format("json").option("path","/Users//Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").load()
df.select("name").write.saveAsTable("people")
//sparkSession.sql("select * from people").show()
sparkSession.table("people").show()

6.1 SaveMode

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(默認(rèn)) "error" or "errorifexists" 如果文件存在俊嗽,就報錯
SaveMode.Append "append" 如果文件存在雾家,直接追加,如果不存在绍豁,創(chuàng)建
SaveMode.Overwrite "overwrite" 如果存在直接重寫,如果不存在牙捉,創(chuàng)建
SaveMode.Ignore "ignore" 如果存在竹揍,不寫

6.2 Saving to Persistent Tables

DatFrame使用saveAsTable將數(shù)據(jù)存儲到HiveMetaStore中(不需要安裝Hive)。如果Spark應(yīng)用程序重啟之后可以連接到之前HiveMetaStore邪铲,數(shù)據(jù)依然存在芬位。

使用df.write.option("path", "/some/path").saveAsTable("t"),如果自定義路徑之后带到,刪除表之后昧碉,路徑和數(shù)據(jù)依然存在。如果沒有自定路徑揽惹,刪除表之后被饿,表和數(shù)據(jù)都沒有拉。

從Spark2.1之后搪搏,可以存儲分區(qū)表狭握,有以下好處:

  • 只返回查詢的分區(qū),不用掃描所有的分區(qū)
  • 在DataSource 中疯溺,可以直接使用Hive DDLs
    在默認(rèn)情況论颅,創(chuàng)建外部表的時候,不會存儲分區(qū)信息囱嫩,需要使用命令MSCK REPAIR TABLE來將這些信息加上恃疯。

6.3 Bucketing, Sorting and Partitioning

Bucketing和分區(qū)只能在持久化表的時候,才能使用

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

在使用DataSet API時墨闲,可以使用save和saveAsTable今妄。

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

在操作單表的時候,可以使用partitioning和bucketing

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

7.直接運(yùn)行SQL

sparkSession.sql("select * from parquet.`/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet`").show()

數(shù)據(jù)源操作

1.parquet
1.1讀取數(shù)據(jù)保存為parquet格式

val df=sparkSession.read.json("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
    df.write.parquet("people.parquet")
    val peopleDF=sparkSession.read.parquet("people.parquet")
    peopleDF.createOrReplaceTempView("parquetFile")
    val namesDF=sparkSession.sql("select name from parquetFile where age between 13 and 19")
    namesDF.show()

1.2發(fā)現(xiàn)分區(qū)

內(nèi)置文件數(shù)據(jù)源Text/CSV/JSON/ORC/Parquet可以自動發(fā)現(xiàn)分區(qū)和獲取分區(qū)信息损俭。
SparkSql可以從路徑中自動發(fā)現(xiàn)分區(qū)信息和分區(qū)字段的類型蛙奖,目前支持類型為:numeric data types、date杆兵、timestamp雁仲、string。
如果不需要字段的類型琐脏,設(shè)置參數(shù)spark.sql.sources.partitionColumnTypeInference.enabled=false攒砖。
Spark1.6開始支持發(fā)現(xiàn)分區(qū)缸兔,只查找指定路徑下的分區(qū)。如果需要更深層次的分區(qū)吹艇,需要指定basePath惰蜜。

1.3Schema Merging

從SparkSQL1.5開始該功能默認(rèn)關(guān)閉,因?yàn)榉浅:膬?nèi)存受神。如果需要的話抛猖,可以開啟,設(shè)置spark.sql.parquet.mergeSchema=true鼻听。
Parquet數(shù)據(jù)可以自動對數(shù)據(jù)的schema信息進(jìn)行合并财著。

1.4Hive metastore Parquet table conversion

Hive與Parquet在處理表schema信息的區(qū)別:
a)Hive不區(qū)分大小寫,Parquet區(qū)分大小寫;
b)Hive需要考慮列是否為空,Parquet不需要考慮凌停;
在將Hive的Parquet數(shù)據(jù)表轉(zhuǎn)換為SparkSQL的Parquet表時,需要將Hive中的源數(shù)據(jù)與Parquet的源數(shù)據(jù)進(jìn)行轉(zhuǎn)換伟姐。
轉(zhuǎn)換規(guī)則為:僅出現(xiàn)在Parquet中出現(xiàn)的源數(shù)據(jù)需要考慮,僅出現(xiàn)在Hive源數(shù)據(jù)中亿卤,可以添加為null值列愤兵。

1.5刷新MetaData
為了提高性能,需要對Parquet數(shù)據(jù)進(jìn)行緩存怠噪,Hive metastore Parquet table conversion時恐似,會造成一些源數(shù)據(jù)發(fā)生變化,需要刷新代碼中緩存的源數(shù)據(jù)傍念,使用

sparkSession.catalog.refreshTable("my_table")

1.6配置

常用的配置有:

參數(shù) 解釋
spark.sql.parquet.binaryAsString 為了兼容之前的版本和其他的源數(shù)據(jù)矫夷,將二進(jìn)制變成字符串,默認(rèn)為不開啟
spark.sql.parquet.int96AsTimestamp 將其他系統(tǒng)中的INT96轉(zhuǎn)換為SparkSQL的timestamp類型憋槐,默認(rèn)為轉(zhuǎn)換
spark.sql.parquet.compression.codec 對Parquet數(shù)據(jù)進(jìn)行壓縮
spark.sql.parquet.filterPushdown 用于過濾優(yōu)化

1.7 Parquet文件的優(yōu)點(diǎn)

  • 高效双藕,Parquet采取列示存儲避免讀入不需要的數(shù)據(jù),具有極好的性能和GC阳仔。
  • 方便的壓縮和解壓縮忧陪,并具有極好的壓縮比例。
  • 可以直接固化為parquet文件近范,可以直接讀取parquet文件嘶摊,具有比磁盤更好的緩存效果。

2.ORC文件
從Spark2.3支持矢量化讀取ORC文件评矩,需要開啟一下配置:

參數(shù) 解釋
spark.sql.orc.impl native和Hive二選一叶堆,native是基于ORC1.4,Hive是基于Hive的ORC1.2.1
spark.sql.orc.enableVectorizedReader 默認(rèn)為true,在本地可以矢量化讀取斥杜,否則虱颗,不可以沥匈,如果讀取hive數(shù)據(jù),忽略這個配置

3.JSON

在讀取JSON文件時忘渔,分單行和多行讀取高帖,如果使用多行讀取,需要將multiline配置設(shè)置為true畦粮。

4.HiveTable
在從Hive總讀取數(shù)據(jù)時散址,需要指定spark.sql.warehouse.dir的地址。
可以創(chuàng)建Hive表宣赔、加載數(shù)據(jù)爪飘、分區(qū)

//指定warehouse的路徑,需要導(dǎo)入import java.io.File
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val sparkSession = SparkSession
  .builder()
  .master("local[2]")
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport() //連接到Hive必須要有的配置
  .getOrCreate()
import sparkSession.implicits._
import sparkSession.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") //創(chuàng)建數(shù)據(jù)表
sql("LOAD DATA LOCAL INPATH '/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/kv1.txt' INTO TABLE src")
//查詢數(shù)據(jù)
sql("SELECT * FROM src").show()
//聚合操作
sql("SELECT COUNT(*) FROM src").show()
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
//遍歷每一行數(shù)據(jù)
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
//創(chuàng)建表并設(shè)置存儲格式
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
val df = sparkSession.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
//指定路徑
val dataDir = "/tmp/parquet_data"
sparkSession.range(10).write.parquet(dataDir)
//指定外部表
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
//開啟動態(tài)分區(qū)
sparkSession.sqlContext.setConf("hive.exec.dynamic.partition", "true")
sparkSession.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
//創(chuàng)建分區(qū)表
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
sparkSession.sql("SELECT * FROM hive_part_tbl").show()
sparkSession.stop()
case class Record(key: Int, value: String)

4.1設(shè)置文件的存儲格式
默認(rèn)情況下拉背,讀取Hive Table都是以文本文件讀取。Hive storage handler不支持創(chuàng)建表(spark sql中)默终,在hive中創(chuàng)建表椅棺,使用spark sql讀取。

參數(shù) 解釋
fileFormat 存儲文件格式:sequencefile齐蔽、rcfile两疚、orc、parquet含滴、textfile诱渤、avro。
inputFormat, outputFormat 設(shè)置輸入格式和輸出格式
serde 進(jìn)行序列化的方式
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 指定分隔符

4.2與Hive MetaStore進(jìn)行交互
從SparkSQL1.4可以訪問各種Hive MetaStore谈况。

參數(shù) 解釋
spark.sql.hive.metastore.version 默認(rèn)Hive版本為1.2.1,支持0.12.0到2.3.3
spark.sql.hive.metastore.jars 實(shí)例化HiveMetastoreClient的三種方式:內(nèi)置(必須要定義spark.sql.hive.metastore.version)勺美、maven(不推薦使用)、classpath(必須要包含所有以來)
spark.sql.hive.metastore.sharedPrefixes 連接數(shù)據(jù)庫驅(qū)動程序
spark.sql.hive.metastore.barrierPrefixes 連接不同版本的Hive

5.JDBC

參數(shù) 解釋
url 連接字符串
dbtable 連接的表
query 查詢語句碑韵,dbtable不能與query同時使用赡茸,query不能與partitionColumn同時使用
driver 連接驅(qū)動類
numPartitions 同時讀取分區(qū)的數(shù)量,同時最大連接數(shù)祝闻,如果數(shù)量超過限制占卧,會對數(shù)據(jù)進(jìn)行重新分配
partitionColumn, lowerBound, upperBound 查詢條件限制,按照那個字段進(jìn)行分區(qū)联喘,掃描數(shù)據(jù)的范圍
fetchsize 返回數(shù)據(jù)的大小
batchsize 一次獲取數(shù)據(jù)的大小
createTableOptions 創(chuàng)建表時指定分區(qū)等一系列參數(shù)
pushDownPredicate 是否使用謂詞下壓华蜒,默認(rèn)為true

設(shè)置連接相關(guān)參數(shù)的方式:1.使用option來指定;2.通過Properties來指定豁遭。

val sparkSession=SparkSession
  .builder()
  .master("local[2]")
  .appName("Spark Jdbc Example")
  .getOrCreate()
//讀取數(shù)據(jù)
val jdbcDF = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/xiaoyao")
  .option("dbtable", "select * from xiaoyao.TBLS")
  .option("user", "root")
  .option("password", "123456")
  .option("driver","com.mysql.jdbc.Driver")
  .load()
jdbcDF.show()
val connectProperties=new Properties()
connectProperties.put("user","root")
connectProperties.put("password","123456")
connectProperties.put("driver","com.mysql.jdbc.Driver")
val jdbcDF2=sparkSession.read.jdbc("jdbc:mysql://localhost:3306/xiaoyao","people",connectProperties)
jdbcDF2.show()
// 寫入數(shù)據(jù)
val schema= StructType(StructField("namge", StringType, false) :: StructField("age", LongType, false) :: Nil)
val info=sparkSession.sparkContext.textFile("/Users/Documents/test/spark-2.2.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.txt")
val rdd=info.map(_.split(",")).map(x=>Row(x (0),x(1).toLong))
val peopleDF=sparkSession.createDataFrame(rdd,schema)
peopleDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/xiaoyao","people",connectProperties)
jdbcDF.show()

6.Troubleshooting

  • 原始class loader必須要對JDBC Driver可見叭喜,因?yàn)镴ava有安全檢查;修改classpath.sh堤框,使得classpath.sh包含driver的jar域滥。
  • 一些數(shù)據(jù)庫會將名字變成大寫纵柿。在Spark SQL需要將名字變成大寫。
  • 用戶可以通過指定具體的JDBC來連接到相應(yīng)的數(shù)據(jù)庫启绰。

自定義外部數(shù)據(jù)源

名詞解釋

1.BaseRelation

Schema是一個KV的數(shù)據(jù)組合昂儒,繼承BaseRelation的類,必須要設(shè)置自己的Schema(StructType形式)委可,實(shí)現(xiàn)Scan類的相關(guān)方法渊跋。主要功能是定義Schema信息。

2.TableScan

將元祖數(shù)據(jù)變成Rows對象中的數(shù)據(jù)着倾。

3.PrunedScan

將不需要的列過濾掉拾酝。

4.PrunedFilteredScan

先進(jìn)行數(shù)據(jù)過濾然后將不需要的列過濾掉。

5.InsertableRelation

將數(shù)據(jù)寫回去卡者。

有三種假設(shè):a)插入字段與原先的字段一致蒿囤;b)Schema細(xì)膩不能變化;c)插入的數(shù)據(jù)可以為空崇决。

6.RelationProvider

指定數(shù)據(jù)源材诽,如果沒有找到指定的數(shù)據(jù)源,就找DefaultSource附加到路徑中恒傻,每次調(diào)用DDL都會創(chuàng)建實(shí)例對象脸侥。

7.DataSourceRegister

注冊自定義的外部數(shù)據(jù)源

關(guān)于自定義數(shù)據(jù)源請參考后續(xù)文章。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末盈厘,一起剝皮案震驚了整個濱河市睁枕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌沸手,老刑警劉巖外遇,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異罐氨,居然都是意外死亡臀规,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進(jìn)店門栅隐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來塔嬉,“玉大人,你說我怎么就攤上這事租悄〗骶浚” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵泣棋,是天一觀的道長胶哲。 經(jīng)常有香客問我,道長潭辈,這世上最難降的妖魔是什么鸯屿? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任澈吨,我火速辦了婚禮,結(jié)果婚禮上寄摆,老公的妹妹穿的比我還像新娘谅辣。我一直安慰自己,他們只是感情好婶恼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布桑阶。 她就那樣靜靜地躺著,像睡著了一般勾邦。 火紅的嫁衣襯著肌膚如雪蚣录。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天眷篇,我揣著相機(jī)與錄音萎河,去河邊找鬼。 笑死蕉饼,一個胖子當(dāng)著我的面吹牛公壤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播椎椰,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼沾鳄!你這毒婦竟也來了慨飘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤译荞,失蹤者是張志新(化名)和其女友劉穎瓤的,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吞歼,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡圈膏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了篙骡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稽坤。...
    茶點(diǎn)故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖糯俗,靈堂內(nèi)的尸體忽然破棺而出尿褪,到底是詐尸還是另有隱情,我是刑警寧澤得湘,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布杖玲,位于F島的核電站,受9級特大地震影響淘正,放射性物質(zhì)發(fā)生泄漏摆马。R本人自食惡果不足惜臼闻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望囤采。 院中可真熱鬧述呐,春花似錦、人聲如沸斑唬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽恕刘。三九已至缤谎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間褐着,已是汗流浹背坷澡。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留含蓉,地道東北人频敛。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像馅扣,于是被迫代替她去往敵國和親斟赚。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容