SparkStreaming之優(yōu)雅停止

本文主要記錄使用SparkStreaming從Kafka里讀取數(shù)據(jù)纳寂,并使用Redis保存Offset,并監(jiān)聽Redis中的某個Key是否存在來停止程序

相關(guān)文章:
1.Spark之PI本地
2.Spark之WordCount集群
3.SparkStreaming之讀取Kafka數(shù)據(jù)
4.SparkStreaming之使用redis保存Kafka的Offset
5.SparkStreaming之優(yōu)雅停止
6.SparkStreaming之寫數(shù)據(jù)到Kafka
7.Spark計算《西虹市首富》短評詞云

1.監(jiān)聽redis的某個key是否存在

/**
  * 優(yōu)雅的停止Streaming程序
  *
  * @param ssc
  */
def stopByMarkKey(ssc: StreamingContext): Unit = {
  val intervalMills = 10 * 1000 // 每隔10秒掃描一次消息是否存在
    var isStop = false
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
        if (!isStop && isExists(STOP_FLAG)) {
          LOG.warn("2秒后開始關(guān)閉sparstreaming程序.....")
            Thread.sleep(2000)
            ssc.stop(true, true)
        }
    }
}

/**
    * 判斷Key是否存在
    *
    * @param key
    * @return
    */
def isExists(key: String): Boolean = {
  val jedis = InternalRedisClient.getPool.getResource
    val flag = jedis.exists(key)
    jedis.close()
    flag
}

2.KafkaRedisStreaming

package me.jinkun.stream

import me.jinkun.scala.ETLStreaming.LOG
import me.jinkun.scala.util.InternalRedisClient
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.slf4j.LoggerFactory

/**
  *
  */
object KafkaRedisStreaming {
  private val LOG = LoggerFactory.getLogger("KafkaRedisStreaming")

  private val STOP_FLAG = "TEST_STOP_FLAG"

  def initRedisPool() = {
    // Redis configurations
    val maxTotal = 20
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "47.98.119.122"
    val redisPort = 6379
    val redisTimeout = 30000
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  }

  /**
    * 從redis里獲取Topic的offset值
    *
    * @param topicName
    * @param partitions
    * @return
    */
  def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
    if (LOG.isInfoEnabled())
      LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)

    //從Redis獲取上一次存的Offset
    val jedis = InternalRedisClient.getPool.getResource
    val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
    for (partition <- 0 to partitions - 1) {
      val topic_partition_key = topicName + "_" + partition
      val lastSavedOffset = jedis.get(topic_partition_key)
      val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
      fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
    }
    jedis.close()

    fromOffsets.toMap
  }

  def main(args: Array[String]): Unit = {
    //初始化Redis Pool
    initRedisPool()

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 這里指定Topic的Partition的總數(shù)
    val fromOffsets = getLastCommittedOffsets(topicName, 3)

    // 初始化KafkaDS
    val kafkaTopicDS =
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

    kafkaTopicDS.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // 如果rdd有數(shù)據(jù)
      if (!rdd.isEmpty()) {
        val jedis = InternalRedisClient.getPool.getResource
        val p = jedis.pipelined()
        p.multi() //開啟事務(wù)

        // 處理數(shù)據(jù)
        rdd
          .map(_.value)
          .flatMap(_.split(" "))
          .map(x => (x, 1L))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .foreach(println)

        //更新Offset
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
          p.set(topic_partition_key, offsetRange.untilOffset + "")
        }

        p.exec() //提交事務(wù)
        p.sync //關(guān)閉pipeline
        jedis.close()
      }
    })

    ssc.start()

    // 優(yōu)雅停止
    stopByMarkKey(ssc)

    ssc.awaitTermination()
  }

  /**
    * 優(yōu)雅停止
    *
    * @param ssc
    */
  def stopByMarkKey(ssc: StreamingContext): Unit = {
    val intervalMills = 10 * 1000 // 每隔10秒掃描一次消息是否存在
    var isStop = false
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExists(STOP_FLAG)) {
        LOG.warn("2秒后開始關(guān)閉sparstreaming程序.....")
        Thread.sleep(2000)
        ssc.stop(true, true)
      }
    }
  }

  /**
    * 判斷Key是否存在
    *
    * @param key
    * @return
    */
  def isExists(key: String): Boolean = {
    val jedis = InternalRedisClient.getPool.getResource
    val flag = jedis.exists(key)
    jedis.close()
    flag
  }
}

這樣只需要在Redis里創(chuàng)建TEST_STOP_FLAG即可

set "TEST_STOP_FLAG" 1

運行結(jié)果:


這樣在停止SparkStreaming程序是就不會造成數(shù)據(jù)丟失了蜂桶。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子鞍陨,更是在濱河造成了極大的恐慌势决,老刑警劉巖阻塑,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異果复,居然都是意外死亡陈莽,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門虽抄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來走搁,“玉大人,你說我怎么就攤上這事迈窟∷街玻” “怎么了?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵车酣,是天一觀的道長曲稼。 經(jīng)常有香客問我,道長湖员,這世上最難降的妖魔是什么贫悄? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮破衔,結(jié)果婚禮上清女,老公的妹妹穿的比我還像新娘。我一直安慰自己晰筛,他們只是感情好嫡丙,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著读第,像睡著了一般曙博。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上怜瞒,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天父泳,我揣著相機與錄音般哼,去河邊找鬼。 笑死惠窄,一個胖子當著我的面吹牛蒸眠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播杆融,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼楞卡,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了脾歇?” 一聲冷哼從身側(cè)響起蒋腮,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎藕各,沒想到半個月后池摧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡激况,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年作彤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乌逐。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡宦棺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出黔帕,到底是詐尸還是另有隱情代咸,我是刑警寧澤,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布成黄,位于F島的核電站呐芥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏奋岁。R本人自食惡果不足惜思瘟,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望闻伶。 院中可真熱鬧滨攻,春花似錦、人聲如沸蓝翰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽畜份。三九已至诞帐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間爆雹,已是汗流浹背停蕉。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工愕鼓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人慧起。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓菇晃,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蚓挤。 傳聞我的和親對象是個殘疾皇子谋旦,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)屈尼,斷路器,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評論 13 425
  • 本月昕空價值指數(shù)進行了調(diào)倉拴孤,新的組合脾歧。指數(shù)化組合進行定期的調(diào)整,等權(quán)組合演熟,期末調(diào)入調(diào)出鞭执,利于心態(tài)。一開始芒粹,...
    bonjour1218閱讀 233評論 0 0
  • 我的世界本該是無人的兄纺, 直到遇到了你, 又驗證了這句話化漆。
    生活無非就是愛啊閱讀 149評論 0 0
  • 我以為我們相識多年 你我知己知彼 沒成想我的坦誠相待 卻變成你輕言輕語自以為是的資本
    枯葉晚茶閱讀 156評論 0 0