1.下載指定版本的安裝包
或者直接將下載完成的安裝包上傳到我們家目錄下
★★★★★注意此時分多個版本:
1,測試的安裝包必須是這個版本的(“kafka_2.11-0.10.2.0.tgz ”),
- 2.11是針對Scala 2.11版本
- 0.10.2.0是針對kafka本身的版本
2豺妓,測試版本是 kafka_2.11-2.4.1.tgz
- 2.11是針對Scala 2.11的版本
- 2.4.1 是針對kafka版本
所以對其進行了不同的測試
- 主機器(更改了主機名、hosts映射)
- 測試機器1:2.11-0.10.2.0 版本
- 測試機器2:2.11-2.4.1 版本
2.解壓
tar -zxvf kafka_2.11-0.10.2.0.tgz -C /usr/local/
cd /usr/local
mv kafka_2.11-0.10.2.0/ kafka
# 我們在安裝之前的時候可以創(chuàng)建一個新的賬戶(非必選)
# sudo chown -R hadoop ./kafka
3.測試
完全可以配置環(huán)境變量之后可以在任意目錄下執(zhí)行相應的代碼嫌褪,這里在測試的時候沒有配置環(huán)境變量,所以一直需要先進去當前kafka的主目錄下從而進行相應的操作。
(1).啟動默認的 Zookeeper
這里為了方便宪躯,直接使用的是默認的zookeeper啟動的,當然也可通過配置單機的 zookeeper來實現(xiàn)以下操作位迂。
# 新建一個終端窗口
# 0.1访雪,進入kafka所在的目錄
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
(2.1),啟動默認的 Kafka
# 0.2,新建一個終端窗口
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
(2.2),也可以修改Kafka的配置文件server.properties
# 確保每個機器上的id不一樣
broker.id=0
# 配置服務端的監(jiān)控地址
listeners=PLAINTEXT://192.168.51.128:9092
# kafka 日志目錄
log.dirs=/data/servers/kafka_2.11-2.4.0/logs
# kafka設置的partitons的個數(shù)
num.partitions=1
# zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群,多個節(jié)點之間使用“,”隔開
zookeeper.connect=192.168.51.128:2181
(3).創(chuàng)建topic
kafka-topics.sh
:基本腳本命令命令參數(shù)詳解:
--list
:查看topic
--create
: 創(chuàng)建Topic
--topic <String: topic>
:指定 Topic(主題) 名囤官,
--partitions
:指定該Topic分區(qū)的個數(shù)冬阳,
--replication-factor
:指定每個Partition的備份數(shù)
--zookeeper <String: host1:port,host2:port>
:指定 Zookeeper 的連接地址參數(shù)提示可能并不贊成這樣使用党饮,那么就可以直接指定Kafka的連接地址(DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.)
--bootstrap-server<String: host1:port肝陪,host2:port>
:指定kafka的連接信息,采用Direct方式的時候推薦使用這個刑顺。
# 0.3氯窍,查看當前集群中的topic,在創(chuàng)建之前和之后都可以進行查看蹲堂,根據(jù)自己的實際需要進行相應的修改
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 新建一個終端窗口
# 1.1狼讨,主機器(也是就我最近常用的機器,主機名和hosts映射都重新做的)
# 創(chuàng)建一個名字為demo01的Topic柒竞,只有1個分區(qū)政供、每個Partition只有1個備份
# 這里是使用的Zookeeper作為連接
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic demo01
# 2.1,測試機器1(也就 第一個版本的測試朽基,第二個版本的測試也是一樣的)
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo01
# 創(chuàng)建 topic test1
# 存在的價值:1.在指定連接信息的時候可以采用直連(Direct)的方式布隔,也就是說使用配置的Kafka的地址和端口;2.指定集群中的多個地址的時候可以使用","進行隔開稼虎;3.這里使用的ip地址指定的衅檀,也可以使用主機名的方式指定,但是前提要配置hosts映射
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1
(4).創(chuàng)建Producer
kafka-console-producer.sh
基本腳本命令命令參數(shù)詳解:
--broker-list <String: broker-list>
:指定kafka的連接信息霎俩,集群中多個連接信息之間使用","隔開
--topic <String: topic>
:指定Topic
--timeout <Integer: timeout_ms>
:設置超時時間
--sync
:異步發(fā)送信息
# 1.2哀军,常用機器
# 在當前topic窗口執(zhí)行以下命令來創(chuàng)建Producer
bin/kafka-console-producer.sh --broker-list master:9092 --topic demo01
# 此時可以在命令下邊輸入消息內容,每一行為一條消息打却,此時可以將消息推送到kafka服務器杉适,以在consumer中進行查看
# 2.2,測試機器1和2
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo01
# 并在當前窗口運行成功后輸入以下信息
hello hadoop
hello wyq
(5).創(chuàng)建Consumer
kafka-console-consumer.sh
:基本腳本命令命令參數(shù)詳解:
--bootstrap-server
:指定kafka的連接信息,集群中多臺機器采用","進行分割
--topic <String: topic>
:指定消費的Topic
--from-beginning
: 是用來指定消費信息是從頭開始的学密。
--group <String: consumer group id>
:指定消費者組
# 新建一個終端窗口
# 1.3.1淘衙,常用機器
# 創(chuàng)建消費者,查看消費情況(此時為方法1)腻暮,此時是通過間接訪問 zookeeper的2181端口消費數(shù)據(jù)
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic demo01 --from-beginning
# 1.3.2彤守,也可以用方法2(說明: 因為此時的版本是舊的毯侦,會提示--zookeeper將會被取代,然后我們就可以使用--bootstrap-server去實現(xiàn)具垫,但是指定的連接端口(此時是通過kafka的端口9092消費數(shù)據(jù))不一樣侈离,而是直接指定連接Producer的端口)
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic demo01 --new-consumer --from-beginning
# 測試機器1(也就是 2.11-0.10.2.0 版本)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo01 --from-beginning
# 此時可以看到我們剛剛生產的三條消息,說明kafka安裝成功
# 測試機器2(也就是 2.11-2.4.1 版本)
# 注意此時版本中將之前的"--new-consumer"參數(shù)取消了筝蚕,所以可以直接去掉
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo01 --from-beginning
4.關閉所有程序的順序
原則上是:按照啟動順序倒著關X阅搿!起宽!
- 1.關閉 Consumer
- 2.關閉 Producer
- 3.關閉 Kafka
- 4.關閉 zookeeper
5.附加操作
(1).后臺常駐的方式啟動kafka洲胖,帶上參數(shù) -daemon
(2).停止kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/bin/config/server.properties
(3),查詢Topic
1).查看指定的Topic的詳細信息
--describe
配合--topic
參數(shù)查看指定Topic的詳細信息,同時要指定--zookeeper
的連接信息
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
2).查看 topic 指定分區(qū) offset 的最大值或最小值
time 為 -1 時表示最大值坯沪,為 -2 時表示最小值:
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0
(4),增加Topic的Partition數(shù)量
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5
(5),刪除Topic
--delete
配合--topic
參數(shù)指定要刪除的Topic绿映,同時要指定--zookeeper
的連接信息如果要實現(xiàn)真正意義上的刪除,必選將配置文件中
delete.topic.enable=true
這個配置項進行更改腐晾;因為會提示以下的信息Topic demo02 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 1.需要事先將config/server.properties
delete.topic.enable=true
# 2.執(zhí)行刪除操作
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
(6).消費信息
1).指定從頭開始
--from-beginning
關鍵字的使用叉弦!
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2),從尾部獲取數(shù)據(jù),此時必須指定分區(qū)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
3).指定獲取的個數(shù)的信息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1