4.發(fā)送和接受消息

文章參考:Rabbit實戰(zhàn)指南

發(fā)送消息

? 如果要發(fā)送一個消息,可以使用Channel類的basicPublish方法武鲁,比如發(fā)送一條內(nèi)容為“Hello World”的消息蝠检,參考如下:

byte[] messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

為了更好地可控制發(fā)送叹谁,可以使用mandatory這個參數(shù),或者可以發(fā)送一些特定屬性的信息:

channel.basicPublish(exchangeName,routingKey,mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

下面這行代碼發(fā)送了一條消息憔涉,這條消息的投遞模式(delivery mode)設(shè)置為2析苫,即消息會被持久化(存入磁盤中)在服務(wù)器中衩侥。同時這條消息的優(yōu)先級(priority)設(shè)置為1,content-type為“text/plain”跪但÷臀可以自己設(shè)定消息的屬性:

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .contentType("text/plain")
                     .deliveryMode(2)
                     .priority(1)
                     .userId("hidden")
                     .build(),
                     messageBodyBytes
                    );

也可以發(fā)送一條帶有headers的消息:

Map<String,Object> headers = new HashMap<>();
headers.put("localtion","here");
headers.put("time","today");
channel.basicPublish(exchangeName,routingKey,
                    new AMQP.BasicProperties.Builder()
                    .headers(headers)
                     .build(),
                     messageBodyBytes
                    );

也可以發(fā)送一條帶有過期時間(expiration)的消息

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .expiration("6000")
                     .build(),
                     messageBodyBytes
                    );

basicPublish的重載方法:

void basicPublish(String exchange,String routingKey,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,boolean imediate,
                  BasicProperties props,
                  byte[] body) throws IOException;

具體參數(shù)解釋如下:

  • exchange:交換器的名稱被环,指明消息需要發(fā)送到哪個交換器中蛤售。如果設(shè)置為空字符串悴能,則消息會發(fā)送到RabbitMQ默認(rèn)的交換器中雳灾。
  • routingKey:路由鍵,交換器根據(jù)路由鍵將消息存儲到相應(yīng)的隊列炒嘲。
  • props:消息的基本屬性集匈庭,其包含14個屬性成員阱持,分別有contentType、contentEncoding、header(Map<Stirng,Object>)蒜绽、deliveryMode躲雅、priority骡和。correlationId、replyTo慰于、expiration东囚、messageId、timestamp桨嫁、type、urserId璃吧、appId畜挨、clusterId噩凹。
  • byte[] body:消息體(payload),真正需要發(fā)送的消息逮刨。
  • mandatory和immediate

消費消息

RabbitMQ的消費模式分為兩種:推(Push)模式和拉(Pull)模式堵泽。推模式采用Basic.Consume進(jìn)行消費,而拉模式則調(diào)用Basic.Get進(jìn)行消費睬愤。

推模式

在推模式中尤辱,可以通過持續(xù)訂閱的方式來消費信息,使用到的相關(guān)類有:

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeafultConsumer;

接收消息一般通過實現(xiàn)Consumer接口或者繼承DefaultConsumer類來實現(xiàn)啥刻。當(dāng)調(diào)用與Consumer類來實現(xiàn)可帽。當(dāng)調(diào)用與Consumer相關(guān)的API方法窗怒,不同的訂閱采用不同的消費者標(biāo)簽來區(qū)分彼此,在同一個Channel中的消費者也需要通過唯一的消費者標(biāo)簽以作區(qū)別努隙。代碼如下:

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
        new DefaultConsumer(channel){
            @Override
            public void handleDevlivery(String consumerTag,
                                       Envelop envelope,
                                  AMQP.BasicProperties properties,
                                        byte[] bode
                                       )throws IOException
            {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                //(process the message components here ...)
                channel.basicAck(deliveryTag,false);
            }
                    }
                    )

上面代碼顯式地設(shè)置autoAck為false荸镊,然后在接受到消息之后進(jìn)行顯式ack操作(channel.basicAck)躬存,對應(yīng)消費者來說這個設(shè)置是非常必要的舀锨,可以防止消息不必要的丟失。

? Channel類中basicConsume方法有如下幾種形式:

  1. String basicConsume(String queue,Consumer callback) throws IOException

  2. String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException

  3. String basicConsume(String queue,boolean autoAck,Map<String,Object> arguments,Consumer callback) throws IOException

  4. String basicConsume(String queue,boolean autoAck,String consumerTag,Consumer callback) throws IOException

  5. String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException

    對應(yīng)參數(shù)說明:

    • queue:隊列的名稱
    • autoAck:設(shè)置是否自動確認(rèn)盾剩。建議設(shè)置為false
    • consumerTag:消費者標(biāo)簽告私,用來區(qū)分多個消費者德挣;
    • noLocal:設(shè)置為true則表示不能將同一個Connection中生產(chǎn)者發(fā)送的消息傳送給這個Connection中的消費者
    • exclusive:設(shè)置是否排他快毛;
    • arguments:設(shè)置消費者的其他參數(shù)
    • callback:設(shè)置消費者的回調(diào)函數(shù)。用來處理RabbitMQ推送過來的消息屯掖,比如DefaultConsumer襟衰,使用時需要客戶端重寫其中的方法。

    對于消費者客戶端來說绍坝,重寫handleDelivery方法是十分方便的。更復(fù)雜的消費者客戶端會重寫更多的方法椎咧,具體如下:

    void handleConsumeOk(String consumerTag);
    void handleCancelOk(String consumerTag);
    void handleCancel(String consumerTag) throws IOException;
    void handleShutdownSignal(String consumerTag,ShutdowSignalException sig);
    void handleRecoverOk(String consumerTag);
    

    比如handleShutdownSignal方法把介,當(dāng)Channel或者Connection關(guān)閉的時候回調(diào)用。再者脚牍,handleConsumeOk方法會在其他方法之前調(diào)用巢墅,返回消費者標(biāo)簽。

    重寫handleCancelOk和handleCancel方法驯遇,這樣消費端可以再顯示地或者隱式地取消訂閱的時候調(diào)用妹懒。也可以通過channel.basicCancel方法來顯式地取消一個消費者的訂閱:

    ? channel.basicCancel(consumerTag)

    注意上面這行代碼會首先觸發(fā)handleConsumerOk方法双吆,之后觸發(fā)handleDelivery方法,最后才觸發(fā)handleCanceOk方法匾竿。

 和生產(chǎn)者一樣岭妖,消費者客戶端同樣需要考慮線程安全的問題反璃。消費者客戶端的這些callback會被分配到與Channel不同的線程池上,這意味著消費者客戶端可以安全地調(diào)用這些阻塞方法斋攀,如channe.queueDeclare梧田、channel.basicCancel等。

 每個Channel都擁有自己獨立的線程鹉梨。最常用的做法是一個Channel對應(yīng)一個消費者。意味著消費者彼此之間沒有任何關(guān)聯(lián)晌坤。也可以在一個Channel中維持多個消費者泡仗,但是要注意一個問題猜憎,如果Channel中的一個消費者一直在運行胰柑,那么其他消費者的callback會被“耽擱”爬泥。

拉模式

通過channel.basicGet方法可以單條的獲取消息,其返回值是GetResponse踩官。Channel類的basicGet方法沒有其他重載方法境输,只有:

 GetResponse basicGet(String queue,boolean autoAck) throws IOException;

其中queue代表隊列名稱嗅剖,如果設(shè)置autoAck為false信粮,那么同樣需要調(diào)用channel.basicAck來確認(rèn)消息被成功接收黔攒。

拉模式的關(guān)鍵代碼如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
注意:

Basic.Consume將信道(Channel)置為接收模式督惰,知道取消隊列的訂閱為止赏胚。在接收模式期間辞友,RabbitMQ會不斷地推送消息給消費者,當(dāng)然推送消息的個數(shù)還是會受到Basic.Qos的限制戳晌。

如果只想從隊列獲取單挑消息而不是持續(xù)訂閱沦偎,建議使用Basic.Get進(jìn)行消費。但不能將Basic.Get放在一個循環(huán)里來代替Basic.Consume,這樣做會嚴(yán)重影響RabbitMQ的性能豪嚎。消費者理應(yīng)使用Basic.Consume方法侈询。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末扔字,一起剝皮案震驚了整個濱河市革为,隨后出現(xiàn)的幾起案子舵鳞,更是在濱河造成了極大的恐慌,老刑警劉巖抛虏,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異错忱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)挂据,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門以清,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人崎逃,你說我怎么就攤上這事掷倔。” “怎么了个绍?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵勒葱,是天一觀的道長浪汪。 經(jīng)常有香客問我,道長凛虽,這世上最難降的妖魔是什么死遭? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮凯旋,結(jié)果婚禮上呀潭,老公的妹妹穿的比我還像新娘钠署。我一直安慰自己,他們只是感情好该面,可當(dāng)我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布傍菇。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪袜腥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機(jī)與錄音辽社,去河邊找鬼。 笑死失息,一個胖子當(dāng)著我的面吹牛守伸,可吹牛的內(nèi)容都是我干的见芹。 我是一名探鬼主播和二,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼甲锡,長吁一口氣:“原來是場噩夢啊……” “哼搔体!你這毒婦竟也來了缩多?” 一聲冷哼從身側(cè)響起梁钾,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤拇勃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后榆骚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體职恳,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年斤寂,在試婚紗的時候發(fā)現(xiàn)自己被綠了罗侯。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片讲弄。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡钳枕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出昔瞧,到底是詐尸還是另有隱情稍坯,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏盼理。R本人自食惡果不足惜鸽粉,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一偏友、第九天 我趴在偏房一處隱蔽的房頂上張望舞竿。 院中可真熱鬧,春花似錦较鼓、人聲如沸畜伐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春如失,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背动雹。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工姊氓, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留禾唁,地道東北人无切。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像辱士,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子塔拳,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,941評論 2 355