4. 設(shè)計(jì)思想
4.1 動(dòng)機(jī)
我們?cè)O(shè)計(jì)的 Kafka 能夠作為一個(gè)統(tǒng)一的平臺(tái)來(lái)處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)饋送迷帜。 要做到這點(diǎn)物舒,我們必須考慮相當(dāng)廣泛的用例。
Kafka 必須具有高吞吐量來(lái)支持高容量事件流戏锹,例如實(shí)時(shí)日志聚合冠胯。
Kafka 需要能夠正常處理大量的數(shù)據(jù)積壓,以便能夠支持來(lái)自離線系統(tǒng)的周期性數(shù)據(jù)加載景用。
這也意味著系統(tǒng)必須處理低延遲分發(fā)涵叮,來(lái)處理更傳統(tǒng)的消息傳遞用例。
我們希望支持對(duì)這些饋送進(jìn)行分區(qū)伞插,分布式割粮,以及實(shí)時(shí)處理來(lái)創(chuàng)建新的分發(fā)饋送等特性。由此產(chǎn)生了我們的分區(qū)模式和消費(fèi)者模式媚污。
最后舀瓢,在數(shù)據(jù)流被推送到其他數(shù)據(jù)系統(tǒng)進(jìn)行服務(wù)的情況下,我們要求系統(tǒng)在出現(xiàn)機(jī)器故障時(shí)必須能夠保證容錯(cuò)耗美。
為支持這些使用場(chǎng)景導(dǎo)致我們?cè)O(shè)計(jì)了一些獨(dú)特的元素京髓,使得 Kafka 相比傳統(tǒng)的消息系統(tǒng)更像是數(shù)據(jù)庫(kù)日志。我們將在后面的章節(jié)中概述設(shè)計(jì)中的部分要素商架。
4.2 持久化
不要害怕文件系統(tǒng)堰怨!
Kafka 對(duì)消息的存儲(chǔ)和緩存嚴(yán)重依賴于文件系統(tǒng)。人們對(duì)于“磁盤速度慢”的普遍印象蛇摸,使得人們對(duì)于持久化的架構(gòu)能夠提供強(qiáng)有力的性能產(chǎn)生懷疑备图。事實(shí)上,磁盤的速度比人們預(yù)期的要慢的多赶袄,也快得多揽涮,這取決于人們使用磁盤的方式。而且設(shè)計(jì)合理的磁盤結(jié)構(gòu)通扯龇危可以和網(wǎng)絡(luò)一樣快脚猾。
關(guān)于磁盤性能的關(guān)鍵事實(shí)是舟陆,磁盤的吞吐量和過(guò)去十年里磁盤的尋址延遲不同。因此,使用6個(gè)7200rpm胸哥、SATA接口、RAID-5的磁盤陣列在JBOD配置下的順序?qū)懭氲男阅芗s為600MB/秒荆隘,但隨機(jī)寫入的性能僅約為100k/秒无埃,相差6000倍以上。因?yàn)榫€性的讀取和寫入是磁盤使用模式中最有規(guī)律的喊积,并且由操作系統(tǒng)進(jìn)行了大量的優(yōu)化∨肜В現(xiàn)代操作系統(tǒng)提供了 read-ahead 和 write-behind 技術(shù),read-ahead 是以大的 data block 為單位預(yù)先讀取數(shù)據(jù)乾吻,而 write-behind 是將多個(gè)小型的邏輯寫合并成一次大型的物理磁盤寫入髓梅。關(guān)于該問(wèn)題的進(jìn)一步討論可以參考 ACM Queue article,他們發(fā)現(xiàn)實(shí)際上順序磁盤訪問(wèn)在某些情況下比隨機(jī)內(nèi)存訪問(wèn)還要快绎签!
為了彌補(bǔ)這種性能差異枯饿,現(xiàn)代操作系統(tǒng)在越來(lái)越注重使用內(nèi)存對(duì)磁盤進(jìn)行 cache。現(xiàn)代操作系統(tǒng)主動(dòng)將所有空閑內(nèi)存用作 disk caching诡必,代價(jià)是在內(nèi)存回收時(shí)性能會(huì)有所降低奢方。所有對(duì)磁盤的讀寫操作都會(huì)通過(guò)這個(gè)統(tǒng)一的 cache搔扁。如果不使用直接I/O,該功能不能輕易關(guān)閉蟋字。因此即使進(jìn)程維護(hù)了 in-process cache稿蹲,該數(shù)據(jù)也可能會(huì)被復(fù)制到操作系統(tǒng)的 pagecache 中,事實(shí)上所有內(nèi)容都被存儲(chǔ)了兩份鹊奖。
此外苛聘,Kafka 建立在 JVM 之上,任何了解 Java 內(nèi)存使用的人都知道兩點(diǎn):
對(duì)象的內(nèi)存開(kāi)銷非常高忠聚,通常是所存儲(chǔ)的數(shù)據(jù)的兩倍(甚至更多)设哗。
隨著堆中數(shù)據(jù)的增加,Java 的垃圾回收變得越來(lái)越復(fù)雜和緩慢两蟀。
受這些因素影響网梢,相比于維護(hù) in-memory cache 或者其他結(jié)構(gòu),使用文件系統(tǒng)和 pagecache 顯得更有優(yōu)勢(shì)--我們可以通過(guò)自動(dòng)訪問(wèn)所有空閑內(nèi)存將可用緩存的容量至少翻倍赂毯,并且通過(guò)存儲(chǔ)緊湊的字節(jié)結(jié)構(gòu)而不是獨(dú)立的對(duì)象澎粟,有望將緩存容量再翻一番。 這樣使得32GB的機(jī)器緩存容量可以達(dá)到28-30GB,并且不會(huì)產(chǎn)生額外的 GC 負(fù)擔(dān)欢瞪。此外活烙,即使服務(wù)重新啟動(dòng),緩存依舊可用遣鼓,而 in-process cache 則需要在內(nèi)存中重建(重建一個(gè)10GB的緩存可能需要10分鐘)啸盏,否則進(jìn)程就要從 cold cache 的狀態(tài)開(kāi)始(這意味著進(jìn)程最初的性能表現(xiàn)十分糟糕)。 這同時(shí)也極大的簡(jiǎn)化了代碼骑祟,因?yàn)樗斜3?cache 和文件系統(tǒng)之間一致性的邏輯現(xiàn)在都被放到了 OS 中回懦,這樣做比一次性的進(jìn)程內(nèi)緩存更準(zhǔn)確、更高效次企。如果你的磁盤使用更傾向于順序讀取怯晕,那么 read-ahead 可以有效的使用每次從磁盤中讀取到的有用數(shù)據(jù)預(yù)先填充 cache。
這里給出了一個(gè)非常簡(jiǎn)單的設(shè)計(jì):相比于維護(hù)盡可能多的 in-memory cache缸棵,并且在空間不足的時(shí)候匆忙將數(shù)據(jù) flush 到文件系統(tǒng)舟茶,我們把這個(gè)過(guò)程倒過(guò)來(lái)。所有數(shù)據(jù)一開(kāi)始就被寫入到文件系統(tǒng)的持久化日志中堵第,而不用在 cache 空間不足的時(shí)候 flush 到磁盤吧凉。實(shí)際上,這表明數(shù)據(jù)被轉(zhuǎn)移到了內(nèi)核的 pagecache 中踏志。
這種 pagecache-centric 的設(shè)計(jì)風(fēng)格出現(xiàn)在一篇關(guān)于 Varnish 設(shè)計(jì)的文章中阀捅。
常量時(shí)間就足夠了
消息系統(tǒng)使用的持久化數(shù)據(jù)結(jié)構(gòu)通常是和 BTree 相關(guān)聯(lián)的消費(fèi)者隊(duì)列或者其他用于存儲(chǔ)消息源數(shù)據(jù)的通用隨機(jī)訪問(wèn)數(shù)據(jù)結(jié)構(gòu)。BTree 是最通用的數(shù)據(jù)結(jié)構(gòu)针余,可以在消息系統(tǒng)能夠支持各種事務(wù)性和非事務(wù)性語(yǔ)義饲鄙。 雖然 BTree 的操作復(fù)雜度是 O(log N)凄诞,但成本也相當(dāng)高。通常我們認(rèn)為 O(log N) 基本等同于常數(shù)時(shí)間忍级,但這條在磁盤操作中不成立幔摸。磁盤尋址是每10ms一跳,并且每個(gè)磁盤同時(shí)只能執(zhí)行一次尋址颤练,因此并行性受到了限制。 因此即使是少量的磁盤尋址也會(huì)很高的開(kāi)銷驱负。由于存儲(chǔ)系統(tǒng)將非赤戮粒快的cache操作和非常慢的物理磁盤操作混合在一起,當(dāng)數(shù)據(jù)隨著 fixed cache 增加時(shí)跃脊,可以看到樹(shù)的性能通常是非線性的——比如數(shù)據(jù)翻倍時(shí)性能下降不只兩倍宇挫。
所以直觀來(lái)看,持久化隊(duì)列可以建立在簡(jiǎn)單的讀取和向文件后追加兩種操作之上酪术,這和日志解決方案相同器瘪。這種架構(gòu)的優(yōu)點(diǎn)在于所有的操作復(fù)雜度都是O(1),而且讀操作不會(huì)阻塞寫操作绘雁,讀操作之間也不會(huì)互相影響橡疼。這有著明顯的性能優(yōu)勢(shì),由于性能和數(shù)據(jù)大小完全分離開(kāi)來(lái)——服務(wù)器現(xiàn)在可以充分利用大量廉價(jià)庐舟、低轉(zhuǎn)速的1+TB SATA硬盤欣除。 雖然這些硬盤的尋址性能很差,但他們?cè)诖笠?guī)模讀寫方面的性能是可以接受的挪略,而且價(jià)格是原來(lái)的三分之一历帚、容量是原來(lái)的三倍。
在不產(chǎn)生任何性能損失的情況下能夠訪問(wèn)幾乎無(wú)限的硬盤空間杠娱,這意味著我們可以提供一些其它消息系統(tǒng)不常見(jiàn)的特性挽牢。例如:在 Kafka 中,我們可以讓消息保留相對(duì)較長(zhǎng)的一段時(shí)間(比如一周)摊求,而不是試圖在被消費(fèi)后立即刪除禽拔。正如我們后面將要提到的,這給消費(fèi)者帶來(lái)了很大的靈活性室叉。
4.3 Efficiency
我們?cè)谛阅苌弦呀?jīng)做了很大的努力奏赘。 我們主要的使用場(chǎng)景是處理WEB活動(dòng)數(shù)據(jù),這個(gè)數(shù)據(jù)量非常大太惠,因?yàn)槊總€(gè)頁(yè)面都有可能大量的寫入磨淌。此外我們假設(shè)每個(gè)發(fā)布 message 至少被一個(gè)consumer (通常很多個(gè)consumer) 消費(fèi), 因此我們盡可能的去降低消費(fèi)的代價(jià)凿渊。
我們還發(fā)現(xiàn)梁只,從構(gòu)建和運(yùn)行許多相似系統(tǒng)的經(jīng)驗(yàn)上來(lái)看缚柳,性能是多租戶運(yùn)營(yíng)的關(guān)鍵。如果下游的基礎(chǔ)設(shè)施服務(wù)很輕易被應(yīng)用層沖擊形成瓶頸搪锣,那么一些小的改變也會(huì)造成問(wèn)題秋忙。通過(guò)非常快的(緩存)技術(shù)构舟,我們能確保應(yīng)用層沖擊基礎(chǔ)設(shè)施之前灰追,將負(fù)載穩(wěn)定下來(lái)。 當(dāng)嘗試去運(yùn)行支持集中式集群上成百上千個(gè)應(yīng)用程序的集中式服務(wù)時(shí)狗超,這一點(diǎn)很重要弹澎,因?yàn)閼?yīng)用層使用方式幾乎每天都會(huì)發(fā)生變化。
我們?cè)谏弦还?jié)討論了磁盤性能努咐。 一旦消除了磁盤訪問(wèn)模式不佳的情況苦蒿,該類系統(tǒng)性能低下的主要原因就剩下了兩個(gè):大量的小型 I/O 操作,以及過(guò)多的字節(jié)拷貝渗稍。
小型的 I/O 操作發(fā)生在客戶端和服務(wù)端之間以及服務(wù)端自身的持久化操作中佩迟。
為了避免這種情況,我們的協(xié)議是建立在一個(gè) “消息塊” 的抽象基礎(chǔ)上竿屹,合理將消息分組报强。 這使得網(wǎng)絡(luò)請(qǐng)求將多個(gè)消息打包成一組,而不是每次發(fā)送一條消息拱燃,從而使整組消息分擔(dān)網(wǎng)絡(luò)中往返的開(kāi)銷躺涝。Consumer 每次獲取多個(gè)大型有序的消息塊,并由服務(wù)端 依次將消息塊一次加載到它的日志中扼雏。
這個(gè)簡(jiǎn)單的優(yōu)化對(duì)速度有著數(shù)量級(jí)的提升坚嗜。批處理允許更大的網(wǎng)絡(luò)數(shù)據(jù)包,更大的順序讀寫磁盤操作诗充,連續(xù)的內(nèi)存塊等等苍蔬,所有這些都使 KafKa 將隨機(jī)流消息順序?qū)懭氲酱疟P, 再由 consumers 進(jìn)行消費(fèi)蝴蜓。
另一個(gè)低效率的操作是字節(jié)拷貝碟绑,在消息量少時(shí),這不是什么問(wèn)題茎匠。但是在高負(fù)載的情況下格仲,影響就不容忽視。為了避免這種情況诵冒,我們使用 producer 凯肋,broker 和 consumer 都共享的標(biāo)準(zhǔn)化的二進(jìn)制消息格式,這樣數(shù)據(jù)塊不用修改就能在他們之間傳遞汽馋。
broker 維護(hù)的消息日志本身就是一個(gè)文件目錄侮东,每個(gè)文件都由一系列以相同格式寫入到磁盤的消息集合組成圈盔,這種寫入格式被 producer 和 consumer 共用。保持這種通用格式可以對(duì)一些很重要的操作進(jìn)行優(yōu)化: 持久化日志塊的網(wǎng)絡(luò)傳輸悄雅。 現(xiàn)代的unix 操作系統(tǒng)提供了一個(gè)高度優(yōu)化的編碼方式驱敲,用于將數(shù)據(jù)從 pagecache 轉(zhuǎn)移到 socket 網(wǎng)絡(luò)連接中;在 Linux 中系統(tǒng)調(diào)用 sendfile 做到這一點(diǎn)宽闲。
為了理解 sendfile 的意義众眨,了解數(shù)據(jù)從文件到套接字的常見(jiàn)數(shù)據(jù)傳輸路徑就非常重要:
操作系統(tǒng)從磁盤讀取數(shù)據(jù)到內(nèi)核空間的 pagecache
應(yīng)用程序讀取內(nèi)核空間的數(shù)據(jù)到用戶空間的緩沖區(qū)
應(yīng)用程序?qū)?shù)據(jù)(用戶空間的緩沖區(qū))寫回內(nèi)核空間到套接字緩沖區(qū)(內(nèi)核空間)
操作系統(tǒng)將數(shù)據(jù)從套接字緩沖區(qū)(內(nèi)核空間)復(fù)制到通過(guò)網(wǎng)絡(luò)發(fā)送的 NIC 緩沖區(qū)
這顯然是低效的,有四次 copy 操作和兩次系統(tǒng)調(diào)用容诬。使用 sendfile 方法娩梨,可以允許操作系統(tǒng)將數(shù)據(jù)從 pagecache 直接發(fā)送到網(wǎng)絡(luò),這樣避免重新復(fù)制數(shù)據(jù)放案。所以這種優(yōu)化方式,只需要最后一步的copy操作矫俺,將數(shù)據(jù)復(fù)制到 NIC 緩沖區(qū)吱殉。
我們期望一個(gè)普遍的應(yīng)用場(chǎng)景,一個(gè) topic 被多消費(fèi)者消費(fèi)厘托。使用上面提交的 zero-copy(零拷貝)優(yōu)化友雳,數(shù)據(jù)在使用時(shí)只會(huì)被復(fù)制到 pagecache 中一次,節(jié)省了每次拷貝到用戶空間內(nèi)存中铅匹,再?gòu)挠脩艨臻g進(jìn)行讀取的消耗押赊。這使得消息能夠以接近網(wǎng)絡(luò)連接速度的 上限進(jìn)行消費(fèi)。
pagecache 和 sendfile 的組合使用意味著包斑,在一個(gè)kafka集群中流礁,大多數(shù) consumer 消費(fèi)時(shí),您將看不到磁盤上的讀取活動(dòng)罗丰,因?yàn)閿?shù)據(jù)將完全由緩存提供神帅。
JAVA 中更多有關(guān) sendfile 方法和 zero-copy (零拷貝)相關(guān)的資料,可以參考這里的 文章萌抵。
端到端的批量壓縮
在某些情況下找御,數(shù)據(jù)傳輸?shù)钠款i不是 CPU ,也不是磁盤绍填,而是網(wǎng)絡(luò)帶寬霎桅。對(duì)于需要通過(guò)廣域網(wǎng)在數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)管道尤其如此。當(dāng)然讨永,用戶可以在不需要 Kakfa 支持下一次一個(gè)的壓縮消息滔驶。但是這樣會(huì)造成非常差的壓縮比和消息重復(fù)類型的冗余,比如 JSON 中的字段名稱或者是或 Web 日志中的用戶代理或公共字符串值卿闹。高性能的壓縮是一次壓縮多個(gè)消息瓜浸,而不是壓縮單個(gè)消息澳淑。
Kafka 以高效的批處理格式支持一批消息可以壓縮在一起發(fā)送到服務(wù)器。這批消息將以壓縮格式寫入插佛,并且在日志中保持壓縮杠巡,只會(huì)在 consumer 消費(fèi)時(shí)解壓縮。
Kafka 支持 GZIP雇寇,Snappy 和 LZ4 壓縮協(xié)議氢拥,更多有關(guān)壓縮的資料參看 這里。
4.4 The Producer
Load balancing
生產(chǎn)者直接發(fā)送數(shù)據(jù)到主分區(qū)的服務(wù)器上锨侯,不需要經(jīng)過(guò)任何中間路由嫩海。 為了讓生產(chǎn)者實(shí)現(xiàn)這個(gè)功能,所有的 kafka 服務(wù)器節(jié)點(diǎn)都能響應(yīng)這樣的元數(shù)據(jù)請(qǐng)求: 哪些服務(wù)器是活著的囚痴,主題的哪些分區(qū)是主分區(qū)叁怪,分配在哪個(gè)服務(wù)器上,這樣生產(chǎn)者就能適當(dāng)?shù)刂苯影l(fā)送它的請(qǐng)求到服務(wù)器上深滚。
客戶端控制消息發(fā)送數(shù)據(jù)到哪個(gè)分區(qū)奕谭,這個(gè)可以實(shí)現(xiàn)隨機(jī)的負(fù)載均衡方式,或者使用一些特定語(yǔ)義的分區(qū)函數(shù)。 我們有提供特定分區(qū)的接口讓用于根據(jù)指定的鍵值進(jìn)行hash分區(qū)(當(dāng)然也有選項(xiàng)可以重寫分區(qū)函數(shù))痴荐,例如血柳,如果使用用戶ID作為key,則用戶相關(guān)的所有數(shù)據(jù)都會(huì)被分發(fā)到同一個(gè)分區(qū)上生兆。 這允許消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)做一些特定的本地化處理难捌。這樣的分區(qū)風(fēng)格經(jīng)常被設(shè)計(jì)用于一些本地處理比較敏感的消費(fèi)者。
Asynchronous send
批處理是提升性能的一個(gè)主要驅(qū)動(dòng)鸦难,為了允許批量處理根吁,kafka 生產(chǎn)者會(huì)嘗試在內(nèi)存中匯總數(shù)據(jù),并用一次請(qǐng)求批次提交信息合蔽。 批處理婴栽,不僅僅可以配置指定的消息數(shù)量,也可以指定等待特定的延遲時(shí)間(如64k 或10ms)辈末,這允許匯總更多的數(shù)據(jù)后再發(fā)送愚争,在服務(wù)器端也會(huì)減少更多的IO操作。 該緩沖是可配置的挤聘,并給出了一個(gè)機(jī)制轰枝,通過(guò)權(quán)衡少量額外的延遲時(shí)間獲取更好的吞吐量。
更多的細(xì)節(jié)可以在 producer 的 configuration 和 api文檔中進(jìn)行詳細(xì)的了解组去。
4.5 消費(fèi)者
Kafka consumer通過(guò)向 broker 發(fā)出一個(gè)“fetch”請(qǐng)求來(lái)獲取它想要消費(fèi)的 partition鞍陨。consumer 的每個(gè)請(qǐng)求都在 log 中指定了對(duì)應(yīng)的 offset,并接收從該位置開(kāi)始的一大塊數(shù)據(jù)。因此诚撵,consumer 對(duì)于該位置的控制就顯得極為重要缭裆,并且可以在需要的時(shí)候通過(guò)回退到該位置再次消費(fèi)對(duì)應(yīng)的數(shù)據(jù)。
Push vs. pull
最初我們考慮的問(wèn)題是:究竟是由 consumer 從 broker 那里 pull 數(shù)據(jù)寿烟,還是由 broker 將數(shù)據(jù) push 到 consumer澈驼。Kafka 在這方面采取了一種較為傳統(tǒng)的設(shè)計(jì)方式,也是大多數(shù)的消息系統(tǒng)所共享的方式:即 producer 把數(shù)據(jù) push 到 broker筛武,然后 consumer 從 broker 中 pull 數(shù)據(jù)缝其。 也有一些 logging-centric 的系統(tǒng),比如 Scribe 和 Apache Flume徘六,沿著一條完全不同的 push-based 的路徑内边,將數(shù)據(jù) push 到下游節(jié)點(diǎn)。這兩種方法都有優(yōu)缺點(diǎn)待锈。然而漠其,由于 broker 控制著數(shù)據(jù)傳輸速率, 所以 push-based 系統(tǒng)很難處理不同的 consumer竿音。讓 broker 控制數(shù)據(jù)傳輸速率主要是為了讓 consumer 能夠以可能的最大速率消費(fèi)和屎;不幸的是,這導(dǎo)致著在 push-based 的系統(tǒng)中谍失,當(dāng)消費(fèi)速率低于生產(chǎn)速率時(shí)眶俩,consumer 往往會(huì)不堪重負(fù)(本質(zhì)上類似于拒絕服務(wù)攻擊)莹汤。pull-based 系統(tǒng)有一個(gè)很好的特性快鱼, 那就是當(dāng) consumer 速率落后于 producer 時(shí),可以在適當(dāng)?shù)臅r(shí)間趕上來(lái)纲岭。還可以通過(guò)使用某種 backoff 協(xié)議來(lái)減少這種現(xiàn)象:即 consumer 可以通過(guò) backoff 表示它已經(jīng)不堪重負(fù)了抹竹,然而通過(guò)獲得負(fù)載情況來(lái)充分使用 consumer(但永遠(yuǎn)不超載)這一方式實(shí)現(xiàn)起來(lái)比它看起來(lái)更棘手。前面以這種方式構(gòu)建系統(tǒng)的嘗試止潮,引導(dǎo)著 Kafka 走向了更傳統(tǒng)的 pull 模型窃判。
另一個(gè) pull-based 系統(tǒng)的優(yōu)點(diǎn)在于:它可以大批量生產(chǎn)要發(fā)送給 consumer 的數(shù)據(jù)。而 push-based 系統(tǒng)必須選擇立即發(fā)送請(qǐng)求或者積累更多的數(shù)據(jù)喇闸,然后在不知道下游的 consumer 能否立即處理它的情況下發(fā)送這些數(shù)據(jù)袄琳。如果系統(tǒng)調(diào)整為低延遲狀態(tài),這就會(huì)導(dǎo)致一次只發(fā)送一條消息燃乍,以至于傳輸?shù)臄?shù)據(jù)不再被緩沖唆樊,這種方式是極度浪費(fèi)的。 而 pull-based 的設(shè)計(jì)修復(fù)了該問(wèn)題刻蟹,因?yàn)?consumer 總是將所有可用的(或者達(dá)到配置的最大長(zhǎng)度)消息 pull 到 log 當(dāng)前位置的后面逗旁,從而使得數(shù)據(jù)能夠得到最佳的處理而不會(huì)引入不必要的延遲。
簡(jiǎn)單的 pull-based 系統(tǒng)的不足之處在于:如果 broker 中沒(méi)有數(shù)據(jù)舆瘪,consumer 可能會(huì)在一個(gè)緊密的循環(huán)中結(jié)束輪詢片效,實(shí)際上 busy-waiting 直到數(shù)據(jù)到來(lái)红伦。為了避免 busy-waiting,我們?cè)?pull 請(qǐng)求中加入?yún)?shù)淀衣,使得 consumer 在一個(gè)“l(fā)ong pull”中阻塞等待昙读,直到數(shù)據(jù)到來(lái)(還可以選擇等待給定字節(jié)長(zhǎng)度的數(shù)據(jù)來(lái)確保傳輸長(zhǎng)度)。
你可以想象其它可能的只基于 pull 的舌缤, end-to-end 的設(shè)計(jì)箕戳。例如producer 直接將數(shù)據(jù)寫入一個(gè)本地的 log,然后 broker 從 producer 那里 pull 數(shù)據(jù)国撵,最后 consumer 從 broker 中 pull 數(shù)據(jù)陵吸。通常提到的還有“store-and-forward”式 producer, 這是一種很有趣的設(shè)計(jì)介牙,但我們覺(jué)得它跟我們?cè)O(shè)定的有數(shù)以千計(jì)的生產(chǎn)者的應(yīng)用場(chǎng)景不太相符壮虫。我們?cè)谶\(yùn)行大規(guī)模持久化數(shù)據(jù)系統(tǒng)方面的經(jīng)驗(yàn)使我們感覺(jué)到,橫跨多個(gè)應(yīng)用环础、涉及數(shù)千磁盤的系統(tǒng)事實(shí)上并不會(huì)讓事情更可靠囚似,反而會(huì)成為操作時(shí)的噩夢(mèng)。在實(shí)踐中线得, 我們發(fā)現(xiàn)可以通過(guò)大規(guī)模運(yùn)行的帶有強(qiáng)大的 SLAs 的 pipeline饶唤,而省略 producer 的持久化過(guò)程。
消費(fèi)者的位置
令人驚訝的是贯钩,持續(xù)追蹤已經(jīng)被消費(fèi)的內(nèi)容是消息系統(tǒng)的關(guān)鍵性能點(diǎn)之一募狂。
大多數(shù)消息系統(tǒng)都在 broker 上保存被消費(fèi)消息的元數(shù)據(jù)。也就是說(shuō)角雷,當(dāng)消息被傳遞給 consumer祸穷,broker 要么立即在本地記錄該事件,要么等待 consumer 的確認(rèn)后再記錄勺三。這是一種相當(dāng)直接的選擇雷滚,而且事實(shí)上對(duì)于單機(jī)服務(wù)器來(lái)說(shuō),也沒(méi)與其它地方能夠存儲(chǔ)這些狀態(tài)信息吗坚。 由于大多數(shù)消息系統(tǒng)用于存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu)規(guī)模都很小祈远,所以這也是一個(gè)很實(shí)用的選擇——因?yàn)橹灰?broker 知道哪些消息被消費(fèi)了,就可以在本地立即進(jìn)行刪除商源,一直保持較小的數(shù)據(jù)量车份。
也許不太明顯,但要讓 broker 和 consumer 就被消費(fèi)的數(shù)據(jù)保持一致性也不是一個(gè)小問(wèn)題炊汹。如果 broker 在每條消息被發(fā)送到網(wǎng)絡(luò)的時(shí)候躬充,立即將其標(biāo)記為 consumed,那么一旦 consumer 無(wú)法處理該消息(可能由 consumer 崩潰或者請(qǐng)求超時(shí)或者其他原因?qū)е拢撓⒕蜁?huì)丟失充甚。 為了解決消息丟失的問(wèn)題以政,許多消息系統(tǒng)增加了確認(rèn)機(jī)制:即當(dāng)消息被發(fā)送出去的時(shí)候,消息僅被標(biāo)記為sent 而不是 consumed伴找;然后 broker 會(huì)等待一個(gè)來(lái)自 consumer 的特定確認(rèn)盈蛮,再將消息標(biāo)記為consumed。這個(gè)策略修復(fù)了消息丟失的問(wèn)題技矮,但也產(chǎn)生了新問(wèn)題抖誉。 首先,如果 consumer 處理了消息但在發(fā)送確認(rèn)之前出錯(cuò)了衰倦,那么該消息就會(huì)被消費(fèi)兩次袒炉。第二個(gè)是關(guān)于性能的,現(xiàn)在 broker 必須為每條消息保存多個(gè)狀態(tài)(首先對(duì)其加鎖樊零,確保該消息只被發(fā)送一次我磁,然后將其永久的標(biāo)記為 consumed,以便將其移除)驻襟。 還有更棘手的問(wèn)題要處理夺艰,比如如何處理已經(jīng)發(fā)送但一直得不到確認(rèn)的消息。
Kafka 使用完全不同的方式解決消息丟失問(wèn)題沉衣。Kafka的 topic 被分割成了一組完全有序的 partition郁副,其中每一個(gè) partition 在任意給定的時(shí)間內(nèi)只能被每個(gè)訂閱了這個(gè) topic 的 consumer 組中的一個(gè) consumer 消費(fèi)。這意味著 partition 中 每一個(gè) consumer 的位置僅僅是一個(gè)數(shù)字豌习,即下一條要消費(fèi)的消息的offset存谎。這使得被消費(fèi)的消息的狀態(tài)信息相當(dāng)少,每個(gè) partition 只需要一個(gè)數(shù)字斑鸦。這個(gè)狀態(tài)信息還可以作為周期性的 checkpoint愕贡。這以非常低的代價(jià)實(shí)現(xiàn)了和消息確認(rèn)機(jī)制等同的效果草雕。
這種方式還有一個(gè)附加的好處巷屿。consumer 可以回退到之前的 offset 來(lái)再次消費(fèi)之前的數(shù)據(jù),這個(gè)操作違反了隊(duì)列的基本原則墩虹,但事實(shí)證明對(duì)大多數(shù) consumer 來(lái)說(shuō)這是一個(gè)必不可少的特性嘱巾。 例如,如果 consumer 的代碼有 bug诫钓,并且在 bug 被發(fā)現(xiàn)前已經(jīng)有一部分?jǐn)?shù)據(jù)被消費(fèi)了旬昭, 那么 consumer 可以在 bug 修復(fù)后通過(guò)回退到之前的 offset 來(lái)再次消費(fèi)這些數(shù)據(jù)。
離線數(shù)據(jù)加載
可伸縮的持久化特性允許 consumer 只進(jìn)行周期性的消費(fèi)菌湃,例如批量數(shù)據(jù)加載问拘,周期性將數(shù)據(jù)加載到諸如 Hadoop 和關(guān)系型數(shù)據(jù)庫(kù)之類的離線系統(tǒng)中。
在 Hadoop 的應(yīng)用場(chǎng)景中,我們通過(guò)將數(shù)據(jù)加載分配到多個(gè)獨(dú)立的 map 任務(wù)來(lái)實(shí)現(xiàn)并行化骤坐,每一個(gè) map 任務(wù)負(fù)責(zé)一個(gè) node/topic/partition绪杏,從而達(dá)到充分并行化。Hadoop 提供了任務(wù)管理機(jī)制纽绍,失敗的任務(wù)可以重新啟動(dòng)而不會(huì)有重復(fù)數(shù)據(jù)的風(fēng)險(xiǎn)蕾久,只需要簡(jiǎn)單的從原來(lái)的位置重啟即可。
4.6 消息交付語(yǔ)義
現(xiàn)在我們對(duì)于 producer 和 consumer 的工作原理已將有了一點(diǎn)了解拌夏,讓我們接著討論 Kafka 在 producer 和 consumer 之間提供的語(yǔ)義保證僧著。顯然,Kafka可以提供的消息交付語(yǔ)義保證有多種:
At most once——消息可能會(huì)丟失但絕不重傳障簿。
At least once——消息可以重傳但絕不丟失盹愚。
Exactly once——這正是人們想要的, 每一條消息只被傳遞一次.
值得注意的是,這個(gè)問(wèn)題被分成了兩部分:發(fā)布消息的持久性保證和消費(fèi)消息的保證站故。
很多系統(tǒng)聲稱提供了“Exactly once”的消息交付語(yǔ)義, 然而閱讀它們的細(xì)則很重要, 因?yàn)檫@些聲稱大多數(shù)都是誤導(dǎo)性的 (即它們沒(méi)有考慮 consumer 或 producer 可能失敗的情況杯拐,以及存在多個(gè) consumer 進(jìn)行處理的情況,或者寫入磁盤的數(shù)據(jù)可能丟失的情況世蔗。).
Kafka 的語(yǔ)義是直截了當(dāng)?shù)摹?發(fā)布消息時(shí)端逼,我們會(huì)有一個(gè)消息的概念被“committed”到 log 中。 一旦消息被提交污淋,只要有一個(gè) broker 備份了該消息寫入的 partition顶滩,并且保持“alive”狀態(tài),該消息就不會(huì)丟失寸爆。 有關(guān) committed message 和 alive partition 的定義礁鲁,以及我們?cè)噲D解決的故障類型都將在下一節(jié)進(jìn)行細(xì)致描述。 現(xiàn)在讓我們假設(shè)存在完美無(wú)缺的 broker赁豆,然后來(lái)試著理解 Kafka 對(duì) producer 和 consumer 的語(yǔ)義保證仅醇。如果一個(gè) producer 在試圖發(fā)送消息的時(shí)候發(fā)生了網(wǎng)絡(luò)故障, 則不確定網(wǎng)絡(luò)錯(cuò)誤發(fā)生在消息提交之前還是之后魔种。這與使用自動(dòng)生成的鍵插入到數(shù)據(jù)庫(kù)表中的語(yǔ)義場(chǎng)景很相似析二。
在 0.11.0.0 之前的版本中, 如果 producer 沒(méi)有收到表明消息已經(jīng)被提交的響應(yīng), 那么 producer 除了將消息重傳之外別無(wú)選擇。 這里提供的是 at-least-once 的消息交付語(yǔ)義节预,因?yàn)槿绻畛醯恼?qǐng)求事實(shí)上執(zhí)行成功了叶摄,那么重傳過(guò)程中該消息就會(huì)被再次寫入到 log 當(dāng)中。 從 0.11.0.0 版本開(kāi)始安拟,Kafka producer新增了冪等性的傳遞選項(xiàng)蛤吓,該選項(xiàng)保證重傳不會(huì)在 log 中產(chǎn)生重復(fù)條目。 為實(shí)現(xiàn)這個(gè)目的, broker 給每個(gè) producer 都分配了一個(gè) ID 糠赦,并且 producer 給每條被發(fā)送的消息分配了一個(gè)序列號(hào)來(lái)避免產(chǎn)生重復(fù)的消息会傲。 同樣也是從 0.11.0.0 版本開(kāi)始, producer 新增了使用類似事務(wù)性的語(yǔ)義將消息發(fā)送到多個(gè) topic partition 的功能: 也就是說(shuō)锅棕,要么所有的消息都被成功的寫入到了 log,要么一個(gè)都沒(méi)寫進(jìn)去淌山。這種語(yǔ)義的主要應(yīng)用場(chǎng)景就是 Kafka topic 之間的 exactly-once 的數(shù)據(jù)傳遞(如下所述)哲戚。
并非所有使用場(chǎng)景都需要這么強(qiáng)的保證。對(duì)于延遲敏感的應(yīng)用場(chǎng)景艾岂,我們?cè)试S生產(chǎn)者指定它需要的持久性級(jí)別顺少。如果 producer 指定了它想要等待消息被提交,則可以使用10ms的量級(jí)王浴。然而脆炎, producer 也可以指定它想要完全異步地執(zhí)行發(fā)送,或者它只想等待直到 leader 節(jié)點(diǎn)擁有該消息(follower 節(jié)點(diǎn)有沒(méi)有無(wú)所謂)氓辣。
現(xiàn)在讓我們從 consumer 的視角來(lái)描述語(yǔ)義秒裕。 所有的副本都有相同的 log 和相同的 offset。consumer 負(fù)責(zé)控制它在 log 中的位置钞啸。如果 consumer 永遠(yuǎn)不崩潰几蜻,那么它可以將這個(gè)位置信息只存儲(chǔ)在內(nèi)存中。但如果 consumer 發(fā)生了故障体斩,我們希望這個(gè) topic partition 被另一個(gè)進(jìn)程接管梭稚, 那么新進(jìn)程需要選擇一個(gè)合適的位置開(kāi)始進(jìn)行處理。假設(shè) consumer 要讀取一些消息——它有幾個(gè)處理消息和更新位置的選項(xiàng)絮吵。
Consumer 可以先讀取消息弧烤,然后將它的位置保存到 log 中,最后再對(duì)消息進(jìn)行處理蹬敲。在這種情況下暇昂,消費(fèi)者進(jìn)程可能會(huì)在保存其位置之后,帶還沒(méi)有保存消息處理的輸出之前發(fā)生崩潰伴嗡。而在這種情況下急波,即使在此位置之前的一些消息沒(méi)有被處理,接管處理的進(jìn)程將從保存的位置開(kāi)始瘪校。在 consumer 發(fā)生故障的情況下铃将,這對(duì)應(yīng)于“at-most-once”的語(yǔ)義秧骑,可能會(huì)有消息得不到處理青责。
Consumer 可以先讀取消息捻撑,然后處理消息吉嫩,最后再保存它的位置价认。在這種情況下,消費(fèi)者進(jìn)程可能會(huì)在處理了消息之后自娩,但還沒(méi)有保存位置之前發(fā)生崩潰用踩。而在這種情況下渠退,當(dāng)新的進(jìn)程接管后,它最初收到的一部分消息都已經(jīng)被處理過(guò)了脐彩。在 consumer 發(fā)生故障的情況下碎乃,這對(duì)應(yīng)于“at-least-once”的語(yǔ)義。 在許多應(yīng)用場(chǎng)景中惠奸,消息都設(shè)有一個(gè)主鍵梅誓,所以更新操作是冪等的(相同的消息接收兩次時(shí),第二次寫入會(huì)覆蓋掉第一次寫入的記錄)佛南。
那么 exactly once 語(yǔ)義(即你真正想要的東西)呢梗掰?當(dāng)從一個(gè) kafka topic 中消費(fèi)并輸出到另一個(gè) topic 時(shí) (正如在一個(gè)Kafka Streams 應(yīng)用中所做的那樣),我們可以使用我們上文提到的 0.11.0.0 版本中的新事務(wù)型 producer嗅回,并將 consumer 的位置存儲(chǔ)為一個(gè) topic 中的消息及穗,所以我們可以在輸出 topic 接收已經(jīng)被處理的數(shù)據(jù)的時(shí)候,在同一個(gè)事務(wù)中向 Kafka 寫入 offset绵载。如果事務(wù)被中斷埂陆,則消費(fèi)者的位置將恢復(fù)到原來(lái)的值,而輸出 topic 上產(chǎn)生的數(shù)據(jù)對(duì)其他消費(fèi)者是否可見(jiàn)娃豹,取決于事務(wù)的“隔離級(jí)別”焚虱。 在默認(rèn)的“read_uncommitted”隔離級(jí)別中,所有消息對(duì) consumer 都是可見(jiàn)的懂版,即使它們是中止的事務(wù)的一部分著摔,但是在“read_committed”的隔離級(jí)別中,消費(fèi)者只能訪問(wèn)已提交的事務(wù)中的消息(以及任何不屬于事務(wù)的消息)定续。
在寫入外部系統(tǒng)的應(yīng)用場(chǎng)景中谍咆,限制在于需要在 consumer 的 offset 與實(shí)際存儲(chǔ)為輸出的內(nèi)容間進(jìn)行協(xié)調(diào)。解決這一問(wèn)題的經(jīng)典方法是在 consumer offset 的存儲(chǔ)和 consumer 的輸出結(jié)果的存儲(chǔ)之間引入 two-phase commit私股。但這可以用更簡(jiǎn)單的方法處理摹察,而且通常的做法是讓 consumer 將其 offset 存儲(chǔ)在與其輸出相同的位置。 這也是一種更好的方式倡鲸,因?yàn)榇蠖鄶?shù) consumer 想寫入的輸出系統(tǒng)都不支持 two-phase commit供嚎。舉個(gè)例子,Kafka Connect連接器峭状,它將所讀取的數(shù)據(jù)和數(shù)據(jù)的 offset 一起寫入到 HDFS克滴,以保證數(shù)據(jù)和 offset 都被更新,或者兩者都不被更新优床。 對(duì)于其它很多需要這些較強(qiáng)語(yǔ)義劝赔,并且沒(méi)有主鍵來(lái)避免消息重復(fù)的數(shù)據(jù)系統(tǒng),我們也遵循類似的模式胆敞。
因此着帽,事實(shí)上 Kafka 在Kafka Streams中支持了exactly-once 的消息交付功能杂伟,并且在 topic 之間進(jìn)行數(shù)據(jù)傳遞和處理時(shí),通常使用事務(wù)型 producer/consumer 提供 exactly-once 的消息交付功能仍翰。 到其它目標(biāo)系統(tǒng)的 exactly-once 的消息交付通常需要與該類系統(tǒng)協(xié)作赫粥,但 Kafka 提供了 offset,使得這種應(yīng)用場(chǎng)景的實(shí)現(xiàn)變得可行予借。(詳見(jiàn) Kafka Connect)越平。否則,Kafka 默認(rèn)保證 at-least-once 的消息交付灵迫, 并且 Kafka 允許用戶通過(guò)禁用 producer 的重傳功能和讓 consumer 在處理一批消息之前提交 offset喧笔,來(lái)實(shí)現(xiàn) at-most-once 的消息交付。
4.7 Replication
Kafka 允許 topic 的 partition 擁有若干副本龟再,你可以在server端配置partition 的副本數(shù)量书闸。當(dāng)集群中的節(jié)點(diǎn)出現(xiàn)故障時(shí),能自動(dòng)進(jìn)行故障轉(zhuǎn)移利凑,保證數(shù)據(jù)的可用性浆劲。
其他的消息系統(tǒng)也提供了副本相關(guān)的特性,但是在我們(帶有偏見(jiàn))看來(lái)哀澈,他們的副本功能不常用牌借,而且有很大缺點(diǎn):slaves 處于非活動(dòng)狀態(tài),導(dǎo)致吞吐量受到嚴(yán)重影響割按,并且還要手動(dòng)配置副本機(jī)制膨报。Kafka 默認(rèn)使用備份機(jī)制,事實(shí)上适荣,我們將沒(méi)有設(shè)置副本數(shù) 的 topic 實(shí)現(xiàn)為副本數(shù)為1的 topic 现柠。
創(chuàng)建副本的單位是 topic 的 partition ,正常情況下弛矛, 每個(gè)分區(qū)都有一個(gè) leader 和零或多個(gè) followers 够吩。 總的副本數(shù)是包含 leader 的總和。 所有的讀寫操作都由 leader 處理丈氓,一般 partition 的數(shù)量都比 broker 的數(shù)量多的多周循,各分區(qū)的 leader 均 勻的分布在brokers 中。所有的 followers 節(jié)點(diǎn)都同步 leader 節(jié)點(diǎn)的日志万俗,日志中的消息和偏移量都和 leader 中的一致湾笛。(當(dāng)然, 在任何給定時(shí)間, leader 節(jié)點(diǎn)的日志末尾時(shí)可能有幾個(gè)消息尚未被備份完成)。
Followers 節(jié)點(diǎn)就像普通的 consumer 那樣從 leader 節(jié)點(diǎn)那里拉取消息并保存在自己的日志文件中闰歪。Followers 節(jié)點(diǎn)可以從 leader 節(jié)點(diǎn)那里批量拉取消息日志到自己的日志文件中嚎研。
與大多數(shù)分布式系統(tǒng)一樣,自動(dòng)處理故障需要精確定義節(jié)點(diǎn) “alive” 的概念课竣。Kafka 判斷節(jié)點(diǎn)是否存活有兩種方式嘉赎。
節(jié)點(diǎn)必須可以維護(hù)和 ZooKeeper 的連接置媳,Zookeeper 通過(guò)心跳機(jī)制檢查每個(gè)節(jié)點(diǎn)的連接于樟。
如果節(jié)點(diǎn)是個(gè) follower 公条,它必須能及時(shí)的同步 leader 的寫操作,并且延時(shí)不能太久迂曲。
我們認(rèn)為滿足這兩個(gè)條件的節(jié)點(diǎn)處于 “in sync” 狀態(tài)靶橱,區(qū)別于 “alive” 和 “failed” 。 Leader會(huì)追蹤所有 “in sync” 的節(jié)點(diǎn)路捧。如果有節(jié)點(diǎn)掛掉了, 或是寫超時(shí), 或是心跳超時(shí), leader 就會(huì)把它從同步副本列表中移除关霸。 同步超時(shí)和寫超時(shí)的時(shí)間由 replica.lag.time.max.ms 配置確定。
分布式系統(tǒng)中杰扫,我們只嘗試處理 “fail/recover” 模式的故障队寇,即節(jié)點(diǎn)突然停止工作,然后又恢復(fù)(節(jié)點(diǎn)可能不知道自己曾經(jīng)掛掉)的狀況章姓。Kafka 沒(méi)有處理所謂的 “Byzantine” 故障佳遣,即一個(gè)節(jié)點(diǎn)出現(xiàn)了隨意響應(yīng)和惡意響應(yīng)(可能由于 bug 或 非法操作導(dǎo)致)。
現(xiàn)在, 我們可以更精確地定義, 只有當(dāng)消息被所有的副本節(jié)點(diǎn)加入到日志中時(shí), 才算是提交, 只有提交的消息才會(huì)被 consumer 消費(fèi), 這樣就不用擔(dān)心一旦 leader 掛掉了消息會(huì)丟失凡伊。另一方面零渐, producer 也 可以選擇是否等待消息被提交,這取決他們的設(shè)置在延遲時(shí)間和持久性之間的權(quán)衡系忙,這個(gè)選項(xiàng)是由 producer 使用的 acks 設(shè)置控制诵盼。 請(qǐng)注意,Topic 可以設(shè)置同步備份的最小數(shù)量银还, producer 請(qǐng)求確認(rèn)消息是否被寫入到所有的備份時(shí), 可以用最小同步數(shù)量判斷风宁。如果 producer 對(duì)同步的備份數(shù)沒(méi)有嚴(yán)格的要求,即使同步的備份數(shù)量低于 最小同步數(shù)量(例如蛹疯,僅僅只有 leader 同步了數(shù)據(jù))杀糯,消息也會(huì)被提交,然后被消費(fèi)苍苞。
在所有時(shí)間里固翰,Kafka 保證只要有至少一個(gè)同步中的節(jié)點(diǎn)存活,提交的消息就不會(huì)丟失羹呵。
節(jié)點(diǎn)掛掉后骂际,經(jīng)過(guò)短暫的故障轉(zhuǎn)移后,Kafka將仍然保持可用性冈欢,但在網(wǎng)絡(luò)分區(qū)( network partitions )的情況下可能不能保持可用性歉铝。
備份日志:Quorums, ISRs, 和狀態(tài)機(jī)
Kafka的核心是備份日志文件。備份日志文件是分布式數(shù)據(jù)系統(tǒng)最基礎(chǔ)的要素之一凑耻,實(shí)現(xiàn)方法也有很多種太示。其他系統(tǒng)也可以用 kafka 的備份日志模塊來(lái)實(shí)現(xiàn)狀態(tài)機(jī)風(fēng)格的分布式系統(tǒng)
備份日志按照一系列有序的值(通常是編號(hào)為0柠贤、1、2类缤、…)進(jìn)行建模臼勉。有很多方法可以實(shí)現(xiàn)這一點(diǎn),但最簡(jiǎn)單和最快的方法是由 leader 節(jié)點(diǎn)選擇需要提供的有序的值餐弱,只要 leader 節(jié)點(diǎn)還存活宴霸,所有的 follower 只需要拷貝數(shù)據(jù)并按照 leader 節(jié)點(diǎn)的順序排序。
當(dāng)然膏蚓,如果 leader 永遠(yuǎn)都不會(huì)掛掉瓢谢,那我們就不需要 follower 了。 但是如果 leader crash驮瞧,我們就需要從 follower 中選舉出一個(gè)新的 leader氓扛。 但是 followers 自身也有可能落后或者 crash,所以 我們必須確保我們leader的候選者們 是一個(gè)數(shù)據(jù)同步 最新的 follower 節(jié)點(diǎn)论笔。
如果選擇寫入時(shí)候需要保證一定數(shù)量的副本寫入成功采郎,讀取時(shí)需要保證讀取一定數(shù)量的副本,讀取和寫入之間有重疊翅楼。這樣的讀寫機(jī)制稱為 Quorum尉剩。
這種權(quán)衡的一種常見(jiàn)方法是對(duì)提交決策和 leader 選舉使用多數(shù)投票機(jī)制。Kafka 沒(méi)有采取這種方式毅臊,但是我們還是要研究一下這種投票機(jī)制理茎,來(lái)理解其中蘊(yùn)含的權(quán)衡。假設(shè)我們有2f + 1個(gè)副本管嬉,如果在 leader 宣布消息提交之前必須有f+1個(gè)副本收到 該消息皂林,并且如果我們從這至少f+1個(gè)副本之中,有著最完整的日志記錄的 follower 里來(lái)選擇一個(gè)新的 leader蚯撩,那么在故障次數(shù)少于f的情況下础倍,選舉出的 leader 保證具有所有提交的消息。這是因?yàn)樵谌我鈌+1個(gè)副本中胎挎,至少有一個(gè)副本一定包含 了所有提交的消息沟启。該副本的日志將是最完整的,因此將被選為新的 leader犹菇。這個(gè)算法都必須處理許多其他細(xì)節(jié)(例如精確定義怎樣使日志更加完整德迹,確保在 leader down 掉期間, 保證日志一致性或者副本服務(wù)器的副本集的改變),但是現(xiàn)在我們將忽略這些細(xì)節(jié)揭芍。
這種大多數(shù)投票方法有一個(gè)非常好的優(yōu)點(diǎn):延遲是取決于最快的服務(wù)器胳搞。也就是說(shuō),如果副本數(shù)是3,則備份完成的等待時(shí)間取決于最快的 Follwer 肌毅。
這里有很多分布式算法筷转,包含 ZooKeeper 的 Zab, Raft, 和 Viewstamped Replication. 我們所知道的與 Kafka 實(shí)際執(zhí)行情況最相似的學(xué)術(shù)刊物是來(lái)自微軟的 PacificA
大多數(shù)投票的缺點(diǎn)是,多數(shù)的節(jié)點(diǎn)掛掉讓你不能選擇 leader悬而。要冗余單點(diǎn)故障需要三份數(shù)據(jù)呜舒,并且要冗余兩個(gè)故障需要五份的數(shù)據(jù)。根據(jù)我們的經(jīng)驗(yàn)摊滔,在一個(gè)系統(tǒng)中阴绢,僅僅靠冗余來(lái)避免單點(diǎn)故障是不夠的店乐,但是每寫5次艰躺,對(duì)磁盤空間需求是5倍, 吞吐量下降到 1/5眨八,這對(duì)于處理海量數(shù)據(jù)問(wèn)題是不切實(shí)際的腺兴。這可能是為什么 quorum 算法更常用于共享集群配置(如 ZooKeeper ), 而不適用于原始數(shù)據(jù)存儲(chǔ)的原因廉侧,例如 HDFS 中 namenode 的高可用是建立在 基于投票的元數(shù)據(jù) 页响,這種代價(jià)高昂的存儲(chǔ)方式不適用數(shù)據(jù)本身。
Kafka 采取了一種稍微不同的方法來(lái)選擇它的投票集段誊。 Kafka 不是用大多數(shù)投票選擇 leader 闰蚕。Kafka 動(dòng)態(tài)維護(hù)了一個(gè)同步狀態(tài)的備份的集合 (a set of in-sync replicas), 簡(jiǎn)稱 ISR 连舍,在這個(gè)集合中的節(jié)點(diǎn)都是和 leader 保持高度一致的没陡,只有這個(gè)集合的成員才 有資格被選舉為 leader,一條消息必須被這個(gè)集合 所有 節(jié)點(diǎn)讀取并追加到日志中了索赏,這條消息才能視為提交盼玄。這個(gè) ISR 集合發(fā)生變化會(huì)在 ZooKeeper 持久化,正因?yàn)槿绱饲蹦澹@個(gè)集合中的任何一個(gè)節(jié)點(diǎn)都有資格被選為 leader 埃儿。這對(duì)于 Kafka 使用模型中, 有很多分區(qū)和并確保主從關(guān)系是很重要的融涣。因?yàn)?ISR 模型和 f+1 副本童番,一個(gè) Kafka topic 冗余 f 個(gè)節(jié)點(diǎn)故障而不會(huì)丟失任何已經(jīng)提交的消息。
我們認(rèn)為對(duì)于希望處理的大多數(shù)場(chǎng)景這種策略是合理的威鹿。在實(shí)際中剃斧,為了冗余 f 節(jié)點(diǎn)故障,大多數(shù)投票和 ISR 都會(huì)在提交消息前確認(rèn)相同數(shù)量的備份被收到(例如在一次故障生存之后专普,大多數(shù)的 quorum 需要三個(gè)備份節(jié)點(diǎn)和一次確認(rèn)悯衬,ISR 只需要兩個(gè)備份節(jié)點(diǎn)和一次確認(rèn)),多數(shù)投票方法的一個(gè)優(yōu)點(diǎn)是提交時(shí)能避免最慢的服務(wù)器。但是筋粗,我們認(rèn)為通過(guò)允許客戶端選擇是否阻塞消息提交來(lái)改善策橘,和所需的備份數(shù)較低而產(chǎn)生的額外的吞吐量和磁盤空間是值得的。
另一個(gè)重要的設(shè)計(jì)區(qū)別是娜亿,Kafka 不要求崩潰的節(jié)點(diǎn)恢復(fù)所有的數(shù)據(jù)丽已,在這種空間中的復(fù)制算法經(jīng)常依賴于存在 “穩(wěn)定存儲(chǔ)”,在沒(méi)有違反潛在的一致性的情況下买决,出現(xiàn)任何故障再恢復(fù)情況下都不會(huì)丟失沛婴。 這個(gè)假設(shè)有兩個(gè)主要的問(wèn)題。首先督赤,我們?cè)诔志眯詳?shù)據(jù)系統(tǒng)的實(shí)際操作中觀察到的最常見(jiàn)的問(wèn)題是磁盤錯(cuò)誤嘁灯,并且它們通常不能保證數(shù)據(jù)的完整性。其次躲舌,即使磁盤錯(cuò)誤不是問(wèn)題丑婿,我們也不希望在每次寫入時(shí)都要求使用 fsync 來(lái)保證一致性, 因?yàn)檫@會(huì)使性能降低兩到三個(gè)數(shù)量級(jí)没卸。我們的協(xié)議能確保備份節(jié)點(diǎn)重新加入ISR 之前羹奉,即使它掛時(shí)沒(méi)有新的數(shù)據(jù), 它也必須完整再一次同步數(shù)據(jù)。
Unclean leader 選舉: 如果節(jié)點(diǎn)全掛了约计?
請(qǐng)注意诀拭,Kafka 對(duì)于數(shù)據(jù)不會(huì)丟失的保證,是基于至少一個(gè)節(jié)點(diǎn)在保持同步狀態(tài)煤蚌,一旦分區(qū)上的所有備份節(jié)點(diǎn)都掛了耕挨,就無(wú)法保證了。
但是铺然,實(shí)際在運(yùn)行的系統(tǒng)需要去考慮假設(shè)一旦所有的備份都掛了俗孝,怎么去保證數(shù)據(jù)不會(huì)丟失,這里有兩種實(shí)現(xiàn)的方法
等待一個(gè) ISR 的副本重新恢復(fù)正常服務(wù)魄健,并選擇這個(gè)副本作為領(lǐng) leader (它有極大可能擁有全部數(shù)據(jù))赋铝。
選擇第一個(gè)重新恢復(fù)正常服務(wù)的副本(不一定是 ISR 中的)作為leader。
這是可用性和一致性之間的簡(jiǎn)單妥協(xié)沽瘦,如果我只等待 ISR 的備份節(jié)點(diǎn)革骨,那么只要 ISR 備份節(jié)點(diǎn)都掛了,我們的服務(wù)將一直會(huì)不可用析恋,如果它們的數(shù)據(jù)損壞了或者丟失了良哲,那就會(huì)是長(zhǎng)久的宕機(jī)。另一方面助隧,如果不是 ISR 中的節(jié)點(diǎn)恢復(fù)服務(wù)并且我們?cè)试S它成為 leader 筑凫, 那么它的數(shù)據(jù)就是可信的來(lái)源,即使它不能保證記錄了每一個(gè)已經(jīng)提交的消息。 kafka 默認(rèn)選擇第二種策略巍实,當(dāng)所有的 ISR 副本都掛掉時(shí)滓技,會(huì)選擇一個(gè)可能不同步的備份作為 leader ,可以配置屬性 unclean.leader.election.enable 禁用此策略棚潦,那么就會(huì)使用第 一種策略即停機(jī)時(shí)間優(yōu)于不同步令漂。
這種困境不只有 Kafka 遇到,它存在于任何 quorum-based 規(guī)則中丸边。例如叠必,在大多數(shù)投票算法當(dāng)中,如果大多數(shù)服務(wù)器永久性的掛了妹窖,那么您要么選擇丟失100%的數(shù)據(jù)纬朝,要么違背數(shù)據(jù)的一致性選擇一個(gè)存活的服務(wù)器作為數(shù)據(jù)可信的來(lái)源。
可用性和持久性保證
向 Kafka 寫數(shù)據(jù)時(shí)嘱吗,producers 設(shè)置 ack 是否提交完成玄组, 0:不等待broker返回確認(rèn)消息,1: leader保存成功返回或, -1(all): 所有備份都保存成功返回.請(qǐng)注意. 設(shè)置 “ack = all” 并不能保證所有的副本都寫入了消息滔驾。默認(rèn)情況下谒麦,當(dāng) acks = all 時(shí),只要 ISR 副本同步完成哆致,就會(huì)返回消息已經(jīng)寫入绕德。例如,一個(gè) topic 僅僅設(shè)置了兩個(gè)副本摊阀,那么只有一個(gè) ISR 副本耻蛇,那么當(dāng)設(shè)置acks = all時(shí)返回寫入成功時(shí),剩下了的那個(gè)副本數(shù)據(jù)也可能數(shù)據(jù)沒(méi)有寫入胞此。 盡管這確保了分區(qū)的最大可用性臣咖,但是對(duì)于偏好數(shù)據(jù)持久性而不是可用性的一些用戶,可能不想用這種策略漱牵,因此夺蛇,我們提供了兩個(gè)topic 配置,可用于優(yōu)先配置消息數(shù)據(jù)持久性:
禁用 unclean leader 選舉機(jī)制 - 如果所有的備份節(jié)點(diǎn)都掛了,分區(qū)數(shù)據(jù)就會(huì)不可用酣胀,直到最近的 leader 恢復(fù)正常刁赦。這種策略優(yōu)先于數(shù)據(jù)丟失的風(fēng)險(xiǎn), 參看上一節(jié)的 unclean leader 選舉機(jī)制闻镶。
指定最小的 ISR 集合大小甚脉,只有當(dāng) ISR 的大小大于最小值,分區(qū)才能接受寫入操作铆农,以防止僅寫入單個(gè)備份的消息丟失造成消息不可用的情況牺氨,這個(gè)設(shè)置只有在生產(chǎn)者使用 acks = all 的情況下才會(huì)生效,這至少保證消息被 ISR 副本寫入。此設(shè)置是一致性和可用性 之間的折衷猴凹,對(duì)于設(shè)置更大的最小ISR大小保證了更好的一致性酝豪,因?yàn)樗WC將消息被寫入了更多的備份,減少了消息丟失的可能性精堕。但是孵淘,這會(huì)降低可用性,因?yàn)槿绻?ISR 副本的數(shù)量低于最小閾值歹篓,那么分區(qū)將無(wú)法寫入瘫证。
備份管理
以上關(guān)于備份日志的討論只涉及單個(gè)日志文件,即一個(gè) topic 分區(qū)庄撮,事實(shí)上背捌,一個(gè)Kafka集群管理著成百上千個(gè)這樣的 partitions。我們嘗試以輪詢調(diào)度的方式將集群內(nèi)的 partition 負(fù)載均衡洞斯,避免大量topic擁有的分區(qū)集中在 少數(shù)幾個(gè)節(jié)點(diǎn)上毡庆。同樣,我們也試圖平衡leadership,以至于每個(gè)節(jié)點(diǎn)都是部分 partition 的leader節(jié)點(diǎn)烙如。
優(yōu)化主從關(guān)系的選舉過(guò)程也是重要的么抗,這是數(shù)據(jù)不可用的關(guān)鍵窗口。原始的實(shí)現(xiàn)是當(dāng)有節(jié)點(diǎn)掛了后亚铁,進(jìn)行主從關(guān)系選舉時(shí)蝇刀,會(huì)對(duì)掛掉節(jié)點(diǎn)的所有partition 的領(lǐng)導(dǎo)權(quán)重新選舉。相反徘溢,我們會(huì)選擇一個(gè) broker 作為 “controller”節(jié)點(diǎn)吞琐。controller 節(jié)點(diǎn)負(fù)責(zé) 檢測(cè) brokers 級(jí)別故障,并負(fù)責(zé)在 broker 故障的情況下更改這個(gè)故障 Broker 中的 partition 的 leadership 。這種方式可以批量的通知主從關(guān)系的變化然爆,使得對(duì)于擁有大量partition 的broker ,選舉過(guò)程的代價(jià)更低并且速度更快站粟。如果 controller 節(jié)點(diǎn)掛了,其他 存活的 broker 都可能成為新的 controller 節(jié)點(diǎn)曾雕。
4.8 日志壓縮
日志壓縮可確保 Kafka 始終至少為單個(gè) topic partition 的數(shù)據(jù)日志中的每個(gè) message key 保留最新的已知值奴烙。 這樣的設(shè)計(jì)解決了應(yīng)用程序崩潰、系統(tǒng)故障后恢復(fù)或者應(yīng)用在運(yùn)行維護(hù)過(guò)程中重啟后重新加載緩存的場(chǎng)景翻默。 接下來(lái)讓我們深入討論這些在使用過(guò)程中的更多細(xì)節(jié)缸沃,闡述在這個(gè)過(guò)程中它是如何進(jìn)行日志壓縮的。
迄今為止修械,我們只介紹了簡(jiǎn)單的日志保留方法(當(dāng)舊的數(shù)據(jù)保留時(shí)間超過(guò)指定時(shí)間趾牧、日志大達(dá)到規(guī)定大小后就丟棄)。 這樣的策略非常適用于處理那些暫存的數(shù)據(jù)肯污,例如記錄每條消息之間相互獨(dú)立的日志翘单。 然而在實(shí)際使用過(guò)程中還有一種非常重要的場(chǎng)景——根據(jù)key進(jìn)行數(shù)據(jù)變更(例如更改數(shù)據(jù)庫(kù)表內(nèi)容)吨枉,使用以上的方式顯然不行。
讓我們來(lái)討論一個(gè)關(guān)于處理這樣的流式數(shù)據(jù)的具體的例子哄芜。 假設(shè)我們有一個(gè)topic貌亭,里面的內(nèi)容包含用戶的email地址;每次用戶更新他們的email地址時(shí)认臊,我們發(fā)送一條消息到這個(gè)topic圃庭,這里使用用戶Id作為消息的key值。 現(xiàn)在失晴,我們?cè)谝欢螘r(shí)間內(nèi)為id為123的用戶發(fā)送一些消息剧腻,每個(gè)消息對(duì)應(yīng)email地址的改變(其他ID消息省略):
1
2
3
4
5
6
7
8
9
123 => bill@microsoft.com
? ? ? ? .
? ? ? ? .
? ? ? ? .
123 => bill@gatesfoundation.org
? ? ? ? .
? ? ? ? .
? ? ? ? .
123 => bill@gmail.com
日志壓縮為我提供了更精細(xì)的保留機(jī)制,所以我們至少保留每個(gè)key的最后一次更新 (例如:bill@gmail.com)涂屁。 這樣我們保證日志包含每一個(gè)key的最終值而不只是最近變更的完整快照书在。這意味著下游的消費(fèi)者可以獲得最終的狀態(tài)而無(wú)需拿到所有的變化的消息信息。
讓我們先看幾個(gè)有用的使用場(chǎng)景拆又,然后再看看如何使用它儒旬。
數(shù)據(jù)庫(kù)更改訂閱。 通常需要在多個(gè)數(shù)據(jù)系統(tǒng)設(shè)置擁有一個(gè)數(shù)據(jù)集帖族,這些系統(tǒng)中通常有一個(gè)是某種類型的數(shù)據(jù)庫(kù)(無(wú)論是RDBMS或者新流行的key-value數(shù)據(jù)庫(kù))栈源。 例如,你可能有一個(gè)數(shù)據(jù)庫(kù)盟萨,緩存凉翻,搜索引擎集群或者Hadoop集群。每次變更數(shù)據(jù)庫(kù)捻激,也同時(shí)需要變更緩存、搜索引擎以及hadoop集群前计。 在只需處理最新日志的實(shí)時(shí)更新的情況下胞谭,你只需要最近的日志。但是男杈,如果你希望能夠重新加載緩存或恢復(fù)搜索失敗的節(jié)點(diǎn)丈屹,你可能需要一個(gè)完整的數(shù)據(jù)集。
事件源伶棒。 這是一種應(yīng)用程序設(shè)計(jì)風(fēng)格旺垒,它將查詢處理與應(yīng)用程序設(shè)計(jì)相結(jié)合,并使用變更的日志作為應(yīng)用程序的主要存儲(chǔ)肤无。
日志高可用先蒋。 執(zhí)行本地計(jì)算的進(jìn)程可以通過(guò)注銷對(duì)其本地狀態(tài)所做的更改來(lái)實(shí)現(xiàn)容錯(cuò),以便另一個(gè)進(jìn)程可以重新加載這些更改并在出現(xiàn)故障時(shí)繼續(xù)進(jìn)行宛渐。 一個(gè)具體的例子就是在流查詢系統(tǒng)中進(jìn)行計(jì)數(shù)竞漾,聚合和其他類似“group by”的操作眯搭。實(shí)時(shí)流處理框架Samza, 使用這個(gè)特性正是出于這一原因业岁。
在這些場(chǎng)景中鳞仙,主要需要處理變化的實(shí)時(shí)feed,但是偶爾當(dāng)機(jī)器崩潰或需要重新加載或重新處理數(shù)據(jù)時(shí)笔时,需要處理所有數(shù)據(jù)棍好。 日志壓縮允許在同一topic下同時(shí)使用這兩個(gè)用例。這種日志使用方式更詳細(xì)的描述請(qǐng)看這篇博客允耿。
想法很簡(jiǎn)單梳玫,我們有無(wú)限的日志,以上每種情況記錄變更日志右犹,我們從一開(kāi)始就捕獲每一次變更提澎。 使用這個(gè)完整的日志,我們可以通過(guò)回放日志來(lái)恢復(fù)到任何一個(gè)時(shí)間點(diǎn)的狀態(tài)念链。 然而這種假設(shè)的情況下盼忌,完整的日志是不實(shí)際的,對(duì)于那些每一行記錄會(huì)變更多次的系統(tǒng)掂墓,即使數(shù)據(jù)集很小谦纱,日志也會(huì)無(wú)限的增長(zhǎng)下去。 丟棄舊日志的簡(jiǎn)單操作可以限制空間的增長(zhǎng)君编,但是無(wú)法重建狀態(tài)——因?yàn)榕f的日志被丟棄跨嘉,可能一部分記錄的狀態(tài)會(huì)無(wú)法重建(這些記錄所有的狀態(tài)變更都在舊日志中)。
日志壓縮機(jī)制是更細(xì)粒度的吃嘿、每個(gè)記錄都保留的機(jī)制祠乃,而不是基于時(shí)間的粗粒度。 這個(gè)理念是選擇性的刪除那些有更新的變更的記錄的日志兑燥。 這樣最終日志至少包含每個(gè)key的記錄的最后一個(gè)狀態(tài)亮瓷。
這個(gè)策略可以為每個(gè)Topic設(shè)置,這樣一個(gè)集群中降瞳,可以一部分Topic通過(guò)時(shí)間和大小保留日志嘱支,另外一些可以通過(guò)壓縮壓縮策略保留。
這個(gè)功能的靈感來(lái)自于LinkedIn的最古老且最成功的基礎(chǔ)設(shè)置——一個(gè)稱為Databus的數(shù)據(jù)庫(kù)變更日志緩存系統(tǒng)挣饥。 不像大多數(shù)的日志存儲(chǔ)系統(tǒng)除师,Kafka是專門為訂閱和快速線性的讀和寫的組織數(shù)據(jù)。 和Databus不同扔枫,Kafka作為真實(shí)的存儲(chǔ)汛聚,壓縮日志是非常有用的,這非常有利于上游數(shù)據(jù)源不能重放的情況茧吊。
日志壓縮基礎(chǔ)
這是一個(gè)高級(jí)別的日志邏輯圖贞岭,展示了kafka日志的每條消息的offset邏輯結(jié)構(gòu)八毯。
Log head中包含傳統(tǒng)的Kafka日志,它包含了連續(xù)的offset和所有的消息瞄桨。 日志壓縮增加了處理tail Log的選項(xiàng)话速。 上圖展示了日志壓縮的的Log tail的情況。tail中的消息保存了初次寫入時(shí)的offset芯侥。 即使該offset的消息被壓縮泊交,所有offset仍然在日志中是有效的。在這個(gè)場(chǎng)景中柱查,無(wú)法區(qū)分和下一個(gè)出現(xiàn)的更高offset的位置廓俭。 如上面的例子中,36唉工、37研乒、38是屬于相同位置的,從他們開(kāi)始讀取日志都將從38開(kāi)始淋硝。
壓縮也允許刪除雹熬。通過(guò)消息的key和空負(fù)載(null payload)來(lái)標(biāo)識(shí)該消息可從日志中刪除。 這個(gè)刪除標(biāo)記將會(huì)引起所有之前擁有相同key的消息被移除(包括擁有key相同的新消息)谣膳。 但是刪除標(biāo)記比較特殊竿报,它將在一定周期后被從日志中刪除來(lái)釋放空間。這個(gè)時(shí)間點(diǎn)被稱為“delete retention point”继谚,如上圖烈菌。
壓縮操作通過(guò)在后臺(tái)周期性的拷貝日志段來(lái)完成。 清除操作不會(huì)阻塞讀取花履,并且可以被配置不超過(guò)一定IO吞吐來(lái)避免影響Producer和Consumer芽世。實(shí)際的日志段壓縮過(guò)程有點(diǎn)像這樣:
What guarantees does log compaction provide?
日志壓縮的保障措施如下:
任何滯留在日志head中的所有消費(fèi)者能看到寫入的所有消息;這些消息都是有序的offset臭挽。 topic使用min.compaction.lag.ms來(lái)保障消息寫入之前必須經(jīng)過(guò)的最小時(shí)間長(zhǎng)度捂襟,才能被壓縮。 這限制了一條消息在Log Head中的最短存在時(shí)間欢峰。
始終保持消息的有序性。壓縮永遠(yuǎn)不會(huì)重新排序消息涨共,只是刪除了一些纽帖。
消息的Offset不會(huì)變更。這是消息在日志中的永久標(biāo)志举反。
任何從頭開(kāi)始處理日志的Consumer至少會(huì)拿到每個(gè)key的最終狀態(tài)懊直。 另外,只要Consumer在小于Topic的delete.retention.ms設(shè)置(默認(rèn)24小時(shí))的時(shí)間段內(nèi)到達(dá)Log head火鼻,將會(huì)看到所有刪除記錄的所有刪除標(biāo)記室囊。 換句話說(shuō)雕崩,因?yàn)橐瞥齽h除標(biāo)記和讀取是同時(shí)發(fā)生的,Consumer可能會(huì)因?yàn)槁浜蟪^(guò)delete.retention.ms而導(dǎo)致錯(cuò)過(guò)刪除標(biāo)記融撞。
日志壓縮的細(xì)節(jié)
日志壓縮由Log Cleaner執(zhí)行盼铁,后臺(tái)線程池重新拷貝日志段,移除那些key存在于Log Head中的記錄尝偎。每個(gè)壓縮線程如下工作:
選擇log head與log tail比率最高的日志饶火。
在head log中為每個(gè)key的最后offset創(chuàng)建一個(gè)的簡(jiǎn)單概要。
它從日志的開(kāi)始到結(jié)束致扯,刪除那些在日志中最新出現(xiàn)的key的舊的值肤寝。新的、干凈的日志將會(huì)立即被交到到日志中抖僵,所以只需要一個(gè)額外的日志段空間(不是日志的完整副本)
日志head的概要本質(zhì)上是一個(gè)空間密集型的哈希表鲤看,每個(gè)條目使用24個(gè)字節(jié)。所以如果有8G的整理緩沖區(qū)耍群, 則能迭代處理大約366G的日志頭部(假設(shè)消息大小為1k)义桂。
配置Log Cleaner
Log Cleaner默認(rèn)啟用。這會(huì)啟動(dòng)清理的線程池世吨。如果要開(kāi)始特定Topic的清理功能澡刹,可以開(kāi)啟特定的屬性:
1
log.cleanup.policy=compact
這個(gè)可以通過(guò)創(chuàng)建Topic時(shí)配置或者之后使用Topic命令實(shí)現(xiàn)。
Log Cleaner可以配置保留最小的不壓縮的head log耘婚“战剑可以通過(guò)配置壓縮的延遲時(shí)間:
1
log.cleaner.min.compaction.lag.ms
這可以保證消息在配置的時(shí)長(zhǎng)內(nèi)不被壓縮。 如果沒(méi)有設(shè)置沐祷,除了最后一個(gè)日志外嚷闭,所有的日志都會(huì)被壓縮。 活動(dòng)的 segment 是不會(huì)被壓縮的赖临,即使它保存的消息的滯留時(shí)長(zhǎng)已經(jīng)超過(guò)了配置的最小壓縮時(shí)間長(zhǎng)胞锰。
關(guān)于cleaner更詳細(xì)的配置在 這里。
4.9 Quotas 配額
Kafka 集群可以對(duì)客戶端請(qǐng)求進(jìn)行配額兢榨,控制集群資源的使用嗅榕。Kafka broker 可以對(duì)客戶端做兩種類型資源的配額限制,同一個(gè)group的client 共享配額吵聪。
定義字節(jié)率的閾值來(lái)限定網(wǎng)絡(luò)帶寬的配額豺裆。 (從 0.9 版本開(kāi)始)
request 請(qǐng)求率的配額捻爷,網(wǎng)絡(luò)和 I/O線程 cpu利用率的百分比部默。 (從 0.11 版本開(kāi)始)
為什么要對(duì)資源進(jìn)行配額?
producers 和 consumers 可能會(huì)生產(chǎn)或者消費(fèi)大量的數(shù)據(jù)或者產(chǎn)生大量的請(qǐng)求绒怨,導(dǎo)致對(duì) broker 資源的壟斷,引起網(wǎng)絡(luò)的飽和块攒,對(duì)其他clients和brokers本身造成DOS攻擊励稳。 資源的配額保護(hù)可以有效防止這些問(wèn)題佃乘,在大型多租戶集群中,因?yàn)橐恍〔糠直憩F(xiàn)不佳的客戶端降低了良好的用戶體驗(yàn)驹尼,這種情況下非常需要資源的配額保護(hù)趣避。 實(shí)際情況中,當(dāng)把Kafka當(dāng)做一種服務(wù)提供的時(shí)候扶欣,可以根據(jù)客戶端和服務(wù)端的契約對(duì) API 調(diào)用做限制鹅巍。
Client groups
Kafka client 是一個(gè)用戶的概念, 是在一個(gè)安全的集群中經(jīng)過(guò)身份驗(yàn)證的用戶料祠。在一個(gè)支持非授權(quán)客戶端的集群中骆捧,用戶是一組非授權(quán)的 users,broker使用一個(gè)可配置的 PrincipalBuilder 類來(lái)配置 group 規(guī)則髓绽。 Client-id 是客戶端的邏輯分組敛苇,客戶端應(yīng)用使用一個(gè)有意義的名稱進(jìn)行標(biāo)識(shí)。(user, client-id)元組定義了一個(gè)安全的客戶端邏輯分組顺呕,使用相同的user 和 client-id 標(biāo)識(shí)枫攀。
資源配額可以針對(duì) (user,client-id),users 或者client-id groups 三種規(guī)則進(jìn)行配置株茶。對(duì)于一個(gè)請(qǐng)求連接来涨,連接會(huì)匹配最細(xì)化的配額規(guī)則的限制。同一個(gè) group 的所有連接共享這個(gè) group 的資源配額启盛。 舉個(gè)例子蹦掐,如果 (user="test-user", client-id="test-client") 客戶端producer 有10MB/sec 的生產(chǎn)資源配置,這10MB/sec 的資源在所有 "test-user" 用戶僵闯,client-id是 "test-client" 的producer實(shí)例中是共享的卧抗。
Quota Configuration(資源配額的配置)
資源配額的配置可以根據(jù) (user, client-id),user 和 client-id 三種規(guī)則進(jìn)行定義鳖粟。在配額級(jí)別需要更高(或者更低)的配額的時(shí)候社裆,是可以覆蓋默認(rèn)的配額配置。 這種機(jī)制和每個(gè) topic 可以自定義日志配置屬性類似向图。 覆蓋 User 和 (user, client-id) 規(guī)則的配額配置會(huì)寫到zookeeper的 /config/users路徑下泳秀,client-id 配額的配置會(huì)寫到 /config/clients 路徑下。 這些配置的覆蓋會(huì)被所有的 brokers 實(shí)時(shí)的監(jiān)聽(tīng)到并生效榄攀。所以這使得我們修改配額配置不需要重啟整個(gè)集群晶默。更多細(xì)節(jié)參考 here。 每個(gè) group 的默認(rèn)配額可以使用相同的機(jī)制進(jìn)行動(dòng)態(tài)更新航攒。
配額配置的優(yōu)先級(jí)順序是:
/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>
Broker 的配置屬性 (quota.producer.default, quota.consumer.default) 也可以用來(lái)設(shè)置 client-id groups 默認(rèn)的網(wǎng)絡(luò)帶寬配置。這些配置屬性在未來(lái)的 release 版本會(huì)被 deprecated趴梢。 client-id 的默認(rèn)配額也是用zookeeper配置漠畜,和其他配額配置的覆蓋和默認(rèn)方式是相似的币他。
Network Bandwidth Quotas(網(wǎng)絡(luò)帶寬配額配置)
網(wǎng)絡(luò)帶寬配額使用字節(jié)速率閾值來(lái)定義每個(gè) group 的客戶端的共享配額。 默認(rèn)情況下憔狞,每個(gè)不同的客戶端 group 是集群配置的固定配額蝴悉,單位是 bytes/sec。 這個(gè)配額會(huì)以broker 為基礎(chǔ)進(jìn)行定義瘾敢。在 clients 被限制之前拍冠,每個(gè) group 的clients可以發(fā)布和拉取單個(gè)broker 的最大速率,單位是 bytes/sec簇抵。
Request Rate Quotas 請(qǐng)求速率配額
請(qǐng)求速率的配額定義了一個(gè)客戶端可以使用 broker request handler I/O 線程和網(wǎng)絡(luò)線程在一個(gè)配額窗口時(shí)間內(nèi)使用的百分比庆杜。 n% 的配置代表一個(gè)線程的 n%的使用率,所以這種配額是建立在總?cè)萘?((num.io.threads + num.network.threads) * 100)%之上的. 每個(gè) group 的client 的資源在被限制之前可以使用單位配額時(shí)間窗口內(nèi)I/O線程和網(wǎng)絡(luò)線程利用率的 n%碟摆。 由于分配給 I/O和網(wǎng)絡(luò)線程的數(shù)量是基于 broker 的核數(shù)晃财,所以請(qǐng)求量的配額代表每個(gè)group 的client 使用cpu的百分比。
Enforcement(限制)
默認(rèn)情況下典蜕,集群給每個(gè)不同的客戶端group 配置固定的配額断盛。 這個(gè)配額是以broker為基礎(chǔ)定義的。每個(gè) client 在受到限制之前可以利用每個(gè)broker配置的配額資源愉舔。 我們覺(jué)得給每個(gè)broker配置資源配額比為每個(gè)客戶端配置一個(gè)固定的集群帶寬資源要好钢猛,為每個(gè)客戶端配置一個(gè)固定的集群帶寬資源需要一個(gè)機(jī)制來(lái)共享client 在brokers上的配額使用情況。這可能比配額本身實(shí)現(xiàn)更難轩缤。
broker在檢測(cè)到有配額資源使用違反規(guī)則會(huì)怎么辦命迈?在我們計(jì)劃中,broker不會(huì)返回error典奉,而是會(huì)嘗試減速 client 超出的配額設(shè)置躺翻。 broker 會(huì)計(jì)算出將客戶端限制到配額之下的延遲時(shí)間,并且延遲response響應(yīng)卫玖。這種方法對(duì)于客戶端來(lái)說(shuō)也是透明的(客戶端指標(biāo)除外)公你。這也使得client不需要執(zhí)行任何特殊的 backoff 和 retry 行為。而且不友好的客戶端行為(沒(méi)有 backoff 的重試)會(huì)加劇正在解決的資源配額問(wèn)題假瞬。
網(wǎng)絡(luò)字節(jié)速率和線程利用率可以用多個(gè)小窗口來(lái)衡量(例如 1秒30個(gè)窗口)陕靠,以便快速的檢測(cè)和修正配額規(guī)則的違反行為。實(shí)際情況中 較大的測(cè)量窗口(例如脱茉,30秒10個(gè)窗口)會(huì)導(dǎo)致大量的突發(fā)流量剪芥,隨后長(zhǎng)時(shí)間的延遲,會(huì)使得用戶體驗(yàn)不是很好琴许。