Apache Kafka詳解

一雹姊、消息隊(duì)列Message Queue

image.png

兩種模式

  1. 點(diǎn)對(duì)點(diǎn)模式
    點(diǎn)對(duì)點(diǎn)模式是一個(gè)基于拉取或輪詢的消息傳送模型,由消費(fèi)者主動(dòng)拉取數(shù)據(jù)果元,客戶端需要實(shí)時(shí)開啟一個(gè)線程監(jiān)控隊(duì)列中是否有數(shù)據(jù)。
  2. 發(fā)布/訂閱模式
    發(fā)布/訂閱模式是一個(gè)基于推送的消息傳送模型,由MQ主動(dòng)推送消息給所有訂閱者贱鄙,即使當(dāng)前訂閱者不可用。

優(yōu)點(diǎn)

  • 解耦
    允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程姨谷,只要確保它們遵循同樣的接口約束逗宁。
  • 冗余
    MQ把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)梦湘。
  • 擴(kuò)展
    因?yàn)镸Q解耦了你的處理過程瞎颗,所以增大消息入隊(duì)和處理的頻率是很容易的件甥,只要另外增加處理過程即可,提高了靈活性和峰值處理能力哼拔。
  • 可恢復(fù)性
    消息隊(duì)列降低了進(jìn)程間的耦合度引有,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理倦逐。
  • 順序保證
    隊(duì)列本身就遵循FIFO的原則轿曙,保證了數(shù)據(jù)的處理順序。
  • 緩沖
    有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度僻孝,解決生產(chǎn)者和消費(fèi)者處理速度不一致的問題导帝。
  • 異步通信
    MQ提供了異步處理機(jī)制,允許用戶把消息放入隊(duì)列但不立即處理它穿铆,直到需要時(shí)再去處理您单。

二、Kafka簡(jiǎn)介

  1. Apache Kafka是一個(gè)開源消息系統(tǒng)荞雏,由Scala寫成虐秦。是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源消息系統(tǒng)項(xiàng)目。
  2. Kafka最初是由LinkedIn公司開發(fā)凤优,并于2011年初開源悦陋。2012年10月從Apache Incubator畢業(yè)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一筑辨、高通量俺驶、低等待的平臺(tái)。
  3. Kafka是一個(gè)Distributed Message Queue棍辕。Kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類暮现,發(fā)送消息者稱為Producer,消息接收者稱為Consumer楚昭,此外Kafka集群有多個(gè)Kafka實(shí)例組成栖袋,每個(gè)實(shí)例稱為Broker。
  4. Kafka需要依賴于Zookeeper保存一些Meta信息抚太,來保證系統(tǒng)可用性塘幅。
  5. 在流式計(jì)算中,Kafka一般用來緩存數(shù)據(jù)尿贫,Storm通過消費(fèi)Kafka的數(shù)據(jù)進(jìn)行計(jì)算电媳。

三、Kafka架構(gòu)

image.png
No. 組件 說明
1 Producer 消息生產(chǎn)者帅霜,向Kafka中發(fā)消息的客戶端匆背。
2 Consumer 消息消費(fèi)者,從Kafka中取消息的客戶端身冀。
3 Broker 一臺(tái)Kafka服務(wù)器就是一個(gè)Broker钝尸,一個(gè)集群由多個(gè)Broker組成,一個(gè)Broker可以容納多個(gè)Topic搂根。
4 Topic 屬于特定類別的消息流稱為Topic珍促, 數(shù)據(jù)存儲(chǔ)在Topic中,可以理解為一個(gè)隊(duì)列剩愧。
5 Partition 為了實(shí)現(xiàn)負(fù)載均衡和高并發(fā)猪叙,一個(gè)非常大的Topic可以通過分為多個(gè)Partition分布到多個(gè)Broker上,每個(gè)Partition都是一個(gè)有序的隊(duì)列仁卷。
6 Consumer Group(CG) 這是Kafka用來實(shí)現(xiàn)一個(gè)消息的廣播(發(fā)給所有Consumer)和單播(發(fā)給任意一個(gè)Consumer)的手段穴翩。一個(gè)Topic可以有多個(gè)CG,Topic中的消息會(huì)復(fù)制(不是真的復(fù)制锦积,是概念上的)到所有的CG芒帕,但每個(gè)Partition只會(huì)把消息發(fā)給該CG中的一個(gè)Consumer。如果需要實(shí)現(xiàn)廣播丰介,只要每個(gè)Consumer都有一個(gè)獨(dú)立的CG即可背蟆,需要實(shí)現(xiàn)單播,只要所有Consumer都在同一個(gè)CG即可哮幢。用CG還可以將Consumer進(jìn)行自由分組而不需要重復(fù)發(fā)送消息到不同的Topic带膀。
7 Offset Partition中的每條消息都會(huì)被分配一個(gè)有序的id(Offset)。Kafka只保證每個(gè)Partition中的順序橙垢,不保證多個(gè)Partition的順序垛叨。
8 Leader 負(fù)責(zé)給定Partition的所有讀取和寫入的節(jié)點(diǎn)。
9 Follower 跟隨Leader指令的節(jié)點(diǎn)被稱為Follower柜某。 如果Leader節(jié)點(diǎn)宕機(jī)点额,其中一個(gè)Follower將通過選舉自動(dòng)成為新的Leader。

四莺琳、Kafka工作流程

Producer生產(chǎn)過程

  1. 寫入方式
    Producer采用Push模式將消息發(fā)布到Broker还棱,每條消息都被追加(append)到Partition中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高惭等,保障Kafka吞吐率)珍手。
  2. 分區(qū)(Partition)
    消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄辞做,而topic是由一些Partition Logs(分區(qū)日志)組成琳要,其組織結(jié)構(gòu)如下圖所示:


    image.png

    image.png

    我們可以看到,每個(gè)Partition中的消息都是有序的秤茅,生產(chǎn)的消息被不斷追加到Partition log上稚补,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。

    1. 分區(qū)的原因
      1. 負(fù)載均衡框喳,方便在集群中擴(kuò)展课幕,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器厦坛,而一個(gè)Topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了乍惊。
      2. 可以提高并發(fā)杜秸,因?yàn)榭梢砸訮artition為單位讀寫了。
    2. 分區(qū)的原則
      1. 已指定Partition润绎,則直接使用該P(yáng)artition撬碟。
      2. 未指定Partition但指定了Key,則通過對(duì)Key進(jìn)行哈希計(jì)算得出一個(gè)Partition莉撇。
      3. Partition和Key都未指定呢蛤,則輪詢選出一個(gè)Partition。
  3. 副本(Replication)
    同一個(gè)Partition可能會(huì)有多個(gè)Replication(對(duì)應(yīng)server.properties配置中的default.replication.factor=N)棍郎。沒有Replication的情況下其障,一旦Broker宕機(jī),其上所有Partition的數(shù)據(jù)都不可被Consumer消費(fèi)坝撑,同時(shí)Producer也不能再將數(shù)據(jù)存于其上的Partition静秆。引入Replication之后,同一個(gè)Partition可能會(huì)有多個(gè)Replication巡李,而這時(shí)需要在這些Replication之間選出一個(gè)Leader抚笔,Producer和Consumer只與這個(gè)Leader交互,其它Replication作為Follower從Leader中復(fù)制數(shù)據(jù)侨拦。
  4. 寫入流程


    image.png

Broker保存過程

  1. 存儲(chǔ)方式
    物理上把Topic分成一個(gè)或多個(gè)Partition(對(duì)應(yīng)server.properties配置中的num.partitions=3)殊橙,每個(gè)Partition物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該P(yáng)artition的所有消息和索引文件),如下:
[atguigu@hadoop102 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu  4096 8月   6 14:37 first-0
drwxrwxr-x. 2 atguigu atguigu  4096 8月   6 14:35 first-1
drwxrwxr-x. 2 atguigu atguigu  4096 8月   6 14:37 first-2
[atguigu@hadoop102 logs]$ cd first-0
[atguigu@hadoop102 first-0]$ ll
-rw-rw-r–. 1 atguigu atguigu 10485760 8月   6 14:33 00000000000000000000.index
-rw-rw-r–. 1 atguigu atguigu      219 8月   6 15:07 00000000000000000000.log
-rw-rw-r–. 1 atguigu atguigu 10485756 8月   6 14:33 00000000000000000000.timeindex
-rw-rw-r–. 1 atguigu atguigu        8 8月   6 14:37 leader-epoch-checkpoint
  1. 存儲(chǔ)策略
    無論消息是否被消費(fèi)狱从,Kafka都會(huì)保留所有消息膨蛮。有兩種策略可以刪除舊數(shù)據(jù):

    1. 基于時(shí)間:log.retention.hours=168
    2. 基于大小:log.retention.bytes=1073741824

    需要注意的是季研,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1)敞葛,即與文件大小無關(guān),所以這里刪除過期文件與提高Kafka性能無關(guān)与涡。

  2. Zookeeper存儲(chǔ)結(jié)構(gòu)


    image.png

    注意:只有Broker和Consumer在Zookeeper中注冊(cè)惹谐,Producer不在Zookeeper中注冊(cè)。

Consumer消費(fèi)過程

  1. 消費(fèi)者組


    image.png

    Consumer是以Consumer Group的方式工作驼卖,由一個(gè)或者多個(gè)Consumer組成一個(gè)Group氨肌,共同消費(fèi)一個(gè)Topic。每個(gè)Partition在同一時(shí)間只能由Group中的一個(gè)Consumer讀取酌畜,但是多個(gè)Group可以同時(shí)消費(fèi)同一個(gè)Partition怎囚。在圖中,有一個(gè)由三個(gè)Consumer組成的Group桥胞,有一個(gè)Consumer讀取Topic中的兩個(gè)Partition恳守,另外兩個(gè)Consumer分別讀取一個(gè)Partition考婴。某個(gè)Consumer讀取某個(gè)Partition,也可以叫做某個(gè)Consumer是某個(gè)Partition的擁有者井誉。
    在這種情況下蕉扮,Consumer可以通過水平擴(kuò)展的方式同時(shí)讀取大量的消息整胃。另外颗圣,如果一個(gè)Consumer失敗了,那么Group中其它的Consumer會(huì)自動(dòng)負(fù)載均衡讀取之前失敗的Consumer讀取的Partition屁使。

  2. 消費(fèi)方式
    Consumer采用Pull模式從Broker中讀取數(shù)據(jù)在岂。
    Push模式很難適應(yīng)消費(fèi)速率不同的Consumer,因?yàn)橄l(fā)送速率是由Broker決定的蛮寂。它的目標(biāo)是盡可能以最快速度傳遞消息蔽午,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞酬蹋。而Pull模式則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息及老。
    對(duì)于Kafka而言,Pull模式更合適范抓,它可簡(jiǎn)化Broker的設(shè)計(jì)骄恶,Consumer可自主控制消費(fèi)消息的速率,同時(shí)Consumer可以自己控制消費(fèi)方式——即可以批量消費(fèi)也可以逐條消費(fèi)匕垫,同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義僧鲁。
    Pull模式不足之處是,如果Kafka沒有數(shù)據(jù)象泵,消費(fèi)者可能會(huì)陷入循環(huán)中寞秃,一直等待數(shù)據(jù)到達(dá)。為了避免這種情況偶惠,可以Pull請(qǐng)求中設(shè)置參數(shù)春寿,允許Consumer請(qǐng)求在等待數(shù)據(jù)到達(dá)的“長(zhǎng)輪詢”中進(jìn)行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大泻瞿酢)绑改。

五、Kafka案例

生產(chǎn)者

package com.fas.kafka.service;

import org.apache.log4j.Logger;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import com.fas.kafka.util.KafkaUtil;

@Service
@Log4j
public class KafkaProducerService {
    private KafkaTemplate<String, String> kafkaTemplate = KafkaUtil.getKafkaTemplate();

    public void send(String topic, String data) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
        listenableFuture.addCallback(result -> log.info("Send data to kafka [success] - " + topic + " - " + data), result -> log.info("Send data to kafka [error] - " + topic + " - " + data));
    }
}

消費(fèi)者

package com.fas.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

import com.alibaba.fastjson.JSONObject;
import com.fas.kafka.entity.DeviceOnlineData;

import lombok.extern.log4j.Log4j;

@Log4j
public class KafkaConsumer implements MessageListener<String, String> {
    public static final BlockingQueue<DeviceOnlineData> DATA_ACCESS_BLOCKING_QUEUE = new LinkedBlockingQueue<>();
    public static final String KAFKA_DATA_ACCESS_TOPIC = PropertiesUtil.KAFKA_PROPERTIES.getProperty("kafka.data_access.topic");
    public static final String KAFKA_TEST_TOPIC = PropertiesUtil.KAFKA_PROPERTIES.getProperty("kafka.test.topic");

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        String topic = consumerRecord.topic();
        log.info("topic: " + topic);
        String value = consumerRecord.value();
        log.info("value: " + value);

        if (KAFKA_DATA_ACCESS_TOPIC.equals(topic)) {
            JSONObject.parseArray(value, DeviceOnlineData.class).stream().forEach(deviceOnlineData -> {
                try {
                    DATA_ACCESS_BLOCKING_QUEUE.put(deviceOnlineData);
                    log.info("Put into blocking queue");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        } else if (KAFKA_TEST_TOPIC.equals(topic)) {
            log.info("Test success");
        } else {
            log.info("Unknown topic");
        }
    }
}

六扒腕、常用命令

  1. 查看所有Topic
    ./bin/kafka-topics.sh --list --zookeeper zk01:2181
  2. 創(chuàng)建Topic
    ./bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
  3. 刪除Topic
    ./bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
    需要server.properties中設(shè)置delete.topic.enable=true绢淀,否則只是標(biāo)記刪除,或者直接重啟
  4. 通過Shell命令發(fā)送消息
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  5. 通過Shell消費(fèi)消息
    ./bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test
  6. 查看消費(fèi)位置
    ./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
  7. 查看某個(gè)Topic的詳情
    ./bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
  8. 對(duì)分區(qū)數(shù)進(jìn)行修改
    ./bin/kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末瘾腰,一起剝皮案震驚了整個(gè)濱河市皆的,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蹋盆,老刑警劉巖费薄,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件硝全,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡楞抡,警方通過查閱死者的電腦和手機(jī)伟众,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來召廷,“玉大人凳厢,你說我怎么就攤上這事【郝” “怎么了先紫?”我有些...
    開封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)筹煮。 經(jīng)常有香客問我遮精,道長(zhǎng),這世上最難降的妖魔是什么败潦? 我笑而不...
    開封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任本冲,我火速辦了婚禮,結(jié)果婚禮上劫扒,老公的妹妹穿的比我還像新娘檬洞。我一直安慰自己,他們只是感情好粟关,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開白布疮胖。 她就那樣靜靜地躺著,像睡著了一般闷板。 火紅的嫁衣襯著肌膚如雪澎灸。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天遮晚,我揣著相機(jī)與錄音性昭,去河邊找鬼。 笑死县遣,一個(gè)胖子當(dāng)著我的面吹牛糜颠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播萧求,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼其兴,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了夸政?” 一聲冷哼從身側(cè)響起元旬,我...
    開封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后匀归,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坑资,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年穆端,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了袱贮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡体啰,死狀恐怖攒巍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情狡赐,我是刑警寧澤窑业,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布钦幔,位于F島的核電站枕屉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏鲤氢。R本人自食惡果不足惜搀擂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望卷玉。 院中可真熱鬧哨颂,春花似錦、人聲如沸相种。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寝并。三九已至箫措,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間衬潦,已是汗流浹背斤蔓。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留镀岛,地道東北人弦牡。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像漂羊,于是被迫代替她去往敵國(guó)和親驾锰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容