進程間通信——隊列和管道(multiprocess.Queue旺芽、multiprocess.Pipe)
進程間通信
IPC(Inter-Process Communication)
隊列?
概念介紹
創(chuàng)建共享的進程隊列裸弦,Queue是多進程安全的隊列掌腰,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞满力。?
Queue([maxsize])
創(chuàng)建共享的進程隊列焕参。
參數(shù) :maxsize是隊列中允許的最大項數(shù)轻纪。如果省略此參數(shù),則無大小限制叠纷。
底層隊列使用管道和鎖定實現(xiàn)刻帚。
方法介紹
Queue([maxsize])
創(chuàng)建共享的進程隊列。maxsize是隊列中允許的最大項數(shù)涩嚣。如果省略此參數(shù)崇众,則無大小限制。底層隊列使用管道和鎖定實現(xiàn)航厚。另外校摩,還需要運行支持線程以便隊列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?/p>
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空阶淘,此方法將阻塞衙吩,直到隊列中有項目可用為止。block用于控制阻塞行為溪窒,默認為True. 如果設(shè)置為False坤塞,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間澈蚌,用在阻塞模式中摹芙。如果在制定的時間間隔內(nèi)沒有項目變?yōu)榭捎茫瑢⒁l(fā)Queue.Empty異常宛瞄。
q.get_nowait( )
同q.get(False)方法浮禾。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿份汗,此方法將阻塞至有空間可用為止盈电。block控制阻塞行為,默認為True杯活。如果設(shè)置為False匆帚,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短旁钧。超時后將引發(fā)Queue.Full異常吸重。
q.qsize()
返回隊列中目前項目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠歪今,因為在返回結(jié)果和在稍后程序中使用結(jié)果之間嚎幸,隊列中可能添加或刪除了項目。在某些系統(tǒng)上寄猩,此方法可能引發(fā)NotImplementedError異常嫉晶。
q.empty()
如果調(diào)用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目车遂,結(jié)果是不可靠的封断。也就是說斯辰,在返回和使用結(jié)果之間舶担,隊列中可能已經(jīng)加入新的項目。
q.full()
如果q已滿彬呻,返回為True. 由于線程的存在衣陶,結(jié)果也可能是不可靠的(參考q.empty()方法)。闸氮。
其他方法(了解)
q.close()
關(guān)閉隊列剪况,防止隊列中加入更多數(shù)據(jù)。調(diào)用此方法時蒲跨,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù)译断,但將在此方法完成時馬上關(guān)閉。如果q被垃圾收集或悲,將自動調(diào)用此方法孙咪。關(guān)閉隊列不會在隊列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號或異常。例如巡语,如果某個使用者正被阻塞在get()操作上翎蹈,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動連接后臺線程男公。這可以防止join_thread()方法阻塞荤堪。
q.join_thread()
連接隊列的后臺線程。此方法用于在調(diào)用q.close()方法后枢赔,等待所有隊列項被消耗澄阳。默認情況下,此方法由不是q的原始創(chuàng)建者的所有進程調(diào)用踏拜。調(diào)用q.cancel_join_thread()方法可以禁止這種行為寇荧。
代碼實例
單看隊列用法
'''
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基于消息傳遞實現(xiàn)的,但是隊列接口
'''from multiprocessing import Queue
q=Queue(3)#put ,get ,put_nowait,get_nowait,full,emptyq.put(3)
q.put(3)
q.put(3)# q.put(3)? # 如果隊列已經(jīng)滿了,程序就會停在這里执隧,等待數(shù)據(jù)被別人取走揩抡,再將數(shù)據(jù)放入隊列。? ? ? ? ? # 如果隊列中的數(shù)據(jù)一直不被取走镀琉,程序就會永遠停在這里峦嗤。try:
? ? q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞屋摔,但是會因為隊列滿了而報錯烁设。except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息装黑。? ? print('隊列已經(jīng)滿了')# 因此副瀑,我們再放入數(shù)據(jù)之前,可以先看一下隊列的狀態(tài)恋谭,如果已經(jīng)滿了糠睡,就不繼續(xù)put了。print(q.full()) #滿了print(q.get())print(q.get())print(q.get())# print(q.get()) # 同put方法一樣疚颊,如果隊列已經(jīng)空了狈孔,那么繼續(xù)取就會出現(xiàn)阻塞。try:
? ? q.get_nowait(3) # 可以使用get_nowait材义,如果隊列滿了不會阻塞均抽,但是會因為沒取到值而報錯。except: # 因此我們可以用一個try語句來處理這個錯誤其掂。這樣程序不會一直阻塞下去油挥。? ? print('隊列已經(jīng)空了')print(q.empty()) #空了
上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法款熬,以及這些方法的使用和現(xiàn)象深寥。
import timefrom multiprocessing import Process, Queuedef f(q):
? ? q.put([time.asctime(), 'from Eva', 'hello'])? #調(diào)用主函數(shù)中p進程傳遞過來的進程參數(shù) put函數(shù)為向隊列中添加一條數(shù)據(jù)。if __name__ == '__main__':
? ? q = Queue() #創(chuàng)建一個Queue對象? ? p = Process(target=f, args=(q,)) #創(chuàng)建一個進程? ? p.start()
? ? print(q.get())
? ? p.join()
批量生產(chǎn)數(shù)據(jù)放入隊列再批量獲取結(jié)果 x
上面是一個queue的簡單應(yīng)用华烟,使用隊列q對象調(diào)用get函數(shù)來取得隊列中最先進入的數(shù)據(jù)翩迈。 接下來看一個稍微復(fù)雜一些的例子:
import osimport timeimport multiprocessing# 向queue中輸入數(shù)據(jù)的函數(shù)def inputQ(queue):
? ? info = str(os.getpid()) + '(put):' + str(time.asctime())
? ? queue.put(info)# 向queue中輸出數(shù)據(jù)的函數(shù)def outputQ(queue):
? ? info = queue.get()
? ? print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))# Mainif __name__ == '__main__':
? ? multiprocessing.freeze_support()
? ? record1 = []? # store input processes? ? record2 = []? # store output processes? ? queue = multiprocessing.Queue(3)
? ? # 輸入進程? ? for i in range(10):
? ? ? ? process = multiprocessing.Process(target=inputQ,args=(queue,))
? ? ? ? process.start()
? ? ? ? record1.append(process)
? ? # 輸出進程? ? for i in range(10):
? ? ? ? process = multiprocessing.Process(target=outputQ,args=(queue,))
? ? ? ? process.start()
? ? ? ? record2.append(process)
? ? for p in record1:
? ? ? ? p.join()
? ? for p in record2:
? ? ? ? p.join()
生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度盔夜。
為什么要使用生產(chǎn)者和消費者模式
在線程世界里负饲,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程喂链。在多線程開發(fā)當中返十,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢椭微,那么生產(chǎn)者就必須等待消費者處理完洞坑,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理蝇率,如果消費者的處理能力大于生產(chǎn)者迟杂,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式本慕。
什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題排拷。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊锅尘,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理监氢,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取浪腐,阻塞隊列就相當于一個緩沖區(qū)纵揍,平衡了生產(chǎn)者和消費者的處理能力。
基于隊列實現(xiàn)生產(chǎn)者消費者模型
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
? ? while True:
? ? ? ? res=q.get()
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):
? ? for i in range(10):
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? res='包子%s' %i
? ? ? ? q.put(res)
? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':
? ? q=Queue()
? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))
? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
? ? #開始? ? p1.start()
? ? c1.start()
? ? print('主')
此時的問題是主進程永遠不會結(jié)束议街,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了泽谨,但是消費者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步傍睹。
解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后隔盛,往隊列中再發(fā)一個結(jié)束信號犹菱,這樣消費者在接收到結(jié)束信號后就可以break出死循環(huán)拾稳。
改良版——生產(chǎn)者消費者模型
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
? ? while True:
? ? ? ? res=q.get()
? ? ? ? if res is None:break #收到結(jié)束信號則結(jié)束? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):
? ? for i in range(10):
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? res='包子%s' %i
? ? ? ? q.put(res)
? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
? ? q.put(None) #發(fā)送結(jié)束信號if __name__ == '__main__':
? ? q=Queue()
? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))
? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
? ? #開始? ? p1.start()
? ? c1.start()
? ? print('主')
注意:結(jié)束信號None,不一定要由生產(chǎn)者發(fā)腊脱,主進程里同樣可以發(fā)访得,但主進程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號
主進程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號None
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
? ? while True:
? ? ? ? res=q.get()
? ? ? ? if res is None:break #收到結(jié)束信號則結(jié)束? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):
? ? for i in range(2):
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? res='包子%s' %i
? ? ? ? q.put(res)
? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':
? ? q=Queue()
? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))
? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
? ? #開始? ? p1.start()
? ? c1.start()
? ? p1.join()
? ? q.put(None) #發(fā)送結(jié)束信號? ? print('主')
但上述解決方式,在有多個生產(chǎn)者和多個消費者時陕凹,我們則需要用一個很low的方式去解決
多個消費者的例子:有幾個消費者就需要發(fā)送幾次結(jié)束信號
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
? ? while True:
? ? ? ? res=q.get()
? ? ? ? if res is None:break #收到結(jié)束信號則結(jié)束? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):
? ? for i in range(2):
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? res='%s%s' %(name,i)
? ? ? ? q.put(res)
? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':
? ? q=Queue()
? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=('包子',q))
? ? p2=Process(target=producer,args=('骨頭',q))
? ? p3=Process(target=producer,args=('泔水',q))
? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
? ? c2=Process(target=consumer,args=(q,))
? ? #開始? ? p1.start()
? ? p2.start()
? ? p3.start()
? ? c1.start()
? ? p1.join() #必須保證生產(chǎn)者全部生產(chǎn)完畢,才應(yīng)該發(fā)送結(jié)束信號? ? p2.join()
? ? p3.join()
? ? q.put(None) #有幾個消費者就應(yīng)該發(fā)送幾次結(jié)束信號None? ? q.put(None) #發(fā)送結(jié)束信號? ? print('主')
JoinableQueue([maxsize])?
創(chuàng)建可連接的共享進程隊列悍抑。這就像是一個Queue對象,但隊列允許項目的使用者通知生產(chǎn)者項目已經(jīng)被成功處理杜耙。通知進程是使用共享的信號和條件變量來實現(xiàn)的搜骡。?
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
使用者使用此方法發(fā)出信號佑女,表示q.get()返回的項目已經(jīng)被處理记靡。如果調(diào)用此方法的次數(shù)大于從隊列中刪除的項目數(shù)量,將引發(fā)ValueError異常团驱。
q.join()
生產(chǎn)者將使用此方法進行阻塞摸吠,直到隊列中所有項目均被處理。阻塞將持續(xù)到為隊列中的每個項目均調(diào)用q.task_done()方法為止嚎花。
下面的例子說明如何建立永遠運行的進程寸痢,使用和處理隊列上的項目。生產(chǎn)者將項目放入隊列紊选,并等待它們被處理啼止。
方法介紹
from multiprocessing import Process,JoinableQueueimport time,random,osdef consumer(q):
? ? while True:
? ? ? ? res=q.get()
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
? ? ? ? q.task_done() #向q.join()發(fā)送一次信號,證明一個數(shù)據(jù)已經(jīng)被取走了def producer(name,q):
? ? for i in range(10):
? ? ? ? time.sleep(random.randint(1,3))
? ? ? ? res='%s%s' %(name,i)
? ? ? ? q.put(res)
? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
? ? q.join() #生產(chǎn)完畢,使用此方法進行阻塞兵罢,直到隊列中所有項目均被處理献烦。if __name__ == '__main__':
? ? q=JoinableQueue()
? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=('包子',q))
? ? p2=Process(target=producer,args=('骨頭',q))
? ? p3=Process(target=producer,args=('泔水',q))
? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
? ? c2=Process(target=consumer,args=(q,))
? ? c1.daemon=True
? ? c2.daemon=True
? ? #開始? ? p_l=[p1,p2,p3,c1,c2]
? ? for p in p_l:
? ? ? ? p.start()
? ? p1.join()
? ? p2.join()
? ? p3.join()
? ? print('主')
? ? #主進程等--->p1,p2,p3等---->c1,c2? ? #p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊列的數(shù)據(jù)? ? #因而c1,c2也沒有存在的價值了,不需要繼續(xù)阻塞在進程中影響主進程了。應(yīng)該隨著主進程的結(jié)束而結(jié)束,所以設(shè)置成守護進程就可以了趣些。
管道(了解)
介紹
#創(chuàng)建管道的類:Pipe([duplex]):在進程之間創(chuàng)建一條管道仿荆,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道#參數(shù)介紹:dumplex:默認管道是全雙工的拢操,如果將duplex射成False锦亦,conn1只能用于接收,conn2只能用于發(fā)送令境。#主要方法:? ? conn1.recv():接收conn2.send(obj)發(fā)送的對象杠园。如果沒有消息可接收,recv方法會一直阻塞舔庶。如果連接的另外一端已經(jīng)關(guān)閉抛蚁,那么recv方法會拋出EOFError。
? ? conn1.send(obj):通過連接發(fā)送對象惕橙。obj是與序列化兼容的任意對象
#其他方法:conn1.close():關(guān)閉連接瞧甩。如果conn1被垃圾回收,將自動調(diào)用此方法
conn1.fileno():返回連接使用的整數(shù)文件描述符
conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用弥鹦,返回True肚逸。timeout指定等待的最長時限。如果省略此參數(shù)彬坏,方法將立即返回結(jié)果朦促。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達栓始。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息务冕。maxlength指定要接收的最大字節(jié)數(shù)。如果進入的消息幻赚,超過了這個最大值禀忆,將引發(fā)IOError異常,并且在連接上無法進行進一步讀取坯屿。如果連接的另外一端已經(jīng)關(guān)閉油湖,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常领跛。
conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū)乏德,buffer是支持緩沖區(qū)接口的任意對象,offset是緩沖區(qū)中的字節(jié)偏移量吠昭,而size是要發(fā)送字節(jié)數(shù)喊括。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進行接收? ?
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息矢棚,并把它保存在buffer對象中郑什,該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移蒲肋。返回值是收到的字節(jié)數(shù)蘑拯。如果消息長度大于可用的緩沖區(qū)空間钝满,將引發(fā)BufferTooShort異常。
pipe初使用
from multiprocessing import Process, Pipedef f(conn):
? ? conn.send("Hello The_Third_Wave")
? ? conn.close()if __name__ == '__main__':
? ? parent_conn, child_conn = Pipe()
? ? p = Process(target=f, args=(child_conn,))
? ? p.start()
? ? print(parent_conn.recv())
? ? p.join()
應(yīng)該特別注意管道端點的正確管理問題申窘。如果是生產(chǎn)者或消費者中都沒有使用管道的某個端點弯蚜,就應(yīng)將它關(guān)閉。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端剃法,在消費者中關(guān)閉管道的輸入端碎捺。如果忘記執(zhí)行這些步驟,程序可能在消費者中的recv()操作上掛起贷洲。管道是由操作系統(tǒng)進行引用計數(shù)的收厨,必須在所有進程中關(guān)閉管道后才能生成EOFError異常。因此优构,在生產(chǎn)者中關(guān)閉管道不會有任何效果诵叁,除非消費者也關(guān)閉了相同的管道端點。?
pipe初使用
from multiprocessing import Process, Pipedef f(parent_conn,child_conn):
? ? #parent_conn.close() #不寫close將不會引發(fā)EOFError? ? while True:
? ? ? ? try:
? ? ? ? ? ? print(child_conn.recv())
? ? ? ? except EOFError:
? ? ? ? ? ? child_conn.close()if __name__ == '__main__':
? ? parent_conn, child_conn = Pipe()
? ? p = Process(target=f, args=(parent_conn,child_conn,))
? ? p.start()
? ? child_conn.close()
? ? parent_conn.send('hello')
? ? parent_conn.close()
? ? p.join()
pipe實現(xiàn)生產(chǎn)者消費者模型
from multiprocessing import Process,Pipedef consumer(p,name):
? ? produce, consume=p
? ? produce.close()
? ? while True:
? ? ? ? try:
? ? ? ? ? ? baozi=consume.recv()
? ? ? ? ? ? print('%s 收到包子:%s' %(name,baozi))
? ? ? ? except EOFError:
? ? ? ? ? ? breakdef producer(seq,p):
? ? produce, consume=p
? ? consume.close()
? ? for i in seq:
? ? ? ? produce.send(i)if __name__ == '__main__':
? ? produce,consume=Pipe()
? ? c1=Process(target=consumer,args=((produce,consume),'c1'))
? ? c1.start()
? ? seq=(i for i in range(10))
? ? producer(seq,(produce,consume))
? ? produce.close()
? ? consume.close()
? ? c1.join()
? ? print('主進程')
多個消費之之間的競爭問題帶來的數(shù)據(jù)不安全問題
from multiprocessing import Process,Pipe,Lockdef consumer(p,name,lock):
? ? produce, consume=p
? ? produce.close()
? ? while True:
? ? ? ? lock.acquire()
? ? ? ? baozi=consume.recv()
? ? ? ? lock.release()
? ? ? ? if baozi:
? ? ? ? ? ? print('%s 收到包子:%s' %(name,baozi))
? ? ? ? else:
? ? ? ? ? ? consume.close()
? ? ? ? ? ? breakdef producer(p,n):
? ? produce, consume=p
? ? consume.close()
? ? for i in range(n):
? ? ? ? produce.send(i)
? ? produce.send(None)
? ? produce.send(None)
? ? produce.close()if __name__ == '__main__':
? ? produce,consume=Pipe()
? ? lock = Lock()
? ? c1=Process(target=consumer,args=((produce,consume),'c1',lock))
? ? c2=Process(target=consumer,args=((produce,consume),'c2',lock))
? ? p1=Process(target=producer,args=((produce,consume),10))
? ? c1.start()
? ? c2.start()
? ? p1.start()
? ? produce.close()
? ? consume.close()
? ? c1.join()
? ? c2.join()
? ? p1.join()
? ? print('主進程')
進程之間的數(shù)據(jù)共享
展望未來俩块,基于消息傳遞的并發(fā)編程是大勢所趨
即便是使用線程黎休,推薦做法也是將程序設(shè)計為大量獨立的線程集合浓领,通過消息隊列交換數(shù)據(jù)玉凯。
這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統(tǒng)中联贩。
但進程間應(yīng)該盡量避免通信漫仆,即便需要通信,也應(yīng)該選擇進程安全的工具來避免加鎖帶來的問題泪幌。
以后我們會嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進程之間的數(shù)據(jù)共享問題盲厌。
Manager模塊介紹
進程間數(shù)據(jù)是獨立的,可以借助于隊列或管道實現(xiàn)通信祸泪,二者都是基于消息傳遞的
雖然進程間數(shù)據(jù)獨立吗浩,但可以通過Manager實現(xiàn)數(shù)據(jù)共享,事實上Manager的功能遠不止于此
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager例子
from multiprocessing import Manager,Process,Lockdef work(d,lock):
? ? with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會出現(xiàn)數(shù)據(jù)錯亂? ? ? ? d['count']-=1if __name__ == '__main__':
? ? lock=Lock()
? ? with Manager() as m:
? ? ? ? dic=m.dict({'count':100})
? ? ? ? p_l=[]
? ? ? ? for i in range(100):
? ? ? ? ? ? p=Process(target=work,args=(dic,lock))
? ? ? ? ? ? p_l.append(p)
? ? ? ? ? ? p.start()
? ? ? ? for p in p_l:
? ? ? ? ? ? p.join()
? ? ? ? print(dic)
進程池和multiprocess.Pool模塊
進程池
為什么要有進程池?進程池的概念没隘。
在程序?qū)嶋H處理問題過程中懂扼,忙時會有成千上萬的任務(wù)需要被執(zhí)行,閑時可能只有零星任務(wù)右蒲。那么在成千上萬個任務(wù)需要被執(zhí)行的時候阀湿,我們就需要去創(chuàng)建成千上萬個進程么?首先瑰妄,創(chuàng)建進程需要消耗時間陷嘴,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程间坐,操作系統(tǒng)也不能讓他們同時執(zhí)行灾挨,這樣反而會影響程序的效率邑退。因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進程。那么我們要怎么做呢劳澄?
在這里瓜饥,要給大家介紹一個進程池的概念,定義一個池子浴骂,在里面放上固定數(shù)量的進程乓土,有需求來了,就拿一個池中的進程來處理任務(wù)溯警,等到處理完畢趣苏,進程并不關(guān)閉,而是將進程再放回進程池中繼續(xù)等待任務(wù)梯轻。如果有很多任務(wù)需要執(zhí)行食磕,池中的進程數(shù)量不夠,任務(wù)就要等待之前的進程執(zhí)行任務(wù)完畢歸來喳挑,拿到空閑進程才能繼續(xù)執(zhí)行彬伦。也就是說,池中進程的數(shù)量是固定的伊诵,那么同一時間最多有固定數(shù)量的進程在運行单绑。這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進程的時間曹宴,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果搂橙。
multiprocess.Pool模塊
概念介紹
Pool([numprocess? [,initializer [, initargs]]]):創(chuàng)建進程池
參數(shù)介紹
1 numprocess:要創(chuàng)建的進程數(shù),如果省略笛坦,將默認使用cpu_count()的值2 initializer:是每個工作進程啟動時要執(zhí)行的可調(diào)用對象区转,默認為None3 initargs:是要傳給initializer的參數(shù)組
主要方法
1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。2 '''需要強調(diào)的是:此操作并不會在所有池工作進程中并執(zhí)行func函數(shù)版扩。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù)废离,必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''3 4 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。5 '''此方法的結(jié)果是AsyncResult類的實例礁芦,callback是可調(diào)用對象蜻韭,接收輸入?yún)?shù)。當func的結(jié)果變?yōu)榭捎脮r宴偿,將理解傳遞給callback湘捎。callback禁止執(zhí)行任何阻塞操作兑障,否則將接收其他異步操作中的結(jié)果钱豁。'''6? ? 7 p.close():關(guān)閉進程池,防止進一步操作怪蔑。如果所有操作持續(xù)掛起娩践,它們將在工作進程終止前完成8 9 P.jion():等待所有工作進程退出活翩。此方法只能在close()或teminate()之后調(diào)用
其他方法(了解)
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj烹骨。實例具有以下方法2 obj.get():返回結(jié)果材泄,如果有必要則等待結(jié)果到達沮焕。timeout是可選的峦树。如果在指定時間內(nèi)還沒有到達,將引發(fā)一場旦事。如果遠程操作中引發(fā)了異常魁巩,它將在調(diào)用此方法時再次被引發(fā)姐浮。3 obj.ready():如果調(diào)用完成,返回True4 obj.successful():如果調(diào)用完成且沒有引發(fā)異常卖鲤,返回True肾扰,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常5 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩? obj.terminate():立即終止所有工作進程集晚,同時不執(zhí)行任何清理或結(jié)束任何掛起工作换怖。如果p被垃圾回收沉颂,將自動調(diào)用此函數(shù)
代碼實例
同步和異步
進程池的同步調(diào)用
import os,timefrom multiprocessing import Pooldef work(n):
? ? print('%s run' %os.getpid())
? ? time.sleep(3)
? ? return n**2if __name__ == '__main__':
? ? p=Pool(3) #進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務(wù)? ? res_l=[]
? ? for i in range(10):
? ? ? ? res=p.apply(work,args=(i,)) # 同步調(diào)用悦污,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 但不管該任務(wù)是否存在阻塞彻坛,同步調(diào)用都會在原地等著? ? print(res_l)
進程池的異步調(diào)用
import osimport timeimport randomfrom multiprocessing import Pooldef work(n):
? ? print('%s run' %os.getpid())
? ? time.sleep(random.random())
? ? return n**2if __name__ == '__main__':
? ? p=Pool(3) #進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務(wù)? ? res_l=[]
? ? for i in range(10):
? ? ? ? res=p.apply_async(work,args=(i,)) # 異步運行踏枣,根據(jù)進程池中有的進程數(shù)茵瀑,每次最多3個子進程在異步執(zhí)行? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 返回結(jié)果之后,將結(jié)果放入列表竞帽,歸還進程,之后再執(zhí)行新的任務(wù)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 需要注意的是疙渣,進程池中的三個進程不會同時開啟或者同時結(jié)束? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 而是執(zhí)行完一個就釋放一個進程堆巧,這個進程就去接收新的任務(wù)妄荔。? ? ? ? ? res_l.append(res)
? ? # 異步apply_async用法:如果使用異步提交的任務(wù)懦冰,主進程需要使用jion谣沸,等待進程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果? ? # 否則内地,主進程結(jié)束赋除,進程池可能還沒來得及執(zhí)行举农,也就跟著一起結(jié)束了? ? p.close()
? ? p.join()
? ? for res in res_l:
? ? ? ? print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因為apply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
server:進程池版socket并發(fā)聊天
#Pool內(nèi)的進程數(shù)默認是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count())
#開啟6個客戶端航背,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài)
#在每個進程內(nèi)查看pid棱貌,會發(fā)現(xiàn)pid使用為4個婚脱,即多個客戶端公用4個進程from socket import *from multiprocessing import Poolimport os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)def talk(conn):
? ? print('進程pid: %s' %os.getpid())
? ? while True:
? ? ? ? try:
? ? ? ? ? ? msg=conn.recv(1024)
? ? ? ? ? ? if not msg:break? ? ? ? ? ? conn.send(msg.upper())
? ? ? ? except Exception:
? ? ? ? ? ? breakif __name__ == '__main__':
? ? p=Pool(4)
? ? while True:
? ? ? ? conn,*_=server.accept()
? ? ? ? p.apply_async(talk,args=(conn,))
? ? ? ? # p.apply(talk,args=(conn,client_addr)) #同步的話障贸,則同一時間只有一個客戶端能訪問
client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))while True:
? ? msg=input('>>: ').strip()
? ? if not msg:continue? ? client.send(msg.encode('utf-8'))
? ? msg=client.recv(1024)
? ? print(msg.decode('utf-8'))
發(fā)現(xiàn):并發(fā)開啟多個客戶端,服務(wù)端同一時間只有4個不同的pid涩维,只能結(jié)束一個客戶端嘀粱,另外一個客戶端才會進來.
回調(diào)函數(shù)
需要回調(diào)函數(shù)的場景:進程池中任何一個任務(wù)一旦處理完了,就立即告知主進程:我好了額垄分,你可以處理我的結(jié)果了薄湿。主進程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時間(阻塞)的任務(wù)放到進程池中吆倦,然后指定回調(diào)函數(shù)(主進程負責執(zhí)行)坐求,這樣主進程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果须妻。
使用多進程請求多個url來減少網(wǎng)絡(luò)等待浪費的時間
from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url):
? ? print('<進程%s> get %s' %(os.getpid(),url))
? ? respone=requests.get(url)
? ? if respone.status_code == 200:
? ? ? ? return {'url':url,'text':respone.text}def pasrse_page(res):
? ? print('<進程%s> parse %s' %(os.getpid(),res['url']))
? ? parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
? ? with open('db.txt','a') as f:
? ? ? ? f.write(parse_res)if __name__ == '__main__':
? ? urls=[
? ? ? ? 'https://www.baidu.com',
? ? ? ? 'https://www.python.org',
? ? ? ? 'https://www.openstack.org',
? ? ? ? 'https://help.github.com/',
? ? ? ? 'http://www.sina.com.cn/'? ? ]
? ? p=Pool(3)
? ? res_l=[]
? ? for url in urls:
? ? ? ? res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
? ? ? ? res_l.append(res)
? ? p.close()
? ? p.join()
? ? print([res.get() for res in res_l]) #拿到的是get_page的結(jié)果,其實完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了'''
打印結(jié)果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''
爬蟲實例
import refrom urllib.request import urlopenfrom multiprocessing import Pooldef get_page(url,pattern):
? ? response=urlopen(url).read().decode('utf-8')
? ? return pattern,responsedef parse_page(info):
? ? pattern,page_content=info
? ? res=re.findall(pattern,page_content)
? ? for item in res:
? ? ? ? dic={
? ? ? ? ? ? 'index':item[0].strip(),
? ? ? ? ? ? 'title':item[1].strip(),
? ? ? ? ? ? 'actor':item[2].strip(),
? ? ? ? ? ? 'time':item[3].strip(),
? ? ? ? }
? ? ? ? print(dic)if __name__ == '__main__':
? ? regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'? ? pattern1=re.compile(regex,re.S)
? ? url_dic={
? ? ? ? 'http://maoyan.com/board/7':pattern1,
? ? }
? ? p=Pool()
? ? res_l=[]
? ? for url,pattern in url_dic.items():
? ? ? ? res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
? ? ? ? res_l.append(res)
? ? for i in res_l:
? ? ? ? i.get()
如果在主進程中等待進程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果绰更,則無需回調(diào)函數(shù)
無需回調(diào)函數(shù)
from multiprocessing import Poolimport time,random,osdef work(n):
? ? time.sleep(1)
? ? return n**2if __name__ == '__main__':
? ? p=Pool()
? ? res_l=[]
? ? for i in range(10):
? ? ? ? res=p.apply_async(work,args=(i,))
? ? ? ? res_l.append(res)
? ? p.close()
? ? p.join() #等待進程池中所有進程執(zhí)行完畢? ? nums=[]
? ? for res in res_l:
? ? ? ? nums.append(res.get()) #拿到所有結(jié)果? ? print(nums) #主進程拿到所有的處理結(jié)果,可以在主進程中進行統(tǒng)一進行處理