重寫JdbcRDD實(shí)現(xiàn)條件查詢Mysql數(shù)據(jù)庫

雖然Spark使用關(guān)系型數(shù)據(jù)庫作為數(shù)據(jù)源的場(chǎng)景并不多尼啡,但是有時(shí)候我們還是希望能夠能夠從MySql等數(shù)據(jù)庫中讀取數(shù)據(jù)琐凭,并封裝成RDD芽隆。Spark官方確實(shí)也提供了這么一個(gè)庫給我們,org.apache.spark.rdd.JdbcRDD统屈。但是這個(gè)庫使用起來讓人覺得很雞肋胚吁,因?yàn)樗恢С謼l件查詢,只支持起止邊界查詢愁憔,這大大限定了它的使用場(chǎng)景腕扶。很多時(shí)候我們需要分析的數(shù)據(jù)不可能單獨(dú)建一個(gè)表,它們往往被混雜在一個(gè)大的表中吨掌,我們會(huì)希望更加精確的找出某一類的數(shù)據(jù)做分析半抱。
查看了一下這個(gè)JdbcRDD的源碼,我們就能明白為什么他只提供起止邊界了思犁。

 val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

    val url = conn.getMetaData.getURL
    if (url.startsWith("jdbc:mysql:")) {
      // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
      // streaming results, rather than pulling entire resultset into memory.
      // See the below URL
      // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html

      stmt.setFetchSize(Integer.MIN_VALUE)
    } else {
      stmt.setFetchSize(100)
    }

    logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")

    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
    val rs = stmt.executeQuery()

它使用的是游標(biāo)的方式代虾,conn.prepareStatement(sql, type, concurrency)进肯,因此傳入的參數(shù)只能是這個(gè)分區(qū)的起始編號(hào)part.lower和這個(gè)分區(qū)的終止編號(hào)part.upper激蹲。我查了半天資料,也不知道這種方式該如何將條件傳給這個(gè)stmt 江掩,有點(diǎn)難受学辱。索性也不嘗試了乘瓤,也不考慮兼容其他類型的數(shù)據(jù)庫,只考慮mysql數(shù)據(jù)庫的話策泣,把游標(biāo)這種方式給去了衙傀,這樣使用limit總能給它查出來吧。
以下是具體實(shí)現(xiàn)萨咕,
重寫的JdbcRDD:

package JdbcRDD

import java.sql.{Connection, ResultSet}
import java.util.ArrayList
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
private class JdbcPartition(idx: Int, val lower: Long, val upper: Long ,val params:ArrayList[Any]) extends Partition {
  override def index: Int = idx 
}

class JdbcRDD[T: ClassTag](
                            sc: SparkContext,
                            getConnection: () => Connection,
                            sql: String,
                            lowerBound: Long,
                            upperBound: Long,
                            params: ArrayList[Any],
                            numPartitions: Int,
                            mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
  extends RDD[T](sc, Nil) with Logging {
  override def getPartitions: Array[Partition] = {
    // bounds are inclusive, hence the + 1 here and - 1 on end
    val length = BigInt(1) + upperBound - lowerBound
    (0 until numPartitions).map { i =>
      val start = lowerBound + ((i * length) / numPartitions)
      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
      new JdbcPartition(i, start.toLong, end.toLong,params)
    }.toArray
  }

  override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
  {
    context.addTaskCompletionListener{ context => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
//直接采用我們常用的預(yù)處理方式
    val stmt = conn.prepareStatement(sql)
    val url = conn.getMetaData.getURL
    if (url.startsWith("jdbc:mysql:")) {
      stmt.setFetchSize(Integer.MIN_VALUE)
    } else {
      return null
    }
    logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")
//傳參
   val params = part.params
    val paramsSize = params.size()
    if(params!=null){
      for(i <- 1 to paramsSize){
        val param = params.get(i-1)
        param match {
          case param:String => stmt.setString(i,param)
          case param:Int => stmt.setInt(i,param)
          case param:Boolean => stmt.setBoolean(i,param)
          case param:Double => stmt.setDouble(i,param)
          case param:Float => stmt.setFloat(i,param)
          case _=> {
            println("type is fault")
          }
        }
      }
    }
//限定該分區(qū)查詢起始偏移量和條數(shù)
    stmt.setLong(paramsSize+1, part.lower)
    stmt.setLong(paramsSize+2, part.upper-part.lower+1)
    val rs = stmt.executeQuery()
    override def getNext(): T = {
      if (rs.next()) {
        mapRow(rs)
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }
    override def close() {
      try {
        if (null != rs) {
          rs.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing resultset", e)
      }
      try {
        if (null != stmt) {
          stmt.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing statement", e)
      }
      try {
        if (null != conn) {
          conn.close()
        }
        logInfo("closed connection")
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
    }
  }
}
object JdbcRDD {
  def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
    Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
  }

  trait ConnectionFactory extends Serializable {
    @throws[Exception]
    def getConnection: Connection
  }

  def create[T](
                 sc: JavaSparkContext,
                 connectionFactory: ConnectionFactory,
                 sql: String,
                 lowerBound: Long,
                 upperBound: Long,
                 params: ArrayList[Any],
                 numPartitions: Int,
                 mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

    val jdbcRDD = new JdbcRDD[T](
      sc.sc,
      () => connectionFactory.getConnection,
      sql,
      lowerBound,
      upperBound,
      params,
      numPartitions,
      (resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)
    new JavaRDD[T](jdbcRDD)( fakeClassTag)
  }

 
  def create(
              sc: JavaSparkContext,
              connectionFactory: ConnectionFactory,
              sql: String,
              lowerBound: Long,
              upperBound: Long,
              params: ArrayList[Any],
              numPartitions: Int): JavaRDD[Array[Object]] = {

    val mapRow = new JFunction[ResultSet, Array[Object]] {
      override def call(resultSet: ResultSet): Array[Object] = {
        resultSetToObjectArray(resultSet)
      }
    }
    create(sc, connectionFactory, sql, lowerBound, upperBound, params,numPartitions, mapRow)
  }
  private def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}

接下來是測(cè)試代碼:

package JdbcRDD

import java.sql.{DriverManager, ResultSet}
import java.util

import org.apache.spark.SparkContext

object JdbcRDDTest {
  def main(args: Array[String]) {
    //val conf = new SparkConf().setAppName("spark_mysql").setMaster("local")
    val sc = new SparkContext("local[2]","spark_mysql")

    def createConnection() = {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://localhost:3306/transportation", "root", "pass")
    }
    def extractValues(r: ResultSet) = {
      (r.getString(1), r.getString(2))
    }
    val params = new util.ArrayList[Any]
    params.add(100)//傳參
    params.add(7)
    val data = new JdbcRDD(sc, createConnection, "SELECT * FROM login_log where  id<=? and user_id=? limit ?,?", lowerBound = 1, upperBound =20,params=params, numPartitions = 5, mapRow = extractValues)
    data.cache()
    println(data.collect.length)
    println(data.collect().toList)
    sc.stop()
  }
}

測(cè)試結(jié)果:


測(cè)試結(jié)果

可以看出统抬,重寫這個(gè)JdbcRDD后我們可以條件查詢某一個(gè)表,也可以同時(shí)限定查詢條數(shù)危队,這給我們用Spark分析Mysql中的數(shù)據(jù)提供了方便聪建,我們不需要先將需要的數(shù)據(jù)濾出來再進(jìn)行分析。當(dāng)然茫陆,這個(gè)demo寫的比較粗糙金麸,只是提供這么一種方法的演示,后期還可以稍加修改簿盅。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末挥下,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子桨醋,更是在濱河造成了極大的恐慌棚瘟,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件喜最,死亡現(xiàn)場(chǎng)離奇詭異解取,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)返顺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門禀苦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人遂鹊,你說我怎么就攤上這事振乏。” “怎么了秉扑?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵慧邮,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我舟陆,道長(zhǎng)误澳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任秦躯,我火速辦了婚禮忆谓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘踱承。我一直安慰自己倡缠,他們只是感情好哨免,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著昙沦,像睡著了一般琢唾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上盾饮,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天采桃,我揣著相機(jī)與錄音,去河邊找鬼丘损。 笑死芍碧,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的号俐。 我是一名探鬼主播泌豆,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼吏饿!你這毒婦竟也來了踪危?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤猪落,失蹤者是張志新(化名)和其女友劉穎贞远,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體笨忌,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蓝仲,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了官疲。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袱结。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖途凫,靈堂內(nèi)的尸體忽然破棺而出垢夹,到底是詐尸還是另有隱情,我是刑警寧澤维费,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布果元,位于F島的核電站,受9級(jí)特大地震影響犀盟,放射性物質(zhì)發(fā)生泄漏而晒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一阅畴、第九天 我趴在偏房一處隱蔽的房頂上張望倡怎。 院中可真熱鬧,春花似錦、人聲如沸诈胜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焦匈。三九已至,卻和暖如春昵仅,著一層夾襖步出監(jiān)牢的瞬間缓熟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工摔笤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留够滑,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓吕世,卻偏偏與公主長(zhǎng)得像彰触,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子命辖,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359