Apache Kafka 入門
1.kafka簡(jiǎn)介和產(chǎn)生的背景
什么是 Kafka
Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng)断部,具有高性能淋昭、高吞吐量慎式、系統(tǒng)快速, 可擴(kuò)展并且可持久化. 它的分區(qū)特性, 可復(fù)制和可容錯(cuò)都是其不錯(cuò)的特性而被廣泛應(yīng)用與大數(shù)據(jù)傳輸場(chǎng)景。它是由 LinkedIn 公司開發(fā)澈蟆,使用 Scala 語言編寫墨辛,之后成為 Apache 基金會(huì)的一個(gè)頂級(jí)項(xiàng)目。 kafka 提供了類似 JMS 的特性趴俘,但是在設(shè)計(jì)和實(shí)現(xiàn)上是完全不同的睹簇,而且他也不是 JMS 規(guī)范的實(shí)現(xiàn)。
kafka 產(chǎn)生的背景
kafka 作為一個(gè)消息系統(tǒng)寥闪,早起設(shè)計(jì)的目的是用作 LinkedIn 的活動(dòng)流(ActivityStream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)太惠。活動(dòng)流數(shù)據(jù)是所有的網(wǎng)站對(duì)用戶的使用情況做分析的時(shí)候要用到的最常規(guī)的部分,活動(dòng)數(shù)據(jù)包括頁面的訪問量(PV)疲憋、被查看內(nèi)容方面的信息以及搜索內(nèi)容凿渊。這種數(shù)據(jù)通常的處理方式是先
把各種活動(dòng)以日志的形式寫入某種文件,然后周期性的對(duì)這些文件進(jìn)行統(tǒng)計(jì)分析。運(yùn)營數(shù)據(jù)指的是服務(wù)器的性能數(shù)據(jù)(CPU埃脏、 IO 使用率搪锣、請(qǐng)求時(shí)間、服務(wù)日志等) 彩掐。
Kafka 的應(yīng)用場(chǎng)景
由于 kafka 具有更好的吞吐量构舟、內(nèi)置分區(qū)、冗余及容錯(cuò)性的優(yōu)點(diǎn)(kafka 每秒可以處理幾十萬消息)堵幽,讓 kafka 成為了一個(gè)很好的大規(guī)模消息處理應(yīng)用的解決方案狗超。所以在企業(yè)級(jí)應(yīng)用長(zhǎng),主要會(huì)應(yīng)用于如下幾個(gè)方面
行為跟蹤: kafka 可以用于跟蹤用戶瀏覽頁面谐檀、搜索及其他行為抡谐。通過發(fā)布-訂閱模式實(shí)時(shí)記錄到對(duì)應(yīng)的 topic 中,通過后端大數(shù)據(jù)平臺(tái)接入處理分析桐猬,并做更進(jìn)一步的實(shí)時(shí)處理和監(jiān)控
日志收集:日志收集方面麦撵,有很多比較優(yōu)秀的產(chǎn)品,比如 Apache Flume溃肪,很多公司使用kafka 代理日志聚合免胃。日志聚合表示從服務(wù)器上收集日志文件,然后放到一個(gè)集中的平臺(tái)(文件服務(wù)器)進(jìn)行處理惫撰。在實(shí)際應(yīng)用開發(fā)中羔沙,我們應(yīng)用程序的 log 都會(huì)輸出到本地的磁盤上,排查問題的話通過 linux 命令來搞定厨钻,如果應(yīng)用程序組成了負(fù)載均衡集群扼雏,并且集群的機(jī)器有幾十臺(tái)以上,那么想通過日志快速定位到問題夯膀,就是很麻煩的事情了诗充。所以一般都會(huì)做一個(gè)日志統(tǒng)一收集平臺(tái)管理 log 日志用來快速查詢重要應(yīng)用的問題。所以很多公司的套路都是把應(yīng)用日志幾種到 kafka 上诱建,然后分別導(dǎo)入到 es 和 hdfs 上蝴蜓,用來做實(shí)時(shí)檢索分析和離線統(tǒng)計(jì)數(shù)據(jù)備份等。而另一方面俺猿, kafka 本身又提供了很好的 api 來集成日志并且做日志收集
2.kafka的本身架構(gòu)
一個(gè)典型的 kafka 集群包含若干 Producer(可以是應(yīng)用節(jié)點(diǎn)產(chǎn)生的消息茎匠,也可以是通過Flume 收集日志產(chǎn)生的事件),若干個(gè) Broker(kafka 支持水平擴(kuò)展)押袍、若干個(gè) ConsumerGroup诵冒,以及一個(gè) zookeeper 集群。 kafka 通過 zookeeper 管理集群配置及服務(wù)協(xié)同谊惭。Producer 使用 push 模式將消息發(fā)布到 broker造烁, consumer 通過監(jiān)聽使用 pull 模式從broker 訂閱并消費(fèi)消息否过。多個(gè) broker 協(xié)同工作, producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中惭蟋。三者通過zookeeper 管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā)。這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)药磺。圖上有一個(gè)細(xì)節(jié)是和其他 mq 中間件不同的點(diǎn)告组, producer 發(fā)送消息到 broker
的過程是 push,而 consumer 從 broker 消費(fèi)消息的過程是 pull癌佩,主動(dòng)去拉數(shù)
據(jù)木缝。而不是 broker 把數(shù)據(jù)主動(dòng)發(fā)送給 consumer
2.1kafka術(shù)語介紹
(1)Topics(主題)
屬于特定類別的消息流稱為主題。 數(shù)據(jù)存儲(chǔ)在主題中围辙。Topic相當(dāng)于Queue我碟。
主題被拆分成分區(qū)。 每個(gè)這樣的分區(qū)包含不可變有序序列的消息姚建。 分區(qū)被實(shí)現(xiàn)為具有相等大小的一組分段文件矫俺。
(2)Partition(分區(qū))
- 一個(gè)Topic可以分成多個(gè)Partition,這是為了平行化處理掸冤。
- 每個(gè)Partition內(nèi)部消息有序厘托,其中每個(gè)消息都有一個(gè)offset序號(hào)。
- 一個(gè)Partition只對(duì)應(yīng)一個(gè)Broker稿湿,一個(gè)Broker可以管理多個(gè)Partition铅匹。
(3)Partition offset(分區(qū)偏移)
每個(gè)分區(qū)消息具有稱為 offset 的唯一序列標(biāo)識(shí)。
(4)Replicas of partition(分區(qū)備份)
副本只是一個(gè)分區(qū)的備份饺藤。 副本從不讀取或?qū)懭霐?shù)據(jù)包斑。 它們用于防止數(shù)據(jù)丟失。
(5)Brokers(經(jīng)紀(jì)人)
- 代理是負(fù)責(zé)維護(hù)發(fā)布數(shù)據(jù)的簡(jiǎn)單系統(tǒng)涕俗。 每個(gè)代理可以每個(gè)主題具有零個(gè)或多個(gè)分區(qū)罗丰。 假設(shè),如果在一個(gè)主題和N個(gè)代理中有N個(gè)分區(qū)咽袜,每個(gè)代理將有一個(gè)分區(qū)丸卷。
- 假設(shè)在一個(gè)主題中有N個(gè)分區(qū)并且多于N個(gè)代理(n + m),則第一個(gè)N代理將具有一個(gè)分區(qū)询刹,并且下一個(gè)M代理將不具有用于該特定主題的任何分區(qū)谜嫉。
- 假設(shè)在一個(gè)主題中有N個(gè)分區(qū)并且小于N個(gè)代理(n-m),每個(gè)代理將在它們之間具有一個(gè)或多個(gè)分區(qū)共享凹联。 由于代理之間的負(fù)載分布不相等沐兰,不推薦使用此方案。
(6)Kafka Cluster(Kafka集群)
Kafka有多個(gè)代理被稱為Kafka集群蔽挠。 可以擴(kuò)展Kafka集群住闯,無需停機(jī)瓜浸。 這些集群用于管理消息數(shù)據(jù)的持久性和復(fù)制。
(7)Producers(生產(chǎn)者)
生產(chǎn)者是發(fā)送給一個(gè)或多個(gè)Kafka主題的消息的發(fā)布者比原。 生產(chǎn)者向Kafka經(jīng)紀(jì)人發(fā)送數(shù)據(jù)插佛。 每當(dāng)生產(chǎn)者將消息發(fā)布給代理時(shí),代理只需將消息附加到最后一個(gè)段文件量窘。實(shí)際上雇寇,該消息將被附加到分區(qū)。 生產(chǎn)者還可以向他們選擇的分區(qū)發(fā)送消息蚌铜。
(8)Consumers(消費(fèi)者)
Consumers從經(jīng)紀(jì)人處讀取數(shù)據(jù)锨侯。 消費(fèi)者訂閱一個(gè)或多個(gè)主題,并通過從代理中提取數(shù)據(jù)來使用已發(fā)布的消息冬殃。
- Consumer自己維護(hù)消費(fèi)到哪個(gè)offet
- 每個(gè)Consumer都有對(duì)應(yīng)的group
- group內(nèi)是queue消費(fèi)模型:各個(gè)Consumer消費(fèi)不同的partition囚痴,因此一個(gè)消息在group內(nèi)只消費(fèi)一次
- group間是publish-subscribe消費(fèi)模型:各個(gè)group各自獨(dú)立消費(fèi),互不影響审葬,因此一個(gè)消息被每個(gè)group消費(fèi)一次深滚。
2.2 kafka架構(gòu)圖
3.kafka安裝部署和基本操作
3.1 搭建
3.1.1 hosts 映射(可選, 建議)
echo "192.168.1.10 kfk1" >> /etc/hosts
echo "192.168.1.11 kfk2" >> /etc/hosts
echo "192.168.1.12 kfk3" >> /etc/hosts
3.1.2 搭建 Zookeeper (單機(jī) or 集群)
- 需要先啟動(dòng) zookeeper,如果沒有搭建 zookeeper 環(huán)境耳璧,可以直接運(yùn)行
kafka 內(nèi)嵌的 zookeeper
啟動(dòng)命令: bin/zookeeper-server-start.sh config/zookeeper.properties &
Broker, Producer, Consumer 的運(yùn)行都需要 ZooKeeper
自己搭建見 [3.2.2.1 單機(jī)模式](#3.2.2.1 單機(jī)模式) or 見 [3.2.2.3 集群模式
3.1.3 Broker 的配置
tar xzf kafka_2.11-0.9.0.1.tgz -C /usr/local/
cd /usr/local/kafka_2.11-0.9.0.1
config 文件夾下是各個(gè)組件的配置文件, server.properties 是 Broker 的配置文件, 需要修改的有
######################### Server Basics #########################
broker.id=0 # 本 Broker 的 id, 只要非負(fù)數(shù)且各 Broker 的 id 不同即可, 一般依次加 1
##################### Socket Server Settings #####################
listeners=PLAINTEXT://:9092 # Broker 監(jiān)聽的端口, Producer, Consumer 會(huì)連接這個(gè)端口
port=9092 # 同上
host.name=kfk1 # 本 Broker 的 hostname
########################## Topic Basics ##########################
delete.topic.enable=true # 配置為可以使用 delete topic 命令
########################### Log Basics ###########################
log.dirs=/var/lib/kafka-logs # log 目錄, 此目錄要存在且有足夠權(quán)限
###################### Log Retention Policy ######################
log.roll.hours=2 # 開始一個(gè)新的 log 文件片段的最大時(shí)間
log.retention.hours=24 # 控制一個(gè) log 文件保留多長(zhǎng)個(gè)小時(shí)
log.retention.bytes=1073741824 # 所有 log 文件的最大大小
log.segment.bytes=104857600 # 單一的 log 文件最大大小
log.cleanup.policy=delete # log 清除策略
log.retention.check.interval.ms=60000
############################ Zookeeper ############################
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181 # Zookeeper 的連接信息
注意: broker.id 和 host.name 在每臺(tái)機(jī)器上是不一樣的, 要按實(shí)際填寫
即在 kfk2, kfk3 上
broker.id=1
host.name=kfk2
broker.id=2
host.name=kfk3
PS: 在 kafka 安裝目錄下的 ./site-docs
目錄下有 kafka_config.html
, producer_config.html
, consumer_config.html
三個(gè)文件, 分別講解 broker, producer, consumer 配置參數(shù)含義. 附錄 A 是其翻譯.
3.1.4 開放端口
把 Kafka 用到的端口開放出來
firewall-cmd --zone=public --add-port=9100/tcp --permanent # 永久開啟 9100 端口(kafka manager)
firewall-cmd --zone=public --add-port=9092/tcp --permanent # 永久開啟 9092 端口(brokers)
firewall-cmd --zone=public --add-port=9999/tcp --permanent # 永久開啟 9999 端口(JMX)
firewall-cmd --reload # 重新加載防火墻規(guī)則
3.1.5 Broker 運(yùn)行與終止
Broker 運(yùn)行與終止命令如下.
# 運(yùn)行: 如下有兩種方式
# 以守護(hù)進(jìn)程的方式啟動(dòng)(推薦)
bin/kafka-server-start.sh -daemon config/server.properties
# 將 Broker 放到后臺(tái)執(zhí)行, 且不受終端關(guān)閉的影響, 標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出定向到 `./logs/kafka-server-boot.log`, 有問題時(shí)可以去看這個(gè)文件
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka-server-boot.log 2>&1 &
# 終止
bin/kafka-server-stop.sh config/server.properties
3.1.6 測(cè)試
我們使用 Kafka 自帶的基于 終端 的 Producer 和 Consumer 腳本做測(cè)試.
先只啟動(dòng)一臺(tái)機(jī)器上的 Broker. 在 kfk1 上運(yùn)行
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka-server-boot.log 2>&1 &
1. 創(chuàng)建 Topic
創(chuàng)建一個(gè) 名為"TestCase"的 單分區(qū) 單副本 的 Topic.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic TestCase --replication-factor 1 --partitions 1
查看有哪些 Topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
TestCase
運(yùn)行describe topics
命令, 可以知道 Topic 的具體分配:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
解釋一下輸出的內(nèi)容. 第一行給出了所有 partition 的一個(gè)摘要, 每行給出一個(gè) partition 的信息. 因?yàn)槲覀冞@個(gè) topic 只有一個(gè) partition 所以只有一行信息.
- "leader" 負(fù)責(zé)所有 partition 的讀和寫請(qǐng)求的響應(yīng). "leader" 是隨機(jī)選定的.
- "replicas" 是備份節(jié)點(diǎn)列表, 包含所有復(fù)制了此 partition log 的節(jié)點(diǎn), 不管這個(gè)節(jié)點(diǎn)是否為 leader 也不管這個(gè)節(jié)點(diǎn)當(dāng)前是否存活, 只是顯示.
- "isr" 是當(dāng)前處于同步狀態(tài)的備份節(jié)點(diǎn)列表. 即 "replicas" 列表中處于存活狀態(tài)并且與 leader 一致的節(jié)點(diǎn).
可以發(fā)現(xiàn)這個(gè) Topic 沒有副本而且它在 [我們創(chuàng)建它時(shí)集群僅有的一個(gè)節(jié)點(diǎn)] Broker 0 上.
另外, 除去手工創(chuàng)建 Topic 以外, 你也可以將你的 Brokers 配置成當(dāng)消息發(fā)布到一個(gè)不存在的 Topic 時(shí)自動(dòng)創(chuàng)建此 Topic.
2. 啟動(dòng) 生產(chǎn)者
Kafka 附帶一個(gè) 終端生產(chǎn)者 可以從文件或者標(biāo)準(zhǔn)輸入中讀取輸入然后發(fā)送這個(gè)消息到 Kafka 集群. 默認(rèn)情況下每行信息被當(dāng)做一個(gè)消息發(fā)送.
運(yùn)行生產(chǎn)者腳本然后在終端中輸入一些消息, 即可發(fā)送到 Broker.
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestCase
This is a message
This is another message
PS: 通過鍵入 Ctrl-C 來終止終端生產(chǎn)者.
3. 啟動(dòng) 消費(fèi)者
Kafka 也附帶了一個(gè) 終端生產(chǎn)者 可以導(dǎo)出這些消息到標(biāo)準(zhǔn)輸出.
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TestCase --from-beginning
This is a message
This is another message
--from-beginning
參數(shù)使得可以接收到 topic 的所有消息, 包括 consumer 啟動(dòng)前的. 去掉后則為僅接收 consumer 啟動(dòng)后 kafka 收到的消息.
如果你在不同的終端運(yùn)行生產(chǎn)者和消費(fèi)者這兩個(gè)命令, 那么現(xiàn)在你就應(yīng)該能再生產(chǎn)者的終端中鍵入消息同時(shí)在消費(fèi)者的終端中看到.
所有的命令行工具都有很多可選的參數(shù); 不添加參數(shù)直接執(zhí)行這些命令將會(huì)顯示它們的使用方法, 更多內(nèi)容可以參考他們的手冊(cè).
PS: 通過鍵入 Ctrl-C 來終止終端消費(fèi)者.
4. 配置一個(gè)多節(jié)點(diǎn)集群
我們已經(jīng)成功的以單 Broker 的模式運(yùn)行起來了, 但這并沒有意思. 對(duì)于 Kafka 來說, 一個(gè)單獨(dú)的 Broker 就是一個(gè)大小為 1 的集群, 所以集群模式就是多啟動(dòng)幾個(gè) Broker 實(shí)例.
我們將我們的集群擴(kuò)展到3個(gè)節(jié)點(diǎn). 在另外兩臺(tái)機(jī)器 kfk2, kfk3 上運(yùn)行
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka-server-boot.log 2>&1 &
現(xiàn)在我們可以創(chuàng)建一個(gè)新的 Topic 并制定副本數(shù)量為 3:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic my-replicated-topic --replication-factor 3 --partitions 1
運(yùn)行describe topics
命令, 可以知道每個(gè) Broker 具體的工作:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
注意本例中 Broker 1 是這個(gè)有一個(gè) partition 的 topic 的 leader.
現(xiàn)在我們發(fā)布幾個(gè)消息到我們的新 topic 上:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
my test message 1
my test message 2
現(xiàn)在讓我們消費(fèi)這幾個(gè)消息:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning
my test message 1
my test message 2
現(xiàn)在讓我們測(cè)試一下集群容錯(cuò). Broker 1 正在作為 leader, 所以我們殺掉它:
$ ps | grep server.properties
...
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
...
$ kill -9 7564
或在 kfk1 機(jī)器上運(yùn)行
bin/kafka-server-stop.sh config/server.properties
此時(shí), 集群領(lǐng)導(dǎo)已經(jīng)切換到一個(gè)從服務(wù)器上, Broker 1 節(jié)點(diǎn)也不在出現(xiàn)在同步副本列表中了:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
而且現(xiàn)在消息的消費(fèi)仍然能正常進(jìn)行, 即使原來負(fù)責(zé)寫的節(jié)點(diǎn)已經(jīng)失效了.
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
5. 使用 Kafka Connect 進(jìn)行數(shù)據(jù)導(dǎo)入導(dǎo)出
從終端寫入數(shù)據(jù), 數(shù)據(jù)也寫回終端是默認(rèn)的. 但是你可能希望從一些其它的數(shù)據(jù)源或者導(dǎo)出 Kafka 的數(shù)據(jù)到其它的系統(tǒng). 相比其它系統(tǒng)需要自己編寫集成代碼, 你可以直接使用Kafka的 Connect 直接導(dǎo)入或者導(dǎo)出數(shù)據(jù). Kafka Connect 是 Kafka 自帶的用于數(shù)據(jù)導(dǎo)入和導(dǎo)出的工具. 它是一個(gè)擴(kuò)展的可運(yùn)行連接器(runsconnectors)工具, 可實(shí)現(xiàn)自定義的邏輯來實(shí)現(xiàn)與外部系統(tǒng)的集成交互. 在這個(gè)快速入門中我們將介紹如何通過一個(gè)簡(jiǎn)單的從文本導(dǎo)入數(shù)據(jù), 導(dǎo)出數(shù)據(jù)到文本的連接器來調(diào)用 Kafka Connect. 首先我們從創(chuàng)建一些測(cè)試的基礎(chǔ)數(shù)據(jù)開始:
echo -e "foo\nbar" > test.txt
接下來我們采用standalone模式啟動(dòng)兩個(gè) connectors, 也就是讓它們都運(yùn)行在獨(dú)立的, 本地的, 不同的進(jìn)程中. 我們提供三個(gè)參數(shù)化的配置文件, 第一個(gè)提供共有的配置用于 Kafka Connect 處理, 包含共有的配置比如連接哪個(gè) Kafka broker 和數(shù)據(jù)的序列化格式. 剩下的配置文件制定每個(gè) connector 創(chuàng)建的特定信息. 這些文件包括唯一的 connector 的名字, connector 要實(shí)例化的類和其它的一些 connector 必備的配置.
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
上述簡(jiǎn)單的配置文件已經(jīng)被包含在 Kafka 的發(fā)行包中, 它們將使用默認(rèn)的之前我們啟動(dòng)的本地集群配置創(chuàng)建兩個(gè) connector: 第一個(gè)作為源 connector 從一個(gè)文件中讀取每行數(shù)據(jù)然后將他們發(fā)送 Kafka 的 topic, 第二個(gè)是一個(gè)輸出(sink)connector 從 Kafka 的 topic 讀取消息, 然后將它們輸出成輸出文件的一行行的數(shù)據(jù). 在啟動(dòng)的過程你講看到一些日志消息, 包括一些提示 connector 正在被實(shí)例化的信息. 一旦 Kafka Connect 進(jìn)程啟動(dòng)以后, 源 connector 應(yīng)該開始從 test.txt
中讀取數(shù)據(jù)行, 并將他們發(fā)送到 topic connect-test
上, 然后輸出 connector 將會(huì)開始從 topic 讀取消息然后把它們寫入到 test.sink.txt
中.
我們可以查看輸出文件來驗(yàn)證通過整個(gè)管線投遞的數(shù)據(jù):
$ cat test.sink.txt
foo
bar
注意這些數(shù)據(jù)已經(jīng)被保存到了 Kafka 的 connect-test
topic 中, 所以我們還可以運(yùn)行一個(gè)終端消費(fèi)者來看到這些數(shù)據(jù)(或者使用自定義的消費(fèi)者代碼來處理數(shù)據(jù)):
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
connector 在持續(xù)的處理著數(shù)據(jù), 所以我們可以向文件中添加數(shù)據(jù)然后觀察到它在這個(gè)管線中的傳遞:
echo "Another line" >> test.txt
你應(yīng)該可以觀察到新的數(shù)據(jù)行出現(xiàn)在終端消費(fèi)者中和輸出文件中.
3.2 Zookeeper
3.2.1 簡(jiǎn)介
Apache Zookeeper 是 Hadoop 的一個(gè)子項(xiàng)目, 是一個(gè)致力于開發(fā)和管理開源服務(wù)器, 并且能實(shí)現(xiàn)高可靠性的分布式協(xié)調(diào)框架. 它包含一個(gè)簡(jiǎn)單的原語集, 分布式應(yīng)用程序可以基于它實(shí)現(xiàn)同步服務(wù), 配置維護(hù)和命名服務(wù)等.
Zookeeper 保證 2n + 1 臺(tái)機(jī)器的集群最大允許 n 臺(tái)機(jī)器掛掉而事務(wù)不中斷.
3.2.2 搭建
3.2.2.1 單機(jī)模式
此模式主要用于開發(fā)人員本地環(huán)境下測(cè)試代碼
1. 解壓 Zookeeper 并進(jìn)入其根目錄
tar xzf zookeeper-3.4.9.tar.gz -C /usr/local/
cd /usr/local/zookeeper-3.4.9
2. 創(chuàng)建配置文件 conf/zoo.cfg
cp conf/zoo_sample.cfg conf/zoo.cfg
3. 修改內(nèi)容如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/logs
clientPort=2181
- tickTime: 是 zookeeper 的最小時(shí)間單元的長(zhǎng)度(以毫秒為單位), 它被用來設(shè)置心跳檢測(cè)和會(huì)話最小超時(shí)時(shí)間(tickTime 的兩倍)
- initLimit: 初始化連接時(shí)能容忍的最長(zhǎng) tickTime 個(gè)數(shù)
- syncLimit: follower 用于同步的最長(zhǎng) tickTime 個(gè)數(shù)
- dataDir: 服務(wù)器存儲(chǔ) 數(shù)據(jù)快照 的目錄
- dataLogDir: 服務(wù)器存儲(chǔ) 事務(wù)日志 的目錄
- clientPort: 用于 client 連接的 server 的端口
其中需要注意的是dataDir
和dataLogDir
, 分別是 zookeeper 運(yùn)行時(shí)的數(shù)據(jù)目錄和日志目錄, 要保證 這兩個(gè)目錄已創(chuàng)建 且 運(yùn)行 zookeeper 的用戶擁有這兩個(gè)目錄的所有權(quán)
4.測(cè)試
- 啟動(dòng)/關(guān)閉 Zookeeper:
bin/zkServer.sh start
bin/zkServer.sh stop
- 查看 Zookeeper 狀態(tài):
bin/zkServer.sh status
顯示 mode: standalone
, 單機(jī)模式.
- 使用 java 客戶端連接 ZooKeeper
./bin/zkCli.sh -server 127.0.0.1:2181
然后就可以使用各種命令了, 跟文件操作命令很類似, 輸入 help 可以看到所有命令.
3.2.2.2 集群模式
此模式是 生產(chǎn)環(huán)境中實(shí)際使用的模式
因?yàn)?zookeeper 保證 2n + 1 臺(tái)機(jī)器最大允許 n 臺(tái)機(jī)器掛掉, 所以配置集群模式最好是奇數(shù)臺(tái)機(jī)器: 3, 5, 7...
最少 3 臺(tái)構(gòu)成集群
1 hosts 映射(可選)
echo "192.168.1.1 zoo1" >> /etc/hosts
echo "192.168.1.2 zoo2" >> /etc/hosts
echo "192.168.1.3 zoo3" >> /etc/hosts
2 修改 zookeeper-3.4.9/conf/zoo.cfg 文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/log
clientPort=2181
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
與單機(jī)模式的不同就是最后三條: server.X=host:portA:portB
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
或
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
X 為標(biāo)識(shí)為 X 的機(jī)器, host 為其 hostname 或 IP, portA 用于這臺(tái)機(jī)器與集群中的 Leader 機(jī)器通信, portB 用于 server 選舉 leader.
PS: 要配單機(jī)偽分布式的話, 可以修改這里為
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
然后每個(gè) zookeeper 實(shí)例的 dataDir 和 dataLogDir 配置為不同的即可
3 myid 文件
在標(biāo)示為 X 的機(jī)器上, 將 X 寫入 ${dataDir}/myid 文件, 如: 在 192.168.1.2 機(jī)器上的 /var/lib/zookeeper/data 目錄下建立文件 myid, 寫入 2
echo "2" > /var/lib/zookeeper/data/myid
4 開放端口
CentOS 7 使用 firewalld 代替了原來的 iptables, 基本使用如下
systemctl start firewalld # 啟動(dòng)防火墻
firewall-cmd --state # 檢查防火墻狀態(tài)
firewall-cmd --zone=public --add-port=2888/tcp --permanent # 永久開啟 2888 端口
firewall-cmd --reload # 重新加載防火墻規(guī)則
firewall-cmd --list-all # 列出所有防火墻規(guī)則
把 Zookeeper 用到的端口開放出來
firewall-cmd --zone=public --add-port=2181/tcp --permanent # 永久開啟 2181 端口
firewall-cmd --zone=public --add-port=2888/tcp --permanent # 永久開啟 2888 端口
firewall-cmd --zone=public --add-port=3888/tcp --permanent # 永久開啟 3888 端口
firewall-cmd --reload # 重新加載防火墻規(guī)則
5 測(cè)試
- 在 集群中所有機(jī)器上 啟動(dòng) zookeeper(盡量同時(shí)):
bin/zkServer.sh start
- 查看狀態(tài), 應(yīng)該有一臺(tái)機(jī)器顯示
mode: leader
, 其余為mode: follower
bin/zkServer.sh status
- 使用 java 客戶端連接 ZooKeeper
./bin/zkCli.sh -server 192.168.1.1:2181
然后就可以使用各種命令了, 跟文件操作命令很類似, 輸入help可以看到所有命令.
- 關(guān)閉 zookeeper:
./bin/zkServer.sh stop
3.2.3 Zookeeper 常見問題
查看狀態(tài)時(shí), 應(yīng)該有一臺(tái)機(jī)器顯示mode: leader
, 其余為mode: follower
bin/zkServer.sh status
當(dāng)顯示Error contacting service. It is probably not running.
時(shí), 可以查看日志
cat zookeeper.out
查看 zookeeper.out 日志可以看到是那些機(jī)器連不上, 可能是 網(wǎng)絡(luò), ip, 端口, 配置文件, myid 文件 的問題.
正常應(yīng)該是: 先是一些 java 異常, 這是因?yàn)?ZooKeeper 集群?jiǎn)?dòng)的時(shí)候, 每個(gè)結(jié)點(diǎn)都試圖去連接集群中的其它結(jié)點(diǎn), 先啟動(dòng)的肯定連不上后面還沒啟動(dòng)的, 所以上面日志前面部分的異常是可以忽略的, 當(dāng)集群所有的機(jī)器的 zookeeper 都啟動(dòng)起來, 就沒有異常了, 并選舉出來了 leader.
PS: 因?yàn)?zkServer.sh 腳本中是用 nohup 命令啟動(dòng) zookeeper 的, 所以 zookeeper.out 文件是在調(diào)用 zkServer.sh 時(shí)的路徑下, 如:用 bin/zkServer.sh start
啟動(dòng)則 zookeeper.out 文件在 zookeeper-3.4.9/
下; 用 zkServer.sh start
啟動(dòng)則 zookeeper.out 文件在 zookeeper-3.4.9/bin/
下.
4.kafka的應(yīng)用
未完待續(xù)3审铩!