Num01-->進程的創(chuàng)建-fork
Python的os模塊封裝了常見的系統(tǒng)調(diào)用妖混,其中就包括fork懈词,可以在Python程序中輕松創(chuàng)建子進程臼闻。
import os
# 注意,fork函數(shù)释漆,只在Unix/Linux/Mac上運行芬膝,windows不可以
pid = os.fork()
if pid == 0:
print('我是子進程')
else:
print('我是父進程')
#結果如下:
#我是父進程
#我是子進程
以上代碼加以說明如下:
程序執(zhí)行到os.fork()時望门,操作系統(tǒng)會創(chuàng)建一個新的進程(子進程),然后復制父進程的所有信息到子進程中锰霜。
然后父進程和子進程都會從fork()函數(shù)中得到一個返回值筹误,在子進程中這個值一定是0,而父進程中是子進程的 id號癣缅。
在Unix/Linux操作系統(tǒng)中厨剪,提供了一個fork()系統(tǒng)函數(shù),它非常特殊所灸。
普通的函數(shù)調(diào)用丽惶,調(diào)用一次,返回一次爬立,但是fork()調(diào)用一次钾唬,返回兩次,因為操作系統(tǒng)自動把當前進程(稱為父進程)復制了一份(稱為子進程)侠驯,然后抡秆,分別在父進程和子進程內(nèi)返回。
子進程永遠返回0吟策,而父進程返回子進程的ID儒士。
這樣做的理由是,一個父進程可以fork出很多子進程檩坚,所以着撩,父進程要記下每個子進程的ID,而子進程只需要調(diào)用getppid()就可以拿到父進程的ID匾委。
Num02-->多進程修改全局變量
#coding=utf-8
import os
import time
num = 0
# 注意拖叙,fork函數(shù),只在Unix/Linux/Mac上運行赂乐,windows不可以
pid = os.fork()
if pid == 0:
num+=1
print('我是子進程---num=%d'%num)
else:
time.sleep(1)
num+=1
print('我是父進程---num=%d'%num)
#結果如下:
#我是父進程---num=1
#我是子進程---num=1
在多進程中個薯鳍,每個進程中所有數(shù)據(jù)(包括全局變量)都各擁有一份,互不影響挨措。
Num03-->多次fork問題
Test01-->fork兩次產(chǎn)生四個進程
#coding=utf-8
import os
import time
# 注意挖滤,fork函數(shù)崩溪,只在Unix/Linux/Mac上運行,windows不可以
pid = os.fork()
if pid == 0:
print('我是第一次fork中的子進程')
else:
print('我是第一次fork中的父進程')
pid = os.fork()
if pid == 0:
print('我是第二次fork中的子進程')
else:
print('我是第二次fork中的父進程')
time.sleep(1)
Test02-->fork兩次產(chǎn)生三個進程
#! /usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import time
def sing():
print('--我是第一次fork的子進程--')
time.sleep(1)
def dance():
ppid = os.fork()
if ppid > 0:
print('--我是第二次fork的父進程--')
time.sleep(1)
elif ppid == 0:
print('--我是第二次fork的子進程--')
time.sleep(1)
def main():
pid = os.fork()
if pid > 0:
dance()
elif pid == 0:
sing()
if __name__ == "__main__":
main()
Num04-->進程的第一種創(chuàng)建方式-multiprocessing
multiprocessing模塊提供了一個Process類來代表一個進程對象斩松。
#coding=utf-8
from multiprocessing import Process
import os
# 子進程要執(zhí)行的代碼
def fun_proc(name):
print('子進程運行中伶唯,name= %s ,pid=%d' % (name, os.getpid()))
if __name__=='__main__':
print('父進程 %d' % os.getpid())
p = Process(target=fun_proc, args=('我是子進程',))
print('子進程將要執(zhí)行')
p.start()
p.join()
print('子進程已結束')
# 結果如下:
# 父進程 11876
# 子進程將要執(zhí)行
# 子進程運行中,name= 我是子進程 ,pid=14644
# 子進程已結束
對以上代碼加以說明:
1惧盹,用Process類創(chuàng)建子進程時抵怎,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù)(是一個元組)。
2岭参,調(diào)用start()方式啟動子進程。
3尝艘,join()方法可以等待子進程結束后再繼續(xù)往下運行演侯,通常用于進程間的同步。
Test01-->Process的語法如下:
Process([group [, target [, name [, args [, kwargs]]]]])
target:表示這個進程實例所調(diào)用對象背亥;
args:表示調(diào)用對象的位置參數(shù)元組秒际;
kwargs:表示調(diào)用對象的關鍵字參數(shù)字典;
name:為當前進程實例的別名狡汉;
group:大多數(shù)情況下用不到娄徊,表示在哪個組;
Process類常用方法:
is_alive():判斷進程實例是否還在執(zhí)行盾戴;
join([timeout]):是否等待進程實例執(zhí)行結束寄锐,或等待多少秒;
start():啟動進程實例(創(chuàng)建子進程)尖啡;
run():如果沒有給定target參數(shù)橄仆,對這個對象調(diào)用start()方法時,就將執(zhí)行對象中的run()方法衅斩;
terminate():不管任務是否完成盆顾,立即終止;
Process類常用屬性:
name:當前進程實例別名畏梆,默認為Process-N您宪,N為從1開始遞增的整數(shù);
pid:當前進程實例的PID值奠涌;
Test02-->創(chuàng)建一個進程對象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-04-25 16:36:47
# @Author : xiaoke
from multiprocessing import Process
import os
from time import sleep
# 子進程要執(zhí)行的代碼
def fun_proc(name, age, **kwargs):
for i in range(5):
print('子進程運行中宪巨,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))
print(kwargs)
sleep(1)
if __name__=='__main__':
print('父進程 %d' % os.getpid())
p = Process(target=fun_proc, args=('我是子進程',66), kwargs={"得分":666})
print('子進程將要執(zhí)行')
p.start()
sleep(1)
# p.terminate()# 提前結束子進程,不管子進程的任務是否完成
p.join()
print('子進程已結束')
# 結果如下:
# 父進程 7744
# 子進程將要執(zhí)行
# 子進程運行中铣猩,name= 我是子進程,age=66 ,pid=8064...
# {'得分': 666}
# 子進程運行中揖铜,name= 我是子進程,age=66 ,pid=8064...
# {'得分': 666}
# 子進程運行中,name= 我是子進程,age=66 ,pid=8064...
# {'得分': 666}
# 子進程運行中达皿,name= 我是子進程,age=66 ,pid=8064...
# {'得分': 666}
# 子進程運行中天吓,name= 我是子進程,age=66 ,pid=8064...
# {'得分': 666}
# 子進程已結束
Test03-->創(chuàng)建兩個進程對象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-04-25 16:36:47
# @Author : xiaoke
#coding=utf-8
from multiprocessing import Process
import time
import os
def worker_1(interval):
print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(),os.getpid()))
t_start = time.time()
time.sleep(interval) #程序將會被掛起interval秒
t_end = time.time()
print("worker_1,執(zhí)行時間為'%0.2f'秒"%(t_end - t_start))
def worker_2(interval):
print("worker_2,父進程(%s),當前進程(%s)"%(os.getppid(),os.getpid()))
t_start = time.time()
time.sleep(interval)
t_end = time.time()
print("worker_2,執(zhí)行時間為'%0.2f'秒"%(t_end - t_start))
def main():
#輸出當前程序的ID
print("進程ID:%s"%os.getpid())
#創(chuàng)建兩個進程對象贿肩,target指向這個進程對象要執(zhí)行的對象名稱,
#如果不指定name參數(shù)龄寞,默認的進程對象名稱為Process-N汰规,N為一個遞增的整數(shù)
p1=Process(target=worker_1,args=(2,))
p2=Process(target=worker_2,name="xiaoke",args=(1,))
#使用"進程對象名稱.start()"來創(chuàng)建并執(zhí)行一個子進程,
#這兩個進程對象在start后物邑,就會分別去執(zhí)行worker_1和worker_2方法中的內(nèi)容
p1.start()
p2.start()
#同時父進程仍然往下執(zhí)行溜哮,如果p2進程還在執(zhí)行,將會返回True
print("p2.is_alive=%s"%p2.is_alive())
print("p1.is_alive=%s"%p1.is_alive())
#輸出p1和p2進程的別名和pid
print("--p1進程的別名和pid--")
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("--p2進程的別名和pid--")
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
#join括號中不攜帶參數(shù)色解,表示父進程在這個位置要等待p1進程執(zhí)行完成后茂嗓,再繼續(xù)執(zhí)行下面的語句,一般用于進程間的數(shù)據(jù)同步科阎,
# 如果不寫這一句述吸,下面的is_alive判斷將會是True,
#可以嘗試著將下面的這條語句改成p1.join(1)锣笨,
#因為p2需要2秒以上才可能執(zhí)行完成蝌矛,父進程等待1秒很可能,不能讓p1完全執(zhí)行完成错英,
#所以下面的print會輸出True入撒,即p1仍然在執(zhí)行
print("--p1進程是否執(zhí)行完畢?椭岩?--")
p1.join()
print("p1.is_alive=%s"%p1.is_alive())
p2.join()
print("p2.is_alive=%s"%p2.is_alive())
if __name__ == '__main__':
main()
# 結果如下:
# 進程ID:4004
# p2.is_alive=True
# p1.is_alive=True
# --p1進程的別名和pid--
# p1.name=Process-1
# p1.pid=3352
# --p2進程的別名和pid--
# p2.name=xiaoke
# p2.pid=6092
# --p1進程是否執(zhí)行完畢茅逮??--
# worker_2,父進程(4004),當前進程(6092)
# worker_2,執(zhí)行時間為'1.00'秒
# worker_1,父進程(4004),當前進程(3352)
# worker_1,執(zhí)行時間為'2.00'秒
# p1.is_alive=False
# p2.is_alive=False
Num05-->進程的第二種創(chuàng)建方式--自己創(chuàng)建一個類簿煌,繼承Process類
定義:創(chuàng)建新的進程還可以使用類的方式氮唯。可以自定義一個類姨伟,繼承Process類惩琉。每次實例化這個類的時候,就等同于實例化這個進程對象夺荒。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-04-25 16:36:47
# @Author : xiaoke
from multiprocessing import Process
import time
import os
#繼承Process類
class Process_Class(Process):
#因為Process類本身也有__init__方法瞒渠,這個子類相當于重寫了這個方法,
#但這樣就會帶來一個問題技扼,我們并沒有完全的初始化一個Process類伍玖,
# 所以就不能使用從這個類繼承的一些方法和屬性,
#最好的方法就是將繼承類本身傳遞給Process.__init__方法剿吻,完成這些初始化操作
def __init__(self,interval):
Process.__init__(self)
# 傳遞進來的屬性
self.interval = interval
#重寫了Process類的run()方法
def run(self):
print("子進程(%s) 開始執(zhí)行窍箍,父進程為(%s)"%(os.getpid(),os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print("子進程(%s)執(zhí)行結束,耗時%0.2f秒"%(os.getpid(),t_stop-t_start))
if __name__=="__main__":
t_start = time.time()
print("當前程序進程(%s)"%os.getpid())
p1 = Process_Class(2)
#對一個不包含target屬性的Process類執(zhí)行start()方法,就會運行這個類中的run()方法椰棘,所以這里會執(zhí)行p1.run()
p1.start()
p1.join()
t_stop = time.time()
print("父進程(%s)執(zhí)行結束纺棺,耗時%0.2f秒"%(os.getpid(),t_stop-t_start))
# 結果為:
# 當前程序進程(14736)
# 子進程(4292) 開始執(zhí)行,父進程為(14736)
# 子進程(4292)執(zhí)行結束邪狞,耗時2.00秒
# 父進程(14736)執(zhí)行結束祷蝌,耗時2.11秒
Num06-->進程池--Pool
當需要創(chuàng)建的子進程數(shù)量不多時,可以直接利用multiprocessing中的Process動態(tài)生成多個進程帆卓。但如果是上百甚至上千個目標巨朦,手動的去創(chuàng)建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法剑令。
初始化Pool時糊啡,可以指定一個最大進程數(shù),當有新的請求提交到Pool中時吁津,如果池還沒有滿悔橄,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到指定的最大值腺毫,那么該請求就會等待,直到池中有進程結束挣柬,才會創(chuàng)建新的進程來執(zhí)行潮酒。
#采用非阻塞的方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool
import os, time, random
def worker(num):
t_start = time.time()
print("子進程-%s開始執(zhí)行,進程號為%d" % (num, os.getpid()))
# random.random()隨機生成0~1之間的浮點數(shù)
time.sleep(random.random() * 2)
t_stop = time.time()
print("子進程-%s執(zhí)行完畢,耗時%0.2f" % (num, t_stop - t_start))
def main():
# 定義一個進程池邪蛔,最大進程數(shù)5
p = Pool(3)
for i in range(10):
# Pool.apply_async(要調(diào)用的目標,(傳遞給目標的參數(shù)元祖,))
# 異步的方式急黎,每次循環(huán)將會用空閑出來的子進程去調(diào)用目標
p.apply_async(worker, (i,))
m_start = time.time()
print("----start----")
p.close() # 關閉進程池,關閉后p不再接收新的請求
p.join() # 等待p中所有子進程執(zhí)行完成侧到,必須放在close語句之后
print("-----end-----")
m_stop = time.time()
print("父進程執(zhí)行完畢勃教,耗時%0.2f" % (m_stop - m_start))
if __name__ == '__main__':
main()
# 結果如下:
# ----start----
# 子進程-0開始執(zhí)行,進程號為3360
# 子進程-1開始執(zhí)行,進程號為16084
# 子進程-2開始執(zhí)行,進程號為12580
# 子進程-1執(zhí)行完畢,耗時0.05
# 子進程-3開始執(zhí)行,進程號為16084
# 子進程-3執(zhí)行完畢匠抗,耗時0.80
# 子進程-4開始執(zhí)行,進程號為16084
# 子進程-2執(zhí)行完畢故源,耗時1.63
# 子進程-5開始執(zhí)行,進程號為12580
# 子進程-0執(zhí)行完畢,耗時1.80
# 子進程-6開始執(zhí)行,進程號為3360
# 子進程-4執(zhí)行完畢汞贸,耗時1.55
# 子進程-7開始執(zhí)行,進程號為16084
# 子進程-7執(zhí)行完畢绳军,耗時0.07
# 子進程-8開始執(zhí)行,進程號為16084
# 子進程-8執(zhí)行完畢,耗時0.05
# 子進程-9開始執(zhí)行,進程號為16084
# 子進程-5執(zhí)行完畢矢腻,耗時1.01
# 子進程-6執(zhí)行完畢门驾,耗時1.10
# 子進程-9執(zhí)行完畢,耗時1.57
# -----end-----
# 父進程執(zhí)行完畢多柑,耗時4.26
multiprocessing.Pool常用函數(shù)解析:
apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行奶是,堵塞方式必須等待上一個進程退出才能執(zhí)行下一個進程),args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關鍵字參數(shù)列表聂沙;
apply(func[, args[, kwds]]):使用阻塞方式調(diào)用func
close():關閉Pool秆麸,使其不再接受新的任務;
terminate():不管任務是否完成逐纬,立即終止蛔屹;
join():主進程阻塞,等待子進程的退出豁生, 必須在close或terminate之后使用兔毒;
采用apply阻塞的方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool
import os, time, random
def worker(num):
t_start = time.time()
print("子進程-%s開始執(zhí)行,進程號為%d" % (num, os.getpid()))
# random.random()隨機生成0~1之間的浮點數(shù)
time.sleep(random.random() * 2)
t_stop = time.time()
print("子進程-%s執(zhí)行完畢,耗時%0.2f" % (num, t_stop - t_start))
def main():
# 定義一個進程池甸箱,最大進程數(shù)5
p = Pool(3)
for i in range(10):
# Pool.apply_async(要調(diào)用的目標,(傳遞給目標的參數(shù)元祖,))
# 異步的方式育叁,每次循環(huán)將會用空閑出來的子進程去調(diào)用目標
p.apply(worker, (i,))
m_start = time.time()
print("----start----")
p.close() # 關閉進程池,關閉后p不再接收新的請求
p.join() # 等待p中所有子進程執(zhí)行完成芍殖,必須放在close語句之后
print("-----end-----")
m_stop = time.time()
print("父進程執(zhí)行完畢豪嗽,耗時%0.2f" % (m_stop - m_start))
if __name__ == '__main__':
main()
# 結果如下:
# 子進程-0開始執(zhí)行,進程號為4464
# 子進程-0執(zhí)行完畢,耗時1.75
# 子進程-1開始執(zhí)行,進程號為11640
# 子進程-1執(zhí)行完畢豌骏,耗時1.33
# 子進程-2開始執(zhí)行,進程號為8756
# 子進程-2執(zhí)行完畢龟梦,耗時1.86
# 子進程-3開始執(zhí)行,進程號為4464
# 子進程-3執(zhí)行完畢,耗時0.70
# 子進程-4開始執(zhí)行,進程號為11640
# 子進程-4執(zhí)行完畢窃躲,耗時1.29
# 子進程-5開始執(zhí)行,進程號為8756
# 子進程-5執(zhí)行完畢计贰,耗時0.69
# 子進程-6開始執(zhí)行,進程號為4464
# 子進程-6執(zhí)行完畢,耗時0.33
# 子進程-7開始執(zhí)行,進程號為11640
# 子進程-7執(zhí)行完畢蒂窒,耗時1.83
# 子進程-8開始執(zhí)行,進程號為8756
# 子進程-8執(zhí)行完畢躁倒,耗時1.58
# 子進程-9開始執(zhí)行,進程號為4464
# 子進程-9執(zhí)行完畢,耗時1.37
# ----start----
# -----end-----
# 父進程執(zhí)行完畢洒琢,耗時0.08
Num07-->進程間的通信--Queue
進程(Process)之間有時間需要通信秧秉,操作系統(tǒng)提供了很多機制來實現(xiàn)進程間的通信。如Queue衰抑、Pipes等戳晌。Queue本身是一個消息隊列瘟芝。
Test01--> 先看一個簡單的案例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Queue
q = Queue(3) # 初始化一個Queue對象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) # False
q.put("消息3")
print(q.full()) # True
# 因為消息列隊已滿下面的try都會拋出異常,第一個try會等待3秒后再拋出異常宫静,第二個Try會立刻拋出異常
try:
q.put("消息4", True, 3)
except:
print("消息列隊已滿卸勺,現(xiàn)有消息數(shù)量:%s" % q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息列隊已滿致份,現(xiàn)有消息數(shù)量:%s" % q.qsize())
# 推薦的方式崇裁,先判斷消息列隊是否已滿,再寫入
if not q.full():
q.put_nowait("消息4")
# 讀取消息時蜻底,先判斷消息列隊是否為空骄崩,再讀取
if not q.empty():
for i in range(q.qsize()):
# print("取出消息:%s" % q.get())
print("取出消息:%s" % q.get_nowait())
# 結果是:
# False
# True
# 消息列隊已滿聘鳞,現(xiàn)有消息數(shù)量:3
# 消息列隊已滿,現(xiàn)有消息數(shù)量:3
# 取出消息:消息1
# 取出消息:消息2
# 取出消息:消息3
以上代碼加以說明:
初始化Queue()對象時(例如:q=Queue())要拂,若括號中沒有指定最大可接收的消息數(shù)量抠璃,或數(shù)量為負值,那么就代表可接受的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)脱惰;
Queue.qsize():返回當前隊列包含的消息數(shù)量搏嗡;
Queue.empty():如果隊列為空,返回True拉一,反之False 采盒;
Queue.full():如果隊列滿了,返回True,反之False蔚润;
Queue.get([block[, timeout]]):獲取隊列中的一條消息磅氨,然后將其從列隊中移除,block默認值為True嫡纠;
1)如果block使用默認值烦租,且沒有設置timeout(單位秒),消息列隊如果為空除盏,此時程序將被阻塞(停在讀取狀態(tài))叉橱,直到從消息列隊讀到消息為止,如果設置了timeout者蠕,則會等待timeout秒赏迟,若還沒讀取到任何消息,則拋出"Queue.Empty"異常蠢棱;
2)如果block值為False,消息列隊如果為空甩栈,則會立刻拋出"Queue.Empty"異常泻仙;
Queue.get_nowait():相當Queue.get(False);
Queue.put(item,[block[, timeout]]):將item消息寫入隊列量没,block默認值為True玉转;
1)如果block使用默認值,且沒有設置timeout(單位秒)殴蹄,消息列隊如果已經(jīng)沒有空間可寫入究抓,此時程序將被阻塞(停在寫入狀態(tài)),直到從消息列隊騰出空間為止袭灯,如果設置了timeout刺下,則會等待timeout秒,若還沒空間稽荧,則拋出"Queue.Full"異常橘茉;
2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常畅卓;
Queue.put_nowait(item):相當Queue.put(item, False)擅腰;
Test02-->在父進程創(chuàng)建兩個子進程,一個往Queue里面寫數(shù)據(jù)翁潘,一個從Queue里面讀數(shù)據(jù)趁冈。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Process, Queue
import os, time, random
# 寫數(shù)據(jù)進程
def write(q):
for value in ['A', 'B', 'C', 'quit']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進程
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
# 因為讀進程是一個死循環(huán),所以要設置一個標記拜马,用于退出
if value == "quit":
break
time.sleep(random.random())
if __name__ == '__main__':
# 父進程創(chuàng)建Queue渗勘,并傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 等待pw結束:
pw.join()
# 啟動子進程pr一膨,讀取:
pr.start()
pr.join()
print('所有數(shù)據(jù)都寫入并且讀完')
print("pw is alive:%s" % pw.is_alive())
print("pr is alive:%s" % pr.is_alive())
# 結果如下:
# Put A to queue...
# Put B to queue...
# Put C to queue...
# Put quit to queue...
# Get A from queue.
# Get B from queue.
# Get C from queue.
# Get quit from queue.
# 所有數(shù)據(jù)都寫入并且讀完
# pw is alive:False
# pr is alive:False
Test03-->進程池Pool中的Queue來進行進程間的通信
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool, Queue, Manager
import os
import time
import random
def task_write(q):
for s in ('hello', 'python', 'world', 'quit'):
q.put(s) # 向隊列中添加消息
print('%s 進程向隊列中添加消息:%s' % (os.getpid(), s))
time.sleep(random.random() * 2)
print('%s 進程要結束了' % os.getpid())
def task_read(q):
while True:
msg = q.get() # 阻塞式從隊列中收消息
print('%s 進程從隊列中取出消息:%s' % (os.getpid(), msg))
if msg == "quit":
break
time.sleep(random.random() * 2)
print('%s 進程要結束了' % os.getpid())
def main():
# 1.創(chuàng)建消息隊列對象
# q = Queue() #只能用于父子進程
# Manger().Queue() 消息隊列可用于進程池
q = Manager().Queue()
# 2.創(chuàng)建進程池呀邢,里面放兩個進程
my_pool = Pool(2)
# 3.添加任務
# 采用阻塞的方式
my_pool.apply(task_write, args=(q,))
my_pool.apply(task_read, args=(q,))
# 采用非阻塞的方式
# my_pool.apply_async(task_write, args=(q,))
# my_pool.apply_async(task_read, args=(q,))
# 4.關閉進程池
my_pool.close()
# 5.等待所有進程結束
my_pool.join()
if __name__ == "__main__":
main()
# 采用非阻塞的方式結果:
# 7380 進程向隊列中添加消息:hello
# 11256 進程從隊列中取出消息:hello
# 7380 進程向隊列中添加消息:python
# 11256 進程從隊列中取出消息:python
# 7380 進程向隊列中添加消息:world
# 11256 進程從隊列中取出消息:world
# 7380 進程向隊列中添加消息:quit
# 11256 進程從隊列中取出消息:quit
# 11256 進程要結束了
# 7380 進程要結束了
# 采用阻塞的方式結果:
# 96 進程向隊列中添加消息:hello
# 96 進程向隊列中添加消息:python
# 96 進程向隊列中添加消息:world
# 96 進程向隊列中添加消息:quit
# 96 進程要結束了
# 12412 進程從隊列中取出消息:hello
# 12412 進程從隊列中取出消息:python
# 12412 進程從隊列中取出消息:world
# 12412 進程從隊列中取出消息:quit
# 12412 進程要結束了