title: Implementing Distributed Logging in Python
date: 2019-06-25 08:55:44
tags:
- python
- logging
categories:
- Development
- python
- logging
A first look at distributed logging
While writing a distributed crawler, I needed to print some key logs. The program runs on many machines, though, which makes it hard to aggregate the logs and track down errors.
So I went through the logging tutorials, which list the following handler types:
- StreamHandler: logging.StreamHandler; writes logs to a stream such as sys.stderr, sys.stdout, or any file-like object
- FileHandler: logging.FileHandler; writes logs to a file
- BaseRotatingHandler: logging.handlers.BaseRotatingHandler; base class for the rotating file handlers
- RotatingFileHandler: logging.handlers.RotatingFileHandler; rotates log files by size, keeping at most a given number of backup files
- TimedRotatingFileHandler: logging.handlers.TimedRotatingFileHandler; rotates log files at fixed time intervals
- SocketHandler: logging.handlers.SocketHandler; sends logs to a remote TCP/IP socket
- DatagramHandler: logging.handlers.DatagramHandler; sends logs to a remote UDP socket
- SMTPHandler: logging.handlers.SMTPHandler; sends logs to an email address
- SysLogHandler: logging.handlers.SysLogHandler; sends logs to syslog
- NTEventLogHandler: logging.handlers.NTEventLogHandler; writes logs to the (local or remote) Windows NT/2000/XP event log
- MemoryHandler: logging.handlers.MemoryHandler; buffers logs in memory and flushes them to a target handler
- HTTPHandler: logging.handlers.HTTPHandler; sends logs to an HTTP server via "GET" or "POST"
The one that caught my attention was SocketHandler, which can send logs over a socket.
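To see what SocketHandler actually puts on the wire, here is a minimal sketch modeled on the network receiver example in the Python logging cookbook. Each record travels as a 4-byte big-endian length followed by a pickled dict of the record's attributes, and the receiving side rebuilds a LogRecord with `logging.makeLogRecord`. The class names and the logger name `crawler.node1` are illustrative only, and port 0 lets the OS pick a free port so the sketch runs anywhere.

```python
import logging
import logging.handlers
import pickle
import socketserver
import struct
import threading

received = []                      # records rebuilt on the "central" side
got_record = threading.Event()


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Decodes SocketHandler's format: 4-byte big-endian length + pickled dict."""

    def handle(self):
        while True:
            header = self.rfile.read(4)
            if len(header) < 4:            # sender closed the connection
                break
            (length,) = struct.unpack(">L", header)
            # WARNING: unpickling is only safe for data from trusted senders.
            attrs = pickle.loads(self.rfile.read(length))
            received.append(logging.makeLogRecord(attrs))
            got_record.set()


class LogRecordServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    daemon_threads = True                  # don't block interpreter exit


# A fixed collector port would normally be used instead of 0
# (logging.handlers.DEFAULT_TCP_LOGGING_PORT is 9020).
server = LogRecordServer(("127.0.0.1", 0), LogRecordStreamHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
host, port = server.server_address

# Crawler side: each node attaches a SocketHandler pointing at the collector.
logger = logging.getLogger("crawler.node1")
logger.setLevel(logging.INFO)
socket_handler = logging.handlers.SocketHandler(host, port)
logger.addHandler(socket_handler)
logger.info("fetched %d pages", 3)

got_record.wait(timeout=5)
socket_handler.close()
server.shutdown()
server.server_close()
print(received[0].name, received[0].getMessage())  # crawler.node1 fetched 3 pages
```

Because the payload is a pickle, a receiver like this should only accept connections from machines you trust.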
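This made me think of storing logs in Elasticsearch (ES). My first idea was to receive logs through the ES API (which would allow using HTTPHandler), but HTTP is a higher-level protocol and sending might be slower. So I kept looking and found Logstash: as part of the ELK stack, it lets us send logs to Logstash first and have it forward them to ES. I then checked whether there was an open-source Python package for Logstash, and found python-logstash.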
Installing and running Logstash
- Install Java
- Download the package from the official website
- Upload it to the server and extract it (version 6.3.2 is used here)
tar -zxvf logstash-6.3.2.tar.gz
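- Write the Logstash configuration file (logstash.conf)
input {
  udp {
    # python-logstash's default port
    port => 5959
    # each datagram is decoded as one JSON event
    codec => json
  }
}
output {
  elasticsearch {
    hosts => ["192.168.1.17:9200", "192.168.1.18:9200", "192.168.1.19:9200"]
    # one index per day
    index => "crawler-%{+YYYY.MM.dd}"
  }
  stdout {
    # also print each event to the console
    codec => rubydebug
  }
}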
- Run Logstash
logstash -f logstash.conf
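Before wiring up Python logging, one way to smoke-test the `udp { codec => json }` input is to send a single JSON datagram by hand. This is a minimal sketch built around a hypothetical helper, `send_test_event`. So that it runs anywhere, it sends to a local UDP socket standing in for Logstash rather than to a real instance.

```python
import json
import socket


def send_test_event(host, port, event):
    """Send one JSON-encoded event as a single UDP datagram (what `codec => json` expects)."""
    payload = json.dumps(event).encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))
    return payload


# Local stand-in for Logstash; against a real instance you would call
# send_test_event("<logstash-host>", 5959, {...}) and watch the rubydebug output.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))
receiver.settimeout(5)

sent = send_test_event("127.0.0.1", receiver.getsockname()[1],
                       {"message": "hello logstash", "level": "INFO"})
data, _ = receiver.recvfrom(65535)
event = json.loads(data)
receiver.close()
print(event)  # {'message': 'hello logstash', 'level': 'INFO'}
```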
Using python-logstash
The code is as follows:
import logging
import sys

import logstash  # pip install python-logstash

host = 'localhost'  # address of the Logstash server

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
# LogstashHandler sends JSON over UDP, matching the udp input on port 5959
test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))

# Extra fields are sent along with the message as separate JSON keys
extra = {
    'test_string': 'python version: ' + repr(sys.version_info),
    'test_boolean': True,
    'test_dict': {'a': 1, 'b': 'c'},
    'test_float': 1.23,
    'test_integer': 123,
    'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)
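After running it, the event shows up in the Logstash console (printed by the rubydebug stdout output), and we can then check in ES that the data was received.
With that, the basic distributed logging pipeline has been tested successfully.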
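The `extra` dict in the python-logstash example works because the standard logging module copies each of its keys onto the LogRecord as an attribute. That is how a formatter can later pick them up from `record.__dict__`. Here is a small sketch that uses a hypothetical in-memory `CaptureHandler` to show this:

```python
import logging


class CaptureHandler(logging.Handler):
    """Keeps emitted records in memory so they can be inspected (illustration only)."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger("extra-demo")
logger.setLevel(logging.INFO)
logger.propagate = False          # keep the demo off the root logger
capture = CaptureHandler()
logger.addHandler(capture)

logger.info("python-logstash: test extra fields",
            extra={"test_boolean": True, "test_integer": 123})

record = capture.records[0]
# Each key in `extra` becomes an attribute on the LogRecord itself...
print(record.test_boolean, record.test_integer)  # True 123
# ...so a formatter finds it in record.__dict__ next to the standard attributes.
print({k: v for k, v in record.__dict__.items() if k.startswith("test_")})
# {'test_boolean': True, 'test_integer': 123}
```

Any key that collides with an existing record attribute, or with `message` or `asctime`, raises a KeyError.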
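Digging deeper into logging
The log format produced by the third-party library did not meet our needs, so we can modify it, borrowing from its source code.
Source (outline of the formatter base class in python-logstash, method bodies omitted):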
import logging


# Outline of LogstashFormatterBase from python-logstash (logstash/formatter.py);
# method bodies are omitted.
class LogstashFormatterBase(logging.Formatter):

    def __init__(self, message_type='Logstash', tags=None, fqdn=False):
        pass

    def get_extra_fields(self, record):
        pass

    def get_debug_fields(self, record):
        pass

    @classmethod
    def format_source(cls, message_type, host, path):
        pass

    @classmethod
    def format_timestamp(cls, time):
        pass

    @classmethod
    def format_exception(cls, exc_info):
        pass

    @classmethod
    def serialize(cls, message):
        pass
This shows that the class inherits from Formatter. According to the official logging documentation, a custom formatter only needs to implement the format method. My first attempt simply subclassed python-logstash's LogstashFormatterVersion1:
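import datetime
import logging

from logstash import LogstashHandler
from logstash.formatter import LogstashFormatterVersion1

# Assumed mapping from level names to logging constants (not shown in the original)
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}


class LogstashFormat(LogstashFormatterVersion1):

    def format(self, record):
        # Keep only the fields we care about, plus any `extra` fields
        message = {
            '@timestamp': self.format_timestamp(record.created),
            '@message': record.getMessage(),
            'log_level': record.levelname,
            'log_file': record.filename,
            'line_no': record.lineno,
            'host': self.host,
        }
        message.update(self.get_extra_fields(record))
        return self.serialize(message)  # JSON encoded as UTF-8 bytes

    @classmethod
    def format_timestamp(cls, time):
        # Local time with its real UTC offset, e.g. 2019-06-25T08:55:44.123+08:00.
        # Appending a literal "Z" to local time would mislabel it as UTC.
        current_time = datetime.datetime.fromtimestamp(time).astimezone()
        return current_time.isoformat(timespec='milliseconds')


def init_logstash_logger(level, host, port):
    logger = logging.getLogger('python-logstash-logger')
    logger.setLevel(LEVELS.get(level, logging.INFO))
    logstash_format = LogstashFormat()
    logstash_handler = LogstashHandler(host, port)
    logstash_handler.setFormatter(logstash_format)
    logger.addHandler(logstash_handler)
    return logger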
This changes how the timestamp is handled and which fields format returns. An initialization function, init_logstash_logger, is added for testing.
With this, we can customize the format and send logs from our own code to Logstash.
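Overriding format() also works without python-logstash. Here is a minimal sketch with a hypothetical `JsonFormatter` built only on the standard library. It emits fields similar to `LogstashFormat` and writes local time together with its UTC offset.

```python
import datetime
import json
import logging
import socket
import sys


class JsonFormatter(logging.Formatter):
    """Stdlib-only formatter: one JSON document per log record (illustrative)."""

    def __init__(self):
        super().__init__()
        self.host = socket.gethostname()

    def format(self, record):
        # Aware local time, e.g. 2019-06-25T08:55:44.123+08:00
        ts = datetime.datetime.fromtimestamp(record.created).astimezone()
        message = {
            '@timestamp': ts.isoformat(timespec='milliseconds'),
            '@message': record.getMessage(),
            'log_level': record.levelname,
            'log_file': record.filename,
            'line_no': record.lineno,
            'host': self.host,
        }
        if record.exc_info:
            message['exception'] = self.formatException(record.exc_info)
        return json.dumps(message, ensure_ascii=False)


handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("json-demo")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)
logger.info("fetched %d pages", 3)   # prints one JSON line
```

This formatter returns a str, which is what StreamHandler expects. python-logstash's handlers write the formatter's return value straight to the socket, so a formatter used with them should return bytes, as `serialize` does.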
ES creates one index per day for the uploaded logs, as set by `index => "crawler-%{+YYYY.MM.dd}"` in the Logstash output.
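The date in that index name comes from each event's @timestamp. As far as I understand, Logstash renders `%{+...}` in UTC, so under UTC+8 an event logged shortly after midnight local time lands in the previous day's index. Here is a sketch of the naming rule, using a hypothetical helper `daily_index_name`:

```python
import datetime


def daily_index_name(prefix, timestamp):
    """Mimic `prefix-%{+YYYY.MM.dd}`: the date is taken from the timestamp in UTC."""
    utc = timestamp.astimezone(datetime.timezone.utc)
    return "%s-%s" % (prefix, utc.strftime("%Y.%m.%d"))


beijing = datetime.timezone(datetime.timedelta(hours=8))
early_morning = datetime.datetime(2019, 6, 25, 7, 30, tzinfo=beijing)
noon = datetime.datetime(2019, 6, 25, 12, 0, tzinfo=beijing)
print(daily_index_name("crawler", early_morning))  # crawler-2019.06.24
print(daily_index_name("crawler", noon))           # crawler-2019.06.25
```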
Source code for sending logs to Kafka:
Requirement: Python 3.7
Library: confluent-kafka-python
import datetime
import json
import logging
import traceback
from logging import Formatter
from logging.handlers import QueueHandler

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': '192.168.11.5:9092'})


class KafkaQueueHandler(QueueHandler):
    """Reuses QueueHandler's `queue` attribute to hold a Kafka Producer."""

    def emit(self, record):
        try:
            self.queue.poll(0)  # serve delivery callbacks of earlier messages
            self.queue.produce('test', self.format(record), callback=self.delivery_report)
            self.queue.flush()  # wait until delivered: simple, but slow per record
        except Exception:
            self.handleError(record)

    def delivery_report(self, err, msg):
        if err is not None:
            print('Message delivery failed: %s' % err)
        else:
            print('Message delivered to %s' % msg.topic())


class QueueFormat(Formatter):

    def __init__(self, topic):
        super().__init__()
        self.topic = topic

    def get_extra_fields(self, record):
        # Standard LogRecord attributes and sensitive keys that are not sent
        skip_list = (
            'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
            'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
            'msecs', 'message', 'msg', 'name', 'pathname', 'process',
            'processName', 'relativeCreated', 'stack_info', 'thread',
            'threadName', 'extra', 'auth_token', 'password'
        )
        easy_types = (str, bool, dict, float, int, list, type(None))
        fields = {}
        for key, value in record.__dict__.items():
            if key not in skip_list:
                # JSON-friendly values are kept as-is, anything else via repr()
                fields[key] = value if isinstance(value, easy_types) else repr(value)
        return fields

    @classmethod
    def format_timestamp(cls, time):
        # Local time with its real UTC offset instead of a hard-coded "Z"
        current_time = datetime.datetime.fromtimestamp(time).astimezone()
        return current_time.isoformat(timespec='milliseconds')

    @classmethod
    def format_exception(cls, exc_info):
        return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''

    @classmethod
    def serialize(cls, message):
        return bytes(json.dumps(message), 'utf-8')

    def format(self, record):
        message = {
            '@timestamp': self.format_timestamp(record.created),
            '@message': record.getMessage(),
            'log_level': record.levelname,
            'log_file': record.filename,
            'line_no': record.lineno,
            'topic': self.topic,
        }
        if record.exc_info:
            message['exception'] = self.format_exception(record.exc_info)
        message.update(self.get_extra_fields(record))
        return self.serialize(message)


logger = logging.getLogger('kafka')
kafka_handler = KafkaQueueHandler(producer)
kafka_handler.setFormatter(QueueFormat('test'))
logger.setLevel(logging.DEBUG)
logger.addHandler(kafka_handler)
logger.info('aaaa')
After the logs are sent to Kafka, Logstash's Kafka input plugin can read them from Kafka and forward them to ES.
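`KafkaQueueHandler` subclasses QueueHandler only to store the producer, and it calls flush() on every record, so each log call waits for Kafka. The standard library's intended use of QueueHandler is to pair it with QueueListener. The logging call only enqueues the record, and a background thread passes it to the slow handler. Here is a sketch of that pattern, with a hypothetical in-memory `ListHandler` standing in for a Kafka-producing handler:

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Stand-in for a slow network handler (e.g. one that produces to Kafka)."""

    def __init__(self):
        super().__init__()
        self.sent = []

    def emit(self, record):
        self.sent.append(self.format(record))


log_queue = queue.Queue(-1)                 # unbounded queue between app and sender
network_handler = ListHandler()             # hypothetical; swap in the real handler
network_handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

listener = logging.handlers.QueueListener(log_queue, network_handler)
listener.start()                            # background thread drains the queue

logger = logging.getLogger("crawler.async")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(logging.handlers.QueueHandler(log_queue))  # only enqueues

logger.info("fetched %d pages", 3)
listener.stop()                             # processes queued records, joins thread
print(network_handler.sent)                 # ['INFO fetched 3 pages']
```

With this split, a Kafka handler used as the listener's target could keep calling flush() without blocking the crawler's own threads.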