程序是以二進制形式存放在硬盤之上的,當啟動程序?qū)?shù)據(jù)(代碼)加載至內(nèi)存中就稱之為進程砸西。
進程是一組資源(包括:代碼,顯示器址儒,硬盤芹枷,網(wǎng)絡(luò)....)的統(tǒng)稱。線程也能實現(xiàn)多任務莲趣,但相對進程來說是輕量級的鸳慈。
實際上進程是一個資源分配的單位,實際上操縱數(shù)據(jù)的是線程喧伞,一個進程至少一個主線程走芋。代碼至上而下運行的時候?qū)嶋H上就是主線程在運行绩郎,當遇到一個Thread對象調(diào)用start方法時,就再開一個線程翁逞。當遇到一個Process對象調(diào)用start方法時就將當前進程的資源復制一份到內(nèi)存的一個區(qū)域中肋杖,以一個新的進程去運行,它擁有獨立的內(nèi)存單元挖函,這就叫子進程状植。子進程因為有自己的資源,所以不與父進程共享全局變量怨喘。
簡單的來說進程是資源分配的單位津畸,而線程是操作系統(tǒng)調(diào)度的單位。
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
nums = [11, 22]
def work1():
"""子進程要執(zhí)行的代碼"""
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
for i in range(3):
nums.append(i)
time.sleep(1)
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
def work2():
"""子進程要執(zhí)行的代碼"""
print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
if __name__ == '__main__':
p1 = Process(target=work1)
p1.start()
p1.join()
p2 = Process(target=work2)
p2.start()
結(jié)果:
in process1 pid=7956 ,nums=[11, 22]
in process1 pid=7956 ,nums=[11, 22, 0]
in process1 pid=7956 ,nums=[11, 22, 0, 1]
in process1 pid=7956 ,nums=[11, 22, 0, 1, 2]
in process2 pid=22332 ,nums=[11, 22]
Process語法結(jié)構(gòu)如下:
Process([group [, target [, name [, args [, kwargs]]]]])
target:如果傳遞了函數(shù)的引用必怜,可以任務這個子進程就執(zhí)行這里的代碼
args:給target指定的函數(shù)傳遞的參數(shù)肉拓,以元組的方式傳遞
kwargs:給target指定的函數(shù)傳遞命名參數(shù)
name:給進程設(shè)定一個名字,可以不設(shè)定
group:指定進程組梳庆,大多數(shù)情況下用不到
Process創(chuàng)建的實例對象的常用方法:
start():啟動子進程實例(創(chuàng)建子進程)
is_alive():判斷進程子進程是否還在活著
join([timeout]):是否等待子進程執(zhí)行結(jié)束暖途,或等待多少秒
terminate():不管任務是否完成,立即終止子進程
Process創(chuàng)建的實例對象的常用屬性:
name:當前進程的別名膏执,默認為Process-N丧肴,N為從1開始遞增的整數(shù)
pid:當前進程的pid(進程號)
既然進程間數(shù)據(jù)不共享那么進程間又是怎么通信的呢
操作系統(tǒng)提供了很多機制實現(xiàn)進程間通信,可以使用文件胧后,socket,以及內(nèi)存抱环。在內(nèi)存中劃分一塊區(qū)域壳快,一個進程寫另一個進程讀就可以完成進程間通信了。
Queue的使用
可以使用multiprocessing模塊的Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞镇草,Queue本身是一個消息列隊程序眶痰。
from multiprocessing import Process, Queue
import os, time, random
# 寫數(shù)據(jù)進程執(zhí)行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進程執(zhí)行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父進程創(chuàng)建Queue,并傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw梯啤,寫入:
pw.start()
# 等待pw結(jié)束:
pw.join()
# 啟動子進程pr竖伯,讀取:
pr.start()
pr.join()
# pr進程里是死循環(huán),無法等待其結(jié)束因宇,只能強行終止:
print('')
print('所有數(shù)據(jù)都寫入并且讀完')
結(jié)果:
Put A to queue...
Put B to queue...
Put C to queue...
Get A from queue.
Get B from queue.
Get C from queue.
所有數(shù)據(jù)都寫入并且讀完
進程池
當需要創(chuàng)建的子進程數(shù)量不多時七婴,可以直接利用multiprocessing中的Process動態(tài)成生多個進程,但如果是上百甚至上千個目標察滑,手動的去創(chuàng)建進程的工作量巨大打厘,此時就可以用到multiprocessing模塊提供的Pool方法。
初始化Pool時贺辰,可以指定一個最大進程數(shù)户盯,當有新的請求提交到Pool中時嵌施,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求莽鸭;但如果池中的進程數(shù)已經(jīng)達到指定的最大值吗伤,那么該請求就會等待,直到池中有進程結(jié)束硫眨,才會用之前的進程來執(zhí)行新的任務
任務數(shù)固定用Process創(chuàng)建子進程足淆,不固定則用Pool來創(chuàng)建進程池
通過進程池(Pool)創(chuàng)建的子進程,主進程并不會等子進程執(zhí)行完才退出捺球,這時候就需要調(diào)用po.join堵塞主進程直到子進程執(zhí)行完畢缸浦。
multiprocessing.Pool常用函數(shù)解析:
apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行,堵塞方式必須等待上一個進程退出才能執(zhí)行下一個進程),args為傳遞給func的參數(shù)列表袄简,kwds為傳遞給func的關(guān)鍵字參數(shù)列表霜威;
close():關(guān)閉Pool,使其不再接受新的任務卜高;
terminate():不管任務是否完成,立即終止南片;
join():主進程阻塞掺涛,等待子進程的退出, 必須在close或terminate之后使用疼进;
子進程如果發(fā)生異常并不會產(chǎn)生異常信息
注意:進程池之間想要通信不能使用multiprocessing模塊的Queue薪缆,而是使用multiprocessing.Manager()中的Queue(),否則會得到一條如下的錯誤信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
示例:
# -*- coding:utf-8 -*-
# 修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader從Queue獲取到消息:%s" % q.get(True))
def writer(q):
print("writer啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid()))
for i in "itcast":
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先讓上面的任務向Queue存入數(shù)據(jù)伞广,然后再讓下面的任務開始從中取數(shù)據(jù)
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
結(jié)果:
(21424) start
writer啟動(25556),父進程為(21424)
reader啟動(9808),父進程為(21424)
reader從Queue獲取到消息:i
reader從Queue獲取到消息:t
reader從Queue獲取到消息:c
reader從Queue獲取到消息:a
reader從Queue獲取到消息:s
reader從Queue獲取到消息:t
(21424) End