什么是唯一一次
唯一一次崖面,也叫ECO(Exactly once)語義是流式處理程序的重要話題元咙,也是很多流式處理程序問題的關鍵所在。
為了保證每個消息在系統(tǒng)或網(wǎng)絡故障的情況下巫员,被處理一次且僅處理一次庶香。這首先需要流式處理框架需要提供這種功能的支持,然后在消息傳遞简识、數(shù)據(jù)輸出存儲(比如到MySQL落庫)以及具體的代碼編寫等等各個方面整體考慮和配合赶掖,才最終能夠保證消息的ECO感猛。
GIT上面有個不錯的項目,里面有一些相關的樣例代碼https://github.com/jizhang/spark-sandbox
一個簡單的樣例代碼
下面的代碼實現(xiàn)了從Kafka獲取訪問數(shù)據(jù)奢赂,然后統(tǒng)計錯誤日志數(shù)量陪白,然后每分鐘一次把統(tǒng)計的結(jié)果插入到Mysql數(shù)據(jù)庫。
源數(shù)據(jù)樣例:
2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO some message
2017-07-30 14:10:50 ERROR some message
Mysql結(jié)果表:
create table error_log (
log_time datetime primary key, -- 具體到分鐘粒度膳灶,來自源數(shù)據(jù)中到秒的時間值
log_count int not null default 0 -- 行數(shù)
);
部分關鍵代碼:
// 初始化MySQL 連接池
ConnectionPool.singleton("jdbc:mysql://localhost:3306/spark", "root", "")
// 初始化SparkStreaming 的context
val conf = new SparkConf().setAppName("ExactlyOnce").setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// 通過Direct API連接Kafka拷泽,這種方式方便我們控制偏移量
val messages = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Seq("alog"), kafkaParams))
//遍歷 DSStream中的每個RDD,每個RDD都是一個SparkStreaming 的 微批次(micro batch)
messages.foreachRDD { rdd =>
// 數(shù)據(jù)統(tǒng)計袖瞻,具體邏輯
// 取源數(shù)據(jù)中的日期和日志類型司致,然后過濾ERROR日志,然后截取日期到分鐘粒度聋迎,然后統(tǒng)計行數(shù)入庫
val result = rdd.map(_.value)
.flatMap(parseLog) // utility function to parse log line into case class
.filter(_.level == "ERROR")
.map(log => log.time.truncatedTo(ChronoUnit.MINUTES) -> 1)
.reduceByKey(_ + _)
.collect() //這里需要特別注意脂矫,數(shù)據(jù)全部通過collect() 方法回到了Driver進程中,不在各個分區(qū)
// 在Driver中對每條數(shù)據(jù)進行Merge插入霉晕,相同時間的數(shù)據(jù)更新庭再,不存在的時間則插入。
DB.autoCommit { implicit session =>
result.foreach { case (time, count) =>
sql"""
insert into error_log (log_time, log_count)
value (${time}, ${count})
on duplicate key update log_count = log_count + values(log_count)
""".update.apply()
}
}
}
流式處理語義
流式處理牺堰,一共分為三個階段拄轻,分別是 接收數(shù)據(jù)、處理數(shù)據(jù)和輸出數(shù)據(jù)
這三個階段都需要相互配合來保證ECO伟葫。
接收數(shù)據(jù)
接收數(shù)據(jù)過程恨搓,比較依賴具體的數(shù)據(jù)源。不同的數(shù)據(jù)源有不同的處理方式來保證每條消息的ECO筏养。
比如斧抱,對HDFS這種可容錯的文件系統(tǒng),可以和SparkStreaming 整合保證ECO
對于有反饋的消息隊列渐溶,如 RabbitMQ辉浦,可以結(jié)合 Spark 的 Write Ahead Log 來保證ECO
對于不可靠的接收器,比如 SocketTextStream茎辐,很容易因為Driver的故障導致數(shù)據(jù)丟失宪郊。
對于Kakfa,通過Direct API拖陆,也就是直接操作偏移量的弛槐,也可以提供ECO支持
處理數(shù)據(jù)
SparkStreaming 通過 DSStream,底層是一個個連續(xù)的Spark 微批次 RDD慕蔚。對該數(shù)據(jù)結(jié)構(gòu)操作丐黄,本身就是直接提供ECO支持的。
因此在數(shù)據(jù)處理這塊孔飒,即RDD操作這塊灌闺,本身就可以提供錯誤重啟和ECO艰争。
輸出數(shù)據(jù)
默認SparkStreaming提供At Least Once的輸出。當某個Excutor正在執(zhí)行數(shù)據(jù)輸出時發(fā)生故障桂对,Driver重新啟動一個Excutor甩卓,然后會再次進行數(shù)據(jù)輸出。
這個過程蕉斜,可能回導致同樣的數(shù)據(jù)多次輸出逾柿。
有兩種方式可以解決這種重復輸出的問題
- 基于冪等的更新方式
- 基于事務的更新方式
冪等輸出和ECO
對于固定的一份源數(shù)據(jù),重復經(jīng)過相同邏輯處理并進行寫入操作宅此,且產(chǎn)生相同的結(jié)果机错。一般這種操作就被認為是冪等更新。
舉個例子父腕,saveAsTextFile操作弱匪,被認為是典型的冪等操作。多次對一個RDD和同一個文件調(diào)用該操作璧亮,結(jié)果值是固定不變的萧诫。上次操作和下次操作之間是互不相關的。
如果message是由 unique key的枝嘶,那么message在寫入database的時候帘饶,可以通過相同key執(zhí)行update,不存在的key執(zhí)行insert群扶,來實現(xiàn)冪等更新及刻。
但是,冪等更新一般適合于map型的sparkStreaming邏輯穷当。舉個例子:
一個message進入sparkStreaming計算引擎提茁,經(jīng)過一系列計算,最終出來的結(jié)果也是一個message馁菜,且這個message的一個或者兩個字段是unique的。這種類型的處理可以通過insert on duplicate update的語法保證冪等更新铃岔。
如果sparkStreaming執(zhí)行了reduceByKey或者Count等操作汪疮,這個時候,比較難以通過這種方式保證冪等更新毁习。
冪等更新需要對kafka做一些配置
- 設置enable.auto.commit為false智嚷,默認情況下,DStream讀取kafka數(shù)據(jù)時纺且,會在接收到數(shù)據(jù)后盏道,立馬就提交offsets。把這個參數(shù)設置為false之后载碌,是為了能夠在一個批次執(zhí)行完成后再提交offset.
- 打開SparkStreaming的checkpoint進行kafka 的 offsets存儲猜嘱,不過衅枫,如果更改過了代碼之后,checkpoint會失效朗伶,因此弦撩,還需要通過下面的方式進一步的存儲offsets作為補充:
- 結(jié)果輸出到target以后,再提交offsets论皆。kafka提供了commitAsync API益楼,用來異步提交offsets。同時点晴,提供了HasOffsetRangers類來從RDD中抽取kafka的Offsets值感凤。
messages.foreachRDD { rdd =>
// 抽取RDD中的kafka offsets,以每個微批次(micro btch)RDD為單位進行獲取
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
//遍歷微批次RDD的每個分區(qū)粒督,然后輸出都數(shù)據(jù)庫
//注意俊扭,這次沒有把所有數(shù)據(jù)collect()到Driver后再輸出
// 注意,因為這次插入操作時冪等插入坠陈,類似于 insert or update on duplicate key這種萨惑,
//所以即使部分分區(qū)失敗,或者整個任務失敗重啟仇矾,都不會產(chǎn)生多余錯誤的數(shù)據(jù)
}
//把每個分區(qū)的數(shù)據(jù)插入到數(shù)據(jù)庫之后庸蔼,再提交
//如果之前輸出出現(xiàn)任何問題,則不會提交偏移量贮匕,那么則會從之前的偏移量重新計算
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
這里如果有人由疑問:如果某些個分區(qū)輸出到數(shù)據(jù)庫失敗重跑姐仅,或者,整個任務掛掉需要重跑刻盐,會不會導致重復數(shù)據(jù)掏膏?或者一個數(shù)據(jù)被計算多次?
答案是:不會
要注意敦锌,上一段代碼馒疹,是基于冪等輸出的結(jié)果之上的,比如說乙墙,saveAsTextFile颖变,或者是對數(shù)據(jù)庫進行 insert on duplicate key update更新。
所以听想,對于一個微批次來說腥刹,不管失敗多少次,只要重啟后的微批次還是通過之前的offsets去kafka取數(shù)汉买,計算的結(jié)果永遠是一致的衔峰,那么輸出的結(jié)果也永遠是一致的,那么通過冪等更新的方式,最終數(shù)據(jù)庫中或者結(jié)果文件中的內(nèi)容也是一致的5媛薄M谩!
冪等更新天生對于重復型輸出可以保證ECO
事務輸出和ECO
冪等型更新葫男,很多時候是不能保證的抱冷,因為我們會對流式數(shù)據(jù)進行各種維度的統(tǒng)計和計算,那么這個時候梢褐,就沒法保證數(shù)據(jù)的冪等更新旺遮。
事務型輸出,對于map型計算和aggregation類型的計算都有很好的支持盈咳,基本原理就是在同一個事務中耿眉,同時執(zhí)行數(shù)據(jù)輸出和更新kafka offsets,做到一起成功或者一起失敗鱼响。
在SparkStreaming的模型中鸣剪,offset的粒度是到partition粒度的,也就是一個分區(qū)都有自己的一個offsets信息丈积。然后對于一個RDD來說筐骇,所有分區(qū)的offset范圍之和,就是RDD的offsets范圍江滨。
對于map型操作铛纬,因為SparkStreaming的一個分區(qū)對應一個kafka的分區(qū),因此我們可以獲取到每個分區(qū)的offsets信息唬滑。這樣告唆,可以在輸出時,做到更細粒度的控制晶密。
messages.foreachRDD { rdd =>
//獲取整個RDD的offsets信息
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
// 獲取到每個分區(qū)粒度的offsets信息擒悬,做更細力度的事務控制。之前事務都是控制到RDD級別的offsets記錄稻艰,現(xiàn)在可以控制到partition級別的記錄
val offsetRange = offsetRanges(TaskContext.get.partitionId)
}
}
但是map型操作畢竟還是比較少的懂牧,更多的是aggregate類型的操作,這種操作設計到shffule连锯。經(jīng)過shuffle以后归苍,RDD的partition就和Kafka的patition沒有對應關系了。因此這個時候运怖,就需要對整個RDD放在一個事務中進行數(shù)據(jù)插入和offset更新
messages.foreachRDD { rdd =>
//獲取整個RDD的所有offsets信息信息
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 對RDD進行aggregate操作,并返回結(jié)果RDD
val result = processLogs(rdd).collect() // parse log and count error
// 開啟一個事務(一個RDD就對應一個事務)
DB.localTx { implicit session =>
result.foreach { case (time, count) =>
// 對RDD結(jié)果值進行入庫夏伊,注意摇展,此刻還未提交事務,下一步需要提交各分區(qū)的offsets
}
// 取出各個partition的offsets信息溺忧,并且按照 topic + partition 作為唯一鍵咏连,進行offsets的更新存儲
// kafka在建立topic的時候盯孙,就已經(jīng)指定好了分區(qū)個數(shù)
// 此刻取到的分區(qū)partiton就是對應的是kafka topic的分區(qū)
offsetRanges.foreach { offsetRange =>
val affectedRows = sql"""
update kafka_offset set offset = ${offsetRange.untilOffset}
where topic = ${topic} and `partition` = ${offsetRange.partition}
and offset = ${offsetRange.fromOffset}
""".update.apply()
if (affectedRows != 1) {
// 如果更新失敗,說明程序出現(xiàn)問題祟滴,需要拋出異常
throw new Exception("fail to update offset")
}
}
}
}
結(jié)尾
Exactly-once 是流處理中非常強的語義振惰,不可避免地會給你的應用帶來一些開銷,影響吞吐量垄懂。 它也不適用于窗口操作骑晶。 因此,您需要決定是否有必要花費這些精力草慧,或者即使數(shù)據(jù)丟失很少桶蛔,使用較弱的語義也足夠了。 但肯定知道如何實現(xiàn)精確一次是一個很好的學習機會漫谷,而且非常有趣仔雷。