Python并發(fā)編程——多進(jìn)程

摘要:Python多進(jìn)程

進(jìn)程是操作系統(tǒng)中具體的處理任務(wù)臣咖,米一個(gè)進(jìn)程都會有自己獨(dú)立的內(nèi)存空間,它是線程的載體豌鹤。當(dāng)一個(gè)程序啟動時(shí)亡哄,會默認(rèn)啟動一個(gè)進(jìn)程,將該進(jìn)程裝載到內(nèi)存布疙,同時(shí)在該進(jìn)程中還會默認(rèn)啟動一個(gè)線程蚊惯,來執(zhí)行本進(jìn)程中的內(nèi)容。


創(chuàng)建多進(jìn)程程序

在Python中進(jìn)程的相關(guān)實(shí)現(xiàn)被封裝在模塊multiprocessing中灵临,進(jìn)程的創(chuàng)建方法和線程一樣有兩種

  • 直接通過multiprocessing中的進(jìn)程類Process創(chuàng)建
  • 繼承multiprocessing的Process類截型,重寫run方法

類似于線程創(chuàng)建,對Process類實(shí)例化時(shí)直接將進(jìn)程的處理函數(shù)傳入即可儒溉,通過target來接收處理函數(shù)宦焦,創(chuàng)建好進(jìn)程后通過實(shí)例化對象的start方法將其啟動

(1)使用類實(shí)例化方法創(chuàng)建進(jìn)程

定義進(jìn)程處理函數(shù) run_proc 。在實(shí)例化 Process 時(shí) ,將 run_proc 傳入 顿涣。接著使用實(shí)例化對象
start 方法啟動進(jìn)程波闹,然后調(diào)用join代表該子進(jìn)程結(jié)束之后主進(jìn)程才能結(jié)束

import time
import os
from multiprocessing import Process


def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))


if __name__ == '__main__':
    p1 = Process(target=run_proc, args=("a", ))
    p2 = Process(target=run_proc, args=("b", ))
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")

輸出如下,課件主進(jìn)程和子進(jìn)程的進(jìn)程號pid都不一樣涛碑,在調(diào)用join之后主進(jìn)程阻塞直到所有子進(jìn)程結(jié)束

main process: 12611
a start, pid:12612
b start, pid:12613
a end, pid:12612
b end, pid:12613
main process end...
(2)使用繼承類的方式創(chuàng)建進(jìn)程

實(shí)現(xiàn)自定義進(jìn)程類使其繼承于系統(tǒng)進(jìn)程類 Process 精堕。重寫類中的 run 方法,

import time
import os
from multiprocessing import Process


def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))


class MyProcess(Process):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        run_proc(self.sid)


if __name__ == '__main__':
    p1 = Process(target=run_proc, args=("a",))
    p2 = Process(target=run_proc, args=("b",))
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")

輸出如下,和直接實(shí)例化Process類輸出效果一致

main process: 14859
a start, pid:14860
b start, pid:14861
a end, pid:14860
b end, pid:14861
main process end...

創(chuàng)建進(jìn)程池

當(dāng)項(xiàng)目達(dá)到一定的規(guī)模蒲障,頻繁創(chuàng)建/銷毀進(jìn)程或者線程是非常消耗資源的歹篓,這個(gè)時(shí)候就要編寫自己的線程池/進(jìn)程池,在Python只有兩個(gè)模塊實(shí)現(xiàn)進(jìn)程池揉阎,分別是concurrent.futuresmultiprocessing.Pool

(1)multiprocessing.Pool

傳遞給Pool一個(gè)參數(shù)設(shè)置進(jìn)程池內(nèi)的最大進(jìn)程數(shù)庄撮,Pool有多個(gè)方法,主要是apply毙籽,apply_async洞斯,mapmap_async坑赡,starmap 巡扇,starmap_async,區(qū)別如下

  • apply:單次同步執(zhí)行垮衷,每次執(zhí)行傳入一個(gè)執(zhí)行函數(shù)的參數(shù)厅翔,并且執(zhí)行完畢才能執(zhí)行下一個(gè)進(jìn)程,如果執(zhí)行函數(shù)有返回值返回最后一個(gè)執(zhí)行完進(jìn)程的值
  • apply_async:單次啟動一個(gè)任務(wù)搀突,傳入一個(gè)執(zhí)行函數(shù)的參數(shù)刀闷,異步執(zhí)行,啟動后不等這個(gè)進(jìn)程結(jié)束又開始執(zhí)行新任務(wù),如果執(zhí)行函數(shù)有返回值返回最后一個(gè)執(zhí)行完進(jìn)程的對象甸昏,需要調(diào)用get獲得結(jié)果值
  • map:執(zhí)行一個(gè)任務(wù)列表顽分,傳入執(zhí)行函數(shù)的參數(shù)一個(gè)參數(shù)列表,同步執(zhí)行施蜜,即主進(jìn)程阻塞直到任務(wù)列表中的任務(wù)全部執(zhí)行完畢卒蘸,如果執(zhí)行函數(shù)有返回值,返回一個(gè)結(jié)果列表翻默,順序和輸入列表的元素順序一致
  • map_async:執(zhí)行一個(gè)任務(wù)列表缸沃,不需要等待任務(wù)列表中的任務(wù)執(zhí)行完畢,主進(jìn)程可以繼續(xù)往下執(zhí)行修械,如果執(zhí)行函數(shù)有返回值趾牧,返回一個(gè)結(jié)果對象,需要調(diào)用get方法得到輸出值,順序和輸入列表的元素順序一致
  • starmap:和map執(zhí)行類似,區(qū)別是map對執(zhí)行函數(shù)只能傳入一個(gè)參數(shù)拾并,而startmap可以傳入多個(gè)參數(shù),如果執(zhí)行函數(shù)有返回值哄芜,返回一個(gè)結(jié)果列表,順序和輸入列表的元素順序一致
  • starmap_async:和map_async執(zhí)行類似柬唯,區(qū)別是map_async對執(zhí)行函數(shù)只能傳入一個(gè)參數(shù)认臊,而starmap_async可以傳入多個(gè)參數(shù),如果執(zhí)行函數(shù)有返回值权逗,返回一個(gè)結(jié)果對象美尸,需要調(diào)用get方法得到輸出值冤议,順序和輸入列表的元素順序一致

除此之外斟薇,在執(zhí)行完畢多進(jìn)程任務(wù)后,調(diào)用close關(guān)閉Process對象恕酸,釋放與之關(guān)聯(lián)的所有資源堪滨,調(diào)用join阻塞主進(jìn)程,直到調(diào)用join的子進(jìn)程執(zhí)行完畢再退出

下面用代碼測試說明

  1. apply
import time
import datetime
import os
from multiprocessing import Pool


def run_proc(sid):
    time.sleep(3)
    print("{} end, pid:{}, time:{}".format(sid, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid)


if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        pool.apply(run_proc, i)
    pool.close()
    pool.join()

輸出如下蕊温,可見apply每次只輸入一個(gè)執(zhí)行函數(shù)的參數(shù)袱箱,即每次執(zhí)行一次,多個(gè)參數(shù)組成的任務(wù)列表只能一個(gè)接著一個(gè)執(zhí)行

a end, pid:25260, time:2021-05-16 18:15:08
b end, pid:25261, time:2021-05-16 18:15:11
c end, pid:25262, time:2021-05-16 18:15:14
d end, pid:25263, time:2021-05-16 18:15:17

2.apply_async
直接修改Pool的apply方法改為apply_assync

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        pool.apply_async(run_proc, i)
    print("main process")
    pool.close()
    pool.join()

輸出如下义矛,可見達(dá)到了并行效果发笔,進(jìn)程之間執(zhí)行任務(wù)互補(bǔ)阻塞,主進(jìn)程也沒有被阻塞凉翻,main process被提前打印了出來

main process
a end, pid:25721, time:2021-05-16 18:17:33
b end, pid:25722, time:2021-05-16 18:17:33
c end, pid:25723, time:2021-05-16 18:17:33
d end, pid:25724, time:2021-05-16 18:17:33

3.map
修改Pool的方法為map

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

輸出如下了讨,可見map達(dá)到了并行的效果,但是主進(jìn)程被阻塞,即map執(zhí)行完任務(wù)列表后前计,主進(jìn)程才執(zhí)行

a end, pid:26939, time:2021-05-16 18:21:43d end, pid:26942, time:2021-05-16 18:21:43

b end, pid:26940, time:2021-05-16 18:21:43
c end, pid:26941, time:2021-05-16 18:21:43
main process

4.map_async

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    res = pool.map_async(run_proc, jobs)
    print("main process...")
    pool.close()
    pool.join()
    print(res.get())

輸出如下胞谭,可見效果了map類似,區(qū)別是主進(jìn)程沒有被阻塞男杈,main process被提前打印了出來

main process...
c end, pid:21401, time:2021-05-16 21:41:54
b end, pid:21400, time:2021-05-16 21:41:54a end, pid:21399, time:2021-05-16 21:41:54

d end, pid:21402, time:2021-05-16 21:41:54
['a', 'b', 'c', 'd']

5.starmap
將執(zhí)行函數(shù)的入?yún)⒏臑閮蓚€(gè)丈屹,Pool函數(shù)改為starmap,并且傳入的任務(wù)列表的每個(gè)參數(shù)也改為兩個(gè)

def run_proc(sid, other):
    time.sleep(3)
    print("{} end, other:{}, pid:{}, time:{}".format(sid, other, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid) + str(other)


if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.starmap(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

輸入如下伶棒,執(zhí)行函數(shù)正確的接收了兩個(gè)參數(shù)旺垒,主進(jìn)程阻塞

b end, other:r, pid:27642, time:2021-05-16 18:24:52
a end, other:b, pid:27641, time:2021-05-16 18:24:52
c end, other:d, pid:27643, time:2021-05-16 18:24:52
d end, other:a, pid:27644, time:2021-05-16 18:24:52
main process

看一個(gè)map是否支持兩個(gè)參數(shù)

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

直接報(bào)錯(cuò),執(zhí)行函數(shù)缺少參數(shù)

TypeError: run_proc() missing 1 required positional argument: 'other'

6.starmap_async

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    res = pool.starmap_async(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
    print(type(res.get()))

輸出如下苞冯,可見和starmap相比袖牙,主進(jìn)程沒有被阻塞,其他一樣

main process
a end, other:b, pid:17123, time:2021-05-16 21:28:30
b end, other:r, pid:17124, time:2021-05-16 21:28:30
d end, other:a, pid:17126, time:2021-05-16 21:28:30
c end, other:d, pid:17125, time:2021-05-16 21:28:30
<class 'list'>

多線程和多進(jìn)程的區(qū)別

多進(jìn)程和多線程都可以用并行機(jī)制來提升系統(tǒng)的運(yùn)行效率舅锄,二者的區(qū)別在于運(yùn)行時(shí)所占的內(nèi)存分布不同

  • 多線程是共用一套內(nèi)存的代碼塊區(qū)間
  • 多進(jìn)程是各用一套獨(dú)立的內(nèi)存區(qū)間

在大型計(jì)算機(jī)集群系統(tǒng)中鞭达,都會將多進(jìn)程程序分布運(yùn)行在不同的計(jì)算機(jī)上協(xié)同工作,而每一臺機(jī)器上的進(jìn)程內(nèi)部皇忿,又會由多個(gè)線程來并行工作

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末畴蹭,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鳍烁,更是在濱河造成了極大的恐慌叨襟,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件幔荒,死亡現(xiàn)場離奇詭異糊闽,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)爹梁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進(jìn)店門右犹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人姚垃,你說我怎么就攤上這事念链。” “怎么了积糯?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵掂墓,是天一觀的道長。 經(jīng)常有香客問我看成,道長君编,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任川慌,我火速辦了婚禮吃嘿,結(jié)果婚禮上偿荷,老公的妹妹穿的比我還像新娘。我一直安慰自己唠椭,他們只是感情好跳纳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著贪嫂,像睡著了一般寺庄。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上力崇,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天斗塘,我揣著相機(jī)與錄音,去河邊找鬼亮靴。 笑死馍盟,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的茧吊。 我是一名探鬼主播贞岭,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼搓侄!你這毒婦竟也來了瞄桨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤讶踪,失蹤者是張志新(化名)和其女友劉穎芯侥,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乳讥,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡柱查,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了云石。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唉工。...
    茶點(diǎn)故事閱讀 39,992評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖留晚,靈堂內(nèi)的尸體忽然破棺而出酵紫,到底是詐尸還是另有隱情告嘲,我是刑警寧澤错维,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站橄唬,受9級特大地震影響赋焕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜仰楚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一隆判、第九天 我趴在偏房一處隱蔽的房頂上張望犬庇。 院中可真熱鬧,春花似錦侨嘀、人聲如沸臭挽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽欢峰。三九已至,卻和暖如春涨共,著一層夾襖步出監(jiān)牢的瞬間纽帖,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工举反, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留懊直,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓火鼻,卻偏偏與公主長得像室囊,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子魁索,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容