搖滾吧毫炉!多進(jìn)程
由于cpython GIL的特性施逾,在單核下是完美的钉答,但是面對(duì)多核,老爹的推薦是開(kāi)啟多進(jìn)程灸叼,所以要發(fā)揮多核最好利用多進(jìn)程
什么是進(jìn)程
進(jìn)程是程序執(zhí)行中資源分配的基本單元神汹。程序執(zhí)行說(shuō)明了他是動(dòng)態(tài)的,就會(huì)有等待和執(zhí)行古今,僵尸等狀態(tài)屁魏。而資源分配就是指的堆棧,數(shù)據(jù)段捉腥,信號(hào)等氓拼。
進(jìn)程的創(chuàng)建
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',)) # 獲得句柄
p.start() # 運(yùn)行進(jìn)程
p.join() # 等待子進(jìn)程退出
當(dāng)調(diào)用Process的時(shí)候,實(shí)際并沒(méi)有創(chuàng)建子進(jìn)程抵碟,只是創(chuàng)建了一個(gè)關(guān)聯(lián)句柄桃漾,這個(gè)句柄包含當(dāng)前環(huán)境的進(jìn)程ID,就是父進(jìn)程iD拟逮,子進(jìn)程的標(biāo)識(shí)撬统,子進(jìn)程啟動(dòng)之后的任務(wù),還有參數(shù)傳遞敦迄。
當(dāng)你調(diào)用start函數(shù)執(zhí)行的時(shí)候恋追,就會(huì)調(diào)用fork創(chuàng)建子進(jìn)程凭迹,而子進(jìn)程就會(huì)執(zhí)行“_bootstrap”函數(shù),并最終調(diào)用"run"方法苦囱,如果run方法不是循環(huán)嗅绸,那么執(zhí)行完成就會(huì)退出了。run方法默認(rèn)是執(zhí)行你之前填入的target方法和參數(shù)的,target(*args, **kwargs)撕彤。
join實(shí)際上是父進(jìn)程等待子進(jìn)程完成退出的意思鱼鸠。子進(jìn)程的默認(rèn)計(jì)數(shù)是從1開(kāi)始的。
第二種創(chuàng)建進(jìn)程的方法
from multiprocessing import Process
class Worker(Process):
def __init__(self, *args, **kwargs):
"""可定制參數(shù)"""
super(Worker, self).__init__(*args, **kwargs)
def run(self):
"""可重寫(xiě),比如不通過(guò)輸入?yún)?shù)來(lái)指定羹铅,可以通過(guò)隊(duì)列來(lái)獲取數(shù)據(jù)"""
print("I am rocking")
super(Worker, self).run()
def f(name):
print(" hello world!:", name)
if __name__ == "__main__":
w = Worker(target=f, args=("Bomb",) )
w.start()
w.join()
為什么我一般不用pool而采用process蚀狰,如果只是簡(jiǎn)單類(lèi)似于map計(jì)算,是可以使用map的睦裳,但是面對(duì)復(fù)雜的業(yè)務(wù),我會(huì)選擇process撼唾,因?yàn)樽约焊鶕?jù)業(yè)務(wù)控制行為廉邑。
常用屬性
p.run# 主要用于overrite的方法
p.start() # 創(chuàng)建子進(jìn)程
p.join() # 父進(jìn)程等待子進(jìn)程完成
p.name # 子進(jìn)程的名字
p.is_alive() # 查看子進(jìn)程是否存活
p.pid# 子進(jìn)程起來(lái)之后,才有值
p.terminate() #殺死子進(jìn)程
進(jìn)程的通訊
進(jìn)程間的通訊基本都是共享內(nèi)存,只是形式不一樣倒谷,Queue都是基于pipe實(shí)現(xiàn)的蛛蒙。甚至于有的加了鎖機(jī)制,保證數(shù)據(jù)同步渤愁,比如queue就是線(xiàn)程或者進(jìn)程安全的
Pipe
返回兩個(gè)可連接對(duì)象
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
Queue
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
共享狀態(tài)Value牵祟,Array,Copy對(duì)象
https://docs.python.org/2/library/ctypes.html#module-ctypes
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', 'hello world', lock=lock) # lock在于多個(gè)并發(fā)寫(xiě)時(shí)抖格,保證數(shù)據(jù)的一致性
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print n.value
print x.value
print s.value
print [(a.x, a.y) for a in A]
共享狀態(tài)Managers
managers是一個(gè)管理資源的進(jìn)程诺苹,支持的類(lèi)型
dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Queue
,Value
and [Array
](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array,算是一個(gè)代理雹拄,其他進(jìn)程通過(guò)這個(gè)服務(wù)進(jìn)程獲取數(shù)據(jù)收奔。優(yōu)勢(shì)就是支持多種類(lèi)型的資源,缺點(diǎn)就是得開(kāi)個(gè)進(jìn)程,如果這個(gè)進(jìn)程掛了滓玖,就完蛋了坪哄。
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', 'hello world', lock=lock) # lock在于多個(gè)并發(fā)寫(xiě)時(shí),保證數(shù)據(jù)的一致性
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print n.value
print x.value
print s.value
print [(a.x, a.y) for a in A]