Spark Streaming 對接 Kafka 的那些事兒

1. Background

Kafka 作為 Spark Streaming 數(shù)據(jù)的最重要的來源败玉,官方為此專門提供了二者整合的 jar 包。這使得我們可以很方便的對接二者疫铜,但是其中的細節(jié)還是很繁雜的茂浮。了解其中的原理對于理解 Spark 和 Kafka 都有很大幫助,也是我們日后進行調優(yōu)的基石壳咕。

2. Basic

2.1 Spark Streaming 如何對接 Kafka席揽?

對于 Kafka 的版本來說,有兩個重要節(jié)點:0.8.2 和 0.10谓厘。也就意味著 Spark 官方提供的整合 jar 包有兩個重要節(jié)點版本:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10幌羞。0.8.2 可以通過 receiver based 或者 dirct 的方式進行。0.10 則只支持 dircet 方式的對接竟稳。注意在 Spark 2.3 以后已經(jīng)將 Kafka 0.8 版本的支持標為過時属桦。

2.2 我們談論 RDD 的時候是在談論什么?

大家都知道他爸,Spark Streaming 是基于 DStream 展開的聂宾,通過 action 操作觸發(fā)執(zhí)行,并在設定的時間間隔匯集多個 RDD 組成 DStream诊笤,最終封裝成 task 到 executor 調度系谐。是一種批處理,偽實時的計算框架讨跟。所以 Spark Streaming 底層是 Spark Core纪他,而 Spark Core 的核心又是 RDD鄙煤。那么這么抽象的 RDD 到底表示了什么?除了源碼注釋中的五大特性外止喷,RDD 中包含了數(shù)據(jù)嗎馆类?答案顯然是不包含的。比如 textfile 生成的 RDD弹谁,實際上他是文件位置的描述乾巧。KafkaRDD 生成的時候實際上是指定了一定范圍的 offset。BlockRDD 實際上存儲的是 blockId预愤,即數(shù)據(jù)在 blockmanager 的標識符沟于。之所以不包含數(shù)據(jù),是因為大數(shù)據(jù)量的數(shù)據(jù)都是從 driver 端發(fā)送的話植康,那么 driver 的壓力會很大旷太,而且存在單點故障問題。剛剛提到的 BlockRDD 和 KafkaRDD 分別對應了 Spark Streaming 以 receiver based 和 direct 對接的方式產(chǎn)生的 RDD销睁。因為 BlockRDD 是基于 receiver 的供璧,所以他的分區(qū)數(shù)和 Kafka topics 的分區(qū)是沒有任何關系的,BlockRDD 一個分區(qū)對應的是一個 block冻记,而一個 block 是 receiver 每隔 200ms(默認值) 生成的睡毒。而 KafkaRDD 則是直接對接 Kafka,他的分區(qū)和 Kafka topics 的分區(qū)是一一對應的冗栗。前面說所有的 RDD 都不包含數(shù)據(jù)也是有特例的演顾,parallelize 方式從內存中直接生成的 RDD 會將數(shù)據(jù)從 driver 發(fā)送到 executor,可以理解為是包含數(shù)據(jù)的隅居,而不再是一種描述/標識符钠至。

2.3 High level API & Simple consumer

在 Kafka 中有兩種類型的消費者,一種是具有消費者組的概念可以自己管理組內 rebalance胎源,并且管理 offset 的我們稱之為 High level API棉钧;另一種是需要調用者自己管理 offset 并且不具備消費者組概念的我們稱之為 Simple consumer。后面版本又出了 new consumer 整合了二者涕蚤,這里不做過多介紹宪卿。

2.4 WAL(write-ahead logs)

以下截選自官網(wǎng):

Configuring write-ahead logs - Since Spark 1.2, we have introduced write-ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write-ahead log in the configuration checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss.It is recommended that the replication of the received data within Spark be disabled when the write-ahead log is enabled as the log is already stored in a replicated storage system.

WAL 即預寫日志,通過將數(shù)據(jù)和計算進程等信息保存在磁盤上赞季,當集群出現(xiàn)故障進行恢復時愧捕,可以恢復到故障前的狀態(tài)奢驯。如果 Spark 開啟了 WAL申钩,建議將 Spark 中的多副本關閉,因為數(shù)據(jù)已經(jīng)通過 WAL 保存在了可靠的存儲中瘪阁。checkpoint 和 WAL 什么關系撒遣?初學者可能會混淆二者的概念邮偎,checkpoint 根據(jù)在不同的語境有不同的含義,比如在 Spark Streaming 中的注釋是:

Set the context to periodically checkpoint the DStream operations for driver fault-tolerance.

而 RDD 中的 checkpoint 則要復雜一些义黎,具體可以參考之前的這篇文章禾进。而 WAL 則是依賴 checkpoint 實現(xiàn)的,當然我們可以修改源碼通過其他的方式實現(xiàn) WAL廉涕。

2.5 Kakfa offset

雖然這是 Kafka 的知識泻云,但是 Spark Streaming 畢竟對接的是 Kakfa,所以一些基本的概念還要清楚的狐蜕。首先看看什么是 offset:

用來唯一的標識 kafka topic 中分區(qū)的每一條記錄宠纯。

再來看看 committed offset:

記錄 consumerGroup 在指定(topic,partition)的消費記錄层释。
每一個(group,topic,partition)確定一個 commited offset

offset 可以理解為書的頁碼婆瓜。而 committed offset 可以理解為書簽,這個書簽一定是針對某一個人(對應 kafka 的消費者組)的贡羔,表示了某個人對某本書的閱讀進程廉白。

committed offset 可以用持久化的方式保存在任何地方。比如 Zookeeper乖寒、Kakfa猴蹂、甚至直接保存在 HDFS 中。以 Zookeeper 為例宵统,保存的路徑是 /consumers/xx-consumer_group/offsets/xx-topic/xx-partition/晕讲。例如名字為"console-consumer-57704"的消費者組在 topic 名為 test 上的 0 號分區(qū)的 committed offset 可以通過 get /consumers/console-consumer-57704/offsets/test/0 來獲取。

我們前面說 receiver based 的對接方式消費 kafka 數(shù)據(jù)使用的是 High level api马澈,框架幫我們管理 offset( 保存在 Zookeeper)瓢省,是具有消費者組的概念的。也即是多個 receiver 可以位于同一個消費者組痊班,共同完成一份數(shù)據(jù)的消費勤婚。對于 direct 模式,我們在使用 simple consumer(082版本) or new consumer(010版本) 時引入了一個 OffsetRange 的概念涤伐,這個類有四個變量:topic, partition, fromOffset, untilOffset馒胆,可以看到其中是沒有消費者組的,即單純的表示了 Kafka 中某個 topic-partition 的一段消息凝果。類比之前書的例子祝迂,則單純的代表了某本書的某幾頁,和任何讀者是無關的器净。

2.6 metadata.broker.list & bootstrap.servers & zookeeper

在 Spark Streaming 對接 Kafka 時型雳,經(jīng)常會遇到這三個參數(shù)。首先可以簡單將 bootstrap.servers 理解為新版本中的 metadata.broker.list,新版本中已經(jīng)將 metadata.broker.list標記為過時纠俭。那么 bootstrap.servers和 zookeeper 參數(shù)之間有什么差別呢沿量?首先需要知道的是,二者都是為了保存 Kakfa 中一些重要信息冤荆,如元數(shù)據(jù)信息和消費的 offset 信息朴则。舊版本中這些信息都保存在了 Zookeeper 中,如獲取 "first" 的元數(shù)據(jù)信息kafka-topics.sh --zookeeper hadoop102:2181 --desc --topic first钓简,結果如下:

? Topic:first PartitionCount:3 ReplicationFactor:3 Configs:
? Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
? Topic: first Partition: 1 Leader: 103 Replicas: 103,104,102 Isr: 102,103,104
? Topic: first Partition: 2 Leader: 104 Replicas: 104,102,103 Isr: 104,102,103

這些信息我們可以通過 Zookeeper 客戶端獲任诙省:以 2 號分區(qū)為例:get /brokers/topics/first/partitions/2/state

{"controller_epoch":26,"leader":104,"version":1,"leader_epoch":54,"isr":[104,102,103]}

而在新版本中,這些信息都存在了 Kakfa broker 中外邓。

2.7 receiver reliability

以下內容來自官網(wǎng):

Reliable Receiver: A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication.

Unreliable Receiver:An unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need to go into the complexity of acknowledgment.

可以看到芥被,receiver 是否可靠取決于 receiver 是否發(fā)送 ACK 給數(shù)據(jù)源。

3. Deep

3.1 Receiver-based Approach

關于 receiver based 方式需要知道的事兒:

  • 工作流程:driver 調度 receiver 到 executor 端啟動坐榆,receiver 會持續(xù)不斷從 kafka 中接收數(shù)據(jù)并存放到 BlockManager 中拴魄。生成 job 時會將該批次的所有 block 組裝起來生成 BlockRDD。一個 block 對應一個 task席镀。block/task 的個數(shù)有兩個參數(shù)決定匹中,一個是 Spark Streaming 程序一個批次的間隔(batchDuration),另一個是 receiver 生成一個 block 的時間周期(spark.streaming.blockInterval默認值是 200ms )豪诲。比如批次間隔是 2s顶捷,block 周期默認 200ms,那么一個批次將產(chǎn)生 10 個 block/task屎篱。即生成的一個 BlockRDD 具有 10 個分區(qū)服赎,分區(qū)數(shù)量決定了處理數(shù)據(jù)的并行度/效率:如果通過設置參數(shù)導致分區(qū)數(shù)多小,將無法充分利用集群資源交播。但分區(qū)數(shù)也不能過多重虑,生成 block 的周期不應小于 50 ms,任務調度所占用的時間比重將過大秦士。

  • Kafka topic 的分區(qū)和 Spark Streaming 生成的 BlockRDD 的分區(qū)不是一一對應的關系缺厉。通過 KafkaUtils.createStream()創(chuàng)建的 ReceiverInputDStream中有一個參數(shù)topics: Map[String, Int],key 表示 topic_name,value 表示 numPartitions隧土。通過增加 numPartitions 的數(shù)量提针,只是單純增加了消費者組中消費者的個數(shù),并不能增加 Spark 接收/處理數(shù)據(jù)的并行度曹傀。若想增加接收數(shù)據(jù)的并行度應該增加 receiver 的個數(shù)辐脖,通過調用多次KafkaUtils.createStream()來創(chuàng)建多個 receiver 和 DStream,然后用 StreamingContext.union(streams: Seq[DStream[T]])來合并多個 DStream皆愉,以此來增加接收數(shù)據(jù)的并行度嗜价。

  • 每個 receiver 需要占用一個 cpu落萎,所以在本地模式下,不要使用 local[1] 這種方式炭剪;在集群模式下,總核數(shù)要大于總receiver 的個數(shù)

  • 如果開啟了 WAL 機制翔脱,那么創(chuàng)建的 DStream 的存儲級別應該設置為單副本:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

  • receiver 接收的 block 會放入blockmananger奴拦,每個 executor 都會有一個 blockmanager 實例,由于數(shù)據(jù)的本地性届吁,receiver 所在的 executor 會被調度執(zhí)行更多的 task错妖,就會導致其他某些 executor 比較空閑【毋澹可以通過1.增加 receiver 2.repartition 增加分區(qū) 3.調小參數(shù)spark.locality.wait(How long to wait to launch a data-local task before giving up and launching it on a less-local node) 來緩解暂氯。

創(chuàng)建入口:

def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }


KafkaUtils 提供了兩個入口供我們創(chuàng)建 ReceiverInputDstream ,第一個簡易版的默認消息的 KV 都是 String 類型亮蛔,并且持久化級別是 MEMORY_AND_DISK_SER_2痴施,不支持傳遞 Kakfa 配置參數(shù)。第二個則支持配置所有參數(shù)究流。下面的代碼演示中 No WAL 因為要設置auto.commit.interval.ms參數(shù)辣吃,所以使用了第二種方式創(chuàng)建。 With WAL 則使用了第一種簡易版的入口芬探。

3.1.1 No WAL

object ReceiverWithoutWAL {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("ReceiverWithoutWAL")
      .setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

    val kafkaParams: Map[String, String] = Map[String, String](
      "zookeeper.connect" -> "hadoop102:2181",
      "group.id" -> "consumer-group_receiver_no_wal",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.commit.interval.ms" -> "1000")

    val ssc = new StreamingContext(sc, Seconds(10))
    val topicMap = Map("topic_receiver_no_wal" -> 1)

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

這里將 offset 自動保存在 Zookeeper 中神得,并且自動提交 offset 時間間隔設置為 1s,Spark Streaming 批次間隔為 10s偷仿。通過測試發(fā)現(xiàn)哩簿,提交 offset 后沒有處理數(shù)據(jù)(沒到下一個批次的時間間隔),此時 driver 掛掉 --> receiver 底層存儲依賴的BlockManager 掛掉 --> 存儲的數(shù)據(jù)丟失酝静。重啟后去按照最新 offset 去 kafka 拉數(shù)據(jù)造成上一批數(shù)據(jù)丟失未處理节榜。p.s. 在 Idea 中直接點擊停止終止程序時,無論是否到達 offset 提交周期别智,都會自動提交 offset 再關閉程序全跨;而在控制臺中直接 kill 掉 jvm 進程則不會自動提交 offset。

3.1.2 With WAL

object ReceiverWithWAL {

  def createSSC(): StreamingContext = {
    val sparkConf: SparkConf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .setAppName("ReceiverWithWAL")
      .setMaster("local[*]")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("/ch")
    val topicMap = Map("topic_receiver_with_wal" -> 1)
    val zkQuorum = "hadoop102:2181"
    val groupId = "consumer-group_receiver_with_wal"
    val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    lines.print()
    ssc
  }


  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getActiveOrCreate("file:///Users/tianciyu/Desktop/ch", () => createSSC())

    ssc.start()
    ssc.awaitTermination()
  }

}

這里通過配置spark.streaming.receiver.writeAheadLog.enable參數(shù)開啟了 WAL亿遂,并通過 getActiveOrCreate都方式獲取 StreamingContext浓若,這樣就可以在發(fā)生故障后,通過 WAL 從設置的 checkpoint 目錄恢復計算和數(shù)據(jù)蛇数。注意因為開啟了 WAL挪钓,所以將持久化級別的副本數(shù)設置為了1個。

以下截取自源碼:

val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)

def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
}

其中 useReliableReceiver 是通過 walEnabled 賦值的耳舅÷瞪希可以看到源碼中通過讀取spark.streaming.receiver.writeAheadLog.enable參數(shù)來決定是否使用 ReliableReceiver倚评。而在 ReliableReceiver 中會將auto.commit.enable設置為 false。目的是寫入 WAL 預寫日志后再更新 offset馏予。此時沒有處理數(shù)據(jù) driver 掛掉天梧,重啟后從 checkpoint 中恢復狀態(tài)并從 WAL 拉取數(shù)據(jù),成功處理上一批數(shù)據(jù)防止了數(shù)據(jù)丟失(沒有 WAL 則 receiver 接收的數(shù)據(jù)只存儲在 BlockManager 中)霞丧。WAL 的本質是先處理數(shù)據(jù)再更新 offset呢岗,這里的處理數(shù)據(jù)對框架來說便是將數(shù)據(jù)持久化保存在可靠存儲。

3.2 Direct Approach

關于 direct 方式對接 Kafka 需要知道的事兒:

  • 沒有 receiver蛹尝,少占用一個cpu核

  • 不再需要 receiver 接收數(shù)據(jù)后豫,寫入 blockManager,運行時再通過 blockId 取數(shù)據(jù)突那。沒有多余的網(wǎng)絡傳輸挫酿、磁盤讀取來獲取數(shù)據(jù)的過程。而是采用 simple consumer(082版本) / new consumer(010版本) 的方式通過每個批次對應的 OffsetRange 直接從 Kafka 中讀取數(shù)據(jù)愕难,提高了效率早龟。

  • DirectKafkaInputDStream 生成的 RDD 不再是 BlockRDD,而是KafkaRDD 猫缭。KafkaRDD 的分區(qū)和 Kafka topic 的分區(qū)一一對應拄衰,更便于并行度的調優(yōu)。對比基于 receiver 的方式饵骨,則需要多次創(chuàng)建 ReceiverInputDStream 然后進行 union翘悉。

  • 無需 WAL。因為直接從 Kakfa 中取數(shù)據(jù)居触,所以 driver 掛了并不會造成數(shù)據(jù)丟失妖混。對比 receiver based 方式,因為 receiver 的存在轮洋,雖然數(shù)據(jù)最初來源自可靠存儲 Kafka 中制市,但是加了一層 receiver ,想保證數(shù)據(jù) 0 丟失仍然需要 WAL 配合弊予。

  • 不再使用 Zookeeper 存儲 offsets祥楣。

    對于 082 版本來說:

    Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints.

    offset 默認是保存在 checkpoint 中,所以必須配置 checkpoint汉柒,否則每次重啟將觸發(fā) auto.offset.reset 邏輯误褪。

    對于 010 版本來說:

    Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets.

    offset 默認是保存在 Kafka 一個特殊的 topic 中(__consumer_offsets)。這個無需配置碾褂。

  • 可以通過手動維護 offset兽间,實現(xiàn)精準一次消費語義(Exactly-once semantics)

3.2.1 kafka version: 0.8.2

/** 
*       Requires "metadata.broker.list" or "bootstrap.servers"
*   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
*   host1:port1,host2:port2 form.
*   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
*   to determine where the stream starts (defaults to "largest")
*/

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  )

首先看一下入口函數(shù),注釋中有兩點需要注意:1. 需要配置metadata.broker.list 或者 bootstrap.servers正塌。 2.如果沒有配置 checkpoint嘀略,則由auto.offset.reset參數(shù)決定每次啟動時從哪里讀取數(shù)據(jù)恤溶。

這里以配置了 checkpoint 為例,寫一個 quick start demo:

object DirectApproach082 {


  def createSSC(): StreamingContext = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaStreamingApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(3))
    ssc.checkpoint("file:///Users/tianciyu/Desktop/ch")
    val bootstrapServer:String = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
    val group:String = "consumer-group_direct_082"
    val deserializer:String = "org.apache.kafka.common.serialization.StringDeserializer"
    val topic:String = "topic_direct_082"

    val paramsMap: Map[String, String] = Map[String,String] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServer,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserializer,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserializer
    )
    val lines: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      paramsMap,
      Set(topic))

    lines.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("file:///Users/tianciyu/Desktop/ch", ()=>createSSC())
    ssc.start()
    ssc.awaitTermination()
  }

}

我們也可以在處理每個批次時帜羊,獲取當前消費的 offset:

...

lines.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

...

這里有兩點需要注意:1. rdd.asInstanceOf[HasOffsetRanges]操作只在 Dstream 的第一個方法調用中有效咒程,如果需要的話一般將transform()作為第一個調用。 2. KafkaRDD 的分區(qū)和 Kakfa topic 中的分區(qū)一一對應是在初始階段讼育,如果后續(xù)使用了 shuffle 算子如 reduceByKey(),repartition()等操作將不再一一對應帐姻。

3.2.2 kafak version: 0.10

對于 0.10 版本而言,多了兩個參數(shù)窥淆。

@param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
@param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe

我們分別傳入最常用的 preferConsistent 和 subscribe 即可。其他和 0.82 版本類似:

...

val topicsSet = "topic_direct_082".split(",").toSet
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop102:9092",
                                      "key.deserializer"->classOf[StringDeserializer],
                                      "value.deserializer"-> classOf[StringDeserializer],
                                      "group.id"->"consumer-group_direct_010")

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

...

還有一點與 0.8.2 不同的是 offset 的保存方式巍杈。0.8.2 保存在 checkpoint 中(如不設置每次都會觸發(fā)auto.offset.reset邏輯)忧饭。而在 0.10 中,offset 是保存在 kafka 的 topic:__consumer_offsets 里面筷畦,定期的自動提交词裤。

3.3 Offset Management

部分內容已經(jīng)在前面提及,這里做一下匯總和補充鳖宾。

3.3.1 自動提交 offset

通過配置 enable.auto.commit 為 true 并結合 auto.commit.interval.ms 提交間隔進行自動提交的設置吼砂。

其中 receiver based 方式會將 offset 自動提交到 Zookeeper。direct 方式根據(jù)版本不同提交的位置不同鼎文,0.8.2 版本是自動保存在 checkpoint 中渔肩,所以需要手動配置 checkpoint ,否則無效拇惋。 0.10 版本自動提交到 Kakfa broker 中的名為 __consumer_offsets 的 topic 里周偎。這種方式無需另外配置。

3.3.2 手動提交 offset

從上面自動提交可以發(fā)現(xiàn)撑帖,offset 存放的地方至少有三種:Zookeeper蓉坎、checkpoint、Kafka胡嘿。所以手動提交一樣可以將 offset 提交到這三個地方蛉艾,其中 checkpoint 并不推薦,原因是 checkpoint 自身具有缺陷衷敌,比如無法更新代碼等原因勿侯。這里以 Zookeeper 的方式為例進行演示:

package kafka2ss.direct

import kafka.common.TopicAndPartition
import kafka.consumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.immutable.HashMap


object ManualOffsetZK {

  def getKafkaStream(kafkaParams: Map[String, String], group: String, ssc: StreamingContext, topics: String*): InputDStream[(String, String)] = {
    val kafkaCluster = new KafkaCluster(kafkaParams)
    var kafkaStream: InputDStream[(String, String)] = null
    val partitionsE: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(topics.toSet)
    if (partitionsE.isLeft) throw new SparkException("get kafka partition failed:")

    val partitions: Set[TopicAndPartition] = partitionsE.right.get
    //從zookeeper中獲取offset信息
    val offsetsE: Either[Err, Map[TopicAndPartition, Long]] = kafkaCluster.getConsumerOffsets(group, partitions)
    //如果在zookeeper中沒有記錄,就從最小的offset開始消費
    if (offsetsE.isLeft) {
      kafkaParams + ("auto.offset.reset" -> "smallest")
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
    } else {
      val offsets: Map[TopicAndPartition, Long] = offsetsE.right.get

      val earliestOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = kafkaCluster.getEarliestLeaderOffsets(partitions).right.get
      var newConsumerOffsets: HashMap[TopicAndPartition, Long] = HashMap()

      offsets.foreach((f: (TopicAndPartition, Long)) => {
        val min: Long = earliestOffsets(f._1).offset
        //如果zookeeper中記錄的offset在kafka中不存在(已經(jīng)過期),就指定其現(xiàn)有kafka的最小offset位置開始消費
        newConsumerOffsets += (f._1 -> Math.max(f._2,min))
      })

      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, newConsumerOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    }

    kafkaStream
  }

  def saveOffset(kafkaCluster: KafkaCluster, group: String, kafkaDStream: InputDStream[(String,String)]): Unit = {

    kafkaDStream.map(_._2).foreachRDD {
      rdd: RDD[String] => {
        val offsetRanges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        val ranges: Array[OffsetRange] = offsetRanges.offsetRanges
        var topicAndPartitionToOffset = new HashMap[TopicAndPartition, Long]()
        for (range <- ranges) {
          topicAndPartitionToOffset += (range.topicAndPartition() -> range.untilOffset)
        }
        kafkaCluster.setConsumerOffsets(group, topicAndPartitionToOffset)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaLowStreamingApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val bootstrapServer: String = "hadoop102:9092"
    val group: String = "consumer_group_offset_management_zk"
    val deserializer: String = "org.apache.kafka.common.serialization.StringDeserializer"
    val topic: String = "topic_offset_management_zk"

    val paramsMap: Map[String, String] = Map[String, String](
      "zookeeper.connect" -> "hadoop102:2181,hadoop103:2181,hadoop104:2181",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServer,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserializer,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserializer
    )

    val kafkaCluster = new KafkaCluster(paramsMap)
    val kafkaDStream: InputDStream[(String, String)] = getKafkaStream(paramsMap,group,ssc,topic)

    //消費
    kafkaDStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print
    saveOffset(kafkaCluster, group, kafkaDStream)

    ssc.start()
    ssc.awaitTermination()

  }

}

3.4 Exactly-once semantics

在講解如何實現(xiàn) Exactly-once 語義前缴罗,先了解以流處理系統(tǒng)中語義的概念罐监。

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

At most once: Each record will be either processed once or not processed at all.
At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensure that no data will be lost. But there may be duplicates.
Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

首先要明確的是,所謂語義一定是和故障恢復關聯(lián)的瞒爬,如果不考慮故障的話弓柱,全都是 exactly-once沟堡。那么如何能做到所謂的精確一次消費呢?

If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let’s understand the semantics of these steps in the context of Spark Streaming.

如果保證端到端的精確一次消費矢空,需要滿足三個過程都是精確一次消費航罗。結合語義和故障恢復的強關聯(lián),換句話說屁药,所謂 Exactly once 就是三個過程的容錯性保證粥血。

  1. 接收數(shù)據(jù):

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data.This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.

  • 如果數(shù)據(jù)源來自 HDFS 這種支持容錯的存儲系統(tǒng)的文件,那么該過程可以保證精確一次語義酿箭。官網(wǎng)這段話指出了保證接收數(shù)據(jù)時精確一次消費的要點:容錯性复亏。需要注意這里的容錯性隱性包含了:原數(shù)據(jù)端必須支持隨機讀取。比如通過控制臺 socket 作為原數(shù)據(jù)端無法保證 exactly-once缭嫡,最核心的問題是在故障恢復的時候沒辦法讀取之前的數(shù)據(jù)缔御。
  • receiver based 對接 Kafka 的方式,只能保證最少一次語義
  • direct 模式對接 Kafka 可以保證精確一次

關于 receiver based 方式只能保證一次語義妇蛀,direct 可以保證精確一次的官網(wǎng)解釋如下:

There is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.

這段話理解起來有一定難度耕突,難點在于理解什么是 Spark Streaming 數(shù)據(jù)的接收與 Zookeeper 維護 offset 的不一致性。因為要保證最少一次或者精確一次的前提是不能丟數(shù)據(jù)评架,所以 receiver based 肯定是開啟了 WAL 機制的眷茁。設想這樣一種場景:receiver 從 Kakfa 拉數(shù)據(jù)并寫入 WAL 后,沒來得及更新 offset 時纵诞,receiver 掛掉上祈。恢復后浙芙,因為 WAL 的機制雇逞,Spark Streaming 會從 WAL 中恢復計算和數(shù)據(jù)损谦,消費了之前的數(shù)據(jù)流昏,但是 offset 是沒有更新的霎褐,所以拉取數(shù)據(jù)時會重復拉取上一批數(shù)據(jù)并二次消費掰派。而 direct 模式消除了這種不一致性色迂,數(shù)據(jù)的接收和 offset 的維護都是 Spark Streaming 自己負責除呵。

  1. 處理數(shù)據(jù):因為底層依賴的 RDD 的各種容錯機制巧号,可以在處理數(shù)據(jù)過程中即使發(fā)生錯誤也能保證精確一次消費浓瞪。具體可以參考這篇文章:spark RDD 容錯

  2. 輸出結果

  • offset 和結果放在不同位置矾瘾,根據(jù)二者執(zhí)行的先后順序有 at least once 和 at most once
  • offset 和結果放在一個(repartition(1))事務中 --> exactly-once

4. Ref

  1. http://spark.apache.org/docs/2.4.2/streaming-programming-guide.html

  2. http://spark.apache.org/docs/2.4.2/streaming-kafka-0-8-integration.html

  3. http://spark.apache.org/docs/2.4.2/streaming-kafka-0-10-integration.html

  4. What is the difference between simple consumer and high level consumer?

  5. https://www.slideshare.net/QuentinAmbard/exactly-once-with-spark-streaming

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末女轿,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子壕翩,更是在濱河造成了極大的恐慌蛉迹,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件放妈,死亡現(xiàn)場離奇詭異北救,居然都是意外死亡荐操,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門珍策,熙熙樓的掌柜王于貴愁眉苦臉地迎上來托启,“玉大人,你說我怎么就攤上這事攘宙⊥退剩” “怎么了?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵蹭劈,是天一觀的道長疗绣。 經(jīng)常有香客問我,道長铺韧,這世上最難降的妖魔是什么多矮? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮祟蚀,結果婚禮上工窍,老公的妹妹穿的比我還像新娘割卖。我一直安慰自己前酿,他們只是感情好,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布罢维。 她就那樣靜靜地躺著,像睡著了一般丙挽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上平窘,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天肤舞,我揣著相機與錄音,去河邊找鬼李剖。 笑死芒率,一個胖子當著我的面吹牛,可吹牛的內容都是我干的篙顺。 我是一名探鬼主播偶芍,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼充择,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了腋寨?” 一聲冷哼從身側響起聪铺,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎萄窜,沒想到半個月后铃剔,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡查刻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年键兜,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片穗泵。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡普气,死狀恐怖,靈堂內的尸體忽然破棺而出佃延,到底是詐尸還是另有隱情现诀,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布履肃,位于F島的核電站仔沿,受9級特大地震影響,放射性物質發(fā)生泄漏尺棋。R本人自食惡果不足惜封锉,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望膘螟。 院中可真熱鬧成福,春花似錦、人聲如沸荆残。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽内斯。三九已至蕴潦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間嘿期,已是汗流浹背品擎。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留备徐,地道東北人萄传。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親秀菱。 傳聞我的和親對象是個殘疾皇子振诬,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349