20.2局冰、python進程間通信——隊列和管道

進程間通信——隊列和管道(multiprocess.Queue旺芽、multiprocess.Pipe)

進程間通信

IPC(Inter-Process Communication)

隊列?

概念介紹

創(chuàng)建共享的進程隊列裸弦,Queue是多進程安全的隊列掌腰,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞满力。?

Queue([maxsize])

創(chuàng)建共享的進程隊列焕参。

參數(shù) :maxsize是隊列中允許的最大項數(shù)轻纪。如果省略此參數(shù),則無大小限制叠纷。

底層隊列使用管道和鎖定實現(xiàn)刻帚。

方法介紹

Queue([maxsize])

創(chuàng)建共享的進程隊列。maxsize是隊列中允許的最大項數(shù)涩嚣。如果省略此參數(shù)崇众,則無大小限制。底層隊列使用管道和鎖定實現(xiàn)航厚。另外校摩,還需要運行支持線程以便隊列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?/p>

Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] )

返回q中的一個項目。如果q為空阶淘,此方法將阻塞衙吩,直到隊列中有項目可用為止。block用于控制阻塞行為溪窒,默認為True. 如果設(shè)置為False坤塞,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間澈蚌,用在阻塞模式中摹芙。如果在制定的時間間隔內(nèi)沒有項目變?yōu)榭捎茫瑢⒁l(fā)Queue.Empty異常宛瞄。

q.get_nowait( )

同q.get(False)方法浮禾。

q.put(item [, block [,timeout ] ] )

將item放入隊列。如果隊列已滿份汗,此方法將阻塞至有空間可用為止盈电。block控制阻塞行為,默認為True杯活。如果設(shè)置為False匆帚,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短旁钧。超時后將引發(fā)Queue.Full異常吸重。

q.qsize()

返回隊列中目前項目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠歪今,因為在返回結(jié)果和在稍后程序中使用結(jié)果之間嚎幸,隊列中可能添加或刪除了項目。在某些系統(tǒng)上寄猩,此方法可能引發(fā)NotImplementedError異常嫉晶。

q.empty()

如果調(diào)用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目车遂,結(jié)果是不可靠的封断。也就是說斯辰,在返回和使用結(jié)果之間舶担,隊列中可能已經(jīng)加入新的項目。

q.full()

如果q已滿彬呻,返回為True. 由于線程的存在衣陶,結(jié)果也可能是不可靠的(參考q.empty()方法)。闸氮。

其他方法(了解)

q.close()

關(guān)閉隊列剪况,防止隊列中加入更多數(shù)據(jù)。調(diào)用此方法時蒲跨,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù)译断,但將在此方法完成時馬上關(guān)閉。如果q被垃圾收集或悲,將自動調(diào)用此方法孙咪。關(guān)閉隊列不會在隊列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號或異常。例如巡语,如果某個使用者正被阻塞在get()操作上翎蹈,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()方法返回錯誤。

q.cancel_join_thread()

不會再進程退出時自動連接后臺線程男公。這可以防止join_thread()方法阻塞荤堪。

q.join_thread()

連接隊列的后臺線程。此方法用于在調(diào)用q.close()方法后枢赔,等待所有隊列項被消耗澄阳。默認情況下,此方法由不是q的原始創(chuàng)建者的所有進程調(diào)用踏拜。調(diào)用q.cancel_join_thread()方法可以禁止這種行為寇荧。

代碼實例

單看隊列用法

'''

multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列

都是基于消息傳遞實現(xiàn)的,但是隊列接口

'''from multiprocessing import Queue

q=Queue(3)#put ,get ,put_nowait,get_nowait,full,emptyq.put(3)

q.put(3)

q.put(3)# q.put(3)? # 如果隊列已經(jīng)滿了,程序就會停在這里执隧,等待數(shù)據(jù)被別人取走揩抡,再將數(shù)據(jù)放入隊列。? ? ? ? ? # 如果隊列中的數(shù)據(jù)一直不被取走镀琉,程序就會永遠停在這里峦嗤。try:

? ? q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞屋摔,但是會因為隊列滿了而報錯烁设。except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息装黑。? ? print('隊列已經(jīng)滿了')# 因此副瀑,我們再放入數(shù)據(jù)之前,可以先看一下隊列的狀態(tài)恋谭,如果已經(jīng)滿了糠睡,就不繼續(xù)put了。print(q.full()) #滿了print(q.get())print(q.get())print(q.get())# print(q.get()) # 同put方法一樣疚颊,如果隊列已經(jīng)空了狈孔,那么繼續(xù)取就會出現(xiàn)阻塞。try:

? ? q.get_nowait(3) # 可以使用get_nowait材义,如果隊列滿了不會阻塞均抽,但是會因為沒取到值而報錯。except: # 因此我們可以用一個try語句來處理這個錯誤其掂。這樣程序不會一直阻塞下去油挥。? ? print('隊列已經(jīng)空了')print(q.empty()) #空了

上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法款熬,以及這些方法的使用和現(xiàn)象深寥。

import timefrom multiprocessing import Process, Queuedef f(q):

? ? q.put([time.asctime(), 'from Eva', 'hello'])? #調(diào)用主函數(shù)中p進程傳遞過來的進程參數(shù) put函數(shù)為向隊列中添加一條數(shù)據(jù)。if __name__ == '__main__':

? ? q = Queue() #創(chuàng)建一個Queue對象? ? p = Process(target=f, args=(q,)) #創(chuàng)建一個進程? ? p.start()

? ? print(q.get())

? ? p.join()

批量生產(chǎn)數(shù)據(jù)放入隊列再批量獲取結(jié)果 x

上面是一個queue的簡單應(yīng)用华烟,使用隊列q對象調(diào)用get函數(shù)來取得隊列中最先進入的數(shù)據(jù)翩迈。 接下來看一個稍微復(fù)雜一些的例子:

import osimport timeimport multiprocessing# 向queue中輸入數(shù)據(jù)的函數(shù)def inputQ(queue):

? ? info = str(os.getpid()) + '(put):' + str(time.asctime())

? ? queue.put(info)# 向queue中輸出數(shù)據(jù)的函數(shù)def outputQ(queue):

? ? info = queue.get()

? ? print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))# Mainif __name__ == '__main__':

? ? multiprocessing.freeze_support()

? ? record1 = []? # store input processes? ? record2 = []? # store output processes? ? queue = multiprocessing.Queue(3)

? ? # 輸入進程? ? for i in range(10):

? ? ? ? process = multiprocessing.Process(target=inputQ,args=(queue,))

? ? ? ? process.start()

? ? ? ? record1.append(process)

? ? # 輸出進程? ? for i in range(10):

? ? ? ? process = multiprocessing.Process(target=outputQ,args=(queue,))

? ? ? ? process.start()

? ? ? ? record2.append(process)

? ? for p in record1:

? ? ? ? p.join()

? ? for p in record2:

? ? ? ? p.join()

生產(chǎn)者消費者模型

在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度盔夜。

為什么要使用生產(chǎn)者和消費者模式

在線程世界里负饲,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程喂链。在多線程開發(fā)當中返十,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢椭微,那么生產(chǎn)者就必須等待消費者處理完洞坑,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理蝇率,如果消費者的處理能力大于生產(chǎn)者迟杂,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式本慕。

什么是生產(chǎn)者消費者模式

生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題排拷。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊锅尘,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理监氢,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取浪腐,阻塞隊列就相當于一個緩沖區(qū)纵揍,平衡了生產(chǎn)者和消費者的處理能力。

基于隊列實現(xiàn)生產(chǎn)者消費者模型

from multiprocessing import Process,Queueimport time,random,osdef consumer(q):

? ? while True:

? ? ? ? res=q.get()

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):

? ? for i in range(10):

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? res='包子%s' %i

? ? ? ? q.put(res)

? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':

? ? q=Queue()

? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))

? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))

? ? #開始? ? p1.start()

? ? c1.start()

? ? print('主')

此時的問題是主進程永遠不會結(jié)束议街,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了泽谨,但是消費者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步傍睹。

解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后隔盛,往隊列中再發(fā)一個結(jié)束信號犹菱,這樣消費者在接收到結(jié)束信號后就可以break出死循環(huán)拾稳。

改良版——生產(chǎn)者消費者模型

from multiprocessing import Process,Queueimport time,random,osdef consumer(q):

? ? while True:

? ? ? ? res=q.get()

? ? ? ? if res is None:break #收到結(jié)束信號則結(jié)束? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):

? ? for i in range(10):

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? res='包子%s' %i

? ? ? ? q.put(res)

? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))

? ? q.put(None) #發(fā)送結(jié)束信號if __name__ == '__main__':

? ? q=Queue()

? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))

? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))

? ? #開始? ? p1.start()

? ? c1.start()

? ? print('主')

注意:結(jié)束信號None,不一定要由生產(chǎn)者發(fā)腊脱,主進程里同樣可以發(fā)访得,但主進程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號

主進程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號None

from multiprocessing import Process,Queueimport time,random,osdef consumer(q):

? ? while True:

? ? ? ? res=q.get()

? ? ? ? if res is None:break #收到結(jié)束信號則結(jié)束? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):

? ? for i in range(2):

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? res='包子%s' %i

? ? ? ? q.put(res)

? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':

? ? q=Queue()

? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))

? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))

? ? #開始? ? p1.start()

? ? c1.start()

? ? p1.join()

? ? q.put(None) #發(fā)送結(jié)束信號? ? print('主')

但上述解決方式,在有多個生產(chǎn)者和多個消費者時陕凹,我們則需要用一個很low的方式去解決

多個消費者的例子:有幾個消費者就需要發(fā)送幾次結(jié)束信號

from multiprocessing import Process,Queueimport time,random,osdef consumer(q):

? ? while True:

? ? ? ? res=q.get()

? ? ? ? if res is None:break #收到結(jié)束信號則結(jié)束? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):

? ? for i in range(2):

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? res='%s%s' %(name,i)

? ? ? ? q.put(res)

? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':

? ? q=Queue()

? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=('包子',q))

? ? p2=Process(target=producer,args=('骨頭',q))

? ? p3=Process(target=producer,args=('泔水',q))

? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))

? ? c2=Process(target=consumer,args=(q,))

? ? #開始? ? p1.start()

? ? p2.start()

? ? p3.start()

? ? c1.start()

? ? p1.join() #必須保證生產(chǎn)者全部生產(chǎn)完畢,才應(yīng)該發(fā)送結(jié)束信號? ? p2.join()

? ? p3.join()

? ? q.put(None) #有幾個消費者就應(yīng)該發(fā)送幾次結(jié)束信號None? ? q.put(None) #發(fā)送結(jié)束信號? ? print('主')

JoinableQueue([maxsize])?

創(chuàng)建可連接的共享進程隊列悍抑。這就像是一個Queue對象,但隊列允許項目的使用者通知生產(chǎn)者項目已經(jīng)被成功處理杜耙。通知進程是使用共享的信號和條件變量來實現(xiàn)的搜骡。?

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:

q.task_done()

使用者使用此方法發(fā)出信號佑女,表示q.get()返回的項目已經(jīng)被處理记靡。如果調(diào)用此方法的次數(shù)大于從隊列中刪除的項目數(shù)量,將引發(fā)ValueError異常团驱。

q.join()

生產(chǎn)者將使用此方法進行阻塞摸吠,直到隊列中所有項目均被處理。阻塞將持續(xù)到為隊列中的每個項目均調(diào)用q.task_done()方法為止嚎花。

下面的例子說明如何建立永遠運行的進程寸痢,使用和處理隊列上的項目。生產(chǎn)者將項目放入隊列紊选,并等待它們被處理啼止。

方法介紹

from multiprocessing import Process,JoinableQueueimport time,random,osdef consumer(q):

? ? while True:

? ? ? ? res=q.get()

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

? ? ? ? q.task_done() #向q.join()發(fā)送一次信號,證明一個數(shù)據(jù)已經(jīng)被取走了def producer(name,q):

? ? for i in range(10):

? ? ? ? time.sleep(random.randint(1,3))

? ? ? ? res='%s%s' %(name,i)

? ? ? ? q.put(res)

? ? ? ? print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))

? ? q.join() #生產(chǎn)完畢,使用此方法進行阻塞兵罢,直到隊列中所有項目均被處理献烦。if __name__ == '__main__':

? ? q=JoinableQueue()

? ? #生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=('包子',q))

? ? p2=Process(target=producer,args=('骨頭',q))

? ? p3=Process(target=producer,args=('泔水',q))

? ? #消費者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))

? ? c2=Process(target=consumer,args=(q,))

? ? c1.daemon=True

? ? c2.daemon=True

? ? #開始? ? p_l=[p1,p2,p3,c1,c2]

? ? for p in p_l:

? ? ? ? p.start()

? ? p1.join()

? ? p2.join()

? ? p3.join()

? ? print('主')

? ? #主進程等--->p1,p2,p3等---->c1,c2? ? #p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊列的數(shù)據(jù)? ? #因而c1,c2也沒有存在的價值了,不需要繼續(xù)阻塞在進程中影響主進程了。應(yīng)該隨著主進程的結(jié)束而結(jié)束,所以設(shè)置成守護進程就可以了趣些。

管道(了解)

介紹

#創(chuàng)建管道的類:Pipe([duplex]):在進程之間創(chuàng)建一條管道仿荆,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道#參數(shù)介紹:dumplex:默認管道是全雙工的拢操,如果將duplex射成False锦亦,conn1只能用于接收,conn2只能用于發(fā)送令境。#主要方法:? ? conn1.recv():接收conn2.send(obj)發(fā)送的對象杠园。如果沒有消息可接收,recv方法會一直阻塞舔庶。如果連接的另外一端已經(jīng)關(guān)閉抛蚁,那么recv方法會拋出EOFError。

? ? conn1.send(obj):通過連接發(fā)送對象惕橙。obj是與序列化兼容的任意對象

#其他方法:conn1.close():關(guān)閉連接瞧甩。如果conn1被垃圾回收,將自動調(diào)用此方法

conn1.fileno():返回連接使用的整數(shù)文件描述符

conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用弥鹦,返回True肚逸。timeout指定等待的最長時限。如果省略此參數(shù)彬坏,方法將立即返回結(jié)果朦促。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達栓始。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息务冕。maxlength指定要接收的最大字節(jié)數(shù)。如果進入的消息幻赚,超過了這個最大值禀忆,將引發(fā)IOError異常,并且在連接上無法進行進一步讀取坯屿。如果連接的另外一端已經(jīng)關(guān)閉油湖,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常领跛。

conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū)乏德,buffer是支持緩沖區(qū)接口的任意對象,offset是緩沖區(qū)中的字節(jié)偏移量吠昭,而size是要發(fā)送字節(jié)數(shù)喊括。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進行接收? ?

conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息矢棚,并把它保存在buffer對象中郑什,該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移蒲肋。返回值是收到的字節(jié)數(shù)蘑拯。如果消息長度大于可用的緩沖區(qū)空間钝满,將引發(fā)BufferTooShort異常。

pipe初使用

from multiprocessing import Process, Pipedef f(conn):

? ? conn.send("Hello The_Third_Wave")

? ? conn.close()if __name__ == '__main__':

? ? parent_conn, child_conn = Pipe()

? ? p = Process(target=f, args=(child_conn,))

? ? p.start()

? ? print(parent_conn.recv())

? ? p.join()

應(yīng)該特別注意管道端點的正確管理問題申窘。如果是生產(chǎn)者或消費者中都沒有使用管道的某個端點弯蚜,就應(yīng)將它關(guān)閉。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端剃法,在消費者中關(guān)閉管道的輸入端碎捺。如果忘記執(zhí)行這些步驟,程序可能在消費者中的recv()操作上掛起贷洲。管道是由操作系統(tǒng)進行引用計數(shù)的收厨,必須在所有進程中關(guān)閉管道后才能生成EOFError異常。因此优构,在生產(chǎn)者中關(guān)閉管道不會有任何效果诵叁,除非消費者也關(guān)閉了相同的管道端點。?

pipe初使用

from multiprocessing import Process, Pipedef f(parent_conn,child_conn):

? ? #parent_conn.close() #不寫close將不會引發(fā)EOFError? ? while True:

? ? ? ? try:

? ? ? ? ? ? print(child_conn.recv())

? ? ? ? except EOFError:

? ? ? ? ? ? child_conn.close()if __name__ == '__main__':

? ? parent_conn, child_conn = Pipe()

? ? p = Process(target=f, args=(parent_conn,child_conn,))

? ? p.start()

? ? child_conn.close()

? ? parent_conn.send('hello')

? ? parent_conn.close()

? ? p.join()

pipe實現(xiàn)生產(chǎn)者消費者模型

from multiprocessing import Process,Pipedef consumer(p,name):

? ? produce, consume=p

? ? produce.close()

? ? while True:

? ? ? ? try:

? ? ? ? ? ? baozi=consume.recv()

? ? ? ? ? ? print('%s 收到包子:%s' %(name,baozi))

? ? ? ? except EOFError:

? ? ? ? ? ? breakdef producer(seq,p):

? ? produce, consume=p

? ? consume.close()

? ? for i in seq:

? ? ? ? produce.send(i)if __name__ == '__main__':

? ? produce,consume=Pipe()

? ? c1=Process(target=consumer,args=((produce,consume),'c1'))

? ? c1.start()

? ? seq=(i for i in range(10))

? ? producer(seq,(produce,consume))

? ? produce.close()

? ? consume.close()

? ? c1.join()

? ? print('主進程')

多個消費之之間的競爭問題帶來的數(shù)據(jù)不安全問題

from multiprocessing import Process,Pipe,Lockdef consumer(p,name,lock):

? ? produce, consume=p

? ? produce.close()

? ? while True:

? ? ? ? lock.acquire()

? ? ? ? baozi=consume.recv()

? ? ? ? lock.release()

? ? ? ? if baozi:

? ? ? ? ? ? print('%s 收到包子:%s' %(name,baozi))

? ? ? ? else:

? ? ? ? ? ? consume.close()

? ? ? ? ? ? breakdef producer(p,n):

? ? produce, consume=p

? ? consume.close()

? ? for i in range(n):

? ? ? ? produce.send(i)

? ? produce.send(None)

? ? produce.send(None)

? ? produce.close()if __name__ == '__main__':

? ? produce,consume=Pipe()

? ? lock = Lock()

? ? c1=Process(target=consumer,args=((produce,consume),'c1',lock))

? ? c2=Process(target=consumer,args=((produce,consume),'c2',lock))

? ? p1=Process(target=producer,args=((produce,consume),10))

? ? c1.start()

? ? c2.start()

? ? p1.start()

? ? produce.close()

? ? consume.close()

? ? c1.join()

? ? c2.join()

? ? p1.join()

? ? print('主進程')

進程之間的數(shù)據(jù)共享

展望未來俩块,基于消息傳遞的并發(fā)編程是大勢所趨

即便是使用線程黎休,推薦做法也是將程序設(shè)計為大量獨立的線程集合浓领,通過消息隊列交換數(shù)據(jù)玉凯。

這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統(tǒng)中联贩。

但進程間應(yīng)該盡量避免通信漫仆,即便需要通信,也應(yīng)該選擇進程安全的工具來避免加鎖帶來的問題泪幌。

以后我們會嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進程之間的數(shù)據(jù)共享問題盲厌。

Manager模塊介紹

進程間數(shù)據(jù)是獨立的,可以借助于隊列或管道實現(xiàn)通信祸泪,二者都是基于消息傳遞的

雖然進程間數(shù)據(jù)獨立吗浩,但可以通過Manager實現(xiàn)數(shù)據(jù)共享,事實上Manager的功能遠不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

Manager例子

from multiprocessing import Manager,Process,Lockdef work(d,lock):

? ? with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會出現(xiàn)數(shù)據(jù)錯亂? ? ? ? d['count']-=1if __name__ == '__main__':

? ? lock=Lock()

? ? with Manager() as m:

? ? ? ? dic=m.dict({'count':100})

? ? ? ? p_l=[]

? ? ? ? for i in range(100):

? ? ? ? ? ? p=Process(target=work,args=(dic,lock))

? ? ? ? ? ? p_l.append(p)

? ? ? ? ? ? p.start()

? ? ? ? for p in p_l:

? ? ? ? ? ? p.join()

? ? ? ? print(dic)

進程池和multiprocess.Pool模塊

進程池

為什么要有進程池?進程池的概念没隘。

在程序?qū)嶋H處理問題過程中懂扼,忙時會有成千上萬的任務(wù)需要被執(zhí)行,閑時可能只有零星任務(wù)右蒲。那么在成千上萬個任務(wù)需要被執(zhí)行的時候阀湿,我們就需要去創(chuàng)建成千上萬個進程么?首先瑰妄,創(chuàng)建進程需要消耗時間陷嘴,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程间坐,操作系統(tǒng)也不能讓他們同時執(zhí)行灾挨,這樣反而會影響程序的效率邑退。因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進程。那么我們要怎么做呢劳澄?

在這里瓜饥,要給大家介紹一個進程池的概念,定義一個池子浴骂,在里面放上固定數(shù)量的進程乓土,有需求來了,就拿一個池中的進程來處理任務(wù)溯警,等到處理完畢趣苏,進程并不關(guān)閉,而是將進程再放回進程池中繼續(xù)等待任務(wù)梯轻。如果有很多任務(wù)需要執(zhí)行食磕,池中的進程數(shù)量不夠,任務(wù)就要等待之前的進程執(zhí)行任務(wù)完畢歸來喳挑,拿到空閑進程才能繼續(xù)執(zhí)行彬伦。也就是說,池中進程的數(shù)量是固定的伊诵,那么同一時間最多有固定數(shù)量的進程在運行单绑。這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進程的時間曹宴,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果搂橙。

multiprocess.Pool模塊

概念介紹

Pool([numprocess? [,initializer [, initargs]]]):創(chuàng)建進程池

參數(shù)介紹

1 numprocess:要創(chuàng)建的進程數(shù),如果省略笛坦,將默認使用cpu_count()的值2 initializer:是每個工作進程啟動時要執(zhí)行的可調(diào)用對象区转,默認為None3 initargs:是要傳給initializer的參數(shù)組

主要方法

1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。2 '''需要強調(diào)的是:此操作并不會在所有池工作進程中并執(zhí)行func函數(shù)版扩。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù)废离,必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''3 4 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。5 '''此方法的結(jié)果是AsyncResult類的實例礁芦,callback是可調(diào)用對象蜻韭,接收輸入?yún)?shù)。當func的結(jié)果變?yōu)榭捎脮r宴偿,將理解傳遞給callback湘捎。callback禁止執(zhí)行任何阻塞操作兑障,否則將接收其他異步操作中的結(jié)果钱豁。'''6? ? 7 p.close():關(guān)閉進程池,防止進一步操作怪蔑。如果所有操作持續(xù)掛起娩践,它們將在工作進程終止前完成8 9 P.jion():等待所有工作進程退出活翩。此方法只能在close()或teminate()之后調(diào)用

其他方法(了解)

1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj烹骨。實例具有以下方法2 obj.get():返回結(jié)果材泄,如果有必要則等待結(jié)果到達沮焕。timeout是可選的峦树。如果在指定時間內(nèi)還沒有到達,將引發(fā)一場旦事。如果遠程操作中引發(fā)了異常魁巩,它將在調(diào)用此方法時再次被引發(fā)姐浮。3 obj.ready():如果調(diào)用完成,返回True4 obj.successful():如果調(diào)用完成且沒有引發(fā)異常卖鲤,返回True肾扰,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常5 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩? obj.terminate():立即終止所有工作進程集晚,同時不執(zhí)行任何清理或結(jié)束任何掛起工作换怖。如果p被垃圾回收沉颂,將自動調(diào)用此函數(shù)

代碼實例

同步和異步

進程池的同步調(diào)用

import os,timefrom multiprocessing import Pooldef work(n):

? ? print('%s run' %os.getpid())

? ? time.sleep(3)

? ? return n**2if __name__ == '__main__':

? ? p=Pool(3) #進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務(wù)? ? res_l=[]

? ? for i in range(10):

? ? ? ? res=p.apply(work,args=(i,)) # 同步調(diào)用悦污,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 但不管該任務(wù)是否存在阻塞彻坛,同步調(diào)用都會在原地等著? ? print(res_l)

進程池的異步調(diào)用

import osimport timeimport randomfrom multiprocessing import Pooldef work(n):

? ? print('%s run' %os.getpid())

? ? time.sleep(random.random())

? ? return n**2if __name__ == '__main__':

? ? p=Pool(3) #進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務(wù)? ? res_l=[]

? ? for i in range(10):

? ? ? ? res=p.apply_async(work,args=(i,)) # 異步運行踏枣,根據(jù)進程池中有的進程數(shù)茵瀑,每次最多3個子進程在異步執(zhí)行? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 返回結(jié)果之后,將結(jié)果放入列表竞帽,歸還進程,之后再執(zhí)行新的任務(wù)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 需要注意的是疙渣,進程池中的三個進程不會同時開啟或者同時結(jié)束? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 而是執(zhí)行完一個就釋放一個進程堆巧,這個進程就去接收新的任務(wù)妄荔。? ? ? ? ? res_l.append(res)

? ? # 異步apply_async用法:如果使用異步提交的任務(wù)懦冰,主進程需要使用jion谣沸,等待進程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果? ? # 否則内地,主進程結(jié)束赋除,進程池可能還沒來得及執(zhí)行举农,也就跟著一起結(jié)束了? ? p.close()

? ? p.join()

? ? for res in res_l:

? ? ? ? print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因為apply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get

server:進程池版socket并發(fā)聊天

#Pool內(nèi)的進程數(shù)默認是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count())

#開啟6個客戶端航背,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài)

#在每個進程內(nèi)查看pid棱貌,會發(fā)現(xiàn)pid使用為4個婚脱,即多個客戶端公用4個進程from socket import *from multiprocessing import Poolimport os

server=socket(AF_INET,SOCK_STREAM)

server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

server.bind(('127.0.0.1',8080))

server.listen(5)def talk(conn):

? ? print('進程pid: %s' %os.getpid())

? ? while True:

? ? ? ? try:

? ? ? ? ? ? msg=conn.recv(1024)

? ? ? ? ? ? if not msg:break? ? ? ? ? ? conn.send(msg.upper())

? ? ? ? except Exception:

? ? ? ? ? ? breakif __name__ == '__main__':

? ? p=Pool(4)

? ? while True:

? ? ? ? conn,*_=server.accept()

? ? ? ? p.apply_async(talk,args=(conn,))

? ? ? ? # p.apply(talk,args=(conn,client_addr)) #同步的話障贸,則同一時間只有一個客戶端能訪問

client

from socket import *

client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8080))while True:

? ? msg=input('>>: ').strip()

? ? if not msg:continue? ? client.send(msg.encode('utf-8'))

? ? msg=client.recv(1024)

? ? print(msg.decode('utf-8'))

發(fā)現(xiàn):并發(fā)開啟多個客戶端,服務(wù)端同一時間只有4個不同的pid涩维,只能結(jié)束一個客戶端嘀粱,另外一個客戶端才會進來.

回調(diào)函數(shù)

需要回調(diào)函數(shù)的場景:進程池中任何一個任務(wù)一旦處理完了,就立即告知主進程:我好了額垄分,你可以處理我的結(jié)果了薄湿。主進程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)

我們可以把耗時間(阻塞)的任務(wù)放到進程池中吆倦,然后指定回調(diào)函數(shù)(主進程負責執(zhí)行)坐求,這樣主進程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果须妻。

使用多進程請求多個url來減少網(wǎng)絡(luò)等待浪費的時間

from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url):

? ? print('<進程%s> get %s' %(os.getpid(),url))

? ? respone=requests.get(url)

? ? if respone.status_code == 200:

? ? ? ? return {'url':url,'text':respone.text}def pasrse_page(res):

? ? print('<進程%s> parse %s' %(os.getpid(),res['url']))

? ? parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))

? ? with open('db.txt','a') as f:

? ? ? ? f.write(parse_res)if __name__ == '__main__':

? ? urls=[

? ? ? ? 'https://www.baidu.com',

? ? ? ? 'https://www.python.org',

? ? ? ? 'https://www.openstack.org',

? ? ? ? 'https://help.github.com/',

? ? ? ? 'http://www.sina.com.cn/'? ? ]

? ? p=Pool(3)

? ? res_l=[]

? ? for url in urls:

? ? ? ? res=p.apply_async(get_page,args=(url,),callback=pasrse_page)

? ? ? ? res_l.append(res)

? ? p.close()

? ? p.join()

? ? print([res.get() for res in res_l]) #拿到的是get_page的結(jié)果,其實完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了'''

打印結(jié)果:

<進程3388> get https://www.baidu.com

<進程3389> get https://www.python.org

<進程3390> get https://www.openstack.org

<進程3388> get https://help.github.com/

<進程3387> parse https://www.baidu.com

<進程3389> get http://www.sina.com.cn/

<進程3387> parse https://www.python.org

<進程3387> parse https://help.github.com/

<進程3387> parse http://www.sina.com.cn/

<進程3387> parse https://www.openstack.org

[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]

'''

爬蟲實例

import refrom urllib.request import urlopenfrom multiprocessing import Pooldef get_page(url,pattern):

? ? response=urlopen(url).read().decode('utf-8')

? ? return pattern,responsedef parse_page(info):

? ? pattern,page_content=info

? ? res=re.findall(pattern,page_content)

? ? for item in res:

? ? ? ? dic={

? ? ? ? ? ? 'index':item[0].strip(),

? ? ? ? ? ? 'title':item[1].strip(),

? ? ? ? ? ? 'actor':item[2].strip(),

? ? ? ? ? ? 'time':item[3].strip(),

? ? ? ? }

? ? ? ? print(dic)if __name__ == '__main__':

? ? regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'? ? pattern1=re.compile(regex,re.S)

? ? url_dic={

? ? ? ? 'http://maoyan.com/board/7':pattern1,

? ? }

? ? p=Pool()

? ? res_l=[]

? ? for url,pattern in url_dic.items():

? ? ? ? res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)

? ? ? ? res_l.append(res)

? ? for i in res_l:

? ? ? ? i.get()

如果在主進程中等待進程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果绰更,則無需回調(diào)函數(shù)

無需回調(diào)函數(shù)

from multiprocessing import Poolimport time,random,osdef work(n):

? ? time.sleep(1)

? ? return n**2if __name__ == '__main__':

? ? p=Pool()

? ? res_l=[]

? ? for i in range(10):

? ? ? ? res=p.apply_async(work,args=(i,))

? ? ? ? res_l.append(res)

? ? p.close()

? ? p.join() #等待進程池中所有進程執(zhí)行完畢? ? nums=[]

? ? for res in res_l:

? ? ? ? nums.append(res.get()) #拿到所有結(jié)果? ? print(nums) #主進程拿到所有的處理結(jié)果,可以在主進程中進行統(tǒng)一進行處理

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末儡湾,一起剝皮案震驚了整個濱河市员辩,隨后出現(xiàn)的幾起案子奠滑,更是在濱河造成了極大的恐慌妒穴,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件杰赛,死亡現(xiàn)場離奇詭異乏屯,居然都是意外死亡,警方通過查閱死者的電腦和手機蛤迎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門替裆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來窘问,“玉大人,你說我怎么就攤上這事把鉴《郏” “怎么了概疆?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長凯旭。 經(jīng)常有香客問我使套,道長侦高,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任计螺,我火速辦了婚禮瞧壮,結(jié)果婚禮上咆槽,老公的妹妹穿的比我還像新娘。我一直安慰自己麦射,他們只是感情好,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布蛔琅。 她就那樣靜靜地躺著半等,像睡著了一般杀饵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上朽缎,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天谜悟,我揣著相機與錄音葡幸,去河邊找鬼。 笑死床蜘,一個胖子當著我的面吹牛蔑水,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播丹擎,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼蒂培,長吁一口氣:“原來是場噩夢啊……” “哼榜苫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤羔飞,失蹤者是張志新(化名)和其女友劉穎逻淌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體田柔,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡骨望,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年擎鸠,在試婚紗的時候發(fā)現(xiàn)自己被綠了劣光。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡牲剃,死狀恐怖雄可,靈堂內(nèi)的尸體忽然破棺而出滞项,到底是詐尸還是另有隱情,我是刑警寧澤过椎,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布戏仓,位于F島的核電站赏殃,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏榜揖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一思劳、第九天 我趴在偏房一處隱蔽的房頂上張望妨猩。 院中可真熱鬧壶硅,春花似錦、人聲如沸椒舵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至侥加,卻和暖如春粪躬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背提前。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工狈网, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留笨腥,地道東北人。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓士鸥,卻偏偏與公主長得像烤礁,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子币砂,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355