SparkStreaming 寫數(shù)據(jù)到 HBase茁彭,由于共用連接造成的數(shù)據(jù)丟失問(wèn)題

有如下程序总寒,SparkStreaming 讀取 Kafka 中的數(shù)據(jù),經(jīng)過(guò)處理后理肺,把數(shù)據(jù)寫入到 Hbase 中

/**
  * Author: Jed
  * Description: SparkStreaming 讀取 Kafka 中的數(shù)據(jù)摄闸,實(shí)時(shí)寫入 HBase中
  * Create: 2018-05-04 14:50
  */
object HBaseTest {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val kafkaParams = Map[String, AnyRef](
      "bootstrap.servers" -> "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest"
      "group.id" -> s"GROUP${new Random().nextInt(1000)}"
    )

    val topics = Array("baihe")

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val values: DStream[Array[String]] = stream.map(_.value.split("\\|"))

    values.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        val connection = HBaseUtil.getConnection
        val tableName = TableName.valueOf("test")
        val table = connection.getTable(tableName)
        val puts = new ArrayList[Put]

        try {
          partition.foreach(arr => {

            val put = new Put(CustomerFunction.genRowkey(arr(0)))
            val index = Array[Int](0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            val enNames = Array[String]("touched", "user_number", "start_time", "end_time", "channel_id", "binding_flag", "click_path", "result", "interf_name", "channel_category", "by_operator")
            var value = ""
            for (i <- 0 until index.length) {
              value += arr(i) + "|"
            }
            value = value.dropRight(1)
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(value))
            puts.add(put)
            // 這里為了提高性能,每一萬(wàn)條入一次HBase庫(kù)
            if (puts.size % 10000 == 0) {
              table.put(puts)
              puts.clear()
            }
          })
        } catch {
          case e: Exception => e.printStackTrace
        } finally {
          table.put(puts)
          table.close
          connection.close
        }
      })
    })

    ssc.start
    ssc.awaitTermination
  }
}


object HBaseUtil {

  var conf: Configuration = null
  var connection: Connection = null

  def getConnection(): Connection = {

    if (conf == null) {
      conf.set("hbase.zookeeper.quorum", "172.16.26.6:2181,172.16.26.10:2181,172.16.26.13:2181")
    }

    if ((connection == null || connection.isClosed()) && conf != null) {
      try {
        connection = ConnectionFactory.createConnection(conf)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
    return connection;
  }

  def colse() = {
    if (connection != null) {
      try {
        connection.close();
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}

執(zhí)行以上程序妹萨,中途會(huì)報(bào)錯(cuò):

2018-05-29 16:21:40 883 [ERROR] org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:432) Failed to get region location 
org.apache.hadoop.hbase.DoNotRetryIOException: hconnection-0x6432ad81 closed
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1174)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:422)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:371)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:245)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:197)
    at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1461)
    at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1029)

重點(diǎn)是:hconnection-0x6432ad81 closed
問(wèn)題出在獲得連接的工具類中年枕,在 DStream 中的每個(gè) partition 中獲得中一個(gè) HBase 的連接,為了提高"效率"眠副,讓每個(gè) partition 共用了一個(gè) connection画切,但就是這樣,才導(dǎo)致了問(wèn)題的出現(xiàn)囱怕,假設(shè) A partition 中有 10000 條數(shù)據(jù)霍弹,B partition 中有 20000 條數(shù)據(jù),兩個(gè) partition 共用一個(gè) connection娃弓,A典格、B兩個(gè) partition 并行的往 HBase 中寫數(shù)據(jù),當(dāng) A partition 寫完10000條數(shù)據(jù)后台丛,關(guān)閉了 connection耍缴,假設(shè)此時(shí) B partition 也已經(jīng)寫入了10000條數(shù)據(jù),但它還有 10000 條數(shù)據(jù)要寫挽霉,連接卻關(guān)閉了防嗡,程序會(huì)報(bào)以上的錯(cuò)誤,數(shù)據(jù)會(huì)丟失 10000 條

解決辦法就是讓每個(gè) partition 獲得獨(dú)立的 connection侠坎,只需要把 HBaseUtil 類修改如下即可:

object HBaseUtil {
  val conf: Configuration = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "192.168.42.101:2181,192.168.42.102:2181,192.168.42.101:2181")
  def getConnection(): Connection = {
    return ConnectionFactory.createConnection(conf)
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蚁趁,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子实胸,更是在濱河造成了極大的恐慌他嫡,老刑警劉巖番官,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異钢属,居然都是意外死亡徘熔,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門淆党,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)酷师,“玉大人,你說(shuō)我怎么就攤上這事染乌≈仙” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵慕匠,是天一觀的道長(zhǎng)饱须。 經(jīng)常有香客問(wèn)我,道長(zhǎng)台谊,這世上最難降的妖魔是什么蓉媳? 我笑而不...
    開(kāi)封第一講書人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮锅铅,結(jié)果婚禮上酪呻,老公的妹妹穿的比我還像新娘。我一直安慰自己盐须,他們只是感情好玩荠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著贼邓,像睡著了一般阶冈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上塑径,一...
    開(kāi)封第一講書人閱讀 49,764評(píng)論 1 290
  • 那天女坑,我揣著相機(jī)與錄音,去河邊找鬼统舀。 笑死匆骗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的誉简。 我是一名探鬼主播碉就,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼闷串!你這毒婦竟也來(lái)了瓮钥?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎骏庸,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體年叮,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡具被,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了只损。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片一姿。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖跃惫,靈堂內(nèi)的尸體忽然破棺而出叮叹,到底是詐尸還是另有隱情,我是刑警寧澤爆存,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布蛉顽,位于F島的核電站,受9級(jí)特大地震影響先较,放射性物質(zhì)發(fā)生泄漏携冤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一闲勺、第九天 我趴在偏房一處隱蔽的房頂上張望曾棕。 院中可真熱鬧,春花似錦菜循、人聲如沸翘地。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)衙耕。三九已至,卻和暖如春勺远,著一層夾襖步出監(jiān)牢的瞬間臭杰,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工谚中, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留渴杆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓宪塔,卻偏偏與公主長(zhǎng)得像磁奖,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子某筐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,712評(píng)論 13 425
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理比搭,服務(wù)發(fā)現(xiàn),斷路器南誊,智...
    卡卡羅2017閱讀 134,629評(píng)論 18 139
  • 睡不著這事兒最痛苦了身诺。躺在床上輾轉(zhuǎn)反側(cè)蜜托,明明眼睛已經(jīng)困澀的要粘在一起,可大腦還在一個(gè)人自嗨霉赡。越想著自己要睡著...
    周米啊閱讀 859評(píng)論 1 5
  • 炊煙起了,我在門口等你 夕陽(yáng)下了,我在山邊等你 葉子黃了,我在樹(shù)下等你 月兒彎了,我在十五等你 細(xì)雨來(lái)了,我在傘下...
    若水云煙閱讀 503評(píng)論 13 8
  • JitPack、jCenter是我們常用的發(fā)布Android開(kāi)源庫(kù)的網(wǎng)站嗓化,發(fā)布成功后就可以在Android Stu...
    SheHuan閱讀 6,308評(píng)論 5 59