1. Transport
傳輸消息的工具,比如RabbitMQ等一些常見的消息隊列,通常獲取的途徑是通過用戶直接配置transport_url
調用接口來獲取:
import oslo_messaging
transport = oslo_messaging.get_transport(cfg, url=None, **kwargs)
2. Target
用來標識消息的目的地蛾号,會對消息進行封裝,決定消息將發(fā)送到何處或者其他Server將會監(jiān)聽什么類型的消息涯雅,它的對象類的定義如下:
class oslo_messaging.Target(exchange=None, topic=None, namespace=None, version=None, server=None, fanout=None, legacy_namespaces=None)
而對于不同封裝在Target的子類消息對象可能需要的初始化參數不一樣:
Target類型 | Required Params | Optional Params |
---|---|---|
RPC Server Target | topic鲜结、server | exchange |
RPC Endpoint | namespace、version | |
RPC Client | topic | all other attributes |
Notification Server Target | topic | exchange, all other attributes ignored |
Notifier Target | topic | exchange, all other attributes ignored |
3. RPC
3.1 RPC Server
一個RPC Server的創(chuàng)建需要依賴至少三個部分:Transport、Target和Endpoints(這里的Endpoints可以理解成一個方法的集合類精刷,這些方法會用來被RPC Client遠程調用)拗胜,下面是一個RPC服務的類偽代碼實現,包括RPC Server的創(chuàng)建和啟動等:
class test1Endpoint(object):
def test_func01(self, ctxt, *args):
pass
class test2Endpoint(object):
def test_func02(self, ctxt, *args):
pass
class RPCService(os_service.Service):
def __init__(self):
super(RPCService, self).__init__()
endpoints = [test1Endpoint(), test2Endpoint()]
transport = oslo_messaging.get_transport(conf, url=None)
target = osl_messaging.Target(server='host1', topic=topic)
self.rpc_server = oslo_messaging.get_rpc_server(transport, target, endpoints, executor='eventlet')
def start(self):
super(RPCService, self).start()
self.rpc_server.start()
self.tg.add_timer(604800, lambda: None)
def stop(self):
self.rpc_server.stop()
super(RPCService, self).stop()
3.2 RPC Client
調用遠程RPC Server方法的類贬养,如下是實現一個RPC Client的偽代碼:
class RPCClient(object):
def __init__(self):
transport = oslo_messaging.get_transport(conf, url=None)
target = osl_messaging.Target(topic=topic)
self.rpc_client = oslo_messing.RPCClient(transport, target)
def func01(self, ctxt, *args):
return self.rpc_client.call(ctxt, 'test_func01', args=args)
# self.rpc_client.cast(ctxt, 'test_func01', args=args) # call等待直至返回一個結果挤土,cast不等待立即返回
def func02(self, ctxt, *args):
cctxt = self.rpc_client.prepare(server='host01', version='4.0')
return cctxt.call(ctxt, 'test_func02', args=args)
4. Notification
4.1 Notifier
功能:發(fā)送通知消息,消息的格式如下:
{'message_id': six.text_type(uuid.uuid4()),
'publisher_id': 'compute.host1',
'timestamp': timeutils.utcnow(),
'priority': 'WARN',
'event_type': 'compute.create_instance',
'payload': {'instance_id': 12, ... }}
構建Notifier的類:
class oslo_messaging.Notifier(transport, publisher_id=None, driver=None, serializer=None, retry=None, topics=None)
具體實現流程:創(chuàng)建或者獲取Notifier --> Notifier發(fā)送不同級別的消息误算,下面是一個簡單實現的偽代碼:
transport = oslo_messaging.get_transport(conf)
notifier = oslo_messaging.Notifier(transport, publisher_id='host1', driver='messagingv2', topics=['notifications'])
# notifier = notifier.prepare(publisher_id='host0') # 可以通過這種方式對Notifier的某些屬性進行覆蓋仰美,跟RPC Client的prepare功能相同
notifier.info(ctxt, 'compute.instance.create.start', payload) # 第二個參數是event_type,payload則是跟event_type對應任務相關的一些有用數據信息
4.2 Notification Driver
常見的Notification Driver有l(wèi)og儿礼、messaging咖杂、messagingv2等,比如常用消息隊列來發(fā)送通知消息蚊夫,而它具體driver出自oslo_messaging.notify.messaging
4.3 Notification Listener
功能:接收和處理通知消息诉字,這一部分一般至少需要兩個部分的實現:Notification Listener創(chuàng)建和Endpoints(會實現一些方法,跟Notifier
端發(fā)送消息的級別對應知纷,實現獲取通知消息后如何處理及留存)壤圃,如下是一個簡單的偽代碼實現:
class Endpoint(object):
def __init__(self):
pass
def info(self, ctxt, publisher_id, event_type, payload, metadata):
pass
class NotificationService(os_service.Service)
def __init__(self):
super(NotificationService, self).__init__()
self.listeners = []
def start(self):
super(NotificationService, self).start()
transport = oslo_messaging.get_transport(conf)
target = oslo_messaging.Target(topic='notifications')
endpoints = [Endpoint()]
listener = oslo_messaging.get_notification_listener(transport, target, endpoints, executor='eventlet', pool='pool0')
listener.start()
self.listeners.append(listener)
self.tg.add_timer(604800, lambda: None)
def stop(self):
if self.listeners:
map(lambda: x: x.stop(), self.listeners)
super(NotificationService, self).stop()