Kafka學(xué)習(xí)筆記(二) :初探Kafka

看完上一篇伊脓,相信大家對消息系統(tǒng)以及Kafka的整體構(gòu)成都有了初步了解府寒,學(xué)習(xí)一個東西最好的辦法,就是去使用它丽旅,今天就讓我們一起窺探一下Kafka椰棘,并完成自己的處女作。

消息在Kafka中的歷程

雖然我們掌握東西要一步一步來榄笙,但是我們在大致了解了一個東西后邪狞,會有利于我們對它的理解和學(xué)習(xí),所以我們可以先來看一下一條消息從發(fā)出到最后被消息者接收到底經(jīng)歷了什么茅撞?

message-flow.png

上圖簡要的說明了消息在Kafka中的整個流轉(zhuǎn)過程(假設(shè)已經(jīng)部署好了整個Kafka系統(tǒng)帆卓,并創(chuàng)建了相應(yīng)的Topic,分區(qū)等細節(jié)后續(xù)再單獨講):

  • 1.消息生產(chǎn)者將消息發(fā)布到具體的Topic米丘,根據(jù)一定算法或者隨機被分發(fā)到具體的分區(qū)中剑令;
  • 2.根據(jù)實際需求,是否需要實現(xiàn)處理消息邏輯拄查;
  • 3.若需要吁津,則實現(xiàn)具體邏輯后將結(jié)果發(fā)布到輸出Topic;
  • 4.消費者根據(jù)需求訂閱相關(guān)Topic堕扶,并消費消息碍脏;

總的來說,怎么流程還是比較清晰和簡單的稍算,下面就跟我一起來練習(xí)Kafka的基本操作典尾,最后實現(xiàn)一個單詞計數(shù)的小demo。

基礎(chǔ)操作

以下代碼及相應(yīng)測試在以下環(huán)境測試通過:Mac OS + JDK1.8糊探,Linux系統(tǒng)應(yīng)該也能跑通钾埂,Windows有興趣的同學(xué)可以去官網(wǎng)下載相應(yīng)版本進行相應(yīng)的測試練習(xí)。

下載Kafka

Mac系統(tǒng)同學(xué)可以使用brew安裝:

brew install kafka

Linux系統(tǒng)同學(xué)可以從官網(wǎng)下載源碼解壓科平,也可以直接執(zhí)行以下命令:

cd 
mkdir test-kafka && cd test-kafka
curl -o kafka_2.11-1.0.1.tgz http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1

啟動

Kafka使用Zookeeper來維護集群信息褥紫,所以這里我們先要啟動Zookeeper,Kafka與Zookeeper的相關(guān)聯(lián)系跟結(jié)合后續(xù)再深入了解瞪慧,畢竟不能一口吃成一個胖子髓考。

bin/zookeeper-server-start.sh config/zookeeper.properties

接著我們啟動一個Kafka Server節(jié)點:

bin/kafka-server-start.sh config/server.properties

這時候Kafka系統(tǒng)已經(jīng)算是啟動起來了。

創(chuàng)建Topic

在一切就緒之后汞贸,我們要開始做極其重要的一步绳军,那就是創(chuàng)建Topic,Topic是整個系統(tǒng)流轉(zhuǎn)的核心矢腻,另外Topic本身也包含著很多復(fù)雜的參數(shù)门驾,比如復(fù)制因子個數(shù),分區(qū)個數(shù)等多柑,這里為了從簡奶是,我們將對應(yīng)的參數(shù)都設(shè)為1,方便大家測試:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test

其中參數(shù)的具體含義:

屬性 功能
--create 代表創(chuàng)建Topic
--zookeeper zookeeper集群信息
--replication-factor 復(fù)制因子
--partitions 分區(qū)信息
--topic Topic名稱

這時候我們已經(jīng)創(chuàng)建好了一個叫kakfa-test的Topic了竣灌。

向Topic發(fā)送消息

在有了Topic后我們就可以向其發(fā)送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test

然后我們向控制臺輸入一些消息:

this is my first test kafka
so good

這時候消息已經(jīng)被發(fā)布在kakfa-test這個主題上了聂沙。

從Topic獲取消息

現(xiàn)在Topic上已經(jīng)有消息了,現(xiàn)在可以從中獲取消息被消費:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning

這時候我們可以在控制臺看到:

this is my first test kafka
so good

至此我們就測試了最簡單的Kafka Demo初嘹,希望大家能自己動手去試試及汉,雖然很簡單,但是這能讓你對整個Kafka流程能更熟悉屯烦。

WordCount

下面我們來利用上面的一些基本操作來實現(xiàn)一個簡單WordCount程序坷随,它具備以下功能:

  • 1.支持詞組持續(xù)輸入,即生產(chǎn)者不斷生成消息驻龟;
  • 2.程序自動從輸入Topic中獲取原始數(shù)據(jù)温眉,然后經(jīng)過處理,將處理結(jié)果發(fā)布在計數(shù)Topic中翁狐;
  • 3.消費者可以從計數(shù)Topic獲取相應(yīng)的WordCount的結(jié)果类溢;

1.啟動kafka

與上文的啟動一樣,按照其操作即可露懒。

2.創(chuàng)建輸入Topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1

3.向Topic輸入消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input

4.流處理邏輯

這部分內(nèi)容是整個例子的核心闯冷,這部分代碼有Java 8+和Scala版本,個人認為流處理用函數(shù)式語法表達的更加簡潔清晰隐锭,推薦大家用函數(shù)式的思維去嘗試寫以下窃躲,發(fā)現(xiàn)自己再也不想寫Java匿名內(nèi)部類這種語法了。

我們先來看一個Java 8的版本:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
        Pattern pattern = Pattern.compile("\\W+");
        source
           .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
           .groupBy((key, value) -> value)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
           .toStream()
           .to("kafka-word-count-output");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

是不是很驚訝钦睡,用java也能寫出如此簡潔的代碼蒂窒,所以說如果有適用場景,推薦大家嘗試的用函數(shù)式的思維去寫寫java代碼荞怒。

我們再來看看Scala版本的:


object WordCount {
  def main(args: Array[String]) {
    val props: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val source: KStream[String, String] = builder.stream("kafka-word-count-input")
    source
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as[String, Long, KeyValueStore[Bytes, Array[Byte]]]("counts-store")).toStream.to("kafka-word-count-output")
    val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
    streams.start()
  }
}

可以發(fā)現(xiàn)使用Java 8函數(shù)式風(fēng)格編寫的代碼已經(jīng)跟Scala很相似了洒琢。

5.啟動處理邏輯

很多同學(xué)電腦上并沒有裝sbt,所以這里演示的利用Maven構(gòu)建的Java版本褐桌,具體執(zhí)行步驟請參考戳這里kafka-word-count上的說明衰抑。

6.啟動消費者進程

最后我們啟動消費者進程,并在生產(chǎn)者中輸入一些單詞荧嵌,比如:

kafka-word-count-input.png

最后我們可以在消費者進程中看到以下輸出:

bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092  --property print.key=true
kafka-word-count-output.png

總結(jié)

本篇文章主要是講解了Kafka的基本運行過程和一些基礎(chǔ)操作呛踊,但這是我們學(xué)習(xí)一個東西必不可少的一步砾淌,只有把基礎(chǔ)扎實好,才能更深入的去了解它谭网,理解它為什么這么設(shè)計汪厨,我在這個過程中也遇到很多麻煩,所以還是希望大家能夠自己動手去實踐一下愉择,最終能收獲更多劫乱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市锥涕,隨后出現(xiàn)的幾起案子衷戈,更是在濱河造成了極大的恐慌,老刑警劉巖层坠,帶你破解...
    沈念sama閱讀 212,542評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件殖妇,死亡現(xiàn)場離奇詭異,居然都是意外死亡窿春,警方通過查閱死者的電腦和手機拉一,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來旧乞,“玉大人蔚润,你說我怎么就攤上這事〕咂埽” “怎么了嫡纠?”我有些...
    開封第一講書人閱讀 158,021評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長延赌。 經(jīng)常有香客問我除盏,道長,這世上最難降的妖魔是什么挫以? 我笑而不...
    開封第一講書人閱讀 56,682評論 1 284
  • 正文 為了忘掉前任者蠕,我火速辦了婚禮,結(jié)果婚禮上掐松,老公的妹妹穿的比我還像新娘踱侣。我一直安慰自己,他們只是感情好大磺,可當(dāng)我...
    茶點故事閱讀 65,792評論 6 386
  • 文/花漫 我一把揭開白布抡句。 她就那樣靜靜地躺著,像睡著了一般杠愧。 火紅的嫁衣襯著肌膚如雪待榔。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,985評論 1 291
  • 那天流济,我揣著相機與錄音锐锣,去河邊找鬼腌闯。 笑死,一個胖子當(dāng)著我的面吹牛雕憔,可吹牛的內(nèi)容都是我干的绑嘹。 我是一名探鬼主播,決...
    沈念sama閱讀 39,107評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼橘茉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了姨丈?” 一聲冷哼從身側(cè)響起畅卓,我...
    開封第一講書人閱讀 37,845評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蟋恬,沒想到半個月后翁潘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,299評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡歼争,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,612評論 2 327
  • 正文 我和宋清朗相戀三年拜马,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沐绒。...
    茶點故事閱讀 38,747評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡俩莽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出乔遮,到底是詐尸還是另有隱情扮超,我是刑警寧澤,帶...
    沈念sama閱讀 34,441評論 4 333
  • 正文 年R本政府宣布蹋肮,位于F島的核電站出刷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏坯辩。R本人自食惡果不足惜馁龟,卻給世界環(huán)境...
    茶點故事閱讀 40,072評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望漆魔。 院中可真熱鬧坷檩,春花似錦、人聲如沸有送。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽雀摘。三九已至裸删,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間阵赠,已是汗流浹背涯塔。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評論 1 267
  • 我被黑心中介騙來泰國打工肌稻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人匕荸。 一個月前我還...
    沈念sama閱讀 46,545評論 2 362
  • 正文 我出身青樓爹谭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親榛搔。 傳聞我的和親對象是個殘疾皇子诺凡,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,658評論 2 350

推薦閱讀更多精彩內(nèi)容