爬蟲(chóng)服務(wù)在服務(wù)器上跑著淋叶,心里面難免會(huì)犯嘀咕仑荐,
爬蟲(chóng)死掉了怎么辦雕拼?
爬蟲(chóng)漏了數(shù)據(jù)怎么辦?
爬蟲(chóng)被網(wǎng)站封禁了怎么辦粘招?
目標(biāo)網(wǎng)站掛了怎么辦啥寇?
返回頁(yè)面錯(cuò)誤或被跳轉(zhuǎn)怎么辦?
...
以上來(lái)自一個(gè)被迫害妄想癥患者的自白(誤)
一次性爬取數(shù)據(jù)可以通過(guò)人工看日志來(lái)判斷洒扎,不行就多爬幾遍示姿,
但如果是放在服務(wù)器上定時(shí)爬取的服務(wù)怎么辦?尤其是已經(jīng)部署在Docker中的爬蟲(chóng)服務(wù)逊笆。
如果每天都登進(jìn)服務(wù)器查看Scrapy本地的日志信息栈戳,還是很麻煩的。
那么我們就需要一個(gè)特定的日志來(lái)存放我們的統(tǒng)計(jì)數(shù)據(jù)难裆,每天爬取完畢后發(fā)送郵件給管理員子檀。
這樣的話爬蟲(chóng)出現(xiàn)問(wèn)題時(shí)管理員就能很快知道,還要能夠統(tǒng)計(jì)爬取的數(shù)據(jù)乃戈。
本文主要描述嘗試在Scrapy爬蟲(chóng)的過(guò)程中構(gòu)建工具的思路與體驗(yàn)褂痰,代碼寫(xiě)的太水就算了(捂臉)
在Scrapy框架中本身內(nèi)置Logging,但因?yàn)槭浅鯇W(xué)Scrapy症虑,不會(huì)構(gòu)建自己的框架缩歪。
因?yàn)長(zhǎng)ogging模塊「Level Info」輸出就會(huì)多出很多無(wú)用的信息,排版不清晰和沒(méi)法提醒等問(wèn)題谍憔。
所以在Scrapy.logging輸出日志到文件的基礎(chǔ)上匪蝙,寫(xiě)了自己的日志統(tǒng)計(jì)工具來(lái)監(jiān)控爬蟲(chóng)的狀態(tài)。
設(shè)計(jì)思路
在爬蟲(chóng)書(shū)寫(xiě)的過(guò)程中遇到的問(wèn)題還是很多的习贫,比如:
目標(biāo)網(wǎng)站宕機(jī)或請(qǐng)求錯(cuò)誤(404,500,503)
指定DOM不存在或Response.text返回錯(cuò)誤內(nèi)容
網(wǎng)站請(qǐng)求池過(guò)載
觸發(fā)假數(shù)據(jù)或IP被封禁
數(shù)據(jù)庫(kù)報(bào)錯(cuò)或操作失敗
...
除此之外還希望能夠統(tǒng)計(jì)以下的數(shù)據(jù)逛球,比如:
爬取成功的分頁(yè)條數(shù)
爬過(guò)的信息條數(shù)
已有數(shù)據(jù)的重復(fù)條數(shù)
爬蟲(chóng)開(kāi)始時(shí)間 / 結(jié)束時(shí)間 / 耗時(shí)
...
圖的左邊是爬蟲(chóng)的基本流程,中間部分是觸發(fā)事件苫昌,右側(cè)是執(zhí)行的統(tǒng)計(jì)操作
整理之后得到以下JSON格式來(lái)存儲(chǔ)統(tǒng)計(jì)信息
stat.log = {
'time': { # 全局時(shí)間
'start_time': 0, # 爬蟲(chóng)開(kāi)始時(shí)間
'end_time': 0, # 爬蟲(chóng)結(jié)束時(shí)間
'consume_time': 0, # 爬蟲(chóng)總耗時(shí)
},
'spider1': { # 爬蟲(chóng)1的信息
'request_sucess': 0, # 分頁(yè)請(qǐng)求成功
'request_error': 0, # 分頁(yè)請(qǐng)求失敗
'data_crawl': 0, # 爬過(guò)的數(shù)據(jù)項(xiàng) (爬蟲(chóng)發(fā)現(xiàn)的總條目
'data_new': 0, # 新增的數(shù)據(jù)項(xiàng) (不重復(fù)并成功寫(xiě)庫(kù)的
'data_error': 0, # 錯(cuò)誤的數(shù)據(jù)項(xiàng) (數(shù)據(jù)項(xiàng)內(nèi)容請(qǐng)求錯(cuò)誤的
'data_illegal': 0, # 違規(guī)的數(shù)據(jù)項(xiàng) (數(shù)據(jù)項(xiàng)格式檢查錯(cuò)誤的
'data_repeat': 0, # 重復(fù)的數(shù)據(jù)項(xiàng) (數(shù)據(jù)庫(kù)中已有重復(fù)數(shù)據(jù)
'db_error': 0, # 數(shù)據(jù)庫(kù)錯(cuò)誤項(xiàng) (數(shù)據(jù)庫(kù)連接錯(cuò)誤等
'db_operate': 0, # 數(shù)據(jù)庫(kù)操作項(xiàng) (讀寫(xiě)等操作返回值異常
},
'spider2': { # 爬蟲(chóng)2的信息
'request_success': 0, # ...
# ...
}
# ...
}
收集反饋
Scrapy的全局執(zhí)行可以異步執(zhí)行所有爬蟲(chóng)颤绕,
在打開(kāi)爬蟲(chóng)前初始化時(shí)間,結(jié)束后記錄結(jié)束時(shí)間并計(jì)算耗時(shí)
Scrapy管道(pipelines.py)中有從父類繼承來(lái)的方法 open_spider()祟身,
在管道中實(shí)例化類奥务,實(shí)例化后存儲(chǔ)json中初始化對(duì)應(yīng)數(shù)據(jù)字段。
從數(shù)據(jù)庫(kù)中提取已爬取的條數(shù)袜硫,并打印反饋氯葬。
Scrapy中間件(middlewares.py)中爬蟲(chóng)打開(kāi)和頁(yè)面請(qǐng)求進(jìn)行標(biāo)注。
spider_opened() 觸發(fā)顯示爬蟲(chóng)打開(kāi)父款。
process_spider_input() 當(dāng)頁(yè)面請(qǐng)求后觸發(fā)方法溢谤,判斷如果是200成功請(qǐng)求,記錄請(qǐng)求成功憨攒。
在記錄請(qǐng)求成功后輸出當(dāng)前爬蟲(chóng)的json日志世杀,防止爬蟲(chóng)進(jìn)程意外中斷看不到記錄的情況。
process_spider_exception() 當(dāng)頁(yè)面404,500,503...異常時(shí)觸發(fā)該方法肝集,記錄請(qǐng)求錯(cuò)誤瞻坝。
在爬蟲(chóng)執(zhí)行中判斷數(shù)據(jù)的重復(fù)/缺少字段/錯(cuò)誤/新增等情況
日志輸出
日志的輸出情況如下
日志的結(jié)尾的輸出
完整代碼
代碼寫(xiě)的實(shí)在太糟糕了,初學(xué)見(jiàn)諒
# statistics.py
# update /18.03.12.1
import time
import json
import logging
class Statistics():
CUR_LOG = {
'time': {
'start_time': 0, # 開(kāi)始時(shí)間
'end_time': 0, # 結(jié)束時(shí)間
'consume_time': 0, # 共耗時(shí)
},
# 'spider1': {
# 'request_sucess': 0, # 請(qǐng)求成功(分頁(yè)數(shù))
# 'request_error': 0, # 請(qǐng)求錯(cuò)誤
# 'data_crawl': 0, # 爬取到的數(shù)量
# 'data_new': 0, # 獲取到的新數(shù)據(jù)
# 'data_error': 0, # 數(shù)據(jù)出錯(cuò)
# 'data_illegal': 0, # 數(shù)據(jù)格式錯(cuò)誤
# 'data_repeat': 0, # 重復(fù)的數(shù)據(jù)
# 'db_error': 0, # 數(shù)據(jù)庫(kù)系統(tǒng)錯(cuò)誤
# 'db_operate': 0, # 數(shù)據(jù)庫(kù)操作返回錯(cuò)誤
# },
}
logger = logging.getLogger('stat')
def __init__(self, name=None):
if name:
self.CUR_LOG[name] = {}
self.CUR_LOG[name]['request_success'] = 0
self.CUR_LOG[name]['request_error'] = 0
self.CUR_LOG[name]['data_crawl'] = 0
self.CUR_LOG[name]['data_new'] = 0
self.CUR_LOG[name]['data_error'] = 0
self.CUR_LOG[name]['data_illegal'] = 0
self.CUR_LOG[name]['data_repeat'] = 0
self.CUR_LOG[name]['db_error'] = 0
self.CUR_LOG[name]['db_operate'] = 0
def start_time(self):
self.CUR_LOG['time']['start_time'] = time.time()
self.logger.warning('{: <6s}'.format('all') \
+ '{: <17s} '.format('[start_time]') \
+ time.strftime('%Y-%m-%d-%H:%M:%S', time.localtime()))
def end_time(self):
self.CUR_LOG['time']['end_time'] = time.time()
self.CUR_LOG['time']['consume_time'] \
= self.CUR_LOG['time']['end_time'] \
- self.CUR_LOG['time']['start_time']
self.CUR_LOG['time']['start_time'] \
= time.strftime('%Y-%m-%d-%H:%M:%S', time.localtime(self.CUR_LOG['time']['start_time']))
self.CUR_LOG['time']['end_time'] \
= time.strftime('%Y-%m-%d-%H:%M:%S', time.localtime(self.CUR_LOG['time']['end_time']))
hours = '{:0>2s}'.format(str(int(self.CUR_LOG['time']['consume_time'] // 3600)))
minutes = '{:0>2s}'.format(str(int((self.CUR_LOG['time']['consume_time'] // 60) % 60)))
seconds = '{:0>2s}'.format(str(int(self.CUR_LOG['time']['consume_time'] % 60)))
self.CUR_LOG['time']['consume_time'] = hours + ':' + minutes + ':' + seconds
self.logger.warning('{: <6s}'.format('all') \
+ '{: <17s} '.format('[end_time]') \
+ time.strftime('%Y-%m-%d-%H:%M:%S', time.localtime()))
self.logger.warning('{: <6s}'.format('all') \
+ '{: <17s} '.format('[consume_time]') \
+ self.CUR_LOG['time']['consume_time'])
def add_request_success(self, name, msg='', num=1):
self.CUR_LOG[name]['request_success'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[request_success]') \
+ str(msg).replace('\n', ' '))
def add_request_error(self, name, msg='', num=1):
self.CUR_LOG[name]['request_error'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[request_error]') \
+ str(msg).replace('\n', ' '))
def add_data_crawl(self, name, msg='', num=1):
self.CUR_LOG[name]['data_crawl'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[data_crawl]') \
+ str(num))
def add_data_new(self, name, msg='', num=1):
self.CUR_LOG[name]['data_new'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[data_new]') \
+ str(msg).replace('\n', ' '))
def add_data_error(self, name, msg='', num=1):
self.CUR_LOG[name]['data_error'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[data_error]') \
+ str(msg).replace('\n', ' '))
def add_data_illegal(self, name, msg='', num=1):
self.CUR_LOG[name]['data_illegal'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[data_illegal]') \
+ str(msg).replace('\n', ' '))
def add_data_repeat(self, name, msg='', num=1):
self.CUR_LOG[name]['data_repeat'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[data_repeat]') \
+ str(msg).replace('\n', ' '))
def add_db_error(self, name, msg='', num=1):
self.CUR_LOG[name]['db_error'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[db_error]') \
+ str(msg).replace('\n', ' '))
def add_db_operate(self, name, msg='', num=1):
self.CUR_LOG[name]['db_operate'] += num
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[db_operate]') \
+ str(msg).replace('\n', ' '))
def json_display(self, name=None):
if name:
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[json_display]') \
+ str(self.CUR_LOG[name]))
else:
self.logger.warning('{: <6s}'.format('all') \
+ '{: <17s} \n'.format('[json_display]') \
+ str(json.dumps(self.CUR_LOG, indent=4)))
def crawled_display(self, name, msg=''):
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[crawled_number]') \
+ str(msg))
def open_display(self, name, msg=''):
self.logger.warning('{: <6s}'.format(name) \
+ '{: <17s} '.format('[opened]'))