SparkSQL讀寫(xiě)jdbc一些鮮為人知的事兒

最近部門(mén)正在使用Spark做ETL,在使用JDBC作為DataSource的時(shí)候遇到了一些坑爹的問(wèn)題缀蹄,本文主要分享一下我遇到的問(wèn)題和一些解決方案踊餐,當(dāng)然可能會(huì)有更好的解決方案暖哨,還請(qǐng)各位大佬在評(píng)論區(qū)給點(diǎn)意見(jiàn)医增。另外珍手,本文會(huì)涉及一些Spark的源碼分析真竖,我使用的版本是org.apache.spark:spark-sql_2.12:3.2.1

首先簡(jiǎn)單介紹一下Spark SQL讀寫(xiě)JDBC的基本操作和參數(shù)配置荐开。

SparkSQL讀寫(xiě)JDBC

Spark SQL支持通過(guò)JDBC直接讀取數(shù)據(jù)庫(kù)中的數(shù)據(jù),這個(gè)特性是基于JdbcRDD實(shí)現(xiàn)尖阔,返回值作為DataFrame返回或者注冊(cè)成Spark SQL的臨時(shí)表烘豹,用戶(hù)可以在數(shù)據(jù)源選項(xiàng)中配置JDBC相關(guān)的連接參數(shù),user和password一般是必須提供的參數(shù)诺祸,常用的參數(shù)列表如下:

參數(shù)名稱(chēng) 參數(shù)說(shuō)明 作用范圍
url 數(shù)據(jù)庫(kù)連接jdbc url read/write
user 數(shù)據(jù)庫(kù)登陸用戶(hù)名 read/write
password 數(shù)據(jù)庫(kù)登陸密碼 read/write
dbtable 需要讀取或者寫(xiě)入的JDBC表。注意不能同時(shí)配置dbtable和query祭芦。 read/write
query query用于指定from后面的子查詢(xún)筷笨,拼接成的sql如下:SELECT FROM () spark_gen_alias 。<br />注意dbtable和query不能同時(shí)使用龟劲;<br />不允許同時(shí)使用partitionColumn和query胃夏。 read/write
driver jdbc驅(qū)動(dòng)driver read/write
partitionColumn, lowerBound, upperBound 指定時(shí)這三項(xiàng)需要同時(shí)存在,描述了worker如何并行讀取數(shù)據(jù)庫(kù)昌跌。<br />其中partitionColumn必須是數(shù)字仰禀、date、timestamp蚕愤。<br />lowerBound和upperBound只是決定了分區(qū)的步長(zhǎng)答恶,而不會(huì)過(guò)濾數(shù)據(jù),因此表中所有的數(shù)據(jù)都會(huì)被分區(qū)返回 read
numPartitions 讀寫(xiě)時(shí)的最大分區(qū)數(shù)萍诱。這也決定了連接JDBC的最大連接數(shù)悬嗓,如果并行度超過(guò)該配置,將會(huì)使用coalesce(partition)來(lái)降低并行度裕坊。 read/write
queryTimeout driver執(zhí)行statement的等待時(shí)間包竹,0意味著沒(méi)有限制。<br />寫(xiě)入的時(shí)候這個(gè)選項(xiàng)依賴(lài)于底層是如何實(shí)現(xiàn)setQueryTimeout的 read/write
fetchsize fetch的大小籍凝,決定了每一個(gè)fetch周瞎,拉取多少數(shù)據(jù)量。 read
batchsize batch大小饵蒂,決定插入時(shí)的并發(fā)大小声诸,默認(rèn)1000。 write
isolationLevel 事務(wù)隔離的等級(jí)苹享,作用于當(dāng)前連接双絮,默認(rèn)是READ_UNCOMMITTED浴麻。<br />可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE。<br />依賴(lài)于底層jdbc提供的事務(wù)隔離囤攀。 write
truncate 當(dāng)使用SaveMode.Overwrite時(shí)软免,該配置才會(huì)生效。<br />默認(rèn)是false焚挠,會(huì)先刪除表再創(chuàng)建表膏萧,會(huì)改變?cè)械谋斫Y(jié)構(gòu)。<br />如果是true蝌衔,則會(huì)直接執(zhí)行truncate table榛泛,但是由于各個(gè)DBMS的行為不同,使用它并不總是安全的噩斟,并不是所有的DBMS都支持truncate曹锨。 write

基礎(chǔ)代碼如下:

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

Q&A

簡(jiǎn)單介紹了一下SparkSQL讀寫(xiě)JDBC的操作,接下來(lái)介紹一下我在項(xiàng)目上遇到的一些問(wèn)題剃允。

  1. 我想利用數(shù)據(jù)庫(kù)索引沛简,能提高查詢(xún)速度,能過(guò)濾掉一些數(shù)據(jù)斥废,并不想全表查詢(xún)生成DataFrame椒楣,然后通過(guò)DataFrame操作過(guò)濾數(shù)據(jù)。(雖說(shuō)我也不知道性能能提高多少牡肉,有懂哥也可以幫忙解釋一下)
  2. dbtable和query應(yīng)該用什么捧灰?到底有什么樣的區(qū)別?
  3. 本來(lái)想指定numPartitions统锤,用來(lái)提高并行度毛俏,但是并不管用,通過(guò)源碼查看跪另,發(fā)現(xiàn)還是需要跟partitionColumn, lowerBound, upperBound三個(gè)參數(shù)一起使用才會(huì)生效拧抖,但是并不是每個(gè)表都有符合分區(qū)的字段,比如查詢(xún)的字段都是字符串類(lèi)型免绿,那就只能通過(guò)調(diào)用DataFrame.repartition(numPartitions)唧席,但是需要先把數(shù)據(jù)全部都查出來(lái)再進(jìn)行分區(qū)。
  4. 使用DataFrame.write()方法很暴力嘲驾,竟然會(huì)改變表的結(jié)構(gòu)淌哟?

Q1: 哪個(gè)查詢(xún)性能更好?

看下面的方法辽故。方式1是通過(guò)query中寫(xiě)sql語(yǔ)句帶著查詢(xún)的條件徒仓,并且字段是索引字段。方式2則是通過(guò)操作DataFrame來(lái)過(guò)濾的誊垢。這兩種方式哪種性能更好呢掉弛?

// 方式1
spark
  .read()
  .format("jdbc")
  .option(
  "url",
  "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("query", "select * from user where id > 10")
  .load();

// 方式2
spark
  .read()
  .format("jdbc")
  .option(
  "url",
  "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("query", "select * from user where id > 10")
  .load()
  .where("id > 10")

除了性能症见,還有另外一個(gè)話(huà)題也可以說(shuō)道說(shuō)道。一般來(lái)說(shuō)傳給SQL的查詢(xún)參數(shù)是固定的殃饿,但是每次查詢(xún)的參數(shù)值是不固定的谋作。例如select * from user where birthday >= ? and birthday < ?,根據(jù)生日的區(qū)間來(lái)查看用戶(hù)列表乎芳。

如果采用方式1遵蚜,有個(gè)很尷尬的問(wèn)題,參數(shù)值如何傳進(jìn)去呢奈惑?看Spark SQL關(guān)于JDBC DataSource的文檔發(fā)現(xiàn)并沒(méi)有關(guān)于這一塊的配置吭净,通過(guò)源碼查看也并未發(fā)現(xiàn)可以傳參數(shù)進(jìn)去的方式。

目前我能想到的方式有兩種肴甸。

第一種通過(guò)字符串替換的方式寂殉,生成對(duì)應(yīng)的SQL語(yǔ)句。舉例來(lái)說(shuō)原在,傳給main方法參數(shù)有

  • sql=select * from user where birthday >= ${startTime} and birthday < ${endTime}
  • params=[startTime: 1995-02-01, endTime: 2000-02-01]

然后通過(guò)字符串替換的方式不撑,將sql替換為select * from user where birthday >= '1995-02-01' and birthday < '2000-02-01'

但上述的方式有個(gè)致命的缺陷,就是SQL注入晤斩,那么如何解決SQL注入的問(wèn)題呢?

其實(shí)答案也很簡(jiǎn)單姆坚,通過(guò)prepareStatement來(lái)set參數(shù)澳泵,那么就需要params里帶著參數(shù)的類(lèi)型,例如params=[{"name": "startTime", "value": '1995-02-01', "type": "Date"}, {"name": "endTime", "value": '2000-02-01', "type": "Date"}]兼呵。那么可以通過(guò)prepareStatement.setDate方法給參數(shù)賦值即可兔辅,最后通過(guò)prepareStatement.toString()方法來(lái)獲取到預(yù)處理之后的SQL,這樣就能保證SQL不會(huì)被注入了击喂。

雖說(shuō)這個(gè)方法避免了SQL的注入维苔,但是prepareStatement.toString()具體實(shí)現(xiàn)依賴(lài)各個(gè)數(shù)據(jù)庫(kù)提供的驅(qū)動(dòng)包,MySQL是會(huì)打印預(yù)處理之后的SQL懂昂,但是不能保證其他數(shù)據(jù)庫(kù)(例如Oracle)也會(huì)有相同的行為介时,這個(gè)需要對(duì)其他數(shù)據(jù)庫(kù)也要充分的調(diào)研。

第二種方法則是通過(guò)SparkSQL方式

spark.sql("set startTime = 1995-02-01");
spark.sql("set endTime = 2000-02-01");
spark
  .read()
  .format("jdcb")
  .option(
  "url",
  "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("query", "select * from user")
  .load().createOrReplaceTempView("t1");
spark.sql("select * from t1 where birthday >= ${startTime} and birthday < ${endTime}")

Spark SQL這種set params的方式凌彬,盲猜應(yīng)該也是直接替換字符串的方式沸柔。

至于這兩種方式孰優(yōu)孰劣,個(gè)人覺(jué)得還是得測(cè)試一下性能才能確定铲敛,雖說(shuō)看上去Spark SQL的方式更加通用一下褐澎,但是缺點(diǎn)還是需要獲取到全表數(shù)據(jù),一旦數(shù)據(jù)量很大伐蒋,會(huì)很影響性能的工三,也會(huì)喪失數(shù)據(jù)庫(kù)本身索引的優(yōu)勢(shì)迁酸。

當(dāng)然這只是自己的一廂情愿,其實(shí)我還沒(méi)具體測(cè)試過(guò)俭正。奸鬓。。

Q2: dbtable和query到底有什么區(qū)別

根據(jù)官方文檔上的說(shuō)明段审,dbtable和query不能同時(shí)使用全蝶。

dbtable可以填寫(xiě)表名,也可以是一個(gè)sql語(yǔ)句作為子查詢(xún)寺枉。

query可以填寫(xiě)sql語(yǔ)句抑淫,但不可以填寫(xiě)表名。

option("dbtable", "user"); // 正確示例
option("dbtable", "(select * from user) as subQuery"); // 正確示例
option("dbtable", "select * from user"); // 這個(gè)是錯(cuò)誤案例

option("query", "select * from user"); // 正確示例
option("query", "user"); // 錯(cuò)誤示例
option("query", "(select * from user) as subQuery"); // 錯(cuò)誤示例

看正反示例代碼也可以發(fā)現(xiàn)姥闪,dbtable和query是沖突的始苇,所以不能同時(shí)使用。

再來(lái)看看源代碼筐喳,也能充分說(shuō)明dbtable和query的區(qū)別

// 摘錄自org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match {
    // 這里判斷了dbtable和query是不是都存在催式,如果都存在,拋出異常
    case (Some(name), Some(subquery)) =>
      throw QueryExecutionErrors.cannotSpecifyBothJdbcTableNameAndQueryError(
        JDBC_TABLE_NAME, JDBC_QUERY_STRING)
    // 這里判斷了dbtable和query是不是都不存在避归,如果都不存在荣月,拋出異常
    case (None, None) =>
      throw QueryExecutionErrors.missingJdbcTableNameAndQueryError(
        JDBC_TABLE_NAME, JDBC_QUERY_STRING)
    // 如果只有dbtable并且不為空直接返回
    case (Some(name), None) =>
      if (name.isEmpty) {
        throw QueryExecutionErrors.emptyOptionError(JDBC_TABLE_NAME)
      } else {
        name.trim
      }
    // 如果是query,則需要做一層拼接梳毙,(subquery) SPARK_GEN_SUBQ_id
    case (None, Some(subquery)) =>
      if (subquery.isEmpty) {
        throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
      } else {
        s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
      }
  }

// 使用tableOrQuery的地方哺窄,org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#compute
// select * from dbtable
// SELECT * FROM (subquery) SPARK_GEN_SUBQ_id 
val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" +
      s" $getGroupByClause"

除了配置上的區(qū)別,還有一個(gè)區(qū)別就是如果指定了partitionColumn账锹,lowerBound萌业,upperBound,則必須使用dbtable奸柬,不能使用query生年。這一點(diǎn)在官方文檔里有說(shuō)明,源代碼里也有體現(xiàn)廓奕。

// 摘錄自 org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
require(!(parameters.get(JDBC_QUERY_STRING).isDefined && partitionColumn.isDefined),
  s"""
     |Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together.
     |Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify
     |the partition columns using the supplied subquery alias to resolve any ambiguity.
     |Example :
     |spark.read.format("jdbc")
     |  .option("url", jdbcUrl)
     |  .option("dbtable", "(select c1, c2 from t1) as subq")
     |  .option("partitionColumn", "c1")
     |  .option("lowerBound", "1")
     |  .option("upperBound", "100")
     |  .option("numPartitions", "3")
     |  .load()
   """.stripMargin
)

所以如果指定了partitionColumn就必須使用dbtable抱婉。不過(guò),我沒(méi)整明白桌粉,dbtable和query的實(shí)際作用一樣授段,最后都是select語(yǔ)句,為啥partitionColumn就必須使用dbtable番甩?不知道是不是spark對(duì)這一塊是否還有其他看法侵贵。

Q3: numPartitions, partitionColumn, lowerBound, upperBound四個(gè)參數(shù)之間的關(guān)系到底是什么?

  1. numPartitions:讀缘薛、寫(xiě)的最大分區(qū)數(shù)窍育,也決定了開(kāi)啟數(shù)據(jù)庫(kù)連接的數(shù)目卡睦。注意最大兩個(gè)字,也就是說(shuō)你指定了32個(gè)分區(qū)漱抓,它也不一定就真的分32個(gè)分區(qū)了表锻。比如:在讀的時(shí)候,即便指定了numPartitions為任何大于1的值乞娄,如果沒(méi)有指定分區(qū)規(guī)則瞬逊,就只有一個(gè)task去執(zhí)行查詢(xún)。
  2. partitionColumn, lowerBound, upperBound:指定讀數(shù)據(jù)時(shí)的分區(qū)規(guī)則仪或。要使用這三個(gè)參數(shù)确镊,必須定義numPartitions,而且這三個(gè)參數(shù)不能單獨(dú)出現(xiàn)范删,要用就必須全部指定蕾域。而且lowerBound, upperBound不是過(guò)濾條件,只是用于決定分區(qū)跨度到旦。
spark
  .read()
  .format("jdbc")
  .option(
  "url",
  "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "(select * from user) as subQuery")
  .option("numPartitions", "10")
  .load().rdd().getNumPartitions(); // 結(jié)果為1

spark
  .read()
  .format("jdbc")
  .option(
  "url",
  "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  //            .option("query", "select * from user")
  .option("dbtable", "(select * from user) as subQuery")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "50")
  .option("numPartitions", "10")
  .load().rdd().getNumPartitions(); // 結(jié)果為10

對(duì)于numPartitions的使用有我感到疑惑的地方旨巷,官方文檔也沒(méi)有說(shuō)明的很清楚,如果說(shuō)指定了numPartitions但是不指定分區(qū)規(guī)則添忘,這個(gè)參數(shù)相當(dāng)于沒(méi)用采呐,如果需要指定分區(qū)規(guī)則就需要用到partitionColumn, lowerBound, upperBound這三個(gè)字段,官網(wǎng)在介紹numPartitions并沒(méi)有說(shuō)明一定要這三個(gè)字段才生效搁骑,不知道有沒(méi)有懂哥知道其他指定分區(qū)的方法懈万。

所以我扒了一下源碼,淺析了一下靶病。首先先看看JDBCOptions里的描述

// 這里就表明了partitionColumn,lowerBound口予,upperBound娄周,numPartitions必須都要存在,分區(qū)才能生效
require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
  (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&
    numPartitions.isDefined),
  s"When reading JDBC data sources, users need to specify all or none for the following " +
    s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
    s"and '$JDBC_NUM_PARTITIONS'")

具體使用的邏輯在org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#columnPartition

def columnPartition(
    schema: StructType,
    resolver: Resolver,
    timeZoneId: String,
    jdbcOptions: JDBCOptions): Array[Partition] = {
  val partitioning = {
    import JDBCOptions._

    val partitionColumn = jdbcOptions.partitionColumn
    val lowerBound = jdbcOptions.lowerBound
    val upperBound = jdbcOptions.upperBound
    val numPartitions = jdbcOptions.numPartitions

    // 如果partitionColumn沒(méi)有指定沪停,直接返回null
    if (partitionColumn.isEmpty) {
      assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not " +
        s"specified, '$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
      null
    } else {
      // 如果partitionColumn指定了煤辨,lowerBound,upperBound木张,numPartitions也必須指定
      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
        s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
          s"'$JDBC_NUM_PARTITIONS' are also required")

      // verifyAndGetNormalizedPartitionColumn會(huì)判斷 column存不存在以及類(lèi)型是否滿(mǎn)足Numeric,Date,Timestamp
      val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
        schema, partitionColumn.get, resolver, jdbcOptions)
            // toInternalBoundValue 會(huì)做數(shù)據(jù)轉(zhuǎn)換
      val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
      val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
      JDBCPartitioningInfo(
        column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
    }
  }

  // 如果partitioning為null或者numPartitions<=1或者lowerBound==upperBound众辨,表明不需要分區(qū)。
  // 這里也就表明了如果partitionColumn不指定舷礼,即使指定了numPartitions也是無(wú)用的鹃彻。不知道這算不算官網(wǎng)文檔的一個(gè)bug?
  if (partitioning == null || partitioning.numPartitions <= 1 ||
    partitioning.lowerBound == partitioning.upperBound) {
    return Array[Partition](JDBCPartition(null, 0))
  }

  val lowerBound = partitioning.lowerBound
  val upperBound = partitioning.upperBound
  require (lowerBound <= upperBound,
    "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
    s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")

  val boundValueToString: Long => String =
    toBoundValueInWhereClause(_, partitioning.columnType, timeZoneId)
  
  // 從這里可以看出upperBound-lowerBound < numPartitions妻献,那么區(qū)數(shù)其實(shí)是upperBound-lowerBound
  val numPartitions =
    if ((upperBound - lowerBound) >= partitioning.numPartitions || /* check for overflow */
        (upperBound - lowerBound) < 0) {
      partitioning.numPartitions
    } else {
      // 省略一些代碼
      upperBound - lowerBound
    }

  val upperStride = (upperBound / BigDecimal(numPartitions))
    .setScale(18, RoundingMode.HALF_EVEN)
  val lowerStride = (lowerBound / BigDecimal(numPartitions))
    .setScale(18, RoundingMode.HALF_EVEN)

  val preciseStride = upperStride - lowerStride
  // 分區(qū)跨度
  val stride = preciseStride.toLong
  
  val lostNumOfStrides = (preciseStride - stride) * numPartitions / stride
  val lowerBoundWithStrideAlignment = lowerBound +
    ((lostNumOfStrides / 2) * stride).setScale(0, RoundingMode.HALF_UP).toLong

  var i: Int = 0
  val column = partitioning.column
  var currentValue = lowerBoundWithStrideAlignment
  // 用于存儲(chǔ)分區(qū)where條件
  val ans = new ArrayBuffer[Partition]()
  
  // 開(kāi)始構(gòu)建分區(qū)sql where條件
  while (i < numPartitions) {
    val lBoundValue = boundValueToString(currentValue)
    // 構(gòu)造分區(qū)下界條件語(yǔ)句蛛株,若是第一個(gè)分區(qū)(partition0)团赁,下界條件為null
    val lBound = if (i != 0) s"$column >= $lBoundValue" else null
    currentValue += stride
    val uBoundValue = boundValueToString(currentValue)
    // 構(gòu)造分區(qū)上界條件語(yǔ)句,若是最后一個(gè)分區(qū)谨履,上界條件為null
    val uBound = if (i != numPartitions - 1) s"$column < $uBoundValue" else null
    val whereClause =
      if (uBound == null) {
        lBound
      } else if (lBound == null) {
        s"$uBound or $column is null"
      } else {
        s"$lBound AND $uBound"
      }
    ans += JDBCPartition(whereClause, i)
    i = i + 1
  }
  val partitions = ans.toArray
  logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " +
    partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
  partitions
}

小總結(jié)一下欢摄,實(shí)際上numPartitions, partitionColumn, lowerBound, upperBound這四個(gè)參數(shù),如果設(shè)置了笋粟,則4個(gè)都需要設(shè)置怀挠,如果只設(shè)置了numPartitions是無(wú)效的,原因源碼里有說(shuō)明害捕。

那如果dbtable里并沒(méi)有滿(mǎn)足可以分區(qū)的字段绿淋,比如都是String類(lèi)型的字段,那該如何分區(qū)呢吨艇?其實(shí)躬它,當(dāng)時(shí)剛看到numPartitions我的第一想法是如果沒(méi)有指定partitionColumn,Spark會(huì)根據(jù)數(shù)據(jù)庫(kù)分頁(yè)的方式來(lái)做分區(qū)东涡,雖說(shuō)最后調(diào)研的結(jié)果看起來(lái)是我想多了冯吓,但這確實(shí)也給我這個(gè)問(wèn)題的答案提供了思路,大致思路如下:

// 需要先執(zhí)行select count(*) from <dbtable>
int count = 1000;
int numPartitions = 10;
int stride = count / numPartitions;
SparkSession spark =
  SparkSession.builder().appName("local-test").master("local[2]").getOrCreate();
String[] predicates = new String[numPartitions];
// 拼接where條件疮跑,這里除了分頁(yè)组贺,也可以是其他可以分區(qū)的條件。
for (int i = 0; i < numPartitions; i++) {
  predicates[i] = String.format("1 = 1 limit %d, %d", i * stride, stride);
}
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");
Dataset<Row> df =
  spark
  .read()
  .jdbc(
  "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true",
  "user",
  predicates,
  properties);
System.out.println(df.rdd().getNumPartitions()); // 10

Q4: Spark SQL對(duì)于jdbc write方法很暴力祖娘,竟然會(huì)改變表的結(jié)構(gòu)失尖?

@Test
public void testJDBCWriter() {
  SparkSession spark =
    SparkSession.builder().appName("local-test").master("local[2]").getOrCreate();
  StructType structType =
    new StructType(
    new StructField[] {
      new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
      new StructField("name", DataTypes.StringType, false, Metadata.empty()),
      new StructField("createTime", DataTypes.TimestampType, false, Metadata.empty())
    });
  Dataset<Row> df =
    spark
    .read()
    .option("header", "true")
    .schema(structType)
    .format("csv")
    .load(SparkSQLDataSourceDemo.class.getClassLoader().getResource("user.csv").getPath());
  Properties properties = new Properties();
  properties.put("user", "root");
  properties.put("password", "root");
  properties.put("driver", "com.mysql.cj.jdbc.Driver");
  df.write()
    .jdbc(
    "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&serverTimezone=UTC&useSSL=true",
    "user",
    properties);
}

如果user表沒(méi)有創(chuàng)建,dataframe jdbc write就會(huì)自己根據(jù)schema和表名自己在jdbc里創(chuàng)建表渐苏。

如果user表已經(jīng)存在掀潮,默認(rèn)的SaveMode.ErrorIfExists如果表已經(jīng)存在,會(huì)報(bào)Table or view 'user' already exists.的錯(cuò)誤琼富。

如果user表已經(jīng)存在仪吧,SaveMode選擇Append,則會(huì)追加到表里鞠眉,但是如果配置了主鍵或者唯一約束薯鼠,相同的數(shù)據(jù)會(huì)報(bào)錯(cuò)。

如果user表已經(jīng)存在械蹋,SaveMode選擇Overwrite出皇,并且truncatefalse。會(huì)先刪除表再重建表哗戈,會(huì)改變?cè)缺斫Y(jié)構(gòu)郊艘,例如原表的表結(jié)構(gòu)里有主鍵,索引,或者某字段類(lèi)型(比如varchar)暇仲,經(jīng)過(guò)先刪除表再重建表的操作步做,原來(lái)的主鍵,索引已經(jīng)沒(méi)有了奈附,字段的類(lèi)型也被改變了(比如varchar變成了text)全度。

源代碼如下:

// 摘錄自:org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    val options = new JdbcOptionsInWrite(parameters)
    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis

    val conn = JdbcUtils.createConnectionFactory(options)()
    try {
      val tableExists = JdbcUtils.tableExists(conn, options)
      if (tableExists) {
        mode match {
          case SaveMode.Overwrite =>
            if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
              // In this case, we should truncate table and then load.
              truncateTable(conn, options)
              val tableSchema = JdbcUtils.getSchemaOption(conn, options)
              saveTable(df, tableSchema, isCaseSensitive, options)
            } else {
              // Otherwise, do not truncate the table, instead drop and recreate it
              dropTable(conn, options.table, options)
              createTable(conn, options.table, df.schema, isCaseSensitive, options)
              saveTable(df, Some(df.schema), isCaseSensitive, options)
            }

          case SaveMode.Append =>
            val tableSchema = JdbcUtils.getSchemaOption(conn, options)
            saveTable(df, tableSchema, isCaseSensitive, options)

          case SaveMode.ErrorIfExists =>
            throw QueryCompilationErrors.tableOrViewAlreadyExistsError(options.table)

          case SaveMode.Ignore =>
            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
            // to not save the contents of the DataFrame and to not change the existing data.
            // Therefore, it is okay to do nothing here and then just return the relation below.
        }
      } else {
        createTable(conn, options.table, df.schema, isCaseSensitive, options)
        saveTable(df, Some(df.schema), isCaseSensitive, options)
      }
    } finally {
      conn.close()
    }

    createRelation(sqlContext, parameters)
  }

那如果我們想更好的實(shí)現(xiàn)Overwrite,那應(yīng)該怎么實(shí)現(xiàn)呢斥滤?我的解決方案是使用Dataframe.foreachPartition()方法實(shí)現(xiàn)将鸵,實(shí)現(xiàn)思路如下:

df.foreachPartition(
        partition -> {
          Connection connection = null;
          PreparedStatement ps = null;
          try {
            Class.forName(driverClassName);
            connection = DriverManager.getConnection(url, username, password);
            // sql可以根據(jù)是Append還是Overwrite來(lái)提供
            ps = connection.prepareStatement(sql); //(1)
            connection.setAutoCommit(false);
            connection.setTransactionIsolation(transactionLevel);
            int count = 0;
            while (partition.hasNext()) {
              Row row = partition.next();
              // 根據(jù)Column類(lèi)型來(lái)setParamete。
              setParameter(ps, row);
              count++;
              if (count < batchSize) {
                ps.addBatch();
              } else {
                ps.executeBatch();
                connection.commit();
                count = 0;
              }
            }
            if (count != 0) {
              ps.executeBatch();
              connection.commit();
            }
          } catch (Exception e) {
            if (connection != null) {
              connection.rollback();
            }
          } finally {
            if (ps != null) {
              ps.close();
            }
            if (connection != null) {
              connection.close();
            }
          }
        });

對(duì)上述代碼(1)處的說(shuō)明如下:

  1. sql可以根據(jù)寫(xiě)入模式是Append還是Overwrite來(lái)生成

  2. 如果是Append佑颇,則可以提供insert語(yǔ)句

  3. 如果是Overwrite顶掉,如果是MySQL,可以提供Replace語(yǔ)句也可以提供insert on duplicate key update語(yǔ)句挑胸,但是其他的DBMS痒筒,需要看看有沒(méi)有其他語(yǔ)句能夠達(dá)成這樣的功效。

  4. 對(duì)于第三點(diǎn)茬贵,是依賴(lài)主鍵或者其他唯一約束的簿透,如果表里沒(méi)有主鍵或者其他數(shù)據(jù)庫(kù)沒(méi)有類(lèi)似MySQL那種insert on duplicate key update語(yǔ)句,也可以自定義唯一的條件語(yǔ)句解藻,通過(guò)update來(lái)實(shí)現(xiàn)老充,如果update返回0,則執(zhí)行insert螟左。PreparedStatement.executeUpdate()會(huì)返回執(zhí)行成功的條數(shù)啡浊,根據(jù)這個(gè)來(lái)判斷即可。

總結(jié)

本篇文章主要講述了Spark SQL讀寫(xiě)JDBC一些細(xì)節(jié)胶背,通過(guò)一些案例來(lái)講述巷嚣,總結(jié)如下:

  1. Spark SQL讀寫(xiě)JDBC,一些參數(shù)是必須的钳吟,例如url, driver, user, password廷粒。
  2. 討論了一下如何替換sql參數(shù),避免sql注入砸抛。主要還是需要通過(guò)PreparedStatement.setXXX方式實(shí)現(xiàn),缺點(diǎn)就是PreparedStatement.toString()方法依賴(lài)各個(gè)驅(qū)動(dòng)包的實(shí)現(xiàn)树枫。也可以通過(guò)Spark SQL set語(yǔ)法實(shí)現(xiàn)直焙,但主要問(wèn)題是這么做就需要全表查詢(xún),封裝成DataFrame再操作砂轻,如果數(shù)據(jù)量很大奔誓,會(huì)很消耗內(nèi)存,如果能夠利用數(shù)據(jù)庫(kù)查詢(xún)語(yǔ)句,不僅能夠過(guò)濾出符合條件的數(shù)據(jù)厨喂,也能利用數(shù)據(jù)庫(kù)索引的優(yōu)勢(shì)提高查詢(xún)效率和措。雖說(shuō)這是我一廂情愿的想法,還有待驗(yàn)證蜕煌。
  3. dbtable和query的區(qū)別就是dbtable可以填寫(xiě)表名例如user派阱,也可以是一個(gè)sql語(yǔ)句作為子查詢(xún),例如(select * from user) as tmp斜纪,query僅能填寫(xiě)填寫(xiě)sql語(yǔ)句贫母,例如select * from user。如果配置了partitionColumn盒刚,那就只能使用dbtable腺劣。
  4. numPartitions, partitionColumn, lowerBound, upperBound這四個(gè)參數(shù),如果設(shè)置了因块,則4個(gè)都需要設(shè)置橘原,如果只設(shè)置了numPartitions是無(wú)效的。
  5. 如果只想指定numPartitions涡上,又想分區(qū)趾断,怎么辦?可以調(diào)用spark.read().jdbc("url", "tablename",predicates,properties)方法吓懈,自己實(shí)現(xiàn)一個(gè)作為分區(qū)的predicates即可歼冰。
  6. 對(duì)于write jdbc而言,原生spark sql的方式還是比較暴力耻警,且不安全隔嫡,如果不指定truncate,則會(huì)刪除表再重建甘穿,這樣會(huì)改變?cè)瓉?lái)的表結(jié)構(gòu)腮恩。truncate還依賴(lài)各個(gè)數(shù)據(jù)庫(kù)的行為,不一定所有數(shù)據(jù)庫(kù)都支持truncate温兼。
  7. 個(gè)人覺(jué)得write jdbc的最佳實(shí)踐還是通過(guò)DataFrame.foreachPartition()方法實(shí)現(xiàn)秸滴,不管寫(xiě)入模式是Append還是Overwrite,都可以自己控制邏輯募判。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末荡含,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子届垫,更是在濱河造成了極大的恐慌释液,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件装处,死亡現(xiàn)場(chǎng)離奇詭異误债,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén)寝蹈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)李命,“玉大人,你說(shuō)我怎么就攤上這事箫老》庾郑” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵槽惫,是天一觀的道長(zhǎng)周叮。 經(jīng)常有香客問(wèn)我,道長(zhǎng)界斜,這世上最難降的妖魔是什么仿耽? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮各薇,結(jié)果婚禮上项贺,老公的妹妹穿的比我還像新娘。我一直安慰自己峭判,他們只是感情好开缎,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著林螃,像睡著了一般奕删。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上疗认,一...
    開(kāi)封第一講書(shū)人閱讀 49,749評(píng)論 1 289
  • 那天完残,我揣著相機(jī)與錄音,去河邊找鬼横漏。 笑死谨设,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缎浇。 我是一名探鬼主播扎拣,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼素跺!你這毒婦竟也來(lái)了二蓝?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤指厌,失蹤者是張志新(化名)和其女友劉穎刊愚,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體仑乌,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡百拓,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了晰甚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衙传。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖厕九,靈堂內(nèi)的尸體忽然破棺而出蓖捶,到底是詐尸還是另有隱情,我是刑警寧澤扁远,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布俊鱼,位于F島的核電站,受9級(jí)特大地震影響畅买,放射性物質(zhì)發(fā)生泄漏并闲。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一谷羞、第九天 我趴在偏房一處隱蔽的房頂上張望帝火。 院中可真熱鬧,春花似錦湃缎、人聲如沸犀填。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)九巡。三九已至,卻和暖如春蹂季,著一層夾襖步出監(jiān)牢的瞬間冕广,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工乏盐, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留佳窑,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓父能,卻偏偏與公主長(zhǎng)得像神凑,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子何吝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容