Python并發(fā)編程之多進(jìn)程
-
什么是進(jìn)程险耀?
進(jìn)程是一個(gè)抽象的概念琳袄,進(jìn)程的概念起源于操作系統(tǒng),正在進(jìn)行的一個(gè)過程或者一個(gè)過程的總和漆撞,而負(fù)責(zé)執(zhí)行過程的則是CPU殴泰。- 進(jìn)程與程序的區(qū)別
程序僅僅只是一堆文件于宙、一堆代碼而已,而進(jìn)程指的是程序的運(yùn)行過程或過程的總和悍汛。 - 并發(fā)與并行
無論是并行還是并發(fā)捞魁,在用戶看來都是'同時(shí)'運(yùn)行的,不管是進(jìn)程還是線程离咐,都只是一個(gè)任務(wù)而已谱俭,真是干活的是CPU,CPU來做這些任務(wù)宵蛀,而一個(gè)CPU同一時(shí)刻只能執(zhí)行一個(gè)任務(wù)昆著。- 并發(fā):是偽并行,即看起來是同時(shí)運(yùn)行术陶。單個(gè)CPU+多道技術(shù)就可以實(shí)現(xiàn)并發(fā)凑懂,(并行也屬于并發(fā))。
- 并行:同時(shí)運(yùn)行梧宫,只有具備多個(gè)CPU才能實(shí)現(xiàn)并行接谨。
- 多道技術(shù)
多道的產(chǎn)生背景是想要在單核CPU的情況下實(shí)現(xiàn)多個(gè)進(jìn)程并發(fā)執(zhí)行,具有兩大核心特點(diǎn):- 空間上的復(fù)用(多道程序復(fù)用內(nèi)存的空間):多道程序同時(shí)讀入內(nèi)存塘匣,等待被CPU執(zhí)行脓豪,即產(chǎn)生了多個(gè)進(jìn)程;進(jìn)程之間的內(nèi)存地址空間是相互隔離的忌卤,而且這種隔離是物理級(jí)別實(shí)現(xiàn)的扫夜。
- 時(shí)間上的復(fù)用(多道程序復(fù)用CPU的時(shí)間):正在執(zhí)行的進(jìn)程遇到了I/O操作 或 正在執(zhí)行的進(jìn)程占用CPU時(shí)間過長(zhǎng) 或 來了一個(gè)優(yōu)先級(jí)更高的進(jìn)程,操作系統(tǒng)都會(huì)強(qiáng)制收回當(dāng)前進(jìn)程的CPU使用權(quán)限驰徊,并將其分配給其他進(jìn)程笤闯。
- 因此,若要提升程序的執(zhí)行效率棍厂,需要減少或降低I/O操作望侈。
- 進(jìn)程與程序的區(qū)別
-
開啟子進(jìn)程的兩種方式
-
第一種
from multiprocessing import Process import time def task(name): print(F'{name} is running') time.sleep(3) print(F'{name} is done') if __name__ == '__main__': # 在Windows系統(tǒng)之上,開啟子進(jìn)程的操作一定要放到這下面 # if __name__ == '__main__'的意思是:當(dāng).py文件被直接運(yùn)行時(shí)勋桶,if __name__ == '__main__'之下的代碼塊將被運(yùn)行; # 當(dāng).py文件以模塊形式被導(dǎo)入時(shí)侥猬,if __name__ == '__main__'之下的代碼塊不被運(yùn)行例驹。 P = Process(target=task, args=('Python',)) # args= 后面是一個(gè)元組,因此一個(gè)參數(shù)的時(shí)候必須加逗號(hào) # Process(target=task,kwargs={'name':'Python'}) # kwargs= 后面是一個(gè)字典退唠,這兩種傳參方式任選一種 P.start() # 向操作系統(tǒng)發(fā)送申請(qǐng)內(nèi)存空間請(qǐng)求鹃锈,注意,是發(fā)請(qǐng)求給操作系統(tǒng)瞧预,至于操作系統(tǒng)怎么申請(qǐng)內(nèi)存空間不管屎债,然后把父進(jìn)程的數(shù)據(jù)拷貝給子進(jìn)程仅政,作為子進(jìn)程的初始狀態(tài) print('父進(jìn)程')
-
第二種
from multiprocessing import Process import time class MyProcess(Process): # 自己定義一個(gè)類,并繼承Process def __init__(self, name): super(Process, self).__init__() self.name = name def run(self): # 規(guī)定必須要定義run方法 print(F'{self.name} is running') time.sleep(3) print(F'{self.name} is done') if __name__ == "__main__": p = MyProcess('Python') p.start() # 規(guī)定盆驹,start()調(diào)用的是MyProcess()中的run(self)方法 print('父進(jìn)程')
-
-
父進(jìn)程等待子進(jìn)程結(jié)束
-
驗(yàn)證進(jìn)程的內(nèi)存空間相互隔離
from multiprocessing import Process import time x = 1 # 子進(jìn)程要執(zhí)行的代碼 def task(): time.sleep(3) global x x = 0 print('子進(jìn)程結(jié)束',x) # 打印結(jié)果:子進(jìn)程結(jié)束 0 # 開啟子進(jìn)程 if __name__ == '__main__': p = Process(target=task) p.start() p.join() # 讓父進(jìn)程等待子進(jìn)程結(jié)束圆丹,子進(jìn)程結(jié)束后才會(huì)執(zhí)行下一行代碼,也有回收僵尸進(jìn)程的效果 print(x) # 打印結(jié)果:1
-
驗(yàn)證父進(jìn)程等待子進(jìn)程結(jié)束后才會(huì)結(jié)束
from multiprocessing import Process import time import random # 子進(jìn)程要執(zhí)行的代碼 def task(n): print(F'{n} is running') time.sleep(random.randint(1,10)) # 開啟子進(jìn)程 start_time = time.time() p_l= [] if __name__ == '__main__': # 申請(qǐng)開啟5次子進(jìn)程 for i in range(5): p = Process(target=task, args=(i,)) p_l.append(p) p.start() # 由于p.start()只是向操作系統(tǒng)申請(qǐng)開辟內(nèi)存空間躯喇,每次的操作系統(tǒng)開辟內(nèi)存空間的時(shí)間程序無法掌控 # 等待所有子進(jìn)程結(jié)束 for p in p_l: p.join() end_time = time.time() print('父進(jìn)程',(end_time - start_time)) ''' 打印機(jī)結(jié)果: 主進(jìn)程 2 is running 1 is running 0 is running 4 is running 3 is running 父進(jìn)程 9.107510566711426 Process finished with exit code 0 '''
-
-
父進(jìn)程操作子進(jìn)程的其他屬性(方法)
from multiprocessing import Process import time import os # 子進(jìn)程要執(zhí)行的代碼 def task(): print('子進(jìn)程開始辫封,子進(jìn)程:%s,父進(jìn)程:%s' %(os.getpid廉丽,os.ppid)) time.sleep(3) print('子進(jìn)程結(jié)束',x) # 開啟子進(jìn)程 if __name__ == '__main__': print(os.getpid) #查看父進(jìn)程的進(jìn)程號(hào) p = Process(target=task) p.start() print(p.pid) # 查看子進(jìn)程的進(jìn)程號(hào) p.terminate() # 向操作系統(tǒng)發(fā)送結(jié)束子進(jìn)程請(qǐng)求 time.sleep(1) print(p.is_alive()) # 判斷子進(jìn)程是否存活倦微,返回的是布爾值
-
僵尸進(jìn)程與孤兒進(jìn)程
- 僵尸進(jìn)程:僵尸進(jìn)程是當(dāng)子進(jìn)程比父進(jìn)程先結(jié)束,而父進(jìn)程又沒有回收子進(jìn)程正压,釋放子進(jìn)程占用的資源欣福,此時(shí)子進(jìn)程將成為一個(gè)僵尸進(jìn)程。
- 危害:僵尸進(jìn)程占用進(jìn)程號(hào)焦履,如果存在過多的僵尸進(jìn)程拓劝,很容易導(dǎo)致沒有新的進(jìn)程號(hào)提供給要產(chǎn)生的進(jìn)程。一般來說父進(jìn)程結(jié)束后會(huì)調(diào)用
wait/waitpid
來回收僵尸進(jìn)程裁良,如果父進(jìn)程沒有回收僵尸進(jìn)程凿将,則僵尸進(jìn)程變成孤兒進(jìn)程,然后由init
進(jìn)程進(jìn)行回收价脾。 - 孤兒進(jìn)程:在操作系統(tǒng)領(lǐng)域中牧抵,孤兒進(jìn)程指的是在其父進(jìn)程執(zhí)行完成或被終止后仍繼續(xù)運(yùn)行的一類進(jìn)程。這些孤兒進(jìn)程將被
init
進(jìn)程(進(jìn)程號(hào)為1)所收養(yǎng)侨把,并由init
進(jìn)程對(duì)它們完成狀態(tài)收集工作犀变。
-
守護(hù)進(jìn)程
-
守護(hù)進(jìn)程(daemon)是一類在后臺(tái)運(yùn)行的特殊進(jìn)程,用于執(zhí)行特定的系統(tǒng)任務(wù)秋柄。如果父進(jìn)程將子進(jìn)程設(shè)置為守護(hù)進(jìn)程获枝,那么在父進(jìn)程代碼運(yùn)行完畢后,注意骇笔,是父進(jìn)程'代碼運(yùn)行完畢'省店,而不是父進(jìn)程結(jié)束,守護(hù)進(jìn)程就立即被回收笨触。
For example: from multiprocessing import Process import time # 子進(jìn)程代碼 def task(name): print('%s is running' %(name)) # 父進(jìn)程代碼 if __name__ == '__main__': obj = Process(target=task,args=('process',)) obj.daemon = True # 將obj設(shè)置為守護(hù)進(jìn)程懦傍,守護(hù)進(jìn)程的特點(diǎn)是當(dāng)父進(jìn)程代碼運(yùn)行完畢后,無論自己的任務(wù)有沒有完成芦劣,都隨之結(jié)束粗俱。 obj.start() print('父進(jìn)程')
-
-
互斥鎖
在編程中,引入了對(duì)象互斥鎖的概念虚吟,來保證共享數(shù)據(jù)操作的完整性寸认。每個(gè)對(duì)象都對(duì)應(yīng)于一個(gè)可稱為'互斥鎖' 的標(biāo)記签财,這個(gè)標(biāo)記用來保證在任一時(shí)刻,只能有一個(gè)線程訪問該對(duì)象偏塞。
互斥鎖和join的區(qū)別:
-
二者原理一樣唱蒸,都是將并發(fā)變成串行,從而保證有序烛愧;
- 區(qū)別一:join是按照人為指定的順序執(zhí)行油宜,互斥鎖是所有進(jìn)程平等的競(jìng)爭(zhēng),誰先搶到誰執(zhí)行怜姿。
- 區(qū)別二:互斥鎖可以讓一部分代碼(修改共享數(shù)據(jù)的代碼)串行慎冤,而join只能將代碼整體串行。
-
互斥鎖用法(區(qū)別一舉例):
from multiprocessing import Process,Lock lock = Lock() def task1(lock): lock.acquire() # lock.acquire()只能獲取一次互斥鎖 print('task1 is running') lock.release() # lock.release()釋放互斥鎖,一定要在操作完畢后釋放鎖沧卢! def task2(lock): lock.acquire() print('task2 is running') lock.release() def task3(lock): lock.acquire() print('task3 is running') lock.release() if __name__ == '__main__': p1 = Process(target=task1,args=(lock,)) p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start()
-
互斥鎖應(yīng)用場(chǎng)景之搶票(區(qū)別二舉例):
import os import json import time import random from multiprocessing import Process,Lock mutex_lock = Lock() def search_ticket(): ''' 這是一個(gè)車票查詢的操作 :return: ''' time.sleep(random.randint(1,3)) with open('db.json', mode='r', encoding='utf-8') as f: dic = json.load(f) print(F'用戶:{os.getpid()} 查詢到剩余車票:{dic['count']}' return dic['count'] def get_ticket(): ''' 這是一個(gè)搶票的操作 :return: ''' with open('db.json', mode='r', encoding='utf-8') as f: time.sleep(random.randint(1, 3)) dic = json.load(f) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', mode='w', encoding='utf-8') as f: json.dump(dic,f) print('用戶:%s 購票成功蚁堤!' %(os.getpid())) else: print('用戶:%s 購票失敗但狭!' %(os.getpid())) def buy_ticket(): ''' 這是一個(gè)購買車票的函數(shù) :return: ''' search_ticket() with mutex_lock: get_ticket() if __name__ == '__main__': for i in range(10): p = Process(target = buy_ticket) p.start()
-
IPC機(jī)制(Inter Process Communication)
-
IPC:進(jìn)程間通信披诗,進(jìn)程間為什么要通信?因?yàn)檫M(jìn)程需要協(xié)同完成工作立磁,就涉及到一個(gè)進(jìn)程要把自己處理的結(jié)果交給另外一個(gè)進(jìn)程呈队,而進(jìn)程之間內(nèi)存空間相互隔離,因此進(jìn)程之間通信必須找到一種介質(zhì)唱歧,該介質(zhì)必須滿足:
- 是所有進(jìn)程共享的宪摧;(實(shí)現(xiàn)進(jìn)程共享的機(jī)制有管道和隊(duì)列,隊(duì)列本質(zhì)是由管道+鎖實(shí)現(xiàn))
- 必須是內(nèi)存空間颅崩;
- 幫我們處理好鎖的問題几于。
-
但凡涉及到進(jìn)程之間通信,就使用Queue,隊(duì)列是IPC機(jī)制實(shí)現(xiàn)的一種方式
強(qiáng)調(diào):
- 隊(duì)列是用來存進(jìn)程之間通信消息的沿后,數(shù)據(jù)量不應(yīng)該過大沿彭;
-
Queue(maxsize)
中的maxsize
的值超過內(nèi)存限制就變得毫無意義;
from multiprocessing import Queue # 從multiprocessing導(dǎo)入隊(duì)列尖滚,隊(duì)列特點(diǎn):先進(jìn)先出 q = Queue(3,block=True) # maxsize=3喉刘,若不指定大小就無限大,受限于內(nèi)存大衅崤饱搏;block=True默認(rèn)是阻塞。 q.put('Hello') q.put({'name':'HC'}) q.put(100) # q.put(55) 由于maxsize=3置逻,第四個(gè)put會(huì)引起阻塞 print(q.get()) print(q.get()) print(q.get())
-