1 Introduction
A while back I read quite a few blog posts about the design of distributed crawler systems, and I wanted to build one myself as practice, so I picked the ever-popular Douban movie listings as a test target. The overall structure of the code is shown in the figure.
Before coding you should be familiar with:
- Basic Redis installation and usage (the Python redis library)
- Basic MongoDB installation and usage (the Python mongoengine library)
- Basic installation and usage of the RabbitMQ message queue (the Python pika library)
- The Linux screen command, which makes managing VPSes much easier
The server side is built on Python 3.
The crawler clients are built on Python 3 and Scrapy.
Before development I studied the page format under Douban's movie category:
https://movie.douban.com/j/new_search_subjects?sort=T&range=0,10&tags=電影&start=7100
The start parameter runs from 0 to 9979 and is the index of the first record; each request returns 20 records, and there are 10,000 movie entries in total. The response to the request comes back in the following format
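(an abbreviated sketch written from memory, so the exact fields may differ; the crawler only relies on data and each entry's url):

{
    "data": [
        {"title": "肖申克的救贖", "rate": "9.7",
         "url": "https://movie.douban.com/subject/1292052/"},
        # ... 19 more entries per page, each with more fields than shown here
    ]
}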
The url of each entry in the response can then be passed through the bloom filter and stored in our new task queue. Ideally, after roughly 10,500 requests (500 list pages plus 10,000 detail pages) the database would hold 10,000 movie records. In practice I got 9,982, plus 18 that returned 404 because they had been taken down. Douban's anti-crawling is genuinely tough, though: it took two machines more than a day to finish. I'll come back to the speed problem later; it mainly comes down to my not having a stable IP pool (the free ones are poor), having too few clients, and Douban's strict policy of returning 302 or 403 as soon as an IP's request frequency gets too high. As an aside, small adult-video sites need no distribution at all: a single machine scraped around 130,000 serial-number entries in one day.
2 Code walkthrough
The crawl has a few important pieces:
1. Redis (managing tasks in and out, and the bloom filter)
2. MongoDB (storing movie data and recording unfinished tasks)
3. RabbitMQ (RPC communication between the crawler clients and the server; issuing and completing tasks)
2.1 Redis: task management and deduplication
I split the URL tasks into two priority levels, a and b, with a > b. The seed URLs (Douban's movie list pages) are kept in the arank Redis set; the movie detail URLs scraped from them are deduplicated with a bloom filter and stored in the brank Redis set.
The code looks roughly like this:
redis_controller.py
#encoding=utf-8
import datetime
import traceback
from collections.abc import Iterable
import redis
from hashlib import md5
from sql_model import monogo_controller
# connect to Redis
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
arank_str = 'arank'
brank_str = 'brank'
url_limit = 5
r = redis.Redis(connection_pool=pool)
def get_out_urls():
'''
Pop task URLs to hand out to a crawler client,
at most five at a time.
:return:
'''
arank_data_len = r.scard(arank_str)
outdata = []
# first check whether the arank set has any tasks
if arank_data_len > 0:
for i in range(url_limit):
popdata = r.spop(arank_str)
if popdata is not None:
outdata.append(popdata)
try:
# when handing a task to a client, record it in MongoDB as unfinished
# it is deleted once the client reports it done
monogo_controller.TempJob(_id=popdata,work_start=datetime.datetime.now()).save(force_insert=True)
except:
traceback.print_exc()
pass
else:
break
# when the arank set is empty,
# check whether brank has any tasks
elif r.scard(brank_str) > 0:
for i in range(url_limit):
popdata = r.spop(brank_str)
if popdata is not None:
outdata.append(popdata)
try:
monogo_controller.TempJob(_id=popdata,work_start=datetime.datetime.now()).save(force_insert=True)
except:
traceback.print_exc()
pass
else:
break
# when both arank and brank are empty, re-issue MongoDB tasks that have been pending for more than an hour
else:
for mogoi in range(5):
timejobs = monogo_controller.TempJob.objects(
work_start__lt=(datetime.datetime.now() - datetime.timedelta(hours=1))
).limit(1).modify(work_start=datetime.datetime.now())
if not timejobs:
break
outdata.append(timejobs._id)
return outdata
def puturl(rank,urls):
'''
Store task URLs in Redis.
:param rank: task priority level
:param urls: the URLs
:return:
'''
assert isinstance(urls,Iterable)
for url in urls:
# deduplicate with the bloom filter
if not bf.isContains(url.encode()):
bf.insert(url.encode())
if rank==arank_str:
r.sadd(arank_str,url)
else:
r.sadd(brank_str,url)
def puturl_safe(rank,urls):
'''
Put tasks straight into Redis without the bloom filter; used to manually seed the initial URLs.
:param rank:
:param urls:
:return:
'''
# safely add seed URLs
assert isinstance(urls,Iterable)
for url in urls:
if rank==arank_str:
r.sadd(arank_str,url)
else:
r.sadd(brank_str,url)
class SimpleHash(object):
'''
Hash function used by the bloom filter
(found online).
'''
def __init__(self, cap, seed):
self.cap = cap
self.seed = seed
def hash(self, value):
ret = 0
for i in range(len(value)):
ret += self.seed * ret + ord(value[i])
return (self.cap - 1) & ret
class BloomFilter(object):
def __init__(self, blockNum=1, key='doubanbloomfilter'):
"""
Initialize the bloom filter.
:param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.
:param key: the key's name in Redis
"""
self.server = r
self.bit_size = 1 << 31  # a Redis string holds at most 512 MB; 2**31 bits = 256 MB is used here
self.seeds = [5, 7, 11, 13, 31, 37, 61]
self.key = key
self.blockNum = blockNum
self.hashfunc = []
for seed in self.seeds:
self.hashfunc.append(SimpleHash(self.bit_size, seed))
def isContains(self, str_input):
'''
Check whether str_input has already been seen.
:param str_input:
:return:
'''
if not str_input:
return False
m5 = md5()
m5.update(str_input)
str_input = m5.hexdigest()
ret = True
name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
for f in self.hashfunc:
loc = f.hash(str_input)
ret = ret & self.server.getbit(name, loc)
return ret
def insert(self, str_input):
'''
Set the bit for each hash of str_input in the Redis bitmap.
:param str_input:
:return:
'''
m5 = md5()
m5.update(str_input)
str_input = m5.hexdigest()
name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
for f in self.hashfunc:
loc = f.hash(str_input)
self.server.setbit(name, loc, 1)
bf = BloomFilter()
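A minimal sketch of how these helpers are meant to be used together (assuming Redis and MongoDB are reachable with the settings above; the URLs are made-up examples):

from main_server_side import redis_controller

# manually seed a list-page URL into the high-priority arank set (no bloom filter)
redis_controller.puturl_safe(redis_controller.arank_str,
                             ["j/new_search_subjects?sort=T&range=0,10&tags=電影&start=0"])

# detail URLs scraped by a client go through the bloom filter into the low-priority brank set;
# adding the same URL a second time is a no-op
redis_controller.puturl(redis_controller.brank_str, ["subject/1292052/"])
redis_controller.puturl(redis_controller.brank_str, ["subject/1292052/"])

# hand out up to five tasks; each one is also recorded as a pending TempJob in MongoDB
print(redis_controller.get_out_urls())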
2.2 MongoDB (movie data storage and recording unfinished tasks)
The database is MongoDB, managed through the Python mongoengine ORM. The data classes are as follows:
# movie data ORM classes
import datetime
import mongoengine
import time
from mongoengine import StringField,DateTimeField,ListField,LongField,FloatField,IntField
# connect to MongoDB
mongoengine.connect('douban',username='pig',password='pig@123456',authentication_source="admin")
class TempJob(mongoengine.Document):
'''
ORM class for unfinished (pending) tasks
'''
_id= StringField(required=True,unique=True,primary_key=True)
work_start=DateTimeField(required=True,default=datetime.datetime.now)
class MoiveDataModel(mongoengine.Document):
'''
ORM class for movie data
'''
director= ListField(StringField())
douban_id=LongField(unique=True,primary_key=True,required=True)
tags=ListField(StringField())
stars=ListField(StringField())
desc=StringField(required=True)
douban_remark=FloatField()
imdb_tag=StringField()
contry=StringField()
language=StringField()
publictime=DateTimeField()
runtime=IntField()
votes=IntField()
title=StringField(required=True)
def delete(urls):
# delete pending-task records once the client reports them done
for url in urls:
TempJob.objects(_id=url).delete()
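Once data starts flowing in, ordinary mongoengine queries work against these models; a couple of illustrative examples (hypothetical queries, not part of the original code):

from sql_model.monogo_controller import MoiveDataModel, TempJob

# how many movies have been stored so far
print(MoiveDataModel.objects.count())

# ten best-rated movies with at least 100,000 votes
for movie in MoiveDataModel.objects(votes__gte=100000).order_by('-douban_remark')[:10]:
    print(movie.title, movie.douban_remark)

# tasks that were handed out but not yet reported back
print(TempJob.objects.count())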
2.3 RabbitMQ: RPC between the crawler clients and the main server
Communication is RPC over RabbitMQ. The rough flow: a crawler client requests new tasks from the server via RPC, and in the same request reports which tasks it has completed along with the scraped data objects; on receiving a request, the server stores the data where it belongs and pulls the next batch of tasks from Redis and MongoDB to return to the client.
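Concretely, each exchange is a pair of JSON payloads shaped roughly like this (reconstructed from the code below rather than a formal schema; the URLs and IDs are illustrative):

# client -> server (a brand-new client just sends {"query": "start"})
request = {
    "done": ["j/new_search_subjects?sort=T&range=0,10&tags=電影&start=0"],  # task URLs it finished
    "rankstr": "brank",                                    # priority level for the new URLs
    "new_urls": ["subject/1292052/", "subject/1291546/"],  # detail URLs scraped from the list page
    "result_map": [{"douban_id": 1292052, "title": "肖申克的救贖", "desc": "..."}],  # parsed movie dicts
}

# server -> client: whether the call succeeded and the next batch of task URLs
response = {"isok": True, "ans": ["subject/1291561/"]}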
The RPC server code is as follows:
import json
import traceback
import pika
from main_server_side import redis_controller
from sql_model import monogo_controller
from sql_model.monogo_controller import MoiveDataModel
# connect to RabbitMQ
# make sure the message queue exists
cred = pika.PlainCredentials(username='pig', password='pig123')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='xx.xxx.xxx.xxx', credentials=cred))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue_douban')
def on_request(ch, method, props, body):
'''
Callback invoked when a client request arrives.
:param ch:
:param method:
:param props:
:param body:
:return:
'''
try:
print("send_data")
jsondata = json.loads(body.decode())
print(jsondata)
done_urls=jsondata.get("done")
rankstr=jsondata.get('rankstr')
rankurls=jsondata.get("new_urls")
if done_urls is not None:
print("del done_urls")
print(done_urls)
monogo_controller.delete(done_urls)
if rankurls is not None:
redis_controller.puturl(rankstr,rankurls)
response=redis_controller.get_out_urls()
print("response is :")
print(response)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id=props.correlation_id
, content_type='application/json',
content_encoding='utf-8'),
body=json.dumps({"isok":True,"ans": response}))
ch.basic_ack(delivery_tag=method.delivery_tag)
result_map=jsondata.get("result_map")
if result_map is not None:
for mogodata in result_map:
try:
print(type(mogodata))
MoiveDataModel(**mogodata).save()
except:
traceback.print_exc()
pass
except Exception as e:
traceback.print_exc()
# handle one request at a time (single-threaded)
channel.basic_qos(prefetch_count=1,)
# consume from rpc_queue_douban
channel.basic_consume(on_request, queue='rpc_queue_douban')
print(" Awaiting DOUBAN RPC requests")
# wait for requests
channel.start_consuming()
The corresponding RPC client is designed as follows:
#!/usr/bin/env python
#encoding=utf-8
import json
import uuid
import pika
class RPCClient(object):
def __init__(self):
self.credentials = pika.PlainCredentials('pig', 'pig123')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='xx.xx.xx.xx', credentials=self.credentials))
self.channel = self.connection.channel()
# use an anonymous exclusive queue for the replies
result_queue=self.channel.queue_declare(exclusive=True)
self.callback_queue_name=result_queue.method.queue
self.channel.basic_consume(self.onresponse,self.callback_queue_name,no_ack=True)
self.responsedata=None
def onresponse(self,channel, method, properties, body):
if self.corrid == properties.correlation_id:
self.responsedata=body
def call(self,query_dict):
# generate a uuid as the correlation_id
self.corrid=str(uuid.uuid4())
self.channel.basic_publish(exchange='',routing_key='rpc_queue_douban',body=json.dumps(query_dict)
,properties=pika.BasicProperties(content_type='application/json',content_encoding='utf-8'
,correlation_id=self.corrid,reply_to=self.callback_queue_name))
while self.responsedata is None:
self.connection.process_data_events(time_limit=None)
backresponse=self.responsedata
self.responsedata=None
return json.loads(backresponse.decode())
2.4 Crawler client: hooking Scrapy up to the RPC
The Scrapy client gets its tasks from the server over RPC and extracts the data from each page with XPath. The code:
# -*- coding: utf-8 -*-
import json
import random
import re
import scrapy
import time
import logging
from urllib.parse import unquote
from scrapy import Request
from scrapy_client_side.scrapy_client_side.client_side import RPCClient
logging.basicConfig(filename='douban_spider.log', filemode="a", level=logging.ERROR)
class DoubanSpider(scrapy.Spider):
name = 'douban_spider'
urlpre = "https://movie.douban.com/"
done_urls = []
result_map = []
rankstr = None
new_urls = []
#Douban returns 403 or 302 when its anti-crawling measures trigger
#pausing the spider for two hours and then resuming usually avoids further trouble
handle_httpstatus_list = [403,302]
def start_requests(self):
while (True):
#after a successful RPC request, sleep for a random 30-60 s to lower the chance of triggering anti-crawling
if self.rankstr is None:
try:
rpc_response = RPCClient().call({"query": "start"})
except:
#the RPC connection to the server sometimes fails; wait a minute and retry
time.sleep(60)
continue
else:
try:
rpc_response = RPCClient().call(
{"done": self.done_urls, "rankstr": self.rankstr, "new_urls": self.new_urls,
"result_map": self.result_map})
print("get data from server sleep ")
time.sleep(random.randint(30,40))
except:
time.sleep(random.randint(55,65))
continue
try:
ansurls = rpc_response.get("ans")
print("ansis:")
print(ansurls)
#clear the local buffers after each RPC submission to the server
self.done_urls = []
self.rankstr = None
self.new_urls = []
self.result_map = []
if not ansurls :
time.sleep(30)
else:
for url in ansurls:
print("yield")
yield Request(self.urlpre + url, callback=self.parse,errback=self.errback_httpbin)
except:
time.sleep(30)
pass
def errback_httpbin(self, failure):
print(repr(failure))
def parse(self, response):
if not response.status==200:
time.sleep(7200)
yield Request(response.url, callback=self.parse,errback=self.errback_httpbin)
elif response.url.count(r'j/new_search_subjects') > 0:
resjson = json.loads(response.text)
urls = (unquote(data.get("url").replace("https://movie.douban.com/","")) for data in resjson.get('data'))
self.new_urls.extend(urls)
self.rankstr = 'brank'
self.done_urls.append(unquote(response.url.replace("https://movie.douban.com/","")))
elif response.url.count(r'subject/') > 0:
try:
response_dict = {}
# director = ListField(StringField)
# douban_id = LongField(unique=True, primary_key=True, required=True)
# tags = ListField(StringField)
# stars = ListField(StringField)
# desc = StringField(required=True)
# douban_remark = FloatField()
# imdb_tag = FloatField()
# contry = StringField()
# language = StringField()
# publictime = DateTimeField()
# runtime = IntField()
# votes = IntField()
response_dict["director"] = response.xpath("http://a[contains(@rel,'v:directedBy')]/text()").extract()
response_dict["douban_id"] = int(response.xpath("http://a[@share-id]/@share-id").get())
response_dict["tags"] = response.xpath("http://div[contains(@class,'tags-body')]/a/text()").extract()
response_dict["stars"] = response.xpath("http://a[contains(@rel,'v:starring')]/text()").extract()
response_dict["desc"] = "".join(
response.xpath("http://span[contains(@property,'v:summary')]/text()").extract()).replace("\u3000", " ")
response_dict["douban_remark"] = float(
response.xpath("http://strong[contains(@property,'v:average')]/text()").get())
response_dict["imdb_tag"] = response.xpath("http://a[contains(@href,'imdb')]/text()").get()
response_dict["contry"] = response.xpath("http://span[contains(text(),'制片國家')]/following-sibling::text()").get()
response_dict["language"] = response.xpath("http://span[contains(text(),'語言')]/following-sibling::text()").get()
datestr = response.xpath("//span[contains(@property,'v:initialReleaseDate')]/text()").get()
try:
timestr = re.findall(r"\d{4}-\d{2}-\d{2}", datestr)[0]
response_dict["publictime"] = timestr
except:
pass
# runtime
try:
response_dict["runtime"] = int(response.xpath("http://span[contains(@property,'v:runtime')]/@content").get())
except:
response_dict["runtime"]=-1
pass
response_dict["votes"] = int(response.xpath("http://span[contains(@property,'v:votes')]/text()").get())
response_dict["title"] =response.xpath("http://title/text()").get().replace("\n","").replace('(豆瓣)',"").strip()
print(response_dict)
self.rankstr=""
self.result_map.append(response_dict)
self.done_urls.append(unquote(response.url.replace("https://movie.douban.com/", "")))
except Exception as e:
logging.exception("spider parse error")
pass
The spider can be started from Python code, with the Scrapy settings configured programmatically:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from scrapy_client_side.scrapy_client_side.spiders.douban_spider import DoubanSpider
s=get_project_settings()
s.set("USER_AGENT",'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:58.0) Gecko/20100101 Firefox/58.0')
s.set("ROBOTSTXT_OBEY" , False)
DOWNLOAD_DELAY = 10
RANDOMIZE_DOWNLOAD_DELAY = True
s.set('DOWNLOAD_DELAY',DOWNLOAD_DELAY)
s.set('RANDOMIZE_DOWNLOAD_DELAY',RANDOMIZE_DOWNLOAD_DELAY)
s.set('CONCURRENT_REQUESTS',1)
s.set('DOWNLOAD_TIMEOUT',60)
process1 = CrawlerProcess(s)
process1.crawl(DoubanSpider)
process1.start()
2.5 Seeding the initial URL data
from main_server_side import redis_controller
urlstep=[]
for i in range(0,9981,20):
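# start only goes up to 9979, so what would be the last page (start=9980) is clamped to 9979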
if i==9980:
num=9979
else:
num=i
urlstep.append("j/new_search_subjects?sort=T&range=0,10&tags=電影&start=%s"%(num))
redis_controller.puturl_safe(redis_controller.arank_str,urlstep)
That is all the key code; to use it you just need to organize it into a proper project layout.
The Scrapy project is generated with the scrapy startproject xxxx command; the RPC server code and the Python script that drives the spider are both run directly via python -m.
3 Afterword
In my view, distributed crawling solves the bandwidth and per-IP rate-limit problems, and in that setting crawl throughput scales with the number of VPSes. Because my VPSes were short on disk space, I did not cache the downloaded pages on the main server or some object-storage service, even though I consider that step fairly important: with the raw pages cached, parsing out additional fields later is much faster. I'm writing this mostly to record my own design notes, and to discuss the approach with readers.