kafka基礎(chǔ)

1. kafka架構(gòu)圖

kafka架構(gòu)圖

2. 角色分析

1. Broker

kafka作為一個消息中間件限番,用于存儲和轉(zhuǎn)發(fā)消息肌索,可以把它想象成一個中介嵌溢,股票經(jīng)紀(jì)人就叫做broker。默認(rèn)端口是9092办斑,生產(chǎn)者和消費(fèi)者都需要跟這個Broker建立連接才可以實(shí)現(xiàn)消息的收發(fā)外恕。

2. 消息

客戶端之間傳輸?shù)臄?shù)據(jù)稱之為消息, 或者說是記錄(record)俄周。請記住吁讨,對于kafka來說,不管是消費(fèi)者還是生產(chǎn)者都是客戶端峦朗。 在客戶端的代碼中建丧,Record可以是一個key-value鍵值對,生產(chǎn)者對應(yīng)的封裝類是ProducerRecord波势, 消費(fèi)者對應(yīng)的封裝類是ConsumerRecord翎朱。消息在傳輸?shù)倪^程中需要序列化,所有需要我們在代碼中執(zhí)行序列化工具尺铣。消息在服務(wù)端中存儲的格式(RecordBatch和Record)拴曲。

3. 生產(chǎn)者

我們將發(fā)送消息的一方稱之為生產(chǎn)者,接收消息的乙方稱之為消費(fèi)者凛忿,為了提升消息發(fā)送的速率澈灼,生產(chǎn)者并不是組條發(fā)送消息到broker中,而是批量發(fā)送的店溢。多少條發(fā)送一次叁熔,由配置中的一個參數(shù)決定。

props.put("batch.size", 16384);

4. 消費(fèi)者

一般來說床牧,消費(fèi)者獲取消息存在兩種方式荣回,一種是pull, 一種是push。kafka采用的是pull模式戈咳。WHY?
\color{red}{在push模式下心软,如果消息產(chǎn)生的速度遠(yuǎn)大于消費(fèi)者消費(fèi)的速度壕吹,消費(fèi)者會不堪重負(fù),最終掛掉删铃。}
消費(fèi)者可以控制自己一次消費(fèi)多少條消息

max.poll.record=500    #默認(rèn)是500條

5. \color{red}{TOPIC}

生產(chǎn)者和消費(fèi)者之間每條消息之間是如何關(guān)聯(lián)起來的呢耳贬?也就是消費(fèi)者怎么就知道自己需要消費(fèi)什么消息?
隊(duì)列的存在就是解決這個問題的泳姐。在kafka里面這個隊(duì)列就是topic效拭,\color{red}{它是一個邏輯概念}暂吉。
生產(chǎn)者和Topic胖秒,Topic和消費(fèi)者的關(guān)系都是多對多(不建議這么做)。
當(dāng)生產(chǎn)者發(fā)送消息時慕的,沒有對應(yīng)的Topic阎肝,這個時候會自動創(chuàng)建Topic“菇郑可以通過參數(shù)控制

auto.enable.topics.enable=true   #默認(rèn)時true

6. partition和Cluster

分區(qū)其實(shí)是一種數(shù)據(jù)庫分片的思想风题。試想一下,如果一個topic中消息過多嫉父,會產(chǎn)生什么樣的問題沛硅。

  • 不方便橫向擴(kuò)展,通過擴(kuò)展機(jī)器而不是升級硬件擴(kuò)展绕辖。
  • 并發(fā)負(fù)載摇肌,所有的客戶端都操作同一個topic,在高并發(fā)的場景下仪际,性能瓶頸
    kafka分區(qū)概念---partition围小。一個topic可以劃分成多個分區(qū),分區(qū)在創(chuàng)建topic的時候指定树碱,每個topic至少有一個分區(qū)肯适。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xiong

如果沒有指定分區(qū)數(shù),默認(rèn)分區(qū)是1個成榜,可通過下述參數(shù)修改

num.partitions=1

partition負(fù)載的實(shí)現(xiàn)框舔。舉例說明,Topic有三個分區(qū)赎婚,生產(chǎn)者發(fā)送了9條消息刘绣,第一個分區(qū)存儲了1 4 7, 第二個分區(qū)存儲了2 5 8惑淳,第三個分區(qū)存儲了3 6 9额港。這種情況下其實(shí)就是負(fù)載的一種體現(xiàn)

每個partition都會有一個物理目錄。kafka的配置文件下可以配置日志的存儲路徑歧焦,默認(rèn)存儲在/tmp/kafka-logs下移斩,假設(shè)topic=xiongTopic, 每個分區(qū)的存儲目錄就是xiongTopic-0肚医、xiongTopic-1.....

\color{red}{與rabbitMq不同的是,Partition中的消息被讀取以后不會被刪除向瓷,kafka是通過一個類似游標(biāo)的東} \color{red}{西用來記錄當(dāng)前消息讀取的位置偏移量信息肠套。同時,同一批消息在一個partition里面是順序追加寫入的猖任。} \color{red}{這里也是kafka吞吐量大的一個重要原因.}

7. 副本機(jī)制

如果partition的數(shù)據(jù)只存儲了一份你稚,在發(fā)生網(wǎng)絡(luò)或者硬件故障的時候,該分區(qū)的數(shù)據(jù)會無法訪問或者無法恢復(fù)了朱躺。kafka在0.8版本之后增加了副本機(jī)制刁赖, 每個partiotion可以有若干個副本,\color{red}{副本必須在不同的broker上}长搀。一般我們說的副本包括其中的主節(jié)點(diǎn)宇弛。
由replication-factor指定一個Topic的副本數(shù):

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partition --topic testxiong

服務(wù)端有個參數(shù)控制默認(rèn)的副本數(shù)

offsets.topic.replication.factor=3
分區(qū)、副本

leader用粉紅色標(biāo)識源请,follower用綠色標(biāo)識枪芒,leader是由選舉得出。
生產(chǎn)者消費(fèi)者消息傳遞都是通過leader來操作谁尸,follower的數(shù)據(jù)是通過leader同步過來的舅踪。

8. Segment

kafka的數(shù)據(jù)是放在后綴為.log的文件中,試想一下良蛮,kafka的數(shù)據(jù)在同一個partition中是順序?qū)懭氲某槁担覀儾粩嗟淖芳訑?shù)據(jù),那保存數(shù)據(jù)的文件就會越來越大背镇,這個時候檢索的效率就會越來越低咬展。
所以,kafka這塊干脆對partition再次進(jìn)行了切分瞒斩,切分出來的單位就就做段(segment)破婆,實(shí)際上kafka數(shù)據(jù)的存儲是分段的。我們可以在kafka的存儲目錄下看到這三個文件都是成對出現(xiàn)的:


segment

這其中是一個數(shù)據(jù)文件胸囱,2個索引文件祷舀。segment的默認(rèn)存儲大小是1G,可以通過一下參數(shù)進(jìn)行控制烹笔。

log.segment.bytes=1073741824

9. Consumer Group

在kafka中裳扯,消費(fèi)者是以消費(fèi)者組的形式對消息進(jìn)行接收。每個消費(fèi)者組都會由一個group id與對應(yīng)的topic進(jìn)行綁定谤职。
\color{blue}{注意: 同一個group中的消費(fèi)者不能消費(fèi)相同的partition饰豺,可以將partition比作一個座位,一個座位最多坐一個人允蜈。}

  • 消費(fèi)者組中冤吨,消費(fèi)者數(shù)量比partition數(shù)量少的情況下蒿柳,一個消費(fèi)者同時消費(fèi)多個partition。
  • 消費(fèi)者組中漩蟆,消費(fèi)者數(shù)量比partition數(shù)量多的情況下垒探,存在消費(fèi)者空閑。

這兩種情況都不是效率最高的情況怠李,只有消費(fèi)者數(shù)量和partition數(shù)量保持一致才是最好的選擇圾叼。如果想要消費(fèi)同一個partition,就需要另一個消費(fèi)者組來進(jìn)行捺癞。

10. Comsumer Offset

我們前面談到夷蚊,在Kafka中消息是順序寫入的,并且消費(fèi)的消息是不會被刪除的翘簇。那么撬码,如果消費(fèi)者突然掛掉,或者進(jìn)行下次讀寫時版保,如何知道自己已經(jīng)讀取了哪些信息,該從何處繼續(xù)讀取消息呢夫否?
既然消息是有序的彻犁,那我們就可以給消息進(jìn)行編號,來唯一標(biāo)識一條消息凰慈。

offset

這里的編號我們就稱之為offset汞幢,偏移量。offset記錄著下一條將要發(fā)送給consumer的消息序號微谓。offset的保存是保存在服務(wù)端的森篷,并不是保存在ZK上面。

3. Kafka Java開發(fā)

生產(chǎn)者:

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.128:9092");
        // 設(shè)置key  value序列化的工具
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //設(shè)置消息接收確認(rèn)模式  0 發(fā)出就立刻確認(rèn)豺型, 1 leader接收到就確認(rèn)  all 所有follower同步完成再確認(rèn)
        props.put("acks","1");
        // 異常重試次數(shù)
        props.put("retries", 3);
        // 設(shè)置批量發(fā)送數(shù)據(jù)一次仲智,數(shù)據(jù)大小,默認(rèn)16k
        props.put("batch.size",16384);
        // 設(shè)置批量發(fā)送等待時間
        props.put("linger.ms", 5);
        // 設(shè)置客戶端緩沖區(qū)大小姻氨,默認(rèn)是32M钓辆,滿了以后也會出發(fā)消息發(fā)送
        props.put("buffer.memory", 33554432);
        // 獲取元數(shù)據(jù)時生產(chǎn)者的阻塞時間,超時后拋出異常
        props.put("max.block.ms", 3000);

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i=0; i < 100; i ++) {
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

消費(fèi)者

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.128:9092");
        props.put("group.id", "xiong-group");
        // 是否自動提交偏移量肴焊,只有commit之后才更新消費(fèi)者組的offset
        props.put("enable.auto.commit", "true");
        // 消費(fèi)者自動提交的時間間隔
        props.put("auto.commit.interval.ms", "1000");
        // 從最早的數(shù)據(jù)開始消費(fèi)earliest | latest | none
        props.put("auto.offset.reset", "earliest");
        // 設(shè)置key  value反序列化的工具
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //訂閱隊(duì)列
        consumer.subscribe(Arrays.asList("mytopic"));
        try{
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s, partition=%s%n",
                            record.offset(), record.key(), record.value(), record.partition());
                }
            }

        } finally {
            consumer.close();
        }
    }
}

查詢消費(fèi)者相關(guān)偏移量數(shù)據(jù):

./kafka-consumer-groups.sh --bootstrap-server 192.168.182.128:9092 --describe --group xiong-group

4. 消息冪等性

什么叫做消息冪等性前联?
簡單來說就是,消息發(fā)送一次的結(jié)果和發(fā)送多次的結(jié)果是一樣的娶眷。
有時候消息消費(fèi)失敗的情況下似嗤,我們可能會采用消息重發(fā)的機(jī)制。但是生產(chǎn)者有時候是不知道消息是不是真的消費(fèi)失敗時届宠,這時候消息的重發(fā)可能會產(chǎn)生消息重復(fù)的情況烁落。
kafka實(shí)現(xiàn)消息的冪等性是在broker中實(shí)現(xiàn)的壳咕,而不是消費(fèi)者端實(shí)現(xiàn),大大的解放了消費(fèi)者的雙手顽馋。
如何實(shí)現(xiàn)消息的去重谓厘?
去重是需要依賴生產(chǎn)者消息的唯一標(biāo)識的,不然我們沒法知道是否是同一條消息寸谜,kafka中可以通過如下配置來產(chǎn)生唯一標(biāo)識竟稳,將producer升級成冪等性的producer。

props.put("enable.idempotence", true);

實(shí)現(xiàn)機(jī)制:

  • PID(Producer ID), 冪等性的生產(chǎn)者每個客戶端都有一個唯一的編號熊痴。
  • sequence number他爸,冪等性的生產(chǎn)者發(fā)送的每條消息都會帶sequence number, Server端就是通過這個值來判斷消息是否重復(fù)果善。如果server端發(fā)現(xiàn)sequence number的值比服務(wù)端記錄的值要小诊笤,那證明這個消息是重復(fù)的消息。(同一分區(qū)消息順序?qū)懭虢砩拢叭绻嬖趕equence number較小的在后面寫入讨跟,那證明之前肯定已經(jīng)有相同的消息已經(jīng)發(fā)送過來過了)。

作用范圍:

  1. sequence number并不是全局有序鄙煤,不能保證所有時間上的冪等晾匠。只能保證單分區(qū)上的冪等。
  2. 單會話上的冪等梯刚,這里的會話是指producer進(jìn)程的一次運(yùn)行凉馆。當(dāng)producer重啟以后就不能保證了。

5. 生產(chǎn)者事務(wù)

生產(chǎn)者與事務(wù)有關(guān)的方法如下:(kafka 0.11版本以后才支持事務(wù))

對象 描述
initTransactions() 初始化事務(wù)
beginTransaction() 開啟事務(wù)
commitTransaction() 提交事務(wù)
abortTransaction() 中止事務(wù)
sendOffsetsToTransaction() sendOffsetsToTransaction方法是消費(fèi)者和生產(chǎn)者在同一段代碼使用的(從上游接收消息發(fā)送給下游)亡资,在提交的時候把消費(fèi)消息的offset發(fā)送給consumer Corordinator.

代碼示例:

        //事務(wù)的前提是消費(fèi)者的冪等性
        props.put("enable.idempotence", true);
        //設(shè)置事務(wù)id澜共,唯一
        props.put("transactional.id", UUID.randomUUID().toString());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.initTransactions();
        try{
            producer.beginTransaction();
            for (int i=0; i < 100; i ++) {
                producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
                if (i == 20) {
                    Integer j = 1/0; //制造異常
                }
            }
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(100), Integer.toString(100)));
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
        }
        producer.close();

kafka分布式事務(wù)的實(shí)現(xiàn):

  1. 生產(chǎn)者的消息會分區(qū),所以這里的事務(wù)屬于分布式事務(wù)锥腻。kafka采用的是2PC提交嗦董。如果大家都可以commit就提交,否則就abort旷太;
  2. 2PC的情況下展懈,需要一個協(xié)調(diào)者,在Kafka中這個角色叫做Transaction Coordinator供璧。
  3. 事務(wù)管理必須有事務(wù)日志來記錄事務(wù)的狀態(tài)存崖,以便在Coordinator以外掛掉以后繼續(xù)處理原來的事務(wù)。事務(wù)日志的存儲類似于消費(fèi)者offset的存儲睡毒,kafka使用了一個特殊topic--transaction_state來記錄事務(wù)的狀態(tài)信息来惧。
  4. 如果生產(chǎn)者掛了,事務(wù)要在重啟以后繼續(xù)處理就需要有一個唯一的事務(wù)id來找到對應(yīng)的事務(wù)演顾,這個就是transaction.id供搀。配置了transaction.id隅居,此時生產(chǎn)者必須是冪等性的生產(chǎn)者。事務(wù)id相同的生產(chǎn)者可以繼續(xù)處理原來的事務(wù)葛虐。
事務(wù)處理

步驟描述:
A: 生產(chǎn)者通過initTransactions Api向coordinator注冊事務(wù)id胎源。
B: Corrdinator記錄事務(wù)日志
C: 生產(chǎn)者將消息寫入目標(biāo)分區(qū)
D: 分區(qū)域Coordinator的交互,當(dāng)事務(wù)完成以后消息的狀態(tài)應(yīng)該是已提交屿脐。這時候消費(fèi)者才能消費(fèi)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末涕蚤,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子的诵,更是在濱河造成了極大的恐慌万栅,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件西疤,死亡現(xiàn)場離奇詭異烦粒,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)代赁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進(jìn)店門扰她,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人管跺,你說我怎么就攤上這事义黎。” “怎么了豁跑?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長泻云。 經(jīng)常有香客問我艇拍,道長,這世上最難降的妖魔是什么宠纯? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任卸夕,我火速辦了婚禮,結(jié)果婚禮上婆瓜,老公的妹妹穿的比我還像新娘快集。我一直安慰自己,他們只是感情好廉白,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布个初。 她就那樣靜靜地躺著,像睡著了一般猴蹂。 火紅的嫁衣襯著肌膚如雪院溺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天磅轻,我揣著相機(jī)與錄音珍逸,去河邊找鬼逐虚。 笑死,一個胖子當(dāng)著我的面吹牛谆膳,可吹牛的內(nèi)容都是我干的叭爱。 我是一名探鬼主播,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼漱病,長吁一口氣:“原來是場噩夢啊……” “哼买雾!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起缨称,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤凝果,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后睦尽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體器净,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年当凡,在試婚紗的時候發(fā)現(xiàn)自己被綠了山害。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡沿量,死狀恐怖浪慌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情朴则,我是刑警寧澤权纤,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站乌妒,受9級特大地震影響汹想,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜撤蚊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一古掏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧侦啸,春花似錦槽唾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至顶捷,卻和暖如春挂绰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工葵蒂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留交播,地道東北人。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓践付,卻偏偏與公主長得像秦士,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子永高,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容