python:kafka to mongo

直接上code

import pymysql
import configparser
from confluent_kafka import KafkaError,Consumer, KafkaException
from bson import json_util
import pandas as pd
import hashlib
import logging
import os
import datetime
import time
import sqlalchemy
from confluent_kafka.avro.serializer import SerializerError
from pymongo import MongoClient

class kafkaconsumer():
    def __init__(self):
        self.config = configparser.RawConfigParser()
        self.config.read('./config.cfg')

        if not os.path.exists('./Log'):
            os.makedirs('./Log')   
        self.logger=logging.getLogger('writeMysql')
        self.logger.setLevel(logging.INFO)
        fh = logging.FileHandler('Log/writeMysql.log')
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)  

        self.GatewayName = 'Test_Kafka'
        self.MongoClient = MongoClient(self.config.get('MongoDB','uri'))

    def connect_kafka(self):  
        global consumer
        epoch_start = datetime.datetime(1970, 1, 1)
        Count = 0
        TopicBuffer = {}
        try :
            Source_Kafka_Consumer = Consumer({
                'bootstrap.servers':self.config.get('Source_Kafka','kafkaservers')
                ,'group.id':self.config.get('Source_Kafka','groupID')
                ,'auto.offset.reset':self.config.get('Source_Kafka','offsetReset')
                , 'session.timeout.ms': 6000
            }, logger=self.logger)
            Source_Kafka_Topics = self.config.get('Source_Kafka','topics').split(',')
            json_util.DEFAULT_JSON_OPTIONS.strict_uuid = True
            consumer = Source_Kafka_Consumer
            consumer.subscribe(Source_Kafka_Topics)
            print('Kafka connect Successfully !')

        except Exception as inst :
            Source_Kafka_Consumer = None
            consumer = None
            print('Source Kafka Consumer init fail')
            print(inst)

        mongodb = self.MongoClient['Kafka']
            
        try:
            while True:
                try :
                    msg = consumer.poll(1)
                except SerializerError as e:
                    print('Message deserialization failed for message at {} [{}] offset {}: {}'.format(
                        msg.topic(),
                        msg.partition(),
                        msg.offset(),
                        e
                    ))
                    logging.error('Message deserialization failed for message at {} [{}] offset {}: {}'.format(
                        msg.topic(),
                        msg.partition(),
                        msg.offset(),
                        e
                    ))
                    continue

                if msg is None:
                    continue
                if msg.error():
                    print('Consumer error: {}'.format(msg.error()))
                    continue
                # data = msg.value()
                data = json_util.loads(msg.value())
                
                if data.get('evt_dt',None) != None :
                    delta = datetime.timedelta(milliseconds=data['evt_dt'])
                    data['evt_dt2'] = (epoch_start + delta)
                    data['evt_dt2_China'] = (epoch_start + delta + datetime.timedelta(hours=8))

                Count += 1
                print(data)
                if TopicBuffer.get(msg.topic()) is None :
                    TopicBuffer[msg.topic()] = []
                TopicBuffer[msg.topic()].append(data)

                if Count % 100 == 0 :
                    print('Message {0} is Processed At {1}'.format(Count,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                    logging.info('Message {0} is Processed At {1}'.format(Count,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                    
                    topics = TopicBuffer.keys()
                    for topic in topics :
                        col = mongodb[topic]
                        if len(TopicBuffer[topic]) > 0 :
                            try :
                                col.insert_many(TopicBuffer[topic])
                                TopicBuffer[topic] = []
                            except Exception as inst :
                                print('Mongo write fail')
                                print(inst)
                if Count % 10000 == 0 :
                    Count = 0
        except KeyboardInterrupt:
            print ('KeyboardInterrupt')
        except Exception as inst :
            print (inst)

        if Count % 100 != 0 :
            print('Message {0} is Processed At {1}'.format(Count,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
            logging.info('Message {0} is Processed At {1}'.format(Count,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
            topics = TopicBuffer.keys()
            for topic in topics :
                col = mongodb[topic]
                if len(TopicBuffer[topic]) > 0 :
                    try :
                        col.insert_many(TopicBuffer[topic])
                        TopicBuffer[topic] = []
                    except Exception as inst :
                        print('Mongo write fail')
                        print(inst)

        consumer.unsubscribe()
        consumer.close()
        print('{0} Finish'.format(self.GatewayName))
        logging.info('{0} Finish'.format(self.GatewayName))

if __name__ == '__main__':
    commonObj = kafkaconsumer()
    commonObj.connect_kafka()

config

[Source_Kafka]
kafkaservers = 
groupID =  
offsetReset = earliest
topics = 

[MongoDB]
uri = mongodb://XXX:XXX
蟹蟹.jpg
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載泪掀,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者论巍。
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子量九,更是在濱河造成了極大的恐慌,老刑警劉巖舷丹,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涮毫,死亡現(xiàn)場離奇詭異,居然都是意外死亡呢铆,警方通過查閱死者的電腦和手機(jī)晦鞋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來棺克,“玉大人悠垛,你說我怎么就攤上這事∧纫辏” “怎么了确买?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長纱皆。 經(jīng)常有香客問我湾趾,道長芭商,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任搀缠,我火速辦了婚禮铛楣,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘艺普。我一直安慰自己簸州,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布歧譬。 她就那樣靜靜地躺著岸浑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪瑰步。 梳的紋絲不亂的頭發(fā)上矢洲,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天,我揣著相機(jī)與錄音面氓,去河邊找鬼兵钮。 笑死,一個胖子當(dāng)著我的面吹牛舌界,可吹牛的內(nèi)容都是我干的掘譬。 我是一名探鬼主播,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼呻拌,長吁一口氣:“原來是場噩夢啊……” “哼葱轩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起藐握,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤靴拱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后猾普,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體袜炕,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片零如。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖陌知,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情掖肋,我是刑警寧澤仆葡,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站志笼,受9級特大地震影響沿盅,放射性物質(zhì)發(fā)生泄漏把篓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一嗡呼、第九天 我趴在偏房一處隱蔽的房頂上張望纸俭。 院中可真熱鬧,春花似錦南窗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至呜袁,卻和暖如春敌买,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背阶界。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工虹钮, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人膘融。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓芙粱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親氧映。 傳聞我的和親對象是個殘疾皇子春畔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一。spring-cloud-bus是什么岛都? 回答這個問題之前律姨,我們先回顧先前的分布式配置,當(dāng)配置中心發(fā)生變化后臼疫,...
    wingsoldold閱讀 452評論 0 0
  • 為什么學(xué)習(xí)Python择份? 通過什么途徑學(xué)習(xí)的Python? 上網(wǎng)收集視頻烫堤,資料 關(guān)注公證號 買教程荣赶,書籍 Pyth...
    130920閱讀 1,216評論 0 0
  • 關(guān)鍵詞: 接口自動化測試 接口測試流程 確定測試接口的工具 —> 配置需要的接口參數(shù) —> 進(jìn)行測試 —> 檢查測...
    Jeff_9021閱讀 1,150評論 0 7
  • 夜鶯2517閱讀 127,720評論 1 9
  • 版本:ios 1.2.1 亮點(diǎn): 1.app角標(biāo)可以實時更新天氣溫度或選擇空氣質(zhì)量,建議處女座就不要選了塔逃,不然老想...
    我就是沉沉閱讀 6,898評論 1 6