概念
Kafka:是一個分布式消息系統(tǒng)愕把,由linkedin使用scala編寫姆涩,用作LinkedIn的活動流(Activity Stream)和運營數(shù)據(jù)處理管道(Pipeline)的基礎。具有高水平擴展和高吞吐量驹暑。
Kafka和其他主流分布式消息系統(tǒng)的對比
ActiveMQ | RabbitMQ | Kafka | |
---|---|---|---|
開發(fā)語言 | Java | Erlang | Java |
支持協(xié)議 | OpenWire、STOMP、REST较沪、XMP、AMQP | AMQP | AMQP |
事物 | 支持 | 支持 | 不支持 |
集群 | 支持 | 支持 | 支持 |
負載均衡 | 支持 | 支持 | 支持 |
動態(tài)擴容 | 不支持 | 不支持 | 支持(zk) |
礎知識
- 消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序
- 生產(chǎn)者:(Producer) :向broker發(fā)布消息的應用程序
- AMQP服務端(broker):用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務器中的隊列失仁,便于fafka將生產(chǎn)者發(fā)送的消息尸曼,動態(tài)的添加到磁盤并給每一條消息一個偏移量,所以對于kafka一個broker就是一個應用程序的實例
- 主題(Topic):一個主題類似新聞中的體育萄焦、娛樂控轿、教育等分類概念,在實際工程中通常一個業(yè)務一個主題
-分區(qū)(Partition):一個Topic中的消息數(shù)據(jù)按照多個分區(qū)組織拂封,分區(qū)是kafka消息隊列組織的最小單位茬射,一個分區(qū)可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列 - 每一個分區(qū)都可以有多個副本冒签,以防止數(shù)據(jù)的丟失
- 某一個分區(qū)中的數(shù)據(jù)如果需要更新在抛,都必須通過該分區(qū)所有副本中的leader來更新
- 消費者可以分組,比如有兩個消費者組A和B萧恕,共同消費一個topic:order_info,A和B所消費的消息不會重復刚梭,比如 order_info 中有100個消息肠阱,每個消息有一個id,編號從0-99,那么朴读,如果A組消費0-49號屹徘,B組就消費50-99號
- 消費者在具體消費某個topic中的消息時,可以指定起始偏移量
kafka分區(qū)是提高kafka性能的關鍵所在磨德,當你發(fā)現(xiàn)你的集群性能不高時缘回,常用手段就是增加Topic的分區(qū),分區(qū)里面的消息是按照從新到老的順序進行組織典挑,
消費者從隊列頭訂閱消息酥宴,生產(chǎn)者從隊列尾添加消息
Kafka架構
生產(chǎn)者生產(chǎn)消息、kafka集群您觉、消費者獲取消息這樣一種架構拙寡,如下圖:
kafka集群中的消息,是通過Topic(主題)來進行組織的琳水,如下圖:
工作圖
Kafka集群搭建
Kafka集群是把狀態(tài)保存在Zookeeper中的肆糕,首先要搭建Zookeeper集群
搭建Zookeeper集群
這里三臺服務器分別是
192.1682.158
192.1682.152
192.1682.150
在三臺服務器上分別安裝kafka
kafka官網(wǎng)下載地址 http://kafka.apache.org/downloads
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -zxvf kafka_2.12-2.0.0.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.0.0/ /usr/local/kafka
配置文件說明
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
#listeners=PLAINTEXT://192.168.2.152:9092 #當前kafka對外提供服務的端口默認是9092
num.network.threads=3 #這個是borker進行網(wǎng)絡處理的線程數(shù)
num.io.threads=8 #這個是borker進行I/O處理的線程數(shù)
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄在孝,這個目錄可以配置為“诚啃,”逗號分割的表達式,上面的num.io.threads要大于這個目錄的個數(shù)這個目錄私沮,如果配置多個目錄始赎,新創(chuàng)建的topic他把消息持久化的地方是,當前以逗號分割的目錄中仔燕,那個分區(qū)數(shù)最少就放那一個
socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小造垛,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達一定的大小后在發(fā)送晰搀,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小五辽,當數(shù)據(jù)到達一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區(qū)數(shù)外恕,一個topic默認1個分區(qū)數(shù)
log.retention.hours=168 #默認消息的最大持久化時間杆逗,168小時,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數(shù)鳞疲,如果一個副本失效了罪郊,另一個還可以繼續(xù)提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數(shù)
log.segment.bytes=1073741824 #這個參數(shù)是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候建丧,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 )排龄,到目錄查看是否有過期的消息如果有波势,刪除
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181#設置zookeeper的連接端口
主要修改配置這幾個地方
#每臺服務器的broker.id都不能相同
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181
三臺服務器分別啟動Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
檢查服務是否啟動
jps
在kafka集群中創(chuàng)建一個topic:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.152:2181 --replication-factor 3 --partitions 1 --topic order
解釋:
--replication-factor 3 #復制兩份
--partitions 1 #創(chuàng)建1個分區(qū)
--topic #主題為order
查看一下自己創(chuàng)建的topic:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.152:2181
在192.168.2.152機器上創(chuàng)建一個producer翎朱,發(fā)布者
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.152:9092 --topic order
在192.168.2.150與192.168.2.158機器上分別創(chuàng)建一個consumer橄维,消費者者
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.150:9092 --topic order --from-beginning
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.158:9092 --topic order --from-beginning
測試
在發(fā)布者機器上輸入內容
在兩臺消費者機器上可以看到
并且在zookeeper可以到kafka的一些情況
zkCli.sh -server 192.168.2.152:2181
上面的顯示結果中:只有zookeeper是,zookeeper原生的拴曲,其他都是Kafka創(chuàng)建的
標注一個重要的
get /brokers/ids/0
刪除topic命令
bin/kafka-topics.sh --delete --zookeeper ip:port --topic order
查看某個Topic的詳情
/usr/local/kafka/bin/kafka-topics.sh --topic order --describe --zookeeper 192.168.2.152:2181
- PartitionCount 分區(qū)數(shù)量
- ReplicationFactor 復制因子數(shù)量
- leader 是在給出的所有partitons中負責讀寫的節(jié)點争舞,每個節(jié)點都有可能成為leader
- replicas 顯示給定partiton所有副本所存儲節(jié)點的節(jié)點列表,不管該節(jié)點是否是leader或者是否存活澈灼。
- isr 副本都已同步的的節(jié)點集合竞川,這個集合中的所有節(jié)點都是存活狀態(tài),并且跟leader同步