Memory sharing
- Sharing memory with Value and Array
  - Value(): returns a ctypes object allocated from shared memory
  - Array(): allocates a ctypes array from shared memory and returns it
- Sharing memory with Manager
  - The manager object returned by Manager() controls a server process; that process holds the Python objects and lets other processes manipulate them through proxies
  - Supported types include list, dict and others
  - Mind synchronization: a lock may be needed, especially for read-modify-write updates such as +=
```python
from multiprocessing import Process
from multiprocessing import Manager, Lock
import time
import random

def register(d, name):
    # Unsafe: the membership test and "d[name] += 1" are separate proxy calls
    # (get, then set), so two processes can interleave and lose an update
    if name in d:
        print('duplicated name found...register anyway...')
        d[name] += 1
    else:
        d[name] = 1
    time.sleep(random.random())

def register_with_lock(d, name, lock):
    # Safe: the check and the update both happen while holding the lock
    with lock:
        if name in d:
            print('duplicated name found...register anyway...')
            d[name] += 1
        else:
            d[name] = 1
    time.sleep(random.random())

if __name__ == '__main__':
    # Manager
    names = ['Amy', 'Lily', 'Dirk', 'Lily', 'Denial', 'Amy', 'Amy', 'Amy']
    manager = Manager()
    dic = manager.dict()  # manager.list() works the same way for lists
    lock = Lock()
    students = []
    for name in names:
        # s = Process(target=register, args=(dic, name))
        s = Process(target=register_with_lock, args=(dic, name, lock))
        students.append(s)
    for s in students:
        s.start()
    for s in students:
        s.join()
    print('all processes ended...')
    for k, v in dic.items():
        print("{}\t{}".format(k, v))
```
The dictionary is shared by all the processes. Output:
```
duplicated name found...register anyway...
duplicated name found...register anyway...
duplicated name found...register anyway...
duplicated name found...register anyway...
all processes ended...
Amy 4
Dirk 1
Denial 1
Lily 2
```
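The Value/Array approach listed at the start of this section has no example of its own. Below is a minimal sketch. It assumes a shared int counter and a small shared array of doubles, and the names `bump` and `run_demo` are made up for illustration. `Value('i', 0)` and `Array('d', n)` allocate ctypes objects in shared memory. `get_lock()` returns the lock of the synchronized wrapper, which is still needed for `+=`.

```python
from multiprocessing import Array, Process, Value

def bump(counter, arr, idx):
    # hypothetical worker: one increment of the shared counter plus one array slot
    with counter.get_lock():  # += on a Value is read-modify-write, so lock it
        counter.value += 1
    arr[idx] = idx * 1.5      # each worker writes only its own slot

def run_demo(n=3):
    counter = Value('i', 0)   # shared C int, starts at 0
    arr = Array('d', n)       # shared array of n C doubles, zero-initialised
    workers = [Process(target=bump, args=(counter, arr, i)) for i in range(n)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value, arr[:]  # arr[:] copies the contents into a list

if __name__ == '__main__':
    print(run_demo())  # (3, [0.0, 1.5, 3.0])
```

Compared with a Manager, Value and Array avoid the extra server process and proxy round trips. The trade-off is that they only hold ctypes data, not arbitrary Python objects.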
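- Process pool
  - Starting too many processes lowers efficiency (synchronization and context-switching costs)
  - Keep the number of worker processes fixed
  - Let these workers execute all the tasks instead of starting more processes
  - Choose the number based on the number of CPU cores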
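Creating a process pool
- Pool([processes[, initializer[, initargs]]])
  - processes: number of worker processes to create; defaults to the value of os.cpu_count()
  - initializer: callable that each worker process runs when it starts; defaults to None
  - initargs: arguments passed to initializer
- p.apply()
  - Synchronous call
  - Only one task runs at a time (no parallelism)
  - Returns the result directly (blocks until the result is ready)
- p.apply_async()
  - Asynchronous call
  - Runs in parallel; the result is not necessarily available immediately (an AsyncResult is returned)
  - Accepts a callback. As soon as any task in the pool finishes, the main process is notified and calls another function to handle the result. That function is the callback, and its argument is the task's return value
- p.close()
  - Stops the pool from accepting new tasks; the workers exit once the pending tasks are done
- p.join()
  - Waits for all worker processes to exit; may only be called after close() or terminate()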
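Notes
- The callback takes exactly one argument: the result
- The callback is invoked by the main process (on a result-handling thread inside it)
- The callback should return quickly, otherwise it holds up the handling of other results
- The order of the callbacks is unrelated to the order in which the tasks were started
- p.map(): runs in parallel; the main process waits until all tasks have finished
- p.map_async(): the non-blocking version of map(); it returns an AsyncResult and can take a callback, which receives the whole list of results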
```python
from multiprocessing import Pool
from multiprocessing import current_process
import matplotlib.pyplot as plt
import os
import time

global_result = []

def fib(max_n):
    # returns 1, 2, 3, 5, 8, ... for max_n = 1, 2, 3, 4, 5, ...
    n, a, b = 0, 0, 1
    while n < max_n:
        a, b = b, a + b
        n += 1
    return b

def job(n):
    print('{} is working on {}...'.format(os.getpid(), n))
    time.sleep(2)  # simulate a slow task
    return fib(n)

def add_result(res):  # callback for apply_async: res is one task's return value
    global global_result
    print("called by {}, result is {}".format(current_process().pid, res))
    # The task could also return identifying information (e.g. its argument)
    # so each result can be matched to its task, for example in a dict
    global_result.append(res)

def add_result_map(res):  # callback for map_async: res is the whole list of results
    global global_result
    print("called by {}, result is {}".format(current_process().pid, res))
    for r in res:
        global_result.append(r)

if __name__ == '__main__':
    p = Pool()  # number of workers defaults to os.cpu_count()
    ms = range(1, 20)
    results = []
    # Each demo below closes the pool, so run only one at a time;
    # the inactive ones are wrapped in ''' strings.

    # Synchronous call
    # The pool has several workers, but each apply() blocks until its task has
    # returned before the next one is submitted, so nothing runs in parallel
    for m in ms:
        print('{} will be applied in main'.format(m))
        res = p.apply(job, args=(m,))  # waits for this task before moving on
        print(res)
        print('{} is applied in main'.format(m))
        results.append(res)
    p.close()  # !!! no more tasks may be submitted after this
    p.join()
    print(results)
    plt.figure()
    plt.plot(ms, results)
    plt.show()
    plt.close()

    # Asynchronous call: tasks run in parallel
    '''for m in ms:
        res = p.apply_async(job, args=(m,))  # res is only a handle (AsyncResult)
        results.append(res)
        # printing it right away may not show a result yet
        print(res)
    p.close()
    p.join()
    results2 = []
    for res in results:
        results2.append(res.get())
    print(results2)
    plt.figure()
    plt.plot(ms, results2)
    plt.show()
    plt.close()'''

    # callback
    '''for m in ms:
        p.apply_async(job, args=(m,), callback=add_result)
        # the callback takes a single argument
        # the callback is executed by the main process
    p.close()
    p.join()
    plt.figure()
    # callbacks may arrive out of order; sorting fixes it here, but not in every problem
    plt.plot(ms, sorted(global_result))
    plt.show()
    plt.close()'''

    # map
    '''results3 = p.map(job, ms)
    print(type(results3))  # list
    p.close()
    plt.figure()
    plt.plot(ms, results3)
    plt.show()
    plt.close()'''

    # map_async
    '''p.map_async(job, ms, callback=add_result_map)
    p.close()
    p.join()
    plt.figure()
    print(len(ms))
    print(len(global_result))
    plt.plot(ms, global_result)
    plt.show()
    plt.close()'''
```
With the synchronous call res=p.apply(job,args=(m,)), each task has to finish before the next one starts, so the run is slow. Every job sleeps for 2 s, so the 19 jobs take roughly 38 s in total:
```
1 will be applied in main
9028 is working on 1...
1
1 is applied in main
2 will be applied in main
3396 is working on 2...
2
2 is applied in main
3 will be applied in main
17520 is working on 3...
3
3 is applied in main
4 will be applied in main
12984 is working on 4...
5
4 is applied in main
5 will be applied in main
10720 is working on 5...
8
5 is applied in main
6 will be applied in main
18792 is working on 6...
13
6 is applied in main
7 will be applied in main
14768 is working on 7...
21
7 is applied in main
8 will be applied in main
19524 is working on 8...
34
8 is applied in main
9 will be applied in main
9028 is working on 9...
55
9 is applied in main
10 will be applied in main
3396 is working on 10...
89
10 is applied in main
11 will be applied in main
17520 is working on 11...
144
11 is applied in main
12 will be applied in main
12984 is working on 12...
233
12 is applied in main
13 will be applied in main
10720 is working on 13...
377
13 is applied in main
14 will be applied in main
18792 is working on 14...
610
14 is applied in main
15 will be applied in main
14768 is working on 15...
987
15 is applied in main
16 will be applied in main
19524 is working on 16...
1597
16 is applied in main
17 will be applied in main
9028 is working on 17...
2584
17 is applied in main
18 will be applied in main
3396 is working on 18...
4181
18 is applied in main
19 will be applied in main
17520 is working on 19...
6765
19 is applied in main
[1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765]
```
With the asynchronous call, the prints in the loop show only the result handles, not the results. The results are available only after all tasks have finished, but the run is much faster:
```
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C16A0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1780>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1828>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C18D0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1978>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1A20>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1AC8>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1B70>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1C18>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1CC0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1D68>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1E10>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1EB8>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1F60>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C1FD0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C80F0>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C8198>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C8240>
<multiprocessing.pool.ApplyResult object at 0x000001EF6A9C82E8>
21056 is working on 1...
16880 is working on 2...
13832 is working on 3...
17564 is working on 4...
21056 is working on 5...
16940 is working on 6...
12728 is working on 7...
6592 is working on 8...
18296 is working on 9...
16880 is working on 10...
13832 is working on 11...
17564 is working on 12...
21056 is working on 13...
16940 is working on 14...
12728 is working on 15...
6592 is working on 16...
18296 is working on 17...
16880 is working on 18...
13832 is working on 19...
[1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765]
```
Output with the callback (every callback line reports the same PID, that of the main process):
```
4600 is working on 1...
4584 is working on 2...
2892 is working on 3...
10088 is working on 4...
14404 is working on 5...
4600 is working on 6...
called by 14812, result is 116212 is working on 7...
4608 is working on 8...
called by 14812, result is 2
4584 is working on 9...
2632 is working on 10...
called by 14812, result is 3
2892 is working on 11...
called by 14812, result is 5
10088 is working on 12...
called by 14812, result is 8
14404 is working on 13...
called by 14812, result is 13
4600 is working on 14...
called by 14812, result is 21
16212 is working on 15...
called by 14812, result is 34
4608 is working on 16...
called by 14812, result is 55
4584 is working on 17...
called by 14812, result is 89
2632 is working on 18...
called by 14812, result is 144
2892 is working on 19...
called by 14812, result is 233
called by 14812, result is 377
called by 14812, result is 610
called by 14812, result is 987
called by 14812, result is 1597
called by 14812, result is 2584
called by 14812, result is 4181
called by 14812, result is 6765
```
ProcessPoolExecutor
- A further abstraction over multiprocessing
- Provides a simpler, unified interface
- submit(fn, *args, **kwargs)
  - returns a Future object representing the execution of the callable
  - Note: calling the Future's result() right away blocks until that call has finished
- map(func, *iterables, timeout=None)
  - func is executed asynchronously, i.e., several calls to func may be made concurrently, and an iterator of results is returned
```python
import concurrent.futures
from multiprocessing import current_process
import math

PRIMES = [
    1112272535095293,
    1112582705942171,
    1112272535095291,
    1115280095190773,
    1115797848077099,
    11099726899285419]

def is_prime(n):
    print(f"{current_process().pid}")  # shows which worker handles n
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    # math.isqrt is exact; a float sqrt can round wrongly above 2**53
    sqrt_n = math.isqrt(n)
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main_submit():
    results = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number in PRIMES:
            n_future = executor.submit(is_prime, number)
            # print(n_future.result())  # would block here
            results.append(n_future)
            # Calling n_future.result() right here would make the main process
            # block until that task finishes, so nothing would run in parallel.
            # Instead, keep the futures in a list and collect the results once
            # all tasks have been submitted.
        for number, res in zip(PRIMES, results):
            print("%d is prime: %s" % (number, res.result()))

def main_map():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime_or_not in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime_or_not))

if __name__ == '__main__':
    main_submit()
    # main_map()
```
Output of main_submit() (each bare number is the PID of the worker that checked a prime):
```
8004
8004
1112272535095293 is prime: False
4332
18212
18212
18212
1112582705942171 is prime: True
1112272535095291 is prime: True
1115280095190773 is prime: False
1115797848077099 is prime: False
11099726899285419 is prime: False
```
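Multiprocessing
- Distributed multiprocessing
  - Multi-machine environments
  - Data exchange across devices
  - Master-worker model
  - A Queue is exposed to the other machines through a manager
- GIL (Global Interpreter Lock)
  - The GIL is not a feature of the Python language; it is a concept introduced by the implementation of the interpreter (CPython)
  - The GIL is essentially a mutex. Only one thread can hold it, and therefore execute Python bytecode, at any given moment, which keeps the interpreter's shared data safe
  - The GIL protects data at the interpreter level; data at the application level must still be protected with your own locks
  - In CPython, of the threads started within one process only one can execute Python code at any moment, so multithreading cannot take advantage of multiple cores for CPU-bound work
  - A thread must acquire the GIL before it can run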
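The distributed master-worker model described above, where a manager exposes a Queue, can be sketched with `multiprocessing.managers.BaseManager`. This is a minimal single-machine version. The class names, getter names and authkey are made up for illustration. The master serves two `queue.Queue` objects on `127.0.0.1` with port 0, so the OS picks a free port, which `manager.address` then reports. The workers, started here as local processes, connect with the same authkey. On a real cluster the workers would run on other machines and connect to the master's reachable host and port instead.

```python
import queue
from multiprocessing import Process
from multiprocessing.managers import BaseManager

# These queues live in the manager's server process; clients use them via proxies.
task_queue = queue.Queue()
result_queue = queue.Queue()

def get_task_queue():
    return task_queue

def get_result_queue():
    return result_queue

class MasterManager(BaseManager):
    pass

# Master side: register the callables that return the shared objects.
MasterManager.register('get_task_queue', callable=get_task_queue)
MasterManager.register('get_result_queue', callable=get_result_queue)

class WorkerManager(BaseManager):
    pass

# Worker side: register the names only; the objects live on the master.
WorkerManager.register('get_task_queue')
WorkerManager.register('get_result_queue')

def worker(address, authkey):
    m = WorkerManager(address=address, authkey=authkey)
    m.connect()  # connect to the master's manager server
    tasks = m.get_task_queue()
    results = m.get_result_queue()
    while True:
        n = tasks.get()
        if n is None:  # sentinel: no more work
            break
        results.put((n, n * n))

def run_master(n_tasks=6, n_workers=2):
    authkey = b'demo-secret'  # hypothetical shared secret
    manager = MasterManager(address=('127.0.0.1', 0), authkey=authkey)
    manager.start()  # launches the server process
    try:
        tasks = manager.get_task_queue()
        results = manager.get_result_queue()
        for n in range(n_tasks):
            tasks.put(n)
        for _ in range(n_workers):
            tasks.put(None)  # one sentinel per worker
        # manager.address holds the port the OS actually assigned
        workers = [Process(target=worker, args=(manager.address, authkey))
                   for _ in range(n_workers)]
        for w in workers:
            w.start()
        collected = dict(results.get(timeout=30) for _ in range(n_tasks))
        for w in workers:
            w.join()
        return collected
    finally:
        manager.shutdown()

if __name__ == '__main__':
    print(run_master())
```

The getters are registered with `callable=` only on the master side and by name only on the worker side, following the usual BaseManager pattern. The queues stay inside the master's server process, and every worker operates on them through proxies over the network connection.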
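Multithreaded programming

Multiprocessing (Process) | Multithreading (Thread) |
---|---|
Can use multiple cores | Cannot use multiple cores for CPU-bound Python code (because of the GIL) |
High overhead | Low overhead |
Suited to CPU-bound work | Suited to I/O-bound work |
e.g. financial analysis | e.g. sockets, web crawlers, web services |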
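threading module
The multiprocessing module and the threading module are very similar in usage.
- threading.currentThread(): returns the current Thread object
- threading.enumerate(): returns a list of all threads that are currently alive
- threading.activeCount(): returns the number of threads currently alive; same as len(threading.enumerate())
  - currentThread() and activeCount() are deprecated aliases since Python 3.10; use current_thread() and active_count() instead
- Creating threads
  - By passing the target argument
  - By subclassing Thread
  - Daemon threads
    - setDaemon(True) (deprecated since Python 3.10; set daemon=True instead)
    - Must be set before start()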
```python
from threading import Thread, current_thread
import time

def task(name):
    time.sleep(2)
    print('%s print name: %s' % (current_thread().name, name))

class Task(Thread):
    def __init__(self, name):
        # name= sets the thread's own name; assigning self._name directly
        # would overwrite an internal attribute of Thread
        super().__init__(name=name)
        self.task_name = name

    def run(self):
        time.sleep(2)
        print('%s print name: %s' % (current_thread().name, self.task_name))

if __name__ == '__main__':
    # 1) via the target argument
    t = Thread(target=task, args=('thread_task_func',))
    t.start()
    t.join()
    # 2) via a Thread subclass that overrides run()
    t = Task('thread_task_class')
    t.start()
    t.join()
    print('main')
```
Output (Python 3.10 and later print the first thread name as Thread-1 (task)):
```
Thread-1 print name: thread_task_func
thread_task_class print name: thread_task_class
main
```
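The helper functions and the daemon setting described earlier in this part are not exercised by the example above. Below is a small sketch that uses their current snake_case names. The `run_demo` function, the thread name `'helper'` and the `threading.Event` that keeps the thread alive are illustrative choices. A daemon thread does not keep the program running: the interpreter can exit while it is still alive.

```python
import threading

def run_demo():
    main_name = threading.current_thread().name  # 'MainThread' in the main thread
    before = threading.active_count()
    stop = threading.Event()
    # daemon=True must be chosen before start(); it replaces setDaemon(True)
    t = threading.Thread(target=stop.wait, name='helper', daemon=True)
    t.start()
    alive = [th.name for th in threading.enumerate()]  # threads currently alive
    during = threading.active_count()                  # same as len(threading.enumerate())
    stop.set()  # let the helper thread return
    t.join()
    return main_name, before, during, alive, t.daemon

if __name__ == '__main__':
    print(run_demo())
```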
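- Thread synchronization
  - Locks: threading.Lock and threading.RLock (reentrant lock)
    - Once a thread holds an RLock, acquiring it again from the same thread does not block
    - The thread must release it once for every acquire
    - The difference shows up in recursive calls: re-acquiring a plain Lock from the same thread deadlocks
  - Semaphore: threading.Semaphore
  - Event: threading.Event
  - Condition: threading.Condition
  - Timer: threading.Timer
  - Barrier: threading.Barrier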
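- Thread-local variables (threading.local)
- Queues
  - queue.Queue (FIFO)
  - queue.LifoQueue (LIFO)
  - queue.PriorityQueue (lowest priority value first)
- Thread pool: concurrent.futures.ThreadPoolExecutor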
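Below is a short sketch of three items from the list above. The class and function names are made up for illustration. First, a `Lock` guards a counter: the GIL does not make `value += 1` atomic, because it is a separate load, add and store. Second, an `RLock` is re-acquired by a recursive function, which would deadlock with a plain `Lock`. Third, a `queue.Queue` is drained by consumers running in a `ThreadPoolExecutor`.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def add_many(self, n):
        for _ in range(n):
            with self._lock:  # value += 1 is load/add/store, so guard it
                self.value += 1

_rlock = threading.RLock()

def depth(n):
    # recursive function that re-acquires the same RLock on every level;
    # with a plain threading.Lock the second acquire would block forever
    with _rlock:
        return 0 if n == 0 else 1 + depth(n - 1)

def drain(q):
    # consumer: square items until it receives the None sentinel
    out = []
    while True:
        item = q.get()
        if item is None:
            return out
        out.append(item * item)

def run_demo():
    counter = Counter()
    threads = [threading.Thread(target=counter.add_many, args=(10_000,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    q = queue.Queue()  # FIFO queue shared by the consumer threads
    for i in range(6):
        q.put(i)
    for _ in range(2):
        q.put(None)  # one sentinel per consumer
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(drain, q) for _ in range(2)]
        squares = sorted(x for f in futures for x in f.result())

    return counter.value, depth(5), squares

if __name__ == '__main__':
    print(run_demo())  # (40000, 5, [0, 1, 4, 9, 16, 25])
```

All numbers are queued before the sentinels, so each consumer stops at exactly one `None`. Sorting is needed because which consumer gets which item is not deterministic.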
Comparing threads and processes
```python
from multiprocessing import Process
from threading import Thread
import time

def dense_cal():
    # CPU-bound: a long pure-Python loop
    res = 0
    for i in range(100000000):
        res *= i

def dense_io():
    time.sleep(2)  # simulate the I/O delay

def diff_pt(P=True, pn=4, target=dense_cal):
    # Start pn processes (P=True) or threads (P=False) running target and time them
    tlist = []
    start = time.time()
    for i in range(pn):
        if P:
            p = Process(target=target)
        else:
            p = Thread(target=target)
        tlist.append(p)
        p.start()
    for p in tlist:
        p.join()
    stop = time.time()
    name = 'multi-process' if P else 'multi-thread'
    print('%s run time is %s' % (name, stop - start))

if __name__ == '__main__':
    diff_pt(P=True)                            # CPU-bound, 4 processes
    diff_pt(P=False)                           # CPU-bound, 4 threads
    diff_pt(P=True, pn=100, target=dense_io)   # I/O-bound, 100 processes
    diff_pt(P=False, pn=100, target=dense_io)  # I/O-bound, 100 threads
```
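Sample output (timings depend on the machine). Processes are faster for the CPU-bound task, threads for the I/O-bound one:
```
multi-process run time is 28.328214168548584
multi-thread run time is 64.33887600898743
multi-process run time is 13.335324048995972
multi-thread run time is 5.584061861038208
```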
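Signals (signal)
- A signal is a limited form of inter-process communication provided by the operating system
- It is an asynchronous notification mechanism that tells a process that an event has occurred
- When a signal is sent to a process, the operating system interrupts the process's execution
  - Any non-atomic operation may be interrupted partway through
  - If the process has defined a handler for the signal, that handler runs; otherwise the default handler runs
  - e.g. signal.signal(signal.SIGTSTP, handler) installs a handler for SIGTSTP, the stop signal sent by Ctrl+Z (POSIX only), which lets the process react to a request to stop it
- In Python, signal handlers are executed only in the main thread
  - even if the signal was received in another thread
  - so signals cannot be used as a means of communication between threads
  - only the main thread is allowed to set a new signal handler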
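Below is a minimal sketch of installing a handler with `signal.signal()`, assuming CPython. It uses `SIGINT` instead of `SIGTSTP` because `SIGTSTP` exists only on POSIX systems. `signal.raise_signal()` (Python 3.8+) delivers the signal to the current process. The sketch also checks the last point above: calling `signal.signal()` from a non-main thread raises `ValueError`. `run_demo`, `handler` and `try_install` are illustrative names.

```python
import signal
import threading

def run_demo():
    received = []

    def handler(signum, frame):
        # record the signal and whether we are running in the main thread
        received.append((signum, threading.current_thread() is threading.main_thread()))

    old = signal.signal(signal.SIGINT, handler)  # returns the previous handler
    try:
        signal.raise_signal(signal.SIGINT)  # send SIGINT to this process
    finally:
        signal.signal(signal.SIGINT, old)  # restore the previous handler

    errors = []

    def try_install():
        try:
            signal.signal(signal.SIGINT, handler)
        except ValueError as exc:  # only the main thread may install handlers
            errors.append(str(exc))

    t = threading.Thread(target=try_install)
    t.start()
    t.join()
    return received, errors

if __name__ == '__main__':
    print(run_demo())
```

The handler is restored in `finally` so that Ctrl+C keeps its usual behaviour (raising KeyboardInterrupt) after the demo.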