原則: kafka版本!kafka broker及spring-boot配置看官網(wǎng),看kafka源碼, 源碼, 源碼 !!!
https://www.oneapm.com/ci/kafka.html
https://segmentfault.com/a/1190000016153221 (Flume + zk + kafka)
1.kafka簡介
1.1 適用場景
#1.Messaging
對于一些常規(guī)的消息系統(tǒng),kafka是個不錯的選擇;
partitons/replication和容錯,可以使kafka具有良好的擴(kuò)展性和性能優(yōu)勢.
不過到目前為止,我們應(yīng)該很清楚認(rèn)識到,
kafka并沒有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級特性;
kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)
#2.Websit activity tracking
kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;
可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中,并實時監(jiān)控,或者離線統(tǒng)計分析等
#3.Log Aggregation
kafka的特性決定它非常適合作為"日志收集中心";
application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;
kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.
此時consumer端可以使hadoop等其他系統(tǒng)化的存儲和分析系統(tǒng).
#4.Metrics
kafka通常用于業(yè)務(wù)數(shù)據(jù)監(jiān)測萍聊,還可以用于統(tǒng)計和匯總來自分布式應(yīng)用程序中產(chǎn)生的批量業(yè)務(wù)數(shù)據(jù)赌朋。
#5.Stream Processing
#6.Event Sourcing
事件源是應(yīng)用程序設(shè)計的一種樣式它匕,可以按時間序列記錄其狀態(tài)變化的日志涕蚤。
#7.Commit Log
kafka可以作為分布式系統(tǒng)的一種外部提交日志系統(tǒng)穆碎。
通過日志在節(jié)點(diǎn)之間復(fù)制數(shù)據(jù)玻粪,運(yùn)用同步機(jī)制為宕機(jī)節(jié)點(diǎn)恢復(fù)其數(shù)據(jù)窜管。
kafka中的日志壓縮特性有助于支持這種用法橄仍。
1.1.1 為什么需要消息系統(tǒng)
#1.解耦:
允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程韧涨,只要確保它們遵守同樣的接口約束。
#2.冗余:
消息隊列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理侮繁,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險虑粥。
許多消息隊列所采用的”插入-獲取-刪除”范式中,再把一個消息從隊列中刪除之前宪哩,
需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢娩贷,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
#3.擴(kuò)展性:
因為消息隊列解耦了你的處理過程锁孟,所以增大消息入隊和處理的頻率是很容易的彬祖,只要另外增加處理過程即可。
#4.靈活性 & 峰值處理能力:
在訪問量劇增的情況下品抽,應(yīng)用仍然需要繼續(xù)發(fā)揮作用储笑,但是這樣的突發(fā)流量并不常見。
如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時待命無疑是巨大的浪費(fèi)圆恤。
使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力突倍,而不會因為突發(fā)的超負(fù)荷的請求而完全崩潰。
#5.可恢復(fù)性:
系統(tǒng)的一部分組件失效時盆昙,不會影響到整個系統(tǒng)羽历。
消息隊列降低了進(jìn)程間的耦合度,
所以即使一個處理消息的進(jìn)程掛掉淡喜,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理秕磷。
#6.順序保證:
在大多使用場景下,數(shù)據(jù)處理的順序都很重要炼团。
大部分消息隊列本來就是排序的澎嚣,并且能保證數(shù)據(jù)會按照特定的順序來處理。
(Kafka 保證一個 Partition 內(nèi)的消息的有序性)
#7.緩沖:
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度们镜,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況币叹。
#8.異步通信:
很多時候,用戶不想也不需要立即處理消息模狭。
消息隊列提供了異步處理機(jī)制颈抚,允許用戶把一個消息放入隊列,但并不立即處理它。
想向隊列中放入多少消息就放多少贩汉,然后在需要的時候再去處理它們驱富。
1.2 什么是kafka
它是一個分布式消息系統(tǒng),由linkedin使用scala編寫匹舞,
用作LinkedIn的活動流(Activity Stream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)褐鸥。
具有高水平擴(kuò)展和高吞吐量。
適合小數(shù)據(jù)傳輸赐稽。
1.3 Kafka和其他主流分布式消息系統(tǒng)的對比
標(biāo)簽 | ActiveMQ | RabbitMQ | Kafka |
---|---|---|---|
所屬公司/社區(qū) | Apache | Mozilla Public License | Apache/LinkedIn |
開發(fā)語言 | java | Erlang | java |
支持的協(xié)議 | AMQP,REST等 | AMQP | 仿AMQP |
事務(wù) | 支持 | 不支持 | 不支持 |
集群 | 支持 | 支持 | 支持 |
負(fù)載均衡 | 支持 | 支持 | 不支持 |
動態(tài)擴(kuò)容 | 不支持 | 不支持 | 支持(依托于zk) |
單機(jī)吞吐量 | 萬級 | 萬級 | 10萬級 |
topic數(shù)量對吞吐量的影響 | - | - | topic從幾十個到幾百個的時候叫榕,吞吐量會大幅度下降,所以在同等機(jī)器下,kafka盡量保證topic數(shù)量不要過多姊舵。如果要支撐大規(guī)模topic晰绎,需要增加更多的機(jī)器資源 |
時效性 | ms級 | 微秒級 | ms級 |
可用性 | 高,主從架構(gòu)實現(xiàn) | 高,主從架構(gòu)實現(xiàn) | 非常高,kafka是分布式的括丁,一個數(shù)據(jù)多個副本荞下,少數(shù)機(jī)器宕機(jī),不會丟失數(shù)據(jù)史飞,不會導(dǎo)致不可用 |
消息可靠性 | 有較低的概率丟失數(shù)據(jù) | - | 經(jīng)過參數(shù)優(yōu)化配置尖昏,消息可以做到0丟失 |
1.4Kafka相關(guān)概念
1.4.1 AMQP協(xié)議 & JMS
#Advanced Message Queuing Protocol (高級消息隊列協(xié)議)
The Advanced Message Queuing Protocol (AMQP):
是一個標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件(Message Oriented Middleware)協(xié)議。
AMQP定義了通過網(wǎng)絡(luò)發(fā)送的字節(jié)流的數(shù)據(jù)格式构资。
因此兼容性非常好内地,任何實現(xiàn)AMQP協(xié)議的程序都可以和與AMQP協(xié)議兼容的其他程序交互唉工,
可以很容易做到跨語言讯蒲,跨平臺童谒。
#kafka的仿AMQP協(xié)議與之的區(qū)別
kafka和JMS(Java Message Service)實現(xiàn)(activeMQ)不同的是:
即使消息被消費(fèi),消息仍然不會被立即刪除.
日志文件將會根據(jù)broker中的配置要求,保留一定的時間之后刪除;
比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費(fèi).
kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費(fèi)之后對文件內(nèi)容改動的磁盤IO開支.
對于consumer而言,它需要保存消費(fèi)消息的offset,對于offset的保存和使用,由consumer來控制;
當(dāng)consumer正常消費(fèi)消息時,offset將會"線性"的向前驅(qū)動,即消息將依次順序被消費(fèi).
事實上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值(offset將會保存在zookeeper中).
"這里不建議cousumer配置自動提交offset"
1.4.2 基本概念
#1塞帐、主題(Topic)(數(shù)據(jù)邏輯存儲單元):
每條發(fā)送到broker的消息都有一個類別拦赠,這個類別稱為topic,即 kafka 是面向 topic 的葵姥。
"一個topic就是一個queue, 一個隊列"
#2荷鼠、分區(qū)(Partition)(數(shù)據(jù)物理存儲單元):
[kafka-topics.sh 工具可以動態(tài)創(chuàng)建刪除查看更新topic, 修改partition.
只能增加partition數(shù)量, 不能減少, 除非刪除重建.]
partition 是物理上的概念,每個 topic 包含一個或多個 partition榔幸。
kafka 分配的單位是 partition允乐。
一個Topic中的消息數(shù)據(jù)按照多個分區(qū)組織,分區(qū)是kafka消息隊列組織的最小單位削咆,
一個分區(qū)可以看作是一個FIFO( First Input First Output的縮寫牍疏,先入先出隊列)的隊列。
kafka分區(qū)是提高kafka性能的關(guān)鍵所在拨齐,當(dāng)你發(fā)現(xiàn)你的集群性能不高時鳞陨,
常用手段就是增加Topic的分區(qū),分區(qū)里面的消息是按照從新到老的順序進(jìn)行組織瞻惋,
消費(fèi)者從隊列頭訂閱消息厦滤,生產(chǎn)者從隊列尾添加消息援岩。
Partition在服務(wù)器上的表現(xiàn)形式就是一個一個的文件夾,
每個partition的文件夾下面會有多組segment文件掏导,
每組segment文件又包含.index文件享怀、.log文件、.timeindex文件(早期版本中沒有)三個文件趟咆,
log文件就實際是存儲message的地方添瓷,而index和timeindex文件為索引文件,用于檢索消息值纱。
###"Message結(jié)構(gòu)":
>> offset:offset是一個占8byte的有序id號仰坦,它可以唯一確定每條消息在parition內(nèi)的位置!
kafka的存儲文件都是按照offset.kafka來命名计雌,用offset做名字的好處是方便查找悄晃。
例如你想找位于2049的位置,只要找到2048.kafka的文件即可凿滤。
當(dāng)然the first offset就是00000000000.kafka妈橄。
>> 消息大小:消息大小占用4byte翁脆,用于描述消息的大小眷蚓。
>> 消息體:消息體存放的是實際的消息數(shù)據(jù)(被壓縮過),占用的空間根據(jù)具體的消息而不一樣反番。
#3沙热、備份(Replication)(分為leader和follower):
[kafka-reassign-partitions.sh工具可用來動態(tài)增加Replications數(shù)量.]
為了保證分布式可靠性,kafka0.8開始對每個分區(qū)的數(shù)據(jù)進(jìn)行備份(不同的Broker上)罢缸,
防止其中一個Broker宕機(jī)造成分區(qū)上的數(shù)據(jù)不可用篙贸。
Replication數(shù)量不能超過brokers數(shù)量, 否則創(chuàng)建topic時會報錯.
#4.偏移量(offset)
kafka為每條在分區(qū)的消息保存一個偏移量offset,這也是消費(fèi)者在分區(qū)的位置枫疆。
比如一個偏移量是5的消費(fèi)者爵川,表示已經(jīng)消費(fèi)了從0-4偏移量的消息,下一個要消費(fèi)的消息的偏移量是5
#5.消費(fèi)者:(Consumer):
從消息隊列中請求消息的客戶端應(yīng)用程序
一個消費(fèi)者組中的消費(fèi)者數(shù)量不要超過 topic 的 partition 的數(shù)量,
否則多出的消費(fèi)者將會被限制, 不去消費(fèi)任何消息.
#6.生產(chǎn)者:(Producer) :
向broker發(fā)布消息的應(yīng)用程序
#7.kafka實例(broker):
Kafka支持水平擴(kuò)展息楔,一般broker數(shù)量越多寝贡,集群吞吐率越高。
Kafka中使用Broker來接受Producer和Consumer的請求值依,并把Message持久化到本地磁盤圃泡。
每個Cluster當(dāng)中會選舉出一個Broker來擔(dān)任Controller,負(fù)責(zé)處理Partition的Leader選舉愿险,協(xié)調(diào)Partition遷移等工作颇蜡。
#8.Consumer group:
high-level consumer API 中,
每個 consumer 都屬于一個 consumer group,
每條消息只能被 consumer group 中的一個 Consumer 消費(fèi)澡匪,
但可以被多個 consumer group 消費(fèi)熔任。
kafka確保每個partition中的一條消息只能被某個consumer group中的一個consumer消費(fèi)
kafka通過group coordinate管理consumer實例負(fù)責(zé)消費(fèi)哪個partition, 默認(rèn)支持range和round-robin消費(fèi)
kafka在zk中保存了每個topic,每個partition在不同group的消費(fèi)偏移量(offset), 通過更新偏移量, 保證每條消息都被消費(fèi)
需要注意的是, 用多線程讀消息時, 一個線程相當(dāng)于一個consumer實例, 當(dāng)consumer數(shù)量大于partition數(shù)量時, 有些線程讀不到數(shù)據(jù)
#9.leader:
Replication中的一個角色, producer 和 consumer 只跟 leader 交互唁情。
每個Replication集合中的Partition都會選出一個唯一的Leader疑苔,所有的讀寫請求都由Leader處理。
其他Replicas從Leader處把數(shù)據(jù)更新同步到本地甸鸟,過程類似大家熟悉的MySQL中的Binlog同步惦费。
#10.follower:
Replication中的一個角色,從 leader 中復(fù)制數(shù)據(jù)抢韭。
#11.controller:
kafka 集群中的其中一個服務(wù)器薪贫,用來進(jìn)行 leader election 以及 各種 failover。
#12.zookeeper:
kafka 通過 zookeeper 來存儲集群的 meta 信息刻恭。
#13.ISR(In-Sync Replica):
是Replicas的一個子集瞧省,表示目前Alive且與Leader能夠“Catch-up”的Replicas集合。
由于讀寫都是首先落到Leader上鳍贾,
所以一般來說通過同步機(jī)制從Leader上拉取數(shù)據(jù)的Replica都會和Leader有一些延遲(包括了延遲時間和延遲條數(shù)兩個維度)鞍匾,
任意一個超過閾值都會把該Replica踢出ISR。每個Partition都有它自己獨(dú)立的ISR骑科。
#14. Segment
每個Partition包含一個或多個Segment橡淑,每個Segment包含一個數(shù)據(jù)文件和一個與之對應(yīng)的索引文件。
#15.kafka支持的客戶端語言:
Kafka客戶端支持當(dāng)前大部分主流語言咆爽,
包括:C梁棠、C++、Erlang斗埂、Java符糊、.net、perl蜜笤、PHP濒蒋、Python、Ruby把兔、Go、Javascript
可以使用以上任何一種語言和kafka服務(wù)器進(jìn)行通信
(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)
#14."總之流程可以描述為"
應(yīng)用程序A(producer)--將message按照topic分類-->push到kafka服務(wù)器集群(broker)中
應(yīng)用程序B(consumer)--從kafka服務(wù)器集群(broker)中pull消息(message)
工作圖如下:
#一個典型的Kafka集群中包含:
● 若干Producer(可以是web前端產(chǎn)生的Page View瓮顽,或者是服務(wù)器日志县好,系統(tǒng)CPU、Memory等)
● 若干broker(Kafka支持水平擴(kuò)展暖混,一般broker數(shù)量越多缕贡,集群吞吐率越高)
● 若干Consumer Group,以及一個Zookeeper集群。
Kafka通過Zookeeper管理集群配置晾咪,選舉partition 的 leader收擦,
以及在Consumer Group發(fā)生變化時進(jìn)行rebalance。
#一個簡單的消息發(fā)送流程如下:
● Producer根據(jù)指定的partition方法(round-robin谍倦、hash等)塞赂,將消息push到對應(yīng)topic的partition里面
● kafka集群(brokers)接收到Producer發(fā)過來的消息后,將其持久化到硬盤昼蛀,并保留消息指定時長(可配置), 但不關(guān)注消息是否被消費(fèi)宴猾。
● Consumer從kafka集群pull數(shù)據(jù),并控制獲取消息的offset
1.4.3 kafka的動態(tài)擴(kuò)容
#Kafka的動態(tài)擴(kuò)容是通過Zookeeper來實現(xiàn)的叼旋。
kafka使用zookeeper來存儲一些meta信息,并使用了zookeeper watch機(jī)制
來發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動作(比如consumer失效,觸發(fā)負(fù)載均衡等)
1) Producer端使用zookeeper用來"發(fā)現(xiàn)"broker列表,
并與Topic下每個partition leader建立socket連接并發(fā)送消息.
2) Broker端使用zookeeper用來注冊broker信息,以及監(jiān)測partition leader存活性.
3) Consumer端使用zookeeper用來注冊consumer信息,
其中包括consumer消費(fèi)的partition列表等,同時也用來發(fā)現(xiàn)broker列表,
并和partition leader建立socket連接,并拉取消息.
Zookeeper是一種在分布式系統(tǒng)中被廣泛用來作為:
分布式狀態(tài)管理仇哆、分布式協(xié)調(diào)管理、分布式配置管理夫植、和分布式鎖服務(wù)的集群讹剔。
kafka增加和減少服務(wù)器都會在Zookeeper節(jié)點(diǎn)上觸發(fā)相應(yīng)的事件kafka系統(tǒng)會捕獲這些事件,
進(jìn)行新一輪的負(fù)載均衡详民,客戶端也會捕獲這些事件來進(jìn)行新一輪的處理辟拷。
kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息由zookeeper保存;
因此producer和consumer的客戶端實現(xiàn)非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.
1.4.4 kafka中producer的ack機(jī)制
Kafka的ack機(jī)制,指的是producer的消息發(fā)送確認(rèn)機(jī)制阐斜,與Kafka集群的吞吐量和消息可靠性密切相關(guān)衫冻。
acks有一些個可選值。
#acks=1(默認(rèn)值, 字符串類型)
producer只要收到一個分區(qū)副本(leader副本)成功寫入的通知就認(rèn)為推送消息成功了谒出。
只有l(wèi)eader副本成功寫入了隅俘,producer才會認(rèn)為消息發(fā)送成功。
#acks=0
producer發(fā)送一次就不再發(fā)送了笤喳,不管是否發(fā)送成功为居。安全性最低但是效率最高。
#ack=-1
producer只有收到分區(qū)內(nèi)所有副本的成功寫入的通知才認(rèn)為推送消息成功了杀狡。安全性最高蒙畴,但是效率最低。
#ack=n
producer只有收到n個分區(qū)副本(leader副本)成功寫入的通知才認(rèn)為推送消息成功了呜象。
----------------------------------------------
#spring-boot中指定acks類型
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
acks: -1
----------------------------------------------
1.4.5 kafka的partition進(jìn)行消息的write-copy-read, 以及l(fā)eader選舉&高可用機(jī)制
#1.producer向brokers發(fā)送消息
當(dāng)我們向某個服務(wù)器發(fā)送請求的時候膳凝,服務(wù)端可能會對請求做一個負(fù)載,將流量分發(fā)到不同的服務(wù)器恭陡,
那在kafka中蹬音,如果某個topic有多個partition,producer又怎么知道該將數(shù)據(jù)發(fā)往哪個partition呢休玩?
producer會和Topic下所有partition leader保持socket連接;
其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已經(jīng)注冊了watch用來監(jiān)聽partition leader的變更事件.
消息由producer直接通過socket發(fā)送到broker,中間不會經(jīng)過任何"路由層".
事實上,消息被路由到哪個partition上,由producer客戶端決定.
可以采用"random""key-hash""round-robin"等來實現(xiàn)消息負(fù)載均衡.
-----------------------------------------
#producer發(fā)送消息的邏輯:
#org.apache.kafka.clients.producer.ProducerRecord源碼
#org.springframework.kafka:spring-kafka:2.2.6.RELEASE中org.springframework.kafka.core.KafkaTemplate提供了發(fā)送消息的api
1.partition在寫入的時候可以指定需要寫入的partition著淆,如果有指定劫狠,則寫入對應(yīng)的partition。
2.如果沒有指定partition永部,但是設(shè)置了數(shù)據(jù)的key独泞,則會根據(jù)key的值hash出一個partition。
3.如果沒有指定key值并且可用分區(qū)個數(shù)大于0時苔埋,在就可用分區(qū)中做輪詢決定改消息分配到哪個partition懦砂。
4.如果沒有指定key值并且沒有可用分區(qū)時,在所有分區(qū)中輪詢決定改消息分配到哪個partition讲坎。
----------------------------------------
#異步發(fā)送:
將多條消息暫且在客戶端buffer起來孕惜,并將他們批量的發(fā)送到broker,
小數(shù)據(jù)IO太多晨炕,會拖慢整體的網(wǎng)絡(luò)延遲衫画,批量延遲發(fā)送事實上提升了網(wǎng)絡(luò)效率。
不過這也有一定的隱患瓮栗,比如說當(dāng)producer失效時削罩,那些尚未發(fā)送的消息將會丟失。
----------------------------------------
#kafka-0.8.2之后, producer均為異步方式(async)
----------------------------------------
#Producer均衡算法
kafka集群中的任何一個broker,都可以向producer提供metadata信息,
這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節(jié)點(diǎn)信息).
當(dāng)producer獲取到metadata信心之后, producer將會和Topic下所有partition leader保持socket連接;
消息由producer直接通過socket發(fā)送到broker,中間不會經(jīng)過任何"路由層".
事實上,消息被路由到哪個partition上,由producer客戶端決定.
比如可以采用"random""key-hash""輪詢"等.
在producer端的配置文件中,開發(fā)者可以指定partition路由的方式.
#2.consumer從brokers中消費(fèi)消息
"
一個消費(fèi)者組中的消費(fèi)者數(shù)量不要超過 topic 的 partition 的數(shù)量,
否則多出的消費(fèi)者將會被限制, 不去消費(fèi)任何消息.
"
>>> 問:"在kafka中, 每個 Topic 一般會有很多個 partitions费奸。
為了使得我們能夠及時消費(fèi)消息弥激,我們也可能會啟動多個 Consumer 去消費(fèi),
而每個 Consumer 又會啟動一個或多個streams去分別消費(fèi) Topic 里面的數(shù)據(jù)愿阐。
而同一個Consumer Group內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題(subscribed topics)的所有分區(qū)(partition)微服。
當(dāng)然,每個 partition 只能由同一個消費(fèi)組內(nèi)的一個consumer來消費(fèi)缨历。
那么問題來了以蕴,同一個 Consumer Group 里面的 Consumer 是如何知道該消費(fèi)哪些partition里面的數(shù)據(jù)呢?"
>>> 答: 這里由兩種策略:
// 策略一:Range strategy
Range策略是對每個主題而言的辛孵,首先對同一個主題里面的分區(qū)按照序號進(jìn)行排序丛肮,并對消費(fèi)者按照字母順序進(jìn)行排序。
然后將partitions的個數(shù)除于消費(fèi)者線程的總數(shù)來決定每個消費(fèi)者線程消費(fèi)幾個分區(qū)魄缚。
如果除不盡宝与,那么前面幾個消費(fèi)者線程將會多消費(fèi)一個分區(qū)。
// 策略二: RoundRobin strategy
將所有主題的分區(qū)組成 TopicAndPartition 列表冶匹,然后對 TopicAndPartition 列表按照 hashCode 進(jìn)行排序,
最后按照round-robin風(fēng)格將分區(qū)分別分配給不同的消費(fèi)者線程习劫。
使用RoundRobin策略有兩個前提條件必須滿足:
>> 同一個Consumer Group里面的所有消費(fèi)者的num.streams必須相等;
>> 每個消費(fèi)者訂閱的主題必須相同徙硅。
-------------------------------------------
通過partition.assignment.strategy參數(shù)選擇 range 或 roundrobin榜聂。
partition.assignment.strategy參數(shù)默認(rèn)的值是range。
#spring-boot中.yml配置
需考慮
-------------------------------------------
#Consumer均衡算法
當(dāng)一個group中,有consumer加入或者離開時,會觸發(fā)partitions均衡.
均衡的最終目的,是提升topic的并發(fā)消費(fèi)能力.
1) 假如topic1,具有如下partitions: P0,P1,P2,P3
2) 加入group中,有如下consumer: C0,C1
3) 首先根據(jù)partition索引號對partitions排序: P0,P1,P2,P3
4) 根據(jù)consumer.id排序: C0,C1
5) 計算倍數(shù): M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
#其他
consumer端向broker發(fā)送"fetch"請求,并告知其獲取消息的offset;
此后consumer將會獲得一定條數(shù)的消息;consumer端也可以重置offset來重新消費(fèi)消息.
在JMS實現(xiàn)中,Topic模型基于push方式,即broker將消息推送給consumer端.
不過在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動去pull(或者說fetch)消息;
consumer端可以根據(jù)自己的消費(fèi)能力適時的去fetch消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset);
并可以良好的控制消息消費(fèi)的數(shù)量,batch fetch.
其他JMS實現(xiàn),消息消費(fèi)的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒有消費(fèi)成功的消息重發(fā)等,同時還要控制消息的狀態(tài).
這就要求JMS broker需要太多額外的工作.
在kafka中,partition中的消息只有一個consumer在消費(fèi),且不存在消息狀態(tài)的控制,
也沒有復(fù)雜的消息確認(rèn)機(jī)制,可見kafka broker端是相當(dāng)輕量級的.
當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,
并間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級.
#3.broker間的消息復(fù)制(replication機(jī)制)
kafka將每個partition數(shù)據(jù)復(fù)制到多個server上,任何都有一個partition有一個leader和任意個follower,
備份(Replication=leader+follower)的個數(shù)可以通過broker配置文件來設(shè)定.
leader處理所有的read-write請求,follower需要和leader保持同步.
follower和consumer一樣,消費(fèi)消息并保存在本地日志中;
leader負(fù)責(zé)跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除.
即使只有一個replicas實例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可.
當(dāng)leader失效時,需在followers中選取出新的leader,可能此時follower落后于leader,
因此需要選擇一個"up-to-date"的follower.
選擇follower時需要兼顧一個問題,就是新leader server上所已經(jīng)承載的partition leader的個數(shù),
如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.
在選舉新leader,需要考慮到"負(fù)載均衡".
#summary
對于Kafka而言嗓蘑,定義一個Broker是否“活著”包含兩個條件:
一是它必須維護(hù)與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機(jī)制來實現(xiàn))须肆。
二是Follower必須能夠及時將Leader的消息復(fù)制過來,不能“落后太多”桩皿。
基于ISR的數(shù)據(jù)復(fù)制方案
Kafka的數(shù)據(jù)復(fù)制是以Partition為單位的豌汇。而多個備份間的數(shù)據(jù)復(fù)制,通過Follower向Leader拉取數(shù)據(jù)完成泄隔。
從一這點(diǎn)來講拒贱,Kafka的數(shù)據(jù)復(fù)制方案接近于mysql的Master-Slave方案。
不同的是佛嬉,Kafka既不是完全的同步復(fù)制逻澳,也不是完全的異步復(fù)制,而是基于ISR的動態(tài)復(fù)制方案暖呕。
#ISR斜做,也即In-sync Replica。
每個Partition的Leader都會維護(hù)這樣一個列表湾揽,該列表中瓤逼,包含了所有與之同步的Replica(包含Leader自己)。
每次數(shù)據(jù)寫入時库物,只有ISR中的所有Replica都復(fù)制完霸旗,Leader才會將其置為Commit,它才能被Consumer所消費(fèi)戚揭。
這種方案诱告,與同步復(fù)制非常接近。
但不同的是民晒,這個ISR是由Leader動態(tài)維護(hù)的精居。
如果Follower不能緊“跟上”Leader,它將被Leader從ISR中移除镀虐,
待它又重新“跟上”Leader后箱蟆,會被Leader再次加加ISR中。
每次改變ISR后刮便,Leader都會將最新的ISR持久化到Zookeeper中空猜。
至于如何判斷某個Follower是否“跟上”Leader,不同版本的Kafka的策略稍微有些區(qū)別恨旱。
1) 對于0.8.*版本
如果Follower在`replica.lag.time.max.ms`時間內(nèi)未向Leader發(fā)送Fetch請求(也即數(shù)據(jù)復(fù)制請求), 則Leader會將其從ISR中移除辈毯。
如果某Follower持續(xù)向Leader發(fā)送Fetch請求,但是它與Leader的數(shù)據(jù)差距在`replica.lag.max.messages`以上搜贤,也會被Leader從ISR中移除谆沃。
2) 從0.9.0.0版本
`replica.lag.max.messages`被移除,故Leader不再考慮Follower落后的消息條數(shù)仪芒。
另外唁影,Leader不僅會判斷Follower是否在`replica.lag.time.max.ms`時間內(nèi)向其發(fā)送Fetch請求耕陷,
同時還會考慮Follower是否在該時間內(nèi)與之保持同步。
3) 0.10.* 版本的策略與0.9.*版一致
對于0.8.*版本的`replica.lag.max.messages`參數(shù)据沈,既然只有ISR中的所有Replica復(fù)制完后的消息才被認(rèn)為Commit哟沫,
那為何會出現(xiàn)Follower與Leader差距過大的情況。
原因在于锌介,Leader并不需要等到前一條消息被Commit才接收后一條消息嗜诀。
事實上,Leader可以按順序接收大量消息孔祸,最新的一條消息的Offset被記為High Wartermark隆敢。
而只有被ISR中所有Follower都復(fù)制過去的消息才會被Commit,Consumer只能消費(fèi)被Commit的消息崔慧。
由于Follower的復(fù)制是嚴(yán)格按順序的拂蝎,所以被Commit的消息之前的消息肯定也已經(jīng)被Commit過。
換句話說尊浪,High Watermark標(biāo)記的是Leader所保存的最新消息的offset匣屡,
而Commit Offset標(biāo)記的是最新的可被消費(fèi)的(已同步到ISR中的Follower)消息。
而Leader對數(shù)據(jù)的接收與Follower對數(shù)據(jù)的復(fù)制是異步進(jìn)行的拇涤,因此會出現(xiàn)Commit Offset與High Watermark存在一定差距的情況捣作。
0.8.*版本中`replica.lag.max.messages`限定了Leader允許的該差距的最大值。
#Kafka基于ISR的數(shù)據(jù)復(fù)制方案原理如下圖所示鹅士。
[圖片上傳失敗...(image-9c96fb-1570760404167)]
// 第一步
Leader A總共收到3條消息券躁,故其high watermark為3,
但由于ISR中的Follower只同步了第1條消息(m1)掉盅,故只有m1被Commit也拜,也即只有m1可被Consumer消費(fèi)。
此時Follower B與Leader A的差距是1趾痘,而Follower C與Leader A的差距是2慢哈,
均未超過默認(rèn)的`replica.lag.max.messages`,故得以保留在ISR中永票。
// 第二步
由于舊的Leader A宕機(jī)卵贱,新的Leader B在`replica.lag.time.max.ms`時間內(nèi)未收到來自A的Fetch請求,
故將A從ISR中移除侣集,此時ISR={B键俱,C}夷磕。
同時空扎,由于此時新的Leader B中只有2條消息岳链,并未包含m3(m3從未被任何Leader所Commit)浮还,所以m3無法被Consumer消費(fèi)。
// 第四步
Follower A恢復(fù)正常蛇捌,它先將宕機(jī)前未Commit的所有消息全部刪除伞矩,
然后從最后Commit過的消息的下一條消息開始追趕新的Leader B渊额,
直到它“趕上”新的Leader,才被重新加入新的ISR中杯瞻。
# 使用ISR方案的原因
1) 由于Leader可移除不能及時與之同步的Follower镐牺,
故與同步復(fù)制相比可避免最慢的Follower拖慢整體速度炫掐,也即ISR提高了系統(tǒng)可用性魁莉。
2) ISR中的所有Follower都包含了所有Commit過的消息,而只有Commit過的消息才會被Consumer消費(fèi)募胃,
故從Consumer的角度而言旗唁,ISR中的所有Replica都始終處于同步狀態(tài),從而與異步復(fù)制方案相比提高了數(shù)據(jù)一致性痹束。
3) ISR可動態(tài)調(diào)整检疫,極限情況下,可以只包含Leader祷嘶,極大提高了可容忍的宕機(jī)的Follower的數(shù)量屎媳。
與`Majority Quorum`方案相比,容忍相同個數(shù)的節(jié)點(diǎn)失敗论巍,所要求的總節(jié)點(diǎn)數(shù)少了近一半烛谊。
# ISR相關(guān)配置說明---->important!!!!!
1) Broker的`min.insync.replicas`參數(shù)指定了Broker所要求的ISR最小長度,默認(rèn)值為1嘉汰。
也即極限情況下ISR可以只包含Leader丹禀。
但此時如果Leader宕機(jī),則該P(yáng)artition不可用鞋怀,可用性得不到保證双泪。
2) 只有被ISR中所有Replica同步的消息才被Commit,但Producer發(fā)布數(shù)據(jù)時密似,
Leader并不需要ISR中的所有Replica同步該數(shù)據(jù)才確認(rèn)收到數(shù)據(jù)焙矛。Producer可以通過`acks`參數(shù)指定最少需要多少個Replica確認(rèn)收到該消息才視為該消息發(fā)送成功。
`acks`的默認(rèn)值是1残腌,即Leader收到該消息后立即告訴Producer收到該消息村斟,
此時如果在ISR中的消息復(fù)制完該消息前Leader宕機(jī),那該條消息會丟失废累。
而如果將該值設(shè)置為0邓梅,則Producer發(fā)送完數(shù)據(jù)后,立即認(rèn)為該數(shù)據(jù)發(fā)送成功邑滨,
不作任何等待日缨,而實際上該數(shù)據(jù)可能發(fā)送失敗,并且Producer的Retry機(jī)制將不生效掖看。
更推薦的做法是匣距,將`acks`設(shè)置為`all`或者`-1`面哥,此時只有ISR中的所有Replica都收到該數(shù)據(jù)(也即該消息被Commit),
Leader才會告訴Producer該消息發(fā)送成功毅待,從而保證不會有未知的數(shù)據(jù)丟失尚卫。
ISR具體實現(xiàn)參 zero-copy 相關(guān)文章
partition與replication分配
#假定集群
broker: s1 s2 s3
topic: videoTopic
partition: 3
replication: 2
#partition 分區(qū)
就是數(shù)據(jù)的水平切分,比如上面的配置中把一個topic的數(shù)據(jù)分成3分進(jìn)行存儲尸红,
而且不同分區(qū)一般都是在不同的broker中吱涉。這個就是kafka的高擴(kuò)展性。
比如上面s1外里、s2怎爵、s3各有一個分區(qū)。
#replication 副本
replication的概念就是kafka的高可用性盅蝗,比如s1節(jié)點(diǎn)宕機(jī)了鳖链,那么s1節(jié)點(diǎn)的分區(qū)將變得不可用,
那么數(shù)據(jù)就會不完整墩莫,為了防止這個問題芙委,引入了replication概念,
比如replication配置成2狂秦,這就意味著一個partition有2個replication灌侣。
#說明
此處partition是3,假如partition索引分別是 0 1 2
此處replication是2故痊,那么就意味著partition 0顶瞳、1、2 都存在2個的意思愕秫。
也就是有2*3=6個分區(qū)慨菱。(0 0 1 1 2 2)
體現(xiàn)在kafka的存儲結(jié)構(gòu)就是
>> s1中有文件夾 videoTopic-0 videoTopic-1 :0和1 號partition 存在s1中
>> s2中有文件夾 videoTopic-0 videoTopic-2 :0和2號partition 在s2中
>> s3中有文件夾 videoTopic-1 videoTopic-2 :1和2號partition 在s3中
這里也存在均衡分配,比如0號replication有2個戴甩,會選出一個leader符喝。
1.4.6 高性能
>> 數(shù)據(jù)磁盤持久化:消息不在內(nèi)存中cache,直接寫入到磁盤甜孤,充分利用磁盤的順序讀寫性能协饲。
>> zero-copy:減少IO操作步驟。--> linux的 zero
>> 支持?jǐn)?shù)據(jù)批量發(fā)送和拉取缴川。
>> 支持?jǐn)?shù)據(jù)壓縮茉稠。
>> Topic劃分為多個partition,提高并行處理能力把夸。
>> producer端: 將消息buffer起來, 批量發(fā)送.
>> consumer端: 批量fetch消息.
>> 即所有的producer而线、broker和consumer都會有多個,均為分布式的。
Producer和broker之間沒有負(fù)載均衡機(jī)制膀篮。
broker和consumer之間利用zookeeper進(jìn)行負(fù)載均衡嘹狞。
所有broker和consumer都會在zookeeper中進(jìn)行注冊,且zookeeper會保存他們的一些元數(shù)據(jù)信息誓竿。
如果某個broker和consumer發(fā)生了變化磅网,所有其他的broker和consumer都會得到通知。
利用Partition實現(xiàn)并行處理
# Partition提供并行處理的能力
Kafka是一個Pub-Sub的消息系統(tǒng)筷屡,無論是發(fā)布還是訂閱涧偷,都須指定Topic。
Topic只是一個邏輯的概念速蕊。每個Topic都包含一個或多個Partition嫂丙,不同Partition可位于不同節(jié)點(diǎn)。
同時Partition在物理上對應(yīng)一個本地文件夾规哲,每個Partition包含一個或多個Segment,
每個Segment包含一個數(shù)據(jù)文件和一個與之對應(yīng)的索引文件诽表。
在邏輯上唉锌,可以把一個Partition當(dāng)作一個非常長的數(shù)組,可通過這個“數(shù)組”的索引(offset)去訪問其數(shù)據(jù)竿奏。
一方面袄简,由于不同Partition可位于不同機(jī)器,因此可以充分利用集群優(yōu)勢泛啸,實現(xiàn)機(jī)器間的并行處理绿语。
另一方面,由于Partition在物理上對應(yīng)一個文件夾候址,即使多個Partition位于同一個節(jié)點(diǎn)吕粹,
也可通過配置讓同一節(jié)點(diǎn)上的不同Partition置于不同的disk drive上,從而實現(xiàn)磁盤間的并行處理岗仑,充分發(fā)揮多磁盤的優(yōu)勢匹耕。
利用多磁盤的具體方法是,將不同磁盤mount到不同目錄荠雕,然后在server.properties中稳其,將`log.dirs`設(shè)置為多目錄(用逗號分隔)。
Kafka會自動將所有Partition盡可能均勻分配到不同目錄也即不同目錄(也即不同disk)上炸卑。
注:雖然物理上最小單位是Segment既鞠,但Kafka并不提供同一Partition內(nèi)不同Segment間的并行處理。
因為對于寫而言盖文,每次只會寫Partition內(nèi)的一個Segment嘱蛋,而對于讀而言,也只會順序讀取同一Partition內(nèi)的不同Segment。
# Partition是最小并發(fā)粒度
多Consumer消費(fèi)同一個Topic時浑槽,同一條消息只會被同一Consumer Group內(nèi)的一個Consumer所消費(fèi)蒋失。
而數(shù)據(jù)并非按消息為單位分配,而是以Partition為單位分配桐玻,
也即同一個Partition的數(shù)據(jù)只會被一個Consumer所消費(fèi)(在不考慮Rebalance的前提下)篙挽。
如果Consumer的個數(shù)多于Partition的個數(shù),那么會有部分Consumer無法消費(fèi)該Topic的任何數(shù)據(jù)镊靴,
也即當(dāng)Consumer個數(shù)超過Partition后铣卡,增加Consumer并不能增加并行度。
簡而言之偏竟,Partition個數(shù)決定了可能的最大并行度煮落。
以Spark消費(fèi)Kafka數(shù)據(jù)為例,如果所消費(fèi)的Topic的Partition數(shù)為N踊谋,則有效的Spark最大并行度也為N蝉仇。
即使將Spark的Executor數(shù)設(shè)置為N+M,最多也只有N個Executor可同時處理該Topic的數(shù)據(jù)殖蚕。
1.4.7 kafka生成的日志文件(適配: kafka_2.12-2.3.0)
LogSegment概述
類別 | 作用 |
---|---|
.index | 偏移量索引文件 |
.timestamp | 時間戳索引文件 |
.log | 日志文件 |
.snaphot | 快照文件 |
.deleted | - |
.cleaned | 日志清理時臨時文件 |
.swap | Log Compaction 之后的臨時文件 |
Leader-epoch-checkpoint | - |
// 需要說明的是:
是 Kafka 消息存儲的信息文件內(nèi)容轿衔,不是所謂的 Kafka 服務(wù)器運(yùn)行產(chǎn)生的日志文件。
// LogSement
在分區(qū)日志文件中睦疫,你會發(fā)現(xiàn)很多類型的文件害驹,比如:
.index、.timestamp蛤育、.log宛官、.snapshot 等,其中瓦糕,文件名一致的文件集合就稱為 LogSement底洗。
分區(qū)日志文件中包含很多的 LogSegment ,Kafka 日志追加是順序?qū)懭氲模?LogSegment 可以減小日志文件的大小刻坊,進(jìn)行日志刪除的時候和數(shù)據(jù)查找的時候可以快速定位枷恕。
同時,ActiveLogSegment 也就是活躍的日志分段擁有文件擁有寫入權(quán)限谭胚,其余的 LogSegment 只有只讀的權(quán)限徐块。
每個 LogSegment 都有一個基準(zhǔn)偏移量,用來表示當(dāng)前 LogSegment 中第一條消息的 offset灾而。
偏移量是一個 64 位的長整形數(shù)胡控,固定是20位數(shù)字,長度未達(dá)到旁趟,用 0 進(jìn)行填補(bǔ)昼激,索引文件和日志文件都由該作為文件名命名規(guī)則
// 如果想要查看相應(yīng)文件內(nèi)容可以通過 kafka-run-class.sh 腳本查看 .log
#windows版如下
soft_for_dev\kafka_2.12-2.3.0\bin\windows\kafka-run-class.bat kafka.tools.DumpLogSegments --files tmp\kafka-logs\__consumer_offsets-2\00000000000000000000.log
"或"
soft_for_dev\kafka_2.12-2.3.0\bin\windows\kafka-dump-log.bat --files tmp\kafka-logs\__consumer_offsets-49\00000000000000000000.index
日志與索引文件
偏移量索引文件(.log)用于記錄消息偏移量與物理地址之間的映射關(guān)系。
時間戳索引文件(.timeindex)則根據(jù)時間戳查找對應(yīng)的偏移量。
Kafka 中的索引文件是以稀疏索引的方式構(gòu)造消息的索引橙困,他并不保證每一個消息在索引文件中都有對應(yīng)的索引項瞧掺。
每當(dāng)寫入一定量的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項凡傅,
通過修改 log.index.interval.bytes 的值辟狈,改變索引項的密度。
配置項 | 默認(rèn)值 | 說明 |
---|---|---|
log.index.interval.bytes | 4096 (4K) | 增加索引項字節(jié)間隔密度夏跷,會影響索引文件中的區(qū)間密度和查詢效率 |
log.segment.bytes | 1073741824 (1G) | 日志文件最大值 |
log.roll.ms | - | 當(dāng)前日志分段中消息的最大時間戳與當(dāng)前系統(tǒng)的時間戳的差值允許的最大范圍哼转,毫秒維度 |
log.roll.hours | 168 (7天) | 當(dāng)前日志分段中消息的最大時間戳與當(dāng)前系統(tǒng)的時間戳的差值允許的最大范圍,小時維度 |
log.index.size.max.bytes | 10485760 (10MB) | 觸發(fā)偏移量索引文件或時間戳索引文件分段字節(jié)限額 |
切分文件
// 日志文件和索引文件都會存在多個文件槽华,組成多個 SegmentLog壹蔓,那么其切分的規(guī)則是怎樣的呢?
當(dāng)滿足如下幾個條件中的其中之一猫态,就會觸發(fā)文件的切分:
1) 當(dāng)前日志分段文件的大小超過了 broker 端參數(shù) log.segment.bytes 配置的值佣蓉。
log.segment.bytes 參數(shù)的默認(rèn)值為 1073741824,即 1GB懂鸵。
2) 當(dāng)前日志分段中消息的最大時間戳與當(dāng)前系統(tǒng)的時間戳的差值大于 log.roll.ms 或 log.roll.hours 參數(shù)配置的值偏螺。
如果同時配置了 log.roll.ms 和 log.roll.hours 參數(shù),那么 log.roll.ms 的優(yōu)先級高匆光。
默認(rèn)情況下,只配置了 log.roll.hours 參數(shù)酿联,其值為168终息,即 7 天。
3) 偏移量索引文件或時間戳索引文件的大小達(dá)到 broker 端參數(shù) log.index.size.max.bytes 配置的值贞让。
log.index.size.max.bytes 的默認(rèn)值為 10485760周崭,即 10MB。
4) 追加的消息的偏移量與當(dāng)前日志分段的偏移量之間的差值大于 Integer.MAX_VALUE喳张,
即要追加的消息的偏移量不能轉(zhuǎn)變?yōu)橄鄬ζ屏俊?
// 為什么是 Integer.MAX_VALUE 续镇?
在偏移量索引文件中,每個索引項共占用 8 個字節(jié)销部,并分為兩部分摸航。相對偏移量和物理地址。
>> 相對偏移量:表示消息相對與基準(zhǔn)偏移量的偏移量舅桩,占 4 個字節(jié)
>> 物理地址:消息在日志分段文件中對應(yīng)的物理位置酱虎,也占 4 個字節(jié)
4 個字節(jié)剛好對應(yīng) Integer.MAX_VALUE, 如果大于 Integer.MAX_VALUE, 則不能用 4 個字節(jié)進(jìn)行表示了。
// 索引文件切分過程
索引文件會根據(jù) log.index.size.max.bytes 值進(jìn)行預(yù)先分配空間擂涛,
即文件創(chuàng)建的時候就是最大值读串,當(dāng)真正的進(jìn)行索引文件切分的時候,才會將其裁剪到實際數(shù)據(jù)大小的文件。
這一點(diǎn)是跟日志文件有所區(qū)別的地方恢暖。其意義降低了代碼邏輯的復(fù)雜性排监。
日志清理
日志清理,不是日志刪除杰捂,這還是有所區(qū)別的舆床。
Kafka 提供兩種日志清理策略:
日志刪除:按照一定的刪除策略,將不滿足條件的數(shù)據(jù)進(jìn)行數(shù)據(jù)刪除
日志壓縮:針對每個消息的 Key 進(jìn)行整合琼娘,對于有相同 Key 的不同 Value 值峭弟,只保留最后一個版本。
Kafka 提供 log.cleanup.policy 參數(shù)進(jìn)行相應(yīng)配置脱拼,默認(rèn)值:delete瞒瘸,還可以選擇 compact。
是否支持針對具體的 Topic 進(jìn)行配置熄浓?
答案是肯定的情臭,主題級別的配置項是 cleanup.policy 。
日志刪除
配置項 | 默認(rèn)值 | 說明 |
---|---|---|
log.retention.check.interval.ms | 300000 (5分鐘) | 檢測頻率 |
log.retention.hours | 168 (7天) | 日志保留時間小時 |
log.retention.minutes | - | 日志保留時間分鐘 |
log.retention.ms | - | 日志保留時間毫秒 |
file.delete.delay.ms | 60000 (1分鐘) | 延遲執(zhí)行刪除時間 |
log.retention.bytes | -1 無窮大 | 運(yùn)行保留日志文件最大值 |
log.retention.bytes | 1073741824 (1G) | 日志文件最大值 |
Kafka 會周期性根據(jù)相應(yīng)規(guī)則進(jìn)行日志數(shù)據(jù)刪除赌蔑,保留策略有 3 種:
>> 基于時間的保留策略
>> 基于日志大小的保留策略
>> 基于日志起始偏移量的保留策略
1)基于時間
日志刪除任務(wù)會根據(jù) log.retention.hours/log.retention.minutes/log.retention.ms 設(shè)定日志保留的時間節(jié)點(diǎn)俯在。
如果超過該設(shè)定值,就需要進(jìn)行刪除娃惯。默認(rèn)是 7 天跷乐,log.retention.ms 優(yōu)先級最高。
// 如何查找日志分段文件中已經(jīng)過去的數(shù)據(jù)呢趾浅?
Kafka 依據(jù)日志分段中最大的時間戳進(jìn)行定位愕提,首先要查詢該日志分段所對應(yīng)的時間戳索引文件,
查找時間戳索引文件中最后一條索引項皿哨,若最后一條索引項的時間戳字段值大于 0浅侨,則取該值,否則取最近修改時間证膨。
// 為什么不直接選最近修改時間呢如输?
因為日志文件可以有意無意的被修改,并不能真實的反應(yīng)日志分段的最大時間信息央勒。
// 刪除過程
>> 從日志對象中所維護(hù)日志分段的跳躍表中移除待刪除的日志分段不见,保證沒有線程對這些日志分段進(jìn)行讀取操作。
>> 這些日志分段所有文件添加 上 .delete 后綴订歪。
>> 交由一個以 "delete-file" 命名的延遲任務(wù)來刪除這些 .delete 為后綴的文件脖祈。
>> 延遲執(zhí)行時間可以通過 file.delete.delay.ms 進(jìn)行設(shè)置
// 如果活躍的日志分段中也存在需要刪除的數(shù)據(jù)時?
Kafka 會先切分出一個新的日志分段作為活躍日志分段刷晋,然后執(zhí)行刪除操作盖高。
2) 基于日志大小
日志刪除任務(wù)會檢查當(dāng)前日志的大小是否超過設(shè)定值慎陵。
設(shè)定項為 log.retention.bytes ,單個日志分段的大小由 log.regment.bytes 進(jìn)行設(shè)定喻奥。
// 刪除過程
>> 計算需要被刪除的日志總大小 (當(dāng)前日志文件大小-retention值)席纽。
>> 從日志文件第一個 LogSegment 開始查找可刪除的日志分段的文件集合。
>> 執(zhí)行刪除撞蚕。
3) 基于日志起始偏移量
判斷依據(jù)是某日志分段的下一個日志分段的起始偏移量是否大于等于日志文件的起始偏移量润梯,
若是,則可以刪除此日志分段甥厦。
// 注意:
日志文件的起始偏移量并不一定等于第一個日志分段的基準(zhǔn)偏移量纺铭,
存在數(shù)據(jù)刪除,可能與之相等的那條數(shù)據(jù)已經(jīng)被刪除了刀疙。
// 刪除過程
>> 從頭開始變了每一個日志分段舶赔,日志分段 1 的下一個日志分段的起始偏移量為 11,小于 logStartOffset谦秧,將 日志分段 1 加入到刪除隊列中
>> 日志分段 2 的下一個日志分段的起始偏移量為 23竟纳,小于 logStartOffset,將 日志分段 2 加入到刪除隊列中
>> 日志分段 3 的下一個日志分段的起始偏移量為 30疚鲤,大于 logStartOffset锥累,則不進(jìn)行刪除。
1.4.8 Kafka 為什么這么快
Kafka的消息是保存或緩存在磁盤上的集歇,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會降低性能的桶略,
因為尋址會比較消耗時間,但是實際上诲宇,Kafka的特性之一就是高吞吐率删性。
Kafka速度的秘訣在于,它把所有的消息都變成一個批量的文件焕窝,并且進(jìn)行合理的批量壓縮,減少網(wǎng)絡(luò)IO損耗维贺,
通過mmap提高I/O速度它掂,寫入數(shù)據(jù)的時候由于單個Partion是末尾添加所以速度最優(yōu);
讀取數(shù)據(jù)的時候配合sendfile直接暴力輸出溯泣。
1.數(shù)據(jù)寫入
Kafka會把收到的消息都寫入到硬盤中虐秋,為了優(yōu)化寫入速度Kafka采用了兩個技術(shù), 順序?qū)懭?和 MMFile 絮姆。
#1.順序?qū)懭?磁盤讀寫的快慢取決于你怎么使用它鸭栖,也就是順序讀寫或者隨機(jī)讀寫皱坛。
在順序讀寫的情況下,某些優(yōu)化場景磁盤的讀寫速度可以和內(nèi)存持平靶剑。
因為硬盤是機(jī)械結(jié)構(gòu)蜻拨,每次讀寫都會尋址->寫入,其中尋址是一個“機(jī)械動作”桩引,它是最耗時的缎讼。
所以硬盤最討厭隨機(jī)I/O,最喜歡順序I/O坑匠。為了提高讀寫硬盤的速度血崭,Kafka就是使用順序I/O。
而且Linux對于磁盤的讀寫優(yōu)化也比較多厘灼,包括read-ahead和write-behind夹纫,磁盤緩存等。
如果在內(nèi)存做這些操作的時候设凹,一個是JAVA對象的內(nèi)存開銷很大舰讹,
另一個是隨著堆內(nèi)存數(shù)據(jù)的增多,JAVA的GC時間會變得很長围来,使用磁盤操作有以下幾個好處:
>> 磁盤順序讀寫速度超過內(nèi)存隨機(jī)讀寫跺涤。
>> JVM的GC效率低,內(nèi)存占用大监透。使用磁盤可以避免這一問題桶错。
>> 系統(tǒng)冷啟動后,磁盤緩存依然可用胀蛮。
下圖就展示了Kafka是如何寫入數(shù)據(jù)的院刁, 每一個Partition其實都是一個文件 ,
收到消息后Kafka會把數(shù)據(jù)插入到文件末尾(虛框部分)粪狼。
這種方法有一個缺陷—— 沒有辦法刪除數(shù)據(jù) 退腥,所以Kafka是不會刪除數(shù)據(jù)的,
它會把所有的數(shù)據(jù)都保留下來再榄,每個Consumer對每個Topic都有一個offset用來表示讀取到了第幾條數(shù)據(jù) 狡刘。
但是kafka提供了刪除數(shù)據(jù)的策略: 基于時間和基于數(shù)據(jù)大小的策略.
#2.Memory Mapped Files
即便是順序?qū)懭胗脖P,硬盤的訪問速度還是不可能追上內(nèi)存困鸥。
所以Kafka的數(shù)據(jù)并不是實時的寫入硬盤 嗅蔬,它充分利用了現(xiàn)代操作系統(tǒng)"分頁存儲"來利用內(nèi)存提高I/O效率。
Memory Mapped Files(后面簡稱mmap)也被翻譯成 內(nèi)存映射文件 疾就,
在64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件澜术,
它的工作原理是直接利用操作系統(tǒng)的Page來實現(xiàn)文件到物理內(nèi)存的直接映射。
完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r候)猬腰。
通過mmap鸟废,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存),也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底姑荷。
使用這種方式可以獲取很大的I/O提升盒延, 省去了用戶空間到內(nèi)核空間 復(fù)制的開銷,
調(diào)用文件的read會把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中缩擂,然后再復(fù)制到用戶空間的內(nèi)存中。
也有一個很明顯的缺陷——不可靠兰英, 寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤撇叁,
操作系統(tǒng)會在程序主動調(diào)用flush的時候才把數(shù)據(jù)真正的寫到硬盤。
Kafka提供了一個參數(shù)——producer.type來控制是不是主動flush畦贸,
如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫 同步 (sync)陨闹;
寫入mmap之后立即返回Producer不調(diào)用flush叫 異步 (async)。
2.數(shù)據(jù)讀取
Kafka在讀取磁盤時做了哪些優(yōu)化薄坏?
#基于sendfile實現(xiàn)Zero Copy
傳統(tǒng)模式下趋厉,當(dāng)需要對一個文件進(jìn)行傳輸?shù)臅r候,其具體流程細(xì)節(jié)如下:
>> 調(diào)用read函數(shù)胶坠,文件數(shù)據(jù)被copy到內(nèi)核緩沖區(qū)
>> read函數(shù)返回君账,文件數(shù)據(jù)從內(nèi)核緩沖區(qū)copy到用戶緩沖區(qū)
>> write函數(shù)調(diào)用,將文件數(shù)據(jù)從用戶緩沖區(qū)copy到內(nèi)核與socket相關(guān)的緩沖區(qū)沈善。
>> 數(shù)據(jù)從socket緩沖區(qū)copy到相關(guān)協(xié)議引擎乡数。
以上細(xì)節(jié)是傳統(tǒng)read/write方式進(jìn)行網(wǎng)絡(luò)文件傳輸?shù)姆绞剑覀兛梢钥吹剑?在這個過程當(dāng)中闻牡,文件數(shù)據(jù)實際上是經(jīng)過了四次copy操作:
>> 硬盤—>內(nèi)核buf—>用戶buf—>socket相關(guān)緩沖區(qū)—>協(xié)議引擎
而sendfile系統(tǒng)調(diào)用則提供了一種減少以上多次copy净赴,提升文件傳輸性能的方法。
在內(nèi)核版本2.1中罩润,引入了sendfile系統(tǒng)調(diào)用玖翅,以簡化網(wǎng)絡(luò)上和兩個本地文件之間的數(shù)據(jù)傳輸。
sendfile的引入不僅減少了數(shù)據(jù)復(fù)制割以,還減少了上下文切換金度。
sendfile(socket, file, len);
運(yùn)行流程如下:
>> sendfile系統(tǒng)調(diào)用,文件數(shù)據(jù)被copy至內(nèi)核緩沖區(qū)
>> 再從內(nèi)核緩沖區(qū)copy至內(nèi)核中socket相關(guān)的緩沖區(qū)
>> 最后再socket相關(guān)的緩沖區(qū)copy到協(xié)議引擎
相較傳統(tǒng)read/write方式严沥,2.1版本內(nèi)核引進(jìn)的sendfile已經(jīng)減少了內(nèi)核緩沖區(qū)到user緩沖區(qū)猜极,
再由user緩沖區(qū)到socket相關(guān)緩沖區(qū)的文件copy,而在內(nèi)核版本2.4之后消玄,
文件描述符結(jié)果被改變魔吐,sendfile實現(xiàn)了更簡單的方式,再次減少了一次copy操作莱找。
在apache,nginx嗜桌,lighttpd等web服務(wù)器當(dāng)中奥溺,都有一項sendfile相關(guān)的配置,使用sendfile可以大幅提升文件傳輸性能骨宠。
Kafka把所有的消息都存放在一個一個的文件中浮定,當(dāng)消費(fèi)者需要數(shù)據(jù)的時候Kafka直接把文件發(fā)送給消費(fèi)者相满,
配合mmap作為文件讀寫方式,直接把它傳給sendfile桦卒。
#批量壓縮
在很多情況下立美,系統(tǒng)的瓶頸不是CPU或磁盤,而是網(wǎng)絡(luò)IO方灾,
對于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)流水線尤其如此建蹄。
進(jìn)行數(shù)據(jù)壓縮會消耗少量的CPU資源,不過對于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮。
>> 如果每個消息都壓縮裕偿,但是壓縮率相對很低洞慎,所以Kafka使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮
>> Kafka允許使用遞歸的消息集合嘿棘,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式劲腿,直到被消費(fèi)者解壓縮
>> Kafka支持多種壓縮協(xié)議,包括Gzip和Snappy壓縮協(xié)議
2.kafka安裝
2.1 kafka集群版
2.1.1 安裝zookeeper集群
切換目錄, 下載zookeeper
cd /home
mkdir zookeeper
cd zookeeper
mkdir zkdata zkdatalog
wget http://www.apache.org/dist/zookeeper/stable/zookeeper-3.4.13.tar.gz
tar -zxf zookeeper-3.4.13.tar.gz
修改配置文件
cd /home/zookeeper/zookeeper-3.4.13/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/zookeeper/zkdata/
dataLogDir=/home/zookeeper/zkdatalog/
clientPort=12181
server.1=192.168.0.199:12888:13888
server.2=192.168.0.198:12888:13888
server.3=192.168.0.197:12888:13888
說明:
#server.1
這個1是服務(wù)器的標(biāo)識也可以是其他的數(shù)字鸟妙, 表示這個是第幾號服務(wù)器焦人,用來標(biāo)識服務(wù)器,這個標(biāo)識要寫到快照目錄下面myid文件里
#192.168.0.199為集群里的IP地址重父,
第一個端口是master和slave之間的通信端口花椭,默認(rèn)是2888,第二個端口是leader選舉的端口坪郭,集群剛啟動的時候選舉或者leader掛掉之后進(jìn)行新的選舉的端口默認(rèn)是3888
#tickTime:
這個時間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時間間隔个从,也就是每個 tickTime 時間就會發(fā)送一個心跳。
#initLimit:
這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務(wù)器的客戶端歪沃,而是 Zookeeper 服務(wù)器集群中連接到 Leader 的 Follower 服務(wù)器)初始化連接時最長能忍受多少個心跳時間間隔數(shù)嗦锐。當(dāng)已經(jīng)超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務(wù)器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗沪曙∞任郏總的時間長度就是 5*2000=10 秒
#syncLimit:
這個配置項標(biāo)識 Leader 與Follower 之間發(fā)送消息,請求和應(yīng)答時間長度液走,最長不能超過多少個 tickTime 的時間長度碳默,總的時間長度就是5*2000=10秒
#dataDir:
快照日志的存儲路徑
#dataLogDir:
事物日志的存儲路徑,如果不配置這個那么事物日志會默認(rèn)存儲到dataDir制定的目錄缘眶,這樣會嚴(yán)重影響zk的性能嘱根,當(dāng)zk吞吐量較大的時候,產(chǎn)生的事物日志巷懈、快照日志太多
#clientPort:
這個端口就是客戶端連接 Zookeeper 服務(wù)器的端口该抒,Zookeeper 會監(jiān)聽這個端口,接受客戶端的訪問請求顶燕。修改他的端口改大點(diǎn)
創(chuàng)建myid文件
#server1
echo "1" > /home/zookeeper/zkdata/myid
#server2
echo "2" > /home/zookeeper/zkdata/myid
#server3
echo "3" > /home/zookeeper/zkdata/myid
重要配置說明:
1凑保、myid文件和server.myid 在快照目錄下存放的標(biāo)識本臺服務(wù)器的文件冈爹,他是整個zk集群用來發(fā)現(xiàn)彼此的一個重要標(biāo)識。
2欧引、zoo.cfg 文件是zookeeper配置文件 在conf目錄里频伤。
3、log4j.properties文件是zk的日志輸出文件 在conf目錄里用java寫的程序基本上有個共同點(diǎn)日志都用log4j芝此,來進(jìn)行管理憋肖。
4、zkEnv.sh和zkServer.sh文件
zkServer.sh 主的管理程序文件
zkEnv.sh 是主要配置癌蓖,zookeeper集群啟動時配置環(huán)境變量的文件
5瞬哼、還有一個需要注意
zookeeper不會主動的清除舊的快照和日志文件,這個是操作者的責(zé)任租副。
但是可以通過命令去定期的清理坐慰。腳本如下:
#!/bin/bash
#snapshot file dir
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/opt/zookeeper/zkdatalog/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上這個腳本定義了刪除對應(yīng)兩個目錄中的文件,保留最新的66個文件用僧,可以將他寫到crontab中结胀,設(shè)置為每天凌晨2點(diǎn)執(zhí)行一次就可以了。
#zk log dir del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
其他清理zk日志的方法:
其他方法:
第二種:使用ZK的工具類PurgeTxnLog责循,它的實現(xiàn)了一種簡單的歷史文件清理策略糟港,可以在這里看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html
第三種:對于上面這個執(zhí)行,ZK自己已經(jīng)寫好了腳本院仿,在bin/zkCleanup.sh中秸抚,所以直接使用這個腳本也是可以執(zhí)行清理工作的。
第四種:從3.4.0開始歹垫,zookeeper提供了自動清理snapshot和事務(wù)日志的功能剥汤,通過配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 這兩個參數(shù)能夠?qū)崿F(xiàn)定時清理了。這兩個參數(shù)都是在zoo.cfg中配置的:
autopurge.purgeInterval 這個參數(shù)指定了清理頻率排惨,單位是小時吭敢,需要填寫一個1或更大的整數(shù),默認(rèn)是0暮芭,表示不開啟自己清理功能鹿驼。
autopurge.snapRetainCount 這個參數(shù)和上面的參數(shù)搭配使用,這個參數(shù)指定了需要保留的文件數(shù)目辕宏。默認(rèn)是保留3個畜晰。
關(guān)閉防火墻
systemctl stop firewalld
systemctl disable firewalld
啟動服務(wù)
/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh start
檢查服務(wù)狀態(tài)
/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh status
zk集群一般只有一個leader,多個follower瑞筐,
主一般是相應(yīng)客戶端的讀寫請求舷蟀,而從主同步數(shù)據(jù),
當(dāng)主掛掉之后就會從follower里投票選舉一個leader出來。
#server1:
[root@localhost zookeeper]# zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
#server2:
[root@localhost zookeeper]# zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
#server3:
[root@localhost zookeeper]# zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: leader
若是啟動時報如下錯誤
Cannot open channel to 3 at election address /192.168.0.197:13888
請把三臺都啟動,
若三臺都啟動后, 仍報錯:
1)關(guān)閉防火墻
2)仔細(xì)檢查各配置文件, 不要有中文等錯誤字符, 注意路徑
3)刪除曾經(jīng)啟動時, zkdata目錄下的文件
"
drwxr-xr-x. 2 root root 47 Mar 11 21:27 version-2
-rw-r--r--. 1 root root 5 Mar 11 21:26 zookeeper_server.pid
"
4)再次重啟
用jps命令查看zk進(jìn)程
[root@localhost zookeeper]# jps
36741 QuorumPeerMain
37257 Jps
關(guān)閉zookeeper命令
/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh stop
2.1.2 在zookeeper集群基礎(chǔ)上搭建kafka集群
切換目錄, 下載kafka
cd /home/
mkdir kafka
cd kafka
mkdir kafkalogs #創(chuàng)建kafka消息目錄野宜,主要存放kafka消息
wget https://www-us.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
#或者鏡像下載
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz
tar -zxf kafka_2.11-2.1.1.tgz
環(huán)境變量配置
#vi source /etc/profile
export KAFKA_HOME=/home/kafka/kafka_2.11-2.1.1
export PATH=$PATH:$KAFKA_HOME/bin
#保存使其立即生效
source /etc/profile
vi /home/kafka/kafka_2.11-2.1.1/config/server.properties
#每臺服務(wù)器的broker.id都不能相同
broker.id=199或198或197
#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#設(shè)置zookeeper的連接端口
zookeeper.connect=192.168.0.199:12181,192.168.0.198:12181,192.168.0.197:12181
分別啟動三臺機(jī)器上的kafka集群
/home/kafka/kafka_2.11-2.1.1/bin/kafka-server-start.sh -daemon /home/kafka/kafka_2.11-2.1.1/config/server.properties
檢查服務(wù)是否啟動
[root@localhost config]# jps
123988 Jps
121142 QuorumPeerMain
123965 Kafka
關(guān)閉kafka命令
/home/kafka/kafka_2.11-2.1.1/bin/kafka-server-stop.sh
創(chuàng)建一個topic
/home/kafka/kafka_2.11-2.1.1/bin/kafka-topics.sh --zookeeper 192.168.0.199:12181 --replication-factor 1 --partitions 1 --topic test_kafka
啟動生產(chǎn)者
/home/kafka/kafka_2.11-2.1.1/bin/kafka-console-producer.sh --broker-list 192.168.0.199:9092 --topic test_kafka
啟動消費(fèi)者
/home/kafka/kafka_2.11-2.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.199:9092 --topic test_kafka --from-beginning
在生產(chǎn)者所在命令行發(fā)送消息, 在消費(fèi)者所在shell窗口便可以接收到消息
2.2 windows上安裝kafka單機(jī)
以kafka_2.12-2.3.0為例
#step1.下載 kafka, 該版本不必下載 zookeeper
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#step2.切換到kafka層的目錄
cd D:\soft_for_dev\kafka_2.12-2.3.0
#step3.啟動zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
#step4.啟動kafka
bin\windows\kafka-server-start.bat config\server.properties
#step5.創(chuàng)建topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-log
#step6.創(chuàng)建生產(chǎn)者產(chǎn)生消息,不關(guān)閉頁面
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic user-log
#step7.創(chuàng)建消費(fèi)者接收消息魔策,不關(guān)閉頁面
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic user-log --from-beginning
布控kafka-manager
#1.概述
為了簡化開發(fā)者和服務(wù)工程師維護(hù)Kafka集群的工作匈子,
yahoo構(gòu)建了一個叫做Kafka管理器的基于Web工具,叫做 Kafka Manager闯袒。
可以很容易地發(fā)現(xiàn)分布在集群中的哪些topic分布不均勻虎敦,或者是分區(qū)在整個集群分布不均勻的的情況。
它支持管理多個集群政敢、選擇副本其徙、副本重新分配以及創(chuàng)建Topic。
此外還有如下功能:
>> 管理多個kafka集群
>> 便捷的檢查kafka集群狀態(tài)(topics,brokers,備份分布情況,分區(qū)分布情況)
>> 選擇你要運(yùn)行的副本
>> 基于當(dāng)前分區(qū)狀況進(jìn)行
>> 可以選擇topic配置并創(chuàng)建topic
>> 刪除topic(只支持0.8.2以上的版本并且要在broker配置中設(shè)置delete.topic.enable=true)
>> Topic list會指明哪些topic被刪除
>> 為已存在的topic增加分區(qū)
>> 為已存在的topic更新配置
>> 在多個topic上批量重分區(qū)
>> 在多個topic上批量重分區(qū)(可選partition broker位置)
#2.安裝環(huán)境要求
>> java
>> scala
>> sbt
#3.## 安裝步驟 (windows版)
1.下載最新的kafka-manager的releases中zip版本https://github.com/yahoo/kafka-manager/releases, 解壓
2.下載安裝kafka-manager要求的java版本喷户,檢查系統(tǒng)環(huán)境path路徑是否添加
3.下載安裝kafka-manager要求scala的window版本對應(yīng)的msi文件 https://www.scala-lang.org/download/唾那,雙擊安裝,檢查系統(tǒng)環(huán)境path路徑是否添加(任意目錄進(jìn)入cmd, 輸入scala 回車)
4.下載安裝kafka-manager要求sbt的window版本對應(yīng)的msi文件https://www.scala-sbt.org/download.html, 雙擊安裝褪尝,檢查系統(tǒng)環(huán)境path路徑是否添加
(sbt的配置參考: https://blog.csdn.net/u014532217/article/details/78966807)
#4.編譯解壓運(yùn)行
>> 進(jìn)入kafka-manager目錄, 執(zhí)行 解壓命令: sbt clean dist
>> 提取zip包: D:\soft_for_dev\kafka-manager-master\target\universal\kafka-manager-2.0.0.2.zip
>> 修改application.conf配置文件
kafka-manager.zkhosts="localhost:2181" // 這里修改下
#kafka-manager.zkhosts=${?ZK_HOSTS} // 該行要注釋掉
(另外需修改kafka下的配置文件zookeeper.properties
maxClientCnxns=100)
>> 可用git客戶端打開shell窗口, 執(zhí)行命令: ./bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=9999
>> 瀏覽器中輸入 http://localhost:9999即可看到 kafka-manager界面
https://blog.csdn.net/weixin_41846320/article/details/84782871 (kafka的windows版安裝)
2.3 docker-compose安裝
2.3.1 安裝zookeeper
#準(zhǔn)備docker-compose.yml
#在docker-compose.yml文件同一目錄下, 執(zhí)行'docker-compose up -d'命令
#執(zhí)行'docker ps'命令, 查看進(jìn)程號
#執(zhí)行'docker exec -it 進(jìn)程號 bash'進(jìn)入某一zk下
#進(jìn)入后, 執(zhí)行'zkServer.sh status'命令, 查看節(jié)點(diǎn)狀態(tài)
docker-compose.yml
version: '3.4'
services:
zoo1:
image: zookeeper:3.4
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo2:
image: zookeeper:3.4
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
zoo3:
image: zookeeper:3.4
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
# notice: execute command 'docker-compose up -d' to run zookeeper clusters
https://blog.csdn.net/mmmaaaggg/article/details/85000604 (docker安裝kafka集群)
https://testerhome.com/topics/16126 (docker安裝kafka集群)
3.kafka配置文件詳解(適配: kafka_2.12-2.3.0)
3.1 bin目錄下腳本文件
kafka的安裝包除了包括kafka自身的工具以外闹获,也包括了一系列簡易的zookeeper工具,
能夠通過zookeeper-server-start.sh腳本啟動簡易的單點(diǎn)zookeeper實例河哑,供kafka使用避诽。
但一般僅限于測試環(huán)境使用。
腳本 | 功能 |
---|---|
kafka-server-start.sh | 啟動kafka服務(wù)器璃谨; |
kafka-server-stop.sh | 停止kafka服務(wù)器沙庐; |
kafka-topics.sh | topic管理; |
kafka-console-producer.sh | 基于命令行的生產(chǎn)者佳吞; |
kafka-console-consumer.sh | 基于命令行的消費(fèi)者拱雏; |
kafka-run-class.sh | 運(yùn)行java類的腳本,由kafka-server-start.sh和kafka-server-stop.sh容达、kafka-topics.sh等腳本調(diào)用古涧; |
zookeeper-server-start.sh | 啟動kafka自帶的zookeeper服務(wù)器; |
zookeeper-server-stop.sh | 停止kafka自帶的zookeeper服務(wù)器花盐; |
zookeeper-shell.sh | 在命令行連接zookeeper的客戶端工具羡滑; |
connect-standalone.sh | 在命令行啟動單點(diǎn)的connector; |
connect-distributed.sh | 在命令行啟動基于集群connector算芯; |
3.2 config目錄下配置文件
腳本 | 功能 |
---|---|
server.properties | kafka實例的配置文件柒昏,配置kafka最重要的配置文件; |
log4j.properties | kafka日志配置熙揍; |
zookeeper.properties | 自帶zk的配置文件职祷; |
producer.properties | 基于命令行的生產(chǎn)者工具配置文件;(測試用) |
consumer.properties | 基于命令行的消費(fèi)者工具配置文件;(測試用) |
connect-standalone.properties | 自帶單點(diǎn)connector的配置文件有梆,存放connector的序列化方式是尖、監(jiān)聽broker的地址端口等通用配置;(測試用) |
connect-file-source.properties | 配置文件讀取connector泥耀,用于逐行讀取文件饺汹,導(dǎo)入入topic;(測試用) |
connect-file-sink.properties | 配置文件寫入connector痰催,用于將topic中的數(shù)據(jù)導(dǎo)出到topic中兜辞;(測試用) |
3.2.1 server.properties (broker配置文件)
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics (基本配置) #############################
# broker id, id必須是唯一的整數(shù)
broker.id=0
# 是否可以刪除topic,如果為true夸溶,我們可以在命令行刪除topic逸吵,否則,不能缝裁。
#delete.topic.enable=true
############################# Socket Server Settings (socket配置) #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# broker監(jiān)聽地址扫皱。如果沒有配置,默認(rèn)為java.net.InetAddress.getCanonicalHostName()方法返回的地址
#申明此kafka服務(wù)器需要監(jiān)聽的端口號压语,如果是在本機(jī)上跑虛擬機(jī)運(yùn)行可以不用配置本項啸罢,
#默認(rèn)會使用localhost的地址,如果是在遠(yuǎn)程服務(wù)器上運(yùn)行則必須配置
#listeners=PLAINTEXT://:9092
# broker的主機(jī)名和端口號將會廣播給消費(fèi)者與生產(chǎn)者胎食。如果沒有設(shè)置扰才,默認(rèn)為監(jiān)聽配置,否則使用java.net.InetAddress.getCanonicalHostName()方法返回的地址
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 監(jiān)聽協(xié)議厕怜,默認(rèn)為PLAINTEXT
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 服務(wù)器接受請求和響應(yīng)請求的線程數(shù)
num.network.threads=3
# 處理請求的線程數(shù)衩匣,包括磁盤的IO操作
num.io.threads=8
# 服務(wù)器socket發(fā)送緩存
socket.send.buffer.bytes=102400
# 服務(wù)器socket接收緩存
socket.receive.buffer.bytes=102400
# 服務(wù)器接收請求的最大值
socket.request.max.bytes=104857600
############################# Log Basics (log基本配置) #############################
# log日志文件夾
log.dirs=/tmp/kafka-logs
# 每個topic的默認(rèn)日志分區(qū)數(shù)。允許分區(qū)數(shù)大于并行消費(fèi)數(shù)粥航,這樣可能導(dǎo)致琅捏,更多的文件將會跨broker
num.partitions=1
# 在啟動和關(guān)閉刷新時,沒有數(shù)據(jù)目錄用于日志恢復(fù)的線程數(shù)递雀。
# 這個值柄延,強(qiáng)烈建議在隨著在RAID陣列中的安裝數(shù)據(jù)目錄的增長而增長。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings (內(nèi)部topic配置) #############################
# 內(nèi)部__consumer_offsets和__transaction_state兩個topic缀程,分組元數(shù)據(jù)的復(fù)制因子搜吧。
# 除開發(fā)測試外的使用,強(qiáng)烈建議值大于1杨凑,以保證可用性滤奈,比如3。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy (日志刷新策略) #############################
# 消息立刻被寫到文件系統(tǒng)撩满,默認(rèn)調(diào)用fsync方法蜒程,懶同步操作系統(tǒng)緩存绅你。下面的配置用于控制刷新數(shù)據(jù)到磁盤。
# 這里是一些折中方案:
# 1. 持久性:如果沒有使用replication昭躺,沒刷新的數(shù)據(jù)可能丟失忌锯。
# 2. 延遲性:當(dāng)有大量的數(shù)據(jù)需要刷新,刷新操作發(fā)生時领炫,比較大的刷新間隔可能會導(dǎo)致延時汉规。
# 3. 吞吐量:刷新操作代價比較高,較小的刷新間隔驹吮,將會引起過渡的seek文件操作。
# 下面的配置刷新策略晶伦,允許在一個的刷新間隔或消息數(shù)量下碟狞,刷新數(shù)據(jù),這個配置是全局的婚陪,可以在每個topic下重寫族沃。
# 在強(qiáng)制刷新數(shù)據(jù)到磁盤前,允許接受消息數(shù)量
#log.flush.interval.messages=10000
# 在強(qiáng)制刷新前泌参,一個消息可以日志中停留在最大時間
#log.flush.interval.ms=1000
############################# Log Retention Policy (日志保留策略) #############################
# 下面的配置用于控制日志segments的處理脆淹。這些策略可以在一定的時間間隔和數(shù)據(jù)累積到一定的size,可以刪除segments沽一。
# 兩種策略只要有一種觸發(fā)盖溺,segments將會被刪除。刪除總是從log的末端铣缠。
# log文件的保留的時間
log.retention.hours=168
# log文件保留的size
#log.retention.bytes=1073741824
# 日志segments文件最大size烘嘱,當(dāng)日志文件的大于最大值,則創(chuàng)建一個新的log segment
log.segment.bytes=1073741824
# 日志保留檢查間隔
log.retention.check.interval.ms=300000
############################# Zookeeper (Zookeeper配置) #############################
# zookeeper地址蝗蛙,多個以逗號隔開比如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=localhost:2181
# 連接zookeeper超時時間
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings (分組協(xié)調(diào)配置) #############################
# 下面的配置為毫秒時間蝇庭,用于延時消費(fèi)者重平衡的時間。重平衡將會進(jìn)一步在新成員添加分組是捡硅,
# 延時group.initial.rebalance.delay.ms時間哮内,直到到達(dá)maximum of max.poll.interval.ms時間。
# 默認(rèn)值為3秒壮韭,我們重寫0北发,主要是用戶開發(fā)測試體驗。在生產(chǎn)環(huán)境下泰涂,默認(rèn)值3s鲫竞,在應(yīng)用啟動期間,
# 幫助避免不必要及潛在的代價高的rebalances逼蒙,是比較合適的从绘。
group.initial.rebalance.delay.ms=0
3.2.2 zookeeper.properties (內(nèi)置zk的配置文件, 生產(chǎn)可不用)
# 數(shù)據(jù)目錄
dataDir=/tmp/zookeeper
# 監(jiān)聽端口
clientPort=2181
# 最大連接數(shù),非生產(chǎn)環(huán)境配置
maxClientCnxns=0
3.2.3 producer.properties(測試用, 生產(chǎn)在項目配置文件中配置)
# see kafka.producer.ProducerConfig for more details
############################# Producer Basics #############################
生產(chǎn)者基本配置
# broker地址配置,集群則格式為 host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# 是否壓縮數(shù)據(jù)僵井,有none, gzip, snappy, lz4陕截,默認(rèn)為壓縮
compression.type=none
# 分區(qū)事件的類名,默認(rèn)隨機(jī)
#partitioner.class=
# 請求超時時間
#request.timeout.ms=
# `KafkaProducer.send` and `KafkaProducer.partitionsFor`最長阻塞時間
#max.block.ms=
# 生產(chǎn)者延時發(fā)送消息的時間批什,以便可以批量發(fā)送消息
#linger.ms=
# 最大請求size
#max.request.size=
# 每次可以批量發(fā)送到一個分區(qū)的消息記錄數(shù)
#batch.size=
# 在消息發(fā)送至server前农曲,生產(chǎn)者可以緩存的消息大小
#buffer.memory=
3.2.4 consumer.properties(測試用, 生產(chǎn)在項目配置文件中配置)
# see kafka.consumer.ConsumerConfig for more details
# zookeeper連接地址,集群則個時如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
zookeeper.connect=127.0.0.1:2181
# zookeeper 連接超時時間
zookeeper.connection.timeout.ms=6000
# 消費(fèi)者分組id
group.id=test-consumer-group
# 消費(fèi)超時時間
#consumer.timeout.ms=5000
# 一次最大消費(fèi)的消息字節(jié)數(shù), 若實際消息大于該值, 消費(fèi)端就無法消費(fèi)驻债,
# 導(dǎo)致一直卡在這一條消息乳规,現(xiàn)象就是消費(fèi)停止。
#kafka0.8版本為fetch.message.max.bytes合呐,默認(rèn)是1M
#fetch.max.bytes=50 * 1024 * 1024; (本版本默認(rèn) 50M)
https://yq.aliyun.com/articles/370504 (kafka_2.11-1.0.0.tgz)
https://www.iteye.com/blog/donald-draper-2397000 (kafka_2.11-1.0.0)
https://www.cnblogs.com/jun1019/p/6256371.html
https://www.cnblogs.com/alan319/p/8651434.html (Kafka0.8)
4.spring中使用kafaka (適配: kafka_2.12-2.3.0)
4.1 消息提供方
4.2 消費(fèi)端
https://www.cnblogs.com/lshan/p/11544111.html (手動創(chuàng)建topic)
https://blog.csdn.net/tmeng521/article/details/90902236 (consumer篇)
5.常見問題
5.1消息積壓在消息隊列里的問題
5.1.1消費(fèi)端不消費(fèi)
原因1: 實際發(fā)送消息量 > 配置量
# 一次最大消費(fèi)的消息字節(jié)數(shù), 若實際消息大于該值, 消費(fèi)端就無法消費(fèi)暮的,
# 導(dǎo)致一直卡在這一條消息,現(xiàn)象就是消費(fèi)停止淌实。
#kafka0.8版本為fetch.message.max.bytes冻辩,默認(rèn)是1M
#fetch.max.bytes=50 * 1024 * 1024; (本版本kafka2.12默認(rèn) 50M)
解決方案:
在consumer層的代碼中配置 fetch.max.bytes 屬性大些即可
// 原因2: 查看日志是不是包含ConsumerRebalanceFailedException
如果包含,說明是rebalance失敗拆祈,更改配置恨闪,
確保 rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
(默認(rèn)符合,如果機(jī)器較多放坏,或者消費(fèi)的Topic較多咙咽,建議rebalance.max.retries設(shè)置大一點(diǎn)),并且滾動重啟消費(fèi)轻姿!
如果滾動重啟重試多次都是失敗犁珠,建議將所有消費(fèi)端都停止一段時間后(目的是為了保證監(jiān)控到消費(fèi)端全部下線)抒巢,
滾動重啟(并且消費(fèi)服務(wù)啟動間隔時間變長)
#其他情形:
排查消費(fèi)端線程是否有數(shù)據(jù)庫阻塞, FullGC, OOM等導(dǎo)致消費(fèi)線程阻塞的,
此外消費(fèi)端代碼需要進(jìn)行try...catch...包裹
結(jié)論: 總體解決方案
臨時擴(kuò)容底挫,以更快的速度去消費(fèi)數(shù)據(jù)。具體操作步驟和思路如下:
①先修復(fù)consumer的問題奔穿,確保其恢復(fù)消費(fèi)速度豹休,然后將現(xiàn)有consumer都停掉炊昆。
②臨時建立好原先10倍或者20倍的queue數(shù)量(新建一個topic,partition是原來的10倍)威根。
③然后寫一個臨時分發(fā)消息的consumer程序凤巨,這個程序部署上去消費(fèi)積壓的消息,
消費(fèi)之后不做耗時處理洛搀,直接均勻輪詢寫入臨時建好分10數(shù)量的queue里面敢茁。
④緊接著征用10倍的機(jī)器來部署consumer,每一批consumer消費(fèi)一個臨時queue的消息留美。
⑤這種做法相當(dāng)于臨時將queue資源和consumer資源擴(kuò)大10倍彰檬,以正常速度的10倍來消費(fèi)消息伸刃。
⑥等快速消費(fèi)完了之后,恢復(fù)原來的部署架構(gòu)逢倍,重新用原來的consumer機(jī)器來消費(fèi)消息捧颅。
5.2 消息重復(fù)消費(fèi)的問題 (冪等性)
kafka有一個叫做offset的概念,就是每個消息寫進(jìn)去较雕,都有一個offset代表他的序號碉哑,
然后consumer消費(fèi)了數(shù)據(jù)之后,每隔一段時間亮蒋,會把自己消費(fèi)過的消息的offset提交一下扣典,
代表我已經(jīng)消費(fèi)過了,下次就算重啟慎玖,kafka就會讓消費(fèi)者從上次消費(fèi)到的offset來繼續(xù)消費(fèi)激捏。
如果consumer消費(fèi)了數(shù)據(jù),還沒來得及發(fā)送自己已經(jīng)消費(fèi)的消息的offset就掛了凄吏,
那么重啟之后就會收到重復(fù)的數(shù)據(jù)(可以打斷點(diǎn)來模擬實現(xiàn))。
解決思路
要保證消息的冪等性闰蛔,這個要結(jié)合業(yè)務(wù)的類型來進(jìn)行處理痕钢。下面提供幾個思路供參考:
(1)可在內(nèi)存中維護(hù)一個set,只要從消息隊列里面獲取到一個消息序六,
先查詢這個消息在不在set里面任连,如果在表示已消費(fèi)過,直接丟棄例诀;
如果不在随抠,則在消費(fèi)后將其加入set當(dāng)中。
(2)如何要寫數(shù)據(jù)庫繁涂,可以拿唯一鍵先去數(shù)據(jù)庫查詢一下拱她,
如果不存在在寫,如果存在直接更新或者丟棄消息扔罪。
(3)如果是寫redis那沒有問題秉沼,每次都是set,天然的冪等性矿酵。
(4)讓生產(chǎn)者發(fā)送消息時唬复,每條消息加一個全局的唯一id,然后消費(fèi)時全肮,將該id保存到redis里面敞咧。
消費(fèi)時先去redis里面查一下有么有,沒有再消費(fèi)辜腺。
(5)數(shù)據(jù)庫操作可以設(shè)置唯一鍵休建,防止重復(fù)數(shù)據(jù)的插入乍恐,這樣插入只會報錯而不會插入重復(fù)數(shù)據(jù)。
5.3 消息丟失的問題
5.3.1 producer 丟失了消息
生產(chǎn)者沒有設(shè)置相應(yīng)的策略丰包,發(fā)送過程中丟失數(shù)據(jù)禁熏。
解決方案
#同時設(shè)置下述兩個配置項
// 1.ack=all (或者acks=-1, 負(fù)1)
某個partition 的 leader接收到消息,所有的follower都同步到了消息之后邑彪,才認(rèn)為本次寫成功了瞧毙。
// 2.retries=MAX(很大的一個值,表示無限重試)
如果沒滿足這個條件寄症,生產(chǎn)者會自動不斷的重試宙彪,重試無限次。
(個人覺得不妥, 重要消息, 應(yīng)重試指定次數(shù)后, 告警即可.)
5.3.2 brokers 丟失了數(shù)據(jù)
比較常見的一個場景有巧,就是kafka的某個broker宕機(jī)了释漆,然后重新選舉partition的leader時。
如果此時follower還沒來得及同步數(shù)據(jù)篮迎,leader就掛了男图,然后某個follower成為了leader,數(shù)據(jù)丟失甜橱。
解決方案
一般要求設(shè)置4個參數(shù)來保證消息不丟失:
①給topic設(shè)置 replication.factor參數(shù):
這個值必須大于1逊笆,表示要求每個partition必須至少有2個副本。
②在kafka服務(wù)端設(shè)置min.isync.replicas參數(shù):
這個值必須大于1岂傲,表示要求一個leader至少感知到有至少一個follower在跟自己保持聯(lián)系正常同步數(shù)據(jù)难裆,
這樣才能保證leader掛了之后還有一個follower。
③在生產(chǎn)者端設(shè)置acks=all:(或者acks=-1, 負(fù)1)
表示要求每條每條數(shù)據(jù)镊掖,必須是寫入所有replica副本之后乃戈,才能認(rèn)為是寫入成功了
④在生產(chǎn)者端設(shè)置retries=MAX(很大的一個值,表示無限重試):
表示 這個是要求一旦寫入失敗亩进,就一直重試
(個人覺得不妥, 重要消息, 應(yīng)重試指定次數(shù)后, 告警即可.)
5.3.3 consumer 丟失了數(shù)據(jù)
消費(fèi)者消費(fèi)到了這個數(shù)據(jù)症虑,然后消費(fèi)之自動提交了offset,讓kafka知道你已經(jīng)消費(fèi)了這個消息归薛,
當(dāng)你準(zhǔn)備處理這個消息時侦讨,自己掛掉了,那么這條消息就丟了苟翻。
解決方案
關(guān)閉自動提交offset韵卤,在自己處理完畢之后手動提交offset,這樣就不會丟失數(shù)據(jù)崇猫。
5.4 如何保證消息按順序執(zhí)行
5.4.1 為什么要保證順序
消息隊列中的若干消息如果是對同一個數(shù)據(jù)進(jìn)行操作沈条,
這些操作具有前后的關(guān)系,必須要按前后的順序執(zhí)行诅炉,否則就會造成數(shù)據(jù)異常蜡歹。
舉例:
1) 通過mysql binlog進(jìn)行兩個數(shù)據(jù)庫的數(shù)據(jù)同步屋厘,由于對數(shù)據(jù)庫的數(shù)據(jù)操作是具有順序性的,
如果操作順序搞反月而,就會造成不可估量的錯誤汗洒。
2) 數(shù)據(jù)庫對一條數(shù)據(jù)依次進(jìn)行了 插入->更新->刪除操作,這個順序必須是這樣父款,
如果在同步過程中溢谤,消息的順序變成了 刪除->插入->更新,
那么原本應(yīng)該被刪除的數(shù)據(jù)憨攒,就沒有被刪除世杀,造成數(shù)據(jù)的不一致問題。
5.4.2 出現(xiàn)順序錯亂的場景及解決方案
5.4.2.1 consuemr單線程消費(fèi)亂序
consuemr單線程消費(fèi)亂序
具有順序的數(shù)據(jù)寫入到了不同的partition里面肝集,不同的消費(fèi)者去消費(fèi)瞻坝,
但是每個consumer的執(zhí)行時間是不固定的,無法保證先讀到消息的consumer一定先完成操作杏瞻,
這樣就會出現(xiàn)消息并沒有按照順序執(zhí)行所刀,造成數(shù)據(jù)順序錯誤。
consuemr單線程消費(fèi)解決方案
確保同一個消息發(fā)送到同一個partition捞挥,一個topic勉痴,一個partition,一個consumer树肃,內(nèi)部單線程消費(fèi)。
5.4.2.2 consuemr多線程消費(fèi)亂序
consuemr多線程消費(fèi)亂序
kafka一個topic瀑罗,一個partition胸嘴,一個consumer,
但是consumer內(nèi)部進(jìn)行多線程消費(fèi)斩祭,這樣數(shù)據(jù)會出現(xiàn)順序錯亂問題劣像。
consuemr多線程消費(fèi)解決方案
寫N個內(nèi)存queue,然后N個線程分別消費(fèi)一個內(nèi)存queue即可
5.5 保證消息的可靠傳輸
5.5.1 brokers配置
broker 有 3 個配置參數(shù)會影響 Kafka 消息存儲的可靠性摧玫。
與其他配置參數(shù)一樣耳奕,它們可以應(yīng)用在 broker 級別,用于控制所有主題的行為诬像,
也可以應(yīng)用在主題級別屋群,用于控制個別主題的行為。
///////// 1.復(fù)制系數(shù)
>> topic級別的配置參數(shù)是 replication.factor;
>> broker級別配置參數(shù)是 default.replication.factor 來配置自動創(chuàng)建的主題坏挠。
如果復(fù)制系數(shù)為 N芍躏,那么在 N-1 個 broker 失效的情況下,仍然能夠從主題讀取數(shù)據(jù)或向主
題寫入數(shù)據(jù)降狠。
所以对竣,更高的復(fù)制系數(shù)會帶來更高的可用性庇楞、可靠性和更少的故障。
另一方
面否纬,復(fù)制系數(shù) N需要至少 N 個 broker吕晌,而且會有 N 個數(shù)據(jù)副本,也就是說它們會占用 N
倍的磁盤空間临燃。
我們一般會在可用性和存儲硬件之間作出權(quán)衡睛驳。
///////// 2.不完全的首領(lǐng)選舉
unclean.leader.election.enable 只能在 broker 級別(實際上是在集群范圍內(nèi))進(jìn)行配置, 它的默認(rèn)值是 true 谬俄。
當(dāng)分區(qū)首領(lǐng)不可用時柏靶, 一個同步副本會被選為新首領(lǐng)。
如果在選舉過程中沒有丟失數(shù)據(jù)溃论,也就是說提交的數(shù)據(jù)同時存在于所有的同步副本上屎蜓,那么這個選舉就是“完全”的。
但如果在首領(lǐng)不可用時其他副本都是不同步的钥勋,我們該怎么辦呢炬转?
>> 如果不同步的副本不能被提升為新首領(lǐng),那么分區(qū)在舊首領(lǐng)(最后一個同步副本)恢復(fù)
之前是不可用的算灸。
有時候這種狀態(tài)會持續(xù)數(shù)小時(比如更換內(nèi)存芯片)扼劈。
>> 如果不同步的副本可以被提升為新首領(lǐng),那么在這個副本變?yōu)椴煌街髮懭肱f首領(lǐng)的
消息菲驴、會全部丟失荐吵,導(dǎo)致數(shù)據(jù)不一致。
為什么會這樣呢赊瞬?假設(shè)在副本 0 和副本 l 不可用時先煎,偏移量 100-200 的消息被寫入副本 2 (首領(lǐng))。
現(xiàn)在副本 2 變?yōu)椴豢捎玫那山В北?0 變?yōu)榭捎玫氖硇8北?0 只包含偏移量 O~ 100 的消息,不包含偏移量 100~200 的悄息谤绳。
如果我們允許副本 0 成為新首領(lǐng)占锯,生產(chǎn)者就可以繼續(xù)寫人數(shù)據(jù),悄費(fèi)者可以繼續(xù)讀取數(shù)據(jù)缩筛。
于是 消略,新首領(lǐng)就有了偏移量 100~200 的新梢息。
這樣瞎抛,部分消費(fèi)者會讀取到偏移量 100~200 的舊消息疑俭,部分消費(fèi)者會讀取到偏移量 100~200 的新消息,還有部分消費(fèi)者讀取的是二者的混合。
這樣會導(dǎo)致非常不好的結(jié)果钞艇,比如生成不準(zhǔn)確的報表啄寡。
另外, 副本 2 可能會重新變?yōu)榭捎茫⒊蔀樾率最I(lǐng)的跟隨者哩照。
這個時候挺物,它會把比當(dāng)前首領(lǐng)舊的消息全部刪除,而這些消息對于所有消費(fèi)者來說都是不可用的飘弧。
簡而言之识藤,
如果我們允許不同步的副本成為首領(lǐng),那么就要承擔(dān)丟失數(shù)據(jù)和出現(xiàn)數(shù)據(jù)不一致的風(fēng)險次伶。
如果不允許它們成為首領(lǐng)痴昧,那么就要接受較低的可用性,因為我們必須等待原先的首領(lǐng)恢復(fù)到可用狀態(tài)冠王。
如果把 unclean.leader.election.enable 設(shè)為 true 赶撰,就是允許不同步的副本成為首領(lǐng), 也就是"不完全的選舉",那么我們將面臨丟失消息的風(fēng)險柱彻。
如果把這個參數(shù)設(shè)為 false ,就要等待原先的首領(lǐng)重新上線豪娜,從而降低了可用性。
我們經(jīng)秤纯看到一些對數(shù)據(jù)質(zhì)量和數(shù)據(jù)一致性要求較高的系統(tǒng)會禁用這種不完全的首領(lǐng)選舉(把這個參數(shù)設(shè)為 false) 瘤载。
銀行系統(tǒng)是這方面最好的例子,大部分銀行系統(tǒng)寧愿選擇在幾分鐘甚至幾個小時內(nèi)不處理信用卡支付事務(wù)卖擅,也不會冒險處理錯誤的消息鸣奔。
不過在對可用性要求較高的系統(tǒng)里,比如實時點(diǎn)擊流分析系統(tǒng)惩阶, 一般會啟用不完全的首領(lǐng)選舉挎狸。
///////// 3.最少同步副本
在topic級別和 broker 級別上,這個參數(shù)都叫 min.insync.replicas 琳猫。
盡管為一個主題配置了 3 個副本,還是會出現(xiàn)只有一個同步副本的情況(另外2個掛了)私痹。
如果這個同步副本變?yōu)椴豢捎闷晟覀儽仨氃诳捎眯院鸵恢滦灾g作出選擇, 這是一個兩難的選擇。
根據(jù) Kafka 對可靠性保證的定義紊遵,消息只有在被寫入到所有同步副本之后才被認(rèn)為是已提交的账千。
但如果這里的“所有副本”只包含一個同步副本,那么在這個副本變?yōu)椴豢捎脮r 暗膜,數(shù)據(jù)就會丟失匀奏。
如果要確保已提交的數(shù)據(jù)被寫入不止一個副本,就需要把最少同步副本數(shù)量設(shè)置為大一點(diǎn)的值学搜。
對于一個包含 3 個副本的主題娃善,如果 min.insync.replicas 被設(shè)為 2论衍,那么至少要存在兩個同步副本才能向分區(qū)寫入數(shù)據(jù)。
如果 3 個副本都是同步的聚磺,或者其中一個副本變?yōu)椴豢捎门魈ǎ疾粫惺裁磫栴}。
不過瘫寝,如果有兩個副本變?yōu)椴豢捎醚牙伲敲?broker 就會停止接受生產(chǎn)者的請求。
嘗試發(fā)送數(shù)據(jù)的生產(chǎn)者會收到 NotEnoughReplicasException 異常焕阿。消費(fèi)者仍然可以繼續(xù)讀取已有的數(shù)據(jù)咪啡。
實際上,如果使用這樣的配置暮屡,那么當(dāng)只剩下一個同步副本時撤摸,它就變成只讀了,這是為了避免在發(fā)生不完全選舉時數(shù)據(jù)的寫入和讀取出現(xiàn)非預(yù)期的行為栽惶。
為了從只讀狀態(tài)中恢復(fù)愁溜,必須讓兩個不可用分區(qū)中的一個重新變?yōu)榭捎玫模ū热缰貑?broker ),并等待它變?yōu)橥降?外厂。
5.5.2 producer配置
//////// 1.發(fā)送確認(rèn)
>> acks=0 意味著如果生產(chǎn)者能夠通過網(wǎng)絡(luò)把消息發(fā)送出去冕象,那么就認(rèn)為消息已成功寫入Kafka。
>> acks=1 意味若首領(lǐng)在收到消息并把它寫入到分區(qū)數(shù)據(jù)文件(不一定同步到磁盤上)時會返回確認(rèn)或錯誤響應(yīng)汁蝶。
>> acks=all 意味著首領(lǐng)在返回確認(rèn)或錯誤響應(yīng)之前渐扮,會等待所有同步副本都收到悄息。
如果和 min.insync.replicas 參數(shù)結(jié)合起來掖棉,就可以決定在返回確認(rèn)前至少有多少個副本能夠收到悄息 墓律。
這是最保險的做也一一生產(chǎn)者會一直重試直到消息被成功提交。
//////// 2.配置生產(chǎn)者的重試參數(shù)
生產(chǎn)者需要處理的錯誤包括兩部分 :
一部分是生產(chǎn)者可以自動處理的錯誤幔亥,還有一部分是需要開發(fā)者手動處理的錯誤耻讽。
LEADER_NOT_AVAILABLE等是可重試的;
序列化錯誤 &
5.5.3 consumer配置
///////// 1.group.ld
如果兩個消費(fèi)者具有相同的 group.ld,井且訂閱了同 一個主題帕棉,那么每個消費(fèi)者會分到主題分區(qū)的一個子集针肥,
也就是說它們只能讀到所有消息的一個子集(不過群組會讀取主題所有的消息)。
如果你希望消費(fèi)者可以看到主題的所有消息香伴,那么需要為它們設(shè)置唯一的 group.ld慰枕。
///////// auto.offset.reset
這個參數(shù)指定了在沒有偏移量可提交時(比如消費(fèi)者第 l 次啟動時)或者請求的偏移量在 broker 上不存在時,消費(fèi)者會做些什么即纲。
這個參數(shù)有兩種配置 具帮。
一種是 earliest ,如果選擇了這種配置,消費(fèi)者會從分區(qū)的開始位置讀取數(shù)據(jù)蜂厅,不管偏移量是否有效匪凡,這樣會導(dǎo)致消費(fèi)者讀取大量的重復(fù)數(shù)據(jù),但可以保證最少的數(shù)據(jù)丟失葛峻。
一種是 latest 锹雏,如果選擇了這種配置, f肖費(fèi)者會從分區(qū)的末尾開始讀取數(shù)據(jù)术奖,這樣可以減少重復(fù)處理消息礁遵,但很有可能會錯過一些消息。
///////// enable.auto.commit
這是一個非常重要的配置參數(shù)采记,你可以讓悄費(fèi)者基于任務(wù)調(diào)度自動提交偏移量 佣耐,也可以在代碼里手動提交偏移量。
自動提交的一個最大好處是唧龄,在實現(xiàn)消費(fèi)者邏輯時可以少考慮一些問題兼砖。
如果你在消費(fèi)者輪詢操作里處理所有的數(shù)據(jù),那么自動提交可以保證只提交已經(jīng)處理過的偏移量既棺。
自動提交的主要缺點(diǎn)是讽挟,無怯控制重復(fù)處理消息(比如消費(fèi)者在自動提交偏移量之前停止處理悄息),
而且如果把消息交給另外一個后臺線程去處理丸冕,自動提交機(jī)制可能會在消息還沒有處理完畢就提交偏移量耽梅。
///////// auto.commit.interval.ms 與 enable.auto.commit 有直接的聯(lián)系
如果選擇了自動提交偏移量,可以通過該參數(shù)配置提交的頻度胖烛, 默認(rèn)值是每 5 秒鐘提交一次眼姐。
一般來說,頻繁提交會增加額外的開銷佩番,但也會降低重復(fù)處理消息的概率众旗。
參考資源
http://www.reibang.com/p/02fdcb9e8784 (保證消息按順序執(zhí)行--源自中華石杉)
http://www.reibang.com/p/8ed16edc73e4 (保證數(shù)據(jù)不丟失--源自中華石杉)
http://www.reibang.com/p/172295e2e978 (保證消息重復(fù)消費(fèi)--源自中華石杉)
http://www.importnew.com/23199.html
https://blog.csdn.net/linke1183982890/article/details/83303003
https://blog.csdn.net/b6ecl1k7bs8o/article/details/80251930 (consumer消費(fèi)策略)
https://www.cnblogs.com/wolf-bin/p/9085370.html (producer發(fā)送策略)
https://www.aboutyun.com//forum.php/?mod=viewthread&tid=9341&extra=page%3D1&page=1&(kafka詳解)
https://www.cnblogs.com/gxc2015/p/9835837.html (kafka配置詳解)
https://www.cnblogs.com/yinchengzhe/p/5111648.html(算法)
https://blog.csdn.net/qq_35457078/article/details/88838511 (consumer手動提交 offset)
http://www.reibang.com/p/f62099d174d9 (性能優(yōu)化-->0.8版本)
https://segmentfault.com/a/1190000019147699 (日志文件, 清理策略)
https://www.cnblogs.com/jasongj/p/6760039.html (replication 數(shù)據(jù)復(fù)制)
https://www.cnblogs.com/binyue/p/10308754.html (kafka為什么這么快)
《kafka權(quán)威指南》[美] Neha Narkhede Gwen Shapira Todd Palino 著, 薛命燈譯