進程狀態(tài)和調(diào)度
在程序運行的時候,由于被操作系統(tǒng)的調(diào)度算法控制,程序會進入幾個狀態(tài):就緒,運行,阻塞
1. 就緒狀態(tài): 當(dāng)進程分配到CPU以外的所有的資源,只要獲取的處理器的使用權(quán)就可立即執(zhí)行,這時就是進入了就緒狀態(tài)
2. 執(zhí)行/運行狀態(tài): 當(dāng)程序已經(jīng)獲取了處理器的使用權(quán),其程序正在處理器上運行,此時的進程狀態(tài)稱為執(zhí)行/運行狀態(tài)
3. 阻塞狀態(tài): 當(dāng)程序由于等待某個事件而無法執(zhí)行的時候,便放棄處理器而處于阻塞狀態(tài).引起阻塞的原因可能有多種:例如:等待IO的完成,申請緩存區(qū)不能滿足,等待信號等
Python中使用多進程
multiprocessing模塊
multiprocessing.Process 介紹
Process模塊是一個創(chuàng)建進程的模塊,借助這個模塊可以創(chuàng)建進程
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象摹闽,表示一個子進程中的任務(wù)(尚未啟動)
強調(diào):
1. 需要使用關(guān)鍵字的方式來指定參數(shù)
2. args指定的為傳給target函數(shù)的位置參數(shù)堤如,是一個元組形式几晤,必須有逗號
參數(shù)介紹:
group參數(shù)未使用秕脓,值始終為None
target表示調(diào)用對象队魏,即子進程要執(zhí)行的任務(wù)
args表示調(diào)用對象的位置參數(shù)元組室囊,args=(1,2,'egon',)
kwargs表示調(diào)用對象的字典,kwargs={'name':'egon','age':18}
name為子進程的名稱
方法介紹
p.start():啟動進程泼橘,并調(diào)用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調(diào)用target指定的函數(shù)厦酬,我們自定義類的類中一定要實現(xiàn)該方法
p.terminate():強制終止進程p胆描,不會進行任何清理操作,如果p創(chuàng)建了子進程弃锐,該子進程就成了僵尸進程袄友,
使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放霹菊,進而導(dǎo)致死鎖
p.is_alive():如果p仍然運行剧蚣,返回True
p.join([timeout]):主線程等待p終止(強調(diào):是主線程處于等的狀態(tài),而p是處于運行的狀態(tài))旋廷。
timeout是可選的超時時間鸠按,需要強調(diào)的是,p.join只能join住start開啟的進程饶碘,而不能join住run開啟的進程
屬性介紹
p.daemon:默認值為False目尖,如果設(shè)為True,代表p為后臺運行的守護進程扎运,當(dāng)p的父進程終止時瑟曲,p也隨之終止,
并且設(shè)定為True后豪治,p不能創(chuàng)建自己的新進程洞拨,必須在p.start()之前設(shè)置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時為None、如果為–N负拟,表示被信號N結(jié)束(了解即可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串烦衣。這個鍵的用途是
為涉及網(wǎng)絡(luò)連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
window中使用Process注意事項:
在Windows操作系統(tǒng)中由于沒有fork(linux操作系統(tǒng)中創(chuàng)建進程的機制),在創(chuàng)建子進程的時候會自動 import 啟動它的這個文件花吟,而在 import 的時候又執(zhí)行了整個文件秸歧。因此如果將process()直接寫在文件中就會無限遞歸創(chuàng)建子進程報錯。所以必須把創(chuàng)建子進程的部分使用if __name__
==‘__main__
’ 判斷保護起來衅澈,import 的時候 键菱,就不會遞歸運行了。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 19:51'
from multiprocessing import Process
import time
def task(name):
print('{} 開始運行'.format(name))
time.sleep(2)
print('{} 運行結(jié)束'.format(name))
# windows 開子進程要在__name__ == '__mian__'下
# 因為開子進程會重新加載父進程的內(nèi)容
if __name__ == '__main__':
p1 = Process(target=task, args=('t1',))
p2 = Process(target=task, kwargs={'name': 't2'})
p1.start()
p2.start()
print('---主進程---')
查看主進程的id和父進程的id
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:01'
import time
from multiprocessing import Process
import os
def task(name):
print('{} 開始運行'.format(name))
print('子進程id:{},父進程id:{}'.format(os.getpid(),os.getppid()))
time.sleep(2)
print('{} 運行結(jié)束'.format(name))
if __name__ == '__main__':
print('主進程id:',os.getpid())
p = Process(target=task,args=('進程1',))
p.start()
print('---主進程運行結(jié)束---')
join方法,優(yōu)雅的結(jié)束子進程
可以實現(xiàn)父進程優(yōu)雅的等待子進程的結(jié)束,join表示的是主進程等待當(dāng)前的子進程結(jié)束.
并且進程之間的數(shù)據(jù)是獨立的
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:09'
from multiprocessing import Process
x = 100
def change():
global x
x = 10
print('子進程修改了x,子進程結(jié)束了')
print(x)
if __name__ == '__main__':
p = Process(target=change)
p.start()
p.join() # 優(yōu)雅的實現(xiàn)了主進程等待子進程先結(jié)束,然后再結(jié)束,避免了僵尸進程的出現(xiàn).
print(x)
print('主進程結(jié)束')
結(jié)果:
使用繼承Process類重寫run方法的方式創(chuàng)建多進程
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:16'
from multiprocessing import Process
import os
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(os.getpid(),os.getppid())
print('{} 正在打游戲..'.format(self.name))
if __name__ == '__main__':
p1 = MyProcess('fioman')
p2 = MyProcess('jingjing')
p3 = MyProcess('mengmeng')
p1.start() # start會自動調(diào)用run
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print('主線程')
守護進程
父進程中將一個子進程設(shè)置為守護進程,那么這個子進程會隨著主進程的結(jié)束而結(jié)束
主進程創(chuàng)建守護進程
1.守護進程會在主進程結(jié)束的時候而終止
2.守護進程將無法再創(chuàng)建子進程
守護進程的創(chuàng)建方式:
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:25'
from multiprocessing import Process
import os,time
class MyProcess(Process):
def __init__(self,person):
super().__init__()
self.person = person
def run(self):
print(os.getpid(),os.getppid())
time.sleep(2)
print("子進程結(jié)束")
if __name__ == '__main__':
p = MyProcess('子進程')
p.daemon = True # 一定要在p.start()之前設(shè)置,一旦設(shè)置這個屬性,表示這個進程是守護進程,主進程結(jié)束的時候,它也會結(jié)束
p.start()
print('主進程結(jié)束')
運行結(jié)果,主進程結(jié)束了,子進程由于在休眠,所以后面的子進程結(jié)束并不會打印
Process中的其他方法
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:32'
from multiprocessing import Process
import time
import random
class MyProcess(Process):
def __init__(self,person):
self.name = person
super().__init__()
def run(self):
print('{} 正在打游戲'.format(self.name))
time.sleep(random.randrange(1,5))
print('{} 還在打游戲'.format(self.name))
p1 = MyProcess('Fioman')
p1.start()
p1.terminate() # 關(guān)閉進程,不會馬上關(guān)閉,所以is_alive立刻查看結(jié)果可能還是存貨
print(p1.is_alive()) # True
print('開始')
time.sleep(1)
print(p1.is_alive()) # False
互斥鎖
通過剛剛的學(xué)習(xí)矾麻,我們千方百計實現(xiàn)了程序的異步纱耻,讓多個任務(wù)可以同時在幾個進程中并發(fā)處理,他們之間的運行沒有順序险耀,一旦開啟也不受我們控制。盡管并發(fā)編程讓我們能更加充分的利用IO資源玖喘,但是也給我們帶來了新的問題甩牺。
當(dāng)多個進程使用同一份數(shù)據(jù)資源的時候,就會因為競爭而引發(fā)數(shù)據(jù)安全或順序混亂問題累奈。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 10:00'
from multiprocessing import Process
import time
import random
def task1():
print('這是 task1 任務(wù)'.center(30, '-'))
print('task1 進了洗手間')
time.sleep(random.randint(1, 3))
print('task1 辦事呢...')
time.sleep(random.randint(1, 3))
print('task1 走出了洗手間')
def task2():
print('這是 task2 任務(wù)'.center(30, '-'))
print('task2 進了洗手間')
time.sleep(random.randint(1, 3))
print('task2 辦事呢...')
time.sleep(random.randint(1, 3))
print('task2 走出了洗手間')
def task3():
print('這是 task3 任務(wù)'.center(30, '-'))
print('task3 進了洗手間')
time.sleep(random.randint(1, 3))
print('task3 辦事呢...')
time.sleep(random.randint(1, 3))
print('task3 走出了洗手間')
if __name__ == '__main__':
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p2.start()
p3.start()
通過加鎖來控制
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 10:00'
from multiprocessing import Process, Lock
import time
import random
# 生成一個互斥鎖
mutex_lock = Lock()
def task1(lock):
lock.acquire() # 獲取到鎖對象,當(dāng)其他進程想要獲取鎖對象的時候,就會進入等待狀態(tài)
print('這是 task1 任務(wù)'.center(30, '-'))
print('task1 進了洗手間')
time.sleep(random.randint(1, 3))
print('task1 辦事呢...')
time.sleep(random.randint(1, 3))
print('task1 走出了洗手間')
lock.release()
def task2(lock):
lock.acquire()
print('這是 task2 任務(wù)'.center(30, '-'))
print('task2 進了洗手間')
time.sleep(random.randint(1, 3))
print('task2 辦事呢...')
time.sleep(random.randint(1, 3))
print('task2 走出了洗手間')
lock.release()
def task3(lock):
lock.acquire()
print('這是 task3 任務(wù)'.center(30, '-'))
print('task3 進了洗手間')
time.sleep(random.randint(1, 3))
print('task3 辦事呢...')
time.sleep(random.randint(1, 3))
print('task3 走出了洗手間')
lock.release()
if __name__ == '__main__':
p1 = Process(target=task1, args=(mutex_lock,))
p2 = Process(target=task2, args=(mutex_lock,))
p3 = Process(target=task3, args=(mutex_lock,))
p1.start()
p2.start()
p3.start()
上面雖然通過加鎖控制住了進程因為爭奪資源而出現(xiàn)的同事對同一資源的訪問,但是程序又變成串行了.這樣確實浪費了時間,但是確保了程序數(shù)據(jù)的安全性.
互斥鎖模擬搶票
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 10:27'
import json,time
from multiprocessing import Process,Lock
def buy_ticket(i,lock):
lock.acquire()
with open('ticket','r',encoding='utf-8') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0:
dic['ticket'] -= 1
print('{} 買到了票'.format(i))
else:
print('{} 沒有買到票'.format(i))
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=buy_ticket,args=(i,lock))
p.start()
一點思考
加鎖可以保證多個進程修改同一塊數(shù)據(jù)時贬派,同一時間只能有一個任務(wù)可以進行修改,即串行的修改澎媒,在犧牲速度的前提下保證數(shù)據(jù)安全搞乏。
雖然可以用文件共享數(shù)據(jù)實現(xiàn)進程間通信,但問題是:
- 效率低(共享數(shù)據(jù)基于文件戒努,而文件是硬盤上的數(shù)據(jù))
- 需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
- 效率高(多個進程共享一塊內(nèi)存的數(shù)據(jù))
- 幫我們處理好鎖問題请敦。
mutiprocessing模塊中為我們提供了一個IPC通信機制:隊列和管道。
隊列和管道都是將數(shù)據(jù)存放于內(nèi)存中储玫,隊列又是基于(管道+鎖)實現(xiàn)的侍筛,可以讓我們從復(fù)雜的鎖問題中解脫出來,
我們應(yīng)該盡量避免使用共享數(shù)據(jù)撒穷,盡可能使用消息傳遞和隊列匣椰,避免處理復(fù)雜的同步和鎖問題,而且在進程數(shù)目增多時端礼,往往可以獲得更好的可擴展性禽笑。
進程間的同步控制之信號量(Semaphore)
信號量是用來解決什么問題的?
信號量是用來解決,當(dāng)我們的某段資源只希望有N個進程訪問的時候,用的一種方式.相當(dāng)于規(guī)定了這個進程,只可以拿到幾個鎖資源.如果超過了,限定的個數(shù),進程就會進入等待阻塞狀態(tài),直到有的進程釋放了鎖,這個等待的進程才可以獲取鎖的使用權(quán).信號量有效的控制了同一時間某段資源同時只能開啟的進程的最大數(shù),有效的防止了程序因為開啟太多的進程,造成內(nèi)存浪費的情況.
信號量的使用方式:
1. 創(chuàng)建信號量實例對象
2. 將對象以參數(shù)的形式傳遞給進程.
3. 在事件函數(shù)中,開始的時候獲取鎖,結(jié)束的時候,釋放鎖.當(dāng)它的鎖釋放的時候,新的等待的進程就會獲取到鎖.
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 11:32'
from multiprocessing import Process, Semaphore
import time
import random
def task(name, sem):
sem.acquire()
print('{} 正在執(zhí)行任務(wù)'.format(name))
time.sleep(random.randrange(1, 3))
print('{} 任務(wù)執(zhí)行完畢'.format(name))
sem.release()
if __name__ == '__main__':
sem = Semaphore()
for i in range(20):
p = Process(target=task, args=(i, sem))
p.start()
進程間控制之事件
事件的原理就是: 在主進程中創(chuàng)建一個事件對象Event(),然后用參數(shù)的形式傳遞給子進程.在子進程中通過一個進程的事件狀態(tài)改變,來決定另一個進程是阻塞狀態(tài)還是繼續(xù)執(zhí)行.
比如:
車和紅綠燈事件. 紅燈的時候,將事件的is_set()設(shè)置為False,默認就是False.當(dāng)是綠燈的時候?qū)⑵湓O(shè)置為True,然后車就可以根據(jù)事件的狀態(tài)來決定是否可以通行.類似于通知的形式,用一個共同的標志位,來通過改變標志位,通知相應(yīng)的進程是運行還是阻塞.
事件常用的方法
obj.is_set():默認值為False,事件是通過此方法的bool值去標示wait()的阻塞狀態(tài)
obj.set():將is_set()的bool值改為True
obj.clear():將is_set()的bool值改為False
obj.wait():is_set()的值為False時阻塞蛤奥,否則不阻塞
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 12:21'
import time,random
from multiprocessing import Process,Event
def car(e,i):
if not e.is_set():
print('{} 號車在紅燈處等待'.format(i))
e.wait() # 這里會阻塞,直到得到一個 事件狀態(tài)為True的信號事件.
print('{} 號車通過'.format(i))
def light(e):
while True:
if e.is_set():
e.clear() # 兩秒之后,變成紅燈
print('紅燈亮了')
else:
e.set() # 兩秒之后,變成綠燈
print('綠燈亮了')
time.sleep(1)
if __name__ == '__main__':
e = Event()
lig = Process(target=light,args=(e,))
lig.start()
for i in range(10):
c = Process(target=car,args=(e,i))
c.start()
time.sleep(random.random())
進程間通信(隊列和管道m(xù)ultiprocessing.Queue和multiprocess.Pipe)
進程間通信之隊列 MultiProcessing.Queue
進程間通信
IPC Inter-Process Communication
隊列
創(chuàng)建一個進程隊列,Queue是多進程安全的隊列,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞.
Queue 隊列常用的方法
Queue([maxsize])
創(chuàng)建共享的進程隊列佳镜。
參數(shù) :maxsize是隊列中允許的最大項數(shù)。如果省略此參數(shù)喻括,則無大小限制邀杏。
底層隊列使用管道和鎖定實現(xiàn)。
Queue([maxsize])
創(chuàng)建共享的進程隊列。maxsize是隊列中允許的最大項數(shù)望蜡。如果省略此參數(shù)唤崭,則無大小限制。
底層隊列使用管道和鎖定實現(xiàn)脖律。另外谢肾,還需要運行支持線程以便隊列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空小泉,此方法將阻塞芦疏,直到隊列中有項目可用為止。block用于控制阻塞行為微姊,
默認為True. 如果設(shè)置為False酸茴,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間兢交,
用在阻塞模式中薪捍。如果在制定的時間間隔內(nèi)沒有項目變?yōu)榭捎茫瑢⒁l(fā)Queue.Empty異常配喳。
q.get_nowait( )
同q.get(False)方法酪穿。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿晴裹,此方法將阻塞至有空間可用為止被济。block控制阻塞行為,默認為True涧团。
如果設(shè)置為False只磷,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。
timeout指定在阻塞模式中等待可用空間的時間長短少欺。超時后將引發(fā)Queue.Full異常喳瓣。
q.qsize()
返回隊列中目前項目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠赞别,因為在返回結(jié)果和在稍后程序中使用結(jié)果之間畏陕,
隊列中可能添加或刪除了項目。在某些系統(tǒng)上仿滔,此方法可能引發(fā)NotImplementedError異常惠毁。
q.empty()
如果調(diào)用此方法時 q為空,返回True崎页。如果其他進程或線程正在往隊列中添加項目鞠绰,結(jié)果是不可靠的。
也就是說飒焦,在返回和使用結(jié)果之間蜈膨,隊列中可能已經(jīng)加入新的項目屿笼。
q.full()
如果q已滿,返回為True. 由于線程的存在翁巍,結(jié)果也可能是不可靠的(參考q.empty()方法)驴一。。
其他方法
q.close()
關(guān)閉隊列灶壶,防止隊列中加入更多數(shù)據(jù)肝断。調(diào)用此方法時,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù)驰凛,
但將在此方法完成時馬上關(guān)閉胸懈。如果q被垃圾收集,將自動調(diào)用此方法恰响。關(guān)閉隊列不會在隊列使用者中生成任何類型的
數(shù)據(jù)結(jié)束信號或異常趣钱。例如,如果某個使用者正被阻塞在get()操作上渔隶,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()方法返回錯誤羔挡。
q.cancel_join_thread()
不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞间唉。
q.join_thread()
連接隊列的后臺線程。此方法用于在調(diào)用q.close()方法后利术,等待所有隊列項被消耗呈野。默認情況下,
此方法由不是q的原始創(chuàng)建者的所有進程調(diào)用印叁。調(diào)用q.cancel_join_thread()方法可以禁止這種行為被冒。
隊列常用的方法舉例:
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 13:12'
from multiprocessing import Queue
q = Queue(3)
# put get put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
# q.put(4,block=False) # 如果隊列已經(jīng)滿了,程序會被阻塞到這里
# 如果這里設(shè)置非阻塞,但是當(dāng)隊列已經(jīng)滿的時候會報錯
try:
q.put_nowait(3) # 這里相當(dāng)于是強制不阻塞的往隊列中塞東西,當(dāng)塞不進去的時候就會報錯
except Exception as e:
print('隊列已經(jīng)滿了.')
# 因此,我們再放入數(shù)據(jù)之前,可以先看一下隊列的狀態(tài),如果已經(jīng)滿了,就不要再放了
print(q.full()) # True
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 同put方法一樣,如果隊列為空的時候,會阻塞
try:
q.get_nowait()
except Exception as e:
print('隊列已經(jīng)空了')
print(q.empty())
使用隊列進行進程間通信的簡單示例:
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 13:21'
from multiprocessing import Process, Queue
import time
def task(q):
q.put([time.ctime(), 'from Eva', 'hello'])
if __name__ == '__main__':
q = Queue() # 創(chuàng)建一個沒有大小限制的隊列對象
p = Process(target=task, args=(q,))
p.start()
print(q.get())
p.join()
再看看一個較為復(fù)雜的例子
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 13:27'
from multiprocessing import Process, Queue
import time
import os
def inputQ(q):
info = str(str(os.getpid()) + 'put:' + str(time.ctime()))
q.put(info)
def outputQ(q):
info = q.get()
print('獲取到的存放的數(shù)據(jù)是: {}'.format(info))
if __name__ == '__main__':
q = Queue(3)
record1 = [] # 記錄存放數(shù)據(jù)的進程
record2 = [] # 記錄取出數(shù)據(jù)的進程
# 往隊列中存放數(shù)據(jù)的進程
for i in range(10):
process = Process(target=inputQ, args=(q,))
process.start()
record1.append(process)
# 從隊列中取數(shù)據(jù)的進程
for i in range(10):
process = Process(target=outputQ, args=(q,))
process.start()
record2.append(process)
# 讓主進程等待它的結(jié)束,使用join
for i in record1:
i.join()
for i in record2:
i.join()
生產(chǎn)者消費模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度轮蜕。
為什么要使用該模型?
在線程世界里昨悼,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程跃洛。在多線程開發(fā)當(dāng)中率触,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢汇竭,那么生產(chǎn)者就必須等待消費者處理完葱蝗,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理细燎,如果消費者的處理能力大于生產(chǎn)者两曼,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式玻驻。
什么是生產(chǎn)者消費者模型?
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題悼凑。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理户辫,直接扔給阻塞隊列渐夸,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取寸莫,阻塞隊列就相當(dāng)于一個緩沖區(qū)捺萌,平衡了生產(chǎn)者和消費者的處理能力。
基于隊列實現(xiàn)生產(chǎn)者消費模型
生產(chǎn)者消費模型,完美的解決了生產(chǎn)者和消費者供需不平衡的問題.
當(dāng)生產(chǎn)速度比較快的時候,可以用增加消費者進程數(shù)量的方法平衡它們之間的生產(chǎn)和消費關(guān)系.
當(dāng)消費速度比較快的時候,可以用增加生產(chǎn)者進程數(shù)量的方法平衡它們之間的生產(chǎn)和消費關(guān)系.
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 14:06'
from multiprocessing import Process,Queue
import time,random
def consumer(q,name):
while True:
food = q.get()
if food is None:
print('接收到了一個空,生產(chǎn)者已經(jīng)完事了')
break
print('\033[31m{}消費了{}\033[0m'.format(name,food))
time.sleep(random.random())
def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生產(chǎn)了{}{}'.format(name,food,i)
print(f)
q.put(f)
if __name__ == '__main__':
q = Queue(20)
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','饅頭',q))
p1.start()
p2.start()
c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.start()
c2.start()
# 讓主程序可以等待子進程的結(jié)束.
p1.join()
p2.join()
# 生產(chǎn)者的進程結(jié)束,這里需要放置兩個空值,供消費者獲取,用來判斷已經(jīng)沒有存貨了
q.put(None)
q.put(None)
print('主程序結(jié)束..........')
使用隊列寫生產(chǎn)者消費者模型的時候,注意事項是當(dāng)生產(chǎn)者生產(chǎn)完畢的時候,一定要通知消費者,通過放一個特殊的值的方式.如果有多個消費者,就要放置多個標志結(jié)束的值.這樣消費者的進程,才能知道什么時候結(jié)束.這種屬于邊生產(chǎn)邊消費的模型,不能使用is_empty()來判斷數(shù)據(jù)隊列是否為空,因為消費者在獲取數(shù)據(jù)的時候,有可能生產(chǎn)者正在生產(chǎn)中.所以不能用這種方式來決定消費者的進程是否結(jié)束,必須通過一個特殊的標識值來通知消費者,生產(chǎn)者已經(jīng)沒有數(shù)據(jù)了.你可以結(jié)束消費進程了.已經(jīng)沒有東西可以給你消費了.
注意: 使用隊列的時候一個缺點,如果有N個消費者,就要放置N個值去通知消費者,有點費勁.
這里我們引入一個JoinableQueue()
創(chuàng)建可連接的共享進程隊列,它們也是隊列,但是這些隊列比較特殊.它們可以允許消費者通知生產(chǎn)者項目已經(jīng)被成功處理.注意,這里必須是生產(chǎn)者生產(chǎn)完了,生產(chǎn)者的進程被掛起,等到消費者完全消費的時候,生產(chǎn)者進程就結(jié)束,然后主程序結(jié)束.將消費者進程設(shè)置為守護進程,這樣的話,主進程結(jié)束的時候,消費進程也就結(jié)束了.
JoinableQueue()比普通的Queue()多了兩個方法:
q.task_done()
使用者使用此方法發(fā)出信號膘茎,表示q.get()返回的項目已經(jīng)被處理桃纯。如果調(diào)用此方法的次數(shù)大于從隊列中刪除的項目數(shù)量,將引發(fā)ValueError異常披坏。
q.join()
生產(chǎn)者將使用此方法進行阻塞态坦,直到隊列中所有項目均被處理。阻塞將持續(xù)到為隊列中的每個項目均調(diào)用q.task_done()方法為止棒拂。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 14:06'
from multiprocessing import Process,JoinableQueue
import time,random
def consumer(q,name):
while True:
food = q.get()
if food is None:
print('接收到了一個空,生產(chǎn)者已經(jīng)完事了')
break
print('\033[31m{}消費了{}\033[0m'.format(name,food))
time.sleep(random.random())
q.task_done() # 向生產(chǎn)者發(fā)送信號,表示消費了一個
def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生產(chǎn)了{}{}'.format(name,food,i)
print(f)
q.put(f)
q.join() # 當(dāng)生產(chǎn)者生產(chǎn)完畢之后,會在這里阻塞.等待消費者的消費.
if __name__ == '__main__':
q = JoinableQueue(20)
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','饅頭',q))
p1.start()
p2.start()
c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.daemon = True # 將消費者設(shè)置為守護進程
c2.daemon = True # 將消費者設(shè)置為守護進程
c1.start()
c2.start()
# 讓主程序可以等待生產(chǎn)子進程的結(jié)束.
p1.join()
p2.join()
print('主程序結(jié)束..........')