scrapy-redis的源碼詳解

scrapy-redis是結(jié)合了分布式數(shù)據(jù)庫redis匾嘱,重寫了scrapy一些比較關(guān)鍵的代碼,將scrapy變成一個可以在多個主機上同時運行的分布式爬蟲

scrapy-redis工程的主體還是redis和scrapy兩個庫橱乱,這個工程就像膠水一樣办成,把這兩個插件粘結(jié)了起來。

scrapy-redis所實現(xiàn)的兩種分布式:爬蟲分布式以及item處理分布式官疲。分別是由模塊scheduler和模塊pipelines實現(xiàn)袱结。

scrapy-redi重寫了scrapy一些比較關(guān)鍵的代碼,將scrapy變成一個可以在多個主機上同時運行的分布式爬蟲途凫。

image
  • 作為一個分布式爬蟲垢夹,是需要有一個Master端(核心服務(wù)器)的,在Master端维费,會搭建一個Redis數(shù)據(jù)庫果元,用來存儲start_urls、request犀盟、items而晒。Master的職責(zé)是負責(zé)url指紋判重,Request的分配阅畴,以及數(shù)據(jù)的存儲(一般在Master端會安裝一個mongodb用來存儲redis中的items)倡怎。除了Master之外,還有一個角色就是slaver(爬蟲程序執(zhí)行端)贱枣,它主要負責(zé)執(zhí)行爬蟲程序爬取數(shù)據(jù)监署,并將爬取過程中新的Request提交到Master的redis數(shù)據(jù)庫中。

  • 如上圖纽哥,假設(shè)我們有四臺電腦:A钠乏, B, C春塌, D 晓避,其中任意一臺電腦都可以作為 Master端 或 Slaver端。整個流程是:

    • 首先Slaver端從Master端拿任務(wù)(Request只壳、url)進行數(shù)據(jù)抓取侠仇,Slaver抓取數(shù)據(jù)的同時守谓,產(chǎn)生新任務(wù)的Request便提交給 Master 處理钦购;
    • Master端只有一個Redis數(shù)據(jù)庫戴陡,負責(zé)將未處理的Request去重和任務(wù)分配梯投,將處理后的Request加入待爬隊列命辖,并且存儲爬取的數(shù)據(jù)。
  • Scrapy-Redis默認使用的就是這種策略分蓖,我們實現(xiàn)起來很簡單尔艇,因為任務(wù)調(diào)度等工作Scrapy-Redis都已經(jīng)幫我們做好了,我們只需要繼承RedisSpider么鹤、指定redis_key就行了终娃。

  • 缺點是,Scrapy-Redis調(diào)度的任務(wù)是Request對象蒸甜,里面信息量比較大(不僅包含url棠耕,還有callback函數(shù)余佛、headers等信息),可能導(dǎo)致的結(jié)果就是會降低爬蟲速度窍荧、而且會占用Redis大量的存儲空間辉巡,所以如果要保證效率,那么就需要一定硬件水平蕊退。

scrapy-redis的目錄結(jié)構(gòu)如下, 各個模塊功能見注釋

├── __init__.py
├── connection.py # 負責(zé)根據(jù)setting中配置實例化redis連接郊楣。被dupefilter和scheduler調(diào)用,總之涉及到redis存取的都要使用到這個模塊瓤荔。
├── defaults.py         # 一些默認參數(shù)配置
├── dupefilter.py       # 用于請求隊列的去重, 繼承了scrapy本身的去重器.當(dāng)request不重復(fù)時净蚤,將其存入到queue中,調(diào)度時將其彈出输硝。
├── picklecompat.py     # 使用pickle進行序列化
├── pipelines.py        # 它將Item存儲在redis中以實現(xiàn)分布式處理今瀑。
├── queue.py            # 調(diào)度隊列, 調(diào)度器會使用該隊列
├── scheduler.py        # 調(diào)度器, 負責(zé)任務(wù)的調(diào)度工作。其利用的數(shù)據(jù)結(jié)構(gòu)來自于queue中實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)腔丧。
├── spiders.py          # spider基類, 加入了信號等
└── utils.py            # spider從redis中讀取要爬的url,然后執(zhí)行爬取放椰,若爬取過程中返回更多的url,那么繼續(xù)進行直至所有的request完成愉粤。之后繼續(xù)從redis中讀取url砾医,循環(huán)這個過程。

defaults.py

import redis


# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False

utils.py

import six


def bytes_to_str(s, encoding='utf-8'):
    """Returns a str if a bytes object is given."""
    if six.PY3 and isinstance(s, bytes):
        return s.decode(encoding)
    return s

spider.py

spider.py文件是分布式爬蟲的入口代碼:
  1衣厘、通過connection接口如蚜,spider初始化時,通過setup_redis()函數(shù)初始化好和redis的連接影暴。
  2错邦、通過next_requests函數(shù)從redis中取出strat url,spider使用少量的start url + LinkExtractor型宙,可以發(fā)展出很多新的url撬呢,這些url會進入scheduler進行判重和調(diào)度。直到spider跑到調(diào)度池內(nèi)沒有url的時候妆兑,會觸發(fā)spider_idle信號魂拦,從而觸發(fā)spider的next_requests函數(shù)。
  3搁嗓、再次從redis的start url池中讀取一些url芯勘。

spider空閑的時候會從start_urls隊列中讀取url, 默認一次讀取CONCURRENT_REQUESTS個url, 可以在settings中設(shè)置REDIS_START_URLS_BATCH_SIZE來改變每次的讀取數(shù)量, 一般我會在使用的時候增大這個值, 可以降低spide進入idle的次數(shù), 從而適當(dāng)提升抓取性能。

設(shè)計的這個spider從redis中讀取要爬的url腺逛,然后執(zhí)行爬取荷愕,若爬取過程中返回更多的url,那么繼續(xù)進行直至所有的request完成。之后繼續(xù)從redis中讀取url安疗,循環(huán)這個過程抛杨。

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection, defaults
from .utils import bytes_to_str

# 實現(xiàn)從redis的隊列中讀取url
class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    # 鏈接redis
    def setup_redis(self, crawler=None):
        """初始化了redis參數(shù), 包括使用的種子url的key, 批量讀取url的數(shù)量等信息"""
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        # 當(dāng)spider空閑的時候會觸發(fā)該信號, 調(diào)用spider_idle函數(shù)
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    # 這個方法的作用就是從redis中獲取start_url
    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)


    def spider_idle(self):
        """空閑的時候觸發(fā)該函數(shù), 嘗試請求下一批url. 有url的時候會直接請求, 最后都會拋出異常, 防止spider被關(guān)閉, 然后等待新的url過來"""
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieve using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return objfrom scrapy import signals

spider的改動也不是很大,主要是通過connect接口荐类,給spider綁定了spider_idle信號蝶桶,spider初始化時,通過setup_redis函數(shù)初始化好和redis的連接掉冶,之后通過next_requests函數(shù)從redis中取出strat url真竖,使用的key是settings中REDIS_START_URLS_AS_SET定義的(注意了這里的初始化url池和我們上邊的queue的url池不是一個東西,queue的池是用于調(diào)度的厌小,初始化url池是存放入口url的恢共,他們都存在redis中,但是使用不同的key來區(qū)分璧亚,就當(dāng)成是不同的表吧)讨韭,spider使用少量的start url,可以發(fā)展出很多新的url癣蟋,這些url會進入scheduler進行判重和調(diào)度透硝。直到spider跑到調(diào)度池內(nèi)沒有url的時候,會觸發(fā)spider_idle信號疯搅,從而觸發(fā)spider的next_requests函數(shù)濒生,再次從redis的start url池中讀取一些url。

connection.py

負責(zé)根據(jù)setting中配置實例化redis連接幔欧。被dupefilter和scheduler調(diào)用罪治,總之涉及到redis存取的都要使用到這個模塊。

connect文件引入了redis模塊礁蔗,這個是redis-python庫的接口觉义,用于通過python訪問redis數(shù)據(jù)庫,可見浴井,這個文件主要是實現(xiàn)連接redis數(shù)據(jù)庫的功能(返回的是redis庫的Redis對象或者StrictRedis對象晒骇,這倆都是可以直接用來進行數(shù)據(jù)操作的對象)。這些連接接口在其他文件中經(jīng)常被用到磺浙。其中洪囤,我們可以看到,要想連接到redis數(shù)據(jù)庫屠缭,和其他數(shù)據(jù)庫差不多箍鼓,需要一個ip地址崭参、端口號呵曹、用戶名密碼(可選)和一個整形的數(shù)據(jù)庫編號,同時我們還可以在scrapy工程的setting文件中配置套接字的超時時間、等待時間等奄喂。

import six
from scrapy.utils.misc import load_object
from . import defaults

# 快速映射settings配置文件中redis的基礎(chǔ)配置字典
# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
    'REDIS_ENCODING': 'encoding',
}

# 根據(jù)scrapy中settings配置文件信息返回一個redis客戶端實例對象
def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
    can override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_ENCODING : str, optional
        Data encoding.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    params = defaults.REDIS_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


# 返回一個redis的Strictredis實例對象
def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)

scheduler.py

這個文件重寫了scheduler類铐殃,用來代替scrapy.core.scheduler的原有調(diào)度器。實現(xiàn)原理是使用指定的一個redis內(nèi)存作為數(shù)據(jù)存儲的媒介跨新,以達到各個爬蟲之間的統(tǒng)一調(diào)度富腊。
  1、scheduler負責(zé)調(diào)度各個spider的request請求域帐,scheduler初始化時赘被,通過settings文件讀取queue和dupefilters(url去重)的類型,配置queue和dupefilters使用的key(一般就是spider name加上queue或者dupefilters肖揣,這樣對于同一種spider的不同實例民假,就會使用相同的數(shù)據(jù)塊了)。
  2龙优、每當(dāng)一個request要被調(diào)度時羊异,enqueue_request被調(diào)用,scheduler使用dupefilters來判斷這個url是否重復(fù)彤断,如果不重復(fù)野舶,就添加到queue的容器中(三種隊列方式:先進先出,先進后出和優(yōu)先級都可以宰衙,可以在settings中配置)平道。
  3、當(dāng)調(diào)度完成時供炼,next_request被調(diào)用巢掺,scheduler就通過queue容器的接口,取出一個request劲蜻,把他發(fā)送給相應(yīng)的spider陆淀,讓spider進行爬取工作。

此擴展是對scrapy中自帶的scheduler的替代(在settings的SCHEDULER變量中指出)先嬉,正是利用此擴展實現(xiàn)crawler的分布式調(diào)度轧苫。其利用的數(shù)據(jù)結(jié)構(gòu)來自于queue中實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)。

scrapy-redis所實現(xiàn)的兩種分布式:爬蟲分布式以及item處理分布式就是由模塊scheduler和模塊pipelines實現(xiàn)疫蔓。上述其它模塊作為為二者輔助的功能模塊含懊。

替換了scrapy原生的scheduler, 所有方法名稱和原生scheduler保持一致, 在爬蟲開啟后會連接待抓取隊列和去重集合, 然后就是不斷把新的請求去重后放入待抓取隊列, 然后從待抓取隊列拿出請求給下載器

調(diào)度器肯定要和請求隊列去重隊列進行交互, 所以初始化要獲取使用的queuedupfilter的類, 并在open方法中完成實例化

import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):   #settings是傳過來的配置文件信息
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }

        # 讀取上面的配置文件,取settings里面找到相對應(yīng)的值衅胀,拿到settings后面的結(jié)果
        for name, setting_name in optional.items():
            val = settings.get(setting_name)        # 匹配settings對應(yīng)的值出來(自己配置的).取配置文件settings里面拿到相對應(yīng)的值出來岔乔,settings里面的鍵是在這里面循環(huán)拿到的(optional),也就是optional后面的值,對應(yīng)settinsg里面的鍵
            if val:
                kwargs[name] = val                  # 存進去

        # Support serializer as a path to a module.
        # 序列化操作,爬蟲key序列化
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler): # 當(dāng)你執(zhí)行調(diào)度器scrapy-redis的時候滚躯,就會傳入settigs進來雏门,配置信息是在crawler.settings
        instance = cls.from_settings(crawler.settings)  # #crawlwe.settinsg拿到的是setting對象<scrapy.settings.Settings object at 0x00000265B2E41940>
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        """爬蟲啟動時觸發(fā), 主要是連接待抓取和去重模塊"""
        self.spider = spider

        try:
            # 得到隊列queue的實例化對象
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            # 得到去重的實例化對象
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start: # 如果為True, 要在爬蟲開啟前刪除對應(yīng)爬蟲request隊列和dupfilter隊列
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        """把請求去重后放入 待抓取隊列中"""
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        """從請求隊列拿出下一個請求并返回"""
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0

這個文件重寫了scheduler類嘿歌,用來代替scrapy.core.scheduler的原有調(diào)度器。其實對原有調(diào)度器的邏輯沒有很大的改變茁影,主要是使用了redis作為數(shù)據(jù)存儲的媒介宙帝,以達到各個爬蟲之間的統(tǒng)一調(diào)度。

scheduler負責(zé)調(diào)度各個spider的request請求募闲,scheduler初始化時步脓,通過settings文件讀取queue和dupefilters的類型(一般就用上邊默認的),配置queue和dupefilters使用的key(一般就是spider name加上queue或者dupefilters浩螺,這樣對于同一種spider的不同實例靴患,就會使用相同的數(shù)據(jù)塊了)。每當(dāng)一個request要被調(diào)度時要出,enqueue_request被調(diào)用蚁廓,scheduler使用dupefilters來判斷這個url是否重復(fù),如果不重復(fù)厨幻,就添加到queue的容器中(先進先出相嵌,先進后出和優(yōu)先級都可以,可以在settings中配置)况脆。當(dāng)調(diào)度完成時饭宾,next_request被調(diào)用,scheduler就通過queue容器的接口格了,取出一個request看铆,把他發(fā)送給相應(yīng)的spider,讓spider進行爬取工作盛末。

dupefilter.py

分布式爬蟲url去重原理:
  通過分析可以知道self.server為redis實例弹惦,使用一個key來向redis的一個set中插入fingerprint(這個key對于同一個spider是相同的,redis是一個key-value的數(shù)據(jù)庫悄但,如果key是相同的棠隐,訪問到的值就是相同的,默認使用spider名字 + fingerpoint的key就是為了區(qū)分在不同主機上的不同spider實例檐嚣,只要數(shù)據(jù)是同一個spider助泽,就會訪問到redis中的同一個spider-set而這個set就是url的判重池)。去重指紋計算使用的是sha1算法, 計算值包括請求方法, url, body等信息

負責(zé)執(zhí)行requst的去重嚎京,實現(xiàn)的很有技巧性嗡贺,使用redis的set數(shù)據(jù)結(jié)構(gòu)。但是注意scheduler并不使用其中用于在這個模塊中實現(xiàn)的dupefilter鍵做request的調(diào)度鞍帝,而是使用queue.py模塊中實現(xiàn)的queue诫睬。當(dāng)request不重復(fù)時,將其存入到queue中帕涌,調(diào)度時將其彈出摄凡。

import logging
import time
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
from . import defaults
from .connection import get_redis_from_settings


logger = logging.getLogger(__name__)

# TODO: Rename class to RedisDupeFilter.
# 對請求做去重處理续徽,可以被分布式下不同的schedule調(diào)用
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    # 通過settings配置文件信息返回一個redis示例對象
    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.


        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """獲取請求指紋并添加到redis的去重集合中去"""
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)      # 得到請求的指紋
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)      # 把指紋添加到redis的集合中
        return added == 0


    # 這個方法是用來調(diào)用request_fingerprint接口的,這個接口通過sha1算法來判斷兩個url請求地址是否相同(注意架谎,這里面不完全是我們之前理解的hash了,如果兩個url的地址相同辟躏,請求方式和參數(shù)都相同谷扣,,但是請求參數(shù)的前后順序不同的話也別判定為同一個url地址,)從而達到url的去重功能捎琐。
    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)     # 得到請求指紋

    # Scrapy's scheduler調(diào)用会涎,刪除數(shù)據(jù),關(guān)閉連接
    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    # 清空操作記錄數(shù)據(jù)
    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    # 請求日志信息
    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

這個文件看起來比較復(fù)雜瑞凑,重寫了scrapy本身已經(jīng)實現(xiàn)的request判重功能末秃。因為本身scrapy單機跑的話,只需要讀取內(nèi)存中的request隊列或者持久化的request隊列(scrapy默認的持久化似乎是json格式的文件籽御,不是數(shù)據(jù)庫)就能判斷這次要發(fā)出的request url是否已經(jīng)請求過或者正在調(diào)度(本地讀就行了)练慕。而分布式跑的話,就需要各個主機上的scheduler都連接同一個數(shù)據(jù)庫的同一個request池來判斷這次的請求是否是重復(fù)的了技掏。

在這個文件中铃将,通過繼承BaseDupeFilter重寫他的方法,實現(xiàn)了基于redis的判重哑梳。根據(jù)源代碼來看劲阎,scrapy-redis使用了scrapy本身的一個fingerprint接request_fingerprint,這個接口很有趣鸠真,根據(jù)scrapy文檔所說悯仙,他通過hash來判斷兩個url是否相同(相同的url會生成相同的hash結(jié)果),但是當(dāng)兩個url的地址相同吠卷,get型參數(shù)相同但是順序不同時锡垄,也會生成相同的hash結(jié)果(這個真的比較神奇)所以scrapy-redis依舊使用url的fingerprint來判斷request請求是否已經(jīng)出現(xiàn)過。這個類通過連接redis祭隔,使用一個key來向redis的一個set中插入fingerprint(這個key對于同一種spider是相同的偎捎,redis是一個key-value的數(shù)據(jù)庫,如果key是相同的序攘,訪問到的值就是相同的茴她,這里使用spider名字+DupeFilter的key就是為了在不同主機上的不同爬蟲實例,只要屬于同一種spider程奠,就會訪問到同一個set丈牢,而這個set就是他們的url判重池),如果返回值為0瞄沙,說明該set中該fingerprint已經(jīng)存在(因為集合是沒有重復(fù)值的)己沛,則返回False慌核,如果返回值為1,說明添加了一個fingerprint到set中申尼,則說明這個request沒有重復(fù)垮卓,于是返回True,還順便把新fingerprint加入到數(shù)據(jù)庫中了师幕。 DupeFilter判重會在scheduler類中用到粟按,每一個request在進入調(diào)度之前都要進行判重,如果重復(fù)就不需要參加調(diào)度霹粥,直接舍棄就好了灭将,不然就是白白浪費資源。

image

request.py

request_fingerprint接口:
通過request_fingerprint接口后控,通過sha1算法來判斷兩個url請求地址是否相同(注意庙曙,這里面不完全是我們之前理解的hash了,如果兩個url的地址相同浩淘,請求方式和參數(shù)都相同捌朴,但是請求參數(shù)的前后順序不同的話也別判定為同一個url地址
http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111)從而達到url的去重功能。

"""This module provides some useful functions for working with scrapy.http.Request objects"""
from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse
from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached

_fingerprint_cache = weakref.WeakKeyDictionary()
def request_fingerprint(request, include_headers=None):
    """Return the request fingerprint"""
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

queue.py

這是個隊列類张抄,它會作為scheduler調(diào)度request的容器來維護一個秩序:
  1男旗、 scheduler在每個主機上都會實例化一個,并且和spider一一對應(yīng)欣鳖,所以分布式運行時會有一個spider的多個實例和一個scheduler的多個實例存在于不同的主機上察皇。
  2、因為scheduler都是用相同的容器泽台,而這些容器都連接同一個 redis服務(wù)器什荣,又都使用spider名 + queue來作為key 讀寫數(shù)據(jù),所以不同主機上的不同爬蟲實例公用一個request調(diào)度池怀酷,實現(xiàn)了分布式爬蟲之間的統(tǒng)一調(diào)度稻爬。

其作用如dupefilter.py所述,但是這里實現(xiàn)了三種方式的queue:FIFO的SpiderQueue蜕依,SpiderPriorityQueue桅锄,以及LIFI的SpiderStack。默認使用的是第二種

from scrapy.utils.reqser import request_to_dict, request_from_dict
from . import picklecompat

# 隊列基類
class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)

#隊列----先進先出
class FifoQueue(Base):
    """使用了redis的list結(jié)構(gòu)"""
    """Per-spider FIFO queue"""

    def __len__(self):
        """返回隊列長度大小"""
        """Return the length of the queue"""
        return self.server.llen(self.key)

    # request 進棧样眠,進棧前對request做處理友瘤,request請求先被scrapy的接口request_to_dict變成了一個dict對象(因為request對象實在是比較復(fù)雜,有方法有屬性不好串行化)檐束,,之后使用picklecompat中的serializer串行化為字符串辫秧,然后使用一個特定的key存入redis中,(該key在同一種spider中是相同的)
    def push(self, request):
        """發(fā)送請求到隊列左邊"""
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    # request出棧,其實就是從redis用那個特定的key去讀其值(一個list)被丧,從list中讀取最早進去的那個盟戏,于是就先進先出了.
    def pop(self, timeout=0):
        """從隊列右邊拋出請求"""
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)

# 優(yōu)先級隊列
class PriorityQueue(Base):
    """使用了redis的有序集合結(jié)構(gòu)"""
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """返回隊列長度大小"""
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """放入請求到zset中"""
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """從zset中拋出請求. 此處不支持timeout參數(shù)"""
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])

# 棧----后進先出, 使用list結(jié)構(gòu)實現(xiàn),和先進先出隊列基本一樣, 實現(xiàn)了棧結(jié)構(gòu)
class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue

該文件實現(xiàn)了幾個容器類绪妹,可以看這些容器和redis交互頻繁,同時使用了我們上邊picklecompat中定義的serializer柿究。這個文件實現(xiàn)的幾個容器大體相同邮旷,只不過一個是隊列,一個是棧蝇摸,一個是優(yōu)先級隊列婶肩,這三個容器到時候會被scheduler對象實例化,來實現(xiàn)request的調(diào)度探入。比如我們使用SpiderQueue作為調(diào)度隊列的類型狡孔,到時候request的調(diào)度方法就是先進先出懂诗,而實用SpiderStack就是先進后出了蜂嗽。

我們可以仔細看看SpiderQueue的實現(xiàn),他的push函數(shù)就和其他容器的一樣殃恒,只不過push進去的request請求先被scrapy的接口request_to_dict變成了一個dict對象(因為request對象實在是比較復(fù)雜植旧,有方法有屬性不好串行化),之后使用picklecompat中的serializer串行化為字符串离唐,然后使用一個特定的key存入redis中(該key在同一種spider中是相同的)病附。而調(diào)用pop時,其實就是從redis用那個特定的key去讀其值(一個list)亥鬓,從list中讀取最早進去的那個完沪,于是就先進先出了。

這些容器類都會作為scheduler調(diào)度request的容器嵌戈,scheduler在每個主機上都會實例化一個覆积,并且和spider一一對應(yīng),所以分布式運行時會有一個spider的多個實例和一個scheduler的多個實例存在于不同的主機上熟呛,但是宽档,因為scheduler都是用相同的容器,而這些容器都連接同一個redis服務(wù)器庵朝,又都使用spider名加queue來作為key讀寫數(shù)據(jù)吗冤,所以不同主機上的不同爬蟲實例公用一個request調(diào)度池,實現(xiàn)了分布式爬蟲之間的統(tǒng)一調(diào)度九府。

picklecompat.py

這里實現(xiàn)了loads和dumps兩個函數(shù)椎瘟,其實就是實現(xiàn)了一個serializer:
  1、因為redis數(shù)據(jù)庫不能存儲復(fù)雜對象(value部分只能是字符串侄旬,字符串列表降传,字符串集合和hash,key部分只能是字符串)勾怒,所以我們存啥都要先串行化成文本才行婆排。這里使用的就是python的pickle模塊声旺,一個兼容py2和py3的串行化工具。

"""A pickle wrapper module with protocol=-1 by default."""
try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle

def loads(s):
    return pickle.loads(s)

def dumps(obj):
    return pickle.dumps(obj, protocol=-1)

這里實現(xiàn)了loads和dumps兩個函數(shù)段只,其實就是實現(xiàn)了一個serializer腮猖,因為redis數(shù)據(jù)庫不能存儲復(fù)雜對象(value部分只能是字符串,字符串列表赞枕,字符串集合和hash澈缺,key部分只能是字符串),所以我們存啥都要先串行化成文本才行炕婶。這里使用的就是python的pickle模塊姐赡,一個兼容py2和py3的串行化工具。這個serializer主要用于一會的scheduler存reuqest對象柠掂,至于為什么不實用json格式项滑,我也不是很懂,item pipeline的串行化默認用的就是json涯贞。

pipelines.py

pipeline.py文件用來實現(xiàn)數(shù)據(jù)分布式處理枪狂。它通過從settings中拿到我們配置的REDIS_ITEMS_KEY作為key,把item串行化之后存入redis數(shù)據(jù)庫對應(yīng)的value中(這個value可以看出是個list宋渔,我們的每個item是這個list中的一個結(jié)點)州疾,這個pipeline把提取出的item存起來,主要是為了方便我們延后處理數(shù)據(jù)皇拣。

這是是用來實現(xiàn)分布式處理的作用严蓖。它將Item存儲在redis中以實現(xiàn)分布式處理。另外可以發(fā)現(xiàn)氧急,同樣是編寫pipelines颗胡,在這里的編碼實現(xiàn)不同于文章中所分析的情況,由于在這里需要讀取配置态蒂,所以就用到了from_crawler()函數(shù)杭措。

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults

default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        return self.key % {'spider': spider.name}

pipeline文件實現(xiàn)了一個item pipieline類,和scrapy的item pipeline是同一個對象钾恢,通過從settings中拿到我們配置的REDIS_ITEMS_KEY作為key手素,把item串行化之后存入redis數(shù)據(jù)庫對應(yīng)的value中(這個value可以看出出是個list,我們的每個item是這個list中的一個結(jié)點)瘩蚪,這個pipeline把提取出的item存起來盛险,主要是為了方便我們延后處理數(shù)據(jù)惧眠。

最后總結(jié)一下scrapy-redis的總體思路:這個工程通過重寫scheduler和spider類暇番,實現(xiàn)了scheduler調(diào)度噩翠、spider啟動和固定redis的交互。實現(xiàn)新的dupefilter和queue類,達到了去重和調(diào)度容器和redis的交互邓嘹,因為每個主機上的爬蟲進程都訪問同一個redis數(shù)據(jù)庫酣栈,所以調(diào)度和去重都統(tǒng)一進行統(tǒng)一管理,達到了分布式爬蟲的目的汹押。

組件之間的關(guān)系

image

當(dāng)spider被初始化時矿筝,同時會初始化一個對應(yīng)的scheduler對象,這個調(diào)度器對象通過讀取settings棚贾,配置好自己的調(diào)度容器queue和判重工具dupefilter窖维。每當(dāng)一個spider產(chǎn)出一個request的時候,scrapy內(nèi)核會把這個reuqest遞交給這個spider對應(yīng)的scheduler對象進行調(diào)度妙痹,scheduler對象通過訪問redis對request進行判重铸史,如果不重復(fù)就把他添加進redis中的調(diào)度池。當(dāng)調(diào)度條件滿足時怯伊,scheduler對象就從redis的調(diào)度池中取出一個request發(fā)送給spider琳轿,讓他爬取。當(dāng)spider爬取的所有暫時可用url之后震贵,scheduler發(fā)現(xiàn)這個spider對應(yīng)的redis的調(diào)度池空了利赋,于是觸發(fā)信號spider_idle水评,spider收到這個信號之后猩系,直接連接redis讀取strart url池,拿去新的一批url入口中燥,然后再次重復(fù)上邊的工作寇甸。

為什么要提供這些組件?

我們先從scrapy的“待爬隊列”和“Scheduler”入手:玩過爬蟲的同學(xué)都多多少少有些了解疗涉,在爬蟲爬取過程當(dāng)中有一個主要的數(shù)據(jù)結(jié)構(gòu)是“待爬隊列”拿霉,以及能夠操作這個隊列的調(diào)度器(也就是Scheduler)。scrapy官方文檔對這二者的描述不多咱扣,基本上沒提绽淘。

scrapy使用什么樣的數(shù)據(jù)結(jié)構(gòu)來存放待爬取的request呢?其實沒用高大上的數(shù)據(jù)結(jié)構(gòu)闹伪,就是python自帶的collection.deque(改造過后的)沪铭,問題來了,該怎么讓兩個以上的Spider共用這個deque呢偏瓤?

scrapy-redis提供了一個解決方法杀怠,把deque換成redis數(shù)據(jù)庫,我們從同一個redis服務(wù)器存放要爬取的request厅克,這樣就能讓多個spider去同一個數(shù)據(jù)庫里讀取赔退,這樣分布式的主要問題就解決了嘛。

那么問題又來了,我們換了redis來存放隊列硕旗,哪scrapy就能直接分布式了么窗骑?。scrapy中跟“待爬隊列”直接相關(guān)的就是調(diào)度器“Scheduler”漆枚,它負責(zé)對新的request進行入列操作(加入deque)慧域,取出下一個要爬取的request(從deque中取出)等操作。在scrapy中浪读,Scheduler并不是直接就把deque拿來就粗暴的使用了昔榴,而且提供了一個比較高級的組織方法,它把待爬隊列按照優(yōu)先級建立了一個字典結(jié)構(gòu)碘橘,比如:

{
    priority0:隊列0
    priority1:隊列2
    priority2:隊列2
}

然后根據(jù)request中的priority屬性互订,來決定該入哪個隊列。而出列時痘拆,則按priority較小的優(yōu)先出列仰禽。為了管理這個比較高級的隊列字典,Scheduler需要提供一系列的方法纺蛆。你要是換了redis做隊列吐葵,這個scrapy下的Scheduler就用不了,所以自己寫一個吧桥氏。于是就出現(xiàn)了scrapy-redis的專用scheduler温峭。

那么既然使用了redis做主要數(shù)據(jù)結(jié)構(gòu),能不能把其他使用自帶數(shù)據(jù)結(jié)構(gòu)關(guān)鍵功能模塊也換掉呢字支? 在我們爬取過程當(dāng)中凤藏,還有一個重要的功能模塊,就是request去重堕伪。scrapy中是如何實現(xiàn)這個去重功能的呢揖庄?用集合~scrapy中把已經(jīng)發(fā)送的request指紋放入到一個集合中,把下一個request的指紋拿到集合中比對欠雌,如果該指紋存在于集合中蹄梢,說明這個request發(fā)送過了,如果沒有則繼續(xù)操作富俄。

為了分布式禁炒,把這個集合也換掉吧,換了redis蛙酪,照樣也得把去重類給換了齐苛。于是就有了scrapy-redis的dupefilter。那么依次類推桂塞,接下來的其他組件(Pipeline和Spider)凹蜂,我們也可以輕松的猜到,他們是為什么要被修改呢。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末玛痊,一起剝皮案震驚了整個濱河市汰瘫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌擂煞,老刑警劉巖混弥,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異对省,居然都是意外死亡蝗拿,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門蒿涎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哀托,“玉大人,你說我怎么就攤上這事劳秋〔质郑” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵玻淑,是天一觀的道長嗽冒。 經(jīng)常有香客問我,道長补履,這世上最難降的妖魔是什么添坊? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮干像,結(jié)果婚禮上帅腌,老公的妹妹穿的比我還像新娘驰弄。我一直安慰自己麻汰,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布戚篙。 她就那樣靜靜地躺著五鲫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪岔擂。 梳的紋絲不亂的頭發(fā)上位喂,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天,我揣著相機與錄音乱灵,去河邊找鬼塑崖。 笑死,一個胖子當(dāng)著我的面吹牛痛倚,可吹牛的內(nèi)容都是我干的规婆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼抒蚜!你這毒婦竟也來了掘鄙?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤嗡髓,失蹤者是張志新(化名)和其女友劉穎操漠,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饿这,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡浊伙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了长捧。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吧黄。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖唆姐,靈堂內(nèi)的尸體忽然破棺而出拗慨,到底是詐尸還是另有隱情,我是刑警寧澤奉芦,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布赵抢,位于F島的核電站,受9級特大地震影響声功,放射性物質(zhì)發(fā)生泄漏烦却。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一先巴、第九天 我趴在偏房一處隱蔽的房頂上張望其爵。 院中可真熱鬧,春花似錦伸蚯、人聲如沸摩渺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽摇幻。三九已至,卻和暖如春挥萌,著一層夾襖步出監(jiān)牢的瞬間绰姻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工引瀑, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留狂芋,地道東北人。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓憨栽,卻偏偏與公主長得像帜矾,于是被迫代替她去往敵國和親辆影。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,066評論 2 355