Kafka
官方網(wǎng)站:http://kafka.apache.org/quickstart
消息隊(duì)列&分布式流處理
1. 單機(jī)搭建測試
Step 1: Download the code
安裝包下載:
http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
下載煤傍,解壓
Step 2: Start the server
-daemon為可選丁存,后臺運(yùn)行
> bin/kafka-server-start.sh [-daemon] config/server.properties
Step 3: Create a topic
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Step 4: Send some messages
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 5: Start a consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
2. 集群搭建測試
Step 6: Setting up a multi-broker cluster
[以下操作均需要在所以機(jī)器上執(zhí)行]
機(jī)器信息
192.168.8.10嗡载,192.168.8.11,192.168.8.12
編輯server.properties:
#broker.id需要每臺節(jié)點(diǎn)唯一
broker.id=1
#以下配置統(tǒng)一
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.8.10:2181,192.168.8.11:2181,192.168.8.12:2181
然后每臺機(jī)器分別執(zhí)行
> bin/kafka-server-start.sh -daemon config/server.properties
創(chuàng)建一個帶副本的topic
> bin/kafka-topics.sh --create --zookeeper 192.168.8.10:2181 --replication-factor 2 --partitions 1 --topic taotao
> Created topic taotao.
查看Topic信息
> bin/kafka-topics.sh --describe --zookeeper 192.168.8.10:2181 --topic taotao
> Topic:taotao PartitionCount:1 ReplicationFactor:2 Configs:
Topic: taotao Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
leader" 是負(fù)責(zé)給定分區(qū)的所有讀寫的節(jié)點(diǎn)奈籽。每個節(jié)點(diǎn)都是分區(qū)中隨機(jī)選擇的一部分的leader。
"replicas" 是指存在的于哪個或者哪些個broker上,不管broker是否存活
"isr" 是指存在的于哪個或者哪些個broker上疾宏,且broker是存活狀態(tài)
也就是說昭卓,Kafka數(shù)據(jù)寫入成功后愤钾,只要保證一個節(jié)點(diǎn)也就是isr至少有一個存活狀態(tài)的broker,就能正常處理消息
Kafka的基本原理
數(shù)據(jù)存放是.index索引文件和.log數(shù)據(jù)文件候醒,Kafka消費(fèi)數(shù)據(jù)時需要查找offset也就是偏移量绰垂,這個偏移量的索引就在.index里面,如若有多個索引文件火焰,則采用二分查找法查找索引文件
1. Kafka分區(qū)策略
1.1 分區(qū)原因
1.1 方便在集群中擴(kuò)展劲装,每個partition可以通過調(diào)整以適應(yīng)所在的機(jī)器,而一個topic又可以有多個Partition組成昌简,所以集群就可以適應(yīng)任意數(shù)據(jù)量的大小
1.2 可以提高并發(fā)占业,可以按照Partition為單位讀寫了
1.2 副本同步策略
方案 | 優(yōu)點(diǎn) | 缺點(diǎn) |
---|---|---|
1. 半數(shù)以上完成同步就發(fā)送ack | 延遲低 | 選舉新Leader時,容忍n臺節(jié)點(diǎn)故障纯赎,需要有2n+1個節(jié)點(diǎn)才可以 |
2. 全部完成數(shù)據(jù)同步才發(fā)送ack | 選舉新Leader時谦疾,容忍n臺節(jié)點(diǎn)故障,只要有n+1個副本就可以 | 延遲高 |
問題: 基于方案2犬金,若當(dāng)Leader收到數(shù)據(jù)念恍,所有follower開始同步數(shù)據(jù)六剥,此時某個節(jié)點(diǎn)故障且無法發(fā)送ack給Leader,就無法完成同步狀態(tài)的更新
解決方式:
- 這時就引入了ISR的概念峰伙,ISR表示"保持和Leader同步"的節(jié)點(diǎn)疗疟,若節(jié)點(diǎn)長時間未向Leader同步數(shù)據(jù),則踢出ISR列表瞳氓,這個時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定策彤,這時當(dāng)Leader掛掉時,就會從ISR例表里面選取新的Leader
在Kafka0.9.0.0版本開始匣摘,移除了replica.lag.max.message 參數(shù)被移除店诗,因?yàn)榕袛嗍欠窦尤隝SR的條件有兩個,兩個條件為或的關(guān)系音榜,且不同時滿足庞瘸,則會頻繁加入ISR踢出ISR,同時也頻繁的操作Zookeeper的元數(shù)據(jù)信息赠叼,所以新版本移除了其中一個恕洲,移除了判斷條數(shù)的參數(shù)
2. Kafka數(shù)據(jù)一致性
2.1. LEO(Log End Offset):
每一個副本的最后一個Offset
2.2. HW(Hight Watermark):
所有副本中最小的LEO,也就是Consumer可以獲取到的最大的Offset梅割,這句很重要
[圖片上傳失敗...(image-66c844-1583334746512)]
3. 故障處理:
3.1. Follower故障:
follower 發(fā)生故障后會被臨時踢出 ISR霜第,待該 follower 恢復(fù)后, follower 會讀取本地磁盤記錄的上次的 HW户辞,并將 log 文件高于 HW 的部分截取掉泌类,從 HW 開始向 leader 進(jìn)行同步。等該 follower 的 LEO 大于等于該 Partition 的 HW底燎,即 follower 追上 leader 之后刃榨,就可以重新加入 ISR 了。
3.2. Leader故障:
leader 發(fā)生故障之后双仍,會從 ISR 中選出一個新的 leader枢希,之后,為保證多個副本之間的 數(shù)據(jù)一致性朱沃,其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉苞轿,然后從新的 leader同步數(shù)據(jù)。
* 這只能保證副本之間的數(shù)據(jù)一致性逗物,并不能保證數(shù)據(jù)不丟失或者不重復(fù)搬卒。
4. Kafka冪等性( Exactly Once 語義)
將服務(wù)器的 ACK 級別設(shè)置為-1,可以保證 Producer 到 Server 之間不會丟失數(shù)據(jù)翎卓,即 AtLeast Once 語義契邀。相對的,將服務(wù)器 ACK 級別設(shè)置為0失暴,可以保證生產(chǎn)者每條消息只會被發(fā)送一次坯门,即 At Most Once 語義微饥。At Least Once 可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)古戴;相對的欠橘, At Least Once可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失允瞧。 但是,對于一些非常重要的信息蛮拔,比如說交易數(shù)據(jù)述暂,下游數(shù)據(jù)消費(fèi)者要求數(shù)據(jù)既不重復(fù)也不丟失,即 Exactly Once 語義建炫。 在 0.11 版本以前的 Kafka畦韭,對此是無能為力的,只能保證數(shù)據(jù)不丟失肛跌,再在下游消費(fèi)者對數(shù)據(jù)做全局去重艺配。對于多個下游應(yīng)用的情況,每個都需要單獨(dú)做全局去重衍慎,這就對性能造成了很大影響转唉。0.11 版本的 Kafka,引入了一項(xiàng)重大特性:冪等性稳捆。所謂的冪等性就是指 Producer 不論向 Server 發(fā)送多少次重復(fù)數(shù)據(jù)赠法, Server 端都只會持久化一條。冪等性結(jié)合 At Least Once 語義乔夯,就構(gòu)成了 Kafka 的 Exactly Once 語義砖织。即:At Least Once + 冪等性 = Exactly Once要啟用冪等性,只需要將 Producer 的參數(shù)中 enable.idompotence 設(shè)置為 true 即可末荐。 Kafka的冪等性實(shí)現(xiàn)其實(shí)就是將原來下游需要做的去重放在了數(shù)據(jù)上游侧纯。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發(fā)往同一 Partition 的消息會附帶 Sequence Number甲脏。而Broker 端會對<PID, Partition, SeqNumber>做緩存眶熬,當(dāng)具有相同主鍵的消息提交時, Broker 只會持久化一條块请。但是 PID 重啟就會變化聋涨,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區(qū)跨會話的 Exactly Once负乡。
5. Kafka生產(chǎn)者總結(jié):
5.1. ACK問題:決定數(shù)據(jù)會不會丟失
1. (1)
5.2. ISR(In-Sync-Replica):
6. Kafka消費(fèi)
6.1 消費(fèi)方式:
Consumer是采用Pull的方式從broker中讀取數(shù)據(jù)牍白,Push很難適應(yīng)不同的消費(fèi)者,因?yàn)椴煌南M(fèi)者消費(fèi)速率差異很大抖棘,所以Push很容易造成有的Consumer消費(fèi)能力很強(qiáng)但是處理的消息太少茂腥,有的Consumer消費(fèi)能力弱狸涌,來不及處理消息。 Pull 模式不足之處是最岗,如果 kafka 沒有數(shù)據(jù)帕胆,消費(fèi)者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)般渡。針對這一點(diǎn)懒豹,Kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時會傳入一個時長參數(shù) timeout,如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi)驯用,Consumer 會等待一段時間之后再返回脸秽,這段時長即為 timeout。
6.2 消費(fèi)者組案例:
需求:測試同一個消費(fèi)者組中的消費(fèi)者蝴乔,同一時刻只能有一個消費(fèi)者消費(fèi)
1.修改config/consumer.properties文件记餐,新增group.id=NAME(NAME隨意)
2.運(yùn)行兩個命令行consumer: kafka-console-consumer.sh --bootstrap-server kd-bd01:9092 --topic zhangwentao --consumer.config consumer.properties
3.運(yùn)行一個不指定--consumer.config選項(xiàng)的consumer: kafka-console-consumer.sh --bootstrap-server kd-bd01:9092 --topic zhangwentao
結(jié)論:2中的兩個consumer因?yàn)榕渲昧送粋€組,所以只能有一個人拿到消息薇正,3因?yàn)椴皇峭粋€組片酝,所以就算指定和2一樣的topic也能拿到消息