簡單記錄一個讀寫kafka demo
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import logging
import time # 引入time模塊
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
# 循環(huán)發(fā)送數(shù)據(jù)次數(shù)
n = 1
#數(shù)據(jù)從174獲取,發(fā)送到175
KAFAKA_TOPIC = "poseidon_receiver_raw_data_RTCM3.2"
KAFAKA_HOST_PRODUCTER = "192.168.xx.xx"
KAFAKA_HOST_CONSUMER = "192.168.xx.xx"
KAFAKA_PORT = 9092
logging.basicConfig(
level=logging.INFO, # 定義輸出到文件的log級別嫡纠,大于此級別的都被輸出
# format='%(asctime)s %(filename)s %(levelno)s : %(levelname)s %(message)s', # 定義輸出log的格式
format='%(asctime)s : %(message)s', # 定義輸出log的格式
datefmt='%Y-%m-%d %A %H:%M:%S', # 時間
filename='obs_info.log', # log文件名
filemode='w') # 寫入模式“w”或“a”
class Kafka_producer():
'''''
生產(chǎn)模塊:根據(jù)不同的key赖歌,區(qū)分消息
'''
def __init__(self, kafkahost, kafkaport, kafkatopic):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
print("producer:h,p,t", kafkahost, kafkaport, kafkatopic)
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
)
print("boot svr:", bootstrap_servers)
self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
def send(self, k, v):
try:
producer = self.producer
k = k.encode('utf-8')
resp = producer.send(self.kafkatopic, key=k, value=v)
# print(resp.succeeded())
producer.flush()
except KafkaError as e:
print(e)
class Kafka_consumer():
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
bootstrap_servers='{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort)
)
def consume_data(self):
try:
for message in self.consumer:
yield message
except KeyboardInterrupt as e:
print(e)
if __name__ == '__main__':
group = 'tunnel2QA'
consumer = Kafka_consumer(KAFAKA_HOST_PRODUCTER, KAFAKA_PORT, KAFAKA_TOPIC, group)
producer = Kafka_producer(KAFAKA_HOST_CONSUMER, KAFAKA_PORT, KAFAKA_TOPIC)
message = consumer.consume_data()
pre_time = 0
for msg in message:
key = str(msg.key, "utf-8")
ticks = int(time.time())
if pre_time != ticks:
for i in range(0, n):
key = i << 8 + 1 # key自定義
producer.send(str(key), msg.value)
pre_time = ticks