Windows 安裝及利用Python操作kafka
1. 安裝zookeeper
https://zookeeper.apache.org/releases.html
下載解壓文件后
將conf文件夾下的 zoo_sample.cfg 文件名改成 zoo.cfg罢屈,打開 zoo.cfg文件邀窃,修改dataDir的值路徑,將dataDir=/tmp/zookeeper改成
dataDir=E:\apache-zookeeper-3.5.9-bin\data
并加入
dataLogDir=E:\apache-zookeeper-3.5.9-bin\log
data和log文件夾可以自己創(chuàng)建
啟動zookeeper
打開cmd 路徑切換到解壓的文件夾下的bin文件夾
運(yùn)行 zkServer.cmd
2. 安裝kafka
下載kafka文件:
http://kafka.apache.org/downloads.html
解壓完成后進(jìn)行properties文件配置:
新建logs文件夾(自己命名)
找到config文件夾下的server.properties文件罗心,將log.dirs的值改成log.dirs=E:\work\kafka_2.12-0.10.2.0\kafka-logs(改為自己的地址)
啟動Kafka
打開新的cmd 路徑切換到解壓的文件夾下木蹬,輸入如下命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
3. 使用python操作Kafka
python安裝kafka
pip install kafka-python
不知道是否是因?yàn)閳?zhí)行過pip install kafka
使用
# 生產(chǎn)者
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(3):
msg = 'test %d' % i
print(msg)
producer.send('test',msg.encode())
producer.close()
# 消費(fèi)者
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
print(msg.value, type(msg.value))
- 錯誤記錄
使用KafkaProducer的send函數(shù)時强重,報了如下錯誤:
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
AssertionError
錯誤原因:send函數(shù)的value_bytes是str類型
解決辦法:
對字符串使用encode()
字符串類str里有一個==encode()==方法礼旅,它是從字符串向比特流的編碼過程。
bytes類型恰好有個==decode()==方法俐东,它是從比特流向字符串解碼的過程跌穗。
關(guān)閉kafka
.\bin\windows\kafka-server-stop.bat