消息隊列中間件(三)Kafka 入門指南

Kafka

Kafka 來源

Kafka的前身是由LinkedIn開源的一款產(chǎn)品预皇,2011年初開始開源,加入了 Apache 基金會仇穗,2012年從 Apache Incubator 畢業(yè)變成了 Apache 頂級開源項目屯曹。同時LinkedIn還有許多著名的開源產(chǎn)品摹蘑。如:

  • 分布式數(shù)據(jù)同步系統(tǒng)Databus
  • 高性能計算引擎Cubert
  • Java異步處理框架ParSeq
  • Kafka流處理平臺

Kafka 介紹

Kafka 用于構建實時數(shù)據(jù)管道和流應用程序。它具有水平可擴展性扰法,容錯性蛹含,快速性,并在數(shù)千家公司的生產(chǎn)環(huán)境中運行塞颁。

從官方我們可以知道ApacheKafka一個分布式流媒體平臺浦箱。這到底是什么意思呢?

流媒體平臺有三個關鍵功能:

  • 發(fā)布和訂閱記錄數(shù)據(jù)流殴边,類似于消息隊列或企業(yè)消息傳遞系統(tǒng)憎茂。
  • 有容錯能力的可以持久化的存儲數(shù)據(jù)流。
  • 記錄發(fā)生時可以進行流處理锤岸。

Kafka 通常用于兩大類應用:

  • 構建可在系統(tǒng)或應用程序之間可靠獲取數(shù)據(jù)的實時流數(shù)據(jù)管道
  • 構建轉換或響應數(shù)據(jù)流的實時流處理

Kafka 基本概念

  • Producer - 消息和數(shù)據(jù)的生產(chǎn)者竖幔,向 Kafka 的一個 Topic 發(fā)布消息的進程/代碼/服務。
  • **Consumer **- 消息和數(shù)據(jù)的消費者是偷,訂閱數(shù)據(jù)(Topic)并且處理其發(fā)布的消息的進程/代碼/服務拳氢。
  • Consumer Group - 邏輯概念,對于同一個 Topic蛋铆,會廣播不同的 Group馋评,一個Group中,只有一個consumer 可以消費該消息刺啦。
  • Broker - 物理概念留特,Kafka 集群中的每個 Kafka 節(jié)點。
  • Topic - 邏輯概念玛瘸,Kafka消息的類別蜕青,對數(shù)據(jù)進行區(qū)分,隔離糊渊。
  • Partition - 物理概念右核,分片,Kafka 下數(shù)據(jù)存儲的基本單元渺绒,一個 Topic 數(shù)據(jù)贺喝,會被分散存儲到多個Partition菱鸥,每一個Partition是有序的。
  • **Replication **- 副本躏鱼,同一個 Partition 可能會有多個 Replica 氮采,多個 Replica 之間數(shù)據(jù)是一樣的。
  • Replication Leader - 一個 Partition 的多個 Replica 上染苛,需要一個 Leade r負責該 Partition 上與 Produce 和 Consumer 交互
  • ReplicaManager - 負責管理當前的 broker 所有分區(qū)和副本的信息扳抽,處理 KafkaController 發(fā)起的一些請求,副本狀態(tài)的切換殖侵,添加/讀取消息等贸呢。

概念的延伸

Partition

  • 每一個Topic被切分為多個Partitions
  • 消費者數(shù)據(jù)要小于等于Partition的數(shù)量
  • Broker Group中的每一個Broker保存Topic的一個或多個Partitions
  • Consumer Group中的僅有一個Consumer讀取Topic的一個或多個Partions,并且是唯一的Consumer拢军。

Replication

  • 當集群中有Broker掛掉的時候楞陷,系統(tǒng)可以主動的使用Replicas提供服務。
  • 系統(tǒng)默認設置每一個Topic的Replication的系數(shù)為1茉唉,可以在創(chuàng)建Topic的時候單獨設置固蛾。

Replication特點

  • Replication的基本單位是Topic的Partition。
  • 所有的讀和寫都從Leader進度陆,F(xiàn)ollowers只是作為備份艾凯。
  • Follower必須能夠及時的復制Leader的數(shù)據(jù)
  • 增加容錯性與可擴展性。

Kafka 消息結構

在 Kafka2.0 中的消息結構如下(整理自官網(wǎng))懂傀。

baseOffset: int64 - 用于記錄Kafka這個消息所處的偏移位置
batchLength: int32 - 用于記錄整個消息的長度
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2) - 一個固定值趾诗,用于快速判斷是否是Kafka消息
crc: int32 - 用于校驗信息的完整性
attributes: int16 - 當前消息的一些屬性

bit 0~2:

0: no compression
1: gzip
2: snappy
3: lz4

bit 3: timestampType
? bit 4: isTransactional (0 means not transactional)
? bit 5: isControlBatch (0 means not a control batch)
? bit 6~15: unused

lastOffsetDelta: int32
firstTimestamp : int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records:

length: varint
attributes: int8

bit 0~7: unused

timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]

headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]

關于消息結構的一些釋義。

  • Offset -用于記錄Kafka這個消息所處的偏移位置
  • Length - 用于記錄整個消息的長度
  • CRC32 - 用于校驗信息的完整性
  • Magic - 一個固定值蹬蚁,用于快速判斷是否是Kafka消息
  • Attributes - 當前消息的一些屬性
  • Timestamp - 消息的時間戳
  • Key Length - key的長度
  • Key - Key的具體值
  • Value Length - 值的長度
  • Value - 具體的消息值

Kafka 優(yōu)點

  1. 分布式 - Kafka是分布式的恃泪,多分區(qū),多副本的和多訂閱者的犀斋,基于Zookeeper調度贝乎。
  2. 持久性和擴展性 - Kafka使用分布式提交日志,這意味著消息會盡可能快地保留在磁盤上叽粹,因此它是持久的览效。同時具有一定的容錯性,Kafka支持在線的水平擴展虫几,消息的自平衡锤灿。
  3. 高性能 - Kafka對于發(fā)布和訂閱消息都具有高吞吐量。 即使存儲了許多TB的消息持钉,它也保持穩(wěn)定的性能衡招。且延遲低篱昔,適用高并發(fā)每强。時間復雜的為o(1)始腾。

Kafka 應用

  1. 用于聚合分布式應用程序中的消息。進行操作監(jiān)控空执。
  2. 用于跨組織的從多個服務收集日志浪箭,然后提供給多個服務器,解決日志聚合問題辨绊。
  3. 用于流處理奶栖,如Storm和Spark Streaming,從kafka中讀取數(shù)據(jù)门坷,然后處理在寫入kafka供應用使用宣鄙。

Kafka 安裝

安裝 Jdk

具體步驟此處不說。

安裝 Kafka

直接官方網(wǎng)站下載對應系統(tǒng)的版本解壓即可默蚌。
由于Kafka對于windows和Unix平臺的控制腳本是不同的冻晤,因此如果是windows平臺,要使用bin\windows\而不是bin/绸吸,并將腳本擴展名更改為.bat鼻弧。以下命令是基于Unix平臺的使用。

# 解壓
tar -xzf kafka_2.11-2.0.0.tgz
# 啟動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 啟動Kafka
bin/kafka-server-start.sh config/server.properties
# 或者后臺啟動
bin/kafka-server-start.sh config/server.properties &

讓我們創(chuàng)建一個名為“test”的主題锦茁,它只包含一個分區(qū)攘轩,只有一個副本:

`> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如果我們運行l(wèi)ist topic命令,我們現(xiàn)在可以看到該主題:

`> bin/kafka-topics.sh --list --zookeeper localhost:2181 test

或者码俩,您也可以將代理配置為在發(fā)布不存在的主題時自動創(chuàng)建主題度帮,而不是手動創(chuàng)建主題。

查看Topic的信息

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic Hello-Kafka

運行生產(chǎn)者稿存,然后在控制臺中鍵入一些消息以發(fā)送到服務器够傍。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message`

運行消費者,查看收到的消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
> This is a message
> This is another message

Kafka 工程實例

POM 依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

生產(chǎn)者

編寫生產(chǎn)者 Java 代碼挠铲。關于 Properties 中的值的意思描述可以在官方文檔中找到 http://kafka.apache.org/ 冕屯。下面的生產(chǎn)者向 Kafka 推送了10條消息。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * <p>
 * Kafka生產(chǎn)者,發(fā)送10個數(shù)據(jù)
 *
 * @Author niujinpeng
 * @Date 2018/11/16 15:45
 */
public class MyProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.110.132:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }

}

消費者

編寫消費者 Java 代碼拂苹。


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * <p>
 * Kafka消費者
 *
 * @Author niujinpeng
 * @Date 2018/11/19 15:01
 */
public class MyConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.110.132:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}

可以在控制臺看到成功運行后的輸出安聘,由 offset 可以看到已經(jīng)消費了10條消息。

 INFO | Kafka version : 2.0.0
 INFO | Kafka commitId : 3402a8361b734732
 INFO | Cluster ID: 0Xrk5M1CSJet0m1ut3zbiw
 INFO | [Consumer clientId=consumer-1, groupId=test] Discovered group coordinator 192.168.110.132:9092 (id: 2147483647 rack: null)
 INFO | [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions []
 INFO | [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
 INFO | [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 4
 INFO | [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [test-0]
offset = 38, key = 0, value = 0
offset = 39, key = 1, value = 1
offset = 40, key = 2, value = 2
offset = 41, key = 3, value = 3
offset = 42, key = 4, value = 4
offset = 43, key = 5, value = 5
offset = 44, key = 6, value = 6
offset = 45, key = 7, value = 7
offset = 46, key = 8, value = 8
offset = 47, key = 9, value = 9

問題

如果java.net.InetAddress.getCanonicalHostName 取到的是主機名瓢棒。需要修改 Kafka 的配置文件浴韭。

vim server.properties
# x.x.x.x是服務器IP
advertised.listeners=PLAINTEXT://x.x.x.x:9092

<完>
本文原發(fā)于個人博客:https://www.codingme.net 轉載請注明出處

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市脯宿,隨后出現(xiàn)的幾起案子念颈,更是在濱河造成了極大的恐慌,老刑警劉巖连霉,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件榴芳,死亡現(xiàn)場離奇詭異嗡靡,居然都是意外死亡,警方通過查閱死者的電腦和手機窟感,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門讨彼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人柿祈,你說我怎么就攤上這事哈误。” “怎么了躏嚎?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵蜜自,是天一觀的道長。 經(jīng)常有香客問我卢佣,道長袁辈,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任珠漂,我火速辦了婚禮晚缩,結果婚禮上,老公的妹妹穿的比我還像新娘媳危。我一直安慰自己荞彼,他們只是感情好,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布待笑。 她就那樣靜靜地躺著鸣皂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪暮蹂。 梳的紋絲不亂的頭發(fā)上寞缝,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天,我揣著相機與錄音仰泻,去河邊找鬼荆陆。 笑死,一個胖子當著我的面吹牛集侯,可吹牛的內容都是我干的被啼。 我是一名探鬼主播,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼棠枉,長吁一口氣:“原來是場噩夢啊……” “哼浓体!你這毒婦竟也來了?” 一聲冷哼從身側響起辈讶,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤命浴,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后贱除,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體生闲,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡媳溺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了跪腹。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡飞醉,死狀恐怖冲茸,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情缅帘,我是刑警寧澤轴术,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站钦无,受9級特大地震影響逗栽,放射性物質發(fā)生泄漏。R本人自食惡果不足惜失暂,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一彼宠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧弟塞,春花似錦凭峡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至系宫,卻和暖如春索昂,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背扩借。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工椒惨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人潮罪。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓框产,卻偏偏與公主長得像,于是被迫代替她去往敵國和親错洁。 傳聞我的和親對象是個殘疾皇子秉宿,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內容

  • 姓名:周小蓬 16019110037 轉載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,713評論 13 425
  • 背景介紹 Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)屯碴。主要設計目標如下: 以時間復雜度為O...
    高廣超閱讀 12,826評論 8 167
  • Kafka簡介 Kafka是一種分布式的描睦,基于發(fā)布/訂閱的消息系統(tǒng)。主要設計目標如下: 以時間復雜度為O(1)的方...
    Alukar閱讀 3,074評論 0 43
  • 《六項精進》打卡第37天 姓名:胡欽欽 公司:寧波市塞納電熱電器有限公司 組別:364期·反省二組 【知~學習】 ...
    Q_8576閱讀 170評論 0 0
  • 今天開始比較正兒八經(jīng)地畫商業(yè)圖表导而,雖然自己也不是很喜歡(我更偏向于文藝風忱叭、生活化的)隔崎,但是這樣的邏輯圖表能很好地理...
    世界記憶大師程程閱讀 322評論 0 0