本內(nèi)容主要來自當(dāng)前Spark最新版2.1.0的官方文檔sql-programming-guide,以及一些其他閱讀時搜索找到的相關(guān)輔助資料蟀给。
在所有工作開始前蝙砌,也就是在官網(wǎng)文檔中也沒有介紹的就是,在pom文件里面添加spark-sql的依賴跋理。不知為何官網(wǎng)沒有把這個寫進去择克。不過google下就知道了。這里建議去Spark Project SQL這個maven官網(wǎng)中去找當(dāng)前最新的依賴添加到pom中前普。
Overview
Spark SQL是Spark的一個模塊祠饺,用于結(jié)構(gòu)化的數(shù)據(jù)處理。不像基礎(chǔ)的RDD的API汁政,Spark SQL提供了更多關(guān)于數(shù)據(jù)結(jié)構(gòu)和計算的信息道偷。Spark SQL使用了這些額外的信息來執(zhí)行額外的優(yōu)化。有多種方式和Spark SQL進行交互:包括SQL以及Dataset的API记劈。不管使用的是哪種API或者哪種語言勺鸦,最終的執(zhí)行本質(zhì)上是一樣的。這種統(tǒng)一性意味著開發(fā)者可以輕松切換并使用最喜歡的方式來表示一個指定的Transformation目木。
SQL
通過SQL查詢語句來使用Spark SQL是一種大家喜聞樂見的使用方式换途。而且Spark SQL還可以用來在已經(jīng)存在的Hive裝置中讀取數(shù)據(jù),后面會有介紹如何配置使用這一功能刽射。在其他編程語言中使用SQL時军拟,這條語句的返回值是Dataset/DataFrame類型的。同時你可以直接在cmd中與SQL接口交互誓禁,或者通過JDBC/ODBC(下面也有對其的介紹)懈息。
Datasets和DataFrames
這里有兩個重要的概念Datasets和Dataframes。(其實當(dāng)你在網(wǎng)上搜索這兩個名詞時你會看到很多舊版本的解釋摹恰,這多少可以幫助你理解辫继,不過最終你還是要看最新版本的使用怒见。只有真正理解了這兩個概念才算是稍微懂了一些Spark SQL吧)
Dataset是一個分布式的數(shù)據(jù)收集器。這是在Spark1.6之后新加的一個接口姑宽,兼顧了RDD的優(yōu)點(強類型遣耍,可以使用功能強大的lambda)以及Spark SQL的執(zhí)行器高效性的優(yōu)點。Dataset可以創(chuàng)建為一個JVM的對象炮车,并使用Transformation功能(map,flatMap,filter, etc.)舵变。注意:Dataset這個API只在Scala和Java中有。雖然Python中不支持這個API瘦穆,但是根據(jù)Python語言的動態(tài)特性棋傍,其實這個API的很多好處Python已經(jīng)具有了(比如row.columnName就可以訪問數(shù)據(jù))。在R語言中也類似难审。
其實Dataset主要由兩部分構(gòu)成瘫拣,一個是具體的數(shù)據(jù)結(jié)構(gòu)類對象,另一個就是和這個類對應(yīng)的Encoder(啥是Encoder下面會有介紹告喊,也可以google其他相關(guān)資料再看看)麸拄。
DataFrame是一個organized into named columns的Dataset。它在概念上與關(guān)系型數(shù)據(jù)庫的table或者R/Python語言的DataFrame相似,不過被底層平臺優(yōu)化了。DataFrame可以從廣泛的數(shù)據(jù)源中構(gòu)成如捅,比如:結(jié)構(gòu)化的數(shù)據(jù)文件、Hive的table淮椰、外部數(shù)據(jù)庫和RDD。DataFrame在Scala纳寂、Java主穗、Python和R中都支持,在Scala中為Dataset[Row]毙芜,在Java中為Dataset<Row>忽媒。
其實如果想要再進一步了解DataFrame,就必須要了解Row的定義腋粥。其中有一個非常重要的屬性:schema晦雨,后面也會有進一步的介紹。
“從代碼中來看隘冲,DataFrame更像是Dataset的一種特殊情況闹瞧。事實上是這樣嗎?展辞。奥邮。”
Getting Started
要先有一個SparkSession
所有Spark功能的入口是類SparkSession纵竖,創(chuàng)建一個基礎(chǔ)的SparkSession使用SparkSession.builder()方法:
創(chuàng)建DataFrame
有了SparkSession漠烧,application就可以從RDD、Hive table或者其他Spark數(shù)據(jù)源(Data Sources)中創(chuàng)建出一個DataFrame
//從一個json文件中讀取數(shù)據(jù)生成一個DataFrame
Dataset<Row>?df=spark.read().json("/home/paul/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json");
df.show();
df.printSchema();
無類型的Dataset操作(也稱為DataFrame操作)
正如上面提到的靡砌,DataFrame在Spark2.0之后的Sala和Java中以Dataset of RowS方式存在已脓。所以DataFrame的操作被稱為無類型的操作,與其他Dataset的強類型操作形成對比通殃。下面列舉幾個簡單的操作:
df.printSchema();
df.select("name").show();
df.select(col("name"),col("age").plus(1)).show();
df.filter(col("age").gt(21)).show();
df.groupBy("age").count().show();
完整的DataFrame操作類型列表API:Class Dataset<T>
代碼中運行SQL語句
SparkSession的方法sql允許application在代碼中運行SQL語句度液,并得到Dataset<Row>類型的返回值。
// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView("people");
DatasetsqlDF=spark.sql("SELECT * FROM people");
Global Temporary View
上面使用的是一個在Session生命周期中的臨時views在Spark SQL中画舌。如果你想擁有一個臨時的view不過可以在不同的Session中共享堕担,而且在application的運行周期內(nèi)可用,那么就需要創(chuàng)建一個全局的臨時view曲聂。并記得使用的時候加上global_temp作為前綴霹购,因為全局的臨時view是綁定到系統(tǒng)保留的數(shù)據(jù)庫global_temp上。
// Register the DataFrame as a global temporary viewdf.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database 注意people前面的global_temp
global_temp`spark.sql("SELECT * FROM global_temp.people").show();
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
創(chuàng)建Datasets
Datasets和RDDs比較類似朋腋,不同的地方在與RDD是使用Java serialization或者Kryo實現(xiàn)序列化和反序列化齐疙,而Datasets是使用Encoder來實現(xiàn)對象的序列化并在網(wǎng)絡(luò)中傳輸。Encoder的動態(tài)特性使得Spark可以在執(zhí)行filtering旭咽、sorting和hashing等許多操作時無需把字節(jié)反序列化為對象贞奋。
創(chuàng)建一個用戶自定義類的Encoder需要使用Java的beans,比如之前介紹中的代碼:Encoder<Person>?personEncoder=Encoders.bean(Person.class);
除了上面在講Dataset時的一個創(chuàng)建Person類的Dataset的代碼實例穷绵,下面再介紹下基礎(chǔ)類型的以及從json文件中讀取的情況轿塔。
從上面的代碼可以看出來,Dataset<Person>和DataFame(也就是Dataset<Row>)的區(qū)別就是DF后面沒有as(類的Encoder)
轉(zhuǎn)變RDD為DataFrame
Spark SQL支持兩種不同的方法將RDD轉(zhuǎn)換為DataFrame仲墨。第一種方法是根據(jù)RDD對象的具體類型映射(Reflection)推導(dǎo)出schema勾缭,這種映射為基礎(chǔ)的方法可以讓代碼非常簡潔,不過前提是在寫application的時候已經(jīng)知道schema的內(nèi)容目养。
第二種方法是通過顯式的程序代碼構(gòu)造schema漫拭,然后將這個schema應(yīng)用到RDD上最后轉(zhuǎn)化為Dataset。雖然這種方法會讓代碼變得比較復(fù)雜混稽,但是它能在不知道Dataset的列名稱及其類型的時候使用采驻,也就是在代碼運行時讀出列的數(shù)據(jù)和類型。
第一種方法匈勋,使用Reflection推導(dǎo)出Schema
其實這個Reflection就是Spark SQL支持的JavaBean自動將RDD轉(zhuǎn)換為DataFrame礼旅。通過Reflection獲得到的BeanInfo定義為table的schema。下面是一段代碼示例:
獲取到DataFrame之后對于列的索引可以通過index和name兩種方式
第二種方法:代碼顯式的構(gòu)造Schema
當(dāng)JavaBean無法根據(jù)類的具體內(nèi)容提前定義出DataFrame時(比如:數(shù)據(jù)記錄的結(jié)構(gòu)對于不同的用戶有不同的理解和使用)痘系,需要以下三個步驟創(chuàng)建出DataFrame。
1饿自、創(chuàng)建一個RDD<Row>類型的RDD
2汰翠、創(chuàng)建類型為StructType的schema龄坪,與Step1中的RDD結(jié)構(gòu)相匹配
3、通過SparkSession的方法createDataFrame將schema應(yīng)用到RDD上產(chǎn)生DataFrame
數(shù)據(jù)源Data Sources
Spark SQL通過DataFrame接口支持多種數(shù)據(jù)源复唤。DataFrame可以使用相關(guān)的transformation操作以及用于產(chǎn)生臨時的view健田。下面將展示一些與數(shù)據(jù)源相關(guān)的常用方法。
通用的Load/Save函數(shù)
最簡單的方式佛纫,默認的數(shù)據(jù)源(parquet妓局,除非有另外的配置spark.sql.sources.default)被用于所有的操作。
下面代碼展示了從parquet文件讀取生成DataFrame呈宇,保存為parquet文件以及直接對文件使用SQL語句的方法好爬。
Save模式
Save操作可以選擇多種SaveMode,來指定對于已經(jīng)存在的文件做如何處理甥啄。而且需要特別明白的是這些save模式并沒有實現(xiàn)任何鎖機制而且也不是原子操作存炮。下面是具體的:
在代碼中加入SaveMode也很簡單。如下所示:
Save成Persistent Tables
DataFrame可以保存成persistent table到Hive的metastore通過代碼saveAsTable。而且不需要存在Hive的部署迎变,因為Spark會創(chuàng)建一個默認的本地Hive metastore(使用Derby)充尉。并且不像createOrReplaceTempView,saveAsTable實現(xiàn)了DataFrame的內(nèi)容而且創(chuàng)建了一個指針指向Hive metastore的數(shù)據(jù)衣形。只要application一直保持對這個metastore的使用驼侠,那么這個persistent table就會一直存在,即使Spark程序已經(jīng)重啟了谆吴。SparkSession需要使用方法table來給DataFrame使用的persistent table命名倒源。
默認的saveAsTable會創(chuàng)建“managed table”,意味著本地的數(shù)據(jù)將會被metastore控制句狼,當(dāng)table被drop的時候Managed table會自動delete相關(guān)數(shù)據(jù)笋熬。
Parquet文件
Parquet是一個列格式而且用于多個數(shù)據(jù)處理系統(tǒng)中。Spark SQL提供支持對于Parquet文件的讀寫腻菇,也就是自動保存原始數(shù)據(jù)的schema胳螟。當(dāng)讀Parquet文件時,所有的列被自動轉(zhuǎn)化為nullable因為兼容性的緣故筹吐。
下面是對Parquet文件的一段代碼處理:
Partition Discovery
Table partitioning是一個通用的優(yōu)化方法在很多系統(tǒng)中使用糖耸,比如Hive。在一個partitioned table中丘薛,數(shù)據(jù)根據(jù)partitioning列不同的取值嘉竟,通常被保存到多個不同的包含列取值名字的目錄中。Parquet數(shù)據(jù)源現(xiàn)在可以發(fā)現(xiàn)并自動推導(dǎo)partitioning信息。下面是一個例子舍扰,新增加兩個partitioning列g(shù)ender和country到原先的partitioned table中倦蚪。
然后通過傳入path/to/table到SparkSession.read.parquet或者SparkSession.read.load,Spark SQL可以自動從paths提取出partitioning信息边苹。比如對于上面的那個例子陵且,現(xiàn)在DataFrame的schema變?yōu)椋?/p>
另外需要注意兩點:1、對于partitioning列的自動推導(dǎo)包含數(shù)據(jù)類型的推導(dǎo)勾给,目前支持數(shù)值型的類型以及字符串型的類型滩报」可以通過配置spark.sql.sources.partitionColumnTypeInference.enabled來選擇這個自動類型推導(dǎo)的開關(guān)播急。默認是true,如果將其關(guān)閉那么所有類型認為是string類型售睹;2桩警、對于傳入的paths參數(shù),對于上面這個例子推薦使用path/to/table/gender=male的父目錄也就是path/to/table/昌妹,不然gender不會被認為是一個partitioning列捶枢。
Schema合并
類似ProtocolBuffer、Avro以及Thrift飞崖,Parquet也支持Schema的演進烂叔。用戶可以從一個簡單的schema開始逐漸加入其它需要的列。在這種情況下固歪,用戶需要手動合并多個不同但兼容的Parquet文件蒜鸡。Parquet數(shù)據(jù)源目前已經(jīng)可以自動檢測并合并這些文件。
不過因為schema合并是一個相當(dāng)昂貴的操作并且不是在所有的情況中都那么必需牢裳,所以Spark在1.5.0之后就關(guān)閉了自動合并逢防。可以通過下面兩張方法手動配置打開蒲讯。
1忘朝、設(shè)置數(shù)據(jù)源選項mergeSchema為true,當(dāng)reading Parquet文件時判帮,或者
2局嘁、設(shè)置全局SQL選項spark.sql.parquet.mergeSchema為true
下面代碼是一個例子,創(chuàng)建了一個Square類型的DataFrame和一個Cube類型的DataFrame晦墙。在保存為Parquet文件時特意配置了一個列值選項导狡。在各自都保存好后,再從他們的父目錄中讀取偎痛,可以看到生成一個新的合并的DataFrame:
Hive metastore Parquet table轉(zhuǎn)換
當(dāng)讀寫到Hive metastore Parquet table時,Spark SQL將會使用自己的Parquet而不是Hive的SerDes為了更好的性能。當(dāng)然可以通過配置spark.sql.hive.convertMetastoreParquet來控制開關(guān)枚赡,默認是打開的氓癌。
Hive/Parquet Schema解沖突
有兩個關(guān)鍵的不同在Hive和Parquet對table生成schema的處理中:
1、Hive是不區(qū)分大小寫的贫橙,但是Parquet區(qū)分
2贪婉、Hive認為所有的列是nullable,在Parquet中這只是列的一個特性卢肃。
基于上面的兩點不同疲迂,我們必須解決Hive metastore schema和Parquet schema的沖突在轉(zhuǎn)換Hive metastore Parquet table到Spark SQL Parquet table時。解決沖突的規(guī)則如下:
Metadata更新
當(dāng)有外部的Hive metastore對table操作時莫湘,可以在代碼中手動刷新來保持一致尤蒿。
Parquet配置
下面對于Parquet的配置通過使用SparkSession的方法setConf或者通過在SQL語句中運行SET key=value
JSON Datasets
Spark SQL可以自動推導(dǎo)出schema從JSON數(shù)據(jù)集中,并保存為Dataset<Row>幅垮。這個轉(zhuǎn)換可以從RDD<String>或者JSON文件中使用SparkSession.read().json()完成腰池。下面代碼是這兩種轉(zhuǎn)換的例子:
Hive Table
Spark SQL同樣支持讀寫存儲在Apache Hive的數(shù)據(jù)呵萨。然而因為Hive需要巨大的依賴,所以這些依賴沒有包含在目前默認的Spark發(fā)布版本中(這也就意味著不安裝Hive就肯定無法完成下面的代碼示例)囱皿。如果Hive的依賴可以在classpath中找到跑杭,Spark會自動加載。需要注意的是這些Hive的依賴也必須出現(xiàn)在所有的worker節(jié)點上爹橱,因為為了訪問Hive存儲的數(shù)據(jù)窄做,他們也必須使用Hive的serialization和deserialization庫(SerDes)
配置Hive就是將hive-site.xml、core-site.xml(安全配置)和hdfs-site.xml(HDFS配置)放到$SPARK_HOME下的conf/中组砚。
代碼1 從Hive數(shù)據(jù)文件中讀取:
代碼2 將已有的DataFrame與Hive產(chǎn)生的JOIN
和不同版本的Hive Metastore交互
Spark SQL的Hive支持功能中一個最重要的點就是和Hive metastore交互今野,這使得Spark SQL可以訪問Hive table的matadata条霜。從Spark 1.4.0起一個單獨的Spark SQL庫可以被用于查詢不同版本的Hive metastore啃匿,具體的配置信息如下表所示蛔外。 Note that independent of the version of Hive that is being used to talk to the metastore, internally Spark SQL will compile against Hive 1.2.1 and use those classes for internal execution (serdes, UDFs, UDAFs, etc).
JDBC到其他數(shù)據(jù)庫
Spark SQL同樣支持通過JDBC讀取其他數(shù)據(jù)庫的數(shù)據(jù)作為數(shù)據(jù)源裆悄。在函數(shù)中推薦使用jdbcRDD臂聋,這是因為作為結(jié)果返回一個DataFrame可以方便的在Spark SQL中處理并且與其他數(shù)據(jù)源合并。具體的內(nèi)容見官網(wǎng)文檔:JDBC To Other Databases
附上官網(wǎng)的代碼:
性能調(diào)優(yōu)
性能調(diào)優(yōu)主要是將數(shù)據(jù)放入內(nèi)存中操作艾君。通過spark.cacheTable("tableName")或者dataFrame.cache()肄方。使用spark.uncacheTable("tableName")來從內(nèi)存中去除table冰垄。
其他配置選項(不過不怎么推薦手動修改虹茶,可能在后續(xù)版本自動的自適應(yīng)修改):
分布式的SQL引擎
Spark SQL同樣可以通過使用它的JDBC/ODBC或者command-line接口作為分布式的查詢引擎。在這種模式下步清,終端用戶和應(yīng)用可以通過SQL查詢與Spark SQL直接交互而不需要其他額外的代碼要门。
Running the Thrift JDBC/ODBC server
內(nèi)容見spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server
Running the Spark SQL CLI
內(nèi)容見spark.apache.org/docs/latest/sql-programming-guide.html#running-the-spark-sql-cli