SparkStreaming Exactly Once語義總結(jié)

什么是唯一一次

唯一一次崖面,也叫ECO(Exactly once)語義是流式處理程序的重要話題元咙,也是很多流式處理程序問題的關鍵所在。

為了保證每個消息在系統(tǒng)或網(wǎng)絡故障的情況下巫员,被處理一次且僅處理一次庶香。這首先需要流式處理框架需要提供這種功能的支持,然后在消息傳遞简识、數(shù)據(jù)輸出存儲(比如到MySQL落庫)以及具體的代碼編寫等等各個方面整體考慮和配合赶掖,才最終能夠保證消息的ECO感猛。

GIT上面有個不錯的項目,里面有一些相關的樣例代碼https://github.com/jizhang/spark-sandbox

一個簡單的樣例代碼

下面的代碼實現(xiàn)了從Kafka獲取訪問數(shù)據(jù)奢赂,然后統(tǒng)計錯誤日志數(shù)量陪白,然后每分鐘一次把統(tǒng)計的結(jié)果插入到Mysql數(shù)據(jù)庫。

源數(shù)據(jù)樣例:

2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO some message
2017-07-30 14:10:50 ERROR some message

Mysql結(jié)果表:

create table error_log (
 log_time datetime primary key, -- 具體到分鐘粒度膳灶,來自源數(shù)據(jù)中到秒的時間值
 log_count int not null default 0 -- 行數(shù)
);

部分關鍵代碼:

// 初始化MySQL 連接池
ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")
// 初始化SparkStreaming 的context 
val conf = new SparkConf().setAppName("ExactlyOnce").setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// 通過Direct API連接Kafka拷泽,這種方式方便我們控制偏移量
val messages = KafkaUtils.createDirectStream[String, String](ssc,
   LocationStrategies.PreferConsistent,
   ConsumerStrategies.Subscribe[String, String](Seq("alog"), kafkaParams))

//遍歷 DSStream中的每個RDD,每個RDD都是一個SparkStreaming 的 微批次(micro batch)
messages.foreachRDD { rdd =>
  // 數(shù)據(jù)統(tǒng)計袖瞻,具體邏輯
  // 取源數(shù)據(jù)中的日期和日志類型司致,然后過濾ERROR日志,然后截取日期到分鐘粒度聋迎,然后統(tǒng)計行數(shù)入庫
  val result = rdd.map(_.value)
    .flatMap(parseLog) // utility function to parse log line into case class
    .filter(_.level == "ERROR")
    .map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1)
    .reduceByKey(_ + _)
    .collect() //這里需要特別注意脂矫,數(shù)據(jù)全部通過collect() 方法回到了Driver進程中,不在各個分區(qū)

  // 在Driver中對每條數(shù)據(jù)進行Merge插入霉晕,相同時間的數(shù)據(jù)更新庭再,不存在的時間則插入。
  DB.autoCommit { implicit session =>
    result.foreach { case (time, count) =>
      sql"""
      insert into error_log (log_time, log_count)
      value (${time}, ${count})
      on duplicate key update log_count = log_count + values(log_count)
      """.update.apply()
    }
  }
}

流式處理語義

流式處理牺堰,一共分為三個階段拄轻,分別是 接收數(shù)據(jù)、處理數(shù)據(jù)和輸出數(shù)據(jù)

這三個階段都需要相互配合來保證ECO伟葫。

接收數(shù)據(jù)

接收數(shù)據(jù)過程恨搓,比較依賴具體的數(shù)據(jù)源。不同的數(shù)據(jù)源有不同的處理方式來保證每條消息的ECO筏养。

比如斧抱,對HDFS這種可容錯的文件系統(tǒng),可以和SparkStreaming 整合保證ECO

對于有反饋的消息隊列渐溶,如 RabbitMQ辉浦,可以結(jié)合 Spark 的 Write Ahead Log 來保證ECO

對于不可靠的接收器,比如 SocketTextStream茎辐,很容易因為Driver的故障導致數(shù)據(jù)丟失宪郊。

對于Kakfa,通過Direct API拖陆,也就是直接操作偏移量的弛槐,也可以提供ECO支持

處理數(shù)據(jù)

SparkStreaming 通過 DSStream,底層是一個個連續(xù)的Spark 微批次 RDD慕蔚。對該數(shù)據(jù)結(jié)構(gòu)操作丐黄,本身就是直接提供ECO支持的。

因此在數(shù)據(jù)處理這塊孔飒,即RDD操作這塊灌闺,本身就可以提供錯誤重啟和ECO艰争。

輸出數(shù)據(jù)

默認SparkStreaming提供At Least Once的輸出。當某個Excutor正在執(zhí)行數(shù)據(jù)輸出時發(fā)生故障桂对,Driver重新啟動一個Excutor甩卓,然后會再次進行數(shù)據(jù)輸出。

這個過程蕉斜,可能回導致同樣的數(shù)據(jù)多次輸出逾柿。

有兩種方式可以解決這種重復輸出的問題

  • 基于冪等的更新方式
  • 基于事務的更新方式

冪等輸出和ECO

對于固定的一份源數(shù)據(jù),重復經(jīng)過相同邏輯處理并進行寫入操作宅此,且產(chǎn)生相同的結(jié)果机错。一般這種操作就被認為是冪等更新。

舉個例子父腕,saveAsTextFile操作弱匪,被認為是典型的冪等操作。多次對一個RDD和同一個文件調(diào)用該操作璧亮,結(jié)果值是固定不變的萧诫。上次操作和下次操作之間是互不相關的。

如果message是由 unique key的枝嘶,那么message在寫入database的時候帘饶,可以通過相同key執(zhí)行update,不存在的key執(zhí)行insert群扶,來實現(xiàn)冪等更新及刻。

但是,冪等更新一般適合于map型的sparkStreaming邏輯穷当。舉個例子:

一個message進入sparkStreaming計算引擎提茁,經(jīng)過一系列計算,最終出來的結(jié)果也是一個message馁菜,且這個message的一個或者兩個字段是unique的。這種類型的處理可以通過insert on duplicate update的語法保證冪等更新铃岔。

如果sparkStreaming執(zhí)行了reduceByKey或者Count等操作汪疮,這個時候,比較難以通過這種方式保證冪等更新毁习。

冪等更新需要對kafka做一些配置

  • 設置enable.auto.commit為false智嚷,默認情況下,DStream讀取kafka數(shù)據(jù)時纺且,會在接收到數(shù)據(jù)后盏道,立馬就提交offsets。把這個參數(shù)設置為false之后载碌,是為了能夠在一個批次執(zhí)行完成后再提交offset.
  • 打開SparkStreaming的checkpoint進行kafka 的 offsets存儲猜嘱,不過衅枫,如果更改過了代碼之后,checkpoint會失效朗伶,因此弦撩,還需要通過下面的方式進一步的存儲offsets作為補充:
  • 結(jié)果輸出到target以后,再提交offsets论皆。kafka提供了commitAsync API益楼,用來異步提交offsets。同時点晴,提供了HasOffsetRangers類來從RDD中抽取kafka的Offsets值感凤。
messages.foreachRDD { rdd =>
  // 抽取RDD中的kafka offsets,以每個微批次(micro btch)RDD為單位進行獲取
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
  rdd.foreachPartition { iter =>
    //遍歷微批次RDD的每個分區(qū)粒督,然后輸出都數(shù)據(jù)庫
    //注意俊扭,這次沒有把所有數(shù)據(jù)collect()到Driver后再輸出
    // 注意,因為這次插入操作時冪等插入坠陈,類似于 insert or update on duplicate key這種萨惑,
    //所以即使部分分區(qū)失敗,或者整個任務失敗重啟仇矾,都不會產(chǎn)生多余錯誤的數(shù)據(jù)
  }
  //把每個分區(qū)的數(shù)據(jù)插入到數(shù)據(jù)庫之后庸蔼,再提交
  //如果之前輸出出現(xiàn)任何問題,則不會提交偏移量贮匕,那么則會從之前的偏移量重新計算
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

這里如果有人由疑問:如果某些個分區(qū)輸出到數(shù)據(jù)庫失敗重跑姐仅,或者,整個任務掛掉需要重跑刻盐,會不會導致重復數(shù)據(jù)掏膏?或者一個數(shù)據(jù)被計算多次?

答案是:不會

要注意敦锌,上一段代碼馒疹,是基于冪等輸出的結(jié)果之上的,比如說乙墙,saveAsTextFile颖变,或者是對數(shù)據(jù)庫進行 insert on duplicate key update更新。

所以听想,對于一個微批次來說腥刹,不管失敗多少次,只要重啟后的微批次還是通過之前的offsets去kafka取數(shù)汉买,計算的結(jié)果永遠是一致的衔峰,那么輸出的結(jié)果也永遠是一致的,那么通過冪等更新的方式,最終數(shù)據(jù)庫中或者結(jié)果文件中的內(nèi)容也是一致的5媛薄M谩!

冪等更新天生對于重復型輸出可以保證ECO

事務輸出和ECO

冪等型更新葫男,很多時候是不能保證的抱冷,因為我們會對流式數(shù)據(jù)進行各種維度的統(tǒng)計和計算,那么這個時候梢褐,就沒法保證數(shù)據(jù)的冪等更新旺遮。

事務型輸出,對于map型計算和aggregation類型的計算都有很好的支持盈咳,基本原理就是在同一個事務中耿眉,同時執(zhí)行數(shù)據(jù)輸出和更新kafka offsets,做到一起成功或者一起失敗鱼响。

在SparkStreaming的模型中鸣剪,offset的粒度是到partition粒度的,也就是一個分區(qū)都有自己的一個offsets信息丈积。然后對于一個RDD來說筐骇,所有分區(qū)的offset范圍之和,就是RDD的offsets范圍江滨。

對于map型操作铛纬,因為SparkStreaming的一個分區(qū)對應一個kafka的分區(qū),因此我們可以獲取到每個分區(qū)的offsets信息唬滑。這樣告唆,可以在輸出時,做到更細粒度的控制晶密。

messages.foreachRDD { rdd =>
    //獲取整個RDD的offsets信息
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
      // 獲取到每個分區(qū)粒度的offsets信息擒悬,做更細力度的事務控制。之前事務都是控制到RDD級別的offsets記錄稻艰,現(xiàn)在可以控制到partition級別的記錄
    val offsetRange = offsetRanges(TaskContext.get.partitionId)
  }
}

但是map型操作畢竟還是比較少的懂牧,更多的是aggregate類型的操作,這種操作設計到shffule连锯。經(jīng)過shuffle以后归苍,RDD的partition就和Kafka的patition沒有對應關系了。因此這個時候运怖,就需要對整個RDD放在一個事務中進行數(shù)據(jù)插入和offset更新

messages.foreachRDD { rdd =>
    //獲取整個RDD的所有offsets信息信息
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 對RDD進行aggregate操作,并返回結(jié)果RDD
  val result = processLogs(rdd).collect() // parse log and count error
    // 開啟一個事務(一個RDD就對應一個事務)
  DB.localTx { implicit session =>
    result.foreach { case (time, count) =>
      // 對RDD結(jié)果值進行入庫夏伊,注意摇展,此刻還未提交事務,下一步需要提交各分區(qū)的offsets
    }
      // 取出各個partition的offsets信息溺忧,并且按照 topic + partition 作為唯一鍵咏连,進行offsets的更新存儲
      // kafka在建立topic的時候盯孙,就已經(jīng)指定好了分區(qū)個數(shù)
      // 此刻取到的分區(qū)partiton就是對應的是kafka topic的分區(qū)
    offsetRanges.foreach { offsetRange =>
      val affectedRows = sql"""
      update kafka_offset set offset = ${offsetRange.untilOffset}
      where topic = ${topic} and `partition` = ${offsetRange.partition}
      and offset = ${offsetRange.fromOffset}
      """.update.apply()

      if (affectedRows != 1) {
          // 如果更新失敗,說明程序出現(xiàn)問題祟滴,需要拋出異常
        throw new Exception("fail to update offset")
      }
    }
  }
}

結(jié)尾

Exactly-once 是流處理中非常強的語義振惰,不可避免地會給你的應用帶來一些開銷,影響吞吐量垄懂。 它也不適用于窗口操作骑晶。 因此,您需要決定是否有必要花費這些精力草慧,或者即使數(shù)據(jù)丟失很少桶蛔,使用較弱的語義也足夠了。 但肯定知道如何實現(xiàn)精確一次是一個很好的學習機會漫谷,而且非常有趣仔雷。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市舔示,隨后出現(xiàn)的幾起案子碟婆,更是在濱河造成了極大的恐慌,老刑警劉巖惕稻,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件竖共,死亡現(xiàn)場離奇詭異,居然都是意外死亡缩宜,警方通過查閱死者的電腦和手機肘迎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來锻煌,“玉大人妓布,你說我怎么就攤上這事∷挝啵” “怎么了匣沼?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長捂龄。 經(jīng)常有香客問我释涛,道長,這世上最難降的妖魔是什么倦沧? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任唇撬,我火速辦了婚禮,結(jié)果婚禮上展融,老公的妹妹穿的比我還像新娘窖认。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布扑浸。 她就那樣靜靜地躺著烧给,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喝噪。 梳的紋絲不亂的頭發(fā)上础嫡,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天,我揣著相機與錄音酝惧,去河邊找鬼榴鼎。 笑死,一個胖子當著我的面吹牛系奉,可吹牛的內(nèi)容都是我干的檬贰。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼缺亮,長吁一口氣:“原來是場噩夢啊……” “哼翁涤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起萌踱,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤葵礼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后并鸵,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鸳粉,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年园担,在試婚紗的時候發(fā)現(xiàn)自己被綠了届谈。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡弯汰,死狀恐怖艰山,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情咏闪,我是刑警寧澤曙搬,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站鸽嫂,受9級特大地震影響纵装,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜据某,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一橡娄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧癣籽,春花似錦瀑踢、人聲如沸扳还。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至桑逝,卻和暖如春棘劣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背楞遏。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工茬暇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人寡喝。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓糙俗,卻偏偏與公主長得像,于是被迫代替她去往敵國和親预鬓。 傳聞我的和親對象是個殘疾皇子巧骚,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容