2.Python與Java
1.Python相關(guān)
1.Python PEP介紹
1.PEP是什么闷袒?
引用鏈接: https://www.cnblogs.com/abella/p/10056875.html
PEP的全稱是Python Enhancement Proposals
瑰艘,其中Enhancement是增強改進的意思纵菌,Proposals則可譯為提案或建議書拯坟,所以合起來恬吕,比較常見的翻譯是Python增強提案
或Python改進建議書
傀广。
我個人傾向于前一個翻譯遣臼,因為它更貼切。Python核心開發(fā)者主要通過郵件列表討論問題场晶、提議混埠、計劃等,PEP通常是匯總了多方信息诗轻,經(jīng)過了部分核心開發(fā)者review和認可钳宪,最終形成的正式文檔,起到了對外公示的作用扳炬,所以我認為翻譯成“提案”更恰當吏颖。
PEP的官網(wǎng)是:https://www.python.org/dev/peps/,這也就是PEP 0 的地址恨樟。其它PEP的地址是將編號拼接在后面半醉,例如:https://www.python.org/dev/peps/pep-0020/ 就是PEP 20 的鏈接,以此類推
2.PEP狀態(tài)表示
I - Informational PEP
P - Process PEP
S - Standards Track PEP
信息類:這類PEP就是提供信息劝术,有告知類信息缩多,也有指導(dǎo)類信息等等。例如PEP 20(The Zen of Python养晋,即著名的Python之禪)衬吆、PEP 404 (Python 2.8 Un-release Schedule,即宣告不會有Python2.8版本)绳泉。
流程類:這類PEP主要是Python本身之外的周邊信息逊抡。例如PEP 1(PEP Purpose and Guidelines,即關(guān)于PEP的指南)圈纺、PEP 347(Migrating the Python CVS to Subversion秦忿,即關(guān)于遷移Python代碼倉)。
標準類:這類PEP主要描述了Python的新功能和新實踐(implementation)蛾娶,是數(shù)量最多的提案灯谣。例如我之前提到過的f-string方式,它出自PEP 498(Literal String Interpolation蛔琅,字面字符串插值)胎许。
每個PEP最初都是一個草案(Draft),隨后會經(jīng)歷一個過程罗售,因此也就出現(xiàn)了不同的狀態(tài)
A – Accepted (Standards Track only) or Active proposal 已接受(僅限標準跟蹤)或有效提案
D – Deferred proposal 延期提案
F – Final proposal 最終提案
P – Provisional proposal 暫定提案
R – Rejected proposal 被否決的提案
S – Superseded proposal 被取代的提案
W – Withdrawn proposal 撤回提案
PEP提案已經(jīng)累積產(chǎn)生了478個辜窑,我們并不需要對每個PEP都熟知,沒有必要寨躁。下面穆碎,我列舉了一些PEP,推薦大家一讀:
PEP 0 -- Index of Python Enhancement Proposals
PEP 7 -- Style Guide for C Code职恳,C擴展
PEP 8 -- Style Guide for Python Code所禀,編碼規(guī)范(必讀)
PEP 20 -- The Zen of Python方面,Python之禪
PEP 202 -- List Comprehensions,列表生成式
PEP 274 -- Dict Comprehensions色徘,字典生成式
PEP 234 -- Iterators恭金,迭代器
PEP 257 -- Docstring Conventions,文檔注釋規(guī)范
PEP 279 -- The enumerate() built-in function褂策,enumerate枚舉
PEP 282 -- A Logging System横腿,日志模塊
PEP 285 -- Adding a bool type,布爾值(建議閱讀《Python對象的身份迷思:從全體公民到萬物皆數(shù)》)
PEP 289 -- Generator Expressions斤寂,生成器表達式
PEP 318 -- Decorators for Functions and Methods耿焊,裝飾器
PEP 342 -- Coroutines via Enhanced Generators,協(xié)程
PEP 343 -- The "with" Statement扬蕊,with語句
PEP 380 -- Syntax for Delegating to a Subgenerator搀别,yield from語法
PEP 405 -- Python Virtual Environments,虛擬環(huán)境
PEP 471 -- os.scandir() function尾抑,遍歷目錄
PEP 484 -- Type Hints歇父,類型約束
PEP 492 -- Coroutines with async and await syntax,async/await語法
PEP 498 -- Literal String Interpolation Python再愈,字符串插值
PEP 525 -- Asynchronous Generators榜苫,異步生成器
PEP 572 -- Assignment Expressions,表達式內(nèi)賦值(最爭議)
PEP 3105 -- Make print a function翎冲,print改為函數(shù)
PEP 3115 -- Metaclasses in Python 3000垂睬,元類
PEP 3120 -- Using UTF-8 as the default source encoding
PEP 3333 -- Python Web Server Gateway Interface v1.0.1,Web開發(fā)
PEP 8000 -- Python Language Governance Proposal Overview抗悍,GvR老爹推出決策層后驹饺,事關(guān)新決策方案
中文翻譯:
PEP8 -- https://dwz.cn/W01HexFD
PEP257 -- https://dwz.cn/JLctlNLC
PEP328 -- https://dwz.cn/4vCQJpEP
PEP333 -- https://dwz.cn/TAXIZdzc
PEP484 -- https://dwz.cn/dSLZgg5B
PEP492 -- http://t.cn/EALeaL0
PEP541 -- https://dwz.cn/ce98vc27
PEP3107 -- http://suo.im/4xFESR
PEP3333 -- https://dwz.cn/si3xylgw
3.python之禪
優(yōu)美勝于丑陋(Python以編寫優(yōu)美的代碼為目標)
明了勝于晦澀(優(yōu)美的代碼應(yīng)當是明了的,命名規(guī)范缴渊,風(fēng)格相似)
簡潔勝于復(fù)雜(優(yōu)美的代碼應(yīng)當是簡潔的赏壹,不要有復(fù)雜的內(nèi)部實現(xiàn))
復(fù)雜勝于凌亂(如果復(fù)雜不可避免,那代碼間也不能有難懂的關(guān)系衔沼,要保持接口簡潔)
扁平勝于嵌套(優(yōu)美的代碼應(yīng)當是扁平的蝌借,不能有太多的嵌套)
間隔勝于緊湊(優(yōu)美的代碼有適當?shù)拈g隔,不要奢望一行代碼解決問題)
可讀性很重要(優(yōu)美的代碼是可讀的)
即便假借特例的實用性之名指蚁,也不可違背這些規(guī)則(這些規(guī)則至高無上)
不要包容所有錯誤菩佑,除非您確定需要這樣做(精準地捕獲異常,不寫 except:pass 風(fēng)格的代碼)
當存在多種可能凝化,不要嘗試去猜測
而是盡量找一種稍坯,最好是唯一一種明顯的解決方案(如果不確定,就用窮舉法)
雖然這并不容易搓劫,因為您不是 Python 之父(這里的 Dutch 是指 Guido )
做也許好過不做劣光,但不假思索就動手還不如不做(動手之前要細思量)
如果您無法向人描述您的方案袜蚕,那肯定不是一個好方案;反之亦然(方案測評標準)
命名空間是一種絕妙的理念绢涡,我們應(yīng)當多加利用(倡導(dǎo)與號召)
引用鏈接:http://www.reibang.com/p/30fee49b8d55
2.Python基礎(chǔ)
1.GIL 全局解釋器鎖
GIL(全局解釋器鎖,GIL 只有cpython有):在同一個時刻遣疯,只能有一個線程在一個cpu上執(zhí)行字節(jié)碼雄可,沒法像c和Java一樣將多個線程映射到多個CPU上執(zhí)行,但是GIL會根據(jù)執(zhí)行的字節(jié)碼行數(shù)(為了讓各個線程能夠平均利用CPU時間缠犀,python會計算當前已執(zhí)行的微代碼數(shù)量数苫,達到一定閾值后就強制釋放GIL)和時間片以及遇到IO操作的時候主動釋放鎖,讓其他字節(jié)碼執(zhí)行辨液。
作用:限制多線程同時執(zhí)行虐急,保證同一個時刻只有一個線程執(zhí)行。
原因:線程并非獨立滔迈,在一個進程中多個線程共享變量的止吁,多個線程執(zhí)行會導(dǎo)致數(shù)據(jù)被污染造成數(shù)據(jù)混亂,這就是線程的不安全性燎悍,為此引入了互斥鎖敬惦。
互斥鎖:即確保某段關(guān)鍵代碼的數(shù)據(jù)只能又一個線程從頭到尾完整執(zhí)行习劫,保證了這段代碼數(shù)據(jù)的安全性笤成,但是這樣就會導(dǎo)致死鎖舔琅。
死鎖:多個子線程在等待對方解除占用狀態(tài)围详,但是都不先解鎖焰宣,互相等待费彼,這就是死鎖瓣喊。
基于GIL的存在癞埠,在遇到大量的IO操作(文件讀寫鸽粉,網(wǎng)絡(luò)等待)代碼時斜脂,使用多線程效率更高。
3.并發(fā)編程
1.多進程模塊 multiprocess
參考鏈接: https://thief.one/2016/11/23/Python-multiprocessing/
1.基礎(chǔ)用法
import os
import multiprocessing
# Main
print('Main:', os.getpid())
# worker function
def worker(sign, lock):
lock.acquire()
print(sign, os.getpid())
lock.release()
# Multi-process
record = []
lock = multiprocessing.Lock()
if __name__ == '__main__':
for i in range(5):
process = multiprocessing.Process(target=worker, args=('process', lock))
process.start()
record.append(process)
for process in record:
process.join()
# Main: 90827
# process 90828
# process 90829
# process 90830
# process 90831
# process 90832
2.數(shù)據(jù)通信
ipc:就是進程間的通信模式潜叛,常用的一半是socke秽褒,rpc,pipe和消息隊列等威兜。
multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue)销斟,效率上更高。應(yīng)優(yōu)先考慮Pipe和Queue椒舵,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據(jù)的不是用戶進程的資源)蚂踊。
3.使用Array共享數(shù)據(jù)
對于Array數(shù)組類,括號內(nèi)的“i”表示它內(nèi)部的元素全部是int類型笔宿,而不是指字符“i”犁钟,數(shù)組內(nèi)的元素可以預(yù)先指定棱诱,也可以只指定數(shù)組的長度。Array類在實例化的時候必須指定數(shù)組的數(shù)據(jù)類型和數(shù)組的大小涝动,類似temp = Array(‘i’, 5)迈勋。對于數(shù)據(jù)類型有下面的對應(yīng)關(guān)系:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
代碼:
from multiprocessing import Process
from multiprocessing import Array
def func(i,temp):
temp[0] += 100
print("進程%s " % i, ' 修改數(shù)組第一個元素后----->', temp[0])
if __name__ == '__main__':
temp = Array('i', [1, 2, 3, 4])
for i in range(5):
p = Process(target=func, args=(i, temp))
p.start()
# 進程0 修改數(shù)組第一個元素后-----> 101
# 進程1 修改數(shù)組第一個元素后-----> 201
# 進程2 修改數(shù)組第一個元素后-----> 301
# 進程3 修改數(shù)組第一個元素后-----> 401
# 進程4 修改數(shù)組第一個元素后-----> 501
4.使用Manager共享數(shù)據(jù)
通過Manager類也可以實現(xiàn)進程間數(shù)據(jù)的共享,主要用于線程池之間通信醋粟,Manager()返回的manager對象提供一個服務(wù)進程靡菇,使得其他進程可以通過代理的方式操作Python對象。manager對象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多種格式.??````````````````
from multiprocessing import Process
from multiprocessing import Manager
def func(i, dic):
dic["num"] = 100+i
print(dic.items())
if __name__ == '__main__':
dic = Manager().dict()
for i in range(5):
p = Process(target=func, args=(i, dic))
p.start()
p.join()
# [('num', 100)]
# [('num', 101)]
# [('num', 102)]
# [('num', 103)]
# [('num', 104)]
5.使用queues的Queue類共享數(shù)據(jù)
multiprocessing是一個包米愿,它內(nèi)部有一個queues模塊厦凤,提供了一個Queue隊列類,可以實現(xiàn)進程間的數(shù)據(jù)共享育苟,如下例所示:
import multiprocessing
from multiprocessing import Process
from multiprocessing import queues
def func(i, q):
ret = q.get()
print("進程%s從隊列里獲取了一個%s较鼓,然后又向隊列里放入了一個%s" % (i, ret, i))
q.put(i)
if __name__ == "__main__":
lis = queues.Queue(20, ctx=multiprocessing)
lis.put(0)
for i in range(10):
p = Process(target=func, args=(i, lis,))
p.start()
# 進程1從隊列里獲取了一個0,然后又向隊列里放入了一個1
# 進程4從隊列里獲取了一個1违柏,然后又向隊列里放入了一個4
# 進程2從隊列里獲取了一個4博烂,然后又向隊列里放入了一個2
# 進程6從隊列里獲取了一個2,然后又向隊列里放入了一個6
# 進程0從隊列里獲取了一個6勇垛,然后又向隊列里放入了一個0
# 進程5從隊列里獲取了一個0脖母,然后又向隊列里放入了一個5
# 進程9從隊列里獲取了一個5,然后又向隊列里放入了一個9
# 進程7從隊列里獲取了一個9闲孤,然后又向隊列里放入了一個7
# 進程3從隊列里獲取了一個7谆级,然后又向隊列里放入了一個3
# 進程8從隊列里獲取了一個3,然后又向隊列里放入了一個8
6.使用pipe實現(xiàn)進程間通信
pipe只能適用于兩個進程間通信讼积,queue則沒這個限制肥照,他有兩個方法
from multiprocessing import Pipe,Process
import time
def produce(pipe):
pipe.send('666')
time.sleep(1)
def consumer(pipe):
print(pipe.recv())
# 有些類似socket的recv方法
if __name__ == '__main__':
send_pi,recv_pi = Pipe()
my_pro = Process(target=produce,args=(send_pi,))
my_con = Process(target=consumer,args=(recv_pi,))
my_pro.start()
my_con.start()
my_pro.join()
my_con.join()
7.進程鎖
一般來說每個進程使用單獨的空間,不必加進程鎖的勤众,但是如果你需要先實現(xiàn)進程數(shù)據(jù)共享舆绎,使用案例二中的代碼,又害怕造成數(shù)據(jù)搶奪和臟數(shù)據(jù)的問題们颜。就可以設(shè)置進程鎖吕朵,與threading類似,在multiprocessing里也有同名的鎖類RLock窥突,Lock努溃,Event,Condition和 Semaphore阻问,連用法都是一樣樣的梧税。
如果有多個進程要上鎖,使用multiprocessing.Manager().BoundedSemaphore(1)
from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time
def func(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi', lis[0])
lc.release()
if __name__ == "__main__":
array = Array('i', 1)
array[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=func, args=(i, array, lock))
p.start()
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
8.進程池
from multiprocessing import Pool導(dǎo)入就行,非常容易使用的第队。進程池內(nèi)部維護了一個進程序列哮塞,需要時就去進程池中拿取一個進程,如果進程池序列中沒有可供使用的進程凳谦,那么程序就會等待忆畅,直到進程池中有可用進程為止。
from multiprocessing import Pool
import time
def func(args):
time.sleep(1)
print("正在執(zhí)行進程 ", args)
if __name__ == '__main__':
p = Pool(5) # 創(chuàng)建一個包含5個進程的進程池
for i in range(30):
# 有30個任務(wù)
p.apply_async(func=func, args=(i,))
# 異步執(zhí)行尸执,并發(fā)邻眷。這里不用target,要用func
p.close() # 等子進程執(zhí)行完畢后關(guān)閉進程池
# time.sleep(2)
# p.terminate() # 立刻關(guān)閉進程池
p.join()
from multiprocessing.dummy import Pool as ThreadPool 是多線程進程池剔交,綁定一個cpu核心。from multiprocessing import Pool多進程改衩,運行于多個cpu核心岖常。multiprocessing 是多進程模塊, 而multiprocessing.dummy是以相同API實現(xiàn)的多線程模塊葫督。
沒有繞過GIL情況下竭鞍,多線程一定受GIL限制。
from multiprocessing.dummy import Pool as tp
def fun(i):
print i+i+i+i
list_i=[range(100)]
px = tp(processes=8)
# 開啟8個線程池
px.map(fun,list_i)
px.close()
px.join()
9.Process介紹
構(gòu)造方法:
- Process([group [, target [, name [, args [, kwargs]]]]])
- group: 線程組橄镜,目前還沒有實現(xiàn)偎快,庫引用中提示必須是None;
- target: 要執(zhí)行的方法洽胶;
- name: 進程名晒夹;
- args/kwargs: 要傳入方法的參數(shù)。
實例方法:
- is_alive():返回進程是否在運行姊氓。
- join([timeout]):阻塞當前上下文環(huán)境的進程程丐怯,直到調(diào)用此方法的進程終止或到達指定的3. timeout(可選參數(shù))。
- start():進程準備就緒翔横,等待CPU調(diào)度读跷。
- run():strat()調(diào)用run方法,如果實例進程時未制定傳入target禾唁,這star執(zhí)行t默認run()方法效览。
- terminate():不管任務(wù)是否完成,立即停止工作進程荡短。
屬性:
- authkey
- daemon:和線程的setDeamon功能一樣(將父進程設(shè)置為守護進程丐枉,當父進程結(jié)束時,子進程也結(jié)束)肢预。
- exitcode(進程在運行時為None矛洞、如果為–N,表示被信號N結(jié)束)。
- name:進程名字沼本。
- pid:進程號噩峦。
10.Pool介紹
Multiprocessing.Pool可以提供指定數(shù)量的進程供用戶調(diào)用,當有新的請求提交到pool中時抽兆,如果池還沒有滿识补,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值辫红,那么該請求就會等待凭涂,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行它贴妻。在共享資源時切油,只能使用Multiprocessing.Manager類,而不能使用Queue或者Array名惩。Pool類用于需要執(zhí)行的目標很多澎胡,而手動限制進程數(shù)量又太繁瑣時,如果目標少且不用控制進程數(shù)量則可以用Process類娩鹉。
構(gòu)造方法:
- Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes :使用的工作進程的數(shù)量攻谁,如果processes是None那么使用 os.cpu_count()返回的數(shù)量。
- initializer: 如果initializer是None弯予,那么每一個工作進程在開始的時候會調(diào)用initializer(*initargs)戚宦。
- maxtasksperchild:工作進程退出之前可以完成的任務(wù)數(shù),完成后用一個新的工作進程來替代原進程锈嫩,來讓閑置的資源被釋放受楼。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活祠挫。
- context: 用在制定工作進程啟動時的上下文那槽,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當?shù)脑O(shè)置了context等舔。
實例方法:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞骚灸。
- apply(func[, args[, kwds]])是阻塞的
- close() 關(guān)閉pool,使其不在接受新的任務(wù)慌植。
- terminate() 關(guān)閉pool甚牲,結(jié)束工作進程,不在處理未完成的任務(wù)蝶柿。
- join() 主進程阻塞丈钙,等待子進程的退出, join方法要在close或terminate之后使用交汤。
Pool使用方法
Pool+map函數(shù)
說明:此寫法缺點在于只能通過map向函數(shù)傳遞一個參數(shù)雏赦。
from multiprocessing import Pool
def test(i):
print i
if __name__=="__main__":
lists=[1,2,3]
pool=Pool(processes=2) #定義最大的進程數(shù)
pool.map(test,lists) #p必須是一個可迭代變量劫笙。
pool.close()
pool.join()
異步進程池(非阻塞)
from multiprocessing import Pool
def test(i):
print i
if __name__=="__main__":
pool = Pool(processes=10)
for i in xrange(500):
'''
For循環(huán)中執(zhí)行步驟:
(1)循環(huán)遍歷,將500個子進程添加到進程池(相對父進程會阻塞)
(2)每次執(zhí)行10個子進程星岗,等一個子進程執(zhí)行完后填大,立馬啟動新的子進程。(相對父進程不阻塞)
apply_async為異步進程池寫法俏橘。
異步指的是啟動子進程的過程允华,與父進程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進程池添加子進程的過程寥掐,與父進程本身的執(zhí)行卻是同步的靴寂。
'''
pool.apply_async(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當一個進程執(zhí)行完后啟動一個新進程.
print “test”
pool.close()
pool.join()
執(zhí)行順序:For循環(huán)內(nèi)執(zhí)行了2個步驟召耘,第一步:將500個對象放入進程池(阻塞)百炬。第二步:同時執(zhí)行10個子進程(非阻塞),有結(jié)束的就立即添加污它,維持10個子進程運行收壕。(apply_async方法的會在執(zhí)行完for循環(huán)的添加步驟后,直接執(zhí)行后面的print語句轨蛤,而apply方法會等所有進程池中的子進程運行完以后再執(zhí)行后面的print語句)
注意:調(diào)用join之前,先調(diào)用close或者terminate方法虫埂,否則會出錯祥山。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束。
同步進程池(阻塞)
from multiprocessing import Pool
def test(p):
print p
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=10)
for i in xrange(500):
'''
實際測試發(fā)現(xiàn)掉伏,for循環(huán)內(nèi)部執(zhí)行步驟:
(1)遍歷500個可迭代對象缝呕,往進程池放一個子進程
(2)執(zhí)行這個子進程,等子進程執(zhí)行完畢斧散,再往進程池放一個子進程供常,再執(zhí)行。(同時只執(zhí)行一個子進程)
for循環(huán)執(zhí)行完畢鸡捐,再執(zhí)行print函數(shù)栈暇。
'''
pool.apply(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當一個進程執(zhí)行完后啟動一個新進程.
print “test”
pool.close()
pool.join()
說明:for循環(huán)內(nèi)執(zhí)行的步驟順序箍镜,往進程池中添加一個子進程源祈,執(zhí)行子進程,等待執(zhí)行完畢再添加一個子進程…..等500個子進程都執(zhí)行完了色迂,再執(zhí)行print “test”香缺。(從結(jié)果來看,并沒有多進程并發(fā))
11.子進程返回值
在實際使用多進程的時候歇僧,可能需要獲取到子進程運行的返回值图张。如果只是用來存儲,則可以將返回值保存到一個數(shù)據(jù)結(jié)構(gòu)中;如果需要判斷此返回值祸轮,從而決定是否繼續(xù)執(zhí)行所有子進程兽埃,則會相對比較復(fù)雜。另外在Multiprocessing中倔撞,可以利用Process與Pool創(chuàng)建子進程讲仰,這兩種用法在獲取子進程返回值上的寫法上也不相同。這篇中痪蝇,我們直接上代碼鄙陡,分析多進程中獲取子進程返回值的不同用法,以及優(yōu)缺點躏啰。
初級用法(Pool)
目的:存儲子進程返回值
說明:如果只是單純的存儲子進程返回值趁矾,則可以使用Pool的apply_async異步進程池;當然也可以使用Process给僵,用法與threading中的相同毫捣,這里只介紹前者。
實例:當進程池中所有子進程執(zhí)行完畢后帝际,輸出每個子進程的返回值蔓同。
from multiprocessing import Pool
def test(p):
return p
if __name__=="__main__":
pool = Pool(processes=10)
result=[]
for i in xrange(50000):
'''
for循環(huán)執(zhí)行流程:
(1)添加子進程到pool,并將這個對象(子進程)添加到result這個列表中蹲诀。(此時子進程并沒有運行)
(2)執(zhí)行子進程(同時執(zhí)行10個)
'''
result.append(pool.apply_async(test, args=(i,)))#維持執(zhí)行的進程總數(shù)為10斑粱,當一個進程執(zhí)行完后添加新進程.
pool.join()
'''
遍歷result列表,取出子進程對象脯爪,訪問get()方法则北,獲取返回值。(此時所有子進程已執(zhí)行完畢)
'''
for i in result:
print i.get()
錯誤寫法:
for i in xrange(50000):
t=pool.apply_async(test, args=(i,)))
print t.get()
說明:這樣會造成阻塞痕慢,因為get()方法只能等子進程運行完畢后才能調(diào)用成功尚揣,否則會一直阻塞等待。如果寫在for循環(huán)內(nèi)容掖举,相當于變成了同步快骗,執(zhí)行效率將會非常低。
高級用法(Pool)
目的:父進程實時獲取子進程返回值塔次,以此為標記結(jié)束所有進程滨巴。
實例(一)
執(zhí)行子進程的過程中,不斷獲取返回值并校驗俺叭,如果返回值為True則結(jié)果所有進程恭取。
from multiprocessing import Pool
import Queue
import time
def test(p):
time.sleep(0.001)
if p==10000:
return True
else:
return False
if __name__=="__main__":
pool = Pool(processes=10)
q=Queue.Queue()
for i in xrange(50000):
'''
將子進程對象存入隊列中。
'''
q.put(pool.apply_async(test, args=(i,)))#維持執(zhí)行的進程總數(shù)為10熄守,當一個進程執(zhí)行完后添加新進程.
'''
因為這里使用的為pool.apply_async異步方法蜈垮,因此子進程執(zhí)行的過程中耗跛,父進程會執(zhí)行while,獲取返回值并校驗攒发。
'''
while 1:
if q.get().get():
pool.terminate() #結(jié)束進程池中的所有子進程调塌。
break
pool.join()
說明:總共要執(zhí)行50000個子進程(并發(fā)數(shù)量為10),當其中一個子進程返回True時惠猿,結(jié)束進程池羔砾。因為使用了apply_async為異步進程,因此在執(zhí)行完for循環(huán)的添加子進程操作后(只是添加并沒有執(zhí)行完所有的子進程)偶妖,可以直接執(zhí)行while代碼姜凄,實時判斷子進程返回值是否有True,有的話結(jié)束所有進程趾访。
優(yōu)點:不必等到所有子進程結(jié)束再結(jié)束程序态秧,只要得到想要的結(jié)果就可以提前結(jié)束,節(jié)省資源扼鞋。
不足:當需要執(zhí)行的子進程非常大時申鱼,不適用,因為for循環(huán)在添加子進程時云头,要花費很長的時間捐友,雖然是異步,但是也需要等待for循環(huán)添加子進程操作結(jié)束才能執(zhí)行while代碼溃槐,因此會比較慢楚殿。
實例(二)
多線程+多進程,添加執(zhí)行子進程的過程中竿痰,不斷獲取返回值并校驗,如果返回值為True則結(jié)果所有進程砌溺。
from multiprocessing import Pool
import Queue
import threading
import time
def test(p):
time.sleep(0.001)
if p==10000:
return True
else:
return False
if __name__=="__main__":
result=Queue.Queue() #隊列
pool = Pool()
def pool_th():
for i in xrange(50000000): ##這里需要創(chuàng)建執(zhí)行的子進程非常多
try:
result.put(pool.apply_async(test, args=(i,)))
except:
break
def result_th():
while 1:
a=result.get().get() #獲取子進程返回值
if a:
pool.terminate() #結(jié)束所有子進程
break
'''
利用多線程影涉,同時運行Pool函數(shù)創(chuàng)建執(zhí)行子進程,以及運行獲取子進程返回值函數(shù)规伐。
'''
t1=threading.Thread(target=pool_th)
t2=threading.Thread(target=result_th)
t1.start()
t2.start()
t1.join()
t2.join()
pool.join()
執(zhí)行流程:利用多線程蟹倾,創(chuàng)建一個執(zhí)行pool_th函數(shù)線程,一個執(zhí)行result_th函數(shù)線程猖闪,pool_th函數(shù)用來添加進程池鲜棠,開啟進程執(zhí)行功能函數(shù)并將子進程對象存入隊列,而result_th()函數(shù)用來不停地從隊列中取子進程對象培慌,調(diào)用get()方法獲取返回值豁陆。等發(fā)現(xiàn)其中存在子進程的返回值為True時,結(jié)束所有進程吵护,最后結(jié)束線程盒音。
優(yōu)點:彌補了實例(一)的不足表鳍,即使for循環(huán)的子進程數(shù)量很多,也能提高性能祥诽,因為for循環(huán)與判斷子進程返回值同時進行譬圣。
2.多線程模塊
線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位。它被包含在進程之中秀菱,是進程中的實際運作單位讶请。一條線程指的是進程中一個單一順序的控制流息堂,一個進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)绳姨,多線程就是在一個進程中的多個線程,如果使用多線程默認開啟一個主線程笨农,按照程序需求自動開啟多個線程(也可以自己定義線程數(shù))
1.多線程知識點
- Python 在設(shè)計之初就考慮到要在解釋器的主循環(huán)中就缆,同時只有一個線程在執(zhí)行,即在任意時刻谒亦,只有一個線程在解釋器中運行竭宰。對Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行份招。
- 多線程共享主進程的資源切揭,所以可能還會改變其中的變量,這個時候就要加上線程鎖锁摔,每次執(zhí)行完一個線程在執(zhí)行下一個線程廓旬。
- 因為每次只能有一個線程運行,多線程怎么實現(xiàn)的呢谐腰?Python解釋器中一個線程做完了任務(wù)然后做IO(文件讀寫)操作的時候孕豹,這個線程就退出,然后下一個線程開始運行十气,循環(huán)之励背。
- 當你讀完上面三點你就知道多線程如何運行起來,并且知道多線程常用在那些需要等待然后執(zhí)行的應(yīng)用程序上(比如爬蟲讀取到數(shù)據(jù)砸西,然后保存的時候下一個線程開始啟動)也就是說多線程適用于IO密集型的任務(wù)量(文件存儲叶眉,網(wǎng)絡(luò)通信)。
- 注意一點芹枷,定義多線程衅疙,然后傳遞參數(shù)的時候,如果是有一個參數(shù)就是用args=(i鸳慈,)一定要加上逗號饱溢,如果有兩個或者以上的參數(shù)就不用這樣
2.案例一 多線程核心用法
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
def loop():
#定義一個要循環(huán)的函數(shù),當然后面肯定會定義好幾個函數(shù)
print 'thread %s is running...' % threading.current_thread().name
#threading.current_thread().name就是當前線程的名字 在聲明線程的時候可以自定義子線程的名字
n = 0
while n < 10:
n = n + 1
print '%s >>> %s' % (threading.current_thread().name, n)
#輸出當前線程名字 和循環(huán)的參數(shù)n
print 'thread %s ended.' % threading.current_thread().name
print 'thread %s is running...' % threading.current_thread().name
#下面的一部分就是threading的核心用法
#包括target name args 之類的 一般我只用targer=你定義的函數(shù)名
t = threading.Thread(target=loop, name='線程名:')
# 在這里就申明了這個線程的名字
t.start()
#開始
t.join()
#關(guān)于join的相關(guān)信息我會在后面的代碼詳說
print 'thread %s ended.' % threading.current_thread().name
運行結(jié)果:
thread MainThread is running...
thread 線程名: is running...
線程名: >>> 1
線程名: >>> 2
線程名: >>> 3
線程名: >>> 4
線程名: >>> 5
線程名: >>> 6
線程名: >>> 7
線程名: >>> 8
線程名: >>> 9
線程名: >>> 10
thread 線程名: ended.
thread MainThread ended.
3.案例二 線程鎖
前面有說到過走芋,多線程是共享內(nèi)存的理朋,所以其中的變量如果發(fā)生了改變的話就會改變后邊的變量絮识,導(dǎo)致異常,這個時候可以加上線程鎖嗽上。線程鎖的概念就是主要這個線程運行完后再運行下一個線程次舌。
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
def loop():
l.acquire()
# 這里相當于把線程加了鎖,目前只允許這一個線程運行
print 'thread %s is running...' % threading.current_thread().name
#threading.current_thread().name就是當前線程的名字 在聲明線程的時候可以自定義子線程的名字
n = 0
while n < 10:
n = n + 1
print '%s >>> %s' % (threading.current_thread().name, n)
#輸出當前線程名字 和循環(huán)的參數(shù)n
print 'thread %s ended.' % threading.current_thread().name
l.release()
# 這里是把線程鎖解開兽愤,可以再運行寫一個線程
print 'thread %s is running...' % threading.current_thread().name
#下面的一部分就是threading的核心用法
#包括target name args 之類的 一般我只用targer=你定義的函數(shù)名
t = threading.Thread(target=loop, name='線程名:')
l = threading.Lock()
# 這里申明一個線程鎖
t.start()
#開始
t.join()
#關(guān)于join的相關(guān)信息我會在后面的代碼詳說
print 'thread %s ended.' % threading.current_thread().name
使用線程鎖后彼念,程序按照一個一個有序執(zhí)行。其中l(wèi)ock還有Rlock的方法浅萧,RLock允許在同一線程中被多次acquire逐沙。而Lock卻不允許這種情況。否則會出現(xiàn)死循環(huán)洼畅,程序不知道解哪一把鎖吩案。注意:如果使用RLock,那么acquire和release必須成對出現(xiàn)帝簇,即調(diào)用了n次acquire徘郭,必須調(diào)用n次的release才能真正釋放所占用的鎖
4.案例三 join()方法的使用
在多線程中,每個線程自顧執(zhí)行自己的任務(wù)丧肴,當最后一個線程運行完畢后再退出残揉,所以這個時候如果你要打印信息的話,會看到打印出來的信息錯亂無章芋浮,有的時候希望主線程能夠等子線程執(zhí)行完畢后在繼續(xù)執(zhí)行抱环,就是用join()方法。
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
t00 = time.time()
# 獲取當前時間戳
def cs1():
time0 = time.time()
for x in range(9):
print x + time.time()-time0
# 計算用了多少時間
print threading.current_thread().name
# 打印這個線程名字
def cs2():
for x1 in range(6,9):
print x1
print threading.current_thread().name
threads=[]
# 定義一個空的列表
t1 = threading.Thread(target=cs1)
t2 = threading.Thread(target=cs2)
threads.append(t1)
threads.append(t2)
# 把這兩個線程的任務(wù)加載到這個列表中
for x in threads:
x.start()
# 然后執(zhí)行纸巷,這個案例很常用镇草,就是有多個函數(shù)要多線程執(zhí)行的時候用到
# 如果一個程序有多個函數(shù),但是你只想其中的某一個或者某兩個函數(shù)多線程瘤旨,用法一樣加入空的列表即可
x.join()
#線程堵塞 先運行第一個在運行第二個
#x.join()
#注意你的join放在這里是沒有意義的梯啤,和不加join一樣。線程不堵塞 但是會出現(xiàn)不勻稱的表現(xiàn) 并且會修改不同線程中的變量
print 'use time.{}'.format(time.time()-t00)
關(guān)于setDaemon()的概念就是:主線程A中裆站,創(chuàng)建了子線程B,并且在主線程A中調(diào)用了B.setDaemon(),這個的意思是黔夭,把主線程A設(shè)置為守護線程宏胯,這時候,要是主線程A執(zhí)行結(jié)束了本姥,就不管子線程B是否完成,一并和主線程A退出.這就是setDaemon方法的含義肩袍,這基本和join是相反的。此外婚惫,還有個要特別注意的:必須在start() 方法調(diào)用之前設(shè)置氛赐,如果不設(shè)置為守護線程魂爪,程序會被無限掛起。
5.案例四 線程鎖之信號Semaphore
類名:BoundedSemaphore艰管。這種鎖允許一定數(shù)量的線程同時更改數(shù)據(jù)滓侍,它不是互斥鎖。比如地鐵安檢牲芋,排隊人很多撩笆,工作人員只允許一定數(shù)量的人進入安檢區(qū),其它的人繼續(xù)排隊缸浦。
import time
import threading
def run(n, se):
se.acquire()
print("run the thread: %s" % n)
time.sleep(1)
se.release()
# 設(shè)置允許5個線程同時運行
semaphore = threading.BoundedSemaphore(5)
for i in range(20):
t = threading.Thread(target=run, args=(i,semaphore))
t.start()
運行后夕冲,可以看到5個一批的線程被放行。
6.案例五 線程鎖之事件Event
事件線程鎖的運行機制:
全局定義了一個Flag裂逐,如果Flag的值為False歹鱼,那么當程序執(zhí)行wait()方法時就會阻塞,如果Flag值為True卜高,線程不再阻塞弥姻。這種鎖,類似交通紅綠燈(默認是紅燈)篙悯,它屬于在紅燈的時候一次性阻擋所有線程蚁阳,在綠燈的時候,一次性放行所有排隊中的線程鸽照。
事件主要提供了四個方法set()螺捐、wait()、clear()和is_set()矮燎。
調(diào)用clear()方法會將事件的Flag設(shè)置為False定血。
調(diào)用set()方法會將Flag設(shè)置為True。
調(diào)用wait()方法將等待“紅綠燈”信號诞外。
is_set():判斷當前是否"綠燈放行"狀態(tài)
下面是一個模擬紅綠燈澜沟,然后汽車通行的例子:
#利用Event類模擬紅綠燈
import threading
import time
event = threading.Event()
# 定義一個事件的對象
def lighter():
green_time = 5
# 綠燈時間
red_time = 5
# 紅燈時間
event.set()
# 初始設(shè)為綠燈
while True:
print("\33[32;0m 綠燈亮...\033[0m")
time.sleep(green_time)
event.clear()
print("\33[31;0m 紅燈亮...\033[0m")
time.sleep(red_time)
event.set()
def run(name):
while True:
if event.is_set():
# 判斷當前是否"放行"狀態(tài)
print("一輛[%s] 呼嘯開過..." % name)
time.sleep(1)
else:
print("一輛[%s]開來,看到紅燈峡谊,無奈的停下了..." % name)
event.wait()
print("[%s] 看到綠燈亮了茫虽,瞬間飛起....." % name)
if __name__ == '__main__':
light = threading.Thread(target=lighter,)
light.start()
for name in ['奔馳', '寶馬', '奧迪']:
car = threading.Thread(target=run, args=(name,))
car.start()
運行結(jié)果:
綠燈亮...
一輛[奔馳] 呼嘯開過...
一輛[寶馬] 呼嘯開過...
一輛[奧迪] 呼嘯開過...
一輛[奧迪] 呼嘯開過...
......
紅燈亮...
一輛[寶馬]開來,看到紅燈既们,無奈的停下了...
一輛[奧迪]開來濒析,看到紅燈,無奈的停下了...
一輛[奔馳]開來啥纸,看到紅燈号杏,無奈的停下了...
綠燈亮...
[奧迪] 看到綠燈亮了,瞬間飛起.....
一輛[奧迪] 呼嘯開過...
[奔馳] 看到綠燈亮了斯棒,瞬間飛起.....
一輛[奔馳] 呼嘯開過...
[寶馬] 看到綠燈亮了盾致,瞬間飛起.....
一輛[寶馬] 呼嘯開過...
一輛[奧迪] 呼嘯開過...
......
7.案例六 線程鎖之條件Condition
Condition稱作條件鎖主经,依然是通過acquire()/release()加鎖解鎖。
wait([timeout])方法將使線程進入Condition的等待池等待通知庭惜,并釋放鎖罩驻。使用前線程必須已獲得鎖定,否則將拋出異常蜈块。
notify()方法將從等待池挑選一個線程并通知鉴腻,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進入鎖定池),其他線程仍然在等待池中百揭。調(diào)用這個方法不會釋放鎖定爽哎。使用前線程必須已獲得鎖定,否則將拋出異常器一。
notifyAll()方法將通知等待池中所有的線程课锌,這些線程都將進入鎖定池嘗試獲得鎖定。調(diào)用這個方法不會釋放鎖定祈秕。使用前線程必須已獲得鎖定渺贤,否則將拋出異常。
實際案例
import threading
import time
num = 0
con = threading.Condition()
class Foo(threading.Thread):
def __init__(self, name, action):
super(Foo, self).__init__()
self.name = name
self.action = action
def run(self):
global num
con.acquire()
print("%s開始執(zhí)行..." % self.name)
while True:
if self.action == "add":
num += 1
elif self.action == 'reduce':
num -= 1
else:
exit(1)
print("num當前為:", num)
time.sleep(1)
if num == 5 or num == 0:
print("暫停執(zhí)行%s请毛!" % self.name)
con.notify()
con.wait()
print("%s開始執(zhí)行..." % self.name)
con.release()
if __name__ == '__main__':
a = Foo("線程A", 'add')
b = Foo("線程B", 'reduce')
a.start()
b.start()
如果不強制停止志鞍,程序會一直執(zhí)行下去,并循環(huán)下面的結(jié)果:
線程A開始執(zhí)行...
num當前為: 1
num當前為: 2
num當前為: 3
num當前為: 4
num當前為: 5
暫停執(zhí)行線程A方仿!
線程B開始執(zhí)行...
num當前為: 4
num當前為: 3
num當前為: 2
num當前為: 1
num當前為: 0
暫停執(zhí)行線程B固棚!
線程A開始執(zhí)行...
num當前為: 1
num當前為: 2
num當前為: 3
num當前為: 4
num當前為: 5
暫停執(zhí)行線程A!
線程B開始執(zhí)行...
8.案例 七定時器
定時器Timer類是threading模塊中的一個小工具仙蚜,用于指定n秒后執(zhí)行某操作此洲。一個簡單但很實用的東西。
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
# 表示1秒后執(zhí)行hello函數(shù)
t.start()
9.案例八 通過with語句使用線程鎖
類似于上下文管理器委粉,所有的線程鎖都有一個加鎖和釋放鎖的動作呜师,非常類似文件的打開和關(guān)閉。在加鎖后贾节,如果線程執(zhí)行過程中出現(xiàn)異持梗或者錯誤,沒有正常的釋放鎖栗涂,那么其他的線程會造到致命性的影響知牌。通過with上下文管理器,可以確保鎖被正常釋放戴差。其格式如下:
with some_lock:
# 執(zhí)行任務(wù)...
這相當于:
some_lock.acquire()
try:
# 執(zhí)行任務(wù)..
finally:
some_lock.release()
10.threading 的常用屬性
current_thread() 返回當前線程
active_count() 返回當前活躍的線程數(shù)送爸,1個主線程+n個子線程
get_ident() 返回當前線程
enumerater() 返回當前活動 Thread 對象列表
main_thread() 返回主 Thread 對象
settrace(func) 為所有線程設(shè)置一個 trace 函數(shù)
setprofile(func) 為所有線程設(shè)置一個 profile 函數(shù)
stack_size([size]) 返回新創(chuàng)建線程棧大蓄踔觥暖释;或為后續(xù)創(chuàng)建的線程設(shè)定棧大小為 size
TIMEOUT_MAX Lock.acquire(), RLock.acquire(), Condition.wait() 允許的最大超時時間
11.線程池 threadingpool
在使用多線程處理任務(wù)時也不是線程越多越好袭厂。因為在切換線程的時候,需要切換上下文環(huán)境球匕,線程很多的時候纹磺,依然會造成CPU的大量開銷。為解決這個問題亮曹,線程池的概念被提出來了橄杨。
預(yù)先創(chuàng)建好一個數(shù)量較為優(yōu)化的線程組,在需要的時候立刻能夠使用照卦,就形成了線程池式矫。在Python2中,沒有內(nèi)置的較好的線程池模塊役耕,需要自己實現(xiàn)或使用第三方模塊采转。
需要注意的是,線程池的整體構(gòu)造需要自己精心設(shè)計瞬痘,比如某個函數(shù)定義存在多少個線程故慈,某個函數(shù)定義什么時候運行這個線程,某個函數(shù)定義去獲取線程獲取任務(wù)框全,某個線程設(shè)置線程守護(線程鎖之類的)察绷,等等…
Python3 線程池:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 線程池,進程池
import threading, time
def test(arg):
print(arg, threading.current_thread().name)
if __name__ == "__main__":
thread_pool = ThreadPoolExecutor(5) # 定義5個線程執(zhí)行此任務(wù)
# process_pool = ProcessPoolExecutor(5) # 定義5個進程
for i in range(5):
thread_pool.submit(test, i)
# process_pool.submit(test,i)
下面是一個簡單的線程池:
import queue
import time
import threading
class MyThreadPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._pool = queue.Queue(maxsize) # 使用queue隊列津辩,創(chuàng)建一個線程池
for _ in range(maxsize):
self._pool.put(threading.Thread)
def get_thread(self):
return self._pool.get()
def add_thread(self):
self._pool.put(threading.Thread)
def run(i, pool):
print('執(zhí)行任務(wù)', i)
time.sleep(1)
pool.add_thread() # 執(zhí)行完畢后拆撼,再向線程池中添加一個線程類
if __name__ == '__main__':
pool = MyThreadPool(5) # 設(shè)定線程池中最多只能有5個線程類
for i in range(20):
t = pool.get_thread() # 每個t都是一個線程類
obj = t(target=run, args=(i, pool)) # 這里的obj才是正真的線程對象
obj.start()
print("活動的子線程數(shù): ", threading.active_count()-1)
分析一下上面的代碼:
- 實例化一個MyThreadPool的對象,在其內(nèi)部建立了一個最多包含5個元素的阻塞隊列丹泉,并一次性將5個Thread類型添加進去情萤。
- 循環(huán)100次,每次從pool中獲取一個thread類摹恨,利用該類筋岛,傳遞參數(shù),實例化線程對象晒哄。
- 在run()方法中睁宰,每當任務(wù)完成后,又為pool添加一個thread類寝凌,保持隊列中始終有5個thread類柒傻。
- 一定要分清楚,代碼里各個變量表示的內(nèi)容较木。t表示的是一個線程類红符,也就是threading.Thread,而obj才是正真的線程對象。
上面的例子是把線程類當做元素添加到隊列內(nèi)预侯,從而實現(xiàn)的線程池致开。這種方法比較糙,每個線程使用后就被拋棄萎馅,并且一開始就將線程開到滿双戳,因此性能較差。下面是一個相對好一點的例子糜芳,在這個例子中飒货,隊列里存放的不再是線程類,而是任務(wù)峭竣,線程池也不是一開始就直接開辟所有線程塘辅,而是根據(jù)需要,逐步建立皆撩,直至池滿莫辨。
# -*- coding:utf-8 -*-
"""
一個基于thread和queue的線程池,以任務(wù)為隊列元素毅访,動態(tài)創(chuàng)建線程沮榜,重復(fù)利用線程,
通過close和terminate方法關(guān)閉線程池喻粹。
"""
import queue
import threading
import contextlib
import time
# 創(chuàng)建空對象,用于停止線程
StopEvent = object()
def callback(status, result):
"""
根據(jù)需要進行的回調(diào)函數(shù)蟆融,默認不執(zhí)行。
:param status: action函數(shù)的執(zhí)行狀態(tài)
:param result: action函數(shù)的返回值
:return:
"""
pass
def action(thread_name, arg):
"""
真實的任務(wù)定義在這個函數(shù)里
:param thread_name: 執(zhí)行該方法的線程名
:param arg: 該函數(shù)需要的參數(shù)
:return:
"""
# 模擬該函數(shù)執(zhí)行了0.1秒
time.sleep(0.1)
print("第%s個任務(wù)調(diào)用了線程 %s守呜,并打印了這條信息型酥!" % (arg+1, thread_name))
class ThreadPool:
def __init__(self, max_num, max_task_num=None):
"""
初始化線程池
:param max_num: 線程池最大線程數(shù)量
:param max_task_num: 任務(wù)隊列長度
"""
# 如果提供了最大任務(wù)數(shù)的參數(shù),則將隊列的最大元素個數(shù)設(shè)置為這個值查乒。
if max_task_num:
self.q = queue.Queue(max_task_num)
# 默認隊列可接受無限多個的任務(wù)
else:
self.q = queue.Queue()
# 設(shè)置線程池最多可實例化的線程數(shù)
self.max_num = max_num
# 任務(wù)取消標識
self.cancel = False
# 任務(wù)中斷標識
self.terminal = False
# 已實例化的線程列表
self.generate_list = []
# 處于空閑狀態(tài)的線程列表
self.free_list = []
def put(self, func, args, callback=None):
"""
往任務(wù)隊列里放入一個任務(wù)
:param func: 任務(wù)函數(shù)
:param args: 任務(wù)函數(shù)所需參數(shù)
:param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù)弥喉,回調(diào)函數(shù)有兩個參數(shù)
1、任務(wù)函數(shù)執(zhí)行狀態(tài)玛迄;2由境、任務(wù)函數(shù)返回值(默認為None,即:不執(zhí)行回調(diào)函數(shù))
:return: 如果線程池已經(jīng)終止蓖议,則返回True否則None
"""
# 先判斷標識虏杰,看看任務(wù)是否取消了
if self.cancel:
return
# 如果沒有空閑的線程,并且已創(chuàng)建的線程的數(shù)量小于預(yù)定義的最大線程數(shù)勒虾,則創(chuàng)建新線程纺阔。
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
# 構(gòu)造任務(wù)參數(shù)元組,分別是調(diào)用的函數(shù)修然,該函數(shù)的參數(shù)笛钝,回調(diào)函數(shù)质况。
w = (func, args, callback,)
# 將任務(wù)放入隊列
self.q.put(w)
def generate_thread(self):
"""
創(chuàng)建一個線程
"""
# 每個線程都執(zhí)行call方法
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)。在正常情況下玻靡,每個線程都保存生存狀態(tài)拯杠, 直到獲取線程終止的flag。
"""
# 獲取當前線程的名字
current_thread = threading.currentThread().getName()
# 將當前線程的名字加入已實例化的線程列表中
self.generate_list.append(current_thread)
# 從任務(wù)隊列中獲取一個任務(wù)
event = self.q.get()
# 讓獲取的任務(wù)不是終止線程的標識對象時
while event != StopEvent:
# 解析任務(wù)中封裝的三個參數(shù)
func, arguments, callback = event
# 抓取異常啃奴,防止線程因為異常退出
try:
# 正常執(zhí)行任務(wù)函數(shù)
result = func(current_thread, *arguments)
success = True
except Exception as e:
# 當任務(wù)執(zhí)行過程中彈出異常
result = None
success = False
# 如果有指定的回調(diào)函數(shù)
if callback is not None:
# 執(zhí)行回調(diào)函數(shù),并抓取異常
try:
callback(success, result)
except Exception as e:
pass
# 當某個線程正常執(zhí)行完一個任務(wù)時雄妥,先執(zhí)行worker_state方法
with self.worker_state(self.free_list, current_thread):
# 如果強制關(guān)閉線程的flag開啟最蕾,則傳入一個StopEvent元素
if self.terminal:
event = StopEvent
# 否則獲取一個正常的任務(wù),并回調(diào)worker_state方法的yield語句
else:
# 從這里開始又是一個正常的任務(wù)循環(huán)
event = self.q.get()
else:
# 一旦發(fā)現(xiàn)任務(wù)是個終止線程的標識元素老厌,將線程從已創(chuàng)建線程列表中刪除
self.generate_list.remove(current_thread)
def close(self):
"""
執(zhí)行完所有的任務(wù)后瘟则,讓所有線程都停止的方法
"""
# 設(shè)置flag
self.cancel = True
# 計算已創(chuàng)建線程列表中線程的個數(shù),
# 然后往任務(wù)隊列里推送相同數(shù)量的終止線程的標識元素
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1
def terminate(self):
"""
在任務(wù)執(zhí)行過程中枝秤,終止線程醋拧,提前退出。
"""
self.terminal = True
# 強制性的停止線程
while self.generate_list:
self.q.put(StopEvent)
# 該裝飾器用于上下文管理
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于記錄空閑的線程淀弹,或從空閑列表中取出線程處理任務(wù)
"""
# 將當前線程丹壕,添加到空閑線程列表中
state_list.append(worker_thread)
# 捕獲異常
try:
# 在此等待
yield
finally:
# 將線程從空閑列表中移除
state_list.remove(worker_thread)
# 調(diào)用方式
if __name__ == '__main__':
# 創(chuàng)建一個最多包含5個線程的線程池
pool = ThreadPool(5)
# 創(chuàng)建100個任務(wù),讓線程池進行處理
for i in range(100):
pool.put(action, (i,), callback)
# 等待一定時間薇溃,讓線程執(zhí)行任務(wù)
time.sleep(3)
print("-" * 50)
print("\033[32;0m任務(wù)停止之前線程池中有%s個線程菌赖,空閑的線程有%s個!\033[0m"
% (len(pool.generate_list), len(pool.free_list)))
# 正常關(guān)閉線程池
pool.close()
print("任務(wù)執(zhí)行完畢沐序,正常退出琉用!")
# 強制關(guān)閉線程池
# pool.terminate()
# print("強制停止任務(wù)!")
3.協(xié)程模塊
協(xié)程的機制使得我們可以用同步的方式寫出異步運行的代碼策幼。
總所周知邑时,Python因為有GIL(全局解釋鎖)這玩意,不可能有真正的多線程的存在特姐,因此很多情況下都會用multiprocessing實現(xiàn)并發(fā)晶丘,而且在Python中應(yīng)用多線程還要注意關(guān)鍵地方的同步,不太方便唐含,用協(xié)程代替多線程和多進程是一個很好的選擇铣口,因為它吸引人的特性:主動調(diào)用/退出,狀態(tài)保存觉壶,避免cpu上下文切換等
1.什么是協(xié)程脑题?
協(xié)程,又稱作Coroutine铜靶。從字面上來理解叔遂,即協(xié)同運行的例程他炊,它是比是線程(thread)更細量級的用戶態(tài)線程,特點是允許用戶的主動調(diào)用和主動退出已艰,掛起當前的例程然后返回值或去執(zhí)行其他任務(wù)痊末,接著返回原來停下的點繼續(xù)執(zhí)行。等下哩掺,這是否有點奇怪凿叠?我們都知道一般函數(shù)都是線性執(zhí)行的,不可能說執(zhí)行到一半返回嚼吞,等會兒又跑到原來的地方繼續(xù)執(zhí)行盒件。但一些熟悉python(or其他動態(tài)語言)的童鞋都知道這可以做到,答案是用yield語句舱禽。其實這里我們要感謝操作系統(tǒng)(OS)為我們做的工作炒刁,因為它具有g(shù)etcontext和swapcontext這些特性,通過系統(tǒng)調(diào)用誊稚,我們可以把上下文和狀態(tài)保存起來翔始,切換到其他的上下文,這些特性為coroutine的實現(xiàn)提供了底層的基礎(chǔ)里伯。操作系統(tǒng)的Interrupts和Traps機制則為這種實現(xiàn)提供了可能性城瞎,因此它看起來可能是下面這樣的:
2.理解生成器(generator)
學(xué)過生成器和迭代器的同學(xué)應(yīng)該都知道python有yield這個關(guān)鍵字,yield能把一個函數(shù)變成一個generator疾瓮,與return不同全谤,yield在函數(shù)中返回值時會保存函數(shù)的狀態(tài),使下一次調(diào)用函數(shù)時會從上一次的狀態(tài)繼續(xù)執(zhí)行爷贫,即從yield的下一條語句開始執(zhí)行认然,這樣做有許多好處,比如我們想要生成一個數(shù)列漫萄,若該數(shù)列的存儲空間太大卷员,而我們僅僅需要訪問前面幾個元素,那么yield就派上用場了腾务,它實現(xiàn)了這種一邊循環(huán)一邊計算的機制毕骡,節(jié)省了存儲空間,提高了運行效率岩瘦。
def func():
def fib(max):
n, a, b = 0, 0, 1
while n < max:
yield b
a, b = b, a + b
n = n + 1
print([x for x in fib(6)][-1])
Coroutine與Generator
有些人會把生成器(generator)和協(xié)程(coroutine)的概念混淆未巫,我以前也會這樣,不過其實發(fā)現(xiàn)启昧,兩者的區(qū)別還是很大的叙凡。
直接上最重要的區(qū)別:
- generator總是生成值,一般是迭代的序列
- coroutine關(guān)注的是消耗值密末,是數(shù)據(jù)(data)的消費者
- coroutine不會與迭代操作關(guān)聯(lián)握爷,而generator會
- coroutine強調(diào)協(xié)同控制程序流违施,generator強調(diào)保存狀態(tài)和產(chǎn)生數(shù)據(jù)
相似的是薄坏,它們都是不用return來實現(xiàn)重復(fù)調(diào)用的函數(shù)/對象择浊,都用到了yield(中斷/恢復(fù))的方式來實現(xiàn)范咨。
asyncio
asyncio是python 3.4中新增的模塊,它提供了一種機制燥撞,使得你可以用協(xié)程(coroutines)座柱、IO復(fù)用(multiplexing I/O)在單線程環(huán)境中編寫并發(fā)模型。
根據(jù)官方說明物舒,asyncio模塊主要包括了:
- 具有特定系統(tǒng)實現(xiàn)的事件循環(huán)(event loop);
- 數(shù)據(jù)通訊和協(xié)議抽象(類似Twisted中的部分);
- TCP色洞,UDP,SSL,子進程管道茶鉴,延遲調(diào)用和其他;
- Future類;
- yield from的支持;
- 同步的支持;
- 提供向線程池轉(zhuǎn)移作業(yè)的接口;
下面來看下asyncio的一個例子:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
當事件循環(huán)開始運行時,它會在Task中尋找coroutine來執(zhí)行調(diào)度景用,因為事件循環(huán)注冊了print_sum()涵叮,因此print_sum()被調(diào)用,執(zhí)行result = await compute(x, y)這條語句(等同于result = yield from compute(x, y))伞插,因為compute()自身就是一個coroutine割粮,因此print_sum()這個協(xié)程就會暫時被掛起,compute()被加入到事件循環(huán)中媚污,程序流執(zhí)行compute()中的print語句舀瓢,打印”Compute %s + %s …”,然后執(zhí)行了await asyncio.sleep(1.0)耗美,因為asyncio.sleep()也是一個coroutine京髓,接著compute()就會被掛起,等待計時器讀秒商架,在這1秒的過程中堰怨,事件循環(huán)會在隊列中查詢可以被調(diào)度的coroutine,而因為此前print_sum()與compute()都被掛起了蛇摸,因此事件循環(huán)會停下來等待協(xié)程的調(diào)度备图,當計時器讀秒結(jié)束后,程序流便會返回到compute()中執(zhí)行return語句赶袄,結(jié)果會返回到print_sum()中的result中揽涮,最后打印result,事件隊列中沒有可以調(diào)度的任務(wù)了饿肺,此時loop.close()把事件隊列關(guān)閉蒋困,程序結(jié)
接下來是異步生成器,來看一個例子:
假如我要到一家超市去購買土豆敬辣,而超市貨架上的土豆數(shù)量是有限的:
class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos
all_potatos = Potato.make(5)
現(xiàn)在我想要買50個土豆家破,每次從貨架上拿走一個土豆放到籃子:
def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
sleep(.1)
else:
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
def buy_potatos():
bucket = []
for p in take_potatos(50):
bucket.append(p)
對應(yīng)到代碼中颜说,就是迭代一個生成器的模型,顯然汰聋,當貨架上的土豆不夠的時候门粪,這時只能夠死等,而且在上面例子中等多長時間都不會有結(jié)果(因為一切都是同步的)烹困,也許可以用多進程和多線程解決玄妈,而在現(xiàn)實生活中,更應(yīng)該像是這樣的:
async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
當貨架上的土豆沒有了之后髓梅,我可以詢問超市請求需要更多的土豆拟蜻,這時候需要等待一段時間直到生產(chǎn)者完成生產(chǎn)的過程:
async def ask_for_potato():
await asyncio.sleep(random.random())
all_potatos.extend(Potato.make(random.randint(1, 10)))
當生產(chǎn)者完成和返回之后,這是便能從await掛起的地方繼續(xù)往下跑枯饿,完成消費的過程酝锅。而這整一個過程,就是一個異步生成器迭代的流程:
async def buy_potatos():
bucket = []
async for p in take_potatos(50):
bucket.append(p)
print(f'Got potato {id(p)}...')
async for語法表示我們要后面迭代的是一個異步生成器奢方。
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(buy_potatos())
loop.close()
用asyncio運行這段代碼搔扁,結(jié)果是這樣的:
Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
...
既然是異步的,在請求之后不一定要死等蟋字,而是可以做其他事情稿蹲。比如除了土豆,我還想買番茄鹊奖,這時只需要在事件循環(huán)中再添加一個過程:
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
loop.close()
再來運行這段代碼:
Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
...
看下AsyncGenerator的定義苛聘,它需要實現(xiàn)aiter和anext兩個核心方法,以及asend忠聚,athrow设哗,aclose方法。
class AsyncGenerator(AsyncIterator):
__slots__ = ()
async def __anext__(self):
...
@abstractmethod
async def asend(self, value):
...
@abstractmethod
async def athrow(self, typ, val=None, tb=None):
...
async def aclose(self):
...
@classmethod
def __subclasshook__(cls, C):
if cls is AsyncGenerator:
return _check_methods(C, '__aiter__', '__anext__',
'asend', 'athrow', 'aclose')
return NotImplemented
異步生成器是在3.6之后才有的特性两蟀,同樣的還有異步推導(dǎo)表達式熬拒,因此在上面的例子中,也可以寫成這樣:
bucket = [p async for p in take_potatos(50)]
類似的垫竞,還有await表達式:
result = [await fun() for fun in funcs if await condition()]
除了函數(shù)之外澎粟,類實例的普通方法也能用async語法修飾:
class ThreeTwoOne:
async def begin(self):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
t = ThreeTwoOne()
await t.begin()
print('start')
實例方法的調(diào)用同樣是返回一個coroutine:
function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())
同理還有類方法:
class ThreeTwoOne:
@classmethod
async def begin(cls):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
await ThreeTwoOne.begin()
print('start')
根據(jù)PEP 492中,async也可以應(yīng)用到上下文管理器中欢瞪,aenter和aexit需要返回一個Awaitable:
class GameContext:
async def __aenter__(self):
print('game loading...')
await asyncio.sleep(1)
async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)
async def game():
async with GameContext():
print('game start...')
await asyncio.sleep(2)
在3.7版本活烙,contextlib中會新增一個asynccontextmanager裝飾器來包裝一個實現(xiàn)異步協(xié)議的上下文管理器:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
conn = await acquire_db_connection()
try:
yield
finally:
await release_db_connection(conn)
async修飾符也能用在call方法上:
class GameContext:
async def __aenter__(self):
self._started = time()
print('game loading...')
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)
async def __call__(self, *args, **kws):
if args[0] == 'time':
return time() - self._started
async def game():
async with GameContext() as ctx:
print('game start...')
await asyncio.sleep(2)
print('game time: ', await ctx('time'))
await和yield from
Python3.3的yield from語法可以把生成器的操作委托給另一個生成器,生成器的調(diào)用方可以直接與子生成器進行通信:
def sub_gen():
yield 1
yield 2
yield 3
def gen():
return (yield from sub_gen())
def main():
for val in gen():
print(val)
# 1
# 2
# 3
利用這一特性遣鼓,使用yield from能夠編寫出類似協(xié)程效果的函數(shù)調(diào)用啸盏,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from語法來創(chuàng)建協(xié)程:
# https://docs.python.org/3.4/library/asyncio-task.html
import asyncio
@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
然而骑祟,用yield from容易在表示協(xié)程和生成器中混淆回懦,沒有良好的語義性气笙,所以在Python 3.5推出了更新的async/await表達式來作為協(xié)程的語法。
因此類似以下的調(diào)用是等價的:
async with lock:
...
with (yield from lock):
...
######################
def main():
return (yield from coro())
def main():
return (await coro())
那么怯晕,怎么把生成器包裝為一個協(xié)程對象呢潜圃?這時候可以用到types包中的coroutine裝飾器(如果使用asyncio做驅(qū)動的話,那么也可以使用asyncio的coroutine裝飾器)舟茶,@types.coroutine裝飾器會將一個生成器函數(shù)包裝為協(xié)程對象:
import asyncio
import types
@types.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
盡管兩個函數(shù)分別使用了新舊語法谭期,但他們都是協(xié)程對象,也分別稱作native coroutine以及generator-based coroutine吧凉,因此不用擔心語法問題隧出。
下面觀察一個asyncio中Future的例子:
import asyncio
future = asyncio.Future()
async def coro1():
await asyncio.sleep(1)
future.set_result('data')
async def coro2():
print(await future)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()
兩個協(xié)程在在事件循環(huán)中,協(xié)程coro1在執(zhí)行第一句后掛起自身切到asyncio.sleep阀捅,而協(xié)程coro2一直等待future的結(jié)果胀瞪,讓出事件循環(huán),計時器結(jié)束后coro1執(zhí)行了第二句設(shè)置了future的值饲鄙,被掛起的coro2恢復(fù)執(zhí)行凄诞,打印出future的結(jié)果'data'。
future可以被await證明了future對象是一個Awaitable傍妒,進入Future類的源碼可以看到有一段代碼顯示了future實現(xiàn)了await協(xié)議:
class Future:
...
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
當執(zhí)行await future這行代碼時幔摸,future中的這段代碼就會被執(zhí)行摸柄,首先future檢查它自身是否已經(jīng)完成颤练,如果沒有完成,掛起自身驱负,告知當前的Task(任務(wù))等待future完成嗦玖。
當future執(zhí)行set_result方法時,會觸發(fā)以下的代碼跃脊,設(shè)置結(jié)果宇挫,標記future已經(jīng)完成:
def set_result(self, result):
...
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()
最后future會調(diào)度自身的回調(diào)函數(shù),觸發(fā)Task._step()告知Task驅(qū)動future從之前掛起的點恢復(fù)執(zhí)行酪术,不難看出器瘪,future會執(zhí)行下面的代碼:
class Future:
...
def __iter__(self):
...
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
最終返回結(jié)果給調(diào)用方。
前面講了那么多關(guān)于asyncio的例子绘雁,那么除了asyncio橡疼,就沒有其他協(xié)程庫了嗎?asyncio作為python的標準庫庐舟,自然受到很多青睞欣除,但它有時候還是顯得太重量了,尤其是提供了許多復(fù)雜的輪子和協(xié)議挪略,不便于使用历帚。
你可以理解為滔岳,asyncio是使用async/await語法開發(fā)的協(xié)程庫,而不是有asyncio才能用async/await挽牢,除了asyncio之外谱煤,curio和trio是更加輕量級的替代物,而且也更容易使用卓研。
asynccontextmanager使用
從Python 3.7開始趴俘,有兩種方法可以編寫異步上下文管理器。一種就是前面提到的魔法函數(shù)的實現(xiàn)奏赘,另外一種就是contextlib的另外一個模塊asynccontextmanager寥闪。通過裝飾器的方式實現(xiàn)一個異步上下文管理器
import asyncio
from contextlib import asynccontextmanager
from concurrent.futures.thread import ThreadPoolExecutor
class AsyncFile(object):
def __init__(self, file, loop=None, executor=None):
if not loop:
loop = asyncio.get_running_loop() # 獲取當前運行事件循環(huán)
if not executor:
executor = ThreadPoolExecutor(10) # 線程池數(shù)量10
self.file = file
self.loop = loop
self.executor = executor
self.pending = []
self.result = []
def write(self, string):
"""
實現(xiàn)異步寫操作
:param string: 要寫的內(nèi)容
:return:
"""
self.pending.append(
self.loop.run_in_executor(
self.executor, self.file.write, string,
)
)
def read(self, i):
"""
實現(xiàn)異步讀操作
:param i:
:return:
"""
self.pending.append(
self.loop.run_in_executor(self.executor, self.file.read, i,)
)
def readlines(self):
self.pending.append(
self.loop.run_in_executor(self.executor, self.file.readlines, )
)
@asynccontextmanager
async def async_open(path, mode="w"):
with open(path, mode=mode) as f:
loop = asyncio.get_running_loop()
file = AsyncFile(f, loop=loop)
try:
yield file
finally:
file.result = await asyncio.gather(*file.pending, loop=loop)
上面的代碼通過使用asyncio中run_in_executor運行一個線程,來使得阻塞操作變?yōu)榉亲枞僮髂ヌ剩_到異步非阻塞的目的疲憋。
AsyncFile類提供了一些方法,這些方法將用于將write梁只、read和readlines的調(diào)用添加到pending列表中缚柳。這些任務(wù)通過finally塊中的事件循環(huán)在ThreadPoolExecutor進行調(diào)度。
yield 前半段用來表示aenter()
yield 后半段用來表示aexit()
使用finally以后可以保證鏈接資源等使用完之后能夠關(guān)閉搪锣。
運行異步上下文管理器
如果調(diào)用前面示例中的異步上下文管理器秋忙,則需要使用關(guān)鍵字async with來進行調(diào)用。另外帶有async with的語句只能在異步函數(shù)中使用构舟。
from asynciodemo.asyncwith import async_open
import asyncio
import tempfile
import os
async def main():
tempdir = tempfile.gettempdir()
path = os.path.join(tempdir, "run.txt")
print(f"臨時文件目錄:{path}")
async with async_open(path, mode='w') as f:
f.write("公眾號: ")
f.write("Python")
f.write("學(xué)習(xí)")
f.write("開發(fā)")
if __name__ == '__main__':
asyncio.run(main())
2.Java相關(guān)
1.Java新特性
2.精通并發(fā)編程灰追、注解、反射狗超、集合弹澎、web編程
3.熟悉網(wǎng)絡(luò)編程NIO Netty
4.熟悉JVM調(diào)優(yōu)
5. 開源框架,原理源碼
熟練使用spring/springmvc/springboot/mybatis等開源框架, 熟悉這些框架的原理、源碼