《Learning Scrapy》(中文版)第9章 使用Pipelines


序言
第1章 Scrapy介紹
第2章 理解HTML和XPath
第3章 爬蟲(chóng)基礎(chǔ)
第4章 從Scrapy到移動(dòng)應(yīng)用
第5章 快速構(gòu)建爬蟲(chóng)
第6章 Scrapinghub部署
第7章 配置和管理
第8章 Scrapy編程
第9章 使用Pipeline
第10章 理解Scrapy的性能
第11章(完) Scrapyd分布式抓取和實(shí)時(shí)分析


在上一章器一,我們學(xué)習(xí)了如何辨析Scrapy中間件课锌。在本章中,我們通過(guò)實(shí)例學(xué)習(xí)編寫pipelines祈秕,包括使用REST APIs渺贤、連接數(shù)據(jù)庫(kù)、處理CPU密集型任務(wù)请毛、與老技術(shù)結(jié)合志鞍。

我們?cè)诒菊轮袝?huì)使用集中新的數(shù)據(jù)庫(kù),列在下圖的右邊:

Vagrant已經(jīng)配置好了數(shù)據(jù)庫(kù)获印,我們可以從開(kāi)發(fā)機(jī)向其發(fā)送ping述雾,例如ping es或ping mysql。讓我們先來(lái)學(xué)習(xí)REST APIs兼丰。

使用REST APIs

REST是用來(lái)一套創(chuàng)建網(wǎng)絡(luò)服務(wù)的技術(shù)集合玻孟。它的主要優(yōu)點(diǎn)是,比起SOAP和專有web服務(wù)鳍征,REST更簡(jiǎn)單和輕量黍翎。軟件開(kāi)發(fā)者注意到了web服務(wù)的CRUD(Create、Read艳丛、Update匣掸、Delete)和HTTP操作(GET、POST氮双、PUT碰酝、DELETE)的相似性。它們還注意到傳統(tǒng)web服務(wù)調(diào)用需要的信息可以再URL源進(jìn)行壓縮戴差。例如送爸,http://api.mysite.com/customer/john是一個(gè)URL源,它可以讓我們分辨目標(biāo)服務(wù)器暖释,袭厂,更具體的,名字是john的服務(wù)器(行的主鍵)球匕。它與其它技術(shù)結(jié)合時(shí)纹磺,比如安全認(rèn)證、無(wú)狀態(tài)服務(wù)亮曹、緩存橄杨、輸出XML或JSON時(shí)秘症,可以提供一個(gè)強(qiáng)大但簡(jiǎn)單的跨平臺(tái)服務(wù)。REST席卷軟件行業(yè)并不奇怪讥珍。

Scrapy pipeline的功能可以用REST API來(lái)做历极。接下來(lái),我們來(lái)學(xué)習(xí)它衷佃。

使用treq

treq是一個(gè)Python包趟卸,它在Twisted應(yīng)用中和Python的requests包相似。它可以讓我們做出GET氏义、POST锄列、和其它HTTP請(qǐng)求」哂疲可以使用pip install treq安裝邻邮,開(kāi)發(fā)機(jī)中已經(jīng)安裝好了。

比起Scrapy的Request/crawler.engine.download() API克婶,我們使用treq筒严,因?yàn)楹笳呔哂行阅軆?yōu)勢(shì),詳見(jiàn)第10章情萤。

一個(gè)寫入Elasticsearch的pipeline

我們從一個(gè)向ES服務(wù)器(Elasticsearch)寫入Items的爬蟲(chóng)開(kāi)始鸭蛙。你可能覺(jué)得從ES開(kāi)始,而不是MySQL筋岛,有點(diǎn)奇怪娶视,但實(shí)際上ES是最容易的。ES可以是無(wú)模式的睁宰,意味著我們可以不用配置就使用它肪获。treq也足以應(yīng)付需要。如果想使用更高級(jí)的ES功能柒傻,我們應(yīng)該使用txes2和其它Python/Twisted ES包孝赫。

有了Vagrant,我們已經(jīng)有個(gè)一個(gè)運(yùn)行的ES服務(wù)器红符。登錄開(kāi)發(fā)機(jī)寒锚,驗(yàn)證ES是否運(yùn)行:

$ curl http://es:9200
{
  "name" : "Living Brain",
  "cluster_name" : "elasticsearch",
  "version" : { ... },
  "tagline" : "You Know, for Search"
}

在瀏覽器中登錄http://localhost:9200也可以看到相同的結(jié)果。如果訪問(wèn)http://localhost:9200/properties/property/_search违孝,我們可以看到一個(gè)響應(yīng),說(shuō)ES已經(jīng)進(jìn)行了全局嘗試泳赋,但是沒(méi)有找到索引頁(yè)雌桑。

筆記:在本章中,我們會(huì)在項(xiàng)集合中插入新的項(xiàng)祖今,如果你想恢復(fù)原始狀態(tài)的話校坑,可以用下面的命令:

$ curl -XDELETE http://es:9200/properties

本章中的pipeline完整代碼還有錯(cuò)誤處理的功能拣技,但我盡量讓這里的代碼簡(jiǎn)短,以突出重點(diǎn)耍目。

提示:本章位于目錄ch09膏斤,這個(gè)例子位于ch09/properties/properties/pipelines/es.py。

本質(zhì)上邪驮,這個(gè)爬蟲(chóng)只有四行:

@defer.inlineCallbacks
def process_item(self, item, spider):
    data = json.dumps(dict(item), ensure_ascii=False).encode("utf- 8")
    yield treq.post(self.es_url, data)

前兩行定義了一個(gè)標(biāo)準(zhǔn)process_item()方法莫辨,它可以產(chǎn)生延遲項(xiàng)。(參考第8章)

第三行準(zhǔn)備了插入的data毅访。ensure_ascii=False可使結(jié)果壓縮沮榜,并且沒(méi)有跳過(guò)非ASCII字符。我們?nèi)缓髮SON字符串轉(zhuǎn)化為JSON標(biāo)準(zhǔn)的默認(rèn)編碼UTF-8喻粹。

最后一行使用了treq的post()方法蟆融,模擬一個(gè)POST請(qǐng)求,將我們的文檔插入ElasticSearch守呜。es_url型酥,例如http://es:9200/properties/property存在settings.py文件中(ES_PIPELINE_URL設(shè)置),它提供重要的信息查乒,例如我們想要寫入的ES的IP和端口(es:9200)弥喉、集合名(properties)和對(duì)象類型(property)。

為了是pipeline生效侣颂,我們要在settings.py中設(shè)置ITEM_PIPELINES档桃,并啟動(dòng)ES_PIPELINE_URL設(shè)置:

ITEM_PIPELINES = {
    'properties.pipelines.tidyup.TidyUp': 100,
    'properties.pipelines.es.EsWriter': 800,
}
ES_PIPELINE_URL = 'http://es:9200/properties/property'

這么做完之后,我們前往相應(yīng)的目錄:

$ pwd
/root/book/ch09/properties
$ ls
properties  scrapy.cfg

然后運(yùn)行爬蟲(chóng):

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=90
...
INFO: Enabled item pipelines: EsWriter...
INFO: Closing spider (closespider_itemcount)...
   'item_scraped_count': 106,

如果現(xiàn)在訪問(wèn)http://localhost:9200/properties/property/_search憔晒,除了前10條結(jié)果藻肄,我們可以在響應(yīng)的hits/total字段看到插入的文件數(shù)。我們還可以添加參數(shù)?size=100以看到更多的結(jié)果拒担。通過(guò)添加q= URL搜索中的參數(shù)嘹屯,我們可以在全域或特定字段搜索關(guān)鍵詞。相關(guān)性最強(qiáng)的結(jié)果會(huì)首先顯示出來(lái)从撼。例如州弟,http://localhost:9200/properties/property/_search?q=title:london,可以讓標(biāo)題變?yōu)長(zhǎng)ondon低零。對(duì)于更復(fù)雜的查詢婆翔,可以在https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html查詢ES文檔。

ES不需要配置掏婶,因?yàn)樗鶕?jù)提供的第一個(gè)文件啃奴,進(jìn)行模式(字段類型)自動(dòng)檢測(cè)的。通過(guò)訪問(wèn)http://localhost:9200/properties/雄妥,我們可以看到它自動(dòng)檢測(cè)的映射最蕾。

再次運(yùn)行crawl easy -s CLOSESPIDER_ITEMCOUNT=1000依溯。因?yàn)閜ipelines的平均時(shí)間從0.12變?yōu)?.15秒,平均延遲從0.78變?yōu)?.81秒瘟则。吞吐量仍保持每秒約25項(xiàng)黎炉。

筆記:用pipelines向數(shù)據(jù)庫(kù)插入Items是個(gè)好方法嗎?答案是否定的醋拧。通常來(lái)講慷嗜,數(shù)據(jù)庫(kù)更簡(jiǎn)單的方法以大量插入數(shù)據(jù),我們應(yīng)該使用這些方法大量批次插入數(shù)據(jù)趁仙,或抓取完畢之后進(jìn)行后處理洪添。我們會(huì)在最后一章看到這些方法。然后雀费,還是有很多人使用pipelines向數(shù)據(jù)庫(kù)插入文件干奢,相應(yīng)的就要使用Twisted APIs。

pipeline使用Google Geocoding API進(jìn)行地理編碼

我們的房子有各自所在的區(qū)域盏袄,我們還想對(duì)它們進(jìn)行地理編碼忿峻,即找到相應(yīng)的坐標(biāo)(經(jīng)度、緯度)辕羽。我們可以將坐標(biāo)顯示在地圖上逛尚,或計(jì)算距離。建這樣的數(shù)據(jù)庫(kù)需要復(fù)雜的數(shù)據(jù)庫(kù)刁愿、復(fù)雜的文本匹配绰寞,還有復(fù)雜的空間計(jì)算。使用Google Geocoding API铣口,我們可以避免這些滤钱。在瀏覽器中打開(kāi)它,或使用curl取回以下URL的數(shù)據(jù):

$ curl "https://maps.googleapis.com/maps/api/geocode/json?sensor=false&ad
dress=london"
{
   "results" : [
         ...
         "formatted_address" : "London, UK",
         "geometry" : {
            ...
            "location" : {
               "lat" : 51.5073509,
               "lng" : -0.1277583
          },
            "location_type" : "APPROXIMATE",
            ...
   ],
   "status" : "OK"
}

我們看到一個(gè)JSON對(duì)象脑题,如果搜索一個(gè)location件缸,我們可以快速獲取倫敦中心的坐標(biāo)。如果繼續(xù)搜索叔遂,我們可以看到相同文件中海油其它地點(diǎn)他炊。第一個(gè)是相關(guān)度最高的。因此如果存在results[0].geometry.location的話已艰,它就是我們要的結(jié)果痊末。

可以用前面的方法(treq)使用Google Geocoding API。只需要幾行哩掺,我們就可以找到一個(gè)地址的坐標(biāo)(目錄pipelines中的geo.py)舌胶,如下所示:

@defer.inlineCallbacks
def geocode(self, address):
   endpoint = 'http://web:9312/maps/api/geocode/json'
   parms = [('address', address), ('sensor', 'false')]
   response = yield treq.get(endpoint, params=parms)
   content = yield response.json()
   geo = content['results'][0]["geometry"]["location"]
   defer.returnValue({"lat": geo["lat"], "lon": geo["lng"]})

這個(gè)函數(shù)做出了一條URL,但我們讓它指向一個(gè)可以離線快速運(yùn)行的假程序疮丛。你可以使用endpoint = 'https://maps.googleapis.com/maps/api/geocode/json'連接Google服務(wù)器幔嫂,但要記住它對(duì)請(qǐng)求的限制很嚴(yán)格。address和sensor的值是URL自動(dòng)編碼的誊薄,使用treq的方法get()的參數(shù)params履恩。對(duì)于第二個(gè)yield,即response.json()呢蔫,我們必須等待響應(yīng)主題完全加載完畢對(duì)解析為Python對(duì)象切心。此時(shí),我們就可以找到第一個(gè)結(jié)果的地理信息片吊,格式設(shè)為dict绽昏,使用defer.returnValue()返回,它使用了inlineCallbacks俏脊。如果發(fā)生錯(cuò)誤全谤,這個(gè)方法會(huì)扔出例外,Scrapy會(huì)向我們報(bào)告爷贫。

通過(guò)使用geocode()认然,process_item()變成了一行語(yǔ)句:

item["location"] = yield self.geocode(item["address"][0])

設(shè)置讓pipeline生效,將它添加到ITEM_PIPELINES漫萄,并設(shè)定優(yōu)先數(shù)值卷员,該數(shù)值要小于ES的,以讓ES獲取坐標(biāo)值:

ITEM_PIPELINES = {
    ...
    'properties.pipelines.geo.GeoPipeline': 400,

開(kāi)啟數(shù)據(jù)調(diào)試腾务,然后運(yùn)行:

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=90 -L DEBUG
...
{'address': [u'Greenwich, London'],
...
 'image_URL': [u'http://web:9312/images/i06.jpg'],
 'location': {'lat': 51.482577, 'lon': -0.007659},
 'price': [1030.0],
...

我們現(xiàn)在可以看到Items里的location字段毕骡。如果使用真正的Google API的URL運(yùn)行,會(huì)得到例外:

File "pipelines/geo.py" in geocode (content['status'], address))
Exception: Unexpected status="OVER_QUERY_LIMIT" for  
address="*London"

這是為了檢查我們?cè)谕暾a中插入了地點(diǎn)岩瘦,以確保Geocoding API響應(yīng)的status字段有OK值未巫。除非是OK,否則我們?nèi)』氐臄?shù)據(jù)不會(huì)有設(shè)定好的格式担钮,進(jìn)而不能使用橱赠。對(duì)于這種情況,我們會(huì)得到OVER_QUERY_LIMIT狀態(tài)箫津,它指明我們?cè)谀程幾鲥e(cuò)了狭姨。這個(gè)問(wèn)題很重要,也很常見(jiàn)苏遥。應(yīng)用Scrapy的高性能引擎饼拍,進(jìn)行緩存、限制請(qǐng)求就很必要了田炭。

我們可以在Geocoder API的文檔师抄,查看它的限制,“每24小時(shí)教硫,免費(fèi)用戶可以進(jìn)行2500次請(qǐng)求叨吮,每秒5次請(qǐng)求”辆布。即使我們使用付費(fèi)版本,仍有每秒10次請(qǐng)求的限制茶鉴,所以這里的分析是有意義的锋玲。

筆記:后面的代碼看起來(lái)可能有些復(fù)雜,復(fù)雜度還要取決于實(shí)際情況涵叮。在多線程環(huán)境中創(chuàng)建這樣的組件惭蹂,需要線程池和同步,這樣代碼就會(huì)變復(fù)雜割粮。

這是一個(gè)簡(jiǎn)易的運(yùn)用Twisted技術(shù)的限制引擎:

class Throttler(object):
    def __init__(self, rate):
        self.queue = []
        self.looping_call = task.LoopingCall(self._allow_one)
        self.looping_call.start(1. / float(rate))
    def stop(self):
        self.looping_call.stop()
    def throttle(self):
        d = defer.Deferred()
        self.queue.append(d)
        return d
    def _allow_one(self):
        if self.queue:
            self.queue.pop(0).callback(None)

這可以讓延遲項(xiàng)在一個(gè)列表中排隊(duì)盾碗,逐個(gè)觸發(fā),調(diào)用_allow_one()舀瓢;_allow_one()檢查隊(duì)列是否為空廷雅,如果不是,它會(huì)調(diào)用第一個(gè)延遲項(xiàng)的callback()氢伟。我們使用Twisted的task.LoopingCall() API榜轿,周期性調(diào)用_allow_one()。使用Throttler很容易朵锣。我們?cè)趐ipeline的init初始化它谬盐,當(dāng)爬蟲(chóng)停止時(shí)清空它:

class GeoPipeline(object):
    def __init__(self, stats):
        self.throttler = Throttler(5)  # 5 Requests per second
    def close_spider(self, spider):
        self.throttler.stop()

在使用限定源之前,我們的例子是在process_item()中調(diào)用geocode()诚些,必須yield限制器的throttle()方法:

yield self.throttler.throttle()
item["location"] = yield self.geocode(item["address"][0])

對(duì)于第一個(gè)yield飞傀,代碼會(huì)暫停一下,一段時(shí)間之后诬烹,會(huì)繼續(xù)運(yùn)行砸烦。例如,當(dāng)某時(shí)有11個(gè)延遲項(xiàng)時(shí)绞吁,限制是每秒5次請(qǐng)求幢痘,即時(shí)間為11/5=2.2秒之后,隊(duì)列變空家破,代碼會(huì)繼續(xù)颜说。

使用Throttler,不再有錯(cuò)誤汰聋,但是爬蟲(chóng)會(huì)變慢门粪。我們看到示例中的房子只有幾個(gè)不同的地址。這時(shí)使用緩存非常好烹困。我們使用一個(gè)簡(jiǎn)單的Python dict來(lái)做玄妈,但這么可能會(huì)有競(jìng)爭(zhēng)條件,這樣會(huì)造成偽造的API請(qǐng)求。下面是一個(gè)沒(méi)有此類問(wèn)題的緩存方法拟蜻,展示了Python和Twisted的特點(diǎn):

class DeferredCache(object):
    def __init__(self, key_not_found_callback):
        self.records = {}
        self.deferreds_waiting = {}
        self.key_not_found_callback = key_not_found_callback
    @defer.inlineCallbacks
    def find(self, key):
        rv = defer.Deferred()
        if key in self.deferreds_waiting:
            self.deferreds_waiting[key].append(rv)
        else:
            self.deferreds_waiting[key] = [rv]
            if not key in self.records:
                try:
                    value = yield self.key_not_found_callback(key)
                    self.records[key] = lambda d: d.callback(value)
                except Exception as e:
                    self.records[key] = lambda d: d.errback(e)
            action = self.records[key]
            for d in self.deferreds_waiting.pop(key):
                reactor.callFromThread(action, d)
        value = yield rv
        defer.returnValue(value)

這個(gè)緩存看起來(lái)有些不同绎签,它包含兩個(gè)組件:

  • self.deferreds_waiting:這是一個(gè)延遲項(xiàng)的隊(duì)列,等待給鍵賦值
  • self.records:這是鍵值對(duì)中出現(xiàn)過(guò)的dict

在find()方法的中間瞭郑,如果沒(méi)有在self.records找到一個(gè)鍵辜御,我們會(huì)調(diào)用預(yù)先定義的callback函數(shù),以取回丟失的值(yield self.key_not_found_callback(key))屈张。這個(gè)調(diào)回函數(shù)可能會(huì)扔出一個(gè)例外。如何在Python中壓縮存儲(chǔ)值或例外呢袱巨?因?yàn)镻ython是一種函數(shù)語(yǔ)言阁谆,根據(jù)是否有例外,我們?cè)趕elf.records中保存小函數(shù)(lambdas)愉老,調(diào)用callback或errback场绿。lambda函數(shù)定義時(shí),就將值或例外附著在上面嫉入。將變量附著在函數(shù)上稱為閉包焰盗,閉包是函數(shù)語(yǔ)言最重要的特性之一。

筆記:緩存例外有點(diǎn)不常見(jiàn)咒林,但它意味著首次查找key時(shí)熬拒,key_not_found_callback(key)返回了一個(gè)例外。當(dāng)后續(xù)查找還找這個(gè)key時(shí)垫竞,就免去了調(diào)用澎粟,再次返回這個(gè)例外。

find()方法其余的部分提供了一個(gè)避免競(jìng)爭(zhēng)條件的機(jī)制欢瞪。如果查找某個(gè)鍵已經(jīng)在進(jìn)程中活烙,會(huì)在self.deferreds_waiting dict中有記錄。這時(shí)遣鼓,我們不在向key_not_found_callback()發(fā)起另一個(gè)調(diào)用啸盏,只是在延遲項(xiàng)的等待列表添加這個(gè)項(xiàng)。當(dāng)key_not_found_callback()返回時(shí)骑祟,鍵有了值回懦,我們觸發(fā)所有的等待這個(gè)鍵的延遲項(xiàng)。我們可以直接發(fā)起action(d)曾我,而不用reactor.callFromThread()粉怕,但需要處理每個(gè)扔給下游的例外,我們必須創(chuàng)建不必要的很長(zhǎng)的延遲項(xiàng)鏈抒巢。

使用這個(gè)緩存很容易贫贝。我們?cè)?strong>init()對(duì)其初始化,設(shè)定調(diào)回函數(shù)為API調(diào)用。在process_item()中稚晚,使用緩存查找的方法如下:

def __init__(self, stats):
    self.cache = DeferredCache(self.cache_key_not_found_callback)
@defer.inlineCallbacks
def cache_key_not_found_callback(self, address):
    yield self.throttler.enqueue()
    value = yield self.geocode(address)
    defer.returnValue(value)
@defer.inlineCallbacks
def process_item(self, item, spider):
    item["location"] = yield self.cache.find(item["address"][0])
    defer.returnValue(item)

提示:完整代碼位于ch09/properties/properties/pipelines/geo2.py崇堵。

為了使pipeline生效,我們使前一個(gè)方法無(wú)效客燕,并添加當(dāng)前的到settings.py的ITEM_PIPELINES:

ITEM_PIPELINES = {
    'properties.pipelines.tidyup.TidyUp': 100,
    'properties.pipelines.es.EsWriter': 800,
    # DISABLE 'properties.pipelines.geo.GeoPipeline': 400,
    'properties.pipelines.geo2.GeoPipeline': 400,
}

運(yùn)行爬蟲(chóng)鸳劳,用如下代碼:

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 15.8 items/s, avg latency: 1.74 s and avg time in pipelines: 
0.94 s
Scraped... 32.2 items/s, avg latency: 1.76 s and avg time in pipelines: 
0.97 s
Scraped... 25.6 items/s, avg latency: 0.76 s and avg time in pipelines: 
0.14 s
...
: Dumping Scrapy stats:...
   'geo_pipeline/misses': 35,
   'item_scraped_count': 1019,

當(dāng)填充緩存時(shí),我們看到抓取的延遲變高也搓。緩存結(jié)束時(shí)赏廓,延遲降低。數(shù)據(jù)還顯示有35個(gè)遺漏傍妒,正好是數(shù)據(jù)集中不同地點(diǎn)的數(shù)目幔摸。很明顯,上例中一共有1019 - 35= 984次API請(qǐng)求颤练。如果我們使用真正的Google API惑灵,并提高每秒的API請(qǐng)求數(shù)若贮,例如通過(guò)改變Throttler(5)到Throttler(10)坑质,使從5提高到10妈经,我們可以將重試添加到geo_pipeline/retries stat記錄中。如果有錯(cuò)誤的話宇挫,例如苛吱,使用API找不到某個(gè)地點(diǎn),會(huì)扔出一個(gè)例外捞稿,這會(huì)被geo_pipeline/errors stat記錄又谋。如果地點(diǎn)通過(guò)什么方式已經(jīng)存在了,會(huì)在geo_pipeline/already_set stat中指明娱局。最后彰亥,如果我們?cè)L問(wèn)http://localhost:9200/properties/property/_search,以檢查ES中的房子衰齐,我們可以看到包括地點(diǎn)的記錄任斋,例如{..."location": {"lat": 51.5269736, "lon": -0.0667204}...}。(運(yùn)行前確保清空集合耻涛,去除舊的值)

在Elasticsearch進(jìn)行地理索引

我們已經(jīng)有了地點(diǎn)废酷,我們可以將它們按距離排序。下面是一個(gè)HTTP POST請(qǐng)求抹缕,返回標(biāo)題中包含Angel的房子澈蟆,按照離點(diǎn){51.54, -0.19}的距離進(jìn)行排序:

$ curl http://es:9200/properties/property/_search -d '{
    "query" : {"term" : { "title" : "angel" } },
    "sort": [{"_geo_distance": {
        "location":      {"lat":  51.54, "lon": -0.19},
        "order":         "asc",
        "unit":          "km", 
        "distance_type": "plane" 
}}]}'

唯一的問(wèn)題是如果我們運(yùn)行它,我們會(huì)看到一個(gè)錯(cuò)誤信息"failed to find mapper for [location] for geo distance based sort"卓研。它指出趴俘,我們的location字段沒(méi)有正確的空間計(jì)算的格式睹簇。為了設(shè)定正確的格式,我們要手動(dòng)覆蓋默認(rèn)格式寥闪。首先太惠,我們將自動(dòng)檢測(cè)的映射保存起來(lái),將它作為起點(diǎn):

$ curl 'http://es:9200/properties/_mapping/property' > property.txt

然后疲憋,我們?nèi)缦滤揪庉媝roperty.txt:

"location":{"properties":{"lat":{"type":"double"},"lon":{"type":"double"}}}

我們將這行代碼替換為:

"location": {"type": "geo_point"}

我們還在文件最后刪除了{(lán)"properties":{"mappings": and two }}凿渊。文件現(xiàn)在就處理完了。我們現(xiàn)在可以刪除舊的類型缚柳,并用下面的schema建立新的類型:

$ curl -XDELETE 'http://es:9200/properties'
$ curl -XPUT 'http://es:9200/properties'
$ curl -XPUT 'http://es:9200/properties/_mapping/property' --data  
@property.txt

我們現(xiàn)在可以用之前的命令埃脏,進(jìn)行一個(gè)快速抓取,將結(jié)果按距離排序秋忙。我們的搜索返回的是房子的JSONs對(duì)象剂癌,其中包括一個(gè)額外的sort字段,顯示房子離某個(gè)點(diǎn)的距離翰绊。

連接數(shù)據(jù)庫(kù)與Python客戶端

可以連接Python Database API 2.0的數(shù)據(jù)庫(kù)有許多種,包括MySQL旁壮、PostgreSQL监嗜、Oracle、Microsoft抡谐、SQL Server和SQLite裁奇。它們的驅(qū)動(dòng)通常很復(fù)雜且進(jìn)行過(guò)測(cè)試,為Twisted再進(jìn)行適配會(huì)浪費(fèi)很多時(shí)間麦撵」舫Γ可以在Twisted應(yīng)用中使用數(shù)據(jù)庫(kù)客戶端,例如免胃,Scrapy可以使用twisted.enterprise.adbapi庫(kù)音五。我們使用MySQL作為例子,說(shuō)明用法羔沙,原則也適用于其他數(shù)據(jù)庫(kù)躺涝。

用pipeline寫入MySQL

MySQL是一個(gè)好用又流行的數(shù)據(jù)庫(kù)。我們來(lái)寫一個(gè)pipeline扼雏,來(lái)向其中寫入文件坚嗜。我們的虛擬環(huán)境中,已經(jīng)有了一個(gè)MySQL實(shí)例诗充。我們用MySQL命令行來(lái)做一些基本的管理操作苍蔬,命令行工具已經(jīng)在開(kāi)發(fā)機(jī)中預(yù)先安裝了:

$ mysql -h mysql -uroot -ppass

mysql>提示MySQL已經(jīng)運(yùn)行,我們可以建立一個(gè)簡(jiǎn)單的含有幾個(gè)字段的數(shù)據(jù)表蝴蜓,如下所示:

mysql> create database properties;
mysql> use properties
mysql> CREATE TABLE properties (
  url varchar(100) NOT NULL,
  title varchar(30),
  price DOUBLE,
  description varchar(30),
  PRIMARY KEY (url)
);
mysql> SELECT * FROM properties LIMIT 10;
Empty set (0.00 sec)

很好碟绑,現(xiàn)在已經(jīng)建好了一個(gè)包含幾個(gè)字段的MySQL數(shù)據(jù)表,它的名字是properties,可以開(kāi)始寫pipeline了蜈敢。保持MySQL控制臺(tái)打開(kāi)辜荠,我們過(guò)一會(huì)兒會(huì)返回查看是否有差入值。輸入exit抓狭,就可以退出伯病。

筆記:在這一部分中,我們會(huì)向MySQL數(shù)據(jù)庫(kù)插入properties否过。如果你想刪除午笛,使用以下命令:

mysql> DELETE FROM properties;

我們使用MySQL的Python客戶端。我們還要安裝一個(gè)叫做dj-database-url的小功能模塊(它可以幫我們?cè)O(shè)置不同的IP苗桂、端口药磺、密碼等等)。我們可以用pip install dj-database-url MySQL-python煤伟,安裝這兩項(xiàng)癌佩。我們的開(kāi)發(fā)機(jī)上已經(jīng)安裝好了。我們的MySQL pipeline很簡(jiǎn)單便锨,如下所示:

from twisted.enterprise import adbapi
...
class MysqlWriter(object):
    ...
    def __init__(self, mysql_url):
        conn_kwargs = MysqlWriter.parse_mysql_url(mysql_url)
        self.dbpool = adbapi.ConnectionPool('MySQLdb',
                                            charset='utf8',
                                            use_unicode=True,
                                            connect_timeout=5,
                                            **conn_kwargs)
    def close_spider(self, spider):
        self.dbpool.close()
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        try:
            yield self.dbpool.runInteraction(self.do_replace, item)
        except:
            print traceback.format_exc()
        defer.returnValue(item)
    @staticmethod
    def do_replace(tx, item):
        sql = """REPLACE INTO properties (url, title, price, description) VALUES (%s,%s,%s,%s)"""
        args = (
            item["url"][0][:100],
            item["title"][0][:30],
            item["price"][0],
            item["description"][0].replace("\r\n", " ")[:30]
        )
        tx.execute(sql, args)

提示:完整代碼位于ch09/properties/properties/pipelines/mysql.py围辙。

本質(zhì)上,這段代碼的大部分都很普通放案。為了簡(jiǎn)潔而省略的代碼將一條保存在MYSQL_PIPELINE_URL姚建、格式是mysql://user:pass@ip/database的URL,解析成了獨(dú)立的參數(shù)吱殉。在爬蟲(chóng)的init()中掸冤,將它們傳遞到adbapi.ConnectionPool(),它使用adbapi的底層結(jié)構(gòu)友雳,初始化MySQL連接池稿湿。第一個(gè)參數(shù)是我們想要引入的模塊的名字。對(duì)于我們的MySQL沥阱,它是MySQLdb缎罢。我們?yōu)镸ySQL客戶端另設(shè)了幾個(gè)參數(shù),以便正確處理Unicode和超時(shí)考杉。每當(dāng)adbapi打開(kāi)新連接時(shí)策精,所有這些參數(shù)都要進(jìn)入底層的MySQLdb.connect()函數(shù)。爬蟲(chóng)關(guān)閉時(shí)崇棠,我們調(diào)用連接池的close()方法咽袜。

我們的process_item()方法包裝了dbpool.runInteraction()。這個(gè)方法給調(diào)回方法排隊(duì)枕稀,會(huì)在當(dāng)連接池中一個(gè)連接的Transaction對(duì)象變?yōu)榭捎脮r(shí)被調(diào)用询刹。這個(gè)Transaction對(duì)象有一個(gè)和DB-API指針相似的API谜嫉。在我們的例子中,調(diào)回方法是do_replace()凹联,它定義在后面幾行沐兰。@staticmethod是說(shuō)這個(gè)方法關(guān)聯(lián)的是類而不是具體的類實(shí)例,因此蔽挠,我們可以忽略通常的self參數(shù)住闯。如果方法不使用成員的話,最好設(shè)其為靜態(tài)澳淑,如果你忘了設(shè)為靜態(tài)也不要緊比原。這個(gè)方法準(zhǔn)備了一個(gè)SQL字符串、幾個(gè)參數(shù)杠巡,并調(diào)用Transaction的execute()函數(shù)量窘,以進(jìn)行插入氢拥。我們的SQL使用REPLACE INTO嫩海,而不用更常見(jiàn)的INSERT INTO厘线,來(lái)替換鍵相同的項(xiàng)出革。這可以讓我們的案例簡(jiǎn)化渡讼。如果我們相擁SQL返回?cái)?shù)據(jù)骂束,例如SELECT聲明,我們使用dbpool.runQuery()展箱,我們可能還需要改變默認(rèn)指針混驰,方法是設(shè)置adbapi.ConnectionPool()的參數(shù)cursorclass為cursorclass=MySQLdb.cursors栖榨,這樣取回?cái)?shù)據(jù)更為簡(jiǎn)便明刷。

使用這個(gè)pipeline辈末,我們要在settings.py的ITEM_PIPELINES添加它,還要設(shè)置一下MYSQL_PIPELINE_URL:

ITEM_PIPELINES = { ...
    'properties.pipelines.mysql.MysqlWriter': 700,
...
MYSQL_PIPELINE_URL = 'mysql://root:pass@mysql/properties'

執(zhí)行以下命令:

scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000

運(yùn)行這條命令后捅彻,返回MySQL控制臺(tái)步淹,可以看到如下記錄:

mysql> SELECT COUNT(*) FROM properties;
+----------+
|     1006 |
+----------+
mysql> SELECT * FROM properties LIMIT 4;
+------------------+--------------------------+--------+-----------+
| url              | title                    | price  | description
+------------------+--------------------------+--------+-----------+
| http://...0.html | Set Unique Family Well   | 334.39 | website c
| http://...1.html | Belsize Marylebone Shopp | 388.03 | features                       
| http://...2.html | Bathroom Fully Jubilee S | 365.85 | vibrant own
| http://...3.html | Residential Brentford Ot | 238.71 | go court
+------------------+--------------------------+--------+-----------+
4 rows in set (0.00 sec)

延遲和吞吐量的性能和之前相同贤旷。結(jié)果讓人印象深刻幼驶。

使用Twisted 特定客戶端連接服務(wù)

目前為止盅藻,我們學(xué)習(xí)了如何用treq使用類REST APIs氏淑。Scrapy可以用Twisted特定客戶端連接許多其它服務(wù)假残。例如辉懒,如果我們想連接MongoDB眶俩,通過(guò)搜索“MongoDB Python”颠印,我們可以找到PyMongo抹竹,它是阻塞/同步的窃判,除非我們使用pipeline處理阻塞操作中的線程兢孝,我們不能在Twisted中使用PyMongo。如果我們搜索“MongoDB Twisted Python”橘沥,可以找到txmongo座咆,它可以完美適用于Twisted和Scrapy介陶。通常的哺呜,Twisted客戶端群體很小某残,但使用它比起自己寫一個(gè)客戶端還是要方便玻墅。下面澳厢,我們就使用這樣一個(gè)Twisted特定客戶端連接Redis鍵值對(duì)存儲(chǔ)赏酥。

用pipeline讀寫Redis

Google Geocoding API是按照每個(gè)IP進(jìn)行限制的。如果可以接入多個(gè)IPs(例如搬素,多臺(tái)服務(wù)器),當(dāng)一個(gè)地址已經(jīng)被另一臺(tái)機(jī)器做過(guò)地理編碼谓罗,就要設(shè)法避免對(duì)發(fā)出重復(fù)的請(qǐng)求揭措。如果一個(gè)地址之前已經(jīng)被查閱過(guò)绊含,也要避免再次查閱躬充。我們不想浪費(fèi)限制的額度充甚。

筆記:與API商家聯(lián)系伴找,以確保這符合規(guī)定疆瑰。你可能穆役,必須每幾分鐘/小時(shí)耿币,就要清空緩存記錄淹接,或者根本就不能緩存塑悼。

我們可以使用Redis鍵值緩存作為分布式dict厢蒜。Vagrant環(huán)境中已經(jīng)有了一個(gè)Redis實(shí)例斑鸦,我們現(xiàn)在可以連接它巷屿,用redis-cli作一些基本操作:

$ redis-cli -h redis
redis:6379> info keyspace
# Keyspace
redis:6379> set key value
OK
redis:6379> info keyspace
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
redis:6379> FLUSHALL
OK
redis:6379> info keyspace
# Keyspace
redis:6379> exit

通過(guò)搜索“Redis Twisted”憨琳,我們找到一個(gè)txredisapi庫(kù)栽渴。它最大的不同是闲擦,它不僅是一個(gè)Python的同步封裝墅冷,還是一個(gè)Twisted庫(kù)寞忿,可以通過(guò)reactor.connectTCP()腔彰,執(zhí)行Twisted協(xié)議搓逾,連接Redis霞篡。其它庫(kù)也有類似用法朗兵,但是txredisapi對(duì)于Twisted效率更高余掖。我們可以通過(guò)安裝庫(kù)dj_redis_url可以安裝它盐欺,這個(gè)庫(kù)通過(guò)pip可以解析Redis配置URL(sudo pip install txredisapi dj_redis_url)歌憨。和以前一樣务嫡,開(kāi)發(fā)機(jī)中已經(jīng)安裝好了。

我們?nèi)缦聠?dòng)RedisCache pipeline:

from txredisapi import lazyConnectionPool
class RedisCache(object):
...
    def __init__(self, crawler, redis_url, redis_nm):
        self.redis_url = redis_url
        self.redis_nm = redis_nm
        args = RedisCache.parse_redis_url(redis_url)
        self.connection = lazyConnectionPool(connectTimeout=5,
                                             replyTimeout=5,
                                             **args)
        crawler.signals.connect(
                self.item_scraped,signal=signals.item_scraped)

這個(gè)pipeline比較簡(jiǎn)單。為了連接Redis服務(wù)器樊破,我們需要主機(jī)哲戚、端口等等顺少,它們?nèi)加肬RL格式存儲(chǔ)脆炎。我們用parse_redis_url()方法解析這個(gè)格式。使用命名空間做鍵的前綴很普遍簇爆,在我們的例子中入蛆,我們存儲(chǔ)在redis_nm哨毁。我們?nèi)缓笫褂胻xredisapi的lazyConnectionPool()打開(kāi)一個(gè)數(shù)據(jù)庫(kù)連接扼褪。

最后一行有一個(gè)有趣的函數(shù)。我們是想用pipeline封裝geo-pipeline幔崖。如果在Redis中沒(méi)有某個(gè)值赏寇,我們不會(huì)設(shè)定這個(gè)值嗅定,geo-pipeline會(huì)用API像之前一樣將地址進(jìn)行地理編碼忙迁。完畢之后动漾,我們必須要在Redis中緩存鍵值對(duì)旱眯,我們是通過(guò)連接signals.item_scraped信號(hào)來(lái)做的。我們定義的調(diào)回(即item_scraped()方法呀页,馬上會(huì)講)只有在最后才會(huì)被調(diào)用蓬蝶,那時(shí)丸氛,地址就設(shè)置好了。

提示:完整代碼位于ch09/properties/properties/pipelines/redis.py禾锤。

我們簡(jiǎn)化緩存恩掷,只尋找和存儲(chǔ)每個(gè)Item的地址和地點(diǎn)旦签。這對(duì)Redis來(lái)說(shuō)是合理的,因?yàn)樗ǔJ沁\(yùn)行在單一服務(wù)器上的氮凝,這可以讓它很快。如果不是這樣的話稿壁,可以加入一個(gè)dict結(jié)構(gòu)的緩存傅是,它與我們?cè)趃eo-pipeline中用到的相似。以下是我們?nèi)绾翁幚砣霂?kù)的Items:

process incoming Items:
@defer.inlineCallbacks
def process_item(self, item, spider):
    address = item["address"][0]
    key = self.redis_nm + ":" + address
    value = yield self.connection.get(key)
    if value:
        item["location"] = json.loads(value)
    defer.returnValue(item)

和預(yù)期的相同书闸。我們得到了地址浆劲,給它添加前綴,然后使用txredisapi connection的get()在Redis進(jìn)行查找走哺。我們將JSON編碼的對(duì)象在Redis中保存成值丙躏。如果一個(gè)值設(shè)定了,我們就使用JSON解碼废恋,然后將其設(shè)為地點(diǎn)拟烫。

當(dāng)一個(gè)Item到達(dá)pipelines的末端時(shí)硕淑,我們重新取得它,將其保存為Redis中的地點(diǎn)值拇囊。以下是我們的做法:

    from txredisapi import ConnectionError
    def item_scraped(self, item, spider):
        try:
            location = item["location"]
            value = json.dumps(location, ensure_ascii=False)
        except KeyError:
            return
        address = item["address"][0]
        key = self.redis_nm + ":" + address
        quiet = lambda failure: failure.trap(ConnectionError)
        return self.connection.set(key, value).addErrback(quiet)

如果我們找到了一個(gè)地點(diǎn),我們就取得了地址纠永,添加前綴尝江,然后使用它作為txredisapi連接的set()方法的鍵值對(duì)。set()方法沒(méi)有使用@defer.inlineCallbacks惭聂,因?yàn)樘幚韘ignals.item_scraped時(shí),它不被支持耕腾。這意味著,我們不能對(duì)connection.set()使用yield狼纬,但是我們可以返回一個(gè)延遲項(xiàng)疗琉,Scrapy可以在它后面排上其它信號(hào)對(duì)象涛癌。任何情況下,如果Redis的連接不能使用用connection.set(),它就會(huì)拋出一個(gè)例外坚俗。在這個(gè)錯(cuò)誤處理中猖败,我們把傳遞的錯(cuò)誤當(dāng)做參數(shù),我們讓它trap()任何ConnectionError降允。這是Twisted的延遲API的優(yōu)點(diǎn)之一恩闻。通過(guò)用trap()捕獲錯(cuò)誤項(xiàng),我們可以輕易忽略它們剧董。

使這個(gè)pipeline生效幢尚,我們要做的是在settings.py的ITEM_PIPELINES中添加它,并提供一個(gè)REDIS_PIPELINE_URL尉剩。必須要讓它的優(yōu)先級(jí)比geo-pipeline高,以免太晚就不能使用了:

ITEM_PIPELINES = { ...
    'properties.pipelines.redis.RedisCache': 300,
    'properties.pipelines.geo.GeoPipeline': 400,
...
REDIS_PIPELINE_URL = 'redis://redis:6379'

像之前一樣運(yùn)行毅臊。第一次運(yùn)行時(shí)和以前很像理茎,但隨后的運(yùn)行結(jié)果如下:

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=100
...
INFO: Enabled item pipelines: TidyUp, RedisCache, GeoPipeline, 
MysqlWriter, EsWriter
...
Scraped... 0.0 items/s, avg latency: 0.00 s, time in pipelines: 0.00 s
Scraped... 21.2 items/s, avg latency: 0.78 s, time in pipelines: 0.15 s
Scraped... 24.2 items/s, avg latency: 0.82 s, time in pipelines: 0.16 s
...
INFO: Dumping Scrapy stats: {...
   'geo_pipeline/already_set': 106,
   'item_scraped_count': 106,

我們看到GeoPipeline和RedisCache都生效了,RedisCache第一個(gè)輸出管嬉。還注意到在統(tǒng)計(jì)中g(shù)eo_pipeline/already_set: 106皂林。這是GeoPipeline發(fā)現(xiàn)的Redis緩存中填充的數(shù)目,它不調(diào)用Google API蚯撩。如果Redis緩存是空的式撼,你會(huì)看到Google API處理了一些鍵。從性能上來(lái)講求厕,我們看到GeoPipeline引發(fā)的初始行為消失了著隆。事實(shí)上扰楼,當(dāng)我們開(kāi)始使用內(nèi)存,我們繞過(guò)了每秒只有5次請(qǐng)求的API限制美浦。如果我們使用Redis弦赖,應(yīng)該考慮使用過(guò)期鍵,讓系統(tǒng)周期刷新緩存數(shù)據(jù)浦辨。

連接CPU密集型蹬竖、阻塞或舊方法

最后一部分講連接非Twisted的工作。盡管異步程序的優(yōu)點(diǎn)很多流酬,并不是所有庫(kù)都專門為Twisted和Scrapy寫的币厕。使用Twisted的線程池和reactor.spawnProcess()方法,我們可以使用任何Python庫(kù)和任何語(yǔ)言寫的編碼芽腾。

pipeline進(jìn)行CPU密集型和阻塞操作

我們?cè)诘?章中強(qiáng)調(diào)旦装,反應(yīng)器適合簡(jiǎn)短非阻塞的任務(wù)。如果我們不得不要處理復(fù)雜和阻塞的任務(wù)摊滔,又該怎么做呢阴绢?Twisted提供了線程池,有了它可以使用reactor.callInThread() API在分線程而不是主線程中執(zhí)行慢操作艰躺。這意味著呻袭,反應(yīng)器可以一直運(yùn)行并對(duì)事件反饋,而不中斷計(jì)算腺兴。但要記住左电,在線程池中運(yùn)行并不安全,當(dāng)你使用全局模式時(shí)页响,會(huì)有多線程的同步問(wèn)題券腔。讓我們從一個(gè)簡(jiǎn)單的pipeline開(kāi)始,逐漸做出完整的代碼:

class UsingBlocking(object):
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        price = item["price"][0]
        out = defer.Deferred()
        reactor.callInThread(self._do_calculation, price, out)
    item["price"][0] = yield out
        defer.returnValue(item)
    def _do_calculation(self, price, out):
        new_price = price + 1
        time.sleep(0.10)
        reactor.callFromThread(out.callback, new_price)

在前面的pipeline中拘泞,我們看到了一些基本用法纷纫。對(duì)于每個(gè)Item,我們提取出價(jià)格陪腌,我們相用_do_calucation()方法處理它辱魁。這個(gè)方法使用time.sleep(),一個(gè)阻塞操作诗鸭。我們用reactor.callInThread()調(diào)用染簇,讓它在另一個(gè)線程中運(yùn)行。顯然强岸,我們傳遞價(jià)格锻弓,我們還創(chuàng)建和傳遞了一個(gè)名為out的延遲項(xiàng)。當(dāng)_do_calucation()完成了計(jì)算蝌箍,我們使用out調(diào)回值青灼。下一步暴心,我們執(zhí)行延遲項(xiàng),并未價(jià)格設(shè)新的值杂拨,最后返回Item专普。

在_do_calucation()中,有一個(gè)細(xì)微之處弹沽,價(jià)格增加了1檀夹,進(jìn)而睡了100ms。這個(gè)時(shí)間很多策橘,如果調(diào)用進(jìn)反應(yīng)器主線程炸渡,每秒就不能抓取10頁(yè)了。通過(guò)在另一個(gè)線程中運(yùn)行丽已,就不會(huì)再有這個(gè)問(wèn)題蚌堵。任務(wù)會(huì)在線程池中排隊(duì),每次處理耗時(shí)100ms促脉。最后一步是觸發(fā)調(diào)回。一般的策州,我們可以使用out.callback(new_price)瘸味,但是因?yàn)槲覀儸F(xiàn)在是在另一個(gè)線程,這么做不安全够挂。如果這么做的話旁仿,延遲項(xiàng)的代碼會(huì)被從另一個(gè)線程調(diào)用,這樣遲早會(huì)產(chǎn)生錯(cuò)誤的數(shù)據(jù)孽糖。不這樣做枯冈,轉(zhuǎn)而使用reactor.callFromThread(),它也可以將函數(shù)當(dāng)做參數(shù)办悟,將任意其余參數(shù)傳遞到函數(shù)尘奏。這個(gè)函數(shù)會(huì)排隊(duì)并被調(diào)回主線程,主進(jìn)程反過(guò)來(lái)會(huì)打開(kāi)process_item()對(duì)象yield病蛉,并繼續(xù)Item的操作炫加。

如果我們用全局模式,例如計(jì)數(shù)器铺然、滑動(dòng)平均俗孝,又該怎么使用_do_calucation()呢?例如魄健,添加兩個(gè)變量赋铝,beta和delta,如下所示:

class UsingBlocking(object):
    def __init__(self):
        self.beta, self.delta = 0, 0
    ...
    def _do_calculation(self, price, out):
        self.beta += 1
        time.sleep(0.001)
self.delta += 1
        new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01
        time.sleep(0.10)...

這段代碼是斷言失敗錯(cuò)誤沽瘦。這是因?yàn)槿绻粋€(gè)線程在self.beta和self.delta間切換革骨,另一個(gè)線程繼續(xù)計(jì)算使用beta/delta計(jì)算價(jià)格农尖,它會(huì)發(fā)現(xiàn)它們狀態(tài)不一致(beta大于delta),因此苛蒲,計(jì)算出錯(cuò)誤的結(jié)果卤橄。短暫的睡眠可能會(huì)造成競(jìng)爭(zhēng)條件。為了不發(fā)生這些狀況臂外,我們要使一個(gè)鎖窟扑,例如Python的threading.RLock()遞歸鎖。使用它漏健,可以確保沒(méi)有兩個(gè)線程在同一時(shí)間操作被保護(hù)代碼:

class UsingBlocking(object):
    def __init__(self):
        ...
        self.lock = threading.RLock()
    ...
    def _do_calculation(self, price, out):
        with self.lock:
            self.beta += 1
            ...
            new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01 ...

代碼現(xiàn)在就正確了嚎货。記住,我們不需要保護(hù)整段代碼蔫浆,就足以處理全局模式殖属。

提示:完整代碼位于ch09/properties/properties/pipelines/computation.py。

要使用這個(gè)pipeline瓦盛,我們需要把它添加到settings.py的ITEM_PIPELINES中洗显。如下所示:

ITEM_PIPELINES = { ...
    'properties.pipelines.computation.UsingBlocking': 500,

像之前一樣運(yùn)行爬蟲(chóng),pipeline延遲達(dá)到了100ms原环,但吞吐量沒(méi)有發(fā)生變化挠唆,大概每秒25個(gè)items。

pipeline使用二進(jìn)制和腳本

最麻煩的借口當(dāng)屬獨(dú)立可執(zhí)行文件和腳本嘱吗。打開(kāi)需要幾秒(例如玄组,從數(shù)據(jù)庫(kù)加載數(shù)據(jù)),但是后面處理數(shù)值的延遲很小谒麦。即便是這種情況俄讹,Twisted也預(yù)料到了。我們可以使用reactor.spawnProcess() API和相關(guān)的protocol.ProcessProtocol來(lái)運(yùn)行任何執(zhí)行文件绕德。讓我們來(lái)看一個(gè)例子患膛。腳本如下:

#!/bin/bash
trap "" SIGINT
sleep 3
while read line
do
    # 4 per second
    sleep 0.25
    awk "BEGIN {print 1.20 * $line}"
done

這是一個(gè)簡(jiǎn)單的bash腳本。它運(yùn)行時(shí)耻蛇,會(huì)使Ctrl + C 無(wú)效剩瓶。這是為了避免系統(tǒng)的一個(gè)奇怪的錯(cuò)誤,將Ctrl + C增值到子流程并過(guò)早結(jié)束城丧,導(dǎo)致Scrapy強(qiáng)制等待流程結(jié)果延曙。在使Ctrl + C無(wú)效之后,它睡眠三秒亡哄,模擬啟動(dòng)時(shí)間枝缔。然后,它閱讀輸入的代碼語(yǔ)句,等待250ms愿卸,然后返回結(jié)果價(jià)格灵临,價(jià)格的值乘以了1.20,由Linux的awk命令計(jì)算而得趴荸。這段腳本的最大吞吐量為每秒1/250ms=4個(gè)Items儒溉。用一個(gè)短session檢測(cè):

$ properties/pipelines/legacy.sh 
12 <- If you type this quickly you will wait ~3 seconds to get results
14.40
13 <- For further numbers you will notice just a slight delay
15.60

因?yàn)镃trl + C失效了,我們用Ctrl + D必須結(jié)束session发钝。我們?cè)撊绾巫孲crapy使用這個(gè)腳本呢顿涣?再一次,我們從一個(gè)簡(jiǎn)化版開(kāi)始:

class CommandSlot(protocol.ProcessProtocol):
    def __init__(self, args):
      self._queue = []
        reactor.spawnProcess(self, args[0], args)
    def legacy_calculate(self, price):
        d = defer.Deferred()
        self._queue.append(d)
        self.transport.write("%f\n" % price)
        return d
    # Overriding from protocol.ProcessProtocol
    def outReceived(self, data):
        """Called when new output is received"""
        self._queue.pop(0).callback(float(data))
class Pricing(object):
    def __init__(self):
        self.slot = CommandSlot(['properties/pipelines/legacy.sh'])
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        item["price"][0] = yield self.slot.legacy_calculate(item["price"][0])
       defer.returnValue(item)

我們?cè)谶@里找到了一個(gè)名為CommandSlot的ProcessProtocol和Pricing爬蟲(chóng)酝豪。在init()中涛碑,我們創(chuàng)建了新的CommandSlot,它新建了一個(gè)空的隊(duì)列孵淘,并用reactor.spawnProcess()開(kāi)啟了一個(gè)新進(jìn)程蒲障。它調(diào)用收發(fā)數(shù)據(jù)的ProcessProtocol作為第一個(gè)參數(shù)。在這個(gè)例子中瘫证,是self的原因是spawnProcess()是被從類protocol調(diào)用的揉阎。第二個(gè)參數(shù)是可執(zhí)行文件的名字,第三個(gè)參數(shù)args背捌,讓二進(jìn)制命令行參數(shù)成為字符串序列毙籽。

在pipeline的process_item()中,我們用CommandSlot的legacy_calculate()方法代表所有工作载萌,CommandSlot可以返回產(chǎn)生的延遲項(xiàng)惧财。legacy_calculate()創(chuàng)建延遲項(xiàng)巡扇,將其排隊(duì)扭仁,用transport.write()將價(jià)格寫入進(jìn)程。ProcessProtocol提供了transport厅翔,可以讓我們與進(jìn)程溝通乖坠。無(wú)論何時(shí)我們從進(jìn)程收到數(shù)據(jù), outReceived()就會(huì)被調(diào)用刀闷。通過(guò)延遲項(xiàng)熊泵,進(jìn)程依次執(zhí)行,我們可以彈出最老的延遲項(xiàng)甸昏,用收到的值觸發(fā)它顽分。全過(guò)程就是這樣。我們可以讓這個(gè)pipeline生效施蜜,通過(guò)將它添加到ITEM_PIPELINES:

ITEM_PIPELINES = {...
    'properties.pipelines.legacy.Pricing': 600,

如果運(yùn)行的話卒蘸,我們會(huì)看到性能很差。進(jìn)程變成了瓶頸,限制了吞吐量缸沃。為了提高性能恰起,我們需要修改pipeline,允許多個(gè)進(jìn)程并行運(yùn)行趾牧,如下所示:

class Pricing(object):
    def __init__(self):
        self.concurrency = 16
        args = ['properties/pipelines/legacy.sh']
        self.slots = [CommandSlot(args) 
                      for i in xrange(self.concurrency)]
        self.rr = 0
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        slot = self.slots[self.rr]
        self.rr = (self.rr + 1) % self.concurrency
        item["price"][0] = yield
                         slot.legacy_calculate(item["price"][0])
        defer.returnValue(item)

這無(wú)非是開(kāi)啟16個(gè)實(shí)例检盼,將價(jià)格以輪轉(zhuǎn)的方式發(fā)出。這個(gè)pipeline的吞吐量是每秒16*4 = 64翘单。我們可以用下面的爬蟲(chóng)進(jìn)行驗(yàn)證:

 $ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 0.0 items/s, avg latency: 0.00 s and avg time in pipelines: 
0.00 s
Scraped... 21.0 items/s, avg latency: 2.20 s and avg time in pipelines: 
1.48 s
Scraped... 24.2 items/s, avg latency: 1.16 s and avg time in pipelines: 
0.52 s

延遲增加了250 ms吨枉,但吞吐量仍然是每秒25。
請(qǐng)記住前面的方法使用了transport.write()讓所有的價(jià)格在腳本shell中排隊(duì)县恕。這個(gè)可能對(duì)你的應(yīng)用不適用东羹,,尤其是當(dāng)數(shù)據(jù)量很大時(shí)忠烛。Git的完整代碼讓值和調(diào)回都進(jìn)行了排隊(duì)属提,不想腳本發(fā)送值,除非收到前一項(xiàng)的結(jié)果美尸。這種方法可能看起來(lái)更友好冤议,但是會(huì)增加代碼復(fù)雜度。

總結(jié)

你剛剛學(xué)習(xí)了復(fù)雜的Scrapy pipelines师坎。目前為止恕酸,你應(yīng)該就掌握了所有和Twisted編程相關(guān)的知識(shí)。并且你學(xué)會(huì)了如何在進(jìn)程中執(zhí)行復(fù)雜的功能胯陋,用Item Processing Pipelines存儲(chǔ)Items蕊温。我們看到了添加pipelines對(duì)延遲和吞吐量的影響。通常遏乔,延遲和吞吐量是成反比的义矛。但是,這是在恒定并發(fā)數(shù)的前提下(例如盟萨,一定數(shù)量的線程)凉翻。在我們的例子中,我們一開(kāi)始的并發(fā)數(shù)為N=ST=250.77?19,添加pipelines之后捻激,并發(fā)數(shù)為N=25*3.33?83制轰,并沒(méi)有引起性能的變化。這就是Twisted的強(qiáng)大之處胞谭!下面學(xué)習(xí)第10章垃杖,Scrapy的性能。


序言
第1章 Scrapy介紹
第2章 理解HTML和XPath
第3章 爬蟲(chóng)基礎(chǔ)
第4章 從Scrapy到移動(dòng)應(yīng)用
第5章 快速構(gòu)建爬蟲(chóng)
第6章 Scrapinghub部署
第7章 配置和管理
第8章 Scrapy編程
第9章 使用Pipeline
第10章 理解Scrapy的性能
第11章(完) Scrapyd分布式抓取和實(shí)時(shí)分析


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末丈屹,一起剝皮案震驚了整個(gè)濱河市调俘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖脉漏,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件苞冯,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡侧巨,警方通過(guò)查閱死者的電腦和手機(jī)舅锄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)司忱,“玉大人皇忿,你說(shuō)我怎么就攤上這事√谷裕” “怎么了鳍烁?”我有些...
    開(kāi)封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)繁扎。 經(jīng)常有香客問(wèn)我幔荒,道長(zhǎng),這世上最難降的妖魔是什么梳玫? 我笑而不...
    開(kāi)封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任爹梁,我火速辦了婚禮,結(jié)果婚禮上提澎,老公的妹妹穿的比我還像新娘姚垃。我一直安慰自己,他們只是感情好盼忌,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布积糯。 她就那樣靜靜地躺著,像睡著了一般谦纱。 火紅的嫁衣襯著肌膚如雪看成。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天服协,我揣著相機(jī)與錄音绍昂,去河邊找鬼啦粹。 笑死偿荷,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的唠椭。 我是一名探鬼主播跳纳,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼贪嫂!你這毒婦竟也來(lái)了寺庄?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎斗塘,沒(méi)想到半個(gè)月后赢织,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡馍盟,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年于置,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贞岭。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡八毯,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出瞄桨,到底是詐尸還是另有隱情话速,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布芯侥,位于F島的核電站泊交,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏柱查。R本人自食惡果不足惜活合,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望物赶。 院中可真熱鬧白指,春花似錦鲁猩、人聲如沸章母。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)清焕。三九已至矛紫,卻和暖如春仁热,著一層夾襖步出監(jiān)牢的瞬間耙厚,已是汗流浹背参歹。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工仰楚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人犬庇。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓僧界,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親臭挽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子捂襟,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容