Kakfa介紹
Kafka是什么
Kafka最初是LinkedIn的內(nèi)部內(nèi)部基礎(chǔ)設(shè)施系統(tǒng)膝迎。它被認(rèn)為是一個(gè)流平臺队丝,在Kafka上可以發(fā)布和訂閱流數(shù)據(jù)工育,并把它們保存起來攀圈、進(jìn)行處理昭娩。但是我們在使用Kafka中凛篙,最多的就是將它作為一個(gè)消息系統(tǒng)使用,類似于ActiveMQ栏渺、RabbitMQ等呛梆。但是Kafka與這些傳統(tǒng)的消息系統(tǒng)又有著許多的不同點(diǎn),這些差異使它又不同于消息系統(tǒng)磕诊。
- Kafka是一個(gè)分布式系統(tǒng)填物,以集群(支持自由伸縮)的方式運(yùn)行。(所以我們總稱為分布式消息隊(duì)列)
- Kafka可以用來存儲數(shù)據(jù)霎终,數(shù)據(jù)存儲的時(shí)間長短由你自己定義(以容錯(cuò)持久化的方式存儲)滞磺。并且只要數(shù)據(jù)還存儲在Kafka中,你可以重復(fù)讀取莱褒。
- 流式處理將數(shù)據(jù)處理的層次提升到了新高度击困。
而傳統(tǒng)的消息系統(tǒng),只會用來傳遞消息广凸。
Kafka也可以看成是實(shí)時(shí)版的Hadoop(這也是設(shè)計(jì)Kafka的初衷之一)阅茶。Hadoop可以存儲和定期處理大量的數(shù)據(jù)文件,而Kafka而可以存儲和持續(xù)型的處理大型的數(shù)據(jù)流谅海。Hadoop主要應(yīng)用于數(shù)據(jù)分析上脸哀,而Kafka因其低延遲的特點(diǎn)更合適應(yīng)用于核心業(yè)務(wù)上,業(yè)務(wù)事件發(fā)生時(shí)扭吁,Kafka能夠針對這些事件及時(shí)做出相應(yīng)企蹭。同時(shí)kafka也可以和ETL進(jìn)行比較,因?yàn)樗鼈兩瞄L移動數(shù)據(jù)智末。
Kafka屬于消息系統(tǒng)中的發(fā)布-訂閱模式消息系統(tǒng)谅摄。消息發(fā)送者不會將消息直接發(fā)送到消息接受者里,而是將消息首先進(jìn)行分類(topic)系馆,然后將消息發(fā)布到消息系統(tǒng)中送漠。消息接受者選擇需要訂閱的消息類型(topic),然后就能夠從消息系統(tǒng)中接收所訂閱的消息了由蘑。
Kafka中的消息和批次
Kafka中的數(shù)據(jù)單元稱為消息闽寡,消息可以看成關(guān)系型數(shù)據(jù)庫中的“數(shù)據(jù)行”或“記錄”。消息是由鍵值對組成尼酿,其中鍵稱之為元數(shù)據(jù)爷狈,是可選的。消息中的鍵值對是由字節(jié)數(shù)組組成裳擎,所以消息里的數(shù)據(jù)沒有特別格式或含義(schema)涎永。鍵主要用來分區(qū)寫,比如通過鍵生成一個(gè)一致性散列值,然后使用散列值對分區(qū)取模羡微,為消息選取分區(qū)谷饿,保證了相同類型鍵的消息都寫入到了相同分區(qū)內(nèi)。
為了提高消息寫入效率妈倔,消息被分批次寫入Kafka中博投。批次就是一組消息,這些消息屬于同一topic下的同一分區(qū)盯蝴。這樣減少了網(wǎng)絡(luò)開銷毅哗,但是這需要在時(shí)間延遲和吞吐量之間作出平衡。批次的數(shù)據(jù)會被壓縮捧挺,這樣提升了數(shù)據(jù)的傳輸和存儲能力黎做,但同樣做了更多的計(jì)算(這也是Kafka對CPU性能的要求點(diǎn))。
Kafka中的主題和分區(qū)
Kafka的消息通過主題(topic)進(jìn)行分類松忍,主題類似關(guān)系型數(shù)據(jù)庫中的表谷暮,或者文件系統(tǒng)中的文件夾椅野。一個(gè)主題可以被分為若干個(gè)分區(qū)(partition)驼修,一個(gè)分區(qū)就是一個(gè)提交日志肚邢。消息以追加的方式寫入分區(qū),然后以先進(jìn)先出的順序讀取摊溶。因?yàn)橐粋€(gè)topic一般由多個(gè)partition組成爬骤,所以Kafka不能保證主題范圍內(nèi)的消息順序,但是能夠保證單個(gè)分區(qū)的消息順序莫换。如果要保證整個(gè)主題的有序性霞玄,就只能一個(gè)主題只有一個(gè)分區(qū)。Kafka通過分區(qū)來完成消息的冗余和伸縮性拉岁,分區(qū)可以分布在不同的服務(wù)器上坷剧,這樣比單個(gè)服務(wù)器具有更高的性能。
每個(gè)分區(qū)都是一個(gè)有序喊暖、不可變的記錄序列惫企,新提交的記錄會不斷的追加到分區(qū)中。分區(qū)中的每條記錄都會被分配一個(gè)連續(xù)的序列號叫做offset(偏移)陵叽,用于唯一標(biāo)識分區(qū)中的每個(gè)記錄狞尔。
在一個(gè)可配置的保留周期內(nèi)(保存時(shí)間或保存大小),Kafka集群會持久化所有發(fā)布的記錄巩掺,無論這個(gè)記錄是否被消費(fèi)過偏序。比如,我們將保存周期設(shè)置為2天胖替,則記錄在發(fā)布的兩天內(nèi)都可以重復(fù)被使用研儒,當(dāng)過了兩天后豫缨,這條記錄就會被丟棄以釋放空間。Kafka的性能是與數(shù)據(jù)大小無關(guān)的常量殉摔,所以數(shù)據(jù)存儲多長時(shí)間都沒有問題。
為什么要進(jìn)行日志分區(qū)
- 使得每個(gè)topic日志不會太大记焊,以便單個(gè)服務(wù)能夠保存逸月。
- 每個(gè)分區(qū)能夠單獨(dú)發(fā)布和消費(fèi),為并發(fā)消費(fèi)topic提供一種可能遍膜。(也是最重要的)
Kafka客戶端
Kafka的客戶端就是Kafka的系統(tǒng)用戶碗硬,它們被分為兩種基本類型:生產(chǎn)者和消費(fèi)者。除了這些基礎(chǔ)API之外瓢颅,Kafka還提供了一些高級API恩尾,比如用于數(shù)據(jù)集成的Kafka Connect API,用于流式處理的Kafka Streams和用于管理Kafka的AdminClient挽懦。
- Producer API(http://kafka.apache.org/documentation.html#producerapi):用于應(yīng)用程序?qū)?shù)據(jù)流發(fā)布到一個(gè)或多個(gè)topic上翰意。
- Consumer API(http://kafka.apache.org/documentation.html#consumerapi):用于應(yīng)用程序訂閱一個(gè)或多個(gè)topic,并處理這些流記錄信柿。
- Streams API(http://kafka.apache.org/documentation/streams):用于流式處理冀偶,消費(fèi)來自一個(gè)或多個(gè)topic的輸入流,并生成一個(gè)輸出流到一個(gè)或多個(gè)topic上渔嚷,輸入輸出都是kafka进鸠。
- Connector API(http://kafka.apache.org/documentation.html#connect):用于Kafka topic與現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)集成的API。
Kafka生產(chǎn)者
Kafka中的生產(chǎn)者是用于寫入消息的形病,一般一個(gè)消息會被寫入到一個(gè)指定的主題內(nèi)客年。生產(chǎn)者默認(rèn)會將消息均衡地分布到主題的所有分區(qū)上。但是我們可以通過消息鍵或者分區(qū)器來將消息分類漠吻,將同類數(shù)據(jù)寫入到同一個(gè)分區(qū)內(nèi)量瓜。
Kafka消費(fèi)者
Kafka中的消費(fèi)者是用于讀取消息的,消費(fèi)者會訂閱一個(gè)或多個(gè)主題途乃,并且按照消息的生成順序讀取它們榔至。消費(fèi)者會通過消息的“偏移量”來記錄已經(jīng)讀取的位置,偏移量是一種元數(shù)據(jù)欺劳,它是一個(gè)不斷自增的整數(shù)值唧取。在消息寫入到分區(qū)內(nèi)時(shí),Kafka會為該條消息生成所在分區(qū)內(nèi)的唯一數(shù)值划提。消費(fèi)者會把最后讀取消息所在的偏移量保存到Zookeeper或Kafka中枫弟,如果消費(fèi)者關(guān)閉或重啟,則會重新讀取該偏移量鹏往。
在Kafka中消費(fèi)者是消費(fèi)者群組的一部分淡诗,即一個(gè)群組可能有多個(gè)消費(fèi)者共同讀取一個(gè)主題骇塘。但是群組能夠保證每個(gè)分區(qū)內(nèi)的消息只能被消費(fèi)者群組中的一個(gè)消費(fèi)者消費(fèi)。
消費(fèi)者與分區(qū)之間的關(guān)系稱為消費(fèi)者對分區(qū)的所有權(quán)韩容。當(dāng)一個(gè)消費(fèi)者掛掉后款违,同一群組的消費(fèi)者可以接管失效消費(fèi)者的工作。
消息的有序性
相比傳統(tǒng)的消息系統(tǒng)群凶,Kafka可以很好的保證有序性插爹。
傳統(tǒng)消息隊(duì)列在服務(wù)器上保存有序消息,但是當(dāng)多個(gè)Consumer消費(fèi)隊(duì)列中的數(shù)據(jù)時(shí)请梢,由于消息被異步發(fā)送到不同的Consumer上赠尾,所以消息到達(dá)時(shí)可能已經(jīng)失去了原來的順序。通常這種情況如果需要強(qiáng)順序讀取毅弧,則只能有一個(gè)Consumer消費(fèi)消息气嫁。這樣也就失去了并發(fā)性。
Kafka由于使用了分區(qū)概念够坐,可以在多個(gè)Consumer組并發(fā)的情況下提供較好的有序性和負(fù)載均衡寸宵。將每個(gè)分區(qū)只發(fā)給一個(gè)Consumer,這樣一個(gè)分區(qū)就只被一個(gè)Consumer消費(fèi)了元咙,就可以順序消費(fèi)這個(gè)分區(qū)的消息了邓馒,由于一個(gè)topic有多個(gè)partition,所以可以使用多個(gè)Consumer消費(fèi)蛾坯,來實(shí)現(xiàn)負(fù)載均衡光酣。但是Kafka只能保證一個(gè)分區(qū)的消息是有序的,如果需要topic所有消息都有序脉课,那只能一個(gè)topic只有一個(gè)分區(qū)救军,也就只能有一個(gè)Consumer消費(fèi)。
Kafka集群
在多臺機(jī)器上分別部署Kafka倘零,就會組成一個(gè)Kafka集群唱遭。每臺機(jī)器運(yùn)行的Kafka服務(wù)稱為broker,broker用于接收生產(chǎn)者消息呈驶,為消息設(shè)置偏移量拷泽,并且將消息保存到磁盤中。broker還為消費(fèi)者提供讀取消息服務(wù)袖瞻,向消費(fèi)者返回已經(jīng)提交到磁盤中的消息司致。單個(gè)broker可以輕松處理數(shù)千分區(qū)以及每秒百萬級消息量(依賴于具體機(jī)器性能)。
在broker集群中聋迎,會選舉出一個(gè)leader脂矫,作為集群控制器的角色。leader控制器負(fù)責(zé)管理工作霉晕,比如將分區(qū)分配給broker和監(jiān)控broker庭再。在broker集群中捞奕,一個(gè)分區(qū)隸屬于一個(gè)broker,這個(gè)broker稱為分區(qū)的leader拄轻。一個(gè)分區(qū)可以分配到多個(gè)broker上颅围,而這些其它broker上的分區(qū)數(shù)據(jù)是分區(qū)leader的復(fù)制數(shù)據(jù),當(dāng)分區(qū)leader掛掉后恨搓,其它broker可以接管領(lǐng)導(dǎo)權(quán)院促,但是這時(shí)候相關(guān)的消費(fèi)者和生產(chǎn)者會連接到新的分區(qū)leader上。這種分區(qū)復(fù)制的機(jī)制為kafka提供消息冗余奶卓,保證了kafka的容錯(cuò)性和負(fù)載均衡一疯。
broker集群中的消息會持久化到磁盤上撼玄,這是kafka的一個(gè)重要特性夺姑。Kafka broker默認(rèn)的消息保留策略有兩種:保留到指定的時(shí)間和保留到消息到達(dá)一定的字節(jié)數(shù)。當(dāng)達(dá)到上限時(shí)掌猛,就消息就會被刪除盏浙。
Kafka集群搭建
對于Kafka集群來說,單個(gè)節(jié)點(diǎn)broker和多個(gè)節(jié)點(diǎn)的broker并沒有任何區(qū)別荔茬。多broker節(jié)點(diǎn)只是在集群啟動過程中废膘,每個(gè)broker節(jié)點(diǎn)都需要啟動。
Kafka安裝包下載(2.0.0版本)
下載路徑:
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar -zxvf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
安裝Zookeeper
Kafka是使用Zookeeper來保存集群元數(shù)據(jù)信息和消費(fèi)者信息慕蔚。雖然Kafka發(fā)行版已經(jīng)自帶了Zookeeper丐黄,可以通過腳本直接啟動,但仍然建議安裝一個(gè)完整版的Zookeeper孔飒。
Zookeeper部署:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html
注意:
1)灌闺、在部署Zookeeper時(shí),應(yīng)該使用Linux監(jiān)督(supervision)坏瞄。因?yàn)閆ookeeper遇到任何失敗情況桂对,都會快速失敗并且退出線程,查看:http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_supervision鸠匀。
2)蕉斜、部署完Zookeeper應(yīng)該配置一個(gè)cron來定時(shí)壓縮zk的數(shù)據(jù)和日志,因?yàn)閦k并不會做這些事缀棍。如果我們不設(shè)置cron宅此,系統(tǒng)磁盤有可能會被zk打滿。
https://www.cnblogs.com/fesh/p/3900253.htmlhttps://blog.csdn.net/qq_37716485/article/details/71786894
Kafka配置
Kafka的配置文件在${KAFKA_HOME}/config/server.properties目錄爬范,我們只需要簡單進(jìn)行配置下:
broker.id=1 #當(dāng)前broker在集群中的唯一標(biāo)識诽凌,類似zk中的myid
log.dir=/opt/yangjianzhang/kafka/log #消息日志輸出目錄
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2281 #使用的zk集群
分發(fā)安裝并啟動
將Kafka安裝包分發(fā)到其它機(jī)器上:
scp -r kafka_2.11-2.0.0 root@192.168.0.1:/opt/yangjianzhang/kafka/
#啟動集群中每臺機(jī)器的Kafka服務(wù)
bin/kafka-server-start.sh -daemon config/server.properties #需要指定啟動配置文件
創(chuàng)建topic
#創(chuàng)建test topic,只有一個(gè)分區(qū)坦敌、三個(gè)副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
#查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
查看創(chuàng)建的topic信息
[root@yjz01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: demo Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
第一行輸出是對所有分區(qū)的一個(gè)描述侣诵,然后每個(gè)分區(qū)會有一行輸出痢法。
leader:當(dāng)前分區(qū)所在的leader節(jié)點(diǎn),負(fù)責(zé)處理消息的讀和寫杜顺,leader是從所有分區(qū)所在broker中隨機(jī)選擇出來的财搁。
replicas:列出了所有副本節(jié)點(diǎn)(包含了leader節(jié)點(diǎn)),無論該節(jié)點(diǎn)當(dāng)前是否存活躬络。
isr:分區(qū)副本所在節(jié)點(diǎn)尖奔,并且該節(jié)點(diǎn)正常運(yùn)行服務(wù)。
當(dāng)前分區(qū)leader是broker 3穷当,我們kill 掉broker 3中的kafka服務(wù)提茁,然后再看分區(qū)信息:
[root@yjz01 kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: demo Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2
leader重新進(jìn)行了選舉,并且當(dāng)前服務(wù)節(jié)點(diǎn)isr中已經(jīng)把3剔除馁菜。
使用kafka命令行發(fā)送和消費(fèi)消息
Kafka附帶了一個(gè)命令行客戶端茴扁,允許讀取文件或標(biāo)準(zhǔn)輸入發(fā)送到Kafka集群中,默認(rèn)情況下每行作為一條消息發(fā)送汪疮。
[root@yjz01 kafka_2.11-2.0.0]# bin/kafka-console-producer.sh -broker-list localhost:2181 --topic demo
>hello world
hello kafka
使用命令行consumer讀數(shù)據(jù):
bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic demo --from-beginning
>hello world
hello kafka
關(guān)注我
歡迎關(guān)注我的公眾號峭火,會定期推送優(yōu)質(zhì)技術(shù)文章,讓我們一起進(jìn)步智嚷、一起成長卖丸!
公眾號搜索:data_tc
或直接掃碼:??