顧名思義,進程即正在執(zhí)行的一個過程优训。進程是對正在運行程序的一個抽象猎贴。
進程的概念起源于操作系統(tǒng)腻豌,是操作系統(tǒng)最核心的概念,也是操作系統(tǒng)提供的最古老也是最重要的抽象概念之一嘱能。操作系統(tǒng)的其他所有內(nèi)容都是圍繞進程的概念展開的吝梅。
PS:即使可以利用的cpu只有一個(早期的計算機確實如此),也能保證支持(偽)并發(fā)的能力惹骂。將一個單獨的cpu變成多個虛擬的cpu(多道技術(shù):時間多路復(fù)用和空間多路復(fù)用+硬件上支持隔離)苏携,沒有進程的抽象,現(xiàn)代計算機將不復(fù)存在对粪。
必備的理論基礎(chǔ)
一 操作系統(tǒng)的作用:
1:隱藏丑陋復(fù)雜的硬件接口右冻,提供良好的抽象接口
2:管理、調(diào)度進程著拭,并且將多個進程對硬件的競爭變得有序二 多道技術(shù):
1.產(chǎn)生背景:針對單核纱扭,實現(xiàn)并發(fā)
ps:現(xiàn)在的主機一般是多核,那么每個核都會利用多道技術(shù)
有4個cpu儡遮,運行于cpu1的某個程序遇到io阻塞乳蛾,會等到io結(jié)束再重新調(diào)度,會被調(diào)度到4個
cpu中的任意一個,具體由操作系統(tǒng)調(diào)度算法決定肃叶。
2.空間上的復(fù)用:如內(nèi)存中同時有多道程序
3.時間上的復(fù)用:復(fù)用一個cpu的時間片
強調(diào):遇到io切蹂随,占用cpu時間過長也切,核心在于切之前將進程的狀態(tài)保存下來因惭,這樣
才能保證下次切換回來時岳锁,能基于上次切走的位置繼續(xù)運行
一、什么是進程
進程(Process)是計算機中的程序關(guān)于某數(shù)據(jù)集合上的一次運行活動蹦魔,是系統(tǒng)進行資源分配和調(diào)度的基本單位激率,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ)。在早期面向進程設(shè)計的計算機結(jié)構(gòu)中勿决,進程是程序的基本執(zhí)行實體柱搜;在當代面向線程設(shè)計的計算機結(jié)構(gòu)中,進程是線程的容器剥险。程序是指令聪蘸、數(shù)據(jù)及其組織形式的描述,進程是程序的實體表制。
狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)健爬。
廣義定義:進程是一個具有一定獨立功能的程序關(guān)于某個數(shù)據(jù)集合的一次運行活動。它是操作系統(tǒng)動態(tài)執(zhí)行的基本單元么介,在傳統(tǒng)的操作系統(tǒng)中娜遵,進程既是基本的分配單元,也是基本的執(zhí)行單元壤短。
進程的進一步了解:
第一设拟,進程是一個實體。每一個進程都有它自己的地址空間久脯,一般情況下纳胧,包括文本區(qū)域(text region)、數(shù)據(jù)區(qū)域(data region)和堆棧(stack region)帘撰。文本區(qū)域存儲處理器執(zhí)行的代碼跑慕;數(shù)據(jù)區(qū)域存儲變量和進程執(zhí)行期間使用的動態(tài)分配的內(nèi)存;堆棧區(qū)域存儲著活動過程調(diào)用的指令和本地變量摧找。
第二核行,進程是一個“執(zhí)行中的程序”。程序是一個沒有生命的實體蹬耘,只有處理器賦予程序生命時(操作系統(tǒng)執(zhí)行之)芝雪,它才能成為一個活動的實體,我們稱其為進程综苔。
進程是操作系統(tǒng)中最基本惩系、重要的概念位岔。是多道程序系統(tǒng)出現(xiàn)后,為了刻畫系統(tǒng)內(nèi)部出現(xiàn)的動態(tài)情況蛆挫,描述系統(tǒng)內(nèi)部各道程序的活動規(guī)律引進的一個概念,所有多道程序設(shè)計操作系統(tǒng)都建立在進程的基礎(chǔ)上赃承。
操作系統(tǒng)引入進程的概念的原因
從理論角度看妙黍,是對正在運行的程序過程的抽象悴侵;
從實現(xiàn)角度看,是一種數(shù)據(jù)結(jié)構(gòu)拭嫁,目的在于清晰地刻畫動態(tài)系統(tǒng)的內(nèi)在規(guī)律可免,有效管理和調(diào)度進入計算機系統(tǒng)主存儲器運行的程序。
進程的特征
動態(tài)性:進程的實質(zhì)是程序在多道程序系統(tǒng)中的一次執(zhí)行過程做粤,進程是動態(tài)產(chǎn)生浇借,動態(tài)消亡的。
并發(fā)性:任何進程都可以同其他進程一起并發(fā)執(zhí)行
獨立性:進程是一個能獨立運行的基本單位怕品,同時也是系統(tǒng)分配資源和調(diào)度的獨立單位妇垢;
異步性:由于進程間的相互制約,使進程具有執(zhí)行的間斷性肉康,即進程按各自獨立的闯估、不可預(yù)知的速度向前推進
結(jié)構(gòu)特征:
進程由程序、數(shù)據(jù)和進程控制塊三部分組成吼和。
多個不同的進程可以包含相同的程序:一個程序在不同的數(shù)據(jù)集里就構(gòu)成不同的進程涨薪,能得到不同的結(jié)果;但是執(zhí)行過程中炫乓,程序不能發(fā)生改變刚夺。
進程與程序的區(qū)別
程序是指令和數(shù)據(jù)的有序集合,其本身沒有任何運行的含義末捣,是一個靜態(tài)的概念侠姑。
而進程是程序在處理機上的一次執(zhí)行過程,它是一個動態(tài)的概念箩做。
程序可以作為一種軟件資料長期存在结借,而進程是有一定生命期的。
程序是永久的卒茬,進程是暫時的船老。
注意:同一個程序執(zhí)行兩次,就會在操作系統(tǒng)中出現(xiàn)兩個進程圃酵,所以我們可以同時運行一個軟件柳畔,分別做不同的事情也不會混亂。
二郭赐、進程調(diào)度
要想多個進程交替運行薪韩,操作系統(tǒng)必須對這些進程進行調(diào)度确沸,這個調(diào)度也不是隨即進行的,而是需要遵循一定的法則俘陷,由此就有了進程的調(diào)度算法罗捎。
先來先服務(wù)調(diào)度算法:
先來先服務(wù)(FCFS)調(diào)度算法是一種最簡單的調(diào)度算法,該算法既可用于作業(yè)調(diào)度拉盾,也可用于進程調(diào)度桨菜。FCFS算法比較有利于長作業(yè)(進程),而不利于短作業(yè)(進程)捉偏。由此可知倒得,本算法適合于CPU繁忙型作業(yè),而不利于I/O繁忙型的作業(yè)(進程)夭禽。
短作業(yè)優(yōu)先調(diào)度算法:
短作業(yè)(進程)優(yōu)先調(diào)度算法(SJ/PF)是指對短作業(yè)或短進程優(yōu)先調(diào)度的算法霞掺,該算法既可用于作業(yè)調(diào)度,也可用于進程調(diào)度讹躯。但其對長作業(yè)不利菩彬;不能保證緊迫性作業(yè)(進程)被及時處理;作業(yè)的長短只是被估算出來的潮梯。
時間片輪轉(zhuǎn)算法:
時間片輪轉(zhuǎn)(Round Robin骗灶,RR)法的基本思路是讓每個進程在就緒隊列中的等待時間與享受服務(wù)的時間成比例。在時間片輪轉(zhuǎn)法中酷麦,需要將CPU的處理時間分成固定大小的時間片矿卑,例如,幾十毫秒至幾百毫秒沃饶。如果一個進程在被調(diào)度選中之后用完了系統(tǒng)規(guī)定的時間片母廷,但又未完成要求的任務(wù),則它自行釋放自己所占有的CPU而排到就緒隊列的末尾糊肤,等待下一次調(diào)度琴昆。同時,進程調(diào)度程序又去調(diào)度當前就緒隊列中的第一個進程馆揉。
顯然业舍,輪轉(zhuǎn)法只能用來調(diào)度分配一些可以搶占的資源。這些可以搶占的資源可以隨時被剝奪升酣,而且可以將它們再分配給別的進程舷暮。CPU是可搶占資源的一種。但打印機等資源是不可搶占的噩茄。由于作業(yè)調(diào)度是對除了CPU之外的所有系統(tǒng)硬件資源的分配下面,其中包含有不可搶占資源,所以作業(yè)調(diào)度不使用輪轉(zhuǎn)法绩聘。
在輪轉(zhuǎn)法中沥割,時間片長度的選取非常重要耗啦。首先,時間片長度的選擇會直接影響到系統(tǒng)的開銷和響應(yīng)時間机杜。如果時間片長度過短帜讲,則調(diào)度程序搶占處理機的次數(shù)增多。這將使進程上下文切換次數(shù)也大大增加椒拗,從而加重系統(tǒng)開銷似将。反過來,如果時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執(zhí)行時間最長的進程能執(zhí)行完畢枚尼,則輪轉(zhuǎn)法變成了先來先服務(wù)法惰匙。時間片長度的選擇是根據(jù)系統(tǒng)對響應(yīng)時間的要求和就緒隊列中所允許最大的進程數(shù)來確定的。
在輪轉(zhuǎn)法中预茄,加入到就緒隊列的進程有3種情況:
一種是分給它的時間片用完兴溜,但進程還未完成,回到就緒隊列的末尾等待下次調(diào)度去繼續(xù)執(zhí)行耻陕。
另一種情況是分給該進程的時間片并未用完拙徽,只是因為請求I/O或由于進程的互斥與同步關(guān)系而被阻塞。當阻塞解除之后再回到就緒隊列诗宣。
第三種情況就是新創(chuàng)建進程進入就緒隊列膘怕。
如果對這些進程區(qū)別對待,給予不同的優(yōu)先級和時間片從直觀上看召庞,可以進一步改善系統(tǒng)服務(wù)質(zhì)量和效率岛心。例如,我們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞原因分成不同的就緒隊列篮灼,每個隊列按FCFS原則排列忘古,各隊列之間的進程享有不同的優(yōu)先級,但同一隊列內(nèi)優(yōu)先級相同诅诱。這樣髓堪,當一個進程在執(zhí)行完它的時間片之后,或從睡眠中被喚醒以及被創(chuàng)建之后娘荡,將進入不同的就緒隊列干旁。多級反饋隊列調(diào)度算法
前面介紹的各種用作進程調(diào)度的算法都有一定的局限性。如短進程優(yōu)先的調(diào)度算法炮沐,僅照顧了短進程而忽略了長進程争群,而且如果并未指明進程的長度,則短進程優(yōu)先和基于進程長度的搶占式調(diào)度算法都將無法使用央拖。
而多級反饋隊列調(diào)度算法則不必事先知道各種進程所需的執(zhí)行時間祭阀,而且還可以滿足各種類型進程的需要鹉戚,因而它是目前被公認的一種較好的進程調(diào)度算法。在采用多級反饋隊列調(diào)度算法的系統(tǒng)中专控,調(diào)度算法的實施過程如下所述抹凳。
(1) 應(yīng)設(shè)置多個就緒隊列,并為各個隊列賦予不同的優(yōu)先級伦腐。第一個隊列的優(yōu)先級最高赢底,第二個隊列次之,其余各隊列的優(yōu)先權(quán)逐個降低柏蘑。該算法賦予各個隊列中進程執(zhí)行時間片的大小也各不相同幸冻,在優(yōu)先權(quán)愈高的隊列中,為每個進程所規(guī)定的執(zhí)行時間片就愈小咳焚。例如洽损,第二個隊列的時間片要比第一個隊列的時間片長一倍,……革半,第i+1個隊列的時間片要比第i個隊列的時間片長一倍碑定。
(2) 當一個新進程進入內(nèi)存后,首先將它放入第一隊列的末尾又官,按FCFS原則排隊等待調(diào)度延刘。當輪到該進程執(zhí)行時,如它能在該時間片內(nèi)完成六敬,便可準備撤離系統(tǒng)碘赖;如果它在一個時間片結(jié)束時尚未完成,調(diào)度程序便將該進程轉(zhuǎn)入第二隊列的末尾外构,再同樣地按FCFS原則等待調(diào)度執(zhí)行普泡;如果它在第二隊列中運行一個時間片后仍未完成,再依次將它放入第三隊列典勇,……劫哼,如此下去,當一個長作業(yè)(進程)從第一隊列依次降到第n隊列后割笙,在第n 隊列便采取按時間片輪轉(zhuǎn)的方式運行权烧。
(3) 僅當?shù)谝魂犃锌臻e時,調(diào)度程序才調(diào)度第二隊列中的進程運行伤溉;僅當?shù)?~(i-1)隊列均空時般码,才會調(diào)度第i隊列中的進程運行。如果處理機正在第i隊列中為某進程服務(wù)時乱顾,又有新進程進入優(yōu)先權(quán)較高的隊列(第1~(i-1)中的任何一個隊列)板祝,則此時新進程將搶占正在運行進程的處理機,即由調(diào)度程序把正在運行的進程放回到第i隊列的末尾走净,把處理機分配給新到的高優(yōu)先權(quán)進程券时。
三孤里、進程的并行和并發(fā)
并行 : 并行是指兩者同時執(zhí)行,比如賽跑橘洞,兩個人都在不停的往前跑捌袜;(資源夠用,比如三個線程炸枣,四核的CPU )
并發(fā) : 并發(fā)是指資源有限的情況下虏等,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人适肠,A走一段后霍衫,讓給B,B用完繼續(xù)給A 侯养,交替使用敦跌,目的是提高效率。
并行是從微觀上沸毁,也就是在一個精確的時間片刻峰髓,有不同的程序在執(zhí)行傻寂,這就要求必須有多個處理器息尺。
并發(fā)是從宏觀上,在一個時間段上可以看出是同時執(zhí)行的疾掰,比如一個服務(wù)器同時處理多個session搂誉。
四、同步異步阻塞非阻塞
三狀態(tài)轉(zhuǎn)換圖
在了解其他概念之前静檬,我們首先要了解進程的幾個狀態(tài)炭懊。在程序運行的過程中,由于被操作系統(tǒng)的調(diào)度算法控制拂檩,程序會進入幾個狀態(tài):就緒侮腹,運行和阻塞。
〉纠(1)就緒(Ready)狀態(tài)
當進程已分配到除CPU以外的所有必要的資源父阻,只要獲得處理機便可立即執(zhí)行,這時的進程狀態(tài)稱為就緒狀態(tài)望抽。
〖用(2)執(zhí)行/運行(Running)狀態(tài)
當進程已獲得處理機,其程序正在處理機上執(zhí)行煤篙,此時的進程狀態(tài)稱為執(zhí)行狀態(tài)斟览。
(3)阻塞(Blocked)狀態(tài)
正在執(zhí)行的進程辑奈,由于等待某個事件發(fā)生而無法執(zhí)行時苛茂,便放棄處理機而處于阻塞狀態(tài)已烤。引起進程阻塞的事件可有多種,例如妓羊,等待I/O完成草戈、申請緩沖區(qū)不能滿足、等待信件(信號)等侍瑟。
實例
同步和異步
所謂同步就是一個任務(wù)的完成需要依賴另外一個任務(wù)時唐片,只有等待被依賴的任務(wù)完成后,依賴的任務(wù)才能算完成涨颜,這是一種可靠的任務(wù)序列费韭。要么成功都成功,失敗都失敗庭瑰,兩個任務(wù)的狀態(tài)可以保持一致星持。
所謂異步是不需要等待被依賴的任務(wù)完成,只是通知被依賴的任務(wù)要完成什么工作弹灭,依賴的任務(wù)也立即執(zhí)行督暂,只要自己完成了整個任務(wù)就算完成了。至于被依賴的任務(wù)最終是否真正完成穷吮,依賴它的任務(wù)無法確定逻翁,所以它是不可靠的任務(wù)序列。
阻塞與非阻塞
阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態(tài)有關(guān)捡鱼。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態(tài)角度來說的
同步/異步與阻塞/非阻塞
同步阻塞形式
效率最低八回。拿上面的例子來說,就是你專心排隊驾诈,什么別的事都不做缠诅。異步阻塞形式
如果在銀行等待辦理業(yè)務(wù)的人采用的是異步的方式去等待消息被觸發(fā)(通知),也就是領(lǐng)了一張小紙條乍迄,假如在這段時間里他不能離開銀行做其它的事情管引,那么很顯然,這個人被阻塞在了這個等待的操作上面闯两;異步操作是可以被阻塞住的褥伴,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞生蚁。
同步非阻塞形式
實際上是效率低下的噩翠。
想象一下你一邊打著電話一邊還需要抬頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程序的兩個操作的話邦投,這個程序需要在這兩種不同的行為之間來回的切換伤锚,效率可想而知是低下的。異步非阻塞形式
效率更高,
因為打電話是你(等待者)的事情屯援,而通知你則是柜臺(消息觸發(fā)機制)的事情猛们,程序沒有在兩種不同的操作中來回切換。比如說狞洋,這個人突然發(fā)覺自己煙癮犯了弯淘,需要出去抽根煙,于是他告訴大堂經(jīng)理說吉懊,排到我這個號碼的時候麻煩到外面通知我一下庐橙,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是異步+非阻塞的方式了借嗽。
很多人會把同步和阻塞混淆态鳖,是因為很多時候同步操作會以阻塞的形式表現(xiàn)出來,同樣的恶导,很多人也會把異步和非阻塞混淆浆竭,因為異步操作一般都不會在真正的IO操作處被阻塞。
五惨寿、進程的創(chuàng)建與結(jié)束
進程的創(chuàng)建
但凡是硬件邦泄,都需要有操作系統(tǒng)去管理,只要有操作系統(tǒng)裂垦,就有進程的概念顺囊,就需要有創(chuàng)建進程的方式,一些操作系統(tǒng)只為一個應(yīng)用程序設(shè)計缸废,比如微波爐中的控制器包蓝,一旦啟動微波爐,所有的進程都已經(jīng)存在企量。
而對于通用系統(tǒng)(跑很多應(yīng)用程序),需要有系統(tǒng)運行過程中創(chuàng)建或撤銷進程的能力亡电,主要分為4中形式創(chuàng)建新的進程:
1. 系統(tǒng)初始化(查看進程linux中用ps命令届巩,windows中用任務(wù)管理器,前臺進程負責與用戶交互份乒,后臺運行的進程與用戶無關(guān)恕汇,運行在后臺并且只在需要時才喚醒的進程,稱為守護進程或辖,如電子郵件瘾英、web頁面、新聞颂暇、打尤鼻础)
2. 一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求耳鸯,而創(chuàng)建一個新進程(如用戶雙擊暴風(fēng)影音)
4. 一個批處理作業(yè)的初始化(只在大型機的批處理系統(tǒng)中應(yīng)用)
無論哪一種湿蛔,新進程的創(chuàng)建都是由一個已經(jīng)存在的進程執(zhí)行了一個用于創(chuàng)建進程的系統(tǒng)調(diào)用而創(chuàng)建的膀曾。
進程的結(jié)束
- 正常退出(自愿,如用戶點擊交互式頁面的叉號阳啥,或程序執(zhí)行完畢調(diào)用發(fā)起系統(tǒng)調(diào)用正常退出添谊,在linux中用exit,在windows中用ExitProcess)
- 出錯退出(自愿察迟,python a.py中a.py不存在)
- 嚴重錯誤(非自愿斩狱,執(zhí)行非法指令,如引用不存在的內(nèi)存扎瓶,1/0等喊废,可以捕捉異常,try...except...)
- 被其他進程殺死(非自愿栗弟,如kill -9)
六污筷、在python程序中的進程操作---multiprocess模塊
仔細說來,multiprocess不是一個模塊而是python中一個操作乍赫、管理進程的包瓣蛀。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關(guān)的所有子模塊。由于提供的子模塊非常多雷厂,為了方便大家歸類記憶惋增,我將這部分大致分為四個部分:創(chuàng)建進程部分,進程同步部分改鲫,進程池部分诈皿,進程之間數(shù)據(jù)共享。
1. 創(chuàng)建進程——multiprocessing.process模塊
import time
from multiprocessing import Process
def f(name):
print('hello', name)
time.sleep(1)
if __name__ == '__main__':
p_lst = []
for i in range(5):
p = Process(target=f, args=('bob',))
p.start()
p_lst.append(p)
[p.join() for p in p_lst]
print('父進程在執(zhí)行')
參數(shù)介紹:
1 group參數(shù)未使用像棘,值始終為None
2 target表示調(diào)用對象稽亏,即子進程要執(zhí)行的任務(wù)
3 args表示調(diào)用對象的位置參數(shù)元組,args=(1,2,'egon',)
4 kwargs表示調(diào)用對象的字典,kwargs={'name':'egon','age':18}
5 name為子進程的名稱
方法介紹
1 p.start():啟動進程缕题,并調(diào)用該子進程中的p.run()
2 p.run():進程啟動時運行的方法截歉,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法
3 p.terminate():強制終止進程p烟零,不會進行任何清理操作瘪松,如果p創(chuàng)建了子進程,該子進程就成了僵尸進程锨阿,使用該方法需要特別小心這種情況宵睦。如果p還保存了一個鎖那么也將不會被釋放,進而導(dǎo)致死鎖
4 p.is_alive():如果p仍然運行墅诡,返回True
5 p.join([timeout]):主線程等待p終止(強調(diào):是主線程處于等的狀態(tài)壳嚎,而p是處于運行的狀態(tài))。timeout是可選的超時時間,需要強調(diào)的是诬辈,p.join只能join住start開啟的進程酵使,而不能join住run開啟的進程
屬性介紹
1 p.daemon:默認值為False,如果設(shè)為True焙糟,代表p為后臺運行的守護進程口渔,當p的父進程終止時,p也隨之終止穿撮,并且設(shè)定為True后缺脉,p不能創(chuàng)建自己的新進程,必須在p.start()之前設(shè)置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時為None悦穿、如果為–N攻礼,表示被信號N結(jié)束(了解即可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進程間通信提供安全性栗柒,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
注意:在Windows操作系統(tǒng)中由于沒有fork(linux操作系統(tǒng)中創(chuàng)建進程的機制)礁扮,在創(chuàng)建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執(zhí)行了整個文件瞬沦。因此如果將process()直接寫在文件中就會無限遞歸創(chuàng)建子進程報錯太伊。所以必須把創(chuàng)建子進程的部分使用if _name_ ==‘_main_’ 判斷保護起來,import 的時候 逛钻,就不會遞歸運行了僚焦。
除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式:
import os
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(os.getpid())
print('%s 正在和女主播聊天' % self.name)
if __name__ == '__main__':
p1 = MyProcess('wangjifei')
p1.start() # start會自動調(diào)用run
p1.join()
print('主線程')
進程之間的數(shù)據(jù)隔離:
進程與進程之間的數(shù)據(jù)是隔離的,內(nèi)存空間是不能共享的,從而保證了數(shù)據(jù)的安全曙痘。
from multiprocessing import Process
def work():
global n
n=0
print('子進程內(nèi): ',n)
if __name__ == '__main__':
n = 100
p=Process(target=work)
p.start()
print('主進程內(nèi): ',n)
守護進程
守護進程會隨著主進程的結(jié)束而結(jié)束芳悲。
主進程創(chuàng)建守護進程
其一:守護進程會在主進程代碼執(zhí)行結(jié)束后就終止
其二:守護進程內(nèi)無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結(jié)束边坤,守護進程隨即終止
class Myprocess(Process):
def __init__(self,person):
super().__init__()
self.person = person
def run(self):
print(os.getpid(),self.name)
print('%s正在和女主播聊天' %self.person)
if __name__ == '__main__':
p=Myprocess('wangjifei')
// 一定要在p.start()前設(shè)置,設(shè)置p為守護進程,禁止p創(chuàng)建子進程,并且父進程代碼執(zhí)行結(jié)束,p即終止運行
p.daemon=True
p.start()
time.sleep(5)
print('主進程')
socket聊天并發(fā)示例:
//服務(wù)器端
from socket import *
from multiprocessing import Process
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__': #windows下start進程一定要寫到這下面
while True:
conn,client_addr=server.accept()
p=Process(target=talk,args=(conn,client_addr))
p.start()
---------------------------------------------------------------------------------------------
//客戶端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
2. 進程同步(multiprocessing.Lock名扛、multiprocessing.Semaphore)
鎖 —— multiprocessing.Lock
通過剛剛的學(xué)習(xí),我們千方百計實現(xiàn)了程序的異步惩嘉,讓多個任務(wù)可以同時在幾個進程中并發(fā)處理罢洲,他們之間的運行沒有順序,一旦開啟也不受我們控制文黎。盡管并發(fā)編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題殿较。
當多個進程使用同一份數(shù)據(jù)資源的時候耸峭,就會引發(fā)數(shù)據(jù)安全或順序混亂問題。
import time
import os
import random
from multiprocessing import Process, Lock
def work(lock, n):
# 用法一
with lock:
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
# 用法二
# lock.acquire() //上鎖
# print('%s: %s is running' % (n, os.getpid()))
# time.sleep(random.random())
# print('%s: %s is done' % (n, os.getpid()))
# lock.release() // 還鎖
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=work, args=(lock, i))
p.start()
上面這種情況雖然使用加鎖的形式實現(xiàn)了順序的執(zhí)行淋纲,但是程序又重新變成串行了劳闹,這樣確實會浪費了時間,卻保證了數(shù)據(jù)的安全。
信號量 —— multiprocess.Semaphore(了解)鎖加計數(shù)器
信號量同步基于內(nèi)部計數(shù)器本涕,每調(diào)用一次acquire()业汰,計數(shù)器減1;每調(diào)用一次release()菩颖,計數(shù)器加1.當計數(shù)器為0時样漆,acquire()調(diào)用被阻塞。
死鎖與遞歸鎖
所謂死鎖:
是指兩個或兩個以上的進程在執(zhí)行過程中晦闰,因爭奪資源而造成的一種互相等待的現(xiàn)象放祟,若無外力作用,它們都將無法推進下去呻右。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖跪妥,這些永遠在互相等待的進程稱為死鎖進程。解決方法声滥,遞歸鎖:
在Python中為了支持在同一線程中多次請求同一資源眉撵,python提供了可重入鎖RLock。這個RLock內(nèi)部維護著一個Lock和一個counter變量落塑,counter記錄了acquire的次數(shù)纽疟,從而使得資源可以被多次require。直到一個線程所有的acquire都被release芜赌,其他的線程才能獲得資源仰挣。經(jīng)典的科學(xué)家吃面問題,就是死鎖的原因缠沈,通過RLock遞歸鎖來解決它1旌!洲愤!
from multiprocessing import RLock
mutexA=RLock()
mutexA.acquire()
mutexA.acquire()
print("遞歸鎖")
mutexA.release()
mutexA.release()
3. 進程間通信——隊列和管道(multiprocess.Queue颓芭、multiprocess.Pipe)
進程間通信:IPC(Inter-Process Communication)
隊列——multiprocessing.Queue
創(chuàng)建共享的進程隊列,Queue是多進程安全的隊列柬赐,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞亡问。
import os
import time
import multiprocessing
# 向queue中輸入數(shù)據(jù)的函數(shù)
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)
# 向queue中輸出數(shù)據(jù)的函數(shù)
def outputQ(queue):
info = queue.get()
print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))
# Main
if __name__ == '__main__':
multiprocessing.freeze_support()
record1 = [] # store input processes
record2 = [] # store output processes
queue = multiprocessing.Queue(3)
# 輸入進程
for i in range(10):
process = multiprocessing.Process(target=inputQ, args=(queue,))
process.start()
record1.append(process)
# 輸出進程
for i in range(10):
process = multiprocessing.Process(target=outputQ, args=(queue,))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
Queue方法介紹:
Queue([maxsize])創(chuàng)建共享的進程隊列。maxsize是隊列中允許的最大項數(shù)肛宋。如果省略此參數(shù)州藕,則無大小限制。底層隊列使用管道和鎖定實現(xiàn)酝陈。另外床玻,還需要運行支持線程以便隊列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?/p>Queue的實例q具有以下方法:
q.get(block=True, timeout=None):返回q中的一個項目。如果q為空沉帮,此方法將阻塞锈死,直到隊列中有項目可用為止贫堰。block用于控制阻塞行為,默認為True. 如果設(shè)置為False待牵,將引發(fā)Queue.Empty異常(定義在Queue模塊中)其屏。timeout是可選超時時間,用在阻塞模式中缨该。如果在制定的時間間隔內(nèi)沒有項目變?yōu)榭捎觅诵校瑢⒁l(fā)Queue.Empty異常。
q.get_nowait( ):同q.get(False)方法压彭。如果隊列滿了不會阻塞睦优,但是會因為沒取到值而報錯。
q.put(item, block=True, timeout=None ):將item放入隊列壮不。如果隊列已滿汗盘,此方法將阻塞至有空間可用為止。block控制阻塞行為询一,默認為True隐孽。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)健蕊。timeout指定在阻塞模式中等待可用空間的時間長短菱阵。超時后將引發(fā)Queue.Full異常。
put_nowait(obj): 同put(obj, False),可以使用put_nowait(obj)缩功,如果隊列滿了不會阻塞晴及,但是會因為隊列滿了而報錯。
q.qsize(): 返回隊列中目前項目的正確數(shù)量嫡锌。此函數(shù)的結(jié)果并不可靠虑稼,因為在返回結(jié)果和在稍后程序中使用結(jié)果之間,隊列中可能添加或刪除了項目势木。在某些系統(tǒng)上蛛倦,此方法可能引發(fā)NotImplementedError異常。
q.empty():如果調(diào)用此方法時 q為空啦桌,返回True溯壶。如果其他進程或線程正在往隊列中添加項目,結(jié)果是不可靠的甫男。也就是說且改,在返回和使用結(jié)果之間,隊列中可能已經(jīng)加入新的項目板驳。
q.full(): 如果q已滿钾虐,返回為True. 由于線程的存在,結(jié)果也可能是不可靠的(參考 q.empty())
q.close():關(guān)閉隊列笋庄,防止隊列中加入更多數(shù)據(jù)效扫。調(diào)用此方法時,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù)直砂,但將在此方法完成時馬上關(guān)閉菌仁。如果q被垃圾收集,將自動調(diào)用此方法静暂。關(guān)閉隊列不會在隊列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號或異常济丘。例如,如果某個使用者正被阻塞在get()操作上洽蛀,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()方法返回錯誤摹迷。
q.cancel_join_thread() : 不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞郊供。
q.join_thread() : 連接隊列的后臺線程峡碉。此方法用于在調(diào)用q.close()方法后,等待所有隊列項被消耗驮审。默認情況下鲫寄,此方法由不是q的原始創(chuàng)建者的所有進程調(diào)用。調(diào)用q.cancel_join_thread()方法可以禁止這種行為疯淫。
生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題地来。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題熙掺。生產(chǎn)者和消費者彼此之間不直接通訊未斑,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理币绩,直接扔給阻塞隊列蜡秽,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取类浪,阻塞隊列就相當于一個緩沖區(qū)沃缘,平衡了生產(chǎn)者和消費者的處理能力嫌蚤。
為什么要使用生產(chǎn)者和消費者模式
在進程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的進程,消費者就是消費數(shù)據(jù)的進程林螃。在多進程開發(fā)當中,如果生產(chǎn)者處理速度很快典蝌,而消費者處理速度很慢雀哨,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)眠蚂。同樣的道理煞聪,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者逝慧。為了解決這個問題于是引入了生產(chǎn)者和消費者模式昔脯。
生產(chǎn)著消費者模型代碼實現(xiàn)
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結(jié)束信號則結(jié)束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(q):
for i in range(2):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
p1.join()
q.put(None) #發(fā)送結(jié)束信號
print('主')
主進程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送結(jié)束信號None啄糙,但不一定一定由主進程發(fā),生產(chǎn)者進程在生產(chǎn)完畢后云稚,也可以往隊列中發(fā)一個結(jié)束信號隧饼,這樣消費者進程在接收到結(jié)束信號后就可以break出死循環(huán)。
但上述解決方式静陈,在有多個生產(chǎn)者和多個消費者時燕雁,我們則需要多次添加None,很low>ㄓ怠9崭瘛!
高級解決方式:JoinableQueue([maxsize])
創(chuàng)建可連接的共享進程隊列刑赶。這就像是一個Queue對象捏浊,但隊列允許項目的使用者通知生產(chǎn)者項目已經(jīng)被成功處理。通知進程是使用共享的信號和條件變量來實現(xiàn)的角撞。代碼如下:
from multiprocessing import Process, JoinableQueue
import time, random, os
def consumer(q):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
q.task_done() # 向q.join()發(fā)送一次信號,證明一個數(shù)據(jù)已經(jīng)被取走了
def producer(name, q):
for i in range(10):
time.sleep(random.randint(1, 3))
res = '%s%s' % (name, i)
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' % (os.getpid(), res))
q.join() # 生產(chǎn)完畢呛伴,使用此方法進行阻塞,直到隊列中所有項目均被處理谒所。
if __name__ == '__main__':
q = JoinableQueue()
# 生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=('包子', q))
p2 = Process(target=producer, args=('骨頭', q))
p3 = Process(target=producer, args=('泔水', q))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.daemon = True
c2.daemon = True
# 開始
p_l = [p1, p2, p3, c1, c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print('主')
主進程等待p1,p2,p3結(jié)束热康;p1,p2,p3等待c1,c2結(jié)束
p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊列的數(shù)據(jù)
因而c1,c2也沒有存在的價值了,不需要繼續(xù)阻塞在進程中影響主進程了。應(yīng)該隨著主進程的結(jié)束而結(jié)束,所以設(shè)置成守護進程就可以了劣领。
管道——multiprocessing.Pipe
Pipe([duplex]):在進程之間創(chuàng)建一條管道姐军,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象尖淘,強調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道
from multiprocessing import Process, Pipe
def f(conn):
conn.send("Hello The_Third_Wave")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
參數(shù)介紹:
Pipe(duplex=True)::默認管道是全雙工的奕锌,如果將duplex射成False,conn1只能用于接收村生,conn2只能用于發(fā)送惊暴。主要方法:
conn1.recv():接收conn2.send(obj)發(fā)送的對象。如果沒有消息可接收趁桃,recv方法會一直阻塞辽话。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會拋出EOFError卫病。
conn1.send(obj):通過連接發(fā)送對象油啤。obj是與序列化兼容的任意對象
conn1.close():關(guān)閉連接。如果conn1被垃圾回收蟀苛,將自動調(diào)用此方法
conn1.fileno():返回連接使用的整數(shù)文件描述符
conn1.poll(timeout=None):如果連接上的數(shù)據(jù)可用益咬,返回True。timeout=None帜平,操作將無限期地等待數(shù)據(jù)到達
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息幽告。maxlength指定要接收的最大字節(jié)數(shù)梅鹦。如果進入的消息,超過了這個最大值评腺,將引發(fā)IOError異常帘瞭,并且在連接上無法進行進一步讀取。如果連接的另外一端已經(jīng)關(guān)閉蒿讥,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常抛腕。
conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū)芋绸,buffer是支持緩沖區(qū)接口的任意對象,offset是緩沖區(qū)中的字節(jié)偏移量担敌,而size是要發(fā)送字節(jié)數(shù)摔敛。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進行接收
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息全封,并把它保存在buffer對象中马昙,該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移刹悴。返回值是收到的字節(jié)數(shù)行楞。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常土匀。--------應(yīng)該特別注意管道端點的正確管理問題子房。如果是生產(chǎn)者或消費者中都沒有使用管道的某個端點,就應(yīng)將它關(guān)閉就轧。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端证杭,在消費者中關(guān)閉管道的輸入端。如果忘記執(zhí)行這些步驟妒御,程序可能在消費者中的recv()操作上掛起解愤。管道是由操作系統(tǒng)進行引用計數(shù)的,必須在所有進程中關(guān)閉管道后才能生成EOFError異常乎莉。因此,在生產(chǎn)者中關(guān)閉管道不會有任何效果梦鉴,除非消費者也關(guān)閉了相同的管道端點李茫。
pipe實現(xiàn)生產(chǎn)者消費者模型
多個消費者之間的競爭問題帶來的數(shù)據(jù)不安全問題
from multiprocessing import Process,Pipe,Lock
def consumer(p,name,lock):
produce, consume=p
produce.close()
while True:
lock.acquire()
baozi=consume.recv()
lock.release()
if baozi:
print('%s 收到包子:%s' %(name,baozi))
else:
consume.close()
break
def producer(p,n):
produce, consume=p
consume.close()
for i in range(n):
produce.send(i)
produce.send(None)
produce.send(None)
produce.close()
if __name__ == '__main__':
produce,consume=Pipe()
lock = Lock()
c1=Process(target=consumer,args=((produce,consume),'c1',lock))
c2=Process(target=consumer,args=((produce,consume),'c2',lock))
p1=Process(target=producer,args=((produce,consume),10))
c1.start()
c2.start()
p1.start()
produce.close()
consume.close()
c1.join()
c2.join()
p1.join()
print('主進程')
進程之間的數(shù)據(jù)共享
展望未來,基于消息傳遞的并發(fā)編程是大勢所趨
即便是使用線程肥橙,推薦做法也是將程序設(shè)計為大量獨立的線程集合魄宏,通過消息隊列交換數(shù)據(jù)。
這樣極大地減少了對使用鎖定和其他同步手段的需求存筏,還可以擴展到分布式系統(tǒng)中宠互。
但進程間應(yīng)該盡量避免通信味榛,即便需要通信,也應(yīng)該選擇進程安全的工具來避免加鎖帶來的問題予跌。
以后我們會嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進程之間的數(shù)據(jù)共享問題搏色。
進程間數(shù)據(jù)是獨立的,可以借助于隊列或管道實現(xiàn)通信券册,二者都是基于消息傳遞的
也可以通過multiprocessing.Manager實現(xiàn)數(shù)據(jù)共享(數(shù)據(jù)不安全频轿,需要加鎖)。
from multiprocessing import Manager,Process,Lock
def work(d,lock):
with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會出現(xiàn)數(shù)據(jù)錯亂
d['count']-=1
if __name__ == '__main__':
lock=Lock()
with Manager() as m:
dic=m.dict({'count':100})
p_l=[]
for i in range(100):
p=Process(target=work,args=(dic,lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
4. 進程池和multiprocessing.Pool模塊
為什么要有進程池?進程池的概念
在程序?qū)嶋H處理問題過程中烁焙,忙時會有成千上萬的任務(wù)需要被執(zhí)行航邢,閑時可能只有零星任務(wù)。那么在成千上萬個任務(wù)需要被執(zhí)行的時候骄蝇,我們就需要去創(chuàng)建成千上萬個進程么膳殷?首先,創(chuàng)建進程需要消耗時間九火,銷毀進程也需要消耗時間赚窃。第二即便開啟了成千上萬的進程,操作系統(tǒng)也不能讓他們同時執(zhí)行岔激,這樣反而會影響程序的效率勒极。因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進程。那么我們要怎么做呢鹦倚?
在這里河质,要給大家介紹一個進程池的概念,定義一個池子震叙,在里面放上固定數(shù)量的進程掀鹅,有需求來了,就拿一個池中的進程來處理任務(wù)媒楼,等到處理完畢乐尊,進程并不關(guān)閉,而是將進程再放回進程池中繼續(xù)等待任務(wù)划址。如果有很多任務(wù)需要執(zhí)行扔嵌,池中的進程數(shù)量不夠,任務(wù)就要等待之前的進程執(zhí)行任務(wù)完畢歸來夺颤,拿到空閑進程才能繼續(xù)執(zhí)行痢缎。也就是說,池中進程的數(shù)量是固定的世澜,那么同一時間最多有固定數(shù)量的進程在運行独旷。這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進程的時間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果嵌洼。
進程池的代碼實現(xiàn):異步提交:
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' % os.getpid())
time.sleep(random.random())
return n ** 2
if __name__ == '__main__':
# 進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務(wù)
p = Pool(3)
res_l = []
for i in range(10):
# 異步運行案疲,根據(jù)進程池中有的進程數(shù),每次最多3個子進程在異步執(zhí)行
# 返回結(jié)果之后麻养,將結(jié)果放入列表褐啡,歸還進程,之后再執(zhí)行新的任務(wù)
# 需要注意的是鳖昌,進程池中的三個進程不會同時開啟或者同時結(jié)束
# 而是執(zhí)行完一個就釋放一個進程备畦,這個進程就去接收新的任務(wù)。
res = p.apply_async(work, args=(i,))
res_l.append(res)
# 異步apply_async用法:如果使用異步提交的任務(wù)遗遵,主進程需要使用jion萍恕,
# 等待進程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果
# 否則车要,主進程結(jié)束,進程池可能還沒來得及執(zhí)行崭倘,也就跟著一起結(jié)束了
#調(diào)用join之前翼岁,先調(diào)用close函數(shù),否則會出錯司光。執(zhí)行完close后不會有
# 新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束
p.close()
p.join()
for res in res_l:
# 使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,
# 因為apply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
print(res.get())
同步提交:
import os, time
from multiprocessing import Pool
def work(n):
print('%s run' % os.getpid())
time.sleep(3)
return n ** 2
if __name__ == '__main__':
# 進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務(wù)
p = Pool(3)
res_l = []
for i in range(10):
# 同步調(diào)用琅坡,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞
# 但不管該任務(wù)是否存在阻塞残家,同步調(diào)用都會在原地等著
res = p.apply(work, args=(i,))
print(res_l)
multiprocessing.Pool(processes=None, initializer=None, initargs=())參數(shù)介紹:
1 . numprocess:要創(chuàng)建的進程數(shù)榆俺,如果省略,將默認使用cpu_count()的值
2 . initializer:是每個工作進程啟動時要執(zhí)行的可調(diào)用對象坞淮,默認為None
3 . initargs:是要傳給initializer的參數(shù)組
主要方法:
p.apply(func, args=(), kwds={}):在一個池工作進程中執(zhí)行func(args,kwargs),然后返回結(jié)果茴晋。'''需要強調(diào)的是:此操作并不會在所有池工作進程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù)回窘,必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''诺擅。
p.apply_async(func, args=(), kwds={}, callback=None):在一個池工作進程中執(zhí)行func(*args,*kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實例啡直,callback是可調(diào)用對象烁涌,接收輸入?yún)?shù)。當func的結(jié)果變?yōu)榭捎脮r酒觅,將理解傳遞給callback撮执。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果舷丹。
p.close():關(guān)閉進程池抒钱,防止進一步操作。如果所有操作持續(xù)掛起,它們將在工作進程終止前完成
p.jion():等待所有工作進程退出继效。此方法只能在close()或teminate()之后調(diào)用方法apply_async()和map_async()的返回值是AsyncResul的實例obj症杏。實例具有以下方法:
obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達瑞信。timeout是可選的厉颤。如果在指定時間內(nèi)還沒有到達,將引發(fā)一場凡简。如果遠程操作中引發(fā)了異常逼友,它將在調(diào)用此方法時再次被引發(fā)。
obj.ready():如果調(diào)用完成秤涩,返回True
obj.successful():如果調(diào)用完成且沒有引發(fā)異常帜乞,返回True,如果在結(jié)果就緒之前調(diào)用此方法筐眷,引發(fā)異常
obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?br> obj.terminate():立即終止所有工作進程黎烈,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收匀谣,將自動調(diào)用此函數(shù)照棋。需要回調(diào)函數(shù)的場景:
進程池中任何一個任務(wù)一旦處理完了,就立即告知主進程:我好了額武翎,你可以處理我的結(jié)果了烈炭。主進程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時間(阻塞)的任務(wù)放到進程池中宝恶,然后指定回調(diào)函數(shù)(主進程負責執(zhí)行)符隙,這樣主進程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果垫毙。如果在主進程中等待進程池中所有任務(wù)都執(zhí)行完畢后霹疫,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)B毒谩8住!
進程池版socket并發(fā)聊天代碼示例:
//servers
# Pool內(nèi)的進程數(shù)默認是cpu核數(shù)毫痕,假設(shè)為4(查看方法os.cpu_count())
# 開啟6個客戶端征峦,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài)
# 在每個進程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個消请,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn):
print('進程pid: %s' % os.getpid())
while True:
try:
msg = conn.recv(1024)
if not msg: break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p = Pool(4)
while True:
conn, *_ = server.accept()
p.apply_async(talk, args=(conn,))
# p.apply(talk,args=(conn,client_addr)) #同步的話栏笆,則同一時間只有一個客戶端能訪問
----------------------------------------------------------------------------------------------
// client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
更多關(guān)于進程的知識https://docs.python.org/dev/library/concurrent.futures.html