Python-logging實(shí)現(xiàn)分布式日志


title: Python實(shí)現(xiàn)分布式日志
date: 2019-06-25 08:55:44
tags:

  • python
  • logging
    categories:
  • 開(kāi)發(fā)
  • python
  • logging

Python實(shí)現(xiàn)分布式日志

分布式日志初探

? 在寫(xiě)分布式爬蟲(chóng)過(guò)程中,需要打印一些關(guān)鍵性日志丰歌,但是程序是分布在各個(gè)機(jī)器上,這樣不便于我們程序的日志的統(tǒng)計(jì),以及錯(cuò)誤代碼的查看扇住;

? 所以我查看了關(guān)于logging的教程峡捡,顯示logging的handler存在以下幾種:

StreamHandler:logging.StreamHandler型诚;日志輸出到流,可以是sys.stderr场勤,sys.stdout或者文件
FileHandler:logging.FileHandler戈锻;日志輸出到文件
BaseRotatingHandler:logging.handlers.BaseRotatingHandler;基本的日志回滾方式
RotatingHandler:logging.handlers.RotatingHandler和媳;日志回滾方式格遭,支持日志文件最大數(shù)量和日志文件回滾
TimeRotatingHandler:logging.handlers.TimeRotatingHandler;日志回滾方式留瞳,在一定時(shí)間區(qū)域內(nèi)回滾日志文件
SocketHandler:logging.handlers.SocketHandler拒迅;遠(yuǎn)程輸出日志到TCP/IP sockets
DatagramHandler:logging.handlers.DatagramHandler;遠(yuǎn)程輸出日志到UDP sockets
SMTPHandler:logging.handlers.SMTPHandler她倘;遠(yuǎn)程輸出日志到郵件地址
SysLogHandler:logging.handlers.SysLogHandler璧微;日志輸出到syslog
NTEventLogHandler:logging.handlers.NTEventLogHandler;遠(yuǎn)程輸出日志到Windows NT/2000/XP的事件日志
MemoryHandler:logging.handlers.MemoryHandler帝牡;日志輸出到內(nèi)存中的指定buffer
HTTPHandler:logging.handlers.HTTPHandler往毡;通過(guò)"GET"或者"POST"遠(yuǎn)程輸出到HTTP服務(wù)器

其中我特別注意到的handler是SocketHandler, handler可以實(shí)現(xiàn)socket發(fā)送日志靶溜;

? 而聯(lián)想到使用es來(lái)存取日志开瞭,本來(lái)是想用es api來(lái)接收日志(這樣就可以使用HTTPHandler),但是http畢竟是上層協(xié)議罩息,發(fā)送可能會(huì)慢嗤详,所以繼續(xù)找到了logstash, 作為ELK的一員瓷炮,我們可以先發(fā)送到logstash再轉(zhuǎn)發(fā)到es葱色,接下來(lái)就查看logstash是否有python開(kāi)源包,接下來(lái)就發(fā)現(xiàn)了python-logstash娘香。

安裝并運(yùn)行l(wèi)ogstash

  • 安裝java
  • 進(jìn)入官網(wǎng)下載安裝包
  • 上傳服務(wù)器并解壓(這里使用的是6.3.2)
tar -zxvf logstash-6.3.2.tar.gz
  • 編寫(xiě)logstash的配置文件
input {                                                                           
    udp {                                                                         
       port => 5959                                                              
       codec => json                                                             
    }                                                                             
}                                                                                 
                                                                                  
output {                                                                          
    elasticsearch{                                                                
       hosts => ["192.168.1.17:9200", "192.168.1.18:9200", "192.168.1.19:9200"]  
       index => "crawler-%{+YYYY.MM.dd}"                                         
    }                                                                             
    stdout{                                                                       
       codec => rubydebug                                                        
    }                                                                             
}                                                     
  • 運(yùn)行l(wèi)ogstash
logstash -f logstash.conf
  • 截圖
logstash運(yùn)行成功.jpg

使用python-logstash

代碼如下:

import logging
import logstash
import sys

host = 'localhost'

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))

extra = {
    'test_string': 'python version: ' + repr(sys.version_info),
    'test_boolean': True,
    'test_dict': {'a': 1, 'b': 'c'},
    'test_float': 1.23,
    'test_integer': 123,
    'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)

運(yùn)行后logstash顯示:


python-logstash.png

查看es中是否接收到數(shù)據(jù):


log-es.png

以上苍狰,我們基本的分布式日志已經(jīng)發(fā)送測(cè)試成功;

深入研究logging的日志

? 第三方庫(kù)中的日志格式并不滿(mǎn)足我們的需求烘绽,所以我們可以對(duì)其進(jìn)行修改(借鑒其源碼):

源碼:詳細(xì)代碼

class LogstashFormatterBase(logging.Formatter):

    def __init__(self, message_type='Logstash', tags=None, fqdn=False):
        pass
    def get_extra_fields(self, record):
        pass

    def get_debug_fields(self, record):
        pass

    @classmethod
    def format_source(cls, message_type, host, path):
        pass

    @classmethod
    def format_timestamp(cls, time):
        pass

    @classmethod
    def format_exception(cls, exc_info):
        pass

    @classmethod
    def serialize(cls, message):
        pass

這里我們發(fā)現(xiàn)其繼承了Formatter淋昭;查看Logging官方文檔可以得知,實(shí)現(xiàn)自定義format只要實(shí)現(xiàn)其format方法就可以了安接,第一次我直接繼承了python-logstash的LogstashFormatterVersion1:

class LogstashFormat(LogstashFormatterVersion1):

    def format(self, record):
        message = {
            '@timestamp': self.format_timestamp(record.created),
            '@message': record.getMessage(),
            'log_level': record.levelname,
            'log_file': record.filename,
            'line_no': record.lineno,
            'host': self.host,
        }
        message.update(self.get_extra_fields(record))

        return self.serialize(message)

    @classmethod
    def format_timestamp(cls, time):
        current_time = datetime.datetime.fromtimestamp(time)
        return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),
                        ".%03d"%(current_time.microsecond / 1000),
                        "Z"])
 
def init_logstash_logger(level, host, port):
    logger = logging.getLogger('python-logstash-logger')
    logger.setLevel(LEVELS.get(level))
    logstash_format = LogstashFormat()
    logstash_handler = LogstashHandler(host, port)
    logstash_handler.setFormatter(logstash_format)
    logger.addHandler(logstash_handler)
    return logger

? 修改了其對(duì)時(shí)間的處理翔忽,以及format返回的處理,這里我們實(shí)現(xiàn)一個(gè)初始化方法進(jìn)行測(cè)試盏檐;

執(zhí)行:

exec_init.png

結(jié)果:

exec_logstash.png

以上我們就可以自定義歇式,自己的代碼發(fā)送到logstash上了。

es會(huì)對(duì)上傳的日志進(jìn)行日期建立索引

es_index.png

提供發(fā)送日志到kafka源碼:

要求: python3.7

庫(kù): confluent-kafka-python

import datetime                                                                        
import json                                                                            
import logging                                                                         
                                                                                       
from logging import Formatter                                                          
from logging.handlers import QueueHandler                                              
from confluent_kafka import Producer                                                   
                                                                                       
                                                                                       
produce = Producer({'bootstrap.servers': '192.168.11.5:9092'})                         
                                                                                       
class KafkaQueueHandler(QueueHandler):                                                 
                                                                                       
    def emit(self, record):                                                            
    ┆   self.queue.poll(0)                                                             
    ┆   self.queue.produce('test', self.format(record), callback=self.delivery_report) 
    ┆   self.queue.flush()                                                             
                                                                                       
    def delivery_report(self, err, msg):                                               
    ┆   if err is not None:                                                            
    ┆   ┆   print('message:%s', err)                                                   
    ┆   else:                                                                          
    ┆   ┆   print('message deliverd %s', msg.topic())                                  
                                                                                       
                                                                                       
class QueueFormat(Formatter):                                                          
                                                                                       
    def __init__(self, topic):                                                         
        self.topic = topic                                                             
    ┆   super(QueueFormat, self).__init__()                                            
                                                                                       
    def get_extra_fields(self, record):                                                
    ┆   skip_list = (                                                                  
    ┆   ┆   'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',          
    ┆   ┆   'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',              
    ┆   ┆   'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',         
    ┆   ┆   'processName', 'relativeCreated', 'thread', 'threadName', 'extra',         
    ┆   ┆   'auth_token', 'password'                                                   
    ┆   )                                                                              
    ┆   easy_types = (str, bool, dict, float, int, list, type(None))                   
                                                                                       
    ┆   fields = {}                                                                    
    ┆   for key, value in record.__dict__.items():                                     
    ┆   ┆   if key not in skip_list:                                                   
    ┆   ┆   ┆   if isinstance(value, easy_types):                                      
    ┆   ┆   ┆   ┆   fields[key] = value                                                
    ┆   ┆   ┆   else:                                                                  
    ┆   ┆   ┆   ┆   fields[key] = repr(value)                                          
                                                                                       
    ┆   return fields                                                                  
                                                                                       
    @classmethod                                                                       
    def format_timestamp(cls, time):                                                   
    ┆   current_time = datetime.datetime.fromtimestamp(time)                           
    ┆   return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),                    
    ┆   ┆   ┆   ┆   ┆   ".%03d"%(current_time.microsecond / 1000),                     
    ┆   ┆   ┆   ┆   ┆   "Z"])                                                          
                          @classmethod                                                                    
    def format_exception(cls, exc_info):                                            
    ┆   return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''   
                                                                                    
    @classmethod                                                                    
    def serialize(cls, message):                                                    
    ┆   return bytes(json.dumps(message), 'utf-8')                                  
                                                                                    
    def format(self, record):                                                       
    ┆   message = {                                                                 
    ┆   ┆   '@timestamp': self.format_timestamp(record.created),                    
    ┆   ┆   '@message': record.getMessage(),                                        
    ┆   ┆   'log_level': record.levelname,                                          
    ┆   ┆   'log_file': record.filename,                                            
    ┆   ┆   'line_no': record.lineno,                                               
    ┆   ┆   'topic': self.topic                                                     
    ┆   }                                                                           
    ┆   message.update(self.get_extra_fields(record))                               
    ┆   return self.serialize(message)                                              
                                                                                    
logger = logging.getLogger('kafka')                                                 
kafka_handle = KafkaQueueHandler(produce)                                           
log_format = QueueFormat('test')                                                    
kafka_handle.setFormatter(log_format)                                               
                                                                                    
logger.setLevel(logging.DEBUG)                                                      
logger.addHandler(kafka_handle)                                                     
logger.info('aaaa')                                                         

向kafka發(fā)送日志后胡野,可以使用logstash的插件從kafka中讀取日志材失,發(fā)送到es。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末给涕,一起剝皮案震驚了整個(gè)濱河市豺憔,隨后出現(xiàn)的幾起案子额获,更是在濱河造成了極大的恐慌,老刑警劉巖恭应,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抄邀,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡昼榛,警方通過(guò)查閱死者的電腦和手機(jī)境肾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)胆屿,“玉大人奥喻,你說(shuō)我怎么就攤上這事》羌#” “怎么了环鲤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)憎兽。 經(jīng)常有香客問(wèn)我冷离,道長(zhǎng),這世上最難降的妖魔是什么纯命? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任西剥,我火速辦了婚禮,結(jié)果婚禮上亿汞,老公的妹妹穿的比我還像新娘瞭空。我一直安慰自己,他們只是感情好疗我,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布咆畏。 她就那樣靜靜地躺著,像睡著了一般吴裤。 火紅的嫁衣襯著肌膚如雪鳖眼。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,829評(píng)論 1 290
  • 那天嚼摩,我揣著相機(jī)與錄音,去河邊找鬼矿瘦。 笑死枕面,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缚去。 我是一名探鬼主播潮秘,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼易结!你這毒婦竟也來(lái)了枕荞?” 一聲冷哼從身側(cè)響起柜候,我...
    開(kāi)封第一講書(shū)人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎躏精,沒(méi)想到半個(gè)月后渣刷,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡矗烛,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年辅柴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瞭吃。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡碌嘀,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出歪架,到底是詐尸還是另有隱情股冗,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布和蚪,位于F島的核電站止状,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏惠呼。R本人自食惡果不足惜导俘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望剔蹋。 院中可真熱鬧旅薄,春花似錦、人聲如沸泣崩。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)矫付。三九已至凯沪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間买优,已是汗流浹背妨马。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留杀赢,地道東北人烘跺。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像脂崔,于是被迫代替她去往敵國(guó)和親滤淳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容