進(jìn)程是線程的集合酥馍,每個(gè)進(jìn)程至少有一個(gè)主線程,進(jìn)程是在系統(tǒng)中起資源調(diào)度的作用板熊,進(jìn)程分配資源和管理數(shù)據(jù)框全,程序的執(zhí)行都是有線程執(zhí)行的。進(jìn)程和線程的執(zhí)行差不多干签,也是由時(shí)間片根據(jù)并發(fā)機(jī)制執(zhí)行的津辩。Python中關(guān)于進(jìn)程的內(nèi)建模塊multiprocessing,通過(guò)模塊的Process類型,可以創(chuàng)建多個(gè)進(jìn)程容劳。
Lock/RLock喘沿,互斥鎖/重用鎖,進(jìn)程鎖類型竭贩、Event事件類型蚜印,Condition條件類型可以很方便的完成進(jìn)程間的同步操作。Queue進(jìn)程隊(duì)列類型留量,用于多進(jìn)程的數(shù)據(jù)共享窄赋。Listener/Client 進(jìn)程監(jiān)聽(tīng)哟冬,基于網(wǎng)絡(luò)多進(jìn)程間的數(shù)據(jù)共享。多進(jìn)程面向?qū)ο蟮膶?shí)現(xiàn)和多線程的操作類似寝凌。自定義進(jìn)程類型柒傻,繼承系統(tǒng)進(jìn)程比哦準(zhǔn)類型multiprocessing.Process,重寫父類的run()方法较木,之方法中執(zhí)行代碼。在使用時(shí)創(chuàng)建該自定義進(jìn)程類型的對(duì)象青柄,
進(jìn)程池:包含多個(gè)進(jìn)程的池塘伐债;包含并且管理多個(gè)進(jìn)程的一個(gè)對(duì)象Pool
init(num) 初始化函數(shù),用于創(chuàng)建一個(gè)進(jìn)程池
apply(function, args) 同步執(zhí)行一個(gè)函數(shù)~當(dāng)進(jìn)程中的函數(shù)執(zhí)行完成才能退出
apply_async(function, args) 異步非阻塞執(zhí)行一個(gè)函數(shù)
close()停止向進(jìn)程池提交任務(wù)
join()讓進(jìn)程池工作完成致开,才能允許其他進(jìn)程繼續(xù)工作
import multiprocessing, time, os
def my_proc():
print(multiprocessing.current_process().name, "一個(gè)進(jìn)程正在工作", os.getppid(), os.getpid())
time.sleep(1)
if __name__ == "__main__":
# 創(chuàng)建一個(gè)進(jìn)程池
pool = multiprocessing.Pool(2
# 循環(huán)任務(wù)
for i in range(20):
pool.apply_async(my_proc)
# 停止提交任務(wù)
pool.close()
# 獨(dú)占執(zhí)行
pool.join()
現(xiàn)在有一個(gè)問(wèn)題峰锁,在進(jìn)程中,操作變量數(shù)據(jù)是共享的嗎双戳。操作的變量數(shù)據(jù)都是獨(dú)立的每個(gè)程序在操作系統(tǒng)數(shù)據(jù)的時(shí)候都會(huì)先將數(shù)據(jù)復(fù)制一下然后再操作復(fù)制的數(shù)據(jù)虹蒋,系統(tǒng)內(nèi)的數(shù)據(jù)并沒(méi)有改變。所以進(jìn)程之間的數(shù)據(jù)都是相互獨(dú)立的飒货!
問(wèn)題:進(jìn)程中魄衅,操作變量數(shù)據(jù),是共享的嗎塘辅?No 獨(dú)立的
from multiprocessing import current_process, Process
import time
全局變量:可以被多個(gè)進(jìn)程共享嗎晃虫?
ticket = 3
def my_proc():
# 1. 多個(gè)進(jìn)程,能不能都讀取到全局變量的值扣墩?True
# for i in range(ticket):
# print(current_process().name, ":", i)
# 2. 多個(gè)進(jìn)程哲银,使用的同一個(gè)全局變量的值,是共享的嗎呻惕?
global ticket
while ticket > 0:
print(current_process().name, ": " , ticket)
ticket -= 1
3. 函數(shù)參數(shù)處理
#def my_proc(t):
# while t > 0:
# print(current_process().name, ": ", t)
# t -= 1
if __name__ == "__main__":
# 創(chuàng)建兩個(gè)進(jìn)程
p1 = Process(target=my_proc)
p2 = Process(target=my_proc)
# p1 = Process(target=my_proc, args=(ticket,))
# p2 = Process(target=my_proc, args=(ticket,))
# 啟動(dòng)進(jìn)程
p1.start()
p2.start()
time.sleep(5)
print(ticket)
問(wèn)題:進(jìn)程之間怎么完成數(shù)據(jù)共享|通信
import multiprocessing
解決方案1:通過(guò)第三方的東西進(jìn)行交互
# A進(jìn)程將數(shù)據(jù)存儲(chǔ)到文件|數(shù)據(jù)庫(kù)中
# B進(jìn)程從文件|數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)
# 文件|數(shù)據(jù)庫(kù)是獨(dú)立于A和B進(jìn)程之外的第三方荆责,能被A/B同時(shí)訪問(wèn)
解決方案2:通過(guò)第三方、獨(dú)立于進(jìn)程你之外的數(shù)據(jù)類型完成進(jìn)程之間的通信
答案:可以亚脆!
python提供了如下幾種方案
- 事件對(duì)象multiprocessing.Event-進(jìn)程之間的狀態(tài)標(biāo)記通信
'''
threading.Event
multiprocessing.Event
'''
event = multiprocessing.Event()
2.multiprocessing模塊中提供了條件類型Condition做院,可以完成多個(gè)進(jìn)程之間的通信操作
'''
threading.Condition
multiprocessing.Condition
'acquire', 'notify', 'notify_all', 'release', 'wait', 'wait_for'
'''
3.multiprocessing模塊中提供了一個(gè)數(shù)據(jù)操作類型Queue,可以完成進(jìn)程之間的數(shù)據(jù)簡(jiǎn)單共享
不推薦使用
'''
threading: queue.Queue隊(duì)列實(shí)現(xiàn)線程間數(shù)據(jù)的安全共享
multiprocessing.Queue 進(jìn)程隊(duì)列
get()從隊(duì)列中獲取數(shù)據(jù)
get_nowait()從隊(duì)列中獲取數(shù)據(jù)型酥,非阻塞
put()向隊(duì)列中添加數(shù)據(jù)
put_nowait()向隊(duì)列中添加數(shù)據(jù)山憨,非阻塞
empty()判斷隊(duì)列是否為空
full()判斷隊(duì)列是否已滿
qsize()獲取隊(duì)列中元素的個(gè)數(shù)
'''
4.multiprocessing提供了一個(gè)專門用來(lái)進(jìn)行多個(gè)進(jìn)程之間數(shù)據(jù)共享的類型:Manager
可以在本地多個(gè)進(jìn)程之間完成數(shù)據(jù)共享和通信,可以在網(wǎng)絡(luò)上遠(yuǎn)程的兩個(gè)主機(jī)之間實(shí)現(xiàn)數(shù)據(jù)的共享和通信弥喉,可以在網(wǎng)絡(luò)上遠(yuǎn)程的兩個(gè)主機(jī)之間
'''
用于數(shù)據(jù)共享:存儲(chǔ)數(shù)據(jù)
Array:數(shù)組
Queue:列表
list:用于創(chuàng)建一個(gè)存儲(chǔ)數(shù)據(jù)的列表
ticket_box = multiprocessing.Manager.list()
dict:用于創(chuàng)建一個(gè)存儲(chǔ)數(shù)據(jù)的字典
ticket_box = multiprocessing.Manager.dict()
Value:用于創(chuàng)建一個(gè)存儲(chǔ)數(shù)據(jù)的變量
ticket = multiprocessing.Manager.Value('count', 100)
type a:
manager = Manager()
my_list = manager.list()
my_dict = manager.dict()
my_value= manager.Value('count', 100)
type b:
with Manager() as manager:
my_list = manager.list()
my_dict = manager.dict()
my_value= manager.Value('count', 100)
數(shù)據(jù)通信進(jìn)程管理:
Lock/RLock:進(jìn)程鎖
Barrir:進(jìn)程同步對(duì)象
Semaphore|BoundedSemaphore:信號(hào)量進(jìn)程同步類型
Event:事件類型
Condition:條件類型
Pool:進(jìn)程池類型
'''