spark sql可以從很多數(shù)據(jù)源中讀寫數(shù)據(jù), 比較常用的是json文件和可使用jdbc協(xié)議的數(shù)據(jù)庫.
訪問json數(shù)據(jù)
官方文檔: https://spark.apache.org/docs/latest/sql-data-sources-json.html
注意: json文件的每一行必須是一個json對象
從json加載數(shù)據(jù)
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
或者
Dataset<Row> people = spark.read().format("json").load("examples/src/main/resources/people.json");
寫入數(shù)據(jù)到j(luò)son
people.write().json("examples/src/main/resources/people.json");
或者
people.write().format("json").save("examples/src/main/resources/people.json");
基于jdbc訪問數(shù)據(jù)庫
官方文檔: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
示例:
//讀取數(shù)據(jù)
Dataset<Row> df = sparkSession.read()
.format("jdbc")
.option("url", "jdbc:mysql://192.168.1.22:3306/user?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai")
.option("user", "root")
.option("password", "123456")
.option("dbtable","task_info")
.load();
df.show();
//寫入數(shù)據(jù), 事先創(chuàng)建好表
df.write()
.format("jdbc")
.option("url", "jdbc:mysql://192.168.1.22:3306/user?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai")
.option("dbtable", "task_info2")
.option("user", "root")
.option("password", "123456")
.option("truncate","true")
.mode(SaveMode.Overwrite)
.save();
支持的選項如下:
Property Name | Meaning |
---|---|
url | jdbc連接串, 例如:jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 要訪問的表名. 注意當在read path訪問的時候, 它可能是SQL FROM 后面的有效內(nèi)容(比如子查詢). 不允許同時使用dbtable 和query 選項. |
query | 將數(shù)據(jù)讀取到spark的查詢. 這個查詢會被圓括號括起來, 作為FROM后面的子查詢. Spark會為這個子查詢設(shè)置一個別名. 例如, spark以下面這種形式向JDBC數(shù)據(jù)源發(fā)送查詢SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
|
driver | 連接url的JDBC驅(qū)動的類名 |
partitionColumn, lowerBound, upperBound | 這些選項必須全部設(shè)置或者全部不設(shè)置.另外, numPartitions 也要被一起設(shè)置.它們描述了從多個worker并行讀取時如何對表進行分區(qū).partitionColumn 必須是相關(guān)表中的數(shù)字宁否、日期或時間戳列.注意悄蕾,LowerBound和UpperBound只是用來決定分區(qū)的步幅习绢,而不是用來過濾表中的行.所以表中的所有行都將被分區(qū)并返回.此選項僅適用于讀取. |
numPartitions | 表讀寫中可用于并行的最大分區(qū)數(shù)芋肠。這也決定了并發(fā)JDBC連接的最大數(shù)量拨脉。如果要寫入的分區(qū)數(shù)超過此限制逊移,則在寫入之前通過調(diào)用coalesce(numpartitions)將其減少到此限制. |
queryTimeout | 0意味著沒有限制严蓖。在寫入的時候逻翁,此選項效果取決于JDBC驅(qū)動程序如何實現(xiàn)APIsetQueryTimeout 的饥努,例如,h2 JDBC驅(qū)動程序檢查每個查詢的超時八回,而不是整個JDBC批處理酷愧。默認值為0 |
fetchsize | JDBC每次往返行數(shù), 這對默認值比較小的JDBC驅(qū)動有性能提升(比如Oracle默認是10行).這個選項僅對讀取操作有效. |
batchsize | JDBC批處理大小, 決定每次往返寫入的行數(shù).這個選項僅對寫入有效, 默認值為1000. |
isolationLevel | 事務(wù)隔離級別,對當前連接有效.它可以是NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ或SERIALIZABLE缠诅,對應(yīng)于JDBC連接對象定義的標準事務(wù)隔離級別溶浴,默認為READ_UNCOMMITTED.此選項僅適用于寫入. |
sessionInitStatement | 會話初始化語句, 在會話打開時, 執(zhí)行讀取數(shù)據(jù)之前, 可以執(zhí)行自定義的SQL語句(或者PL/SQL語句塊). |
truncate | 這是一個與JDBC寫入相關(guān)的選項。啟用SaveMode.Overwrite時管引,此選項會導(dǎo)致spark清空現(xiàn)有表士败,而不是刪除并重新創(chuàng)建它。這樣可以更有效率汉匙,并防止刪除表元數(shù)據(jù)(如索引)拱烁。但是生蚁,在某些情況下,它將不起作用戏自,例如當新數(shù)據(jù)具有不同的模式時邦投。它默認為false。此選項僅適用于寫入操作擅笔。 |
cascadeTruncate | 這是一個與JDBC寫入相關(guān)的選項志衣。如果jdbc數(shù)據(jù)庫(PostgreSQL和Oracle)啟用并支持,則此選項允許執(zhí)行truncate table t cascade(對于postgresql猛们,執(zhí)行truncate table only t cascade以防止無意中清空子表)念脯。這將影響其他表,因此應(yīng)小心使用弯淘。此選項僅適用于寫入操作绿店。它默認為所討論的JDBC數(shù)據(jù)庫的默認級聯(lián)清空行為,在每個JDBCDialect中的isCascadeTruncate中指定庐橙。 |
createTableOptions | 這是一個與JDBC寫入相關(guān)的選項假勿。如果指定,則此選項允許在創(chuàng)建表時設(shè)置特定于數(shù)據(jù)庫的表和分區(qū)選項(例如态鳖,create table t(name string)engine=innodb).此選項僅適用于寫入操作. |
createTableColumnTypes | 創(chuàng)建表時要使用的數(shù)據(jù)庫列數(shù)據(jù)類型转培,而不是默認值。數(shù)據(jù)類型信息的格式應(yīng)與創(chuàng)建表列語法相同(例如:“name char(64)浆竭,comments varchar(1024)”)浸须。指定的類型應(yīng)為有效的Spark SQL數(shù)據(jù)類型。此選項僅適用于寫入邦泄。 |
customSchema | 用于從JDBC連接讀取數(shù)據(jù)的自定義模式删窒。例如,id DECIMAL(38, 0), name STRING 虎韵。還可以指定部分字段易稠,其他字段使用默認類型映射。例如包蓝,id DECIMAL(38, 0) 驶社。列名應(yīng)該與JDBC表的相應(yīng)列名相同。用戶可以指定Spark SQL的相應(yīng)數(shù)據(jù)類型测萎,而不是使用默認值亡电。此選項僅適用于讀取操作。 |
pushDownPredicate | 啟用或禁用謂詞下推到JDBC數(shù)據(jù)源的選項硅瞧。默認值為true份乒,在這種情況下,spark將盡可能將過濾器向下推送到JDBC數(shù)據(jù)源或辖。否則瘾英,如果設(shè)置為false,則不會將任何過濾器向下推送到JDBC數(shù)據(jù)源缺谴,因此所有過濾器都將由spark處理。當SPARK比JDBC數(shù)據(jù)源更快地執(zhí)行謂詞篩選時,謂詞下推通常被關(guān)閉。 |