Kafka相關(guān)內(nèi)容總結(jié)(概念和原理)

說明

  • 主要內(nèi)容是在網(wǎng)上的一些文章中整理出來闪盔;
  • 加粗的字體是比較重要的內(nèi)容,部分是自己的經(jīng)驗(yàn)和理解辱士;
  • 整理的目的主要是為了方便查閱锭沟;

為什么需要消息系統(tǒng)

  • 解耦:
    在項(xiàng)目啟動(dòng)之初來預(yù)測(cè)將來項(xiàng)目會(huì)碰到什么需求,是極其困難的识补。消息隊(duì)列在處理過程中間插入了一個(gè)隱含的族淮、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程祝辣,只要確保它們遵守同樣的接口約束贴妻。

  • 冗余:
    有些情況下,處理數(shù)據(jù)的過程會(huì)失敗蝙斜。除非數(shù)據(jù)被持久化名惩,否則將造成丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理孕荠,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)娩鹉。許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中,在把一個(gè)消息從隊(duì)列中刪除之前稚伍,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢弯予,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

  • 擴(kuò)展性:
    因?yàn)橄㈥?duì)列解耦了你的處理過程个曙,所以增大消息入隊(duì)和處理的頻率是很容易的锈嫩,只要另外增加處理過程即可。

  • 靈活性 & 峰值處理能力:
    在訪問量劇增的情況下垦搬,應(yīng)用仍然需要繼續(xù)發(fā)揮作用呼寸,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)猴贰。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力对雪,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。

  • 可恢復(fù)性:
    系統(tǒng)的一部分組件失效時(shí)米绕,不會(huì)影響到整個(gè)系統(tǒng)瑟捣。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉义郑,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理蝶柿。

  • 送達(dá)保證:
    消息隊(duì)列提供的冗余機(jī)制保證了消息能被實(shí)際的處理丈钙,只要一個(gè)進(jìn)程讀取了該隊(duì)列即可非驮。在此基礎(chǔ)上,部分消息系統(tǒng)提供了一個(gè)”只送達(dá)一次”保證雏赦。無論有多少進(jìn)程在從隊(duì)列中領(lǐng)取數(shù)據(jù)劫笙,每一個(gè)消息只能被處理一次。這之所以成為可能星岗,是因?yàn)楂@取一個(gè)消息只是”預(yù)定”了這個(gè)消息填大,暫時(shí)把它移出了隊(duì)列。除非客戶端明確的表示已經(jīng)處理完了這個(gè)消息俏橘,否則這個(gè)消息會(huì)被放回隊(duì)列中去允华,在一段可配置的時(shí)間之后可再次被處理。

  • 順序保證:
    在大多使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要靴寂。大部分消息隊(duì)列本來就是排序的磷蜀,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理。(Kafka 保證一個(gè) Partition 內(nèi)的消息的有序性)

  • 緩沖:
    有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度百炬,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況褐隆。

  • 理解數(shù)據(jù)流
    在一個(gè)分布式系統(tǒng)里,要得到一個(gè)關(guān)于用戶操作會(huì)用多長(zhǎng)時(shí)間及其原因的總體印象剖踊,是個(gè)巨大的挑戰(zhàn)庶弃。消息隊(duì)列通過消息被處理的頻率,來方便的輔助確定那些表現(xiàn)不佳的處理過程或領(lǐng)域德澈,這些地方的數(shù)據(jù)流都不夠優(yōu)化歇攻。

  • 異步通信:
    很多時(shí)候,用戶不想也不需要立即處理消息圃验。消息隊(duì)列提供了異步處理機(jī)制掉伏,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它澳窑。想向隊(duì)列中放入多少消息就放多少斧散,然后在需要的時(shí)候再去處理它們。

多個(gè)消息隊(duì)列橫向?qū)Ρ?/h1>
多個(gè)消息隊(duì)列橫向?qū)Ρ?/div>
  • RabbitMQ
    RabbitMQ是使用Erlang編寫的一個(gè)開源的消息隊(duì)列摊聋,本身支持很多的協(xié)議:AMQP鸡捐,XMPP, SMTP, STOMP,也正因如此麻裁,它非常重量級(jí)箍镜,更適合于企業(yè)級(jí)的開發(fā)。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架煎源,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)色迂。對(duì)路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持手销。rabbitMQ在吞吐量方面稍遜于kafka歇僧,他們的出發(fā)點(diǎn)不一樣,rabbitMQ支持對(duì)消息的可靠的傳遞锋拖,支持事務(wù)诈悍,不支持批量的操作;基于存儲(chǔ)的可靠性的要求存儲(chǔ)可以采用內(nèi)存或者硬盤兽埃。

  • Redis
    Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫侥钳,開發(fā)維護(hù)很活躍。雖然它是一個(gè)Key-Value數(shù)據(jù)庫存儲(chǔ)系統(tǒng)柄错,但它本身支持MQ功能舷夺,所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來使用苦酱。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬次给猾,每10萬次記錄一次執(zhí)行時(shí)間躏啰。測(cè)試數(shù)據(jù)分為128Bytes、512Bytes耙册、1K和10K四個(gè)不同大小的數(shù)據(jù)给僵。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ详拙,而如果數(shù)據(jù)大小超過了10K帝际,Redis則慢的無法忍受;出隊(duì)時(shí)饶辙,無論數(shù)據(jù)大小蹲诀,Redis都表現(xiàn)出非常好的性能,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis弃揽。redis不能支持大量數(shù)據(jù)脯爪。

  • ZeroMQ
    ZeroMQ號(hào)稱最快的消息隊(duì)列系統(tǒng),尤其針對(duì)大吞吐量的需求場(chǎng)景矿微。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列痕慢,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn)涌矢。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式掖举,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,因?yàn)槟愕膽?yīng)用程序?qū)缪萘诉@個(gè)服務(wù)角色娜庇。你只需要簡(jiǎn)單的引用ZeroMQ程序庫塔次,可以使用NuGet安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了名秀。但是ZeroMQ僅提供非持久性的隊(duì)列励负,也就是說如果宕機(jī),數(shù)據(jù)將會(huì)丟失匕得。其中继榆,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開始同時(shí)支持ZeroMQ和Netty作為傳輸模塊)。

  • ActiveMQ
    ActiveMQ是Apache下的一個(gè)子項(xiàng)目耗跛。 類似于ZeroMQ裕照,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列攒发。同時(shí)類似于RabbitMQ调塌,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場(chǎng)景。

  • Kafka/Jafka
    Kafka是Apache下的一個(gè)子項(xiàng)目惠猿,是一個(gè)高性能跨語言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng)羔砾,而Jafka是在Kafka之上孵化而來的,即Kafka的一個(gè)升級(jí)版。具有以下特性:快速持久化姜凄,可以在O(1)的系統(tǒng)開銷下進(jìn)行消息持久化政溃;高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率态秧;完全的分布式系統(tǒng)董虱,Broker、Producer申鱼、Consumer都原生自動(dòng)支持分布式愤诱,自動(dòng)實(shí)現(xiàn)負(fù)載均衡;支持Hadoop數(shù)據(jù)并行加載捐友,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng)淫半,但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案匣砖。Kafka通過Hadoop的并行加載機(jī)制來統(tǒng)一了在線和離線的消息處理科吭。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng),除了性能非常好之外猴鲫,還是一個(gè)工作良好的分布式系統(tǒng)对人。

kafka 架構(gòu)

拓?fù)浣Y(jié)構(gòu)

拓?fù)浣Y(jié)構(gòu)1

拓?fù)浣Y(jié)構(gòu)2

相關(guān)概念

  • producer:
    消息生產(chǎn)者,發(fā)布消息到kafka集群的終端或服務(wù)拂共。

  • broker:
    kafka集群中包含的服務(wù)器规伐。

  • topic:
    每條發(fā)布到kafka集群的消息屬于的類別,即kafka是面向topic的匣缘。

  • partition:
    partition是物理上的概念猖闪,每個(gè)topic包含一個(gè)或多個(gè)partition。kafka分配的單位是partition肌厨。

  • consumer:
    從kafka集群中消費(fèi)消息的終端或服務(wù)培慌。

  • ConsumerGroup:
    high-level consumerAPI中,每個(gè)consumer都屬于一個(gè)ConsumerGroup柑爸,每條消息只能被ConsumerGroup中的一個(gè)Consumer消費(fèi)吵护,但可以被多個(gè)ConsumerGroup消費(fèi)趟妥。

  • replica:
    partition的副本神僵,保障partition的高可用薄坏。

  • leader:
    replica中的一個(gè)角色夺欲,producer和consumer只跟leader交互灌砖。

  • follower:
    replica中的一個(gè)角色浦旱,從leader中復(fù)制數(shù)據(jù)痢掠。

  • controller:
    kafka集群中的其中一個(gè)服務(wù)器产还,用來進(jìn)行l(wèi)eade relection以及 各種failover厘熟。

  • zookeeper:
    kafka通過zookeeper來存儲(chǔ)集群的meta信息屯蹦。

  • zookeeper 節(jié)點(diǎn)
    kafka 在 zookeeper 中的存儲(chǔ)結(jié)構(gòu)如下圖所示:


    kafka在zookeeper中的存儲(chǔ)結(jié)構(gòu)

producer 發(fā)布消息

寫入方式

producer 采用 push 模式將消息發(fā)布到 broker维哈,每條消息都被 append 到 patition 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高登澜,保障 kafka 吞吐率)阔挠。

消息路由

producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition脑蠕。其路由機(jī)制為:

  • 指定了 patition购撼,則直接使用;
  • 未指定 patition 但指定 key谴仙,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition
  • patition 和 key 都未指定份招,使用輪詢或者隨機(jī)選出一個(gè) patition。

寫入流程

producer 寫入消息序列
  • producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader
  • producer 將消息發(fā)送給該 leader
  • leader 將消息寫入本地 log
  • followers 從 leader pull 消息狞甚,寫入本地 log 后 leader 發(fā)送 ACK
  • leader 收到所有 ISR 中的 replica 的 ACK 后锁摔,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK

數(shù)據(jù)的保證

  • 一般情況下存在三種情況:
    • At most once 消息可能會(huì)丟哼审,但絕不會(huì)重復(fù)傳輸
    • At least one 消息絕不會(huì)丟谐腰,但可能會(huì)重復(fù)傳輸
    • Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次
  • 當(dāng) producer 向 broker 發(fā)送消息時(shí),一旦這條消息被 commit涩盾,由于 replication 的存在十气,它就不會(huì)丟。但是如果 producer 發(fā)送數(shù)據(jù)給 broker 后春霍,遇到網(wǎng)絡(luò)問題而造成通信中斷砸西,那 Producer 就無法判斷該條消息是否已經(jīng) commit。雖然 Kafka 無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么址儒,但是 producer 可以生成一種類似于主鍵的東西芹枷,發(fā)生故障時(shí)冪等性的重試多次,這樣就做到了 Exactly once莲趣,但目前還并未實(shí)現(xiàn)鸳慈。所以目前默認(rèn)情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設(shè)置 producer 異步發(fā)送實(shí)現(xiàn)At most once喧伞。
  • Partition leader與follower:partition也有l(wèi)eader和follower之分走芋。leader是主partition,producer寫kafka的時(shí)候先寫partition leader潘鲫,再由partition leader push給其他的partition follower翁逞。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節(jié)點(diǎn)宕機(jī)溉仑,zookeeper會(huì)沖其他的broker的partition follower上選擇follower變?yōu)閜arition leader挖函。
  • Partition ack:當(dāng)ack=1,表示producer寫partition leader成功后彼念,broker就返回成功挪圾,無論其他的partition follower是否寫成功。當(dāng)ack=2逐沙,表示producer寫partition leader和其他一個(gè)follower成功的時(shí)候哲思,broker就返回成功,無論其他的partition follower是否寫成功吩案。當(dāng)ack=-1[parition的數(shù)量]的時(shí)候棚赔,表示只有producer全部寫成功的時(shí)候,才算成功徘郭,kafka broker才返回成功信息靠益。這里需要注意的是,如果ack=1的時(shí)候残揉,一旦有個(gè)broker宕機(jī)導(dǎo)致partition的follower和leader切換胧后,會(huì)導(dǎo)致丟數(shù)據(jù)。
消息可靠性0

消息可靠性1

消息可靠性2

消息可靠性3

kafka對(duì)數(shù)據(jù)的保證

  • 如果將 consumer 設(shè)置為 auto commit抱环,consumer 一旦讀到數(shù)據(jù)立即自動(dòng) commit壳快。如果只討論這一讀取消息的過程,那 Kafka 確保了 Exactly once镇草。
  • 但實(shí)際使用中應(yīng)用程序并非在 consumer 讀取完數(shù)據(jù)就結(jié)束了眶痰,而是要進(jìn)行進(jìn)一步處理,而數(shù)據(jù)處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:
    • 讀完消息先 commit 再處理消息梯啤。
      這種模式下竖伯,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息因宇,這就對(duì)應(yīng)于 At most once七婴;這是kafka可能丟數(shù)據(jù)的一個(gè)原因
    • 讀完消息先處理再 commit。
      這種模式下察滑,如果在處理完消息之后 commit 之前 consumer crash 了本姥,下次重新開始工作時(shí)還會(huì)處理剛剛未 commit 的消息,實(shí)際上該消息已經(jīng)被處理過了杭棵。這就對(duì)應(yīng)于 At least once婚惫。這是kafka可能重復(fù)處理數(shù)據(jù)的一個(gè)原因
    • 如果一定要做到 Exactly once,就需要協(xié)調(diào) offset 和實(shí)際操作的輸出魂爪。
      精典的做法是引入兩階段提交先舷。如果能讓 offset 和操作輸入存在同一個(gè)地方,會(huì)更簡(jiǎn)潔和通用滓侍。這種方式可能更好蒋川,因?yàn)樵S多輸出系統(tǒng)可能不支持兩階段提交。比如撩笆,consumer 拿到數(shù)據(jù)后可能把數(shù)據(jù)放到 HDFS捺球,如果把最新的 offset 和數(shù)據(jù)本身一起寫到 HDFS缸浦,那就可以保證數(shù)據(jù)的輸出和 offset 的更新要么都完成,要么都不完成氮兵,間接實(shí)現(xiàn) Exactly once裂逐。目前HighLevelConsumerAPI的offset存在于kafka自己的topic中,不能存HDFS泣栈,可以用SimpleConsuemer API實(shí)現(xiàn)
    • auto commit的坑
      • 如果auto.commit.enable=false卜高,假設(shè)consumer的兩個(gè)fetcher各自拿了一條數(shù)據(jù),并且由兩個(gè)線程同時(shí)處理南片,這時(shí)線程t1處理完partition1的數(shù)據(jù)掺涛,手動(dòng)提交offset,這里需要著重說明的是疼进,當(dāng)手動(dòng)執(zhí)行commit的時(shí)候薪缆,實(shí)際上是對(duì)這個(gè)consumer進(jìn)程所占有的所有partition進(jìn)行commit,kafka暫時(shí)還沒有提供更細(xì)粒度的commit方式(TODO:新版kafka有沒有更細(xì)粒度的commit方式伞广?矮燎?),也就是說赔癌,即使t2沒有處理完partition2的數(shù)據(jù)诞外,offset也被t1提交掉了。如果這時(shí)consumer crash掉灾票,t2正在處理的這條數(shù)據(jù)就丟失了峡谊。解決辦法應(yīng)該是只有手動(dòng)commit+兩階段提交。

broker 保存消息

存儲(chǔ)方式

物理上把 topic 分成一個(gè)或多個(gè) patition(對(duì)應(yīng) server.properties 中的 num.partitions=3 配置)刊苍,每個(gè) patition 物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 patition 的所有消息和索引文件)既们,如下:


topic數(shù)據(jù)在broker的存儲(chǔ)方式

存儲(chǔ)策略

無論消息是否被消費(fèi),kafka 都會(huì)保留所有消息正什。有兩種策略可以刪除舊數(shù)據(jù):
基于時(shí)間:log.retention.hours=168
基于大猩吨健:log.retention.bytes=1073741824

文件存儲(chǔ)方式

  • 每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中。但每個(gè)段segment file消息數(shù)量不一定相等婴氮,這種特性方便old segment file快速被刪除斯棒。
  • 每個(gè)partiton只需要支持順序讀寫就行了,segment文件生命周期由服務(wù)端配置參數(shù)決定主经。
  • 這樣做的好處就是能快速刪除無用文件荣暮,有效提高磁盤利用率

文件存儲(chǔ)方式

  • producer發(fā)message到某個(gè)topic罩驻,message會(huì)被均勻的分布到多個(gè)partition上(隨機(jī)或根據(jù)用戶指定的回調(diào)函數(shù)進(jìn)行分布)穗酥,kafka broker收到message往對(duì)應(yīng)partition的最后一個(gè)segment上添加該消息,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過閾值時(shí),segment上的消息會(huì)被flush到磁盤砾跃,只有flush到磁盤上的消息consumer才能消費(fèi)骏啰,segment達(dá)到一定的大小后將不會(huì)再往該segment寫數(shù)據(jù),broker會(huì)創(chuàng)建新的segment
  • 每個(gè)part在內(nèi)存中對(duì)應(yīng)一個(gè)index抽高,記錄每個(gè)segment中的第一條消息偏移判耕。
    segment file組成:由2大部分組成,分別為index file和data file厨内,此2個(gè)文件一一對(duì)應(yīng)祈秕,成對(duì)出現(xiàn)渺贤,后綴".index"和“.log”分別表示為segment索引文件雏胃、數(shù)據(jù)文件.
  • segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始,后續(xù)每個(gè)segment文件名為上一個(gè)全局partion的最大offset(偏移message數(shù))志鞍。數(shù)值最大為64位long大小瞭亮,19位數(shù)字字符長(zhǎng)度,沒有數(shù)字用0填充固棚。

讀取原理

  • segment file 組成:由2部分組成统翩,分別為index file和data file,這兩個(gè)文件是一一對(duì)應(yīng)的此洲,后綴”.index”和”.log”分別表示索引文件和數(shù)據(jù)文件厂汗;
  • segment file 命名規(guī)則:partition的第一個(gè)segment從0開始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset,ofsset的數(shù)值最大為64位(long類型)呜师,20位數(shù)字字符長(zhǎng)度娶桦,沒有數(shù)字用0填充。如下圖所示:


    例子:segment文件的index文件和log文件
  • segment的索引文件中存儲(chǔ)著大量的元數(shù)據(jù)汁汗,數(shù)據(jù)文件中存儲(chǔ)著大量消息衷畦,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中的message的物理偏移地址。以索引文件中的3知牌,497為例祈争,在數(shù)據(jù)文件中表示第3個(gè)message(在全局partition表示第368772個(gè)message),以及該消息的物理偏移地址為497角寸。
  • 注:Partition中的每條message由offset來表示它在這個(gè)partition中的偏移量菩混,這個(gè)offset并不是該Message在partition中實(shí)際存儲(chǔ)位置,而是邏輯上的一個(gè)值(如上面的3)扁藕,但它卻唯一確定了partition中的一條Message(可以認(rèn)為offset是partition中Message的id)墨吓。


    kafka的Index文件和log文件

Kafka高效文件存儲(chǔ)設(shè)計(jì)特點(diǎn)

  • Kafka把topic中一個(gè)parition大文件分成多個(gè)小文件段,通過多個(gè)小文件段纹磺,就容易定期清除或刪除已經(jīng)消費(fèi)完文件帖烘,減少磁盤占用。
  • 通過索引信息可以快速定位message和確定response的最大大小橄杨。
  • 通過index元數(shù)據(jù)全部映射到memory秘症,可以避免segment file的IO磁盤操作照卦。
  • 通過索引文件稀疏存儲(chǔ),可以大幅降低index文件元數(shù)據(jù)占用空間大小乡摹。
  • 消息系統(tǒng)的持久化隊(duì)列可以構(gòu)建在對(duì)一個(gè)文件的讀和追加上役耕,就像一般情況下的日志解決方案。它有一個(gè)優(yōu)點(diǎn)聪廉,所有的操作都是常數(shù)時(shí)間瞬痘,并且讀寫之間不會(huì)相互阻塞。這種設(shè)計(jì)具有極大的性能優(yōu)勢(shì):最終系統(tǒng)性能和數(shù)據(jù)大小完全無關(guān)板熊,服務(wù)器可以充分利用廉價(jià)的硬盤來提供高效的消息服務(wù)框全。
  • 事實(shí)上還有一點(diǎn),磁盤空間的無限增大而不影響性能這點(diǎn)干签,意味著我們可以提供一般消息系統(tǒng)無法提供的特性津辩。比如說,消息被消費(fèi)后不是立馬被刪除容劳,我們可以將這些消息保留一段相對(duì)比較長(zhǎng)的時(shí)間(比如一個(gè)星期)喘沿。

Kafka Broker一些特性

  • 無狀態(tài)的Kafka Broker :

    • Broker沒有副本機(jī)制,一旦broker宕機(jī)竭贩,該broker的消息將都不可用蚜印。
    • Broker不保存訂閱者的狀態(tài),由訂閱者自己保存留量。
    • 無狀態(tài)導(dǎo)致消息的刪除成為難題(可能刪除的消息正在被訂閱)窄赋,kafka采用基于時(shí)間的SLA(服務(wù)水平保證),消息保存一定時(shí)間(通常為7天)后會(huì)被刪除肪获。
    • 消息訂閱者可以rewind back到任意位置重新進(jìn)行消費(fèi)寝凌,當(dāng)訂閱者故障時(shí),可以選擇最小的offset進(jìn)行重新讀取消費(fèi)消息孝赫。
  • message的交付與生命周期 :

    • 不是嚴(yán)格的JMS较木, 因此kafka對(duì)消息的重復(fù)、丟失青柄、錯(cuò)誤以及順序型沒有嚴(yán)格的要求伐债。(這是與AMQ最大的區(qū)別)
    • JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API致开,用于在兩個(gè)應(yīng)用程序之間峰锁,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信双戳。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API虹蒋,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
    • kafka提供at-least-once delivery,即當(dāng)consumer宕機(jī)后,有些消息可能會(huì)被重復(fù)delivery魄衅。
    • 因每個(gè)partition只會(huì)被consumer group內(nèi)的一個(gè)consumer消費(fèi)峭竣,故kafka保證每個(gè)partition內(nèi)的消息會(huì)被順序的訂閱。
    • Kafka為每條消息為每條消息計(jì)算CRC校驗(yàn)晃虫,用于錯(cuò)誤檢測(cè)皆撩,crc校驗(yàn)不通過的消息會(huì)直接被丟棄掉。
  • 壓縮

    • Kafka支持以集合(batch)為單位發(fā)送消息哲银,在此基礎(chǔ)上扛吞,Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer端可以通過GZIP或Snappy格式對(duì)消息集合進(jìn)行壓縮荆责。Producer端進(jìn)行壓縮之后滥比,在Consumer端需進(jìn)行解壓。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量草巡,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫κ匚兀趯?duì)大數(shù)據(jù)處理上型酥,瓶頸往往體現(xiàn)在網(wǎng)絡(luò)上而不是CPU山憨。
    • 那么如何區(qū)分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個(gè)描述壓縮屬性字節(jié)弥喉,這個(gè)字節(jié)的后兩位表示消息的壓縮采用的編碼郁竟,如果后兩位為0,則表示消息未被壓縮由境。

offset管理

kafka會(huì)記錄offset到zk中棚亩。但是,zk client api對(duì)zk的頻繁寫入是一個(gè)低效的操作虏杰。0.8.2 kafka引入了native offset storage讥蟆,將offset管理從zk移出,并且可以做到水平擴(kuò)展纺阔。其原理就是利用了kafka的compacted topic瘸彤,offset以consumer group,topic與partion的組合作為key直接提交到compacted topic中,topic名稱為__consumer_offsets笛钝。同時(shí)Kafka又在內(nèi)存中維護(hù)了的三元組來維護(hù)最新的offset信息质况,consumer來取最新offset信息的時(shí)候直接內(nèi)存里拿即可。當(dāng)然玻靡,kafka允許你快速的checkpoint最新的offset信息到磁盤上结榄。根據(jù)以上的信息可以理解,既然生產(chǎn)者有可能因?yàn)閎roker的掛掉而造成丟數(shù)據(jù)囤捻,那么消費(fèi)成功的offset臼朗,如果發(fā)送kafka失敗,或者kafka寫入失敗(如broker掛掉)等情況视哑,也有可能造成重復(fù)消費(fèi)(已經(jīng)消費(fèi)老厌,但是kafka寫入不成功)。

  • __consumer_offsets包含很多partition黎炉,Kafka會(huì)使用下面公式計(jì)算某個(gè)ConsumerGroup位移保存在__consumer_offsets的哪個(gè)分區(qū)上:
Math.abs(groupID.hashCode()) % numPartitions
  • 獲取指定consumer group的位移信息
//0.11.0.0版本之前
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

//0.11.0.0版本以后(含)
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

輸出結(jié)果如下:

...
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
 ...

上圖可見枝秤,該consumer group保存在分區(qū)11上,且位移信息都是對(duì)的(這里的位移信息是已消費(fèi)的位移慷嗜,嚴(yán)格來說不是第3步中的位移淀弹。由于consumer已經(jīng)消費(fèi)完了所有的消息,所以這里的位移與第3步中的位移相同)庆械。另外薇溃,可以看到__consumer_offsets topic的每一日志項(xiàng)的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

topic創(chuàng)建

kafka的架構(gòu)圖

kafka的partition

創(chuàng)建topic命令

./bin/kafka-topics.sh --zookeeper 10.1.112.57:2181,10.1.112.58:2181,10.1.112.59:2181/kafka --create --topic LvsKafka --replication-factor 2 --partitions 24

流程圖

topic創(chuàng)建
  • topic創(chuàng)建主要分為兩個(gè)部分:命令行部分+后臺(tái)(controller)邏輯部分,如下圖所示缭乘。主要的思想就是后臺(tái)邏輯會(huì)監(jiān)聽zookeeper下對(duì)應(yīng)的目錄節(jié)點(diǎn)沐序,一旦發(fā)起topic創(chuàng)建命令,該命令會(huì)創(chuàng)建新的數(shù)據(jù)節(jié)點(diǎn)從而觸發(fā)后臺(tái)的創(chuàng)建邏輯堕绩。
  • 簡(jiǎn)單來說我們發(fā)起的命令行主要做兩件事情:1. 確定分區(qū)副本的分配方案(就是每個(gè)分區(qū)的副本都分配到哪些broker上)策幼;2. 創(chuàng)建zookeeper節(jié)點(diǎn),把這個(gè)方案寫入/brokers/topics/<topic>節(jié)點(diǎn)下奴紧。
  • Kafka controller部分主要做下面這些事情:1. 創(chuàng)建分區(qū)特姐;2. 創(chuàng)建副本;3. 為每個(gè)分區(qū)選舉leader黍氮、ISR唐含;4.更新各種緩存。

流程說明

  • controller 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher沫浆,當(dāng) topic 被創(chuàng)建捷枯,則 controller 會(huì)通過 watch 得到該 topic 的 partition/replica 分配。(也就是专执,創(chuàng)建topic的命令負(fù)責(zé)分配partition/replica并寫入zk淮捆,然后controller從zk的節(jié)點(diǎn)信息發(fā)現(xiàn)了一個(gè)新的topic
  • controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表,對(duì)于 set_p 中的每一個(gè) partition:
    • 從分配給該 partition 的所有 replica(稱為AR)中任選一個(gè)可用的 broker 作為新的 leader他炊,并將AR設(shè)置為新的 ISR
    • 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state
  • controller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest争剿。

命令行部分

  • 我們發(fā)起topic創(chuàng)建命令之后,Kafka會(huì)做一些基本的校驗(yàn)痊末,比如是否同時(shí)指定了分區(qū)數(shù)蚕苇、副本因子或是topic名字中是否含有非法字符等。值得一提的是凿叠,0.10版本支持指定broker的機(jī)架信息涩笤,類似于Hadoop那樣嚼吞,可以更好地利用局部性原理減少集群中網(wǎng)絡(luò)開銷。如果指定了機(jī)架信息(broker.rack), Kafka在為分區(qū)做副本分配時(shí)就會(huì)考慮這部分信息蹬碧,盡可能地為副本挑選不同機(jī)架的broker舱禽。
  • 做完基本的校驗(yàn)之后,Kafka會(huì)從zookeeper的/brokers/ids下獲取集群當(dāng)前存活broker列表然后開始執(zhí)行副本的分配工作恩沽。首先誊稚,分區(qū)副本的分配有以下3個(gè)目標(biāo):
    • 盡可能地在各個(gè)broker之間均勻地分配副本
    • 如果分區(qū)的某個(gè)副本被分配到了一個(gè)broker,那么要盡可能地讓該分區(qū)的其他副本均勻地分配到其他broker上
    • 如果所有broker都指定了機(jī)架信息罗心,那么盡可能地讓每個(gè)分區(qū)的副本都分配到不同的機(jī)架上
  • 概括:隨機(jī)挑選一個(gè)broker采用輪詢的方式分配每個(gè)分區(qū)的第一個(gè)副本里伯,然后采用增量右移的方式分配其他的副本。算法如下:①將所有 broker(假設(shè)共 n 個(gè) broker)和待分配的 partition 排序渤闷;②將第 i 個(gè) partition 分配到第(i mod n)個(gè) broker 上疾瓮;③將第 i 個(gè) partition 的第 j 個(gè) replica 分配到第((i + j) mode n)個(gè) broker上;
  • 確定了分區(qū)副本分配方案之后飒箭,Kafka會(huì)把這個(gè)分配方案持久化到zookeeper的/brokers/topics/<topic>節(jié)點(diǎn)下狼电,類似于這樣的信息:{"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}},partitions下的格式是:格式是分區(qū)號(hào) -> 副本所在broker Id集合弦蹂,該例子是三個(gè)partition肩碟,每個(gè)partition三個(gè)副本。
  • Kafka集群partition replication默認(rèn)自動(dòng)分配分析:Kafka集群中4個(gè)Broker舉例盈匾,創(chuàng)建1個(gè)topic包含4個(gè)Partition腾务,2 Replication毕骡;

Controller部分(后臺(tái))

  • 所謂的后臺(tái)邏輯其實(shí)是由Kafka的controller負(fù)責(zé)提供的削饵。Kafka的controller內(nèi)部保存了很多信息,其中有一個(gè)分區(qū)狀態(tài)機(jī)未巫,用于記錄topic各個(gè)分區(qū)的狀態(tài)窿撬。這個(gè)狀態(tài)機(jī)內(nèi)部注冊(cè)了一些zookeeper監(jiān)聽器。Controller在啟動(dòng)的時(shí)候會(huì)創(chuàng)建這些監(jiān)聽器叙凡。其中一個(gè)監(jiān)聽器(TopicChangeListener)就是用于監(jiān)聽zookeeper的/brokers/topics目錄的子節(jié)點(diǎn)變化的劈伴。一旦該目錄子節(jié)點(diǎn)數(shù)發(fā)生變化就會(huì)調(diào)用這個(gè)監(jiān)聽器的處理方法。

  • TopicChangeListener監(jiān)聽器一方面會(huì)更新controller的緩存信息(比如更新集群當(dāng)前所有的topic列表以及更新新增topic的分區(qū)副本分配方案緩存等)握爷,另一方面就是創(chuàng)建對(duì)應(yīng)的分區(qū)及其副本對(duì)象并為每個(gè)分區(qū)確定leader副本及ISR跛璧。

  • 詳細(xì)內(nèi)容見:Kafka如何創(chuàng)建topic

topic刪除

刪除topic命令

流程圖

topic刪除流程

流程說明(守護(hù)線程)

  • Kafka的broker在被選舉成controller后,會(huì)執(zhí)行下面幾步
    • 注冊(cè)DeleteTopicsListener新啼,監(jiān)聽zookeeper節(jié)點(diǎn)/admin/delete_topics下子節(jié)點(diǎn)的變化追城,delete命令實(shí)際上就是要在該節(jié)點(diǎn)下創(chuàng)建一個(gè)節(jié)點(diǎn),名字是待刪除topic名燥撞,標(biāo)記該topic是待刪除的
    • 創(chuàng)建一個(gè)單獨(dú)的線程DeleteTopicsThread,來執(zhí)行topic刪除的操作
  • DeleteTopicsThread線程啟動(dòng)時(shí)會(huì)先在awaitTopicDeletionNotification處阻塞并等待刪除事件的通知沈善,即有新的topic被添加到queue里等待被刪除担孔。
  • 當(dāng)我們使用了delete命令,在zookeeper上的節(jié)點(diǎn)/admin/delete_topics下創(chuàng)建子節(jié)點(diǎn)< topic_name >戏锹。
  • DeleteTopicsListener會(huì)收到ChildChange事件會(huì)依次判斷如下邏輯:
    • 查詢topic是否存在,若已經(jīng)不存在了火诸,則直接刪除/admin/delete_topics/< topic_name >節(jié)點(diǎn)锦针。
    • 查詢topic是否為當(dāng)前正在執(zhí)行Preferred副本選舉或分區(qū)重分配,若果是置蜀,則標(biāo)記為暫時(shí)不適合被刪除伞插。
    • 并將該topic添加到queue中,此時(shí)會(huì)喚醒DeleteTopicsThread中doWork方法里awaitTopicDeletionNotification處的阻塞線程盾碗,讓刪除線程繼續(xù)往下執(zhí)行媚污。

流程說明(后臺(tái)邏輯,實(shí)現(xiàn)刪除操作)

  • 它首先會(huì)向各broker更新原信息廷雅,使得他們不再向外提供數(shù)據(jù)服務(wù)耗美,準(zhǔn)備開始刪除數(shù)據(jù)。
  • 開始刪除這個(gè)topic的所有分區(qū)
    • 給所有broker發(fā)請(qǐng)求航缀,告訴它們這些分區(qū)要被刪除商架。broker收到后就不再接受任何在這些分區(qū)上的客戶端請(qǐng)求了
    • 把每個(gè)分區(qū)下的所有副本都置于OfflineReplica狀態(tài),這樣ISR就不斷縮小芥玉,當(dāng)leader副本最后也被置于OfflineReplica狀態(tài)時(shí)leader信息將被更新為-1
    • 將所有副本置于ReplicaDeletionStarted狀態(tài)
    • 副本狀態(tài)機(jī)捕獲狀態(tài)變更蛇摸,然后發(fā)起StopReplicaRequest給broker,broker接到請(qǐng)求后停止所有fetcher線程灿巧、移除緩存赶袄,然后刪除底層log文件
    • 關(guān)閉所有空閑的Fetcher線程
  • 刪除zookeeper上節(jié)點(diǎn)/brokers/topics/< topic_name >
  • 刪除zookeeper上節(jié)點(diǎn)/config/topics/< topic_name >
  • 刪除zookeeper上節(jié)點(diǎn)/admin/delete_topics/< topic_name >
  • 并刪除內(nèi)存中的topic相關(guān)信息。

leader failover

  • 當(dāng) partition 對(duì)應(yīng)的 leader 宕機(jī)時(shí)抠藕,需要從 follower 中選舉出新 leader饿肺。在選舉新leader時(shí),一個(gè)基本的原則是盾似,新的 leader 必須擁有舊 leader commit 過的所有消息敬辣。
  • kafka 在 zookeeper 中(/brokers/.../state)動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas),由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader零院,只有 ISR 里面的成員才能選為 leader溉跃。對(duì)于 f+1 個(gè) replica,一個(gè) partition 可以在容忍 f 個(gè) replica 失效的情況下保證消息不丟失告抄。
  • kafka 0.8.* 采用的方案是:選擇第一個(gè)活過來的 replica(不一定是 ISR 成員)作為 leader撰茎。無法保障數(shù)據(jù)不丟失,但相對(duì)不可用時(shí)間較短玄妈。
  • 這種方案也是丟數(shù)據(jù)的原因之一乾吻,但是恢復(fù)時(shí)間相對(duì)較短髓梅。

broker failover

  • controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊(cè) Watcher,當(dāng) broker 宕機(jī)時(shí) zookeeper 會(huì) fire watch绎签;
  • controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用broker枯饿;
  • controller決定set_p,該集合包含宕機(jī) broker 上的所有 partition诡必;
  • 對(duì) set_p 中的每一個(gè) partition:
    • 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR
    • 決定新 leader
    • 將新 leader奢方、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點(diǎn)
    • 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令

controller failover

當(dāng) controller 宕機(jī)時(shí)會(huì)觸發(fā) controller failover爸舒。每個(gè) broker 都會(huì)在 zookeeper 的 "/controller" 節(jié)點(diǎn)注冊(cè) watcher蟋字,當(dāng) controller 宕機(jī)時(shí) zookeeper 中的臨時(shí)節(jié)點(diǎn)消失,所有存活的 broker 收到 fire 的通知扭勉,每個(gè) broker 都嘗試創(chuàng)建新的 controller path鹊奖,只有一個(gè)競(jìng)選成功并當(dāng)選為 controller。

消費(fèi)消息:The high-level consumer API

high-level consumer API 提供了 consumer group 的語義涂炎,一個(gè)消息只能被 group 內(nèi)的一個(gè) consumer 所消費(fèi)忠聚,且 consumer 消費(fèi)消息時(shí)不關(guān)注 offset。(注:客戶端開啟自動(dòng)提交offset唱捣,offset由kafka自行保存两蟀,這是新版kafka的功能,offset的維護(hù)不依賴于zk

  • 如果消費(fèi)線程大于 patition 數(shù)量震缭,則有些線程將收不到消息
  • 如果 patition 數(shù)量大于線程數(shù)赂毯,則有些線程會(huì)同時(shí)收到多個(gè) patition 的消息
  • 如果一個(gè)線程消費(fèi)多個(gè) patition,則無法保證你收到的消息的順序拣宰,而一個(gè) patition 內(nèi)的消息是有序的

消費(fèi)消息:The SimpleConsumer API(低層次的接口)

  • 多次讀取一個(gè)消息(自己管理offset党涕,不再自動(dòng)提交

  • 只消費(fèi)一個(gè) patition 中的部分消息

  • 使用事務(wù)來保證一個(gè)消息僅被消費(fèi)一次(用戶自行實(shí)現(xiàn)

  • 必須在應(yīng)用程序中跟蹤 offset,從而確定下一條應(yīng)該消費(fèi)哪條消息

  • 應(yīng)用程序需要通過程序獲知每個(gè) Partition 的 leader 是誰

  • 需要處理 leader 的變更

The high-level consumer API之消費(fèi)組(consumer group)

  • kafka 的分配單位是 patition徐裸。每個(gè) consumer 都屬于一個(gè) group遣鼓,一個(gè) partition 只能被同一個(gè) group 內(nèi)的一個(gè) consumer 所消費(fèi)(也就保障了一個(gè)消息只能被 group 內(nèi)的一個(gè) consuemr 所消費(fèi)),但是多個(gè) group 可以同時(shí)消費(fèi)這個(gè) partition重贺。
  • kafka 的設(shè)計(jì)目標(biāo)之一就是同時(shí)實(shí)現(xiàn)離線處理和實(shí)時(shí)處理,根據(jù)這一特性回懦,可以使用 spark/Storm 這些實(shí)時(shí)處理系統(tǒng)對(duì)消息在線處理气笙,同時(shí)使用 Hadoop 批處理系統(tǒng)進(jìn)行離線處理,還可以將數(shù)據(jù)備份到另一個(gè)數(shù)據(jù)中心怯晕,只需要保證這三者屬于不同的 consumer group潜圃。


    消費(fèi)組

消費(fèi)方式

  • consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)。
  • push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者舟茶,因?yàn)橄l(fā)送速率是由 broker 決定的谭期。它的目標(biāo)是盡可能以最快速度傳遞消息堵第,但是這樣很容易造成 consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞隧出。而pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息踏志。
  • 對(duì)于 Kafka 而言,pull 模式更合適胀瞪,它可簡(jiǎn)化 broker 的設(shè)計(jì)针余,consumer 可自主控制消費(fèi)消息的速率,同時(shí) consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi)凄诞,同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義圆雁。

ConsumerGroup最佳實(shí)踐

  • kafka為了保證吞吐量,只允許同一個(gè)consumer group下的一個(gè)consumer線程去訪問一個(gè)partition帆谍。如果覺得效率不高的時(shí)候伪朽,可以加partition的數(shù)量來橫向擴(kuò)展,那么再加新的consumer thread去消費(fèi)汛蝙。如果想多個(gè)不同的業(yè)務(wù)都需要這個(gè)topic的數(shù)據(jù)驱负,起多個(gè)consumer group就好了,大家都是順序的讀取message患雇,offsite的值互不影響跃脊。這樣沒有鎖競(jìng)爭(zhēng),充分發(fā)揮了橫向的擴(kuò)展性苛吱,吞吐量極高酪术。這也就形成了分布式消費(fèi)的概念。

  • 當(dāng)啟動(dòng)一個(gè)consumer group去消費(fèi)一個(gè)topic的時(shí)候翠储,無論topic里面有多個(gè)少個(gè)partition绘雁,無論我們consumer group里面配置了多少個(gè)consumer thread,這個(gè)consumer group下面的所有consumer thread一定會(huì)消費(fèi)全部的partition援所;即便這個(gè)consumer group下只有一個(gè)consumer thread庐舟,那么這個(gè)consumer thread也會(huì)去消費(fèi)所有的partition。因此住拭,最優(yōu)的設(shè)計(jì)就是挪略,consumer group下的consumer thread的數(shù)量等于partition數(shù)量,這樣效率是最高的滔岳。

  • 同一partition的一條message只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi)杠娱。不能夠一個(gè)consumer group的多個(gè)consumer同時(shí)消費(fèi)一個(gè)partition.

  • 一個(gè)consumer group下,無論有多少個(gè)consumer谱煤,這個(gè)consumer group一定會(huì)去把這個(gè)topic下所有的partition都消費(fèi)了摊求。當(dāng)consumer group里面的consumer數(shù)量小于這個(gè)topic下的partition數(shù)量的時(shí)候,如下圖groupA,groupB刘离,就會(huì)出現(xiàn)一個(gè)conusmer thread消費(fèi)多個(gè)partition的情況室叉,總之是這個(gè)topic下的partition都會(huì)被消費(fèi)睹栖。如果consumer group里面的consumer數(shù)量等于這個(gè)topic下的partition數(shù)量的時(shí)候,如下圖groupC茧痕,此時(shí)效率是最高的野来,每個(gè)partition都有一個(gè)consumer thread去消費(fèi)。當(dāng)consumer group里面的consumer數(shù)量大于這個(gè)topic下的partition數(shù)量的時(shí)候凿渊,如下圖GroupD梁只,就會(huì)有一個(gè)consumer thread空閑。因此埃脏,我們?cè)谠O(shè)定consumer group的時(shí)候搪锣,只需要指明里面有幾個(gè)consumer數(shù)量即可,無需指定對(duì)應(yīng)的消費(fèi)partition序號(hào)彩掐,consumer會(huì)自動(dòng)進(jìn)行rebalance构舟。


    ConsumerGroup
  • 多個(gè)Consumer Group下的consumer可以消費(fèi)同一條message,但是這種消費(fèi)也是以O(shè)(1)的方式順序的讀取message去消費(fèi),堵幽,所以一定會(huì)重復(fù)消費(fèi)這批message的狗超。

consumer rebalance

  • 引發(fā)consumer rebalance的時(shí)機(jī)
    • 使用ConsumerGroup消費(fèi)時(shí),同一個(gè)ConsumerGroup增加或者減少消費(fèi)者朴下;
    • Broker的數(shù)目增加或者減少努咐;
  • consumer rebalance算法如下:
      1. 將目標(biāo) topic 下的所有 partirtion 排序,存于PT
      1. 對(duì)某 consumer group 下所有 consumer 排序殴胧,存于 CG渗稍,第 i 個(gè)consumer 記為 Ci
      1. N=size(PT)/size(CG),向上取整
      1. 解除 Ci 對(duì)原來分配的 partition 的消費(fèi)權(quán)(i從0開始)
      1. 將第iN到(i+1)N-1個(gè) partition 分配給 Ci

Producer生產(chǎn)者

  • Producers直接發(fā)送消息到broker上的leader partition团滥,不需要經(jīng)過任何中介或其他路由轉(zhuǎn)發(fā)竿屹。為了實(shí)現(xiàn)這個(gè)特性,kafka集群中的每個(gè)broker都可以響應(yīng)producer的請(qǐng)求灸姊,并返回topic的一些元信息拱燃,這些元信息包括哪些機(jī)器是存活的,topic的leader partition都在哪力惯,現(xiàn)階段哪些leader partition是可以直接被訪問的碗誉。這是生產(chǎn)者高效的原因。
  • Producer客戶端自己控制著消息被推送到哪些partition夯膀。實(shí)現(xiàn)的方式可以是隨機(jī)分配诗充、實(shí)現(xiàn)一類隨機(jī)負(fù)載均衡算法,或者指定一些分區(qū)算法诱建。Kafka提供了接口供用戶實(shí)現(xiàn)自定義的partition,用戶可以為每個(gè)消息指定一個(gè)partitionKey碟绑,通過這個(gè)key來實(shí)現(xiàn)一些hash分區(qū)算法俺猿。比如茎匠,把userid作為partitionkey的話,相同userid的消息將會(huì)被推送到同一個(gè)partition押袍。
  • 以Batch的方式推送數(shù)據(jù)可以極大的提高處理效率诵冒,kafka Producer 可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求。Batch的數(shù)量大小可以通過Producer的參數(shù)控制谊惭,參數(shù)值可以設(shè)置為累計(jì)的消息的數(shù)量(如500條)汽馋、累計(jì)的時(shí)間間隔(如100ms)或者累計(jì)的數(shù)據(jù)大小(64KB)。通過增加batch的大小圈盔,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤IO的次數(shù)豹芯,當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。

復(fù)制原理和同步方式

  • Kafka中topic的每個(gè)partition有一個(gè)預(yù)寫式的日志文件驱敲,雖然partition可以繼續(xù)細(xì)分為若干個(gè)segment文件铁蹈,但是對(duì)于上層應(yīng)用來說可以將partition看成最小的存儲(chǔ)單元(一個(gè)有多個(gè)segment文件拼接的“巨型”文件),每個(gè)partition都由一些列有序的众眨、不可變的消息組成握牧,這些消息被連續(xù)的追加到partition中。


    kafka消息
  • 上圖中有兩個(gè)新名詞:HW和LEO娩梨。這里先介紹下LEO沿腰,LogEndOffset的縮寫,表示每個(gè)partition的log最后一條Message的位置狈定。HW是HighWatermark的縮寫颂龙,是指consumer能夠看到的此partition的位置。
  • 為了提高消息的可靠性掸冤,Kafka每個(gè)topic的partition有N個(gè)副本(replicas)厘托,其中N(大于等于1)是topic的復(fù)制因子(replica fator)的個(gè)數(shù)。Kafka通過多副本機(jī)制實(shí)現(xiàn)故障自動(dòng)轉(zhuǎn)移稿湿,當(dāng)Kafka集群中一個(gè)broker失效情況下仍然保證服務(wù)可用铅匹。在Kafka中發(fā)生復(fù)制時(shí)確保partition的日志能有序地寫到其他節(jié)點(diǎn)上,N個(gè)replicas中饺藤,其中一個(gè)replica為leader包斑,其他都為follower, leader處理partition的所有讀寫請(qǐng)求,與此同時(shí)涕俗,follower會(huì)被動(dòng)定期地去復(fù)制leader上的數(shù)據(jù)罗丰。
  • 如下圖所示,Kafka集群中有4個(gè)broker, 某topic有3個(gè)partition,且復(fù)制因子即副本個(gè)數(shù)也為3:


    4個(gè)broker的Kafka集群

    Kafka提供了數(shù)據(jù)復(fù)制算法保證再姑,如果leader發(fā)生故障或掛掉萌抵,一個(gè)新leader被選舉并被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個(gè)副本為leader,或者說follower追趕leader數(shù)據(jù)绍填。leader負(fù)責(zé)維護(hù)和跟蹤ISR(In-Sync Replicas的縮寫霎桅,表示副本同步隊(duì)列,具體可參考下節(jié))中所有follower滯后的狀態(tài)讨永。當(dāng)producer發(fā)送一條消息到broker后滔驶,leader寫入消息并復(fù)制到所有follower。消息提交之后才被成功復(fù)制到所有的同步副本卿闹。消息復(fù)制延遲受最慢的follower限制揭糕,重要的是快速檢測(cè)慢副本,如果follower“落后”太多或者失效锻霎,leader將會(huì)把它從ISR中刪除著角。

ISR

  • ISR (In-Sync Replicas),這個(gè)是指副本同步隊(duì)列量窘。副本數(shù)對(duì)Kafka的吞吐率是有一定的影響雇寇,但極大的增強(qiáng)了可用性。默認(rèn)情況下Kafka的replica數(shù)量為1蚌铜,即每個(gè)partition都有一個(gè)唯一的leader锨侯,為了確保消息的可靠性,通常應(yīng)用中將其值(由broker的參數(shù)offsets.topic.replication.factor指定)大小設(shè)置為大于1冬殃,比如3囚痴。 所有的副本(replicas)統(tǒng)稱為Assigned Replicas,即AR审葬。ISR是AR中的一個(gè)子集深滚,由leader維護(hù)ISR列表,follower從leader同步數(shù)據(jù)有一些延遲(包括延遲時(shí)間replica.lag.time.max.ms和延遲條數(shù)replica.lag.max.messages兩個(gè)維度, 當(dāng)前最新的版本0.10.x中只支持replica.lag.time.max.ms這個(gè)維度)涣觉,任意一個(gè)超過閾值都會(huì)把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表痴荐,新加入的follower也會(huì)先存放在OSR中。AR=ISR+OSR官册。
  • Kafka 0.10.x版本后移除了replica.lag.max.messages參數(shù)生兆,只保留了replica.lag.time.max.ms作為ISR中副本管理的參數(shù)。為什么這樣做呢?replica.lag.max.messages表示當(dāng)前某個(gè)副本落后leaeder的消息數(shù)量超過了這個(gè)參數(shù)的值膝宁,那么leader就會(huì)把follower從ISR中刪除鸦难。假設(shè)設(shè)置replica.lag.max.messages=4,那么如果producer一次傳送至broker的消息數(shù)量都小于4條時(shí)员淫,因?yàn)樵趌eader接受到producer發(fā)送的消息之后而follower副本開始拉取這些消息之前合蔽,follower落后leader的消息數(shù)不會(huì)超過4條消息,故此沒有follower移出ISR介返,所以這時(shí)候replica.lag.max.message的設(shè)置似乎是合理的拴事。但是producer發(fā)起瞬時(shí)高峰流量沃斤,producer一次發(fā)送的消息超過4條時(shí),也就是超過replica.lag.max.messages挤聘,此時(shí)follower都會(huì)被認(rèn)為是與leader副本不同步了轰枝,從而被踢出了ISR捅彻。但實(shí)際上這些follower都是存活狀態(tài)的且沒有性能問題组去。那么在之后追上leader,并被重新加入了ISR。于是就會(huì)出現(xiàn)它們不斷地剔出ISR然后重新回歸ISR步淹,這無疑增加了無謂的性能損耗从隆。而且這個(gè)參數(shù)是broker全局的。設(shè)置太大了缭裆,影響真正“落后”follower的移除;設(shè)置的太小了键闺,導(dǎo)致follower的頻繁進(jìn)出。無法給定一個(gè)合適的replica.lag.max.messages的值澈驼,故此辛燥,新版本的Kafka移除了這個(gè)參數(shù)。
  • 注:ISR中包括:leader和follower缝其。
  • HW俗稱高水位挎塌,HighWatermark的縮寫,取一個(gè)partition對(duì)應(yīng)的ISR中最小的LEO作為HW内边,consumer最多只能消費(fèi)到HW所在的位置榴都。另外每個(gè)replica都有HW,leader和follower各自負(fù)責(zé)更新自己的HW的狀態(tài)。對(duì)于leader新寫入的消息漠其,consumer不能立刻消費(fèi)嘴高,leader會(huì)等待該消息被所有ISR中的replicas同步后更新HW,此時(shí)消息才能被consumer消費(fèi)和屎。這樣就保證了如果leader所在的broker失效拴驮,該消息仍然可以從新選舉的leader中獲取。對(duì)于來自內(nèi)部broKer的讀取請(qǐng)求柴信,沒有HW的限制套啤。
  • 下圖詳細(xì)的說明了當(dāng)producer生產(chǎn)消息至broker后,ISR以及HW和LEO的流轉(zhuǎn)過程:


    當(dāng)producer生產(chǎn)消息至broker后颠印,ISR以及HW和LEO的流轉(zhuǎn)過程
  • 由此可見纲岭,Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制线罕。事實(shí)上止潮,同步復(fù)制要求所有能工作的follower都復(fù)制完,這條消息才會(huì)被commit钞楼,這種復(fù)制方式極大的影響了吞吐率喇闸。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit燃乍,這種情況下如果follower都還沒有復(fù)制完唆樊,落后于leader時(shí),突然leader宕機(jī)刻蟹,則會(huì)丟失數(shù)據(jù)逗旁。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。
  • Kafka的ISR的管理最終都會(huì)反饋到Zookeeper節(jié)點(diǎn)上舆瘪。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state片效。目前有兩個(gè)地方會(huì)對(duì)這個(gè)Zookeeper的節(jié)點(diǎn)進(jìn)行維護(hù):
    • Controller來維護(hù):Kafka集群中的其中一個(gè)Broker會(huì)被選舉為Controller,主要負(fù)責(zé)Partition管理和副本狀態(tài)管理英古,也會(huì)執(zhí)行類似于重分配partition之類的管理任務(wù)淀衣。在符合某些特定條件下,Controller下的LeaderSelector會(huì)選舉新的leader召调,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關(guān)節(jié)點(diǎn)中膨桥。同時(shí)發(fā)起LeaderAndIsrRequest通知所有的replicas。
    • leader來維護(hù):leader有單獨(dú)的線程定期檢測(cè)ISR中follower是否脫離ISR, 如果發(fā)現(xiàn)ISR變化唠叛,則會(huì)將新的ISR的信息返回到Zookeeper的相關(guān)節(jié)點(diǎn)中只嚣。

數(shù)據(jù)可靠性和持久性保證

  • request.required.acks=1(默認(rèn)):這意味著producer在ISR中的leader已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條message。如果leader宕機(jī)了玻墅,則會(huì)丟失數(shù)據(jù)介牙。producer發(fā)送數(shù)據(jù)到leader,leader寫本地日志成功澳厢,返回客戶端成功;此時(shí)ISR中的副本還沒有來得及拉取該消息环础,leader就宕機(jī)了,那么此次發(fā)送的消息就會(huì)丟失剩拢。
    request.required.acks=1
  • request.required.acks=0:這意味著producer無需等待來自broker的確認(rèn)而繼續(xù)發(fā)送下一批消息线得。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的徐伐。
    request.required.acks=0
  • request.required.acks=-1:producer需要等待ISR中的所有follower都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成贯钩,可靠性最高。但是這樣也不能保證數(shù)據(jù)不丟失办素,比如當(dāng)ISR中只有l(wèi)eader時(shí)角雷,這樣就變成了acks=1的情況。數(shù)據(jù)發(fā)送到leader后 性穿,部分ISR的副本同步勺三,leader此時(shí)掛掉。比如follower1h和follower2都有可能變成新的leader, producer端會(huì)得到返回異常需曾,producer端會(huì)重新發(fā)送數(shù)據(jù)吗坚,數(shù)據(jù)可能會(huì)重復(fù)祈远。
    request.required.acks=-1

關(guān)于HW的進(jìn)一步探討

  • 考慮上圖(即acks=-1,部分ISR副本同步)中的另一種情況,如果在Leader掛掉的時(shí)候商源,follower1同步了消息4,5车份,follower2同步了消息4,與此同時(shí)follower2被選舉為leader牡彻,那么此時(shí)follower1中的多出的消息5該做如何處理呢?
  • 這里就需要HW的協(xié)同配合了扫沼。如前所述,一個(gè)partition中的ISR列表中讨便,leader的HW是所有ISR列表里副本中最小的那個(gè)的LEO充甚。類似于木桶原理,水位取決于最低那塊短板霸褒。


    HW
  • 如上圖,某個(gè)topic的某partition有三個(gè)副本盈蛮,分別為A废菱、B、C抖誉。A作為leader肯定是LEO最高殊轴,B緊隨其后,C機(jī)器由于配置比較低袒炉,網(wǎng)絡(luò)比較差旁理,故而同步最慢。這個(gè)時(shí)候A機(jī)器宕機(jī)我磁,這時(shí)候如果B成為leader孽文,假如沒有HW,在A重新恢復(fù)之后會(huì)做同步(makeFollower)操作夺艰,在宕機(jī)時(shí)log文件之后直接做追加操作芋哭,而假如B的LEO已經(jīng)達(dá)到了A的LEO,會(huì)產(chǎn)生數(shù)據(jù)不一致的情況郁副,所以使用HW來避免這種情況减牺。
  • A在做同步操作的時(shí)候,先將log文件截?cái)嗟街白约旱腍W的位置存谎,即3拔疚,之后再?gòu)腂中拉取消息進(jìn)行同步。
  • 如果失敗的follower恢復(fù)過來既荚,它首先將自己的log文件截?cái)嗟缴洗蝐heckpointed時(shí)刻的HW的位置稚失,之后再?gòu)膌eader中同步消息。leader掛掉會(huì)重新選舉固以,新的leader會(huì)發(fā)送“指令”讓其余的follower截?cái)嘀磷陨淼腍W的位置然后再拉取新的消息墩虹。
  • 當(dāng)ISR中的個(gè)副本的LEO不一致時(shí)嘱巾,如果此時(shí)leader掛掉,選舉新的leader時(shí)并不是按照LEO的高低進(jìn)行選舉诫钓,而是按照ISR中的順序選舉旬昭。
  • 假如跨機(jī)房部署kafka集群,可能因?yàn)榫W(wǎng)絡(luò)抖動(dòng)菌湃,節(jié)點(diǎn)不停離開和加入ISR问拘,或者會(huì)丟數(shù)據(jù)或者數(shù)據(jù)重復(fù)。

Leader選舉

  • 一條消息只有被ISR中的所有follower都從leader復(fù)制過去才會(huì)被認(rèn)為已提交惧所。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了leader骤坐,還沒來得及被任何follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失下愈。而對(duì)于producer而言纽绍,它可以選擇是否等待消息commit,這可以通過request.required.acks來設(shè)置势似。這種機(jī)制確保了只要ISR中有一個(gè)或者以上的follower拌夏,一條被commit的消息就不會(huì)丟失
  • 有一個(gè)很重要的問題是當(dāng)leader宕機(jī)了履因,怎樣在follower中選舉出新的leader障簿,因?yàn)閒ollower可能落后很多或者直接crash了,所以必須確保選擇“最新”的follower作為新的leader栅迄。一個(gè)基本的原則就是站故,如果leader不在了,新的leader必須擁有原來的leader commit的所有消息毅舆。這就需要做一個(gè)折中西篓,如果leader在表名一個(gè)消息被commit前等待更多的follower確認(rèn),那么在它掛掉之后就有更多的follower可以成為新的leader朗兵,但這也會(huì)造成吞吐率的下降污淋。
  • Kafka在Zookeeper中為每一個(gè)partition動(dòng)態(tài)的維護(hù)了一個(gè)ISR,這個(gè)ISR里的所有replica都跟上了leader余掖,只有ISR里的成員才能有被選為leader的可能(unclean.leader.election.enable=false)仍侥。在這種模式下聂使,對(duì)于f+1個(gè)副本绎晃,一個(gè)Kafka topic能在保證不丟失已經(jīng)commit消息的前提下容忍f個(gè)副本的失敗波附,在大多數(shù)使用場(chǎng)景下,這種模式是十分有利的冗美。事實(shí)上魔种,為了容忍f個(gè)副本的失敗,“少數(shù)服從多數(shù)”的方式和ISR在commit前需要等待的副本的數(shù)量是一樣的粉洼,但是ISR需要的總的副本的個(gè)數(shù)幾乎是“少數(shù)服從多數(shù)”的方式的一半节预。
  • 在ISR中至少有一個(gè)follower時(shí)叶摄,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某一個(gè)partition的所有replica都掛了安拟,就無法保證數(shù)據(jù)不丟失了蛤吓。這種情況下有兩種可行的方案:
    • 等待ISR中任意一個(gè)replica“活”過來,并且選它作為leader
    • 選擇第一個(gè)“活”過來的replica(并不一定是在ISR中)作為leader
  • 這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的抉擇糠赦。如果一定要等待ISR中的replica“活”過來会傲,那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中所有的replica都無法“活”過來了拙泽,或者數(shù)據(jù)丟失了淌山,這個(gè)partition將永遠(yuǎn)不可用。選擇第一個(gè)“活”過來的replica作為leader,而這個(gè)replica不是ISR中的replica,那即使它并不保障已經(jīng)包含了所有已commit的消息顾瞻,它也會(huì)成為leader而作為consumer的數(shù)據(jù)源泼疑。默認(rèn)情況下,Kafka采用第二種策略朋其,即unclean.leader.election.enable=true王浴,也可以將此參數(shù)設(shè)置為false來啟用第一種策略。
  • 細(xì)節(jié)參見深入剖析kafka架構(gòu)內(nèi)部原理

Kafka的發(fā)送模式

  • Kafka的發(fā)送模式由producer端的配置參數(shù)producer.type來設(shè)置梅猿,這個(gè)參數(shù)指定了在后臺(tái)線程中消息的發(fā)送方式是同步的還是異步的,默認(rèn)是同步的方式秒裕,即producer.type=sync袱蚓。如果設(shè)置成異步的模式,即producer.type=async几蜻,可以是producer以batch的形式push數(shù)據(jù)喇潘,這樣會(huì)極大的提高broker的性能,但是這樣會(huì)增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)梭稚。如果需要確保消息的可靠性颖低,必須要將producer.type設(shè)置為sync。
  • 對(duì)于異步模式弧烤,還有4個(gè)配套的參數(shù)忱屑,如下:


    異步模式的幾個(gè)參數(shù)
  • 以batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka producer可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求暇昂。batch的數(shù)量大小可以通過producer的參數(shù)(batch.num.messages)控制莺戒。通過增加batch的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤IO的次數(shù)急波,當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡从铲。在比較新的版本中還有batch.size這個(gè)參數(shù)。注意澄暮,緩存消息的大小還會(huì)被message.max.bytes參數(shù)限制名段。
  • 在Kafka在0.8以前的版本中阱扬,是沒有Replication的,一旦某一個(gè)Broker宕機(jī)伸辟,則其上所有的Partition數(shù)據(jù)都不可被消費(fèi)麻惶,這與Kafka數(shù)據(jù)持久性及Delivery Guarantee的設(shè)計(jì)目標(biāo)相悖。同時(shí)Producer都不能再將數(shù)據(jù)存于這些Partition中自娩。
    • 如果Producer使用同步模式則Producer會(huì)在嘗試重新發(fā)送message.send.max.retries(默認(rèn)值為3)次后拋出Exception用踩,用戶可以選擇停止發(fā)送后續(xù)數(shù)據(jù)也可選擇繼續(xù)選擇發(fā)送。而前者會(huì)造成數(shù)據(jù)的阻塞忙迁,后者會(huì)造成本應(yīng)發(fā)往該Broker的數(shù)據(jù)的丟失脐彩。
    • 如果Producer使用異步模式,則Producer會(huì)嘗試重新發(fā)送message.send.max.retries(默認(rèn)值為3)次后記錄該異常并繼續(xù)發(fā)送后續(xù)數(shù)據(jù)姊扔,這會(huì)造成數(shù)據(jù)丟失并且用戶只能通過日志發(fā)現(xiàn)該問題惠奸。異步發(fā)送可能丟數(shù)據(jù)的原因在這里。

消息去重

  • Kafka在producer端和consumer端都會(huì)出現(xiàn)消息的重復(fù)恰梢;
  • Kafka文檔中提及GUID(Globally Unique Identifier)的概念佛南,通過客戶端生成算法得到每個(gè)消息的unique id,同時(shí)可映射至broker上存儲(chǔ)的地址嵌言,即通過GUID便可查詢提取消息內(nèi)容嗅回,也便于發(fā)送方的冪等性保證,需要在broker上提供此去重處理模塊摧茴,目前版本尚不支持绵载。
  • 針對(duì)GUID, 如果從客戶端的角度去重,那么需要引入集中式緩存苛白,必然會(huì)增加依賴復(fù)雜度娃豹,另外緩存的大小難以界定。
  • 不只是Kafka, 類似RabbitMQ以及RocketMQ這類商業(yè)級(jí)中間件也只保障at least once, 且也無法從自身去進(jìn)行消息去重购裙。所以我們建議業(yè)務(wù)方根據(jù)自身的業(yè)務(wù)特點(diǎn)進(jìn)行去重懂版,比如業(yè)務(wù)消息本身具備冪等性,或者借助Redis等其他產(chǎn)品進(jìn)行去重處理躏率。

高可靠性配置

  • Kafka提供了很高的數(shù)據(jù)冗余彈性躯畴,對(duì)于需要數(shù)據(jù)高可靠性的場(chǎng)景,我們可以增加數(shù)據(jù)冗余備份數(shù)(replication.factor)禾锤,調(diào)高最小寫入副本數(shù)的個(gè)數(shù)(min.insync.replicas)等等私股,但是這樣會(huì)影響性能。反之恩掷,性能提高而可靠性則降低倡鲸,用戶需要自身業(yè)務(wù)特性在彼此之間做一些權(quán)衡性選擇。

  • 要保證數(shù)據(jù)寫入到Kafka是安全的黄娘,高可靠的峭状,需要如下的配置:

    • topic的配置:replication.factor>=3,即副本數(shù)至少是3個(gè)克滴;2<=min.insync.replicas<=replication.factor;
    • broker的配置:leader的選舉條件unclean.leader.election.enable=false
      producer的配置:request.required.acks=-1(all)优床,producer.type=sync

性能測(cè)試相關(guān)

消息長(zhǎng)度對(duì)吞吐率的影響1

消息長(zhǎng)度對(duì)吞吐率的影響2

Partition Number VS. Throughput

Replica Number VS. Throughput

Consumer Only

Producer Only

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市胆敞,隨后出現(xiàn)的幾起案子着帽,更是在濱河造成了極大的恐慌,老刑警劉巖移层,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仍翰,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡观话,警方通過查閱死者的電腦和手機(jī)予借,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來频蛔,“玉大人灵迫,你說我怎么就攤上這事』尴” “怎么了瀑粥?”我有些...
    開封第一講書人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)三圆。 經(jīng)常有香客問我利凑,道長(zhǎng),這世上最難降的妖魔是什么嫌术? 我笑而不...
    開封第一講書人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮牌借,結(jié)果婚禮上度气,老公的妹妹穿的比我還像新娘。我一直安慰自己膨报,他們只是感情好磷籍,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著现柠,像睡著了一般院领。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上够吩,一...
    開封第一講書人閱讀 52,156評(píng)論 1 308
  • 那天比然,我揣著相機(jī)與錄音,去河邊找鬼周循。 笑死强法,一個(gè)胖子當(dāng)著我的面吹牛万俗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播饮怯,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼闰歪,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了蓖墅?” 一聲冷哼從身側(cè)響起库倘,我...
    開封第一講書人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎论矾,沒想到半個(gè)月后教翩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡拇囊,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年迂曲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片寥袭。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡路捧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出传黄,到底是詐尸還是另有隱情杰扫,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布膘掰,位于F島的核電站章姓,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏识埋。R本人自食惡果不足惜凡伊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望窒舟。 院中可真熱鬧系忙,春花似錦、人聲如沸惠豺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽洁墙。三九已至蛹疯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間热监,已是汗流浹背捺弦。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人羹呵。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓骂际,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親冈欢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子歉铝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容