《Learning Scrapy》(中文版)第11章 Scrapyd分布式抓取和實時分析


序言
第1章 Scrapy介紹
第2章 理解HTML和XPath
第3章 爬蟲基礎(chǔ)
第4章 從Scrapy到移動應(yīng)用
第5章 快速構(gòu)建爬蟲
第6章 Scrapinghub部署
第7章 配置和管理
第8章 Scrapy編程
第9章 使用Pipeline
第10章 理解Scrapy的性能
第11章(完) Scrapyd分布式抓取和實時分析


我們已經(jīng)學(xué)了很多東西穷吮。我們先學(xué)習(xí)了兩種基礎(chǔ)的網(wǎng)絡(luò)技術(shù)夷蚊,HTML和XPath巧婶,然后我們學(xué)習(xí)了使用Scrapy抓取復(fù)雜的網(wǎng)站螃壤。接著栖榨,我們深入學(xué)習(xí)了Scrapy的設(shè)置找都,然后又進(jìn)一步深入學(xué)習(xí)了Scrapy和Python的內(nèi)部架構(gòu)和Twisted引擎的異步特征捏膨。在上一章中,我們學(xué)習(xí)了Scrapy的性能和以及處理復(fù)雜的問題以提高性能驯妄。

在本章中荷并,我將展示如何在多臺服務(wù)器上進(jìn)一步提高性能。我們會發(fā)現(xiàn)抓取通常是一個并行問題青扔;因此源织,我們可以水平延展至多臺服務(wù)器。為了這么做微猖,我們會使用一個Scrapy中間件谈息,我們還會使用Scrapyd,一個用來管理遠(yuǎn)程服務(wù)器爬蟲的應(yīng)用凛剥。它可以讓我們像第6章那樣進(jìn)行抓取侠仇。

我們最后用Apache Spark對提取的數(shù)據(jù)進(jìn)行實時分析。Spark一個非常流行的大數(shù)據(jù)處理框架当悔。收集的數(shù)據(jù)越多傅瞻、結(jié)果就變得越準(zhǔn)確踢代,我們使用Spark Streaming API展示結(jié)果盲憎。最后的結(jié)果展示了Python的強大和成熟,單單用Python的簡明代碼就全棧開發(fā)了從抓取到分析的全過程胳挎。

房子的標(biāo)題如何影響價格饼疙?

我們要研究個問題是房子的標(biāo)題和價格有什么關(guān)系。我們預(yù)計像“按摩浴缸”和“游泳池”可能和高價相關(guān)慕爬,而“打折”會和低價有關(guān)窑眯。將標(biāo)題與地點結(jié)合,例如医窿,可以根據(jù)地點和描述磅甩,實時判斷哪個房子最劃算。

我們想計算的就是特定名詞對價格造成的偏移:

例如姥卢,如果平均租金是$1000卷要,我們觀察到帶有按摩浴缸的房子的平均價格是$1300渣聚,沒有的價格是$995,因此按摩浴缸的偏移值為shiftjacuzzi=(1300-995)/1000=30.5%僧叉。如果一個帶有按摩浴缸的房子的價格直逼平均價格高5%奕枝,那么它的價格就很劃算。

因為名詞效應(yīng)會有累加瓶堕,所以這個指標(biāo)并不繁瑣隘道。例如,標(biāo)題同時含有按摩浴缸和打折會有一個混合效果郎笆。我們收集分析的數(shù)據(jù)越多谭梗,估算就會越準(zhǔn)確。稍后回到這個問題宛蚓,接下來講一個流媒體解決方案默辨。

Scrapyd

現(xiàn)在,我們來介紹Scrapyd苍息。Scrapyd是一個應(yīng)用缩幸,使用它,我們可以將爬蟲附屬到服務(wù)器上竞思,并對抓取進(jìn)行規(guī)劃表谊。我們來看看它的使用是多么容易,我們用第3章的代碼盖喷,只做一點修改爆办。

我們先來看Scrapyd的界面,在http://localhost:6800/课梳。

Scrapyd的界面

你可以看到距辆,它有幾個部分,有Jobs暮刃、Items跨算、Logs和Documentation。它還給出了如何規(guī)劃抓取工作的API方法椭懊。

為了這么做诸蚕,我們必須首先將爬蟲部署到服務(wù)器上。第一步是修改scrapy.cfg氧猬,如下所示:

$ pwd
/root/book/ch03/properties
$ cat scrapy.cfg 
...
[settings]
default = properties.settings
[deploy]
url = http://localhost:6800/
project = properties

我們要做的就是取消url的注釋背犯。默認(rèn)的設(shè)置適合我們。現(xiàn)在盅抚,為了部署爬蟲漠魏,我們使用scrapyd-client提供的工具scrapyd-deploy。scrapyd-client以前是Scrapy的一部分妄均,但現(xiàn)在是一個獨立的模塊柱锹,可以用pip install scrapyd-client進(jìn)行安裝(開發(fā)機中已經(jīng)安裝了):

$ scrapyd-deploy 
Packing version 1450044699
Deploying to project "properties" in http://localhost:6800/addversion.
json
Server response (200):
{"status": "ok", "project": "properties", "version": "1450044699", 
"spiders": 3, "node_name": "dev"}

部署好之后破讨,就可以在Scrapyd的界面的Available projects看到。我們現(xiàn)在可以根據(jù)提示奕纫,在當(dāng)前頁提交一個任務(wù):

$ curl http://localhost:6800/schedule.json -d project=properties -d spider=easy
{"status": "ok", "jobid": " d4df...", "node_name": "dev"}

如果我們返回Jobs提陶,我們可以使用jobid schedule.json,它可以在之后用cancel.json取消任務(wù):

$ curl http://localhost:6800/cancel.json -d project=properties -d job=d4df...
{"status": "ok", "prevstate": "running", "node_name": "dev"}

一定要取消進(jìn)程匹层,否則會浪費計算資源隙笆。

完畢之后,訪問Logs升筏,我們可以看到日志撑柔,在Items我們可以看到抓取過的items。這些數(shù)據(jù)會被周期清空以節(jié)省空間您访,所以一段時間后就會失效铅忿。

如果發(fā)生沖突或有其它理由的話,我們可以通過http_port修改端口灵汪,它是Scrapyd的諸多設(shè)置之一檀训。最好閱讀文檔http://scrapyd.readthedocs.org/,多了解下享言。我們的部署必須要設(shè)置的是max_proc峻凫。如果使用默認(rèn)值0,任務(wù)的并行數(shù)量最多可以是CPU核心的四位览露。因為我們可能會在虛擬機中運行多個Scrapyd服務(wù)器荧琼,我們將max_proc設(shè)為4,可以允許4個任務(wù)同時進(jìn)行差牛。在真實環(huán)境中命锄,使用默認(rèn)值就可以。

分布式系統(tǒng)概述

設(shè)計這個系統(tǒng)對我是個挑戰(zhàn)偏化。我一開始添加了許多特性脐恩,導(dǎo)致復(fù)雜度升高,只有高性能的機器才能完成工作夹孔。然后被盈,又不得不進(jìn)行簡化,既對硬件性能要求不那么高搭伤,也可以讓本章的重點仍然是Scrapy。

最后袜瞬,系統(tǒng)中會包括我們的開發(fā)機和幾臺服務(wù)器怜俐。我們用開發(fā)機進(jìn)行首頁的水平抓取,提取幾批URL邓尤。然后用輪訓(xùn)機制將URL分發(fā)到Scrapyd的結(jié)點拍鲤,并進(jìn)行抓取贴谎。最后,通過FTP傳遞.jl文件和Items到運行Spark的服務(wù)器上季稳。我選擇FTP和本地文件系統(tǒng)擅这,而不是HDFS或Apache Kafka,是因為FTP內(nèi)存需求少景鼠,并且作為FEED_URI被Scrapy支持仲翎。請記住,只要簡單設(shè)置Scrapyd和Spark的配置铛漓,我們就可以使用亞馬遜S3存儲這些文件溯香,獲得冗余度和可伸縮性等便利,而不用再使用其它技術(shù)浓恶。

筆記:FTP的缺點之一是玫坛,上傳過程可能會造成文件不完整。為了避免這點包晰,一旦上傳完成湿镀,我們便使用Pure-FTPd和調(diào)回腳本將文件上傳到/root/items。

每過幾秒伐憾,Spark都讀一下目錄/root/items肠骆,讀取任何新文件,取一個小批次進(jìn)行分析塞耕。我們使用Spark是因為它支持Python作為編程語言蚀腿,也支持流分析。到現(xiàn)在扫外,我們使用的爬蟲都比較短莉钙,實際中有的爬蟲是24小時運行的,不斷發(fā)出數(shù)據(jù)流并進(jìn)行分析筛谚,數(shù)據(jù)越多磁玉,分析的結(jié)果越準(zhǔn)確。我們就是要用Spark進(jìn)行這樣的演示驾讲。

筆記:除了Spark和Scrapy蚊伞,你還可以使用MapReduce,Apache Storm或其它框架吮铭。

在本章中时迫,我們不向數(shù)據(jù)庫中插入items。我們在第9章中用的方法也可以在這里使用谓晌,但是性能很糟掠拳。很少有數(shù)據(jù)庫喜歡每秒被pipelines寫入幾千個文件。如果想進(jìn)行寫入的話纸肉,應(yīng)該用Spark專用的方法溺欧,即批次導(dǎo)入Items喊熟。你可以修改我們Spark的例子,向任何數(shù)據(jù)庫進(jìn)行批次導(dǎo)入姐刁。

還有芥牌,這個系統(tǒng)的彈性很差。我們假設(shè)每個結(jié)點都是健康的聂使,任何一個損壞的話壁拉,也不會對總系統(tǒng)造成影響。Spark提供高可用性的彈性配置岩遗。Scrapy不提供此類內(nèi)建的功能扇商,除了Scrapyd的“持續(xù)排隊”功能,即當(dāng)結(jié)點恢復(fù)時宿礁,可以繼續(xù)失敗的任務(wù)案铺。這個功能不一定對你有用。如果彈性對你十分重要梆靖,你就得搭建一個監(jiān)督系統(tǒng)和分布式排隊方案(例如控汉,基于Kafka或RabbitMQ),可以重啟失敗的任務(wù)返吻。

修改爬蟲和中間件

為了搭建這個系統(tǒng)姑子,我們要稍稍修改爬蟲和中間件。更具體地测僵,我們要做如下工作:

  • 微調(diào)爬蟲街佑,使抓取索引頁的速度達(dá)到最大
  • 寫一個中間件,可以將URL批次發(fā)送給scrapyd服務(wù)器捍靠。
  • 使用相同的中間件沐旨,使系統(tǒng)啟動時就可以將URL分批

我們盡量用簡明的方式來完成這些工作。理想狀態(tài)下榨婆,整個過程應(yīng)該對底層的爬蟲代碼簡潔易懂磁携。這是一個底層層面的要求,通過破解爬蟲達(dá)到相同目的不是好主意良风。

抓取共享首頁

第一步是優(yōu)化抓取首頁的速度眼俊,速度越快越好岩饼。開始之前拳芙,先明確一下目的瞒渠。假設(shè)爬蟲的并發(fā)數(shù)是16,源網(wǎng)站的延遲大概是0.25秒吊档。這樣篙议,最大吞吐量是16/0.25=64頁/秒。首頁有5000O個子頁怠硼,每個索引頁有30個子頁鬼贱,那就有1667個索引頁。預(yù)計下載整個首頁需要香璃,1667/64=26秒这难。

將第3章中的爬蟲重命名為easy。我們將首先進(jìn)行垂直抓取的Rule(含有callback='parse_item'的一項)注釋掉葡秒,因為現(xiàn)在只想抓取索引頁姻乓。

提示:本章的代碼位于目錄ch11。

在進(jìn)行優(yōu)化之前眯牧,我們讓scrapy crawl只抓取10個頁面蹋岩,結(jié)果如下:

$ ls
properties  scrapy.cfg
$ pwd
/root/book/ch11/properties
$ time scrapy crawl easy -s CLOSESPIDER_PAGECOUNT=10
...
DEBUG: Crawled (200) <GET ...index_00000.html> (referer: None)
DEBUG: Crawled (200) <GET ...index_00001.html> (referer: ...index_00000.
html)
...
real  0m4.099s

如果10個頁面用時4秒,26秒內(nèi)是不可能完成1700個頁面的学少。通過查看日志剪个,我們看到每個索引頁都來自前一個頁面,也就是說版确,任何時候最多是在處理一個頁面扣囊。實際上,并發(fā)數(shù)是1绒疗。我們需要將其并行化侵歇,使達(dá)到并發(fā)數(shù)16。我們將索引頁相互共享吓蘑,即URL互相連接惕虑,再加入一些其他的鏈接,以免爬蟲中沒有URL磨镶。我們將首頁分廠20個部分溃蔫。實際上,任何大于16的數(shù)棋嘲,都可以提速酒唉,但是一旦超過20,速度反而會下降沸移。我們用下面的方法計算每個部分的起始索引頁:

>>> map(lambda x: 1667 * x / 20, range(20))
[0, 83, 166, 250, 333, 416, 500, ...  1166, 1250, 1333, 1416, 1500, 1583]

據(jù)此痪伦,設(shè)置start_URL如下:

start_URL = ['http://web:9312/properties/index_%05d.html' % id
              for id in map(lambda x: 1667 * x / 20, range(20))]

這可能會和你的情況不同,所以就不做美化了雹锣。將并發(fā)數(shù)(CONCURRENT_REQUESTS, CONCURRENT_REQUESTS_PER_DOMAIN)設(shè)為16网沾,再次運行爬蟲,運行如下:

$ time scrapy crawl easy -s CONCURRENT_REQUESTS=16 -s CONCURRENT_
REQUESTS_PER_DOMAIN=16
...
real  0m32.344s

結(jié)果接近了我們的目標(biāo)蕊爵。下載速度是1667頁面/32秒=52頁面/秒辉哥,也就是說,每秒可以產(chǎn)生52*30=1560個子頁面。我們現(xiàn)在可以注釋掉垂直抓取的Rule醋旦,將文件保存成一個爬蟲恒水。我們不需要進(jìn)一步修改爬蟲代碼,而是用一個功能強大的中間件繼續(xù)來做饲齐。如果只用開發(fā)機運行爬蟲钉凌,假設(shè)可以像抓取索引頁一樣抓取子頁,可以在50000/52=16分鐘內(nèi)完成抓取捂人。

這里有兩個要點御雕。在學(xué)習(xí)完第10章之后,我們在做的都是工程項目滥搭。我們可以想方設(shè)法計算出系統(tǒng)確切的性能酸纲。第二點是,抓取索引頁會產(chǎn)生子頁瑟匆,但實際的吞吐量不大闽坡。如果產(chǎn)生URL的速度快過scrapyd處理URL的速度,URL就會在scrapyd排隊脓诡∥尬纾或者,如果產(chǎn)生URL的速度太慢祝谚,scrapyd就會空閑宪迟。

批次抓取URL

現(xiàn)在來處理子頁面的URL,并把它們分批交惯,然后直接發(fā)送給scrapyds次泽,而不是繼續(xù)抓取。

如果檢查Scrapy的架構(gòu)席爽,我們可以明白這么做就是為了做一個中間件意荤,它可以執(zhí)行process_spider_output(),在Requests到達(dá)下載器之前就可以進(jìn)行處理或取消只锻。我們限定中間件只支持CrawlSpider的爬蟲玖像,并且只支持簡單的GET請求。如果要提高復(fù)雜度齐饮,例如捐寥,POST或認(rèn)證請求,我們必須開發(fā)更多的功能祖驱,以傳遞參數(shù)握恳、頭文件、每個批次進(jìn)行重新登陸捺僻。

打開Scrapy的GitHub乡洼,查看SPIDER_MIDDLEWARES_BASE設(shè)置崇裁,看看能否重利用哪個程序。Scrapy 1.0有以下中間件:HttpErrorMiddleware束昵、OffsiteMiddleware拔稳、RefererMiddleware、UrlLengthMiddleware和DepthMiddleware妻怎。我們看到OffsiteMiddleware(只有60行)好像使我們需要的壳炎。它根據(jù)爬蟲屬性allowed_domains限定URL泞歉。我們可以用相同的方法嗎逼侦?不是丟棄URL,我們轉(zhuǎn)而將它們分批腰耙,發(fā)送給scrapyds榛丢。我們確實可以這么做,部分代碼如下:

def __init__(self, crawler):
    settings = crawler.settings
    self._target = settings.getint('DISTRIBUTED_TARGET_RULE', -1)
    self._seen = set()
    self._URL = []
    self._batch_size = settings.getint('DISTRIBUTED_BATCH_SIZE', 1000)
    ...
def process_spider_output(self, response, result, spider):
    for x in result:
        if not isinstance(x, Request):
            yield x
        else:
            rule = x.meta.get('rule')
            if rule == self._target:
                self._add_to_batch(spider, x)
            else:
                yield x
def _add_to_batch(self, spider, request):
    url = request.url
    if not url in self._seen:
        self._seen.add(url)
        self._URL.append(url)
        if len(self._URL) >= self._batch_size:
            self._flush_URL(spider)

process_spider_output()處理Item和Request挺庞。我們只需要Request晰赞,其它就不考慮了。如果查看CrawlSpider的源代碼选侨,我們看到將Request/Response映射到Rule的方式是用一個meta dict中的名為“rule”的整數(shù)字段掖鱼。我們檢查這個數(shù)字,如果它指向我們想要的Rule(DISTRIBUTED_TARGET_RULE設(shè)置)援制,我們調(diào)用_add_to_batch()戏挡,將它的URL添加到這個批次里面。我們?nèi)缓笕∠@個Request晨仑。我們接著產(chǎn)生出其他的請求褐墅,例如下一頁的鏈接,不進(jìn)行改動洪己。The _add_to_batch()方法起到去重的作用妥凳。但是,我們前面描述的碎片化過程答捕,意味著有的URL可能要提取兩次逝钥。我們使用_seen set檢測并去除重復(fù)項。然后將這些URL添加到_URL列表拱镐,如果它的大小超過了_batch_size(根據(jù)DISTRIBUTED_BATCH_SIZE設(shè)置)艘款,就會調(diào)用_flush_URL()。這個方法提供了一下功能:

def __init__(self, crawler):
    ...
    self._targets = settings.get("DISTRIBUTED_TARGET_HOSTS")
    self._batch = 1
    self._project = settings.get('BOT_NAME')
    self._feed_uri = settings.get('DISTRIBUTED_TARGET_FEED_URL', None)
    self._scrapyd_submits_to_wait = []
def _flush_URL(self, spider):
    if not self._URL:
        return
    target = self._targets[(self._batch-1) % len(self._targets)]
    data = [
        ("project", self._project),
        ("spider", spider.name),
        ("setting", "FEED_URI=%s" % self._feed_uri),
        ("batch", str(self._batch)),
    ]
    json_URL = json.dumps(self._URL)
    data.append(("setting", "DISTRIBUTED_START_URL=%s" % json_URL))
    d = treq.post("http://%s/schedule.json" % target,
                  data=data, timeout=5, persistent=False)
    self._scrapyd_submits_to_wait.append(d)
    self._URL = []
    self._batch += 1

首先痢站,它使用了批次計數(shù)器(_batch)來決定發(fā)向哪個scrapyd服務(wù)器磷箕。可用服務(wù)器保存在_targets(見DISTRIBUTED_TARGET_HOSTS設(shè)置)阵难。我們?nèi)缓笙騭crapyd的schedule.json做一個POST請求岳枷。這比之前用過的curl方法高級,因為它傳遞了經(jīng)過仔細(xì)選擇的參數(shù)】辗保基于這些常熟殿衰,scrapyd就規(guī)劃了一次抓取,如下所示:

scrapy crawl distr \
-s DISTRIBUTED_START_URL='[".../property_000000.html", ... ]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' \
-a batch=1

除了項目和爬蟲的名字盛泡,我們想爬蟲傳遞了一個FEED_URI設(shè)置闷祥。它的值是從DISTRIBUTED_TARGET_FEED_URL得到的。

因為Scrapy支持FTP傲诵,我們可以讓scrapyds用一個匿名FTP將抓取的Item文件上傳到Spark服務(wù)器凯砍。它的格式包括爬蟲的名字(%(name)s和時間(%(time)s)。如果只有這兩項的話拴竹,那么同一時間創(chuàng)建出來的兩個文件就會有沖突悟衩。為了避免覆蓋,我們加入一個參數(shù)%(batch)栓拜。Scrapy默認(rèn)是不知道批次的座泳,所以我們必須給設(shè)定一個值。scrapyd的schedule.json API的特點之一是幕与,每個不是設(shè)置的參數(shù)或已知的參數(shù)都被傳遞給了爬蟲挑势。默認(rèn)時,爬蟲的參數(shù)成為了爬蟲的屬性啦鸣,然后在爬蟲的屬性中尋找未知的FEED_URI參數(shù)潮饱。因此,將一批參數(shù)傳遞給schedule.json赏陵,我們就可以在FEED_URI中使用它饼齿,以避免沖突。

最后是將DISTRIBUTED_START_URL和這一批次的子頁URL編譯為JSON蝙搔,因為JSON是最簡潔的文本格式缕溉。

筆記:用命令行將大量數(shù)據(jù)傳遞到Scrapy并不可取。如果你想將參數(shù)存儲到數(shù)據(jù)庫(例如Redis)吃型,只傳遞給Scrapy一個ID证鸥。這么做的話,需要小幅修改_flush_URL()和process_start_requests()勤晚。

我們用treq.post()來做POST請求枉层。Scrapyd處理持續(xù)連接并不好,因此我們用persistent=False取消它赐写。我們還設(shè)置了一個5秒的暫停鸟蜡。這個請求的的延遲項被保存在_scrapyd_submits_to_wait列表。要關(guān)閉這個函數(shù)挺邀,我們重置_URL列表揉忘,并加大當(dāng)前的_batch跳座。

奇怪的是,關(guān)閉操作中會出現(xiàn)許多方法泣矛,如下所示:

def __init__(self, crawler):
    ...
    crawler.signals.connect(self._closed, signal=signals.spider_
closed)
@defer.inlineCallbacks
def _closed(self, spider, reason, signal, sender):
    # Submit any remaining URL
    self._flush_URL(spider)
    yield defer.DeferredList(self._scrapyd_submits_to_wait)

調(diào)用_closed()可能是因為我們按下了Ctrl + C或因為抓取結(jié)束疲眷。兩種情況下,我們不想失去任何最后批次的還未發(fā)送的URL您朽。這就是為什么在_closed()中狂丝,第一件事是調(diào)用_flush_URL(spider)加載最后的批次。第二個問題是哗总,因為是非阻塞的几颜,停止抓取時,treq.post()可能結(jié)束也可能沒結(jié)束魂奥。為了避免丟失最后批次菠剩,我們要使用前面提到過的scrapyd_submits_to_wait列表,它包括所有的treq.post()延遲項耻煤。我們使用defer.DeferredList()等待,直到全部完成准颓。因為_closed()使用了@defer.inlineCallbacks哈蝇,當(dāng)所有請求完成時,我們只yield它并繼續(xù)攘已。

總結(jié)一下炮赦,DISTRIBUTED_START_URL設(shè)置中的批次URL會被發(fā)送到scrapyds,scrapyds上面運行著相同的爬蟲样勃。很明顯吠勘,我們需要使用這個設(shè)置以啟動start_URL。

從settings啟動URL

中間件還提供了一個process_start_requests()方法峡眶,使用它可以處理爬蟲提供的start_requests剧防。檢測是否設(shè)定了DISTRIBUTED_START_URL,設(shè)定了的話辫樱,用JSON解碼峭拘,并使用它的URL產(chǎn)生相關(guān)的請求。對于這些請求狮暑,我們設(shè)定CrawlSpider的_response_downloaded()方法作為回調(diào)函數(shù)鸡挠,再設(shè)定參數(shù)meta['rule'],以讓恰當(dāng)?shù)腞ule處理響應(yīng)搬男。我們查看Scrapy的源碼拣展,找到CrawlSpider創(chuàng)建請求的方法,并依法而做:

def __init__(self, crawler):
    ...
    self._start_URL = settings.get('DISTRIBUTED_START_URL', None)
    self.is_worker = self._start_URL is not None
def process_start_requests(self, start_requests, spider):
    if not self.is_worker:
        for x in start_requests:
            yield x
    else:
        for url in json.loads(self._start_URL):
            yield Request(url, spider._response_downloaded,
                          meta={'rule': self._target})

中間件就準(zhǔn)備好了缔逛。我們在settings.py進(jìn)行設(shè)置以啟動它:

SPIDER_MIDDLEWARES = {
    'properties.middlewares.Distributed': 100,
}
DISTRIBUTED_TARGET_RULE = 1
DISTRIBUTED_BATCH_SIZE = 2000
DISTRIBUTED_TARGET_FEED_URL = ("ftp://anonymous@spark/"
                               "%(batch)s_%(name)s_%(time)s.jl")
DISTRIBUTED_TARGET_HOSTS = [
    "scrapyd1:6800",
    "scrapyd2:6800",
    "scrapyd3:6800",
]

有人可能認(rèn)為DISTRIBUTED_TARGET_RULE不應(yīng)該作為設(shè)置备埃,因為它會使爬蟲差異化溜腐。你可以認(rèn)為這是個默認(rèn)值,你可以在你的爬蟲中使用屬性custom_settings覆蓋它瓜喇,例如:

custom_settings = {
    'DISTRIBUTED_TARGET_RULE': 3
}

我們的例子并不需要這么做挺益。我們可以做一個測試運行,只抓取一個頁面:

$ scrapy crawl distr -s \
DISTRIBUTED_START_URL='["http://web:9312/properties/property_000000.html"]'

這個成功之后乘寒,我們進(jìn)一步望众,抓取一個頁面之后,用FTP將它傳送到Spark服務(wù)器:

scrapy crawl distr -s \
DISTRIBUTED_START_URL='["http://web:9312/properties/property_000000.html"]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' -a batch=12

用ssh連接Spark服務(wù)器伞辛,你可以看到一個文件烂翰,例如/root/items下的12_distr_date_time.jl。
這個中間件的例子可以讓你完成scrapyd的分布式抓取蚤氏。你可以將它當(dāng)做起點甘耿,進(jìn)行改造。你可能要做如下修改:

  • 爬蟲的類型竿滨。除了CrawlSpider佳恬,你必須讓爬蟲用恰當(dāng)?shù)膍eta標(biāo)記分布式的請求,用慣用命名法執(zhí)行調(diào)回于游。
  • 向scrapyds傳遞URL的方式毁葱。你可能想限定域名,減少傳遞的量贰剥。例如倾剿,你只想傳遞IDs。
  • 你可以用分布式排隊方案蚌成,讓爬蟲可以從失敗恢復(fù)前痘,讓scrapyds執(zhí)行更多的URL批次。
  • 你可以動態(tài)擴展服務(wù)器的規(guī)模担忧,以適應(yīng)需求芹缔。

將項目部署到scrapyd服務(wù)器

為了將爬蟲附屬到三臺scrapyd服務(wù)器上,我們必須將它們添加到scrapy.cfg文件涵妥。文件上的每個[deploy:target-name]定義了一個新的部署目標(biāo):

$ pwd
/root/book/ch11/properties
$ cat scrapy.cfg
...
[deploy:scrapyd1]
url = http://scrapyd1:6800/
[deploy:scrapyd2]
url = http://scrapyd2:6800/
[deploy:scrapyd3]
url = http://scrapyd3:6800/

你可以用scrapyd-deploy -l查詢可用的服務(wù)器:

$ scrapyd-deploy -l
scrapyd1             http://scrapyd1:6800/
scrapyd2             http://scrapyd2:6800/
scrapyd3             http://scrapyd3:6800/

用scrapyd-deploy <target name>進(jìn)行部署:

$ scrapyd-deploy scrapyd1
Packing version 1449991257
Deploying to project "properties" in http://scrapyd1:6800/addversion.json
Server response (200):
{"status": "ok", "project": "properties", "version": "1449991257", 
"spiders": 2, "node_name": "scrapyd1"}

這個過程會產(chǎn)生一些新的目錄和文件(build乖菱、project.egg-info、setup.py)蓬网,可以刪掉窒所。其實,scrapyd-deploy做的就是打包你的項目帆锋,并用addversion.json吵取,傳遞到目標(biāo)服務(wù)器上。

之后锯厢,如果我們用scrapyd-deploy –L查詢服務(wù)器皮官,我們可以確認(rèn)項目被成功部署了:

$ scrapyd-deploy -L scrapyd1
properties

我還用touch在項目的目錄創(chuàng)建了三個空文件夾脯倒,scrapyd1-3。這樣可以將scrapyd的名字傳遞給下面的文件捺氢,同時也是服務(wù)器的名字藻丢。然后可以用bash loop將其部署服務(wù)器: for i in scrapyd*; do scrapyd-deploy $i; done。

創(chuàng)建自定義監(jiān)視命令

如果你想在多臺scrapyd服務(wù)器上監(jiān)視抓取的進(jìn)程摄乒,你必須親自編寫程序悠反。這是一個練習(xí)所學(xué)知識的好機會,寫一個原生的Scrapy命令馍佑,scrapy monitor斋否,用它監(jiān)視一組scrapyd服務(wù)器。文件命名為monitor.py拭荤,在settings.py中添加COMMANDS_MODULE = 'properties.monitor'茵臭。快速查看scrapyd的文檔舅世,listjobs.json API給我們提供了關(guān)于任務(wù)的信息旦委。如果我們想找到給定目標(biāo)的根URL,我們可以斷定歇终,它只能是在scrapyd-deploy的代碼中社证。如果查看https://github.com/scrapy/scrapyd-client/blob/master/scrapyd-client/scrapyd-deploy,我們可以發(fā)現(xiàn)一個_get_targets()函數(shù)(執(zhí)行它不會添加許多值评凝,所以略去了),它可以給出目標(biāo)的名字和根URL腺律。我們現(xiàn)在就可以執(zhí)行命令的第一部分了奕短,如下所示:

class Command(ScrapyCommand):
    requires_project = True
    def run(self, args, opts):
        self._to_monitor = {}
        for name, target in self._get_targets().iteritems():
            if name in args:
               project = self.settings.get('BOT_NAME')
                url = target['url'] + "listjobs.json?project=" + project
               self._to_monitor[name] = url
        l = task.LoopingCall(self._monitor)
        l.start(5)  # call every 5 seconds
        reactor.run()

這段代碼將名字和想要監(jiān)視的API的終點提交給dict _to_monitor。我們?nèi)缓笫褂胻ask.LoopingCall()規(guī)劃向_monitor()方法發(fā)起遞歸調(diào)用匀钧。_monitor()使用treq和deferred翎碑,我們使用@defer.inlineCallbacks對它進(jìn)行簡化。方法如下(省略了一些錯誤處理和代碼美化):

@defer.inlineCallbacks
def _monitor(self):
    all_deferreds = []
    for name, url in self._to_monitor.iteritems():
        d = treq.get(url, timeout=5, persistent=False)
        d.addBoth(lambda resp, name: (name, resp), name)
        all_deferreds.append(d)
    all_resp = yield defer.DeferredList(all_deferreds)
    for (success, (name, resp)) in all_resp:
        json_resp = yield resp.json()
        print "%-20s running: %d, finished: %d, pending: %d" %
              (name,  len(json_resp['running']),
              len(json_resp['finished']), len(json_resp['pending']))

這幾行代碼包括了目前我們學(xué)過的所有Twisted方法之斯。我們使用treq調(diào)用scrapyd的API和defer.DeferredList日杈,立即處理所有的響應(yīng)。當(dāng)all_resp有了所有結(jié)果之后佑刷,我們重復(fù)這個過程莉擒,取回它們的JSON對象。treq Response'json()方法返回延遲項瘫絮,而不是實際值涨冀,以與后續(xù)的實際值繼續(xù)任務(wù)。我們最后打印出結(jié)果麦萤。JSON響應(yīng)的列表信息包括懸掛鹿鳖、運行中扁眯、結(jié)束的任務(wù),我們打印出它的長度翅帜。

用Apache Spark streaming計算偏移值

我們的Scrapy系統(tǒng)現(xiàn)在就功能完備了姻檀。讓我們來看看Apache Spark的使用。

讓我來看如何執(zhí)行涝滴。請記住這不是Scrapy代碼绣版,所以看起來會覺得陌生,但是是可以看懂的狭莱。你可以在 boostwords.py文件找到這個應(yīng)用僵娃,這個文件包括了復(fù)雜的測試代碼,可以忽略腋妙。它的主要代碼如下:

# Monitor the files and give us a DStream of term-price pairs
raw_data = raw_data = ssc.textFileStream(args[1])
word_prices = preprocess(raw_data)
# Update the counters using Spark's updateStateByKey
running_word_prices = word_prices.updateStateByKey(update_state_
function)
# Calculate shifts out of the counters
shifts = running_word_prices.transform(to_shifts)
# Print the results
shifts.foreachRDD(print_shifts)

Spark使用DStream代表數(shù)據(jù)流默怨。textFileStream()方法監(jiān)督文件系統(tǒng)的一個目錄,當(dāng)檢測到新文件時骤素,就傳出來匙睹。我們的preprocess()函數(shù)將它們轉(zhuǎn)化為term/price對。我們用update_state_function()函數(shù)和Spark的updateStateByKey()方法累加這些term/price對济竹。我們最后通過運行to_shifts()計算偏移值痕檬,并用print_shifts()函數(shù)打印出極值。大多我們的函數(shù)修改不大送浊,只是高效重塑了數(shù)例據(jù)梦谜。例外的是shifts()函數(shù):

def to_shifts(word_prices):
    (sum0, cnt0) = word_prices.values().reduce(add_tuples)
    avg0 = sum0 / cnt0
    def calculate_shift((isum, icnt)):
        avg_with = isum / icnt
        avg_without = (sum0 - isum) / (cnt0 - icnt)
        return (avg_with - avg_without) / avg0
    return word_prices.mapValues(calculate_shift)

這段代碼完全是按照公式做的。盡管很簡單袭景,Spark的mapValues()可以讓calculate_shift在Spark服務(wù)器上用最小開銷高效運行唁桩。

進(jìn)行分布式抓取

我進(jìn)行四臺終端進(jìn)行抓取。我想讓這部分盡量獨立耸棒,所以我還提供了vagrant ssh命令荒澡,可以在終端使用。

使用四臺終端進(jìn)行抓取

用終端1來檢測集群的CPU和內(nèi)存的使用与殃。這可以確認(rèn)和修復(fù)問題单山。設(shè)置方法如下:

$ alias provider_id="vagrant global-status --prune | grep 'docker-
provider' | awk '{print \$1}'"
$ vagrant ssh $(provider_id)
$ docker ps --format "{{.Names}}" | xargs docker stats

前兩行可以讓我們用ssh打開docker provider VM。如果沒有使用VM幅疼,只在docker Linux運行米奸,我們只需要最后一行。
終端2用作診斷衣屏,如下運行 scrapy monitor:

$ vagrant ssh
$ cd book/ch11/properties
$ scrapy monitor scrapyd*

使用scrapyd*和空文件夾躏升,空文件夾名字是scrapy monitor,這會擴展到scrapy monitor scrapyd1 scrapyd2 scrapyd3狼忱。

終端3膨疏,是我們啟動抓取的終端一睁。除此之外,它基本是閑置的佃却。開始一個新的抓取者吁,我們操作如下:

$ vagrant ssh
$ cd book/ch11/properties
$ for i in scrapyd*; do scrapyd-deploy $i; done
$ scrapy crawl distr

最后兩行很重要。首先饲帅,我們使用一個for循環(huán)和scrapyd-deploy复凳,將爬蟲部署到服務(wù)器上。然后我們用scrapy crawl distr開始抓取灶泵。我們隨時可以運行小的抓取育八,例如,scrapy crawl distr -s CLOSESPIDER_PAGECOUNT=100赦邻,來抓取100個索引頁髓棋,它會產(chǎn)生大概3000個子頁。
終端4用來連接Spark服務(wù)器惶洲,我們用它進(jìn)行實時分析:

$ vagrant ssh spark
$ pwd
/root
$ ls
book items
$ spark-submit book/ch11/boostwords.py items

只有最后一行重要按声,它運行了boostwords.py,將本地items目錄傳給了監(jiān)視器恬吕。有時签则,我還使用watch ls -1 items來監(jiān)視item文件。
到底哪個詞對價格的影響最大呢铐料?這個問題留給讀者渐裂。

系統(tǒng)性能

系統(tǒng)的性能極大地依賴于硬件、CPU的數(shù)量钠惩、虛擬機分配內(nèi)存的大小芯义。在真實情況下,我們可以進(jìn)行水平擴展妻柒,使抓取提速。

理論最大吞吐量是3臺服務(wù)器4個CPU16并發(fā)數(shù)*4頁/秒=768頁/秒耘分。
實際中举塔,使用分配了4G內(nèi)存、8CPU的虛擬機的Macbook Pro求泰,2分40秒內(nèi)下載了50000條URL央渣,即315頁/秒。在一臺亞馬遜EC2 m4.large渴频,它有2個vCPUs芽丹、8G內(nèi)存,因為CPU頻率低卜朗,用時6分12秒拔第,即134頁/秒咕村。在一臺臺亞馬遜EC2 m4.4xlarge,它有16個vCPUs蚊俺、64G內(nèi)存懈涛,用時1分44秒,即480頁/秒泳猬。在同一臺機器上批钠,我將scrapyd的數(shù)量提高到6(修改Vagrantfile、scrapy.cfg和settings.py)得封,用時1分15秒埋心,即667頁/秒。在最后的例子中忙上,網(wǎng)絡(luò)服務(wù)器似乎是瓶頸拷呆。

實際和理論計算存在差距是合理的。我們的粗略計算中沒有考慮許多小延遲晨横。盡管我們聲明了每個頁有250ms的延遲洋腮,我們在前幾章已經(jīng)看到,實際延遲要更高手形,這是因為我們還有額外的Twisted和操作系統(tǒng)延遲啥供。還有開發(fā)機向scrapyds傳遞URL的時間,F(xiàn)TP向Spark傳遞Items的時間库糠,還有scrapyd發(fā)現(xiàn)新文件和規(guī)劃任務(wù)的時間(平均要2.5秒伙狐,根據(jù)scrapyd的poll_interval設(shè)置)。還沒計算開發(fā)機和scrapyd的啟動時間瞬欧。如果不能確定可以提高吞吐量的話贷屎,我是不會試圖改進(jìn)這些延遲的。我的下一步是擴大抓取的規(guī)模艘虎,比如500000個頁面唉侄、網(wǎng)絡(luò)服務(wù)器的負(fù)載均衡,在擴大的過程中發(fā)現(xiàn)新的挑戰(zhàn)野建。

要點

本章的要點是属划,如果要進(jìn)行分布式抓取,一定要使用大小合適的批次候生。
取決于源網(wǎng)站的響應(yīng)速度同眯,你可能有數(shù)百、數(shù)千唯鸭、上萬個URL须蜗。你希望它們越大越好(在幾分鐘的水平),這樣就可以分?jǐn)倖拥馁M用。另一方面明肮,你也不希望它們太大菱农,以免造成機器故障。在一個有容錯的分布式系統(tǒng)中晤愧,你需要重試失敗的批次大莫,而且重試不要浪費太多時間。

總結(jié)

希望你能喜歡這本關(guān)于Scrapy的書」俜荩現(xiàn)在你對Scrapy應(yīng)該已經(jīng)有深入的了解了只厘,并可以解決簡單或復(fù)雜的問題了。你還學(xué)到了Scrapy復(fù)雜的結(jié)構(gòu)舅巷,以及如何發(fā)揮出它的最大性能羔味。通過抓取,你可以在應(yīng)用中使用龐大的數(shù)據(jù)資源钠右。我們已經(jīng)看到了如何在移動應(yīng)用中使用Scrapy抓取的數(shù)據(jù)并進(jìn)行分析赋元。希望你能用Scrapy做出更多強大的應(yīng)用,為世界做出貢獻(xiàn)飒房。祝你好運搁凸!


序言
第1章 Scrapy介紹
第2章 理解HTML和XPath
第3章 爬蟲基礎(chǔ)
第4章 從Scrapy到移動應(yīng)用
第5章 快速構(gòu)建爬蟲
第6章 Scrapinghub部署
第7章 配置和管理
第8章 Scrapy編程
第9章 使用Pipeline
第10章 理解Scrapy的性能
第11章(完) Scrapyd分布式抓取和實時分析


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市狠毯,隨后出現(xiàn)的幾起案子护糖,更是在濱河造成了極大的恐慌,老刑警劉巖嚼松,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嫡良,死亡現(xiàn)場離奇詭異,居然都是意外死亡献酗,警方通過查閱死者的電腦和手機寝受,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來罕偎,“玉大人很澄,你說我怎么就攤上這事⊙占埃” “怎么了痴怨?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長器予。 經(jīng)常有香客問我,道長捐迫,這世上最難降的妖魔是什么乾翔? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上反浓,老公的妹妹穿的比我還像新娘偿枕。我一直安慰自己衍腥,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著挥等,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喳篇。 梳的紋絲不亂的頭發(fā)上敌买,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天,我揣著相機與錄音猜揪,去河邊找鬼惭墓。 笑死,一個胖子當(dāng)著我的面吹牛而姐,可吹牛的內(nèi)容都是我干的腊凶。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼拴念,長吁一口氣:“原來是場噩夢啊……” “哼钧萍!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起政鼠,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤风瘦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后缔俄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體弛秋,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年俐载,在試婚紗的時候發(fā)現(xiàn)自己被綠了蟹略。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡遏佣,死狀恐怖挖炬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情状婶,我是刑警寧澤意敛,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站膛虫,受9級特大地震影響草姻,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜稍刀,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一撩独、第九天 我趴在偏房一處隱蔽的房頂上張望敞曹。 院中可真熱鬧,春花似錦综膀、人聲如沸澳迫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽橄登。三九已至,卻和暖如春讥此,著一層夾襖步出監(jiān)牢的瞬間拢锹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工暂论, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留面褐,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓取胎,卻偏偏與公主長得像展哭,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子闻蛀,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容