MongoDB數(shù)據(jù)增量同步到Hive(方案二通過MongoSpark)

一、背景

本文續(xù)接上文 MongoDB數(shù)據(jù)增量同步到Hive(方案一通過BSON文件映射)
考慮到上文的方案一献宫,耗時(shí)又占用空間,想寫程序?qū)崿F(xiàn)數(shù)據(jù)直達(dá)澄耍,于是姻檀,有了以下方案。

二耻蛇、方案二 通過MongoSpark程序拉取monggo數(shù)據(jù)

首先還是存量數(shù)據(jù)

工具類SparkHiveUtil


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NumericType, ShortType, StringType, TimestampType}

object SparkHiveUtil {

  def createTable(spark:SparkSession,df:DataFrame,hive_db:String,tableName:String,ispartiton:Boolean): Unit ={
    val hive_table=hive_db+"."+tableName
    val createsql = getCreateSql(df,hive_table,ispartiton)
    println("SparkHiveUtil_createsql:"+createsql)
    println("SparkHiveUtil_drop table if exists "+hive_table)
    spark.sql("drop table if exists "+hive_table)
    println("SparkHiveUtil create table  "+hive_table)
    spark.sql(createsql)
  }

  def getCreateSql(df:DataFrame,hive_table:String,ispartiton:Boolean): String ={
    val structtype= df.schema
    val fields:scala.Array[org.apache.spark.sql.types.StructField] = structtype.fields
    var fieldType="STRING"
    var createsql = "CREATE  TABLE if not exists "+hive_table+"(\n"

    if(fields.length<1)return ""

    for (i <- 0 until fields.length) {
      var fieldName= fields(i).name
      if(fieldName.startsWith("_"))fieldName="`"+fieldName+"`"
      println(fieldName+":"+fields(i).dataType)
      fieldType=getFieldType(fields(i).dataType)
      if(i==fields.length-1){
        createsql=createsql+fieldName+" "+fieldType+"\n"
      }else{
        createsql=createsql+fieldName+" "+fieldType+",\n"
      }
    }
    createsql=createsql+")\ncomment '"+hive_table +"' \n"
    if(ispartiton) createsql=createsql+"PARTITIONED BY (`pyear` int, `pmonth` int,`pday` int)\n "
    createsql=createsql+"ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n " +
      "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'\n " +
      "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'\n " +
      "TBLPROPERTIES ( 'orc.compress'='snappy')"
    createsql
  }

  def getFieldType(dataType:DataType): String ={
    dataType match {
      case a:ArrayType => {
        "ARRAY<"+getFieldType(a.elementType)+">"
      }
      case b: BinaryType => "BINARY"
      case bl: BooleanType => "BOOLEAN"
      case by: ByteType => "TINYINT"
      case da: DateType => "TIMESTAMP"
      case de: DecimalType => "DECIMAL"
      case dou: DoubleType => "DOUBLE"
      case f: FloatType => "FLOAT"
      case i: IntegerType => "INT"
      case l: LongType => "BIGINT"
      case n: NumericType => "BIGINT"
      case sh: ShortType => "SMALLINT"
      case st: StringType => "STRING"
      case t: TimestampType => "TIMESTAMP"
      case _:DataType => "STRING"
    }
  }
}

同步存量monggo數(shù)據(jù)的類SparkMon2hiveOriginNormal


import com.dpl.dws.mongo.utils.{MongoUtils, SparkHiveUtil, SparkMongoUtils}
import com.mongodb.spark.MongoSpark
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkMon2hiveOriginNormal {
  val logger = Logger.getLogger("SparkMon2hiveOriginNormal")
  val hive_mongo_db=MongoUtils.hive_mongo_db


//不分區(qū)的表
  def syn_origin_table_normal(mongo_table:String,hive_table:String): Unit ={
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    import spark.implicits._
    val df: DataFrame = MongoSpark.load(spark)
    df.printSchema()


    //建表
    SparkHiveUtil.createTable(spark,df,hive_mongo_db,hive_table,false)

    val temp_table=mongo_table+"_tmp"
    df.createOrReplaceTempView(temp_table)//    //注冊(cè)臨時(shí)表

    logger.info("開始同步 "+mongo_table+" 表數(shù)據(jù)到Hive "+hive_mongo_db+"."+hive_table)
      val t1 = System.currentTimeMillis
    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864b")
   // spark.sql("truncate table "+hive_table)
    //將DF寫入到Hive中
    spark.sql("INSERT OVERWRITE TABLE   "+hive_mongo_db+"."+hive_table+"  select * from "+temp_table+" distribute by  rand() ")
    val t2 = System.currentTimeMillis
    logger.info("共耗時(shí):" + (t2 - t1) / 60000 + "分鐘")
    spark.stop()
  }

// 分區(qū)表
  def syn_origin_table_partition(mongo_table:String,hive_table:String,partitionBy:String="createTime"): Unit ={
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    val df: DataFrame = MongoSpark.load(spark)
    df.printSchema()
    //建表
    SparkHiveUtil.createTable(spark,df,hive_mongo_db,hive_table,true)

    val temp_table=mongo_table+"_tmp"
    df.createOrReplaceTempView(temp_table)//    //注冊(cè)臨時(shí)表
    //    val res: DataFrame = spark.sql("SELECT * from "+temp_table+" limit 10")
    //    res.show(10)
    println("開始同步 "+mongo_table+" 表數(shù)據(jù)到Hive "+hive_mongo_db+"."+hive_table)
    val t1 = System.currentTimeMillis

    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=127108864b")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nostrick")
    //將DF寫入到Hive中
    //選擇Hive數(shù)據(jù)庫
    spark.sql("INSERT OVERWRITE TABLE " +hive_mongo_db+"."+hive_table+
      " PARTITION (pyear,pmonth,pday) " +
      "SELECT t.*,year(t."+partitionBy+") pyear,month(t."+partitionBy+") pmonth,day(t."+partitionBy+") pday " +
      "from "+temp_table+" t ")
    val t2 = System.currentTimeMillis
    println("共耗時(shí):" + (t2 - t1) / 60000 + "分鐘")
    spark.stop()
  }

  def initSparkSession(mongo_table:String): SparkSession ={
    val conf = SparkMongoUtils.initInputConf(mongo_table).setAppName("SparkMon2hiveOriginNormal_"+mongo_table)
    SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
  }

}

其次是增量數(shù)據(jù)加合并

AppendCommon代碼如下:

package com.dpl.dws.mongo.mon2hive.increment.append

import java.text.SimpleDateFormat

import com.dpl.dws.common.DateTimeUtil
import com.dpl.dws.mongo.utils.{MongoUtils, SparkMongoUtils}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.rdd.MongoRDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.bson.Document

object AppendCommon {
  val hive_mongo_db=MongoUtils.hive_mongo_db

  def increment_partition(mongo_table:String,inc_date:String,incrementBy:String): Unit ={
    println(mongo_table,inc_date)
    val year=inc_date.substring(0,4)
    val month=inc_date.substring(4,6).toInt
    val day=inc_date.substring(6).toInt
    println(year,month,day)
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    val begin_time=inc_date+" 00:00:00.000"
    val end_time=inc_date+" 23:59:59.999"

    val dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss.SSS")
    val begin_date_time=  dateFormat.parse(begin_time)
    val end_date_time=  dateFormat.parse(end_time)
    println("begin_date_time:"+begin_date_time)
    println("end_date_time:"+end_date_time)
    val begin_date_time_UTC=DateTimeUtil.getIUTCTimestamp(begin_date_time)
    val end_date_time_UTC=DateTimeUtil.getIUTCTimestamp(end_date_time)

    val query = "{ '$match': {'"+incrementBy+"':{'$gte':ISODate('"+begin_date_time_UTC+"')," +
      "'$lte':ISODate('"+end_date_time_UTC+"')}} }"
   // val query = "{ '$match': {'createTime': { '$gte' : ISODate('"+time+"')} } }"
    println("query:"+query)

    val rdd:MongoRDD[Document] = MongoSpark.load(spark.sparkContext)
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse(query)))
    val count = aggregatedRdd.count
    println("count___"+count)
    //  println(aggregatedRdd.first.toJson)

    if(count==0){
      spark.stop()
      return
    }

    //將DF寫入到Hive中
    val df1 = aggregatedRdd.toDF()
    df1.printSchema()
    val df: DataFrame = df1.drop("_class")
    df.printSchema()
    //df.show(10)

    //將DF寫入到Hive中
    spark.sql("set spark.executor.memory=7889934592b")
    spark.sql("use "+hive_mongo_db)
    val temp_table=mongo_table+"_tmp"
    df.createOrReplaceTempView(temp_table)//    //注冊(cè)臨時(shí)表


    println("開始同步 "+mongo_table+" 表數(shù)據(jù)到Hive "+hive_mongo_db+"."+mongo_table)
    val t1 = System.currentTimeMillis

    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=127108864b")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nostrick")


    spark.sql("INSERT OVERWRITE TABLE " +hive_mongo_db+"."+mongo_table+
      " PARTITION (pyear="+year+",pmonth="+month+",pday="+day+") " +
      "SELECT t.* " +
      "from "+temp_table+" t ")

    val t2 = System.currentTimeMillis
    println("共耗時(shí):" + (t2 - t1) / 60000 + "分鐘")

    spark.stop()
  }

  def initSparkSession(mongo_table:String): SparkSession ={
    val conf = SparkMongoUtils.initInputConf(mongo_table).setAppName("SparkMon2hiveAppendCommon_"+mongo_table)
    SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
  }

}

三吭从、存在的問題

此方案相對(duì)于方案一來說朝蜘,節(jié)省了不少空間和步驟,速度也快涩金;
但是對(duì)于數(shù)據(jù)量比較大的表谱醇,存在丟失數(shù)據(jù)的情況,hive中記錄條數(shù)總是略少于monggo庫表的記錄條數(shù)步做,目前還沒找到原因副渴;
對(duì)于更新的數(shù)據(jù)涉及到的分區(qū)范圍很廣的數(shù)據(jù),做增量更新還不如全量全度,所以還是有待改進(jìn)煮剧,考慮嘗試使用Hudi做增量同步。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市勉盅,隨后出現(xiàn)的幾起案子佑颇,更是在濱河造成了極大的恐慌,老刑警劉巖草娜,帶你破解...
    沈念sama閱讀 223,002評(píng)論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挑胸,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡宰闰,警方通過查閱死者的電腦和手機(jī)茬贵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評(píng)論 3 400
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來议蟆,“玉大人,你說我怎么就攤上這事萎战「廊荩” “怎么了?”我有些...
    開封第一講書人閱讀 169,787評(píng)論 0 365
  • 文/不壞的土叔 我叫張陵蚂维,是天一觀的道長(zhǎng)戳粒。 經(jīng)常有香客問我,道長(zhǎng)虫啥,這世上最難降的妖魔是什么蔚约? 我笑而不...
    開封第一講書人閱讀 60,237評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮涂籽,結(jié)果婚禮上苹祟,老公的妹妹穿的比我還像新娘。我一直安慰自己评雌,他們只是感情好树枫,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,237評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著景东,像睡著了一般砂轻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上斤吐,一...
    開封第一講書人閱讀 52,821評(píng)論 1 314
  • 那天搔涝,我揣著相機(jī)與錄音,去河邊找鬼和措。 笑死庄呈,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的派阱。 我是一名探鬼主播抒痒,決...
    沈念sama閱讀 41,236評(píng)論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了故响?” 一聲冷哼從身側(cè)響起傀广,我...
    開封第一講書人閱讀 40,196評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎彩届,沒想到半個(gè)月后伪冰,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,716評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡樟蠕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,794評(píng)論 3 343
  • 正文 我和宋清朗相戀三年贮聂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片寨辩。...
    茶點(diǎn)故事閱讀 40,928評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡吓懈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出靡狞,到底是詐尸還是另有隱情耻警,我是刑警寧澤,帶...
    沈念sama閱讀 36,583評(píng)論 5 351
  • 正文 年R本政府宣布甸怕,位于F島的核電站甘穿,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏梢杭。R本人自食惡果不足惜温兼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,264評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望武契。 院中可真熱鬧募判,春花似錦、人聲如沸咒唆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽钧排。三九已至敦腔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間恨溜,已是汗流浹背符衔。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留糟袁,地道東北人判族。 一個(gè)月前我還...
    沈念sama閱讀 49,378評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像项戴,于是被迫代替她去往敵國(guó)和親形帮。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,937評(píng)論 2 361