因為GIL(全局解釋器鎖)的限制(GIL是用來保證在任意時刻只能有一個控制線程在執(zhí)行)球凰,所以python中的多線程并非真正的多線程。只有python程序是I/O密集型應(yīng)用時,多線程才會對運行效率有顯著提高(因在等待I/O的時抚吠,會釋放GIL允許其他線程繼續(xù)執(zhí)行),而在計算密集型應(yīng)用中弟胀,多線程并沒有什么用處楷力。考慮到要充分利用多核CPU的資源邮利,允許python可以并行處理一些任務(wù)弥雹,這里就用到了python多進(jìn)程編程了垃帅。multiprocessing是python中的多進(jìn)程模塊延届,使用這個模塊可以方便地進(jìn)行多進(jìn)程應(yīng)用程序開發(fā)。multiprocessing模塊中提供了:Process贸诚、Pool方庭、Queue、Manager等組件酱固。
1 Process類
1.1 構(gòu)造方法
def __init__(self, group=None, target=None, name=None, args=(), kwargs={})
group:進(jìn)程所屬組械念,基本不用
target:進(jìn)程調(diào)用對象(可以是一個函數(shù)名,也可以是一個可調(diào)用的對象(實現(xiàn)了__call__方法的類))
args:調(diào)用對象的位置參數(shù)元組
name:別名
kwargs:調(diào)用對象的關(guān)鍵字參數(shù)字典
1.2 實例方法
is_alive():返回進(jìn)程是否在運行
start():啟動進(jìn)程运悲,等待CPU調(diào)度
join([timeout]):阻塞當(dāng)前上下文環(huán)境龄减,直到調(diào)用此方法的進(jìn)程終止或者到達(dá)指定timeout
terminate():不管任務(wù)是否完成,立即停止該進(jìn)程
run():start()調(diào)用該方法班眯,當(dāng)實例進(jìn)程沒有傳入target參數(shù)希停,stat()將執(zhí)行默認(rèn)的run()方法
1.3 屬性
authkey:
daemon:守護(hù)進(jìn)程標(biāo)識,在start()調(diào)用之前可以對其進(jìn)行修改
exitcode:進(jìn)程的退出狀態(tài)碼
name:進(jìn)程名
pid:進(jìn)程id
1.4 實例
實例一:傳入的target為一個函數(shù)
#!/usr/bin/python
#coding=utf-8
import time
import random
from multiprocessing import Process
def foo(i):
print time.ctime(), "process the %d begin ......" %i
time.sleep(random.uniform(1,3))
print time.ctime(), "process the %d end !!!!!!" %i
if __name__ == "__main__":
print time.ctime(), "process begin......"
p_lst = list()
for i in range(4):
p_lst.append(Process(target=foo, args=(i,))) #創(chuàng)建4個子進(jìn)程
#啟動子進(jìn)程
for p in p_lst:
p.start()
#等待子進(jìn)程全部結(jié)束
for p in p_lst:
p.join()
print time.ctime(), "process end!!!!!"
實例二:傳入的target為一個可調(diào)用對象
#!/usr/bin/python
#coding=utf-8
import time
import random
from multiprocessing import Process
class Foo(object):
def __init__(self, i):
self.i = i
def __call__(self):
'''
使Foo的實例對象成為可調(diào)用對象
'''
print time.ctime(), "process the %d begin ......" %self.i
time.sleep(random.uniform(1,3))
print time.ctime(), "process the %d end !!!!!!" %self.i
if __name__ == "__main__":
print time.ctime(), "process begin......"
p_lst = list()
for i in range(4):
p_lst.append(Process(target=Foo(i))) #創(chuàng)建4個子進(jìn)程
#啟動子進(jìn)程
for p in p_lst:
p.start()
#等待子進(jìn)程全部結(jié)束
for p in p_lst:
p.join()
print time.ctime(), "process end!!!!!"
實例三:派生Process子類署隘,并創(chuàng)建子類的實例
#!/usr/bin/python
#coding=utf-8
import time
import random
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, i):
Process.__init__(self)
self.i = i
def run(self):
'''
#重寫run方法宠能,當(dāng)調(diào)用start方法時,就會調(diào)用當(dāng)前重寫的run方法中的程序
'''
print time.ctime(), "process the %d begin ......" %self.i
time.sleep(random.uniform(1,3))
print time.ctime(), "process the %d end !!!!!!" %self.i
if __name__ == "__main__":
print time.ctime(), "process begin......"
p_lst = list()
for i in range(4):
p_lst.append(MyProcess(i)) #創(chuàng)建4個子進(jìn)程
#啟動子進(jìn)程
for p in p_lst:
p.start()
#等待子進(jìn)程全部結(jié)束
for p in p_lst:
p.join()
print time.ctime(), "process end!!!!!"
2 Pool類
當(dāng)使用Process類管理非常多(幾十上百個)的進(jìn)程時磁餐,就會顯得比較繁瑣违崇,這是就可以使用Pool(進(jìn)程池)來對進(jìn)程進(jìn)行統(tǒng)一管理。當(dāng)池中進(jìn)程已滿時诊霹,有新進(jìn)程請求執(zhí)行時羞延,就會被阻塞,直到池中有進(jìn)程執(zhí)行結(jié)束脾还,新的進(jìn)程請求才會被放入池中并執(zhí)行肴楷。
2.1 構(gòu)造方法
def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None)
processes:池中可容納的工作進(jìn)程數(shù)量,默認(rèn)情況使用os.cpu_count()返回的數(shù)值荠呐,一般默認(rèn)即可
其他參數(shù)暫不清楚有什么用處......
2.2 實例方法
apply(self, func, args=(), kwds={}):阻塞型進(jìn)程池赛蔫,會阻塞主進(jìn)程砂客,直到工作進(jìn)程全部退出,一般不用這個
apply_async(self, func, args=(), kwds={}, callback=None):非阻塞型進(jìn)程池
map(self, func, iterable, chunksize=None):與內(nèi)置map行為一致呵恢,它會阻塞主進(jìn)程鞠值,直到map運行結(jié)束
map_async(self, func, iterable, chunksize=None, callback=None):非阻塞版本的map
close():關(guān)閉進(jìn)程池,不在接受新任務(wù)
terminate():結(jié)束工作進(jìn)程
join():阻塞主進(jìn)程等待子進(jìn)程退出渗钉,該方法必須在close或terminate之后執(zhí)行
2.3 實例
#!/usr/bin/python
#coding=utf-8
import time
import random
from multiprocessing import Pool
def foo(i):
print time.ctime(), "process the %d begin ......" %i
time.sleep(random.uniform(1,3))
print time.ctime(), "process the %d end !!!!!!" %i
if __name__ == "__main__":
print time.ctime(), "process begin......"
pool = Pool(processes = 2) #設(shè)置進(jìn)程池中最大并行工作進(jìn)程數(shù)為2
for i in range(4):
pool.apply_async(foo, args=(i,)) #提交4個子進(jìn)程任務(wù)
pool.close()
pool.join()
print time.ctime(), "process end!!!!!"
結(jié)果:
Fri Nov 18 13:57:22 2016 process begin......
Fri Nov 18 13:57:22 2016 process the 0 begin ......
Fri Nov 18 13:57:22 2016 process the 1 begin ......
Fri Nov 18 13:57:23 2016 process the 1 end !!!!!!
Fri Nov 18 13:57:23 2016 process the 2 begin ......
Fri Nov 18 13:57:24 2016 process the 0 end !!!!!!
Fri Nov 18 13:57:24 2016 process the 3 begin ......
Fri Nov 18 13:57:25 2016 process the 2 end !!!!!!
Fri Nov 18 13:57:25 2016 process the 3 end !!!!!!
Fri Nov 18 13:57:25 2016 process end!!!!!
3 Queue類
Queue主要提供進(jìn)程間通信以及共享數(shù)據(jù)等功能彤恶。除Queue外還可以使用Pipes實現(xiàn)進(jìn)程間通信(Pipes是兩個進(jìn)程間進(jìn)行通信)
3.1 構(gòu)造方法
def __init__(self, maxsize=0)
maxsize:用于設(shè)置隊列最大長度,當(dāng)為maxsize<=0時鳄橘,隊列的最大長度會被設(shè)置為一個非常大的值(我的系統(tǒng)中隊列最大長度被設(shè)置為2147483647)
3.2 實例方法
put(self, obj, block=True, timeout=None)
1声离、block為True,若隊列已滿瘫怜,并且timeout為正值术徊,該方法會阻塞timeout指定的時間,直到隊列中有出現(xiàn)剩余空間鲸湃,如果超時赠涮,會拋出Queue.Full異常
2、block為False暗挑,若隊列已滿笋除,立即拋出Queue.Full異常
get(self, block=True, timeout=None)
block為True,若隊列為空炸裆,并且timeout為正值垃它,該方法會阻塞timeout指定的時間,直到隊列中有出現(xiàn)新的數(shù)據(jù)烹看,如果超時国拇,會拋出Queue.Empty異常
block為False,若隊列為空听系,立即拋出Queue.Empty異常
3.3 實例
#!/usr/bin/python
#coding=utf-8
import time
import random
from multiprocessing import Process, Queue
def write(q):
for value in "abcd":
print time.ctime(), "put %s to queue" %value
q.put(value)
time.sleep(random.random())
def read(q):
while True:
value = q.get()
print time.ctime(), "get %s from queue" %value
if __name__ == "__main__":
#主進(jìn)程創(chuàng)建Queue贝奇,并作為參數(shù)傳遞給子進(jìn)程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
#啟動子進(jìn)程pw,往Queue中寫入
pw.start()
#啟動子進(jìn)程pr靠胜,從Queue中讀取
pr.start()
#等待寫進(jìn)程執(zhí)行結(jié)束
pw.join()
#終止讀取進(jìn)程
pr.terminate()
運行結(jié)果:
Fri Nov 18 15:04:13 2016 put a to queue
Fri Nov 18 15:04:13 2016 get a from queue
Fri Nov 18 15:04:13 2016 put b to queue
Fri Nov 18 15:04:13 2016 get b from queue
Fri Nov 18 15:04:13 2016 put c to queue
Fri Nov 18 15:04:13 2016 get c from queue
Fri Nov 18 15:04:13 2016 put d to queue
Fri Nov 18 15:04:13 2016 get d from queue
4 Manager類
Manager是進(jìn)程間數(shù)據(jù)共享的高級接口掉瞳。
Manager()返回的manager對象控制了一個server進(jìn)程,此進(jìn)程包含的python對象可以被其他的進(jìn)程通過proxies來訪問浪漠。從而達(dá)到多進(jìn)程間數(shù)據(jù)通信且安全陕习。Manager支持的類型有l(wèi)ist, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和Array。
如下是使用Manager管理一個用于多進(jìn)程共享的dict數(shù)據(jù)
#!/usr/bin/python
#coding=utf-8
import time
import random
from multiprocessing import Manager, Pool
def worker(d, key, value):
print time.ctime(), "insert the k-v pair to dict begin: {%d: %d}" %(key, value)
time.sleep(random.uniform(1,2))
d[key] = value #訪問共享數(shù)據(jù)
print time.ctime(), "insert the k-v pair to dict end: {%d: %d}" %(key, value)
if __name__ == "__main__":
print time.ctime(), "process for manager begin"
mgr = Manager()
d = mgr.dict()
pool = Pool(processes=4)
for i in range(10):
pool.apply_async(worker, args=(d, i, i*i))
pool.close()
pool.join()
print "Result:"
print d
print time.ctime(), "process for manager end"
運行結(jié)果
Fri Nov 18 16:36:19 2016 process for manager begin
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {0: 0}
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {1: 1}
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {2: 4}
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {3: 9}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict end: {3: 9}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict begin: {4: 16}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict end: {0: 0}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict begin: {5: 25}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict end: {2: 4}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict begin: {6: 36}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict end: {1: 1}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict begin: {7: 49}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict end: {5: 25}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict begin: {8: 64}
Fri Nov 18 16:36:22 2016 insert the k-v pair to dict end: {4: 16}
Fri Nov 18 16:36:22 2016 insert the k-v pair to dict begin: {9: 81}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {8: 64}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {6: 36}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {7: 49}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {9: 81}
Result:
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
Fri Nov 18 16:36:23 2016 process for manager end