The official documentation for scrapy-redis is fairly brief and does not explain how it works internally, so to fully understand how the distributed crawler operates you still need to read the scrapy-redis source code. Fortunately the codebase is small and easy to follow, and can be read through quickly.
- The scrapy-redis source is roughly organized into the following .py files, all walked through below: connection.py, spider.py, scheduler.py, dupefilter.py, queue.py, picklecompat.py and pipelines.py (plus scrapy's own request.py, which provides the request fingerprints).
spider.py is the entry point of the distributed crawler:
1. Through the connection module, the spider sets up its Redis connection via the setup_redis() function when it is initialized.
2. The next_requests function pulls start URLs from Redis. With a small number of start URLs plus a LinkExtractor, the spider can generate many new URLs, which go to the scheduler for deduplication and scheduling. When the spider exhausts the URLs in the scheduling pool, the spider_idle signal fires, which in turn triggers the spider's next_requests function.
3. The spider then reads another batch of URLs from the Redis start-URL pool (a minimal usage sketch follows this list).
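To make the flow above concrete, here is a minimal, hypothetical usage sketch: a RedisSpider subclass plus the settings it relies on and the redis-cli command that seeds the start-URL pool. The spider name, redis_key and example URL are illustrative assumptions, not taken from the original source.
# myspider.py -- a minimal RedisSpider sketch (names are illustrative)
from scrapy_redis.spiders import RedisSpider

class MySpider(RedisSpider):
    name = 'myspider'
    redis_key = 'myspider:start_urls'   # the redis list/set that next_requests reads

    def parse(self, response):
        # follow-up requests produced here go through the shared scheduler
        yield {'url': response.url}

# settings.py -- hand scheduling and dedup over to scrapy-redis
# SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
# DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# REDIS_URL = 'redis://localhost:6379'

# Seed the start-URL pool; every idle spider instance pulls from it:
# $ redis-cli lpush myspider:start_urls http://www.example.com/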
connection.py
import six
from scrapy.utils.misc import load_object
from . import defaults
# Map the Redis-related keys in the scrapy settings file onto redis-py parameter names
SETTINGS_PARAMS_MAP = {
'REDIS_URL': 'url',
'REDIS_HOST': 'host',
'REDIS_PORT': 'port',
'REDIS_ENCODING': 'encoding',
}
# Build and return a Redis client instance from the scrapy settings
def get_redis_from_settings(settings):
params = defaults.REDIS_PARAMS.copy()
params.update(settings.getdict('REDIS_PARAMS'))
for source, dest in SETTINGS_PARAMS_MAP.items():
val = settings.get(source)
if val:
params[dest] = val
if isinstance(params.get('redis_cls'), six.string_types):
params['redis_cls'] = load_object(params['redis_cls'])
return get_redis(**params)
# Backwards-compatible alias used elsewhere in the package (scheduler, spiders, pipelines)
from_settings = get_redis_from_settings
# Return a redis StrictRedis instance (or whatever class defaults.REDIS_CLS points to)
def get_redis(**kwargs):
redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
url = kwargs.pop('url', None)
if url:
return redis_cls.from_url(url, **kwargs)
else:
return redis_cls(**kwargs)
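As an illustration of how these two helpers cooperate, here is a small, hypothetical sketch; the setting values are assumptions chosen for the example, not taken from the article.
from scrapy.settings import Settings
from scrapy_redis.connection import get_redis_from_settings

settings = Settings({
    'REDIS_HOST': 'localhost',      # mapped to the 'host' kwarg via SETTINGS_PARAMS_MAP
    'REDIS_PORT': 6379,             # mapped to 'port'
    'REDIS_PARAMS': {'db': 0},      # merged on top of defaults.REDIS_PARAMS
})

server = get_redis_from_settings(settings)   # a StrictRedis-style client
server.ping()                                # raises if redis is unreachable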
spider.py
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider
from . import connection, defaults
from .utils import bytes_to_str
# Implements reading the urls to crawl from a redis queue
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None
# Redis client placeholder.
server = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
# Connect to redis and hook up the idle signal
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal."""
# (resolution of redis_key, redis_batch_size and redis_encoding from the settings is elided here)
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
self.__dict__)
self.server = connection.from_settings(crawler.settings)
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
# This method fetches the next batch of start urls from redis
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
"""Returns a Request instance from data coming from Redis."""
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle."""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
scheduler.py
This file re-implements the scheduler class to replace scrapy's original scrapy.core.scheduler. It works by using a designated Redis store as the shared medium for scheduling data, so that all crawler processes are scheduled in a unified way.
1. The scheduler is responsible for scheduling each spider's requests. When it is initialized, it reads the queue and dupefilter (URL dedup) classes from the settings file and configures the Redis keys they use (typically the spider name plus "queue" or "dupefilter", so different instances of the same spider end up using the same data blocks).
2. Whenever a request is to be scheduled, enqueue_request is called. The scheduler uses the dupefilter to check whether the URL has been seen; if not, it is added to the queue container (FIFO, LIFO and priority queues are all available and can be selected in settings; see the settings sketch after this list).
3. When the next request is needed, next_request is called: the scheduler pops a request from the queue container and hands it to the corresponding spider so it can carry on crawling.
import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults
class Scheduler(object):
def __init__(self, server, **kwargs):
    # The full parameter list (persist, flush_on_start, queue_key/queue_cls,
    # dupefilter_key/dupefilter_cls, idle_before_close, serializer) is elided here.
    pass
@classmethod
def from_settings(cls, settings):
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
optional = {
    # mapping of optional kwargs to their setting names (queue_key, queue_cls,
    # dupefilter_key, dupefilter_cls, serializer) elided here
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)
server.ping()
return cls(server=server, **kwargs)
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
instance.stats = crawler.stats
return instance
def open(self, spider):
self.spider = spider
pass
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0
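The snippet above omits enqueue_request, the method described in step 2 of the list. Here is a minimal sketch of what it does, reconstructed from that description and from the dupefilter/queue interfaces shown elsewhere in this article (a sketch, not a verbatim copy of the original source):
def enqueue_request(self, request):
    # Drop requests the dupefilter has already seen (unless dont_filter is set)
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    # Otherwise push it into the shared redis queue container
    self.queue.push(request)
    return True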
dupefilter.py
How URL deduplication works in the distributed crawler:
From the code we can see that self.server is a Redis client, and a single key is used to insert fingerprints into one Redis set (that key is the same for a given spider). Redis is a key-value store, so identical keys always address the same value; using the spider name plus a dupefilter suffix as the default key is exactly what lets different spider instances on different hosts share one set. As long as the data belongs to the same spider, every instance reaches the same spider-specific set in Redis, and that set is the deduplication pool for URLs.
import logging
import time
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
from . import defaults
from .connection import get_redis_from_settings
logger = logging.getLogger(__name__)
# Deduplicates requests; can be shared by the schedulers of different distributed crawler processes
class RFPDupeFilter(BaseDupeFilter):
logger = logger
def __init__(self, server, key, debug=False):
self.server = server
self.key = key
self.debug = debug
self.logdupes = True
# Build a dupefilter instance (with its redis client) from the scrapy settings
@classmethod
def from_settings(cls, settings):
server = get_redis_from_settings(settings)
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def request_seen(self, request):
fp = self.request_fingerprint(request)
added = self.server.sadd(self.key, fp)
return added == 0
# This method delegates to the request_fingerprint interface, which uses SHA1 to decide
# whether two requests point to the same URL. Note that this is not simply a hash of the
# URL string: if two requests have the same address, method and parameters, they are
# judged to be the same even when the query parameters appear in a different order.
def request_fingerprint(self, request):
return request_fingerprint(request)
# Called by scrapy's scheduler when the spider closes: delete the stored data
def close(self, reason=''):
self.clear()
# Clear the recorded fingerprint data
def clear(self):
"""Clears fingerprints data."""
self.server.delete(self.key)
# Log filtered (duplicate) requests
def log(self, request, spider):
pass
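A small, hypothetical demonstration of the set-based deduplication (it assumes a local Redis instance; the key name is made up for illustration):
import redis
from scrapy import Request
from scrapy_redis.dupefilter import RFPDupeFilter

server = redis.StrictRedis(host='localhost', port=6379)
df = RFPDupeFilter(server, key='demo:dupefilter')

r1 = Request('http://www.example.com/query?id=111&cat=222')
r2 = Request('http://www.example.com/query?cat=222&id=111')   # same params, different order

print(df.request_seen(r1))   # False -- first time this fingerprint is added (sadd returns 1)
print(df.request_seen(r2))   # True  -- the canonicalized URL yields the same fingerprint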
request.py
The request_fingerprint interface:
The request_fingerprint interface uses SHA1 to decide whether two request URLs are the same. Note that this is not simply a hash of the URL string: if two requests have the same address, method and parameters, they are judged to be the same URL even when the query parameters appear in a different order, e.g.
http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111
both yield the same fingerprint, which is how URL deduplication is achieved.
"""This module provides some useful functions for working with scrapy.http.Request objects"""
from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse
from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached
_fingerprint_cache = weakref.WeakKeyDictionary()
def request_fingerprint(request, include_headers=None):
"""Return the request fingerprint"""
if include_headers:
include_headers = tuple(to_bytes(h.lower())
for h in sorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
if include_headers not in cache:
fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or b'')
if include_headers:
for hdr in include_headers:
if hdr in request.headers:
fp.update(hdr)
for v in request.headers.getlist(hdr):
fp.update(v)
cache[include_headers] = fp.hexdigest()
return cache[include_headers]
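A quick, illustrative check of the claims above, using only the function shown in the snippet; the extra header name is a made-up example.
from scrapy import Request
from scrapy.utils.request import request_fingerprint

r1 = Request('http://www.example.com/query?id=111&cat=222')
r2 = Request('http://www.example.com/query?cat=222&id=111', headers={'X-Trace': 'abc'})

# Same fingerprint: the query parameters are canonicalized and headers are ignored by default
print(request_fingerprint(r1) == request_fingerprint(r2))   # True

# Including a header changes the fingerprint of the request that carries it
print(request_fingerprint(r1, include_headers=['X-Trace']) ==
      request_fingerprint(r2, include_headers=['X-Trace']))  # False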
queue.py
This module defines the queue classes that the scheduler uses as the container for the requests it schedules, keeping them in order:
1. A scheduler is instantiated on every host, paired one-to-one with a spider, so in a distributed run there are multiple instances of the spider and multiple instances of the scheduler living on different hosts.
2. Because all the schedulers use the same kind of container, and those containers connect to the same Redis server and read/write data under the same key (spider name + queue), the different crawler instances on different hosts share a single request scheduling pool, which gives the distributed crawlers unified scheduling.
from scrapy.utils.reqser import request_to_dict, request_from_dict
from . import picklecompat
# Base class for the queue implementations
class Base(object):
def __init__(self, server, spider, key, serializer=None):
# (serializer validation elided; it defaults to picklecompat when None)
self.server = server
self.spider = spider
self.key = key % {'spider': spider.name}
self.serializer = serializer
def _encode_request(self, request):
"""Encode a request object"""
obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)
def _decode_request(self, encoded_request):
"""Decode an request previously encoded"""
obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, self.spider)
def push(self, request):
"""Push a request"""
raise NotImplementedError
def pop(self, timeout=0):
"""Pop a request"""
raise NotImplementedError
def clear(self):
"""Clear queue/stack"""
self.server.delete(self.key)
# Queue: first in, first out (FIFO)
class FifoQueue(Base):
"""Per-spider FIFO queue"""
def __len__(self):
"""Return the length of the queue"""
return self.server.llen(self.key)
def push(self, request):
# Push a request. Before pushing, the request is converted by scrapy's request_to_dict
# into a dict (a Request object is too complex to serialize directly, having both methods
# and attributes), then serialized to a string by the serializer from picklecompat, and
# finally stored in redis under a key that is the same for every instance of a given spider.
self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0):
# Pop a request: read the value stored in redis under that same key (a list) and take
# the element that was pushed earliest, which gives first-in, first-out behaviour.
if timeout > 0:
data = self.server.brpop(self.key, timeout)
if isinstance(data, tuple):
data = data[1]
else:
data = self.server.rpop(self.key)
if data:
return self._decode_request(data)
# Priority queue (elided here; a sorted-set sketch follows this file's listing)
class PriorityQueue(Base):
pass
# Stack: last in, first out (LIFO)
class LifoQueue(Base):
pass
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
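PriorityQueue and LifoQueue are elided above with pass. As a reference, here is a hedged sketch of how a priority queue can be built on a Redis sorted set in the spirit of the FifoQueue shown; the class name is my own and the body is not a verbatim copy of the original source.
class SortedSetPriorityQueue(Base):
    """Per-spider priority queue backed by a redis sorted set (illustrative sketch)."""

    def __len__(self):
        return self.server.zcard(self.key)

    def push(self, request):
        data = self._encode_request(request)
        # Lower score pops first, so negate scrapy's priority (higher priority pops sooner)
        score = -request.priority
        # ZADD via execute_command keeps the call compatible across redis-py versions
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        # Atomically read and remove the member with the lowest score
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])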
picklecompat.py
This module implements two functions, loads and dumps; in effect it is a serializer:
1. Because the Redis database cannot store complex objects (values can only be strings, lists of strings, sets of strings and hashes, and keys can only be strings), everything must be serialized to text/bytes before it is stored. The serializer used here is Python's pickle module, a serialization tool compatible with both Python 2 and Python 3.
"""A pickle wrapper module with protocol=-1 by default."""
try:
import cPickle as pickle # PY2
except ImportError:
import pickle
def loads(s):
return pickle.loads(s)
def dumps(obj):
return pickle.dumps(obj, protocol=-1)
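An illustrative round trip with this serializer together with request_to_dict, matching how the queue's _encode_request/_decode_request use it; the DemoSpider class is a made-up stand-in needed only for (de)serialization.
from scrapy import Request, Spider
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy_redis import picklecompat

class DemoSpider(Spider):
    name = 'demo'   # hypothetical spider, only used to resolve callbacks during (de)serialization

spider = DemoSpider()
req = Request('http://www.example.com/', priority=5)

encoded = picklecompat.dumps(request_to_dict(req, spider))    # bytes, safe to store in redis
decoded = request_from_dict(picklecompat.loads(encoded), spider)
print(decoded.url, decoded.priority)                          # http://www.example.com/ 5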
pipelines.py
What the class in pipelines.py does:
pipelines.py implements distributed handling of the scraped data. It takes the REDIS_ITEMS_KEY we configure in settings as the key, serializes each item and stores it in the corresponding value in Redis (the value can be seen as a list, with each item being one element of that list). The pipeline stores the extracted items mainly so that the data can be conveniently post-processed later.
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults
default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):
def __init__(self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize):
self.server = server
self.key = key
self.serialize = serialize_func
@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
return self.key % {'spider': spider.name}
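A hedged sketch of how this pipeline is typically enabled and how the queued items can be consumed later; the key template matches the scrapy-redis default, while the spider name is illustrative.
# settings.py -- enable the redis item pipeline
# ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300}
# REDIS_ITEMS_KEY = '%(spider)s:items'

# consume_items.py -- drain the serialized items later, outside the crawl
import json
import redis

server = redis.StrictRedis(host='localhost', port=6379)
while True:
    data = server.lpop('myspider:items')   # 'myspider' stands in for your spider name
    if data is None:
        break                              # list drained
    item = json.loads(data)                # items were stored with ScrapyJSONEncoder
    print(item)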
By re-implementing the scheduler and spider classes, this project ties scheduler dispatching and spider start-up to a fixed Redis instance, and its new dupefilter and queue classes move deduplication and the scheduling container into Redis as well. Because every crawler process on every host accesses the same Redis database, scheduling and deduplication are managed in one place, which achieves the goal of a distributed crawler.
When a spider is initialized, a matching scheduler object is initialized with it; that scheduler reads the settings and configures its queue container and its dupefilter. Whenever the spider produces a request, the scrapy engine hands it to that spider's scheduler for scheduling; the scheduler checks the request against Redis for duplicates and, if it is new, adds it to the scheduling pool in Redis. When the scheduling conditions are met, the scheduler pops a request from the Redis pool and sends it to the spider to crawl. Once the spider has crawled all the URLs currently available and the Redis pool for that spider is empty, the spider_idle signal fires; on receiving it, the spider connects to Redis, reads the start-URL pool, takes a new batch of entry URLs, and the whole cycle above repeats.