kafka入門

Apache Kafka 入門

1.kafka簡(jiǎn)介和產(chǎn)生的背景

什么是 Kafka

Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng)断部,具有高性能淋昭、高吞吐量慎式、系統(tǒng)快速, 可擴(kuò)展并且可持久化. 它的分區(qū)特性, 可復(fù)制和可容錯(cuò)都是其不錯(cuò)的特性而被廣泛應(yīng)用與大數(shù)據(jù)傳輸場(chǎng)景。它是由 LinkedIn 公司開發(fā)澈蟆,使用 Scala 語言編寫墨辛,之后成為 Apache 基金會(huì)的一個(gè)頂級(jí)項(xiàng)目。 kafka 提供了類似 JMS 的特性趴俘,但是在設(shè)計(jì)和實(shí)現(xiàn)上是完全不同的睹簇,而且他也不是 JMS 規(guī)范的實(shí)現(xiàn)。

kafka 產(chǎn)生的背景

kafka 作為一個(gè)消息系統(tǒng)寥闪,早起設(shè)計(jì)的目的是用作 LinkedIn 的活動(dòng)流(ActivityStream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)太惠。活動(dòng)流數(shù)據(jù)是所有的網(wǎng)站對(duì)用戶的使用情況做分析的時(shí)候要用到的最常規(guī)的部分,活動(dòng)數(shù)據(jù)包括頁面的訪問量(PV)疲憋、被查看內(nèi)容方面的信息以及搜索內(nèi)容凿渊。這種數(shù)據(jù)通常的處理方式是先
把各種活動(dòng)以日志的形式寫入某種文件,然后周期性的對(duì)這些文件進(jìn)行統(tǒng)計(jì)分析。運(yùn)營數(shù)據(jù)指的是服務(wù)器的性能數(shù)據(jù)(CPU埃脏、 IO 使用率搪锣、請(qǐng)求時(shí)間、服務(wù)日志等) 彩掐。

Kafka 的應(yīng)用場(chǎng)景

由于 kafka 具有更好的吞吐量构舟、內(nèi)置分區(qū)、冗余及容錯(cuò)性的優(yōu)點(diǎn)(kafka 每秒可以處理幾十萬消息)堵幽,讓 kafka 成為了一個(gè)很好的大規(guī)模消息處理應(yīng)用的解決方案狗超。所以在企業(yè)級(jí)應(yīng)用長(zhǎng),主要會(huì)應(yīng)用于如下幾個(gè)方面

  • 行為跟蹤: kafka 可以用于跟蹤用戶瀏覽頁面谐檀、搜索及其他行為抡谐。通過發(fā)布-訂閱模式實(shí)時(shí)記錄到對(duì)應(yīng)的 topic 中,通過后端大數(shù)據(jù)平臺(tái)接入處理分析桐猬,并做更進(jìn)一步的實(shí)時(shí)處理和監(jiān)控

  • 日志收集:日志收集方面麦撵,有很多比較優(yōu)秀的產(chǎn)品,比如 Apache Flume溃肪,很多公司使用kafka 代理日志聚合免胃。日志聚合表示從服務(wù)器上收集日志文件,然后放到一個(gè)集中的平臺(tái)(文件服務(wù)器)進(jìn)行處理惫撰。在實(shí)際應(yīng)用開發(fā)中羔沙,我們應(yīng)用程序的 log 都會(huì)輸出到本地的磁盤上,排查問題的話通過 linux 命令來搞定厨钻,如果應(yīng)用程序組成了負(fù)載均衡集群扼雏,并且集群的機(jī)器有幾十臺(tái)以上,那么想通過日志快速定位到問題夯膀,就是很麻煩的事情了诗充。所以一般都會(huì)做一個(gè)日志統(tǒng)一收集平臺(tái)管理 log 日志用來快速查詢重要應(yīng)用的問題。所以很多公司的套路都是把應(yīng)用日志幾種到 kafka 上诱建,然后分別導(dǎo)入到 es 和 hdfs 上蝴蜓,用來做實(shí)時(shí)檢索分析和離線統(tǒng)計(jì)數(shù)據(jù)備份等。而另一方面俺猿, kafka 本身又提供了很好的 api 來集成日志并且做日志收集

2.kafka的本身架構(gòu)

一個(gè)典型的 kafka 集群包含若干 Producer(可以是應(yīng)用節(jié)點(diǎn)產(chǎn)生的消息茎匠,也可以是通過Flume 收集日志產(chǎn)生的事件),若干個(gè) Broker(kafka 支持水平擴(kuò)展)押袍、若干個(gè) ConsumerGroup诵冒,以及一個(gè) zookeeper 集群。 kafka 通過 zookeeper 管理集群配置及服務(wù)協(xié)同谊惭。Producer 使用 push 模式將消息發(fā)布到 broker造烁, consumer 通過監(jiān)聽使用 pull 模式從broker 訂閱并消費(fèi)消息否过。多個(gè) broker 協(xié)同工作, producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中惭蟋。三者通過zookeeper 管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā)。這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)药磺。圖上有一個(gè)細(xì)節(jié)是和其他 mq 中間件不同的點(diǎn)告组, producer 發(fā)送消息到 broker
的過程是 push,而 consumer 從 broker 消費(fèi)消息的過程是 pull癌佩,主動(dòng)去拉數(shù)
據(jù)木缝。而不是 broker 把數(shù)據(jù)主動(dòng)發(fā)送給 consumer

2.1kafka術(shù)語介紹

(1)Topics(主題)
屬于特定類別的消息流稱為主題。 數(shù)據(jù)存儲(chǔ)在主題中围辙。Topic相當(dāng)于Queue我碟。
主題被拆分成分區(qū)。 每個(gè)這樣的分區(qū)包含不可變有序序列的消息姚建。 分區(qū)被實(shí)現(xiàn)為具有相等大小的一組分段文件矫俺。
(2)Partition(分區(qū))

kafka_Partition.jpg

  • 一個(gè)Topic可以分成多個(gè)Partition,這是為了平行化處理掸冤。
  • 每個(gè)Partition內(nèi)部消息有序厘托,其中每個(gè)消息都有一個(gè)offset序號(hào)。
  • 一個(gè)Partition只對(duì)應(yīng)一個(gè)Broker稿湿,一個(gè)Broker可以管理多個(gè)Partition铅匹。

(3)Partition offset(分區(qū)偏移)
每個(gè)分區(qū)消息具有稱為 offset 的唯一序列標(biāo)識(shí)。
(4)Replicas of partition(分區(qū)備份)
副本只是一個(gè)分區(qū)的備份饺藤。 副本從不讀取或?qū)懭霐?shù)據(jù)包斑。 它們用于防止數(shù)據(jù)丟失。
(5)Brokers(經(jīng)紀(jì)人)

  • 代理是負(fù)責(zé)維護(hù)發(fā)布數(shù)據(jù)的簡(jiǎn)單系統(tǒng)涕俗。 每個(gè)代理可以每個(gè)主題具有零個(gè)或多個(gè)分區(qū)罗丰。 假設(shè),如果在一個(gè)主題和N個(gè)代理中有N個(gè)分區(qū)咽袜,每個(gè)代理將有一個(gè)分區(qū)丸卷。
  • 假設(shè)在一個(gè)主題中有N個(gè)分區(qū)并且多于N個(gè)代理(n + m),則第一個(gè)N代理將具有一個(gè)分區(qū)询刹,并且下一個(gè)M代理將不具有用于該特定主題的任何分區(qū)谜嫉。
  • 假設(shè)在一個(gè)主題中有N個(gè)分區(qū)并且小于N個(gè)代理(n-m),每個(gè)代理將在它們之間具有一個(gè)或多個(gè)分區(qū)共享凹联。 由于代理之間的負(fù)載分布不相等沐兰,不推薦使用此方案。

(6)Kafka Cluster(Kafka集群)
Kafka有多個(gè)代理被稱為Kafka集群蔽挠。 可以擴(kuò)展Kafka集群住闯,無需停機(jī)瓜浸。 這些集群用于管理消息數(shù)據(jù)的持久性和復(fù)制。
(7)Producers(生產(chǎn)者)
生產(chǎn)者是發(fā)送給一個(gè)或多個(gè)Kafka主題的消息的發(fā)布者比原。 生產(chǎn)者向Kafka經(jīng)紀(jì)人發(fā)送數(shù)據(jù)插佛。 每當(dāng)生產(chǎn)者將消息發(fā)布給代理時(shí),代理只需將消息附加到最后一個(gè)段文件量窘。實(shí)際上雇寇,該消息將被附加到分區(qū)。 生產(chǎn)者還可以向他們選擇的分區(qū)發(fā)送消息蚌铜。
(8)Consumers(消費(fèi)者)
Consumers從經(jīng)紀(jì)人處讀取數(shù)據(jù)锨侯。 消費(fèi)者訂閱一個(gè)或多個(gè)主題,并通過從代理中提取數(shù)據(jù)來使用已發(fā)布的消息冬殃。

  • Consumer自己維護(hù)消費(fèi)到哪個(gè)offet
  • 每個(gè)Consumer都有對(duì)應(yīng)的group
  • group內(nèi)是queue消費(fèi)模型:各個(gè)Consumer消費(fèi)不同的partition囚痴,因此一個(gè)消息在group內(nèi)只消費(fèi)一次
  • group間是publish-subscribe消費(fèi)模型:各個(gè)group各自獨(dú)立消費(fèi),互不影響审葬,因此一個(gè)消息被每個(gè)group消費(fèi)一次深滚。

2.2 kafka架構(gòu)圖

kafka架構(gòu).png

3.kafka安裝部署和基本操作

3.1 搭建

3.1.1 hosts 映射(可選, 建議)

echo "192.168.1.10 kfk1" >> /etc/hosts
echo "192.168.1.11 kfk2" >> /etc/hosts
echo "192.168.1.12 kfk3" >> /etc/hosts

3.1.2 搭建 Zookeeper (單機(jī) or 集群)

  1. 需要先啟動(dòng) zookeeper,如果沒有搭建 zookeeper 環(huán)境耳璧,可以直接運(yùn)行
    kafka 內(nèi)嵌的 zookeeper
    啟動(dòng)命令: bin/zookeeper-server-start.sh config/zookeeper.properties &

Broker, Producer, Consumer 的運(yùn)行都需要 ZooKeeper

自己搭建見 [3.2.2.1 單機(jī)模式](#3.2.2.1 單機(jī)模式) or 見 [3.2.2.3 集群模式

3.1.3 Broker 的配置

tar xzf kafka_2.11-0.9.0.1.tgz -C /usr/local/
cd /usr/local/kafka_2.11-0.9.0.1

config 文件夾下是各個(gè)組件的配置文件, server.properties 是 Broker 的配置文件, 需要修改的有

######################### Server Basics #########################
broker.id=0                    # 本 Broker 的 id, 只要非負(fù)數(shù)且各 Broker 的 id 不同即可, 一般依次加 1

##################### Socket Server Settings #####################
listeners=PLAINTEXT://:9092    # Broker 監(jiān)聽的端口, Producer, Consumer 會(huì)連接這個(gè)端口
port=9092                      # 同上
host.name=kfk1                 # 本 Broker 的 hostname

########################## Topic Basics ##########################
delete.topic.enable=true       # 配置為可以使用 delete topic 命令

########################### Log Basics ###########################
log.dirs=/var/lib/kafka-logs   # log 目錄, 此目錄要存在且有足夠權(quán)限

###################### Log Retention Policy ######################
log.roll.hours=2               # 開始一個(gè)新的 log 文件片段的最大時(shí)間
log.retention.hours=24         # 控制一個(gè) log 文件保留多長(zhǎng)個(gè)小時(shí)
log.retention.bytes=1073741824 # 所有 log 文件的最大大小
log.segment.bytes=104857600    # 單一的 log 文件最大大小
log.cleanup.policy=delete      # log 清除策略
log.retention.check.interval.ms=60000

############################ Zookeeper ############################
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181 # Zookeeper 的連接信息

注意: broker.id 和 host.name 在每臺(tái)機(jī)器上是不一樣的, 要按實(shí)際填寫

即在 kfk2, kfk3 上

broker.id=1
host.name=kfk2
broker.id=2
host.name=kfk3

PS: 在 kafka 安裝目錄下的 ./site-docs 目錄下有 kafka_config.html, producer_config.html, consumer_config.html 三個(gè)文件, 分別講解 broker, producer, consumer 配置參數(shù)含義. 附錄 A 是其翻譯.

3.1.4 開放端口

把 Kafka 用到的端口開放出來

firewall-cmd --zone=public --add-port=9100/tcp --permanent     # 永久開啟 9100 端口(kafka manager)
firewall-cmd --zone=public --add-port=9092/tcp --permanent     # 永久開啟 9092 端口(brokers)
firewall-cmd --zone=public --add-port=9999/tcp --permanent     # 永久開啟 9999 端口(JMX)
firewall-cmd --reload                                          # 重新加載防火墻規(guī)則

3.1.5 Broker 運(yùn)行與終止

Broker 運(yùn)行與終止命令如下.

# 運(yùn)行: 如下有兩種方式
# 以守護(hù)進(jìn)程的方式啟動(dòng)(推薦)
bin/kafka-server-start.sh -daemon config/server.properties
# 將 Broker 放到后臺(tái)執(zhí)行, 且不受終端關(guān)閉的影響, 標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出定向到 `./logs/kafka-server-boot.log`, 有問題時(shí)可以去看這個(gè)文件
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka-server-boot.log 2>&1 &

# 終止
bin/kafka-server-stop.sh config/server.properties

3.1.6 測(cè)試

我們使用 Kafka 自帶的基于 終端 的 Producer 和 Consumer 腳本做測(cè)試.

先只啟動(dòng)一臺(tái)機(jī)器上的 Broker. 在 kfk1 上運(yùn)行

nohup bin/kafka-server-start.sh config/server.properties > logs/kafka-server-boot.log 2>&1 &
1. 創(chuàng)建 Topic

創(chuàng)建一個(gè) 名為"TestCase"的 單分區(qū) 單副本 的 Topic.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic TestCase --replication-factor 1 --partitions 1

查看有哪些 Topic:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
TestCase

運(yùn)行describe topics命令, 可以知道 Topic 的具體分配:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

解釋一下輸出的內(nèi)容. 第一行給出了所有 partition 的一個(gè)摘要, 每行給出一個(gè) partition 的信息. 因?yàn)槲覀冞@個(gè) topic 只有一個(gè) partition 所以只有一行信息.

  • "leader" 負(fù)責(zé)所有 partition 的讀和寫請(qǐng)求的響應(yīng). "leader" 是隨機(jī)選定的.
  • "replicas" 是備份節(jié)點(diǎn)列表, 包含所有復(fù)制了此 partition log 的節(jié)點(diǎn), 不管這個(gè)節(jié)點(diǎn)是否為 leader 也不管這個(gè)節(jié)點(diǎn)當(dāng)前是否存活, 只是顯示.
  • "isr" 是當(dāng)前處于同步狀態(tài)的備份節(jié)點(diǎn)列表. 即 "replicas" 列表中處于存活狀態(tài)并且與 leader 一致的節(jié)點(diǎn).

可以發(fā)現(xiàn)這個(gè) Topic 沒有副本而且它在 [我們創(chuàng)建它時(shí)集群僅有的一個(gè)節(jié)點(diǎn)] Broker 0 上.

另外, 除去手工創(chuàng)建 Topic 以外, 你也可以將你的 Brokers 配置成當(dāng)消息發(fā)布到一個(gè)不存在的 Topic 時(shí)自動(dòng)創(chuàng)建此 Topic.

2. 啟動(dòng) 生產(chǎn)者

Kafka 附帶一個(gè) 終端生產(chǎn)者 可以從文件或者標(biāo)準(zhǔn)輸入中讀取輸入然后發(fā)送這個(gè)消息到 Kafka 集群. 默認(rèn)情況下每行信息被當(dāng)做一個(gè)消息發(fā)送.

運(yùn)行生產(chǎn)者腳本然后在終端中輸入一些消息, 即可發(fā)送到 Broker.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestCase
This is a message
This is another message

PS: 通過鍵入 Ctrl-C 來終止終端生產(chǎn)者.

3. 啟動(dòng) 消費(fèi)者

Kafka 也附帶了一個(gè) 終端生產(chǎn)者 可以導(dǎo)出這些消息到標(biāo)準(zhǔn)輸出.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TestCase --from-beginning
This is a message
This is another message

--from-beginning 參數(shù)使得可以接收到 topic 的所有消息, 包括 consumer 啟動(dòng)前的. 去掉后則為僅接收 consumer 啟動(dòng)后 kafka 收到的消息.

如果你在不同的終端運(yùn)行生產(chǎn)者和消費(fèi)者這兩個(gè)命令, 那么現(xiàn)在你就應(yīng)該能再生產(chǎn)者的終端中鍵入消息同時(shí)在消費(fèi)者的終端中看到.

所有的命令行工具都有很多可選的參數(shù); 不添加參數(shù)直接執(zhí)行這些命令將會(huì)顯示它們的使用方法, 更多內(nèi)容可以參考他們的手冊(cè).

PS: 通過鍵入 Ctrl-C 來終止終端消費(fèi)者.

4. 配置一個(gè)多節(jié)點(diǎn)集群

我們已經(jīng)成功的以單 Broker 的模式運(yùn)行起來了, 但這并沒有意思. 對(duì)于 Kafka 來說, 一個(gè)單獨(dú)的 Broker 就是一個(gè)大小為 1 的集群, 所以集群模式就是多啟動(dòng)幾個(gè) Broker 實(shí)例.

我們將我們的集群擴(kuò)展到3個(gè)節(jié)點(diǎn). 在另外兩臺(tái)機(jī)器 kfk2, kfk3 上運(yùn)行

nohup bin/kafka-server-start.sh config/server.properties > logs/kafka-server-boot.log 2>&1 &

現(xiàn)在我們可以創(chuàng)建一個(gè)新的 Topic 并制定副本數(shù)量為 3:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic my-replicated-topic --replication-factor 3 --partitions 1

運(yùn)行describe topics命令, 可以知道每個(gè) Broker 具體的工作:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

注意本例中 Broker 1 是這個(gè)有一個(gè) partition 的 topic 的 leader.

現(xiàn)在我們發(fā)布幾個(gè)消息到我們的新 topic 上:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
my test message 1
my test message 2

現(xiàn)在讓我們消費(fèi)這幾個(gè)消息:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning
my test message 1
my test message 2

現(xiàn)在讓我們測(cè)試一下集群容錯(cuò). Broker 1 正在作為 leader, 所以我們殺掉它:

$ ps | grep server.properties
...
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
...

$ kill -9 7564

或在 kfk1 機(jī)器上運(yùn)行

bin/kafka-server-stop.sh config/server.properties

此時(shí), 集群領(lǐng)導(dǎo)已經(jīng)切換到一個(gè)從服務(wù)器上, Broker 1 節(jié)點(diǎn)也不在出現(xiàn)在同步副本列表中了:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0

而且現(xiàn)在消息的消費(fèi)仍然能正常進(jìn)行, 即使原來負(fù)責(zé)寫的節(jié)點(diǎn)已經(jīng)失效了.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2

5. 使用 Kafka Connect 進(jìn)行數(shù)據(jù)導(dǎo)入導(dǎo)出

從終端寫入數(shù)據(jù), 數(shù)據(jù)也寫回終端是默認(rèn)的. 但是你可能希望從一些其它的數(shù)據(jù)源或者導(dǎo)出 Kafka 的數(shù)據(jù)到其它的系統(tǒng). 相比其它系統(tǒng)需要自己編寫集成代碼, 你可以直接使用Kafka的 Connect 直接導(dǎo)入或者導(dǎo)出數(shù)據(jù). Kafka Connect 是 Kafka 自帶的用于數(shù)據(jù)導(dǎo)入和導(dǎo)出的工具. 它是一個(gè)擴(kuò)展的可運(yùn)行連接器(runsconnectors)工具, 可實(shí)現(xiàn)自定義的邏輯來實(shí)現(xiàn)與外部系統(tǒng)的集成交互. 在這個(gè)快速入門中我們將介紹如何通過一個(gè)簡(jiǎn)單的從文本導(dǎo)入數(shù)據(jù), 導(dǎo)出數(shù)據(jù)到文本的連接器來調(diào)用 Kafka Connect. 首先我們從創(chuàng)建一些測(cè)試的基礎(chǔ)數(shù)據(jù)開始:

echo -e "foo\nbar" > test.txt

接下來我們采用standalone模式啟動(dòng)兩個(gè) connectors, 也就是讓它們都運(yùn)行在獨(dú)立的, 本地的, 不同的進(jìn)程中. 我們提供三個(gè)參數(shù)化的配置文件, 第一個(gè)提供共有的配置用于 Kafka Connect 處理, 包含共有的配置比如連接哪個(gè) Kafka broker 和數(shù)據(jù)的序列化格式. 剩下的配置文件制定每個(gè) connector 創(chuàng)建的特定信息. 這些文件包括唯一的 connector 的名字, connector 要實(shí)例化的類和其它的一些 connector 必備的配置.

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

上述簡(jiǎn)單的配置文件已經(jīng)被包含在 Kafka 的發(fā)行包中, 它們將使用默認(rèn)的之前我們啟動(dòng)的本地集群配置創(chuàng)建兩個(gè) connector: 第一個(gè)作為源 connector 從一個(gè)文件中讀取每行數(shù)據(jù)然后將他們發(fā)送 Kafka 的 topic, 第二個(gè)是一個(gè)輸出(sink)connector 從 Kafka 的 topic 讀取消息, 然后將它們輸出成輸出文件的一行行的數(shù)據(jù). 在啟動(dòng)的過程你講看到一些日志消息, 包括一些提示 connector 正在被實(shí)例化的信息. 一旦 Kafka Connect 進(jìn)程啟動(dòng)以后, 源 connector 應(yīng)該開始從 test.txt 中讀取數(shù)據(jù)行, 并將他們發(fā)送到 topic connect-test 上, 然后輸出 connector 將會(huì)開始從 topic 讀取消息然后把它們寫入到 test.sink.txt 中.

我們可以查看輸出文件來驗(yàn)證通過整個(gè)管線投遞的數(shù)據(jù):

$ cat test.sink.txt
foo
bar

注意這些數(shù)據(jù)已經(jīng)被保存到了 Kafka 的 connect-test topic 中, 所以我們還可以運(yùn)行一個(gè)終端消費(fèi)者來看到這些數(shù)據(jù)(或者使用自定義的消費(fèi)者代碼來處理數(shù)據(jù)):

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

connector 在持續(xù)的處理著數(shù)據(jù), 所以我們可以向文件中添加數(shù)據(jù)然后觀察到它在這個(gè)管線中的傳遞:

echo "Another line" >> test.txt

你應(yīng)該可以觀察到新的數(shù)據(jù)行出現(xiàn)在終端消費(fèi)者中和輸出文件中.

3.2 Zookeeper

3.2.1 簡(jiǎn)介

Apache Zookeeper 是 Hadoop 的一個(gè)子項(xiàng)目, 是一個(gè)致力于開發(fā)和管理開源服務(wù)器, 并且能實(shí)現(xiàn)高可靠性的分布式協(xié)調(diào)框架. 它包含一個(gè)簡(jiǎn)單的原語集, 分布式應(yīng)用程序可以基于它實(shí)現(xiàn)同步服務(wù), 配置維護(hù)和命名服務(wù)等.

Zookeeper 保證 2n + 1 臺(tái)機(jī)器的集群最大允許 n 臺(tái)機(jī)器掛掉而事務(wù)不中斷.

3.2.2 搭建

3.2.2.1 單機(jī)模式

此模式主要用于開發(fā)人員本地環(huán)境下測(cè)試代碼

1. 解壓 Zookeeper 并進(jìn)入其根目錄
tar xzf zookeeper-3.4.9.tar.gz -C /usr/local/
cd /usr/local/zookeeper-3.4.9
2. 創(chuàng)建配置文件 conf/zoo.cfg
cp conf/zoo_sample.cfg conf/zoo.cfg
3. 修改內(nèi)容如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/logs
clientPort=2181
  • tickTime: 是 zookeeper 的最小時(shí)間單元的長(zhǎng)度(以毫秒為單位), 它被用來設(shè)置心跳檢測(cè)和會(huì)話最小超時(shí)時(shí)間(tickTime 的兩倍)
  • initLimit: 初始化連接時(shí)能容忍的最長(zhǎng) tickTime 個(gè)數(shù)
  • syncLimit: follower 用于同步的最長(zhǎng) tickTime 個(gè)數(shù)
  • dataDir: 服務(wù)器存儲(chǔ) 數(shù)據(jù)快照 的目錄
  • dataLogDir: 服務(wù)器存儲(chǔ) 事務(wù)日志 的目錄
  • clientPort: 用于 client 連接的 server 的端口

其中需要注意的是dataDirdataLogDir, 分別是 zookeeper 運(yùn)行時(shí)的數(shù)據(jù)目錄和日志目錄, 要保證 這兩個(gè)目錄已創(chuàng)建運(yùn)行 zookeeper 的用戶擁有這兩個(gè)目錄的所有權(quán)

4.測(cè)試
  • 啟動(dòng)/關(guān)閉 Zookeeper:
bin/zkServer.sh start
bin/zkServer.sh stop
  • 查看 Zookeeper 狀態(tài):
bin/zkServer.sh status

顯示 mode: standalone, 單機(jī)模式.

  • 使用 java 客戶端連接 ZooKeeper
./bin/zkCli.sh -server 127.0.0.1:2181

然后就可以使用各種命令了, 跟文件操作命令很類似, 輸入 help 可以看到所有命令.

3.2.2.2 集群模式

此模式是 生產(chǎn)環(huán)境中實(shí)際使用的模式

因?yàn)?zookeeper 保證 2n + 1 臺(tái)機(jī)器最大允許 n 臺(tái)機(jī)器掛掉, 所以配置集群模式最好是奇數(shù)臺(tái)機(jī)器: 3, 5, 7...

最少 3 臺(tái)構(gòu)成集群

1 hosts 映射(可選)
echo "192.168.1.1 zoo1" >> /etc/hosts
echo "192.168.1.2 zoo2" >> /etc/hosts
echo "192.168.1.3 zoo3" >> /etc/hosts
2 修改 zookeeper-3.4.9/conf/zoo.cfg 文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/log
clientPort=2181
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888

與單機(jī)模式的不同就是最后三條: server.X=host:portA:portB

server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888

server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

X 為標(biāo)識(shí)為 X 的機(jī)器, host 為其 hostname 或 IP, portA 用于這臺(tái)機(jī)器與集群中的 Leader 機(jī)器通信, portB 用于 server 選舉 leader.

PS: 要配單機(jī)偽分布式的話, 可以修改這里為

server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

然后每個(gè) zookeeper 實(shí)例的 dataDir 和 dataLogDir 配置為不同的即可

3 myid 文件

在標(biāo)示為 X 的機(jī)器上, 將 X 寫入 ${dataDir}/myid 文件, 如: 在 192.168.1.2 機(jī)器上的 /var/lib/zookeeper/data 目錄下建立文件 myid, 寫入 2

echo "2" > /var/lib/zookeeper/data/myid
4 開放端口

CentOS 7 使用 firewalld 代替了原來的 iptables, 基本使用如下

systemctl start firewalld                                      # 啟動(dòng)防火墻
firewall-cmd --state                                           # 檢查防火墻狀態(tài)
firewall-cmd --zone=public --add-port=2888/tcp --permanent     # 永久開啟 2888 端口
firewall-cmd --reload                                          # 重新加載防火墻規(guī)則
firewall-cmd --list-all                                        # 列出所有防火墻規(guī)則

把 Zookeeper 用到的端口開放出來

firewall-cmd --zone=public --add-port=2181/tcp --permanent     # 永久開啟 2181 端口
firewall-cmd --zone=public --add-port=2888/tcp --permanent     # 永久開啟 2888 端口
firewall-cmd --zone=public --add-port=3888/tcp --permanent     # 永久開啟 3888 端口
firewall-cmd --reload                                          # 重新加載防火墻規(guī)則
5 測(cè)試
  • 集群中所有機(jī)器上 啟動(dòng) zookeeper(盡量同時(shí)):
bin/zkServer.sh start
  • 查看狀態(tài), 應(yīng)該有一臺(tái)機(jī)器顯示mode: leader, 其余為mode: follower
bin/zkServer.sh status
  • 使用 java 客戶端連接 ZooKeeper
./bin/zkCli.sh -server 192.168.1.1:2181

然后就可以使用各種命令了, 跟文件操作命令很類似, 輸入help可以看到所有命令.

  • 關(guān)閉 zookeeper:
./bin/zkServer.sh stop

3.2.3 Zookeeper 常見問題

查看狀態(tài)時(shí), 應(yīng)該有一臺(tái)機(jī)器顯示mode: leader, 其余為mode: follower

bin/zkServer.sh status

當(dāng)顯示Error contacting service. It is probably not running.時(shí), 可以查看日志

cat zookeeper.out

查看 zookeeper.out 日志可以看到是那些機(jī)器連不上, 可能是 網(wǎng)絡(luò), ip, 端口, 配置文件, myid 文件 的問題.
正常應(yīng)該是: 先是一些 java 異常, 這是因?yàn)?ZooKeeper 集群?jiǎn)?dòng)的時(shí)候, 每個(gè)結(jié)點(diǎn)都試圖去連接集群中的其它結(jié)點(diǎn), 先啟動(dòng)的肯定連不上后面還沒啟動(dòng)的, 所以上面日志前面部分的異常是可以忽略的, 當(dāng)集群所有的機(jī)器的 zookeeper 都啟動(dòng)起來, 就沒有異常了, 并選舉出來了 leader.

PS: 因?yàn)?zkServer.sh 腳本中是用 nohup 命令啟動(dòng) zookeeper 的, 所以 zookeeper.out 文件是在調(diào)用 zkServer.sh 時(shí)的路徑下, 如:用 bin/zkServer.sh start 啟動(dòng)則 zookeeper.out 文件在 zookeeper-3.4.9/ 下; 用 zkServer.sh start 啟動(dòng)則 zookeeper.out 文件在 zookeeper-3.4.9/bin/ 下.

4.kafka的應(yīng)用

未完待續(xù)3审铩!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末旨枯,一起剝皮案震驚了整個(gè)濱河市蹬昌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件域滥,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡明刷,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門满粗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辈末,“玉大人,你說我怎么就攤上這事映皆〖菲福” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵捅彻,是天一觀的道長(zhǎng)组去。 經(jīng)常有香客問我,道長(zhǎng)步淹,這世上最難降的妖魔是什么从隆? 我笑而不...
    開封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任诚撵,我火速辦了婚禮,結(jié)果婚禮上键闺,老公的妹妹穿的比我還像新娘寿烟。我一直安慰自己,他們只是感情好艾杏,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開白布韧衣。 她就那樣靜靜地躺著,像睡著了一般购桑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上氏淑,一...
    開封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天勃蜘,我揣著相機(jī)與錄音,去河邊找鬼假残。 笑死缭贡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的辉懒。 我是一名探鬼主播阳惹,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼眶俩!你這毒婦竟也來了莹汤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤颠印,失蹤者是張志新(化名)和其女友劉穎纲岭,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體线罕,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡止潮,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了钞楼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片喇闸。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖询件,靈堂內(nèi)的尸體忽然破棺而出燃乍,到底是詐尸還是另有隱情,我是刑警寧澤雳殊,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布橘沥,位于F島的核電站,受9級(jí)特大地震影響夯秃,放射性物質(zhì)發(fā)生泄漏座咆。R本人自食惡果不足惜痢艺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望介陶。 院中可真熱鬧堤舒,春花似錦、人聲如沸哺呜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽某残。三九已至国撵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間玻墅,已是汗流浹背介牙。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留澳厢,地道東北人环础。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像剩拢,于是被迫代替她去往敵國和親线得。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評(píng)論 13 425
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,827評(píng)論 4 54
  • 一徐伐、基本概念 介紹 Kafka是一個(gè)分布式的贯钩、可分區(qū)的、可復(fù)制的消息系統(tǒng)呵晨。它提供了普通消息系統(tǒng)的功能魏保,但具有自己獨(dú)...
    ITsupuerlady閱讀 1,631評(píng)論 0 9
  • Kafka官網(wǎng):http://kafka.apache.org/入門1.1 介紹Kafka? 是一個(gè)分布式流處理系...
    it_zzy閱讀 3,894評(píng)論 3 53
  • 心情低落到極點(diǎn),遇到難過的事只想逃避摸屠,躺在床上大睡一覺谓罗,什么都不愿意去想,可是會(huì)荒廢了今天一天的光陰季二,想想還是應(yīng)該...
    高知女性圈閱讀 336評(píng)論 2 5