15 Spark Streaming源碼解讀之No Receivers徹底思考

Spark Streaming在企業(yè)級使用中搅轿,一般會使用no receiver的方式讀取數(shù)據(jù)痰憎,對應(yīng)kafka中的Direct方式,采用no receiver的方式可以提高數(shù)據(jù)讀取效率并保證事務(wù)的一致性陨享,看看在Spark Streaming中是怎樣使用kafka的Direct方式

  1. 首先展示一個Demo,代碼如下
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 這里使用了KafkaUtils.createDirectStream方法來創(chuàng)建了一個DirectKafkaInputDStream,createDirectStream的代碼如下
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
}
  1. 首先實例化了一個KafkaCluster急膀,KafkaCluster里面封裝了Spark Streaming操作kafka Api的所有方法。重點來了辆飘,如果我們在其他項目中需要讀取kafka的數(shù)據(jù)啦辐,KafkaCluster就是最好的參考文檔了谓传。接著看getFromOffsets(kc, kafkaParams, topics)方法,返回的fromOffsets 變量類型為 Map[TopicAndPartition, Long]芹关,TopicAndPartition是kafka里的對象续挟,代表了topic和他的第幾個分區(qū),Long的值就是offset(數(shù)據(jù)偏移量)侥衬,kafka是一個分布式消息隊列诗祸,每一個topic可以有多個partition,多個partition可以提高kafka的吞吐量轴总。
  2. 接著看DirectKafkaInputDStream內(nèi)部代碼直颅,compute()方法代碼如下
//這里沒有使用receiver,而是直接包裝了KafkaRDD進(jìn)行數(shù)據(jù)讀取
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    // 獲取每個topic中每個partition的offset結(jié)束偏移量肘习,對應(yīng)currentOffsets际乘。代表了每個partition數(shù)據(jù)的開始和結(jié)束位置
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    // KafkaRDD用來從kafka中讀取數(shù)據(jù),類似hadoopRDD從hdfs中讀取數(shù)據(jù)漂佩,如果我們讀取其他數(shù)據(jù)來源脖含,也可以自定義RDD
    val rdd = KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    // offsetRanges 表示每次讀取partition數(shù)據(jù)的范圍
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map("offsets" -> offsetRanges.toList,StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    // 更新currentOffsets,這次的結(jié)束位置就下次的開始位置
    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
}

主要是獲取untilOffsets 投蝉,把currentOffsets和untilOffsets 傳遞到KafkaRDD中养葵,這樣KafkaRDD就知道讀取哪些數(shù)據(jù)了。

  1. 看KafkaRDD中都有哪些內(nèi)容瘩缆,首先使用KafkaRDD的伴生對象去創(chuàng)建Kafka的實例关拒,看一下Kafka的apply方法,代碼如下
def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
    val leaders = untilOffsets.map { case (tp, lo) =>
        tp -> (lo.host, lo.port)
    }.toMap
    // offsetRanges代表了數(shù)據(jù)偏移量的開始和結(jié)束位置
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
        val uo = untilOffsets(tp)
        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray

    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}

主要就是生成了一個offsetRanges庸娱,和leaders(數(shù)據(jù)所在的ip地址和端口)着绊,messageHandler也是一個很有意思的函數(shù),一會再介紹

  1. 看KafkaRDD的代碼熟尉,自定義一個RDD归露,最關(guān)鍵的三個方法分別是getPartitions、getPreferredLocations斤儿、compute剧包。
    (1)首先看getPartitions,代碼如下
override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
   }.toArray
}

每一個topic的每一個partition對應(yīng)一個Spark中的partition往果,從這里我們可以評估一個kafka數(shù)據(jù)源(topic)需要多少個core資源來計算
(2)再看getPreferredLocations方法疆液,代碼如下

override def getPreferredLocations(thePart: Partition): Seq[String] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    // TODO is additional hostname resolution necessary here
    Seq(part.host)
}

getPreferredLocations決定了數(shù)據(jù)本地性,如果kafka中broker和Spark在同一個集群中陕贮,此時getPreferredLocations獲取本地性就可以極大提高效率堕油,因為沒有了數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)某杀?br> (3)最后看compute方法,代碼如下

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
}

主要功能封裝在KafkaRDDIterator中,代碼如下

private class KafkaRDDIterator(part: KafkaRDDPartition,context: TaskContext) extends NextIterator[R] {

    context.addTaskCompletionListener{ context => closeIfNeeded() }

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val kc = new KafkaCluster(kafkaParams)
    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(kc.config.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(kc.config.props)
      .asInstanceOf[Decoder[V]]
    val consumer = connectLeader
    var requestOffset = part.fromOffset
    var iter: Iterator[MessageAndOffset] = null

    // The idea is to use the provided preferred host, except on task retry attempts,
    // to minimize number of kafka metadata requests
    private def connectLeader: SimpleConsumer = {
      if (context.attemptNumber > 0) {
        kc.connectLeader(part.topic, part.partition).fold(
          errs => throw new SparkException(s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
              errs.mkString("\n")),
          consumer => consumer
        )
      } else {
        kc.connect(part.host, part.port)
      }
    }

    private def handleFetchErr(resp: FetchResponse) {
      if (resp.hasError) {
        val err = resp.errorCode(part.topic, part.partition)
        if (err == ErrorMapping.LeaderNotAvailableCode ||
          err == ErrorMapping.NotLeaderForPartitionCode) {
          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
          Thread.sleep(kc.config.refreshLeaderBackoffMs)
        }
        // Let normal rdd retry sort out reconnect attempts
        throw ErrorMapping.exceptionFor(err)
      }
    }

    // 3馍迄、獲取一批數(shù)據(jù)
    private def fetchBatch: Iterator[MessageAndOffset] = {
      // 4福也、包裝一個request請求對象
      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()
      // 5、使用SimpleConsumer對象發(fā)送fetch請求攀圈,返回response
      val resp = consumer.fetch(req)
      handleFetchErr(resp)
      // kafka may return a batch that starts before the requested offset
      // 5、從返回的response中得到一個數(shù)據(jù)的迭代器
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }

    override def close(): Unit = {
      if (consumer != null) {
        consumer.close()
      }
    }
    // 1峦甩、從讀取數(shù)據(jù)開始看
    override def getNext(): R = {
      if (iter == null || !iter.hasNext) {
        // 2赘来、獲取一些數(shù)據(jù),數(shù)據(jù)包裝在迭代器中
        iter = fetchBatch
      }
      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {
        // 6凯傲、不斷讀取數(shù)據(jù)
        val item = iter.next()
        if (item.offset >= part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          // 7犬辰、將數(shù)據(jù)封裝到MessageAndMetadata類中
          messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
}

跟著KafkaRDDIterator里的步驟來看代碼
(4)最后返回的消息被封裝到了MessageAndMetadata中,那么messageHandler是個什么東東冰单,我們順著代碼向上找幌缝,是在KafkaUtils的createDirectStream方法有定義的,代碼如下

val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

原來是取消息中的key和message诫欠,如果我們想取出其他參數(shù)可以自定義messageHandler函數(shù)涵卵,是不是有點意思。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末荒叼,一起剝皮案震驚了整個濱河市轿偎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌被廓,老刑警劉巖坏晦,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異嫁乘,居然都是意外死亡昆婿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進(jìn)店門蜓斧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來埠巨,“玉大人,你說我怎么就攤上這事吨灭∠倒荩” “怎么了?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵搂蜓,是天一觀的道長狼荞。 經(jīng)常有香客問我,道長帮碰,這世上最難降的妖魔是什么相味? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮殉挽,結(jié)果婚禮上丰涉,老公的妹妹穿的比我還像新娘拓巧。我一直安慰自己,他們只是感情好一死,可當(dāng)我...
    茶點故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布肛度。 她就那樣靜靜地躺著,像睡著了一般投慈。 火紅的嫁衣襯著肌膚如雪承耿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天伪煤,我揣著相機(jī)與錄音加袋,去河邊找鬼。 笑死抱既,一個胖子當(dāng)著我的面吹牛职烧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播防泵,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼蚀之,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了择克?” 一聲冷哼從身側(cè)響起恬总,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎肚邢,沒想到半個月后壹堰,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡骡湖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年贱纠,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片响蕴。...
    茶點故事閱讀 40,742評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡谆焊,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出浦夷,到底是詐尸還是另有隱情辖试,我是刑警寧澤,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布劈狐,位于F島的核電站罐孝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏肥缔。R本人自食惡果不足惜莲兢,卻給世界環(huán)境...
    茶點故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧改艇,春花似錦收班、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至舵变,卻和暖如春酣溃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纪隙。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留扛或,地道東北人绵咱。 一個月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像熙兔,于是被迫代替她去往敵國和親悲伶。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內(nèi)容