1. Kafka的介紹
Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái),由Scala和Java編寫呼胚。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)毯焕,它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)挽拂。 這種動(dòng)作(網(wǎng)頁瀏覽垒玲,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素唱歧。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決朦乏。 對(duì)于像Hadoop一樣的日志數(shù)據(jù)和離線分析系統(tǒng)球及,但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案呻疹。Kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理吃引,也是為了通過集群來提供實(shí)時(shí)的消息。
2. 搭建環(huán)境
JDK 10+[圖片上傳中...(1.png-2f5f56-1625274266941-0)]
Zookeeper
Kafka 2.x
.NET 5
Visual Studio 2019
Confluent.Kafka
3. Kafka集群
集群的結(jié)構(gòu)
如下圖:
- Zookeeper :對(duì)kafka選舉做集群的節(jié)點(diǎn)的選舉刽锤,可以看作是個(gè)數(shù)據(jù)庫镊尺,可做分布式鎖,可實(shí)現(xiàn)強(qiáng)一致性并思。
以下是選舉流程
kafka每個(gè)節(jié)點(diǎn)服務(wù)運(yùn)行后庐氮,首先向zk中注冊(cè) watcher ,注冊(cè)成功后watcher就與該節(jié)點(diǎn)之間產(chǎn)生心跳宋彼,運(yùn)行一段時(shí)間后當(dāng)主節(jié)點(diǎn)宕機(jī)后弄砍,其對(duì)應(yīng)的zk中的節(jié)點(diǎn)注冊(cè)也會(huì)消失。同時(shí)激活watcher宙暇,讀取剩下的所有節(jié)點(diǎn)確定宕機(jī)的節(jié)點(diǎn)的分區(qū)與消費(fèi)信息输枯,然后從剩下的節(jié)點(diǎn)中做選舉。其中選舉有三種情況:
正常選舉:剩下的節(jié)點(diǎn)向ZK 發(fā)送指令LeadersandISR占贫,寫的快的節(jié)點(diǎn)為新的leader桃熄。
剩下一個(gè)節(jié)點(diǎn) ,這個(gè)節(jié)點(diǎn)直接成為leader。
所有節(jié)點(diǎn)都宕機(jī)瞳收,等待其中一個(gè)恢復(fù)中碉京。
4. 相關(guān)術(shù)語
- Producer(生產(chǎn)者):生產(chǎn)數(shù)據(jù)對(duì)應(yīng)客戶端。
- Consumer(消費(fèi)者):負(fù)責(zé)處理kafka服務(wù)里面消息螟深。
- Consumer Group/Consumers(消費(fèi)者組):就是kafka獨(dú)特處理輪詢還是廣播谐宙。
輪詢:消費(fèi)者每一個(gè)處理一條。
廣播:一條信息界弧,多個(gè)消費(fèi)者同時(shí)處理凡蜻。 - Broker(服務(wù)):就是kafka服務(wù),一個(gè)Broker可以創(chuàng)建多個(gè)topic垢箕。
- Topic(主題):寫入broker的主題划栓,一個(gè)kafka集群里面可以有多個(gè)Topic,為了區(qū)分業(yè)務(wù)和模塊使用。
- Partition(分區(qū)):把一個(gè)topic的信息分成幾個(gè)區(qū)条获,利用多個(gè)節(jié)點(diǎn)把多個(gè)分區(qū)忠荞,放在不同節(jié)點(diǎn)上面,實(shí)現(xiàn)負(fù)載均衡帅掘,kafka內(nèi)部實(shí)現(xiàn)的委煤。
- Replica(副本):防止主節(jié)點(diǎn)宕機(jī)數(shù)據(jù)丟失了,保證高可用修档。
- Offset(偏移量):就是消息的主鍵碧绞,生產(chǎn)者寫入數(shù)據(jù)后返回的偏移量,消費(fèi)者消費(fèi)數(shù)據(jù)知道數(shù)據(jù)消費(fèi)的位置萍悴,防止重復(fù)消費(fèi)头遭。
5. Kafka集群消息管理
以下是示意圖:
- 生產(chǎn)者生成的消息對(duì)Key做Hash 后做相應(yīng)的規(guī)則區(qū)分放到 分區(qū)0/1 中的Leader中,Leader 會(huì)內(nèi)部把數(shù)據(jù)備份到其他broker的備份中癣诱,這樣的交叉?zhèn)浞莸暮锰幘褪钱?dāng)其中一個(gè)broker 宕機(jī)后计维,不會(huì)導(dǎo)致數(shù)據(jù)的丟失。
6. Kafka的搭建
6.1 單機(jī)版(Docker 搭建)
搭建使用docker-compose.yml
version: '2'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "6181:2181"
container_name: zookeeper_kafka
# kafka version: 1.1.0
# scala version: 2.12
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 81.70.91.63
# zoo1:2181 也可以改成:81.70.91.63:6181
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
depends_on:
- zoo1
container_name: kafka
運(yùn)行:
到docker-compose.yml 所在的目錄下運(yùn)行:docker-compose up -d
6.2 集群版(Docker 搭建)
首先會(huì)對(duì)zookeeper 做集群
以下是 zk.yml 文件
version: '3.4'
services:
zoo1:
image: zookeeper:3.4
restart: always
hostname: zoo1
container_name: zoo1
ports:
- 2184:2181
volumes:
- "/szw/volume/zkcluster/zoo1/data:/data"
- "/szw/volume/zkcluster/zoo1/datalog:/datalog"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
networks:
kafka:
ipv4_address: 172.19.0.11
zoo2:
image: zookeeper:3.4
restart: always
hostname: zoo2
container_name: zoo2
ports:
- 2185:2181
volumes:
- "/szw/volume/zkcluster/zoo2/data:/data"
- "/szw/volume/zkcluster/zoo2/datalog:/datalog"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
networks:
kafka:
ipv4_address: 172.19.0.12
zoo3:
image: zookeeper:3.4
restart: always
hostname: zoo3
container_name: zoo3
ports:
- 2186:2181
volumes:
- "/szw/volume/zkcluster/zoo3/data:/data"
- "/szw/volume/zkcluster/zoo3/datalog:/datalog"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
networks:
kafka:
ipv4_address: 172.19.0.13
networks:
kafka:
external:
name: kafka
Kafka 做集群
以下是:kafka.yml 文件
version: '3.4'
services:
kafka1:
image: wurstmeister/kafka
restart: always
hostname: kafka1
container_name: kafka1
privileged: true
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9092
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /szw/volume/kfkluster/kafka1/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
kafka:
ipv4_address: 172.19.0.14
kafka2:
image: wurstmeister/kafka
restart: always
hostname: kafka2
container_name: kafka2
privileged: true
ports:
- 9093:9093
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_LISTENERS: PLAINTEXT://kafka2:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9093
KAFKA_ADVERTISED_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /szw/volume/kfkluster/kafka2/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
kafka:
ipv4_address: 172.19.0.15
kafka3:
image: wurstmeister/kafka
restart: always
hostname: kafka3
container_name: kafka3
privileged: true
ports:
- 9094:9094
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_LISTENERS: PLAINTEXT://kafka3:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9094
KAFKA_ADVERTISED_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
volumes:
- /szw/volume/kfkluster/kafka3/logs:/kafka
external_links:
- zoo1
- zoo2
- zoo3
networks:
kafka:
ipv4_address: 172.19.0.16
networks:
kafka:
external:
name: kafka
界面管理工具:用來監(jiān)控集群管理
以下是:kafkamanage.yml 文件
version: "3.4"
services:
kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: always
container_name: kafka-manager
hostname: kafka-manager
ports:
- 9000:9000
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
KAFKA_BROKERS: kafka1:9092,kafka2:9092,kafka3:9092
APPLICATION_SECRET: letmein
KM_ARGS: -Djava.net.preferIPv4Stack=true
networks:
kafka:
ipv4_address: 172.19.0.17
networks:
kafka:
external:
name: kafka
啟動(dòng)
#docker-compose 默認(rèn)會(huì)創(chuàng)建網(wǎng)絡(luò)撕予,不需要手動(dòng)執(zhí)行
#如果執(zhí)行錯(cuò)誤,則需要?jiǎng)h除其他的network
#docker network ls
#查看詳細(xì)的 network
#docker network inspect name/id
#
docker network create --driver bridge --subnet 172.19.0.0/16 --gateway 172.19.0.1 kafka
docker-compose -f zk.yml -f kafka.yml -f kafkamanage.yml up -d
安裝成功:
訪問 kafkamanage 管理工具的地址:(http://81.70.91.63:9000/)
注意:添加cluster 和 Zookeeper host 注意默認(rèn)數(shù)值提示鲫惶,保存。
添加成功后:
7. 代碼對(duì)接
7.1 生產(chǎn)者
Nuget:安裝Confluent.Kafka包
對(duì)接代碼:
public static async Task Produce(string brokerlist, string topicname, string content)
{
string brokerList = brokerlist;
string topicName = topicname;
var config = new ProducerConfig
{
BootstrapServers = brokerList,
Acks = Acks.All,
// 冪等性实抡,保證不會(huì)丟數(shù)據(jù)欠母。
EnableIdempotence = true,
//信息發(fā)送完,多久數(shù)據(jù)發(fā)送到broker里面吆寨。
LingerMs = 10000,
BatchNumMessages = 2,//字節(jié)數(shù)
// 只要上面的兩個(gè)要求符合一個(gè)赏淌,則后臺(tái)線程立刻馬上把數(shù)據(jù)推送給broker
// 可以看到發(fā)送的偏移量,如果沒有偏移量啄清,則就是沒有寫成功
MessageSendMaxRetries = 3,//補(bǔ)償重試六水,發(fā)送失敗了則重試
// Partitioner = Partitioner.Random
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
try
{
var deliveryReport = await producer.
ProduceAsync(
topicName, new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
}
}
}
注意:
- 生產(chǎn)端寫ack ,消費(fèi)端不需要。
- ACK 保證數(shù)據(jù)不丟失但是會(huì)影響到我們性能掷贾。越高級(jí)別數(shù)據(jù)不丟失睛榄,則寫入的性能越差。
- 建議使用異步想帅,性能比較好场靴。
- ProduceAsync 中的Key:Key 注意是做負(fù)載均衡,比如港准,有三個(gè)節(jié)點(diǎn)旨剥,一個(gè)topic,創(chuàng)建了三個(gè)分區(qū)叉趣。一個(gè)節(jié)點(diǎn)一個(gè)分區(qū)泞边,如果寫入的數(shù)據(jù)的時(shí)候该押,沒有寫key,會(huì)導(dǎo)致疗杉,所有的數(shù)據(jù)存放到一個(gè)分區(qū)上面。如果用了分區(qū)蚕礼,必須要寫key .根據(jù)自己的業(yè)務(wù)烟具,可以提前配置好。key的隨機(jī)數(shù)奠蹬,可以根據(jù)業(yè)務(wù)朝聋,搞一個(gè)權(quán)重,如果節(jié)點(diǎn)的資源不一樣囤躁,合理利用資源冀痕。
- 數(shù)據(jù)寫入如果默認(rèn)一個(gè)分區(qū),則是有順序狸演,如果是多個(gè)分區(qū)言蛇,則不能保證數(shù)據(jù)的順序。
8. 數(shù)據(jù)的可靠性
數(shù)據(jù)的可靠性就是保證數(shù)據(jù)能寫入Broker宵距,并且在Broker宕機(jī)后重新選舉出來的Leader也不會(huì)導(dǎo)致數(shù)據(jù)的丟失腊尚,這樣就關(guān)系到了ACK的返回機(jī)制。
以下是Kafka的數(shù)據(jù)寫入流程满哪。
對(duì)于ACK的返回策略有兩種:1. 半數(shù)以上的Follower完成同步返回ACK 婿斥,2. 全部的Follower完成同步返回ACK。
以下是兩種ACK的優(yōu)缺點(diǎn)對(duì)比
方案 | 優(yōu)點(diǎn) | 缺點(diǎn) |
---|---|---|
半數(shù)以上完成同步發(fā)送ACK | 延遲低 | 選舉Leader哨鸭,容忍N(yùn)臺(tái)節(jié)點(diǎn)故障民宿,需要2N+1個(gè)副本(數(shù)據(jù)的大量冗余) |
全部完成發(fā)送ACK | 選舉Leader,容忍N(yùn)臺(tái)節(jié)點(diǎn)故障像鸡,需要N+1個(gè)副本 | 延遲高 |
最后Kafka選用了方案2 活鹰,全部完成發(fā)送ACK。
9. ISR(同步副本集)
ISR: 同步的副本集合,維護(hù)當(dāng)前還在存活辦事的副本华望,由于數(shù)據(jù)可靠性的選擇蕊蝗,解決防止當(dāng)一個(gè)副本出現(xiàn)問題時(shí)候,不能正常的返回ack赖舟。
維護(hù)ISR的原理:維護(hù)是由ZK 完成的蓬戚,一般的判斷斷線是:心跳與數(shù)據(jù)備份量。
如果是根據(jù)數(shù)據(jù)的備份量:leader和副本數(shù)據(jù)相差一定條數(shù)宾抓,則就認(rèn)為副本節(jié)點(diǎn)斷開子漩,然后從isr移除,當(dāng)數(shù)據(jù)備份跟的上來石洗,然后又重新加入到ISR集合幢泼。
心跳:一定的時(shí)間沒有進(jìn)行心跳。 超過配置時(shí)間讲衫,則認(rèn)為斷開連接缕棵,從ISR中移除當(dāng)心跳跟的上,在進(jìn)入ISR集合涉兽。
kafka是個(gè)高吞吐的消息隊(duì)列招驴, 發(fā)送數(shù)據(jù)的時(shí)候,有批量發(fā)送的功能枷畏,每次發(fā)數(shù)據(jù)的可以發(fā)送大量的數(shù)據(jù)别厘,這個(gè)是可配的。 所以如果根據(jù)條數(shù)拥诡,則副本節(jié)點(diǎn)會(huì)經(jīng)常性的從ISR移除和加入触趴。 因?yàn)檫@種考慮,kafka的開發(fā)者渴肉,選擇使用根據(jù)時(shí)間來判斷冗懦。
其中控制批量發(fā)送的條數(shù)就是: BatchNumMessages(可通過生產(chǎn)者代碼對(duì)接中找到):當(dāng)內(nèi)存的數(shù)據(jù)條數(shù)達(dá)到了,立刻馬上發(fā)送到Broker
10. ACKS
系統(tǒng)提供的ACK 設(shè)置有三種:-1宾娜,0批狐, 1。默認(rèn)設(shè)置是0前塔。
10.1 ACKS為0
Broker接收到數(shù)據(jù)立刻返回到生產(chǎn)者ACK嚣艇,并且在數(shù)據(jù)做持久化之前。如下圖:
優(yōu)點(diǎn):性能最高华弓。
缺點(diǎn):丟失數(shù)據(jù)概率也最大食零。當(dāng)Leader接收到數(shù)據(jù),但是還沒有持久化時(shí)宕機(jī)寂屏,會(huì)導(dǎo)致數(shù)據(jù)的丟失贰谣。
使用場景:日志系統(tǒng)娜搂,IOT設(shè)備狀態(tài)信息上傳。
10.2 ACKS為1
Broker接收到數(shù)據(jù)并且做完持久化落盤后返回到生產(chǎn)者ACK吱抚。如下圖:
優(yōu)點(diǎn):性能中等百宇。
缺點(diǎn):有丟失數(shù)據(jù)概率也最大。當(dāng)Leader接收到數(shù)據(jù)秘豹,携御。持久化后,沒有做Follower Leader宕機(jī)既绕。新的Follower 沒有更新下Leader最新數(shù)據(jù)啄刹,然后選舉成了Leader。只有通過人工介入找回?cái)?shù)據(jù)凄贩。
使用場景:暫沒發(fā)現(xiàn)誓军。
10.3 ACKS為-1
Leader和所有的Follower全部落盤成功后返回ACK。如下圖:
優(yōu)點(diǎn):數(shù)據(jù)不會(huì)丟失疲扎。
缺點(diǎn):
- 導(dǎo)致新的問題爆出昵时,冪等性問題。導(dǎo)致冪等性問題的原因?yàn)椋寒?dāng)數(shù)據(jù)都備份完成评肆,要返回ACK時(shí)候leader宕機(jī)债查,新的副本替代成為Leader,生產(chǎn)者因?yàn)闆]有收到ACK瓜挽,所以補(bǔ)償重試,再次發(fā)送信息導(dǎo)致數(shù)據(jù)重復(fù)征绸。
- 性能比較低下久橙,原因?yàn)椋?. 數(shù)據(jù)的備份。2. 如果加入冪等性管怠,服務(wù)端會(huì)驗(yàn)證數(shù)據(jù)的唯一性淆衷。
- 使用場景:使用不多。
11. 冪等性
var config = new ProducerConfig
{
BootstrapServers = brokerList,
Acks = Acks.All,
//開啟冪等性的配置
EnableIdempotence = true,
}
開啟冪等性:發(fā)送的數(shù)據(jù)的時(shí)候渤弛,服務(wù)端會(huì)驗(yàn)證信息的唯一性祝拯,如果之前發(fā)送過,就不在接受她肯,然后只會(huì)保留一條相同的信息佳头。
冪等性是由:消息ID,客戶端ID晴氨,分區(qū)Key 共同的組成康嘉。
12. 生產(chǎn)者事務(wù)
如果發(fā)送的消息的Topic,是在多個(gè)分區(qū)籽前,需要用事務(wù)的模式來保證多個(gè)分區(qū)的冪等性亭珍。
示例如下:
string brokerList = "192.168.1.2:9092,192.168.1.3:9093,192.168.1.4:9094";
// 不同的topic的testtransactionalId就不同
string topicName = "test";
// 不一樣的topic敷钾,transactionalId就寫的不一樣。肄梨。
string transactionalId = "transtest1";
var config = new ProducerConfig
{
BootstrapServers = brokerList,
EnableIdempotence = true,
Acks = Acks.All,
TransactionalId = transactionalId,
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
try
{
//初始化事務(wù)
producer.InitTransactions(DefaultTimeout);
var currentState = ProducerState.InitState;
producer.BeginTransaction();
for (int i = 100; i < 110; i++)
{
var content = i.ToString();
producer.Produce(
topicName, new Message<string, string> { Key = content, Value = content });
}
//提交
producer.CommitTransaction(DefaultTimeout);
}
catch (Exception ex)
{
//回滾
producer.AbortTransaction(DefaultTimeout);
Console.WriteLine(ex.Message);
}
}
如果:寫入數(shù)據(jù)有一個(gè) 節(jié)點(diǎn)的Leader 失敗阻荒,就會(huì)自動(dòng)的通知其他的Leader 做書的回滾。
13. 高效的原理
13.1 批量發(fā)送
-
生產(chǎn)者發(fā)送數(shù)據(jù)是批量的
生產(chǎn)者在進(jìn)行數(shù)據(jù)的寫入的時(shí)候众羡,會(huì)有兩個(gè)線程在維護(hù)财松,a.寫數(shù)據(jù)的線程,b.后臺(tái)線程纱控。a.線程負(fù)責(zé)把數(shù)據(jù)寫入到生產(chǎn)者維護(hù)的緩存區(qū)中辆毡。b.線程負(fù)責(zé)把緩存區(qū)的數(shù)據(jù)寫入到Kafka 中。
而控制b線程進(jìn)入Kafka的變量就是:
LingerMs = 10000, //時(shí)間毫秒為單位
BatchNumMessages = 2,//字節(jié)數(shù)
以上條件有一個(gè)就滿足則立馬寫入Kafka節(jié)點(diǎn)中甜害。消費(fèi)端消費(fèi)數(shù)據(jù)的時(shí)候是批量的
13.2 順序讀寫
大數(shù)據(jù)處理一般都是做的順序讀寫:
增:順序?qū)懭刖涂梢浴?/p>
修改:也是順序的寫入舶掖,后臺(tái)線程會(huì)去處理,合并修改的數(shù)據(jù)尔店。
刪除:也是順序的寫入眨攘,后臺(tái)線程會(huì)去處理,合并刪除的數(shù)據(jù)嚣州。
所以鲫售,這類數(shù)據(jù)處理適合處理大量寫入的數(shù)據(jù),少了修改和刪除的數(shù)據(jù)该肴,因?yàn)檫@樣會(huì)降低數(shù)據(jù)處理性能情竹,并且修改,刪除的數(shù)據(jù)處理也會(huì)有延時(shí)匀哄。
13.3 零拷貝
傳統(tǒng)的數(shù)據(jù)處理是:三次數(shù)據(jù)的拷貝:磁盤->內(nèi)核->用戶進(jìn)程->內(nèi)核 秦效,目的就是為了保證資源的安全性。以下是示意圖:
是由Linux 系統(tǒng)實(shí)現(xiàn)的一
種快捷的方式涎嚼。減少了內(nèi)核到用戶阱州,用戶到內(nèi)核的兩次拷貝。如下圖:
14. 消費(fèi)者
消費(fèi)者:有兩種方式法梯,一種是推送苔货,一種是拉去。
14.1 推送
kafka主動(dòng)去推數(shù)據(jù)立哑,如果遇到高并發(fā)的時(shí)候夜惭,可能消費(fèi)端還沒有把之前的數(shù)據(jù)處理完,然后強(qiáng)推了大量的數(shù)據(jù)過來刁憋,有可能造成我們消費(fèi)端的掛機(jī)
14.2 拉取
消費(fèi)端主動(dòng)的去拉取滥嘴,可能存在數(shù)據(jù)延遲消費(fèi),不會(huì)造成我們消費(fèi)端的宕機(jī)至耻,同樣的存在一個(gè)微循環(huán)若皱,不停的拉取數(shù)據(jù)镊叁。
總結(jié):kafka 選取的是拉取的模式去消費(fèi)數(shù)據(jù)。對(duì)比同樣的MQ 走触,rabbitmq 則晦譬,既可以使用拉取,也可以使用推送(推送的時(shí)候可以設(shè)置限流的方式)的方式去消費(fèi)互广。
14.3 消費(fèi)的偏移量Offset
0.9版本之前的編譯量是由Zookeeper保存的維護(hù)的敛腌。
0.9版本之后是由自己維護(hù)(topic: __consumer_offsets)的。
14.4 代碼的對(duì)接
public static void Run_Consume(string brokerList, List<string> topics, string group)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
GroupId = group,
// 有些屬性可以寫惫皱,但是沒有用到
//Acks = Acks.All,
//消費(fèi)方式自動(dòng)提交
EnableAutoCommit = false,
//消費(fèi)模式
AutoOffsetReset = AutoOffsetReset.Earliest,
//EnablePartitionEof = true,
//PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range,
//FetchMaxBytes =,
//FetchWaitMaxMs=1,
//代表數(shù)據(jù)超過了6000沒有處理完業(yè)務(wù)像樊,則把數(shù)據(jù)給其他消費(fèi)端
//一定要注意。SessionTimeoutMs值一定要小于MaxPollIntervalMs
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 10000,
};
const int commitPeriod = 1;
//提交偏移量的時(shí)候,也可以批量去提交
using (var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")).SetPartitionsAssignedHandler((c, partitions) =>
{
//自定義存儲(chǔ)偏移量
//1.每次消費(fèi)完成旅敷,把相應(yīng)的分區(qū)id和offset寫入到mysql數(shù)據(jù)庫存儲(chǔ)
//2.從指定分區(qū)和偏移量開始拉取數(shù)據(jù)
//分配的時(shí)候調(diào)用
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
#region 指定分區(qū)消費(fèi)
// 之前可以自動(dòng)均衡,現(xiàn)在不可以了
//List<TopicPartitionOffset> topics = new List<TopicPartitionOffset>();
//// 我當(dāng)前讀取所有的分區(qū)里面的從10開始
//foreach (var item in partitions)
//{
// topics.Add(new TopicPartitionOffset(item.Topic, item.Partition, new Offset(10)));
//}
//return topics;
#endregion
}) .SetPartitionsRevokedHandler((c, partitions) =>
{
//新加入消費(fèi)者的時(shí)候調(diào)用
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
}).Build())
{
//消費(fèi)者會(huì)影響在平衡分區(qū)生棍,當(dāng)同一個(gè)組新加入消費(fèi)者時(shí),分區(qū)會(huì)在分配
consumer.Subscribe(topics);
try
{
// 死循環(huán) 拉取模式
while (true)
{
try
{
var consumeResult = consumer.Consume();
if (consumeResult.IsPartitionEOF)
{
continue;
}
Console.WriteLine($": {consumeResult.TopicPartitionOffset}::{consumeResult.Message.Value}");
if (consumeResult.Offset % commitPeriod == 0)
{
try
{
//提交偏移量媳谁,數(shù)據(jù)自己已經(jīng)處理完成了
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"Commit error: {e.Error.Reason}");
}
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}
}
//調(diào)用方式
Consumer("192.168.1.10:9092,192.168.1.11:9093,192.168.1.12:9094", "test", "groupname");
- 自動(dòng)ACK:Acks = Acks.All 標(biāo)記是無效的
- 消費(fèi)提交:EnableAutoCommit = false涂滴,自動(dòng)提交服務(wù)端數(shù)據(jù)已經(jīng)消費(fèi),服務(wù)端標(biāo)記本數(shù)據(jù)為晴音。一般設(shè)置為:false柔纵。如果設(shè)置成True自動(dòng)提交,在接收到數(shù)據(jù)后處理過程出現(xiàn)異常锤躁,會(huì)導(dǎo)致無法重復(fù)消費(fèi)這個(gè)數(shù)據(jù)搁料,丟失。如設(shè)置成false进苍,可能導(dǎo)致數(shù)據(jù)的重復(fù)消費(fèi)加缘,比如手動(dòng)提交時(shí)候服務(wù)器斷開。重新連接后重復(fù)消費(fèi)觉啊,解決辦法就是:發(fā)送消息加上一個(gè)唯一ID,消費(fèi)了就加入到Redis 中沈贝,下次來了判斷數(shù)據(jù)是否消費(fèi)過杠人,沒消費(fèi)就重新消費(fèi)创夜,消費(fèi)了就提交诵冒。
- 消費(fèi)模式:AutoOffsetReset:AutoOffsetReset.Latest 即:0,表示消費(fèi)者消費(fèi)啟動(dòng)之后的數(shù)據(jù)丽声。啟動(dòng)之前的服務(wù)端還沒消費(fèi)的數(shù)據(jù)消費(fèi)不到蜓氨。AutoOffsetReset.Earliest = 1,每次都從頭開始消費(fèi)沒有消費(fèi)過的數(shù)據(jù)(推薦模式)赫舒。AutoOffsetReset.Error = 2.淋样。報(bào)錯(cuò)后無法消費(fèi)嘹悼。
- 消費(fèi)端:組和組之間是廣播模式排苍,組內(nèi)是根據(jù)分區(qū)數(shù)量枝笨。多個(gè)客戶端去消費(fèi)袁铐,如果是組相同則自動(dòng)做負(fù)載均衡揭蜒,開啟的消費(fèi)的相同組客戶端最大數(shù)量等于分區(qū)的數(shù)量,開啟多出來的客戶端消費(fèi)不到數(shù)據(jù)剔桨。重新連接新的組則從新的消費(fèi)該組沒有消費(fèi)的數(shù)據(jù)與別的組消費(fèi)數(shù)據(jù)無關(guān)屉更。 原則上是:topic的數(shù)量=broker的數(shù)量,broker數(shù)量=分區(qū)數(shù)量洒缀,分區(qū)數(shù)量=一個(gè)組內(nèi)的消費(fèi)者數(shù)量瑰谜。
15. 異常情況消費(fèi)
當(dāng)Leader offset 為8 宕機(jī) Follow1 offset 為7 Follow2 offset 為6 Follow2選擇為Leader 。消費(fèi)數(shù)據(jù)的時(shí)候是能從8 消費(fèi)的树绩,但是Follow2 最為主節(jié)點(diǎn)時(shí)候萨脑,沒有8 。這種情況Kafka是這樣處理的饺饭。
Kafka使用的 LEO和HW的機(jī)制去處理的渤早。
LEO:指的是每個(gè)副本最大的 offset。
HW:指的是消費(fèi)者能見到的最大的 offset砰奕,ISR隊(duì)列中最小的 LEO蛛芥。
這樣就保證了消費(fèi)者看到的是全部備份完的的偏移量了。
16. 高級(jí)消費(fèi)
如果當(dāng)前超過時(shí)間沒有消費(fèi)完成军援,則返回給另一個(gè)分區(qū)去消費(fèi)仅淑,以下是設(shè)置參數(shù)。
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 10000,
注意: MaxPollIntervalMs的值必須大于等于SessionTimeoutMs
上面類似心跳胸哥,如果消費(fèi)水平太慢涯竟,則會(huì)引起重新分配
17. 文件儲(chǔ)存機(jī)制
文件的架構(gòu):通過小文件的不斷合并最后轉(zhuǎn)成了一個(gè)大文件。結(jié)構(gòu)如圖:
保存文件的結(jié)構(gòu):
000000000000000000000000.log
000000000000000000000000.index
000000000000000000000700.log
000000000000000000000700.index
000000000000000000002000.log
000000000000000000002000.index
注意:log文件和index 文件是成對(duì)出現(xiàn)的空厌,000000000000000000000000.log 保存的是 0-699 的數(shù)據(jù)庐船。
數(shù)據(jù)的查找:通過文件的名字,使用的二分發(fā)做的查找嘲更。如下圖:
注意:除了爆露出來的偏移量之外筐钟,Kafka 內(nèi)部還key值對(duì)應(yīng)與log文件中。
18. 日志的壓縮策略
kafka定期將相同key的消息進(jìn)行合并赋朦,只保留最新的value值篓冲。
保存的每一條數(shù)據(jù),會(huì)記錄是增加宠哄,刪除壹将,還是修改。
19. 自定義存儲(chǔ)
自定義存儲(chǔ)(解決重復(fù)消費(fèi))
- 每次消費(fèi)完成毛嫉,把相應(yīng)的分區(qū)和offset寫入到mysql數(shù)據(jù)庫诽俯。
- 從指定分區(qū)和偏移量開始拉取數(shù)據(jù)。
.SetPartitionsAssignedHandler((c, partitions) =>
{ //獲取mysql存儲(chǔ)結(jié)果承粤,從當(dāng)前開始獲取
foreach (var item in partitions)
{
topics.Add(new TopicPartitionOffset(item.Topic, item.Partition, new Offset(10)));
}
})
consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 1, Offset.Beginning)).ToList());
20.注意:
- 消費(fèi)者 一般使用workservice暴区。
- 消費(fèi)端只關(guān)心topic和偏移量闯团,其余不關(guān)心。
- 保留7天颜启,kafka可以配置偷俭。默認(rèn)7天,消息積壓有處理缰盏。
- 消費(fèi)數(shù)據(jù)先消費(fèi)Leader 的涌萤。