1. Kafka internal
1.1 Request processing
Kafka 的 request 分為:
- produce request
- fetch request
- metadata request
- and so on
1.1.1 Metadata request
kafka client 會定期向任意一個 broker(所有的 brokers 都擁有 partition metadata)發(fā)起 metadata request 來獲取到每個 topic partition leader 的位置偷霉,并緩存在本地(有定期的緩存更新機制)距境,從而在需要發(fā)送 produce or fetch requests 時正確的定位 broker 的位置泽西。
如果緩存更新延遲,導致錯誤的 produce request 發(fā)到非 leader partition 的 broker 上摆碉,該 broker 會直接返回 error踪央,而不是像 ElasticSearch 那樣內部轉發(fā)赫编。
client 收到 produce/fetch response 的 "not a leader" error 后會 refresh metadata拄衰。
1.1.2 Produce request
produce request 的過程之前也聊過了蛤奢,一個重要配置是 acks鬼癣。
1.1.3 Fetch request
fetch request 由 consumer 或者 follower replicas 發(fā)起陶贼。
client 發(fā)起人request:
- 通知 broker 自己要 poll 的 messages 的 topics, partitions & offsets
-
可以設置 messages 數(shù)量的上限 & 下限,上限是為了防止 OOM待秃,下限是為了減少網(wǎng)絡次數(shù)拜秧。
- 可以設置 timeout,如果 timeout 時間內章郁,messages 量不夠枉氮,也直接返回目前可以返回的 messages
consumers 只能 poll 到寫入所有 replicas 的 messages,這也是為了盡量保證 consistency驱犹。
這就帶來一個問題嘲恍,replication 是有 lag 的,會導致消息無法第一時間到達 consumers雄驹。
// todo : This delay is limited to replica.lag.time.max.ms—the amount of time a replica can be delayed in replicating new messages while still being considered in-sync.
broker 會校驗 request 的正確性佃牛,包括 leader partition, offset。
broker 使用 zero-copy method 將 messages 發(fā)送給 client医舆,大大增加了性能俘侠。
broker 處理 fetch request 的方式和處理 produce request 非常相似。
1.1.4 OffsetCommitRequest
過去蔬将,kafka 使用 zk 維護 consumers 的 offsets爷速。當 consumer 啟動時,向 zk 發(fā)請求獲取要讀的 partition 以及 offset霞怀。
但是因為頻繁的讀寫 offsets 對 zk 的壓力較大惫东,所以推薦通過 Kafka broker 直接管理,現(xiàn)在 kafka 通過創(chuàng)建一個單獨的特殊的 __consumer_offsets topic 來維護 offsets毙石。
1.2 Physical Storage
我們會將 partition 在物理層面拆分成 segments廉沮。默認情況下,每個 segment 包含 1GB 的數(shù)據(jù)/一周的數(shù)據(jù)徐矩。當 broker 往 partition 中寫數(shù)據(jù)時滞时,如果當前 segment 的限制到了,會關閉該文件 & 創(chuàng)建一個新的滤灯。當前正在使用的 segment 叫做 active segment坪稽。每個 segment 會單獨存儲在 *.log 的物理文件中。
1.2.1 Indexes
由于 Kafka 支持 consumers 從任意可用的 offset 處讀取 message鳞骤,所以需要能快速的定位 message 所在的 segment 位置窒百,Kafka 為每個 partition 創(chuàng)建有 index,index 維護從 offset 到 segment file 及 file 中的位置的映射豫尽。
index 也按照 segment 來拆分贝咙。
1.2.2 Compact
有一種場景,只保存相同 key 的最新的 message拂募,就用到了 compact庭猩,將老舊的 message delete 掉。本質上也是一種 retention policy陈症,技術細節(jié)這里不做介紹了蔼水。
這部分內容主要整理總結自 《Kafka The Definitive Guide》 Chapter 6: Reliable Data Delivery
2. Reliable
Kafka 的 components 有 producer, broker, consumer,整體服務的 reliable(可靠性) 依賴每一個環(huán)節(jié)的 reliable录肯,需要 Linux administrators, network and storage administrators, and the application developers 共同努力趴腋。
最常為人知的 Reliability Guarantees 是 ACID,它只要用在關系型數(shù)據(jù)庫中论咏。
2.1 Reliability Guarantees
Kafka 提供的基本的 guarantee 包括:
- 單個 partition 中 messages 的 order
- produce 只有 messages 被寫到所有的 replicas 時优炬,該 messages 才被認為 "committed"
- acks: 0, producer 通過網(wǎng)絡成功發(fā)出去就不管了
- acks: 1, producer 等到 leader 收到消息
- acks: all, producer 等到 message committed
- messages 只要 committed,就不會丟失厅贪,因為已經(jīng)同步到所有 replicas
- consumers 只能讀到 committed 的 messages
但是這種 guarantee 只能保證基本的 reliable蠢护,并不能保證 fully reliable。
系統(tǒng)中存在著 trade-offs养涮,需要在 reliable 和 availability, high throughput, low latency, hardware costs 間做 trade-offs葵硕。
2.2 Replication
replication 是 kafka reliability guarantee 的核心。通過做 replication 來對抗 broker 的 crash贯吓。
如何判斷某個 replica 是處于 in-sync 狀態(tài)呢:
- 不言而喻 leader 是 in-sync 的
- 與 zk 有活躍的 session懈凹,定期(默認 6s)給 zk 發(fā)送 heartbeat
- 在過去10秒(可配置)內從 leader 那里 fetch messages
- 在過去10秒內獲取 leader 的最新消息。 也就是說悄谐,追隨者仍然從領導者那里得到消息是不夠的; 它必須幾乎沒有滯后介评。
可以看出:
- in-sync 是指 replica 處在活躍的持續(xù)同步數(shù)據(jù)的狀態(tài),但不代表 leader 的數(shù)據(jù)已經(jīng)全部同步到 replica 中爬舰。即使 replica 處在 in-sync 狀態(tài)们陆,消息數(shù)據(jù)本身的同步也是有延遲的。
- 只要在一個時間周期內有同步操作洼专,replica 就被認為是 in-sync 的棒掠,這會導致誤判。如 replica 向 zk, leader 活躍過狀態(tài)后屁商,立馬 crash(out of sync)烟很,但由于還沒到下次同步狀態(tài)的時間,此時 leader 會誤以為此 replica 還是 in-sync 的蜡镶。
什么情況下可能導致 replicas out-of-sync 呢:
- broker 在某些時間做 garbage collection雾袱,導致進程 pause 一段時間,此時會丟掉和 zk, leader 的鏈接官还,被認為 out-of-sync芹橡,在 gc 完后,重新鏈接 leader & zk望伦,恢復回 in-sync 狀態(tài)
- replica 處在的 broker crash/ network partition
2.3 Broker Configuration
對于 reliable 的配置林说,可以配置在 broker level煎殷,也可以配置在 topic level。
2.3.1 Replication Factor
replication.factor 即副本個數(shù)腿箩,這個參數(shù)在 topic 創(chuàng)建后豪直,也可以手動修改的。
replication.factor 越大珠移,會有越高的 reliability弓乙,但是會帶來更大的備份延遲時間。
不同的 replicas 建議在不同的 brokers 上钧惧,而存有 partition replicas 的 broker 盡量在不同的 rack 上暇韧。
topic level 的配置是 replication.factor
broker level 的配置是 default.replication.factor(針對自動創(chuàng)建的 topic)
2.3.2 Unclean Leader Election
clean leader election 是指當 leader unavailable 時,kafka 會自動從 in-sync 的 replicas 中選舉一個作為新 leader浓瞪。所有 committed data(committed 是指 message 已經(jīng)備份到所有 in-sync 的 replicas 中) 都不會丟失懈玻。
但是,當 leader unavailable 時追逮,完全沒有 in-sync replicas 存在時酪刀,該怎么辦?參數(shù) unclean.leader.election.enable 決定了這種情況發(fā)生時的處理辦法钮孵,默認配置為 true骂倘。
假設存在 3 個 replicas,如果 2 個 followers 全變?yōu)?unavailable巴席,leader 會繼續(xù)接受 write request, followers 變?yōu)?out-of-sync±裕現(xiàn)在 leader unavailable,之前 unavailable 的 out-of-sync 的 followers 恢復后漾唉,系統(tǒng)如何抉擇:
- 如果不允許 out-of-sync 的 replica 參與 election 變成新 leader荧库,那么 partition 只能處于 unavailable 狀態(tài),直到之前的 leader 恢復赵刑。這種配置適合金融等強數(shù)據(jù)一致性問題的場景分衫。
- 如果允許 out-of-sync 的 replica 變成新 leader,可能會丟失一部分寫在老 leader 中的數(shù)據(jù)般此,可能引起不同 consumer group 讀到的數(shù)據(jù)不一致蚪战。這種配置適合對數(shù)據(jù)一致性不那么敏感的場景。
可以看出铐懊,這也是在 consistency 和 availability之間做選擇邀桑,符合 CAP 理論。默認為 true科乎,說明 Kafka 傾向于選擇 AP & weak consistency壁畸。
Kafka 的數(shù)據(jù)同步模式很像 Master / Slave Async。
2.3.3 Minimum In-Sync Replicas
min.insync.replicas 這個參數(shù)的意思有些晦澀,我讀了兩三遍才算理解捏萍。它是指 replicas 中太抓,至少有幾個 replicas 處于 in-sync 狀態(tài),leader 才會執(zhí)行寫請求照弥,否則拒絕寫腻异。而不是 message 至少寫入了幾個 replicas。
假設有 3 個 replicas(leader 包含在內)这揣,如果 min.insync.replicas = 2,則當 producer 寫 messages 時影斑,至少要有兩個 replicas 處于 in-sync 狀態(tài)给赞,leader 才會執(zhí)行寫入操作,否則返回 producer NotEnoughReplicasException矫户。
所以該參數(shù)配置的越高片迅,則 messages 越可能成功寫入到更多的副本。如果該參數(shù)設置為 1皆辽,則 leader 不會關心其他 replicas 是否處于及時更新數(shù)據(jù)的狀態(tài)柑蛇,只要 leader 自身寫沒問題,就寫入了驱闷,而其他 replicas 可能過很長時間才能同步到數(shù)據(jù)耻台,這就增加了 leader unavailable 時,數(shù)據(jù)丟失的風險空另。
2.4 Using Producers in a Reliable System
上面說完了如何配置 broker 來盡量保證 reliable盆耽,但是如果 producer 配置不當,也會導致丟數(shù)據(jù)的情況發(fā)生扼菠。
2.4.1 ACK
考慮如下場景摄杂,partition 有 3 個 replicas:
- acks=0。producer 向 leader 發(fā)送請求時循榆,如果 serialized error 或者 network fail 時析恢,會返回 error,能成功發(fā)出去就認為寫成功秧饮,但是 leader & replicas 在過程中出任何問題導致數(shù)據(jù)沒寫成功映挂,都會造成數(shù)據(jù)丟失。
- acks=1浦楣。當 producer 向 leader 寫消息袖肥,leader 寫成功,返回 producer success振劳,之后在數(shù)據(jù)還沒同步到 replicas 時 crash椎组,replicas 中的一個晉升為 leader。此時历恐,producer 認為數(shù)據(jù)寫成功了寸癌,但是 leader 卻把這條數(shù)據(jù)丟失了专筷。
- acks=all≌粑可以規(guī)避第一種情況磷蛹,但是,當 producer 向 leader 寫消息時溪烤,leader 宕機了 & 新的 leader 還沒選出來味咳,producer 會收到 “Leader not Available” 。如果 producer 沒有處理好異常檬嘀,沒有重試直到寫成功槽驶,這條消息也會丟失。但是如果做好 error handle鸳兽,是可以保證數(shù)據(jù)不丟失的掂铐。
單獨 acks 的配置不能完全保證數(shù)據(jù)的成功存儲。對于 producer 來說揍异,為了保證 reliable全陨,需要:
- Use the correct acks configuration to match reliability requirements
- Handle errors correctly both in configuration and in code
2.4.2 Retries
當有 error 發(fā)生時,需要進行 handle衷掷。error 分為兩種:
- Retriable errors: KafkaProducer 針對這種異常辱姨,可以自動發(fā)起重試。全部邏輯隱藏在 send 方法中棍鳖,開發(fā)人員不需要人工干預
- a connection error can be resolved because the connection may get reestablished.
- A “no leader” error can be resolved when a new leader is elected for the partition.
- Nonretriable errors: 這種錯誤沒法通過重試修復炮叶,會直接拋異常,需要開發(fā)人員在代碼層面處理
- message size too large error
- serialization errors
retry 也有風險:
- 可能造成消息被重復寫入多次渡处【迪ぃ考慮以下場景:broker 成功寫消息,但是由于網(wǎng)絡原因医瘫,沒及時返回 ack侣肄,producer 認為消息創(chuàng)建失敗,發(fā)起 retry醇份,會導致消息寫入兩次稼锅。當然了,我們可以在 consumer 端解決這個問題僚纷,將 consumer 做成 idempotent 冪等的接口矩距。
- producer 耗盡了 retry 次數(shù)
這些處理方式都需要根據(jù)業(yè)務需求,做出恰當?shù)倪x擇怖竭。
2.5 Using Consumers in a Reliable System
由于 Kafka broker 內部已經(jīng)保證了返回給 consumer 的都是 committed data(保存到全部 in-sync replicas 中)锥债,所以 consumer 需要處理的事情就簡單多了。但是 consumer 還需要能夠正確的處理跟蹤消息 offset & 處理消息,保證每次獲取到的 message 的正確性哮肚,既不重復登夫,也不缺失。
consumer 中有 4 個參數(shù)可以影響 reliable:
- group.id: 之前已經(jīng)有過詳細介紹
- auto.offset.reset: 控制當 consumer 提交的 offset 不存在允趟,或者 consumer 剛啟動 & offset topic 中沒有記錄過 offset 時的處理方法
- earliest: 可以導致重復處理消息
- latest: 可能導致遺漏處理消息
-
enable.auto.commit: 這個配置很關鍵恼策,是否讓 consumer 自動提交 offset,或者開發(fā)人員在自己的代碼中提交
- true: 好處潮剪,不需要開發(fā)人員寫代碼干預涣楷。壞處:可控性差,由于是周期性提交 offset抗碰,可能會重復處理消息总棵。
- false: 好處,靈活控制提交頻率
- auto.com mit.interval.ms: 如果 enable.auto.commit=true改含,這個配置自動提交的間隔周期,默認是 5s迄汛,周期越短捍壤,越能減少意外發(fā)生時,重復處理的消息數(shù)
2.5.1 Consumer retry
有時鞍爱,當 consumer poll 到 messages 后鹃觉,處理 messages 時會發(fā)生錯誤,如短暫的寫入 db 失敗睹逃,但是又不想丟失消息盗扇,這時可以:
- 存儲消息到某個 buffer(本地 queue, redis, db...) 中,consumer 繼續(xù)提交最新的 offset
- 存儲消息到 kafka 中單獨為這種情況創(chuàng)建的 topic 中沉填,有專門的 consumer 來從該 topic 中消費