Hello 大家好为流!我又來了。
[![QQ圖片2016110221515](http://upload-images.jianshu.io/upload_images/4233558-0b75bb6320a8debb.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)](http://qiniu.cuiqingcai.com/wp-content/uploads/2016/11/QQ%E5%9B%BE%E7%89%8720161102215153.jpg)
你是不是發(fā)現(xiàn)下載圖片速度特別慢让簿、難以忍受熬床臁!對(duì)于這種問題 一般解決辦法就是多進(jìn)程了尔当!一個(gè)進(jìn)程速度慢莲祸!我就用十個(gè)進(jìn)程,相當(dāng)于十個(gè)人一起干椭迎。速度就會(huì)快很多啦H裰摹(為什么不說多線程?懂點(diǎn)Python的小伙伴都知道畜号、GIL的存在 導(dǎo)致Python的多線程有點(diǎn)坑敖裳帧!)今天就教大家來做一個(gè)多進(jìn)程的爬蟲(其實(shí)吧简软、可以用來做一個(gè)超簡(jiǎn)化版的分布式爬蟲)
其實(shí)吧蛮拔!還有一種加速的方法叫做“異步”!不過這玩意兒我沒怎么整明白就不出來誤人子弟了1陨(因?yàn)榕老x大部分時(shí)間都是在等待response中建炫!‘異步’則能讓程序在等待response的時(shí)間去做的其他事情。)
[![QQ圖片2016102219331](http://upload-images.jianshu.io/upload_images/4233558-4fb17673cf991148.gif?imageMogr2/auto-orient/strip)](http://qiniu.cuiqingcai.com/wp-content/uploads/2016/10/QQ%E5%9B%BE%E7%89%8720161022193315.gif)
學(xué)過Python基礎(chǔ)的同學(xué)都知道视卢、在多進(jìn)程中踱卵,進(jìn)程之間是不能相互通信的,這就有一個(gè)很坑爹的問題的出現(xiàn)了据过!多個(gè)進(jìn)程怎么知道那那些需要爬取惋砂、哪些已經(jīng)被爬取了!
這就涉及到一個(gè)東西绳锅!這玩意兒叫做隊(duì)列N鞫!隊(duì)列A圮健眷柔!隊(duì)列!原朝!其實(shí)吧正常來說應(yīng)該給大家用隊(duì)列來完成這個(gè)教程的驯嘱, 比如 Tornado 的queue模塊。(如果需要更為穩(wěn)定健壯的隊(duì)列喳坠,則請(qǐng)考慮使用Celery這一類的專用消息傳遞工具)
不過為了簡(jiǎn)化技術(shù)種類熬掀馈!(才不會(huì)告訴你們是我懶壕鹉,嫌麻煩呢L昊稀)這次我們繼續(xù)使用MongoDB聋涨。
好了!先來理一下思路:
每個(gè)進(jìn)程需要知道那些URL爬取過了负乡、哪些URL需要爬入拱住!我們來給每個(gè)URL設(shè)置兩種狀態(tài):
outstanding:等待爬取的URL
complete:爬取完成的URL
誒抖棘!等等我們好像忘了啥茂腥? 失敗的URL的怎么辦啊钉答?我們?cè)谠黾右环N狀態(tài):
processing:正在進(jìn)行的URL础芍。
嗯杈抢!當(dāng)一個(gè)所有初始的URL狀態(tài)都為outstanding数尿;當(dāng)開始爬取的時(shí)候狀態(tài)改為:processing;爬取完成狀態(tài)改為:complete惶楼;失敗的URL重置狀態(tài)為:outstanding右蹦。為了能夠處理URL進(jìn)程被終止的情況、我們?cè)O(shè)置一個(gè)計(jì)時(shí)參數(shù)歼捐,當(dāng)超過這個(gè)值時(shí)何陆;我們則將狀態(tài)重置為outstanding。
下面開整Go Go Go豹储!
首先我們需要一個(gè)模塊:datetime(這個(gè)模塊比內(nèi)置time模塊要好使一點(diǎn))不會(huì)裝贷盲??不是吧剥扣! pip install datetime
還有上一篇博文我們已經(jīng)使用過的pymongo
下面是隊(duì)列的代碼:
from datetime import datetime, timedelta
from pymongo import MongoClient, errors
class MogoQueue():
OUTSTANDING = 1 ##初始狀態(tài)
PROCESSING = 2 ##正在下載狀態(tài)
COMPLETE = 3 ##下載完成狀態(tài)
def __init__(self, db, collection, timeout=300):##初始mongodb連接
self.client = MongoClient()
self.Client = self.client[db]
self.db = self.Client[collection]
self.timeout = timeout
def __bool__(self):
"""
這個(gè)函數(shù)巩剖,我的理解是如果下面的表達(dá)為真,則整個(gè)類為真
至于有什么用钠怯,后面我會(huì)注明的(如果我的理解有誤佳魔,請(qǐng)指點(diǎn)出來謝謝,我也是Python新手)
$ne的意思是不匹配
"""
record = self.db.find_one(
{'status': {'$ne': self.COMPLETE}}
)
return True if record else False
def push(self, url, title): ##這個(gè)函數(shù)用來添加新的URL進(jìn)隊(duì)列
try:
self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主題': title})
print(url, '插入隊(duì)列成功')
except errors.DuplicateKeyError as e: ##報(bào)錯(cuò)則代表已經(jīng)存在于隊(duì)列之中了
print(url, '已經(jīng)存在于隊(duì)列中了')
pass
def push_imgurl(self, title, url):
try:
self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
print('圖片地址插入成功')
except errors.DuplicateKeyError as e:
print('地址已經(jīng)存在了')
pass
def pop(self):
"""
這個(gè)函數(shù)會(huì)查詢隊(duì)列中的所有狀態(tài)為OUTSTANDING的值晦炊,
更改狀態(tài)鞠鲜,(query后面是查詢)(update后面是更新)
并返回_id(就是我們的URL),MongDB好使吧断国,^_^
如果沒有OUTSTANDING的值則調(diào)用repair()函數(shù)重置所有超時(shí)的狀態(tài)為OUTSTANDING贤姆,
$set是設(shè)置的意思,和MySQL的set語法一個(gè)意思
"""
record = self.db.find_and_modify(
query={'status': self.OUTSTANDING},
update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
)
if record:
return record['_id']
else:
self.repair()
raise KeyError
def pop_title(self, url):
record = self.db.find_one({'_id': url})
return record['主題']
def peek(self):
"""這個(gè)函數(shù)是取出狀態(tài)為 OUTSTANDING的文檔并返回_id(URL)"""
record = self.db.find_one({'status': self.OUTSTANDING})
if record:
return record['_id']
def complete(self, url):
"""這個(gè)函數(shù)是更新已完成的URL完成"""
self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})
def repair(self):
"""這個(gè)函數(shù)是重置狀態(tài)$lt是比較"""
record = self.db.find_and_modify(
query={
'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
'status': {'$ne': self.COMPLETE}
},
update={'$set': {'status': self.OUTSTANDING}}
)
if record:
print('重置URL狀態(tài)', record['_id'])
def clear(self):
"""這個(gè)函數(shù)只有第一次才調(diào)用稳衬、后續(xù)不要調(diào)用霞捡、因?yàn)檫@是刪庫啊宋彼!"""
self.db.drop()
好了弄砍,隊(duì)列我們做好了仙畦,下面是獲取所有頁面的代碼:
from Download import request
from mongodb_queue import MogoQueue
from bs4 import BeautifulSoup
spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')
def start(url):
response = request.get(url, 3)
Soup = BeautifulSoup(response.text, 'lxml')
all_a = Soup.find('div', class_='all').find_all('a')
for a in all_a:
title = a.get_text()
url = a['href']
spider_queue.push(url, title)
"""上面這個(gè)調(diào)用就是把URL寫入MongoDB的隊(duì)列了"""
if __name__ == "__main__":
start('http://www.mzitu.com/all')
"""這一段兒就不解釋了哦!超級(jí)簡(jiǎn)單的"""
下面就是多進(jìn)程+多線程的下載代碼了:
import os
import time
import threading
import multiprocessing
from mongodb_queue import MogoQueue
from Download import request
from bs4 import BeautifulSoup
SLEEP_TIME = 1
def mzitu_crawler(max_threads=10):
crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##這個(gè)是我們獲取URL的隊(duì)列
##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
def pageurl_crawler():
while True:
try:
url = crawl_queue.pop()
print(url)
except KeyError:
print('隊(duì)列沒有數(shù)據(jù)')
break
else:
img_urls = []
req = request.get(url, 3).text
title = crawl_queue.pop_title(url)
mkdir(title)
os.chdir('D:\mzitu\\' + title)
max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
for page in range(1, int(max_span) + 1):
page_url = url + '/' + str(page)
img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
img_urls.append(img_url)
save(img_url)
crawl_queue.complete(url) ##設(shè)置為完成狀態(tài)
##img_queue.push_imgurl(title, img_urls)
##print('插入數(shù)據(jù)庫成功')
def save(img_url):
name = img_url[-9:-4]
print(u'開始保存:', img_url)
img = request.get(img_url, 3)
f = open(name + '.jpg', 'ab')
f.write(img.content)
f.close()
def mkdir(path):
path = path.strip()
isExists = os.path.exists(os.path.join("D:\mzitu", path))
if not isExists:
print(u'建了一個(gè)名字叫做', path, u'的文件夾音婶!')
os.makedirs(os.path.join("D:\mzitu", path))
return True
else:
print(u'名字叫做', path, u'的文件夾已經(jīng)存在了慨畸!')
return False
threads = []
while threads or crawl_queue:
"""
這兒crawl_queue用上了,就是我們__bool__函數(shù)的作用衣式,為真則代表我們MongoDB隊(duì)列里面還有數(shù)據(jù)
threads 或者 crawl_queue為真都代表我們還沒下載完成寸士,程序就會(huì)繼續(xù)執(zhí)行
"""
for thread in threads:
if not thread.is_alive(): ##is_alive是判斷是否為空,不是空則在隊(duì)列中刪掉
threads.remove(thread)
while len(threads) < max_threads or crawl_queue.peek(): ##線程池中的線程少于max_threads 或者 crawl_qeue時(shí)
thread = threading.Thread(target=pageurl_crawler) ##創(chuàng)建線程
thread.setDaemon(True) ##設(shè)置守護(hù)線程
thread.start() ##啟動(dòng)線程
threads.append(thread) ##添加進(jìn)線程隊(duì)列
time.sleep(SLEEP_TIME)
def process_crawler():
process = []
num_cpus = multiprocessing.cpu_count()
print('將會(huì)啟動(dòng)進(jìn)程數(shù)為:', num_cpus)
for i in range(num_cpus):
p = multiprocessing.Process(target=mzitu_crawler) ##創(chuàng)建進(jìn)程
p.start() ##啟動(dòng)進(jìn)程
process.append(p) ##添加進(jìn)進(jìn)程隊(duì)列
for p in process:
p.join() ##等待進(jìn)程隊(duì)列里面的進(jìn)程結(jié)束
if __name__ == "__main__":
process_crawler()
好啦!一個(gè)多進(jìn)程多線的爬蟲就完成了碴卧,(其實(shí)你可以設(shè)置一下MongoDB弱卡,然后調(diào)整一下連接配置,在多臺(tái)機(jī)器上跑哦W〔帷婶博!嗯,就是超級(jí)簡(jiǎn)化版的分布式爬蟲了荧飞,雖然很是簡(jiǎn)陋凡人。)
本來還想下載圖片那一塊兒加上異步(畢竟下載圖片是I\O等待最久的時(shí)間了,)叹阔,可惜異步我也沒怎么整明白挠轴,就不拿出來貽笑大方了。
另外耳幢,各位小哥兒可以參考上面代碼岸晦,單獨(dú)處理圖片地址試試(就是多個(gè)進(jìn)程直接下載圖片)?
我測(cè)試了一下八分鐘下載100套圖
[![QQ圖片2016110221515](http://upload-images.jianshu.io/upload_images/4233558-0b75bb6320a8debb.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)](http://qiniu.cuiqingcai.com/wp-content/uploads/2016/11/QQ%E5%9B%BE%E7%89%8720161102215153.jpg)
小白教程就到此結(jié)束了睛藻,后面我教大家玩玩Scrapy