Python 多進程 multiprocessing.Pool類詳解

Python 多進程 multiprocessing.Pool類詳解

multiprocessing模塊

multiprocessing包是Python中的多進程管理包。它與 threading.Thread類似情竹,可以利用multiprocessing.Process對象來創(chuàng)建一個進程仿野。該進程可以允許放在Python程序內(nèi)部編寫的函數(shù)中魂务。該Process對象與Thread對象的用法相同凌受,擁有is_alive()裂明、join([timeout])偶芍、run()拌禾、start()取胎、terminate()等方法。屬性有:authkey湃窍、daemon(要通過start()設置)闻蛀、exitcode(進程在運行時為None、如果為–N您市,表示被信號N結(jié)束)觉痛、name、pid茵休。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類薪棒,用來同步進程,其用法也與threading包中的同名類一樣榕莺。multiprocessing的很大一部份與threading使用同一套API俐芯,只不過換到了多進程的情境。

這個模塊表示像線程一樣管理進程帽撑,這個是multiprocessing的核心泼各,它與threading很相似,對多核CPU的利用率會比threading好的多亏拉。

看一下Process類的構(gòu)造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

參數(shù)說明:

group:進程所屬組扣蜻。基本不用
target:表示調(diào)用對象及塘。
args:表示調(diào)用對象的位置參數(shù)元組莽使。
name:別名
kwargs:表示調(diào)用對象的字典。

創(chuàng)建進程的簡單實例:

#coding=utf-8
import multiprocessing

def do(n) :
  #獲取當前線程的名字
  name = multiprocessing.current_process().name
  print name,'starting'
  print "worker ", n
  return

if __name__ == '__main__' :
  numList = []
  for i in xrange(5) :
    p = multiprocessing.Process(target=do, args=(i,))
    numList.append(p)
    p.start()
    p.join()
    print "Process end."

執(zhí)行結(jié)果:

Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.

創(chuàng)建子進程時笙僚,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù)芳肌,創(chuàng)建一個Process實例,并用其start()方法啟動肋层,這樣創(chuàng)建進程比fork()還要簡單亿笤。
join()方法表示等待子進程結(jié)束以后再繼續(xù)往下運行,通常用于進程間的同步栋猖。

注意:

在Windows上要想使用進程模塊净薛,就必須把有關(guān)進程的代碼寫在當前.py文件的if name == ‘main’ :語句的下面,才能正常使用Windows下的進程模塊蒲拉。Unix/Linux下則不需要肃拜。

Pool類

在使用Python進行系統(tǒng)管理時痴腌,特別是同時操作多個文件目錄或者遠程控制多臺主機,并行操作可以節(jié)約大量的時間燃领。如果操作的對象數(shù)目不大時士聪,還可以直接使用Process類動態(tài)的生成多個進程,十幾個還好猛蔽,但是如果上百個甚至更多剥悟,那手動去限制進程數(shù)量就顯得特別的繁瑣,此時進程池就派上用場了枢舶。
Pool類可以提供指定數(shù)量的進程供用戶調(diào)用懦胞,當有新的請求提交到Pool中時替久,如果池還沒有滿凉泄,就會創(chuàng)建一個新的進程來執(zhí)行請求。如果池滿蚯根,請求就會告知先等待后众,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行這些請求颅拦。
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法

apply()

函數(shù)原型:

apply(func[, args=()[, kwds={}]])

該函數(shù)用于傳遞不定參數(shù)蒂誉,主進程會被阻塞直到函數(shù)執(zhí)行結(jié)束(不建議使用,并且3.x以后不在出現(xiàn))距帅。

apply_async()

函數(shù)原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一樣右锨,但它是非阻塞且支持結(jié)果返回進行回調(diào)。

map()

函數(shù)原型:

map(func, iterable[, chunksize=None])

Pool類中的map方法碌秸,與內(nèi)置的map函數(shù)用法行為基本一致绍移,它會使進程阻塞直到返回結(jié)果。
注意讥电,雖然第二個參數(shù)是一個迭代器蹂窖,但在實際使用中,必須在整個隊列都就緒后恩敌,程序才會運行子進程瞬测。

close()

關(guān)閉進程池(pool),使其不在接受新的任務纠炮。

terminate()

結(jié)束工作進程月趟,不在處理未處理的任務。

join()

主進程阻塞等待子進程的退出恢口,join方法必須在close或terminate之后使用孝宗。

multiprocessing.Pool類的實例:

import time
from multiprocessing import Pool
def run(fn):
  #fn: 函數(shù)參數(shù)是數(shù)據(jù)列表的一個元素
  time.sleep(1)
  return fn*fn

if __name__ == "__main__":
  testFL = [1,2,3,4,5,6]
  print 'shunxu:' #順序執(zhí)行(也就是串行執(zhí)行,單進程)
  s = time.time()
  for fn in testFL:
    run(fn)

  e1 = time.time()
  print "順序執(zhí)行時間:", int(e1 - s)

  print 'concurrent:' #創(chuàng)建多個進程弧蝇,并行執(zhí)行
  pool = Pool(5)  #創(chuàng)建擁有5個進程數(shù)量的進程池
  #testFL:要處理的數(shù)據(jù)列表碳褒,run:處理testFL列表中數(shù)據(jù)的函數(shù)
  rl =pool.map(run, testFL)
  pool.close()#關(guān)閉進程池折砸,不再接受新的進程
  pool.join()#主進程阻塞等待子進程的退出
  e2 = time.time()
  print "并行執(zhí)行時間:", int(e2-e1)
  print rl

執(zhí)行結(jié)果:

shunxu:
順序執(zhí)行時間: 6
concurrent:
并行執(zhí)行時間: 2
[1, 4, 9, 16, 25, 36]

上例是一個創(chuàng)建多個進程并發(fā)處理與順序執(zhí)行處理同一數(shù)據(jù),所用時間的差別沙峻。從結(jié)果可以看出睦授,并發(fā)執(zhí)行的時間明顯比順序執(zhí)行要快很多,但是進程是要耗資源的摔寨,所以平時工作中去枷,進程數(shù)也不能開太大。
程序中的r1表示全部進程執(zhí)行結(jié)束后全局的返回結(jié)果集是复,run函數(shù)有返回值删顶,所以一個進程對應一個返回結(jié)果,這個結(jié)果存在一個列表中淑廊,也就是一個結(jié)果堆中逗余,實際上是用了隊列的原理,等待所有進程都執(zhí)行完畢季惩,就返回這個列表(列表的順序不定)录粱。
對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close()画拾,讓其不再接受新的Process了啥繁。

再看一個實例:

import time
from multiprocessing import Pool
def run(fn) :
  time.sleep(2)
  print fn
if __name__ == "__main__" :
  startTime = time.time()
  testFL = [1,2,3,4,5]
  pool = Pool(10)#可以同時跑10個進程
  pool.map(run,testFL)
  pool.close()
  pool.join()
  endTime = time.time()
  print "time :", endTime - startTime

執(zhí)行結(jié)果:

21

3
4
5
time : 2.51999998093

再次執(zhí)行結(jié)果如下:

1
34

2
5
time : 2.48600006104

結(jié)果中為什么還有空行和沒有折行的數(shù)據(jù)呢?其實這跟進程調(diào)度有關(guān)青抛,當有多個進程并行執(zhí)行時旗闽,每個進程得到的時間片時間不一樣,哪個進程接受哪個請求以及執(zhí)行完成時間都是不定的蜜另,所以會出現(xiàn)輸出亂序的情況适室。那為什么又會有沒這行和空行的情況呢?因為有可能在執(zhí)行第一個進程時蚕钦,剛要打印換行符時亭病,切換到另一個進程,這樣就極有可能兩個數(shù)字打印到同一行嘶居,并且再次切換回第一個進程時會打印一個換行符罪帖,所以就會出現(xiàn)空行的情況。

進程實戰(zhàn)實例

并行處理某個目錄下文件中的字符個數(shù)和行數(shù)邮屁,存入res.txt文件中整袁,
每個文件一行,格式為:filename:lineNumber,charNumber

import os
import time
from multiprocessing import Pool

def getFile(path) :
  #獲取目錄下的文件list
  fileList = []
  for root, dirs, files in list(os.walk(path)) :
    for i in files :
      if i.endswith('.txt') or i.endswith('.10w') :
        fileList.append(root + "\\" + i)
  return fileList

def operFile(filePath) :
  #統(tǒng)計每個文件中行數(shù)和字符數(shù)佑吝,并返回
  filePath = filePath
  fp = open(filePath)
  content = fp.readlines()
  fp.close()
  lines = len(content)
  alphaNum = 0
  for i in content :
    alphaNum += len(i.strip('\n'))
  return lines,alphaNum,filePath

def out(list1, writeFilePath) :
  #將統(tǒng)計結(jié)果寫入結(jié)果文件中
  fileLines = 0
  charNum = 0
  fp = open(writeFilePath,'a')
  for i in list1 :
    fp.write(i[2] + " 行數(shù):"+ str(i[0]) + " 字符數(shù):"+str(i[1]) + "\n")
    fileLines += i[0]
    charNum += i[1]
  fp.close()
  print fileLines, charNum

if __name__ == "__main__":
  #創(chuàng)建多個進程去統(tǒng)計目錄中所有文件的行數(shù)和字符數(shù)
  startTime = time.time()
  filePath = "C:\\wcx\\a"
  fileList = getFile(filePath)
  pool = Pool(5)
  resultList =pool.map(operFile, fileList)
  pool.close()
  pool.join()

  writeFilePath = "c:\\wcx\\res.txt"
  print resultList
  out(resultList, writeFilePath)
  endTime = time.time()
  print "used time is ", endTime - startTime

執(zhí)行結(jié)果:

1

耗時不到1秒坐昙,可見多進程并發(fā)執(zhí)行速度是很快的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末芋忿,一起剝皮案震驚了整個濱河市炸客,隨后出現(xiàn)的幾起案子疾棵,更是在濱河造成了極大的恐慌,老刑警劉巖痹仙,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件是尔,死亡現(xiàn)場離奇詭異,居然都是意外死亡开仰,警方通過查閱死者的電腦和手機拟枚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來众弓,“玉大人恩溅,你說我怎么就攤上這事∥酵蓿” “怎么了脚乡?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長傻粘。 經(jīng)常有香客問我每窖,道長帮掉,這世上最難降的妖魔是什么弦悉? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮蟆炊,結(jié)果婚禮上稽莉,老公的妹妹穿的比我還像新娘。我一直安慰自己涩搓,他們只是感情好污秆,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著昧甘,像睡著了一般良拼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上充边,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天庸推,我揣著相機與錄音,去河邊找鬼浇冰。 笑死贬媒,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的肘习。 我是一名探鬼主播际乘,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼漂佩!你這毒婦竟也來了脖含?” 一聲冷哼從身側(cè)響起罪塔,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎养葵,沒想到半個月后垢袱,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡港柜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年请契,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片夏醉。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡爽锥,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出畔柔,到底是詐尸還是另有隱情氯夷,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布靶擦,位于F島的核電站腮考,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏玄捕。R本人自食惡果不足惜踩蔚,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望枚粘。 院中可真熱鬧馅闽,春花似錦、人聲如沸馍迄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽攀圈。三九已至暴凑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間赘来,已是汗流浹背现喳。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留撕捍,地道東北人拿穴。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像忧风,于是被迫代替她去往敵國和親默色。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容