從redis隊列改為內存隊列的原因
公司項目中,同一時間有大量任務進來,導致redis經常出現連接超時和連接失敗,進而導致任務處理緩慢,來不及處理。
為了減輕redis的壓力,我們將請求隊列放到內存中。由於隊列位於內存中,並且減少了大量的redis請求,因此可以加快程序的執行速度。
改成內存隊列的缺點
1诗良、當(dāng)程序重啟時汹桦,當(dāng)爬蟲隊列中還有任務(wù)未執(zhí)行時,在內(nèi)存中的數(shù)據(jù)會丟失鉴裹。
2舞骆、不能充分使用scrapy-redis斷點續(xù)爬的特性灵嫌。
分析scrapy queue
查看scrapy-redis的隊列源碼,我們只需要對其中的隊列類進行重寫即可。
image.png
重寫Base類
from scrapy.squeues import LifoMemoryQueue
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy_redis import picklecompat
class Base(object):
    """Per-spider base queue class.

    Stores the spider, its formatted key and the serializer used to turn
    requests into serialized data and back. Concrete subclasses provide the
    storage by implementing ``__len__``, ``push``, ``pop`` and ``clear``.

    :param server: backend connection object; only stored as ``self.server``
        (this base class does not use it itself).
    :param spider: spider instance; ``spider.name`` is substituted into *key*.
    :param key: key template, formatted with ``{'spider': spider.name}``.
    :param serializer: object exposing ``loads`` and ``dumps``; defaults to
        ``scrapy_redis.picklecompat`` when ``None``.
    :raises TypeError: if *serializer* lacks ``loads`` or ``dumps``.
    """

    def __init__(self, server, spider, key, serializer=None):
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        # Wrap the argument in a 1-tuple so '%' never tries to unpack it.
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % (serializer,))
        if not hasattr(serializer, 'dumps'):
            # The previous message had two placeholders ('%s' and '%r') but
            # only one argument, so building it raised "not enough arguments
            # for format string" instead of this TypeError.
            raise TypeError("serializer does not implement 'dumps' function: %r"
                            % (serializer,))
        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Convert *request* to a dict and serialize it with the serializer."""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Rebuild a request previously produced by ``_encode_request``."""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue (must be overridden)."""
        raise NotImplementedError

    def push(self, request):
        """Push a request (must be overridden)."""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request (must be overridden)."""
        raise NotImplementedError

    def clear(self):
        """Clear the queue/stack (must be overridden)."""
        raise NotImplementedError
內存隊列
class SpiderQueue(Base):
    """In-memory priority queue of requests, used instead of the Redis queue.

    Requests are bucketed by the negated ``request.priority``. The smallest
    bucket key is served first, so higher Scrapy priority is served first.
    Each bucket is a ``LifoMemoryQueue``, so requests that share a priority
    come out in last-in, first-out order. Everything lives only in this
    process's memory and is lost when the process restarts.

    ``self.curprio`` caches the smallest bucket key that still holds
    requests, or ``None`` when the queue is empty.
    """

    def __init__(self, *args, **kwargs):
        # negated priority -> LifoMemoryQueue of encoded requests
        self.queues = {}
        self.qfactory = LifoMemoryQueue  # Scrapy's native in-memory queue
        self.curprio = None
        super(SpiderQueue, self).__init__(*args, **kwargs)

    def __len__(self):
        """Return the total number of queued requests across all buckets."""
        return sum(len(q) for q in self.queues.values())

    def push(self, request):
        """Encode *request* and add it to the bucket for its priority."""
        data = self._encode_request(request)
        priority = -request.priority
        q = self.queues.get(priority)
        if q is None:
            q = self.queues[priority] = self.qfactory()
        q.push(data)
        if self.curprio is None or priority < self.curprio:
            self.curprio = priority

    def pop(self, timeout=0):
        """Pop and decode the next request, or return ``None`` when empty.

        *timeout* is accepted for interface compatibility and ignored; this
        in-memory pop never waits.
        """
        if self.curprio is None:
            return None
        q = self.queues[self.curprio]
        data = q.pop()
        if len(q) == 0:
            # Drop the exhausted bucket and move the pointer to the next
            # smallest key that still holds requests.
            del self.queues[self.curprio]
            self.curprio = min(
                (p for p, sub in self.queues.items() if len(sub) > 0),
                default=None,
            )
        return self._decode_request(data)

    def clear(self):
        """Discard all queued requests.

        Without this override ``Base.clear`` raises NotImplementedError,
        so any caller that clears the queue would crash. A scheduler
        flushing the queue is presumably such a caller; verify.
        """
        self.queues.clear()
        self.curprio = None
settings中配置
# Dotted import path of the in-memory queue class; replace '路徑' ("path")
# with the module that defines SpiderQueue. No trailing comma: a trailing
# comma would turn this setting into a 1-tuple instead of a string.
SCHEDULER_QUEUE_CLASS = '路徑.SpiderQueue'