前言
這次分享的文章是我《Python爬蟲開發(fā)與項(xiàng)目實(shí)戰(zhàn)》基礎(chǔ)篇 第七章的內(nèi)容,關(guān)于如何手工打造簡單分布式爬蟲 (如果大家對這本書感興趣的話,可以看一下 試讀樣章),下面是文章的具體內(nèi)容。
本章講的依舊是實(shí)戰(zhàn)項(xiàng)目,實(shí)戰(zhàn)內(nèi)容是打造分布式爬蟲,這對初學(xué)者來說,是一個(gè)不小的挑戰(zhàn)喘蟆,也是一次有意義的嘗試。這次打造的分布式爬蟲采用比較簡單的主從模式鼓鲁,完全手工打造蕴轨,不使用成熟框架,基本上涵蓋了前六章的主要知識點(diǎn)骇吭,其中涉及分布式的知識點(diǎn)是分布式進(jìn)程和進(jìn)程間通信的內(nèi)容橙弱,算是對Python爬蟲基礎(chǔ)篇的總結(jié)。
現(xiàn)在大型的爬蟲系統(tǒng)都是采取分布式爬取結(jié)構(gòu)绵跷,通過此次實(shí)戰(zhàn)項(xiàng)目膘螟,讓大家對分布式爬蟲有一個(gè)比較清晰地了解,為之后系統(tǒng)的講解分布式爬蟲打下基礎(chǔ)碾局,其實(shí)它并沒有多么困難荆残。實(shí)戰(zhàn)目標(biāo):爬取2000個(gè)百度百科網(wǎng)絡(luò)爬蟲詞條以及相關(guān)詞條的標(biāo)題、摘要和鏈接等信息净当,采用分布式結(jié)構(gòu)改寫第六章的基礎(chǔ)爬蟲内斯,使功能更加強(qiáng)大蕴潦。爬取頁面請看圖6.1。
7.1簡單分布式爬蟲結(jié)構(gòu)
本次分布式爬蟲采用主從模式俘闯。主從模式是指由一臺主機(jī)作為控制節(jié)點(diǎn)負(fù)責(zé)所有運(yùn)行網(wǎng)絡(luò)爬蟲的主機(jī)進(jìn)行管理潭苞,爬蟲只需要從控制節(jié)點(diǎn)那里接收任務(wù),并把新生成任務(wù)提交給控制節(jié)點(diǎn)就可以了真朗,在這個(gè)過程中不必與其他爬蟲通信此疹,這種方式實(shí)現(xiàn)簡單利于管理。而控制節(jié)點(diǎn)則需要與所有爬蟲進(jìn)行通信遮婶,因此可以看到主從模式是有缺陷的蝗碎,控制節(jié)點(diǎn)會(huì)成為整個(gè)系統(tǒng)的瓶頸,容易導(dǎo)致整個(gè)分布式網(wǎng)絡(luò)爬蟲系統(tǒng)性能下降旗扑。
此次使用三臺主機(jī)進(jìn)行分布式爬取蹦骑,一臺主機(jī)作為控制節(jié)點(diǎn),另外兩臺主機(jī)作為爬蟲節(jié)點(diǎn)臀防。爬蟲結(jié)構(gòu)如圖7.1所示:
7.2 控制節(jié)點(diǎn)ControlNode
控制節(jié)點(diǎn)主要分為URL管理器眠菇、數(shù)據(jù)存儲(chǔ)器和控制調(diào)度器「ぶ裕控制調(diào)度器通過三個(gè)進(jìn)程來協(xié)調(diào)URL管理器和數(shù)據(jù)存儲(chǔ)器的工作捎废,一個(gè)是URL管理進(jìn)程,負(fù)責(zé)URL的管理和將URL傳遞給爬蟲節(jié)點(diǎn)祟昭,一個(gè)是數(shù)據(jù)提取進(jìn)程缕坎,負(fù)責(zé)讀取爬蟲節(jié)點(diǎn)返回的數(shù)據(jù),將返回?cái)?shù)據(jù)中的URL交給URL管理進(jìn)程篡悟,將標(biāo)題和摘要等數(shù)據(jù)交給數(shù)據(jù)存儲(chǔ)進(jìn)程,最后一個(gè)是數(shù)據(jù)存儲(chǔ)進(jìn)程匾寝,負(fù)責(zé)將數(shù)據(jù)提取進(jìn)程中提交的數(shù)據(jù)進(jìn)行本地存儲(chǔ)搬葬。執(zhí)行流程如圖7.2所示:
7.2.1 URL管理器
URL管理器查考第六章的代碼,做了一些優(yōu)化修改艳悔。由于我們采用set內(nèi)存去重的方式急凰,如果直接存儲(chǔ)大量的URL鏈接,尤其是URL鏈接很長的時(shí)候猜年,很容易造成內(nèi)存溢出抡锈,所以我們采用將爬取過的URL進(jìn)行MD5處理,由于字符串經(jīng)過MD5處理后的信息摘要長度可以128bit乔外,將生成的MD5摘要存儲(chǔ)到set后床三,可以減少好幾倍的內(nèi)存消耗,Python中的MD5算法生成的是32位的字符串杨幼,由于我們爬取的url較少撇簿,md5沖突不大聂渊,完全可以取中間的16位字符串,即16位MD5加密四瘫。同時(shí)添加了save_progress和load_progress方法進(jìn)行序列化的操作汉嗽,將未爬取URL集合和已爬取的URL集合序列化到本地,保存當(dāng)前的進(jìn)度找蜜,以便下次恢復(fù)狀態(tài)饼暑。URL管理器URLManager.py代碼如下:
#coding:utf-8
import cPickle
import hashlib
class UrlManager(object):
def __init__(self):
self.new_urls = self.load_progress('new_urls.txt')#未爬取URL集合
self.old_urls = self.load_progress('old_urls.txt')#已爬取URL集合
def has_new_url(self):
'''
判斷是否有未爬取的URL
:return:
'''
return self.new_url_size()!=0
def get_new_url(self):
'''
獲取一個(gè)未爬取的URL
:return:
'''
new_url = self.new_urls.pop()
m = hashlib.md5()
m.update(new_url)
self.old_urls.add(m.hexdigest()[8:-8])
return new_url
def add_new_url(self,url):
'''
將新的URL添加到未爬取的URL集合中
:param url:單個(gè)URL
:return:
'''
if url is None:
return
m = hashlib.md5()
m.update(url)
url_md5 = m.hexdigest()[8:-8]
if url not in self.new_urls and url_md5 not in self.old_urls:
self.new_urls.add(url)
def add_new_urls(self,urls):
'''
將新的URLS添加到未爬取的URL集合中
:param urls:url集合
:return:
'''
if urls is None or len(urls)==0:
return
for url in urls:
self.add_new_url(url)
def new_url_size(self):
'''
獲取未爬取URL集合的s大小
:return:
'''
return len(self.new_urls)
def old_url_size(self):
'''
獲取已經(jīng)爬取URL集合的大小
:return:
'''
return len(self.old_urls)
def save_progress(self,path,data):
'''
保存進(jìn)度
:param path:文件路徑
:param data:數(shù)據(jù)
:return:
'''
with open(path, 'wb') as f:
cPickle.dump(data, f)
def load_progress(self,path):
'''
從本地文件加載進(jìn)度
:param path:文件路徑
:return:返回set集合
'''
print '[+] 從文件加載進(jìn)度: %s' % path
try:
with open(path, 'rb') as f:
tmp = cPickle.load(f)
return tmp
except:
print '[!] 無進(jìn)度文件, 創(chuàng)建: %s' % path
return set()
7.2.2數(shù)據(jù)存儲(chǔ)器
數(shù)據(jù)存儲(chǔ)器的內(nèi)容基本上和第六章的一樣,不過生成文件按照當(dāng)前時(shí)間進(jìn)行命名避免重復(fù)洗做,同時(shí)對文件進(jìn)行緩存寫入撵孤。代碼如下:
#coding:utf-8
import codecs
import time
class DataOutput(object):
def __init__(self):
self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) )
self.output_head(self.filepath)
self.datas=[]
def store_data(self,data):
if data is None:
return
self.datas.append(data)
if len(self.datas)>10:
self.output_html(self.filepath)
def output_head(self,path):
'''
將HTML頭寫進(jìn)去
:return:
'''
fout=codecs.open(path,'w',encoding='utf-8')
fout.write("<html>")
fout.write("<body>")
fout.write("<table>")
fout.close()
def output_html(self,path):
'''
將數(shù)據(jù)寫入HTML文件中
:param path: 文件路徑
:return:
'''
fout=codecs.open(path,'a',encoding='utf-8')
for data in self.datas:
fout.write("<tr>")
fout.write("<td>%s</td>"%data['url'])
fout.write("<td>%s</td>"%data['title'])
fout.write("<td>%s</td>"%data['summary'])
fout.write("</tr>")
self.datas.remove(data)
fout.close()
def ouput_end(self,path):
'''
輸出HTML結(jié)束
:param path: 文件存儲(chǔ)路徑
:return:
'''
fout=codecs.open(path,'a',encoding='utf-8')
fout.write("</table>")
fout.write("</body>")
fout.write("</html>")
fout.close()
7.2.3控制調(diào)度器
控制調(diào)度器主要是產(chǎn)生并啟動(dòng)URL管理進(jìn)程、數(shù)據(jù)提取進(jìn)程和數(shù)據(jù)存儲(chǔ)進(jìn)程竭望,同時(shí)維護(hù)4個(gè)隊(duì)列保持進(jìn)程間的通信邪码,分別為url_queue,result_queue, conn_q,store_q。4個(gè)隊(duì)列說明如下:
- url_q隊(duì)列是URL管理進(jìn)程將URL傳遞給爬蟲節(jié)點(diǎn)的通道
- result_q隊(duì)列是爬蟲節(jié)點(diǎn)將數(shù)據(jù)返回給數(shù)據(jù)提取進(jìn)程的通道
- conn_q隊(duì)列是數(shù)據(jù)提取進(jìn)程將新的URL數(shù)據(jù)提交給URL管理進(jìn)程的通道
- store_q隊(duì)列是數(shù)據(jù)提取進(jìn)程將獲取到的數(shù)據(jù)交給數(shù)據(jù)存儲(chǔ)進(jìn)程的通道
因?yàn)橐凸ぷ鞴?jié)點(diǎn)進(jìn)行通信咬清,所以分布式進(jìn)程必不可少闭专。參考1.4.4小節(jié)分布式進(jìn)程中服務(wù)進(jìn)程中的代碼(linux版),創(chuàng)建一個(gè)分布式管理器旧烧,定義為start_manager方法影钉。方法代碼如下:
def start_Manager(self,url_q,result_q):
'''
創(chuàng)建一個(gè)分布式管理器
:param url_q: url隊(duì)列
:param result_q: 結(jié)果隊(duì)列
:return:
'''
#把創(chuàng)建的兩個(gè)隊(duì)列注冊在網(wǎng)絡(luò)上,利用register方法掘剪,callable參數(shù)關(guān)聯(lián)了Queue對象平委,
# 將Queue對象在網(wǎng)絡(luò)中暴露
BaseManager.register('get_task_queue',callable=lambda:url_q)
BaseManager.register('get_result_queue',callable=lambda:result_q)
#綁定端口8001,設(shè)置驗(yàn)證口令‘baike’夺谁。這個(gè)相當(dāng)于對象的初始化
manager=BaseManager(address=('',8001),authkey='baike')
#返回manager對象
return manager
URL管理進(jìn)程將從conn_q隊(duì)列獲取到的新URL提交給URL管理器廉赔,經(jīng)過去重之后,取出URL放入url_queue隊(duì)列中傳遞給爬蟲節(jié)點(diǎn)匾鸥,代碼如下:
def url_manager_proc(self,url_q,conn_q,root_url):
url_manager = UrlManager()
url_manager.add_new_url(root_url)
while True:
while(url_manager.has_new_url()):
#從URL管理器獲取新的url
new_url = url_manager.get_new_url()
#將新的URL發(fā)給工作節(jié)點(diǎn)
url_q.put(new_url)
print 'old_url=',url_manager.old_url_size()
#加一個(gè)判斷條件蜡塌,當(dāng)爬去2000個(gè)鏈接后就關(guān)閉,并保存進(jìn)度
if(url_manager.old_url_size()>2000):
#通知爬行節(jié)點(diǎn)工作結(jié)束
url_q.put('end')
print '控制節(jié)點(diǎn)發(fā)起結(jié)束通知!'
#關(guān)閉管理節(jié)點(diǎn),同時(shí)存儲(chǔ)set狀態(tài)
url_manager.save_progress('new_urls.txt',url_manager.new_urls)
url_manager.save_progress('old_urls.txt',url_manager.old_urls)
return
#將從result_solve_proc獲取到的urls添加到URL管理器之間
try:
if not conn_q.empty():
urls = conn_q.get()
url_manager.add_new_urls(urls)
except BaseException,e:
time.sleep(0.1)#延時(shí)休息
數(shù)據(jù)提取進(jìn)程從result_queue隊(duì)列讀取返回的數(shù)據(jù)勿负,并將數(shù)據(jù)中的URL添加到conn_q隊(duì)列交給URL管理進(jìn)程馏艾,將數(shù)據(jù)中的文章標(biāo)題和摘要添加到store_q隊(duì)列交給數(shù)據(jù)存儲(chǔ)進(jìn)程。代碼如下:
def result_solve_proc(self,result_q,conn_q,store_q):
while(True):
try:
if not result_q.empty():
content = result_q.get(True)
if content['new_urls']=='end':
#結(jié)果分析進(jìn)程接受通知然后結(jié)束
print '結(jié)果分析進(jìn)程接受通知然后結(jié)束!'
store_q.put('end')
return
conn_q.put(content['new_urls'])#url為set類型
store_q.put(content['data'])#解析出來的數(shù)據(jù)為dict類型
else:
time.sleep(0.1)#延時(shí)休息
except BaseException,e:
time.sleep(0.1)#延時(shí)休息
數(shù)據(jù)存儲(chǔ)進(jìn)程從store_q隊(duì)列中讀取數(shù)據(jù)奴愉,并調(diào)用數(shù)據(jù)存儲(chǔ)器進(jìn)行數(shù)據(jù)存儲(chǔ)琅摩。代碼如下:
def store_proc(self,store_q):
output = DataOutput()
while True:
if not store_q.empty():
data = store_q.get()
if data=='end':
print '存儲(chǔ)進(jìn)程接受通知然后結(jié)束!'
output.ouput_end(output.filepath)
return
output.store_data(data)
else:
time.sleep(0.1)
最后將分布式管理器、URL管理進(jìn)程锭硼、 數(shù)據(jù)提取進(jìn)程和數(shù)據(jù)存儲(chǔ)進(jìn)程進(jìn)行啟動(dòng)房资,并初始化4個(gè)隊(duì)列。代碼如下:
if __name__=='__main__':
#初始化4個(gè)隊(duì)列
url_q = Queue()
result_q = Queue()
store_q = Queue()
conn_q = Queue()
#創(chuàng)建分布式管理器
node = NodeManager()
manager = node.start_Manager(url_q,result_q)
#創(chuàng)建URL管理進(jìn)程账忘、 數(shù)據(jù)提取進(jìn)程和數(shù)據(jù)存儲(chǔ)進(jìn)程
url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm',))
result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,conn_q,store_q,))
store_proc = Process(target=node.store_proc, args=(store_q,))
#啟動(dòng)3個(gè)進(jìn)程和分布式管理器
url_manager_proc.start()
result_solve_proc.start()
store_proc.start()
manager.get_server().serve_forever()
7.3 爬蟲節(jié)點(diǎn)SpiderNode
爬蟲節(jié)點(diǎn)相對簡單志膀,主要包含HTML下載器熙宇、HTML解析器和爬蟲調(diào)度器。執(zhí)行流程如下:
- 爬蟲調(diào)度器從控制節(jié)點(diǎn)中的url_q隊(duì)列讀取URL
- 爬蟲調(diào)度器調(diào)用HTML下載器溉浙、HTML解析器獲取網(wǎng)頁中新的URL和標(biāo)題摘要
- 最后爬蟲調(diào)度器將新的URL和標(biāo)題摘要傳入result_q隊(duì)列交給控制節(jié)點(diǎn)
7.3.1 HTML下載器
HTML下載器的代碼和第六章的一致烫止,只要注意網(wǎng)頁編碼即可。代碼如下:
#coding:utf-8
import requests
class HtmlDownloader(object):
def download(self,url):
if url is None:
return None
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers={'User-Agent':user_agent}
r = requests.get(url,headers=headers)
if r.status_code==200:
r.encoding='utf-8'
return r.text
return None
7.3.2 HTML解析器
HTML解析器的代碼和第六章的一致戳稽,詳細(xì)的網(wǎng)頁分析過程可以回顧第六章馆蠕。代碼如下:
#coding:utf-8
import re
import urlparse
from bs4 import BeautifulSoup
class HtmlParser(object):
def parser(self,page_url,html_cont):
'''
用于解析網(wǎng)頁內(nèi)容抽取URL和數(shù)據(jù)
:param page_url: 下載頁面的URL
:param html_cont: 下載的網(wǎng)頁內(nèi)容
:return:返回URL和數(shù)據(jù)
'''
if page_url is None or html_cont is None:
return
soup = BeautifulSoup(html_cont,'html.parser',from_encoding='utf-8')
new_urls = self._get_new_urls(page_url,soup)
new_data = self._get_new_data(page_url,soup)
return new_urls,new_data
def _get_new_urls(self,page_url,soup):
'''
抽取新的URL集合
:param page_url: 下載頁面的URL
:param soup:soup
:return: 返回新的URL集合
'''
new_urls = set()
#抽取符合要求的a標(biāo)簽
links = soup.find_all('a',href=re.compile(r'/view/\d+\.htm'))
for link in links:
#提取href屬性
new_url = link['href']
#拼接成完整網(wǎng)址
new_full_url = urlparse.urljoin(page_url,new_url)
new_urls.add(new_full_url)
return new_urls
def _get_new_data(self,page_url,soup):
'''
抽取有效數(shù)據(jù)
:param page_url:下載頁面的URL
:param soup:
:return:返回有效數(shù)據(jù)
'''
data={}
data['url']=page_url
title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
data['title']=title.get_text()
summary = soup.find('div',class_='lemma-summary')
#獲取tag中包含的所有文版內(nèi)容包括子孫tag中的內(nèi)容,并將結(jié)果作為Unicode字符串返回
data['summary']=summary.get_text()
return data
7.3.3 爬蟲調(diào)度器
爬蟲調(diào)度器需要用到分布式進(jìn)程中工作進(jìn)程的代碼,具體內(nèi)容可以參考第一章的分布式進(jìn)程章節(jié)惊奇。爬蟲調(diào)度器需要先連接上控制節(jié)點(diǎn)互躬,然后依次完成從url_q隊(duì)列中獲取URL,下載并解析網(wǎng)頁颂郎,將獲取的數(shù)據(jù)交給result_q隊(duì)列吼渡,返回給控制節(jié)點(diǎn)等各項(xiàng)任務(wù),代碼如下:
class SpiderWork(object):
def __init__(self):
#初始化分布式進(jìn)程中的工作節(jié)點(diǎn)的連接工作
# 實(shí)現(xiàn)第一步:使用BaseManager注冊獲取Queue的方法名稱
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
# 實(shí)現(xiàn)第二步:連接到服務(wù)器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗(yàn)證口令注意保持與服務(wù)進(jìn)程設(shè)置的完全一致:
self.m = BaseManager(address=(server_addr, 8001), authkey='baike')
# 從網(wǎng)絡(luò)連接:
self.m.connect()
# 實(shí)現(xiàn)第三步:獲取Queue的對象:
self.task = self.m.get_task_queue()
self.result = self.m.get_result_queue()
#初始化網(wǎng)頁下載器和解析器
self.downloader = HtmlDownloader()
self.parser = HtmlParser()
print 'init finish'
def crawl(self):
while(True):
try:
if not self.task.empty():
url = self.task.get()
if url =='end':
print '控制節(jié)點(diǎn)通知爬蟲節(jié)點(diǎn)停止工作...'
#接著通知其它節(jié)點(diǎn)停止工作
self.result.put({'new_urls':'end','data':'end'})
return
print '爬蟲節(jié)點(diǎn)正在解析:%s'%url.encode('utf-8')
content = self.downloader.download(url)
new_urls,data = self.parser.parser(url,content)
self.result.put({"new_urls":new_urls,"data":data})
except EOFError,e:
print "連接工作節(jié)點(diǎn)失敗"
return
except Exception,e:
print e
print 'Crawl fali '
if __name__=="__main__":
spider = SpiderWork()
spider.crawl()
在爬蟲調(diào)度器設(shè)置了一個(gè)本地IP:127.0.0.1乓序,大家可以將在一臺機(jī)器上測試代碼的正確性寺酪。當(dāng)然也可以使用三臺VPS服務(wù)器,兩臺運(yùn)行爬蟲節(jié)點(diǎn)程序替劈,將IP改為控制節(jié)點(diǎn)主機(jī)的公網(wǎng)IP寄雀,一臺運(yùn)行控制節(jié)點(diǎn)程序,進(jìn)行分布式爬取陨献,這樣更貼近真實(shí)的爬取環(huán)境盒犹。下面圖7.3為最終爬取的數(shù)據(jù),圖7.4為new_urls.txt內(nèi)容眨业,圖7.5為old_urls.txt內(nèi)容急膀,大家可以進(jìn)行對比測試,這個(gè)簡單的分布式爬蟲還有很大發(fā)揮的空間坛猪,希望大家發(fā)揮自己的聰明才智進(jìn)一步完善脖阵。
7.4小結(jié)
本章講解了一個(gè)簡單的分布式爬蟲結(jié)構(gòu),主要目的是幫助大家將Python爬蟲基礎(chǔ)篇的知識進(jìn)行總結(jié)和強(qiáng)化墅茉,開拓大家的思維,同時(shí)也讓大家知道分布式爬蟲并不是這么高不可攀呜呐。不過當(dāng)你親手打造一個(gè)分布式爬蟲后就斤,就會(huì)知道分布式爬蟲的難點(diǎn)在于節(jié)點(diǎn)的調(diào)度,什么樣的結(jié)構(gòu)能讓各個(gè)節(jié)點(diǎn)穩(wěn)定高效的運(yùn)作才是分布式爬蟲要考慮的核心內(nèi)容蘑辑。到本章為止洋机,Python爬蟲基礎(chǔ)篇已經(jīng)結(jié)束,這個(gè)時(shí)候大家基本上可以編寫簡單的爬蟲洋魂,爬取一些靜態(tài)網(wǎng)站的內(nèi)容绷旗,但是Python爬蟲開發(fā)不僅如此喜鼓,大家接著往下學(xué)習(xí)吧。