目標(biāo)
在大規(guī)模爬取數(shù)據(jù)前琳拭,先定一個(gè)能達(dá)到的小目標(biāo),比方說先爬個(gè)10萬條數(shù)據(jù)描验。
爬蟲爬數(shù)據(jù)太慢了白嘁,怎么爬快點(diǎn)?
程序中途中斷了怎么辦膘流,好不容易爬了這么多數(shù)據(jù)絮缅,又要重頭開始爬嗎/(ㄒoㄒ)/
數(shù)據(jù)有重復(fù)的鲁沥,占用多余的空間,影響統(tǒng)計(jì)怎么辦耕魄?
這些都是剛開始爬取大規(guī)模數(shù)據(jù)都會(huì)遇到的問題画恰,這次就來說說解決這些問題的思路。
涉及的知識(shí)點(diǎn)如下:
- 多線程的生產(chǎn)者和消費(fèi)者模型
- 斷點(diǎn)數(shù)據(jù)的記錄和恢復(fù)
- 數(shù)據(jù)入庫前的去重
多線程的生產(chǎn)者和消費(fèi)者模型
1. 單線程
-
其實(shí)可以看成生產(chǎn)者生產(chǎn)一個(gè)任務(wù)(比如構(gòu)造出一個(gè)url)吸奴,然后消費(fèi)者執(zhí)行這個(gè)任務(wù)(爬取url對(duì)應(yīng)的網(wǎng)站)允扇,消費(fèi)者任務(wù)還沒執(zhí)行完時(shí),生產(chǎn)者就不會(huì)生產(chǎn)任務(wù)则奥,所以他們相對(duì)任務(wù)來說是一對(duì)一的考润,同步執(zhí)行的。當(dāng)生產(chǎn)速率遠(yuǎn)遠(yuǎn)大于消費(fèi)速率读处,這時(shí)生產(chǎn)者也會(huì)被拖累糊治。
生產(chǎn)者和消費(fèi)者單線程版本 - 還有一個(gè)問題就是,我們的爬蟲任務(wù)罚舱,在做請(qǐng)求網(wǎng)絡(luò)時(shí)井辜,實(shí)際上cpu大多數(shù)時(shí)候都在等待網(wǎng)絡(luò)返回的包,沒有完全發(fā)揮出cpu的計(jì)算能力馆匿,所以要想辦法讓cpu在等待網(wǎng)絡(luò)響應(yīng)時(shí)抑胎,也能動(dòng)起來做些計(jì)算任務(wù)。而多線程就是為了解決這種情況出現(xiàn)的渐北,操作系統(tǒng)會(huì)自動(dòng)安排cpu在不同的線程中切換阿逃,提高cpu利用率。所以這就是為什么要開啟多線程的原因赃蛛。
2. 多線程
主要考慮的是線程的管理恃锉,任務(wù)的調(diào)度,還有線程間共享數(shù)據(jù)會(huì)出現(xiàn)的沖突問題呕臂。
-
先來看下面這張圖破托。整個(gè)流程就是,我們給每個(gè)生產(chǎn)者和消費(fèi)者都開啟不同的線程歧蒋,生產(chǎn)者生產(chǎn)任務(wù)土砂,放入隊(duì)列中存儲(chǔ),消費(fèi)者從隊(duì)列中取出任務(wù)谜洽,并執(zhí)行任務(wù)萝映,cpu會(huì)在不同的線程中切換,來執(zhí)行任務(wù)的阐虚。
生產(chǎn)者和消費(fèi)者多線程版本 生產(chǎn)者不需要管任務(wù)是否被消費(fèi)掉序臂,只需要不停生產(chǎn)就行,而消費(fèi)者也不需要去等待生產(chǎn)者实束,只要隊(duì)列中有任務(wù)奥秆,就取出來執(zhí)行就行了逊彭,這樣就解決了任務(wù)的調(diào)度問題。
因?yàn)殛?duì)列本身是線程安全的构订,換句話說就是隊(duì)列中的數(shù)據(jù)侮叮,在多線程下不會(huì)出現(xiàn)數(shù)據(jù)沖突的問題,這樣也就解決了隊(duì)列中共享數(shù)據(jù)的沖突問題悼瘾。對(duì)于其他共享的數(shù)據(jù) 签赃,可以使用線程鎖,來給共享數(shù)據(jù)加鎖分尸,這樣在釋放鎖之前,其他線程就不會(huì)訪問這些數(shù)據(jù)歹嘹,防止沖突箩绍。
因?yàn)樯a(chǎn)者和消費(fèi)者中間隔了一個(gè)隊(duì)列,使得他們互不干擾尺上,解耦程度高材蛛。這樣也帶來一個(gè)好處 ,當(dāng)生產(chǎn)速率遠(yuǎn)大于消費(fèi)速率怎抛,這時(shí)可以添加多個(gè)消費(fèi)者(實(shí)際就是多開幾個(gè)線程)卑吭,提高消費(fèi)速率,與生產(chǎn)速率達(dá)到相對(duì)的平衡马绝,提高資源利用率豆赏。
關(guān)于多線程知識(shí)的補(bǔ)充
如果對(duì)于線程的基礎(chǔ)概念不了解的,可以看下參考鏈接富稻,講解十分到位掷邦。
廖雪峰 進(jìn)程和線程
菜鳥教程 python多線程
多線程代碼思路
前面的都是介紹偏概念的知識(shí),估計(jì)耐心看的人不多椭赋,這里就講講代碼中的思路
- 導(dǎo)入threading線程庫,隊(duì)列庫Queue抚岗,python3的是queue。
- 創(chuàng)建任務(wù)隊(duì)列哪怔,必須要設(shè)置隊(duì)列的大小宣蔚,否則隊(duì)列中的任務(wù)數(shù)量會(huì)無限增長,占用大量內(nèi)存认境。
task_queue = Queue.Queue(maxsize=thread_max_count*10)
- 創(chuàng)建生產(chǎn)者線程胚委,threading.Thread(target=producer)需要傳入線程要執(zhí)行函數(shù)的名字,注意函數(shù)名不要加括號(hào)元暴。調(diào)用start后篷扩,線程才會(huì)真正的跑起來。
"""負(fù)責(zé)對(duì)生產(chǎn)者線程的創(chuàng)建"""
def producer_manager():
thread = threading.Thread(target=producer)
thread.start()
- 生產(chǎn)者執(zhí)行的生產(chǎn)任務(wù)
"""生產(chǎn)者負(fù)責(zé)請(qǐng)求網(wǎng)站首頁茉盏,解析出每個(gè)帖子的url鉴未,和創(chuàng)建出帖子的請(qǐng)求任務(wù)并放入任務(wù)隊(duì)列中"""
def producer():
for title_page in range(start_page, total_page):
text = request_title(title_page)
if not text:
continue
tie_list = parse_title(text)
for tie in tie_list:
if tieba.find_one({'link':tie['link']}): #數(shù)據(jù)去重的判斷
print('Data already exists: '+tie['link'])
else:
task_queue.put(tie) #將任務(wù)放入隊(duì)列
log.update_one(run_log,{'$set':{'run_page':title_page}}) #記錄斷點(diǎn)數(shù)據(jù)
- 創(chuàng)建消費(fèi)者線程枢冤。將線程放入list中放入方便管理。 調(diào)用task_queue.join()铜秆,表示若隊(duì)列中還存在任務(wù)淹真,那么主線程就阻塞住,不會(huì)往下面執(zhí)行连茧。
"""負(fù)責(zé)創(chuàng)建并管理消費(fèi)者的線程"""
def consumer_manager():
threads = []
while len(threads) < thread_max_count:
thread = threading.Thread(target=consumer)
thread.setName("thread%d" % len(threads))
threads.append(thread)
thread.start()
task_queue.join()
6.消費(fèi)者執(zhí)行的任務(wù)核蘸。這里用了一個(gè)while True做死循環(huán),這樣線程就不會(huì)結(jié)束啸驯,避免了創(chuàng)建和銷毀線程帶來的開銷客扎,能提高一點(diǎn)運(yùn)行效率。任務(wù)執(zhí)行完后需要調(diào)用queue.task_done()函數(shù)罚斗,告訴隊(duì)列已經(jīng)完成一個(gè)任務(wù)徙鱼,只有所有任務(wù)都調(diào)用過queue.task_done()以后,隊(duì)列才會(huì)解除阻塞针姿,主線程繼續(xù)往下執(zhí)行袱吆。
"""消費(fèi)者負(fù)責(zé)從任務(wù)隊(duì)列中取出任務(wù)(即任務(wù)的消費(fèi)),并執(zhí)行爬取每篇帖子和里面評(píng)論的任務(wù)"""
def consumer():
while True:
if task_queue.qsize() == 0:
time.sleep(3)
else:
task_count[0] = task_count[0] + 1
#print("run time second %s, ready task counts %d, finish task counts %d, db counts %d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
print("運(yùn)行時(shí)間:%s秒,隊(duì)列中剩余任務(wù)數(shù)%d,已完成任務(wù)數(shù)%d,數(shù)據(jù)已保存條數(shù)%d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
tie = task_queue.get()
request_comment(tie)
insert_db(tie)
task_queue.task_done()
斷點(diǎn)記錄和恢復(fù)
為了每次開始爬取數(shù)據(jù)不必重頭開始距淫,必須要記錄下上次斷點(diǎn)的位置
- 記錄的手段绞绒,可以使用csv或者是數(shù)據(jù)庫,這里用的是mongodb榕暇。
- 記錄的位置蓬衡。這個(gè)需要考慮一下。如果放在每個(gè)消費(fèi)者線程中的話彤枢,記錄的位置會(huì)比較多撤蟆,到時(shí)候恢復(fù)起來比較麻煩。所以還是放在生產(chǎn)者線程中記錄會(huì)比較好堂污,每次分頁請(qǐng)求導(dǎo)航頁時(shí)家肯,使用數(shù)據(jù)更新的方式將頁數(shù)記錄下來,恢復(fù)時(shí)讀出頁數(shù)盟猖,從這里開始繼續(xù)爬取讨衣。
數(shù)據(jù)更新的函數(shù)update_one,接受的第一個(gè)參數(shù)式镐,是表示查詢的位置的反镇,第二個(gè)參數(shù)里 '$set‘ 是固定用法,后面是更新的數(shù)據(jù)娘汞。
log.update_one(run_log,{'$set':{'run_page':title_page}})
- 因?yàn)榛謴?fù)的時(shí)候可能會(huì)存在重復(fù)的數(shù)據(jù)歹茶,所以還需要做去重處理。
去重
- 去重最簡單的方法,就是在寫入數(shù)據(jù)庫前惊豺,查找有沒有這條記錄燎孟,有的話就不寫入,比較適合數(shù)據(jù)量不多時(shí)采用的方法尸昧。但在海量數(shù)據(jù)時(shí)揩页,會(huì)受到空間和時(shí)間效率的限制,這時(shí)可以采用性能更加優(yōu)秀的Bloom-Filter烹俗,即布隆過濾器算法爆侣,不過本人沒研究過,這里不詳細(xì)討論了幢妄。
- 查詢的條件要有唯一性兔仰,本文中就是直接對(duì) URL 進(jìn)行查詢。
- 如果需要對(duì)數(shù)據(jù)庫中某個(gè)字段頻繁查詢的話蕉鸳,會(huì)涉及到查詢的效率問題斋陪,那就需要對(duì)這個(gè)字段做索引。索引就像書的目錄置吓,如果查找某內(nèi)容在沒有目錄的幫助下,只能全篇從頭到尾查找翻閱缔赠,這導(dǎo)致效率非常的低下衍锚;如果在借助目錄情況下,就能很快的定位內(nèi)容所在位置嗤堰,效率會(huì)直線提高戴质。
- 對(duì)字段做索引時(shí)需要的條件,該字段最好是能滿足唯一性的踢匣,比如 ID告匠,URL 這些數(shù)據(jù),這樣查找返回的值只有一個(gè)离唬。還有這個(gè)字段的內(nèi)容不能頻繁變化后专,因?yàn)閿?shù)據(jù)庫引擎會(huì)對(duì)索引維護(hù),其實(shí)就是對(duì)索引進(jìn)行排序输莺,索引值經(jīng)常變化就會(huì)加大排序的負(fù)擔(dān)戚哎,影響性能。
運(yùn)行結(jié)果
可以放在服務(wù)器上運(yùn)行嫂用,抓取了10萬條帖子的數(shù)據(jù)型凳。我用個(gè)人電腦運(yùn)行時(shí),可能是因?yàn)榫W(wǎng)絡(luò)或者路由器的問題嘱函,最多只能開5個(gè)線程甘畅,多了容易出現(xiàn)請(qǐng)求超時(shí)的情況,所以最后沒辦法只能放在服務(wù)器上去跑,速度挺快的疏唾,可以開20個(gè)線程蓄氧,每秒能抓取十幾個(gè)帖子,只不過cpu是瓶頸荸实,一運(yùn)行cpu就滿載匀们,想以后再考慮優(yōu)化下吧。
完整代碼
以下是python2.7版本的代碼准给。
使用python3.6運(yùn)行的話泄朴,import Queue要變成小寫的import queue,還有用queue.Queue()來創(chuàng)建隊(duì)列露氮。
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import pymongo
import re, math
import time,sys
import threading
import Queue
thread_max_count = 20
total_page = 1501
db_name = 'tieba3'
"""若請(qǐng)求超時(shí)祖灰,則重試請(qǐng)求,重試次數(shù)在5次以內(nèi)"""
def request(method, url, **kwargs):
retry_count = 5
while retry_count > 0:
try:
res = requests.get(url, **kwargs) if method == 'get' else requests.post(url, **kwargs)
return res.text
except:
print('retry...', url)
retry_count -= 1
"""請(qǐng)求網(wǎng)站的導(dǎo)航頁畔规,獲取帖子數(shù)據(jù)"""
def request_title(title_page=1):
title_url = "http://guba.eastmoney.com/list,cjpl_" + str(title_page) + ".html"
return request('get', title_url, timeout=5)
"""解析導(dǎo)航頁帖子的標(biāo)題數(shù)據(jù)局扶,包括閱讀數(shù),評(píng)論數(shù)叁扫,標(biāo)題三妈,作者,發(fā)布時(shí)間莫绣,評(píng)論的總頁數(shù)"""
def parse_title(text):
article_list = []
soup = BeautifulSoup(text, 'lxml')
host_url = 'http://guba.eastmoney.com'
elem_article = soup.find_all(name='div', class_='articleh')
for item in elem_article:
article_dict = {'read_count': '', 'comment_count': '', 'page': '', 'title': '', 'tie': '', 'author': '',
'time': '', 'link': '', 'comment': ''}
article_dict['read_count'] = item.select_one("span.l1").text
article_dict['comment_count'] = item.select_one("span.l2").text
article_dict['page'] = int(math.ceil(int(article_dict['comment_count']) / 30.0))
article_dict['title'] = item.select_one("span.l3 > a").text
article_dict['author'] = item.select_one("span.l4 > a").text if item.select_one("span.l4 > a") else u'匿名作者'
article_dict['time'] = item.select_one("span.l5").text
href = item.select_one("span.l3 > a").get("href")
article_dict['link'] = host_url + href if href[:1] == '/' else host_url + '/' + href
article_dict['comment'] = []
article_list.append(article_dict)
return article_list
"""根據(jù)評(píng)論的總頁數(shù)畴蒲,拼接出每一個(gè)評(píng)論頁的url"""
def get_comment_urls(tie):
comment_urls = []
for cur_page in range(1, tie['page'] + 1 if tie['page'] > 0 else tie['page'] + 2):
comment_url = tie['link'][:-5] + '_' + str(cur_page) + ".html"
comment_urls.append(comment_url)
return comment_urls
"""請(qǐng)求評(píng)論頁的數(shù)據(jù)"""
def request_comment(tie):
"""跳過一些不是帖子的鏈接"""
if re.compile(r'news,cjpl').search(tie['link']) == None:
return
print(tie['link']+' '+threading.currentThread().name)
for comment_url in get_comment_urls(tie):
text = request('get', comment_url, timeout=5)
parse_comment(text, tie)
"""解析出評(píng)論頁的數(shù)據(jù),包括作者对室,時(shí)間模燥,評(píng)論內(nèi)容和計(jì)算評(píng)論樓層"""
def parse_comment(text, tie):
soup = BeautifulSoup(text, 'lxml')
if (soup.find(name='div', id='zw_body')):
tie['tie'] = soup.find(name='div', id='zw_body').text.replace(u'\u3000', u'')
div_list = soup.find(id="mainbody").find_all(name='div', class_="zwlitxt")
for item in div_list:
comment_info = {"author": '', "time": '', "content": '', "lou": 0}
comment_info['author'] = item.find(name='span', class_="zwnick").text
comment_info['lou'] = len(tie['comment']) + 1
comment_info['time'] = item.find(name='div', class_="zwlitime").text[3:]
if (item.find(name='div', class_="zwlitext stockcodec")):
comment_info['content'] = item.find(name='div', class_="zwlitext stockcodec").text
comment_info['content'] = u"沒有評(píng)論內(nèi)容" if comment_info['content'] == '' else comment_info['content']
else:
comment_info['content'] = u"沒有評(píng)論內(nèi)容"
tie['comment'].append(comment_info)
"""消費(fèi)者負(fù)責(zé)從任務(wù)隊(duì)列中取出任務(wù)(即任務(wù)的消費(fèi)),并執(zhí)行爬取每篇帖子和里面評(píng)論的任務(wù)"""
def consumer():
while True:
if task_queue.qsize() == 0:
time.sleep(3)
else:
task_count[0] = task_count[0] + 1
#print("run time second %s, ready task counts %d, finish task counts %d, db counts %d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
print("運(yùn)行時(shí)間:%s秒,隊(duì)列中剩余任務(wù)數(shù)%d,已完成任務(wù)數(shù)%d,數(shù)據(jù)已保存條數(shù)%d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
tie = task_queue.get()
request_comment(tie)
insert_db(tie)
task_queue.task_done()
"""負(fù)責(zé)創(chuàng)建并管理消費(fèi)者的線程"""
def consumer_manager():
threads = []
while len(threads) < thread_max_count:
thread = threading.Thread(target=consumer)
thread.setName("thread%d" % len(threads))
threads.append(thread)
thread.start()
task_queue.join()
"""數(shù)據(jù)保存"""
def insert_db(tie):
tieba.insert_one(tie)
"""生產(chǎn)者負(fù)責(zé)請(qǐng)求網(wǎng)站首頁掩宜,解析出每個(gè)帖子的url蔫骂,和創(chuàng)建出帖子的請(qǐng)求任務(wù)并放入任務(wù)隊(duì)列中"""
def producer():
for title_page in range(start_page, total_page):
text = request_title(title_page)
if not text:
continue
tie_list = parse_title(text)
for tie in tie_list:
if tieba.find_one({'link':tie['link']}): #數(shù)據(jù)去重的判斷
print('Data already exists: '+tie['link'])
else:
task_queue.put(tie)
log.update_one(run_log,{'$set':{'run_page':title_page}}) #記錄斷點(diǎn)數(shù)據(jù)
"""負(fù)責(zé)對(duì)生產(chǎn)者線程的創(chuàng)建"""
def producer_manager():
thread = threading.Thread(target=producer)
thread.start()
if __name__ == '__main__':
start_time = time.time()
task_count = [0]
client = pymongo.MongoClient('localhost', 27017)
test = client['test']
tieba = test[db_name]
"""
創(chuàng)建一個(gè)log數(shù)據(jù)庫,記錄斷點(diǎn)的位置牺汤,每次重新運(yùn)行就從斷點(diǎn)為位置重爬辽旋,
這里記錄的斷點(diǎn)數(shù)據(jù)是帖子在首頁的頁數(shù)
"""
log = test['log']
run_log = {'db_name':db_name}
if not log.find_one(run_log):
log.insert_one(run_log)
start_page = 1
else:
start_page = log.find_one(run_log)['run_page']
print('start_page',start_page)
"""使用帖子的鏈接作為索引,可以提高去重時(shí)的查詢效率"""
tieba.create_index('link')
"""必須要設(shè)置隊(duì)列的大小檐迟,否則隊(duì)列中的任務(wù)數(shù)量會(huì)無限增長戴已,占用大量內(nèi)存"""
task_queue = Queue.Queue(maxsize=thread_max_count*10)
producer_manager() #創(chuàng)建生產(chǎn)者線程
consumer_manager() #創(chuàng)建消費(fèi)者線程