Kafka

1. Kafka的介紹

Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái),由ScalaJava編寫呼胚。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)毯焕,它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)挽拂。 這種動(dòng)作(網(wǎng)頁瀏覽垒玲,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素唱歧。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決朦乏。 對(duì)于像Hadoop一樣的日志數(shù)據(jù)和離線分析系統(tǒng)球及,但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案呻疹。Kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理吃引,也是為了通過集群來提供實(shí)時(shí)的消息。

2. 搭建環(huán)境

JDK 10+[圖片上傳中...(1.png-2f5f56-1625274266941-0)]

Zookeeper

Kafka 2.x

.NET 5

Visual Studio 2019

Confluent.Kafka

3. Kafka集群

集群的結(jié)構(gòu)

如下圖:


1.png
  • Zookeeper :對(duì)kafka選舉做集群的節(jié)點(diǎn)的選舉刽锤,可以看作是個(gè)數(shù)據(jù)庫镊尺,可做分布式鎖,可實(shí)現(xiàn)強(qiáng)一致性并思。

以下是選舉流程

2.png

kafka每個(gè)節(jié)點(diǎn)服務(wù)運(yùn)行后庐氮,首先向zk中注冊(cè) watcher ,注冊(cè)成功后watcher就與該節(jié)點(diǎn)之間產(chǎn)生心跳宋彼,運(yùn)行一段時(shí)間后當(dāng)主節(jié)點(diǎn)宕機(jī)后弄砍,其對(duì)應(yīng)的zk中的節(jié)點(diǎn)注冊(cè)也會(huì)消失。同時(shí)激活watcher宙暇,讀取剩下的所有節(jié)點(diǎn)確定宕機(jī)的節(jié)點(diǎn)的分區(qū)與消費(fèi)信息输枯,然后從剩下的節(jié)點(diǎn)中做選舉。其中選舉有三種情況:

  1. 正常選舉:剩下的節(jié)點(diǎn)向ZK 發(fā)送指令LeadersandISR占贫,寫的快的節(jié)點(diǎn)為新的leader桃熄。

  2. 剩下一個(gè)節(jié)點(diǎn) ,這個(gè)節(jié)點(diǎn)直接成為leader。

  3. 所有節(jié)點(diǎn)都宕機(jī)瞳收,等待其中一個(gè)恢復(fù)中碉京。

4. 相關(guān)術(shù)語

  • Producer(生產(chǎn)者):生產(chǎn)數(shù)據(jù)對(duì)應(yīng)客戶端。
  • Consumer(消費(fèi)者):負(fù)責(zé)處理kafka服務(wù)里面消息螟深。
  • Consumer Group/Consumers(消費(fèi)者組):就是kafka獨(dú)特處理輪詢還是廣播谐宙。
    輪詢:消費(fèi)者每一個(gè)處理一條。
    廣播:一條信息界弧,多個(gè)消費(fèi)者同時(shí)處理凡蜻。
  • Broker(服務(wù)):就是kafka服務(wù),一個(gè)Broker可以創(chuàng)建多個(gè)topic垢箕。
  • Topic(主題):寫入broker的主題划栓,一個(gè)kafka集群里面可以有多個(gè)Topic,為了區(qū)分業(yè)務(wù)和模塊使用。
  • Partition(分區(qū)):把一個(gè)topic的信息分成幾個(gè)區(qū)条获,利用多個(gè)節(jié)點(diǎn)把多個(gè)分區(qū)忠荞,放在不同節(jié)點(diǎn)上面,實(shí)現(xiàn)負(fù)載均衡帅掘,kafka內(nèi)部實(shí)現(xiàn)的委煤。
  • Replica(副本):防止主節(jié)點(diǎn)宕機(jī)數(shù)據(jù)丟失了,保證高可用修档。
  • Offset(偏移量):就是消息的主鍵碧绞,生產(chǎn)者寫入數(shù)據(jù)后返回的偏移量,消費(fèi)者消費(fèi)數(shù)據(jù)知道數(shù)據(jù)消費(fèi)的位置萍悴,防止重復(fù)消費(fèi)头遭。

5. Kafka集群消息管理

以下是示意圖:


3.png
  1. 生產(chǎn)者生成的消息對(duì)Key做Hash 后做相應(yīng)的規(guī)則區(qū)分放到 分區(qū)0/1 中的Leader中,Leader 會(huì)內(nèi)部把數(shù)據(jù)備份到其他broker的備份中癣诱,這樣的交叉?zhèn)浞莸暮锰幘褪钱?dāng)其中一個(gè)broker 宕機(jī)后计维,不會(huì)導(dǎo)致數(shù)據(jù)的丟失。

6. Kafka的搭建

6.1 單機(jī)版(Docker 搭建)

搭建使用docker-compose.yml

version: '2'

services:
  zoo1:
    image: wurstmeister/zookeeper
    restart: unless-stopped
    hostname: zoo1
    ports:
      - "6181:2181"
    container_name: zookeeper_kafka

  # kafka version: 1.1.0
  # scala version: 2.12
  kafka1:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 81.70.91.63
      # zoo1:2181 也可以改成:81.70.91.63:6181
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
    depends_on:
      - zoo1
    container_name: kafka

運(yùn)行:

到docker-compose.yml 所在的目錄下運(yùn)行:docker-compose up -d

6.2 集群版(Docker 搭建)

首先會(huì)對(duì)zookeeper 做集群

以下是 zk.yml 文件

version: '3.4'

services:
  zoo1:
    image: zookeeper:3.4
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
    - 2184:2181
    volumes:
    - "/szw/volume/zkcluster/zoo1/data:/data"
    - "/szw/volume/zkcluster/zoo1/datalog:/datalog"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    networks:
      kafka:
        ipv4_address: 172.19.0.11

  zoo2:
    image: zookeeper:3.4
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
    - 2185:2181
    volumes:
    - "/szw/volume/zkcluster/zoo2/data:/data"
    - "/szw/volume/zkcluster/zoo2/datalog:/datalog"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
    networks:
      kafka:
        ipv4_address: 172.19.0.12

  zoo3:
    image: zookeeper:3.4
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
    - 2186:2181
    volumes:
    - "/szw/volume/zkcluster/zoo3/data:/data"
    - "/szw/volume/zkcluster/zoo3/datalog:/datalog"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
    networks:
      kafka:
        ipv4_address: 172.19.0.13

networks:
  kafka:
    external:
      name: kafka

Kafka 做集群

以下是:kafka.yml 文件

version: '3.4'

services:
  kafka1:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka1
    container_name: kafka1
    privileged: true
    ports:
    - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    volumes:
    - /szw/volume/kfkluster/kafka1/logs:/kafka
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      kafka:
        ipv4_address: 172.19.0.14

  kafka2:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka2
    container_name: kafka2
    privileged: true
    ports:
    - 9093:9093
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_LISTENERS: PLAINTEXT://kafka2:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9093
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    volumes:
    - /szw/volume/kfkluster/kafka2/logs:/kafka
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      kafka:
        ipv4_address: 172.19.0.15

  kafka3:
    image: wurstmeister/kafka
    restart: always
    hostname: kafka3
    container_name: kafka3
    privileged: true
    ports:
    - 9094:9094
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_LISTENERS: PLAINTEXT://kafka3:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://81.70.91.63:9094
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
    volumes:
    - /szw/volume/kfkluster/kafka3/logs:/kafka
    external_links:
    - zoo1
    - zoo2
    - zoo3
    networks:
      kafka:
        ipv4_address: 172.19.0.16

networks:
  kafka:
    external:
      name: kafka

界面管理工具:用來監(jiān)控集群管理

以下是:kafkamanage.yml 文件

version: "3.4"

services:
  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
     - 9000:9000
    environment:
     ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
     KAFKA_BROKERS: kafka1:9092,kafka2:9092,kafka3:9092
     APPLICATION_SECRET: letmein
     KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
     kafka:
      ipv4_address: 172.19.0.17
networks:
  kafka:
    external:
      name: kafka

啟動(dòng)

#docker-compose 默認(rèn)會(huì)創(chuàng)建網(wǎng)絡(luò)撕予,不需要手動(dòng)執(zhí)行
#如果執(zhí)行錯(cuò)誤,則需要?jiǎng)h除其他的network
#docker network ls
#查看詳細(xì)的 network
#docker network inspect name/id
#
docker network create --driver bridge --subnet 172.19.0.0/16 --gateway 172.19.0.1 kafka


docker-compose -f zk.yml -f kafka.yml -f kafkamanage.yml up -d

安裝成功:
訪問 kafkamanage 管理工具的地址:(http://81.70.91.63:9000/)

4.png

注意:添加cluster 和 Zookeeper host 注意默認(rèn)數(shù)值提示鲫惶,保存。

添加成功后:


5.png

7. 代碼對(duì)接

7.1 生產(chǎn)者

Nuget:安裝Confluent.Kafka包

對(duì)接代碼:

public static async Task Produce(string brokerlist, string topicname, string content)
{
    string brokerList = brokerlist;
    string topicName = topicname;
    var config = new ProducerConfig
    {
        BootstrapServers = brokerList,
        Acks = Acks.All,
        // 冪等性实抡,保證不會(huì)丟數(shù)據(jù)欠母。
        EnableIdempotence = true,
        //信息發(fā)送完,多久數(shù)據(jù)發(fā)送到broker里面吆寨。
        LingerMs = 10000, 
        BatchNumMessages = 2,//字節(jié)數(shù)
        // 只要上面的兩個(gè)要求符合一個(gè)赏淌,則后臺(tái)線程立刻馬上把數(shù)據(jù)推送給broker
        // 可以看到發(fā)送的偏移量,如果沒有偏移量啄清,則就是沒有寫成功
        MessageSendMaxRetries = 3,//補(bǔ)償重試六水,發(fā)送失敗了則重試 
        //  Partitioner = Partitioner.Random
    };

    using (var producer = new ProducerBuilder<string, string>(config).Build())
    {
        try
        {
            var deliveryReport = await producer.
                ProduceAsync(
                topicName, new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
            Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");

        }
        catch (ProduceException<string, string> e)
        {
            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
        }
    }
}

注意:

  1. 生產(chǎn)端寫ack ,消費(fèi)端不需要。
  2. ACK 保證數(shù)據(jù)不丟失但是會(huì)影響到我們性能掷贾。越高級(jí)別數(shù)據(jù)不丟失睛榄,則寫入的性能越差。
  3. 建議使用異步想帅,性能比較好场靴。
  4. ProduceAsync 中的Key:Key 注意是做負(fù)載均衡,比如港准,有三個(gè)節(jié)點(diǎn)旨剥,一個(gè)topic,創(chuàng)建了三個(gè)分區(qū)叉趣。一個(gè)節(jié)點(diǎn)一個(gè)分區(qū)泞边,如果寫入的數(shù)據(jù)的時(shí)候该押,沒有寫key,會(huì)導(dǎo)致疗杉,所有的數(shù)據(jù)存放到一個(gè)分區(qū)上面。如果用了分區(qū)蚕礼,必須要寫key .根據(jù)自己的業(yè)務(wù)烟具,可以提前配置好。key的隨機(jī)數(shù)奠蹬,可以根據(jù)業(yè)務(wù)朝聋,搞一個(gè)權(quán)重,如果節(jié)點(diǎn)的資源不一樣囤躁,合理利用資源冀痕。
  5. 數(shù)據(jù)寫入如果默認(rèn)一個(gè)分區(qū),則是有順序狸演,如果是多個(gè)分區(qū)言蛇,則不能保證數(shù)據(jù)的順序。

8. 數(shù)據(jù)的可靠性

數(shù)據(jù)的可靠性就是保證數(shù)據(jù)能寫入Broker宵距,并且在Broker宕機(jī)后重新選舉出來的Leader也不會(huì)導(dǎo)致數(shù)據(jù)的丟失腊尚,這樣就關(guān)系到了ACK的返回機(jī)制。

以下是Kafka的數(shù)據(jù)寫入流程满哪。


6.png

對(duì)于ACK的返回策略有兩種:1. 半數(shù)以上的Follower完成同步返回ACK 婿斥,2. 全部的Follower完成同步返回ACK。

以下是兩種ACK的優(yōu)缺點(diǎn)對(duì)比

方案 優(yōu)點(diǎn) 缺點(diǎn)
半數(shù)以上完成同步發(fā)送ACK 延遲低 選舉Leader哨鸭,容忍N(yùn)臺(tái)節(jié)點(diǎn)故障民宿,需要2N+1個(gè)副本(數(shù)據(jù)的大量冗余)
全部完成發(fā)送ACK 選舉Leader,容忍N(yùn)臺(tái)節(jié)點(diǎn)故障像鸡,需要N+1個(gè)副本 延遲高

最后Kafka選用了方案2 活鹰,全部完成發(fā)送ACK。

9. ISR(同步副本集)

ISR: 同步的副本集合,維護(hù)當(dāng)前還在存活辦事的副本华望,由于數(shù)據(jù)可靠性的選擇蕊蝗,解決防止當(dāng)一個(gè)副本出現(xiàn)問題時(shí)候,不能正常的返回ack赖舟。

維護(hù)ISR的原理:維護(hù)是由ZK 完成的蓬戚,一般的判斷斷線是:心跳與數(shù)據(jù)備份量。

如果是根據(jù)數(shù)據(jù)的備份量:leader和副本數(shù)據(jù)相差一定條數(shù)宾抓,則就認(rèn)為副本節(jié)點(diǎn)斷開子漩,然后從isr移除,當(dāng)數(shù)據(jù)備份跟的上來石洗,然后又重新加入到ISR集合幢泼。

心跳:一定的時(shí)間沒有進(jìn)行心跳。 超過配置時(shí)間讲衫,則認(rèn)為斷開連接缕棵,從ISR中移除當(dāng)心跳跟的上,在進(jìn)入ISR集合涉兽。

kafka是個(gè)高吞吐的消息隊(duì)列招驴, 發(fā)送數(shù)據(jù)的時(shí)候,有批量發(fā)送的功能枷畏,每次發(fā)數(shù)據(jù)的可以發(fā)送大量的數(shù)據(jù)别厘,這個(gè)是可配的。 所以如果根據(jù)條數(shù)拥诡,則副本節(jié)點(diǎn)會(huì)經(jīng)常性的從ISR移除和加入触趴。 因?yàn)檫@種考慮,kafka的開發(fā)者渴肉,選擇使用根據(jù)時(shí)間來判斷冗懦。

其中控制批量發(fā)送的條數(shù)就是: BatchNumMessages(可通過生產(chǎn)者代碼對(duì)接中找到):當(dāng)內(nèi)存的數(shù)據(jù)條數(shù)達(dá)到了,立刻馬上發(fā)送到Broker

10. ACKS

系統(tǒng)提供的ACK 設(shè)置有三種:-1宾娜,0批狐, 1。默認(rèn)設(shè)置是0前塔。

10.1 ACKS為0

Broker接收到數(shù)據(jù)立刻返回到生產(chǎn)者ACK嚣艇,并且在數(shù)據(jù)做持久化之前。如下圖:


7.png

優(yōu)點(diǎn):性能最高华弓。

缺點(diǎn):丟失數(shù)據(jù)概率也最大食零。當(dāng)Leader接收到數(shù)據(jù),但是還沒有持久化時(shí)宕機(jī)寂屏,會(huì)導(dǎo)致數(shù)據(jù)的丟失贰谣。

使用場景:日志系統(tǒng)娜搂,IOT設(shè)備狀態(tài)信息上傳。

10.2 ACKS為1

Broker接收到數(shù)據(jù)并且做完持久化落盤后返回到生產(chǎn)者ACK吱抚。如下圖:


8.png

優(yōu)點(diǎn):性能中等百宇。

缺點(diǎn):有丟失數(shù)據(jù)概率也最大。當(dāng)Leader接收到數(shù)據(jù)秘豹,携御。持久化后,沒有做Follower Leader宕機(jī)既绕。新的Follower 沒有更新下Leader最新數(shù)據(jù)啄刹,然后選舉成了Leader。只有通過人工介入找回?cái)?shù)據(jù)凄贩。

使用場景:暫沒發(fā)現(xiàn)誓军。

10.3 ACKS為-1

Leader和所有的Follower全部落盤成功后返回ACK。如下圖:


9.png

優(yōu)點(diǎn):數(shù)據(jù)不會(huì)丟失疲扎。

缺點(diǎn):

  1. 導(dǎo)致新的問題爆出昵时,冪等性問題。導(dǎo)致冪等性問題的原因?yàn)椋寒?dāng)數(shù)據(jù)都備份完成评肆,要返回ACK時(shí)候leader宕機(jī)债查,新的副本替代成為Leader,生產(chǎn)者因?yàn)闆]有收到ACK瓜挽,所以補(bǔ)償重試,再次發(fā)送信息導(dǎo)致數(shù)據(jù)重復(fù)征绸。
  2. 性能比較低下久橙,原因?yàn)椋?. 數(shù)據(jù)的備份。2. 如果加入冪等性管怠,服務(wù)端會(huì)驗(yàn)證數(shù)據(jù)的唯一性淆衷。
  3. 使用場景:使用不多。

11. 冪等性

var config = new ProducerConfig
{
BootstrapServers = brokerList,
Acks = Acks.All,
//開啟冪等性的配置
EnableIdempotence = true,
}

開啟冪等性:發(fā)送的數(shù)據(jù)的時(shí)候渤弛,服務(wù)端會(huì)驗(yàn)證信息的唯一性祝拯,如果之前發(fā)送過,就不在接受她肯,然后只會(huì)保留一條相同的信息佳头。

冪等性是由:消息ID,客戶端ID晴氨,分區(qū)Key 共同的組成康嘉。

12. 生產(chǎn)者事務(wù)

如果發(fā)送的消息的Topic,是在多個(gè)分區(qū)籽前,需要用事務(wù)的模式來保證多個(gè)分區(qū)的冪等性亭珍。
示例如下:

string brokerList = "192.168.1.2:9092,192.168.1.3:9093,192.168.1.4:9094";
// 不同的topic的testtransactionalId就不同
string topicName = "test";
// 不一樣的topic敷钾,transactionalId就寫的不一樣。肄梨。
string transactionalId = "transtest1";
var config = new ProducerConfig
{
    BootstrapServers = brokerList,
    EnableIdempotence = true,
    Acks = Acks.All,
    TransactionalId = transactionalId,
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    try
    {
        //初始化事務(wù)
        producer.InitTransactions(DefaultTimeout);
        var currentState = ProducerState.InitState;
        producer.BeginTransaction();
        for (int i = 100; i < 110; i++)
        {
            var content = i.ToString();
            producer.Produce(
                topicName, new Message<string, string> { Key = content, Value = content });
        }
        //提交
        producer.CommitTransaction(DefaultTimeout);
    }
    catch (Exception ex)
    {
        //回滾 
        producer.AbortTransaction(DefaultTimeout);
        Console.WriteLine(ex.Message);
    }
}

如果:寫入數(shù)據(jù)有一個(gè) 節(jié)點(diǎn)的Leader 失敗阻荒,就會(huì)自動(dòng)的通知其他的Leader 做書的回滾。

13. 高效的原理

13.1 批量發(fā)送

  1. 生產(chǎn)者發(fā)送數(shù)據(jù)是批量的

    生產(chǎn)者在進(jìn)行數(shù)據(jù)的寫入的時(shí)候众羡,會(huì)有兩個(gè)線程在維護(hù)财松,a.寫數(shù)據(jù)的線程,b.后臺(tái)線程纱控。a.線程負(fù)責(zé)把數(shù)據(jù)寫入到生產(chǎn)者維護(hù)的緩存區(qū)中辆毡。b.線程負(fù)責(zé)把緩存區(qū)的數(shù)據(jù)寫入到Kafka 中。

    10.png
  2. 而控制b線程進(jìn)入Kafka的變量就是:
    LingerMs = 10000, //時(shí)間毫秒為單位
    BatchNumMessages = 2,//字節(jié)數(shù)
    以上條件有一個(gè)就滿足則立馬寫入Kafka節(jié)點(diǎn)中甜害。

  3. 消費(fèi)端消費(fèi)數(shù)據(jù)的時(shí)候是批量的

13.2 順序讀寫

大數(shù)據(jù)處理一般都是做的順序讀寫:

增:順序?qū)懭刖涂梢浴?/p>

修改:也是順序的寫入舶掖,后臺(tái)線程會(huì)去處理,合并修改的數(shù)據(jù)尔店。

刪除:也是順序的寫入眨攘,后臺(tái)線程會(huì)去處理,合并刪除的數(shù)據(jù)嚣州。

所以鲫售,這類數(shù)據(jù)處理適合處理大量寫入的數(shù)據(jù),少了修改和刪除的數(shù)據(jù)该肴,因?yàn)檫@樣會(huì)降低數(shù)據(jù)處理性能情竹,并且修改,刪除的數(shù)據(jù)處理也會(huì)有延時(shí)匀哄。

13.3 零拷貝

傳統(tǒng)的數(shù)據(jù)處理是:三次數(shù)據(jù)的拷貝:磁盤->內(nèi)核->用戶進(jìn)程->內(nèi)核 秦效,目的就是為了保證資源的安全性。以下是示意圖:


11.png

是由Linux 系統(tǒng)實(shí)現(xiàn)的一
種快捷的方式涎嚼。減少了內(nèi)核到用戶阱州,用戶到內(nèi)核的兩次拷貝。如下圖:


12.png

14. 消費(fèi)者

消費(fèi)者:有兩種方式法梯,一種是推送苔货,一種是拉去。

14.1 推送

kafka主動(dòng)去推數(shù)據(jù)立哑,如果遇到高并發(fā)的時(shí)候夜惭,可能消費(fèi)端還沒有把之前的數(shù)據(jù)處理完,然后強(qiáng)推了大量的數(shù)據(jù)過來刁憋,有可能造成我們消費(fèi)端的掛機(jī)

14.2 拉取

消費(fèi)端主動(dòng)的去拉取滥嘴,可能存在數(shù)據(jù)延遲消費(fèi),不會(huì)造成我們消費(fèi)端的宕機(jī)至耻,同樣的存在一個(gè)微循環(huán)若皱,不停的拉取數(shù)據(jù)镊叁。

總結(jié):kafka 選取的是拉取的模式去消費(fèi)數(shù)據(jù)。對(duì)比同樣的MQ 走触,rabbitmq 則晦譬,既可以使用拉取,也可以使用推送(推送的時(shí)候可以設(shè)置限流的方式)的方式去消費(fèi)互广。

14.3 消費(fèi)的偏移量Offset

0.9版本之前的編譯量是由Zookeeper保存的維護(hù)的敛腌。

0.9版本之后是由自己維護(hù)(topic: __consumer_offsets)的。

14.4 代碼的對(duì)接

public static void Run_Consume(string brokerList, List<string> topics, string group)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = brokerList,
        GroupId = group,
        // 有些屬性可以寫惫皱,但是沒有用到
        //Acks = Acks.All,
        //消費(fèi)方式自動(dòng)提交
        EnableAutoCommit = false,
        //消費(fèi)模式 
        AutoOffsetReset = AutoOffsetReset.Earliest,
        //EnablePartitionEof = true,
        //PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range,
        //FetchMaxBytes =,
        //FetchWaitMaxMs=1,   
        //代表數(shù)據(jù)超過了6000沒有處理完業(yè)務(wù)像樊,則把數(shù)據(jù)給其他消費(fèi)端
        //一定要注意。SessionTimeoutMs值一定要小于MaxPollIntervalMs
        SessionTimeoutMs = 6000,
        MaxPollIntervalMs = 10000,
    };

    const int commitPeriod = 1;
    //提交偏移量的時(shí)候,也可以批量去提交
    using (var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")).SetPartitionsAssignedHandler((c, partitions) =>
      {
          //自定義存儲(chǔ)偏移量
          //1.每次消費(fèi)完成旅敷,把相應(yīng)的分區(qū)id和offset寫入到mysql數(shù)據(jù)庫存儲(chǔ)
          //2.從指定分區(qū)和偏移量開始拉取數(shù)據(jù)
          //分配的時(shí)候調(diào)用
          Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
          #region 指定分區(qū)消費(fèi)
              // 之前可以自動(dòng)均衡,現(xiàn)在不可以了 
              //List<TopicPartitionOffset> topics = new List<TopicPartitionOffset>();
              //// 我當(dāng)前讀取所有的分區(qū)里面的從10開始
              //foreach (var item in partitions)
              //{
              //    topics.Add(new TopicPartitionOffset(item.Topic, item.Partition, new Offset(10)));
              //}
              //return topics;
           #endregion
              }) .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    //新加入消費(fèi)者的時(shí)候調(diào)用 
                    Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
                 }).Build())
        {
        //消費(fèi)者會(huì)影響在平衡分區(qū)生棍,當(dāng)同一個(gè)組新加入消費(fèi)者時(shí),分區(qū)會(huì)在分配
        consumer.Subscribe(topics);
        try
        {
            // 死循環(huán) 拉取模式
            while (true)
            {

                try
                {
                    var consumeResult = consumer.Consume();

                    if (consumeResult.IsPartitionEOF)
                    {
                        continue;
                    }
                    Console.WriteLine($": {consumeResult.TopicPartitionOffset}::{consumeResult.Message.Value}");

                    if (consumeResult.Offset % commitPeriod == 0)
                    {
                        try
                        {
                            //提交偏移量媳谁,數(shù)據(jù)自己已經(jīng)處理完成了
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException e)
                        {
                            Console.WriteLine($"Commit error: {e.Error.Reason}");
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
    }
}
//調(diào)用方式
Consumer("192.168.1.10:9092,192.168.1.11:9093,192.168.1.12:9094", "test", "groupname");
  1. 自動(dòng)ACK:Acks = Acks.All 標(biāo)記是無效的
  2. 消費(fèi)提交:EnableAutoCommit = false涂滴,自動(dòng)提交服務(wù)端數(shù)據(jù)已經(jīng)消費(fèi),服務(wù)端標(biāo)記本數(shù)據(jù)為晴音。一般設(shè)置為:false柔纵。如果設(shè)置成True自動(dòng)提交,在接收到數(shù)據(jù)后處理過程出現(xiàn)異常锤躁,會(huì)導(dǎo)致無法重復(fù)消費(fèi)這個(gè)數(shù)據(jù)搁料,丟失。如設(shè)置成false进苍,可能導(dǎo)致數(shù)據(jù)的重復(fù)消費(fèi)加缘,比如手動(dòng)提交時(shí)候服務(wù)器斷開。重新連接后重復(fù)消費(fèi)觉啊,解決辦法就是:發(fā)送消息加上一個(gè)唯一ID,消費(fèi)了就加入到Redis 中沈贝,下次來了判斷數(shù)據(jù)是否消費(fèi)過杠人,沒消費(fèi)就重新消費(fèi)创夜,消費(fèi)了就提交诵冒。
  3. 消費(fèi)模式:AutoOffsetReset:AutoOffsetReset.Latest 即:0,表示消費(fèi)者消費(fèi)啟動(dòng)之后的數(shù)據(jù)丽声。啟動(dòng)之前的服務(wù)端還沒消費(fèi)的數(shù)據(jù)消費(fèi)不到蜓氨。AutoOffsetReset.Earliest = 1,每次都從頭開始消費(fèi)沒有消費(fèi)過的數(shù)據(jù)(推薦模式)赫舒。AutoOffsetReset.Error = 2.淋样。報(bào)錯(cuò)后無法消費(fèi)嘹悼。
  4. 消費(fèi)端:組和組之間是廣播模式排苍,組內(nèi)是根據(jù)分區(qū)數(shù)量枝笨。多個(gè)客戶端去消費(fèi)袁铐,如果是組相同則自動(dòng)做負(fù)載均衡揭蜒,開啟的消費(fèi)的相同組客戶端最大數(shù)量等于分區(qū)的數(shù)量,開啟多出來的客戶端消費(fèi)不到數(shù)據(jù)剔桨。重新連接新的組則從新的消費(fèi)該組沒有消費(fèi)的數(shù)據(jù)與別的組消費(fèi)數(shù)據(jù)無關(guān)屉更。 原則上是:topic的數(shù)量=broker的數(shù)量,broker數(shù)量=分區(qū)數(shù)量洒缀,分區(qū)數(shù)量=一個(gè)組內(nèi)的消費(fèi)者數(shù)量瑰谜。

15. 異常情況消費(fèi)

當(dāng)Leader offset 為8 宕機(jī) Follow1 offset 為7 Follow2 offset 為6 Follow2選擇為Leader 。消費(fèi)數(shù)據(jù)的時(shí)候是能從8 消費(fèi)的树绩,但是Follow2 最為主節(jié)點(diǎn)時(shí)候萨脑,沒有8 。這種情況Kafka是這樣處理的饺饭。

Kafka使用的 LEO和HW的機(jī)制去處理的渤早。

LEO:指的是每個(gè)副本最大的 offset。

HW:指的是消費(fèi)者能見到的最大的 offset砰奕,ISR隊(duì)列中最小的 LEO蛛芥。

這樣就保證了消費(fèi)者看到的是全部備份完的的偏移量了。

16. 高級(jí)消費(fèi)

如果當(dāng)前超過時(shí)間沒有消費(fèi)完成军援,則返回給另一個(gè)分區(qū)去消費(fèi)仅淑,以下是設(shè)置參數(shù)。

SessionTimeoutMs = 6000,

MaxPollIntervalMs = 10000,

注意: MaxPollIntervalMs的值必須大于等于SessionTimeoutMs

上面類似心跳胸哥,如果消費(fèi)水平太慢涯竟,則會(huì)引起重新分配

17. 文件儲(chǔ)存機(jī)制

文件的架構(gòu):通過小文件的不斷合并最后轉(zhuǎn)成了一個(gè)大文件。結(jié)構(gòu)如圖:


13.png

保存文件的結(jié)構(gòu):

000000000000000000000000.log

000000000000000000000000.index

000000000000000000000700.log

000000000000000000000700.index

000000000000000000002000.log

000000000000000000002000.index

注意:log文件和index 文件是成對(duì)出現(xiàn)的空厌,000000000000000000000000.log 保存的是 0-699 的數(shù)據(jù)庐船。

數(shù)據(jù)的查找:通過文件的名字,使用的二分發(fā)做的查找嘲更。如下圖:


14.png

注意:除了爆露出來的偏移量之外筐钟,Kafka 內(nèi)部還key值對(duì)應(yīng)與log文件中。

18. 日志的壓縮策略

kafka定期將相同key的消息進(jìn)行合并赋朦,只保留最新的value值篓冲。

保存的每一條數(shù)據(jù),會(huì)記錄是增加宠哄,刪除壹将,還是修改。

19. 自定義存儲(chǔ)

自定義存儲(chǔ)(解決重復(fù)消費(fèi))

  1. 每次消費(fèi)完成毛嫉,把相應(yīng)的分區(qū)和offset寫入到mysql數(shù)據(jù)庫诽俯。
  2. 從指定分區(qū)和偏移量開始拉取數(shù)據(jù)。

.SetPartitionsAssignedHandler((c, partitions) =>

{ //獲取mysql存儲(chǔ)結(jié)果承粤,從當(dāng)前開始獲取

foreach (var item in partitions)

{
topics.Add(new TopicPartitionOffset(item.Topic, item.Partition, new Offset(10)));
}
})
consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 1, Offset.Beginning)).ToList());

20.注意:

  1. 消費(fèi)者 一般使用workservice暴区。
  2. 消費(fèi)端只關(guān)心topic和偏移量闯团,其余不關(guān)心。
  3. 保留7天颜启,kafka可以配置偷俭。默認(rèn)7天,消息積壓有處理缰盏。
  4. 消費(fèi)數(shù)據(jù)先消費(fèi)Leader 的涌萤。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市口猜,隨后出現(xiàn)的幾起案子负溪,更是在濱河造成了極大的恐慌,老刑警劉巖济炎,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件川抡,死亡現(xiàn)場離奇詭異,居然都是意外死亡须尚,警方通過查閱死者的電腦和手機(jī)崖堤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來耐床,“玉大人密幔,你說我怎么就攤上這事×煤洌” “怎么了胯甩?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長堪嫂。 經(jīng)常有香客問我偎箫,道長,這世上最難降的妖魔是什么皆串? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任淹办,我火速辦了婚禮,結(jié)果婚禮上恶复,老公的妹妹穿的比我還像新娘娇唯。我一直安慰自己,他們只是感情好寂玲,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著梗摇,像睡著了一般拓哟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伶授,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天断序,我揣著相機(jī)與錄音流纹,去河邊找鬼。 笑死违诗,一個(gè)胖子當(dāng)著我的面吹牛漱凝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播诸迟,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼茸炒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了阵苇?” 一聲冷哼從身側(cè)響起壁公,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绅项,沒想到半個(gè)月后紊册,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡快耿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年囊陡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片掀亥。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡撞反,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出铺浇,到底是詐尸還是另有隱情痢畜,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布鳍侣,位于F島的核電站丁稀,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏倚聚。R本人自食惡果不足惜线衫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望惑折。 院中可真熱鬧授账,春花似錦、人聲如沸惨驶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粗卜。三九已至屋确,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背攻臀。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國打工焕数, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人刨啸。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓堡赔,卻偏偏與公主長得像,于是被迫代替她去往敵國和親设联。 傳聞我的和親對(duì)象是個(gè)殘疾皇子善已,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 來源:https://www.cnblogs.com/bainianminguo/p/12247158.html[...
    夜空_2cd3閱讀 1,200評(píng)論 0 6
  • [TOC]Kafka 中采用了多副本的機(jī)制粘招,這是大多數(shù)分布式系統(tǒng)中慣用的手法啥寇,以此來實(shí)現(xiàn)水平擴(kuò) 展、提供容災(zāi)能力洒扎、...
    tracy_668閱讀 876評(píng)論 0 2
  • 1. Kafka工作流程及文件存儲(chǔ)機(jī)制 Kafka中消息是以topic進(jìn)行分類的辑甜,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息袍冷,...
    做個(gè)合格的大廠程序員閱讀 268評(píng)論 0 0
  • 表情是什么磷醋,我認(rèn)為表情就是表現(xiàn)出來的情緒。表情可以傳達(dá)很多信息胡诗。高興了當(dāng)然就笑了邓线,難過就哭了。兩者是相互影響密不可...
    Persistenc_6aea閱讀 124,190評(píng)論 2 7
  • 16宿命:用概率思維提高你的勝算 以前的我是風(fēng)險(xiǎn)厭惡者煌恢,不喜歡去冒險(xiǎn)骇陈,但是人生放棄了冒險(xiǎn),也就放棄了無數(shù)的可能瑰抵。 ...
    yichen大刀閱讀 6,033評(píng)論 0 4