閑的無聊爬了下維基百科有關(guān)古羅馬的數(shù)據(jù),爬取模式是分布式+增量爬取。數(shù)據(jù)爬完了項目卻沒有停手济丘,因為個人興趣開始研究python3.5加入的異步特性,經(jīng)過一段時間的添添補補洽蛀,一個簡單的小爬蟲就這樣誕生了~
本框架基于asyncio摹迷,aiohttp及redis(分布式模式需要)。目前已上架git和pypi郊供,名字取自畢生對抗羅馬共和國的迦太基名將漢尼拔峡碉。
git地址:JorgenLiu/hannibal
閑話少說,下面是框架的介紹驮审。
Mission
框架以每個url作為一個爬取單位鲫寄,使用時需將url封裝成任務(wù)Mission,序列化后放置入爬取隊列疯淫。
關(guān)于此種設(shè)計思路的解釋:
誠然大部分爬取工作只是不同的url地来,但目前遇到的一種情況是對某個相同的url通過POST不同的參數(shù)來獲取不同的數(shù)據(jù),故而將url封裝成Mission用來爬取熙掺。
Mission類的定義:
class Mission(object):
def __init__(self, unique_tag, url, method='GET', data=None, data_type='data'):
self.unique_tag = str(unique_tag)
self.url = url
if method not in ['GET', 'POST', 'PUT', 'DELETE']:
raise ValueError('invalid method')
else:
self.method = method
self.data = data
if data_type not in ['json', 'data']:
raise ValueError('invalid data type')
else:
self.data_type = data_type
unique_tag字段取代了url未斑,作為每個任務(wù)的唯一標(biāo)識。Mission類實現(xiàn)了serialize和deserialize方法币绩,分別負(fù)責(zé)對任務(wù)進行序列化和反序列化蜡秽,并分別在存入和取出任務(wù)時進行調(diào)用府阀。以下為定義Mission的示例:
方法為GET時:
Mission(unique_tag='1', url='http://httpbin.org/get?t=1')
方法為POST時:
Mission(unique_tag='4377', url='https://www.xytzq.cn:9443/tzq/pc/project/getProjectInfo',
method='POST',
data={'projectid': 4377})
Mission定義后,需加入采集隊列內(nèi)载城〖∷疲可通過調(diào)用隊列的 init_queue([mission_list]) (初始化queue,將一個Mission列表序列化后加入隊列)方法或enqueue (將單個Mission加入隊列)方法加入隊列诉瓦。
Collector
Collector類負(fù)責(zé)進行網(wǎng)頁內(nèi)容的爬取川队,目前實現(xiàn)的模式有兩種:
- 根據(jù)預(yù)設(shè)好的url列表進行抓取。
- 根據(jù)一個起點url睬澡,抓取后進行解析固额,將解析出的url列表添加至爬取隊列中進行增量抓取。
目前已實現(xiàn)兩種Collector:
- LocalCollector煞聪,本地運行的單進程采集節(jié)點斗躏,聲明時需傳入一個異步的解析函數(shù),負(fù)責(zé)對頁面內(nèi)容進行解析昔脯。
- DistributeCollector啄糙,通過redis隊列與解析節(jié)點進行交互的分布式采集節(jié)點,只負(fù)責(zé)爬取頁面云稚,并將爬取的頁面內(nèi)容放入解析隊列隧饼,由解析節(jié)點進行解析。
聲明LocalCollector需要以下參數(shù):
- mission_queue静陈,任務(wù)隊列
- href_pool燕雁,鏈接去重池
- parser_function,一個異步函數(shù)鲸拥,接受一個response對象拐格,可通過調(diào)用extract_json或extract_html方法從response對象中提取json或普通html數(shù)據(jù)。LocalCollector不需要額外的解析節(jié)點刑赶,會調(diào)用這個異步的解析函數(shù)對爬取的響應(yīng)體進行解析捏浊,從中提取數(shù)據(jù)進行操作。
- cache_size撞叨,默認(rèn)為3金踪,控制請求并發(fā)數(shù)。
DistributeCollector聲明所需參數(shù)和LocalCollector基本一致谒所,但不需要定義parser_function,取而代之的是一個額外的parse_queue沛申,爬取的結(jié)果會被序列化后放入parse_queue這個解析隊列中劣领,等待解析節(jié)點進行解析。
Collector支持注冊錯誤處理器铁材。注冊錯誤處理器需傳入一個錯誤狀態(tài)碼(int)和一個錯誤處理函數(shù)尖淘。錯誤處理函數(shù)接受一個response對象和一個url奕锌,可對錯誤進行處理。
Collector支持注冊爬取前中間件村生。中間件是一個同步函數(shù)惊暴,接受一個mission對象,將在爬取前執(zhí)行趁桃×苫埃可注冊多個中間件,執(zhí)行順序與注冊順序相同卫病。
以下是聲明一個爬取httpbin的LocalCollector的案例:
from hannibal.spider import LocalCollector
from hannibal.util import MemPool, MemQueue, extract_json, Mission
pool = MemPool(name='http_bin')
queue = MemQueue(name='http_bin', limited=True)
async def collect_function(response):
json_obj = await extract_json(response)
print(json_obj)
def demo_handler(response, url):
print('handel 400')
def demo_before_collect_middleware(mission):
print('this is url: %s' % mission.url)
def demo_before_collect_middleware1(mission):
print('this is tag: %s' % mission.unique_tag)
def ping_http_bin():
url_list = [Mission(unique_tag=i, url='http://httpbin.org/get?t=%d' % i) for i in range(1, 500)]
queue.init_queue(url_list)
collector = LocalCollector(mission_queue=queue, href_pool=pool, parse_function=collect_function, cache_size=10)
collector.register_middleware(demo_before_collect_middleware)
collector.register_middleware(demo_before_collect_middleware1)
collector.register_error_handler(400, demo_handler)
collector.conquer()
if __name__ == '__main__':
ping_http_bin()