看完上一篇伊脓,相信大家對消息系統(tǒng)以及Kafka的整體構(gòu)成都有了初步了解府寒,學(xué)習(xí)一個東西最好的辦法,就是去使用它丽旅,今天就讓我們一起窺探一下Kafka椰棘,并完成自己的處女作。
消息在Kafka中的歷程
雖然我們掌握東西要一步一步來榄笙,但是我們在大致了解了一個東西后邪狞,會有利于我們對它的理解和學(xué)習(xí),所以我們可以先來看一下一條消息從發(fā)出到最后被消息者接收到底經(jīng)歷了什么茅撞?
上圖簡要的說明了消息在Kafka中的整個流轉(zhuǎn)過程(假設(shè)已經(jīng)部署好了整個Kafka系統(tǒng)帆卓,并創(chuàng)建了相應(yīng)的Topic,分區(qū)等細節(jié)后續(xù)再單獨講):
- 1.消息生產(chǎn)者將消息發(fā)布到具體的Topic米丘,根據(jù)一定算法或者隨機被分發(fā)到具體的分區(qū)中剑令;
- 2.根據(jù)實際需求,是否需要實現(xiàn)處理消息邏輯拄查;
- 3.若需要吁津,則實現(xiàn)具體邏輯后將結(jié)果發(fā)布到輸出Topic;
- 4.消費者根據(jù)需求訂閱相關(guān)Topic堕扶,并消費消息碍脏;
總的來說,怎么流程還是比較清晰和簡單的稍算,下面就跟我一起來練習(xí)Kafka的基本操作典尾,最后實現(xiàn)一個單詞計數(shù)的小demo。
基礎(chǔ)操作
以下代碼及相應(yīng)測試在以下環(huán)境測試通過:Mac OS + JDK1.8糊探,Linux系統(tǒng)應(yīng)該也能跑通钾埂,Windows有興趣的同學(xué)可以去官網(wǎng)下載相應(yīng)版本進行相應(yīng)的測試練習(xí)。
下載Kafka
Mac系統(tǒng)同學(xué)可以使用brew安裝:
brew install kafka
Linux系統(tǒng)同學(xué)可以從官網(wǎng)下載源碼解壓科平,也可以直接執(zhí)行以下命令:
cd
mkdir test-kafka && cd test-kafka
curl -o kafka_2.11-1.0.1.tgz http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1
啟動
Kafka使用Zookeeper來維護集群信息褥紫,所以這里我們先要啟動Zookeeper,Kafka與Zookeeper的相關(guān)聯(lián)系跟結(jié)合后續(xù)再深入了解瞪慧,畢竟不能一口吃成一個胖子髓考。
bin/zookeeper-server-start.sh config/zookeeper.properties
接著我們啟動一個Kafka Server節(jié)點:
bin/kafka-server-start.sh config/server.properties
這時候Kafka系統(tǒng)已經(jīng)算是啟動起來了。
創(chuàng)建Topic
在一切就緒之后汞贸,我們要開始做極其重要的一步绳军,那就是創(chuàng)建Topic,Topic是整個系統(tǒng)流轉(zhuǎn)的核心矢腻,另外Topic本身也包含著很多復(fù)雜的參數(shù)门驾,比如復(fù)制因子個數(shù),分區(qū)個數(shù)等多柑,這里為了從簡奶是,我們將對應(yīng)的參數(shù)都設(shè)為1,方便大家測試:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test
其中參數(shù)的具體含義:
屬性 | 功能 |
---|---|
--create | 代表創(chuàng)建Topic |
--zookeeper | zookeeper集群信息 |
--replication-factor | 復(fù)制因子 |
--partitions | 分區(qū)信息 |
--topic | Topic名稱 |
這時候我們已經(jīng)創(chuàng)建好了一個叫kakfa-test的Topic了竣灌。
向Topic發(fā)送消息
在有了Topic后我們就可以向其發(fā)送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test
然后我們向控制臺輸入一些消息:
this is my first test kafka
so good
這時候消息已經(jīng)被發(fā)布在kakfa-test這個主題上了聂沙。
從Topic獲取消息
現(xiàn)在Topic上已經(jīng)有消息了,現(xiàn)在可以從中獲取消息被消費:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning
這時候我們可以在控制臺看到:
this is my first test kafka
so good
至此我們就測試了最簡單的Kafka Demo初嘹,希望大家能自己動手去試試及汉,雖然很簡單,但是這能讓你對整個Kafka流程能更熟悉屯烦。
WordCount
下面我們來利用上面的一些基本操作來實現(xiàn)一個簡單WordCount程序坷随,它具備以下功能:
- 1.支持詞組持續(xù)輸入,即生產(chǎn)者不斷生成消息驻龟;
- 2.程序自動從輸入Topic中獲取原始數(shù)據(jù)温眉,然后經(jīng)過處理,將處理結(jié)果發(fā)布在計數(shù)Topic中翁狐;
- 3.消費者可以從計數(shù)Topic獲取相應(yīng)的WordCount的結(jié)果类溢;
1.啟動kafka
與上文的啟動一樣,按照其操作即可露懒。
2.創(chuàng)建輸入Topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1
3.向Topic輸入消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input
4.流處理邏輯
這部分內(nèi)容是整個例子的核心闯冷,這部分代碼有Java 8+和Scala版本,個人認為流處理用函數(shù)式語法表達的更加簡潔清晰隐锭,推薦大家用函數(shù)式的思維去嘗試寫以下窃躲,發(fā)現(xiàn)自己再也不想寫Java匿名內(nèi)部類這種語法了。
我們先來看一個Java 8的版本:
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
Pattern pattern = Pattern.compile("\\W+");
source
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
.toStream()
.to("kafka-word-count-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
是不是很驚訝钦睡,用java也能寫出如此簡潔的代碼蒂窒,所以說如果有適用場景,推薦大家嘗試的用函數(shù)式的思維去寫寫java代碼荞怒。
我們再來看看Scala版本的:
object WordCount {
def main(args: Array[String]) {
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
p
}
val builder: StreamsBuilder = new StreamsBuilder()
val source: KStream[String, String] = builder.stream("kafka-word-count-input")
source
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count(Materialized.as[String, Long, KeyValueStore[Bytes, Array[Byte]]]("counts-store")).toStream.to("kafka-word-count-output")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
}
}
可以發(fā)現(xiàn)使用Java 8函數(shù)式風(fēng)格編寫的代碼已經(jīng)跟Scala很相似了洒琢。
5.啟動處理邏輯
很多同學(xué)電腦上并沒有裝sbt,所以這里演示的利用Maven構(gòu)建的Java版本褐桌,具體執(zhí)行步驟請參考戳這里kafka-word-count上的說明衰抑。
6.啟動消費者進程
最后我們啟動消費者進程,并在生產(chǎn)者中輸入一些單詞荧嵌,比如:
最后我們可以在消費者進程中看到以下輸出:
bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092 --property print.key=true
總結(jié)
本篇文章主要是講解了Kafka的基本運行過程和一些基礎(chǔ)操作呛踊,但這是我們學(xué)習(xí)一個東西必不可少的一步砾淌,只有把基礎(chǔ)扎實好,才能更深入的去了解它谭网,理解它為什么這么設(shè)計汪厨,我在這個過程中也遇到很多麻煩,所以還是希望大家能夠自己動手去實踐一下愉择,最終能收獲更多劫乱。