scrapy-redis是結(jié)合了分布式數(shù)據(jù)庫redis匾嘱,重寫了scrapy一些比較關(guān)鍵的代碼,將scrapy變成一個可以在多個主機上同時運行的分布式爬蟲
scrapy-redis工程的主體還是redis和scrapy兩個庫橱乱,這個工程就像膠水一樣办成,把這兩個插件粘結(jié)了起來。
scrapy-redis所實現(xiàn)的兩種分布式:爬蟲分布式以及item處理分布式官疲。分別是由模塊scheduler和模塊pipelines實現(xiàn)袱结。
scrapy-redi重寫了scrapy一些比較關(guān)鍵的代碼,將scrapy變成一個可以在多個主機上同時運行的分布式爬蟲途凫。
作為一個分布式爬蟲垢夹,是需要有一個Master端(核心服務(wù)器)的,在Master端维费,會搭建一個Redis數(shù)據(jù)庫果元,用來存儲start_urls、request犀盟、items而晒。Master的職責(zé)是負責(zé)url指紋判重,Request的分配阅畴,以及數(shù)據(jù)的存儲(一般在Master端會安裝一個mongodb用來存儲redis中的items)倡怎。除了Master之外,還有一個角色就是slaver(爬蟲程序執(zhí)行端)贱枣,它主要負責(zé)執(zhí)行爬蟲程序爬取數(shù)據(jù)监署,并將爬取過程中新的Request提交到Master的redis數(shù)據(jù)庫中。
-
如上圖纽哥,假設(shè)我們有四臺電腦:A钠乏, B, C春塌, D 晓避,其中任意一臺電腦都可以作為 Master端 或 Slaver端。整個流程是:
- 首先Slaver端從Master端拿任務(wù)(Request只壳、url)進行數(shù)據(jù)抓取侠仇,Slaver抓取數(shù)據(jù)的同時守谓,產(chǎn)生新任務(wù)的Request便提交給 Master 處理钦购;
- Master端只有一個Redis數(shù)據(jù)庫戴陡,負責(zé)將未處理的Request去重和任務(wù)分配梯投,將處理后的Request加入待爬隊列命辖,并且存儲爬取的數(shù)據(jù)。
Scrapy-Redis默認使用的就是這種策略分蓖,我們實現(xiàn)起來很簡單尔艇,因為任務(wù)調(diào)度等工作Scrapy-Redis都已經(jīng)幫我們做好了,我們只需要繼承RedisSpider么鹤、指定redis_key就行了终娃。
缺點是,Scrapy-Redis調(diào)度的任務(wù)是Request對象蒸甜,里面信息量比較大(不僅包含url棠耕,還有callback函數(shù)余佛、headers等信息),可能導(dǎo)致的結(jié)果就是會降低爬蟲速度窍荧、而且會占用Redis大量的存儲空間辉巡,所以如果要保證效率,那么就需要一定硬件水平蕊退。
scrapy-redis的目錄結(jié)構(gòu)如下, 各個模塊功能見注釋
├── __init__.py
├── connection.py # 負責(zé)根據(jù)setting中配置實例化redis連接郊楣。被dupefilter和scheduler調(diào)用,總之涉及到redis存取的都要使用到這個模塊瓤荔。
├── defaults.py # 一些默認參數(shù)配置
├── dupefilter.py # 用于請求隊列的去重, 繼承了scrapy本身的去重器.當(dāng)request不重復(fù)時净蚤,將其存入到queue中,調(diào)度時將其彈出输硝。
├── picklecompat.py # 使用pickle進行序列化
├── pipelines.py # 它將Item存儲在redis中以實現(xiàn)分布式處理今瀑。
├── queue.py # 調(diào)度隊列, 調(diào)度器會使用該隊列
├── scheduler.py # 調(diào)度器, 負責(zé)任務(wù)的調(diào)度工作。其利用的數(shù)據(jù)結(jié)構(gòu)來自于queue中實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)腔丧。
├── spiders.py # spider基類, 加入了信號等
└── utils.py # spider從redis中讀取要爬的url,然后執(zhí)行爬取放椰,若爬取過程中返回更多的url,那么繼續(xù)進行直至所有的request完成愉粤。之后繼續(xù)從redis中讀取url砾医,循環(huán)這個過程。
defaults.py
import redis
# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
PIPELINE_KEY = '%(spider)s:items'
REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
'socket_timeout': 30,
'socket_connect_timeout': 30,
'retry_on_timeout': True,
'encoding': REDIS_ENCODING,
}
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
utils.py
import six
def bytes_to_str(s, encoding='utf-8'):
"""Returns a str if a bytes object is given."""
if six.PY3 and isinstance(s, bytes):
return s.decode(encoding)
return s
spider.py
spider.py文件是分布式爬蟲的入口代碼:
1衣厘、通過connection接口如蚜,spider初始化時,通過setup_redis()函數(shù)初始化好和redis的連接影暴。
2错邦、通過next_requests函數(shù)從redis中取出strat url,spider使用少量的start url + LinkExtractor型宙,可以發(fā)展出很多新的url撬呢,這些url會進入scheduler進行判重和調(diào)度。直到spider跑到調(diào)度池內(nèi)沒有url的時候妆兑,會觸發(fā)spider_idle信號魂拦,從而觸發(fā)spider的next_requests函數(shù)。
3搁嗓、再次從redis的start url池中讀取一些url芯勘。
spider空閑的時候會從start_urls隊列中讀取url, 默認一次讀取CONCURRENT_REQUESTS個url, 可以在settings中設(shè)置REDIS_START_URLS_BATCH_SIZE來改變每次的讀取數(shù)量, 一般我會在使用的時候增大這個值, 可以降低spide進入idle的次數(shù), 從而適當(dāng)提升抓取性能。
設(shè)計的這個spider從redis中讀取要爬的url腺逛,然后執(zhí)行爬取荷愕,若爬取過程中返回更多的url,那么繼續(xù)進行直至所有的request完成。之后繼續(xù)從redis中讀取url安疗,循環(huán)這個過程抛杨。
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider
from . import connection, defaults
from .utils import bytes_to_str
# 實現(xiàn)從redis的隊列中讀取url
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None
# Redis client placeholder.
server = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
# 鏈接redis
def setup_redis(self, crawler=None):
"""初始化了redis參數(shù), 包括使用的種子url的key, 批量讀取url的數(shù)量等信息"""
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, 'crawler', None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
if self.redis_key is None:
self.redis_key = settings.get(
'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
)
self.redis_key = self.redis_key % {'name': self.name}
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
if self.redis_batch_size is None:
# TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
self.redis_batch_size = settings.getint(
'REDIS_START_URLS_BATCH_SIZE',
settings.getint('CONCURRENT_REQUESTS'),
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
if self.redis_encoding is None:
self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
self.__dict__)
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
# 當(dāng)spider空閑的時候會觸發(fā)該信號, 調(diào)用spider_idle函數(shù)
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
# 這個方法的作用就是從redis中獲取start_url
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
"""Returns a Request instance from data coming from Redis.
By default, ``data`` is an encoded URL. You can override this method to
provide your own message decoding.
Parameters
----------
data : bytes
Message from redis.
"""
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""空閑的時候觸發(fā)該函數(shù), 嘗試請求下一批url. 有url的時候會直接請求, 最后都會拋出異常, 防止spider被關(guān)閉, 然后等待新的url過來"""
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: False)
Use SET operations to retrieve messages from the redis queue. If False,
the messages are retrieve using the LPOP command.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: True)
Use SET operations to retrieve messages from the redis queue.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return objfrom scrapy import signals
spider的改動也不是很大,主要是通過connect接口荐类,給spider綁定了spider_idle信號蝶桶,spider初始化時,通過setup_redis函數(shù)初始化好和redis的連接掉冶,之后通過next_requests函數(shù)從redis中取出strat url真竖,使用的key是settings中REDIS_START_URLS_AS_SET定義的(注意了這里的初始化url池和我們上邊的queue的url池不是一個東西,queue的池是用于調(diào)度的厌小,初始化url池是存放入口url的恢共,他們都存在redis中,但是使用不同的key來區(qū)分璧亚,就當(dāng)成是不同的表吧)讨韭,spider使用少量的start url,可以發(fā)展出很多新的url癣蟋,這些url會進入scheduler進行判重和調(diào)度透硝。直到spider跑到調(diào)度池內(nèi)沒有url的時候,會觸發(fā)spider_idle信號疯搅,從而觸發(fā)spider的next_requests函數(shù)濒生,再次從redis的start url池中讀取一些url。
connection.py
負責(zé)根據(jù)setting中配置實例化redis連接幔欧。被dupefilter和scheduler調(diào)用罪治,總之涉及到redis存取的都要使用到這個模塊。
connect文件引入了redis模塊礁蔗,這個是redis-python庫的接口觉义,用于通過python訪問redis數(shù)據(jù)庫,可見浴井,這個文件主要是實現(xiàn)連接redis數(shù)據(jù)庫的功能(返回的是redis庫的Redis對象或者StrictRedis對象晒骇,這倆都是可以直接用來進行數(shù)據(jù)操作的對象)。這些連接接口在其他文件中經(jīng)常被用到磺浙。其中洪囤,我們可以看到,要想連接到redis數(shù)據(jù)庫屠缭,和其他數(shù)據(jù)庫差不多箍鼓,需要一個ip地址崭参、端口號呵曹、用戶名密碼(可選)和一個整形的數(shù)據(jù)庫編號,同時我們還可以在scrapy工程的setting文件中配置套接字的超時時間、等待時間等奄喂。
import six
from scrapy.utils.misc import load_object
from . import defaults
# 快速映射settings配置文件中redis的基礎(chǔ)配置字典
# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
'REDIS_URL': 'url',
'REDIS_HOST': 'host',
'REDIS_PORT': 'port',
'REDIS_ENCODING': 'encoding',
}
# 根據(jù)scrapy中settings配置文件信息返回一個redis客戶端實例對象
def get_redis_from_settings(settings):
"""Returns a redis client instance from given Scrapy settings object.
This function uses ``get_client`` to instantiate the client and uses
``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
can override them using the ``REDIS_PARAMS`` setting.
Parameters
----------
settings : Settings
A scrapy settings object. See the supported settings below.
Returns
-------
server
Redis client instance.
Other Parameters
----------------
REDIS_URL : str, optional
Server connection URL.
REDIS_HOST : str, optional
Server host.
REDIS_PORT : str, optional
Server port.
REDIS_ENCODING : str, optional
Data encoding.
REDIS_PARAMS : dict, optional
Additional client parameters.
"""
params = defaults.REDIS_PARAMS.copy()
params.update(settings.getdict('REDIS_PARAMS'))
# XXX: Deprecate REDIS_* settings.
for source, dest in SETTINGS_PARAMS_MAP.items():
val = settings.get(source)
if val:
params[dest] = val
# Allow ``redis_cls`` to be a path to a class.
if isinstance(params.get('redis_cls'), six.string_types):
params['redis_cls'] = load_object(params['redis_cls'])
return get_redis(**params)
# Backwards compatible alias.
from_settings = get_redis_from_settings
# 返回一個redis的Strictredis實例對象
def get_redis(**kwargs):
"""Returns a redis client instance.
Parameters
----------
redis_cls : class, optional
Defaults to ``redis.StrictRedis``.
url : str, optional
If given, ``redis_cls.from_url`` is used to instantiate the class.
**kwargs
Extra parameters to be passed to the ``redis_cls`` class.
Returns
-------
server
Redis client instance.
"""
redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
url = kwargs.pop('url', None)
if url:
return redis_cls.from_url(url, **kwargs)
else:
return redis_cls(**kwargs)
scheduler.py
這個文件重寫了scheduler類铐殃,用來代替scrapy.core.scheduler的原有調(diào)度器。實現(xiàn)原理是使用指定的一個redis內(nèi)存作為數(shù)據(jù)存儲的媒介跨新,以達到各個爬蟲之間的統(tǒng)一調(diào)度富腊。
1、scheduler負責(zé)調(diào)度各個spider的request請求域帐,scheduler初始化時赘被,通過settings文件讀取queue和dupefilters(url去重)的類型,配置queue和dupefilters使用的key(一般就是spider name加上queue或者dupefilters肖揣,這樣對于同一種spider的不同實例民假,就會使用相同的數(shù)據(jù)塊了)。
2龙优、每當(dāng)一個request要被調(diào)度時羊异,enqueue_request被調(diào)用,scheduler使用dupefilters來判斷這個url是否重復(fù)彤断,如果不重復(fù)野舶,就添加到queue的容器中(三種隊列方式:先進先出,先進后出和優(yōu)先級都可以宰衙,可以在settings中配置)平道。
3、當(dāng)調(diào)度完成時供炼,next_request被調(diào)用巢掺,scheduler就通過queue容器的接口,取出一個request劲蜻,把他發(fā)送給相應(yīng)的spider陆淀,讓spider進行爬取工作。
此擴展是對scrapy中自帶的scheduler的替代(在settings的SCHEDULER變量中指出)先嬉,正是利用此擴展實現(xiàn)crawler的分布式調(diào)度轧苫。其利用的數(shù)據(jù)結(jié)構(gòu)來自于queue中實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)。
scrapy-redis所實現(xiàn)的兩種分布式:爬蟲分布式以及item處理分布式就是由模塊scheduler和模塊pipelines實現(xiàn)疫蔓。上述其它模塊作為為二者輔助的功能模塊含懊。
替換了scrapy原生的scheduler, 所有方法名稱和原生scheduler保持一致, 在爬蟲開啟后會連接待抓取隊列和去重集合, 然后就是不斷把新的請求去重后放入待抓取隊列, 然后從待抓取隊列拿出請求給下載器
調(diào)度器肯定要和請求隊列
和去重隊列
進行交互, 所以初始化要獲取使用的queue
和dupfilter
的類, 并在open
方法中完成實例化
import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
"""Redis-based scheduler
Settings
--------
SCHEDULER_PERSIST : bool (default: False)
Whether to persist or clear redis queue.
SCHEDULER_FLUSH_ON_START : bool (default: False)
Whether to flush redis queue on start.
SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
How many seconds to wait before closing if no message is received.
SCHEDULER_QUEUE_KEY : str
Scheduler redis key.
SCHEDULER_QUEUE_CLASS : str
Scheduler queue class.
SCHEDULER_DUPEFILTER_KEY : str
Scheduler dupefilter redis key.
SCHEDULER_DUPEFILTER_CLASS : str
Scheduler dupefilter class.
SCHEDULER_SERIALIZER : str
Scheduler serializer.
"""
def __init__(self, server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None):
"""Initialize scheduler.
Parameters
----------
server : Redis
The redis server instance.
persist : bool
Whether to flush requests when closing. Default is False.
flush_on_start : bool
Whether to flush requests on start. Default is False.
queue_key : str
Requests queue key.
queue_cls : str
Importable path to the queue class.
dupefilter_key : str
Duplicates filter key.
dupefilter_cls : str
Importable path to the dupefilter class.
idle_before_close : int
Timeout before giving up.
"""
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")
self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None
def __len__(self):
return len(self.queue)
@classmethod
def from_settings(cls, settings): #settings是傳過來的配置文件信息
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
# 讀取上面的配置文件,取settings里面找到相對應(yīng)的值衅胀,拿到settings后面的結(jié)果
for name, setting_name in optional.items():
val = settings.get(setting_name) # 匹配settings對應(yīng)的值出來(自己配置的).取配置文件settings里面拿到相對應(yīng)的值出來岔乔,settings里面的鍵是在這里面循環(huán)拿到的(optional),也就是optional后面的值,對應(yīng)settinsg里面的鍵
if val:
kwargs[name] = val # 存進去
# Support serializer as a path to a module.
# 序列化操作,爬蟲key序列化
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)
# Ensure the connection is working.
server.ping()
return cls(server=server, **kwargs)
@classmethod
def from_crawler(cls, crawler): # 當(dāng)你執(zhí)行調(diào)度器scrapy-redis的時候滚躯,就會傳入settigs進來雏门,配置信息是在crawler.settings
instance = cls.from_settings(crawler.settings) # #crawlwe.settinsg拿到的是setting對象<scrapy.settings.Settings object at 0x00000265B2E41940>
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance
def open(self, spider):
"""爬蟲啟動時觸發(fā), 主要是連接待抓取和去重模塊"""
self.spider = spider
try:
# 得到隊列queue的實例化對象
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
try:
# 得到去重的實例化對象
self.df = load_object(self.dupefilter_cls)(
server=self.server,
key=self.dupefilter_key % {'spider': spider.name},
debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
)
except TypeError as e:
raise ValueError("Failed to instantiate dupefilter class '%s': %s",
self.dupefilter_cls, e)
if self.flush_on_start: # 如果為True, 要在爬蟲開啟前刪除對應(yīng)爬蟲request隊列和dupfilter隊列
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
def enqueue_request(self, request):
"""把請求去重后放入 待抓取隊列中"""
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
def next_request(self):
"""從請求隊列拿出下一個請求并返回"""
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0
這個文件重寫了scheduler類嘿歌,用來代替scrapy.core.scheduler的原有調(diào)度器。其實對原有調(diào)度器的邏輯沒有很大的改變茁影,主要是使用了redis作為數(shù)據(jù)存儲的媒介宙帝,以達到各個爬蟲之間的統(tǒng)一調(diào)度。
scheduler負責(zé)調(diào)度各個spider的request請求募闲,scheduler初始化時步脓,通過settings文件讀取queue和dupefilters的類型(一般就用上邊默認的),配置queue和dupefilters使用的key(一般就是spider name加上queue或者dupefilters浩螺,這樣對于同一種spider的不同實例靴患,就會使用相同的數(shù)據(jù)塊了)。每當(dāng)一個request要被調(diào)度時要出,enqueue_request被調(diào)用蚁廓,scheduler使用dupefilters來判斷這個url是否重復(fù),如果不重復(fù)厨幻,就添加到queue的容器中(先進先出相嵌,先進后出和優(yōu)先級都可以,可以在settings中配置)况脆。當(dāng)調(diào)度完成時饭宾,next_request被調(diào)用,scheduler就通過queue容器的接口格了,取出一個request看铆,把他發(fā)送給相應(yīng)的spider,讓spider進行爬取工作盛末。
dupefilter.py
分布式爬蟲url去重原理:
通過分析可以知道self.server為redis實例弹惦,使用一個key來向redis的一個set中插入fingerprint(這個key對于同一個spider是相同的,redis是一個key-value的數(shù)據(jù)庫悄但,如果key是相同的棠隐,訪問到的值就是相同的,默認使用spider名字 + fingerpoint的key就是為了區(qū)分在不同主機上的不同spider實例檐嚣,只要數(shù)據(jù)是同一個spider助泽,就會訪問到redis中的同一個spider-set而這個set就是url的判重池)。去重指紋計算使用的是sha1算法, 計算值包括請求方法, url, body等信息
負責(zé)執(zhí)行requst的去重嚎京,實現(xiàn)的很有技巧性嗡贺,使用redis的set數(shù)據(jù)結(jié)構(gòu)。但是注意scheduler并不使用其中用于在這個模塊中實現(xiàn)的dupefilter鍵做request的調(diào)度鞍帝,而是使用queue.py模塊中實現(xiàn)的queue诫睬。當(dāng)request不重復(fù)時,將其存入到queue中帕涌,調(diào)度時將其彈出摄凡。
import logging
import time
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
from . import defaults
from .connection import get_redis_from_settings
logger = logging.getLogger(__name__)
# TODO: Rename class to RedisDupeFilter.
# 對請求做去重處理续徽,可以被分布式下不同的schedule調(diào)用
class RFPDupeFilter(BaseDupeFilter):
"""Redis-based request duplicates filter.
This class can also be used with default Scrapy's scheduler.
"""
logger = logger
def __init__(self, server, key, debug=False):
"""Initialize the duplicates filter.
Parameters
----------
server : redis.StrictRedis
The redis server instance.
key : str
Redis key Where to store fingerprints.
debug : bool, optional
Whether to log filtered requests.
"""
self.server = server
self.key = key
self.debug = debug
self.logdupes = True
# 通過settings配置文件信息返回一個redis示例對象
@classmethod
def from_settings(cls, settings):
"""Returns an instance from given settings.
This uses by default the key ``dupefilter:<timestamp>``. When using the
``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
it needs to pass the spider name in the key.
Parameters
----------
settings : scrapy.settings.Settings
Returns
-------
RFPDupeFilter
A RFPDupeFilter instance.
"""
server = get_redis_from_settings(settings)
# XXX: This creates one-time key. needed to support to use this
# class as standalone dupefilter with scrapy's default scheduler
# if scrapy passes spider on open() method this wouldn't be needed
# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)
@classmethod
def from_crawler(cls, crawler):
"""Returns instance from crawler.
Parameters
----------
crawler : scrapy.crawler.Crawler
Returns
-------
RFPDupeFilter
Instance of RFPDupeFilter.
"""
return cls.from_settings(crawler.settings)
def request_seen(self, request):
"""獲取請求指紋并添加到redis的去重集合中去"""
"""Returns True if request was already seen.
Parameters
----------
request : scrapy.http.Request
Returns
-------
bool
"""
fp = self.request_fingerprint(request) # 得到請求的指紋
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp) # 把指紋添加到redis的集合中
return added == 0
# 這個方法是用來調(diào)用request_fingerprint接口的,這個接口通過sha1算法來判斷兩個url請求地址是否相同(注意架谎,這里面不完全是我們之前理解的hash了,如果兩個url的地址相同辟躏,請求方式和參數(shù)都相同谷扣,,但是請求參數(shù)的前后順序不同的話也別判定為同一個url地址,)從而達到url的去重功能捎琐。
def request_fingerprint(self, request):
"""Returns a fingerprint for a given request.
Parameters
----------
request : scrapy.http.Request
Returns
-------
str
"""
return request_fingerprint(request) # 得到請求指紋
# Scrapy's scheduler調(diào)用会涎,刪除數(shù)據(jù),關(guān)閉連接
def close(self, reason=''):
"""Delete data on close. Called by Scrapy's scheduler.
Parameters
----------
reason : str, optional
"""
self.clear()
# 清空操作記錄數(shù)據(jù)
def clear(self):
"""Clears fingerprints data."""
self.server.delete(self.key)
# 請求日志信息
def log(self, request, spider):
"""Logs given request.
Parameters
----------
request : scrapy.http.Request
spider : scrapy.spiders.Spider
"""
if self.debug:
msg = "Filtered duplicate request: %(request)s"
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
elif self.logdupes:
msg = ("Filtered duplicate request %(request)s"
" - no more duplicates will be shown"
" (see DUPEFILTER_DEBUG to show all duplicates)")
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
self.logdupes = False
這個文件看起來比較復(fù)雜瑞凑,重寫了scrapy本身已經(jīng)實現(xiàn)的request判重功能末秃。因為本身scrapy單機跑的話,只需要讀取內(nèi)存中的request隊列或者持久化的request隊列(scrapy默認的持久化似乎是json格式的文件籽御,不是數(shù)據(jù)庫)就能判斷這次要發(fā)出的request url是否已經(jīng)請求過或者正在調(diào)度(本地讀就行了)练慕。而分布式跑的話,就需要各個主機上的scheduler都連接同一個數(shù)據(jù)庫的同一個request池來判斷這次的請求是否是重復(fù)的了技掏。
在這個文件中铃将,通過繼承BaseDupeFilter重寫他的方法,實現(xiàn)了基于redis的判重哑梳。根據(jù)源代碼來看劲阎,scrapy-redis使用了scrapy本身的一個fingerprint接request_fingerprint,這個接口很有趣鸠真,根據(jù)scrapy文檔所說悯仙,他通過hash來判斷兩個url是否相同(相同的url會生成相同的hash結(jié)果),但是當(dāng)兩個url的地址相同吠卷,get型參數(shù)相同但是順序不同時锡垄,也會生成相同的hash結(jié)果(這個真的比較神奇)所以scrapy-redis依舊使用url的fingerprint來判斷request請求是否已經(jīng)出現(xiàn)過。這個類通過連接redis祭隔,使用一個key來向redis的一個set中插入fingerprint(這個key對于同一種spider是相同的偎捎,redis是一個key-value的數(shù)據(jù)庫,如果key是相同的序攘,訪問到的值就是相同的茴她,這里使用spider名字+DupeFilter的key就是為了在不同主機上的不同爬蟲實例,只要屬于同一種spider程奠,就會訪問到同一個set丈牢,而這個set就是他們的url判重池),如果返回值為0瞄沙,說明該set中該fingerprint已經(jīng)存在(因為集合是沒有重復(fù)值的)己沛,則返回False慌核,如果返回值為1,說明添加了一個fingerprint到set中申尼,則說明這個request沒有重復(fù)垮卓,于是返回True,還順便把新fingerprint加入到數(shù)據(jù)庫中了师幕。 DupeFilter判重會在scheduler類中用到粟按,每一個request在進入調(diào)度之前都要進行判重,如果重復(fù)就不需要參加調(diào)度霹粥,直接舍棄就好了灭将,不然就是白白浪費資源。
request.py
request_fingerprint接口:
通過request_fingerprint接口后控,通過sha1算法來判斷兩個url請求地址是否相同(注意庙曙,這里面不完全是我們之前理解的hash了,如果兩個url的地址相同浩淘,請求方式和參數(shù)都相同捌朴,但是請求參數(shù)的前后順序不同的話也別判定為同一個url地址
http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111)從而達到url的去重功能。
"""This module provides some useful functions for working with scrapy.http.Request objects"""
from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse
from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached
_fingerprint_cache = weakref.WeakKeyDictionary()
def request_fingerprint(request, include_headers=None):
"""Return the request fingerprint"""
if include_headers:
include_headers = tuple(to_bytes(h.lower())
for h in sorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
if include_headers not in cache:
fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or b'')
if include_headers:
for hdr in include_headers:
if hdr in request.headers:
fp.update(hdr)
for v in request.headers.getlist(hdr):
fp.update(v)
cache[include_headers] = fp.hexdigest()
return cache[include_headers]
queue.py
這是個隊列類张抄,它會作為scheduler調(diào)度request的容器來維護一個秩序:
1男旗、 scheduler在每個主機上都會實例化一個,并且和spider一一對應(yīng)欣鳖,所以分布式運行時會有一個spider的多個實例和一個scheduler的多個實例存在于不同的主機上察皇。
2、因為scheduler都是用相同的容器泽台,而這些容器都連接同一個 redis服務(wù)器什荣,又都使用spider名 + queue來作為key 讀寫數(shù)據(jù),所以不同主機上的不同爬蟲實例公用一個request調(diào)度池怀酷,實現(xiàn)了分布式爬蟲之間的統(tǒng)一調(diào)度稻爬。
其作用如dupefilter.py所述,但是這里實現(xiàn)了三種方式的queue:FIFO的SpiderQueue蜕依,SpiderPriorityQueue桅锄,以及LIFI的SpiderStack。默認使用的是第二種
from scrapy.utils.reqser import request_to_dict, request_from_dict
from . import picklecompat
# 隊列基類
class Base(object):
"""Per-spider base queue class"""
def __init__(self, server, spider, key, serializer=None):
"""Initialize per-spider redis queue.
Parameters
----------
server : StrictRedis
Redis client instance.
spider : Spider
Scrapy spider instance.
key: str
Redis key where to put and get messages.
serializer : object
Serializer object with ``loads`` and ``dumps`` methods.
"""
if serializer is None:
# Backward compatibility.
# TODO: deprecate pickle.
serializer = picklecompat
if not hasattr(serializer, 'loads'):
raise TypeError("serializer does not implement 'loads' function: %r"
% serializer)
if not hasattr(serializer, 'dumps'):
raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
% serializer)
self.server = server
self.spider = spider
self.key = key % {'spider': spider.name}
self.serializer = serializer
def _encode_request(self, request):
"""Encode a request object"""
obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)
def _decode_request(self, encoded_request):
"""Decode an request previously encoded"""
obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, self.spider)
def __len__(self):
"""Return the length of the queue"""
raise NotImplementedError
def push(self, request):
"""Push a request"""
raise NotImplementedError
def pop(self, timeout=0):
"""Pop a request"""
raise NotImplementedError
def clear(self):
"""Clear queue/stack"""
self.server.delete(self.key)
#隊列----先進先出
class FifoQueue(Base):
"""使用了redis的list結(jié)構(gòu)"""
"""Per-spider FIFO queue"""
def __len__(self):
"""返回隊列長度大小"""
"""Return the length of the queue"""
return self.server.llen(self.key)
# request 進棧样眠,進棧前對request做處理友瘤,request請求先被scrapy的接口request_to_dict變成了一個dict對象(因為request對象實在是比較復(fù)雜,有方法有屬性不好串行化)檐束,,之后使用picklecompat中的serializer串行化為字符串辫秧,然后使用一個特定的key存入redis中,(該key在同一種spider中是相同的)
def push(self, request):
"""發(fā)送請求到隊列左邊"""
"""Push a request"""
self.server.lpush(self.key, self._encode_request(request))
# request出棧,其實就是從redis用那個特定的key去讀其值(一個list)被丧,從list中讀取最早進去的那個盟戏,于是就先進先出了.
def pop(self, timeout=0):
"""從隊列右邊拋出請求"""
"""Pop a request"""
if timeout > 0:
data = self.server.brpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.rpop(self.key)
if data:
return self._decode_request(data)
# 優(yōu)先級隊列
class PriorityQueue(Base):
"""使用了redis的有序集合結(jié)構(gòu)"""
"""Per-spider priority queue abstraction using redis' sorted set"""
def __len__(self):
"""返回隊列長度大小"""
"""Return the length of the queue"""
return self.server.zcard(self.key)
def push(self, request):
"""放入請求到zset中"""
"""Push a request"""
data = self._encode_request(request)
score = -request.priority
# We don't use zadd method as the order of arguments change depending on
# whether the class is Redis or StrictRedis, and the option of using
# kwargs only accepts strings, not bytes.
self.server.execute_command('ZADD', self.key, score, data)
def pop(self, timeout=0):
"""從zset中拋出請求. 此處不支持timeout參數(shù)"""
"""
Pop a request
timeout not support in this queue class
"""
# use atomic range/remove using multi/exec
pipe = self.server.pipeline()
pipe.multi()
pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
results, count = pipe.execute()
if results:
return self._decode_request(results[0])
# 棧----后進先出, 使用list結(jié)構(gòu)實現(xiàn),和先進先出隊列基本一樣, 實現(xiàn)了棧結(jié)構(gòu)
class LifoQueue(Base):
"""Per-spider LIFO queue."""
def __len__(self):
"""Return the length of the stack"""
return self.server.llen(self.key)
def push(self, request):
"""Push a request"""
self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0):
"""Pop a request"""
if timeout > 0:
data = self.server.blpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.lpop(self.key)
if data:
return self._decode_request(data)
# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
該文件實現(xiàn)了幾個容器類绪妹,可以看這些容器和redis交互頻繁,同時使用了我們上邊picklecompat中定義的serializer柿究。這個文件實現(xiàn)的幾個容器大體相同邮旷,只不過一個是隊列,一個是棧蝇摸,一個是優(yōu)先級隊列婶肩,這三個容器到時候會被scheduler對象實例化,來實現(xiàn)request的調(diào)度探入。比如我們使用SpiderQueue作為調(diào)度隊列的類型狡孔,到時候request的調(diào)度方法就是先進先出懂诗,而實用SpiderStack就是先進后出了蜂嗽。
我們可以仔細看看SpiderQueue的實現(xiàn),他的push函數(shù)就和其他容器的一樣殃恒,只不過push進去的request請求先被scrapy的接口request_to_dict變成了一個dict對象(因為request對象實在是比較復(fù)雜植旧,有方法有屬性不好串行化),之后使用picklecompat中的serializer串行化為字符串离唐,然后使用一個特定的key存入redis中(該key在同一種spider中是相同的)病附。而調(diào)用pop時,其實就是從redis用那個特定的key去讀其值(一個list)亥鬓,從list中讀取最早進去的那個完沪,于是就先進先出了。
這些容器類都會作為scheduler調(diào)度request的容器嵌戈,scheduler在每個主機上都會實例化一個覆积,并且和spider一一對應(yīng),所以分布式運行時會有一個spider的多個實例和一個scheduler的多個實例存在于不同的主機上熟呛,但是宽档,因為scheduler都是用相同的容器,而這些容器都連接同一個redis服務(wù)器庵朝,又都使用spider名加queue來作為key讀寫數(shù)據(jù)吗冤,所以不同主機上的不同爬蟲實例公用一個request調(diào)度池,實現(xiàn)了分布式爬蟲之間的統(tǒng)一調(diào)度九府。
picklecompat.py
這里實現(xiàn)了loads和dumps兩個函數(shù)椎瘟,其實就是實現(xiàn)了一個serializer:
1、因為redis數(shù)據(jù)庫不能存儲復(fù)雜對象(value部分只能是字符串侄旬,字符串列表降传,字符串集合和hash,key部分只能是字符串)勾怒,所以我們存啥都要先串行化成文本才行婆排。這里使用的就是python的pickle模塊声旺,一個兼容py2和py3的串行化工具。
"""A pickle wrapper module with protocol=-1 by default."""
try:
import cPickle as pickle # PY2
except ImportError:
import pickle
def loads(s):
return pickle.loads(s)
def dumps(obj):
return pickle.dumps(obj, protocol=-1)
這里實現(xiàn)了loads和dumps兩個函數(shù)段只,其實就是實現(xiàn)了一個serializer腮猖,因為redis數(shù)據(jù)庫不能存儲復(fù)雜對象(value部分只能是字符串,字符串列表赞枕,字符串集合和hash澈缺,key部分只能是字符串),所以我們存啥都要先串行化成文本才行炕婶。這里使用的就是python的pickle模塊姐赡,一個兼容py2和py3的串行化工具。這個serializer主要用于一會的scheduler存reuqest對象柠掂,至于為什么不實用json格式项滑,我也不是很懂,item pipeline的串行化默認用的就是json涯贞。
pipelines.py
pipeline.py文件用來實現(xiàn)數(shù)據(jù)分布式處理枪狂。它通過從settings中拿到我們配置的REDIS_ITEMS_KEY作為key,把item串行化之后存入redis數(shù)據(jù)庫對應(yīng)的value中(這個value可以看出是個list宋渔,我們的每個item是這個list中的一個結(jié)點)州疾,這個pipeline把提取出的item存起來,主要是為了方便我們延后處理數(shù)據(jù)皇拣。
這是是用來實現(xiàn)分布式處理的作用严蓖。它將Item存儲在redis中以實現(xiàn)分布式處理。另外可以發(fā)現(xiàn)氧急,同樣是編寫pipelines颗胡,在這里的編碼實現(xiàn)不同于文章中所分析的情況,由于在這里需要讀取配置态蒂,所以就用到了from_crawler()函數(shù)杭措。
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults
default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):
def __init__(self, server,
key=defaults.PIPELINE_KEY,
serialize_func=default_serialize):
self.server = server
self.key = key
self.serialize = serialize_func
@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
return self.key % {'spider': spider.name}
pipeline文件實現(xiàn)了一個item pipieline類,和scrapy的item pipeline是同一個對象钾恢,通過從settings中拿到我們配置的REDIS_ITEMS_KEY作為key手素,把item串行化之后存入redis數(shù)據(jù)庫對應(yīng)的value中(這個value可以看出出是個list,我們的每個item是這個list中的一個結(jié)點)瘩蚪,這個pipeline把提取出的item存起來盛险,主要是為了方便我們延后處理數(shù)據(jù)惧眠。
最后總結(jié)一下scrapy-redis的總體思路:這個工程通過重寫scheduler和spider類暇番,實現(xiàn)了scheduler調(diào)度噩翠、spider啟動和固定redis的交互。實現(xiàn)新的dupefilter和queue類,達到了去重和調(diào)度容器和redis的交互邓嘹,因為每個主機上的爬蟲進程都訪問同一個redis數(shù)據(jù)庫酣栈,所以調(diào)度和去重都統(tǒng)一進行統(tǒng)一管理,達到了分布式爬蟲的目的汹押。
組件之間的關(guān)系
當(dāng)spider被初始化時矿筝,同時會初始化一個對應(yīng)的scheduler對象,這個調(diào)度器對象通過讀取settings棚贾,配置好自己的調(diào)度容器queue和判重工具dupefilter窖维。每當(dāng)一個spider產(chǎn)出一個request的時候,scrapy內(nèi)核會把這個reuqest遞交給這個spider對應(yīng)的scheduler對象進行調(diào)度妙痹,scheduler對象通過訪問redis對request進行判重铸史,如果不重復(fù)就把他添加進redis中的調(diào)度池。當(dāng)調(diào)度條件滿足時怯伊,scheduler對象就從redis的調(diào)度池中取出一個request發(fā)送給spider琳轿,讓他爬取。當(dāng)spider爬取的所有暫時可用url之后震贵,scheduler發(fā)現(xiàn)這個spider對應(yīng)的redis的調(diào)度池空了利赋,于是觸發(fā)信號spider_idle水评,spider收到這個信號之后猩系,直接連接redis讀取strart url池,拿去新的一批url入口中燥,然后再次重復(fù)上邊的工作寇甸。
為什么要提供這些組件?
我們先從scrapy的“待爬隊列”和“Scheduler”入手:玩過爬蟲的同學(xué)都多多少少有些了解疗涉,在爬蟲爬取過程當(dāng)中有一個主要的數(shù)據(jù)結(jié)構(gòu)是“待爬隊列”拿霉,以及能夠操作這個隊列的調(diào)度器(也就是Scheduler)。scrapy官方文檔對這二者的描述不多咱扣,基本上沒提绽淘。
scrapy使用什么樣的數(shù)據(jù)結(jié)構(gòu)來存放待爬取的request呢?其實沒用高大上的數(shù)據(jù)結(jié)構(gòu)闹伪,就是python自帶的collection.deque(改造過后的)沪铭,問題來了,該怎么讓兩個以上的Spider共用這個deque呢偏瓤?
scrapy-redis提供了一個解決方法杀怠,把deque換成redis數(shù)據(jù)庫,我們從同一個redis服務(wù)器存放要爬取的request厅克,這樣就能讓多個spider去同一個數(shù)據(jù)庫里讀取赔退,這樣分布式的主要問題就解決了嘛。
那么問題又來了,我們換了redis來存放隊列硕旗,哪scrapy就能直接分布式了么窗骑?。scrapy中跟“待爬隊列”直接相關(guān)的就是調(diào)度器“Scheduler”漆枚,它負責(zé)對新的request進行入列操作(加入deque)慧域,取出下一個要爬取的request(從deque中取出)等操作。在scrapy中浪读,Scheduler并不是直接就把deque拿來就粗暴的使用了昔榴,而且提供了一個比較高級的組織方法,它把待爬隊列按照優(yōu)先級建立了一個字典結(jié)構(gòu)碘橘,比如:
{
priority0:隊列0
priority1:隊列2
priority2:隊列2
}
然后根據(jù)request中的priority屬性互订,來決定該入哪個隊列。而出列時痘拆,則按priority較小的優(yōu)先出列仰禽。為了管理這個比較高級的隊列字典,Scheduler需要提供一系列的方法纺蛆。你要是換了redis做隊列吐葵,這個scrapy下的Scheduler就用不了,所以自己寫一個吧桥氏。于是就出現(xiàn)了scrapy-redis的專用scheduler温峭。
那么既然使用了redis做主要數(shù)據(jù)結(jié)構(gòu),能不能把其他使用自帶數(shù)據(jù)結(jié)構(gòu)關(guān)鍵功能模塊也換掉呢字支? 在我們爬取過程當(dāng)中凤藏,還有一個重要的功能模塊,就是request去重堕伪。scrapy中是如何實現(xiàn)這個去重功能的呢揖庄?用集合~scrapy中把已經(jīng)發(fā)送的request指紋放入到一個集合中,把下一個request的指紋拿到集合中比對欠雌,如果該指紋存在于集合中蹄梢,說明這個request發(fā)送過了,如果沒有則繼續(xù)操作富俄。
為了分布式禁炒,把這個集合也換掉吧,換了redis蛙酪,照樣也得把去重類給換了齐苛。于是就有了scrapy-redis的dupefilter。那么依次類推桂塞,接下來的其他組件(Pipeline和Spider)凹蜂,我們也可以輕松的猜到,他們是為什么要被修改呢。