此教程假設你剛剛開始沒有任何 Kafka 或 ZooKeeper 數(shù)據(jù)肺孤。Kafka的控制臺腳本在類Unix和Windows平臺不同,Windows平臺使用bin\windows\\
代替bin/
,腳本的擴展名改為.bat
。
第一步:下載代碼
下載0.10.1.0發(fā)行版并解壓啄育。
> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0
第二步:啟動服務
Kafka使用Zookeeper,所以如果你沒有的話需要首先啟動Zookeeper服務。你可以使用kafka自帶的腳本啟動一個簡單的單一節(jié)點Zookeeper實例置蜀。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
現(xiàn)在啟動Kafka服務:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
第三步:創(chuàng)建一個主題
讓我們來創(chuàng)建一個名為test
的topic奈搜,只使用單個分區(qū)和一個復本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
我們現(xiàn)在可以運行l(wèi)ist topic命令看到我們的主題盯荤。
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
另外馋吗,當沒有主題存在的時候,你也可以通過配置代理自動創(chuàng)建主題而不是手動創(chuàng)建秋秤。
第四步:發(fā)送消息
Kafka有自帶的命令行客戶端會從文件或者標準輸入接受數(shù)據(jù)當作消息發(fā)送到Kafka集群宏粤。默認情況下脚翘,每行作為一個獨立的消息發(fā)送。
運行生產(chǎn)者控制臺并且打幾行消息到控制臺發(fā)送到服務器绍哎。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
第5步:啟動一個消費者
Kafka還有個消費者控制臺来农,會把消息輸出到標準輸出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你上面的命令是在不同的終端運行崇堰,那么你可以在生產(chǎn)者終端輸入消息然后在消費者終端看到沃于。
所有的命令行工具都有一些額外的參數(shù):如果沒有使用參數(shù)運行命令,將會顯示它們的詳細用法海诲。
第六步:設置多個代理集群
目前為止繁莹,我們已經(jīng)在單個代理上運行了,但這不好玩特幔。對于Kafka咨演,單個代理只是大小為1的集群。所以沒什么改變除了多啟動幾個代理實例蚯斯。只是為了感受一下薄风,我們把集群擴展到3個節(jié)點(仍然在我們的本地機器上)。
首先溉跃,我們?yōu)槊總€代理新建一個配置文件(在windows上使用copy
命令):
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
現(xiàn)在編輯新文件設置一下屬性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id
屬性是唯一的村刨,在集群的每個節(jié)點永久不變。因為我們在單臺機器上運行代理撰茎,必須重寫端口和日志目錄嵌牺。
我們已經(jīng)有了Zookeeper并運行了一個節(jié)點,所以只需要啟動下面的兩個新節(jié)點:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
現(xiàn)在創(chuàng)建一個含有三個副本的主題:
```sh
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好龄糊,現(xiàn)在我們有了一個集群但是怎么知道哪個代理正在做什么逆粹?使用describe topice
命令查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
這是輸出解釋。第一行給出了各個分區(qū)的概況炫惩,額外的每行都給出了一個分區(qū)的信息僻弹。由于我們只有一個主題的分區(qū),所以只有一行他嚷。
-
leader
是負責當前分區(qū)的所有讀寫請求蹋绽。每個節(jié)點都將領導一個隨機選擇的分區(qū)。 -
replicas
是節(jié)點列表筋蓖,復制分區(qū)日志卸耘,不管他們是不是leader或者即使它們還活著。 -
isr
是in-sync
的集合粘咖。這是replicas
列表當前還活著的子集蚣抗。
注意在我們的示例中節(jié)點1是唯一的主題分區(qū)領導者。
我們運行同樣的命令查看我們已經(jīng)創(chuàng)建的原始主題在哪里:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
所以沒什么驚奇的瓮下,原始的主題沒有副本在節(jié)點0上翰铡,當我們創(chuàng)建它時唯一存在的節(jié)點服務器钝域。
讓我們發(fā)布一些消息到新的主題:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
現(xiàn)在消費這些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
現(xiàn)在測試容錯性。節(jié)點1是領導者锭魔,我們kill它例证。
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
在Windows上使用
> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar" kafka.Kafka config\server-1.properties 644
> taskkill /pid 644 /f
領導關系已經(jīng)改為了從節(jié)點中的一個,節(jié)點1也不再in-sync復本集中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是我們依然可以消費消息即使之前接受的領導者已經(jīng)掛掉:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
第七部:使用Kafka Connect導入導出數(shù)據(jù)
在控制臺輸入輸出數(shù)據(jù)是很方便赂毯,但是你可能使用來自其他數(shù)據(jù)源的數(shù)據(jù)或者把Kafka的數(shù)據(jù)導出到其他的系統(tǒng)中战虏。對于很多系統(tǒng),你可以直接使用Kafka Connect導入導出數(shù)據(jù)而不需要手寫自定義的集成代碼党涕。
Kafka Connect是Kafka自帶的導入導出工具烦感。它是運行連接器的可擴展工具,實現(xiàn)了集成外部系統(tǒng)的自定義邏輯膛堤。在快速教程里手趣,我們會看到如何使用Kafka Connect的簡單連接器從文件導入數(shù)據(jù)到Kafka主題,再從kafka主題導出數(shù)據(jù)到文件肥荔。
首先绿渣,我們先創(chuàng)建一些測試數(shù)據(jù):
echo -e "foo\nbar" >test.txt
然后我們在Standalone模式啟動兩個連接器,Standalone模式表示他們運行在一個本地進程中燕耿。我們提供了三個配置文件作為參數(shù)中符,第一個配置Kafka Connect進程,包含通用的配置如Kafka 代理連接和數(shù)據(jù)序列話工具誉帅。剩余的文件每個都指定一個連接器淀散。這些文件包含一個唯一連接器名,實例化的連接器類蚜锨,和一些其他的連接器配置档插。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
這些示例配置文件,包含在Kafka中亚再,使用默認的本地集群配置并創(chuàng)建了兩個連接器:第一個是源連接器從文件讀取數(shù)據(jù)行每行都生成消息發(fā)送到kafka主題郭膛,第二個是目標連接器從Kafka主題讀取消息生成行輸出到文件中。
在啟動的時候你會看到大量的日志信息氛悬,包含一些表示連接器初始化的则剃。一旦Kafka Connect進程啟動,源連接器開始從test.txt
讀取行并發(fā)送到connect-test
主題如捅,sink連接器開始從connect-test
主題讀取消息把他們寫到test.sink.txt
文件棍现。我們可以檢查輸出文件看到數(shù)據(jù)已經(jīng)通過管道傳遞完畢:
> cat test.sink.txt
foo
bar
注意數(shù)據(jù)存儲在Kafka的主題connect-test
中,我們可以運行消費者控制臺查看主題數(shù)據(jù)(或者消費者代碼處理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
連接器持續(xù)生成數(shù)據(jù)伪朽,所以我們可以給測試文件添加數(shù)據(jù)轴咱,看它通過管道汛蝙。
> echo "Another line" >> test.txt
你應該看到這行出現(xiàn)在消費者控制臺和目標文件中烈涮。
第八步:用Kafka Streams處理數(shù)據(jù)
Kafka Streams 是一個客戶端庫朴肺,為了實時流計算和分析Kafka集群中的存儲數(shù)據(jù)。此快速教程示例將會描述如何運行一個用這個庫編寫的流程序坚洽。這是WordCountDemo
的主要示例代碼(改成Java8 lambda表達式為了容易閱讀)戈稿。
KTable wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Ensure the words are available as record keys for the next aggregate operation.
.map((key, value) -> new KeyValue<>(value, value))
// Count the occurrences of each word (record key) and store the results into a table named "Counts".
.countByKey("Counts")
它實現(xiàn)了WordCount算法,計算輸入文本單詞直方圖讶舰。然而鞍盗,不同你之前可能見過的WordCount例子(數(shù)據(jù)是有限),這個Demo程序有些不同跳昼,因為它設計為操作無限且沒有邊界的流數(shù)據(jù)般甲。與有界變量類似,它是一個有狀態(tài)的算法跟蹤和修改單詞的總數(shù)鹅颊。然而敷存,由于它必須假設潛在的輸入數(shù)據(jù)無限多,當處理更多數(shù)據(jù)時它會定期輸出它的當前狀態(tài)和結果堪伍,因為他不知道什么時候所有輸入數(shù)據(jù)會處理完成锚烦。
我們將給Kafka主題添加一些數(shù)據(jù),隨后會被Kafka流程序處理帝雇。
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
在Windows上:
> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt
下一步涮俄,我們使用生產(chǎn)者控制臺發(fā)送一些輸入數(shù)據(jù)給主題streams-file-input
(在實踐中,流數(shù)據(jù)會在系統(tǒng)啟動時持續(xù)不斷的流向Kafka系統(tǒng))尸闸。
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
現(xiàn)在可以運行WordCount示例程序處理輸入數(shù)據(jù):
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
不會有任何標準輸出彻亲,日志項作為結果持續(xù)寫到另一個名為streams-wordcount-output
Kafka主題中。示例會運行幾秒鐘后自動停止而不像以便的流處理程序室叉。
我們現(xiàn)在通過讀取它的輸出主題檢查WordCount 示例輸出:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
以下輸出數(shù)據(jù)將會打印在控制臺上:
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
第一列是Kafka消息鍵睹栖,第二列是消息的值,都是java.lang.String
格式茧痕。注意輸出實際上是一個持續(xù)更新的流野来,每個數(shù)據(jù)記錄(例如上面的每行)是一個單詞的更新總數(shù),又或者是如kafka這樣的鍵踪旷。對一個鍵有多條記錄曼氛,每個后面的記錄都會更新前面的。
現(xiàn)在你可以寫更多的消息發(fā)送到streams-file-input
主題中令野,并在streams-wordcount-output
主題中觀察添加的消息舀患,查看更新的單詞總數(shù)