python 多進(jìn)程

基于官方文檔:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日樂購叠骑,剛才看到的一個(gè)博客空盼,寫的都不太對(duì)卑笨,還是基于官方的比較穩(wěn)妥
我就是喜歡抄官方的悔政,哈哈

  • Class Process

通常我們使用Process實(shí)例化一個(gè)進(jìn)程岗宣,并調(diào)用 他的 start() 方法啟動(dòng)它褐荷。
這種方法和 Thread 是一樣的计济。

  • 一個(gè)最賤單的例子:
from multiprocessing import Process
def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('傻逼',))
    p.start()
    p.join()
    print("運(yùn)行結(jié)束")
>> hello 傻逼
運(yùn)行結(jié)束

上圖中罩引,我寫了 p.join() 所以主進(jìn)程是 等待 子進(jìn)程執(zhí)行完后各吨,才執(zhí)行 print("運(yùn)行結(jié)束")
否則就是反過來了(這個(gè)不一定,看你的語句了蜒程,順序其實(shí)是隨機(jī)的)例如:

p = Process(target=f, args=('傻逼',))
    p.start()
    # time.sleep(1)
    print("運(yùn)行結(jié)束")

>> 運(yùn)行結(jié)束
hello 傻逼

主進(jìn)加個(gè) sleep

結(jié)果:
hello 傻逼 
運(yùn)行結(jié)束 (大約等了1秒后绅你,才輸出 運(yùn)行結(jié)束)

所以不加join() ,其實(shí)子進(jìn)程和主進(jìn)程是各干各的,誰也不等誰昭躺。都執(zhí)行完后忌锯,文件運(yùn)行就結(jié)束了

  • 查看子進(jìn)程和父進(jìn)程的 id
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #  獲取父進(jìn)程的 pid  (process id)
    print('process id:', os.getpid())


def f(name):
    info('function f') # use info
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

>> main line
module name: __main__
parent process: 14212
process id: 1940
function f
module name: __mp_main__
parent process: 1940
process id: 15020
hello bob

上面我們用了 os.getpid() 和 os.getppid() 獲取 當(dāng)前進(jìn)程,和父進(jìn)程的id
下面就講一下领炫,這兩個(gè)函數(shù)的用法:
os.getpid()
返回當(dāng)前進(jìn)程的id
os.getppid()
返回父進(jìn)程的id偶垮。 父進(jìn)程退出后,unix 返回初始化進(jìn)程(1)中的一個(gè)
windows返回相同的id (可能被其他進(jìn)程使用了)
這也就解釋了帝洪,為啥我上面 的程序運(yùn)行多次似舵, 第一次打印的parentid 都是 14212 了。
而子進(jìn)程的父級(jí) process id 是調(diào)用他的那個(gè)進(jìn)程的 id : 1940

  • 上下文和啟動(dòng)子進(jìn)程
    這個(gè)內(nèi)容太多需要葱峡,蠻蠻總結(jié)砚哗。wc。

視頻筆記:
多進(jìn)程:使用大致方法:

  • 多進(jìn)程通信

參考:進(jìn)程通信(pipe和queue)

  • queue 管道:隊(duì)列是線程和進(jìn)程安全的(不會(huì)競爭)
    q = mp.Queue()
    把q 傳給args, 然后在里面 q.put(你的東西)
    取值在外面直接 q.get() # 先進(jìn)先出
    普通的
    Process 函數(shù)用 return砰奕,我們也拿不到任何返回值 (智能通過 queue 或者共享內(nèi)存來交換數(shù)據(jù))

pool.map (函數(shù)可以有return 也可以共享內(nèi)存或queue) 結(jié)果直接是個(gè)列表

def b(x):
    # return 20
    print("child process")
if __name__ == "__main__":
    p  =Pool(processes=2)
    print(p.map(b, [1,2,3,4]))
>>
child process
child process
child process
child process
[None, None, None, None]

poll.apply_async() (同map,只不過是一個(gè)進(jìn)程蛛芥,返回結(jié)果用 xx.get() 獲得)

  • 共享內(nèi)存使用:
    multiprocessing.Value("type", value) # 單個(gè)值 value 智能是數(shù)字類型的,具體看 type 的設(shè)置 比如 i是 有符號(hào)整形
    multiprocessing.Array("type", [single level list])
  • Pipe 管道

Pipe() 函數(shù)返回一個(gè)由管道連接的連接對(duì)象

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

返回的兩個(gè)連接對(duì)象 Pipe() 表示管道的兩端军援。每個(gè)連接對(duì)象都有 send()recv() 方法

  • 資源競爭 枷鎖
    lock = mp.Lock()
    把 lock 傳給進(jìn)程的args 參數(shù)
    在進(jìn)程函數(shù)里面寫 lock.acquire() 枷鎖
    lock.release() 釋放鎖
練習(xí)過程中出現(xiàn)的問題:
from multiprocessing import Pool
pool = Pool()
def search(a):

    # while a<1000000:
    #     a+=1
    print("子進(jìn)程結(jié)束")
    return 1
if __name__ == "__main__":
    # pool = Pool()
    a = time.time()
    result = pool.map(search, [1,2,10])
    b = time.time()
    print(b-a)

報(bào)錯(cuò):

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...
        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

參考 :https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == "main": 下面初始化搞定仅淑。
結(jié)果:

子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
0.7355780601501465

這個(gè)肯定有解釋的


spawn.py 的解釋

測試多進(jìn)程計(jì)算效果:
進(jìn)程池運(yùn)行:

class aa():

    def search(self, a):

        while a<100000000:
            a+=1
        print("子進(jìn)程結(jié)束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.map(c.search, [1,2,10])
    b = time.time()
    print(b-a)

結(jié)果:

子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
16.176724195480347
紅框是程序運(yùn)行的狀態(tài)

普通計(jì)算:

class aa():

    def search(self, a):

        while a<100000000:
            a+=1
        print("子進(jìn)程結(jié)束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    # result = pool.map(c.search, [1,2,10])
    c.search(1)
    c.search(2)
    c.search(10)

    b = time.time()
    print(b-a)

我們同樣傳入 1 2 10 三個(gè)參數(shù)測試:

子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
24.153281211853027
紅框?yàn)閳?zhí)行計(jì)算的過程,看出來很平穩(wěn)

其實(shí)對(duì)比下來開始快了一半的胸哥;
我們把循環(huán)里的數(shù)字去掉一個(gè) 0涯竟;
單進(jìn)程:

子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
3.1451985836029053

多進(jìn)程:

子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
2.4246084690093994

兩次測試 單進(jìn)程/進(jìn)程池 分別為 0.669 和 0.772 幾乎成正比的。
問題 二:
視圖:
post 視圖里面

class xxviews():
    def  post(request):
        music = Music()
        result = music.run()
    return ...

Music 類:

pool = Pool()
class Music():
        def __init__(self):
                pass
        def run(self):
                result  = self.search(...)
                pass
        def search(self, keyword, target_src):
             pool.map(....)     

直接報(bào)錯(cuò):


好像是說 子進(jìn)程自己不可以在創(chuàng)建子進(jìn)程了

寫在 類里面也 在函數(shù)里用 self.pool 調(diào)用也不行空厌,也是相同的錯(cuò)誤庐船。

最后 把 pool = Pool 直接寫在 search 函數(shù)里面,奇跡出現(xiàn)了:


class Music():
        def __init__(self):
                pass
        def run(self):
                result  = self.search(...)
                pass
        def search(self, keyword, target_src):
              pool = Pool()      
              pool.map(....)       
              pass
成功運(yùn)行了

前臺(tái)也能顯示搜索的音樂結(jié)果了


總結(jié)一點(diǎn)嘲更,進(jìn)程這個(gè)東西醉鳖,最好 寫在 直接運(yùn)行的函數(shù)里面,而不是 一個(gè)函數(shù)跳來跳去哮内。因?yàn)樽詈罂赡?是在子進(jìn)程的子進(jìn)程運(yùn)行的,這是不許的,會(huì)報(bào)錯(cuò)北发。
還有一點(diǎn)纹因,多進(jìn)程運(yùn)行的函數(shù)對(duì)象,不能是 lambda 函數(shù)琳拨。也許lambda 虛擬瞭恰,在內(nèi)存?狱庇?

使用 pool.map 子進(jìn)程 函數(shù)報(bào)錯(cuò)惊畏,導(dǎo)致整個(gè) pool 掛了:
參考:https://blog.csdn.net/hedongho/article/details/79139606
主要你要,對(duì)函數(shù)內(nèi)部捕獲錯(cuò)誤密任,而不能讓異常拋出就可以了颜启。

關(guān)于map 傳多個(gè)函數(shù)參數(shù)
我一開始,就是正常思維浪讳,多個(gè)參數(shù)缰盏,搞個(gè)元祖,讓參數(shù)一一對(duì)應(yīng)不就行了:

class aa():

    def search(self, a,b):

        while a<10000000:
            a+=1
        print("子進(jìn)程結(jié)束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.map(c.search, ((1,3),))

報(bào)錯(cuò):

   return self._map_async(func, iterable, mapstar, chunksize).get()
  File "E:\Program\python\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
TypeError: search() missing 1 required positional argument: 'b'

參考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 當(dāng)讓可以穿多個(gè)參數(shù)淹遵,map 卻不知道咋傳的口猜。
apply_async 和map 一樣,不知道咋傳的透揣。

最簡單的方法:
使用 starmap 而不是 map

class aa():

    def search(self, a,b):

        while a<10000000:
            a+=1
        print("子進(jìn)程結(jié)束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.starmap(c.search, ((1,3),))
    b = time.time()
    print(b-a)

結(jié)果:
子進(jìn)程結(jié)束
1.8399453163146973
成功拿到結(jié)果了

關(guān)于map 和 starmap 不同的地方看源碼:

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))

def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are 
expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

關(guān)于apply_async() ,我沒找到多參數(shù)的方法济炎,大不了用 一個(gè)迭代的 starmap 實(shí)現(xiàn)。哈哈

關(guān)于 上面源碼里面有 itertools.starmap
itertools 用法參考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有個(gè)問題辐真,多進(jìn)程最好不要使用全部的 cpu , 因?yàn)檫@樣可能影響其他任務(wù)须尚,所以 在進(jìn)程池 添加 process 參數(shù) 指定,cpu 個(gè)數(shù):

Pool(processes=os.cpu_count()-1)

上面就是預(yù)留了 一個(gè)cpu 干其他事的

后面直接使用 Queue 遇到這個(gè)問題:

RuntimeError: Queue objects should only be shared between processes through inheritance

解決:
Manager().Queue() 代替 Queue()

from multiprocessing import Manager, Queue
queue = Manager().Queue()
#  queue =   Queue()

因?yàn)?queue.get() 是堵塞型的拆祈,所以可以提前判斷是不是 空的恨闪,以免堵塞進(jìn)程。比如下面這樣:
使用 queue.empty() 空為True

 current_search = "稍等" if music_current_search.empty() else 
music_current_search.get()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末放坏,一起剝皮案震驚了整個(gè)濱河市咙咽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌淤年,老刑警劉巖钧敞,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異麸粮,居然都是意外死亡溉苛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門弄诲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來愚战,“玉大人娇唯,你說我怎么就攤上這事〖帕幔” “怎么了塔插?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長拓哟。 經(jīng)常有香客問我想许,道長,這世上最難降的妖魔是什么断序? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任流纹,我火速辦了婚禮,結(jié)果婚禮上违诗,老公的妹妹穿的比我還像新娘漱凝。我一直安慰自己,他們只是感情好较雕,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布碉哑。 她就那樣靜靜地躺著,像睡著了一般亮蒋。 火紅的嫁衣襯著肌膚如雪扣典。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天慎玖,我揣著相機(jī)與錄音贮尖,去河邊找鬼。 笑死趁怔,一個(gè)胖子當(dāng)著我的面吹牛湿硝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播润努,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼关斜,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了铺浇?” 一聲冷哼從身側(cè)響起痢畜,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鳍侣,沒想到半個(gè)月后丁稀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡倚聚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年线衫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惑折。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡授账,死狀恐怖枯跑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情矗积,我是刑警寧澤全肮,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站棘捣,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏休建。R本人自食惡果不足惜乍恐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望测砂。 院中可真熱鬧茵烈,春花似錦、人聲如沸砌些。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽存璃。三九已至仑荐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間纵东,已是汗流浹背粘招。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留偎球,地道東北人洒扎。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像衰絮,于是被迫代替她去往敵國和親袍冷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348