基本概念
并行:多個(gè)任務(wù)同時(shí)執(zhí)行狮暑,在同一時(shí)刻有多個(gè)任務(wù)在同時(shí)執(zhí)行艰躺。
并發(fā):多個(gè)任務(wù)分時(shí)交替執(zhí)行侄泽,在同一時(shí)刻僅有1個(gè)任務(wù)在執(zhí)行,但在宏觀上看著像一起執(zhí)行正勒。一般指的是抗QPS的能力得院。
進(jìn)程:系統(tǒng)資源分配的最小單元,有獨(dú)立的內(nèi)存空間章贞,開銷大
線程:CPU調(diào)度的最小單元祥绞,空間內(nèi)存共享,需要依賴進(jìn)程存活鸭限。并發(fā)編程時(shí)要注意線程安全的問題蜕径。每個(gè)線程大約占用30K左右開銷
協(xié)程:不被操作系統(tǒng)控制,完全由用戶程序控制里覆。開銷大約幾K空間丧荐,協(xié)程間的切換只發(fā)生在用戶態(tài),不需要像線程切換那樣再進(jìn)入內(nèi)核態(tài)喧枷;協(xié)程切換次數(shù)一般情況比線切換程少(這里怎么理解虹统?),產(chǎn)生IO時(shí)讓出CPU隧甚,asyncio.sleep(0)時(shí)會(huì)主動(dòng)讓出CPU车荔,但如果一個(gè)協(xié)程是x+=1操作,會(huì)一直霸占著CPU戚扳。
性能對(duì)比:http://www.reibang.com/p/6c63dafa70bf
- 同步阻塞:A調(diào)用B忧便,等待B執(zhí)行完任務(wù)后返回。
- 同步非阻塞:A調(diào)用B帽借,B直接返回成功或者什么都不返回或者某個(gè)狀態(tài)珠增,然后B去執(zhí)行任務(wù)。過一段時(shí)間后A再去調(diào)用B砍艾。一般情況下A還會(huì)去輪詢B執(zhí)行任務(wù)的結(jié)果蒂教。
- 異步阻塞:A調(diào)用B1,B2,B3...Bx,需要等待B1,B2,B3...Bx的所有返回結(jié)果脆荷。掉B1,B2,B3...Bx時(shí)不是串行的凝垛,是并發(fā)的。
- 異步非阻塞:A調(diào)用B蜓谋,B直接返回成功或者什么都不返回或者某個(gè)狀態(tài)梦皮,然后B再去執(zhí)行任務(wù),任務(wù)執(zhí)行完后桃焕,B再回調(diào)A剑肯,或者把結(jié)果推送給某個(gè)隊(duì)列讓A去消費(fèi)。
相關(guān)函數(shù):
Thread.setDaemon() 設(shè)置為后臺(tái)線程观堂,默認(rèn)為False,設(shè)置為True后,主線程退出挤庇,則整個(gè)程序退出
Thread.join() 阻塞線程,join后面的主線程語(yǔ)句荐虐,會(huì)等子線程執(zhí)行完成后再執(zhí)行。多個(gè)join()語(yǔ)句不分前后次序
繼承方式:平時(shí)常用繼承的方式寫線程方法丸凭,控制起來更像面向?qū)ο缶幊?/p>
class MyThread(threading.Thread):
def __init__(self):
pass
def run(self):
pass
# Method representing the thread's activity
mythread = MyThread()
mythread.start()
鎖機(jī)制
GIL鎖簡(jiǎn)介:http://www.reibang.com/p/633b7aacf722
GIL的簡(jiǎn)單結(jié)論:
對(duì)于IO密集型場(chǎng)景福扬,更適合使用多線程。比如WEB惜犀,磁盤
對(duì)于CPU密集型場(chǎng)景铛碑,更適合使用多進(jìn)程。比如模型的運(yùn)算
threading模塊下的鎖們虽界,multiprocessing模塊下的鎖類似
鎖 | 簡(jiǎn)介 |
---|---|
非遞歸鎖(互斥鎖) Lock() | 多個(gè)線程訪問共享變量時(shí)需要互斥鎖汽烦;互斥鎖的操作acquire()和release()需要成對(duì)出現(xiàn);如果某個(gè)線程方法里acquire了多個(gè)鎖時(shí)莉御,要注意死鎖的問題撇吞。 |
遞歸鎖 RLock() | RLock內(nèi)部維護(hù)了一個(gè)Lock和一個(gè)計(jì)數(shù)器counter,一個(gè)線程能夠同時(shí)獲取N次Lock礁叔,只有當(dāng)一個(gè)線程所有acquire的鎖都release牍颈,counter變?yōu)?后,其他線程才能搶到這把鎖琅关。寫代碼時(shí)煮岁,如果底層方法封裝了lock.acquire()和release(),上次方法又想封裝lock時(shí)涣易,最好使用RLock() |
條件鎖 Condition() | 使用條件鎖更像使用協(xié)程画机,由程序控制自己什么時(shí)候釋放鎖 acquire():獲取鎖 release():釋放鎖 wait():線程進(jìn)入blocking狀態(tài),直到收到notify通知或超時(shí)才繼續(xù)運(yùn)行 notify():通知其他await的線程新症,可以加參數(shù)n=1或者k步氏,需要注意notify()和await()的次序,由程序決定 notifyAll():notify所有await的線程 |
事件 Event() | await():掛起當(dāng)前線程账劲,直到收到event為True時(shí)才繼續(xù)執(zhí)行當(dāng)前線程。 set():設(shè)置event為True金抡。 clear():設(shè)置event為False瀑焦。 isSet():獲取event狀態(tài) |
Timer() | 幾秒鐘后執(zhí)行任務(wù),multiprocessing下沒看到Timer() |
信號(hào)量 Semaphore() | semaphore是個(gè)內(nèi)部數(shù)據(jù)梗肝,它的內(nèi)部有個(gè)計(jì)數(shù)器榛瓮,表明當(dāng)前共享資源最多有多少個(gè)線程可以同時(shí)使用。 |
有界信號(hào)量 BoundedSemaphore() | 和Semaphore相似巫击,這個(gè)更嚴(yán)格一些禀晓,超過信號(hào)量限制時(shí)會(huì)return ValueError |
柵欄(障礙) Barrier() | threading.Barrier(3, action=xxx_action, timeout=None)精续,等barrier.wait()的數(shù)量到達(dá)3后,優(yōu)先執(zhí)行xxx_action方法粹懒。 wait():方法表示想要通過柵欄,如果沒跨過柵欄就阻塞重付,跨過柵欄后優(yōu)先執(zhí)行xxx_action方法 wait(N):N表示秒,如果到達(dá)N秒后,還未跨過柵欄,則引發(fā)BrokenBarrierError錯(cuò)誤。 reset():重置柵欄 |
隊(duì)列機(jī)制
隊(duì)列 | 簡(jiǎn)介 |
---|---|
先進(jìn)先出隊(duì)列 | FIFO--queue.Queue(5) |
先進(jìn)后出隊(duì)列 | LIFO--queue.LifoQueue(5) |
優(yōu)先級(jí)隊(duì)列 | q = queue.PriorityQueue() q.put([1, 'aaa']) q.put([20, 'bbb'] 數(shù)值越小優(yōu)先級(jí)越高,在q.get()是會(huì)被優(yōu)先去出來凫乖,底層是通過heapq實(shí)現(xiàn)的 |
阻塞隊(duì)列 | python的queue.Queue支持阻塞方式,也有g(shù)et_nowait和put_nowait方法 |
延遲隊(duì)列 | python本身不支持延遲隊(duì)列,需要通過其他手段實(shí)現(xiàn)确垫。 利用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn) 利用redis的zset結(jié)構(gòu)實(shí)現(xiàn) 利用rabbitmq實(shí)現(xiàn) 如果只是為了定時(shí)執(zhí)行任務(wù),用Timer()多線程也可以 http://www.reibang.com/p/a663e52e6488 |
進(jìn)程間通信IPC(Inter-process communication)
IPC機(jī)制 | 簡(jiǎn)介 |
---|---|
隊(duì)列 | 進(jìn)程間通信都可能用到隊(duì)列 |
共享內(nèi)存 | multiprocessing模塊里提供了共享Value,共享List等對(duì)象 |
管道 | 用于父進(jìn)程和子進(jìn)程間通信 |
信號(hào) | 只用于進(jìn)程間的通信帽芽,信號(hào)是個(gè)軟中斷删掀,捕捉信號(hào)的過程:1、主程序控制流收到信號(hào)后中斷导街,由用戶態(tài)進(jìn)入內(nèi)核態(tài)披泪。2、內(nèi)核捕捉中斷信號(hào)搬瑰,進(jìn)行中斷處理款票。3、如果中斷函數(shù)是用戶自定義函數(shù)跌捆,則跳回用戶態(tài)執(zhí)行中斷函數(shù)徽职。4、中斷函數(shù)處理完成后佩厚,再次進(jìn)入內(nèi)核態(tài)姆钉,準(zhǔn)備恢復(fù)主程序流程。5抄瓦、返回用戶態(tài)從主控制流程中上次被中斷的地方繼續(xù)向下執(zhí)行潮瓶。 由于中斷可以出發(fā)中斷函數(shù),中斷函數(shù)中能能夠在主程序blocking時(shí)處理大量的業(yè)務(wù)邏輯钙姊,因此可用做平滑重啟和熱加載毯辅。 |
信號(hào)量 | 和鎖里的信號(hào)量類似,能夠獲取信號(hào)量時(shí)煞额,就能操作資源思恐。列在這為了和信號(hào)區(qū)分是兩個(gè)概念 |
socket/zmq | zeromq用起來更像是一個(gè)封裝好的socket接口 |
zookeeper | 分布式協(xié)調(diào)服務(wù) |
池
線程池from concurrent.futures import ThreadPoolExecutor
進(jìn)程池from concurrent.futures import ProcessPoolExecutor
注意和multiprocessing的Pool用法不一樣
常用方法:
map:阻塞直到返回,result并不是你map_fun返回的結(jié)果膊毁,而是一個(gè)生成器胀莹,如果要從中遍歷去結(jié)果。map能夠保證線程任務(wù)的順序性
submit:提交執(zhí)行的函數(shù)到線/進(jìn)程池中婚温,submit函數(shù)立即返回描焰,不阻塞
task.cancel():取消某個(gè)任務(wù),該任務(wù)沒有放入線程池中才能取消成功
task.done():判斷任務(wù)是否已完成,沒啥用栅螟。用as_completed()
task.result():獲取task的結(jié)果荆秦,如果獲取了結(jié)果篱竭,就會(huì)造成阻塞
as_completed():此方法是一個(gè)生成器,在沒有任務(wù)完成的時(shí)候步绸,會(huì)阻塞掺逼,在有某個(gè)任務(wù)完成的時(shí)候,就能繼續(xù)執(zhí)行for循環(huán)后面的語(yǔ)句靡努,然后繼續(xù)阻塞住坪圾,循環(huán)到所有的任務(wù)結(jié)束。
for job in as_completed(all_jobs):
res = job.result()
asyncio(python3.6+)
async和await語(yǔ)法需要成對(duì)出現(xiàn)
在工程中注意一協(xié)全協(xié)
如果代碼需要主動(dòng)讓出CPU惑朦,注意讓出CPU的位置
TODO是否有必要實(shí)現(xiàn)協(xié)程池兽泄,為什么?
TODOjava中在并發(fā)編程里定義了許多概念
eg:
可見性漾月,原子性病梢,有序性