spark-streaming-kafka_2.11實現(xiàn)存儲offset的位置

前言

spark-streaming-kafka_2.11這個包的更新日期比較老了堤瘤,sparkstraming-kafka最新的包是spark-streaming-kafka-0-10_2.11,所以當(dāng)你看到這篇記錄的時候不用按照流程操作了溺健,應(yīng)為方法過于麻煩,最新的包中有自動保存offset的功能太闺,這篇文章只是用于記錄而已史侣。在spark-streaming-kafka-0-10_2.11中如果需要自動保存上次讀取topic的位置,則只需要簡單的設(shè)置"enable.auto.commit" -> (true: java.lang.Boolean)贱案,params設(shè)置如下:
kafkaParams

val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "hadoop-mgr:9092,hadoop-node1:9092,hadoop-node2:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "spark_group",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (true: java.lang.Boolean)
        )

內(nèi)容

xml

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>

main.scala

val kafkaParams = HashMap[String, String](
      "bootstrap.servers" -> "hadoop-mgr:9092,hadoop-node1:9092,hadoop-node2:9092",
      "group.id" -> "spark_group")

val topics = Set(TopicConstant.TOPIC_FACE_NAME, TopicConstant.TOPIC_PERSON_NAME
      , TopicConstant.TOPIC_MOTOR_VEHICLE, TopicConstant.TOPIC_SUB_NOTIFICATION)
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

    val manager = new KafkaManager(kafkaParams)
    val stream = manager.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    stream.foreachRDD(
      rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(partitionOfRecords => {
          partitionOfRecords.foreach(data => {
//            Log.info("------>:data:" + data._2)
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println(s"----->topic:${o.topic},partition:${o.partition},fromOffset:${o.fromOffset},untilOffset:${o.untilOffset}"
            }
          })
        })
        manager.updateZKOffsets(rdd)
      }
    )

KafkaManager.scala

這個類需要放在org.apache.spark.streaming.kafka包下面肛炮,不然KafkaCluster初始化不成功。

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.collection.immutable.HashMap
import scala.reflect.ClassTag

/**
 * description: KafkaManager <br>
 * date: 2020/6/24 17:50 <br>
 * author: chezi008/chezi008@qq.com <br>
 * version: 1.0 <br>
 */
class KafkaManager(val kafkaParams: HashMap[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
   * 創(chuàng)建數(shù)據(jù)流
   *
   * @param ssc
   * @param kafkaParams
   * @param topics
   * @tparam K
   * @tparam V
   * @tparam KD
   * @tparam VD
   * @return
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, kafkaParams: HashMap[String, String], topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    //從zookeeper上讀取offset開始消費message
    //    val messages = {
    val partitionsE = kc.getPartitions(topics)
    if (partitionsE.isLeft)
      throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (!consumerOffsetsE.isLeft) {
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    } else {
      val p = kafkaParams + ("auto.offset.reset" -> "largest")
      KafkaUtils.createDirectStream(ssc, p, topics)
    }
    //    }
    //      messages
  }


  /**
   * 更新消費offsets
   *
   * @param rdd
   */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, HashMap((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末宝踪,一起剝皮案震驚了整個濱河市侨糟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌肴沫,老刑警劉巖粟害,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異颤芬,居然都是意外死亡悲幅,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門站蝠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來汰具,“玉大人,你說我怎么就攤上這事菱魔×衾螅” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長聚蝶。 經(jīng)常有香客問我杰妓,道長,這世上最難降的妖魔是什么碘勉? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任巷挥,我火速辦了婚禮,結(jié)果婚禮上验靡,老公的妹妹穿的比我還像新娘倍宾。我一直安慰自己,他們只是感情好胜嗓,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布高职。 她就那樣靜靜地躺著,像睡著了一般辞州。 火紅的嫁衣襯著肌膚如雪怔锌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天孙技,我揣著相機與錄音产禾,去河邊找鬼。 笑死牵啦,一個胖子當(dāng)著我的面吹牛亚情,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播哈雏,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼楞件,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了裳瘪?” 一聲冷哼從身側(cè)響起土浸,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎彭羹,沒想到半個月后黄伊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡派殷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年还最,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毡惜。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡拓轻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出经伙,到底是詐尸還是另有隱情扶叉,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站枣氧,受9級特大地震影響溢十,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜达吞,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一茶宵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧宗挥,春花似錦、人聲如沸种蝶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽螃征。三九已至搪桂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間盯滚,已是汗流浹背踢械。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留魄藕,地道東北人内列。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像背率,于是被迫代替她去往敵國和親话瞧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355