正文
一憔古、Kafka的架構(gòu)
如上圖所示,一個典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View佳魔,或者是服務(wù)器日志凭迹,系統(tǒng)CPU颅崩、Memory等),若干broker(Kafka支持水平擴展蕊苗,一般broker數(shù)量越多,集群吞吐率越高)沿彭,若干Consumer Group朽砰,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置喉刘,選舉leader瞧柔,以及在Consumer Group發(fā)生變化時進行rebalance。Producer使用push模式將消息發(fā)布到broker睦裳,Consumer使用pull模式從broker訂閱并消費消息造锅。
二、Topics和Partition
Topic在邏輯上可以被認為是一個queue廉邑,每條消費都必須指定它的Topic哥蔚,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高蛛蒙,物理上把Topic分成一個或多個Partition糙箍,每個Partition在物理上對應(yīng)一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件牵祟。創(chuàng)建一個topic時深夯,同時可以指定分區(qū)數(shù)目,分區(qū)數(shù)越多诺苹,其吞吐量也越大咕晋,但是需要的資源也越多,同時也會導致更高的不可用性收奔,kafka在接收到生產(chǎn)者發(fā)送的消息之后掌呜,會根據(jù)均衡策略將消息存儲到不同的分區(qū)中。因為每條消息都被append到該Partition中坪哄,屬于順序?qū)懘疟P站辉,因此效率非常高(經(jīng)驗證,順序?qū)懘疟P效率比隨機寫內(nèi)存還要高损姜,這是Kafka高吞吐率的一個很重要的保證)饰剥。
對于傳統(tǒng)的message queue而言,一般會刪除已經(jīng)被消費的消息摧阅,而Kafka集群會保留所有的消息汰蓉,無論其被消費與否。當然棒卷,因為磁盤限制顾孽,不可能永久保留所有數(shù)據(jù)(實際上也沒必要)祝钢,因此Kafka提供兩種策略刪除舊數(shù)據(jù)。一是基于時間若厚,二是基于Partition文件大小拦英。例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數(shù)據(jù)测秸,也可在Partition文件超過1GB時刪除舊數(shù)據(jù)疤估,配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
因為Kafka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān)霎冯,所以這里刪除過期文件與提高Kafka性能無關(guān)铃拇。選擇怎樣的刪除策略只與磁盤以及具體的需求有關(guān)。另外沈撞,Kafka會為每一個Consumer Group保留一些metadata信息——當前消費的消息的position慷荔,也即offset。這個offset由Consumer控制缠俺。正常情況下Consumer會在消費完一條消息后遞增該offset显晶。當然,Consumer也可將offset設(shè)成一個較小的值壹士,重新消費一些消息吧碾。因為offet由Consumer控制,所以Kafka broker是無狀態(tài)的墓卦,它不需要標記哪些消息被哪些消費過倦春,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機制落剪,這也為Kafka的高吞吐率提供了有力保障睁本。
三、Producer消息路由
Producer發(fā)送消息到broker時忠怖,會根據(jù)Paritition機制選擇將其存儲到哪一個Partition呢堰。如果Partition機制設(shè)置合理,所有消息可以均勻分布到不同的Partition里凡泣,這樣就實現(xiàn)了負載均衡枉疼。如果一個Topic對應(yīng)一個文件,那這個文件所在的機器I/O將會成為這個Topic的性能瓶頸鞋拟,而有了Partition后骂维,不同的消息可以并行寫入不同broker的不同Partition里,極大的提高了吞吐率贺纲『焦耄可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數(shù)量,也可在創(chuàng)建Topic時通過參數(shù)指定,同時也可以在Topic創(chuàng)建之后通過Kafka提供的工具修改潦刃。
在發(fā)送一條消息時侮措,可以指定這條消息的key,Producer根據(jù)這個key和Partition機制來判斷應(yīng)該將這條消息發(fā)送到哪個Parition乖杠。Paritition機制可以通過指定Producer的paritition. class這一參數(shù)來指定分扎,該class必須實現(xiàn)kafka.producer.Partitioner接口。
四胧洒、Consumer Group
使用Consumer high level API時畏吓,同一Topic的一條消息只能被同一個Consumer Group內(nèi)的一個Consumer消費,但多個Consumer Group可同時消費這一消息略荡。
這是Kafka用來實現(xiàn)一個Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個Consumer)的手段。一個Topic可以對應(yīng)多個Consumer Group歉胶。如果需要實現(xiàn)廣播汛兜,只要每個Consumer有一個獨立的Group就可以了。要實現(xiàn)單播只要所有的Consumer在同一個Group里通今。用Consumer Group還可以將Consumer進行自由的分組而不需要多次發(fā)送消息到不同的Topic粥谬。
實際上,Kafka的設(shè)計理念之一就是同時提供離線處理和實時處理辫塌。根據(jù)這一特性漏策,可以使用Storm這種實時流處理系統(tǒng)對消息進行實時在線處理,同時使用Hadoop這種批處理系統(tǒng)進行離線處理臼氨,還可以同時將數(shù)據(jù)實時備份到另一個數(shù)據(jù)中心掺喻,只需要保證這三個操作所使用的Consumer屬于不同的Consumer Group即可。
五储矩、Push vs. Pull? ? (push 推送感耙,pull拉取,對于消費者來說持隧,push是被動接收即硼,pull是主動接收。 而對于服務(wù)者來說屡拨,push是主動推送只酥,而pull是被動推送)
作為一個消息系統(tǒng),Kafka遵循了傳統(tǒng)的方式呀狼,選擇由Producer向broker push消息并由Consumer從broker pull消息裂允。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume哥艇,采用push模式叫胖。事實上,push模式和pull模式各有優(yōu)劣她奥。
push模式很難適應(yīng)消費速率不同的消費者瓮增,因為消息發(fā)送速率是由broker決定的怎棱。push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息绷跑,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞拳恋。而pull模式則可以根據(jù)Consumer的消費能力以適當?shù)乃俾氏M消息。
對于Kafka而言砸捏,pull模式更合適谬运。pull模式可簡化broker的設(shè)計,Consumer可自主控制消費消息的速率垦藏,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費梆暖,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
六掂骏、Kafka delivery guarantee
有這么幾種可能的delivery guarantee:
At most once 消息可能會丟轰驳,但絕不會重復(fù)傳輸
At least one ? 消息絕不會丟,但可能會重復(fù)傳輸
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次弟灼,很多時候這是用戶所想要的级解。
當Producer向broker發(fā)送消息時,一旦這條消息被commit田绑,因數(shù)replication的存在勤哗,它就不會丟。但是如果Producer發(fā)送數(shù)據(jù)給broker后掩驱,遇到網(wǎng)絡(luò)問題而造成通信中斷芒划,那Producer就無法判斷該條消息是否已經(jīng)commit。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么欧穴,但是Producer可以生成一種類似于主鍵的東西腊状,發(fā)生故障時冪等性的重試多次,這樣就做到了Exactly once苔可。
接下來討論的是消息從broker到Consumer的delivery guarantee語義缴挖。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息后焚辅,可以選擇commit映屋,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取同蜻。如未commit棚点,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。當然可以將Consumer設(shè)置為autocommit湾蔓,即Consumer一旦讀到數(shù)據(jù)立即自動commit瘫析。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應(yīng)用程序并非在Consumer讀取完數(shù)據(jù)就結(jié)束了贬循,而是要進行進一步處理咸包,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
Kafka默認保證At least once杖虾,并且允許通過設(shè)置Producer異步提交來實現(xiàn)At most once烂瘫。而Exactly once要求與外部存儲系統(tǒng)協(xié)作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式奇适。