眾所周知撒会,Python是使用偽多線程運(yùn)行的嘹朗。這導(dǎo)致在多核情況下,CPU并沒(méi)有被良好地利用起來(lái)诵肛。為了提高性能屹培,我們一般會(huì)選擇多進(jìn)程進(jìn)行工作。
進(jìn)程之間的協(xié)作需要通過(guò)通信完成怔檩,考慮到管道的緩存能力和安全性褪秀,我們選擇隊(duì)列作為通信內(nèi)容的載體。
multiprocessing模塊下實(shí)現(xiàn)封裝有很多基礎(chǔ)類型供給多線程間共享調(diào)用薛训,這足夠滿足我們的需要媒吗。
但是又一個(gè)問(wèn)題出現(xiàn)了。進(jìn)程的發(fā)送方可以根據(jù)需要向隊(duì)列中加入數(shù)據(jù)许蓖,但是對(duì)于接收方不斷地接收數(shù)據(jù)來(lái)保證加入隊(duì)列的數(shù)據(jù)可以得到及時(shí)地處理蝴猪。或許輪詢式的嘗試從隊(duì)列中獲取數(shù)據(jù)是個(gè)好選擇膊爪。
那么多久循環(huán)一次呢自阱?無(wú)限制循環(huán)卑雁?一個(gè)Linux tick時(shí)間系宜?無(wú)論多長(zhǎng),在空閑的時(shí)候都會(huì)造成對(duì)CPU的性能無(wú)意義損耗啃炸。所以另一個(gè)解決方案呼之欲出赃额,如果隊(duì)列為空那么等待一個(gè)信號(hào)加派,直到隊(duì)列中被放入數(shù)據(jù)后觸發(fā)這個(gè)信號(hào)后繼續(xù)該進(jìn)程。這個(gè)借助multiprocessing.Event()便可輕易做到跳芳。
那么怎么對(duì)Queue進(jìn)行封裝呢芍锦?關(guān)于這一點(diǎn)參考之前寫過(guò)的這篇文章: Python中輪詢觸發(fā)更替事件驅(qū)動(dòng)的簡(jiǎn)單方法
那么現(xiàn)在,我們已經(jīng)成功得對(duì)Queue進(jìn)行了封裝飞盆,那么我們是否可以對(duì)multiprocessing中manager里的其他的管理的數(shù)據(jù)內(nèi)容進(jìn)行封裝呢娄琉?于是我給出了如下的解決方案。
import multiprocessing
#這是所有操作符相關(guān)過(guò)或者其他功能相關(guān)的特殊函數(shù)吓歇。
common_list = {"__add__","__concat__","__contains__",
"__truediv__","__floordiv__", "__and___",
"__xor__","__invert__","__or__","__pow__",
"__is__","__is_not__","__setitem__","__delitem__",
"__getitem__","__lshift__","__mod__","__mul__",
"__matmul__","__neg__","__not__","__pos__",
"__rshift__","__setitem__","__delitem__","__getitem__",
"__mod__","__sub__","__truth__","__lt__",
"__le__","__eq__","__ne__","__ge__",
"__gt__","__str__"}
#定義需要封裝事件信號(hào)的類型和觸發(fā)事件信號(hào)的方法
source_map = {
'Queue': {'put'},
'dict':{'__setitem__'}
}
# 封裝了multiprocessing.Manager孽水,
# 由于multiprocessing.Manager實(shí)際是一個(gè)方法,
# 所以使用了下述__getattribute__的方式城看。
class Manager(object):
def __init__(self):
self.manager = multiprocessing.Manager()
# 將multiprocessing.Manager的方法映射向了Manager
def __getattribute__(self, name):
# 如果該方法在source_map 內(nèi)女气,則對(duì)該方法進(jìn)行封裝事件處理
if name in source_map:
# 初始化的方法,實(shí)例化被封裝的實(shí)例和該實(shí)例所用的事件信號(hào)
def __init__(self_q, *args, **kwargs):
self_q.sign = self.manager.Event()
self_q.base = self.manager.__getattribute__(name)(*args, **kwargs)
#生成一個(gè)對(duì)應(yīng)的函數(shù)的函數(shù)
def functionfactory(i,m,self=None):
if self:
def f(*args, **kwargs):
r = self.base.__getattribute__(m)(*args, **kwargs)
if i:
self.sign_set()
return r
else:
def f(self_q, *args, **kwargs):
r = self_q.base.__getattribute__(m)(*args, **kwargs)
if i:
self_q.sign_set()
return r
return f
# 對(duì)一些屬性方法做映射
def __getattribute__(self_q, name_q):
if name_q[:5] == 'sign_':
return self_q.sign.__getattribute__(name_q[5:])
elif name_q in source_map[name] and name_q not in common_list:
return functionfactory(True, name_q, self_q)
elif name_q[:2]=='__' or name_q in ['sign', 'base'] or \
(name_q in common_list and name_q in source_map[name]):
return object.__getattribute__(self_q, name_q)
else:
return self_q.base.__getattribute__(name_q)
# __dir__ 方法的重寫
def __dir__(self):
r = (dir(self.base)
+ ["sign_%s" % m for m in dir(self.sign) if m[:2]!='__']
+ ['sign', 'base'])
r.sort()
return r
# 生成動(dòng)態(tài)生成類型的方法字典
method_map = {
'__init__': __init__,
'__getattribute__': __getattribute__,
'__dir__': __dir__
}
for m in common_list.intersection(
dir(type(self.manager.__getattribute__(name)()))
):
method_map[m] = functionfactory(m in source_map[name],m)
#生成并返回動(dòng)態(tài)類型
return type(name, (object,), method_map)
elif name in ['manager', 'source_map']:
return object.__getattribute__(self, name)
else:
return self.manager.__getattribute__(self, name)
def __dir__(self):
r = dir(self.manager) + ['manager']
r.sort()
return r
if __name__ == "__main__":
m = Manager()
q = m.Queue()
q.sign_set()
print(q.sign_is_set()) # True
q.sign_clear()
q.put(1)
print(q.sign_wait()) # True
d = m.dict()
d[3] = 2
print(d.sign_is_set()) # True
d.sign_clear()
print(d[3]) # 2
print(d.sign_is_set()) # False
print(d) # {3:2}
其中测柠,通過(guò)重寫__getattribute__方法的方式炼鞠,簡(jiǎn)單實(shí)現(xiàn)了對(duì)封裝的外部類和對(duì)應(yīng)內(nèi)部類的屬性進(jìn)行了映射缘滥。即當(dāng)在代碼中調(diào)用a.b的時(shí)候,其實(shí)是通過(guò)a.__getattribute__返回的b屬性簇搅。比如:
class AllMethodAllowed(object):
def __getattribute__(self, name):
if name in dir(object):
return object.__getattribute__(self, name)
else:
return 'Hello, %s !' % name
if __name__=="__main__":
ama = AllMethodAllowed()
print(ama.Emma) # Hello, Emma !
print(ama.World) # Hello, World !
但是python的基本操作是由實(shí)例直接對(duì)應(yīng)的內(nèi)置方法決定的而非__getattribute__方法獲取的完域,所以需要通過(guò)定義的方式進(jìn)行。