博客首發(fā)于www.litreily.top
應(yīng)一位金融圈的朋友所托添祸,幫忙寫個爬蟲桶至,幫他爬取中國期貨行業(yè)協(xié)議網(wǎng)站中所有金融機(jī)構(gòu)的從業(yè)人員信息狡相。網(wǎng)站數(shù)據(jù)的獲取本身比較簡單后专,但是為了學(xué)習(xí)一些新的爬蟲方法和技巧呀癣,即本文要講述的生產(chǎn)者消費(fèi)者模型美浦,我又學(xué)習(xí)了一下Python中隊(duì)列庫queue
及線程庫Thread
的使用方法。
生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型非常簡單项栏,相信大部分程序員都知道浦辨,就是一方作為生產(chǎn)者不斷提供資源,另一方作為消費(fèi)者不斷消費(fèi)資源沼沈。簡單點(diǎn)說流酬,就好比餐館的廚師和顧客,廚師作為生產(chǎn)者不斷制作美味的食物列另,而顧客作為消費(fèi)者不斷食用廚師提供的食物芽腾。此外,生產(chǎn)者與消費(fèi)者之間可以是一對一页衙、一對多摊滔、多對一和多對多的關(guān)系。
那么這個模型和爬蟲有什么關(guān)系呢店乐?其實(shí)艰躺,爬蟲可以認(rèn)為是一個生產(chǎn)者,它不斷從網(wǎng)站爬取數(shù)據(jù)眨八,爬取到的數(shù)據(jù)就是食物腺兴;而所得數(shù)據(jù)需要消費(fèi)者進(jìn)行數(shù)據(jù)清洗,把有用的數(shù)據(jù)吸收掉廉侧,把無用的數(shù)據(jù)丟棄页响。
在實(shí)踐過程中,爬蟲爬取和數(shù)據(jù)清洗分別對應(yīng)一個Thread
伏穆,兩個線程之間通過順序隊(duì)列queue
傳遞數(shù)據(jù)拘泞,數(shù)據(jù)傳遞過程就好比餐館服務(wù)員從廚房把食物送到顧客餐桌上的過程。爬取線程負(fù)責(zé)爬取網(wǎng)站數(shù)據(jù)枕扫,并將原始數(shù)據(jù)存入隊(duì)列陪腌,清洗線程從隊(duì)列中按入隊(duì)順序讀取原始數(shù)據(jù)并提取出有效數(shù)據(jù)。
以上便是對生產(chǎn)者消費(fèi)者模型的簡單介紹了,下面針對本次爬取任務(wù)予以詳細(xì)說明诗鸭。
分析站點(diǎn)
http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfo
我們要爬取的數(shù)據(jù)是主頁顯示的表格中所有期貨公司的從業(yè)人員信息染簇,每個公司對應(yīng)一個機(jī)構(gòu)編號(G01001~G01198
)。從上圖可以看到有主頁有分頁强岸,共8頁锻弓。以G01001
方正中期期貨公司為例,點(diǎn)擊該公司名稱跳轉(zhuǎn)至對應(yīng)網(wǎng)頁如下:
從網(wǎng)址及網(wǎng)頁內(nèi)容可以提取出以下信息:
- 網(wǎng)址
-
http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+¤tPage=1&pageSize=20&selectType=personinfo
-
organid
: 機(jī)構(gòu)編號蝌箍,+G01001+
~+G01198+
-
currentPage
: 該機(jī)構(gòu)從業(yè)人員信息當(dāng)前頁面編號 -
pageSize
: 每個頁面顯示的人員個數(shù)青灼,默認(rèn)20 -
selectType
: 固定為personinfo
-
- 機(jī)構(gòu)名稱
mechanism_name
,在每頁表格上方可以看到當(dāng)前機(jī)構(gòu)名稱 - 從業(yè)人員信息妓盲,即每頁的表格內(nèi)容杂拨,也是我們要爬取的對象
- 該機(jī)構(gòu)從業(yè)人員信息總頁數(shù)
page_cnt
我們最終爬取的數(shù)據(jù)可以按機(jī)構(gòu)名稱存儲到對應(yīng)的txt文件或excel文件中。
獲取機(jī)構(gòu)名稱
獲取到某機(jī)構(gòu)的任意從業(yè)信息頁面后悯衬,使用BeautifulSoup
可快速提取機(jī)構(gòu)名稱弹沽。
mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
那么有人可能會問,既然主頁表格都已經(jīng)包含了所有機(jī)構(gòu)的編號和名稱筋粗,為何還要多此一舉的再獲取一次呢策橘?這是因?yàn)椋覊焊筒幌肱乐黜摰哪切┍砀衲纫冢苯痈鶕?jù)機(jī)構(gòu)編號的遞增規(guī)律生成對應(yīng)的網(wǎng)址即可丽已,所以獲取機(jī)構(gòu)名稱的任務(wù)就放在了爬取每個機(jī)構(gòu)首個信息頁面之后。
獲取機(jī)構(gòu)信息對應(yīng)的網(wǎng)頁數(shù)量
每個機(jī)構(gòu)的數(shù)據(jù)量是不等的暇唾,幸好每個頁面都包含了當(dāng)前頁面數(shù)及總頁面數(shù)促脉。使用以下代碼即可獲取頁碼數(shù)辰斋。
url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)
從每個機(jī)構(gòu)首頁獲取頁碼數(shù)后策州,便可for
循環(huán)修改網(wǎng)址參數(shù)中的currentPage
,逐頁獲取機(jī)構(gòu)信息宫仗。
獲取當(dāng)前頁面從業(yè)人員信息
針對如上圖所示的一個特定信息頁時够挂,人員信息被存放于一個表中,除了固定的表頭信息外藕夫,人員信息均被包含在一個帶有id
的tr
標(biāo)簽中孽糖,所以使用BeautifulSoup
可以很容易提取出頁面內(nèi)所有人員信息。
soup.find_all('tr', id=True)
確定爬取方案
一般的想法當(dāng)然是逐頁爬取主頁信息毅贮,然后獲取每頁所有機(jī)構(gòu)對應(yīng)的網(wǎng)頁鏈接办悟,進(jìn)而繼續(xù)爬取每個機(jī)構(gòu)信息。
但是由于該網(wǎng)站的機(jī)構(gòu)信息網(wǎng)址具有明顯的規(guī)律滩褥,我們根據(jù)每個機(jī)構(gòu)的編號便可直接得到每個機(jī)構(gòu)每個信息頁面的網(wǎng)址病蛉。所以具體爬取方案如下:
- 將所有機(jī)構(gòu)編號網(wǎng)址存入隊(duì)列
url_queue
- 新建生產(chǎn)者線程
SpiderThread
完成抓取任務(wù)
- 循環(huán)從隊(duì)列
url_queue
中讀取一個編號,生成機(jī)構(gòu)首頁網(wǎng)址,使用requests
抓取之 - 從抓取結(jié)果中獲取頁碼數(shù)量铺然,若為0俗孝,則返回該線程第1步
- 循環(huán)爬取當(dāng)前機(jī)構(gòu)剩余頁面
- 將頁面信息存入隊(duì)列
html_queue
- 新建消費(fèi)者線程
DatamineThread
完成數(shù)據(jù)清洗任務(wù)
- 循環(huán)從隊(duì)列
html_queue
中讀取一組頁面信息 - 使用
BeautifulSoup
提取頁面中的從業(yè)人員信息 - 將信息以二維數(shù)組形式存儲,最后交由數(shù)據(jù)存儲類
Storage
存入本地文件
代碼實(shí)現(xiàn)
生成者SpiderThread
爬蟲線程先從隊(duì)列獲取一個機(jī)構(gòu)編號魄健,生成機(jī)構(gòu)首頁網(wǎng)址并進(jìn)行爬取赋铝,接著判斷機(jī)構(gòu)頁面數(shù)量是否為0,如若不為0則繼續(xù)獲取機(jī)構(gòu)名稱沽瘦,并根據(jù)頁面數(shù)循環(huán)爬取剩余頁面革骨,將原始html數(shù)據(jù)以如下dict
格式存入隊(duì)列html_queue
:
{
'name': mechanismId_mechanismName,
'num': currentPage,
'content': html
}
爬蟲產(chǎn)生的數(shù)據(jù)隊(duì)列html_queue
將由數(shù)據(jù)清洗線程進(jìn)行處理,下面是爬蟲線程的主程序析恋,整個線程代碼請看后面的源碼苛蒲。
def run(self):
while True:
mechanism_id = 'G0' + self.url_queue.get()
# the first page's url
url = self.__get_url(mechanism_id, 1)
html = self.grab(url)
page_cnt = self.url_re.search(html.text).group(1)
if page_cnt == '0':
self.url_queue.task_done()
continue
soup = BeautifulSoup(html.text, 'html.parser')
mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))
# put data into html_queue
self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
for i in range(2, int(page_cnt) + 1):
url = self.__get_url(mechanism_id, i)
html = self.grab(url)
self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
self.url_queue.task_done()
消費(fèi)者DatamineThread
數(shù)據(jù)清洗線程比較簡單,就是從生產(chǎn)者提供的數(shù)據(jù)隊(duì)列html_queue
逐一提取html
數(shù)據(jù)绿满,然后從html
數(shù)據(jù)中提取從業(yè)人員信息臂外,以二維數(shù)組形式存儲,最后交由存儲模塊Storage
完成數(shù)據(jù)存儲工作喇颁。
class DatamineThread(Thread):
"""Parse data from html"""
def __init__(self, html_queue, filetype):
Thread.__init__(self)
self.html_queue = html_queue
self.filetype = filetype
def __datamine(self, data):
'''Get data from html content'''
soup = BeautifulSoup(data['content'].text, 'html.parser')
infos = []
for info in soup.find_all('tr', id=True):
items = []
for item in info.find_all('td'):
items.append(item.get_text())
infos.append(items)
return infos
def run(self):
while True:
data = self.html_queue.get()
print('Datamine Thread: get %s_%d' % (data['name'], data['num']))
store = Storage(data['name'], self.filetype)
store.save(self.__datamine(data))
self.html_queue.task_done()
數(shù)據(jù)存儲Storage
我寫了兩類文件格式的存儲函數(shù)漏健,write_txt
, write_excel
,分別對應(yīng)txt
,excel
文件橘霎。實(shí)際存儲時由調(diào)用方確定文件格式蔫浆。
def save(self, data):
{
'.txt': self.write_txt,
'.xls': self.write_excel
}.get(self.filetype)(data)
存入txt文件
存入txt
文件是比較簡單的,就是以附加(a
)形式打開文件姐叁,寫入數(shù)據(jù)瓦盛,關(guān)閉文件。其中外潜,文件名稱由調(diào)用方提供原环。寫入數(shù)據(jù)時,每個人員信息占用一行处窥,以制表符\t
分隔嘱吗。
def write_txt(self, data):
'''Write data to txt file'''
fid = open(self.path, 'a', encoding='utf-8')
# insert the header of table
if not os.path.getsize(self.path):
fid.write('\t'.join(self.table_header) + '\n')
for info in data:
fid.write('\t'.join(info) + '\n')
fid.close()
存入Excel文件
存入Excel
文件還是比較繁瑣的,由于經(jīng)驗(yàn)不多滔驾,選用的是xlwt
, xlrd
和xlutils
庫谒麦。說實(shí)話,這3個庫真心不大好用哆致,勉強(qiáng)完成任務(wù)而已绕德。為什么這么說,且看:
- 修改文件麻煩:
xlwt
只能寫,xlrd
只能讀摊阀,需要xlutils
的copy
函數(shù)將xlrd
讀取的數(shù)據(jù)復(fù)制到內(nèi)存耻蛇,再用xlwt
修改 - 只支持
.xls
文件:.xlsx
經(jīng)讀寫也會變成.xls
格式 - 表格樣式易變:只要重新寫入文件剩瓶,表格樣式必然重置
所以后續(xù)我肯定會再學(xué)學(xué)其它的excel
庫,當(dāng)然城丧,當(dāng)前解決方案暫時還用這三個延曙。代碼如下:
def write_excel(self, data):
'''write data to excel file'''
if not os.path.exists(self.path):
header_style = xlwt.easyxf('font:name 楷體, color-index black, bold on')
wb = xlwt.Workbook(encoding='utf-8')
ws = wb.add_sheet('Data')
# insert the header of table
for i in range(len(self.table_header)):
ws.write(0, i, self.table_header[i], header_style)
else:
rb = open_workbook(self.path)
wb = copy(rb)
ws = wb.get_sheet(0)
# write data
offset = len(ws.rows)
for i in range(0, len(data)):
for j in range(0, len(data[0])):
ws.write(offset + i, j, data[i][j])
# When use xlutils.copy.copy function to copy data from exist .xls file,
# it will loss the origin style, so we need overwrite the width of column,
# maybe there some other good solution, but I have not found yet.
for i in range(len(self.table_header)):
ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]
# save to file
while True:
try:
wb.save(self.path)
break
except PermissionError as e:
print('{0} error: {1}'.format(self.path, e.strerror))
time.sleep(5)
finally:
pass
說明:
- 一個文件對應(yīng)一個機(jī)構(gòu)的數(shù)據(jù),需要多次讀取和寫入亡哄,所以需要計(jì)算文件寫入時的行數(shù)偏移量
offset
枝缔,即當(dāng)前文件已包含數(shù)據(jù)的行數(shù) - 當(dāng)被寫入文件被人為打開時,會出現(xiàn)
PermissionError
異常蚊惯,可以在捕獲該異常然后提示錯誤信息愿卸,并定時等待直到文件被關(guān)閉。
main
主函數(shù)用于創(chuàng)建和啟動生產(chǎn)者線程和消費(fèi)者線程截型,同時為生產(chǎn)者線程提供機(jī)構(gòu)編號隊(duì)列趴荸。
url_queue = queue.Queue()
html_queue = queue.Queue()
def main():
for i in range(1001, 1199):
url_queue.put(str(i))
# create and start a spider thread
st = SpiderThread(url_queue, html_queue)
st.setDaemon(True)
st.start()
# create and start a datamine thread
dt = DatamineThread(html_queue, '.xls')
dt.setDaemon(True)
dt.start()
# wait on the queue until everything has been processed
url_queue.join()
html_queue.join()
從主函數(shù)可以看到,兩個隊(duì)列都調(diào)用了join
函數(shù)宦焦,用于阻塞发钝,直到對應(yīng)隊(duì)列為空為止。要注意的是波闹,隊(duì)列操作中酝豪,每個出隊(duì)操作queue.get()
需要對應(yīng)一個queue.task_done()
操作,否則會出現(xiàn)隊(duì)列數(shù)據(jù)已全部處理完精堕,但主線程仍在執(zhí)行的情況孵淘。
至此,爬蟲的主要代碼便講解完了歹篓,下面是完整源碼瘫证。
源碼
#!/usr/bin/python3
# -*-coding:utf-8-*-
import queue
from threading import Thread
import requests
import re
from bs4 import BeautifulSoup
import os
import platform
import xlwt
from xlrd import open_workbook
from xlutils.copy import copy
import time
# url format ↓
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+¤tPage=1&pageSize=20&selectType=personinfo&all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
#
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data -> 'url' # first url of each mechanism
# html_queue data -> {'name':'mechanism_name', 'html':data}
url_queue = queue.Queue()
html_queue = queue.Queue()
class SpiderThread(Thread):
"""Threaded Url Grab"""
def __init__(self, url_queue, html_queue):
Thread.__init__(self)
self.url_queue = url_queue
self.html_queue = html_queue
self.page_size = 20
self.url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}
def __get_url(self, mechanism_id, current_page):
return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+¤tPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \
% (mechanism_id, current_page, self.page_size)
def grab(self, url):
'''Grab html of url from web'''
while True:
try:
html = requests.get(url, headers=self.headers, timeout=20)
if html.status_code == 200:
break
except requests.exceptions.ConnectionError as e:
print(url + ' Connection error, try again...')
except requests.exceptions.ReadTimeout as e:
print(url + ' Read timeout, try again...')
except Exception as e:
print(str(e))
finally:
pass
return html
def run(self):
'''Grab all htmls of mechanism one by one
Steps:
1. grab first page of each mechanism from url_queue
2. get number of pages and mechanism name from first page
3. grab all html file of each mechanism
4. push all html to html_queue
'''
while True:
mechanism_id = 'G0' + self.url_queue.get()
# the first page's url
url = self.__get_url(mechanism_id, 1)
html = self.grab(url)
page_cnt = self.url_re.search(html.text).group(1)
if page_cnt == '0':
self.url_queue.task_done()
continue
soup = BeautifulSoup(html.text, 'html.parser')
mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))
# put data into html_queue
self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
for i in range(2, int(page_cnt) + 1):
url = self.__get_url(mechanism_id, i)
html = self.grab(url)
self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
self.url_queue.task_done()
class DatamineThread(Thread):
"""Parse data from html"""
def __init__(self, html_queue, filetype):
Thread.__init__(self)
self.html_queue = html_queue
self.filetype = filetype
def __datamine(self, data):
'''Get data from html content'''
soup = BeautifulSoup(data['content'].text, 'html.parser')
infos = []
for info in soup.find_all('tr', id=True):
items = []
for item in info.find_all('td'):
items.append(item.get_text())
infos.append(items)
return infos
def run(self):
while True:
data = self.html_queue.get()
print('Datamine Thread: get %s_%d' % (data['name'], data['num']))
store = Storage(data['name'], self.filetype)
store.save(self.__datamine(data))
self.html_queue.task_done()
class Storage():
def __init__(self, filename, filetype):
self.filetype = filetype
self.filename = filename + filetype
self.table_header = ('姓名', '性別', '從業(yè)資格號', '投資咨詢從業(yè)證書號', '任職部門', '職務(wù)', '任現(xiàn)職時間')
self.path = self.__get_path()
def __get_path(self):
path = {
'Windows': 'D:/litreily/Documents/python/cfachina',
'Linux': '/mnt/d/litreily/Documents/python/cfachina'
}.get(platform.system())
if not os.path.isdir(path):
os.makedirs(path)
return '%s/%s' % (path, self.filename)
def write_txt(self, data):
'''Write data to txt file'''
fid = open(self.path, 'a', encoding='utf-8')
# insert the header of table
if not os.path.getsize(self.path):
fid.write('\t'.join(self.table_header) + '\n')
for info in data:
fid.write('\t'.join(info) + '\n')
fid.close()
def write_excel(self, data):
'''write data to excel file'''
if not os.path.exists(self.path):
header_style = xlwt.easyxf('font:name 楷體, color-index black, bold on')
wb = xlwt.Workbook(encoding='utf-8')
ws = wb.add_sheet('Data')
# insert the header of table
for i in range(len(self.table_header)):
ws.write(0, i, self.table_header[i], header_style)
else:
rb = open_workbook(self.path)
wb = copy(rb)
ws = wb.get_sheet(0)
# write data
offset = len(ws.rows)
for i in range(0, len(data)):
for j in range(0, len(data[0])):
ws.write(offset + i, j, data[i][j])
# When use xlutils.copy.copy function to copy data from exist .xls file,
# it will loss the origin style, so we need overwrite the width of column,
# maybe there some other good solution, but I have not found yet.
for i in range(len(self.table_header)):
ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]
# save to file
while True:
try:
wb.save(self.path)
break
except PermissionError as e:
print('{0} error: {1}'.format(self.path, e.strerror))
time.sleep(5)
finally:
pass
def save(self, data):
'''Write data to local file.
According filetype to choose function to save data, filetype can be '.txt'
or '.xls', but '.txt' type is saved more faster then '.xls' type
Args:
data: a 2d-list array that need be save
'''
{
'.txt': self.write_txt,
'.xls': self.write_excel
}.get(self.filetype)(data)
def main():
for i in range(1001, 1199):
url_queue.put(str(i))
# create and start a spider thread
st = SpiderThread(url_queue, html_queue)
st.setDaemon(True)
st.start()
# create and start a datamine thread
dt = DatamineThread(html_queue, '.xls')
dt.setDaemon(True)
dt.start()
# wait on the queue until everything has been processed
url_queue.join()
html_queue.join()
if __name__ == '__main__':
main()
爬取測試
寫在最后
- 測試發(fā)現(xiàn),寫入
txt
的速度明顯高于寫入excel
的速度 - 如果將頁面網(wǎng)址中的
pageSize
修改為1000
或更大庄撮,則可以一次性獲取某機(jī)構(gòu)的所有從業(yè)人員信息背捌,而不用逐頁爬取,效率可以大大提高重窟。 - 該爬蟲已托管至github Python-demos