代碼
# -.- coding:utf-8 -.-
# __author__ == 'zhengtong'
from __future__ import print_function
def exception_wrapper(func):
    """Decorate a method so it reports success or failure instead of raising.

    The wrapped method returns:

    * the method's own return value, when that value is not ``None``;
    * ``True``, when the method completes and returns ``None``;
    * ``False``, when the method raises ``Exception``. The traceback is
      printed to stderr and the exception is swallowed.

    Args:
        func: The method to wrap. It is called as
            ``func(self, *args, **kwargs)``.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.
    """
    # Imported locally so the decorator does not rely on module-level imports.
    import functools
    import traceback

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except Exception:
            # Best-effort by design: report the failure and signal it via False.
            traceback.print_exc()
            return False
        # A method with no explicit return value is reported as a success.
        return True if result is None else result

    return wrapper
class Producer(object):
    """Topic-based publisher that forwards messages to subscribed observers.

    Every method except ``__init__`` is decorated with ``exception_wrapper``,
    so callers get ``True`` on success, ``False`` when an exception was
    raised (its traceback is printed), or the method's own non-``None``
    return value.
    """

    def __init__(self):
        # Maps topic name -> list of observers, in subscription order.
        self.topic = {}

    @exception_wrapper
    def create_topic(self, topic):
        """Register ``topic`` with an empty observer list if it is unknown."""
        self.topic.setdefault(topic, [])

    @exception_wrapper
    def publish(self, topic, data):
        """Send ``data`` to every observer of ``topic``.

        The topic is created on demand. When it has no observers, a message
        string saying so is returned and nothing is delivered.
        """
        self.create_topic(topic)
        if not self.topic[topic]:
            return '當前主題沒有任何觀察者/訂閱者/消費者'
        self.notify_observers(topic, data)

    @exception_wrapper
    def subscribe(self, topic, *observers):
        """Attach ``observers`` to ``topic``, creating the topic if needed.

        Observers that are already subscribed are not added a second time.
        """
        self.create_topic(topic)
        subscribed = self.topic[topic]
        for observer in observers:
            if observer not in subscribed:
                subscribed.append(observer)

    @exception_wrapper
    def unsubscribe(self, topic, *observers):
        """Detach ``observers`` from ``topic``.

        An unknown topic (``KeyError``) or an observer that is not subscribed
        (``ValueError``) is handled by the decorator, which yields ``False``.
        """
        subscribed = self.topic[topic]
        for observer in observers:
            subscribed.remove(observer)

    @exception_wrapper
    def notify_observers(self, topic, data):
        """Call ``get_message(data)`` on each observer of ``topic``."""
        # Iterate over a snapshot so a callback that subscribes or
        # unsubscribes does not disturb the iteration in progress.
        for observer in list(self.topic[topic]):
            observer.get_message(data)
class SendMailObserver(object):
    """Observer that reports each received message as mail to send."""

    def get_message(self, data):
        """Print ``data`` prefixed with ``'send mail to: '``."""
        print('send mail to: ', data)
class WriteMailObserver(object):
    """Observer that reports each received message as mail to write."""

    def get_message(self, data):
        """Print ``data`` prefixed with ``'write mail to: '``."""
        print('write mail to: ', data)
class ProcessMailObserver(object):
    """Observer that reports each received message as mail to process."""

    def get_message(self, data):
        """Print ``data`` prefixed with ``'ProcessMail mail: '``."""
        print('ProcessMail mail: ', data)
if __name__ == '__main__':
    # Start the Producer service.
    producer = Producer()

    # Create topics.
    producer.create_topic('sss')
    producer.create_topic('bbb')

    # Register observers/subscribers/consumers. The 'send_mail' topic was
    # not created above; subscribe() creates it on demand.
    send_mail_observer = SendMailObserver()
    write_mail_observer = WriteMailObserver()
    process_mail_observer = ProcessMailObserver()
    producer.subscribe('send_mail',
                       send_mail_observer,
                       write_mail_observer,
                       process_mail_observer)

    # Show how many consumers each queue has.
    print('producer.topic: ', producer.topic)

    # Publish a message.
    producer.publish('send_mail', '330356463@qq.com')

    # Remove an observer/subscriber/consumer.
    producer.unsubscribe('send_mail', write_mail_observer)

    # Show how many consumers each queue has.
    print(producer.topic)

    # Show how many channels exist in total; list() keeps the output a
    # plain list on Python 3 as well.
    print(list(producer.topic))
# 顯示結果
producer.topic: {'bbb': [], 'sss': [], 'send_mail': [<__main__.SendMailObserver object at 0x03492F50>, <__main__.WriteMailObserver object at 0x03492F70>, <__main__.ProcessMailObserver object at 0x03492F90>]}
send mail to: 330356463@qq.com
write mail to: 330356463@qq.com
ProcessMail mail: 330356463@qq.com
{'bbb': [], 'sss': [], 'send_mail': [<__main__.SendMailObserver object at 0x03492F50>, <__main__.ProcessMailObserver object at 0x03492F90>]}
['bbb', 'sss', 'send_mail']
?
?
模式總結
觀察者模式強調的是松耦合,通過程序之間的強約定弱依賴實現通用場景的<發布/訂閱>模型。
核心理念
當數據發生變化時執行回調函數(provide a callback for notification of events/changes to data)。
模式類型
行為模式
設計原則
- 為了交互對象之間的松耦合設計而努力。
參考
- [x] Head First設計模式
- [x] Head First設計模式Python版源碼
- [x] patterns-idioms
- [x] py-patterns