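This chapter covers the concurrent.futures module, introduced in Python 3.2, along with blocking I/O and the GIL, and the concept of futures.
The site used by the chapter's sample code, http://flupy.org/data/flags, is currently unreachable, so I will not copy sample code that cannot be run; the original listings can be consulted directly in any case.
As an exercise after reading the chapter, I tried improving a piece of real code.
Below is a program that fetches historical data for Shanghai and Shenzhen stocks through the NetEase Finance interface and writes it to a CSV file: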
# -*- coding: utf-8 -*-
import csv
import datetime

import requests
from lxml import etree

now = datetime.datetime.now()
today = now.strftime('%Y%m%d')
before = now + datetime.timedelta(-1)
yesterday = before.strftime('%Y%m%d')


def getHTMLtext(url, param=None, code='ANSI'):
    # Return the response body as text, or '' if the request fails.
    # code='ANSI' is a Windows-only codec alias (the system code page).
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'}
    try:
        r = requests.get(url, params=param, headers=headers)
        r.encoding = code
        r.raise_for_status()
        return r.text
    except requests.RequestException:
        return ''


def stockList(url, param=None):
    # Collect the six-digit stock codes linked from the listing page.
    stockSH, stockSZ = [], []
    text = getHTMLtext(url, param)
    html = etree.HTML(text)
    for href in html.xpath('//a/@href'):
        if 'http://quote.eastmoney.com/sh' in href:
            stockSH.append(href.split('sh')[1][:6])
        elif 'http://quote.eastmoney.com/sz' in href:
            stockSZ.append(href.split('sz')[1][:6])
    print('all stock number got!')
    return stockSH, stockSZ


def stockInfo(lst, strnum):
    # Download sequentially, no concurrency
    print('start get all stock history, will take a long time, please wait...')
    L = []
    for i in lst:
        # Read code=0000001 in two parts: the leading digit is 0 for Shanghai
        # or 1 for Shenzhen, and 000001 is the stock code
        history_url = "http://quotes.money.163.com/service/chddata.html?code={0}&start={1}&end={2}&fields=TCLOSE;HIGH;LOW;TOPEN;LCLOSE;CHG;PCHG;VOTURNOVER;VATURNOVER".format(strnum+i, '20170801', yesterday)
        perday = getHTMLtext(history_url).split('\r\n')
        if len(perday) <= 2:
            continue
        perday.pop()  # drop the empty string after the final line break
        for day in perday[1:]:  # skip the header row
            L.append(day.split(','))
    print('all stock info got!')
    with open(today+'stock.csv', 'a+', newline='', encoding='gb18030') as file:
        w = csv.writer(file)
        w.writerows(L)
    return L


def main():
    stockList_url = "http://quote.eastmoney.com/stocklist.html"
    SH, SZ = stockList(stockList_url)
    stockInfo(SH, '0')
    stockInfo(SZ, '1')


if __name__ == "__main__":
    main()
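Because the downloads run one after another, this is slow: the code takes more than ten minutes to finish.
So now is the time to use what this chapter teaches to speed it up.
Clearly, the iterations of the for loop in stockInfo are independent of one another, so the loop body can be turned into a function and called concurrently.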
# -*- coding: utf-8 -*-
import csv
import datetime
import threading
from concurrent import futures

import requests
from lxml import etree

MAX_WORKERS = 20
csv_lock = threading.Lock()  # serializes writes to the shared CSV file
now = datetime.datetime.now()
today = now.strftime('%Y%m%d')
before = now + datetime.timedelta(-1)
yesterday = before.strftime('%Y%m%d')


def getHTMLtext(url, param=None, code='ANSI'):
    # Return the response body as text, or '' if the request fails.
    # code='ANSI' is a Windows-only codec alias (the system code page).
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'}
    try:
        r = requests.get(url, params=param, headers=headers)
        r.encoding = code
        r.raise_for_status()
        return r.text
    except requests.RequestException:
        return ''


def stockList(url, param=None):
    # Collect the six-digit stock codes linked from the listing page.
    stockSH, stockSZ = [], []
    text = getHTMLtext(url, param)
    html = etree.HTML(text)
    for href in html.xpath('//a/@href'):
        if 'http://quote.eastmoney.com/sh' in href:
            stockSH.append(href.split('sh')[1][:6])
        elif 'http://quote.eastmoney.com/sz' in href:
            stockSZ.append(href.split('sz')[1][:6])
    print('all stock number got!')
    return stockSH, stockSZ


def stockInfo(i, num: str):
    # Read code=0000001 in two parts: the leading digit is 0 for Shanghai
    # or 1 for Shenzhen, and 000001 is the stock code
    rows = []  # local to each call, so threads do not share a global list
    history_url = "http://quotes.money.163.com/service/chddata.html?code={0}&start={1}&end={2}&fields=TCLOSE;HIGH;LOW;TOPEN;LCLOSE;CHG;PCHG;VOTURNOVER;VATURNOVER".format(num+i, '20170820', yesterday)
    perday = getHTMLtext(history_url).split('\r\n')
    if len(perday) > 2:
        perday.pop()  # drop the empty string after the final line break
        for day in perday[1:]:  # skip the header row
            rows.append(day.split(','))
    return rows


def write2CSV(lst: list):
    # Only one thread at a time may append to the file.
    with csv_lock:
        with open(today+'stock.csv', 'a+', newline='', encoding='gb18030') as file:
            w = csv.writer(file)
            w.writerows(lst)


def downloadOne(i):
    rows = stockInfo(i, '0')
    write2CSV(rows)
    # stockInfo(i, '1') is not called: Shenzhen stocks are not downloaded here


def downloadMany(lst: list):
    workers = min(MAX_WORKERS, len(lst))
    with futures.ThreadPoolExecutor(workers) as e:
        res = e.map(downloadOne, sorted(lst))
    return len(list(res))


def main():
    stockList_url = "http://quote.eastmoney.com/stocklist.html"
    SH, SZ = stockList(stockList_url)
    downloadMany(SH)
    # downloadMany(SZ)


if __name__ == "__main__":
    main()
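With workers threads downloading at the same time, the speedup can approach a factor of workers in the best case, and the run now noticeably finishes almost at once.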
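In fact, Python threads often give no speedup at all. The CPython interpreter is not thread-safe internally, so it has a global interpreter lock (GIL) that lets only one thread execute Python code at a time; as a result, a single Python process usually cannot use multiple CPU cores. This is a limitation of the CPython interpreter, not of the Python language itself: Jython, for example, has no such restriction.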
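However, every standard library function that performs blocking I/O releases the GIL while waiting for the operating system to return a result. This means I/O-bound Python programs do benefit from threads: while one Python thread waits for a network response, the blocking I/O function has released the GIL, so another thread can run.
This includes time.sleep(): even sleep(0) releases the GIL.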
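The effect of releasing the GIL during blocking calls can be checked with a small timing experiment. The sketch below is only an illustration: fake_download is a hypothetical stand-in that sleeps instead of touching the network, and the 0.2 second delay is an arbitrary assumption.

```python
import time
from concurrent import futures


def fake_download(i):
    # Hypothetical stand-in for a blocking network call;
    # time.sleep releases the GIL while it waits.
    time.sleep(0.2)
    return i


def run_sequential(n):
    # Call the task n times, one after another.
    start = time.perf_counter()
    results = [fake_download(i) for i in range(n)]
    return results, time.perf_counter() - start


def run_threaded(n):
    # Run the same n calls in a pool with one thread per task.
    start = time.perf_counter()
    with futures.ThreadPoolExecutor(max_workers=n) as executor:
        results = list(executor.map(fake_download, range(n)))
    return results, time.perf_counter() - start


seq_results, seq_elapsed = run_sequential(5)
thr_results, thr_elapsed = run_threaded(5)
print('sequential: {:.2f}s, threaded: {:.2f}s'.format(seq_elapsed, thr_elapsed))
```

Because the waits overlap, the threaded run should take roughly as long as a single call, while the sequential run takes about five times that.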
So what about a CPU-bound Python program, where threads do not help?
In that case, use multiple processes.
Starting multiple processes with the concurrent.futures module is very simple. Just change
def downloadMany(lst: list):
    workers = min(MAX_WORKERS, len(lst))
    with futures.ThreadPoolExecutor(workers) as e:
        res = e.map(downloadOne, sorted(lst))
to
def downloadMany(lst: list):
    with futures.ProcessPoolExecutor() as e:
        res = e.map(downloadOne, sorted(lst))
and that is all.
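ThreadPoolExecutor() takes a max_workers argument that sets the number of threads in the pool (required up to Python 3.4, optional since Python 3.5), whereas ProcessPoolExecutor() usually needs no worker count: it defaults to the number of CPUs returned by os.cpu_count(). You can pass a different value, but for CPU-bound work there is no benefit in asking for more processes than there are CPUs.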
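In practice, for an I/O-bound program like this one, the process version did not save any running time in my test, and I ended up with four sets of identical data, as if four copies of the same program had been running. Keep in mind that worker processes do not share module-level state: each process has its own copy, and only the arguments and return values of the submitted calls are passed between processes. For explicit inter-process communication, use the multiprocessing module.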
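The Chinese translation of the book renders future as 期物. A future is an object that represents an operation executing asynchronously; normally you should not create one yourself, and it is instantiated by the concurrency framework instead.
To have the framework create futures explicitly, you can write: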
def downloadMany(lst: list):
    workers = min(MAX_WORKERS, len(lst))
    with futures.ThreadPoolExecutor(workers) as e:
        to_do = []
        for i in sorted(lst):
            future = e.submit(downloadOne, i)
            to_do.append(future)
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            results.append(res)
    return len(results)
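The return value of this function is not very useful in itself. The point is that any exception raised in a worker is re-raised when its result is retrieved, which here means the call to future.result().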
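To make that behavior concrete, here is a minimal sketch of catching such exceptions per task. The function risky and its failing input are hypothetical; keeping the futures in a dict, so that each one can be traced back to its argument, is one common arrangement rather than the only option.

```python
from concurrent import futures


def risky(i):
    # Hypothetical task: fails for one input to show error propagation.
    if i == 3:
        raise ValueError('bad input: {}'.format(i))
    return i * 10


def run_all(items):
    results, errors = {}, {}
    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Map each future back to the argument it was submitted with.
        to_do = {executor.submit(risky, i): i for i in items}
        for future in futures.as_completed(to_do):
            i = to_do[future]
            try:
                results[i] = future.result()  # re-raises the task's exception
            except ValueError as exc:
                errors[i] = str(exc)
    return results, errors


results, errors = run_all(range(5))
print(results, errors)
```

Handling the exception inside the loop lets the remaining tasks finish instead of aborting on the first failure.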
One point was left out above: how do you pass multiple arguments to the map function? There are plenty of tutorials online, so I will not go into detail here.