什么是進(jìn)程?
一個(gè)程序運(yùn)行起來(lái)后,代碼+用到的資源稱之為進(jìn)程,它是操作系統(tǒng)分配資源的基本單元
進(jìn)程額創(chuàng)建-multiprocessing
mutiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊,提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象,這個(gè)對(duì)象可以理解為是一個(gè)獨(dú)立的進(jìn)程,可以執(zhí)行另外的事情
from multiprocessing import Process
import time
def run_proc():
"""子進(jìn)程要執(zhí)行的代碼"""
while True:
print("2")
time.sleep(1)
if __name__ == '__main__':
p = Process(target=run_proc)
p.start()
while True:
print("1")
time.sleep(1)
Process參數(shù)如下:
Process(group,target,name,args,kwargs)
- targest:如果傳遞了函數(shù)的引用,可以任務(wù)這個(gè)子進(jìn)程就執(zhí)行這里代碼
- args:給target指定的函數(shù)傳遞的參數(shù),以元祖的方式傳遞
- kwargs:給target指定的函數(shù)傳遞命名參數(shù)
- name:給進(jìn)程設(shè)定一個(gè)名字,可以不設(shè)定
- group:指定進(jìn)程組,大多數(shù)情況下用不到
Process創(chuàng)建的實(shí)例對(duì)象的常用方法:
- strart():啟動(dòng)子進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程)
- is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
- join([timeout]):是否等待子進(jìn)程執(zhí)行結(jié)束,或等待多少秒
- terminate():不管任務(wù)是否完成,立即終止子進(jìn)程
Process創(chuàng)建的實(shí)例對(duì)象的常用屬性:
- name:當(dāng)前進(jìn)程的別名,默認(rèn)為Process-N,N為1開始遞增的整數(shù)
- pid:當(dāng)前進(jìn)程的pid(進(jìn)程號(hào))
進(jìn)程間通信-Queue
- Queue的使用可以使用multiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞,Queue本身是一個(gè)消息列隊(duì)程序,以下小實(shí)例來(lái)演示一下Queue的工作原理
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個(gè)Queue對(duì)象卜朗,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #False
q.put("消息3")
print(q.full()) #True
#因?yàn)橄⒘嘘?duì)已滿下面的try都會(huì)拋出異常,第一個(gè)try會(huì)等待2秒后再拋出異常敢订,第二個(gè)Try會(huì)立刻拋出異常
try:
q.put("消息4",True,2)
except:
print("消息列隊(duì)已滿尤勋,現(xiàn)有消息數(shù)量:%s"%q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息列隊(duì)已滿狂窑,現(xiàn)有消息數(shù)量:%s"%q.qsize())
#推薦的方式,先判斷消息列隊(duì)是否已滿舔株,再寫入
if not q.full():
q.put_nowait("消息4")
#讀取消息時(shí)首启,先判斷消息列隊(duì)是否為空房铭,再讀取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
- Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量;
- Queue.empty():如果隊(duì)列為空,返回Ture,反之False:
- Queue.full():如果隊(duì)列滿了,返回True,反之False:
- Queue.get(block,timeout):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除,默認(rèn)值為True:
進(jìn)程池Pool
- 當(dāng)需要?jiǎng)?chuàng)建的子進(jìn)程數(shù)量不多時(shí),可以直接利用multirocessing中的Process動(dòng)態(tài)生多個(gè)進(jìn)程,但如果是上百甚至上千個(gè)目標(biāo),手動(dòng)的去創(chuàng)建進(jìn)程的工作兩巨大,就可以用到mulitiprocessing模塊提供的Pool方法
- 初始化Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)用之前的進(jìn)程來(lái)執(zhí)行新任務(wù)驻龟,請(qǐng)看下面的實(shí)例:
-- coding:utf-8 --
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s開始執(zhí)行,進(jìn)程號(hào)為%d" % (msg,os.getpid()))
# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"執(zhí)行完畢,耗時(shí)%0.2f" % (t_stop-t_start))
po = Pool(3) # 定義一個(gè)進(jìn)程池缸匪,最大進(jìn)程數(shù)3
for i in range(0,10):
# Pool().apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元祖,))
# 每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)
po.apply_async(worker,(i,))
print("----start----")
po.close() # 關(guān)閉進(jìn)程池翁狐,關(guān)閉后po(進(jìn)程池)不再接收新任務(wù)
po.join() # 等待po(進(jìn)程池)中所有子進(jìn)程執(zhí)行完成,必須放在close語(yǔ)句之后
print("-----end-----")