scala代碼快速從Oracle抽取數(shù)據(jù)到kudu

scala代碼高速拉去數(shù)據(jù)-適合同步歷史數(shù)據(jù)場景

package com.longi.util

import com.longi.common.OracleTemplate
import com.longi.common.base.{HDFSPathWrapper, PropertiesHelper}
import com.longi.hadoop.{LoadKuduToDataFrame, SparkSessionPort}
import org.apache.kudu.client.SessionConfiguration
import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}
import org.apache.spark.sql.functions.{coalesce, lit}
import org.apache.spark.sql.types.{DataType, DecimalType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

/**
 * @date: 2021-12-3 9:55
 * @desc: Please fill in the remarks
 */
object Oracle2KuduWMS {

  def main(args: Array[String]): Unit = {
    //Step 0: 參數(shù)初始化
    if (args.length > 7) {
      throw new IllegalArgumentException("You need to pass the following parameters  1:The data of time(2020-06-01) 2:The name of Kudu table 3:Data extraction date column 4:The business Domain")
    }

    //(Oracle Table , Oracle Column, Begin Dt, End Dt, interval, Kudu Table)
    //Oracle Table Name
    val oraTableName = args(0)
    //Oracle Incremental Sliding Column
    val oraColumnName = args(1)
    //The Begin time and End Time
    val datBeginDateTime = args(2)
    val datEndDateTime = args(3)
    //Kudu table name
    val kuduTableName = args(4)
    val mesge = s"Oracle Table $oraTableName -> Kudu Table : $kuduTableName ($datBeginDateTime - $datEndDateTime) "
    val fieldValueChange = args(5)
    println(mesge)
    val spark = SparkSessionPort(mesge)

    var exeSQLCondition = ""
    if (datBeginDateTime == "0000-00-00 00:00:00") {
      exeSQLCondition = "1=1"
    } else {
      exeSQLCondition = oraColumnName.concat(">=to_date('").concat(datBeginDateTime).concat("','yyyy-mm-dd hh24:mi:ss') AND ").concat(oraColumnName).concat("<to_date('").concat(datEndDateTime).concat("','yyyy-mm-dd hh24:mi:ss')")
    }
    println(s"exeSQLCondition = ${exeSQLCondition}")

    val exeSQL =
      s"""
         |     (
         |     SELECT rownum as rs
         |           ,rowid as offsets  -- 用這個替換cast(rowid as varchar(20)) as offsets
         |           ,'I' as op_type
         |           ,t.*
         |       FROM ${oraTableName} t
         |      WHERE ${exeSQLCondition}
         |     )
          """.stripMargin
    println(s"exeSQL = ${exeSQL}")
    val exeAggrSQL =
      s"""
         |     (
         |     SELECT count(1)
         |       FROM ${oraTableName} t
         |      WHERE ${exeSQLCondition}
         |     ) t
         |""".stripMargin
    println(exeAggrSQL)

    val aggrDF = spark
      .read
      .format("jdbc")
      .option("url", PropertiesHelper.getPropertiesValueFromKey("origi.wms.ora.uri"))
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .option("dbtable", exeAggrSQL)
      .option("user", PropertiesHelper.getPropertiesValueFromKey("origi.wms.ora.username"))
      .option("password", PropertiesHelper.getPropertiesValueFromKey("origi.wms.ora.password")).load()
    val upperBound = aggrDF.first().getDecimal(0).toBigInteger.intValue()


    val numPartitions = upperBound match {
      case upperBound if (upperBound <= 100000) => 1
      case upperBound if (upperBound < 100000 && upperBound <= 500000) => 3
      case upperBound if (upperBound < 500000 && upperBound <= 1000000) => 10
      case upperBound if (upperBound < 1000000 && upperBound <= 5000000) => 15
      case _ => 20
    }

    val oracleDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@xxx-st.longi.com:1521/prod")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .option("dbtable", exeSQL)
      .option("user", "USER")
      .option("partitionColumn", "rs")
      .option("lowerBound", 1)
      .option("upperBound", upperBound)
      .option("numPartitions", numPartitions)
      .option("fetchsize", 3000)
      .option("password", "XXXXXXX").load()
    println(oracleDF.printSchema())

    val kuduSchema = LoadKuduToDataFrame(spark, kuduTableName).schema
    import org.apache.spark.sql.functions._
    //按照Kudu表數(shù)據(jù)類型對Oracle數(shù)據(jù)類型進行改造
    val colNames = kuduSchema.names.map(c => {
      col(c).cast(kuduSchema.fields(kuduSchema.fieldIndex(c)).dataType.typeName)
    })
    var oracleFinalDF = oracleDF.select(colNames: _*)
    //oracleFinalDF.show()
    //需要轉(zhuǎn)變的字段不為空
    /*val fields = fieldValueChange.split(",")

    fields.foreach(f => {
      /*oracleFinalDF.schema.fields(kuduSchema.fieldIndex(f.split("=")(0))).dataType.typeName match {
        case "Int" => coalesce(oracleFinalDF(f.split("=")(0)), lit(f.split("=")(1)))
        case "String" => (f.split("=")(0))
      }*/
      println(s"f={$fields}, f0=${f.split("=")(0)} f1=${f.split("=")(1)}")
      //coalesce(oracleFinalDF(f.split("=")(0)), lit(f.split("=")(1)))
      //nvl(oracleFinalDF.col(f.split("=")(0)), f.split("=")(1))
      //coalesce(col(f.split("=")(0)),lit(oracleFinalDF(f.split("=")(1))))
      val tf = oracleFinalDF.withColumn(f.split("=")(0).concat("_new"), nvl(oracleFinalDF.col(f.split("=")(0)), f.split("=")(1)))
      //tf.withColumn("a",oracleFinalDF.col("gl_sl_link_id"))
      tf.where("").show()
      val df = tf.drop(col(f.split("=")(0)))
      df.show()
      val rf = df.withColumnRenamed(f.split("=")(0).concat("_new"), f.split("=")(0))
      rf.where("gl_sl_link_id is null or gl_sl_link_id =-1").show()
      oracleFinalDF = rf
    })

    //將空值轉(zhuǎn)為給定值
    def nvl(ColIn: Column, ReplaceVal: Any): Column = {
      println(ReplaceVal)
      (when(ColIn.isNull, lit(ReplaceVal)).otherwise(ColIn))
    }

    oracleFinalDF.select("gl_sl_link_id").where("gl_sl_link_id is null or gl_sl_link_id =-1").distinct().show(200)
*/
    //創(chuàng)建新DF演痒,用于插入Kudu表中
    val newData = spark.createDataFrame(oracleFinalDF.rdd, kuduSchema)


    println(newData.printSchema())

    println(s"newData.rdd.partitions.size :${newData.rdd.partitions.size}")

    upsertKuduData(spark, newData, kuduTableName)
    spark.stop()

  }

  /**
   * @Description:寫入Kudu數(shù)據(jù)庫
   * @Param: [spark, dataFrame, tabName]
   * @return: void
   */
  private def upsertKuduData(spark: SparkSession, dataFrame: DataFrame, tabName: String): Unit = {
    val kuduContext = new KuduContext(
      HDFSPathWrapper.getKuduMasterPath(),
      spark.sparkContext
    )
    val ks = kuduContext.syncClient.newSession()
    ks.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
    ks.setMutationBufferSpace(10000)
    kuduContext.upsertRows(dataFrame, tabName, new KuduWriteOptions(false, true))
  }

}

調(diào)用方式

spark-submit  --class com.longi.util.Oracle2KuduWMS  --master yarn  --deploy-mode cluster  --queue bdp --driver-memory 2g  --num-executors 20  --executor-memory 1g  --executor-cores 1  --conf spark.sql.warehouse.dir=hdfs://longi/home/data/hive/warehouse  --conf spark.sql.session.timeZone=Asia/Shanghai  --conf spark.executor.memoryOverhead=10G  --jars /dfs/projects/etl-schedule-entry/lib/common/etl-functions-1.0.jar,/dfs/projects/etl-schedule-entry/lib/common/ojdbc6-11.2.0.3.jar,/dfs/projects/etl-schedule-entry/lib/common/kudu-spark2_2.11-1.10.0-cdh6.3.2.jar,/dfs/projects/etl-schedule-entry/lib/common/grizzled-slf4j_2.11-1.3.0.jar,/dfs/projects/etl-schedule-entry/lib/common/mysql-connector-java-5.1.44.jar  /dfs/projects-test/etl-schedule-entry/lib/common/etl-tools-1.0.jar WMS_PROD.ACT_ALLOCATION_DETAILS  addtime '2010-01-01 00:00:00' '2023-12-23 10:00:00'  ods_po_wms.streaming_cemd_act_allocation_details  'a=1'
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末亲轨,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子鸟顺,更是在濱河造成了極大的恐慌惦蚊,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诊沪,死亡現(xiàn)場離奇詭異养筒,居然都是意外死亡,警方通過查閱死者的電腦和手機端姚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挤悉,“玉大人渐裸,你說我怎么就攤上這事巫湘。” “怎么了昏鹃?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵尚氛,是天一觀的道長。 經(jīng)常有香客問我洞渤,道長阅嘶,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任载迄,我火速辦了婚禮讯柔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘护昧。我一直安慰自己魂迄,他們只是感情好,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布惋耙。 她就那樣靜靜地躺著捣炬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪绽榛。 梳的紋絲不亂的頭發(fā)上湿酸,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機與錄音灭美,去河邊找鬼推溃。 笑死,一個胖子當著我的面吹牛冲粤,可吹牛的內(nèi)容都是我干的美莫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼梯捕,長吁一口氣:“原來是場噩夢啊……” “哼厢呵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起傀顾,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤襟铭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后短曾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寒砖,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年嫉拐,在試婚紗的時候發(fā)現(xiàn)自己被綠了哩都。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡婉徘,死狀恐怖漠嵌,靈堂內(nèi)的尸體忽然破棺而出咐汞,到底是詐尸還是另有隱情,我是刑警寧澤儒鹿,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布化撕,位于F島的核電站,受9級特大地震影響约炎,放射性物質(zhì)發(fā)生泄漏植阴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一圾浅、第九天 我趴在偏房一處隱蔽的房頂上張望掠手。 院中可真熱鬧,春花似錦贱傀、人聲如沸惨撇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽魁衙。三九已至,卻和暖如春株搔,著一層夾襖步出監(jiān)牢的瞬間剖淀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工纤房, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纵隔,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓炮姨,卻偏偏與公主長得像捌刮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子舒岸,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容