轉(zhuǎn)載:[http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/](http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/)
這篇文章在初次接觸kafka時(shí)季眷,便拜讀過,確實(shí)寫的很好胞谭,另外其他系列文章也同樣有可取之處汽久。
Kafka系列文章
[Kafka設(shè)計(jì)解析(一)- Kafka背景及架構(gòu)介紹](http://www.jasongj.com/2015/03/10/KafkaColumn1/)
[Kafka設(shè)計(jì)解析(二)- Kafka High Availability (上)](http://www.jasongj.com/2015/04/24/KafkaColumn2/)
[Kafka設(shè)計(jì)解析(三)- Kafka High Availability (下)](http://www.jasongj.com/2015/06/08/KafkaColumn3/)
[Kafka設(shè)計(jì)解析(四)- Kafka Consumer設(shè)計(jì)解析](http://www.jasongj.com/2015/08/09/KafkaColumn4/)
[Kafka設(shè)計(jì)解析(五)- Kafka性能測試方法及Benchmark報(bào)告](http://www.jasongj.com/2015/12/31/KafkaColumn5_kafka_benchmark/)
背景介紹
Kafka簡介
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)柑司。主要設(shè)計(jì)目標(biāo)如下:
- 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能
- 高吞吐率锅劝。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸
- 支持Kafka Server間的消息分區(qū)攒驰,及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸
- 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理
為什么要用消息系統(tǒng)
解耦
在項(xiàng)目啟動之初來預(yù)測將來項(xiàng)目會碰到什么需求故爵,是極其困難的玻粪。消息隊(duì)列在處理過程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層稠集,兩邊的處理過程都要實(shí)現(xiàn)這一接口奶段。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束
冗余
有些情況下剥纷,處理數(shù)據(jù)的過程會失敗痹籍。除非數(shù)據(jù)被持久化,否則將造成丟失晦鞋。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理蹲缠,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。在被許多消息隊(duì)列所采用的”插入-獲取-刪除”范式中悠垛,在把一個(gè)消息從隊(duì)列中刪除之前线定,需要你的處理過程明確的指出該消息已經(jīng)被處理完畢,確保你的數(shù)據(jù)被安全的保存直到你使用完畢确买。
擴(kuò)展性
因?yàn)橄㈥?duì)列解耦了你的處理過程斤讥,所以增大消息入隊(duì)和處理的頻率是很容易的;只要另外增加處理過程即可湾趾。不需要改變代碼芭商、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡單搀缠。
靈活性 & 峰值處理能力
在訪問量劇增的情況下铛楣,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見艺普;如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)簸州。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因?yàn)橥话l(fā)的超負(fù)荷的請求而完全崩潰歧譬。
可恢復(fù)性
當(dāng)體系的一部分組件失效岸浑,不會影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度瑰步,所以即使一個(gè)處理消息的進(jìn)程掛掉助琐,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。而這種允許重試或者延后處理請求的能力通常是造就一個(gè)略感不便的用戶和一個(gè)沮喪透頂?shù)挠脩糁g的區(qū)別面氓。
送達(dá)保證
消息隊(duì)列提供的冗余機(jī)制保證了消息能被實(shí)際的處理兵钮,只要一個(gè)進(jìn)程讀取了該隊(duì)列即可。在此基礎(chǔ)上舌界,IronMQ提供了一個(gè)”只送達(dá)一次”保證掘譬。無論有多少進(jìn)程在從隊(duì)列中領(lǐng)取數(shù)據(jù),每一個(gè)消息只能被處理一次呻拌。這之所以成為可能葱轩,是因?yàn)楂@取一個(gè)消息只是”預(yù)定”了這個(gè)消息,暫時(shí)把它移出了隊(duì)列藐握。除非客戶端明確的表示已經(jīng)處理完了這個(gè)消息靴拱,否則這個(gè)消息會被放回隊(duì)列中去,在一段可配置的時(shí)間之后可再次被處理猾普。
順序保證
在大多使用場景下袜炕,數(shù)據(jù)處理的順序都很重要。消息隊(duì)列本來就是排序的初家,并且能保證數(shù)據(jù)會按照特定的順序來處理偎窘。IronMO保證消息通過FIFO(先進(jìn)先出)的順序來處理污它,因此消息在隊(duì)列中的位置就是從隊(duì)列中檢索他們的位置哪审。
緩沖
在任何重要的系統(tǒng)中国拇,都會有需要不同的處理時(shí)間的元素础锐。例如,加載一張圖片比應(yīng)用過濾器花費(fèi)更少的時(shí)間镇辉。消息隊(duì)列通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行–寫入隊(duì)列的處理會盡可能的快速靠瞎,而不受從隊(duì)列讀的預(yù)備處理的約束逻悠。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度俏站。
理解數(shù)據(jù)流
在一個(gè)分布式系統(tǒng)里志笼,要得到一個(gè)關(guān)于用戶操作會用多長時(shí)間及其原因的總體印象沿盅,是個(gè)巨大的挑戰(zhàn)。消息隊(duì)列通過消息被處理的頻率籽腕,來方便的輔助確定那些表現(xiàn)不佳的處理過程或領(lǐng)域嗡呼,這些地方的數(shù)據(jù)流都不夠優(yōu)化。
異步通信
很多時(shí)候皇耗,你不想也不需要立即處理消息南窗。消息隊(duì)列提供了異步處理機(jī)制,允許你把一個(gè)消息放入隊(duì)列郎楼,但并不立即處理它万伤。你想向隊(duì)列中放入多少消息就放多少,然后在你樂意的時(shí)候再去處理它們呜袁。
常用Message Queue對比
RabbitMQ
RabbitMQ是使用Erlang編寫的一個(gè)開源的消息隊(duì)列敌买,本身支持很多的協(xié)議:AMQP,XMPP, SMTP, STOMP阶界,也正因如此虹钮,它非常重量級聋庵,更適合于企業(yè)級的開發(fā)。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架芙粱,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)祭玉。對路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持春畔。
Redis
Redis是一個(gè)基于Key-Value對的NoSQL數(shù)據(jù)庫脱货,開發(fā)維護(hù)很活躍。雖然它是一個(gè)Key-Value數(shù)據(jù)庫存儲系統(tǒng)律姨,但它本身支持MQ功能振峻,所以完全可以當(dāng)做一個(gè)輕量級的隊(duì)列服務(wù)來使用。對于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作择份,各執(zhí)行100萬次扣孟,每10萬次記錄一次執(zhí)行時(shí)間。測試數(shù)據(jù)分為128Bytes缓淹、512Bytes哈打、1K和10K四個(gè)不同大小的數(shù)據(jù)。實(shí)驗(yàn)表明:入隊(duì)時(shí)讯壶,當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ料仗,而如果數(shù)據(jù)大小超過了10K,Redis則慢的無法忍受伏蚊;出隊(duì)時(shí)立轧,無論數(shù)據(jù)大小,Redis都表現(xiàn)出非常好的性能躏吊,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis氛改。
ZeroMQ
ZeroMQ號稱最快的消息隊(duì)列系統(tǒng),尤其針對大吞吐量的需求場景比伏。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長的高級/復(fù)雜的隊(duì)列胜卤,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對這MQ能夠應(yīng)用成功的挑戰(zhàn)赁项。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式葛躏,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,因?yàn)槟愕膽?yīng)用程序?qū)缪萘诉@個(gè)服務(wù)角色悠菜。你只需要簡單的引用ZeroMQ程序庫舰攒,可以使用NuGet安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了悔醋。但是ZeroMQ僅提供非持久性的隊(duì)列摩窃,也就是說如果宕機(jī),數(shù)據(jù)將會丟失芬骄。其中猾愿,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開始同時(shí)支持ZeroMQ和Netty作為傳輸模塊)鹦聪。
ActiveMQ
ActiveMQ是Apache下的一個(gè)子項(xiàng)目。 類似于ZeroMQ匪蟀,它能夠以代理人和點(diǎn)對點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列椎麦。同時(shí)類似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級應(yīng)用場景材彪。
Kafka/Jafka
Kafka是Apache下的一個(gè)子項(xiàng)目,是一個(gè)高性能跨語言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng)琴儿,而Jafka是在Kafka之上孵化而來的段化,即Kafka的一個(gè)升級版。具有以下特性:快速持久化造成,可以在O(1)的系統(tǒng)開銷下進(jìn)行消息持久化显熏;高吞吐,在一臺普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率晒屎;完全的分布式系統(tǒng)喘蟆,Broker、Producer鼓鲁、Consumer都原生自動支持分布式蕴轨,自動實(shí)現(xiàn)負(fù)載均衡;支持Hadoop數(shù)據(jù)并行加載骇吭,對于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng)橙弱,但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案燥狰。Kafka通過Hadoop的并行加載機(jī)制來統(tǒng)一了在線和離線的消息處理棘脐。Apache Kafka相對于ActiveMQ是一個(gè)非常輕量級的消息系統(tǒng),除了性能非常好之外龙致,還是一個(gè)工作良好的分布式系統(tǒng)蛀缝。
Kafka解析
Terminology
Broker
Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker
Topic
每條發(fā)布到Kafka集群的消息都有一個(gè)類別目代,這個(gè)類別被稱為topic屈梁。(物理上不同topic的消息分開存儲,邏輯上一個(gè)topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
Partition
parition是物理上的概念像啼,每個(gè)topic包含一個(gè)或多個(gè)partition俘闯,創(chuàng)建topic時(shí)可指定parition數(shù)量。每個(gè)partition對應(yīng)于一個(gè)文件夾忽冻,該文件夾下存儲該partition的數(shù)據(jù)和索引文件
Producer
負(fù)責(zé)發(fā)布消息到Kafka broker
Consumer
消費(fèi)消息真朗。每個(gè)consumer屬于一個(gè)特定的consumer group(可為每個(gè)consumer指定group name,若不指定group name則屬于默認(rèn)的group)僧诚。使用consumer high level API時(shí)遮婶,同一topic的一條消息只能被同一個(gè)consumer group內(nèi)的一個(gè)consumer消費(fèi)蝗碎,但多個(gè)consumer group可同時(shí)消費(fèi)這一消息。
Kafka架構(gòu)
如上圖所示旗扑,一個(gè)典型的kafka集群中包含若干producer(可以是web前端產(chǎn)生的page view蹦骑,或者是服務(wù)器日志,系統(tǒng)CPU臀防、memory等)眠菇,若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多袱衷,集群吞吐率越高)捎废,若干consumer group,以及一個(gè)Zookeeper集群致燥。Kafka通過Zookeeper管理集群配置登疗,選舉leader,以及在consumer group發(fā)生變化時(shí)進(jìn)行rebalance嫌蚤。producer使用push模式將消息發(fā)布到broker辐益,consumer使用pull模式從broker訂閱并消費(fèi)消息。
Push vs. Pull
作為一個(gè)messaging system脱吱,Kafka遵循了傳統(tǒng)的方式智政,選擇由producer向broker push消息并由consumer從broker pull消息。一些logging-centric system急凰,比如Facebook的Scribe和Cloudera的Flume,采用非常不同的push模式女仰。事實(shí)上,push模式和pull模式各有優(yōu)劣抡锈。
push模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者疾忍,因?yàn)橄l(fā)送速率是由broker決定的。push模式的目標(biāo)是盡可能以最快速度傳遞消息床三,但是這樣很容易造成consumer來不及處理消息一罩,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息撇簿。
Topic & Partition
Topic在邏輯上可以被認(rèn)為是一個(gè)queue聂渊。每條消費(fèi)都必須指定它的topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里四瘫。為了使得Kafka的吞吐率可以水平擴(kuò)展汉嗽,物理上把topic分成一個(gè)或多個(gè)partition,每個(gè)partition在物理上對應(yīng)一個(gè)文件夾找蜜,該文件夾下存儲這個(gè)partition的所有消息和索引文件饼暑。
每個(gè)日志文件都是“l(fā)og entries”序列,每一個(gè)log entry包含一個(gè)4字節(jié)整型數(shù)(值為N),其后跟N個(gè)字節(jié)的消息體弓叛。每條消息都有一個(gè)當(dāng)前partition下唯一的64字節(jié)的offset彰居,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
這個(gè)“l(fā)og entries”并非由一個(gè)文件構(gòu)成撰筷,而是分成多個(gè)segment陈惰,每個(gè)segment名為該segment第一條消息的offset和“.kafka”組成。另外會有一個(gè)索引文件毕籽,它標(biāo)明了每個(gè)segment下包含的log entry的offset范圍抬闯,如下圖所示。
因?yàn)槊織l消息都被append到該partition中关筒,是順序?qū)懘疟P画髓,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高平委,這是Kafka高吞吐率的一個(gè)很重要的保證)。
每一條消息被發(fā)送到broker時(shí)夺谁,會根據(jù)paritition規(guī)則選擇被存儲到哪一個(gè)partition廉赔。如果partition規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的partition里匾鸥,這樣就實(shí)現(xiàn)了水平擴(kuò)展蜡塌。(如果一個(gè)topic對應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器I/O將會成為這個(gè)topic的性能瓶頸勿负,而partition解決了這個(gè)問題)馏艾。
在創(chuàng)建topic時(shí)可以在$KAFKA_HOME/config/server.properties中指定這個(gè)partition的數(shù)量(如下所示),當(dāng)然也可以在topic創(chuàng)建之后去修改parition數(shù)量奴愉。
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
在發(fā)送一條消息時(shí)琅摩,可以指定這條消息的key,producer根據(jù)這個(gè)key和partition機(jī)制來判斷將這條消息發(fā)送到哪個(gè)parition锭硼。paritition機(jī)制可以通過指定producer的paritition. class這一參數(shù)來指定房资,該class必須實(shí)現(xiàn)kafka.producer.Partitioner接口。本例中如果key可以被解析為整數(shù)則將對應(yīng)的整數(shù)與partition總數(shù)取余檀头,該消息會被發(fā)送到該數(shù)對應(yīng)的partition轰异。(每個(gè)parition都會有個(gè)序號)
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
public JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
//如果將上例中的class作為partition.class,并通過如下代碼發(fā)送20條消息(key分別為0暑始,1搭独,2,3)至topic2(包含4個(gè)partition)廊镜。
public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
List messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 4; j++){
messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
}
producer.send(messageList);
}
producer.close();
}
則key相同的消息會被發(fā)送并存儲到同一個(gè)partition里牙肝,而且key的序號正好和partition序號相同。(partition序號從0開始,本例中的key也正好從0開始)惊奇。如下圖所示互躬。
對于傳統(tǒng)的message queue而言,一般會刪除已經(jīng)被消費(fèi)的消息颂郎,而Kafka集群會保留所有的消息吼渡,無論其被消費(fèi)與否。當(dāng)然乓序,因?yàn)榇疟P限制寺酪,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒必要),因此Kafka提供兩種策略去刪除舊數(shù)據(jù)替劈。一是基于時(shí)間寄雀,二是基于partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties陨献,讓Kafka刪除一周前的數(shù)據(jù)盒犹,也可通過配置讓Kafka在partition文件超過1GB時(shí)刪除舊數(shù)據(jù),如下所示眨业。
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false
這里要注意急膀,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無關(guān)龄捡,所以這里刪除文件與Kafka性能無關(guān)卓嫂,選擇怎樣的刪除策略只與磁盤以及具體的需求有關(guān)。另外聘殖,Kafka會為每一個(gè)consumer group保留一些metadata信息–當(dāng)前消費(fèi)的消息的position晨雳,也即offset。這個(gè)offset由consumer控制奸腺。正常情況下consumer會在消費(fèi)完一條消息后線性增加這個(gè)offset餐禁。當(dāng)然,consumer也可將offset設(shè)成一個(gè)較小的值洋机,重新消費(fèi)一些消息坠宴。因?yàn)閛ffet由consumer控制,所以Kafka broker是無狀態(tài)的绷旗,它不需要標(biāo)記哪些消息被哪些consumer過喜鼓,不需要通過broker去保證同一個(gè)consumer group只有一個(gè)consumer能消費(fèi)某一條消息,因此也就不需要鎖機(jī)制衔肢,這也為Kafka的高吞吐率提供了有力保障庄岖。
Replication & Leader election
Kafka從0.8開始提供partition級別的replication,replication的數(shù)量可在$KAFKA_HOME/config/server.properties中配置角骤。
default.replication.factor = 1
該 Replication與leader election配合提供了自動的failover機(jī)制隅忿。replication對Kafka的吞吐率是有一定影響的心剥,但極大的增強(qiáng)了可用性。默認(rèn)情況下背桐,Kafka的replication數(shù)量為1优烧。每個(gè)partition都有一個(gè)唯一的leader,所有的讀寫操作都在leader上完成链峭,leader批量從leader上pull數(shù)據(jù)畦娄。一般情況下partition的數(shù)量大于等于broker的數(shù)量,并且所有partition的leader均勻分布在broker上弊仪。follower上的日志和其leader上的完全一樣熙卡。
和大部分分布式系統(tǒng)一樣,Kakfa處理失敗需要明確定義一個(gè)broker是否alive励饵。對于Kafka而言驳癌,Kafka存活包含兩個(gè)條件,一是它必須維護(hù)與Zookeeper的session(這個(gè)通過Zookeeper的heartbeat機(jī)制來實(shí)現(xiàn))役听。二是follower必須能夠及時(shí)將leader的writing復(fù)制過來颓鲜,不能“落后太多”。
leader會track“in sync”的node list典予。如果一個(gè)follower宕機(jī)灾杰,或者落后太多,leader將把它從”in sync” list中移除熙参。這里所描述的“落后太多”指follower復(fù)制的消息落后于leader后的條數(shù)超過預(yù)定值,該值可在$KAFKA_HOME/config/server.properties中配置
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000
#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000
需要說明的是麦备,Kafka只解決”fail/recover”孽椰,不處理“Byzantine”(“拜占庭”)問題。
一條消息只有被“in sync” list里的所有follower都從leader復(fù)制過去才會被認(rèn)為已提交凛篙。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了leader黍匾,還沒來得及被任何follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(consumer無法消費(fèi)這些數(shù)據(jù))呛梆。而對于producer而言锐涯,它可以選擇是否等待消息commit,這可以通過request.required.acks來設(shè)置填物。這種機(jī)制確保了只要“in sync” list有一個(gè)或以上的flollower纹腌,一條被commit的消息就不會丟失。
這里的復(fù)制機(jī)制即不是同步復(fù)制滞磺,也不是單純的異步復(fù)制升薯。事實(shí)上,同步復(fù)制要求“活著的”follower都復(fù)制完击困,這條消息才會被認(rèn)為commit涎劈,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個(gè)特性)。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù)蛛枚,數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit谅海,這種情況下如果follwer都落后于leader,而leader突然宕機(jī)蹦浦,則會丟失數(shù)據(jù)扭吁。而Kafka的這種使用“in sync” list的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。follower可以批量的從leader復(fù)制數(shù)據(jù)白筹,這樣極大的提高復(fù)制性能(批量寫磁盤)智末,極大減少了follower與leader的差距(前文有說到,只要follower落后leader不太遠(yuǎn)徒河,則被認(rèn)為在“in sync” list里)系馆。
上文說明了Kafka是如何做replication的,另外一個(gè)很重要的問題是當(dāng)leader宕機(jī)了顽照,怎樣在follower中選舉出新的leader由蘑。因?yàn)閒ollower可能落后許多或者crash了,所以必須確保選擇“最新”的follower作為新的leader代兵。一個(gè)基本的原則就是尼酿,如果leader不在了,新的leader必須擁有原來的leader commit的所有消息植影。這就需要作一個(gè)折衷裳擎,如果leader在標(biāo)明一條消息被commit前等待更多的follower確認(rèn),那在它die之后就有更多的follower可以作為新的leader思币,但這也會造成吞吐率的下降鹿响。
一種非常常用的選舉leader的方式是“majority vote”(“少數(shù)服從多數(shù)”),但Kafka并未采用這種方式谷饿。這種模式下惶我,如果我們有2f+1個(gè)replica(包含leader和follower),那在commit之前必須保證有f+1個(gè)replica復(fù)制完消息博投,為了保證正確選出新的leader绸贡,fail的replica不能超過f個(gè)。因?yàn)樵谑O碌娜我鈌+1個(gè)replica里毅哗,至少有一個(gè)replica包含有最新的所有消息听怕。這種方式有個(gè)很大的優(yōu)勢,系統(tǒng)的latency只取決于最快的幾臺server虑绵,也就是說叉跛,如果replication factor是3,那latency就取決于最快的那個(gè)follower而非最慢那個(gè)蒸殿。majority vote也有一些劣勢筷厘,為了保證leader election的正常進(jìn)行鸣峭,它所能容忍的fail的follower個(gè)數(shù)比較少。如果要容忍1個(gè)follower掛掉酥艳,必須要有3個(gè)以上的replica摊溶,如果要容忍2個(gè)follower掛掉,必須要有5個(gè)以上的replica充石。也就是說莫换,在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)程度,必須要有大量的replica骤铃,而大量的replica又會在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降拉岁。這就是這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要存儲大量數(shù)據(jù)的系統(tǒng)中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal惰爬,但是它的數(shù)據(jù)存儲并沒有使用這種expensive的方式喊暖。
實(shí)際上,leader election算法非常多撕瞧,比如Zookeper的Zab, Raft和Viewstamped Replication陵叽。而Kafka所使用的leader election算法更像微軟的PacificA算法。
Kafka在Zookeeper中動態(tài)維護(hù)了一個(gè)ISR(in-sync replicas) set丛版,這個(gè)set里的所有replica都跟上了leader巩掺,只有ISR里的成員才有被選為leader的可能。在這種模式下页畦,對于f+1個(gè)replica胖替,一個(gè)Kafka topic能在保證不丟失已經(jīng)ommit的消息的前提下容忍f個(gè)replica的失敗。在大多數(shù)使用場景中豫缨,這種模式是非常有利的刊殉。事實(shí)上,為了容忍f個(gè)replica的失敗州胳,majority vote和ISR在commit前需要等待的replica數(shù)量是一樣的,但是ISR需要的總的replica的個(gè)數(shù)幾乎是majority vote的一半逸月。
雖然majority vote與ISR相比有不需等待最慢的server這一優(yōu)勢栓撞,但是Kafka作者認(rèn)為Kafka可以通過producer選擇是否被commit阻塞來改善這一問題,并且節(jié)省下來的replica和磁盤使得ISR模式仍然值得碗硬。
上文提到瓤湘,在ISR中至少有一個(gè)follower時(shí),Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失恩尾,但如果某一個(gè)partition的所有replica都掛了弛说,就無法保證數(shù)據(jù)不丟失了。這種情況下有兩種可行的方案:
- 等待ISR中的任一個(gè)replica“活”過來翰意,并且選它作為leader
- 選擇第一個(gè)“活”過來的replica(不一定是ISR中的)作為leader
這就需要在可用性和一致性當(dāng)中作出一個(gè)簡單的平衡木人。如果一定要等待ISR中的replica“活”過來信柿,那不可用的時(shí)間就可能會相對較長。而且如果ISR中的所有replica都無法“活”過來了醒第,或者數(shù)據(jù)都丟失了渔嚷,這個(gè)partition將永遠(yuǎn)不可用竟纳。選擇第一個(gè)“活”過來的replica作為leader肩袍,而這個(gè)replica不是ISR中的replica,那即使它并不保證已經(jīng)包含了所有已commit的消息蛾找,它也會成為leader而作為consumer的數(shù)據(jù)源(前文有說明霞幅,所有讀寫都由leader完成)漠吻。Kafka0.8.*使用了第二種方式。根據(jù)Kafka的文檔司恳,在以后的版本中途乃,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場景選擇高可用性還是強(qiáng)一致性抵赢。
上文說明了一個(gè)parition的replication過程欺劳,然爾Kafka集群需要管理成百上千個(gè)partition,Kafka通過round-robin的方式來平衡partition從而避免大量partition集中在了少數(shù)幾個(gè)節(jié)點(diǎn)上铅鲤。同時(shí)Kafka也需要平衡leader的分布划提,盡可能的讓所有partition的leader均勻分布在不同broker上。另一方面邢享,優(yōu)化leadership election的過程也是很重要的鹏往,畢竟這段時(shí)間相應(yīng)的partition處于不可用狀態(tài)。一種簡單的實(shí)現(xiàn)是暫停宕機(jī)的broker上的所有partition骇塘,并為之選舉leader伊履。實(shí)際上,Kafka選舉一個(gè)broker作為controller款违,這個(gè)controller通過watch Zookeeper檢測所有的broker failure唐瀑,并負(fù)責(zé)為所有受影響的parition選舉leader,再將相應(yīng)的leader調(diào)整命令發(fā)送至受影響的broker插爹,過程如下圖所示哄辣。
這樣做的好處是,可以批量的通知leadership的變化赠尾,從而使得選舉過程成本更低力穗,尤其對大量的partition而言。如果controller失敗了气嫁,幸存的所有broker都會嘗試在Zookeeper中創(chuàng)建/controller->{this broker id}当窗,如果創(chuàng)建成功(只可能有一個(gè)創(chuàng)建成功),則該broker會成為controller寸宵,若創(chuàng)建不成功崖面,則該broker會等待新controller的命令元咙。
Consumer group
(本節(jié)所有描述都是基于consumer hight level API而非low level API)。
每一個(gè)consumer實(shí)例都屬于一個(gè)consumer group嘶朱,每一條消息只會被同一個(gè)consumer group里的一個(gè)consumer實(shí)例消費(fèi)蛾坯。(不同consumer group可以同時(shí)消費(fèi)同一條消息)
很多傳統(tǒng)的message queue都會在消息被消費(fèi)完后將消息刪除,一方面避免重復(fù)消費(fèi)疏遏,另一方面可以保證queue的長度比較少脉课,提高效率。而如上文所將财异,Kafka并不刪除已消費(fèi)的消息倘零,為了實(shí)現(xiàn)傳統(tǒng)message queue消息只被消費(fèi)一次的語義,Kafka保證保證同一個(gè)consumer group里只有一個(gè)consumer會消費(fèi)一條消息戳寸。與傳統(tǒng)message queue不同的是呈驶,Kafka還允許不同consumer group同時(shí)消費(fèi)同一條消息,這一特性可以為消息的多元化處理提供了支持疫鹊。實(shí)際上袖瞻,Kafka的設(shè)計(jì)理念之一就是同時(shí)提供離線處理和實(shí)時(shí)處理。根據(jù)這一特性拆吆,可以使用Storm這種實(shí)時(shí)流處理系統(tǒng)對消息進(jìn)行實(shí)時(shí)在線處理聋迎,同時(shí)使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時(shí)將數(shù)據(jù)實(shí)時(shí)備份到另一個(gè)數(shù)據(jù)中心枣耀,只需要保證這三個(gè)操作所使用的consumer在不同的consumer group即可霉晕。下圖展示了Kafka在Linkedin的一種簡化部署。
為了更清晰展示Kafka consumer group的特性捞奕,筆者作了一項(xiàng)測試牺堰。創(chuàng)建一個(gè)topic (名為topic1),創(chuàng)建一個(gè)屬于group1的consumer實(shí)例颅围,并創(chuàng)建三個(gè)屬于group2的consumer實(shí)例伟葫,然后通過producer向topic1發(fā)送key分別為1,2院促,3r的消息筏养。結(jié)果發(fā)現(xiàn)屬于group1的consumer收到了所有的這三條消息,同時(shí)group2中的3個(gè)consumer分別收到了key為1一疯,2,3的消息夺姑。如下圖所示墩邀。
Consumer Rebalance
(本節(jié)所講述內(nèi)容均基于Kafka consumer high level API)
Kafka保證同一consumer group中只有一個(gè)consumer會消費(fèi)某條消息,實(shí)際上盏浙,Kafka保證的是穩(wěn)定狀態(tài)下每一個(gè)consumer實(shí)例只會消費(fèi)某一個(gè)或多個(gè)特定partition的數(shù)據(jù)眉睹,而某個(gè)partition的數(shù)據(jù)只會被某一個(gè)特定的consumer實(shí)例所消費(fèi)荔茬。這樣設(shè)計(jì)的劣勢是無法讓同一個(gè)consumer group里的consumer均勻消費(fèi)數(shù)據(jù),優(yōu)勢是每個(gè)consumer不用都跟大量的broker通信竹海,減少通信開銷慕蔚,同時(shí)也降低了分配難度,實(shí)現(xiàn)也更簡單斋配。另外孔飒,因?yàn)橥粋€(gè)partition里的數(shù)據(jù)是有序的,這種設(shè)計(jì)可以保證每個(gè)partition里的數(shù)據(jù)也是有序被消費(fèi)艰争。
如果某consumer group中consumer數(shù)量少于partition數(shù)量坏瞄,則至少有一個(gè)consumer會消費(fèi)多個(gè)partition的數(shù)據(jù),如果consumer的數(shù)量與partition數(shù)量相同甩卓,則正好一個(gè)consumer消費(fèi)一個(gè)partition的數(shù)據(jù)鸠匀,而如果consumer的數(shù)量多于partition的數(shù)量時(shí),會有部分consumer無法消費(fèi)該topic下任何一條消息逾柿。
如下例所示缀棍,如果topic1有0,1机错,2共三個(gè)partition爬范,當(dāng)group1只有一個(gè)consumer(名為consumer1)時(shí),該 consumer可消費(fèi)這3個(gè)partition的所有數(shù)據(jù)毡熏。
增加一個(gè)consumer(consumer2)后坦敌,其中一個(gè)consumer(consumer1)可消費(fèi)2個(gè)partition的數(shù)據(jù),另外一個(gè)consumer(consumer2)可消費(fèi)另外一個(gè)partition的數(shù)據(jù)痢法。
再增加一個(gè)consumer(consumer3)后狱窘,每個(gè)consumer可消費(fèi)一個(gè)partition的數(shù)據(jù)。consumer1消費(fèi)partition0财搁,consumer2消費(fèi)partition1蘸炸,consumer3消費(fèi)partition2
再增加一個(gè)consumer(consumer4)后,其中3個(gè)consumer可分別消費(fèi)一個(gè)partition的數(shù)據(jù)尖奔,另外一個(gè)consumer(consumer4)不能消費(fèi)topic1任何數(shù)據(jù)搭儒。
此時(shí)關(guān)閉consumer1,剩下的consumer可分別消費(fèi)一個(gè)partition的數(shù)據(jù)提茁。
接著關(guān)閉consumer2淹禾,剩下的consumer3可消費(fèi)2個(gè)partition,consumer4可消費(fèi)1個(gè)partition茴扁。
再關(guān)閉consumer3铃岔,剩下的consumer4可同時(shí)消費(fèi)topic1的3個(gè)partition。
consumer rebalance算法如下:
Sort PT (all partitions in topic T)
Sort CG(all consumers in consumer group G)
Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
Remove current entries owned by Ci from the partition owner registry
Assign partitions from iN to (i+1)N-1 to consumer Ci
Add newly assigned partitions to the partition owner registry
目前consumer rebalance的控制策略是由每一個(gè)consumer通過Zookeeper完成的峭火。具體的控制方式如下:Register itself in the consumer id registry under its group.
Register a watch on changes under the consumer id registry.
Register a watch on changes under the broker id registry.
If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
Force itself to rebalance within in its consumer group.
在這種策略下毁习,每一個(gè)consumer或者broker的增加或者減少都會觸發(fā)consumer rebalance智嚷。因?yàn)槊總€(gè)consumer只負(fù)責(zé)調(diào)整自己所消費(fèi)的partition,為了保證整個(gè)consumer group的一致性纺且,所以當(dāng)一個(gè)consumer觸發(fā)了rebalance時(shí)盏道,該consumer group內(nèi)的其它所有consumer也應(yīng)該同時(shí)觸發(fā)rebalance。
目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式载碌。但該方式有不利的方面:
Herd effect
任何broker或者consumer的增減都會觸發(fā)所有的consumer的rebalanceSplit Brain
每個(gè)consumer分別單獨(dú)通過Zookeeper判斷哪些partition down了猜嘱,那么不同consumer從Zookeeper“看”到的view就可能不一樣,這就會造成錯(cuò)誤的reblance嘗試恐仑。而且有可能所有的consumer都認(rèn)為rebalance已經(jīng)完成了泉坐,但實(shí)際上可能并非如此。
根據(jù)Kafka官方文檔裳仆,Kafka作者正在考慮在還未發(fā)布的0.9.x版本中使用中心協(xié)調(diào)器(coordinator)腕让。大體思想是選舉出一個(gè)broker作為coordinator,由它watch Zookeeper歧斟,從而判斷是否有partition或者consumer的增減纯丸,然后生成rebalance命令,并檢查是否這些rebalance在所有相關(guān)的consumer中被執(zhí)行成功静袖,如果不成功則重試觉鼻,若成功則認(rèn)為此次rebalance成功(這個(gè)過程跟replication controller非常類似,所以我很奇怪為什么當(dāng)初設(shè)計(jì)replication controller時(shí)沒有使用類似方式來解決consumer rebalance的問題)队橙。流程如下:
消息Deliver guarantee
通過上文介紹坠陈,想必讀者已經(jīng)明天了producer和consumer是如何工作的,以及Kafka是如何做replication的捐康,接下來要討論的是Kafka如何確保消息在producer和consumer之間傳輸仇矾。有這么幾種可能的delivery guarantee:
- At most once 消息可能會丟,但絕不會重復(fù)傳輸
- At least one 消息絕不會丟解总,但可能會重復(fù)傳輸
- Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次贮匕,很多時(shí)候這是用戶所想要的。
Kafka的delivery guarantee semantic非常直接花枫。當(dāng)producer向broker發(fā)送消息時(shí)刻盐,一旦這條消息被commit,因數(shù)replication的存在劳翰,它就不會丟敦锌。但是如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡(luò)問題而造成通信中斷佳簸,那producer就無法判斷該條消息是否已經(jīng)commit乙墙。這一點(diǎn)有點(diǎn)像向一個(gè)自動生成primary key的數(shù)據(jù)庫表中插入數(shù)據(jù)。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是producer可以生成一種類似于primary key的東西伶丐,發(fā)生故障時(shí)冪等性的retry多次,這樣就做到了Exactly one疯特。截止到目前(Kafka 0.8.2版本哗魂,2015-01-25),這一feature還并未實(shí)現(xiàn)漓雅,有希望在Kafka未來的版本中實(shí)現(xiàn)录别。(所以目前默認(rèn)情況下一條消息從producer和broker是確保了At least once,但可通過設(shè)置producer異步發(fā)送實(shí)現(xiàn)At most once)邻吞。
接下來討論的是消息從broker到consumer的delivery guarantee semantic组题。(僅針對Kafka consumer high level API)。consumer在從broker讀取消息后抱冷,可以選擇commit崔列,該操作會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時(shí)會從下一條開始讀取旺遮。如未commit赵讯,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。當(dāng)然可以將consumer設(shè)置為autocommit耿眉,即consumer一旦讀到數(shù)據(jù)立即自動commit边翼。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once鸣剪。但實(shí)際上實(shí)際使用中consumer并非讀取完數(shù)據(jù)就結(jié)束了组底,而是要進(jìn)行進(jìn)一步處理,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic筐骇。
讀完消息先commit再處理消息债鸡。這種模式下,如果consumer在commit后還沒來得及處理消息就crash了拥褂,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息娘锁,這就對應(yīng)于At most once
讀完消息先處理再commit。這種模式下饺鹃,如果處理完了消息在commit之前consumer crash了莫秆,下次重新開始工作時(shí)還會處理剛剛未commit的消息,實(shí)際上該消息已經(jīng)被處理過了悔详。這就對應(yīng)于At least once镊屎。在很多情況使用場景下,消息都有一個(gè)primary key茄螃,所以消息的處理往往具有冪等性缝驳,即多次處理這一條消息跟只處理一次是等效的,那就可以認(rèn)為是Exactly once。(人個(gè)感覺這種說法有些牽強(qiáng)用狱,畢竟它不是Kafka本身提供的機(jī)制运怖,而且primary key本身不保證操作的冪等性。而且實(shí)際上我們說delivery guarantee semantic是討論被處理多少次夏伊,而非處理結(jié)果怎樣摇展,因?yàn)樘幚矸绞蕉喾N多樣,我們的系統(tǒng)不應(yīng)該把處理過程的特性–如是否冪等性溺忧,當(dāng)成Kafka本身的feature)
如果一定要做到Exactly once咏连,就需要協(xié)調(diào)offset和實(shí)際操作的輸出。精典的做法是引入兩階段提交鲁森。如果能讓offset和操作輸入存在同一個(gè)地方祟滴,會更簡潔和通用。這種方式可能更好歌溉,因?yàn)樵S多輸出系統(tǒng)可能不支持兩階段提交垄懂。比如,consumer拿到數(shù)據(jù)后可能把數(shù)據(jù)放到HDFS痛垛,如果把最新的offset和數(shù)據(jù)本身一起寫到HDFS埠偿,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都完成,要么都不完成榜晦,間接實(shí)現(xiàn)Exactly once冠蒋。(目前就high level API而言,offset是存于Zookeeper中的乾胶,無法存于HDFS抖剿,而low level API的offset是由自己去維護(hù)的,可以將之存于HDFS中)
總之识窿,Kafka默認(rèn)保證At least once斩郎,并且允許通過設(shè)置producer異步提交來實(shí)現(xiàn)At most once。而Exactly once要求與目標(biāo)存儲系統(tǒng)協(xié)作喻频,幸運(yùn)的是Kafka提供的offset可以使用這種方式非常直接非常容易缩宜。
Benchmark
Kafka的創(chuàng)始人之一的Jay Kreps的bechmark。以下描述皆基于該benchmark甥温。(該benchmark基于Kafka0.8.1)
測試環(huán)境
該benchmark用到了六臺機(jī)器锻煌,機(jī)器配置如下
- Intel Xeon 2.5 GHz processor with six cores
- Six 7200 RPM SATA drives
- 32GB of RAM
- 1Gb Ethernet
這6臺機(jī)器其中3臺用來搭建Kafka broker集群,另外3臺用來安裝Zookeeper及生成測試數(shù)據(jù)姻蚓。6個(gè)drive都直接以非RAID方式掛載宋梧。實(shí)際上kafka對機(jī)器的需求與Hadoop的類似。
Producer吞吐率
該項(xiàng)測試只測producer的吞吐率狰挡,也就是數(shù)據(jù)只被持久化捂龄,沒有consumer讀數(shù)據(jù)释涛。
1個(gè)producer線程,無replication
在這一測試中倦沧,創(chuàng)建了一個(gè)包含6個(gè)partition且沒有replication的topic唇撬。然后通過一個(gè)線程盡可能快的生成50 million條比較短(payload100字節(jié)長)的消息。測試結(jié)果是821,557 records/second(78.3MB/second)展融。
之所以使用短消息局荚,是因?yàn)閷τ谙⑾到y(tǒng)來說這種使用場景更難。因?yàn)槿绻褂肕B/second來表征吞吐率愈污,那發(fā)送長消息無疑能使得測試結(jié)果更好。
整個(gè)測試中轮傍,都是用每秒鐘delivery的消息的數(shù)量乘以payload的長度來計(jì)算MB/second的暂雹,沒有把消息的元信息算在內(nèi),所以實(shí)際的網(wǎng)絡(luò)使用量會比這個(gè)大创夜。對于本測試來說杭跪,每次還需傳輸額外的22個(gè)字節(jié),包括一個(gè)可選的key驰吓,消息長度描述涧尿,CRC等。另外檬贰,還包含一些請求相關(guān)的overhead姑廉,比如topic,partition翁涤,acknowledgement等桥言。這就導(dǎo)致我們比較難判斷是否已經(jīng)達(dá)到網(wǎng)卡極限,但是把這些overhead都算在吞吐率里面應(yīng)該更合理一些葵礼。因此号阿,我們已經(jīng)基本達(dá)到了網(wǎng)卡的極限。
初步觀察此結(jié)果會認(rèn)為它比人們所預(yù)期的要高很多鸳粉,尤其當(dāng)考慮到Kafka要把數(shù)據(jù)持久化到磁盤當(dāng)中扔涧。實(shí)際上,如果使用隨機(jī)訪問數(shù)據(jù)系統(tǒng)届谈,比如RDBMS枯夜,或者key-velue store,可預(yù)期的最高訪問頻率大概是5000到50000個(gè)請求每秒艰山,這和一個(gè)好的RPC層所能接受的遠(yuǎn)程請求量差不多卤档。而該測試中遠(yuǎn)超于此的原因有兩個(gè)。
- Kafka確保寫磁盤的過程是線性磁盤I/O程剥,測試中使用的6塊廉價(jià)磁盤線性I/O的最大吞吐量是822MB/second劝枣,這已經(jīng)遠(yuǎn)大于1Gb網(wǎng)卡所能帶來的吞吐量了汤踏。許多消息系統(tǒng)把數(shù)據(jù)持久化到磁盤當(dāng)成是一個(gè)開銷很大的事情,這是因?yàn)樗麄儗Υ疟P的操作都不是線性I/O舔腾。
- 在每一個(gè)階段溪胶,Kafka都盡量使用批量處理。如果想了解批處理在I/O操作中的重要性稳诚,可以參考David Patterson的”Latency Lags Bandwidth“
1個(gè)producer線程哗脖,3個(gè)異步replication
該項(xiàng)測試與上一測試基本一樣,唯一的區(qū)別是每個(gè)partition有3個(gè)replica(所以網(wǎng)絡(luò)傳輸?shù)暮蛯懭氪疟P的總的數(shù)據(jù)量增加了3倍)扳还。每一個(gè)broker即要寫作為leader的partition才避,也要讀(從leader讀數(shù)據(jù))寫(將數(shù)據(jù)寫到磁盤)作為follower的partition。測試結(jié)果為786,980 records/second(75.1MB/second)氨距。
該項(xiàng)測試中replication是異步的桑逝,也就是說broker收到數(shù)據(jù)并寫入本地磁盤后就acknowledge producer,而不必等所有replica都完成replication俏让。也就是說楞遏,如果leader crash了,可能會丟掉一些最新的還未備份的數(shù)據(jù)首昔。但這也會讓message acknowledgement延遲更少寡喝,實(shí)時(shí)性更好。
這項(xiàng)測試說明勒奇,replication可以很快预鬓。整個(gè)集群的寫能力可能會由于3倍的replication而只有原來的三分之一,但是對于每一個(gè)producer來說吞吐率依然足夠好赊颠。
1個(gè)producer線程珊皿,3個(gè)同步replication
該項(xiàng)測試與上一測試的唯一區(qū)別是replication是同步的,每條消息只有在被in sync集合里的所有replica都復(fù)制過去后才會被置為committed(此時(shí)broker會向producer發(fā)送acknowledgement)巨税。在這種模式下蟋定,Kafka可以保證即使leader crash了,也不會有數(shù)據(jù)丟失草添。測試結(jié)果為421,823 records/second(40.2MB/second)驶兜。
Kafka同步復(fù)制與異步復(fù)制并沒有本質(zhì)的不同。leader會始終track follower replica從而監(jiān)控它們是否還alive远寸,只有所有in sync集合里的replica都acknowledge的消息才可能被consumer所消費(fèi)抄淑。而對follower的等待影響了吞吐率〕酆螅可以通過增大batch size來改善這種情況肆资,但為了避免特定的優(yōu)化而影響測試結(jié)果的可比性,本次測試并沒有做這種調(diào)整灶芝。
3個(gè)producer,3個(gè)異步replication
該測試相當(dāng)于把上文中的1個(gè)producer,復(fù)制到了3臺不同的機(jī)器上(在1臺機(jī)器上跑多個(gè)實(shí)例對吞吐率的增加不會有太大幫忙郑原,因?yàn)榫W(wǎng)卡已經(jīng)基本飽和了)唉韭,這3個(gè)producer同時(shí)發(fā)送數(shù)據(jù)。整個(gè)集群的吞吐率為2,024,032 records/second(193,0MB/second)犯犁。
Producer Throughput Vs. Stored Data
消息系統(tǒng)的一個(gè)潛在的危險(xiǎn)是當(dāng)數(shù)據(jù)能都存于內(nèi)存時(shí)性能很好属愤,但當(dāng)數(shù)據(jù)量太大無法完全存于內(nèi)存中時(shí)(然后很多消息系統(tǒng)都會刪除已經(jīng)被消費(fèi)的數(shù)據(jù),但當(dāng)消費(fèi)速度比生產(chǎn)速度慢時(shí)酸役,仍會造成數(shù)據(jù)的堆積)住诸,數(shù)據(jù)會被轉(zhuǎn)移到磁盤,從而使得吞吐率下降涣澡,這又反過來造成系統(tǒng)無法及時(shí)接收數(shù)據(jù)贱呐。這樣就非常糟糕,而實(shí)際上很多情景下使用queue的目的就是解決數(shù)據(jù)消費(fèi)速度和生產(chǎn)速度不一致的問題入桂。
但Kafka不存在這一問題奄薇,因?yàn)镵afka始終以O(shè)(1)的時(shí)間復(fù)雜度將數(shù)據(jù)持久化到磁盤,所以其吞吐率不受磁盤上所存儲的數(shù)據(jù)量的影響事格。為了驗(yàn)證這一特性,做了一個(gè)長時(shí)間的大數(shù)據(jù)量的測試搞隐,下圖是吞吐率與數(shù)據(jù)量大小的關(guān)系圖驹愚。
上圖中有一些variance的存在,并可以明顯看到劣纲,吞吐率并不受磁盤上所存數(shù)據(jù)量大小的影響逢捺。實(shí)際上從上圖可以看到,當(dāng)磁盤數(shù)據(jù)量達(dá)到1TB時(shí)癞季,吞吐率和磁盤數(shù)據(jù)只有幾百M(fèi)B時(shí)沒有明顯區(qū)別劫瞳。
這個(gè)variance是由Linux I/O管理造成的,它會把數(shù)據(jù)緩存起來再批量flush绷柒。上圖的測試結(jié)果是在生產(chǎn)環(huán)境中對Kafka集群做了些tuning后得到的志于,這些tuning方法可參考這里。
consumer吞吐率
需要注意的是废睦,replication factor并不會影響consumer的吞吐率測試伺绽,因?yàn)閏onsumer只會從每個(gè)partition的leader讀數(shù)據(jù),而與replicaiton factor無關(guān)嗜湃。同樣奈应,consumer吞吐率也與同步復(fù)制還是異步復(fù)制無關(guān)。
1個(gè)consumer
該測試從有6個(gè)partition购披,3個(gè)replication的topic消費(fèi)50 million的消息杖挣。測試結(jié)果為940,521 records/second(89.7MB/second)。
可以看到刚陡,Kafkar的consumer是非常高效的惩妇。它直接從broker的文件系統(tǒng)里讀取文件塊株汉。Kafka使用sendfile API來直接通過操作系統(tǒng)直接傳輸,而不用把數(shù)據(jù)拷貝到用戶空間屿附。該項(xiàng)測試實(shí)際上從log的起始處開始讀數(shù)據(jù)郎逃,所以它做了真實(shí)的I/O。在生產(chǎn)環(huán)境下挺份,consumer可以直接讀取producer剛剛寫下的數(shù)據(jù)(它可能還在緩存中)褒翰。實(shí)際上,如果在生產(chǎn)環(huán)境下跑I/O stat匀泊,你可以看到基本上沒有物理“讀”优训。也就是說生產(chǎn)環(huán)境下consumer的吞吐率會比該項(xiàng)測試中的要高。
3個(gè)consumer
將上面的consumer復(fù)制到3臺不同的機(jī)器上各聘,并且并行運(yùn)行它們(從同一個(gè)topic上消費(fèi)數(shù)據(jù))揣非。測試結(jié)果為2,615,968 records/second(249.5MB/second)。
正如所預(yù)期的那樣躲因,consumer的吞吐率幾乎線性增漲早敬。
Producer and Consumer
上面的測試只是把producer和consumer分開測試,而該項(xiàng)測試同時(shí)運(yùn)行producer和consumer大脉,這更接近使用場景搞监。實(shí)際上目前的replication系統(tǒng)中follower就相當(dāng)于consumer在工作。
該項(xiàng)測試镰矿,在具有6個(gè)partition和3個(gè)replica的topic上同時(shí)使用1個(gè)producer和1個(gè)consumer琐驴,并且使用異步復(fù)制。測試結(jié)果為795,064 records/second(75.8MB/second)秤标。
可以看到绝淡,該項(xiàng)測試結(jié)果與單獨(dú)測試1個(gè)producer時(shí)的結(jié)果幾乎一致。所以說consumer非常輕量級苍姜。
消息長度對吞吐率的影響
上面的所有測試都基于短消息(payload 100字節(jié))牢酵,而正如上文所說,短消息對Kafka來說是更難處理的使用方式衙猪,可以預(yù)期茁帽,隨著消息長度的增大,records/second會減小屈嗤,但MB/second會有所提高潘拨。下圖是records/second與消息長度的關(guān)系圖。
正如我們所預(yù)期的那樣饶号,隨著消息長度的增加铁追,每秒鐘所能發(fā)送的消息的數(shù)量逐漸減小。但是如果看每秒鐘發(fā)送的消息的總大小茫船,它會隨著消息長度的增加而增加琅束,如下圖所示扭屁。
從上圖可以看出,當(dāng)消息長度為10字節(jié)時(shí)涩禀,因?yàn)橐l繁入隊(duì)料滥,花了太多時(shí)間獲取鎖,CPU成了瓶頸艾船,并不能充分利用帶寬葵腹。但從100字節(jié)開始,我們可以看到帶寬的使用逐漸趨于飽和(雖然MB/second還是會隨著消息長度的增加而增加屿岂,但增加的幅度也越來越屑纭)。
端到端的Latency
上文中討論了吞吐率爷怀,那消息傳輸?shù)膌atency如何呢阻肩?也就是說消息從producer到consumer需要多少時(shí)間呢?該項(xiàng)測試創(chuàng)建1個(gè)producer和1個(gè)consumer并反復(fù)計(jì)時(shí)运授。結(jié)果是烤惊,2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)。
∮蹼(這里并沒有說明topic有多少個(gè)partition柒室,也沒有說明有多少個(gè)replica,replication是同步還是異步喇完。實(shí)際上這會極大影響producer發(fā)送的消息被commit的latency伦泥,而只有committed的消息才能被consumer所消費(fèi)剥啤,所以它會最終影響端到端的latency)
重現(xiàn)該benchmark
如果讀者想要在自己的機(jī)器上重現(xiàn)本次benchmark測試锦溪,可以參考本次測試的配置和所使用的命令。
實(shí)際上Kafka Distribution提供了producer性能測試工具府怯,可通過bin/kafka-producer-perf-test.sh腳本來啟動刻诊。所使用的命令如下
Producer
Setup
bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1
bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3
Single thread, no replication
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
Single-thread, async 3x replication
bin/kafktopics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test6 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
Single-thread, sync 3x replication
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000
Three Producers, 3x async replication
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
Throughput Versus Stored Data
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
Effect of message size
for i in 10 100 1000 10000 100000;
do
echo ""
echo $i
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test $((1000*1024*1024/$i)) $i -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000
done;
Consumer
Consumer throughput
bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1
3 Consumers
On three servers, run:
bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1
End-to-end Latency
bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181 test 5000
Producer and consumer
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1
broker配置如下
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#host.name=localhost
# The number of threads handling network requests
num.network.threads=4
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# The directory under which to store log files
log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs
# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=8
############################# Log Flush Policy #############################
# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
# 3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false
replica.lag.max.messages=10000000
個(gè)人介紹:
高廣超 :多年一線互聯(lián)網(wǎng)研發(fā)與架構(gòu)設(shè)計(jì)經(jīng)驗(yàn),擅長設(shè)計(jì)與落地高可用牺丙、高性能互聯(lián)網(wǎng)架構(gòu)则涯。目前就職于美團(tuán)網(wǎng),負(fù)責(zé)核心業(yè)務(wù)研發(fā)工作冲簿。