基于官方文檔:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日樂購叠骑,剛才看到的一個(gè)博客空盼,寫的都不太對(duì)卑笨,還是基于官方的比較穩(wěn)妥
我就是喜歡抄官方的悔政,哈哈
-
Class Process
通常我們使用Process實(shí)例化一個(gè)進(jìn)程岗宣,并調(diào)用 他的 start() 方法啟動(dòng)它褐荷。
這種方法和 Thread 是一樣的计济。
- 一個(gè)最賤單的例子:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('傻逼',))
p.start()
p.join()
print("運(yùn)行結(jié)束")
>> hello 傻逼
運(yùn)行結(jié)束
上圖中罩引,我寫了 p.join() 所以主進(jìn)程是 等待 子進(jìn)程執(zhí)行完后各吨,才執(zhí)行 print("運(yùn)行結(jié)束")
否則就是反過來了(這個(gè)不一定,看你的語句了蜒程,順序其實(shí)是隨機(jī)的)例如:
p = Process(target=f, args=('傻逼',))
p.start()
# time.sleep(1)
print("運(yùn)行結(jié)束")
>> 運(yùn)行結(jié)束
hello 傻逼
主進(jìn)加個(gè) sleep
結(jié)果:
hello 傻逼
運(yùn)行結(jié)束 (大約等了1秒后绅你,才輸出 運(yùn)行結(jié)束)
所以不加join() ,其實(shí)子進(jìn)程和主進(jìn)程是各干各的,誰也不等誰昭躺。都執(zhí)行完后忌锯,文件運(yùn)行就結(jié)束了
- 查看子進(jìn)程和父進(jìn)程的 id
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid()) # 獲取父進(jìn)程的 pid (process id)
print('process id:', os.getpid())
def f(name):
info('function f') # use info
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
>> main line
module name: __main__
parent process: 14212
process id: 1940
function f
module name: __mp_main__
parent process: 1940
process id: 15020
hello bob
上面我們用了 os.getpid() 和 os.getppid() 獲取 當(dāng)前進(jìn)程,和父進(jìn)程的id
下面就講一下领炫,這兩個(gè)函數(shù)的用法:
os.getpid()
返回當(dāng)前進(jìn)程的id
os.getppid()
返回父進(jìn)程的id偶垮。 父進(jìn)程退出后,unix 返回初始化進(jìn)程(1)中的一個(gè)
windows返回相同的id (可能被其他進(jìn)程使用了)
這也就解釋了帝洪,為啥我上面 的程序運(yùn)行多次似舵, 第一次打印的parentid 都是 14212 了。
而子進(jìn)程的父級(jí) process id 是調(diào)用他的那個(gè)進(jìn)程的 id : 1940
-
上下文和啟動(dòng)子進(jìn)程
這個(gè)內(nèi)容太多需要葱峡,蠻蠻總結(jié)砚哗。wc。
視頻筆記:
多進(jìn)程:使用大致方法:
-
多進(jìn)程通信
- queue 管道:隊(duì)列是線程和進(jìn)程安全的(不會(huì)競爭)
q = mp.Queue()
把q 傳給args, 然后在里面 q.put(你的東西)
取值在外面直接 q.get() # 先進(jìn)先出
普通的
Process 函數(shù)用 return砰奕,我們也拿不到任何返回值 (智能通過 queue 或者共享內(nèi)存來交換數(shù)據(jù))
pool.map (函數(shù)可以有return 也可以共享內(nèi)存或queue) 結(jié)果直接是個(gè)列表
def b(x):
# return 20
print("child process")
if __name__ == "__main__":
p =Pool(processes=2)
print(p.map(b, [1,2,3,4]))
>>
child process
child process
child process
child process
[None, None, None, None]
poll.apply_async() (同map,只不過是一個(gè)進(jìn)程蛛芥,返回結(jié)果用 xx.get() 獲得)
- 共享內(nèi)存使用:
multiprocessing.Value("type", value) # 單個(gè)值 value 智能是數(shù)字類型的,具體看 type 的設(shè)置 比如 i是 有符號(hào)整形
multiprocessing.Array("type", [single level list])
- Pipe 管道
Pipe()
函數(shù)返回一個(gè)由管道連接的連接對(duì)象
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
返回的兩個(gè)連接對(duì)象
Pipe()
表示管道的兩端军援。每個(gè)連接對(duì)象都有send()
和recv()
方法
- 資源競爭 枷鎖
lock = mp.Lock()
把 lock 傳給進(jìn)程的args 參數(shù)
在進(jìn)程函數(shù)里面寫 lock.acquire() 枷鎖
lock.release() 釋放鎖
練習(xí)過程中出現(xiàn)的問題:
from multiprocessing import Pool
pool = Pool()
def search(a):
# while a<1000000:
# a+=1
print("子進(jìn)程結(jié)束")
return 1
if __name__ == "__main__":
# pool = Pool()
a = time.time()
result = pool.map(search, [1,2,10])
b = time.time()
print(b-a)
報(bào)錯(cuò):
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
參考 :https://blog.csdn.net/xiemanR/article/details/71700531
把 pool = Pool() 放到 if name == "main": 下面初始化搞定仅淑。
結(jié)果:
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
0.7355780601501465
這個(gè)肯定有解釋的
測試多進(jìn)程計(jì)算效果:
進(jìn)程池運(yùn)行:
class aa():
def search(self, a):
while a<100000000:
a+=1
print("子進(jìn)程結(jié)束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
result = pool.map(c.search, [1,2,10])
b = time.time()
print(b-a)
結(jié)果:
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
16.176724195480347
普通計(jì)算:
class aa():
def search(self, a):
while a<100000000:
a+=1
print("子進(jìn)程結(jié)束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
# result = pool.map(c.search, [1,2,10])
c.search(1)
c.search(2)
c.search(10)
b = time.time()
print(b-a)
我們同樣傳入 1 2 10 三個(gè)參數(shù)測試:
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
24.153281211853027
其實(shí)對(duì)比下來開始快了一半的胸哥;
我們把循環(huán)里的數(shù)字去掉一個(gè) 0涯竟;
單進(jìn)程:
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
3.1451985836029053
多進(jìn)程:
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
子進(jìn)程結(jié)束
2.4246084690093994
兩次測試 單進(jìn)程/進(jìn)程池 分別為 0.669 和 0.772 幾乎成正比的。
問題 二:
視圖:
post 視圖里面
class xxviews():
def post(request):
music = Music()
result = music.run()
return ...
Music 類:
pool = Pool()
class Music():
def __init__(self):
pass
def run(self):
result = self.search(...)
pass
def search(self, keyword, target_src):
pool.map(....)
直接報(bào)錯(cuò):
寫在 類里面也 在函數(shù)里用 self.pool 調(diào)用也不行空厌,也是相同的錯(cuò)誤庐船。
最后 把 pool = Pool 直接寫在 search 函數(shù)里面,奇跡出現(xiàn)了:
class Music():
def __init__(self):
pass
def run(self):
result = self.search(...)
pass
def search(self, keyword, target_src):
pool = Pool()
pool.map(....)
pass
前臺(tái)也能顯示搜索的音樂結(jié)果了
總結(jié)一點(diǎn)嘲更,進(jìn)程這個(gè)東西醉鳖,最好 寫在 直接運(yùn)行的函數(shù)里面,而不是 一個(gè)函數(shù)跳來跳去哮内。因?yàn)樽詈罂赡?是在子進(jìn)程的子進(jìn)程運(yùn)行的,這是不許的,會(huì)報(bào)錯(cuò)北发。
還有一點(diǎn)纹因,多進(jìn)程運(yùn)行的函數(shù)對(duì)象,不能是 lambda 函數(shù)琳拨。也許lambda 虛擬瞭恰,在內(nèi)存?狱庇?
使用 pool.map 子進(jìn)程 函數(shù)報(bào)錯(cuò)惊畏,導(dǎo)致整個(gè) pool 掛了:
參考:https://blog.csdn.net/hedongho/article/details/79139606
主要你要,對(duì)函數(shù)內(nèi)部捕獲錯(cuò)誤密任,而不能讓異常拋出就可以了颜启。
關(guān)于map 傳多個(gè)函數(shù)參數(shù)
我一開始,就是正常思維浪讳,多個(gè)參數(shù)缰盏,搞個(gè)元祖,讓參數(shù)一一對(duì)應(yīng)不就行了:
class aa():
def search(self, a,b):
while a<10000000:
a+=1
print("子進(jìn)程結(jié)束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
result = pool.map(c.search, ((1,3),))
報(bào)錯(cuò):
return self._map_async(func, iterable, mapstar, chunksize).get()
File "E:\Program\python\lib\multiprocessing\pool.py", line 657, in get
raise self._value
TypeError: search() missing 1 required positional argument: 'b'
參考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 當(dāng)讓可以穿多個(gè)參數(shù)淹遵,map 卻不知道咋傳的口猜。
apply_async 和map 一樣,不知道咋傳的透揣。
最簡單的方法:
使用 starmap 而不是 map
class aa():
def search(self, a,b):
while a<10000000:
a+=1
print("子進(jìn)程結(jié)束")
return 1
if __name__ == "__main__":
pool = Pool()
c = aa()
a = time.time()
result = pool.starmap(c.search, ((1,3),))
b = time.time()
print(b-a)
結(jié)果:
子進(jìn)程結(jié)束
1.8399453163146973
成功拿到結(jié)果了
關(guān)于map 和 starmap 不同的地方看源碼:
def mapstar(args):
return list(map(*args))
def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()
def starmap(self, func, iterable, chunksize=None):
'''
Like `map()` method but the elements of the `iterable` are
expected to
be iterables as well and will be unpacked as arguments. Hence
`func` and (a, b) becomes func(a, b).
'''
return self._map_async(func, iterable, starmapstar, chunksize).get()
關(guān)于apply_async() ,我沒找到多參數(shù)的方法济炎,大不了用 一個(gè)迭代的 starmap 實(shí)現(xiàn)。哈哈
關(guān)于 上面源碼里面有 itertools.starmap
itertools 用法參考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions
有個(gè)問題辐真,多進(jìn)程最好不要使用全部的 cpu , 因?yàn)檫@樣可能影響其他任務(wù)须尚,所以 在進(jìn)程池 添加 process 參數(shù) 指定,cpu 個(gè)數(shù):
Pool(processes=os.cpu_count()-1)
上面就是預(yù)留了 一個(gè)cpu 干其他事的
后面直接使用 Queue 遇到這個(gè)問題:
RuntimeError: Queue objects should only be shared between processes through inheritance
解決:
Manager().Queue() 代替 Queue()
from multiprocessing import Manager, Queue
queue = Manager().Queue()
# queue = Queue()
因?yàn)?queue.get() 是堵塞型的拆祈,所以可以提前判斷是不是 空的恨闪,以免堵塞進(jìn)程。比如下面這樣:
使用 queue.empty() 空為True
current_search = "稍等" if music_current_search.empty() else
music_current_search.get()