Openstack基礎組件之oslo.messaging

1. Transport

傳輸消息的工具,比如RabbitMQ等一些常見的消息隊列,通常獲取的途徑是通過用戶直接配置transport_url調用接口來獲取:

import oslo_messaging
transport = oslo_messaging.get_transport(cfg, url=None, **kwargs)

2. Target

用來標識消息的目的地蛾号,會對消息進行封裝,決定消息將發(fā)送到何處或者其他Server將會監(jiān)聽什么類型的消息涯雅,它的對象類的定義如下:
class oslo_messaging.Target(exchange=None, topic=None, namespace=None, version=None, server=None, fanout=None, legacy_namespaces=None)

而對于不同封裝在Target的子類消息對象可能需要的初始化參數不一樣:

Target類型 Required Params Optional Params
RPC Server Target topic鲜结、server exchange
RPC Endpoint namespace、version
RPC Client topic all other attributes
Notification Server Target topic exchange, all other attributes ignored
Notifier Target topic exchange, all other attributes ignored

3. RPC

3.1 RPC Server

一個RPC Server的創(chuàng)建需要依賴至少三個部分:Transport、Target和Endpoints(這里的Endpoints可以理解成一個方法的集合類精刷,這些方法會用來被RPC Client遠程調用)拗胜,下面是一個RPC服務的類偽代碼實現,包括RPC Server的創(chuàng)建和啟動等:

class test1Endpoint(object):
  def test_func01(self, ctxt, *args):
    pass

class test2Endpoint(object):
  def test_func02(self, ctxt, *args):
    pass

class RPCService(os_service.Service):
  def __init__(self):
    super(RPCService, self).__init__()
    endpoints = [test1Endpoint(), test2Endpoint()]
    transport = oslo_messaging.get_transport(conf, url=None)
    target = osl_messaging.Target(server='host1', topic=topic)
    self.rpc_server = oslo_messaging.get_rpc_server(transport, target, endpoints, executor='eventlet')

  def start(self):
    super(RPCService, self).start()
    self.rpc_server.start()
    self.tg.add_timer(604800, lambda: None)

  def stop(self):
    self.rpc_server.stop()
    super(RPCService, self).stop()

3.2 RPC Client

調用遠程RPC Server方法的類贬养,如下是實現一個RPC Client的偽代碼:

class RPCClient(object):
  def __init__(self):
    transport = oslo_messaging.get_transport(conf, url=None)
    target = osl_messaging.Target(topic=topic)
    self.rpc_client = oslo_messing.RPCClient(transport, target)

  def func01(self, ctxt, *args):
    return self.rpc_client.call(ctxt, 'test_func01', args=args)
    # self.rpc_client.cast(ctxt, 'test_func01', args=args)  # call等待直至返回一個結果挤土,cast不等待立即返回

  def func02(self, ctxt, *args):
    cctxt = self.rpc_client.prepare(server='host01', version='4.0')
    return cctxt.call(ctxt, 'test_func02', args=args)

4. Notification

4.1 Notifier

功能:發(fā)送通知消息,消息的格式如下:

{'message_id': six.text_type(uuid.uuid4()),
 'publisher_id': 'compute.host1',
 'timestamp': timeutils.utcnow(),
 'priority': 'WARN',
 'event_type': 'compute.create_instance',
 'payload': {'instance_id': 12, ... }}

構建Notifier的類:
class oslo_messaging.Notifier(transport, publisher_id=None, driver=None, serializer=None, retry=None, topics=None)

具體實現流程:創(chuàng)建或者獲取Notifier --> Notifier發(fā)送不同級別的消息误算,下面是一個簡單實現的偽代碼:

transport = oslo_messaging.get_transport(conf)
notifier = oslo_messaging.Notifier(transport, publisher_id='host1', driver='messagingv2', topics=['notifications'])
# notifier = notifier.prepare(publisher_id='host0')  # 可以通過這種方式對Notifier的某些屬性進行覆蓋仰美,跟RPC Client的prepare功能相同
notifier.info(ctxt, 'compute.instance.create.start', payload)  # 第二個參數是event_type,payload則是跟event_type對應任務相關的一些有用數據信息

4.2 Notification Driver

常見的Notification Driver有l(wèi)og儿礼、messaging咖杂、messagingv2等,比如常用消息隊列來發(fā)送通知消息蚊夫,而它具體driver出自oslo_messaging.notify.messaging

4.3 Notification Listener

功能:接收和處理通知消息诉字,這一部分一般至少需要兩個部分的實現:Notification Listener創(chuàng)建和Endpoints(會實現一些方法,跟Notifier端發(fā)送消息的級別對應知纷,實現獲取通知消息后如何處理及留存)壤圃,如下是一個簡單的偽代碼實現:

class Endpoint(object):
  def __init__(self):
    pass

  def info(self, ctxt, publisher_id, event_type, payload, metadata):
    pass

class NotificationService(os_service.Service)
  def __init__(self):
    super(NotificationService, self).__init__()
    self.listeners = []

  def start(self):
    super(NotificationService, self).start()
    transport = oslo_messaging.get_transport(conf)
    target = oslo_messaging.Target(topic='notifications')
    endpoints = [Endpoint()]
    listener = oslo_messaging.get_notification_listener(transport, target, endpoints, executor='eventlet', pool='pool0')
    listener.start()
    self.listeners.append(listener)
    self.tg.add_timer(604800, lambda: None)

  def stop(self):
    if self.listeners:
      map(lambda: x: x.stop(), self.listeners)
    super(NotificationService, self).stop()

5. 參考資料

Openstack oslo_messaging官方文檔

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市琅轧,隨后出現的幾起案子伍绳,更是在濱河造成了極大的恐慌,老刑警劉巖乍桂,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件冲杀,死亡現場離奇詭異,居然都是意外死亡睹酌,警方通過查閱死者的電腦和手機权谁,發(fā)現死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來憋沿,“玉大人旺芽,你說我怎么就攤上這事》模” “怎么了甥绿?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長则披。 經常有香客問我共缕,道長,這世上最難降的妖魔是什么士复? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任图谷,我火速辦了婚禮翩活,結果婚禮上,老公的妹妹穿的比我還像新娘便贵。我一直安慰自己菠镇,他們只是感情好,可當我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布承璃。 她就那樣靜靜地躺著利耍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪盔粹。 梳的紋絲不亂的頭發(fā)上隘梨,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天,我揣著相機與錄音舷嗡,去河邊找鬼轴猎。 笑死,一個胖子當著我的面吹牛进萄,可吹牛的內容都是我干的捻脖。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼中鼠,長吁一口氣:“原來是場噩夢啊……” “哼可婶!你這毒婦竟也來了?” 一聲冷哼從身側響起援雇,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤矛渴,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后熊杨,有當地人在樹林里發(fā)現了一具尸體曙旭,經...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡盗舰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年晶府,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片钻趋。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡川陆,死狀恐怖,靈堂內的尸體忽然破棺而出蛮位,到底是詐尸還是另有隱情较沪,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布失仁,位于F島的核電站尸曼,受9級特大地震影響,放射性物質發(fā)生泄漏萄焦。R本人自食惡果不足惜控轿,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一冤竹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧茬射,春花似錦鹦蠕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至刚梭,卻和暖如春肠阱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背望浩。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工辖所, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人磨德。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓缘回,卻偏偏與公主長得像,于是被迫代替她去往敵國和親典挑。 傳聞我的和親對象是個殘疾皇子酥宴,可洞房花燭夜當晚...
    茶點故事閱讀 45,860評論 2 361