exactly once指的是在處理數(shù)據(jù)的過(guò)程中径荔,系統(tǒng)有很好的容錯(cuò)性(fault-tolerance)少辣,能夠保證數(shù)據(jù)處理不重不丟耐齐,每一條數(shù)據(jù)僅被處理一次厂抽。
Spark具備很好的機(jī)制來(lái)保證exactly once的語(yǔ)義需频,具體體現(xiàn)在數(shù)據(jù)源的可重放性、計(jì)算過(guò)程中的容錯(cuò)性筷凤、以及寫入存儲(chǔ)介質(zhì)時(shí)的冪等性或者事務(wù)性贺辰。
數(shù)據(jù)源的可重放性
數(shù)據(jù)源具有可重放性指的是在出現(xiàn)問(wèn)題時(shí)可以重新獲取到需要的數(shù)據(jù),一般Spark的數(shù)據(jù)源分為兩種:
第一種為基于文件系統(tǒng)的數(shù)據(jù)源嵌施,如HDFS饲化,這種數(shù)據(jù)源的數(shù)據(jù)來(lái)自于文件,所以本身就具備了可重放性吗伤;
另一種為基于外部接收器的數(shù)據(jù)源吃靠,如Kafka,這種數(shù)據(jù)源就不一定能夠具備可重放性足淆,需要具體考慮巢块。
以Kafka為例,一般與Kafka配合使用的是Spark中的SparkStreaming模塊巧号,用來(lái)對(duì)流式數(shù)據(jù)進(jìn)行準(zhǔn)實(shí)時(shí)的處理族奢。而SparkStreaming接入Kafka的數(shù)據(jù)有兩種模式,一種為Receiver模式丹鸿,一種為Direct模式越走。
Receiver模式
Receiver模式采用Kafka的高階consumer API,Kafka自己封裝了對(duì)數(shù)據(jù)的獲取邏輯,且通過(guò)Zookeeper管理offset信息廊敌,這種模式在與SparkStreaming對(duì)接時(shí)铜跑,有以下特點(diǎn):
- Kafka中的partition數(shù)量與SparkStreaming中的并行度不是一一對(duì)應(yīng)的,SparkStreaming通過(guò)創(chuàng)建Receiver去讀取Kafka中數(shù)據(jù)骡澈,createStream()方法傳入的并發(fā)參數(shù)代表的是讀取Kafka中topic+partition的線程數(shù)锅纺,并不能提高SparkStreaming讀取數(shù)據(jù)的并行度。
- Kafka自己管理offset肋殴,Receiver作為一個(gè)高層的Consumer來(lái)消費(fèi)數(shù)據(jù)囤锉,其消費(fèi)的偏移量(offset)由Kafka記錄在Zookeeper中,一旦出現(xiàn)錯(cuò)誤护锤,那些已經(jīng)標(biāo)記為消費(fèi)過(guò)的數(shù)據(jù)將會(huì)丟失官地。
Receiver模式下,為了解決讀取數(shù)據(jù)時(shí)的并行度問(wèn)題蔽豺,可以創(chuàng)建多個(gè)DStream区丑,然后union起來(lái)拧粪,具體可參考文章:http://www.reibang.com/p/c8669261165a修陡;為了解決數(shù)據(jù)丟失的問(wèn)題,可以選擇開(kāi)啟Spark的WAL(write ahead log)機(jī)制可霎,每次處理數(shù)據(jù)前將預(yù)寫日志寫入到HDFS中魄鸦,如果節(jié)點(diǎn)出現(xiàn)錯(cuò)誤,可以從WAL中恢復(fù)癣朗。但是這種方法其實(shí)效率低下拾因,不僅數(shù)據(jù)冗余(Kafka中有副本機(jī)制,Spark中還要存一份)旷余,且無(wú)法保證exactly once绢记,數(shù)據(jù)可能重復(fù)消費(fèi)。
無(wú)論采取什么方法進(jìn)行補(bǔ)救正卧,Receiver模式都不能夠?qū)崿F(xiàn)exactly once的語(yǔ)義蠢熄,其根本原因是Kafka自己管理的offset與SparkStreaming實(shí)際處理數(shù)據(jù)的offset沒(méi)有同步導(dǎo)致的。
Direct模式
為了解決Receiver模式的弊病炉旷,Spark1.3中引入了Direct模式來(lái)替代Receiver模式签孔,它使用Kafka的Simple consumer API,由Spark應(yīng)用自己管理offset信息窘行,以達(dá)成exactly once的語(yǔ)義饥追,其特點(diǎn)如下:
- Kafka中的partition與SparkStreaming中的partition一一對(duì)應(yīng),也就是SparkStreaming讀取數(shù)據(jù)的并行度取決于Kafka中partition的數(shù)量罐盔。
- 不依賴Receiver但绕,而是通過(guò)低階api直接找到topic+partition的leader獲取數(shù)據(jù),并由SparkStreaming應(yīng)用自己負(fù)責(zé)追蹤維護(hù)消費(fèi)的offset惶看。
由于SparkStreaming自己可以維護(hù)offset壁熄,所以應(yīng)用自身消費(fèi)的數(shù)據(jù)和偏移量之間的對(duì)應(yīng)關(guān)系確定的帚豪,數(shù)據(jù)也是同步的,所以可以實(shí)現(xiàn)exactly once的語(yǔ)義草丧。
下面將給出Direct模式下狸臣,SparkStreaming應(yīng)用管理offset的方法案例,其中offset依然是存放在zookeeper中昌执,但是由應(yīng)用自身來(lái)管理的烛亦,offset也可以放在Redis、MySQL懂拾、HBase中進(jìn)行管理煤禽,根據(jù)具體情況進(jìn)行選擇。
createDirectStream方法:
def createDirectStream(ssc:StreamingContext)(implicit streamingConfig: StreamingConfig, kc: SimpleKafkaCluster): InputDStream[(Array[Byte], Array[Byte])] = {
val topics = streamingConfig.topicSet
val groupId = streamingConfig.group
// 首先更新offset
setOrUpdateOffsets(topics, groupId)
//從zookeeper上讀取offset開(kāi)始消費(fèi)message
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
consumerOffsets.foreach {
case (tp, n) => println("===================================" + tp.topic + "," + tp.partition + "," + n)
}
KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (Array[Byte], Array[Byte])](
ssc, streamingConfig.kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message))
}
messages
}
代碼中岖赋,參數(shù)streamingConfig中封裝了Kafka的具體參數(shù)信息檬果,如topic名稱,broker list唐断,消費(fèi)者組id等选脊。kc則是Simple Consumer API的接口類,封裝了具體獲取Kafka數(shù)據(jù)的方法脸甘。代碼段剛開(kāi)始的時(shí)候就調(diào)用了setOrUpdateOffsets()方法來(lái)更新offset恳啥,確保下面得到的數(shù)據(jù)是最新的。
setOrUpdateOffsets方法:
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
// 某個(gè)groupid首次沒(méi)有offset信息丹诀,會(huì)報(bào)錯(cuò)钝的,從頭開(kāi)始讀
if (hasConsumed) {// 消費(fèi)過(guò)
/**
* 如果streaming程序執(zhí)行的時(shí)候出現(xiàn)kafka.common.OffsetOutOfRangeException,
* 說(shuō)明zk上保存的offsets已經(jīng)過(guò)時(shí)了铆遭,即kafka的定時(shí)清理策略已經(jīng)將包含該offsets的文件刪除硝桩。
* 針對(duì)這種情況,只要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小枚荣,
* 如果consumerOffsets比earliestLeaderOffsets還小的話碗脊,說(shuō)明consumerOffsets已過(guò)時(shí),
* 這時(shí)把consumerOffsets更新為earliestLeaderOffsets
*/
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
// 可能只是存在部分分區(qū)consumerOffsets過(guò)時(shí),所以只更新過(guò)時(shí)分區(qū)的consumerOffsets為earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case(tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已經(jīng)過(guò)時(shí)棍弄,更新為" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {// 沒(méi)有消費(fèi)過(guò)
val reset = streamingConfig.resetSign.toLowerCase
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {// 從頭消費(fèi)
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else { // 從最新offset處消費(fèi)
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}
最后是updateZKOffsets方法望薄,用于應(yīng)用輸出數(shù)據(jù)后,更新zk中的offset:
def updateZKOffsets(rdd: RDD[(Array[Byte], Array[Byte])]) : Unit = {
val groupId = streamingConfig.group
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
對(duì)于一個(gè)SparkStreaming應(yīng)用來(lái)說(shuō)呼畸,每個(gè)批次通過(guò)createDirectStream方法來(lái)獲取zookeeper中最新的offset痕支,然后使用simple kafka api獲取數(shù)據(jù),消費(fèi)處理完之后蛮原,再通過(guò)updateZKOffsets方法卧须,更新這個(gè)duration消費(fèi)的offset至zookeeper,以此過(guò)程保證了exactly once的語(yǔ)義。
計(jì)算過(guò)程中的容錯(cuò)性
以上所述僅保證了在讀取數(shù)據(jù)源過(guò)程中的exactly once花嘶,數(shù)據(jù)讀取成功后笋籽,在Spark應(yīng)用中做處理時(shí),是怎么保證數(shù)據(jù)不重不丟的呢椭员?Spark在容錯(cuò)性這一方面交出了令人滿意的答卷车海。撇去Driver與Executor的高可用性不說(shuō),Spark應(yīng)用內(nèi)部則采用checkpoint和lineage的機(jī)制來(lái)確保容錯(cuò)性隘击。
lineage
一般翻譯為血統(tǒng)侍芝,簡(jiǎn)單來(lái)說(shuō)就是RDD在轉(zhuǎn)化的過(guò)程中,由于父RDD與子RDD存在依賴關(guān)系(Dependency)埋同,從而形成的lineage州叠,也可以理解為lineage串起了RDD DAG。
RDD可以進(jìn)行緩存凶赁,通過(guò)調(diào)用persist或者cache方法咧栗,將RDD持久化到內(nèi)存或者磁盤中,這樣緩存的RDD就可以被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中被重用虱肄,緩存是構(gòu)建Spark快速迭代的關(guān)鍵致板。
當(dāng)一個(gè)RDD丟失的情況下,Spark會(huì)去尋找它的父RDD是否已經(jīng)緩存浩峡,如果已經(jīng)緩存可岂,就可以通過(guò)父RDD直接算出當(dāng)前的RDD错敢,從而避免了緩存之前的RDD的計(jì)算過(guò)程翰灾,且只有丟失數(shù)據(jù)的partition需要進(jìn)行重算,這樣Spark就避免了RDD上的重復(fù)計(jì)算稚茅,能夠極大的提升計(jì)算速度纸淮。
緩存雖然可以提升Spark快速迭代計(jì)算的速度,但是緩存是會(huì)丟失的亚享。
checkpoint
檢查點(diǎn)機(jī)制就是為了可以切斷l(xiāng)ineage的依賴關(guān)系咽块,在某個(gè)重要的節(jié)點(diǎn),將RDD持久化到文件系統(tǒng)中(一般選擇HDFS)欺税,這樣就算之前的緩存已經(jīng)丟失了侈沪,也可以保證檢查點(diǎn)數(shù)據(jù)不會(huì)丟失,這樣在恢復(fù)的時(shí)候晚凿,會(huì)直接從檢查點(diǎn)的數(shù)據(jù)開(kāi)始進(jìn)行計(jì)算亭罪,檢查點(diǎn)機(jī)制在SparkStreaming這種流式計(jì)算中發(fā)揮的作用會(huì)更大。
可以通過(guò)以下源碼為入口進(jìn)一步了解Spark的緩存和檢查點(diǎn)機(jī)制歼秽,RDD在進(jìn)行計(jì)算的時(shí)候會(huì)調(diào)用其iterator方法应役,在該方法中會(huì)首先去讀取緩存的數(shù)據(jù),如果沒(méi)有緩存的數(shù)據(jù)則會(huì)去讀取checkpoint的數(shù)據(jù)
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
Spark在計(jì)算過(guò)程中采用的lineage和checkpoint機(jī)制相互結(jié)合,取長(zhǎng)補(bǔ)短箩祥,再加上Spark各個(gè)組件底層本身就是具有高可用性院崇,所以在Spark應(yīng)用在轉(zhuǎn)化計(jì)算的過(guò)程中,可是保證數(shù)據(jù)處理的exactly once袍祖。
寫入存儲(chǔ)介質(zhì)的冪等性或事務(wù)性
Spark進(jìn)行數(shù)據(jù)輸出的時(shí)候底瓣,為了達(dá)到exactly once,有兩種方式:
- 冪等更新
指多次寫入的結(jié)果總是寫入相同的數(shù)據(jù)蕉陋,比較典型的例子是key-value型數(shù)據(jù)庫(kù)濒持,即使數(shù)據(jù)可能多次寫入,但是最終的結(jié)果也不會(huì)影響其正確性寺滚,Spark RDD的輸出方法saveAsTextFile在輸出的時(shí)候?qū)DD轉(zhuǎn)換成為PairRDD柑营,總是將相同的數(shù)據(jù)寫入到文件系統(tǒng)中,而PairRDD的輸出方法本身就滿足key-value的模型村视,所以均滿足冪等更新官套。 - 事務(wù)更新
指所有的更新都是基于事務(wù)的,所以更新都是exactly once蚁孔。Spark需要用戶自己實(shí)現(xiàn)事物機(jī)制奶赔,在foreachRDD方法中,用戶可以使用batch time和partition index來(lái)創(chuàng)建一個(gè)id杠氢,使用這個(gè)id來(lái)確保數(shù)據(jù)的唯一性站刑,啟動(dòng)事務(wù)并使用這個(gè)id來(lái)更新外部系統(tǒng)數(shù)據(jù),如果這個(gè)id不存在則提交更新鼻百,如果這個(gè)id已經(jīng)存在那么則放棄更新绞旅。
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
另外,還有一些下游的存儲(chǔ)介質(zhì)本身就不支持冪等或是事務(wù)性寫入温艇,比如kafka因悲。Spark的task或是stage的失敗重做機(jī)制以及kafka本身的高可用寫入,都會(huì)造成一些數(shù)據(jù)重復(fù)勺爱,這可能就需要Kafka本身去支持transaction write或者其下游應(yīng)用去實(shí)現(xiàn)去重機(jī)制晃琳。
最后,exactly once固然是個(gè)理想的狀態(tài)琐鲁,但其實(shí)現(xiàn)成本也是非常高的卫旱,在對(duì)數(shù)據(jù)可靠性要求不是很高的場(chǎng)景中,at-least-once甚至丟失少量數(shù)據(jù)也是可以作為一個(gè)選項(xiàng)考慮的围段。