翻譯: https://www.cloudera.com/documentation/enterprise/latest/topics/spark_sparksql.html
版本: 5.14.2
Spark SQL允許您使用SQL或使用DataFrame API查詢Spark程序內(nèi)的結(jié)構(gòu)化數(shù)據(jù)。
有關(guān)Spark SQL的詳細(xì)信息,請(qǐng)參閱Spark SQL和DataFrame指南为流。
繼續(xù)閱讀:
- SQLContext和HiveContext
- 將文件查詢到DataFrame中
- Spark SQL示例
- 確保HiveContext實(shí)現(xiàn)安全訪問
- 與Hive Views交互
- Spark SQL DROP TABLE PURGE的性能和存儲(chǔ)注意事項(xiàng)
SQLContext和HiveContext
所有Spark SQL功能的入口點(diǎn)是 SQLContext 類或其后代之一烫映。你創(chuàng)建一個(gè) SQLContext 從一個(gè) SparkContext 。使用SQLContext 兢交,您可以從RDD薪捍,Hive表或數(shù)據(jù)源創(chuàng)建DataFrame。
要在Spark應(yīng)用程序中使用存儲(chǔ)在Hive或Impala表中的數(shù)據(jù)配喳,請(qǐng)構(gòu)建一個(gè) HiveContext 酪穿,它繼承自SQLContext 。使用 HiveContext 晴裹,您可以訪問Hive或Impala表等代表的Metastore數(shù)據(jù)庫被济。
注意:
Hive和Impala表和相關(guān)的SQL語法在大多數(shù)方面是可以互換的。因?yàn)镾park使用底層Hive基礎(chǔ)架構(gòu)息拜,所以使用Spark SQL您可以使用HiveQL語法編寫DDL語句溉潭,DML語句和查詢。對(duì)于交互式查詢性能少欺,可以通過Impala使用impala-shell或Impala JDBC和ODBC接口訪問相同的表喳瓣。
如果你使用 spark-shell , 一個(gè) HiveContext 已經(jīng)為你創(chuàng)建赞别,并作為 sqlContext 變量畏陕。
如果你使用 spark-submit ,在程序開始時(shí)使用如下代碼:
Python:
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName = "test")
sqlContext = HiveContext(sc)
Spark應(yīng)用程序提交的主機(jī) 或 spark-shell or pyspark 運(yùn)行的主機(jī)仿滔,運(yùn)行必須具有在Cloudera Manager 中定義的Hive 網(wǎng)關(guān)角色以及部署的客戶端配置惠毁。
當(dāng)Spark作業(yè)訪問Hive視圖時(shí)犹芹,Spark必須有權(quán)讀取底層Hive表中的數(shù)據(jù)文件。目前鞠绰,Spark不能使用基于列或者where子句腰埂。如果Spark沒有底層數(shù)據(jù)文件所需的權(quán)限,則針對(duì)視圖的SparkSQL查詢將返回空結(jié)果集蜈膨,而不是錯(cuò)誤屿笼。
將文件查詢到DataFrame中
如果數(shù)據(jù)文件位于Hive或Impala表之外,則可以使用SQL將JSON或Parquet文件直接讀取到DataFrame中:
- JSON:
df = sqlContext.sql("SELECT * FROM json.`input dir`")
- Parquet:
df = sqlContext.sql("SELECT * FROM parquet.`input dir`")
請(qǐng)參閱在文件上運(yùn)行SQL翁巍。
Spark SQL示例
這個(gè)例子演示了如何使用sqlContext.sql 創(chuàng)建并加載兩個(gè)表驴一,并從表中選擇兩行到兩個(gè)DataFrame。接下來的步驟使用DataFrame API從其中一個(gè)表中過濾大于150,000的工資灶壶,并顯示結(jié)果DataFrame肝断。然后將這兩個(gè)DataFrame加入來創(chuàng)建第三個(gè)DataFrame。最后驰凛,新的DataFrame被保存到一個(gè)Hive表胸懈。
- 在命令行中,將Hue sample_07和sample_08 CSV文件復(fù)制到HDFS:
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_07.csv /user/hdfs
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_08.csv /user/hdfs
其中 HUE_HOME 默認(rèn)/opt/cloudera/parcels/CDH/lib/hue (包裹安裝)或 /usr/lib/hue(軟件包安裝)洒嗤。
- 開始 spark-shell:: spark-shell:
- 創(chuàng)建Hive表sample_07和sample_08:
scala> sqlContext.sql("CREATE TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
scala> sqlContext.sql("CREATE TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
- 在Beeline中箫荡,顯示Hive表:
[0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
| tab_name |
+------------+--+
| sample_07 |
| sample_08 |
+------------+--+
- 將CSV文件中的數(shù)據(jù)加載到表中:
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_07.csv' OVERWRITE INTO TABLE sample_07")
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_08.csv' OVERWRITE INTO TABLE sample_08")
- 創(chuàng)建包含sample_07和sample_08表格內(nèi)容的DataFrame:
scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
scala> val df_08 = sqlContext.sql("SELECT * from sample_08")
- 顯示薪水大于150,000的df_07中的所有行:
scala> df_07.filter(df_07("salary") > 150000).show()
輸出應(yīng)該是:
+-------+--------------------+---------+------+
| code| description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011| Chief executives| 299160|151370|
|29-1022|Oral and maxillof...| 5040|178440|
|29-1023| Orthodontists| 5350|185340|
|29-1024| Prosthodontists| 380|169360|
|29-1061| Anesthesiologists| 31030|192780|
|29-1062|Family and genera...| 113250|153640|
|29-1063| Internists, general| 46260|167270|
|29-1064|Obstetricians and...| 21340|183600|
|29-1067| Surgeons| 50260|191410|
|29-1069|Physicians and su...| 237400|155150|
+-------+--------------------+---------+------+
- 通過加入df_07和df_08創(chuàng)建DataFrame df_09,僅保留 code and description 列渔隶。
scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"),df_07.col("description"))
scala> df_09.show()
新的DataFrame如下所示:
+-------+--------------------+
| code| description|
+-------+--------------------+
|00-0000| All Occupations|
|11-0000|Management occupa...|
|11-1011| Chief executives|
|11-1021|General and opera...|
|11-1031| Legislators|
|11-2011|Advertising and p...|
|11-2021| Marketing managers|
|11-2022| Sales managers|
|11-2031|Public relations ...|
|11-3011|Administrative se...|
|11-3021|Computer and info...|
|11-3031| Financial managers|
|11-3041|Compensation and ...|
|11-3042|Training and deve...|
|11-3049|Human resources m...|
|11-3051|Industrial produc...|
|11-3061| Purchasing managers|
|11-3071|Transportation, s...|
|11-9011|Farm, ranch, and ...|
|11-9012|Farmers and ranchers|
+-------+--------------------+
- 將DataFrame df_09另存為Hive表sample_09:
scala> df_09.write.saveAsTable("sample_09")
- 在Beeline中羔挡,顯示Hive表:
[0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
| tab_name |
+------------+--+
| sample_07 |
| sample_08 |
| sample_09 |
+------------+--+
Python中的等效程序,您可以使用spark-submit提交间唉, 將會(huì):
from pyspark import SparkContext, SparkConf, HiveContext
if __name__ == "__main__":
# create Spark context with Spark configuration
conf = SparkConf().setAppName("Data Frame Join")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df_07 = sqlContext.sql("SELECT * from sample_07")
df_07.filter(df_07.salary > 150000).show()
df_08 = sqlContext.sql("SELECT * from sample_08")
tbls = sqlContext.sql("show tables")
tbls.show()
df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code,df_07.description)
df_09.show()
df_09.write.saveAsTable("sample_09")
tbls = sqlContext.sql("show tables")
tbls.show()
而不是使用直線來顯示表格 , show tables 查詢使用Spark SQL API運(yùn)行绞灼。
確保HiveContext實(shí)現(xiàn)安全訪問
為了保證 HiveContext 強(qiáng)制ACL,按照同步HDFS ACL和Sentry權(quán)限中所述啟用HDFS-Sentry插件呈野。HDFS-Sentry插件不支持從Spark SQL進(jìn)行訪問的列級(jí)訪問控制低矮。
與Hive Views交互
當(dāng)Spark作業(yè)訪問Hive視圖時(shí),Spark必須有權(quán)讀取底層Hive表中的數(shù)據(jù)文件被冒。目前军掂,Spark不能使用基于列或者where子句。如果Spark沒有底層數(shù)據(jù)文件所需的權(quán)限昨悼,則針對(duì)視圖的SparkSQL查詢將返回空結(jié)果集蝗锥,而不是錯(cuò)誤。
Spark SQL DROP TABLE PURGE的性能和存儲(chǔ)注意事項(xiàng)
Hive中的 DROP TABLE 語句的 PURGE 子句 會(huì)立即刪除底層數(shù)據(jù)文件率触,而不會(huì)將其傳輸?shù)脚R時(shí)存儲(chǔ)區(qū)域(HDFS垃圾箱)中终议。
雖然 PURGE 子句被Spark SQL DROP TABLE 語句識(shí)別,此子句當(dāng)前不會(huì)傳遞給在后臺(tái)執(zhí)行“drop table”操作的Hive語句。所以穴张,如果你知道PURGE 行為對(duì)于性能细燎,存儲(chǔ)或安全性原因 非常重要,請(qǐng)執(zhí)行此操作DROP TABLE 直接在Hive中皂甘,例如通過beeline shell玻驻,而不是通過Spark SQL。
即時(shí)刪除方面 偿枕,PURGE 在以下情況下可能很重要:
如果群集的存儲(chǔ)空間不足击狮,并且立即釋放空間非常重要,而不是等待HDFS垃圾箱定期清空益老。
如果底層數(shù)據(jù)文件駐留在Amazon S3文件系統(tǒng)上。將文件從S3移動(dòng)到HDFS垃圾箱涉及物理復(fù)制文件寸莫,即默認(rèn)設(shè)置DROP TABLE S3上的行為涉及顯著的性能開銷捺萌。
如果底層數(shù)據(jù)文件包含敏感信息,并且將其完全刪除非常重要膘茎,而不是通過定期清空垃圾箱來清除它們桃纯。
如果對(duì)HDFS加密區(qū)域的限制阻止將文件移動(dòng)到HDFS垃圾箱。此限制主要適用于CDH 5.7及更低版本披坏。在CDH 5.8及更高版本中态坦,每個(gè)HDFS加密區(qū)都有自己的HDFS垃圾桶,因此不帶PURGE 子句的 DROP TABLE 行為正常棒拂。