多進程
進程:正在執(zhí)行的應(yīng)用程序
多進程:多個運行的應(yīng)用程序
Python多進程開發(fā):內(nèi)建標準模塊multiprocessing
可以通過該模塊的Process進程類型,可以很方便的創(chuàng)建和管理多個進程
常見的multiprocessing屬性和模塊
multiprocessing.Process 進程類型窟勃,用于創(chuàng)建和管理進程
multiprocessing.Lock/RLock 進程互斥鎖/重用鎖掉分, 用于進程同步备图,數(shù)據(jù)鎖
multiprocessing.Event 進程事件類型誉裆,用于進程同步,數(shù)據(jù)信號通信
multiprocessing.Condition 進程條件類型怔揩, 用于進程同步右蕊, 數(shù)據(jù)信號通信
multiprocessing.Queue 進程隊列類型琼稻,用于多進程數(shù)據(jù)共享
multiprocessing.Manager 進程管理類型,專門用于多進程數(shù)據(jù)共享
multiprocessing.Listener/Client 進程監(jiān)聽客戶端饶囚,基于網(wǎng)絡(luò)多進程之間的數(shù)據(jù)共享
多進程的基礎(chǔ)操作及面向?qū)ο蟮膶崿F(xiàn)
#coding:utf-8
import multiprocessing, os
#基于函數(shù)
def my_proc():
print("第一個獨立的進程帕翻,程序的進程編號:", os.getpid(), os.getppid())
def my_proc2(name):
print(name, "第二個獨立的進程,程序的進程編號:", os.getpid(), os.getppid())
#基于類型的
class MyProc(multiprocessing.Process):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print(self.name, "基于類型的進程執(zhí)行", os.getpid(),self.ident, os.getppid())
if __name__ == "__main__":
#創(chuàng)建基于函數(shù)的多進程
p1 = multiprocessing.Process(target=my_proc)
p2 = multiprocessing.Process(target=my_proc)
#啟動進程
p1.start()
p2.start()
#基于函數(shù)的萝风,添加參數(shù)之后的多進程
p11 = multiprocessing.Process(target=my_proc2, args=('tom',))#元組中若只有一個元素嘀掸,則必須帶逗號!
p12 = multiprocessing.Process(target=my_proc2, args=('jerry',))#元組中若只有一個元素规惰,則必須帶逗號睬塌!
#啟動進程
p11.start()
p12.start()
#創(chuàng)建基于類型的多個進程
p1 = MyProc("進程A")
p2 = MyProc("進程B")
#啟動進程
p1.start()
p2.start()
多進程的簡化:內(nèi)置進程池
多進程的操作在實際應(yīng)用中也是非常多的,但是純底層的代碼開發(fā)控制并發(fā)也是一件非常繁 瑣的事情歇万,所以就出現(xiàn)了面向過程多進程并發(fā)的優(yōu)化操作方式:進程池 Pool
通過進程池 Pool 可以快速創(chuàng)建多個進程執(zhí)行指定函數(shù)揩晴,完成高并發(fā)處理操作
"""
進程池
"""
#引入需要的模塊
import multiprocessing, os, time
#創(chuàng)建程序函數(shù)
def my_proc():
print(multiprocessing.current_process().name, "一個進程正在執(zhí)行", os.getpid(), os.getppid())
time.sleep(1)
if __name__ == "__main__":
#創(chuàng)建一個進程池
pool = multiprocessing.Pool(2) #同時工作的任務(wù)數(shù)
#循環(huán)任務(wù)
for i in range(20): #安排的任務(wù)數(shù)
pool.apply_async(my_proc)
#停止提交任務(wù)
pool.close()
#獨占執(zhí)行
pool.join()
簡單案例:多進程下載 有了進程池,可以簡單完成一個多進程任務(wù)下載的操作處理
#模擬下載:通過多進程的方式執(zhí)行
import time
from multiprocessing import current_process, Pool
def download(url):
#下載函數(shù)
print(current_process().name, "開始下載數(shù)據(jù)》》》》", url)
time.sleep(1)
print(current_process().name, "下載數(shù)據(jù)完成《《《")
time.sleep(1)
return "下載號的數(shù)據(jù)"
def sava_data(data):
print("保存下載的數(shù)據(jù):", data)
if __name__ == "__main__":
#創(chuàng)建一個進程池
pool = Pool(5)
#循環(huán)下載數(shù)據(jù)
for i in range(20):
pool.apply_async(download, args=("http://baudu.com",), callback=sava_data)
#停止提交任務(wù)給進程池
pool.close()
#獨占
pool.join()
多個進程通信:multiprocessing.Manager
不同線程之間的數(shù)據(jù)通信贪磺,涉及到核心的數(shù)據(jù)共享問題硫兰,主要由 PYTHON 中提供的內(nèi)建模 塊 multiprocessing.Manager 類型實現(xiàn),該類型內(nèi)置了大量的用于數(shù)據(jù)共享的操作
multiprocessing.Manager 常見屬性和方法
Array 內(nèi)置進程間共享數(shù)組類型
Queue 內(nèi)置進程間共享隊列類型
list() 內(nèi)置進程間共享列表類型
dict() 內(nèi)置進程間共享字典類型
Value 內(nèi)置進程間共享值類型
Barrier 進程同步類型
BoundedSemaphore| Semaphore 進程信號量類型
Lock|RLock 進程互斥鎖/重用鎖
Event 進程同步事件類型
Condition 進程同步條件類型
import multiprocessing, time, random
#創(chuàng)建一個條件對象
con = multiprocessing.Condition()
#創(chuàng)建籃子
basket = list()
def product():
while True:
time.sleep(1)
#上鎖
con.acquire()
if len(basket) > 20:
print(":籃子滿了寒锚, 快來吃吧")
con.wait()
else:
#生產(chǎn)一個包子
_no = random.randint(1, 10)
print("蒸好了一個包子", _no)
basket.append(_no)
con.notify()
#解鎖
con.release()
def consumer():
while True:
time.sleep(0.5)
#上鎖
con.acquire()
if len(basket) <= 0:
print("吃光了劫映,快上菜吧")
con.wait()
else:
_no = basket.pop()
print("吃掉了一個包子", _no)
con.notify()
#解鎖
con.release()
if __name__ == "__main__":
for i in range(5):
p = multiprocessing.Process(name="生產(chǎn)者" + str(i) + "號", target=product)
p.start()
for j in range(5):
c = multiprocessing.Process(name="消費者" + str(j) + "號", target=consumer)
c.start()
多個進程通信:multiprocessing.Queue
多個進程之間的通信操作,數(shù)據(jù)的傳遞在 PYTHON 中的 multiprocessing 模塊中提供了一個 專門用于多進程之間進行數(shù)據(jù)傳遞的隊列:Queue
multiprocessing.Queue 常見屬性和方法
put(data [, timeout=None]) 添加一個數(shù)據(jù)到隊列中
put_nowait(data) 添加一個數(shù)據(jù)到隊列中刹前,非阻塞模式
get([timeout=None]) 從隊列中獲取一個數(shù)據(jù)
get_nowait() 從隊列中獲取一個數(shù)據(jù)泳赋,非阻塞模式
full() 判斷隊列是否已滿
empty() 判斷隊列是否已空
close() 關(guān)閉隊列
qsize() 獲取隊列中的元素數(shù)量
引入需要的模塊
import multiprocessing, time, random
#定義一個隊列儲存數(shù)據(jù)
basket = multiprocessing.Queue(20)
def product():
while True:
time.sleep(1)
_no = random.randint(0, 10)
try:
basket.put(_no, timeout=1)
print("生產(chǎn)者生產(chǎn)了一個數(shù)據(jù)", _no)
except:
basket.full()
print("數(shù)據(jù)框滿了")
def consumer():
while True:
time.sleep(0.5)
try:
_no = basket.get(timeout=1)
print("消費者取走了一個數(shù)據(jù)", _no)
except:
basket.empty()
print("數(shù)據(jù)框空了")
if __name__ == "__main__":
for i in range(2):
p = multiprocessing.Process(target=product)
p.start()
for j in range(2):
c = multiprocessing.Process(target=consumer)
c.start()