kafka-python文檔
一、consumer
1. 常用api
#建立連接
consumer = KafkaConsumer(bootstrap_servers=['ip1:port','ip2:port'],
api_version=(0,10),group_id='my_group')
# topic所有的partition
consumer.partitions_for_topic(topic)
# 構(gòu)造topicPartition對象
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
# 為consumer分配分區(qū)
consumer.assign(tps)
# kafka每個分區(qū)的最新offset
consumer.end_offsets(tps)
# 當前groupid 每個分區(qū)消費到的位置
for i in range(len(tps)):
consumer.position(tps[i])
# 消費數(shù)據(jù)
for message in consumer:
partition = message.partition
offset = message.offset
value = message.value
# 重置offset
for i in range(len(tps)):
consumer.seek(tps[i], partition_offset[i]) #partition_offset保存每
# partition_offset保存每個分區(qū)的起始消費位置
# 形如{0:123, 1:345 },表示0分區(qū)從123開始再次消費
二鸽斟、producer
三、其他
3.1 json處理
額外的包:
pip install msgpack
import msgpack
producer:
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
consumer:
KafkaConsumer(value_deserializer=msgpack.unpackb)
此時得到的value是dict類型
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者