編寫(xiě)一個(gè)basespider爬蟲(chóng)
在spider腳本中編寫(xiě)頁(yè)面跳轉(zhuǎn)邏輯
打開(kāi)spiders文件夾下的腳本文件董习,在代碼頭部添加一行語(yǔ)句, 引入一個(gè)request類,這個(gè)類的構(gòu)造函數(shù)可以將類中的參數(shù)url交給scrapy進(jìn)行下載, 然后通過(guò)另一個(gè)參數(shù)callback設(shè)置回調(diào)函數(shù), 將下載后返回的響應(yīng)體進(jìn)行處理:
import scrapy
from urllib.parse import urljoin,urlparse
from scrapy.http import Request
from BKSpider.items import BkItemLoader,BkdoneItem
class BeikeSpider(scrapy.Spider):
name = 'beike'
allowed_domains = ['hz.ke.com']
start_urls = ['http://hz.ke.com/chengjiao//']
# 第一層獲取所有區(qū)域地址
def parse(self, response):
parts = response.css('dl > dd > div > div > a')
for part in parts:
part_url = urljoin(response.url, part.css('a::attr(href)').extract_first(""))
part_name = part.css('a::text').extract_first("")
yield Request(url=part_url,meta={'part_name': part_name},callback=self.parse_segment,dont_filter=True)
# 第二層獲取所有街道地址
def parse_segment(self, response):
segments = response.css('dl:nth-child(2) div > div:nth-child(2) > a')
for segment in segments:
segment_url = urljoin(response.url, segment.css('a::attr(href)').extract_first(""))
segment_name = segment.css('a::text').extract_first("")
yield Request(url=segment_url,meta={'segment': segment_url,'segment_name': segment_name,'part_name':response.meta.get("part_name", "")}, callback=self.parse_unity,dont_filter=True)
我們可以考慮將URL交給Request, Request會(huì)先將URL交給scrapy下載偶垮,響應(yīng)體則通過(guò)回調(diào)函數(shù)交給其他函數(shù)處理遗增。
目錄頁(yè)URL通過(guò)Request回調(diào)給parse函數(shù)解析出單個(gè)文章頁(yè)面的URL叫惊, 然后再一次使用Request回調(diào)給自定義函數(shù)做進(jìn)一步處理。
有時(shí)候, 我們希望往Request返回的響應(yīng)體中添加其它的信息, 這種情況下我們可以使用Request的另一個(gè)參數(shù)meta={},
這些信息就會(huì)以字典的形式存儲(chǔ)到response響應(yīng)體的meta這一項(xiàng)中贡定。
spider的去重機(jī)制
Requset這個(gè)函數(shù)里有個(gè)參數(shù)dont_filter需要我們特別注意一下, 這里需要了解一下scrapy的去重機(jī)制赋访。
首先, scrapy的Request下載一個(gè)頁(yè)面時(shí),會(huì)分析URL的host是否存在于allowed_domains中缓待,如果不存在則會(huì)直接被剔除掉蚓耽;
然后,scrapy進(jìn)程會(huì)維護(hù)一個(gè)指紋集合旋炒,Request下載URL對(duì)應(yīng)頁(yè)面前步悠,會(huì)對(duì)URL執(zhí)行“RFPDupeFilter”去重篩選(將URL處理成指紋,然后與集合匹配瘫镇,如果存在鼎兽,也會(huì)被剔除。)具體的去重邏輯編寫(xiě)在scrapy.dupefilter.py 文件中铣除。
scrapy用于去重的指紋集合有內(nèi)存存儲(chǔ)和磁盤(pán)存儲(chǔ)兩種方式谚咬,默認(rèn)是內(nèi)存存儲(chǔ),當(dāng)爬蟲(chóng)程序結(jié)束時(shí)集合內(nèi)容會(huì)釋放尚粘;如果希望保存指紋集合择卦,可以選擇用磁盤(pán)存儲(chǔ)方式,具體操作是郎嫁,在settings.py中定義一個(gè)“JOBDIR”變量定義指紋集合存儲(chǔ)成文本的路徑秉继,那么在進(jìn)程啟動(dòng)時(shí)會(huì)在該路徑下生成一個(gè)requests.seen文件用于存儲(chǔ)指紋集合,當(dāng)進(jìn)程結(jié)束時(shí)泽铛,該文件不會(huì)被刪除尚辑,而是會(huì)保留下來(lái)。
如果某一個(gè)Request執(zhí)行時(shí)不希望對(duì)該URL去重盔腔,可以設(shè)置參數(shù)dont_filter=True(默認(rèn)設(shè)置為False)杠茬。
另外月褥,如果不定義allowed_domains或者將其置為空,進(jìn)程也不會(huì)進(jìn)行去重澈蝙。
配置items.py文件
scrapy提供了一種類似于map的數(shù)據(jù)結(jié)構(gòu)item, 用于將scrapy下載頁(yè)面返回的響應(yīng)體數(shù)據(jù)進(jìn)行結(jié)構(gòu)化吓坚。
這樣便于數(shù)據(jù)在pipline階段時(shí),給不同的存儲(chǔ)數(shù)據(jù)庫(kù)復(fù)用灯荧。
我們可以在items.py這個(gè)腳本中定義item的具體信息:
class BkdoneItem(scrapy.Item):
# 主鍵鏈家編號(hào)
data_id = scrapy.Field()
# 數(shù)據(jù)源來(lái)源
data_source = scrapy.Field()
# 成交日期
done_date = scrapy.Field()
# 數(shù)據(jù)來(lái)源鏈接
detail_url = scrapy.Field()
# 交易總價(jià)
total_price = scrapy.Field()
# 交易均價(jià)
per_price = scrapy.Field()
## 基本信息
# 所在街道
segment = scrapy.Field()
# 所在區(qū)縣
part = scrapy.Field()
def get_sql(self):
rows_list = ('data_id','data_source','done_date','detail_url', 'total_price', 'per_price','segment','part')
insert_sql = "INSERT INTO src_hz_done_house%s VALUES%s;" % (str(rows_list).replace("'", ""), ('%s',) * len(rows_list))
return insert_sql,rows_list
......
回到腳本文件中,將定義的items引入文件盐杂,并創(chuàng)建一個(gè)items實(shí)例逗载,將對(duì)應(yīng)的數(shù)據(jù)填充進(jìn)去, 填充完以后可以通過(guò)yield將這個(gè)item傳給pipeline這一層:
from BKSpider.items import BkItemLoader,BkdoneItem
......
def parse_detail(self, response):
Bk_item = BkdoneItem()
......
Bk_item['title'] = title
......
yield Bk_item
考慮到url一般是不定長(zhǎng)的, scrapy為了實(shí)現(xiàn)不重復(fù)讀取頁(yè)面, 需要維護(hù)一下已下載頁(yè)面的列表, 如果這個(gè)列表里保存的是原始url的話, 會(huì)占用很大的內(nèi)存, 所以我們可以將原url用散列函數(shù)進(jìn)行處理, 我們可以寫(xiě)一個(gè)common模塊用于編寫(xiě)常用的全局函數(shù), 第一個(gè)函數(shù)就用于URL的壓縮好了:
import hashlib
def get_md5(url):
if isinstance(url,str):
url = url.encode("utf-8")
m = hashlib.md5()
m.update(url)
return m.hexdigest()
配置pipelines.py文件
pipelines這一層用于接收item中的數(shù)據(jù), 并最終定義數(shù)據(jù)的存儲(chǔ)方式, 使用pipeline時(shí)需要先打開(kāi)settings.py文件, 該文件中有一段默認(rèn)被注釋的代碼需要激活。
ITEM_PIPELINES = {
'BKSpider.pipelines.BkspiderPipeline': 300,
}
1. 圖片下載配置
我們之前存儲(chǔ)的圖片url, 也可以通過(guò)設(shè)置實(shí)現(xiàn)圖片的自動(dòng)下載, 我們需要在pipelines中添加一個(gè)scrapy自帶的圖片下載pipeline:
import os
......
project_dir = os_path_abspath(os_path.dirname(__file__))
......
ITEM_PIPELINES = {
'BKSpider.pipelines.BkspiderPipeline': 300,
'scrapy.pipelines.images.ImagesPipeline':1
}
IMAGES_URLS_FILED = "front_image_url" //這個(gè)字段名需要和item中的字段名一致,且對(duì)應(yīng)item的值需要改成列表形式
IMAGES_STORE = os_path_join(project_dir,"images")
IMAGES_MIN_HEIGHT = 100
IMAGES_MIN_WIDTH = 100
item_piplines字典中的value對(duì)應(yīng)的是pipeline的執(zhí)行順序, 序號(hào)越小的管道越早執(zhí)行链烈。
images_urls_filed這個(gè)變量中存儲(chǔ)的是item中需要自動(dòng)下載的圖片的URL, pipeline會(huì)從item中選擇這個(gè)字段名進(jìn)行自動(dòng)下載厉斟。
images_store這個(gè)變量存儲(chǔ)的是自動(dòng)下載的圖片存放的目錄路徑。
settings.py中設(shè)置的IMAGES_MIN_HEIGHT和IMAGES_MIN_WIDTH這兩個(gè)變量用于過(guò)濾小圖片强衡。我們下載了圖片, 但是還需要想個(gè)辦法獲得圖片在載到本地的存儲(chǔ)路徑, 我們可以重寫(xiě)一個(gè)pipeline對(duì)象來(lái)實(shí)現(xiàn)這個(gè)功能, 在pipeline.py中重寫(xiě)一個(gè)繼承ImagesPipeline的類:
from scrapy.pipelines.images import ImagesPipeline
class ArticleImagePipeline(ImagesPipeline):
def item_completed(self, results, item, info):
for ok,value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
在這個(gè)重寫(xiě)的ImagePipelines中, 我們可以從item_completed這個(gè)方法中拿到圖片存儲(chǔ)的路徑擦秽。為了使這個(gè)類生效, 我們需要將重寫(xiě)的ImagePipeline在settings.py中替換掉原來(lái)的ImagePipeline:
ITEM_PIPELINES = {
'BKSpider.pipelines.BkspiderPipeline': 300,
'ArticleSpider.pipelines.ArticleImagePipeline':1
}
2. 將數(shù)據(jù)保存到j(luò)son本地文件
scrapy提供多種格式文件存儲(chǔ)數(shù)據(jù), 這里我們主要介紹用json文件的存儲(chǔ), 這里我們主要用到scrapy.exporters里的JsonItemExporter實(shí)現(xiàn):
from scrapy.exporters import JsonItemExporter
class JsonExporterPipleline(object):
#調(diào)用scrapy提供的json export導(dǎo)出
def __init__(self):
self.file = open('articleexport.json','wb')
self.exporter = JsonItemExporter(self.file,encoding='utf-8',ensure_ascii=False)
self.exporter.start_exporting()
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
def close_spider(self,spider):
self.exporter.finish_exporting()
self.file.close()
當(dāng)然, 也需要在settings.py中進(jìn)行相應(yīng)的配置:
ITEM_PIPELINES = {
'ArticleSpider.pipelines.JsonExporterPipleline': 2,
'ArticleSpider.pipelines.ArticleImagePipeline':1
}
3. 將數(shù)據(jù)保存到MySQL
用數(shù)據(jù)庫(kù)存儲(chǔ)數(shù)據(jù)是比較常用的方法, 這里我們先在MySQL中根據(jù)items的數(shù)據(jù)結(jié)構(gòu)建立相應(yīng)的數(shù)據(jù)表, 然后我們也要確保我們的語(yǔ)言環(huán)境中已經(jīng)安裝了MySQL的驅(qū)動(dòng):
pip install mysqlclient
linux下面安裝可能會(huì)報(bào)錯(cuò), 這時(shí)需要先安裝一些其它的包:
sudo apt-get install libmysqlclient-dev #(Ubuntu)
sudo yum install python-devel mysql-devel #(centOS)
這里我們介紹另一個(gè)兼容性更好的驅(qū)動(dòng):
pip install pymysql
在pipelines.py中代碼如下:
import pymysql
class MysqlPipeline(object):
def __init__(self):
self.conn = pymysql.connect(host='',port=3306,user='',password="",db="",charset='utf8')
self.cursor = self.conn.cursor()
def process_item(self,item,spider):
insert_sql = "INSERT INTO tablename(columname) VALUES(%s);" % item['title']
self.cursor.execute(insert_sql)
self.conn.commit()
之后也需要在settings.py中配置好。
4. MySQL優(yōu)化連接池
由于爬蟲(chóng)爬取速度會(huì)大于數(shù)據(jù)庫(kù)中插入數(shù)據(jù)的速度, 所以我們需要提高數(shù)據(jù)庫(kù)的使用性能, scrapy的解決思路是使用連接池, 讓爬取數(shù)據(jù)和MySQL的插入數(shù)據(jù)兩個(gè)任務(wù)變成異步漩勤。我們先在settings.py中配置好MySQL的連接信息:
MYSQL_HOST = ""
MYSQL_DBNAME = ""
MYSQL_USER = ""
MYSQL_PASSWORD = ""
之后就可以在pipelines.py中添加一個(gè)新的pipeline:
import pymysql
from twisted.enterprise import adbapi
class MysqlTwistedPipeline(object):
def __init__(self,dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls,settings):
dbparams = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DB"],
user = settings["MYSQL_USER"],
password = settings["MYSQL_PASSEORD"],
charset = "utf8",
cursorclass = pymysql.cursors.DictCursor,
use_unicode = True
)
dbpool = adbapi.ConnectionPool(dbapiName="pymysql",**dbparams)
return cls(dbpool)
def process_item(self,item,spider):
#異步處理獲取和插入
query = self.dbpool.runInteraction(self.do_insert,item)
query.addErrback(self.handle_error,item,spider)
def do_insert(self,cursor,item):
#這里cursor參數(shù)直接從類中獲得
insert_sql, rows_list = item.get_sql()
cursor.execute(insert_sql % tuple(map(item.get, rows_list)))
def handle_error(self, failure,item,spider):
# 處理異常
print(failure)
最后感挥,settings也要做配置:
ITEM_PIPELINES = {
'BKSpider.pipelines.MysqlTwistedPipeline': 300,
}
Item loader機(jī)制
item loader的使用能提高代碼的解析效率, 節(jié)約內(nèi)存。我們先在spider腳本中引入item loader:
from scrapy.loader import ItemLoader
然后我們使用Item loader改寫(xiě)我們的parse_detail函數(shù):
from BKSpider.items import BkItemLoader,BkdoneItem
# 第四層獲取單個(gè)房屋交易明細(xì)數(shù)據(jù)
def parse_detail(self, response):
item_loader = BkItemLoader(item=BkdoneItem(), response=response)
# 主鍵-鏈家編號(hào)
item_loader.add_css("data_id", "div.transaction > div.content > ul > li:nth-child(1)::text")
# 數(shù)據(jù)源
item_loader.add_value("data_source", 'lianjia')
# 成交日期
done_date = response.meta.get("unity_done_date", "").replace(".", "-")
item_loader.add_value("done_date", done_date)
# 數(shù)據(jù)來(lái)源鏈接
item_loader.add_value("detail_url", response.url)
# 交易總價(jià)
item_loader.add_css("total_price", "div.price > span > i::text")
# 交易均價(jià)
item_loader.add_css("per_price", "div.price > b::text")
# 基本信息
# 所在街道
item_loader.add_value("segment", response.meta.get("segment_name", ""))
# 所在區(qū)縣
item_loader.add_value("part", response.meta.get("part_name", ""))
yield item_loader.load_item()
還有兩個(gè)問(wèn)題:
一越败、上述代碼實(shí)際上item中各字段對(duì)應(yīng)的值都是list.
二触幼、假如用css解析出來(lái)的數(shù)據(jù), 我們希望先預(yù)處理以后再傳入item_loader, 該怎樣實(shí)現(xiàn)呢?
我們可以在items.py中處理:
from scrapy.loader.processors import MapCompose,TakeFirst,Join
func1(value):
....
func2(value):
....
class ZirudoneItem(scrapy.Item)::
done_date = scrapy.Field(
input_processor = MapCompose(func1,func2),
output_processor = TakeFirst()
)
front_image_url = scrapy.Field(
input_processor = MapCompose(func1,func2),
output_processor = Join(',')
)
......
其實(shí)每一個(gè)scrapy.Field中都有兩個(gè)參數(shù):
第一個(gè)是input_processor, 這個(gè)參數(shù)可以定義一個(gè)預(yù)處理函數(shù), 對(duì)傳入item的字段進(jìn)行預(yù)處理, 我們也可以使用MapCompose, 實(shí)現(xiàn)對(duì)傳入的數(shù)據(jù)連續(xù)使用多個(gè)函數(shù)進(jìn)行預(yù)處理;
第二個(gè)是output_processor, 這個(gè)參數(shù)和TakeFirst()搭配使用, 可以使輸出的值只取list中的第一個(gè)值。
另外, 還有有一個(gè)Join函數(shù), 可以將list拼接成str, 例如tag傳入的值是['a','b','c'], 則輸出的值是"a,b,c"究飞。
如果我們有很多字段, 每個(gè)都要定義這兩個(gè)參數(shù), 就會(huì)顯得很冗余, 這里我們有一個(gè)技巧:
from scrapy.loader import ItemLoader
......
class ZiruItemLoader(ItemLoader):
default_input_processor = MapCompose(pre_str)
default_output_processor = TakeFirst()
我們可以在items中重寫(xiě)ItemLoader這個(gè)類, 設(shè)置output_processor的默認(rèn)值, 然后在spider腳本中用重寫(xiě)后的ItemLoader替換之前使用的ItemLoader置谦。