在處理事件隊列的過程中不少情況是采用輪詢的方式進行的奔脐。
例如如下例子所示躯舔。在Example中彭雾,主線程和子線程通過隊列的形式進行通信,為模擬業(yè)務串稀,主線程將隨時獲取到的待處理任務放入對應的任務隊列(將一隨機數(shù)放入隨機的隊列中除抛,之后等待隨機的時間),子線程發(fā)現(xiàn)有隊列中有待處理的事件就將其取出進行處理(取出隊列中的數(shù)字進行打印)母截。
import time
from collections import deque
from random import Random
import threading
class Example():
def __init__(self):
self.r = Random(time.time())
def worker_run(self, name, q):
while True:
if q:
print("%s pop number: %s, %s numbers left" % (name, q.pop(), len(q)))
time.sleep(0.001)
def manager_run(self,worker_num):
queue_list = [deque() for i in range(worker_num)]
worker_list = [threading.Thread(target=self.worker_run,
args=["Thread-%s" % i, queue_list[i]]) for i in range(worker_num)]
for worker in worker_list:
worker.start()
while True:
i = 0
while i< self.r.randint(0,20):
queue_list[self.r.randint(0,worker_num - 1)].appendleft(
self.r.randint(1,100)
)
i += 1
time.sleep(self.r.random()/100)
if __name__ == '__main__':
Example().manager_run(3)
進行功能分離簡單到忽,目的性明確。但是存在效率性的問題微酬,如果想提高觸發(fā)效率绘趋,那么需要將worker_run中的每輪sleep時間設置的非常小颤陶,甚至是0。這么就造成了另一個問題陷遮,CPU的損耗問題滓走。無意義的while死循環(huán)無疑會造成CPU的無意義損耗。這個對機器性能消耗非常巨大的帽馋。所以搅方,需要對機制進行修改,由輪詢轉(zhuǎn)為事件驅(qū)動绽族。這樣的更改無疑會對代碼整體結構有較大的調(diào)整姨涡。但是是否有簡單的方式可以快速達到目的,且更代碼改較量小吧慢。
不難想到涛漂,只要在事件隊列為空時暫停工作線程,隊列放入值時重啟工作線程即可检诗。原本匈仗,直接對線程進行中斷和繼續(xù)是最好的解決方案,但是在python中threading.Thread并不支持逢慌。在官方文檔如是說:
The design of this module is loosely based on Java’s threading model. However, where Java makes locks and condition variables basic behavior of every object, they are separate objects in Python. Python’s Thread
class supports a subset of the behavior of Java’s Thread class; currently, there are no priorities, no thread groups, and threads cannot be destroyed, stopped, suspended, resumed, or interrupted. The static methods of Java’s Thread class, when implemented, are mapped to module-level functions.
這意味著悠轩,沒有一種方法直接對子線程進行暫停、重啟等攻泼。所以換了一個思路火架,使用threading.Event實現(xiàn)信號量作為暫停重啟的替代方案。當需要暫停時忙菠,使用threading.Event.wait()等待信號何鸡。當需要繼續(xù)時,使用threading.Event.set()發(fā)出信號只搁。這樣就解決了線程暫停音比、繼續(xù)的問題。
另一個問題是如何在隊列發(fā)生改變時觸發(fā)信號量的更改氢惋,不同的變量可以通過不同的方式實現(xiàn),事實上每一種變量都用不同的解決方案稽犁。
- 對于內(nèi)部實例化的示例可以通過繼承重新構建同名class的方式焰望,重寫修改內(nèi)容的方法實現(xiàn)觸發(fā)信號量。
- 外部變量則可以通過methodType的方式綁定重寫對于的方法已亥。
- 實例的基礎類型屬性則可以通過@property的方式在setter中加入信號觸發(fā)熊赖。
- ...
所以可以對上述示例進行如下修改。
import time
from collections import deque as dq
from random import Random
import threading
#重寫用于事件隊列的deque虑椎,使得對deque的變動會觸發(fā)信號量
class deque(dq):
#存儲信號的集合
sign = set()
#重寫append震鹉、appendleft俱笛、extend、extendleft方法
def append(self, *args, **kwargs):
dq.append(self, *args, **kwargs)
self.sign_set()
def appendleft(self, *args, **kwargs):
dq.appendleft(self, *args, **kwargs)
self.sign_set()
def extend(self, *args, **kwargs):
dq.extend(self, *args, **kwargs)
def extendleft(self, *args, **kwargs):
dq.extendleft(self, *args, **kwargs)
#觸發(fā)信號
def sign_set(self):
for s in self.sign:
s.set()
#添加一個信號
def add_sign(self,s):
self.sign.add(s)
class Example():
def __init__(self):
self.r = Random(time.time())
#添加一個信號量綁定給deque传趾,當發(fā)現(xiàn)deque中沒有值之后等待信號傳入
def worker_run(self, name, q):
sign = threading.Event()
q.add_sign(sign)
while True:
if q:
print("%s pop number: %s, %s numbers left" % (name, q.pop(), len(q)))
else:
sign.wait()
sign.clear()
def manager_run(self,worker_num):
queue_list = [deque() for i in range(worker_num)]
worker_list = [threading.Thread(target=self.worker_run,
args=["Thread-%s" % i, queue_list[i]])
for i in range(worker_num)]
for worker in worker_list:
worker.start()
while True:
i = 0
while i< self.r.randint(0,20):
queue_list[self.r.randint(0,worker_num - 1)].appendleft(
self.r.randint(1,100)
)
i += 1
time.sleep(self.r.random())
if __name__ == '__main__':
Example().manager_run(3)
通過這種方式最小限度的更改代碼迎膜,幾乎無代碼結構變動。就可以將輪詢觸發(fā)變?yōu)槭录?qū)動浆兰,在不影響原有業(yè)務的前提下降低系統(tǒng)的無意義負載磕仅。