python:進(jìn)程

什么是進(jìn)程

進(jìn)程(Process)是計(jì)算機(jī)中的程序關(guān)于某數(shù)據(jù)集合上的一次運(yùn)行活動(dòng)勾缭,是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ)石挂。在早期面向進(jìn)程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中颠通,進(jìn)程是程序的基本執(zhí)行實(shí)體;在當(dāng)代面向線程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中,進(jìn)程是線程的容器仅仆。程序是指令器赞、書及其組織形式的描述,進(jìn)程是程序的實(shí)體墓拜。

狹義定義:進(jìn)程是正在運(yùn)行的程序的實(shí)例港柜。
廣義定義:進(jìn)程是一個(gè)具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合的一次運(yùn)行活動(dòng)。它是操作系統(tǒng)動(dòng)態(tài)執(zhí)行的基本單元咳榜,在傳統(tǒng)的操作系統(tǒng)中夏醉,進(jìn)程既是基本的分配單元,也是基本的執(zhí)行單元贿衍。

進(jìn)程的特征:
1授舟、動(dòng)態(tài)性:進(jìn)程的實(shí)質(zhì)是程序系統(tǒng)中的一次執(zhí)行過程救恨,進(jìn)程是動(dòng)態(tài)產(chǎn)生的贸辈,動(dòng)態(tài)消亡的。
2肠槽、并發(fā)性:任何進(jìn)程都可以同其他進(jìn)程一起并發(fā)執(zhí)行擎淤。
3、獨(dú)立性:進(jìn)程是一個(gè)能獨(dú)立運(yùn)行的基本單位秸仙,同事也是系統(tǒng)非配資源和調(diào)度的獨(dú)立單位嘴拢。
4、異步性:由于進(jìn)程間的相互制約寂纪,使進(jìn)程具有執(zhí)行的間斷性席吴,即進(jìn)程按各自獨(dú)立的、不可預(yù)知的速度向前推進(jìn)捞蛋。
5孝冒、結(jié)構(gòu)特征:進(jìn)程由程序、數(shù)據(jù)和進(jìn)程控制塊三部分組成拟杉。
多個(gè)不同的進(jìn)程可以包含相同的程序:一個(gè)程序在不同的數(shù)據(jù)集里就構(gòu)成不同的進(jìn)程庄涡,能得到不同的結(jié)果;但是執(zhí)行過程中搬设,程序不能發(fā)生改變穴店。

進(jìn)程與程序的區(qū)別:
程序是指令和數(shù)據(jù)的有序集合,其本身沒有任何運(yùn)行的含義拿穴,是一個(gè)靜態(tài)的概念泣洞。而進(jìn)程是程序在處理機(jī)上的一次執(zhí)行過程,它是一個(gè)動(dòng)態(tài)的概念默色。程序可以作為一種軟飲料長(zhǎng)期存在球凰,而進(jìn)程是有一定生命期的。程序是永久的,晉城市戰(zhàn)士的弟蚀。

注意:同一個(gè)程序執(zhí)行兩次蚤霞,就會(huì)在操作系統(tǒng)中出現(xiàn)兩個(gè)進(jìn)程,所以我們可以同事運(yùn)行一個(gè)軟件义钉,分別做不同的事情也不會(huì)混亂昧绣。

進(jìn)程的并行與并發(fā)

并行:并行是指兩者同時(shí)執(zhí)行,比如賽跑捶闸,兩個(gè)人都在不停地往前跑夜畴;

并發(fā):并發(fā)是指資源有限的情況下,兩者交替輪流使用資源删壮,比如一段路同時(shí)只能過一個(gè)人贪绘,A走一段后,讓給B央碟,B用完税灌,繼續(xù)給A,交替使用亿虽,目的是提高效率菱涤。

區(qū)別:
并行是從微觀上,也就是在一個(gè)精確的時(shí)間片刻洛勉,有不同的程序在執(zhí)行粘秆,這就要求必須有多個(gè)處理器。
并發(fā)是從宏觀上收毫,在一個(gè)時(shí)間段史昂可以看出是同事執(zhí)行的攻走,比如一個(gè)服務(wù)器同時(shí)處理多個(gè)session。

同步異步阻塞非阻塞

進(jìn)程三狀態(tài)轉(zhuǎn)換圖:


1.png

在了解其他概念之前此再,我們首先要了解進(jìn)程的幾個(gè)狀態(tài)昔搂。在程序運(yùn)行的過程中,由于被操作系統(tǒng)的調(diào)度算法控制引润,程序會(huì)進(jìn)入幾個(gè)狀態(tài):就緒巩趁,運(yùn)行和阻塞。

(1)就緒(Ready)狀態(tài)

當(dāng)進(jìn)程已分配到除CPU以外的所有必要的資源淳附,只要獲得處理機(jī)便可立即執(zhí)行议慰,這時(shí)的進(jìn)程狀態(tài)稱為就緒狀態(tài)。

(2)執(zhí)行/運(yùn)行(Running)狀態(tài)當(dāng)進(jìn)程已獲得處理機(jī)奴曙,其程序正在處理機(jī)上執(zhí)行别凹,此時(shí)的進(jìn)程狀態(tài)稱為執(zhí)行狀態(tài)。

(3)阻塞(Blocked)狀態(tài)正在執(zhí)行的進(jìn)程洽糟,由于等待某個(gè)事件發(fā)生而無法執(zhí)行時(shí)炉菲,便放棄處理機(jī)而處于阻塞狀態(tài)堕战。引起進(jìn)程阻塞的事件可有多種,例如拍霜,等待I/O完成嘱丢、申請(qǐng)緩沖區(qū)不能滿足、等待信件(信號(hào))等祠饺。


2.png

同步和異步

所謂同步就是一個(gè)任務(wù)的完成需要依賴另外一個(gè)任務(wù)時(shí)越驻,只有等待被依賴的任務(wù)完成后,依賴的任務(wù)才能算完成道偷,這是一種可靠的任務(wù)序列缀旁。要么成功都成功,失敗都失敗勺鸦,兩個(gè)任務(wù)的狀態(tài)可以保持一致并巍。

所謂異步是不需要等待被依賴的任務(wù)完成,只是通知被依賴的任務(wù)要完成什么工作换途,依賴的任務(wù)也立即執(zhí)行懊渡,只要自己完成了整個(gè)任務(wù)就算完成了。至于被依賴的任務(wù)最終是否真正完成怀跛,依賴它的任務(wù)無法確定距贷,所以它是不可靠的任務(wù)序列柄冲。

比如我去銀行辦理業(yè)務(wù)吻谋,可能會(huì)有兩種方式:
第一種 :選擇排隊(duì)等候;
第二種 :選擇取一個(gè)小紙條上面有我的號(hào)碼现横,等到排到我這一號(hào)時(shí)由柜臺(tái)的人通知我輪到我去辦理業(yè)務(wù)了漓拾;

第一種:前者(排隊(duì)等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業(yè)務(wù)情況戒祠;

第二種:后者(等待別人通知)就是異步等待消息通知骇两。在異步消息處理中,等待消息通知者(在這個(gè)例子中就是等待辦理業(yè)務(wù)的人)往往注冊(cè)一個(gè)回調(diào)機(jī)制姜盈,在所等待的事件被觸發(fā)時(shí)由觸發(fā)機(jī)制(在這里是柜臺(tái)的人)通過某種機(jī)制(在這里是寫在小紙條上的號(hào)碼低千,喊號(hào))找到等待該事件的人。

阻塞與非阻塞

阻塞和非阻塞這兩個(gè)概念與程序(線程)等待消息通知(無所謂同步或者異步)時(shí)的狀態(tài)有關(guān)馏颂。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時(shí)的狀態(tài)角度來說的

繼續(xù)上面的那個(gè)例子示血,不論是排隊(duì)還是使用號(hào)碼等待通知,如果在這個(gè)等待的過程中救拉,等待者除了等待消息通知之外不能做其它的事情难审,那么該機(jī)制就是阻塞的,表現(xiàn)在程序中,也就是該程序一直阻塞在該函數(shù)調(diào)用處不能繼續(xù)往下執(zhí)行亿絮。
相反告喊,有的人喜歡在銀行辦理這些業(yè)務(wù)的時(shí)候一邊打打電話發(fā)發(fā)短信一邊等待麸拄,這樣的狀態(tài)就是非阻塞的,因?yàn)樗?等待者)沒有阻塞在這個(gè)消息通知上黔姜,而是一邊做自己的事情一邊等待拢切。

注意:同步非阻塞形式實(shí)際上是效率低下的,想象一下你一邊打著電話一邊還需要抬頭看到底隊(duì)伍排到你了沒有秆吵。如果把打電話和觀察排隊(duì)的位置看成是程序的兩個(gè)操作的話失球,這個(gè)程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的帮毁;而異步非阻塞形式卻沒有這樣的問題实苞,因?yàn)榇螂娫捠悄?等待者)的事情,而通知你則是柜臺(tái)(消息觸發(fā)機(jī)制)的事情烈疚,程序沒有在兩種不同的操作中來回切換黔牵。

同步/異步與阻塞/非阻塞

同步阻塞形式
  效率最低。拿上面的例子來說爷肝,就是你專心排隊(duì)猾浦,什么別的事都不做。

異步阻塞形式
  如果在銀行等待辦理業(yè)務(wù)的人采用的是異步的方式去等待消息被觸發(fā)(通知)灯抛,也就是領(lǐng)了一張小紙條金赦,假如在這段時(shí)間里他不能離開銀行做其它的事情,那么很顯然对嚼,這個(gè)人被阻塞在了這個(gè)等待的操作上面夹抗;

異步操作是可以被阻塞住的,只不過它不是在處理消息時(shí)阻塞纵竖,而是在等待消息通知時(shí)被阻塞漠烧。

同步非阻塞形式
  實(shí)際上是效率低下的。

想象一下你一邊打著電話一邊還需要抬頭看到底隊(duì)伍排到你了沒有靡砌,如果把打電話和觀察排隊(duì)的位置看成是程序的兩個(gè)操作的話已脓,這個(gè)程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的通殃。

異步非阻塞形式
  效率更高度液,

因?yàn)榇螂娫捠悄?等待者)的事情,而通知你則是柜臺(tái)(消息觸發(fā)機(jī)制)的事情画舌,程序沒有在兩種不同的操作中來回切換堕担。

比如說,這個(gè)人突然發(fā)覺自己煙癮犯了骗炉,需要出去抽根煙照宝,于是他告訴大堂經(jīng)理說,排到我這個(gè)號(hào)碼的時(shí)候麻煩到外面通知我一下句葵,那么他就沒有被阻塞在這個(gè)等待的操作上面厕鹃,自然這個(gè)就是異步+非阻塞的方式了兢仰。

很多人會(huì)把同步和阻塞混淆,是因?yàn)楹芏鄷r(shí)候同步操作會(huì)以阻塞的形式表現(xiàn)出來剂碴,同樣的把将,很多人也會(huì)把異步和非阻塞混淆,因?yàn)楫惒讲僮饕话愣疾粫?huì)在真正的IO操作處被阻塞忆矛。

進(jìn)程的創(chuàng)建與結(jié)束

進(jìn)程的創(chuàng)建

但凡是硬件察蹲,都需要有操作系統(tǒng)去管理,只要有操作系統(tǒng)催训,就有進(jìn)程的概念洽议,就需要有創(chuàng)建進(jìn)程的方式,一些操作系統(tǒng)只為一個(gè)應(yīng)用程序設(shè)計(jì)漫拭,比如微波爐中的控制器亚兄,一旦啟動(dòng)微波爐,所有的進(jìn)程都已經(jīng)存在采驻。

而對(duì)于通用系統(tǒng)(跑很多應(yīng)用程序)审胚,需要有系統(tǒng)運(yùn)行過程中創(chuàng)建或撤銷進(jìn)程的能力,主要分為4中形式創(chuàng)建新的進(jìn)程:

1. 系統(tǒng)初始化(查看進(jìn)程linux中用ps命令礼旅,windows中用任務(wù)管理器膳叨,前臺(tái)進(jìn)程負(fù)責(zé)與用戶交互,后臺(tái)運(yùn)行的進(jìn)程與用戶無關(guān)痘系,運(yùn)行在后臺(tái)并且只在需要時(shí)才喚醒的進(jìn)程菲嘴,稱為守護(hù)進(jìn)程,如電子郵件碎浇、web頁面临谱、新聞、打优А)

2. 一個(gè)進(jìn)程在運(yùn)行過程中開啟了子進(jìn)程(如nginx開啟多進(jìn)程,os.fork,subprocess.Popen等)

3. 用戶的交互式請(qǐng)求城豁,而創(chuàng)建一個(gè)新進(jìn)程(如用戶雙擊暴風(fēng)影音)

4. 一個(gè)批處理作業(yè)的初始化(只在大型機(jī)的批處理系統(tǒng)中應(yīng)用)

無論哪一種苟穆,新進(jìn)程的創(chuàng)建都是由一個(gè)已經(jīng)存在的進(jìn)程執(zhí)行了一個(gè)用于創(chuàng)建進(jìn)程的系統(tǒng)調(diào)用而創(chuàng)建的。

進(jìn)程的結(jié)束

1· 正常的退出(自愿唱星,如用戶電機(jī)及哦啊胡頁面的叉號(hào)雳旅,或者程序執(zhí)行完畢調(diào)用發(fā)起系統(tǒng)調(diào)用正常退出,在Linux中用exit间聊,在windows中用ExitProcess)
2· 出錯(cuò)退出
3·嚴(yán)重錯(cuò)誤(非自愿攒盈,執(zhí)行非法指令)
4· 被其他進(jìn)程殺死(非自愿)

python程序中的進(jìn)程操作:multiprocess模塊

Process模塊

Process模塊是一個(gè)創(chuàng)建進(jìn)程的模塊,借助這個(gè)模塊哎榴,就可以完成進(jìn)程的創(chuàng)建型豁。

Process([group [, target [, name [, args [, kwargs]]]]])僵蛛,由該類實(shí)例化得到的對(duì)象,表示一個(gè)子進(jìn)程中的任務(wù)(尚未啟動(dòng))

強(qiáng)調(diào):

  1. 需要使用關(guān)鍵字的方式來指定參數(shù)
  2. args指定的為傳給target函數(shù)的位置參數(shù)迎变,是一個(gè)元組形式充尉,必須有逗號(hào)

參數(shù)介紹:
1 group參數(shù)未使用,值始終為None
2 target表示調(diào)用對(duì)象衣形,即子進(jìn)程要執(zhí)行的任務(wù)
3 args表示調(diào)用對(duì)象的位置參數(shù)元組驼侠,args=(1,2,'egon',)
4 kwargs表示調(diào)用對(duì)象的字典,kwargs={'name':'egon','age':18}
5 name為子進(jìn)程的名稱

方法介紹

1 p.start():?jiǎn)?dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()
2 p.run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法谆吴,正是它去調(diào)用target指定的函數(shù)倒源,我們自定義類的類中一定要實(shí)現(xiàn)該方法
3 p.terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作句狼,如果p創(chuàng)建了子進(jìn)程相速,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況鲜锚。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放突诬,進(jìn)而導(dǎo)致死鎖
4 p.is_alive():如果p仍然運(yùn)行,返回True
5 p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài)芜繁,而p是處于運(yùn)行的狀態(tài))旺隙。timeout是可選的超時(shí)時(shí)間,需要強(qiáng)調(diào)的是骏令,p.join只能join住start開啟的進(jìn)程蔬捷,而不能join住run開啟的進(jìn)程

屬性介紹

1 p.daemon:默認(rèn)值為False,如果設(shè)為True榔袋,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程周拐,當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止凰兑,并且設(shè)定為True后妥粟,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置
2 p.name:進(jìn)程的名稱
3 p.pid:進(jìn)程的pid
4 p.exitcode:進(jìn)程在運(yùn)行時(shí)為None吏够、如果為–N勾给,表示被信號(hào)N結(jié)束(了解即可)
5 p.authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性锅知,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功

使用process模塊創(chuàng)建進(jìn)程

在python中開啟第一個(gè)子進(jìn)程

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    print('我是子進(jìn)程')

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('執(zhí)行主進(jìn)程的內(nèi)容了')

join方法

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)
    print('我是子進(jìn)程')


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    #p.join()
    print('我是父進(jìn)程')

查看子進(jìn)程及其詳情

import os
from multiprocessing import Process

def f(x):
    print('子進(jìn)程id :',os.getpid(),'父進(jìn)程id :',os.getppid())
    return x*x

if __name__ == '__main__':
    print('主進(jìn)程id :', os.getpid())
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=(i,))
        p.start()

多個(gè)進(jìn)程同事運(yùn)行(子進(jìn)程的執(zhí)行順序不是根據(jù)啟動(dòng)順序決定的)

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)

再談join方法

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父進(jìn)程在執(zhí)行')
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p in p_lst]
    print('父進(jìn)程在執(zhí)行')

除了上面這些開啟進(jìn)程的方法播急,還有一種以繼承Process類的形式開啟進(jìn)程的方式

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start會(huì)自動(dòng)調(diào)用run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主線程')

進(jìn)程之間的數(shù)據(jù)隔離問題

from multiprocessing import Process

def work():
    global n
    n=0
    print('子進(jìn)程內(nèi): ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主進(jìn)程內(nèi): ',n)

守護(hù)進(jìn)程

會(huì)隨著主進(jìn)程的結(jié)束而結(jié)束。

主進(jìn)程創(chuàng)建守護(hù)進(jìn)程

其一:守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行結(jié)束后就終止

其二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

進(jìn)程之間是相互獨(dú)立的售睹,主進(jìn)程代碼運(yùn)行結(jié)束桩警,守護(hù)進(jìn)程隨即終止

守護(hù)進(jìn)程的啟動(dòng)

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True #一定要在p.start()前設(shè)置,設(shè)置p為守護(hù)進(jìn)程,禁止p創(chuàng)建子進(jìn)程,并且父進(jìn)程代碼執(zhí)行結(jié)束,p即終止運(yùn)行
p.start()
time.sleep(10) # 在sleep時(shí)查看進(jìn)程id對(duì)應(yīng)的進(jìn)程ps -ef|grep id
print('主')

主進(jìn)程代碼執(zhí)行結(jié)束守護(hù)進(jìn)程立即結(jié)束

from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")#打印該行則主進(jìn)程代碼結(jié)束,則守護(hù)進(jìn)程p1應(yīng)該被終止.#可能會(huì)有p1任務(wù)執(zhí)行的打印信息123,因?yàn)橹鬟M(jìn)程打印main----時(shí),p1也執(zhí)行了,但是隨即被終止.

多進(jìn)程中的其他方法

進(jìn)程對(duì)象的其他方法:terminate, is_alive

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網(wǎng)紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網(wǎng)紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關(guān)閉進(jìn)程,不會(huì)立即關(guān)閉,所以is_alive立刻查看的結(jié)果可能還是存活
print(p1.is_alive()) #結(jié)果為True

print('開始')
print(p1.is_alive()) #結(jié)果為False

進(jìn)程對(duì)象的其他屬性:pid和name

class Myprocess(Process):
     def __init__(self,person):
         self.name=person   # name屬性是Process中的屬性,標(biāo)示進(jìn)程的名字
         super().__init__() # 執(zhí)行父類的初始化方法會(huì)覆蓋name屬性
         #self.name = person # 在這里設(shè)置就可以修改進(jìn)程名字了
         #self.person = person #如果不想覆蓋進(jìn)程名昌妹,就修改屬性名稱就可以了
     def run(self):
         print('%s正在和網(wǎng)紅臉聊天' %self.name)
         # print('%s正在和網(wǎng)紅臉聊天' %self.person)
         time.sleep(random.randrange(1,5))
         print('%s正在和網(wǎng)紅臉聊天' %self.name)
         # print('%s正在和網(wǎng)紅臉聊天' %self.person)

 
p1=Myprocess('哪吒')
p1.start()
print(p1.pid)    #可以查看子進(jìn)程的進(jìn)程id

進(jìn)程同步(multiprocess.Lock捶枢、multiprocess.Semaphore握截、multiprocess.Event)

鎖——multiprocess.Lock

程序的異步,就是讓多個(gè)任務(wù)可以同時(shí)在幾個(gè)進(jìn)程中斌并發(fā)處理柱蟀,他們之間的運(yùn)行沒有順序川蒙,一旦開啟也不受我們的控制。盡管并發(fā)編程讓我們更加充分的利用IO資源长已,但是也給我們帶來新的問題畜眨。

當(dāng)多個(gè)進(jìn)程使用同一份數(shù)據(jù)資源的時(shí)候,就會(huì)引發(fā)數(shù)據(jù)安全或順序混亂的問題术瓮。

進(jìn)程搶占輸出資源

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()

使用鎖維護(hù)執(zhí)行順序

# 由并發(fā)變成了串行,犧牲了運(yùn)行效率,但避免了競(jìng)爭(zhēng)
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()

上面這種情況雖然使用了加鎖的形式實(shí)現(xiàn)了順序的執(zhí)行康聂,但是程序又重新編程串行了,這樣的確會(huì)浪費(fèi)時(shí)間胞四,卻保證了數(shù)據(jù)的安全恬汁。

加鎖可以保證多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)任務(wù)可以進(jìn)行修改辜伟,即串行的修改氓侧,沒錯(cuò),速度是慢了导狡,但犧牲了速度卻保證了數(shù)據(jù)安全约巷。
雖然可以用文件共享數(shù)據(jù)實(shí)現(xiàn)進(jìn)程間通信,但問題是:
1.效率低(共享數(shù)據(jù)基于文件旱捧,而文件是硬盤上的數(shù)據(jù))
2.需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:1独郎、效率高(多個(gè)進(jìn)程共享一塊內(nèi)存的數(shù)據(jù))2、幫我們處理好鎖問題枚赡。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機(jī)制:隊(duì)列和管道氓癌。
隊(duì)列和管道都是將數(shù)據(jù)存放于內(nèi)存中
隊(duì)列又是基于(管道+鎖)實(shí)現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來贫橙,
我們應(yīng)該盡量避免使用共享數(shù)據(jù)贪婉,盡可能使用消息傳遞和隊(duì)列,避免處理復(fù)雜的同步和鎖問題料皇,而且在進(jìn)程數(shù)目增多時(shí)谓松,往往可以獲得更好的可獲展性。

信號(hào)量——multiprocess.Semaphore

互斥鎖同時(shí)只允許一個(gè)線程更改數(shù)據(jù)践剂,而信號(hào)量Semaphore是同時(shí)允許一定數(shù)量的線程更改數(shù)據(jù) 。
假設(shè)商場(chǎng)里有4個(gè)迷你唱吧娜膘,所以同時(shí)可以進(jìn)去4個(gè)人逊脯,如果來了第五個(gè)人就要在外面等待,等到有人出來才能再進(jìn)去玩竣贪。
實(shí)現(xiàn):
信號(hào)量同步基于內(nèi)部計(jì)數(shù)器军洼,每調(diào)用一次acquire()巩螃,計(jì)數(shù)器減1;每調(diào)用一次release()匕争,計(jì)數(shù)器加1.當(dāng)計(jì)數(shù)器為0時(shí)避乏,acquire()調(diào)用被阻塞。這是迪科斯徹(Dijkstra)信號(hào)量概念P()和V()的Python實(shí)現(xiàn)甘桑。信號(hào)量同步機(jī)制適用于訪問像服務(wù)器這樣的有限資源拍皮。
信號(hào)量與進(jìn)程池的概念很像,但是要區(qū)分開跑杭,信號(hào)量涉及到加鎖的概念

例子:

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 占到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每個(gè)人在ktv中待的時(shí)間不同
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

事件——multiprocess.Event

python線程的事件用于主線程控制其他線程的執(zhí)行铆帽,事件主要提供了三個(gè)方法 set、wait德谅、clear爹橱。

事件處理的機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”值為 False窄做,那么當(dāng)程序執(zhí)行 event.wait 方法時(shí)就會(huì)阻塞愧驱,如果“Flag”值為True,那么event.wait 方法時(shí)便不再阻塞椭盏。

clear:將“Flag”設(shè)置為False
set:將“Flag”設(shè)置為True

紅綠燈實(shí)例:

from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 進(jìn)程剛開啟组砚,is_set()的值是Flase,模擬信號(hào)燈為紅色
            print('\033[31m紅燈亮\033[0m庸汗,car%s等著' % n)
            e.wait()    # 阻塞惫确,等待is_set()的值變成True,模擬信號(hào)燈為綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #如果is_set()的值是Flase蚯舱,也就是紅燈改化,仍然回到while語句開始
                continue
            print('車開遠(yuǎn)了,car', n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():# 進(jìn)程剛開啟,is_set()的值是Flase枉昏,模擬信號(hào)燈為紅色
            print('\033[31m紅燈亮\033[0m陈肛,car%s等著' % n)
            e.wait(0.1) # 阻塞,等待設(shè)置等待時(shí)間兄裂,等待0.1s之后沒有等到綠燈就闖紅燈走了
            if not e.is_set():
                print('\033[33m紅燈,警車先走\(yùn)033[0m句旱,car %s' % n)
            else:
                print('\033[33;46m綠燈,警車走\(yùn)033[0m晰奖,car %s' % n)
        break



def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設(shè)置為False
        else:
            e.set()    # ---->將is_set()的值設(shè)置為True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 創(chuàng)建是個(gè)進(jìn)程控制10輛車
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))  # 創(chuàng)建5個(gè)進(jìn)程控制5輛警車
        p.start()
    t = Process(target=traffic_lights, args=(e, 10))  # 創(chuàng)建一個(gè)進(jìn)程控制紅綠燈
    t.start()

    print('============》')

隊(duì)列和管道(multiprocess.Queue谈撒、multiprocess.Pipe)

隊(duì)列

創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列匾南,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞啃匿。

Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊(duì)列。
參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù)溯乒,則無大小限制夹厌。
底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。

方法介紹:

Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊(duì)列裆悄。maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)矛纹。如果省略此參數(shù),則無大小限制光稼。底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)或南。另外,還需要運(yùn)行支持線程以便隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?br> Queue的實(shí)例q具有以下方法:

q.get( [ block [ ,timeout ] ] )
返回q中的一個(gè)項(xiàng)目钟哥。如果q為空迎献,此方法將阻塞,直到隊(duì)列中有項(xiàng)目可用為止腻贰。block用于控制阻塞行為吁恍,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)播演。timeout是可選超時(shí)時(shí)間冀瓦,用在阻塞模式中。如果在制定的時(shí)間間隔內(nèi)沒有項(xiàng)目變?yōu)榭捎眯纯荆瑢⒁l(fā)Queue.Empty異常翼闽。

q.get_nowait( )
同q.get(False)方法。

q.put(item [, block [,timeout ] ] )
將item放入隊(duì)列洲炊。如果隊(duì)列已滿感局,此方法將阻塞至有空間可用為止。block控制阻塞行為暂衡,默認(rèn)為True询微。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)狂巢。timeout指定在阻塞模式中等待可用空間的時(shí)間長(zhǎng)短撑毛。超時(shí)后將引發(fā)Queue.Full異常。

q.qsize()
返回隊(duì)列中目前項(xiàng)目的正確數(shù)量唧领。此函數(shù)的結(jié)果并不可靠藻雌,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊(duì)列中可能添加或刪除了項(xiàng)目斩个。在某些系統(tǒng)上胯杭,此方法可能引發(fā)NotImplementedError異常。

q.empty()
如果調(diào)用此方法時(shí) q為空受啥,返回True歉摧。如果其他進(jìn)程或線程正在往隊(duì)列中添加項(xiàng)目,結(jié)果是不可靠的腔呜。也就是說叁温,在返回和使用結(jié)果之間,隊(duì)列中可能已經(jīng)加入新的項(xiàng)目核畴。

q.full()
如果q已滿膝但,返回為True. 由于線程的存在,結(jié)果也可能是不可靠的(參考q.empty()方法)谤草。

q.close()
關(guān)閉隊(duì)列跟束,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法時(shí)丑孩,后臺(tái)線程將繼續(xù)寫入那些已入隊(duì)列但尚未寫入的數(shù)據(jù)冀宴,但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集温学,將自動(dòng)調(diào)用此方法略贮。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常欺嗤。例如哈误,如果某個(gè)使用者正被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致get()方法返回錯(cuò)誤簿盅。

q.cancel_join_thread()
不會(huì)再進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)線程轧拄。這可以防止join_thread()方法阻塞揽祥。

q.join_thread()
連接隊(duì)列的后臺(tái)線程。此方法用于在調(diào)用q.close()方法后檩电,等待所有隊(duì)列項(xiàng)被消耗拄丰。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用俐末。調(diào)用q.cancel_join_thread()方法可以禁止這種行為料按。

生產(chǎn)者消費(fèi)者模型

在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線成和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度鹅搪。

為什么要用生產(chǎn)者消費(fèi)者模型站绪?
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程丽柿,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程恢准。在多項(xiàng)策劃好難過開發(fā) 當(dāng)中,如果生產(chǎn)者處理速度很快甫题,而消費(fèi)者處理速度很慢馁筐,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)坠非。同樣的道理敏沉,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了就覺這個(gè)問題于是就引入了生產(chǎn)者和消費(fèi)者模型盟迟。

什么事生產(chǎn)者和消費(fèi)者模型秋泳?
生產(chǎn)者消費(fèi)者模型是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者之間的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者批次之間不直接通訊你攒菠,而是通過阻塞隊(duì)列來進(jìn)行通訊迫皱,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給了阻塞隊(duì)列辖众,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù)卓起,而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū)凹炸,品能夠橫了生產(chǎn)者和消費(fèi)者的處理能力戏阅。

基于生產(chǎn)者消費(fèi)者模型:

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產(chǎn)者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費(fèi)者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')

此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會(huì)結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了啤它,但是消費(fèi)者c在取空了q之后奕筐,則一直處于死循環(huán)中且卡在q.get()這一步。

解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后蚕键,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號(hào)救欧,這樣消費(fèi)者在接收到結(jié)束信號(hào)后就可以break出死循環(huán)。

改良版:

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結(jié)束信號(hào)則結(jié)束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發(fā)送結(jié)束信號(hào)
if __name__ == '__main__':
    q=Queue()
    #生產(chǎn)者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費(fèi)者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')

注意:結(jié)束信號(hào)None锣光,不一定要生產(chǎn)者發(fā)笆怠,主進(jìn)程同樣可以發(fā),但是主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送信號(hào)

可是誊爹,當(dāng)我們有多個(gè)消費(fèi)者的時(shí)候蹬刷,我們就需要發(fā)送多次結(jié)束信號(hào)。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結(jié)束信號(hào)則結(jié)束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產(chǎn)者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費(fèi)者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產(chǎn)者全部生產(chǎn)完畢,才應(yīng)該發(fā)送結(jié)束信號(hào)
    p2.join()
    p3.join()
    q.put(None) #有幾個(gè)消費(fèi)者就應(yīng)該發(fā)送幾次結(jié)束信號(hào)None
    q.put(None) #發(fā)送結(jié)束信號(hào)
    print('主')
joinableQueue([maxsize])

創(chuàng)建可連接的共享隊(duì)列频丘。這就像是一個(gè)Queue對(duì)象办成,單隊(duì)列允許項(xiàng)目使用者通知生產(chǎn)者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)搂漠。

方法介紹:

JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外迂卢,還具有以下方法:

q.task_done()
使用者使用此方法發(fā)出信號(hào),表示q.get()返回的項(xiàng)目已經(jīng)被處理桐汤。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除的項(xiàng)目數(shù)量而克,將引發(fā)ValueError異常。

q.join()
生產(chǎn)者將使用此方法進(jìn)行阻塞怔毛,直到隊(duì)列中所有項(xiàng)目均被處理员萍。阻塞將持續(xù)到為隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止。
下面的例子說明如何建立永遠(yuǎn)運(yùn)行的進(jìn)程拣度,使用和處理隊(duì)列上的項(xiàng)目碎绎。生產(chǎn)者將項(xiàng)目放入隊(duì)列螃壤,并等待它們被處理。

JoinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
        q.task_done() #向q.join()發(fā)送一次信號(hào),證明一個(gè)數(shù)據(jù)已經(jīng)被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
    q.join() #生產(chǎn)完畢筋帖,使用此方法進(jìn)行阻塞奸晴,直到隊(duì)列中所有項(xiàng)目均被處理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生產(chǎn)者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費(fèi)者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主') 
    
    #主進(jìn)程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊(duì)列的數(shù)據(jù)
    #因而c1,c2也沒有存在的價(jià)值了,不需要繼續(xù)阻塞在進(jìn)程中影響主進(jìn)程了幕随。應(yīng)該隨著主進(jìn)程的結(jié)束而結(jié)束,所以設(shè)置成守護(hù)進(jìn)程就可以了蚁滋。

管道

創(chuàng)建管道的類:
Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1赘淮,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道
參數(shù)介紹:
dumplex:默認(rèn)管道是全雙工的睦霎,如果將duplex射成False梢卸,conn1只能用于接收,conn2只能用于發(fā)送副女。
主要方法:
conn1.recv():接收conn2.send(obj)發(fā)送的對(duì)象蛤高。如果沒有消息可接收,recv方法會(huì)一直阻塞碑幅。如果連接的另外一端已經(jīng)關(guān)閉戴陡,那么recv方法會(huì)拋出EOFError。
conn1.send(obj):通過連接發(fā)送對(duì)象沟涨。obj是與序列化兼容的任意對(duì)象
其他方法:
conn1.close():關(guān)閉連接恤批。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法
conn1.fileno():返回連接使用的整數(shù)文件描述符
conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用裹赴,返回True喜庞。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù)棋返,方法將立即返回結(jié)果延都。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達(dá)睛竣。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息晰房。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息射沟,超過了這個(gè)最大值殊者,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取躏惋。如果連接的另外一端已經(jīng)關(guān)閉幽污,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常簿姨。
conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū)距误,buffer是支持緩沖區(qū)接口的任意對(duì)象簸搞,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)准潭。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出趁俊,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收

conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中刑然,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)寺擂。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)泼掠。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間怔软,將引發(fā)BufferT ooShort異常。

例子:

from multiprocessing import Process, Pipe


def f(conn):
    conn.send("Hello The_Third_Wave")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

應(yīng)該特別注意管道端點(diǎn)的正確管理問題择镇。如果是生產(chǎn)者或消費(fèi)者中都沒有使用管道的某個(gè)端點(diǎn)挡逼,就應(yīng)將它關(guān)閉。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端腻豌,在消費(fèi)者中關(guān)閉管道的輸入端家坎。如果忘記執(zhí)行這些步驟,程序可能在消費(fèi)者中的recv()操作上掛起吝梅。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的虱疏,必須在所有進(jìn)程中關(guān)閉管道后才能生成EOFError異常。因此苏携,在生產(chǎn)者中關(guān)閉管道不會(huì)有任何效果做瞪,除非消費(fèi)者也關(guān)閉了相同的管道端點(diǎn)。

引發(fā)EOFError:

from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會(huì)引發(fā)EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()

pipe實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print('主進(jìn)程')

多個(gè)消費(fèi)者之間的競(jìng)爭(zhēng)問題帶來的數(shù)據(jù)不安全問題

from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進(jìn)程')

進(jìn)程之間的數(shù)據(jù)共享

對(duì)于將來來說兜叨,基于消息傳遞的并發(fā)編程肯定是大勢(shì)所趨穿扳。即便是使用線程,推薦做法也是將程序設(shè)計(jì)為大量獨(dú)立的線程集合国旷,通過消息隊(duì)列交換數(shù)據(jù)矛物。
這樣極大地減少了對(duì)使用鎖定和其他同步瘦點(diǎn)的需求,還可以擴(kuò)展到分布式系統(tǒng)中跪但。
但是進(jìn)程間就應(yīng)該盡量避免通信履羞,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來避免加鎖帶來的問題屡久。

Manager模塊的介紹

進(jìn)程間數(shù)據(jù)是獨(dú)立的忆首,可以借助于隊(duì)列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的
雖然進(jìn)程間數(shù)據(jù)獨(dú)立被环,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享糙及,事實(shí)上Manager的功能遠(yuǎn)不止于此

Manager例子

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會(huì)出現(xiàn)數(shù)據(jù)錯(cuò)亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

進(jìn)程池和multiprocess.Pool模塊

進(jìn)程池

在程序?qū)嶋H處理問題過程中,忙時(shí)會(huì)有成千上萬的任務(wù)需要被執(zhí)行筛欢,閑時(shí)可能只有零星任務(wù)浸锨。那么在成千上萬個(gè)任務(wù)需要被執(zhí)行的時(shí)候唇聘,我們就需要去創(chuàng)建成千上萬個(gè)進(jìn)程么?首先柱搜,創(chuàng)建進(jìn)程需要消耗時(shí)間迟郎,銷毀進(jìn)程也需要消耗時(shí)間。第二即便開啟了成千上萬的進(jìn)程聪蘸,操作系統(tǒng)也不能讓他們同時(shí)執(zhí)行宪肖,這樣反而會(huì)影響程序的效率。因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進(jìn)程健爬。那么我們要怎么做呢控乾?

在這里,要給大家介紹一個(gè)進(jìn)程池的概念浑劳,定義一個(gè)池子阱持,在里面放上固定數(shù)量的進(jìn)程,有需求來了魔熏,就拿一個(gè)池中的進(jìn)程來處理任務(wù),等到處理完畢鸽扁,進(jìn)程并不關(guān)閉蒜绽,而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。如果有很多任務(wù)需要執(zhí)行桶现,池中的進(jìn)程數(shù)量不夠躲雅,任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來,拿到空閑進(jìn)程才能繼續(xù)執(zhí)行骡和。也就是說相赁,池中進(jìn)程的數(shù)量是固定的,那么同一時(shí)間最多有固定數(shù)量的進(jìn)程在運(yùn)行慰于。這樣不會(huì)增加操作系統(tǒng)的調(diào)度難度钮科,還節(jié)省了開閉進(jìn)程的時(shí)間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果婆赠。

multiprocess.Poll模塊

Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池

參數(shù)方法 :

1 numprocess:要?jiǎng)?chuàng)建的進(jìn)程數(shù)绵脯,如果省略,將默認(rèn)使用cpu_count()的值
2 initializer:是每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象休里,默認(rèn)為None
3 initargs:是要傳給initializer的參數(shù)組

主要方法:

1 p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,kwargs),然后返回結(jié)果蛆挫。
2 '''需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù)妙黍,必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''
3
4 p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(
args,**kwargs),然后返回結(jié)果悴侵。
5 '''此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象拭嫁,接收輸入?yún)?shù)可免。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí)抓于,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作巴元,否則將接收其他異步操作中的結(jié)果毡咏。'''
6
7 p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作逮刨。如果所有操作持續(xù)掛起呕缭,它們將在工作進(jìn)程終止前完成
8
9 P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用

其他方法:

1 方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj修己。實(shí)例具有以下方法
2 obj.get():返回結(jié)果恢总,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的睬愤。如果在指定時(shí)間內(nèi)還沒有到達(dá)片仿,將引發(fā)一場(chǎng)。如果遠(yuǎn)程操作中引發(fā)了異常尤辱,它將在調(diào)用此方法時(shí)再次被引發(fā)砂豌。
3 obj.ready():如果調(diào)用完成,返回True
4 obj.successful():如果調(diào)用完成且沒有引發(fā)異常光督,返回True阳距,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常
5 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?br> 6 obj.terminate():立即終止所有工作進(jìn)程结借,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作筐摘。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末船老,一起剝皮案震驚了整個(gè)濱河市咖熟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌柳畔,老刑警劉巖馍管,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異荸镊,居然都是意外死亡咽斧,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門躬存,熙熙樓的掌柜王于貴愁眉苦臉地迎上來张惹,“玉大人,你說我怎么就攤上這事岭洲⊥鸲海” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵盾剩,是天一觀的道長(zhǎng)雷激。 經(jīng)常有香客問我替蔬,道長(zhǎng),這世上最難降的妖魔是什么屎暇? 我笑而不...
    開封第一講書人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任承桥,我火速辦了婚禮,結(jié)果婚禮上根悼,老公的妹妹穿的比我還像新娘凶异。我一直安慰自己,他們只是感情好挤巡,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開白布剩彬。 她就那樣靜靜地躺著,像睡著了一般矿卑。 火紅的嫁衣襯著肌膚如雪喉恋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評(píng)論 1 305
  • 那天母廷,我揣著相機(jī)與錄音轻黑,去河邊找鬼。 笑死琴昆,一個(gè)胖子當(dāng)著我的面吹牛苔悦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播椎咧,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼把介!你這毒婦竟也來了勤讽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤拗踢,失蹤者是張志新(化名)和其女友劉穎脚牍,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巢墅,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡诸狭,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了君纫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驯遇。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蓄髓,靈堂內(nèi)的尸體忽然破棺而出叉庐,到底是詐尸還是另有隱情,我是刑警寧澤会喝,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布陡叠,位于F島的核電站玩郊,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏枉阵。R本人自食惡果不足惜译红,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望兴溜。 院中可真熱鬧侦厚,春花似錦、人聲如沸昵慌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽斋攀。三九已至已卷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間淳蔼,已是汗流浹背侧蘸。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留鹉梨,地道東北人讳癌。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像存皂,于是被迫代替她去往敵國(guó)和親晌坤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一. 操作系統(tǒng)概念 操作系統(tǒng)位于底層硬件與應(yīng)用軟件之間的一層.工作方式: 向下管理硬件,向上提供接口.操作系統(tǒng)進(jìn)行...
    月亮是我踢彎得閱讀 5,967評(píng)論 3 28
  • 1.進(jìn)程和線程 隊(duì)列:1旦袋、進(jìn)程之間的通信: q = multiprocessing.Queue()2骤菠、...
    一只寫程序的猿閱讀 1,109評(píng)論 0 17
  • 一多任務(wù)的引入 有很多的場(chǎng)景中的事情是同時(shí)進(jìn)行的,比如開車的時(shí)候手和腳共同來駕駛汽車疤孕,再比如唱歌跳舞也是同時(shí)進(jìn)行的...
    五行缺覺閱讀 657評(píng)論 0 0
  • 【打卡】D16祭阀,A1702021-COCO鹉戚,第27輪 項(xiàng)目一:每天一節(jié)攝影課并整理筆記? 項(xiàng)目二:每天頭條號(hào)發(fā)布一...
    蔻蔻的美食物語閱讀 182評(píng)論 0 0
  • 《客戶需求分析》課題綱要 課題類別: 《市場(chǎng)營(yíng)銷管理》技能類課題。 培訓(xùn)對(duì)象: 企業(yè)中专控、高層管理人員抹凳、營(yíng)銷人員、銷...
    初小湄閱讀 139評(píng)論 0 1