rabbitmq direct reply-to 在springAMQP和python之間的使用

背景

公司的一個項目使用rabbitmq作為broker進行交互攻旦,并且數(shù)據(jù)的查詢方法使用RPC模式挤聘,RPC Client端使用java編寫并使用springAMQP包與rabbitmq交互献雅,在RPC Server端使用python的 pika包與rabbitmq交互玫鸟。兩端都使用標準官方例程缓熟,發(fā)現(xiàn)在Client端發(fā)送的消息可以被Server端接收并處理然后返回結果梢卸,但是Client端只會會收到一個null值走诞。

問題排查

1 理解傳統(tǒng)的RPC模式運行流程

傳統(tǒng)模式下 Client端向一個指定的隊列里推送消息,并聲明一個一次性排他隊列蛤高,然后將發(fā)送消息頭部的reply-to屬性的值設置為隊列的名字蚣旱,correlation_id屬性設置為一個隨機生成的值用于消息鑒定然后發(fā)送消息碑幅。在發(fā)送后Client端監(jiān)聽聲明的排他隊列,當收到消息后比對correaltiion_id,正確則處理消息斷開監(jiān)聽連接塞绿,然后此隊列被系統(tǒng)自動回收沟涨。 在Server端收到消息后處理消息然后將消息返回,返回的消息的routing-key設置為reply-to的值异吻,properties中設置correlation_id為收到的correlation_id值裹赴。這樣就完成一次RPC交互模式。
要解決今天這個問題我們還要知道幾個知識點:

  • 1當消息發(fā)送到exchange后如果沒有隊列接收此消息涧黄,那么此消息就會丟失篮昧。
  • 2 一次性的排他隊列在Client不在監(jiān)聽此隊列就會自動被rabbitmq刪除。

排查1 Client端收到的Null值從哪里來笋妥?

因為我是使用python寫RPC Server端并且我也不怎么會java代碼懊昨。……
所以這個null值從那里來我就無法從Client端下手春宣。那我們只能從Server端進行排查酵颁。(最后我認為是在java代碼編寫錯誤(是自己的代碼)的情況下 springAMQP返回的一個默認值)

排查2 Server端收到消息后是否正確的將消息返回

在Server端打印收到的message并打印此消息的header信息和body信息,看到在reply-to中就是Client端設置的隊列月帝。并且通過rabbitmq也看到了這條消息的返回躏惋。

排查3 觀察消息有沒有被推送回reply-to隊列

然后我在Server端收到消息后的callback函數(shù)的頭部大了斷點,接收到消息后Server端程序掛起嚷辅。此時我去查看reply-to中的隊列簿姨,發(fā)現(xiàn)其已經(jīng)不存在于rabbitmq中了。 由上面的傳統(tǒng)RPC模式我推斷出 可能是Client端發(fā)送代碼后沒有監(jiān)聽reply-to隊列造成隊列消失簸搞,然后Server端發(fā)送的消息因為沒有接收隊列而被丟棄扁位。此時我們基本已經(jīng)將問題鎖定在Client端了。但是Client端的代碼是按照rabbitmq官方給的例程書寫趁俊,應該是沒有問題的域仇。此時似乎陷入了僵局。

定位問題:Google大發(fā)加官方文檔

這時候我Google一下SpringAMQP框架的是如何寫RPC代碼寺擂?在一些帖子中我發(fā)現(xiàn)有的代碼會添加一個Listener的類暇务,但有的又不添加。我們假設他們都是可以運行的怔软。那么是什么原因會造成這種情況呢垦细?我第一個就是想到了版本問題。隨著版本的改變可能代碼也會發(fā)生變化挡逼。之后我就在SpringAMQP的官方文檔里面進行查找蝠检。果然被我找到了,官方文檔里面有這樣一段描述:

Starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no replyQueue is provided (or it is set with the name amq.rabbitmq.reply-to), the RabbitTemplate will automatically detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue. When using Direct reply-to, a reply-listener is not required and should not be configured.

springAMQP官方地址
翻譯一下大體意思就是在RabbitMQ3.4.0版本以后官方提供一種叫做Direct reply-to的方式來實現(xiàn)RPC(這種方式可以極大的提高RPC的性能挚瘟,因為他不需要每次請求Client端都要新申請一個隊列叹谁,之后我會再寫一篇來詳細介紹(翻譯 o(∩_∩)o 哈哈 )這個特性。并且在SpringAMQP version 1.4.1版本之后默認使用特性乘盖,看了一下服務器上的rabbitmq版本3.3.0 這個真的老果然不支持焰檩,SpringAMQP的版本果然也是高于這個版本,問題找到订框。開心 析苫, 但是怎么解決呢?
Direct reply-to 官方介紹

解決方案

一: 提升rabbitmq版本穿扳,并使兩端代碼適配direct reply-to 方式

  • 難點1 python的官網(wǎng)沒有給例程 衩侥,不過給了介紹也告訴了如何來實現(xiàn)
  • 難點2 服務器提升版本,已經(jīng)有業(yè)務跑在上面了矛物,我這種對rabbitmq的萌新對rabbitmq各版本升級后的改變并不是很了解茫死,估計是難說動領導換了。

針對難點2 我就不想了 不過難點1的我已經(jīng)寫出來python如何適配direct reply-to的代碼履羞。
更改都是在Client端峦萎,Server端還是可以保持不變。主要主機這幾個方面

  • 1 reply-to的名字更改為‘a(chǎn)mq.rabbitmq.reply-to’這條虛擬隊列忆首,你在rabbitmq的控制臺上是看不到這條隊列的爱榔。
  • 2 然后Client監(jiān)聽這條隊列的時候要設為為no-ack模式。

下面是根據(jù)官方python RPC代碼更改的 適配 Direct reply-to的python代碼
Client端 python代碼

# -*- coding:utf-8 -*-  
#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # result = self.channel.queue_declare(exclusive=True)
        # self.callback_queue = result.method.queue
        # self.channel.basic_consume(self.on_response, no_ack=True,
        #                            queue=self.callback_queue)
        # 監(jiān)聽隊列為 amp.rabbitmq.reply-to 啟動no_ack 模式
        self.channel.basic_consume(self.on_response,
                                   queue='amq.rabbitmq.reply-to',
                                   no_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         # reply_to = self.callback_queue,
                                         # 更改了隊列名字
                                         reply_to='amq.rabbitmq.reply-to',
                                         correlation_id=self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Server端代碼 沒有改動

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n  = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    # ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

解決辦法2 java代碼不使用默認的direct reply-to模式

這個辦法因為我不是寫java的所以我只能寫一些我在官方文檔里面理解的東西了糙及。就是當你不使用SpringAMQP的默認RPC模式的化需要增加Listener對象來監(jiān)聽自己的隊列详幽。

RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory);  
            rabbitTempete.setExchange(exchangeName);  
            rabbitTempete.setRoutingKey(topic);  
           //比官方文檔多的
            Queue  replyqQueue=replyQueue();  
            admin.declareQueue(replyqQueue); 
            rabbitTempete.setReplyQueue(replyqQueue);  
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();  
            container.setConnectionFactory(connectionFactory);  
            container.setQueues(replyqQueue);  
            container.setMessageListener(rabbitTempete);  
            container.start();  
            //比官方文檔多的停止
            Object  response=rabbitTempete.convertSendAndReceive(t);  

SpringAMQP書寫官方文檔
相比較要自己申請隊列自己監(jiān)聽。不過我也沒試過這段代碼就不知道能不能用了浸锨。

總結

這個問題基本得到很好的解決了唇聘。解決一個問題首先你要明白一個東西正常情況下是一種什么狀況,然后出了問題就從前往后揣钦,從后往前雳灾,從中往兩邊等等等。然后Google冯凹,或者官方文檔谎亩,官方論壇。我個人認為官方文檔真的是好東西宇姚。無數(shù)的淺坑的解決辦法都在官方文檔匈庭。當然深坑就不說了那就是論壇加能力加運氣才能排查出來的了。不過官方大多都是英文浑劳。真是愁人阱持,我 加強英語能力吧。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末魔熏,一起剝皮案震驚了整個濱河市衷咽,隨后出現(xiàn)的幾起案子鸽扁,更是在濱河造成了極大的恐慌,老刑警劉巖镶骗,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件桶现,死亡現(xiàn)場離奇詭異,居然都是意外死亡鼎姊,警方通過查閱死者的電腦和手機骡和,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來相寇,“玉大人慰于,你說我怎么就攤上這事』缴溃” “怎么了婆赠?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長战授。 經(jīng)常有香客問我页藻,道長,這世上最難降的妖魔是什么植兰? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任份帐,我火速辦了婚禮,結果婚禮上楣导,老公的妹妹穿的比我還像新娘废境。我一直安慰自己,他們只是感情好筒繁,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布噩凹。 她就那樣靜靜地躺著,像睡著了一般毡咏。 火紅的嫁衣襯著肌膚如雪驮宴。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天呕缭,我揣著相機與錄音堵泽,去河邊找鬼。 笑死恢总,一個胖子當著我的面吹牛迎罗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播片仿,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼纹安,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起厢岂,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤光督,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后咪笑,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體可帽,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年窗怒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蓄拣。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡扬虚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出球恤,到底是詐尸還是另有隱情辜昵,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布咽斧,位于F島的核電站堪置,受9級特大地震影響,放射性物質發(fā)生泄漏张惹。R本人自食惡果不足惜舀锨,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望宛逗。 院中可真熱鬧坎匿,春花似錦、人聲如沸雷激。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽屎暇。三九已至承桥,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間根悼,已是汗流浹背凶异。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留番挺,地道東北人唠帝。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像玄柏,于是被迫代替她去往敵國和親襟衰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理粪摘,服務發(fā)現(xiàn)瀑晒,斷路器绍坝,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 異步RPC(Remote procedure call) Server:提供服務的服務,即RPC模型中的Serve...
    二月_春風閱讀 3,876評論 0 6
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器苔悦。支持消息的持久化轩褐、事務、擁塞控...
    jiangmo閱讀 10,350評論 2 34
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件玖详,它能夠在應用之間提供可靠的消息傳輸把介。在易用性,擴展性蟋座,高可用性...
    點融黑幫閱讀 2,991評論 3 41
  • 獻給剛剛經(jīng)歷完高考的孩子們 若將我們的人生比作一個時鐘向臀,那么我們可以清楚的看到原來我們的人生才只走了短短的一程巢墅,未...
    燭下的眷戀閱讀 765評論 0 1