kafka事務(wù)

2018-12-03

Kafka消息保證生產(chǎn)的信息不丟失和重復(fù)消費問題

1)使用同步模式的時候毕莱,有3種狀態(tài)保證消息被安全生產(chǎn)凌埂,在配置為1(只保證寫入leader成功)的話,如果剛好leader partition掛了诀蓉,數(shù)據(jù)就會丟失住诸。
2)還有一種情況可能會丟失消息,就是使用異步模式的時候箱亿,當(dāng)緩沖區(qū)滿了跛锌,如果配置為0(還沒有收到確認(rèn)的情況下,緩沖池一滿届惋,就清空緩沖池里的消息)髓帽,
數(shù)據(jù)就會被立即丟棄掉。

在數(shù)據(jù)生產(chǎn)時避免數(shù)據(jù)丟失的方法:
只要能避免上述兩種情況脑豹,那么就可以保證消息不會被丟失郑藏。
1)就是說在同步模式的時候,確認(rèn)機制設(shè)置為-1晨缴,也就是讓消息寫入leader和所有的副本译秦。
2)還有,在異步模式下,如果消息發(fā)出去了筑悴,但還沒有收到確認(rèn)的時候们拙,緩沖池滿了,在配置文件中設(shè)置成不限制阻塞超時的時間阁吝,也就說讓生產(chǎn)端一直阻塞砚婆,這樣也能保證數(shù)據(jù)不會丟失。
在數(shù)據(jù)消費時突勇,避免數(shù)據(jù)丟失的方法:如果使用了storm装盯,要開啟storm的ackfail機制;如果沒有使用storm甲馋,確認(rèn)數(shù)據(jù)被完成處理之后埂奈,再更新offset值。低級API中需要手動控制offset值定躏。

推薦配置

producer
block.on.buffer.full = true 盡管該參數(shù)在0.9.0.0已經(jīng)被標(biāo)記為“deprecated”账磺,但鑒于它的含義非常直觀,所以這里還是顯式設(shè)置它為true痊远,使得producer將一直等待緩沖區(qū)直至其變?yōu)榭捎每蹇埂7駝t如果producer生產(chǎn)速度過快耗盡了緩沖區(qū),producer將拋出異常碧聪。緩沖區(qū)滿了就阻塞在那冒版,不要拋異常,也不要丟失數(shù)據(jù)
acks=all 很好理解逞姿,所有follower都響應(yīng)了才認(rèn)為消息提交成功辞嗡,即"committed"
retries = MAX 無限重試,直到你意識到出現(xiàn)了問題
max.in.flight.requests.per.connection = 1 限制客戶端在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)哼凯。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求欲间。注意:設(shè)置此參數(shù)是為了避免消息亂序
使用KafkaProducer.send(record, callback)而不是send(record)方法 自定義回調(diào)邏輯處理消息發(fā)送失敗楚里,比如記錄在日志中断部,用定時腳本掃描重處理
callback邏輯中最好顯式關(guān)閉producer:close(0) 注意:設(shè)置此參數(shù)是為了避免消息亂序(僅僅因為一條消息發(fā)送沒收到反饋就關(guān)閉生產(chǎn)者,感覺代價很大)
unclean.leader.election.enable=false 關(guān)閉unclean leader選舉班缎,即不允許非ISR中的副本被選舉為leader蝴光,以避免數(shù)據(jù)丟失
replication.factor >= 3 這個完全是個人建議了,參考了Hadoop及業(yè)界通用的三備份原則
min.insync.replicas > 1 消息至少要被寫入到這么多副本才算成功达址,也是提升數(shù)據(jù)持久性的一個參數(shù)蔑祟。與acks配合使用
保證replication.factor > min.insync.replicas 如果兩者相等,當(dāng)一個副本掛掉了分區(qū)也就沒法正常工作了沉唠。通常設(shè)置replication.factor = min.insync.replicas + 1即可

consumer
enable.auto.commit=false 關(guān)閉自動提交位移
在消息被完整處理之后再手動提交位移

https://blog.csdn.net/weixin_38750084/article/details/82939435

kafka動態(tài)維護了一個同步狀態(tài)的副本的集合(a set of In-Sync Replicas)疆虚,簡稱ISR

ack

ack確認(rèn)機制設(shè)置為0,表示不等待響應(yīng),不等待borker的確認(rèn)信息径簿,最小延遲罢屈,producer無法知道消息是否發(fā)生成功,消息可能丟失篇亭,但具有最大吞吐量缠捌。

ack確認(rèn)機制設(shè)置為-1,也就是讓消息寫入leader和所有的副本译蒂,ISR列表中的所有replica都返回確認(rèn)消息曼月。

ack確認(rèn)機制設(shè)置為1,leader已經(jīng)接收了數(shù)據(jù)的確認(rèn)信息柔昼,replica異步拉取消息哑芹,比較折中。

ack確認(rèn)機制設(shè)置為2捕透,表示producer寫partition leader和其他一個follower成功的時候绩衷,broker就返回成功,無論其他的partition follower是否寫成功激率。

ack確認(rèn)機制設(shè)置為 "all" 即所有副本都同步到數(shù)據(jù)時send方法才返回, 以此來完全判斷數(shù)據(jù)是否發(fā)送成功, 理論上來講數(shù)據(jù)不會丟失咳燕。

min.insync.replicas=1 意思是至少有1個replica返回成功,否則product異常

topic

image.png

LogEndOffset 的縮寫乒躺,表示每個 partition 的 log 最后一條 Message 的位置招盲。HW 是 HighWatermark 的縮寫,是指 consumer 能夠看到的此 partition 的位置
leader 負(fù)責(zé)維護和跟蹤 ISR(In-Sync Replicas 的縮寫嘉冒,表示副本同步隊列) 中所有 follower 滯后的狀態(tài)曹货。
ISR 是 AR 中的一個子集,由 leader 維護 ISR 列表讳推,follower 從 leader 同步數(shù)據(jù)有一些延遲(包括延遲時間 replica.lag.time.max.ms 和延遲條數(shù) replica.lag.max.messages 兩個維度, 當(dāng)前最新的版本 0.10.x 中只支持 replica.lag.time.max.ms 這個維度)顶籽,任意一個超過閾值都會把 follower 剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也會先存放在 OSR 中银觅。AR=ISR+OSR礼饱。

HW 俗稱高水位,HighWatermark 的縮寫究驴,取一個 partition 對應(yīng)的 ISR 中最小的 LEO 作為 HW镊绪,consumer 最多只能消費到 HW 所在的位置。另外每個 replica 都有 HW,leader 和 follower 各自負(fù)責(zé)更新自己的 HW 的狀態(tài)洒忧。對于 leader 新寫入的消息蝴韭,consumer 不能立刻消費,leader 會等待該消息被所有 ISR 中的 replicas 同步后更新 HW熙侍,此時消息才能被 consumer 消費榄鉴。這樣就保證了如果 leader 所在的 broker 失效履磨,該消息仍然可以從新選舉的 leader 中獲取。對于來自內(nèi)部 broKer 的讀取請求庆尘,沒有 HW 的限制蹬耘。

有兩個地方會對kafka的 Zookeeper 節(jié)點進(jìn)行維護:

Controller 來維護:Kafka 集群中的其中一個 Broker 會被選舉為 Controller,主要負(fù)責(zé) Partition 管理和副本狀態(tài)管理减余,也會執(zhí)行類似于重分配 partition 之類的管理任務(wù)综苔。在符合某些特定條件下,Controller 下的 LeaderSelector 會選舉新的 leader位岔,ISR 和新的 leader_epoch 及 controller_epoch 寫入 Zookeeper 的相關(guān)節(jié)點中如筛。同時發(fā)起 LeaderAndIsrRequest 通知所有的 replicas。

leader 來維護:leader 有單獨的線程定期檢測 ISR 中 follower 是否脫離 ISR, 如果發(fā)現(xiàn) ISR 變化抒抬,則會將新的 ISR 的信息返回到 Zookeeper 的相關(guān)節(jié)點中杨刨。

如果某一個 partition 的所有 replica 都掛了,就無法保證數(shù)據(jù)不丟失了擦剑。這種情況下有兩種可行的方案:

  • 等待 ISR 中任意一個 replica“活”過來妖胀,并且選它作為 leader

  • 選擇第一個“活”過來的 replica(并不一定是在 ISR 中)作為 leader (默認(rèn)情況)

https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability

LSO:是 LastStableOffset 的簡稱,對未完成的事務(wù)而言惠勒,LSO 的值等于事務(wù)中第一條消息的位置(firstUnstableOffset)赚抡,對已完成的事務(wù)而言,它的值同 HW 相同
LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

如何實現(xiàn)高吞吐量

  • 順序讀寫
  • 零拷貝
  • 文件分段
  • 批量發(fā)送
  • 數(shù)據(jù)壓縮

冪等性

PID纠屋。每個新的Producer在初始化的時候會被分配一個唯一的PID涂臣,這個PID對用戶是不可見的。
Sequence Numbler售担。(對于每個PID赁遗,該Producer發(fā)送數(shù)據(jù)的每個<Topic, Partition>都對應(yīng)一個從0開始單調(diào)遞增的Sequence Number

enable.idempotence=true

只能保證單個Producer對于同一個<Topic, Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partion冪等族铆。
代碼

  • 調(diào)用kafkaProducer的send方法將數(shù)據(jù)添加到 RecordAccumulator 中岩四,添加時會判斷是否需要新建一個 ProducerBatch,這時這個 ProducerBatch 還是沒有 PID 和 sequence number 信息的哥攘;
  • Producer 后臺發(fā)送線程 Sender剖煌,在 run() 方法中,會先根據(jù) TransactionManager 的 shouldResetProducerStateAfterResolvingSequences() 方法判斷當(dāng)前的 PID 是否需要重置献丑,重置的原因是因為:如果有topic-partition的batch已經(jīng)超時還沒處理完末捣,此時可能會造成sequence number 不連續(xù)。因為sequence number 有部分已經(jīng)分配出去了创橄,而Kafka服務(wù)端沒有收到這部分sequence number 的序號,Kafka服務(wù)端為了保證冪等性莽红,只會接受同一個pid的sequence number 等于服務(wù)端緩存sequence number +1的消息妥畏,所有這時候需要重置Pid來保證冪等性
  • Sender線程調(diào)用maybeWaitForProducerId()方法判斷是否要申請Pid邦邦,如果需要,會阻塞直到成功申請到Pid
  • 最后調(diào)用sendProduceRequest方法將消息發(fā)送出去

相關(guān)類

BatchMetadata
用來存儲Batch的元數(shù)據(jù)醉蚁, BatchMetadata類的幾個重要的字段
ProducerStateEntry
用于存儲每個producerId對應(yīng)的Batch燃辖,按照sequence從小到大進(jìn)行排序,最小的作為頭网棍,最大的作為尾 ,每個producerId的隊列失蹤保持著最多5個Batch,如果超過5個了睁壁,就從頭開始remove
ProducerAppendInfo用于在追加的消息寫到Log之前進(jìn)行校驗寒屯,主要對epoch、sequence number進(jìn)行校驗
currentEntry:ProducerStateEntry類型惑畴,就是pid對應(yīng)的ProducerStateEntry中batchMetadata尾部對象蛋欣,用于跟新追加的Batch做比較
validationType:校驗的方式。不同的類型如贷,校驗的規(guī)則不一樣

checkSequence方法是一個跟冪等性很重要的方法陷虎,此方法就是校驗sequence number的。有以下幾個判斷規(guī)則

  • 如果producerEpoch更新了杠袱,則追加的Batch里的appendFirstSeq必須是0
  • 當(dāng)currentLastSeq為-1時尚猿,說明此生產(chǎn)者還沒有成功追加過消息,appendFirstSeq也必須是0
  • appendFirstSeq = currentLastSeq+1楣富,或者當(dāng)currentLastSeq達(dá)到Int的最大值Int.MaxValue時谊路,appendFirstSeq為0

ProducerStateManager
用來管理Producer的狀態(tài),里面存儲了各個生產(chǎn)者與ProducerStateEntry的對應(yīng)關(guān)系菩彬。每個ProducerStateManager對應(yīng)一個TopicPartition

服務(wù)端流程

partition assgin

如何分配partition到broker

  • 副本因子不能大于 Broker 的個數(shù)缠劝;
  • 第一個分區(qū)(編號為0)的第一個副本放置位置是隨機從 brokerList 選擇的;
  • 其他分區(qū)的第一個副本放置位置相對于第0個分區(qū)依次往后移骗灶。也就是如果我們有5個 Broker惨恭,5個分區(qū),假設(shè)第一個分區(qū)放在第四個 Broker 上耙旦,那么第二個分區(qū)將會放在第五個 Broker 上脱羡;第三個分區(qū)將會放在第一個 Broker 上;第四個分區(qū)將會放在第二個 Broker 上免都,依次類推锉罐;
  • 剩余的副本相對于第一個副本放置位置其實是由 nextReplicaShift 決定的,而這個數(shù)也是隨機產(chǎn)生的绕娘;

如何分配partition到consumer
Range策略是對每個主題而言的脓规,首先對同一個主題里面的分區(qū)按照序號進(jìn)行排序,并對消費者按照字母順序進(jìn)行排序险领。

使用RoundRobin策略有兩個前提條件必須滿足:

  • 同一個Consumer Group里面的所有消費者的num.streams必須相等侨舆;
  • 每個消費者訂閱的主題必須相同秒紧。

file design

Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段挨下,就容易定期清除或刪除已經(jīng)消費完文件熔恢,減少磁盤占用。
通過索引信息可以快速定位message和確定response的最大大小臭笆。
通過index元數(shù)據(jù)全部映射到memory叙淌,可以避免segment file的IO磁盤操作。
通過索引文件稀疏存儲愁铺,可以大幅降低index文件元數(shù)據(jù)占用空間大小

事務(wù)性

link

有了Transaction ID后鹰霍,Kafka可保證:

  • 跨Session的數(shù)據(jù)冪等發(fā)送。當(dāng)具有相同Transaction ID的新的Producer實例被創(chuàng)建且工作時帜讲,舊的且擁有相同Transaction ID的Producer將不再工作衅谷。
  • 跨Session的事務(wù)恢復(fù)。如果某個應(yīng)用實例宕機似将,新的實例可以保證任何未完成的舊的事務(wù)要么Commit要么Abort获黔,使得新實例從一個正常狀態(tài)開始工作

Kafka 0.11.0.0引入了一個服務(wù)器端的模塊,名為Transaction Coordinator在验,用于管理Producer發(fā)送的消息的事務(wù)性玷氏。

該Transaction Coordinator維護Transaction Log,該log存于一個內(nèi)部的Topic內(nèi)腋舌。由于Topic數(shù)據(jù)具有持久性盏触,因此事務(wù)的狀態(tài)也具有持久性。

Producer并不直接讀寫Transaction Log块饺,它與Transaction Coordinator通信赞辩,然后由Transaction Coordinator將該事務(wù)的狀態(tài)插入相應(yīng)的Transaction Log。

Transaction Log的設(shè)計與Offset Log用于保存Consumer的Offset類似授艰。

為了區(qū)分寫入Partition的消息被Commit還是Abort辨嗽,Kafka引入了一種特殊類型的消息,即Control Message淮腾。該類消息的Value內(nèi)不包含任何應(yīng)用相關(guān)的數(shù)據(jù)糟需,并且不會暴露給應(yīng)用程序。它只用于Broker與Client間的內(nèi)部通信谷朝。

對于Producer端事務(wù)洲押,Kafka以Control Message的形式引入一系列的Transaction Marker。Consumer即可通過該標(biāo)記判定對應(yīng)的消息被Commit了還是Abort了圆凰,然后結(jié)合該Consumer配置的隔離級別決定是否應(yīng)該將該消息返回給應(yīng)用程序杈帐。

Producer<String, String> producer = new KafkaProducer<String, String>(props);
    
// 初始化事務(wù),包括結(jié)束該Transaction ID對應(yīng)的未完成的事務(wù)(如果有)
// 保證新的事務(wù)在一個正確的狀態(tài)下啟動
producer.initTransactions();

// 開始事務(wù)
producer.beginTransaction();

// 消費數(shù)據(jù)
ConsumerRecords<String, String> records = consumer.poll(100);

try{
    // 發(fā)送數(shù)據(jù)
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
    
    // 發(fā)送消費數(shù)據(jù)的Offset送朱,將上述數(shù)據(jù)消費與數(shù)據(jù)發(fā)送納入同一個Transaction內(nèi)
    producer.sendOffsetsToTransaction(offsets, "group1");

    // 數(shù)據(jù)發(fā)送及Offset發(fā)送均成功的情況下娘荡,提交事務(wù)
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 數(shù)據(jù)發(fā)送或者Offset發(fā)送出現(xiàn)異常時干旁,終止事務(wù)
    producer.abortTransaction();
} finally {
    // 關(guān)閉Producer和Consumer
    producer.close();
    consumer.close();
}
image.png

總結(jié)

  • PID與Sequence Number的引入實現(xiàn)了寫操作的冪等性
  • 寫操作的冪等性結(jié)合At Least Once語義實現(xiàn)了單一Session內(nèi)的Exactly Once語義
  • Transaction Marker與PID提供了識別消息是否應(yīng)該被讀取的能力驶沼,從而實現(xiàn)了事務(wù)的隔離性
  • Offset的更新標(biāo)記了消息是否被讀取炮沐,從而將對讀操作的事務(wù)處理轉(zhuǎn)換成了對寫(Offset)操作的事務(wù)處理
  • Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對應(yīng)的消息與一組讀操作(如果有)對應(yīng)的Offset的更新進(jìn)行同樣的標(biāo)記(即Transaction Marker)來實現(xiàn)事務(wù)中涉及的所有讀寫操作同時對外可見或同時對外不可見
  • Kafka只提供對Kafka本身的讀寫操作的事務(wù)性回怜,不提供包含外部系統(tǒng)的事務(wù)性

與zk比較

Zookeeper
Zookeeper的原子廣播協(xié)議與兩階段提交以及Kafka事務(wù)機制有相似之處大年,但又有各自的特點

Kafka事務(wù)可COMMIT也可ABORT。而Zookeeper原子廣播協(xié)議只有COMMIT沒有ABORT玉雾。當(dāng)然翔试,Zookeeper不COMMIT某消息也即等效于ABORT該消息的更新。
Kafka存在多個Transaction Coordinator實例复旬,擴展性較好垦缅。而Zookeeper寫操作只能在Leader節(jié)點進(jìn)行,所以其寫性能遠(yuǎn)低于讀性能驹碍。
Kafka事務(wù)是COMMIT還是ABORT完全取決于Producer即客戶端壁涎。而Zookeeper原子廣播協(xié)議中某條消息是否被COMMIT取決于是否有一大半FOLLOWER ACK該消息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末志秃,一起剝皮案震驚了整個濱河市怔球,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浮还,老刑警劉巖竟坛,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異钧舌,居然都是意外死亡担汤,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進(jìn)店門洼冻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來崭歧,“玉大人,你說我怎么就攤上這事碘赖〖萑伲” “怎么了?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵普泡,是天一觀的道長播掷。 經(jīng)常有香客問我,道長撼班,這世上最難降的妖魔是什么歧匈? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮砰嘁,結(jié)果婚禮上件炉,老公的妹妹穿的比我還像新娘勘究。我一直安慰自己,他們只是感情好斟冕,可當(dāng)我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布口糕。 她就那樣靜靜地躺著,像睡著了一般磕蛇。 火紅的嫁衣襯著肌膚如雪景描。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天秀撇,我揣著相機與錄音超棺,去河邊找鬼。 笑死呵燕,一個胖子當(dāng)著我的面吹牛棠绘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播再扭,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼氧苍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了霍衫?” 一聲冷哼從身側(cè)響起候引,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎敦跌,沒想到半個月后澄干,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡柠傍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年麸俘,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惧笛。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡从媚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出患整,到底是詐尸還是另有隱情拜效,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布各谚,位于F島的核電站紧憾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏昌渤。R本人自食惡果不足惜赴穗,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧般眉,春花似錦了赵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至辑奈,卻和暖如春苛茂,著一層夾襖步出監(jiān)牢的瞬間已烤,已是汗流浹背鸠窗。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留胯究,地道東北人稍计。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像裕循,于是被迫代替她去往敵國和親臣嚣。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容