對一個網(wǎng)站過高頻率的爬取可能會導(dǎo)致ip被加入黑名單紧显,這樣以后對該網(wǎng)站的訪問都會被拒絕,這個時候只能利用代理ip了层皱。網(wǎng)上有大量的公開免費代理性锭,然而大多是不能用的,也可以購買付費代理使用叫胖,但是手動設(shè)置ip非常麻煩草冈,并且可能由于其他人爬取同樣站點而被封禁,或者代理服務(wù)器故障等原因?qū)е耰p不可用,手動篩選可用ip也是非常麻煩的事情怎棱。
為了解決這個問題方淤,我們可以維護一個代理池,定期自動檢查以剔除其中的不可用代理蹄殃,并不斷爬取新的代理投入使用,項目地址你踩。
1. 結(jié)構(gòu)設(shè)計
1.1 存儲模塊
代理應(yīng)該是不重復(fù)的诅岩,并且要標(biāo)識可用情況,比較好的方法是使用redis的有序集合存儲带膜,每一個元素都是一個代理吩谦,形式為 ip:端口號 ,此外,每一個元素都有一個分數(shù)膝藕,用來標(biāo)識可用情況式廷,分數(shù)規(guī)則設(shè)置為:
- 分數(shù)100為可用,定期檢測時代理可用就設(shè)置為100芭挽,不可用就將分數(shù)減1滑废,直到減為0后刪除
- 新獲取代理設(shè)置為10,測試可行立即置為100袜爪,否則減1
1.2 獲取模塊
定期從各大代理網(wǎng)站爬取代理蠕趁,免費付費都可以,成功之后保存到數(shù)據(jù)庫
1.3 測試模塊
負責(zé)定期檢測數(shù)據(jù)庫中的代理辛馆,可以設(shè)置檢測鏈接俺陋,檢測在對應(yīng)網(wǎng)站是否可用
1.4 接口模塊
提供一個web api來對外提供隨機的可用代理,用flask實現(xiàn)
2. 模塊實現(xiàn)
2.1 存儲模塊(db.py)
第三方庫:
- redis
import redis
from random import choice
# 分數(shù)設(shè)置
MAX_SCORE = 100
MIN_SCORE = 0
INIT_SCORE = 10
# 連接信息
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'
# 數(shù)據(jù)庫最大存儲的代理數(shù)量
POOL_UPPER_THRESHLD = 10000
class RedisClient:
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, passwd=REDIS_PASSWORD):
"""初始化redis對象"""
self.db = redis.Redis(host=host, port=port, password=passwd, decode_responses=True)
def add(self, proxy, score=INIT_SCORE):
"""添加一個代理昙篙,設(shè)置初始分數(shù)"""
if not self.db.zscore(REDIS_KEY, proxy):
return self.db.zadd(REDIS_KEY, {proxy: score})
def random(self):
"""首先隨機獲取最高分的有效代理腊状,不存在則按排名獲取"""
result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
if len(result):
return choice(result)
else:
# 從分數(shù)前50的代理中隨機獲取一個
result = self.db.zrevrange(REDIS_KEY, 0, 50)
if len(result):
return choice(result)
else:
Exception('無可用代理')
def decrease(self, proxy):
"""代理分數(shù)-1,小于指定閾值則刪除"""
score = self.db.zscore(REDIS_KEY, proxy)
if score and score > MIN_SCORE:
print(proxy, score, '-1')
return self.db.zincrby(REDIS_KEY, proxy, -1)
else:
print(proxy, score, '移除')
return self.db.zrem(REDIS_KEY, proxy)
def max(self, proxy):
"""更新代理分數(shù)到最大值"""
print(proxy, MAX_SCORE)
return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
def count(self):
"""獲取代理數(shù)量"""
return self.db.zcard(REDIS_KEY)
def all(self):
"""獲取全部代理"""
return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
2.2 獲取模塊(getter.py)
第三方庫:
- beautifulsoup4
- requests
from bs4 import BeautifulSoup
from db import RedisClient, POOL_UPPER_THRESHLD
# requests獲取一個網(wǎng)頁的方法封裝到了一個工具類里
from utils import get_page
class ProxyMetaClass(type):
"""元類苔可,初始化類時獲取所有以crawl_開頭的方法"""
def __new__(mcs, name, bases, attrs):
count = 0
attrs['CrawlFunc'] = []
for k, v in attrs.items():
if 'crawl_' in k:
attrs['CrawlFunc'].append(k)
count += 1
attrs['CrawlFuncCount'] = count
return type.__new__(mcs, name, bases, attrs)
class Crawler(metaclass=ProxyMetaClass):
def get_proxies(self, crawl_func):
"""執(zhí)行指定方法來獲取代理"""
proxies = []
for proxy in eval("self.{}()".format(crawl_func)):
proxies.append(proxy)
return proxies
def crawl_kuaidaili(self, page_count=10):
"""獲取快代理網(wǎng)站的免費代理"""
start_url = 'https://www.kuaidaili.com/free/inha/{}/'
urls = [start_url.format(page) for page in range(1, page_count + 1)]
for url in urls:
html = get_page(url)
if html:
soup = BeautifulSoup(html, 'lxml')
trs = soup.find('tbody').find_all('tr')
for tr in trs:
ip = tr.find_all('td')[0].string
port = tr.find_all('td')[1].string
yield ':'.join([ip, port])
class Getter:
def __init__(self):
"""初始化數(shù)據(jù)庫類和代理爬蟲類"""
self.redis = RedisClient()
self.crawler = Crawler()
def is_over_threshold(self):
"""判斷數(shù)據(jù)庫是否已經(jīng)存滿"""
if self.redis.count() >= POOL_UPPER_THRESHLD:
return True
return False
def run(self):
"""開始抓取各個代理網(wǎng)站的免費代理存入數(shù)據(jù)庫"""
print('開始獲取...')
if not self.is_over_threshold():
for i in range(self.crawler.CrawlFuncCount):
crawl_func = self.crawler.CrawlFunc[i]
proxies = self.crawler.get_proxies(crawl_func)
for proxy in proxies:
print(proxy)
self.redis.add(proxy)
if __name__ == '__main__':
a = Getter()
a.run()
這里借助元類實現(xiàn)了自動定義屬性 CrawlFunc 來記錄Crawler類中所有以 crawl_ 開頭的方法缴挖,CrawlFuncCount 記錄這些方法的數(shù)量,這樣以后有新的代理網(wǎng)站可以獲取代理時硕蛹,只需要添加 crawl_XXX 方法以相同的方式返回解析后的數(shù)據(jù)就行了醇疼, get_proxies 會依次執(zhí)行傳入的方法列表中的方法, 可以非常方便的擴展法焰。
然后就是實現(xiàn)Getter類來管理存儲部分秧荆,run方法中先判斷是否到達存儲閾值,未滿則通過 crawler.CrawlFunc 獲取所有爬取方法交給 get_proxies 去執(zhí)行埃仪,然后將返回結(jié)果存入數(shù)據(jù)庫乙濒。這樣存儲部分的工作就完成了,實例化Getter類并調(diào)用run方法即可爬取代理并存入數(shù)據(jù)庫。
2.3 測試模塊
第三方庫:
- aiohttp
import asyncio
import aiohttp
import time
from db import RedisClient
# 目標(biāo)網(wǎng)址
TEST_URL = 'http://www.baidu.com'
# 正確的響應(yīng)碼列表
TRUE_STATUS_CODE = [200]
# 同時測試一組代理的數(shù)量
BATCH_TEST_SIZE = 50
class Tester:
def __init__(self):
"""初始化數(shù)據(jù)庫管理對象"""
self.redis = RedisClient()
async def test_one_proxy(self, proxy):
"""對目標(biāo)網(wǎng)站測試一個代理是否可用"""
conn = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy, bytes):
# 解碼為字符串
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
if response.status in TRUE_STATUS_CODE:
# 代理可用
self.redis.max(proxy)
print(proxy, 100)
else:
# 代理不可用
self.redis.decrease(proxy)
print(proxy, -1, "狀態(tài)碼錯誤")
except Exception as e:
self.redis.decrease(proxy)
print(proxy, -1, e.args)
async def start(self):
"""啟動協(xié)程颁股, 測試所有代理"""
try:
proxies = self.redis.all()
for i in range(0, len(proxies), BATCH_TEST_SIZE):
test_proxies = proxies[i: i+BATCH_TEST_SIZE]
tasks = [self.test_one_proxy(proxy) for proxy in test_proxies]
# 并發(fā)完成一批任務(wù)
await asyncio.gather(*tasks)
time.sleep(5)
except Exception as e:
print('測試器發(fā)生錯誤', e.args)
def run(self):
"""開始運行"""
asyncio.run(self.start())
# # python3.7之前的寫法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(self.start())
if __name__ == '__main__':
a = Tester()
a.run()
由于代理數(shù)量非常大么库,同步請求的requests很難高效率的完成這個任務(wù),我們使用異步請求庫aiohttp甘有,同時用到協(xié)程asyncio來成批的檢測代理诉儒。
測試網(wǎng)站這里設(shè)置為百度,使用時可以根據(jù)目標(biāo)不同設(shè)置對應(yīng)網(wǎng)站的網(wǎng)址亏掀。還定義了一個正確的狀態(tài)碼列表忱反,目前只有200,某些網(wǎng)站可能會有重定向等操作滤愕,可以自行添加狀態(tài)碼温算。
2.4 API模塊
第三方庫:
- flask
from flask import Flask, g
import json
from db import RedisClient
app = Flask(__name__)
def get_conn():
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
@app.route('/')
def index():
return '<h2>hello</h2>'
@app.route('/get/')
def get_proxy():
"""獲取隨機可用代理"""
conn = get_conn()
try:
proxy = conn.random()
result = json.dumps({'status': 'success', 'proxy': proxy})
except Exception as e:
result = json.dumps({'status': 'failure', 'info': e})
finally:
return result
@app.route('/count/')
def get_count():
"""獲取代理總數(shù)"""
conn = get_conn()
return str(conn.count())
if __name__ == '__main__':
app.run()
這里使用flask做了一個簡單的API,包含歡迎頁间影,獲取隨機代理頁和獲取代理總數(shù)頁注竿,獲取代理頁返回json格式的數(shù)據(jù)。
3. 整合運行
模塊寫好了魂贬,接下來只需要以多進程的方式將他們運行起來即可
from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester
# 周期
TESTER_CYCLE = 20
GETTER_CYCLE = 20
# 模塊開關(guān)
TESTER_ENABLE = True
GETTER_ENABLE = True
API_ENABLE = True
class Run:
def run_tester(self, cycle=TESTER_CYCLE):
"""定時檢測代理可用情況"""
tester = Tester()
while True:
print('開始測試')
tester.run()
time.sleep(cycle)
def run_getter(self, cycle=GETTER_CYCLE):
"""定時獲取代理"""
getter = Getter()
while True:
print('開始抓取代理')
getter.run()
time.sleep(cycle)
def run_api(self):
"""啟動API接口"""
app.run()
def run(self):
print('代理池開始運行')
if TESTER_ENABLE:
tester_process = Process(target=self.run_tester)
tester_process.start()
if GETTER_ENABLE:
getter_process = Process(target=self.run_getter)
getter_process.start()
if API_ENABLE:
api_process = Process(target=self.run_api)
api_process.start()
if __name__ == '__main__':
a = Run()
a.run()
運行此文件便可以開始整個代理池的運行:
4. 使用實例
import requests
import json
def get_proxy():
"""嘗試獲取代理"""
try:
r = requests.get('http://localhost:5000/get/')
r.raise_for_status
r.encoding = r.apparent_encoding
return r.text
except ConnectionError:
return None
r = get_proxy()
if r:
# 加載返回的json數(shù)據(jù)
data = json.loads(r)
if data['status'] == 'success':
proxy = data['proxy']
# 構(gòu)造代理字典巩割,根據(jù)請求鏈接不同自動設(shè)置
proxies = {
'http': 'http://' + proxy,
'https': 'http://' + proxy
}
try:
# 使用代理訪問網(wǎng)站
r = requests.get('http://www.baidu.com/', proxies=proxies)
r.status_code
r.encoding = r.apparent_encoding
print(r.text)
except Exception as e:
print(e.args)
else:
print(data['info'])
else:
print('未正常獲取代理')