1.冪等消息
為了解決重試導(dǎo)致的消息重復(fù)、亂序問題柳沙,kafka引入了冪等消息跃赚。冪等消息保證producer在一次會話內(nèi)寫入一個partition內(nèi)的消息具有冪等性囱晴,可以通過重試來確保消息發(fā)布的Exactly Once語義澳泵。
實現(xiàn)邏輯很簡單:
- 區(qū)分producer會話
producer每次啟動后实愚,首先向broker申請一個全局唯一的pid,用來標(biāo)識本次會話兔辅。
- 消息檢測
message_v2 增加了sequence number字段腊敲,producer每發(fā)一批消息,seq就加1维苔。
broker在內(nèi)存維護(pid,seq)映射碰辅,收到消息后檢查seq,如果介时,
new_seq=old_seq+1: 正常消息没宾;
new_seq<=old_seq : 重復(fù)消息;
new_seq>old_seq+1: 消息丟失沸柔;
- producer重試
producer在收到明確的的消息丟失ack榕吼,或者超時后未收到ack,要進行重試勉失。
2.事務(wù)消息
考慮在stream處理的場景中,需要多個消息的原子寫入語義原探,要么全部寫入成功乱凿,要么全部失敗,這就是kafka事務(wù)消息要解決的問題咽弦。
事務(wù)消息是由producer徒蟆、事務(wù)協(xié)調(diào)器、broker型型、組協(xié)調(diào)器段审、consumer共同參與實現(xiàn)的,
1)producer
為producer指定固定的TransactionalId闹蒜,可以穿越producer的多次會話(producer重啟/斷線重連)中寺枉,持續(xù)標(biāo)識producer的身份抑淫。
使用epoch標(biāo)識producer的每一次"重生",防止同一producer存在多個會話姥闪。
producer遵從冪等消息的行為始苇,并在發(fā)送的recordbatch中增加事務(wù)id和epoch。
2)事務(wù)協(xié)調(diào)器(Transaction Coordinator)
引入事務(wù)協(xié)調(diào)器筐喳,以兩階段提交的方式催式,實現(xiàn)消息的事務(wù)提交。
事務(wù)協(xié)調(diào)器使用一個特殊的topic:transaction避归,來做事務(wù)提交日志荣月。
事務(wù)控制器通過RPC調(diào)研,協(xié)調(diào) broker 和 consumer coordinator 實現(xiàn)事務(wù)的兩階段提交梳毙。
每一個broker都會啟動一個事務(wù)協(xié)調(diào)器哺窄,使用hash(TransactionalId)確定producer對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個集群的負載均衡顿天。
3) broker
broker處理在事務(wù)協(xié)調(diào)器的commit/abort控制消息堂氯,把控制消息向正常消息一樣寫入topic(和正常消息交織在一起,用來確認事務(wù)提交的日志偏移)牌废,并向前推進消息提交偏移hw咽白。
4) 組協(xié)調(diào)器
如果在事務(wù)過程中,提交了消費偏移鸟缕,組協(xié)調(diào)器在offset log中寫入事務(wù)消費偏移晶框。當(dāng)事務(wù)提交時,在offset log中寫入事務(wù)offset確認消息懂从。
5)consumer
consumer過濾未提交消息和事務(wù)控制消息授段,使這些消息對用戶不可見。
有兩種實現(xiàn)方式番甩,
- consumer緩存方式
設(shè)置isolation.level=read_uncommitted侵贵,此時topic的所有消息對consumer都可見。
consumer緩存這些消息缘薛,直到收到事務(wù)控制消息窍育。若事務(wù)commit,則對外發(fā)布這些消息宴胧;若事務(wù)abort漱抓,則丟棄這些消息。
- broker過濾方式
設(shè)置isolation.level=read_committed恕齐,此時topic中未提交的消息對consumer不可見乞娄,只有在事務(wù)結(jié)束后,消息才對consumer可見。
broker給consumer的BatchRecord消息中仪或,會包含以列表确镊,指明哪些是"abort"事務(wù),consumer丟棄abort事務(wù)的消息即可溶其。
事務(wù)消息處理流程如圖1所示骚腥,
圖1 事務(wù)消息業(yè)務(wù)流程
流程說明:
1. 查找事務(wù)協(xié)調(diào)器 -- FindCoordinatorRequest
事務(wù)協(xié)調(diào)器是分配pid和管理事務(wù)的核心,produer首先對任何一個broker發(fā)送FindCoordinatorRequest瓶逃,發(fā)現(xiàn)自己的事務(wù)協(xié)調(diào)器束铭。
2. 申請pid -- InitPidRequest
緊接著,producer向事務(wù)協(xié)調(diào)器發(fā)送InitPidRequest厢绝,申請生成pid契沫。
2a.當(dāng)指定了transactional.id時,事務(wù)協(xié)調(diào)器為producer分區(qū)pid昔汉,并更新epoch懈万,把(tid,pid)的映射關(guān)系寫入事務(wù)日志。 同時清理tid任何未完成的事務(wù)靶病,丟棄未提交的消息会通。
3. 啟動事務(wù)
啟動事務(wù)是producer的本地操作,促使producer更新內(nèi)部狀態(tài)娄周,不會和事務(wù)協(xié)調(diào)器發(fā)生關(guān)系涕侈。
事務(wù)協(xié)調(diào)器自動啟動事務(wù),始終處在一個接一個的事務(wù)處理狀態(tài)機中煤辨。
4. consume-transform-produce 事務(wù)循環(huán)
4.1. 注冊partition -- AddPartitionsToTxnRequest
對于每一個要在事務(wù)中寫消息的topic分區(qū)裳涛,producer應(yīng)當(dāng)在第一次發(fā)消息前,向事務(wù)處理器注冊分區(qū)众辨。
4.1a.事務(wù)處理器把事務(wù)關(guān)聯(lián)的分區(qū)寫入事務(wù)日志端三。
在提交或終止事務(wù)時,事務(wù)協(xié)調(diào)器需要這些信息鹃彻,控制事務(wù)涉及的所有分區(qū)leader完成事務(wù)提交或終止郊闯。
4.2. 寫消息 -- ProduceRequest
4.2a. producer向分區(qū)leader寫消息,消息中包含tid,pid,epoch和seq蛛株。
4.3. 提交消費偏移 -- AddOffsetCommitsToTxnRequest
4.3a. producer向事務(wù)協(xié)調(diào)器發(fā)送消費偏移虚婿,事務(wù)協(xié)調(diào)器在事務(wù)日志中記錄偏移信息,并把組協(xié)調(diào)器返回給producer泳挥。
4.4. 提交消費偏移 -- TxnOffsetCommitRequest
4.4a. producer向組協(xié)調(diào)器發(fā)送TxnOffsetCommitRequest,組協(xié)調(diào)器把偏移信息寫入偏移日志至朗。但是屉符,要一直等到事務(wù)提交后,這個偏移才生效,對外部可見矗钟。
5. 提交或終止事務(wù)
5.1. EndTxnRequest
收到提交或終止事務(wù)的請求時唆香,事務(wù)處理器執(zhí)行下面的操作:
1. 在事務(wù)日志中寫入PREPARE_COMMIT或PREPARE_ABORT消息(5.1a)。
2. 通過WriteTxnMarkerRequest向事務(wù)中的所有broker發(fā)事務(wù)控制消息(5.2)吨艇。
3. 在事務(wù)之日中寫入COMMITTED或ABORTED消息(5.3)躬它。
5.2. WriteTxnMarkerRequest
這個消息由事務(wù)處理器發(fā)給事務(wù)中所涉及分區(qū)的leader。
當(dāng)收到這個消息后东涡,broker會在分區(qū)log中寫入一個COMMIT或ABORT控制消息冯吓。同時,也會更新該分區(qū)的事務(wù)提交偏移hw疮跑。
如果事務(wù)中有提交消費偏移组贺, broker也會把控制消息寫入 __consumer-offsets log,并通知組協(xié)調(diào)器使事務(wù)中提交的消費偏移生效祖娘。
5.3. 寫最終的commit或abort消息
當(dāng)所有的commit或abort消息寫入數(shù)據(jù)日志失尖,事務(wù)協(xié)調(diào)器在事務(wù)日志中寫入事務(wù)日志,標(biāo)志這事務(wù)結(jié)束渐苏。
至此掀潮,本事務(wù)的所有狀態(tài)信息都可以被刪除,可以開始一個新的事務(wù)琼富。
在實現(xiàn)上仪吧,還有很多細節(jié),比如公黑,事務(wù)協(xié)調(diào)器會啟動定時器邑商,用來檢測并終止開始后長時間不活動的事務(wù),具體請參考下面列出的kafka社區(qū)技術(shù)文檔凡蚜。
【總結(jié)】:
我們要認識到人断,雖然kafka事務(wù)消息提供了多個消息原子寫的保證,但它不保證原子讀朝蜘。
例如恶迈,
1)事務(wù)向topic_a和topic_b兩個分區(qū)寫入消息,在事務(wù)提交后的某個時刻谱醇,topic_a的全部副本失效暇仲。這時topic_b中的消息可以正常消費,但topic_a中的消息就丟失了副渴。
2)假如consumer只消費了topic_a奈附,沒有消費topic_b,這樣也不能讀到完整的事務(wù)消息煮剧。
3)典型的kafka stream應(yīng)用從多個topic消費斥滤,然后向一個或多個topic寫将鸵。在一次故障后,kafka stream應(yīng)用重新開始處理流數(shù)據(jù)佑颇,由于從多個topic讀到的數(shù)據(jù)之間不存在穩(wěn)定的順序(即便只有一個topic顶掉,從多個分區(qū)讀到的數(shù)據(jù)之間也沒有穩(wěn)定的順序),那么兩次處理輸出的結(jié)果就可能會不一樣挑胸。
也就是說痒筒,雖然kafka log持久化了數(shù)據(jù),也可以通過指定offset多次消費數(shù)據(jù)茬贵,但由于分區(qū)數(shù)據(jù)之間的無序性簿透,導(dǎo)致每次處理輸出的結(jié)果都是不同的。這使得kafka stream不能像hadoop批處理任務(wù)一樣闷沥,可以隨時重新執(zhí)行萎战,保證每次執(zhí)行的結(jié)果相同。除非我們只從一個topic分區(qū)讀數(shù)據(jù)舆逃。
【參考】:
[0] https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-4.1AddPartitionsToTxnRequest
[2]https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#
[3]https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
[4]https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
[5]https://www.confluent.io/blog/transactions-apache-kafka/