Python 多進程編程

多進程編程

multiprocessing

由于 GIL(全局解釋鎖) 的原因, 多線程并不能充分利用多核處理器, 如果是一個 CPU 計算型的任務, 應該使用多進程模塊 multiprocessing, 它的工作方式與線程庫不同, 但是兩種庫的接口相似。multiprocessing 給每個進程賦予了單獨的 Python 解釋器, 這樣就規(guī)避了 GIL 所帶來的問題施符。

import multiprocessing


def worker():
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

# 輸出:
Worker
Worker
Worker
Worker
Worker

目標函數(shù)也支持傳入?yún)?shù):

import multiprocessing


def worker(num):
    print(f'Worker: {num}')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, ))
        jobs.append(p)
        p.start()


# 輸出:
Worker: 0
Worker: 1
Worker: 3
Worker: 2
Worker: 4

守護進程

和多線程模塊一樣, 多進程也是可以設置守護進程的, 守護進程可以一直運行而不影響主程序的結(jié)束。

import multiprocessing
import time


def daemon():
    p = multiprocessing.current_process() # 獲取當前的進程
    print(f'Starting: {p.name} {p.pid}')
    time.sleep(2)
    print(f'Exiting: {p.name} {p.pid}')



def non_daemon():
    p = multiprocessing.current_process()
    print(f'Starting: {p.name} {p.pid}')
    print(f'Exiting: {p.name} {p.pid}')


if __name__ == '__main__':
    d = multiprocessing.Process(
        name = 'daemon',
        target = daemon,
        daemon=True
    )

    n = multiprocessing.Process(
        name = 'non-daemon',
        target = non_daemon,
    )

    d.start()
    time.sleep(1)
    n.start()

# 輸出:
Starting: daemon 7068
Starting: non-daemon 10116
Exiting: non-daemon 10116

通過上面代碼的輸出可以看到, 守護進程沒有完成程序就結(jié)束了埂奈。如果需要等待守護進程完成工作后再結(jié)束, 可以使用 join() 方法:

import multiprocessing
import time


def daemon():
    p = multiprocessing.current_process() # 獲取當前的進程
    print(f'Starting: {p.name} {p.pid}')
    time.sleep(2)
    print(f'Exiting: {p.name} {p.pid}')



def non_daemon():
    p = multiprocessing.current_process()
    print(f'Starting: {p.name} {p.pid}')
    print(f'Exiting: {p.name} {p.pid}')


if __name__ == '__main__':
    d = multiprocessing.Process(
        name = 'daemon',
        target = daemon,
        daemon=True
    )

    n = multiprocessing.Process(
        name = 'non-daemon',
        target = non_daemon,
    )

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

# 輸出:
Starting: daemon 2928
Starting: non-daemon 13376
Exiting: non-daemon 13376
Exiting: daemon 2928

join() 方法接收一個超時的參數(shù), 默認是 none, 表示會一直阻塞, 可以設置一個超時時間, 在這個時間內(nèi)沒有完成, 就會阻塞:

import multiprocessing
import time


def daemon():
    p = multiprocessing.current_process() # 獲取當前的進程
    print(f'Starting: {p.name} {p.pid}')
    time.sleep(2)
    print(f'Exiting: {p.name} {p.pid}')



def non_daemon():
    p = multiprocessing.current_process()
    print(f'Starting: {p.name} {p.pid}')
    print(f'Exiting: {p.name} {p.pid}')


if __name__ == '__main__':
    d = multiprocessing.Process(
        name = 'daemon',
        target = daemon,
        daemon=True
    )

    n = multiprocessing.Process(
        name = 'non-daemon',
        target = non_daemon,
    )

    d.start()
    time.sleep(1)
    n.start()

    d.join(1) # 傳入的是 1 秒, 小于守護進程里面設置的兩秒
    print('d.is_alive()', d.is_alive()) # 得到進程當前的狀態(tài)
    n.join()

# 輸出:
Starting: daemon 7272
Starting: non-daemon 5872
Exiting: non-daemon 5872
d.is_alive() True

進程池

任務的執(zhí)行周期決定了 CPU 核數(shù)和任務的分配算法, 使用多進程編程 Pool 是一個很靈活的保證效率的方法:

from functools import lru_cache
from multiprocessing import Pool


# lur_cache 裝飾器使用最近最少使用算法
# 會把最近使用的對象存儲起來, 并把最近最少使用的對象在緩存值達到預設值之前從內(nèi)存中移除
# 這里 maxsize 設置為 None, 表示永遠都不會達到這個預設值
# lru_cache 這個裝飾器適合把耗時的函數(shù)的執(zhí)行結(jié)果保存起來, 避免傳入相同的參數(shù)時重復計算
@lru_cache(maxsize=None)
def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)

if __name__ == '__main__':
    # 在 windows 下運行會造成 RuntimeError, 應該使用 __name__ == '__main__' 來保護程序的入口點
    pool = Pool(2)
    pool.map(fib, [35] * 2)

dummy

multiprocessing.dummy 這個子模塊雖然在多進程模塊的代碼中, 但是接口和多線程的接口基本是一樣的, 如果分不清一個任務是 CPU 密集型還是 I/O 密集型, 可以使用如下方法去試:

from multiprocessing import Pool
from multiprocessing.dummy import Pool

這種兼容的方式, 便于在多線程/多進程之間切換。

Queue(隊列)

多線程里面有 Queue 模塊實現(xiàn)隊列, 多進程的 multiprocessing 里面包含了 Queue 這個類, 它是線程和進程安全的定躏。下面是一個生產(chǎn)者/消費者的例子, 用到了兩個隊列, 一個隊列用于存儲完成的任務, 另外一個用于存儲任務完成后的結(jié)果:

import time
from multiprocessing import Process, JoinableQueue, Queue
from random import random


def double(n):
    return n * 2


def producer(in_queue):
    while 1:
        wait_time = random()
        time.sleep(wait_time)
        in_queue.put((double, wait_time))
        if wait_time > 0.9:
            in_queue.put(None)
            print('停止生產(chǎn)')
            break


def consumer(in_queue, out_queue):
    while 1:
        task = in_queue.get()
        if task is None:
            break
        func, arg = task
        result = func(arg)
        in_queue.task_done()
        out_queue.put(result)


if __name__ == '__main__':
    tasks_queue = JoinableQueue() # JoinableQueue 里面有 join() 方法和 task_done() 方法, 而 Queue 內(nèi)沒有, tasks_queue 可以用這兩個方法來標識任務是否完成以決定要不要阻塞下去
    results_queue = Queue()
    processes = []

    p = Process(target=producer, args=(tasks_queue, ))
    p.start()
    processes.append(p)

    p = Process(target=consumer, args=(tasks_queue, results_queue))
    p.start()
    processes.append(p)

    tasks_queue.join()

    for p in processes:
        p.join()

    while True:
        if results_queue.empty():
            break
        result = results_queue.get()
        print(f'Result: {result}')

# 輸出:
停止生產(chǎn)
Result: 1.3500119015795484
Result: 1.7651301976930043
Result: 1.6336519677702004
Result: 0.06429843269363
Result: 0.29352347406759494
Result: 1.0097954936153397
Result: 0.19863644698178606
Result: 0.9589181928209678
Result: 1.4618869426710388
Result: 1.6837862156424794
Result: 0.8653351112396082
Result: 1.5958573192798793
Result: 0.15849993035736087
Result: 1.3471427672620973
Result: 1.7492282062851205
Result: 0.27695109993667644
Result: 0.7201581558818728
Result: 1.9614106580291402

進程間共享狀態(tài)

multiprocessing 提供了在進程之間共享狀態(tài)的方案, 主要有兩種: 共享內(nèi)存服務器進程账磺。

共享內(nèi)存

共享內(nèi)存主要通過 Value 和 Array 來實現(xiàn), 在多個進程之間共享一份數(shù)據(jù), 常見的共享類型有下面這些:

>>> from multiprocessing.sharedctypes import typecode_to_type
>>> typecode_to_type
{'c': <class 'ctypes.c_char'>, # 左邊是縮寫, 右邊是全稱
'u': <class 'ctypes.c_wchar'>,
'b': <class 'ctypes.c_byte'>,
'B': <class 'ctypes.c_ubyte'>,
'h': <class 'ctypes.c_short'>,
'H': <class 'ctypes.c_ushort'>,
'i': <class 'ctypes.c_long'>,
'I': <class 'ctypes.c_ulong'>,
'l': <class 'ctypes.c_long'>,
'L': <class 'ctypes.c_ulong'>,
'f': <class 'ctypes.c_float'>,
'd': <class 'ctypes.c_double'>}

下面是一個共享內(nèi)存的實現(xiàn)示例:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_bool, c_double


class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]


def modify(n, b, s, arr, A):
    n.value **= 2
    b.value = True
    s.value = s.value.upper()
    arr[0] = 10
    for a in A:
        a.x **= 2
        a.y **= 2


if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7) # 這里使用的共享類型的縮寫
    b = Value(c_bool, False, lock=False)
    s = Array('c', b'hello world', lock=lock)
    arr = Array('i', range(5), lock=True)
    A = Array(Point, [(1.525, -6.25), (-5.75, 2.5)], lock=lock)

    p = Process(target=modify, args=(n, b, s, arr, A)) # 通過 modify() 把這些內(nèi)容的值更改
    p.start()
    p.join()

    print(n.value)
    print(b.value)
    print(s.value)
    print(arr[:])
    print([(a.x, a.y) for a in A])

# 輸出
49
True
b'HELLO WORLD'
[10, 1, 2, 3, 4]
[(2.3256249999999996, 39.0625), (33.0625, 6.25)]

服務器進程

一個 multiprocessing 的 Manager 對象會控制一個服務器的進程, 其他進程可以通過代理的方式來訪問這個服務器進程, 常見的共享方式有以下幾種:

  1. Namespace: 創(chuàng)建一個可分享的命名空間。

  2. Value/Array: 和共享內(nèi)存中共享 ctypes 對象的方式一樣痊远。

  3. dict/list: 創(chuàng)建一個可分享的 dict/list, 支持對應數(shù)據(jù)結(jié)構(gòu)的方法垮抗。

  4. Condition/Event/Lock/Queue/Semaphore: 創(chuàng)建一個可分享的對應同步原語的對象。

from multiprocessing import Manager, Process


def modify(ns, lproxy, dproxy):
    ns.a **= 2
    lproxy.extend(['b', 'c'])
    dproxy['b'] = 1


if __name__ == '__main__':
    manager = Manager()
    ns = manager.Namespace() # 創(chuàng)建一個命名空間
    ns.a = 1
    lproxy = manager.list()
    lproxy.append('a')
    dproxy = manager.dict()
    dproxy['b'] = 0

    p = Process(target=modify, args=(ns, lproxy, dproxy))
    p.start()
    print(f'PID: {p.pid}')
    p.join()

    print(ns.a)
    print(lproxy)
    print(dproxy)

# 輸出:
PID: 6556
1
['a', 'b', 'c']
{'b': 1}

分布式的進程間通信

使用 manager 可以實現(xiàn)一個簡單的分布式的不同服務器之間不同進程的通信, 也就是一個簡單的 client/server 模型:

# remote_server.py

from multiprocessing.managers import BaseManager


class RemoteManager(BaseManager):
    pass


if __name__ == '__main__':
    host = '127.0.0.1'  # 主機名
    port = 5000 # 端口
    authkey = b'secret' # 驗證 key

    shared_list = [] # 分享列表

    RemoteManager.register('get_list', callable=lambda: shared_list) # 遠程服務器支持 get_list 方法, 可以獲得 shared_list 的值
    mgr = RemoteManager(address=(host, port), authkey=authkey)
    server = mgr.get_server()
    server.serve_forever()
# client.py

from multiprocessing.managers import BaseManager


class RemoteManager(BaseManager):
    pass


if __name__ == '__main__':
    host = '127.0.0.1'
    port = 5000
    authkey = b'secret'

    RemoteManager.register('get_list')
    mgr = RemoteManager(address=(host, port), authkey=authkey)
    mgr.connect()

    l = mgr.get_list() # 拿到分享的 list
    print(l) # 一開始 l 為 空
    l.append(1) # 添加一個值 1
    print(mgr.get_list())
> python remote_server.py

> python client.py
[]
[1]
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末碧聪,一起剝皮案震驚了整個濱河市冒版,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌逞姿,老刑警劉巖辞嗡,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異滞造,居然都是意外死亡续室,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門谒养,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挺狰,“玉大人,你說我怎么就攤上這事蝴光。” “怎么了达址?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵蔑祟,是天一觀的道長。 經(jīng)常有香客問我沉唠,道長疆虚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮径簿,結(jié)果婚禮上罢屈,老公的妹妹穿的比我還像新娘。我一直安慰自己篇亭,他們只是感情好缠捌,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著译蒂,像睡著了一般曼月。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上柔昼,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天哑芹,我揣著相機與錄音,去河邊找鬼捕透。 笑死聪姿,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的乙嘀。 我是一名探鬼主播末购,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼乒躺!你這毒婦竟也來了招盲?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤嘉冒,失蹤者是張志新(化名)和其女友劉穎曹货,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體讳推,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡顶籽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了银觅。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片礼饱。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖究驴,靈堂內(nèi)的尸體忽然破棺而出镊绪,到底是詐尸還是另有隱情,我是刑警寧澤洒忧,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布蝴韭,位于F島的核電站,受9級特大地震影響熙侍,放射性物質(zhì)發(fā)生泄漏榄鉴。R本人自食惡果不足惜履磨,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望庆尘。 院中可真熱鬧剃诅,春花似錦、人聲如沸驶忌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽位岔。三九已至如筛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間抒抬,已是汗流浹背杨刨。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留擦剑,地道東北人妖胀。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像惠勒,于是被迫代替她去往敵國和親赚抡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時...
    歐辰_OSR閱讀 29,383評論 8 265
  • 引言 講到進程纠屋,不得不先說下linux的fork()函數(shù)涂臣,一個進程調(diào)用fork()函數(shù)后,系統(tǒng)先給新的進程分配資源...
    StormZhu閱讀 1,555評論 0 1
  • 又來到了一個老生常談的問題售担,應用層軟件開發(fā)的程序員要不要了解和深入學習操作系統(tǒng)呢赁遗? 今天就這個問題開始,來談談操...
    tangsl閱讀 4,124評論 0 23
  • 互聯(lián)網(wǎng)上面現(xiàn)在有很多很多的免費資源、免費微課哥攘,曾何幾時我的朋友圈經(jīng)常就是一句“報名參加”后面附上聽課鏈接剖煌,然后截圖...
    Jeudi閱讀 384評論 0 2
  • 她又遇見愛情了,在被傷的遍體鱗傷后逝淹。有時候我挺佩服她的勇氣耕姊,終究比我勇敢。我有時候覺得很奇怪栅葡,我們這樣的兩個人怎么...
    喻喻喻菇?jīng)?/span>閱讀 195評論 0 1