介紹
這次爬取的是實時空氣污染指數(shù)(AQI)。關(guān)于這個項目的介紹可以參考聯(lián)系世界的空氣質(zhì)量指數(shù)項目團隊鲸拥,對我而言僧免,它是一個能夠比較準確的提供空氣污染指數(shù)。這個網(wǎng)站也提供了API用來獲得數(shù)據(jù)撞叨,但是請求數(shù)量有限制,不得超過16PRS牵敷。后面發(fā)現(xiàn)中國的監(jiān)測點有2534個,因此16PRS是遠遠不夠的姐军,所以我選擇將監(jiān)測點的目錄抓下來尖淘,然后自己訪問每個監(jiān)測點頁面并抓取數(shù)據(jù)。
監(jiān)測點目錄獲取
監(jiān)測點目錄在URL: http://aqicn.org/city/all/cn/上惊暴,通過正則抓取辽话,不贅述了卫病。
import re
from bs4 import BeautifulSoup
import ohSqlite3 as sqlite
import ohRequests as requests
def db_init():
req = requests.ohRequests()
content = req.get("http://aqicn.org/city/all/cn/")
pattern = re.compile("中國</div><br>(.*?)五家渠農(nóng)水大廈</a>", re.S)
data = pattern.findall(content)[0] + "五家渠農(nóng)水大廈</a>"
soup = BeautifulSoup(data, 'lxml')
links = soup.find_all('a')
with sqlite.ohSqlite3(DB_NAME) as db:
db.execute("CREATE TABLE aqicn (location text, url text)")
for link in links:
db.execute("INSERT INTO aqicn VALUES (?,?)", (link.text, link.get('href'),))
db.execute("DELETE FROM aqicn WHERE location = ' '")
if __name__ == "__main__":
db_init()
單線程抓取
上一節(jié)提到的監(jiān)測點總共有2534個,對應(yīng)了2534條URL益咬,如果通過遍歷的方式抓個抓取幽告,抓取函數(shù)如下:
import re
import time
from bs4 import BeautifulSoup
import ohSqlite3 as sqlite
import ohRequests as requests
def parser_single(location, url):
req = requests.ohRequests()
content = req.get(url)
pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
data = pattern.findall(content)
if data:
data = "<table class='api' {} </table>".format(data[0])
soup = BeautifulSoup(data, 'lxml')
aqi = soup.find(id='aqiwgtvalue').text
if aqi == '-':
return None
t = soup.find(id='aqiwgtutime').get('val')
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))
return [location, aqi, t]
最后程序跑了1463s裆甩,將近24min。
使用協(xié)程
理論上使用多進程也是可以的冻河,這里我使用了協(xié)程茉帅,代碼如下:
import re
import time
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import ohSqlite3 as sqlite
import ohRequests as requests
async def parser():
req = requests.ohRequests()
while URLS_LIST:
url = URLS_LIST.pop(0)
header = {'user-agent': req.faker_user_agent()}
async with aiohttp.ClientSession() as session:
async with session.get(url[1], headers=header) as response:
content = await response.text()
pattern = re.compile('<table class=\'api\'(.*?)</table>', re.S)
data = pattern.findall(content)
if not data:
print ("Something is wrong. Might be station removed:[{}]({})".format(url[0], url[1]))
continue
data = "<table class='api' {} </table>".format(data[0])
soup = BeautifulSoup(data, 'lxml')
aqi = soup.find(id='aqiwgtvalue').text
if aqi == '-':
print ("No Data:[{}]({})".format(url[0], url[1]))
continue
t = soup.find(id='aqiwgtutime').get('val')
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t)))
print ([url[0], aqi, t])
def main():
global URLS_LIST
req = requests.ohRequests()
with sqlite.ohSqlite3(DB_NAME) as db:
URLS_LIST = db.execute("select * from aqicn")
coroutine_cnts = 10
t = time.time()
coros = []
loop = asyncio.get_event_loop()
for i in range(coroutine_cnts):
coros.append(parser())
loop.run_until_complete(asyncio.gather(*coros))
print ("Total {}s".format(time.time()-t))
if __name__ == "__main__":
main()
協(xié)程的數(shù)量不能太高摔敛,雖然IP不會被封全封,但是會導(dǎo)致大量的失敗請求桃犬。我猜想這個網(wǎng)站其實用的就是它們提供的數(shù)據(jù)API攒暇,因此對它的訪問是受到16PRS的限制的子房。
使用10條協(xié)程時形用,執(zhí)行的時間為1160s田度;50條協(xié)程時解愤,時間為321s。效果還是很明顯的奸笤。
一個不難理解的事實是,協(xié)程不斷增加不代表時間會越來越短监右,達到某個閾值的時候异希,時間就不會再變了。這是因為當?shù)谝粋€協(xié)程收到響應(yīng)時味榛,本來應(yīng)該醒來繼續(xù)執(zhí)行,但是因為協(xié)程數(shù)量過多,導(dǎo)致其不會立刻蘇醒善茎,就等于繼續(xù)阻塞了。如果無限增加協(xié)程數(shù)量烁焙,一方面會導(dǎo)致資源消耗增大耕赘,另一方面也會導(dǎo)致性能下降。
結(jié)果比較
序號 | 使用方法 | 執(zhí)行時間(s) |
---|---|---|
1 | 單線程 | 1463 |
2 | 10條協(xié)程 | 1160 |
3 | 50條協(xié)程 | 321 |