Spark中的exactly once語(yǔ)義

exactly once指的是在處理數(shù)據(jù)的過(guò)程中径荔,系統(tǒng)有很好的容錯(cuò)性(fault-tolerance)少辣,能夠保證數(shù)據(jù)處理不重不丟耐齐,每一條數(shù)據(jù)僅被處理一次厂抽。
Spark具備很好的機(jī)制來(lái)保證exactly once的語(yǔ)義需频,具體體現(xiàn)在數(shù)據(jù)源的可重放性、計(jì)算過(guò)程中的容錯(cuò)性筷凤、以及寫入存儲(chǔ)介質(zhì)時(shí)的冪等性或者事務(wù)性贺辰。

數(shù)據(jù)源的可重放性

數(shù)據(jù)源具有可重放性指的是在出現(xiàn)問(wèn)題時(shí)可以重新獲取到需要的數(shù)據(jù),一般Spark的數(shù)據(jù)源分為兩種:
第一種為基于文件系統(tǒng)的數(shù)據(jù)源嵌施,如HDFS饲化,這種數(shù)據(jù)源的數(shù)據(jù)來(lái)自于文件,所以本身就具備了可重放性吗伤;
另一種為基于外部接收器的數(shù)據(jù)源吃靠,如Kafka,這種數(shù)據(jù)源就不一定能夠具備可重放性足淆,需要具體考慮巢块。

以Kafka為例,一般與Kafka配合使用的是Spark中的SparkStreaming模塊巧号,用來(lái)對(duì)流式數(shù)據(jù)進(jìn)行準(zhǔn)實(shí)時(shí)的處理族奢。而SparkStreaming接入Kafka的數(shù)據(jù)有兩種模式,一種為Receiver模式丹鸿,一種為Direct模式越走。

Receiver模式
Receiver模式采用Kafka的高階consumer API,Kafka自己封裝了對(duì)數(shù)據(jù)的獲取邏輯,且通過(guò)Zookeeper管理offset信息廊敌,這種模式在與SparkStreaming對(duì)接時(shí)铜跑,有以下特點(diǎn):

  1. Kafka中的partition數(shù)量與SparkStreaming中的并行度不是一一對(duì)應(yīng)的,SparkStreaming通過(guò)創(chuàng)建Receiver去讀取Kafka中數(shù)據(jù)骡澈,createStream()方法傳入的并發(fā)參數(shù)代表的是讀取Kafka中topic+partition的線程數(shù)锅纺,并不能提高SparkStreaming讀取數(shù)據(jù)的并行度。
  2. Kafka自己管理offset肋殴,Receiver作為一個(gè)高層的Consumer來(lái)消費(fèi)數(shù)據(jù)囤锉,其消費(fèi)的偏移量(offset)由Kafka記錄在Zookeeper中,一旦出現(xiàn)錯(cuò)誤护锤,那些已經(jīng)標(biāo)記為消費(fèi)過(guò)的數(shù)據(jù)將會(huì)丟失官地。

Receiver模式下,為了解決讀取數(shù)據(jù)時(shí)的并行度問(wèn)題蔽豺,可以創(chuàng)建多個(gè)DStream区丑,然后union起來(lái)拧粪,具體可參考文章:http://www.reibang.com/p/c8669261165a修陡;為了解決數(shù)據(jù)丟失的問(wèn)題,可以選擇開(kāi)啟Spark的WAL(write ahead log)機(jī)制可霎,每次處理數(shù)據(jù)前將預(yù)寫日志寫入到HDFS中魄鸦,如果節(jié)點(diǎn)出現(xiàn)錯(cuò)誤,可以從WAL中恢復(fù)癣朗。但是這種方法其實(shí)效率低下拾因,不僅數(shù)據(jù)冗余(Kafka中有副本機(jī)制,Spark中還要存一份)旷余,且無(wú)法保證exactly once绢记,數(shù)據(jù)可能重復(fù)消費(fèi)。

無(wú)論采取什么方法進(jìn)行補(bǔ)救正卧,Receiver模式都不能夠?qū)崿F(xiàn)exactly once的語(yǔ)義蠢熄,其根本原因是Kafka自己管理的offset與SparkStreaming實(shí)際處理數(shù)據(jù)的offset沒(méi)有同步導(dǎo)致的。

Direct模式
為了解決Receiver模式的弊病炉旷,Spark1.3中引入了Direct模式來(lái)替代Receiver模式签孔,它使用Kafka的Simple consumer API,由Spark應(yīng)用自己管理offset信息窘行,以達(dá)成exactly once的語(yǔ)義饥追,其特點(diǎn)如下:

  1. Kafka中的partition與SparkStreaming中的partition一一對(duì)應(yīng),也就是SparkStreaming讀取數(shù)據(jù)的并行度取決于Kafka中partition的數(shù)量罐盔。
  2. 不依賴Receiver但绕,而是通過(guò)低階api直接找到topic+partition的leader獲取數(shù)據(jù),并由SparkStreaming應(yīng)用自己負(fù)責(zé)追蹤維護(hù)消費(fèi)的offset惶看。

由于SparkStreaming自己可以維護(hù)offset壁熄,所以應(yīng)用自身消費(fèi)的數(shù)據(jù)和偏移量之間的對(duì)應(yīng)關(guān)系確定的帚豪,數(shù)據(jù)也是同步的,所以可以實(shí)現(xiàn)exactly once的語(yǔ)義草丧。

下面將給出Direct模式下狸臣,SparkStreaming應(yīng)用管理offset的方法案例,其中offset依然是存放在zookeeper中昌执,但是由應(yīng)用自身來(lái)管理的烛亦,offset也可以放在Redis、MySQL懂拾、HBase中進(jìn)行管理煤禽,根據(jù)具體情況進(jìn)行選擇。

createDirectStream方法:

def createDirectStream(ssc:StreamingContext)(implicit streamingConfig: StreamingConfig, kc: SimpleKafkaCluster): InputDStream[(Array[Byte], Array[Byte])] = {
  val topics = streamingConfig.topicSet
  val groupId = streamingConfig.group
   // 首先更新offset
  setOrUpdateOffsets(topics, groupId)
  //從zookeeper上讀取offset開(kāi)始消費(fèi)message
  val messages = {
    val partitionsE = kc.getPartitions(topics)
    if (partitionsE.isLeft)
      throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft)
      throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
    val consumerOffsets = consumerOffsetsE.right.get
    consumerOffsets.foreach {
      case (tp, n) => println("===================================" + tp.topic + "," + tp.partition + "," + n)
    }
    KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (Array[Byte], Array[Byte])](
      ssc, streamingConfig.kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message))
  }
  messages
}

代碼中岖赋,參數(shù)streamingConfig中封裝了Kafka的具體參數(shù)信息檬果,如topic名稱,broker list唐断,消費(fèi)者組id等选脊。kc則是Simple Consumer API的接口類,封裝了具體獲取Kafka數(shù)據(jù)的方法脸甘。代碼段剛開(kāi)始的時(shí)候就調(diào)用了setOrUpdateOffsets()方法來(lái)更新offset恳啥,確保下面得到的數(shù)據(jù)是最新的。

setOrUpdateOffsets方法:

private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
  topics.foreach(topic => {
    var hasConsumed = true
    val partitionsE = kc.getPartitions(Set(topic))
    if (partitionsE.isLeft)
      throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft) hasConsumed = false
    // 某個(gè)groupid首次沒(méi)有offset信息丹诀,會(huì)報(bào)錯(cuò)钝的,從頭開(kāi)始讀
    if (hasConsumed) {// 消費(fèi)過(guò)
      /**
        * 如果streaming程序執(zhí)行的時(shí)候出現(xiàn)kafka.common.OffsetOutOfRangeException,
        * 說(shuō)明zk上保存的offsets已經(jīng)過(guò)時(shí)了铆遭,即kafka的定時(shí)清理策略已經(jīng)將包含該offsets的文件刪除硝桩。
        * 針對(duì)這種情況,只要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小枚荣,
        * 如果consumerOffsets比earliestLeaderOffsets還小的話碗脊,說(shuō)明consumerOffsets已過(guò)時(shí),
        * 這時(shí)把consumerOffsets更新為earliestLeaderOffsets
        */
      val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
      if (earliestLeaderOffsetsE.isLeft)
        throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
      val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
      val consumerOffsets = consumerOffsetsE.right.get
      // 可能只是存在部分分區(qū)consumerOffsets過(guò)時(shí),所以只更新過(guò)時(shí)分區(qū)的consumerOffsets為earliestLeaderOffsets
      var offsets: Map[TopicAndPartition, Long] = Map()
      consumerOffsets.foreach({ case(tp, n) =>
        val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
        if (n < earliestLeaderOffset) {
          println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
            " offsets已經(jīng)過(guò)時(shí)棍弄,更新為" + earliestLeaderOffset)
          offsets += (tp -> earliestLeaderOffset)
        }
      })
      if (!offsets.isEmpty) {
        kc.setConsumerOffsets(groupId, offsets)
      }
    } else {// 沒(méi)有消費(fèi)過(guò)
    val reset = streamingConfig.resetSign.toLowerCase
      var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
      if (reset == Some("smallest")) {// 從頭消費(fèi)
      val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (leaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
        leaderOffsets = leaderOffsetsE.right.get
      } else { // 從最新offset處消費(fèi)
        val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
        if (leaderOffsetsE.isLeft)
          throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
        leaderOffsets = leaderOffsetsE.right.get
      }
      val offsets = leaderOffsets.map {
        case (tp, offset) => (tp, offset.offset)
      }
      kc.setConsumerOffsets(groupId, offsets)
    }
  })
}

最后是updateZKOffsets方法望薄,用于應(yīng)用輸出數(shù)據(jù)后,更新zk中的offset:

def updateZKOffsets(rdd: RDD[(Array[Byte], Array[Byte])]) : Unit = {
  val groupId = streamingConfig.group
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
    val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}

對(duì)于一個(gè)SparkStreaming應(yīng)用來(lái)說(shuō)呼畸,每個(gè)批次通過(guò)createDirectStream方法來(lái)獲取zookeeper中最新的offset痕支,然后使用simple kafka api獲取數(shù)據(jù),消費(fèi)處理完之后蛮原,再通過(guò)updateZKOffsets方法卧须,更新這個(gè)duration消費(fèi)的offset至zookeeper,以此過(guò)程保證了exactly once的語(yǔ)義。

計(jì)算過(guò)程中的容錯(cuò)性

以上所述僅保證了在讀取數(shù)據(jù)源過(guò)程中的exactly once花嘶,數(shù)據(jù)讀取成功后笋籽,在Spark應(yīng)用中做處理時(shí),是怎么保證數(shù)據(jù)不重不丟的呢椭员?Spark在容錯(cuò)性這一方面交出了令人滿意的答卷车海。撇去Driver與Executor的高可用性不說(shuō),Spark應(yīng)用內(nèi)部則采用checkpoint和lineage的機(jī)制來(lái)確保容錯(cuò)性隘击。

lineage
一般翻譯為血統(tǒng)侍芝,簡(jiǎn)單來(lái)說(shuō)就是RDD在轉(zhuǎn)化的過(guò)程中,由于父RDD與子RDD存在依賴關(guān)系(Dependency)埋同,從而形成的lineage州叠,也可以理解為lineage串起了RDD DAG。

RDD可以進(jìn)行緩存凶赁,通過(guò)調(diào)用persist或者cache方法咧栗,將RDD持久化到內(nèi)存或者磁盤中,這樣緩存的RDD就可以被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中被重用虱肄,緩存是構(gòu)建Spark快速迭代的關(guān)鍵致板。

當(dāng)一個(gè)RDD丟失的情況下,Spark會(huì)去尋找它的父RDD是否已經(jīng)緩存浩峡,如果已經(jīng)緩存可岂,就可以通過(guò)父RDD直接算出當(dāng)前的RDD错敢,從而避免了緩存之前的RDD的計(jì)算過(guò)程翰灾,且只有丟失數(shù)據(jù)的partition需要進(jìn)行重算,這樣Spark就避免了RDD上的重復(fù)計(jì)算稚茅,能夠極大的提升計(jì)算速度纸淮。

緩存雖然可以提升Spark快速迭代計(jì)算的速度,但是緩存是會(huì)丟失的亚享。

checkpoint
檢查點(diǎn)機(jī)制就是為了可以切斷l(xiāng)ineage的依賴關(guān)系咽块,在某個(gè)重要的節(jié)點(diǎn),將RDD持久化到文件系統(tǒng)中(一般選擇HDFS)欺税,這樣就算之前的緩存已經(jīng)丟失了侈沪,也可以保證檢查點(diǎn)數(shù)據(jù)不會(huì)丟失,這樣在恢復(fù)的時(shí)候晚凿,會(huì)直接從檢查點(diǎn)的數(shù)據(jù)開(kāi)始進(jìn)行計(jì)算亭罪,檢查點(diǎn)機(jī)制在SparkStreaming這種流式計(jì)算中發(fā)揮的作用會(huì)更大。

可以通過(guò)以下源碼為入口進(jìn)一步了解Spark的緩存和檢查點(diǎn)機(jī)制歼秽,RDD在進(jìn)行計(jì)算的時(shí)候會(huì)調(diào)用其iterator方法应役,在該方法中會(huì)首先去讀取緩存的數(shù)據(jù),如果沒(méi)有緩存的數(shù)據(jù)則會(huì)去讀取checkpoint的數(shù)據(jù)

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

Spark在計(jì)算過(guò)程中采用的lineage和checkpoint機(jī)制相互結(jié)合,取長(zhǎng)補(bǔ)短箩祥,再加上Spark各個(gè)組件底層本身就是具有高可用性院崇,所以在Spark應(yīng)用在轉(zhuǎn)化計(jì)算的過(guò)程中,可是保證數(shù)據(jù)處理的exactly once袍祖。

寫入存儲(chǔ)介質(zhì)的冪等性或事務(wù)性

Spark進(jìn)行數(shù)據(jù)輸出的時(shí)候底瓣,為了達(dá)到exactly once,有兩種方式:

  1. 冪等更新
    指多次寫入的結(jié)果總是寫入相同的數(shù)據(jù)蕉陋,比較典型的例子是key-value型數(shù)據(jù)庫(kù)濒持,即使數(shù)據(jù)可能多次寫入,但是最終的結(jié)果也不會(huì)影響其正確性寺滚,Spark RDD的輸出方法saveAsTextFile在輸出的時(shí)候?qū)DD轉(zhuǎn)換成為PairRDD柑营,總是將相同的數(shù)據(jù)寫入到文件系統(tǒng)中,而PairRDD的輸出方法本身就滿足key-value的模型村视,所以均滿足冪等更新官套。
  2. 事務(wù)更新
    指所有的更新都是基于事務(wù)的,所以更新都是exactly once蚁孔。Spark需要用戶自己實(shí)現(xiàn)事物機(jī)制奶赔,在foreachRDD方法中,用戶可以使用batch time和partition index來(lái)創(chuàng)建一個(gè)id杠氢,使用這個(gè)id來(lái)確保數(shù)據(jù)的唯一性站刑,啟動(dòng)事務(wù)并使用這個(gè)id來(lái)更新外部系統(tǒng)數(shù)據(jù),如果這個(gè)id不存在則提交更新鼻百,如果這個(gè)id已經(jīng)存在那么則放棄更新绞旅。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

另外,還有一些下游的存儲(chǔ)介質(zhì)本身就不支持冪等或是事務(wù)性寫入温艇,比如kafka因悲。Spark的task或是stage的失敗重做機(jī)制以及kafka本身的高可用寫入,都會(huì)造成一些數(shù)據(jù)重復(fù)勺爱,這可能就需要Kafka本身去支持transaction write或者其下游應(yīng)用去實(shí)現(xiàn)去重機(jī)制晃琳。

最后,exactly once固然是個(gè)理想的狀態(tài)琐鲁,但其實(shí)現(xiàn)成本也是非常高的卫旱,在對(duì)數(shù)據(jù)可靠性要求不是很高的場(chǎng)景中,at-least-once甚至丟失少量數(shù)據(jù)也是可以作為一個(gè)選項(xiàng)考慮的围段。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末顾翼,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蒜撮,更是在濱河造成了極大的恐慌暴构,老刑警劉巖跪呈,帶你破解...
    沈念sama閱讀 223,002評(píng)論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異取逾,居然都是意外死亡耗绿,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評(píng)論 3 400
  • 文/潘曉璐 我一進(jìn)店門砾隅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)误阻,“玉大人,你說(shuō)我怎么就攤上這事晴埂【糠矗” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 169,787評(píng)論 0 365
  • 文/不壞的土叔 我叫張陵儒洛,是天一觀的道長(zhǎng)精耐。 經(jīng)常有香客問(wèn)我,道長(zhǎng)琅锻,這世上最難降的妖魔是什么卦停? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 60,237評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮恼蓬,結(jié)果婚禮上惊完,老公的妹妹穿的比我還像新娘。我一直安慰自己处硬,他們只是感情好小槐,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,237評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著荷辕,像睡著了一般凿跳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上桐腌,一...
    開(kāi)封第一講書(shū)人閱讀 52,821評(píng)論 1 314
  • 那天拄显,我揣著相機(jī)與錄音苟径,去河邊找鬼案站。 笑死,一個(gè)胖子當(dāng)著我的面吹牛棘街,可吹牛的內(nèi)容都是我干的蟆盐。 我是一名探鬼主播,決...
    沈念sama閱讀 41,236評(píng)論 3 424
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼遭殉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼石挂!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起险污,我...
    開(kāi)封第一講書(shū)人閱讀 40,196評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤痹愚,失蹤者是張志新(化名)和其女友劉穎富岳,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體拯腮,經(jīng)...
    沈念sama閱讀 46,716評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡窖式,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,794評(píng)論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了动壤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萝喘。...
    茶點(diǎn)故事閱讀 40,928評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖琼懊,靈堂內(nèi)的尸體忽然破棺而出阁簸,到底是詐尸還是另有隱情,我是刑警寧澤哼丈,帶...
    沈念sama閱讀 36,583評(píng)論 5 351
  • 正文 年R本政府宣布启妹,位于F島的核電站,受9級(jí)特大地震影響醉旦,放射性物質(zhì)發(fā)生泄漏翅溺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,264評(píng)論 3 336
  • 文/蒙蒙 一髓抑、第九天 我趴在偏房一處隱蔽的房頂上張望咙崎。 院中可真熱鬧,春花似錦吨拍、人聲如沸褪猛。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,755評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)伊滋。三九已至,卻和暖如春队秩,著一層夾襖步出監(jiān)牢的瞬間笑旺,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,869評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工馍资, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留筒主,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,378評(píng)論 3 379
  • 正文 我出身青樓鸟蟹,卻偏偏與公主長(zhǎng)得像乌妙,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子建钥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,937評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容