定位消息

尋找最新的offset

def get_latest_offset():
    topic = f'{get_env_tag()}_broker_rateRule'
    _consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,
                              group_id='test_group_id')
    topic_partition, last_offset = None, 0
    for p in _consumer.partitions_for_topic(topic):
        tp = TopicPartition(topic, p)
        _consumer.assign([tp])
        _consumer.seek_to_end(tp)
        temp_last_offset = _consumer.position(tp)
        if temp_last_offset >= last_offset:
            topic_partition, last_offset = tp, temp_last_offset

    _consumer.close(autocommit=False)
    return topic_partition, last_offset

從指定的offset檢索消息

def seek_msg_from_kafka(offset, tp, seek_msg_type=MsgType.SWITCH_STATE.value, time_out=5 * 1000, max_msg_num=1):
    """
    從指定的offset讀取msg,返回指定的msg類型
    :param offset:
    :param tp:
    :param seek_msg_type:
    :param time_out:
    :param max_msg_num: 最多等待多少條信息
    :return:
    """
    _consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,
                              group_id='test_group_id',
                              consumer_timeout_ms=time_out)
    _consumer.assign([tp, ])
    _consumer.seek(tp, offset)
    msg_results = []
    recv_msg_num = 0
    for msg in _consumer:
        if b"body" in msg.value:
            json_msg = json.loads(msg.value.decode('utf-8'))
            if json_msg['type'] == seek_msg_type:
                recv_msg_num += 1
                if json_msg['type'] == MsgType.SWITCH_STATE.value:
                    obj_msg = SwitchStateMsg(json.dumps(json_msg))
                    obj_msg.msg_body = SwitchStateBody(obj_msg.msg_body)

                elif json_msg['type'] == MsgType.TRADE_FEE.value:
                    obj_msg = TradeFeeMsg(json.dumps(json_msg))
                    obj_msg.msg_body = TradeFeeBody(obj_msg.msg_body)

                elif json_msg['type'] == MsgType.SYMBOL_DISCOUNT.value:
                    obj_msg = SymbolDiscountMsg(json.dumps(json_msg))
                    obj_msg.msg_body = SymbolDiscountBody(obj_msg.msg_body)

                msg_results.append(obj_msg)
                if recv_msg_num >= max_msg_num:
                    break

    _consumer.close()
    return msg_results

實際檢索消息的代碼

tp, offset = get_latest_offset()

self.comm_api.set_user_point_switch(switch_on=True, token=token)

msg_res = seek_msg_from_kafka(offset, tp, seek_msg_type=MsgType.SWITCH_STATE.value, max_msg_num=2)

self.verify_msg(msg_res, user_tag)
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌画恰,老刑警劉巖哮翘,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件山叮,死亡現(xiàn)場離奇詭異,居然都是意外死亡洋幻,警方通過查閱死者的電腦和手機碳却,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門队秩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人昼浦,你說我怎么就攤上這事馍资。” “怎么了关噪?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵鸟蟹,是天一觀的道長。 經(jīng)常有香客問我使兔,道長建钥,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任虐沥,我火速辦了婚禮熊经,結果婚禮上,老公的妹妹穿的比我還像新娘欲险。我一直安慰自己镐依,他們只是感情好,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布天试。 她就那樣靜靜地躺著槐壳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喜每。 梳的紋絲不亂的頭發(fā)上务唐,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天,我揣著相機與錄音灼卢,去河邊找鬼绍哎。 笑死,一個胖子當著我的面吹牛鞋真,可吹牛的內容都是我干的崇堰。 我是一名探鬼主播,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼涩咖,長吁一口氣:“原來是場噩夢啊……” “哼海诲!你這毒婦竟也來了?” 一聲冷哼從身側響起檩互,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤特幔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后闸昨,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蚯斯,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡薄风,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拍嵌。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遭赂。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖横辆,靈堂內的尸體忽然破棺而出撇他,到底是詐尸還是另有隱情,我是刑警寧澤狈蚤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布困肩,位于F島的核電站,受9級特大地震影響脆侮,放射性物質發(fā)生泄漏锌畸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一他嚷、第九天 我趴在偏房一處隱蔽的房頂上張望蹋绽。 院中可真熱鬧,春花似錦筋蓖、人聲如沸卸耘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蚣抗。三九已至,卻和暖如春瓮下,著一層夾襖步出監(jiān)牢的瞬間翰铡,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工讽坏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留锭魔,地道東北人。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓路呜,卻偏偏與公主長得像迷捧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子胀葱,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內容