尋找最新的offset
def get_latest_offset():
topic = f'{get_env_tag()}_broker_rateRule'
_consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,
group_id='test_group_id')
topic_partition, last_offset = None, 0
for p in _consumer.partitions_for_topic(topic):
tp = TopicPartition(topic, p)
_consumer.assign([tp])
_consumer.seek_to_end(tp)
temp_last_offset = _consumer.position(tp)
if temp_last_offset >= last_offset:
topic_partition, last_offset = tp, temp_last_offset
_consumer.close(autocommit=False)
return topic_partition, last_offset
從指定的offset檢索消息
def seek_msg_from_kafka(offset, tp, seek_msg_type=MsgType.SWITCH_STATE.value, time_out=5 * 1000, max_msg_num=1):
"""
從指定的offset讀取msg,返回指定的msg類型
:param offset:
:param tp:
:param seek_msg_type:
:param time_out:
:param max_msg_num: 最多等待多少條信息
:return:
"""
_consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,
group_id='test_group_id',
consumer_timeout_ms=time_out)
_consumer.assign([tp, ])
_consumer.seek(tp, offset)
msg_results = []
recv_msg_num = 0
for msg in _consumer:
if b"body" in msg.value:
json_msg = json.loads(msg.value.decode('utf-8'))
if json_msg['type'] == seek_msg_type:
recv_msg_num += 1
if json_msg['type'] == MsgType.SWITCH_STATE.value:
obj_msg = SwitchStateMsg(json.dumps(json_msg))
obj_msg.msg_body = SwitchStateBody(obj_msg.msg_body)
elif json_msg['type'] == MsgType.TRADE_FEE.value:
obj_msg = TradeFeeMsg(json.dumps(json_msg))
obj_msg.msg_body = TradeFeeBody(obj_msg.msg_body)
elif json_msg['type'] == MsgType.SYMBOL_DISCOUNT.value:
obj_msg = SymbolDiscountMsg(json.dumps(json_msg))
obj_msg.msg_body = SymbolDiscountBody(obj_msg.msg_body)
msg_results.append(obj_msg)
if recv_msg_num >= max_msg_num:
break
_consumer.close()
return msg_results
實際檢索消息的代碼
tp, offset = get_latest_offset()
self.comm_api.set_user_point_switch(switch_on=True, token=token)
msg_res = seek_msg_from_kafka(offset, tp, seek_msg_type=MsgType.SWITCH_STATE.value, max_msg_num=2)
self.verify_msg(msg_res, user_tag)