Spark 連接kafka兩種方式及區(qū)別(direct和receiver)

Kafka Direct 跟Receiver方式的區(qū)別

Receiver

Receiver

Receiver是使用Kafka的 High-Level Consumer API來實(shí)現(xiàn)的。Receiver從Kafka中獲取的數(shù)據(jù)都存儲在Spark Executor的內(nèi)存中的(如果數(shù)據(jù)暴增,數(shù)據(jù)大量堆積,容易出現(xiàn)oom的問題),Spark Streaming啟動的job會去處理那些數(shù)據(jù)烫映。
在默認(rèn)的配置下上陕,這種方式可能會因?yàn)榈讓拥氖《鴣G失數(shù)據(jù),如果要啟用高可靠機(jī)制,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預(yù)寫日志機(jī)制(Write Ahead Log,WAL),該機(jī)制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS,S3)上的預(yù)寫日志中,所以當(dāng)?shù)讓庸?jié)點(diǎn)出現(xiàn)了失敗,可以通過WAL中的數(shù)據(jù)進(jìn)行恢復(fù),但是效率會下降晌砾。

使用時(shí)注意事項(xiàng):

1.操作簡單,代碼量少,不需要手動管理offset,需要開啟wal機(jī)制,可以保證數(shù)據(jù)不丟失,但效率會減低,并且為了保證數(shù)據(jù)不丟失,將一份數(shù)據(jù)存兩份,浪費(fèi)資源
2.無法保證數(shù)據(jù)只被處理一次,在寫入外部存儲的數(shù)據(jù)還未將offset更新到zk就掛掉,這些數(shù)據(jù)會被重復(fù)消費(fèi)
3.kafka的topic的分區(qū)和spark streaming生成的rdd分區(qū)不相關(guān),增加topic的分區(qū)數(shù),只會增加reciver讀取分區(qū)數(shù)據(jù)的線程數(shù),并不會提高spark的處理數(shù)據(jù)的并行度

Direct

Direct

Direct 使用Kafka的Low-Level Consumer api讀取kafka數(shù)據(jù),來獲得每個(gè)topic+partition的最新的offset烦磁,從而定義每個(gè)batch的offset的范圍养匈。當(dāng)處理數(shù)據(jù)的job啟動時(shí),就會使用Kafka的Low-Level Consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)都伪。

使用時(shí)注意事項(xiàng):

1.當(dāng)讀取topic的數(shù)據(jù)時(shí)候,會自動對應(yīng)topic的分區(qū)生成對應(yīng)的RDD分區(qū)并行從Kafka中讀取數(shù)據(jù),在Kafka partition和RDD partition之間呕乎,有一對一的映射關(guān)系。
2.不需要開啟WAL機(jī)制陨晶,只要Kafka中作了數(shù)據(jù)的備份猬仁,那么就可以使用通過Kafka的副本進(jìn)行恢復(fù)。
3.Spark內(nèi)部一定時(shí)同步的,所以可以自己跟蹤offset并保存到checkpoint中,可以保證數(shù)據(jù)不會被重復(fù)消費(fèi)
4.操作復(fù)雜,代碼量大,并且需要自己對offset監(jiān)控維護(hù),增加用戶開發(fā)成本

Receiver配合著WAL機(jī)制可以保證數(shù)據(jù)零丟失的高可靠性先誉,但是卻無法保證數(shù)據(jù)被處理一次且僅一次湿刽,可能會處理兩次。因?yàn)镾park和ZooKeeper之間可能是不同步的褐耳≌┕耄基于direct的方式,使用kafka的簡單api铃芦,SparkStreaming自己就負(fù)責(zé)追蹤消費(fèi)的offset雅镊,并保存在checkpoint中襟雷。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費(fèi)一次且僅消費(fèi)一次仁烹。

區(qū)別

Receiver Direct
需要開啟WAL 不需要開啟WAL
使用高層次 api 使用簡單api
zk自動維護(hù) 手動維護(hù)offset
無法保證數(shù)據(jù)被處理一次 數(shù)據(jù)只被處理一次
代碼簡單,量少 代碼復(fù)雜,量大
topic分區(qū)與rdd分區(qū)不是一對一的關(guān)系 topic分區(qū)與rdd分區(qū)是一對一的關(guān)系
由receiver拉取kafka數(shù)據(jù) 由rdd分區(qū)拉取對應(yīng)分區(qū)的數(shù)據(jù)(kafka與rdd分區(qū)相等的情況)
.. ..


連接kafka的兩種方式 (receiver&direct) 栗子

Maven依賴

 <properties>
 <scala.version>2.11.8</scala.version>
 <spark.version>2.1.3</spark.version>
 <scala.binary.version>2.11</scala.binary.version>
 </properties>
 <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <!-- spark-streaming kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
 </dependencies>

Receiver

package xzw.shuai.kafka.demo

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaReceiver {

  private val topics = "receiver-test"
  private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
  private val numThreads = 1

  def main(args: Array[String]): Unit = {
    //當(dāng)應(yīng)用程序停止的時(shí)候,會將當(dāng)前批次的數(shù)據(jù)處理完成后在停止
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    //1000*分區(qū)數(shù)*采樣時(shí)間=拉取數(shù)據(jù)量
    System.setProperty("spark.streaming.kafka.maxRatePerPartition", "1000")
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
      //設(shè)置監(jiān)控級別
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(HDFS_PATH)
    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
      "group.id" -> "receiver"
    )
    
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)

    // word count
    kafkaDStream
      .map(_._2) // 1是分區(qū)號,2是具體kafka中數(shù)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(10) // 輸出結(jié)果

    ssc.start()
    ssc.awaitTermination()
  }
}

Direct

package xzw.shuai.kafka.demo

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.Stat

object SparkKafkaDirect {
  private val zkHosts = "node01:2181,node02:2181,node03:2181"
  private val logger = Logger.getLogger("SparkKafkaDirect")
  private val zkPath = "/kafka-direct-test"
  private val topic = Set("direct-test")
  private val HDFS_PATH="hdfs://node01:9000/kafka-ck"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val ssc = new StreamingContext(sc, Seconds(5))    

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "group.id" -> "direct"
    )

    val zkClient: ZkClient = new ZkClient(zkHosts)

    // 讀取 offset
    val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)

    // 獲取到kafka數(shù)據(jù)
    val kafkaDStream: InputDStream[(String, String)] = offsets match {
      // 使用 direct方式消費(fèi)kafka數(shù)據(jù)
      case None =>
        print("start from scratch")
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
      case Some(offset) =>
        print("start with the offset")
        val messageHeader = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHeader)
    }

    // word count
    kafkaDStream.map(_._2) // 1是分區(qū)號,2是具體kafka中數(shù)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreachRDD(print(_)) // 輸出結(jié)果
    
    // 保存偏移量到zk中 , 也可自定義到其他存儲介質(zhì)
    kafkaDStream.foreachRDD(rdd =>
      saveOffset(zkClient, zkHosts, zkPath, rdd)
    )

    ssc.start()
    ssc.awaitTermination()

  }

  // 保存 offset
  def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
    logger.info("save offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach(offsetRange => logger.debug(s"  Using $offsetRange"))
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
      .mkString(",")
    logger.info("writing offsets to Zookeeper zkClient=" + zkClient + "  zkHosts=" + zkHosts + "zkPath=" + zkPath + "  offsetsRangesStr:" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("done updating offsets in zookeeper. took " + stopwatch)
  }

  // 讀取 offset
  def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {

    val stopwatch = new Stopwatch()

    val stat = new Stat()
    val dataAndStat: (Option[String], Stat) = try {
      (Some(zkClient.readData(zkPath, stat)), stat)
    } catch {
      case _ => (None, stat)
      case e2: Throwable => throw e2
    }

    // 獲取offset
    dataAndStat._1 match {
      case Some(offsetsRangeStr) =>
        logger.info(s" Read offset ranges: $offsetsRangeStr")
        val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
          .map(str => str.split(":"))
          .map {
            case Array(partitions, offset) =>
              TopicAndPartition(topic.last, partitions.toInt) -> offset.toLong
          }.toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offset)
      case None =>
        logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  // 更新 zk中的 offset
  def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
    try {
      zkClient.writeData(zkPath, offsetsRangesStr)
    } catch {
      // 如果失敗了 ==> 沒有此目錄,則創(chuàng)建目錄
      case _: ZkNoNodeException =>
        createParentPath(zkClient, zkPath)
        try {
          // 創(chuàng)建一個(gè)持久的節(jié)點(diǎn) ==> 即 目錄
          // 在offset寫入到 該節(jié)點(diǎn)
          zkClient.createPersistent(zkPath, offsetsRangesStr)
        } catch {
          case _: ZkNodeExistsException =>
            zkClient.writeData(zkPath, offsetsRangesStr)
          case e2: Throwable => throw e2
        }
      case e2: Throwable => throw e2
    }
  }

  // 如果path不存在,則創(chuàng)建
  def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
    val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
    if (parentDir.length != 0)
      zkClient.createPersistent(parentDir, true)
  }

  // 過程時(shí)間
  class Stopwatch {
    private val start = System.currentTimeMillis()

    override def toString: String = (System.currentTimeMillis() - start) + " ms"
  }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末耸弄,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子晃危,更是在濱河造成了極大的恐慌叙赚,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件僚饭,死亡現(xiàn)場離奇詭異,居然都是意外死亡胧砰,警方通過查閱死者的電腦和手機(jī)鳍鸵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尉间,“玉大人偿乖,你說我怎么就攤上這事≌艹埃” “怎么了贪薪?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長眠副。 經(jīng)常有香客問我画切,道長,這世上最難降的妖魔是什么囱怕? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任霍弹,我火速辦了婚禮,結(jié)果婚禮上娃弓,老公的妹妹穿的比我還像新娘典格。我一直安慰自己,他們只是感情好台丛,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布耍缴。 她就那樣靜靜地躺著,像睡著了一般挽霉。 火紅的嫁衣襯著肌膚如雪防嗡。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天炼吴,我揣著相機(jī)與錄音本鸣,去河邊找鬼。 笑死硅蹦,一個(gè)胖子當(dāng)著我的面吹牛荣德,可吹牛的內(nèi)容都是我干的闷煤。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼涮瞻,長吁一口氣:“原來是場噩夢啊……” “哼鲤拿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起署咽,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤近顷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后宁否,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窒升,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年慕匠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了饱须。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 37,997評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡台谊,死狀恐怖蓉媳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情锅铅,我是刑警寧澤酪呻,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站盐须,受9級特大地震影響玩荠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜丰歌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一姨蟋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧立帖,春花似錦眼溶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至绑咱,卻和暖如春绰筛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背描融。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工铝噩, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人窿克。 一個(gè)月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓骏庸,卻偏偏與公主長得像毛甲,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子具被,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容