Item Pipeline(項(xiàng)目管道)
在一個(gè)項(xiàng)目被spider
抓取后茄螃,它被發(fā)送到Item Pipeline
气筋,Item Pipeline
通過順序執(zhí)行的幾個(gè)組件處理它拆内,決定該項(xiàng)目是否應(yīng)該繼續(xù)通過Pipeline
或被丟棄并且不再處理。
Item Pipeline
的典型用途是:
- 清理HTML數(shù)據(jù)
- 驗(yàn)證已刪除的數(shù)據(jù)(檢查項(xiàng)目是否包含某些字段)
- 檢查重復(fù)項(xiàng)(并刪除它們)
- 將已刪除的項(xiàng)目存儲(chǔ)在數(shù)據(jù)庫中
編寫自己的項(xiàng)目管道
每個(gè)項(xiàng)管道組件都是一個(gè)必須實(shí)現(xiàn)以下方法的Python類:
process_item(self, item, spider)
為每個(gè)Item Pipeline
組件調(diào)用此方法宠默。process_item()
必須:返回帶數(shù)據(jù)的dict麸恍,返回Item
(或任何后代類)對(duì)象,返回Twisted Deferred或引發(fā)DropItem
異常搀矫。 丟棄的項(xiàng)目不再由其他管道組件處理抹沪。
open_spider(self, spider)
打開spider時(shí)會(huì)調(diào)用此方法。
close_spider(self, spider)
當(dāng)spider關(guān)閉時(shí)調(diào)用此方法瓤球。
from_crawler(cls, crawler)
如果存在融欧,則調(diào)用此類方法以從a創(chuàng)建管道實(shí)例Crawler
。它必須返回管道的新實(shí)例卦羡。Crawler對(duì)象提供對(duì)所有Scrapy核心組件的訪問蹬癌,如設(shè)置和信號(hào); 它是管道訪問它們并將其功能掛鉤到Scrapy的一種方式权她。
激活I(lǐng)tem Pipeline組件
要激活I(lǐng)tem Pipeline組件,必須將settings.py
中的ITEM_PIPELINES
設(shè)置打開逝薪,如下例所示:
ITEM_PIPELINES = {
'myproject.pipelines.PricePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
}
此設(shè)置中為類分配的整數(shù)值決定了它們運(yùn)行的??順序:較低值優(yōu)先級(jí)高隅要。習(xí)慣上在0-1000范圍內(nèi)定義這些數(shù)字。
項(xiàng)目管道示例
1董济、 價(jià)格驗(yàn)證和丟棄物品沒有價(jià)格
調(diào)整 price
那些不包含增值稅(price_excludes_vat
屬性)的項(xiàng)目的屬性步清,并刪除那些不包含價(jià)格的項(xiàng)目:
from scrapy.exceptions import DropItem
class PricePipeline(object):
vat_factor = 1.15
def process_item(self, item, spider):
if item.get('price'):
if item.get('price_excludes_vat'):
item['price'] = item['price'] * self.vat_factor
return item
else:
raise DropItem("Missing price in %s" % item)
2、 將項(xiàng)目寫入JSON文件
以下管道將所有已刪除的項(xiàng)目(來自所有蜘蛛)存儲(chǔ)到一個(gè)items.jl
文件中虏肾,每行包含一個(gè)以JSON格式序列化的項(xiàng)目:
import json
class JsonWriterPipeline(object):
def open_spider(self, spider):
self.file = open('items.jl', 'w')
def close_spider(self, spider):
self.file.close()
def process_item(self, item, spider):
line = json.dumps(dict(item)) + "\n"
self.file.write(line)
return item
3廓啊、將項(xiàng)目寫入MongoDB
在這個(gè)例子中,我們將使用pymongo將項(xiàng)目寫入MongoDB封豪。MongoDB地址和數(shù)據(jù)庫名稱在Scrapy設(shè)置中指定谴轮;MongoDB集合以item類命名。
這個(gè)例子的要點(diǎn)是展示如何使用from_crawler()
方法以及如何正確地清理資源:
import pymongo
class MongoPipeline(object):
collection_name = 'scrapy_items'
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item
4吹埠、截取項(xiàng)目的截圖
此示例演示如何從process_item()
方法返回Deferred
第步。它使用Splash
渲染項(xiàng)目URL的屏幕截圖。Pipeline
向本地運(yùn)行的Splash
實(shí)例發(fā)出請求缘琅。下載請求并延遲回調(diào)激活后粘都,它會(huì)將項(xiàng)目保存到文件并將文件名添加到項(xiàng)目中。
import scrapy
import hashlib
from urllib.parse import quote
class ScreenshotPipeline(object):
"""Pipeline that uses Splash to render screenshot of
every Scrapy item."""
SPLASH_URL = "http://localhost:8050/render.png?url={}"
def process_item(self, item, spider):
encoded_item_url = quote(item["url"])
screenshot_url = self.SPLASH_URL.format(encoded_item_url)
request = scrapy.Request(screenshot_url)
dfd = spider.crawler.engine.download(request, spider)
dfd.addBoth(self.return_item, item)
return dfd
def return_item(self, response, item):
if response.status != 200:
# Error happened, return item.
return item
# Save screenshot to file, filename will be hash of url.
url = item["url"]
url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
filename = "{}.png".format(url_hash)
with open(filename, "wb") as f:
f.write(response.body)
# Store filename in item.
item["screenshot_filename"] = filename
return item
5刷袍、重復(fù)過濾
一個(gè)過濾器翩隧,用于查找重復(fù)項(xiàng)目,并刪除已處理的項(xiàng)目呻纹。假設(shè)我們的項(xiàng)目具有唯一ID堆生,但我們的spider會(huì)返回具有相同ID的多個(gè)項(xiàng)目:
from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item