最近部門(mén)正在使用Spark做ETL,在使用JDBC作為DataSource的時(shí)候遇到了一些坑爹的問(wèn)題缀蹄,本文主要分享一下我遇到的問(wèn)題和一些解決方案踊餐,當(dāng)然可能會(huì)有更好的解決方案暖哨,還請(qǐng)各位大佬在評(píng)論區(qū)給點(diǎn)意見(jiàn)医增。另外珍手,本文會(huì)涉及一些Spark的源碼分析真竖,我使用的版本是org.apache.spark:spark-sql_2.12:3.2.1
首先簡(jiǎn)單介紹一下Spark SQL讀寫(xiě)JDBC的基本操作和參數(shù)配置荐开。
SparkSQL讀寫(xiě)JDBC
Spark SQL支持通過(guò)JDBC直接讀取數(shù)據(jù)庫(kù)中的數(shù)據(jù),這個(gè)特性是基于JdbcRDD實(shí)現(xiàn)尖阔,返回值作為DataFrame返回或者注冊(cè)成Spark SQL的臨時(shí)表烘豹,用戶(hù)可以在數(shù)據(jù)源選項(xiàng)中配置JDBC相關(guān)的連接參數(shù),user和password一般是必須提供的參數(shù)诺祸,常用的參數(shù)列表如下:
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 | 作用范圍 |
---|---|---|
url | 數(shù)據(jù)庫(kù)連接jdbc url | read/write |
user | 數(shù)據(jù)庫(kù)登陸用戶(hù)名 | read/write |
password | 數(shù)據(jù)庫(kù)登陸密碼 | read/write |
dbtable | 需要讀取或者寫(xiě)入的JDBC表。注意不能同時(shí)配置dbtable和query祭芦。 | read/write |
query | query用于指定from后面的子查詢(xún)筷笨,拼接成的sql如下:SELECT FROM () spark_gen_alias 。<br />注意dbtable和query不能同時(shí)使用龟劲;<br />不允許同時(shí)使用partitionColumn和query胃夏。 | read/write |
driver | jdbc驅(qū)動(dòng)driver | read/write |
partitionColumn, lowerBound, upperBound | 指定時(shí)這三項(xiàng)需要同時(shí)存在,描述了worker如何并行讀取數(shù)據(jù)庫(kù)昌跌。<br />其中partitionColumn必須是數(shù)字仰禀、date、timestamp蚕愤。<br />lowerBound和upperBound只是決定了分區(qū)的步長(zhǎng)答恶,而不會(huì)過(guò)濾數(shù)據(jù),因此表中所有的數(shù)據(jù)都會(huì)被分區(qū)返回 | read |
numPartitions | 讀寫(xiě)時(shí)的最大分區(qū)數(shù)萍诱。這也決定了連接JDBC的最大連接數(shù)悬嗓,如果并行度超過(guò)該配置,將會(huì)使用coalesce(partition)來(lái)降低并行度裕坊。 | read/write |
queryTimeout | driver執(zhí)行statement的等待時(shí)間包竹,0意味著沒(méi)有限制。<br />寫(xiě)入的時(shí)候這個(gè)選項(xiàng)依賴(lài)于底層是如何實(shí)現(xiàn)setQueryTimeout的 | read/write |
fetchsize | fetch的大小籍凝,決定了每一個(gè)fetch周瞎,拉取多少數(shù)據(jù)量。 | read |
batchsize | batch大小饵蒂,決定插入時(shí)的并發(fā)大小声诸,默認(rèn)1000。 | write |
isolationLevel | 事務(wù)隔離的等級(jí)苹享,作用于當(dāng)前連接双絮,默認(rèn)是READ_UNCOMMITTED浴麻。<br />可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE。<br />依賴(lài)于底層jdbc提供的事務(wù)隔離囤攀。 | write |
truncate | 當(dāng)使用SaveMode.Overwrite時(shí)软免,該配置才會(huì)生效。<br />默認(rèn)是false焚挠,會(huì)先刪除表再創(chuàng)建表膏萧,會(huì)改變?cè)械谋斫Y(jié)構(gòu)。<br />如果是true蝌衔,則會(huì)直接執(zhí)行truncate table榛泛,但是由于各個(gè)DBMS的行為不同,使用它并不總是安全的噩斟,并不是所有的DBMS都支持truncate曹锨。 | write |
基礎(chǔ)代碼如下:
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
Q&A
簡(jiǎn)單介紹了一下SparkSQL讀寫(xiě)JDBC的操作,接下來(lái)介紹一下我在項(xiàng)目上遇到的一些問(wèn)題剃允。
- 我想利用數(shù)據(jù)庫(kù)索引沛简,能提高查詢(xún)速度,能過(guò)濾掉一些數(shù)據(jù)斥废,并不想全表查詢(xún)生成DataFrame椒楣,然后通過(guò)DataFrame操作過(guò)濾數(shù)據(jù)。(雖說(shuō)我也不知道性能能提高多少牡肉,有懂哥也可以幫忙解釋一下)
- dbtable和query應(yīng)該用什么捧灰?到底有什么樣的區(qū)別?
- 本來(lái)想指定
numPartitions
统锤,用來(lái)提高并行度毛俏,但是并不管用,通過(guò)源碼查看跪另,發(fā)現(xiàn)還是需要跟partitionColumn, lowerBound, upperBound
三個(gè)參數(shù)一起使用才會(huì)生效拧抖,但是并不是每個(gè)表都有符合分區(qū)的字段,比如查詢(xún)的字段都是字符串類(lèi)型免绿,那就只能通過(guò)調(diào)用DataFrame.repartition(numPartitions)
唧席,但是需要先把數(shù)據(jù)全部都查出來(lái)再進(jìn)行分區(qū)。 - 使用
DataFrame.write()
方法很暴力嘲驾,竟然會(huì)改變表的結(jié)構(gòu)淌哟?
Q1: 哪個(gè)查詢(xún)性能更好?
看下面的方法辽故。方式1是通過(guò)query中寫(xiě)sql語(yǔ)句帶著查詢(xún)的條件徒仓,并且字段是索引字段。方式2則是通過(guò)操作DataFrame來(lái)過(guò)濾的誊垢。這兩種方式哪種性能更好呢掉弛?
// 方式1
spark
.read()
.format("jdbc")
.option(
"url",
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("query", "select * from user where id > 10")
.load();
// 方式2
spark
.read()
.format("jdbc")
.option(
"url",
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("query", "select * from user where id > 10")
.load()
.where("id > 10")
除了性能症见,還有另外一個(gè)話(huà)題也可以說(shuō)道說(shuō)道。一般來(lái)說(shuō)傳給SQL的查詢(xún)參數(shù)是固定的殃饿,但是每次查詢(xún)的參數(shù)值是不固定的谋作。例如select * from user where birthday >= ? and birthday < ?
,根據(jù)生日的區(qū)間來(lái)查看用戶(hù)列表乎芳。
如果采用方式1遵蚜,有個(gè)很尷尬的問(wèn)題,參數(shù)值如何傳進(jìn)去呢奈惑?看Spark SQL關(guān)于JDBC DataSource的文檔發(fā)現(xiàn)并沒(méi)有關(guān)于這一塊的配置吭净,通過(guò)源碼查看也并未發(fā)現(xiàn)可以傳參數(shù)進(jìn)去的方式。
目前我能想到的方式有兩種肴甸。
第一種通過(guò)字符串替換的方式寂殉,生成對(duì)應(yīng)的SQL語(yǔ)句。舉例來(lái)說(shuō)原在,傳給main方法參數(shù)有
sql=select * from user where birthday >= ${startTime} and birthday < ${endTime}
params=[startTime: 1995-02-01, endTime: 2000-02-01]
然后通過(guò)字符串替換的方式不撑,將sql替換為select * from user where birthday >= '1995-02-01' and birthday < '2000-02-01'
但上述的方式有個(gè)致命的缺陷,就是SQL注入晤斩,那么如何解決SQL注入的問(wèn)題呢?
其實(shí)答案也很簡(jiǎn)單姆坚,通過(guò)prepareStatement
來(lái)set參數(shù)澳泵,那么就需要params里帶著參數(shù)的類(lèi)型,例如params=[{"name": "startTime", "value": '1995-02-01', "type": "Date"}, {"name": "endTime", "value": '2000-02-01', "type": "Date"}]
兼呵。那么可以通過(guò)prepareStatement.setDate
方法給參數(shù)賦值即可兔辅,最后通過(guò)prepareStatement.toString()
方法來(lái)獲取到預(yù)處理之后的SQL,這樣就能保證SQL不會(huì)被注入了击喂。
雖說(shuō)這個(gè)方法避免了SQL的注入维苔,但是prepareStatement.toString()
具體實(shí)現(xiàn)依賴(lài)各個(gè)數(shù)據(jù)庫(kù)提供的驅(qū)動(dòng)包,MySQL是會(huì)打印預(yù)處理之后的SQL懂昂,但是不能保證其他數(shù)據(jù)庫(kù)(例如Oracle)也會(huì)有相同的行為介时,這個(gè)需要對(duì)其他數(shù)據(jù)庫(kù)也要充分的調(diào)研。
第二種方法則是通過(guò)SparkSQL方式
spark.sql("set startTime = 1995-02-01");
spark.sql("set endTime = 2000-02-01");
spark
.read()
.format("jdcb")
.option(
"url",
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("query", "select * from user")
.load().createOrReplaceTempView("t1");
spark.sql("select * from t1 where birthday >= ${startTime} and birthday < ${endTime}")
Spark SQL這種set params的方式凌彬,盲猜應(yīng)該也是直接替換字符串的方式沸柔。
至于這兩種方式孰優(yōu)孰劣,個(gè)人覺(jué)得還是得測(cè)試一下性能才能確定铲敛,雖說(shuō)看上去Spark SQL的方式更加通用一下褐澎,但是缺點(diǎn)還是需要獲取到全表數(shù)據(jù),一旦數(shù)據(jù)量很大伐蒋,會(huì)很影響性能的工三,也會(huì)喪失數(shù)據(jù)庫(kù)本身索引的優(yōu)勢(shì)迁酸。
當(dāng)然這只是自己的一廂情愿,其實(shí)我還沒(méi)具體測(cè)試過(guò)俭正。奸鬓。。
Q2: dbtable和query到底有什么區(qū)別
根據(jù)官方文檔上的說(shuō)明段审,dbtable和query不能同時(shí)使用全蝶。
dbtable可以填寫(xiě)表名,也可以是一個(gè)sql語(yǔ)句作為子查詢(xún)寺枉。
query可以填寫(xiě)sql語(yǔ)句抑淫,但不可以填寫(xiě)表名。
option("dbtable", "user"); // 正確示例
option("dbtable", "(select * from user) as subQuery"); // 正確示例
option("dbtable", "select * from user"); // 這個(gè)是錯(cuò)誤案例
option("query", "select * from user"); // 正確示例
option("query", "user"); // 錯(cuò)誤示例
option("query", "(select * from user) as subQuery"); // 錯(cuò)誤示例
看正反示例代碼也可以發(fā)現(xiàn)姥闪,dbtable和query是沖突的始苇,所以不能同時(shí)使用。
再來(lái)看看源代碼筐喳,也能充分說(shuō)明dbtable和query的區(qū)別
// 摘錄自org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match {
// 這里判斷了dbtable和query是不是都存在催式,如果都存在,拋出異常
case (Some(name), Some(subquery)) =>
throw QueryExecutionErrors.cannotSpecifyBothJdbcTableNameAndQueryError(
JDBC_TABLE_NAME, JDBC_QUERY_STRING)
// 這里判斷了dbtable和query是不是都不存在避归,如果都不存在荣月,拋出異常
case (None, None) =>
throw QueryExecutionErrors.missingJdbcTableNameAndQueryError(
JDBC_TABLE_NAME, JDBC_QUERY_STRING)
// 如果只有dbtable并且不為空直接返回
case (Some(name), None) =>
if (name.isEmpty) {
throw QueryExecutionErrors.emptyOptionError(JDBC_TABLE_NAME)
} else {
name.trim
}
// 如果是query,則需要做一層拼接梳毙,(subquery) SPARK_GEN_SUBQ_id
case (None, Some(subquery)) =>
if (subquery.isEmpty) {
throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
} else {
s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
}
}
// 使用tableOrQuery的地方哺窄,org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#compute
// select * from dbtable
// SELECT * FROM (subquery) SPARK_GEN_SUBQ_id
val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" +
s" $getGroupByClause"
除了配置上的區(qū)別,還有一個(gè)區(qū)別就是如果指定了partitionColumn账锹,lowerBound萌业,upperBound,則必須使用dbtable奸柬,不能使用query生年。這一點(diǎn)在官方文檔里有說(shuō)明,源代碼里也有體現(xiàn)廓奕。
// 摘錄自 org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
require(!(parameters.get(JDBC_QUERY_STRING).isDefined && partitionColumn.isDefined),
s"""
|Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together.
|Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify
|the partition columns using the supplied subquery alias to resolve any ambiguity.
|Example :
|spark.read.format("jdbc")
| .option("url", jdbcUrl)
| .option("dbtable", "(select c1, c2 from t1) as subq")
| .option("partitionColumn", "c1")
| .option("lowerBound", "1")
| .option("upperBound", "100")
| .option("numPartitions", "3")
| .load()
""".stripMargin
)
所以如果指定了partitionColumn就必須使用dbtable抱婉。不過(guò),我沒(méi)整明白桌粉,dbtable和query的實(shí)際作用一樣授段,最后都是select語(yǔ)句,為啥partitionColumn就必須使用dbtable番甩?不知道是不是spark對(duì)這一塊是否還有其他看法侵贵。
Q3: numPartitions, partitionColumn, lowerBound, upperBound四個(gè)參數(shù)之間的關(guān)系到底是什么?
-
numPartitions
:讀缘薛、寫(xiě)的最大分區(qū)數(shù)窍育,也決定了開(kāi)啟數(shù)據(jù)庫(kù)連接的數(shù)目卡睦。注意最大兩個(gè)字,也就是說(shuō)你指定了32個(gè)分區(qū)漱抓,它也不一定就真的分32個(gè)分區(qū)了表锻。比如:在讀的時(shí)候,即便指定了numPartitions
為任何大于1的值乞娄,如果沒(méi)有指定分區(qū)規(guī)則瞬逊,就只有一個(gè)task
去執(zhí)行查詢(xún)。 -
partitionColumn, lowerBound, upperBound
:指定讀數(shù)據(jù)時(shí)的分區(qū)規(guī)則仪或。要使用這三個(gè)參數(shù)确镊,必須定義numPartitions
,而且這三個(gè)參數(shù)不能單獨(dú)出現(xiàn)范删,要用就必須全部指定蕾域。而且lowerBound, upperBound
不是過(guò)濾條件,只是用于決定分區(qū)跨度到旦。
spark
.read()
.format("jdbc")
.option(
"url",
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("dbtable", "(select * from user) as subQuery")
.option("numPartitions", "10")
.load().rdd().getNumPartitions(); // 結(jié)果為1
spark
.read()
.format("jdbc")
.option(
"url",
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.cj.jdbc.Driver")
// .option("query", "select * from user")
.option("dbtable", "(select * from user) as subQuery")
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "50")
.option("numPartitions", "10")
.load().rdd().getNumPartitions(); // 結(jié)果為10
對(duì)于numPartitions
的使用有我感到疑惑的地方旨巷,官方文檔也沒(méi)有說(shuō)明的很清楚,如果說(shuō)指定了numPartitions
但是不指定分區(qū)規(guī)則添忘,這個(gè)參數(shù)相當(dāng)于沒(méi)用采呐,如果需要指定分區(qū)規(guī)則就需要用到partitionColumn, lowerBound, upperBound
這三個(gè)字段,官網(wǎng)在介紹numPartitions
并沒(méi)有說(shuō)明一定要這三個(gè)字段才生效搁骑,不知道有沒(méi)有懂哥知道其他指定分區(qū)的方法懈万。
所以我扒了一下源碼,淺析了一下靶病。首先先看看JDBCOptions里的描述
// 這里就表明了partitionColumn,lowerBound口予,upperBound娄周,numPartitions必須都要存在,分區(qū)才能生效
require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
(partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&
numPartitions.isDefined),
s"When reading JDBC data sources, users need to specify all or none for the following " +
s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
s"and '$JDBC_NUM_PARTITIONS'")
具體使用的邏輯在org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#columnPartition
def columnPartition(
schema: StructType,
resolver: Resolver,
timeZoneId: String,
jdbcOptions: JDBCOptions): Array[Partition] = {
val partitioning = {
import JDBCOptions._
val partitionColumn = jdbcOptions.partitionColumn
val lowerBound = jdbcOptions.lowerBound
val upperBound = jdbcOptions.upperBound
val numPartitions = jdbcOptions.numPartitions
// 如果partitionColumn沒(méi)有指定沪停,直接返回null
if (partitionColumn.isEmpty) {
assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not " +
s"specified, '$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
null
} else {
// 如果partitionColumn指定了煤辨,lowerBound,upperBound木张,numPartitions也必須指定
assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
s"'$JDBC_NUM_PARTITIONS' are also required")
// verifyAndGetNormalizedPartitionColumn會(huì)判斷 column存不存在以及類(lèi)型是否滿(mǎn)足Numeric,Date,Timestamp
val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
schema, partitionColumn.get, resolver, jdbcOptions)
// toInternalBoundValue 會(huì)做數(shù)據(jù)轉(zhuǎn)換
val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
JDBCPartitioningInfo(
column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
}
}
// 如果partitioning為null或者numPartitions<=1或者lowerBound==upperBound众辨,表明不需要分區(qū)。
// 這里也就表明了如果partitionColumn不指定舷礼,即使指定了numPartitions也是無(wú)用的鹃彻。不知道這算不算官網(wǎng)文檔的一個(gè)bug?
if (partitioning == null || partitioning.numPartitions <= 1 ||
partitioning.lowerBound == partitioning.upperBound) {
return Array[Partition](JDBCPartition(null, 0))
}
val lowerBound = partitioning.lowerBound
val upperBound = partitioning.upperBound
require (lowerBound <= upperBound,
"Operation not allowed: the lower bound of partitioning column is larger than the upper " +
s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
val boundValueToString: Long => String =
toBoundValueInWhereClause(_, partitioning.columnType, timeZoneId)
// 從這里可以看出upperBound-lowerBound < numPartitions妻献,那么區(qū)數(shù)其實(shí)是upperBound-lowerBound
val numPartitions =
if ((upperBound - lowerBound) >= partitioning.numPartitions || /* check for overflow */
(upperBound - lowerBound) < 0) {
partitioning.numPartitions
} else {
// 省略一些代碼
upperBound - lowerBound
}
val upperStride = (upperBound / BigDecimal(numPartitions))
.setScale(18, RoundingMode.HALF_EVEN)
val lowerStride = (lowerBound / BigDecimal(numPartitions))
.setScale(18, RoundingMode.HALF_EVEN)
val preciseStride = upperStride - lowerStride
// 分區(qū)跨度
val stride = preciseStride.toLong
val lostNumOfStrides = (preciseStride - stride) * numPartitions / stride
val lowerBoundWithStrideAlignment = lowerBound +
((lostNumOfStrides / 2) * stride).setScale(0, RoundingMode.HALF_UP).toLong
var i: Int = 0
val column = partitioning.column
var currentValue = lowerBoundWithStrideAlignment
// 用于存儲(chǔ)分區(qū)where條件
val ans = new ArrayBuffer[Partition]()
// 開(kāi)始構(gòu)建分區(qū)sql where條件
while (i < numPartitions) {
val lBoundValue = boundValueToString(currentValue)
// 構(gòu)造分區(qū)下界條件語(yǔ)句蛛株,若是第一個(gè)分區(qū)(partition0)团赁,下界條件為null
val lBound = if (i != 0) s"$column >= $lBoundValue" else null
currentValue += stride
val uBoundValue = boundValueToString(currentValue)
// 構(gòu)造分區(qū)上界條件語(yǔ)句,若是最后一個(gè)分區(qū)谨履,上界條件為null
val uBound = if (i != numPartitions - 1) s"$column < $uBoundValue" else null
val whereClause =
if (uBound == null) {
lBound
} else if (lBound == null) {
s"$uBound or $column is null"
} else {
s"$lBound AND $uBound"
}
ans += JDBCPartition(whereClause, i)
i = i + 1
}
val partitions = ans.toArray
logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " +
partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
partitions
}
小總結(jié)一下欢摄,實(shí)際上numPartitions, partitionColumn, lowerBound, upperBound
這四個(gè)參數(shù),如果設(shè)置了笋粟,則4個(gè)都需要設(shè)置怀挠,如果只設(shè)置了numPartitions
是無(wú)效的,原因源碼里有說(shuō)明害捕。
那如果dbtable里并沒(méi)有滿(mǎn)足可以分區(qū)的字段绿淋,比如都是String類(lèi)型的字段,那該如何分區(qū)呢吨艇?其實(shí)躬它,當(dāng)時(shí)剛看到numPartitions
我的第一想法是如果沒(méi)有指定partitionColumn
,Spark會(huì)根據(jù)數(shù)據(jù)庫(kù)分頁(yè)的方式來(lái)做分區(qū)东涡,雖說(shuō)最后調(diào)研的結(jié)果看起來(lái)是我想多了冯吓,但這確實(shí)也給我這個(gè)問(wèn)題的答案提供了思路,大致思路如下:
// 需要先執(zhí)行select count(*) from <dbtable>
int count = 1000;
int numPartitions = 10;
int stride = count / numPartitions;
SparkSession spark =
SparkSession.builder().appName("local-test").master("local[2]").getOrCreate();
String[] predicates = new String[numPartitions];
// 拼接where條件疮跑,這里除了分頁(yè)组贺,也可以是其他可以分區(qū)的條件。
for (int i = 0; i < numPartitions; i++) {
predicates[i] = String.format("1 = 1 limit %d, %d", i * stride, stride);
}
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");
Dataset<Row> df =
spark
.read()
.jdbc(
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true",
"user",
predicates,
properties);
System.out.println(df.rdd().getNumPartitions()); // 10
Q4: Spark SQL對(duì)于jdbc write方法很暴力祖娘,竟然會(huì)改變表的結(jié)構(gòu)失尖?
@Test
public void testJDBCWriter() {
SparkSession spark =
SparkSession.builder().appName("local-test").master("local[2]").getOrCreate();
StructType structType =
new StructType(
new StructField[] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("name", DataTypes.StringType, false, Metadata.empty()),
new StructField("createTime", DataTypes.TimestampType, false, Metadata.empty())
});
Dataset<Row> df =
spark
.read()
.option("header", "true")
.schema(structType)
.format("csv")
.load(SparkSQLDataSourceDemo.class.getClassLoader().getResource("user.csv").getPath());
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");
df.write()
.jdbc(
"jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true",
"user",
properties);
}
如果user表沒(méi)有創(chuàng)建,dataframe jdbc write就會(huì)自己根據(jù)schema和表名自己在jdbc里創(chuàng)建表渐苏。
如果user表已經(jīng)存在掀潮,默認(rèn)的SaveMode.ErrorIfExists
如果表已經(jīng)存在,會(huì)報(bào)Table or view 'user' already exists.
的錯(cuò)誤琼富。
如果user表已經(jīng)存在仪吧,SaveMode
選擇Append
,則會(huì)追加到表里鞠眉,但是如果配置了主鍵或者唯一約束薯鼠,相同的數(shù)據(jù)會(huì)報(bào)錯(cuò)。
如果user表已經(jīng)存在械蹋,SaveMode
選擇Overwrite
出皇,并且truncate
為false
。會(huì)先刪除表再重建表哗戈,會(huì)改變?cè)缺斫Y(jié)構(gòu)郊艘,例如原表的表結(jié)構(gòu)里有主鍵,索引,或者某字段類(lèi)型(比如varchar)暇仲,經(jīng)過(guò)先刪除表再重建表的操作步做,原來(lái)的主鍵,索引已經(jīng)沒(méi)有了奈附,字段的類(lèi)型也被改變了(比如varchar變成了text)全度。
源代碼如下:
// 摘錄自:org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val conn = JdbcUtils.createConnectionFactory(options)()
try {
val tableExists = JdbcUtils.tableExists(conn, options)
if (tableExists) {
mode match {
case SaveMode.Overwrite =>
if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
// In this case, we should truncate table and then load.
truncateTable(conn, options)
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)
} else {
// Otherwise, do not truncate the table, instead drop and recreate it
dropTable(conn, options.table, options)
createTable(conn, options.table, df.schema, isCaseSensitive, options)
saveTable(df, Some(df.schema), isCaseSensitive, options)
}
case SaveMode.Append =>
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)
case SaveMode.ErrorIfExists =>
throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
// to not save the contents of the DataFrame and to not change the existing data.
// Therefore, it is okay to do nothing here and then just return the relation below.
}
} else {
createTable(conn, options.table, df.schema, isCaseSensitive, options)
saveTable(df, Some(df.schema), isCaseSensitive, options)
}
} finally {
conn.close()
}
createRelation(sqlContext, parameters)
}
那如果我們想更好的實(shí)現(xiàn)Overwrite,那應(yīng)該怎么實(shí)現(xiàn)呢斥滤?我的解決方案是使用Dataframe.foreachPartition()
方法實(shí)現(xiàn)将鸵,實(shí)現(xiàn)思路如下:
df.foreachPartition(
partition -> {
Connection connection = null;
PreparedStatement ps = null;
try {
Class.forName(driverClassName);
connection = DriverManager.getConnection(url, username, password);
// sql可以根據(jù)是Append還是Overwrite來(lái)提供
ps = connection.prepareStatement(sql); //(1)
connection.setAutoCommit(false);
connection.setTransactionIsolation(transactionLevel);
int count = 0;
while (partition.hasNext()) {
Row row = partition.next();
// 根據(jù)Column類(lèi)型來(lái)setParamete。
setParameter(ps, row);
count++;
if (count < batchSize) {
ps.addBatch();
} else {
ps.executeBatch();
connection.commit();
count = 0;
}
}
if (count != 0) {
ps.executeBatch();
connection.commit();
}
} catch (Exception e) {
if (connection != null) {
connection.rollback();
}
} finally {
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
}
});
對(duì)上述代碼(1)處的說(shuō)明如下:
sql可以根據(jù)寫(xiě)入模式是Append還是Overwrite來(lái)生成
如果是Append佑颇,則可以提供insert語(yǔ)句
如果是Overwrite顶掉,如果是MySQL,可以提供Replace語(yǔ)句也可以提供insert on duplicate key update語(yǔ)句挑胸,但是其他的DBMS痒筒,需要看看有沒(méi)有其他語(yǔ)句能夠達(dá)成這樣的功效。
對(duì)于第三點(diǎn)茬贵,是依賴(lài)主鍵或者其他唯一約束的簿透,如果表里沒(méi)有主鍵或者其他數(shù)據(jù)庫(kù)沒(méi)有類(lèi)似MySQL那種insert on duplicate key update語(yǔ)句,也可以自定義唯一的條件語(yǔ)句解藻,通過(guò)update來(lái)實(shí)現(xiàn)老充,如果update返回0,則執(zhí)行insert螟左。
PreparedStatement.executeUpdate()
會(huì)返回執(zhí)行成功的條數(shù)啡浊,根據(jù)這個(gè)來(lái)判斷即可。
總結(jié)
本篇文章主要講述了Spark SQL讀寫(xiě)JDBC一些細(xì)節(jié)胶背,通過(guò)一些案例來(lái)講述巷嚣,總結(jié)如下:
- Spark SQL讀寫(xiě)JDBC,一些參數(shù)是必須的钳吟,例如
url, driver, user, password
廷粒。 - 討論了一下如何替換sql參數(shù),避免sql注入砸抛。主要還是需要通過(guò)
PreparedStatement.setXXX
方式實(shí)現(xiàn),缺點(diǎn)就是PreparedStatement.toString()
方法依賴(lài)各個(gè)驅(qū)動(dòng)包的實(shí)現(xiàn)树枫。也可以通過(guò)Spark SQL set語(yǔ)法實(shí)現(xiàn)直焙,但主要問(wèn)題是這么做就需要全表查詢(xún),封裝成DataFrame再操作砂轻,如果數(shù)據(jù)量很大奔誓,會(huì)很消耗內(nèi)存,如果能夠利用數(shù)據(jù)庫(kù)查詢(xún)語(yǔ)句,不僅能夠過(guò)濾出符合條件的數(shù)據(jù)厨喂,也能利用數(shù)據(jù)庫(kù)索引的優(yōu)勢(shì)提高查詢(xún)效率和措。雖說(shuō)這是我一廂情愿的想法,還有待驗(yàn)證蜕煌。 - dbtable和query的區(qū)別就是dbtable可以填寫(xiě)表名例如user派阱,也可以是一個(gè)sql語(yǔ)句作為子查詢(xún),例如(select * from user) as tmp斜纪,query僅能填寫(xiě)填寫(xiě)sql語(yǔ)句贫母,例如select * from user。如果配置了partitionColumn盒刚,那就只能使用dbtable腺劣。
-
numPartitions, partitionColumn, lowerBound, upperBound
這四個(gè)參數(shù),如果設(shè)置了因块,則4個(gè)都需要設(shè)置橘原,如果只設(shè)置了numPartitions
是無(wú)效的。 - 如果只想指定
numPartitions
涡上,又想分區(qū)趾断,怎么辦?可以調(diào)用spark.read().jdbc("url", "tablename",predicates,properties)
方法吓懈,自己實(shí)現(xiàn)一個(gè)作為分區(qū)的predicates即可歼冰。 - 對(duì)于write jdbc而言,原生spark sql的方式還是比較暴力耻警,且不安全隔嫡,如果不指定truncate,則會(huì)刪除表再重建甘穿,這樣會(huì)改變?cè)瓉?lái)的表結(jié)構(gòu)腮恩。truncate還依賴(lài)各個(gè)數(shù)據(jù)庫(kù)的行為,不一定所有數(shù)據(jù)庫(kù)都支持truncate温兼。
- 個(gè)人覺(jué)得write jdbc的最佳實(shí)踐還是通過(guò)
DataFrame.foreachPartition()
方法實(shí)現(xiàn)秸滴,不管寫(xiě)入模式是Append還是Overwrite,都可以自己控制邏輯募判。