kafka快速開始教程

此教程假設你剛剛開始沒有任何 Kafka 或 ZooKeeper 數(shù)據(jù)肺孤。Kafka的控制臺腳本在類Unix和Windows平臺不同,Windows平臺使用bin\windows\\代替bin/,腳本的擴展名改為.bat

第一步:下載代碼

下載0.10.1.0發(fā)行版并解壓啄育。

> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0

第二步:啟動服務

Kafka使用Zookeeper,所以如果你沒有的話需要首先啟動Zookeeper服務。你可以使用kafka自帶的腳本啟動一個簡單的單一節(jié)點Zookeeper實例置蜀。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

現(xiàn)在啟動Kafka服務:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

第三步:創(chuàng)建一個主題

讓我們來創(chuàng)建一個名為test的topic奈搜,只使用單個分區(qū)和一個復本。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

我們現(xiàn)在可以運行l(wèi)ist topic命令看到我們的主題盯荤。

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

另外馋吗,當沒有主題存在的時候,你也可以通過配置代理自動創(chuàng)建主題而不是手動創(chuàng)建秋秤。

第四步:發(fā)送消息

Kafka有自帶的命令行客戶端會從文件或者標準輸入接受數(shù)據(jù)當作消息發(fā)送到Kafka集群宏粤。默認情況下脚翘,每行作為一個獨立的消息發(fā)送。

運行生產(chǎn)者控制臺并且打幾行消息到控制臺發(fā)送到服務器绍哎。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

第5步:啟動一個消費者

Kafka還有個消費者控制臺来农,會把消息輸出到標準輸出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你上面的命令是在不同的終端運行崇堰,那么你可以在生產(chǎn)者終端輸入消息然后在消費者終端看到沃于。

所有的命令行工具都有一些額外的參數(shù):如果沒有使用參數(shù)運行命令,將會顯示它們的詳細用法海诲。

第六步:設置多個代理集群

目前為止繁莹,我們已經(jīng)在單個代理上運行了,但這不好玩特幔。對于Kafka咨演,單個代理只是大小為1的集群。所以沒什么改變除了多啟動幾個代理實例蚯斯。只是為了感受一下薄风,我們把集群擴展到3個節(jié)點(仍然在我們的本地機器上)。

首先溉跃,我們?yōu)槊總€代理新建一個配置文件(在windows上使用copy命令):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

現(xiàn)在編輯新文件設置一下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id屬性是唯一的村刨,在集群的每個節(jié)點永久不變。因為我們在單臺機器上運行代理撰茎,必須重寫端口和日志目錄嵌牺。

我們已經(jīng)有了Zookeeper并運行了一個節(jié)點,所以只需要啟動下面的兩個新節(jié)點:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
現(xiàn)在創(chuàng)建一個含有三個副本的主題:
```sh
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好龄糊,現(xiàn)在我們有了一個集群但是怎么知道哪個代理正在做什么逆粹?使用describe topice命令查看:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

這是輸出解釋。第一行給出了各個分區(qū)的概況炫惩,額外的每行都給出了一個分區(qū)的信息僻弹。由于我們只有一個主題的分區(qū),所以只有一行他嚷。

  • leader是負責當前分區(qū)的所有讀寫請求蹋绽。每個節(jié)點都將領導一個隨機選擇的分區(qū)。
  • replicas 是節(jié)點列表筋蓖,復制分區(qū)日志卸耘,不管他們是不是leader或者即使它們還活著。
  • isrin-sync的集合粘咖。這是replicas列表當前還活著的子集蚣抗。

注意在我們的示例中節(jié)點1是唯一的主題分區(qū)領導者。

我們運行同樣的命令查看我們已經(jīng)創(chuàng)建的原始主題在哪里:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

所以沒什么驚奇的瓮下,原始的主題沒有副本在節(jié)點0上翰铡,當我們創(chuàng)建它時唯一存在的節(jié)點服務器钝域。

讓我們發(fā)布一些消息到新的主題:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在消費這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在測試容錯性。節(jié)點1是領導者锭魔,我們kill它例证。

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

領導關系已經(jīng)改為了從節(jié)點中的一個,節(jié)點1也不再in-sync復本集中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但是我們依然可以消費消息即使之前接受的領導者已經(jīng)掛掉:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

第七部:使用Kafka Connect導入導出數(shù)據(jù)

在控制臺輸入輸出數(shù)據(jù)是很方便赂毯,但是你可能使用來自其他數(shù)據(jù)源的數(shù)據(jù)或者把Kafka的數(shù)據(jù)導出到其他的系統(tǒng)中战虏。對于很多系統(tǒng),你可以直接使用Kafka Connect導入導出數(shù)據(jù)而不需要手寫自定義的集成代碼党涕。

Kafka Connect是Kafka自帶的導入導出工具烦感。它是運行連接器的可擴展工具,實現(xiàn)了集成外部系統(tǒng)的自定義邏輯膛堤。在快速教程里手趣,我們會看到如何使用Kafka Connect的簡單連接器從文件導入數(shù)據(jù)到Kafka主題,再從kafka主題導出數(shù)據(jù)到文件肥荔。
首先绿渣,我們先創(chuàng)建一些測試數(shù)據(jù):

echo -e "foo\nbar" >test.txt

然后我們在Standalone模式啟動兩個連接器,Standalone模式表示他們運行在一個本地進程中燕耿。我們提供了三個配置文件作為參數(shù)中符,第一個配置Kafka Connect進程,包含通用的配置如Kafka 代理連接和數(shù)據(jù)序列話工具誉帅。剩余的文件每個都指定一個連接器淀散。這些文件包含一個唯一連接器名,實例化的連接器類蚜锨,和一些其他的連接器配置档插。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

這些示例配置文件,包含在Kafka中亚再,使用默認的本地集群配置并創(chuàng)建了兩個連接器:第一個是源連接器從文件讀取數(shù)據(jù)行每行都生成消息發(fā)送到kafka主題郭膛,第二個是目標連接器從Kafka主題讀取消息生成行輸出到文件中。

在啟動的時候你會看到大量的日志信息氛悬,包含一些表示連接器初始化的则剃。一旦Kafka Connect進程啟動,源連接器開始從test.txt讀取行并發(fā)送到connect-test主題如捅,sink連接器開始從connect-test主題讀取消息把他們寫到test.sink.txt文件棍现。我們可以檢查輸出文件看到數(shù)據(jù)已經(jīng)通過管道傳遞完畢:

> cat test.sink.txt
foo
bar

注意數(shù)據(jù)存儲在Kafka的主題connect-test中,我們可以運行消費者控制臺查看主題數(shù)據(jù)(或者消費者代碼處理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

連接器持續(xù)生成數(shù)據(jù)伪朽,所以我們可以給測試文件添加數(shù)據(jù)轴咱,看它通過管道汛蝙。

> echo "Another line" >> test.txt

你應該看到這行出現(xiàn)在消費者控制臺和目標文件中烈涮。

第八步:用Kafka Streams處理數(shù)據(jù)

Kafka Streams 是一個客戶端庫朴肺,為了實時流計算和分析Kafka集群中的存儲數(shù)據(jù)。此快速教程示例將會描述如何運行一個用這個庫編寫的流程序坚洽。這是WordCountDemo的主要示例代碼(改成Java8 lambda表達式為了容易閱讀)戈稿。

KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

它實現(xiàn)了WordCount算法,計算輸入文本單詞直方圖讶舰。然而鞍盗,不同你之前可能見過的WordCount例子(數(shù)據(jù)是有限),這個Demo程序有些不同跳昼,因為它設計為操作無限且沒有邊界的流數(shù)據(jù)般甲。與有界變量類似,它是一個有狀態(tài)的算法跟蹤和修改單詞的總數(shù)鹅颊。然而敷存,由于它必須假設潛在的輸入數(shù)據(jù)無限多,當處理更多數(shù)據(jù)時它會定期輸出它的當前狀態(tài)和結果堪伍,因為他不知道什么時候所有輸入數(shù)據(jù)會處理完成锚烦。

我們將給Kafka主題添加一些數(shù)據(jù),隨后會被Kafka流程序處理帝雇。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

下一步涮俄,我們使用生產(chǎn)者控制臺發(fā)送一些輸入數(shù)據(jù)給主題streams-file-input(在實踐中,流數(shù)據(jù)會在系統(tǒng)啟動時持續(xù)不斷的流向Kafka系統(tǒng))尸闸。

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

現(xiàn)在可以運行WordCount示例程序處理輸入數(shù)據(jù):

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

不會有任何標準輸出彻亲,日志項作為結果持續(xù)寫到另一個名為streams-wordcount-outputKafka主題中。示例會運行幾秒鐘后自動停止而不像以便的流處理程序室叉。

我們現(xiàn)在通過讀取它的輸出主題檢查WordCount 示例輸出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

以下輸出數(shù)據(jù)將會打印在控制臺上:

all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1

第一列是Kafka消息鍵睹栖,第二列是消息的值,都是java.lang.String格式茧痕。注意輸出實際上是一個持續(xù)更新的流野来,每個數(shù)據(jù)記錄(例如上面的每行)是一個單詞的更新總數(shù),又或者是如kafka這樣的鍵踪旷。對一個鍵有多條記錄曼氛,每個后面的記錄都會更新前面的。

現(xiàn)在你可以寫更多的消息發(fā)送到streams-file-input主題中令野,并在streams-wordcount-output主題中觀察添加的消息舀患,查看更新的單詞總數(shù)

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市气破,隨后出現(xiàn)的幾起案子聊浅,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件低匙,死亡現(xiàn)場離奇詭異旷痕,居然都是意外死亡,警方通過查閱死者的電腦和手機顽冶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門欺抗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人强重,你說我怎么就攤上這事绞呈。” “怎么了间景?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵佃声,是天一觀的道長。 經(jīng)常有香客問我倘要,道長秉溉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任碗誉,我火速辦了婚禮召嘶,結果婚禮上,老公的妹妹穿的比我還像新娘哮缺。我一直安慰自己弄跌,他們只是感情好,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布尝苇。 她就那樣靜靜地躺著铛只,像睡著了一般。 火紅的嫁衣襯著肌膚如雪糠溜。 梳的紋絲不亂的頭發(fā)上淳玩,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機與錄音非竿,去河邊找鬼蜕着。 笑死,一個胖子當著我的面吹牛红柱,可吹牛的內(nèi)容都是我干的承匣。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼锤悄,長吁一口氣:“原來是場噩夢啊……” “哼韧骗!你這毒婦竟也來了?” 一聲冷哼從身側響起零聚,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤袍暴,失蹤者是張志新(化名)和其女友劉穎些侍,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體政模,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡娩梨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了览徒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡颂龙,死狀恐怖习蓬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情措嵌,我是刑警寧澤躲叼,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站企巢,受9級特大地震影響枫慷,放射性物質發(fā)生泄漏。R本人自食惡果不足惜浪规,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一或听、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧笋婿,春花似錦誉裆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至庇配,卻和暖如春斩跌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背捞慌。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工耀鸦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人啸澡。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓揭糕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親锻霎。 傳聞我的和親對象是個殘疾皇子著角,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容