參考https://github.com/tenlee2012/scrapy-kafka-redis
Scrpay-Kafka-Redis
在有大量請(qǐng)求堆積的情況下,即使用了Bloomfilter
算法狂窑,使用scrapy-redis仍然會(huì)占用大量?jī)?nèi)存媳板,本項(xiàng)目參考scrapy-redis
青团,
特點(diǎn)
- 支持分布式
- 使用Redis作為去重隊(duì)列
同時(shí)使用Bloomfilter去重算法浆熔,降低了內(nèi)存占用,但是增加了可去重?cái)?shù)量 - 使用Kafka作為請(qǐng)求隊(duì)列
可支持大量請(qǐng)求堆積孤页,容量和磁盤大小相關(guān)丛晦,而不是和運(yùn)行內(nèi)存相關(guān) - 由于Kafka的特性巨缘,不支持優(yōu)先隊(duì)列,只支持先進(jìn)先出隊(duì)列
依賴
- Python 3.0+
- Redis >= 2.8
- Scrapy >= 1.5
- kafka-python >= 1.4.0
使用
pip install scrapy-kafka-redis
- 配置
settings.py
必須要添加在settings.py
的內(nèi)容
# 啟用Kafka調(diào)度存儲(chǔ)請(qǐng)求隊(duì)列
SCHEDULER = "scrapy_kafka_redis.scheduler.Scheduler"
# 使用BloomFilter作為去重隊(duì)列
DUPEFILTER_CLASS = "scrapy_kafka_redis.dupefilter.BloomFilter"
其他可選參數(shù)的默認(rèn)值
# 單獨(dú)使用情況下采呐,去重隊(duì)列在redis中存儲(chǔ)的key
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
REDIS_URL = 'redis://localhost:6378/1'
REDIS_PARAMS = {
'socket_timeout': 30,
'socket_connect_timeout': 30,
'retry_on_timeout': True,
'encoding': REDIS_ENCODING,
}
KAFKA_BOOTSTRAP_SERVERS=['localhost:9092']
# 調(diào)度隊(duì)列的默認(rèn)TOPIC
SCHEDULER_QUEUE_TOPIC = '%(spider)s-requests'
# 默認(rèn)使用的調(diào)度隊(duì)列
SCHEDULER_QUEUE_CLASS = 'scrapy_kafka_redis.queue.KafkaQueue'
# 去重隊(duì)列在redis中存儲(chǔ)的key名
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
# 調(diào)度器使用的去重算法
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_kafka_redis.dupefilter.BloomFilter'
# BloomFilter的塊個(gè)數(shù)
BLOOM_BLOCK_NUM = 1
# start urls使用的TOPIC
START_URLS_TOPIC = '%(name)s-start_urls'
KAFKA_BOOTSTRAP_SERVERS = None
# 構(gòu)造請(qǐng)求隊(duì)列的Kafka生產(chǎn)者
KAFKA_REQUEST_PRODUCER_PARAMS = {
'api_version': (0, 10, 1),
'value_serializer': dumps
}
# 構(gòu)造請(qǐng)求隊(duì)列的Kafka消費(fèi)者
KAFKA_REQUEST_CONSUMER_PARAMS = {
'group_id': 'requests',
'api_version': (0, 10, 1),
'value_deserializer': loads
}
# 構(gòu)造開始隊(duì)列的Kafka消費(fèi)者
KAFKA_START_URLS_CONSUMER_PARAMS = {
'group_id': 'start_url',
'api_version': (0, 10, 1),
'value_deserializer': lambda m: m.decode('utf-8'),
}
-
spiders
使用
import scrapy
from scrapy_kafka_redis.spiders import KafkaSpider
class DemoSpider(KafkaSpider):
name = "demo"
def parse(self, response):
pass
- 創(chuàng)建
Topic
根據(jù)需要?jiǎng)?chuàng)建的分布式scrapy實(shí)例若锁,設(shè)置topic的分區(qū)數(shù),比如
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic demo-start_urls
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic demo-requests
- 發(fā)送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo-start_urls
建議手動(dòng)創(chuàng)建Topic并指定分區(qū)數(shù)
- 運(yùn)行分布式scrapy