Python 3的多進(jìn)程
多進(jìn)程庫(kù)名叫multiprocessing。有幾點(diǎn)記錄一下:
-
multiprocessing
提供了本地和遠(yuǎn)端的并發(fā)锣险,同時(shí)使用子進(jìn)程機(jī)制避免了GIL只锭,它可以在Unix和Windows運(yùn)行狂秦。 -
multiprocessing
提供了一個(gè)叫Pool
的對(duì)象肉拓,它通過(guò)不同的進(jìn)程給執(zhí)行函數(shù)體賦予不同的輸入值葵硕,以此實(shí)現(xiàn)數(shù)據(jù)并發(fā)(data parallelism)罐监。
Pool 對(duì)象
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
這里要注意吴藻,一定要有if __name__ == '__main__'
參考Programming guidelines。
這個(gè)Pool對(duì)象或者Pool類看起來(lái)非常有用弓柱,熟悉之后用它寫Python非常方便沟堡。它返回的是一個(gè)進(jìn)程集合侧但,隱藏了進(jìn)程管理的操作。是批量處理進(jìn)程的不二選擇弦叶。你可以直接使用這些worker進(jìn)程俊犯,比如說(shuō)下面這個(gè)例子里,首先新建了4個(gè)worker進(jìn)程伤哺,然后讓它們完成平方計(jì)算燕侠,但是代碼不需要關(guān)注哪個(gè)進(jìn)程計(jì)算那個(gè)數(shù)字。當(dāng)然你也可以手動(dòng)新建4個(gè)進(jìn)程立莉,效果是一樣的绢彤。
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
- multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
-
processes
是worker進(jìn)程的數(shù)量,如果為None蜓耻,則使用os.cpu_count()
的返回值 - 如果initializer不是None茫舶,那么worker進(jìn)程在啟動(dòng)的時(shí)候會(huì)調(diào)用initializer(*initargs)
- maxtasksperchild代表一個(gè)worker進(jìn)程在退出前能夠完成的任務(wù)數(shù),之后它會(huì)被替換為一個(gè)新的worker進(jìn)程刹淌。
- 默認(rèn)值為None饶氏,表示workder進(jìn)程的生命周期和pool一樣
- context是用來(lái)啟動(dòng)worker進(jìn)程的上下文。
-
-
pool對(duì)象的方法只能被創(chuàng)建這個(gè)pool的進(jìn)程調(diào)用有勾。
- apply(func[, args[, kwds]])疹启,調(diào)用方法func并阻塞,直到方法執(zhí)行完畢蔼卡。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])喊崖,調(diào)用func,當(dāng)方法執(zhí)行完畢后調(diào)用callback雇逞。
- map(func, iterable[, chunksize])荤懂,用于批量調(diào)用func,chunksize代表了iterable里chunk的大小塘砸。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])节仿,同上,完成之后調(diào)用callback
- imap(func, iterable[, chunksize])谣蠢,輕量版的map()
- imap_unordered(func, iterable[, chunksize])粟耻,和imap()一樣,只是返回的結(jié)果順序是不確定的眉踱。
- starmap(func, iterable[, chunksize])挤忙,和map()類似,只不過(guò)func返回的期望值為iterable的谈喳。
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])册烈, 同上,結(jié)果時(shí)調(diào)用callback。
- close()赏僧, 停止接受更多的任務(wù)大猛。當(dāng)前pool里的任務(wù)都完成后,退出淀零。
- terminate()挽绩,立刻停止所有worker進(jìn)程。
- join()驾中,等待workder進(jìn)程退出唉堪。
-
apply_async()和map_async()類返回的類為AsyncResult,它的方法包括:
- get([timeout])肩民,在結(jié)果到達(dá)時(shí)返回它唠亚,否則拋出異常。
- wait([timeout])持痰,等待結(jié)果到達(dá)灶搜。
- ready(),返回調(diào)用操作是否完成工窍。
- successful()割卖,同ready(),但不會(huì)拋出異常患雏。
Process 類
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
方法
-
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- 對(duì)于進(jìn)程來(lái)說(shuō)串塑,
group
總是為None -
target
是子進(jìn)程要運(yùn)行的對(duì)象嘹履,args
是它的參數(shù)谒养,kwargs
是鍵:target(args) -kwargs
-
name
是進(jìn)程名 -
daemon
決定子進(jìn)程是不是守護(hù)進(jìn)程掰烟,如果是None守屉,就繼承父進(jìn)程的daemon值
- 對(duì)于進(jìn)程來(lái)說(shuō)串塑,
run()偏竟,上述
target
就是run要調(diào)用的對(duì)象庭惜,你也可以在子類中覆寫這個(gè)方法这揣,一般不用悬槽。start()怀吻,啟動(dòng)子進(jìn)程,讓進(jìn)程開始執(zhí)行run(), 它需要在創(chuàng)建進(jìn)程之后立刻調(diào)用初婆。
-
join([timeout])
- 阻塞當(dāng)前進(jìn)程蓬坡,并開始執(zhí)行子進(jìn)程
- 如果timeout是None,父進(jìn)程會(huì)阻塞直到子進(jìn)程結(jié)束磅叛;如果timeout是一個(gè)正整數(shù)屑咳,則子進(jìn)程會(huì)在timeout秒之后阻塞。
- 父進(jìn)程可以調(diào)用join很多次弊琴,但是進(jìn)程不能自己join自己兆龙。
- FYI,這和我在上文中所提的LWT中join的實(shí)現(xiàn)不一樣敲董,LWT中的join()和這里的start是一樣的紫皇,LWT中的yeild()和這里的join類似慰安。
name, is_alivel, daemon, pid, exitcode, quthkey, sentinel,一些常量聪铺。
terminate()化焕,終止當(dāng)前進(jìn)程,注意當(dāng)前進(jìn)程的子孫不會(huì)被終結(jié)铃剔。
異常:ProcessError, BufferTooShort, AuthenticationError, TimeoutError
關(guān)于start方法
multiprocessing
是封裝之后的庫(kù)撒桨,它的start
方法的底層實(shí)現(xiàn)有三種,分別是:spawn, fork和forkserver番宁,你可以用set_start_method()
選擇其中一種元莫,但一個(gè)程序只能設(shè)置一次;也可以使用get_context()
來(lái)獲取當(dāng)前的使用的上下文蝶押,后者返回了一個(gè)和multiprocessing
模塊具有相同APIs的對(duì)象踱蠢。大多數(shù)情況下,直接調(diào)用start()
就好了棋电。
一些功能方法
- active_children()茎截,返回當(dāng)前進(jìn)程的子進(jìn)程列表;
-
cpu_count()赶盔,返回CPU的數(shù)量企锌,不是當(dāng)前進(jìn)程所使用的CPU數(shù)量。后者可以用
len(os.sched_getaffinity(0))
獲取于未。 - current_process()撕攒,返回當(dāng)前進(jìn)程對(duì)象。
- freeze_support()烘浦,Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable.
- get_all_start_methods()抖坪,返回所有支持的start方法,包括spawn, fork和forkserver闷叉。
- get_context(method=None)擦俐,返回一個(gè)context對(duì)象,用起來(lái)和multiprocessing一樣握侧。
- get_start_method(allow_none=False)蚯瞧,獲取用于啟動(dòng)進(jìn)程的start方法。
- set_executable()品擎,為子進(jìn)程設(shè)置Python解釋器的路徑埋合。
- set_start_method(method),設(shè)置start方法孽查。
進(jìn)程通信的數(shù)據(jù)結(jié)構(gòu)
進(jìn)程通信的方法主要有兩種:隊(duì)列(Queues)和管道(Pipes)饥悴,另外還有JoinableQueue和SimeleQueue。
Pipes
Pipe可以是單工或者雙工的,卻決于其參數(shù)Pipe[duplex=True]
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
管道的模式是一端進(jìn)西设,一端出瓣铣。但是它的文檔指出如果兩個(gè)進(jìn)程同時(shí)從管道頭讀取或者從管道尾寫入,則數(shù)據(jù)有可能被損壞(corrupted)贷揽。
Queue
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
-
multiprocessing.Queue
和queue.Queue
只是類似棠笑,不一樣。 - 隊(duì)列是進(jìn)程安全和線程安全的禽绪,這說(shuō)明Queue類包含了鎖機(jī)制的實(shí)現(xiàn)蓖救。
- 根據(jù)文檔,Queue是用Pipe和鎖/信號(hào)量來(lái)實(shí)現(xiàn)的印屁。
- 其他關(guān)于Queue的常用方法:qsize(), empty(), full(), put(obj[, block[, timeout]]), put_nowait(obj), get([block[, timeout]]), get_nowait()
- 還有一些不是很重要循捺,大多數(shù)時(shí)候用不到的方法:close(), join_thread(), cancel_join_thread()
SimpeQueue
這是一個(gè)簡(jiǎn)化版的Queue,只實(shí)現(xiàn)了三個(gè)方法雄人,類似于Pipe:empty(), get()和put(item)从橘。
JoinableQueue
除了Queue中實(shí)現(xiàn)的方法,JoinableQueuee額外實(shí)現(xiàn)了task_done()和join()础钠。
進(jìn)程通信的Connection對(duì)象
Connection對(duì)象允許收發(fā)picklable對(duì)象或字符串恰力,有點(diǎn)類似于socket程序。它一般使用Pipe創(chuàng)建旗吁。Picklable是指大約不超過(guò)32MB的數(shù)據(jù)踩萎。
- send(obj), recv(), 收發(fā)對(duì)象,太大的話會(huì)引發(fā)ValueError很钓。
- fileno()香府,返回connection使用的文件描述符或句柄。
- close()码倦,關(guān)閉connection對(duì)象回还,一般是一個(gè)pipe。
- poll([timeout])叹洲,返回是否有可以讀取的數(shù)據(jù)
-
send_bytes(buffer[, offset[,size]]), recv_bytes([maxlength]), recv_bytes_into(buffer[, offset]),收發(fā)二進(jìn)制的數(shù)據(jù)工禾。
- send_bytes發(fā)送的數(shù)據(jù)不能超過(guò)32MB运提,否則拋出ValueError
- recv_bytes和recv_bytes_into: 如果沒有東西接收會(huì)阻塞,如果對(duì)方關(guān)閉了連接則拋出EOFError闻葵;如果收到的數(shù)據(jù)超過(guò)了maxlength民泵,拋出OSError。
- 例子
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
偵聽和客戶端
上述通信模式的進(jìn)一步封裝槽畔,由 multiprocessing.connection 模塊提供:
- multiprocessing.connection.deliver_challenge(connection, authkey)栈妆,發(fā)送一條任意消息,然后進(jìn)入等待。如果對(duì)方返回的是使用authkey加密的正確內(nèi)容鳞尔,則發(fā)送welcome消息嬉橙。使用HMAC加密。
- multiprocessing.connection.answer_challenge(connection, authkey)寥假,接收一條消息市框,使用authkey進(jìn)行加密,然后返回加密后的消息糕韧,等待welcome消息枫振。
- multiprocessing.connection.Client(address[, family[, authkey]]),設(shè)置客戶端萤彩,嘗試向地址為address的偵聽器建立連接粪滤。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]]),設(shè)置偵聽器雀扶,偵聽address表示的地址范圍杖小。
- multiprocessing.connection.accept(), 在偵聽段上接收一個(gè)連接怕吴,并返回一個(gè)Connection窍侧。
- close(),關(guān)閉偵聽器相關(guān)的套接字或pipe转绷。
- address伟件,偵聽器使用的底地址。
- last_accepted议经,返回最后接收的連接的address斧账,沒有則為None。
- multiprocessing.connection.wait(object_list, timeout=None)煞肾,等待object_list中某個(gè)object對(duì)象準(zhǔn)備完成咧织,并返回準(zhǔn)備完成的對(duì)象。
例子:
Server
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Client
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
wait的使用例子
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
進(jìn)程同步
鎖
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
鎖是用的最多的籍救,除了普通的鎖习绢,Python也支持遞歸鎖,在后者中蝙昙,同一個(gè)進(jìn)程可以多次申請(qǐng)鎖闪萄,但是必須要依次釋放每一個(gè)鎖。兩種鎖的用法是一樣的奇颠,除了名字败去。遞歸鎖是RLock()。
其他
其他的同步用的名詞有:Barrier, BoundedSemaphore, Condition, Event, Semaphor烈拒。鎖其實(shí)是一種特殊的信號(hào)量圆裕。
進(jìn)程共享
共享ctypes對(duì)象
共享ctypes對(duì)象可以用來(lái)使用共享的內(nèi)存广鳍,常見的對(duì)象有Value和Array,如:
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
- multiprocessing.Value(typecode_or_type, *args, lock=True)
- typecode_or_type決定了Value對(duì)象的返回對(duì)象類型吓妆,它可以是ctype對(duì)象或者是Value中使用的數(shù)據(jù)類型赊时。
- args是參數(shù)
- lock不僅可以是True或是False,代表是否需要鎖耿战;也可以直接傳入一個(gè)Lock或Rlock對(duì)象用來(lái)作為這個(gè)Value對(duì)象的鎖蛋叼。如果選擇False,這個(gè)Value就不再是進(jìn)程安全的了剂陡。
- multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
- 基本和Value差不多狈涮,size_or_initializer如果是一個(gè)數(shù)字,則代表了這個(gè)Array的大小鸭栖,所有的元素會(huì)被初始化為0歌馍;它也可以是一個(gè)用于初始化的序列,序列的內(nèi)容就是初始化的值晕鹊,序列的大小就是Arrary的大小松却。
還有幾個(gè)用來(lái)操作ctype類型對(duì)象的方法,它們屬于sharedctypes模塊:
- multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
- multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
- multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
- multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
- multiprocessing.sharedctypes.copy(obj)
- multiprocessing.sharedctypes.synchronized(obj[, lock])
管理者進(jìn)程
使用Manager()
對(duì)象可以創(chuàng)建一條管理者進(jìn)程溅话,它可以用來(lái)存儲(chǔ)Python對(duì)象晓锻,并允許其他本地或者遠(yuǎn)端的進(jìn)程通過(guò)代理操作這些對(duì)象。支持的對(duì)象類型有: list, dict, Namespace, Lock, RLock, Semaphor, BoundedSemaphore, Condition, Event, Barrier, Queue, Value和Array 飞几。
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
Manager類定義在 multiprocessing.managers 模塊里:
-
BaseManager([address[, authkey]])
-
address
是這個(gè)管理者進(jìn)程用來(lái)進(jìn)行偵聽的地址砚哆,如果值是None,則會(huì)任意選擇一個(gè)地址屑墨。authkey
是一個(gè)二進(jìn)制字符串躁锁,如果值為None,則會(huì)使用current_process().authkey
里的值卵史。 - 其他方法:
-
start([initializer[, initargs]])战转,啟動(dòng)一條子進(jìn)程用于啟動(dòng)這個(gè)manager。如果initializer不為None以躯,則子進(jìn)程啟動(dòng)的時(shí)候和調(diào)用
initializer(*initargs)
槐秧。 - get_server(),返回一個(gè)server對(duì)象忧设,它代表了Manager控制下的真實(shí)服務(wù)器色鸳。
- connect(),將一個(gè)本地manager連接到一個(gè)遠(yuǎn)程的manager進(jìn)程见转。
- shutdown(),關(guān)閉本manager使用的進(jìn)程蒜哀。
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])斩箫。
- address吏砂, 只讀變量,值為manager的地址乘客。
-
-
SyncManager
自定義manager
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
訪問(wèn)遠(yuǎn)端manager的例子
本地服務(wù)器
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
遠(yuǎn)程客戶端
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地客戶端
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理對(duì)象
A proxy is an object which refers to a shared object which lives (presumably) in a different process. The shared object is said to be the referent of the proxy. Multiple proxy objects may have the same referent.
這個(gè)屬于那種很容易理解但是很難用文字表述的內(nèi)容匈织。舉個(gè)例子:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
注意manager.list()
返回的是一個(gè)代理對(duì)象,因此manager.list([1,2,3]) == [1,2,3]
是False的牡直。
- multiprocessing.managers. BaseProxy
- Proxy對(duì)象是BaseProxy子類的實(shí)例
- _callmethod(methodname[, args[, kwds]])缀匕,用來(lái)調(diào)用引用的方法
- _getvalue()返回引用的一份拷貝
-
__repr__
() 返回Proxy對(duì)象的描述 -
__str__
() 返回引用的描述
日志
logging 包可以支持日志記錄,但是它沒有使用共享鎖碰逸,因此來(lái)自于不同進(jìn)程的日志消息可能會(huì)混亂乡小。
- multiprocessing.get_logger(),返回一個(gè)logger對(duì)象饵史。如果是新建的logger满钟,它的日志等級(jí)為
logging.NOTSET
。 - multiprocessing.log_to_stderr()胳喷,這個(gè)方法將get_logger()的日志重定向到
sys.stderr
上湃番,格式為[%(levelname)s/%(processName)s] %(message)s
。
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0