kafka簡介
參考網(wǎng)址
http://www.cnblogs.com/likehua/p/3999538.html
http://www.infoq.com/cn/articles/apache-kafka/
http://www.infoq.com/cn/articles/kafka-analysis-part-1
http://www.infoq.com/cn/articles/kafka-analysis-part-2
http://www.infoq.com/cn/articles/kafka-analysis-part-3
http://www.infoq.com/cn/articles/kafka-analysis-part-4
http://www.infoq.com/cn/articles/kafka-analysis-part-5
http://www.aboutyun.com/thread-12882-1-1.html
question
partitions是如何分配的借浊,是每個server上都有所有的partitions么减宣,還是在每個server上只有某一份partitions? 如果是前者界轩,如何節(jié)省磁盤空間的画饥?
每個分區(qū),多個server, 一個leader,分區(qū)可以有多個備份耸棒。
本質(zhì)上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費.也就是說可以通過group在內(nèi)部實現(xiàn)consumer的負(fù)載均衡荒澡,在外部實現(xiàn)不同topic消息的隔離报辱。
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認(rèn)為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來說,消息仍不是有序的.
使用場景
1.messaging
對于一些常規(guī)的消息系統(tǒng),kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優(yōu)勢.不過到目前為止,我們應(yīng)該很清楚認(rèn)識到,kafka并沒有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機制)""消息分組"等企業(yè)級特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)
2.Websit activity tracking
kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中.并實時監(jiān)控,或者離線統(tǒng)計分析等
3.Log Aggregation
kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統(tǒng)化的存儲和分析系統(tǒng).
設(shè)計思路
1与殃、持久性
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性.且無論任何OS下,對文件系統(tǒng)本身的優(yōu)化幾乎沒有可能.文件緩存/直接內(nèi)存映射等是常用的手段.因為kafka是對日志文件進(jìn)行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數(shù),broker會將消息暫時buffer起來,當(dāng)消息的個數(shù)(或尺寸)達(dá)到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調(diào)用的次數(shù).
2、性能
需要考慮的影響性能點很多,除磁盤IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以將消息buffer起來,當(dāng)消息的條數(shù)達(dá)到一定閥值時,批量發(fā)送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對于kafka broker端,似乎有個sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無需進(jìn)程再次copy和交換. 其實對于producer/consumer/broker三者而言,CPU的開支應(yīng)該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過壓縮.kafka支持gzip/snappy等多種壓縮方式.
3碍现、生產(chǎn)者
負(fù)載均衡: producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發(fā)送到broker,中間不會經(jīng)過任何"路由層".事實上,消息被路由到哪個partition上,有producer客戶端決定.比如可以采用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那么在producer端實現(xiàn)"消息均衡分發(fā)"是必要的.
其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已經(jīng)注冊了watch用來監(jiān)聽partition leader的變更事件.
異步發(fā)送:將多條消息暫且在客戶端buffer起來幅疼,并將他們批量的發(fā)送到broker,小數(shù)據(jù)IO太多昼接,會拖慢整體的網(wǎng)絡(luò)延遲爽篷,批量延遲發(fā)送事實上提升了網(wǎng)絡(luò)效率。不過這也有一定的隱患慢睡,比如說當(dāng)producer失效時逐工,那些尚未發(fā)送的消息將會丟失铡溪。
4、消費者
consumer端向broker發(fā)送"fetch"請求,并告知其獲取消息的offset;此后consumer將會獲得一定條數(shù)的消息;consumer端也可以重置offset來重新消費消息.
在JMS實現(xiàn)中,Topic模型基于push方式,即broker將消息推送給consumer端.不過在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動去pull(或者說fetch)消息;這中模式有些優(yōu)點,首先consumer端可以根據(jù)自己的消費能力適時的去fetch消息并處理,且可以控制消息消費的進(jìn)度(offset);此外,消費者可以良好的控制消息消費的數(shù)量,batch fetch.
其他JMS實現(xiàn),消息消費的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒有消費成功的消息重發(fā)等,同時還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態(tài)的控制,也沒有復(fù)雜的消息確認(rèn)機制,可見kafka broker端是相當(dāng)輕量級的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級.
集成
單機集成環(huán)境
參考資料
http://colobu.com/2014/11/19/kafka-spring-integration-in-practice/
https://github.com/smallnest/spring-kafka-demo
通過spring boot來集成kafka
http://www.reibang.com/p/048e954dab40
Github上一個用spring-boot來集成kafka,mongodb,myibatis等等的例子:
https://github.com/xho22/spring-boot-dubbo-mongo-mybatis-kafka-liquibase
kafka使用
在kafka啟動的時候要同時啟動zookeeper和kafka server
命令如下:
bin/kafka-server-start.sh config/server.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
具體配置可以修改server.properties和zookeeper.properties.
如果出現(xiàn)下面的錯誤泪喊,則是因為沒有啟動kafka server.
Caused by: kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at org.springframework.integration.kafka.listener.KafkaTopicOffsetManager.createCompactedTopicIfNotFound(KafkaTopicOffsetManager.java:268)
at org.springframework.integration.kafka.listener.KafkaTopicOffsetManager.afterPropertiesSet(KafkaTopicOffsetManager.java:210)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574)
... 22 more
創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper 10.160.5.56:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --zookeeper 10.160.5.56:2181 -list
生產(chǎn)者
bin/kafka-console-producer.sh --broker-list 10.160.5.56:9092 --topic test
消費者
bin/kafka-console-consumer.sh --zookeeper 10.160.5.56:2181 --topic test --from-beginning
注意
- 新版本出了spring-kafka棕硫,一部分功能從spring-integration-kafka中移出來了,但是除了官方使用之外袒啼,網(wǎng)上資料很少
https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka