http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%96%87%E6%96%87%E6%A1%A3/#Message-消息體
1.前言
Spring AMQP項(xiàng)目是用于開發(fā)AMQP的解決方案换淆。 我們提供一個(gè)“模板”作為發(fā)送和接收消息的抽象绅作。我們還為普通POJO進(jìn)行提供消息處理支持暂题。這些庫(kù)促進(jìn)AMQP資源的管理,同時(shí)支持使用依賴注入和聲明式配置兼雄。 在所有情況下熊泵,您將看到與Spring Framework中的JMS支持的相似之處虑鼎。有關(guān)其他項(xiàng)目相關(guān)信息瘤泪,請(qǐng)?jiān)L問Spring AMQP項(xiàng)目主頁(yè)谋右。
該幫助文檔的第一部分是Spring AMQP以及基本概念和一些代碼段的概述硬猫,可以盡快幫助您快速使用。
五分鐘快速使用Spring AMQP.
先決條件:安裝并運(yùn)行RabbitMQ改执。然后在您的項(xiàng)目中加入如下MAVEN依賴:
org.springframework.amqp
spring-rabbit
1.7.2.RELEASE
雖然默認(rèn)的Spring Framework版本依賴關(guān)系為4.3.x啸蜜,但Spring AMQP通常與早期版本的Spring Framework兼容。
基于注解的監(jiān)聽器和RabbitMessagingTemplate需要Spring Framework 4.1或更高版本辈挂。
最低的amqp-client的java客戶端庫(kù)版本是4.0.0盔性。
注意這是指java客戶端庫(kù);
一般來說,它將適用于較舊的代理版本呢岗。
使用簡(jiǎn)單同步的Java發(fā)送和接收消息:
ConnectionFactory connectionFactory =newCachingConnectionFactory();
AmqpAdmin admin =newRabbitAdmin(connectionFactory);
admin.declareQueue(newQueue("myqueue"));
AmqpTemplate template =newRabbitTemplate(connectionFactory);
template.convertAndSend("myqueue","foo");
String foo = (String) template.receiveAndConvert("myqueue");
請(qǐng)注意,Java Rabbit客戶端中也有一個(gè)ConnectionFactory后豫。
我們?cè)谏厦娴拇a中使用了Spring抽象的ConnectionFactory悉尾。
我們使用Rabbit的默認(rèn)exchange(因?yàn)榘l(fā)送中沒有指定),并且所有隊(duì)列默認(rèn)綁定到默認(rèn)exchange(因此我們可以在發(fā)送中使用隊(duì)列名稱作為routing key)挫酿。
這些行為在AMQP規(guī)范中定義构眯。
上述例子在XML中的配置形式如下
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
? ? ? ? ? http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
? ? ? ? ? http://www.springframework.org/schema/beans
? ? ? ? ? http://www.springframework.org/schema/beans/spring-beans.xsd">
ApplicationContext context =
newGenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue","foo");
String foo = (String) template.receiveAndConvert("myqueue");
默認(rèn)情況下,聲明會(huì)自動(dòng)查找類型為Queue早龟,Exchange和Binding的bean惫霸,并將他們綁定,因此不需要在簡(jiǎn)單的Java程序中明確使用該bean葱弟。在XML模式中配置組件的屬性有很多選項(xiàng) - 您可以使用XML編輯器的自動(dòng)完成功能來瀏覽它們并查看其文檔壹店。
相同的代碼在java代碼中的另一種配置
ApplicationContext context =
newAnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue","foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
publicclassRabbitConfiguration{
@Bean
publicConnectionFactoryconnectionFactory(){
returnnewCachingConnectionFactory("localhost");
? ? }
@Bean
publicAmqpAdminamqpAdmin(){
returnnewRabbitAdmin(connectionFactory());
? ? }
@Bean
publicRabbitTemplaterabbitTemplate(){
returnnewRabbitTemplate(connectionFactory());
? ? }
@Bean
publicQueuemyQueue(){
returnnewQueue("myqueue");
? ? }
}
參考http://docs.spring.io/spring-amqp/reference/htmlsingle/#whats-new
幫助文檔的一部分詳細(xì)介紹了Spring AMQP的各種組件。
主要章節(jié)介紹開發(fā)AMQP應(yīng)用程序的核心教程芝加。
本部分還包括有關(guān)示例應(yīng)用程序硅卢。
在本章中,我們將探討使用Spring AMQP開發(fā)應(yīng)用程序的基本組件的接口和類。
Spring AMQP由幾個(gè)模塊組成将塑,每個(gè)模塊由發(fā)布中的JAR表示脉顿。這些模塊有:spring-amqp和spring-rabbit。 spring-amqp模塊包含org.springframework.amqp.core心 AMQP”model”的類点寥。我們的目的是提供不依賴于任何特定AMQP代理實(shí)現(xiàn)或客戶端庫(kù)的泛型抽象艾疟。最終用戶代碼在供應(yīng)商實(shí)現(xiàn)中將更加便攜,因?yàn)樗荒茚槍?duì)抽象層進(jìn)行開發(fā)敢辩。這些抽象然后由代理特定的模塊實(shí)現(xiàn)汉柒,例如spring-rabbit。目前只有一個(gè)RabbitMQ實(shí)現(xiàn);但是除了RabbitMQ之外责鳍,使用Apache Qpid的.NET中的抽象已被驗(yàn)證碾褂。由于AMQP原則上在協(xié)議級(jí)別運(yùn)行,所以RabbitMQ客戶端可以與支持相同協(xié)議版本的任何代理一起使用历葛,但目前我們還沒有測(cè)試任何其他代理正塌。
這里的概述假設(shè)您已經(jīng)熟悉AMQP規(guī)范的基礎(chǔ)知識(shí)。如果沒有,請(qǐng)查看第5章“其他資源”中列出的資源。
0-8和0-9-1 AMQP規(guī)范不定義Message類或接口最域。
相反,當(dāng)執(zhí)行諸如basicPublish()的操作時(shí)鸠天,內(nèi)容作為字節(jié)數(shù)組參數(shù)傳遞,并且附加屬性作為單獨(dú)的參數(shù)傳入帐姻。
Spring AMQP將Message類定義為更普通的AMQP域模型稠集。
Message類的目的是簡(jiǎn)單地將主體和屬性封裝在單個(gè)實(shí)例中,以便API可以更簡(jiǎn)單饥瓷。
publicclassMessage{
privatefinalMessageProperties messageProperties;
privatefinalbyte[] body;
publicMessage(byte[] body, MessageProperties messageProperties){
this.body = body;
this.messageProperties = messageProperties;
? ? }
publicbyte[] getBody() {
returnthis.body;
? ? }
publicMessagePropertiesgetMessageProperties(){
returnthis.messageProperties;
? ? }
}
MessageProperties接口定義了幾個(gè)常見的屬性剥纷,如messageId,timestamp呢铆,contentType等等晦鞋。
這些屬性也可以通過調(diào)用setHeader(String key,Object value)方法來擴(kuò)展用戶定義的頭屬性棺克。
Exchange接口表示AMQP Exchange悠垛,這是消息生產(chǎn)者發(fā)送到的。
代理的虛擬主機(jī)中的每個(gè)Exchange將具有唯一的名稱以及一些其他屬性:
publicinterfaceExchange{
StringgetName();
StringgetExchangeType();
booleanisDurable();
booleanisAutoDelete();
? ? Map getArguments();
}
如您所見娜谊,Exchange還具有由ExchangeTypes中定義的常量表示的類型确买。
基本類型有:Direct, Topic, Fanout, 和 Headers。
在核心包中因俐,您將找到每種類型的Exchange接口的實(shí)現(xiàn)拇惋。
這些Exchange類型的行為在如何處理與隊(duì)列綁定方面有所不同周偎。
例如抹剩,Direct exchange允許隊(duì)列被固定的routing key(通常是隊(duì)列的名稱)綁定撑帖。
Topic exchange支持綁定與路由模式,可能包括*和#通配符
AMQP規(guī)范還要求任何代理提供沒有名稱的“默認(rèn)”Exchange澳眷。
所有被聲明的隊(duì)列將被綁定到該默認(rèn)的Exchange名稱作為routing key胡嘿。
您將在3.1.4節(jié)“AmqpTemplate”中了解Spring AMQP中默認(rèn)Exchange的使用情況。
Queue類表示消息使用者接收消息的組件钳踊。
像各種Exchange類一樣衷敌,我們的實(shí)現(xiàn)意圖是這個(gè)核心AMQP類型的抽象表示。
publicclassQueue{
privatefinalString name;
privatevolatilebooleandurable;
privatevolatilebooleanexclusive;
privatevolatilebooleanautoDelete;
privatevolatileMap arguments;
/**
? ? * 隊(duì)列是持久的拓瞪,非排他的和非自動(dòng)刪除的缴罗。
? ? *
*@paramname 隊(duì)列名
? ? */
publicQueue(String name){
this(name,true,false,false);
? ? }
// Getters and Setters omitted for brevity
}
請(qǐng)注意,構(gòu)造函數(shù)采用隊(duì)列名稱祭埂。根據(jù)實(shí)現(xiàn)面氓,管理模板可以提供用于生成唯一命名的隊(duì)列的方法。這樣的隊(duì)列可以用作reply-to地址或其他臨時(shí)情況蛆橡。因此舌界,自動(dòng)生成的隊(duì)列的exclusive和autoDelete屬性都將設(shè)置為true。
有關(guān)使用命名空間支持(包括隊(duì)列參數(shù))聲明隊(duì)列的信息泰演,請(qǐng)參見第3.1.10節(jié)“配置代理”中的隊(duì)列部分呻拌。
鑒于生產(chǎn)者發(fā)送到Exchange并且消費(fèi)者從隊(duì)列接收到消息,將隊(duì)列連接到exchange的綁定對(duì)于通過消息傳遞連接這些生產(chǎn)者和消費(fèi)者至關(guān)重要睦焕。
在Spring AMQP中藐握,我們定義一個(gè)Binding類來表示這些連接。
我們來看看將隊(duì)列綁定到交換機(jī)的基本選項(xiàng)垃喊。
您可以使用固定的routing key將隊(duì)列綁定到DirectExchange趾娃。
newBinding(someQueue, someDirectExchange,"foo.bar")
您可以使用路由模式將隊(duì)列綁定到TopicExchange。
newBinding(someQueue, someTopicExchange,"foo.*")
您可以使用無routing key將Queue綁定到FanoutExchange缔御。
newBinding(someQueue, someFanoutExchange)
我們還提供了一個(gè)BindingBuilder進(jìn)行鏈?zhǔn)斤L(fēng)格的構(gòu)建抬闷。
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
為了清楚起見,上面顯示了BindingBuilder類耕突,但是對(duì)于bind()方法使用靜態(tài)導(dǎo)入時(shí)笤成,此樣式很好。
Binding類的一個(gè)實(shí)例本身就是持有關(guān)于連接的數(shù)據(jù)眷茁。
換句話說炕泳,它不是一個(gè)“活躍”組件。
但是上祈,正如您將在3.1.10節(jié)“配置代理”中看到的培遵,AmqpAdmin類可以使用Binding實(shí)例來實(shí)際觸發(fā)代理上的綁定操作浙芙。
另外,正如你將在同一部分中看到的籽腕,Binding實(shí)例可以使用@Configuration類中的Spring的@ Bean風(fēng)格進(jìn)行定義嗡呼。
還有一個(gè)方便的基類,它進(jìn)一步簡(jiǎn)化了生成AMQP相關(guān)bean定義的方法皇耗,并識(shí)別隊(duì)列南窗,交換和綁定,以便在應(yīng)用程序啟動(dòng)時(shí)將它們?nèi)柯暶髟贏MQP代理上郎楼。
AmqpTemplate也在核心包中定義万伤。作為實(shí)際AMQP消息傳遞中涉及的主要組件之一,將在其自己的部分中詳細(xì)討論(參見第3.1.4節(jié)“AmqpTemplate”)呜袁。
我們上一節(jié)描述的AMQP模型是通用的敌买,適用于所有實(shí)現(xiàn),當(dāng)我們進(jìn)入資源管理時(shí)阶界,特定的場(chǎng)景需要特殊實(shí)現(xiàn)虹钮。因此,在本節(jié)中荐操,我們將專注于僅存在于我們的“spring-rabbit”模塊中的代碼芜抒,因?yàn)樵谶@一點(diǎn)上,RabbitMQ是唯一支持的實(shí)現(xiàn)托启。
用于管理與RabbitMQ代理的連接的中心組件是ConnectionFactory接口宅倒。?ConnectionFactory實(shí)現(xiàn)的責(zé)任是提供一個(gè)org.springframework.amqp.rabbit.connection.Connection的實(shí)例,它是com.rabbitmq.client.Connection的包裝器屯耸。我們提供的唯一具體實(shí)現(xiàn)是CachingConnectionFactory拐迁,默認(rèn)情況下,它建立可以由應(yīng)用程序共享的單個(gè)連接代理疗绣。連接是共享的线召,因?yàn)榕cAMQP通信的“工作單位”實(shí)際上是一個(gè)“通道”(在某些方面,這與JMS中的連接和會(huì)話之間的關(guān)系類似)多矮。您可以想像缓淹,連接實(shí)例提供了一個(gè)createChannel方法。?CachingConnectionFactory實(shí)現(xiàn)支持對(duì)這些通道的緩存塔逃,并且基于它們是否是事務(wù)來維護(hù)單獨(dú)的通道高速緩存讯壶。創(chuàng)建CachingConnectionFactory實(shí)例時(shí),可以通過構(gòu)造函數(shù)提供主機(jī)名湾盗。還應(yīng)提供用戶名和密碼屬性伏蚊。如果要配置通道緩存的大小(默認(rèn)值為25),您也可以在此處調(diào)用setChannelCacheSize()方法格粪。
從1.3版開始躏吊,CachingConnectionFactory可以配置為緩存連接以及僅通道氛改。
在這種情況下,每次調(diào)用createConnection()都會(huì)創(chuàng)建一個(gè)新的連接(或從緩存中檢索一個(gè)空閑的連接)比伏。
關(guān)閉連接將返回到緩存(如果尚未達(dá)到高速緩存大小)胜卤。
在這種連接上創(chuàng)建的通道也被緩存。
使用單獨(dú)的連接在某些環(huán)境中可能是有用的凳怨,例如從HA群集中消耗負(fù)載均衡器連接到不同的群集成員瑰艘。
將cacheMode設(shè)置為CacheMode.CONNECTION是鬼。
這不限制連接數(shù)肤舞,它指定允許多少空閑打開連接。
從版本1.5.5開始均蜜,提供了一個(gè)新的屬性connectionLimit李剖。當(dāng)設(shè)置此項(xiàng)時(shí),它限制允許的連接總數(shù)囤耳。設(shè)置后篙顺,如果達(dá)到限制,則使用channelCheckoutTimeLimit等待連接變?yōu)榭臻e狀態(tài)充择。如果超過時(shí)間德玫,則拋出AmqpTimeoutException。
重要提示
當(dāng)緩存模式為CONNECTION時(shí)椎麦,不支持自動(dòng)聲明隊(duì)列等(請(qǐng)參閱“自動(dòng)聲明交換宰僧,隊(duì)列和綁定”一節(jié))。
此外观挎,在編寫本文時(shí)琴儿,rabbitmq-client庫(kù)默認(rèn)為每個(gè)連接創(chuàng)建一個(gè)固定的線程池(5個(gè)線程)。當(dāng)使用大量連接時(shí)嘁捷,應(yīng)考慮在CachingConnectionFactory上設(shè)置自定義執(zhí)行程序造成。然后,所有連接將使用相同的執(zhí)行程序雄嚣,并且可以共享它的線程晒屎。執(zhí)行者的線程池應(yīng)該是無限制的,或者針對(duì)預(yù)期的利用率進(jìn)行適當(dāng)設(shè)置(通常每個(gè)連接至少有一個(gè)線程)缓升。如果在每個(gè)連接上創(chuàng)建多個(gè)通道鼓鲁,則池大小將影響并發(fā)性,因此變量(或簡(jiǎn)單的緩存)線程池執(zhí)行器將是最合適的仔沿。
重要的是要明白坐桩,緩存大小(默認(rèn)情況下)不是限制,只是可以緩存的通道數(shù)封锉。具有例如10的高速緩存大小绵跷,實(shí)際上可以使用任何數(shù)量的頻道膘螟。如果正在使用10個(gè)以上的通道,并將它們?nèi)糠祷氐骄彺婺刖郑瑒t10將進(jìn)入高速緩存;其余部分將被物理關(guān)閉荆残。
從版本1.6開始,默認(rèn)通道緩存大小從1增加到25净当。在高容量内斯,多線程環(huán)境中,小緩存意味著以高速率創(chuàng)建和關(guān)閉通道像啼。增加默認(rèn)緩存大小將避免這種開銷俘闯。您應(yīng)該通過RabbitMQ管理界面監(jiān)視正在使用的頻道,并考慮在創(chuàng)建和關(guān)閉許多通道時(shí)進(jìn)一步增加高速緩存大小忽冻。緩存只會(huì)按需增長(zhǎng)(以適應(yīng)應(yīng)用程序的并發(fā)需求)真朗,因此此更改不會(huì)影響現(xiàn)有的低容量應(yīng)用程序。
從版本1.4.2開始僧诚,CachingConnectionFactory具有一個(gè)屬性channelCheckoutTimeout遮婶。當(dāng)此屬性大于零時(shí),channelCacheSize將成為可在連接上創(chuàng)建的通道數(shù)量的限制湖笨。如果達(dá)到限制旗扑,調(diào)用線程將阻塞,直到通道可用或達(dá)到此超時(shí)慈省,在這種情況下拋出一個(gè)AmqpTimeoutException臀防。
框架內(nèi)使用的通道(例如RabbitTemplate)將可靠地返回到緩存。如果您在框架之外創(chuàng)建渠道(例如通過直接訪問連接并調(diào)用createChannel())辫呻,則必須可靠地將其返回(通過關(guān)閉)清钥,也許在finally塊中,以避免使用通道放闺。
CachingConnectionFactory connectionFactory =newCachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用XML時(shí)颇玷,配置可能如下所示:
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
還有一個(gè)SingleConnectionFactory實(shí)現(xiàn)贞让,僅在框架的單元測(cè)試代碼中可用。它比CachingConnectionFactory簡(jiǎn)單,因?yàn)樗痪彺嫱ǖ揽郑怯捎谄淙狈π阅芎蛷椥蕴俾眨贿m用于簡(jiǎn)單測(cè)試之外的實(shí)際使用咏窿。如果您因?yàn)槟承┰蛐枰獙?shí)現(xiàn)自己的ConnectionFactory碌秸,那么AbstractConnectionFactory基類可能會(huì)提供一個(gè)很好的起點(diǎn)。
可以使用rabbit命名空間快速方便地創(chuàng)建ConnectionFactory:
在大多數(shù)情況下艳悔,這將是優(yōu)先的急凰,因?yàn)榭蚣芸梢詾槟x擇最佳默認(rèn)值。創(chuàng)建的實(shí)例將是一個(gè)CachingConnectionFactory猜年。請(qǐng)注意抡锈,通道的默認(rèn)緩存大小為25.如果要更多通道被緩存疾忍,則通過channelCacheSize屬性設(shè)置較大的值。在XML中床三,它將如下所示:
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
使用命名空間一罩,您只需添加channel-cache-size屬性即可:
id="connectionFactory"channel-cache-size="50"/>
默認(rèn)緩存模式是CHANNEL,但您可以將其配置為緩存連接;在這種情況下撇簿,我們使用connection-cache-size:
id="connectionFactory"cache-mode="CONNECTION"connection-cache-size="25"/>
可以使用命名空間設(shè)置主機(jī)和端口屬性
id="connectionFactory"host="somehost"port="5672"/>
或者聂渊,如果在群集環(huán)境中運(yùn)行,請(qǐng)使用addresses屬性四瘫。
id="connectionFactory"addresses="host1:5672,host2:5672"/>
這里有一個(gè)自定義線程工廠的例子汉嗽,它使用rabbitmq-前綴線程名。
thread-factory="tf"
channel-cache-size="10"username="user"password="password"/>
從1.7版開始莲组,提供了一個(gè)ConnectionNameStrategy诊胞,用于注入到AbstractionConnectionFactory中暖夭。生成的名稱用于目標(biāo)RabbitMQ連接的應(yīng)用程序特定標(biāo)識(shí)锹杈。如果RabbitMQ服務(wù)器支持,連接名稱將顯示在管理界面中迈着。該值不必是唯一的竭望,不能用作連接標(biāo)識(shí)符,例如在HTTP API請(qǐng)求中裕菠。該值應(yīng)該是人類可讀的咬清,并且是connection_name鍵下的ClientProperties的一部分∨耍可以用作簡(jiǎn)單的Lambda:
connectionFactory.setConnectionNameStrategy(connectionFactory ->"MY_CONNECTION");
ConnectionFactory參數(shù)可以用來區(qū)分目標(biāo)連接名稱一些邏輯旧烧。默認(rèn)情況下,使用AbstractConnectionFactory的beanName和內(nèi)部計(jì)數(shù)器來生成connection_name画髓。命名空間組件也提供了connection-name-strategy屬性掘剪。
CachingConnectionFactory使用Rabbit客戶端ConnectionFactory的實(shí)例;當(dāng)設(shè)置CachingConnectionFactory上的等效屬性時(shí),會(huì)傳遞一些配置屬性(例如奈虾,host夺谁,port,userName肉微,password匾鸥,requestedHeartBeat,connectionTimeout)碉纳。要設(shè)置其他屬性(例如clientProperties)勿负,請(qǐng)定義 rabbit factory的實(shí)例,并使用適當(dāng)?shù)腃achingConnectionFactory構(gòu)造函數(shù)提供對(duì)它的引用劳曹。當(dāng)使用如上所述的命名空間時(shí)奴愉,在connection-factory屬性中提供對(duì)配置工廠的引用攒至。為方便起見,提供了一個(gè)工廠bean躁劣,以幫助在Spring應(yīng)用程序環(huán)境中配置連接工廠迫吐,如下一節(jié)所述。
id="connectionFactory"connection-factory="rabbitConnectionFactory"/>
4.0.x客戶端默認(rèn)啟用自動(dòng)恢復(fù);與此功能兼容账忘,Spring AMQP具有自己的恢復(fù)機(jī)制志膀,并且通常不需要客戶端恢復(fù)功能。建議禁用amqp-client自動(dòng)恢復(fù)鳖擒,以避免在代理可用時(shí)使AutoRecoverConnectionNotCurrentlyOpenException異常溉浙,但連接尚未恢復(fù)。您可能會(huì)注意到這種異常蒋荚,例如戳稽,當(dāng)RabbitTemplate中配置RetryTemplate時(shí),即使在群集中的其他代理進(jìn)行故障轉(zhuǎn)移時(shí)也是如此期升。由于自動(dòng)恢復(fù)連接在定時(shí)器上恢復(fù)惊奇,因此使用Spring AMQP的恢復(fù)機(jī)制可以更快地恢復(fù)連接。從版本1.7.1開始播赁,除非您明確創(chuàng)建自己的RabbitMQ連接工廠并將其提供給CachingConnectionFactory颂郎,否則Spring AMQP將禁用它。由RabbitConnectionFactoryBean創(chuàng)建的RabbitMQ ConnectionFactory實(shí)例也將默認(rèn)禁用該選項(xiàng)容为。
RabbitConnectionFactoryBean和配置SSL
從版本1.4開始乓序,提供了一個(gè)方便的RabbitConnectionFactoryBean,以便使用依賴注入在底層客戶端連接工廠上方便地配置SSL屬性坎背。其他設(shè)置者只需委托給底層工廠替劈。以前,您必須以編程方式配置SSL選項(xiàng)得滤。
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}"password="${password}"/>
class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
有關(guān)配置SSL的信息陨献,請(qǐng)參閱RabbitMQ文檔。省略keyStore和trustStore配置以通過SSL連接耿戚,而無需證書驗(yàn)證湿故。密鑰和信任存儲(chǔ)配置可以提供如下:
sslPropertiesLocation屬性是指向包含以下鍵的屬性文件的Spring?Resource:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore和truststore是指向磁盤的Spring資源文件。通常膜蛔,此屬性文件將由操作系統(tǒng)保護(hù)坛猪,應(yīng)用程序具有讀訪問權(quán)限。
從Spring AMQP版本1.5開始皂股,這些屬性可以直接在工廠bean上設(shè)置墅茉。如果提供了離散屬性和sslPropertiesLocation,后者中的屬性將覆蓋離散值。
從1.3版開始就斤,引入了AbstractRoutingConnectionFactory悍募。這提供了一種機(jī)制來為幾個(gè)ConnectionFactories配置映射,并在運(yùn)行時(shí)由某些lookupKey確定目標(biāo)ConnectionFactory洋机。通常坠宴,實(shí)現(xiàn)檢查線程綁定的上下文。為了方便起見绷旗,Spring AMQP提供了SimpleRoutingConnectionFactory喜鼓,它從SimpleResourceHolder獲取當(dāng)前線程綁定的lookupKey:
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
publicclassMyService{
@Autowired
privateRabbitTemplate rabbitTemplate;
publicvoidservice(String vHost, String payload){
? ? ? ? SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
? ? ? ? rabbitTemplate.convertAndSend(payload);
? ? ? ? SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
? ? }
}
重要的是在使用后取消綁定資源。有關(guān)更多信息衔肢,請(qǐng)參閱AbstractRoutingConnectionFactory的JavaDocs庄岖。
從版本1.4開始,RabbitTemplate支持SpEL sendConnectionFactorySelectorExpression和receiveConnectionFactorySelectorExpression屬性角骤,它們?cè)诿總€(gè)AMQP協(xié)議交互操作(send隅忿,sendAndReceive,receive或receiveAndReply)上進(jìn)行評(píng)估邦尊,解析為所提供的AbstractRoutingConnectionFactory的lookupKey值背桐。
Bean的引用,比如“@vHostResolver.getVHost(#root)”可以在表達(dá)式中使用。對(duì)于發(fā)送操作胳赌,要發(fā)送的消息是root評(píng)估對(duì)象;對(duì)于接收操作牢撼,queueName是root評(píng)估對(duì)象。
路由算法是:如果選擇器表達(dá)式為空疑苫,或者被評(píng)估為null,或者所提供的ConnectionFactory不是AbstractRoutingConnectionFactory的一個(gè)實(shí)例纷责,則一切都將如前所述捍掺,依賴于提供的ConnectionFactory實(shí)現(xiàn)。如果評(píng)估結(jié)果不為空再膳,但是沒有用于該lookupKey的目標(biāo)ConnectionFactory挺勿,并且使用lenientFallback = true配置AbstractRoutingConnectionFactory,則會(huì)發(fā)生相同的情況喂柒。當(dāng)然不瓶,在一個(gè)AbstractRoutingConnectionFactory的情況下,它會(huì)基于determinCurrentLookupKey()來回溯到其路由實(shí)現(xiàn)灾杰。但是蚊丐,如果lenientFallback = false,則會(huì)拋出IllegalStateException異常艳吠。
命名空間支持還在組件上提供send-connection-factory-selector-expression和receive-connection-factory-selector-expression屬性麦备。
從1.4版開始,您可以在監(jiān)聽器容器中配置路由連接工廠。在這種情況下凛篙,隊(duì)列名稱列表用作查找鍵黍匾。例如,如果使用setQueueNames(“foo”呛梆,“bar”)配置容器锐涯,則查找鍵將為“[foo,bar]”(無空格)填物。
從版本1.6.9開始全庸,您可以在監(jiān)聽器容器上使用setLookupKeyQualifier添加限定符到查找鍵。例如融痛,這可以使用相同名稱監(jiān)聽隊(duì)列壶笼,但是不可以在不同的虛擬主機(jī)中(每個(gè)隊(duì)列中都有一個(gè)連接工廠)。
例如雁刷,使用查找鍵限定符foo和容器監(jiān)聽隊(duì)列欄覆劈,您將注冊(cè)目標(biāo)連接工廠的查找鍵將為foo [bar]。
Queue Affinity和LocalizedQueueConnectionFactory
在群集中使用HA隊(duì)列時(shí)沛励,為獲得最佳性能责语,可能希望連接到主隊(duì)列所在的物理代理。而CachingConnectionFactory可以配置多個(gè)代理地址;這是故障切換目派,客戶端將嘗試按順序連接坤候。 LocalizedQueueConnectionFactory使用由管理插件提供的REST API來確定哪個(gè)節(jié)點(diǎn)已被掌握。然后企蹭,它將創(chuàng)建(或從緩存中檢索)將連接到該節(jié)點(diǎn)的CachingConnectionFactory白筹。如果連接失敗,則確定新的主節(jié)點(diǎn)谅摄,并且消費(fèi)者連接到它徒河。 LocalizedQueueConnectionFactory配置了默認(rèn)連接工廠,以防無法確定隊(duì)列的物理位置送漠,在這種情況下顽照,它將正常連接到集群。
LocalizedQueueConnectionFactory是一個(gè)RoutingConnectionFactory闽寡,SimpleMessageListenerContainer使用隊(duì)列名稱作為查詢鍵代兵,如上面的“路由連接工廠”部分所述。
由于這個(gè)原因(使用查詢的隊(duì)列名稱)爷狈,只有當(dāng)容器配置為監(jiān)聽單個(gè)隊(duì)列時(shí)植影,才能使用LocalizedQueueConnectionFactory淆院。
必須在每個(gè)節(jié)點(diǎn)上啟用RabbitMQ管理插件何乎。
警告
此連接工廠用于長(zhǎng)期連接句惯,例如SimpleMessageListenerContainer使用的連接。它不用于短連接使用支救,例如使用RabbitTemplate抢野,因?yàn)樵谶M(jìn)行連接之前調(diào)用REST API的開銷。此外,對(duì)于發(fā)布操作,隊(duì)列是未知的列牺,并且消息也被發(fā)布到所有集群成員,所以查找節(jié)點(diǎn)的邏輯沒有什么價(jià)值恃轩。
以下是一個(gè)示例配置,使用Spring Boot的RabbitProperties配置工廠:
@Autowired
privateRabbitProperties props;
privatefinalString[] adminUris = {"http://host1:15672","http://host2:15672"};
privatefinalString[] nodes = {"rabbit@host1","rabbit@host2"};
@Bean
publicConnectionFactorydefaultConnectionFactory(){
CachingConnectionFactory cf =newCachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
returncf;
}
@Bean
publicConnectionFactoryqueueAffinityCF(
@Qualifier("defaultConnectionFactory")ConnectionFactory defaultCF)
{
returnnewLocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris,this.nodes,
this.props.getVirtualHost(),this.props.getUsername(),this.props.getPassword(),
false,null);
}
請(qǐng)注意黎做,前三個(gè)參數(shù)是addresses叉跛,adminUris和nodes。這些是位置性的蒸殿,因?yàn)楫?dāng)容器嘗試連接到隊(duì)列時(shí)筷厘,它確定隊(duì)列被掌握在哪個(gè)節(jié)點(diǎn)上并連接到同一陣列位置的地址。
Publisher Confirms and Returns
通過將CachingConnectionFactory的publisherConfirms和publisherReturns屬性分別設(shè)置為“true”宏所,支持確認(rèn)和返回的消息酥艳。
設(shè)置這些選項(xiàng)時(shí),工廠創(chuàng)建的通道將被包裝在PublisherCallbackChannel中爬骤,該通道用于方便回調(diào)充石。當(dāng)獲得這樣的頻道時(shí),客戶端可以使用頻道注冊(cè)一個(gè)PublisherCallbackChannel.Listener霞玄。 PublisherCallbackChannel實(shí)現(xiàn)包含將 確認(rèn)或返回 路由到適當(dāng)?shù)谋O(jiān)聽器的邏輯骤铃。以下部分將進(jìn)一步說明這些功能。
有關(guān)更多背景信息溃列,請(qǐng)參閱RabbitMQ小組題為“引入發(fā)布者確認(rèn)”的博客文章劲厌。
版本1.5中引入了一種使用戶能夠控制日志記錄級(jí)別的機(jī)制。
CachingConnectionFactory使用默認(rèn)策略來記錄通道關(guān)閉听隐,如下所示:
正常通道關(guān)閉(200 OK)不記錄。
如果通道由于被動(dòng)隊(duì)列聲明失敗而關(guān)閉哄啄,則會(huì)在調(diào)試級(jí)別進(jìn)行記錄雅任。
如果由于消費(fèi)者條件驗(yàn)證不通過,則通道關(guān)閉咨跌,則會(huì)在INFO級(jí)別記錄沪么。
所有其他情況都記錄在ERROR級(jí)別。
要修改此行為锌半,請(qǐng)?jiān)谄鋍loseExceptionLogger屬性中的CachingConnectionFactory中注入自定義ConditionalExceptionLogger禽车。
另請(qǐng)參閱“消費(fèi)者失敗事件”一節(jié)。
Table 3.1 CacheMode.CHANNEL的緩存屬性
屬性說明
connectionNameConnectionNameStrategy生成的連接的名稱。
channelCacheSize當(dāng)前配置的允許空閑的最大通道殉摔。
localPort連接的本地端口(如果可用)州胳。這可以用于與RabbitMQ管理界面上的連接/通道相關(guān)聯(lián)。
idleChannelsTx當(dāng)前空閑(緩存)的事務(wù)通道的數(shù)量逸月。
idleChannelsNotTx當(dāng)前空閑(高速緩存)的非事務(wù)性通道的數(shù)量栓撞。
idleChannelsTxHighWater同時(shí)空閑(緩存)的事務(wù)通道的最大數(shù)量。
idleChannelsNotTxHighWater非事務(wù)性通道的最大數(shù)量同時(shí)處于空閑狀態(tài)(緩存)碗硬。
Table 3.2 CacheMode.CONNECTION的緩存屬性
屬性說明
connectionName:localPortConnectionNameStrategy生成的連接的名稱瓤湘。
openConnections表示與經(jīng)紀(jì)人連接的連接對(duì)象數(shù)。
channelCacheSize當(dāng)前配置的允許空閑的最大通道恩尾。
connectionCacheSize當(dāng)前配置的允許空閑的最大連接弛说。
idleConnections當(dāng)前空閑的連接數(shù)。
idleConnectionsHighWater同時(shí)空閑的最大連接數(shù)翰意。
idleChannelsTx:localPort當(dāng)前為此連接空閑(高速緩存)的事務(wù)通道的數(shù)量木人。屬性名稱的localPort部分可用于與RabbitMQ Admin UI上的連接/通道相關(guān)聯(lián)。
idleChannelsNotTx:localPort當(dāng)前為此連接空閑(高速緩存)的非事務(wù)性通道的數(shù)量猎物。屬性名稱的localPort部分可用于與RabbitMQ Admin UI上的連接/通道相關(guān)聯(lián)虎囚。
idleChannelsTxHighWater:localPort同時(shí)空閑(緩存)的事務(wù)通道的最大數(shù)量。屬性名稱的localPort部分可用于與RabbitMQ Admin UI上的連接/通道相關(guān)聯(lián)蔫磨。
idleChannelsNotTxHighWater:localPort非事務(wù)性通道的最大數(shù)量同時(shí)處于空閑狀態(tài)(緩存)淘讥。屬性名稱的localPort部分可用于與RabbitMQ Admin UI上的連接/通道相關(guān)聯(lián)。
cacheMode屬性(還包括CHANNEL或CONNECTION)堤如。
此處輸入圖片的描述
RabbitMQ自動(dòng)連接/拓?fù)浠謴?fù)
從Spring AMQP的第一個(gè)版本開始蒲列,框架在發(fā)生代理失敗時(shí)提供了自己的連接和通道恢復(fù)。另外搀罢,如第3.1.10節(jié)“配置代理”所述蝗岖,當(dāng)重新建立連接時(shí),RabbitAdmin將重新聲明任何基礎(chǔ)架構(gòu)bean(隊(duì)列等)榔至。因此抵赢,它不依賴于由amqp-client庫(kù)提供的自動(dòng)恢復(fù)。 Spring AMQP現(xiàn)在使用4.0.x版本的amqp-client唧取,默認(rèn)情況下啟用自動(dòng)恢復(fù)铅鲤。 Spring AMQP仍然可以使用自己的恢復(fù)機(jī)制,如果您希望枫弟,在客戶端禁用它(通過將底層RabbitMQ connectionFactory上的automaticRecoveryEnabled屬性設(shè)置為false)邢享。但是,該框架與啟用自動(dòng)恢復(fù)完全兼容淡诗。這意味著您在代碼中創(chuàng)建的任何消費(fèi)者(可能通過RabbitTemplate.execute())都可以自動(dòng)恢復(fù)骇塘。
CachingConnectionFactory現(xiàn)在允許您訪問底層連接工廠伊履,例如設(shè)置自定義客戶端屬性:
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo","bar");
查看連接時(shí),這些屬性將顯示在RabbitMQ管理界面中款违。
與Spring框架和相關(guān)項(xiàng)目提供的許多其他高級(jí)抽象一樣唐瀑,Spring AMQP提供了一個(gè)起著核心作用的“模板”。定義主要操作的界面稱為AmqpTemplate奠货。這些操作涵蓋發(fā)送和接收消息的一般行為介褥。換句話說,它們不是任何實(shí)現(xiàn)的唯一递惋,因此名稱中的“AMQP”柔滔。另一方面,該接口的實(shí)現(xiàn)與AMQP協(xié)議的實(shí)現(xiàn)相關(guān)萍虽。不同于JMS睛廊,它是本身是一個(gè)接口級(jí)API,AMQP是一個(gè)線級(jí)協(xié)議杉编。該協(xié)議的實(shí)現(xiàn)提供自己的客戶端庫(kù)超全,因此模板接口的每個(gè)實(shí)現(xiàn)將取決于特定的客戶端庫(kù)。目前邓馒,只有一個(gè)實(shí)現(xiàn):RabbitTemplate嘶朱。在下面的示例中,您將經(jīng)彻夂ǎ看到“AmqpTemplate”的使用疏遏,但是當(dāng)您查看配置示例或調(diào)用模板實(shí)例化和/或設(shè)置的任何代碼摘錄時(shí),您將看到實(shí)現(xiàn)類型(例如“RabbitTemplate”)救军。
如上所述财异,AmqpTemplate接口定義了發(fā)送和接收消息的所有基本操作。我們將在下面的兩個(gè)部分分別探討消息發(fā)送和接收唱遭。
另請(qǐng)參見“AsyncRabbitTemplate”一節(jié)戳寸。
從版本1.3開始,您現(xiàn)在可以將RabbitTemplate配置為使用RetryTemplate來幫助處理代理連接問題拷泽。有關(guān)完整信息疫鹊,請(qǐng)參閱spring-retry項(xiàng)目。以下只是使用指數(shù)退出策略和默認(rèn)SimpleRetryPolicy的一個(gè)示例司致,它將在向調(diào)用者拋出異常之前進(jìn)行三次嘗試订晌。
使用XML命名空間:
使用@Configuration:
@Bean
publicAmqpTemplaterabbitTemplate();
RabbitTemplate template =newRabbitTemplate(connectionFactory());
RetryTemplate retryTemplate =newRetryTemplate();
ExponentialBackOffPolicy backOffPolicy =newExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
? ? retryTemplate.setBackOffPolicy(backOffPolicy);
? ? template.setRetryTemplate(retryTemplate);
returntemplate;
}
從版本1.4開始,除了retryTemplate屬性之外蚌吸,RabbitTemplate還支持recoveryCallback選項(xiàng)。它被用作RetryTemplate.execute(RetryCallback retryCallback砌庄,RecoveryCallback recoveryCallback)的第二個(gè)參數(shù)羹唠。
RecoveryCallback有些限制奕枢,因?yàn)橹卦嚿舷挛膬H包含lastThrowable字段。對(duì)于更復(fù)雜的用例佩微,您應(yīng)該使用外部RetryTemplate缝彬,以便您可以通過上下文的屬性向RecoveryCallback傳遞附加信息:
retryTemplate.execute(
newRetryCallback() {
@Override
publicObjectdoWithRetry(RetryContext context)throwsException{
context.setAttribute("message", message);
returnrabbitTemplate.convertAndSend(exchange, routingKey, message);
? ? ? ? }
},newRecoveryCallback() {
@Override
publicObjectrecover(RetryContext context)throwsException{
Object message = context.getAttribute("message");
? ? ? ? ? ? Throwable t = context.getLastThrowable();
// Do something with message
returnnull;
? ? ? ? }
? ? });
}
在這種情況下,您不會(huì)將RetryTemplate注入RabbitTemplate哺眯。
Publisher Confirms and Returns
AmqpTemplate的RabbitTemplate實(shí)現(xiàn)Publisher Confirms and Returns谷浅。
對(duì)于返回的消息,模板的必需屬性必須設(shè)置為true奶卓,否則強(qiáng)制表達(dá)式必須對(duì)特定消息進(jìn)行求值一疯。此功能需要一個(gè)CachedConnectionFactory,其publisherReturns屬性設(shè)置為true(參見“發(fā)布者確認(rèn)和返回”一節(jié))夺姑。通過調(diào)用setReturnCallback(ReturnCallback callback)注冊(cè)一個(gè)RabbitTemplate.ReturnCallback墩邀,返回給客戶端≌嫡悖回調(diào)必須實(shí)現(xiàn)這個(gè)方法:
voidreturnedMessage(Message message,intreplyCode, String replyText,
? ? ? ? ? String exchange, String routingKey)
;
每個(gè)RabbitTemplate只支持一個(gè)ReturnCallback眉睹。另見“Reply Timeout”一節(jié)。
對(duì)于發(fā)布者確認(rèn)(aka Publisher Acknowledgements)废膘,該模板需要一個(gè)CachedConnectionFactory竹海,其publisherConfirms屬性設(shè)置為true。通過調(diào)用setConfirmCallback(ConfirmCallback callback)注冊(cè)RabbitTemplate.ConfirmCallback丐黄,確認(rèn)發(fā)送給客戶端斋配。回調(diào)必須實(shí)現(xiàn)這個(gè)方法:
voidconfirm(CorrelationData correlationData,booleanack, String cause);
CorrelationData是發(fā)送原始消息時(shí)由客戶端提供的對(duì)象孵稽。 ack是真實(shí)的许起,對(duì)于一個(gè)nack是假的。對(duì)于nack菩鲜,原因可能包含一個(gè)原因,如果在生成nack時(shí)可用侣诵。一個(gè)例子是向不存在的交換發(fā)送消息時(shí)甘凭。在這種情況下恐仑,經(jīng)紀(jì)人關(guān)閉渠道;原因包括在內(nèi)的原因。原因在版本1.4中被添加。
RabbitTemplate只支持一個(gè)ConfirmCallback乌昔。
RabbitTemplate發(fā)送操作完成后邻吞,通道關(guān)閉;這將阻止在連接工廠緩存已滿時(shí)(當(dāng)緩存中有空間,通道沒有物理關(guān)閉并且返回/確認(rèn)將正常進(jìn)行時(shí))的接收確認(rèn)或返回葫男。當(dāng)緩存已滿時(shí)抱冷,框架會(huì)將關(guān)閉延遲最多5秒,以便允許接收確認(rèn)/返回的時(shí)間腾誉。當(dāng)使用確認(rèn)時(shí)徘层,當(dāng)接收到最后一次確認(rèn)時(shí),通道將被關(guān)閉利职。當(dāng)僅使用退貨時(shí)趣效,通道將保持打開5秒鐘。通常建議將連接工廠的channelCacheSize設(shè)置為足夠大的值猪贪,以便將發(fā)布消息的通道返回到緩存跷敬,而不是關(guān)閉。您可以使用RabbitMQ管理插件監(jiān)視頻道使用情況;如果您看到頻道正在快速打開/關(guān)閉热押,您應(yīng)該考慮增加緩存大小以減少服務(wù)器上的開銷西傀。
從版本1.4開始斤寇,構(gòu)建在RabbitTemplate之上的RabbitMessagingTemplate提供了與Spring Framework消息抽象(即org.springframework.messaging.Message)的集成。這允許您使用spring-messaging?Message抽象發(fā)送和接收消息拥褂。這種抽象是由Spring Integration和Spring的STOMP支持的其他Spring項(xiàng)目使用的娘锁。有兩個(gè)消息轉(zhuǎn)換器涉及;一個(gè)用于在Spring消息傳遞Message和Spring AMQP的Message抽象之間進(jìn)行轉(zhuǎn)換,另一個(gè)用于在Spring AMQP的Message抽象與底層RabbitMQ客戶端庫(kù)所需的格式之間進(jìn)行轉(zhuǎn)換饺鹃。默認(rèn)情況下莫秆,消息有效載荷由提供的RabbitTemplate的消息轉(zhuǎn)換器轉(zhuǎn)換』谙辏或者镊屎,您可以使用其他有效載荷轉(zhuǎn)換器注入自定義MessagingMessageConverter:
MessagingMessageConverter amqpMessageConverter =newMessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTempalte.setAmqpMessageConverter(amqpMessageConverter);
從版本1.6開始,該模板現(xiàn)在支持用戶id表達(dá)式(使用Java配置時(shí)的userIdExpression)茄螃。如果發(fā)送消息缝驳,則在評(píng)估此表達(dá)式后,設(shè)置用戶id屬性(如果尚未設(shè)置)归苍。用于評(píng)估的根對(duì)象是要發(fā)送的消息用狱。
例子:
第一個(gè)例子是一個(gè)文字表達(dá);第二個(gè)從應(yīng)用程序上下文中的連接工廠bean獲取用戶名屬性。
發(fā)送消息時(shí)霜医,可以使用以下任一方法:
voidsend(Message message)throwsAmqpException;
voidsend(String routingKey, Message message)throwsAmqpException;
voidsend(String exchange, String routingKey, Message message)throwsAmqpException;
我們可以用上面列出的最后一個(gè)方法開始討論褪尝,因?yàn)樗鼘?shí)際上是最明確的菩暗。它允許在運(yùn)行時(shí)提供AMQP Exchange名稱以及路由密鑰斥赋。最后一個(gè)參數(shù)是負(fù)責(zé)實(shí)例創(chuàng)建Message實(shí)例的回調(diào)采够。使用此方法發(fā)送消息的示例可能是這樣的:
amqpTemplate.send("marketData.topic","quotes.nasdaq.FOO",
newMessage("12.34".getBytes(), someProperties));
如果您打算在大部分或全部時(shí)間使用該模板實(shí)例發(fā)送到同一個(gè)交換機(jī),則可以在模板本身上設(shè)置“交換”屬性医男。在這種情況下砸狞,可以使用上面列出的第二種方法。以下示例在功能上等同于上一個(gè)示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO",newMessage("12.34".getBytes(), someProperties));
如果在模板上設(shè)置了“交換”和“路由密鑰”屬性镀梭,則可以使用僅接受消息的方法:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(newMessage("12.34".getBytes(), someProperties));
更好的思考交換和路由關(guān)鍵屬性的方法是顯式方法參數(shù)將始終覆蓋模板的默認(rèn)值刀森。實(shí)際上,即使你沒有在模板上顯式設(shè)置這些屬性报账,總是存在默認(rèn)值研底。在這兩種情況下,默認(rèn)是一個(gè)空字符串透罢,但這實(shí)際上是一個(gè)明智的默認(rèn)值榜晦。就路由密鑰而言,首先并不總是必需的(例如扇出交換機(jī))羽圃。此外乾胶,隊(duì)列可能與一個(gè)空字符串綁定到一個(gè)Exchange。這些都是依賴模板的路由密鑰屬性的默認(rèn)空字符串值的合法場(chǎng)景。就Exchange名稱而言识窿,空字符串是常用的斩郎,因?yàn)锳MQP規(guī)范將“默認(rèn)Exchange”定義為沒有名稱。由于所有隊(duì)列都使用其名稱作為綁定值自動(dòng)綁定到該默認(rèn)Exchange(即直接Exchange)喻频,所以上述第二種方法可用于通過默認(rèn)Exchange進(jìn)行的任何隊(duì)列的簡(jiǎn)單點(diǎn)對(duì)點(diǎn)消息傳遞缩宜。只需提供隊(duì)列名稱作為“routingKey” - 或者通過在運(yùn)行時(shí)提供方法參數(shù):
RabbitTemplate template =newRabbitTemplate();// using default no-name Exchange
template.send("queue.helloWorld",newMessage("Hello World".getBytes(), someProperties));
或者,如果您希望創(chuàng)建一個(gè)將主要或?qū)iT用于單個(gè)隊(duì)列發(fā)布的模板半抱,以下是完全合理的:
RabbitTemplate template =newRabbitTemplate();// using default no-name Exchange
template.setRoutingKey("queue.helloWorld");// but we'll always send to this Queue
template.send(newMessage("Hello World".getBytes(), someProperties));
從版本1.3開始脓恕,消息構(gòu)建器API由MessageBuilder和MessagePropertiesBuilder提供;它們提供了一種方便的“流利”手段創(chuàng)建消息或消息屬性:
Message message = MessageBuilder.withBody("foo".getBytes())
? ? .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar","baz")
? ? .build();
或
MessageProperties props = MessagePropertiesBuilder.newInstance()
? ? .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar","baz")
? ? .build();
Message message = MessageBuilder.withBody("foo".getBytes())
? ? .andProperties(props)
? ? .build();
可以設(shè)置MessageProperties上定義的每個(gè)屬性。其他方法包括setHeader(String key窿侈,String value),removeHeader(String key)秋茫,removeHeaders()和copyProperties(MessageProperties屬性)史简。每個(gè)屬性設(shè)置方法都有一個(gè)setIfAbsent()變體。在存在默認(rèn)初始值的情況下肛著,該方法命名為setIfAbsentOrDefault()圆兵。
提供了五種靜態(tài)方法來創(chuàng)建初始消息構(gòu)建器:
publicstaticMessageBuilderwithBody(byte[] body)1
publicstaticMessageBuilderwithClonedBody(byte[] body)2
publicstaticMessageBuilderwithBody(byte[] body,intfrom,intto)3
publicstaticMessageBuilderfromMessage(Message message)4
publicstaticMessageBuilderfromClonedMessage(Message message)5
1.由構(gòu)建器創(chuàng)建的消息將具有直接引用參數(shù)的主體。
2.由構(gòu)建器創(chuàng)建的消息將具有一個(gè)包含參數(shù)中字節(jié)副本的新數(shù)組的主體枢贿。
3.由構(gòu)建器創(chuàng)建的消息將具有一個(gè)新的數(shù)組殉农,其中包含參數(shù)中的字節(jié)范圍。有關(guān)更多詳細(xì)信息局荚,請(qǐng)參閱Arrays.copyOfRange();
4.他由建設(shè)者創(chuàng)建的消息將具有一個(gè)直接引用參數(shù)體的主體超凳。參數(shù)的屬性被復(fù)制到一個(gè)新的MessageProperties對(duì)象。
5.由構(gòu)建器創(chuàng)建的消息將具有一個(gè)包含參數(shù)的正文副本的新數(shù)組的主體耀态。參數(shù)的屬性被復(fù)制到一個(gè)新的MessageProperties對(duì)象轮傍。
publicstaticMessagePropertiesBuildernewInstance()1
publicstaticMessagePropertiesBuilderfromProperties(MessageProperties properties)2
publicstaticMessagePropertiesBuilderfromClonedProperties(MessageProperties properties)3
1.新的消息屬性對(duì)象使用默認(rèn)值初始化。
2.構(gòu)建器初始化首装,并且build()將返回所提供的屬性對(duì)象创夜。
3.參數(shù)的屬性被復(fù)制到一個(gè)新的MessageProperties對(duì)象。
使用AmqpTemplate的RabbitTemplate實(shí)現(xiàn)仙逻,每個(gè)send()方法都有一個(gè)重載的版本驰吓,它接受一個(gè)附加的CorrelationData對(duì)象。當(dāng)發(fā)布者確認(rèn)被啟用時(shí)系奉,該對(duì)象將在第3.1.4節(jié)“AmqpTemplate”中描述的回調(diào)中返回檬贰。這允許發(fā)送者將確認(rèn)(ack或nack)與發(fā)送的消息相關(guān)聯(lián)。
從1.6.7版開始喜最,引入了CorrelationAwareMessagePostProcessor接口偎蘸,允許在消息轉(zhuǎn)換后修改相關(guān)數(shù)據(jù):
MessagepostProcessMessage(Message message, Correlation correlation);
同樣從版本1.6.7開始,提供了一個(gè)新的回調(diào)接口CorrelationDataPostProcessor;所有MessagePostProcessor(在send()方法中提供)以及在setBeforePublishPostProcessors()中提供的那些)之后調(diào)用。實(shí)現(xiàn)可以更新或替換send()方法中提供的相關(guān)數(shù)據(jù)(如果有的話)迷雪。消息和原始CorrelationData(如果有)作為參數(shù)提供限书。
CorrelationDatapostProcess(Message message, CorrelationData correlationData);
當(dāng)模板的強(qiáng)制屬性為true時(shí),返回的消息由第3.1.4節(jié)“AmqpTemplate”中描述的回調(diào)提供章咧。
從版本1.4開始倦西,RabbitTemplate支持根據(jù)每個(gè)請(qǐng)求消息評(píng)估的Spel mandatoryExpression屬性作為根評(píng)估對(duì)象,解析為布爾值赁严。
Bean的引用,比如“@myBean.isMandatory(#root)”可以在表達(dá)式中使用扰柠。
發(fā)送者返回也可以在RabbitTemplate內(nèi)部用于發(fā)送和接收操作。有關(guān)詳細(xì)信息疼约,請(qǐng)參閱“回復(fù)超時(shí)”一節(jié)卤档。
從版本1.4.2開始,已經(jīng)介紹了BatchingRabbitTemplate程剥。這是RabbitTemplate的一個(gè)子類劝枣,具有重寫的發(fā)送方法,根據(jù)BatchingStrategy對(duì)消息進(jìn)行批處理;只有批量完成時(shí)才將消息發(fā)送給RabbitMQ织鲸。
publicinterfaceBatchingStrategy{
MessageBatchaddToBatch(String exchange, String routingKey, Message message);
DatenextRelease();
CollectionreleaseBatches();
}
警告
批量數(shù)據(jù)保存在內(nèi)存中;在發(fā)生系統(tǒng)故障的情況下舔腾,未發(fā)送的消息可能會(huì)丟失。
提供了SimpleBatchingStrategy搂擦。它支持將消息發(fā)送到單個(gè)交換/路由密鑰稳诚。它有屬性:
batchSize 發(fā)送批次之前的消息數(shù)
bufferLimit 批量消息的最大大小;這將超過batchSize,并且導(dǎo)致部分批處理被發(fā)送
timeout 當(dāng)沒有新活動(dòng)向批量添加消息時(shí)瀑踢,將發(fā)送部分批次的時(shí)間
SimpleBatchingStrategy通過使用4字節(jié)二進(jìn)制長(zhǎng)度的每個(gè)嵌入式消息進(jìn)行格式化扳还。通過將springBatchFormat消息屬性設(shè)置為lengthHeader4,將其傳送到接收系統(tǒng)丘损。
注意
批量消息由監(jiān)聽器容器(使用springBatchFormat消息頭)自動(dòng)分段普办。拒絕批次中的任何消息將導(dǎo)致整個(gè)批次被拒絕。
消息接收總是比發(fā)送更復(fù)雜一些徘钥。有兩種方式可以接收消息衔蹲。更簡(jiǎn)單的選擇是一次輪詢方法調(diào)用來輪詢單個(gè)消息。更復(fù)雜但更常見的方法是注冊(cè)將按需異步接收消息的監(jiān)聽器呈础。我們將在接下來的兩個(gè)小節(jié)中看一下每種方法的一個(gè)例子舆驶。
AmqpTemplate本身可用于輪詢的消息接收。默認(rèn)情況下而钞,如果沒有消息可用沙廉,則立即返回null;沒有阻塞。從版本1.5開始臼节,您現(xiàn)在可以設(shè)置receiveTimeout(以毫秒為單位)撬陵,并且接收方法將阻塞長(zhǎng)達(dá)數(shù)秒珊皿,等待消息。小于零的值意味著無限期地阻止(或至少直到與代理的連接丟失)巨税。版本1.6引入了接收方法的變體蟋定,允許每次調(diào)用傳遞超時(shí)。
警告
由于接收操作為每個(gè)消息創(chuàng)建一個(gè)新的QueueingConsumer草添,因此該技術(shù)并不適用于大容量環(huán)境;考慮使用異步消費(fèi)者驶兜,或者對(duì)于這些用例使用receiveTimeout為零。
有四種簡(jiǎn)單的接收方法可用远寸。與發(fā)送方的Exchange一樣抄淑,有一種方法需要直接在模板本身設(shè)置默認(rèn)隊(duì)列屬性,并且有一種在運(yùn)行時(shí)接受隊(duì)列參數(shù)的方法驰后。版本1.6引入了接受timeoutMillis的變體來根據(jù)每個(gè)請(qǐng)求重寫receiveTimeout肆资。
Messagereceive()throwsAmqpException;
Messagereceive(String queueName)throwsAmqpException;
Messagereceive(longtimeoutMillis)throwsAmqpException;
Messagereceive(String queueName,longtimeoutMillis)throwsAmqpException;
就像在發(fā)送消息的情況下,AmqpTemplate有一些方便的接收POJO而不是Message實(shí)例的方法灶芝,并且實(shí)現(xiàn)將提供一種自定義用于創(chuàng)建返回的對(duì)象的MessageConverter的方法:
ObjectreceiveAndConvert()throwsAmqpException;
ObjectreceiveAndConvert(String queueName)throwsAmqpException;
MessagereceiveAndConvert(longtimeoutMillis)throwsAmqpException;
MessagereceiveAndConvert(String queueName,longtimeoutMillis)throwsAmqpException;
類似于sendAndReceive方法迅耘,從版本1.3開始,AmqpTemplate具有幾個(gè)方便的receiveAndReply方法來同步接收监署,處理和回復(fù)消息:
booleanreceiveAndReply(ReceiveAndReplyCallback callback)
throwsAmqpException;
booleanreceiveAndReply(String queueName, ReceiveAndReplyCallback callback)
throwsAmqpException;
booleanreceiveAndReply(ReceiveAndReplyCallback callback,
String replyExchange, String replyRoutingKey)throwsAmqpException
;
booleanreceiveAndReply(String queueName, ReceiveAndReplyCallback callback,
String replyExchange, String replyRoutingKey)throwsAmqpException
;
booleanreceiveAndReply(ReceiveAndReplyCallback callback,
ReplyToAddressCallback replyToAddressCallback)throwsAmqpException
;
booleanreceiveAndReply(String queueName, ReceiveAndReplyCallback callback,
ReplyToAddressCallback replyToAddressCallback)throwsAmqpException
;
AmqpTemplate實(shí)現(xiàn)負(fù)責(zé)接收和回復(fù)階段。在大多數(shù)情況下纽哥,您應(yīng)該只提供一個(gè)ReceiveAndReplyCallback的實(shí)現(xiàn)來為接收到的消息執(zhí)行一些業(yè)務(wù)邏輯钠乏,如果需要,可以構(gòu)建回復(fù)對(duì)象或消息春塌。注意晓避,ReceiveAndReplyCallback可能返回null。在這種情況下只壳,沒有發(fā)送回復(fù)俏拱,receiveAndReply類似于receive方法。這允許將相同的隊(duì)列用于消息的混合吼句,其中一些可能不需要回復(fù)锅必。
僅當(dāng)提供的回調(diào)不是ReceiveAndReplyMessageCallback的實(shí)例(提供原始消息交換合同)時(shí),才應(yīng)用自動(dòng)消息(請(qǐng)求和回復(fù))轉(zhuǎn)換惕艳。
ReplyToAddressCallback對(duì)于需要自定義邏輯在運(yùn)行時(shí)根據(jù)接收到的消息和ReceiveAndReplyCallback的回復(fù)來確定replyTo地址的情況很有用搞隐。默認(rèn)情況下,請(qǐng)求消息中的replyTo信息用于路由回復(fù)远搪。
以下是基于POJO的接收和回復(fù)的示例…
booleanreceived =
this.template.receiveAndReply(ROUTE,newReceiveAndReplyCallback() {
publicInvoicehandle(Order order){
returnprocessOrder(order);
? ? ? ? ? ? ? ? }
? ? ? ? });
if(received) {
log.info("We received an order!");
}
Asynchronous Consumer(異步消費(fèi)者)
注意
Spring AMQP還通過使用@RabbitListener注解來支持帶注解的監(jiān)聽端點(diǎn)劣纲,并提供了一種開放的基礎(chǔ)設(shè)施,以編程方式注冊(cè)端點(diǎn)谁鳍。這是設(shè)置異步消費(fèi)者的最方便的方法癞季,有關(guān)詳細(xì)信息劫瞳,請(qǐng)參閱“注解驅(qū)動(dòng)的監(jiān)聽器端點(diǎn)”一節(jié)。
Message Listener
對(duì)于異步消息接收绷柒,涉及專用組件(而不是AmqpTemplate)志于。該組件是消息回收消息的容器。我們將在短時(shí)間內(nèi)查看容器及其屬性辉巡,但首先我們應(yīng)該看一下回調(diào)恨憎,因?yàn)檫@樣你的應(yīng)用程序代碼將與郵件系統(tǒng)集成在一起。從MessageListener接口的實(shí)現(xiàn)開始郊楣,有幾個(gè)回調(diào)選項(xiàng):
publicinterfaceMessageListener{
voidonMessage(Message message);
}
如果您的回調(diào)邏輯由于任何原因取決于AMQP Channel實(shí)例憔恳,您可以改為使用ChannelAwareMessageListener。它看起來相似净蚤,但有一個(gè)額外的參數(shù):
publicinterfaceChannelAwareMessageListener{
voidonMessage(Message message, Channel channel)throwsException;
}
MessageListenerAdapter
如果您希望在應(yīng)用程序邏輯和消息傳遞API之間保持更嚴(yán)格的分隔钥组,則可以依靠框架提供的適配器實(shí)現(xiàn)。這通常被稱為“消息驅(qū)動(dòng)的POJO”支持今瀑。使用適配器時(shí)程梦,只需要提供適配器本身應(yīng)該調(diào)用的實(shí)例的引用。
MessageListenerAdapter listener =newMessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以將適配器子類化并提供getListenerMethodName()的實(shí)現(xiàn)橘荠,以根據(jù)消息動(dòng)態(tài)選擇不同的方法屿附。這個(gè)方法有兩個(gè)參數(shù),即原始的消息和extractMessage哥童,后者是任何轉(zhuǎn)換的結(jié)果挺份。默認(rèn)情況下,配置SimpleMessageConverter;有關(guān)可用的其他轉(zhuǎn)換器的更多信息和信息贮懈,請(qǐng)參閱“SimpleMessageConverter”一節(jié)匀泊。
從版本1.4.2開始,原始消息具有consumerQueue和consumerTag屬性朵你,可用于確定從哪個(gè)隊(duì)列接收消息各聘。
從版本1.5開始,您可以將消費(fèi)者隊(duì)列/標(biāo)記的映射配置為方法名稱抡医,以動(dòng)態(tài)選擇要調(diào)用的方法躲因。如果map中沒有條目,我們將回到默認(rèn)監(jiān)聽器方法魂拦。
Container
現(xiàn)在您已經(jīng)看到了Message-listening回調(diào)的各種選項(xiàng)毛仪,我們可以將注意力轉(zhuǎn)向容器⌒究保基本上箱靴,容器處理“主動(dòng)”的責(zé)任,使得監(jiān)聽器回調(diào)可以保持被動(dòng)荷愕。容器是“生命周期”組件的示例衡怀。它提供了啟動(dòng)和停止的方法棍矛。配置容器時(shí),您基本上彌合了AMQP隊(duì)列和MessageListener實(shí)例之間的差距抛杨。您必須提供對(duì)ConnectionFactory以及該監(jiān)聽器應(yīng)從其消費(fèi)消息的隊(duì)列名稱或隊(duì)列實(shí)例的引用够委。這是使用默認(rèn)實(shí)現(xiàn)的最基本的例子SimpleMessageListenerContainer:
SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(newMessageListenerAdapter(somePojo));
作為“活動(dòng)”組件,最常見的是使用bean定義創(chuàng)建監(jiān)聽器容器怖现,以便它可以在后臺(tái)運(yùn)行茁帽。這可以通過XML來完成:
或者,您可能更喜歡使用與上述實(shí)際代碼片段非常相似的@Configuration樣式:
@Configuration
publicclassExampleAmqpConfiguration{
@Bean
publicSimpleMessageListenerContainermessageListenerContainer(){
SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
? ? ? ? container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
? ? ? ? container.setMessageListener(exampleListener());
returncontainer;
? ? }
@Bean
publicConnectionFactoryrabbitConnectionFactory(){
? ? ? ? CachingConnectionFactory connectionFactory =
newCachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
returnconnectionFactory;
? ? }
@Bean
publicMessageListenerexampleListener(){
returnnewMessageListener() {
publicvoidonMessage(Message message){
System.out.println("received: "+ message);
? ? ? ? ? ? }
? ? ? ? };
? ? }
}
從RabbitMQ版本3.2開始屈嗤,代理現(xiàn)在支持消費(fèi)者優(yōu)先級(jí)(請(qǐng)參閱使用RICSMQ使用消費(fèi)者優(yōu)先級(jí))潘拨。這通過在消費(fèi)者上設(shè)置x-priority參數(shù)來啟用囚似。 SimpleMessageListenerContainer現(xiàn)在支持設(shè)置消費(fèi)者參數(shù):
container.setConsumerArguments(Collections.
singletonMap("x-priority", Integer.valueOf(10)));
為方便起見蜒蕾,命名空間提供了listener元素的priority屬性:
從版本1.3開始吼鳞,容器正在監(jiān)聽的隊(duì)列可以在運(yùn)行時(shí)修改;Section 3.1.18, “Listener Container Queues”.
auto-delete Queues
當(dāng)容器配置為監(jiān)聽自動(dòng)刪除隊(duì)列時(shí)痹屹,或者隊(duì)列具有x-expires選項(xiàng),或者Broker上配置了“即時(shí)生存”策略吵冒,則當(dāng)容器為停止(最后消費(fèi)者被取消)科展。在版本1.3之前低缩,由于隊(duì)列丟失算谈,容器無法重新啟動(dòng);當(dāng)連接被關(guān)閉/打開時(shí)涩禀,RabbitAdmin只會(huì)自動(dòng)重新發(fā)送隊(duì)列等,當(dāng)容器停止/啟動(dòng)時(shí)然眼,不會(huì)發(fā)生這種情況埋泵。
從版本1.3開始,容器現(xiàn)在將在啟動(dòng)期間使用RabbitAdmin重新聲明任何丟失的隊(duì)列罪治。
您還可以使用條件聲明(稱為“條件聲明”一節(jié))以及auto-startup =“false”管理員延遲隊(duì)列聲明,直到容器啟動(dòng)礁蔗。
auto-startup="false"/>
在這種情況下觉义,隊(duì)列和交換由containerAdmin聲明,它具有auto-startup =“false”浴井,因此在上下文初始化期間不會(huì)聲明元素晒骇。同樣,由于同樣的原因磺浙,容器也沒有啟動(dòng)洪囤。當(dāng)容器稍后啟動(dòng)時(shí),它使用它來引用containerAdmin來聲明元素伦泥。
批量消息由監(jiān)聽器容器(使用springBatchFormat消息頭)自動(dòng)分段平道。拒絕批次中的任何消息將導(dǎo)致整個(gè)批次被拒絕宙帝。有關(guān)批處理的更多信息农渊,請(qǐng)參閱“批處理”一節(jié)。
Consumer Failure Events(消費(fèi)失敗事件)
從1.5版開始或颊,SimpleMessageListenerContainer每當(dāng)監(jiān)聽器(消費(fèi)者)遇到某種故障時(shí)砸紊,都會(huì)發(fā)布應(yīng)用程序事件。事件ListenerContainerConsumerFailedEvent具有以下屬性:
container - 消費(fèi)者遇到問題的監(jiān)聽器容器囱挑。
reason - 失敗的一個(gè)文本原因醉顽。
fatal - 一個(gè)布爾值表示失敗是否是致命的;與非致命的例外,容器將嘗試重新啟動(dòng)消費(fèi)者平挑,根據(jù)retryInterval游添。
throwable - the Throwable that was caught.
這些事件可以通過實(shí)現(xiàn)ApplicationListener來消耗。
當(dāng)并發(fā)消費(fèi)者大于1時(shí)通熄,系統(tǒng)范圍的事件(如連接失敗)將由所有消費(fèi)者發(fā)布唆涝。
如果一個(gè)消費(fèi)者失敗,因?yàn)橐粋€(gè)如果它的隊(duì)列被專門使用唇辨,默認(rèn)情況下石抡,以及發(fā)布事件,將發(fā)出一個(gè)WARN日志助泽。要更改此日志記錄行為,請(qǐng)?jiān)赟impleMessageListenerContainer的exclusiveConsumerExceptionLogger屬性中提供自定義ConditionalExceptionLogger嚎京。另見“記錄通道關(guān)閉事件”一節(jié)嗡贺。
致命錯(cuò)誤始終記錄在ERROR級(jí)別;這不可修改。
Consumer Tags(消費(fèi)者標(biāo)簽)
從版本1.4.5開始鞍帝,您現(xiàn)在可以提供生成消費(fèi)者標(biāo)簽的策略诫睬。默認(rèn)情況下,消費(fèi)者標(biāo)簽將由代理生成帕涌。
publicinterfaceConsumerTagStrategy{
StringcreateConsumerTag(String queue);
}
隊(duì)列可用摄凡,因此可以(可選地)在標(biāo)簽中使用续徽。
See Section 3.1.15, “Message Listener Container Configuration”.
Annotation-driven Listener Endpoints(注解驅(qū)動(dòng)的監(jiān)聽器端點(diǎn))
介紹
從版本1.4開始,異步接收消息的最簡(jiǎn)單的方法是使用帶注解的監(jiān)聽端點(diǎn)基礎(chǔ)結(jié)構(gòu)亲澡。簡(jiǎn)而言之钦扭,它允許您將托管bean的方法公開為Rabbit監(jiān)聽器端點(diǎn)。
@Component
publicclassMyService{
@RabbitListener(queues ="myQueue")
publicvoidprocessOrder(String data){
? ? ? ? ...
? ? }
}
上述示例的想法是床绪,每當(dāng)org.springframework.amqp.core.Queue“myQueue”上都有可用的消息時(shí)客情,將相應(yīng)地調(diào)用processOrder方法(在這種情況下,與消息的有效內(nèi)容相關(guān))癞己。
注解架構(gòu)使用RabbitListenerContainerFactory為每個(gè)注解方法在幕后創(chuàng)建一個(gè)消息監(jiān)聽器容器膀斋。
在上面的例子中,myQueue必須已經(jīng)存在并被綁定到一些交換痹雅。從版本1.5.0開始仰担,只要應(yīng)用程序上下文中存在RabbitAdmin,隊(duì)列可以自動(dòng)聲明和綁定绩社。
@Component
publicclassMyService{
@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="myQueue", durable ="true"),
exchange =@Exchange(value ="auto.exch", ignoreDeclarationExceptions ="true"),
key ="orderRoutingKey")
? )
publicvoidprocessOrder(String data){
? ? ...
? }
@RabbitListener(bindings =@QueueBinding(
value =@Queue,
exchange =@Exchange(value ="auto.exch"),
key ="invoiceRoutingKey")
? )
publicvoidprocessInvoice(String data){
? ? ...
? }
}
在第一個(gè)示例中摔蓝,如果需要,隊(duì)列myQueue將自動(dòng)聲明(持久)铃将,并與交換機(jī)綁定到路由密鑰项鬼。在第二個(gè)示例中,將聲明并綁定匿名(獨(dú)占劲阎,自動(dòng)刪除)隊(duì)列绘盟。可以提供多個(gè)QueueBinding條目悯仙,允許監(jiān)聽器監(jiān)聽多個(gè)隊(duì)列龄毡。
只有DIRECT,F(xiàn)ANOUT锡垄,TOPIC和HEADERS沦零,這種機(jī)制支持交換類型。當(dāng)需要更多高級(jí)配置時(shí)货岭,請(qǐng)使用正常的@Bean定義
在第一個(gè)例子中路操,請(qǐng)注意ignoreDeclarationExchangeions對(duì)交換。這允許例如綁定到可能具有不同設(shè)置(如內(nèi)部)的現(xiàn)有交換機(jī)千贯。默認(rèn)情況下屯仗,現(xiàn)有交換機(jī)的屬性必須匹配。
從版本1.6開始搔谴,現(xiàn)在可以在@QueueBinding注解中為隊(duì)列魁袜,交換和綁定指定參數(shù)。例如:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="auto.headers", autoDelete ="true",
arguments =@Argument(name ="x-message-ttl", value ="10000",
type ="java.lang.Integer")),
exchange =@Exchange(value ="auto.headers", type = ExchangeTypes.HEADERS, autoDelete ="true"),
? ? ? ? arguments = {
@Argument(name ="x-match", value ="all"),
@Argument(name ="foo", value ="bar"),
@Argument(name ="baz")
? ? ? ? })
)
publicStringhandleWithHeadersExchange(String foo){
? ? ...
}
請(qǐng)注意,x-message-ttl參數(shù)為隊(duì)列設(shè)置為10秒峰弹。由于參數(shù)類型不是String店量,我們必須指定其類型;在這種情況下整數(shù)。與所有這樣的聲明一樣鞠呈,如果隊(duì)列已經(jīng)存在融师,那么這些參數(shù)必須與隊(duì)列上的一致。對(duì)于headers exchange粟按,我們?cè)O(shè)置綁定參數(shù)以匹配頭foo設(shè)置為bar的消息诬滩,并且頭baz必須與任何值一起顯示。 x匹配參數(shù)意味著必須滿足這兩個(gè)條件灭将。
參數(shù)名稱疼鸟,值,和類型可以是財(cái)產(chǎn)的占位符($ {…})或該表達(dá)式(# {…})庙曙。名稱必須解析為字符串空镜;類型表達(dá)式必須解析為類或類的完全限定名。價(jià)值必須解決的東西捌朴,可以被defaultconversionservice的類型(如在上面的例子中x-message-ttl)吴攒。
如果名稱解析為null或空字符串,則忽略該參數(shù)砂蔽。
Meta-Annotations(元注解)
有時(shí)您可能希望為多個(gè)監(jiān)聽器使用相同的配置洼怔。為了減少樣板設(shè)置,您可以使用元注解來創(chuàng)建自己的監(jiān)聽器注解:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings =@QueueBinding(
value =@Queue,
exchange =@Exchange(value ="metaFanout", type = ExchangeTypes.FANOUT)))
public@interfaceMyAnonFanoutListener {
}
publicclassMetaListener{
@MyAnonFanoutListener
publicvoidhandle1(String foo){
? ? ? ? ...
? ? }
@MyAnonFanoutListener
publicvoidhandle2(String foo){
? ? ? ? ...
? ? }
}
在此示例中左驾,由@MyAnonFanoutListener注解創(chuàng)建的每個(gè)監(jiān)聽器將將匿名自動(dòng)刪除隊(duì)列綁定到扇出交換機(jī)的metaFanout镣隶。元注解機(jī)制很簡(jiǎn)單,因?yàn)橛脩舳x的注解上的屬性未被檢查 - 因此您不能從元注解中覆蓋設(shè)置诡右。當(dāng)需要更多高級(jí)配置時(shí)安岂,請(qǐng)使用正常的@Bean定義。
啟用監(jiān)聽器端點(diǎn)注解
要啟用對(duì)@RabbitListener注解的支持帆吻,請(qǐng)將@EnableRabbit添加到您的一個(gè)@Configuration類中域那。
@Configuration
@EnableRabbit
publicclassAppConfig{
@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
? ? ? ? factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
returnfactory;
? ? }
}
默認(rèn)情況下,基礎(chǔ)組件將查找名為rabbitListenerContainerFactory的bean作為工廠用于創(chuàng)建消息監(jiān)聽器容器的源猜煮。在這種情況下次员,忽略RabbitMQ基礎(chǔ)架構(gòu)設(shè)置,可以使用3個(gè)線程的核心輪詢大小和10個(gè)線程的最大池大小來調(diào)用processOrder方法王带。
可以自定義監(jiān)聽器容器工廠以使用每個(gè)注解淑蔚,或者可以通過實(shí)現(xiàn)RabbitListenerConfigurer接口來配置顯式默認(rèn)值。僅當(dāng)至少有一個(gè)端點(diǎn)沒有特定的容器工廠注冊(cè)時(shí)辫秧,才需要默認(rèn)值。有關(guān)詳細(xì)信息和示例被丧,請(qǐng)參閱javadoc盟戏。
如果您喜歡XML配置绪妹,請(qǐng)使用元素。
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
Message Conversion for Annotated Methods
在調(diào)用監(jiān)聽器之前柿究,有兩個(gè)轉(zhuǎn)換步驟在管道中邮旷。第一個(gè)使用MessageConverter將傳入的Spring AMQP消息轉(zhuǎn)換為spring-message消息。當(dāng)調(diào)用目標(biāo)方法時(shí)蝇摸,如有必要婶肩,將消息有效載荷轉(zhuǎn)換為方法參數(shù)類型。
第一步的默認(rèn)MessageConverter是一個(gè)Spring AMQP SimpleMessageConverter貌夕,用于處理轉(zhuǎn)換為String和java.io.Serializable對(duì)象;所有其他的都保留為一個(gè)byte[]律歼。在下面的討論中,我們稱之為消息轉(zhuǎn)換器啡专。
第二步的默認(rèn)轉(zhuǎn)換器是一個(gè)GenericMessageConverter险毁,它委托給轉(zhuǎn)換服務(wù)(DefaultFormattingConversionService的一個(gè)實(shí)例)。在下面的討論中们童,我們稱之為方法參數(shù)轉(zhuǎn)換器畔况。
要更改消息轉(zhuǎn)換器,只需將其作為屬性添加到容器工廠bean中:
@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
? ? ...
factory.setMessageConverter(newJackson2JsonMessageConverter());
? ? ...
returnfactory;
}
這配置了一個(gè)Jackson2轉(zhuǎn)換器慧库,期望頭信息存在跷跪,以引導(dǎo)轉(zhuǎn)換。
您還可以考慮一個(gè)ContentTypeDelegatingMessageConverter齐板,它可以處理不同內(nèi)容類型的轉(zhuǎn)換吵瞻。
在大多數(shù)情況下,除非要使用自定義ConversionService覆积,否則不需要自定義方法參數(shù)轉(zhuǎn)換器听皿。
在1.6之前的版本中,轉(zhuǎn)換JSON的類型信息必須在消息頭中提供宽档,或需要自定義ClassMapper尉姨。從版本1.6開始,如果沒有類型信息頭吗冤,則可以從目標(biāo)方法參數(shù)推斷類型又厉。
此類型推斷僅適用于方法級(jí)別的@RabbitListener。
See the section called “Jackson2JsonMessageConverter” for more information.
如果你想自定義方法參數(shù)轉(zhuǎn)換器椎瘟,你可以這樣做覆致,如下所示:
@Configuration
@EnableRabbit
publicclassAppConfigimplementsRabbitListenerConfigurer{
? ? ...
@Bean
publicDefaultMessageHandlerMethodFactorymyHandlerMethodFactory(){
DefaultMessageHandlerMethodFactory factory =newDefaultMessageHandlerMethodFactory();
factory.setMessageConverter(newGenericMessageConverter(myConversionService()));
returnfactory;
? ? }
@Bean
publicConversionServicemyConversionService(){
DefaultConversionService conv =newDefaultConversionService();
? ? ? ? conv.addConverter(mySpecialConverter());
returnconv;
? ? }
@Override
publicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrar registrar){
? ? ? ? registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
? ? }
? ? ...
}
重點(diǎn)
對(duì)于多方法監(jiān)聽器(參見“Multi-Method Listeners”一節(jié)),方法選擇是基于消息轉(zhuǎn)換后消息的有效載荷;方法參數(shù)轉(zhuǎn)換器只有在方法被選擇之后才被調(diào)用肺蔚。
Programmatic Endpoint Registration
RabbitListenerEndpoint提供了Rabbit端點(diǎn)的模型煌妈,并負(fù)責(zé)為該模型配置容器。基礎(chǔ)設(shè)施允許您以編程方式配置端點(diǎn)璧诵,除了由RabbitListener注解檢測(cè)到的端點(diǎn)汰蜘。
@Configuration
@EnableRabbit
publicclassAppConfigimplementsRabbitListenerConfigurer{
@Override
publicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrar registrar){
SimpleRabbitListenerEndpoint endpoint =newSimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
? ? ? ? endpoint.setMessageListener(message -> {
// processing
? ? ? ? });
? ? ? ? registrar.registerEndpoint(endpoint);
? ? }
}
在上面的例子中,我們使用了SimpleRabbitListenerEndpoint之宿,它提供了實(shí)際的MessageListener來調(diào)用族操,但是你也可以構(gòu)建自己的端點(diǎn)變體來描述自定義調(diào)用機(jī)制。
應(yīng)該注意的是比被,您也可以跳過使用@RabbitListener色难,只通過RabbitListenerConfigurer以編程方式注冊(cè)您的端點(diǎn)。
Annotated Endpoint Method Signature
到目前為止等缀,我們已經(jīng)在我們的端點(diǎn)注入了一個(gè)簡(jiǎn)單的String枷莉,但實(shí)際上它可以有一個(gè)非常靈活的方法簽名。我們重寫它以使用自定義標(biāo)題注入Order:
@Component
publicclassMyService{
@RabbitListener(queues ="myQueue")
publicvoidprocessOrder(Order order, @Header("order_type")String orderType){
? ? ? ? ...
? ? }
}
這些是您可以在監(jiān)聽器端點(diǎn)注入的主要元素:
原始的org.springframework.amqp.core.Message项滑。
接收消息的com.rabbitmq.client.Channel
表示傳入AMQP消息的org.springframework.messaging.Message依沮。請(qǐng)注意,此消息包含自定義標(biāo)頭和標(biāo)準(zhǔn)標(biāo)題(由AmqpHeaders定義)枪狂。
從版本1.6開始危喉,入站deliveryMode標(biāo)題現(xiàn)在在名稱為AmqpHeaders.RECEIVED_DELIVERY_MODE的標(biāo)題中可用,而不是AmqpHeaders.DELIVERY_MODE州疾。
@ Header-annotated方法參數(shù)來提取特定的頭值辜限,包括標(biāo)準(zhǔn)AMQP頭。
@ Headers-annotated參數(shù)严蓖,必須也可以分配給java.util.Map以獲取對(duì)所有頭的訪問薄嫡。
-
不被支持的類型(即消息和信道)的非注釋元素被認(rèn)為是有效載荷待侵。您可以通過使用@Payload注釋參數(shù)來使其顯式而昨。您還可以通過添加額外的@Valid來啟用驗(yàn)證。
注入Spring Message消息抽象的能力特別適用于從傳輸特定消息中存儲(chǔ)的所有信息中獲益驾诈,而不依賴于特定于傳輸?shù)腁PI毒姨。
@RabbitListener(queues ="myQueue")
publicvoidprocessOrder(Message order){ ...
}
方法參數(shù)的處理由DefaultMessageHandlerMethodFactory提供哑蔫,可以進(jìn)一步自定義以支持其他方法參數(shù)。轉(zhuǎn)換和驗(yàn)證支持也可以在這里定制弧呐。
例如闸迷,如果我們想在處理之前確保我們的訂單有效,我們可以使用@Valid對(duì)有效負(fù)載進(jìn)行注釋俘枫,并配置必要的驗(yàn)證器腥沽,如下所示:
@Configuration
@EnableRabbit
publicclassAppConfigimplementsRabbitListenerConfigurer{
@Override
publicvoidconfigureRabbitListeners(RabbitListenerEndpointRegistrar registrar){
? ? ? ? registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
? ? }
@Bean
publicDefaultMessageHandlerMethodFactorymyHandlerMethodFactory(){
DefaultMessageHandlerMethodFactory factory =newDefaultMessageHandlerMethodFactory();
? ? ? ? factory.setValidator(myValidator());
returnfactory;
? ? }
}
Listening to Multiple Queues
使用queues屬性時(shí),可以指定關(guān)聯(lián)的容器可以監(jiān)聽多個(gè)隊(duì)列鸠蚪。您可以使用@Header注釋來創(chuàng)建POJO方法可接收消息的隊(duì)列名稱今阳。
@Component
publicclassMyService{
@RabbitListener(queues = {"queue1","queue2"} )
publicvoidprocessOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE)String queue){
? ? ? ? ...
? ? }
}
從版本1.5開始师溅,您可以使用屬性占位符和SpEL來對(duì)隊(duì)列名稱進(jìn)行外部化:
@Component
publicclassMyService{
@RabbitListener(queues ="#{'${property.with.comma.delimited.queue.names}'.split(',')}")
publicvoidprocessOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE)String queue){
? ? ? ? ...
? ? }
}
在版本1.5之前,只能通過這種方式指定一個(gè)隊(duì)列;每個(gè)隊(duì)列需要一個(gè)單獨(dú)的屬性盾舌。
Reply Management
MessageListenerAdapter中的現(xiàn)有支持已經(jīng)允許您的方法具有非空返回類型险胰。在這種情況下,調(diào)用的結(jié)果被封裝在以原始消息的ReplyToAddress頭中指定的地址或在監(jiān)聽器上配置的默認(rèn)地址中發(fā)送的消息中】篌荩現(xiàn)在可以使用消息傳遞抽象的@SendTo注釋來設(shè)置該默認(rèn)地址。
假設(shè)我們的processOrder方法現(xiàn)在應(yīng)該返回一個(gè)OrderStatus棚贾,可以按如下方式寫入自動(dòng)發(fā)送回復(fù):
@RabbitListener(destination ="myQueue")
@SendTo("status")
publicOrderStatusprocessOrder(Order order){
// order processing
returnstatus;
}
如果您需要以傳輸獨(dú)立的方式設(shè)置其他標(biāo)頭窖维,則可以返回一條消息,如下所示:
@RabbitListener(destination ="myQueue")
@SendTo("status")
publicMessageprocessOrder(Order order){
// order processing
returnMessageBuilder
? ? ? ? .withPayload(status)
.setHeader("code",1234)
? ? ? ? .build();
}
@SendTo值被假定為exchange/routingKey之后的回復(fù)exchange和routingKey妙痹,其中可以省略其中一個(gè)部分铸史。有效值為:
1.foo/bar - the replyTo exchange and routingKey.
2.foo/ - the replyTo exchange and default (empty) routingKey.
3.bar or /bar - the replyTo routingKey and default (empty) exchange.
4./ or empty - the replyTo default exchange and default routingKey.
還可以使用@SendTo而不使用value屬性。這種情況等于一個(gè)空的sendTo模式怯伊。 @SendTo僅在入站郵件沒有replyToAddress屬性時(shí)使用琳轿。
從版本1.5開始,@SendTo值可以是一個(gè)bean初始化SpEL Expression耿芹,例如…
@RabbitListener(queues ="test.sendTo.spel")
@SendTo("#{spelReplyTo}")
publicStringcapitalizeWithSendToSpel(String foo){
returnfoo.toUpperCase();
}
...
@Bean
publicStringspelReplyTo(){
return"test.sendTo.reply.spel";
}
表達(dá)式必須求值為一個(gè)String崭篡,它可以是一個(gè)簡(jiǎn)單的隊(duì)列名稱(發(fā)送到默認(rèn)exchange),也可以是如上所述的exchange / routingKey形式吧秕。
#{…}表達(dá)式在初始化時(shí)執(zhí)行一次
對(duì)于動(dòng)態(tài)回復(fù)路由琉闪,消息發(fā)送方應(yīng)包含一個(gè)reply_to消息屬性或使用下面描述的備用運(yùn)行時(shí)Spel表達(dá)式。
從版本1.6開始砸彬,@SendTo可以是在運(yùn)行時(shí)針對(duì)請(qǐng)求和回復(fù)進(jìn)行評(píng)估的Spel表達(dá)式:
@RabbitListener(queues ="test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
publicBarcapitalizeWithSendToSpel(Foo foo){
returnprocessTheFooAndReturnABar(foo);
}
Spel表達(dá)式的運(yùn)行時(shí)性質(zhì)由!{...}分隔符指示颠毙。表達(dá)式的評(píng)估上下文#root對(duì)象具有三個(gè)屬性:
request - o.s.amqp.core.Message請(qǐng)求對(duì)象。
source - 轉(zhuǎn)換后的o.s.messaging.Message<砂碉?>
result - 方法結(jié)果蛀蜜。
上下文具有映射屬性訪問器,標(biāo)準(zhǔn)類型轉(zhuǎn)換器和bean解析器增蹭,允許引用上下文中的其他bean(例如@someBeanName.determineReplyQ(request, result)))滴某。
總而言之,#{...}在初始化期間被評(píng)估一次沪铭,#root對(duì)象是應(yīng)用程序上下文; bean被其名稱引用壮池。在運(yùn)行時(shí)對(duì)每個(gè)消息的!{...}進(jìn)行評(píng)估,其中根對(duì)象具有上述屬性杀怠,并且bean以其名稱引用椰憋,以@為前綴。
Multi-Method Listeners
從版本1.5.0開始赔退,現(xiàn)在可以在類級(jí)別上指定@RabbitListener注釋橙依。與新的@RabbitHandler注釋一起证舟,這允許單個(gè)監(jiān)聽器基于傳入消息的有效載荷類型來調(diào)用不同的方法。這最好用一個(gè)例子來描述:
@RabbitListener(id="multi", queues ="someQueue")
publicclassMultiListenerBean{
@RabbitHandler
@SendTo("my.reply.queue")
publicStringbar(Bar bar){
? ? ? ? ...
? ? }
@RabbitHandler
publicStringbaz(Baz baz){
? ? ? ? ...
? ? }
@RabbitHandler
publicStringqux(@Header("amqp_receivedRoutingKey")String rk, @Payload Qux qux){
? ? ? ? ...
? ? }
}
在這種情況下窗骑,如果轉(zhuǎn)換的有效載荷是Bar女责,Baz或Qux,則會(huì)調(diào)用單獨(dú)的@RabbitHandler方法创译。重要的是要了解系統(tǒng)必須能夠基于有效載荷類型識(shí)別唯一的方法抵知。檢查類型是否具有不具有注釋的單個(gè)參數(shù)的可分配性,或者使用@Payload注釋進(jìn)行注釋软族。請(qǐng)注意刷喜,相同的方法簽名適用于上述方法級(jí)@RabbitListener中討論的方法。
請(qǐng)注意立砸,必須在每個(gè)方法(如果需要)上指定@SendTo;它在類級(jí)別不支持掖疮。
@Repeatable @RabbitListener
從版本1.6開始,@RabbitListener注釋被標(biāo)記為@Repeatable颗祝。這意味著注釋可以多次出現(xiàn)在相同的注釋元素(方法或類)上浊闪。在這種情況下,為每個(gè)注釋創(chuàng)建一個(gè)單獨(dú)的監(jiān)聽器容器螺戳,每個(gè)注釋都調(diào)用相同的監(jiān)聽器@Bean搁宾。可重復(fù)注釋可與Java 8或更高版本一起使用;當(dāng)使用Java 7或更早版本時(shí)倔幼,通過使用@RabbitListeners“容器”注釋可以獲得與@RabbitListener注釋數(shù)組相同的效果猛铅。
Proxy @RabbitListener and Generics
如果您的服務(wù)旨在被代理(例如,在@Transactional的情況下)凤藏,當(dāng)接口具有通用參數(shù)時(shí)奸忽,有一些注意事項(xiàng)。具有通用界面和特定實(shí)現(xiàn)揖庄,例如:
interfaceTxService
{
Stringhandle(P payload, String header);
}
staticclassTxServiceImplimplementsTxService{
@Override
@RabbitListener(...)
publicStringhandle(Foo foo, String rk){
? ? ? ? ...
? ? }
}
您必須切換到CGLIB目標(biāo)類代理栗菜,因?yàn)閷?shí)際實(shí)現(xiàn)的接口句柄方法是一種橋接方法。在事務(wù)管理的情況下蹄梢,使用注釋選項(xiàng) - @EnableTransactionManagement(proxyTargetClass = true)來配置CGLIB的使用疙筹。在這種情況下,所有注釋必須在實(shí)現(xiàn)中的目標(biāo)方法上聲明:
staticclassTxServiceImplimplementsTxService{
@Override
@Transactional
@RabbitListener(...)
publicStringhandle(@Payload Foo foo, @Header("amqp_receivedRoutingKey")String rk){
? ? ? ? ...
? ? }
}
Container Management
為注釋創(chuàng)建的容器未在應(yīng)用程序上下文中注冊(cè)禁炒。您可以通過調(diào)用RabbitListenerEndpointRegistry?bean上的getListenerContainers()來獲取所有容器的集合而咆。然后,您可以遍歷此集合幕袱,例如暴备,停止/啟動(dòng)所有容器或調(diào)用注冊(cè)表本身的Lifecycle方法,這將調(diào)用每個(gè)容器上的操作们豌。
您還可以使用其id來獲取對(duì)單個(gè)容器的引用涯捻,使用getListenerContainer(String id);例如上面的代碼段創(chuàng)建的容器的registry.getListenerContainer(“multi”)浅妆。
從1.5.2版開始,您可以使用getListenerContainerIds()獲取注冊(cè)容器的id障癌。
從1.5版開始凌外,您現(xiàn)在可以將組分配給RabbitListener端點(diǎn)上的容器。這提供了一種獲取對(duì)容器子集的引用的機(jī)制;添加一個(gè)組屬性會(huì)使一個(gè)類型為Collection的bean注冊(cè)到具有組名稱的上下文中涛浙。
Threading and Asynchronous Consumers(線程和異步消費(fèi)者)
異步消費(fèi)者涉及到多線程康辑。
在SimpleMessageListener中配置的TaskExecutor的線程用于在RabbitMQ Client發(fā)送新消息時(shí)調(diào)用MessageListener。如果未配置轿亮,則使用SimpleAsyncTaskExecutor晾捏。如果使用了線程池,請(qǐng)確保池大小足以處理配置的并發(fā)哀托。
當(dāng)使用默認(rèn)的SimpleAsyncTaskExecutor時(shí),對(duì)于調(diào)用監(jiān)聽器的線程劳秋,監(jiān)聽器容器beanName用作threadNamePrefix仓手。這對(duì)日志分析很有用;通常建議在日志追蹤器配置中始終包含線程名稱。當(dāng)TaskExecutor通過SimpleMessageListenerContainer上的taskExecutor屬性特別提供時(shí)玻淑,它將按原樣使用嗽冒,無需修改。建議您使用類似的技術(shù)來命名由自定義TaskExecutor bean定義創(chuàng)建的線程补履,以幫助日志消息中的線程標(biāo)識(shí)添坊。
在CachingConnectionFactory中配置的執(zhí)行程序在創(chuàng)建連接時(shí)被傳遞到RabbitMQ客戶端,其線程用于將新消息傳遞給監(jiān)聽器容器箫锤。在撰寫本文時(shí)贬蛙,如果未配置,客戶端將使用池大小為5的內(nèi)部線程池執(zhí)行程序谚攒。
RabbitMQ客戶端使用ThreadFactory創(chuàng)建用于低級(jí)I / O(套接字)操作的線程阳准。要修改此工廠,您需要配置底層RabbitMQ ConnectionFactory馏臭,如“Configuring the Underlying Client Connection Factory(配置底層客戶端連接工廠)”一節(jié)中所述野蝇。
Detecting Idle Asynchronous Consumers(檢測(cè)空閑異步消費(fèi)者)
雖然效率高, 但是異步消費(fèi)者的一個(gè)問題是檢測(cè)他們什么時(shí)候空閑 - 如果沒有消息到達(dá)一段時(shí)間括儒,用戶可能需要采取一些行動(dòng)绕沈。
從版本1.6開始,現(xiàn)在可以將監(jiān)聽器容器配置為發(fā)布ListenerContainerIdleEvent帮寻,當(dāng)有一段時(shí)間沒有消息傳遞乍狐。當(dāng)容器空閑時(shí),每個(gè)idleEventInterval毫秒將發(fā)布一個(gè)事件澜躺。
要配置此功能蝉稳,請(qǐng)?jiān)谌萜魃显O(shè)置idleEventInterval:
xml
...
idle-event-interval="60000"
...
? ? ? ? >
Java
@Bean
publicSimpleMessageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);
? ? ...
container.setIdleEventInterval(60000L);
? ? ...
returncontainer;
}
@RabbitListener
@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
? ? factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
? ? ...
returnfactory;
}
在每種情況下,容器空閑時(shí)掘鄙,每分鐘會(huì)發(fā)布一次事件耘戚。
Event Consumption
您可以通過實(shí)現(xiàn)ApplicationListener來捕獲這些事件,無論是通用監(jiān)聽器還是收件人操漠,只能收到此特定事件收津。您還可以使用Spring Framework 4.2中引用的@EventListener。
以下示例將@RabbitListener和@EventListener組合到一個(gè)類中浊伙。了解應(yīng)用程序監(jiān)聽器將獲取所有容器的事件非常重要撞秋,因此如果要根據(jù)哪個(gè)容器空閑來執(zhí)行特定操作,則可能需要檢查監(jiān)聽器ID嚣鄙。您也可以為此使用@EventListener條件吻贿。
事件有4個(gè)屬性:
source 監(jiān)聽器容器實(shí)例
id 監(jiān)聽器id(或容器bean名稱)
idleTime 當(dāng)事件發(fā)布時(shí),容器空閑的時(shí)間
queueNames 容器監(jiān)聽到的隊(duì)列的名稱
publicclassListener{
@RabbitListener(id="foo", queues="#{queue.name}")
publicStringlisten(String foo){
returnfoo.toUpperCase();
? ? }
@EventListener(condition ="event.listenerId == 'foo'")
publicvoidonApplicationEvent(ListenerContainerIdleEvent event){
? ? ? ? ...
? ? }
}
注意
事件聽眾將看到所有容器的事件;因此哑子,在上面的示例中舅列,我們縮小了基于監(jiān)聽器ID接收的事件。
警告
如果要使用空閑事件來停止lister容器卧蜓,則不應(yīng)在調(diào)用監(jiān)聽器的線程上調(diào)用container.stop()帐要,這將導(dǎo)致延遲和不必要的日志消息。相反弥奸,您應(yīng)該將事件切換到另一個(gè)線程榨惠,然后可以停止容器。
3.1.7 Message Converters(消息轉(zhuǎn)換器)
AmqpTemplate還定義了幾種用于發(fā)送和接收將委托給MessageConverter的消息的方法盛霎。 MessageConverter本身很簡(jiǎn)單赠橙。它為每個(gè)方向提供單一方法:一種用于轉(zhuǎn)換為消息,另一種用于從消息轉(zhuǎn)換愤炸。請(qǐng)注意简烤,轉(zhuǎn)換為消息時(shí),除了對(duì)象之外摇幻,還可以提供屬性横侦。 “object”參數(shù)通常對(duì)應(yīng)于消息體。
publicinterfaceMessageConverter{
MessagetoMessage(Object object, MessageProperties messageProperties)
throwsMessageConversionException;
ObjectfromMessage(Message message)throwsMessageConversionException;
}
以下列出了AmqpTemplate中的相關(guān)消息發(fā)送方法绰姻。它們比以前討論的方法簡(jiǎn)單枉侧,因?yàn)樗鼈儾恍枰狹essage實(shí)例。相反狂芋,MessageConverter負(fù)責(zé)通過將提供的對(duì)象轉(zhuǎn)換為消息體的字節(jié)數(shù)組榨馁,然后添加任何提供的MessageProperties來“創(chuàng)建”每個(gè)消息。
voidconvertAndSend(Object message)throwsAmqpException;
voidconvertAndSend(String routingKey, Object message)throwsAmqpException;
voidconvertAndSend(String exchange, String routingKey, Object message)
throwsAmqpException;
voidconvertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throwsAmqpException;
voidconvertAndSend(String routingKey, Object message,
? ? MessagePostProcessor messagePostProcessor)throwsAmqpException
;
voidconvertAndSend(String exchange, String routingKey, Object message,
? ? MessagePostProcessor messagePostProcessor)throwsAmqpException
;
在接收端帜矾,只有兩種方法:一種接受隊(duì)列名稱翼虫,一種依賴于模板的“隊(duì)列”屬性已被設(shè)置屑柔。
ObjectreceiveAndConvert()throwsAmqpException;
ObjectreceiveAndConvert(String queueName)throwsAmqpException;
“異步消費(fèi)者”一節(jié)中提到的MessageListenerAdapter也使用MessageConverter。
MessageConverter策略的默認(rèn)實(shí)現(xiàn)稱為SimpleMessageConverter珍剑。如果您沒有顯式配置替代方案掸宛,這將是RabbitTemplate實(shí)例使用的轉(zhuǎn)換器。它處理基于文本的內(nèi)容招拙,序列化的Java對(duì)象和簡(jiǎn)單的字節(jié)數(shù)組唧瘾。
Converting From a Message
如果輸入消息的內(nèi)容類型以“text”(例如“text / plain”)開頭,它還將檢查content-encoding屬性别凤,以確定將Message body字節(jié)數(shù)組轉(zhuǎn)換為Java String時(shí)要使用的字符集饰序。如果在輸入消息中未設(shè)置content-encoding屬性,則默認(rèn)情況下將使用“UTF-8”字符集规哪。如果需要覆蓋該默認(rèn)設(shè)置求豫,可以配置SimpleMessageConverter的實(shí)例,設(shè)置其“defaultCharset”屬性诉稍,然后將其注入到RabbitTemplate實(shí)例中蝠嘉。
如果輸入Message的content-type屬性值被設(shè)置為“application / x-java-serialized-object”,那么SimpleMessageConverter將嘗試將字節(jié)數(shù)組反序列化(再水化)為Java對(duì)象均唉。雖然這對(duì)于簡(jiǎn)單的原型設(shè)計(jì)可能是有用的,但一般不建議依賴Java序列化肚菠,因?yàn)樗鼤?huì)導(dǎo)致生產(chǎn)者和消費(fèi)者之間的緊密耦合舔箭。當(dāng)然,它也排除了非Java系統(tǒng)在任何一方的使用蚊逢。使用AMQP作為線程協(xié)議层扶,不幸的是,通過這樣的限制可以損失大部分優(yōu)勢(shì)烙荷。在接下來的兩節(jié)中镜会,我們將探討一些傳遞豐富域?qū)ο髢?nèi)容的方法,而不依賴于Java序列化终抽。
對(duì)于所有其他內(nèi)容類型戳表,SimpleMessageConverter將直接將消息體內(nèi)容作為字節(jié)數(shù)組返回。
有關(guān)重要信息昼伴,請(qǐng)參閱“Java Deserialization”一節(jié)匾旭。
Converting To a Message
當(dāng)從任意Java對(duì)象轉(zhuǎn)換為消息時(shí),SimpleMessageConverter同樣處理字節(jié)數(shù)組圃郊,字符串和可序列化實(shí)例价涝。它會(huì)將每個(gè)字節(jié)轉(zhuǎn)換為字節(jié)(在字節(jié)數(shù)組的情況下,沒有任何轉(zhuǎn)換)沐寺,并且將相應(yīng)地設(shè)置content-type屬性沼沈。如果要轉(zhuǎn)換的對(duì)象與這些類型不匹配,則消息體將為空若治。
該轉(zhuǎn)換器類似于SimpleMessageConverter居兆,除了可以使用其他Spring Framework Serializer和Deserializer實(shí)現(xiàn)進(jìn)行配置覆山,以實(shí)現(xiàn)應(yīng)用程序/ x-java-serialized-object轉(zhuǎn)換。
Converting to a Message
如上一節(jié)所述史辙,通常不推薦依靠Java序列化汹买。一種比較常見的替代方法是在不同語(yǔ)言和平臺(tái)上更加靈活和便攜,是JSON(JavaScript Object Notation)聊倔』薇校可以在任何RabbitTemplate實(shí)例上配置轉(zhuǎn)換器,以覆蓋其SimpleMessageConverter默認(rèn)值的使用耙蔑。 Jackson2JsonMessageConverter使用com.fasterxml.jackson 2.x庫(kù)见妒。
如上所示,Jackson2JsonMessageConverter默認(rèn)使用DefaultClassMapper甸陌。類型信息被添加到MessageProperties(并從中檢索)须揣。如果入站郵件在MessageProperties中不包含類型信息,但您知道預(yù)期的類型钱豁,則可以使用defaultType屬性配置靜態(tài)類型
class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
Converting from a Message
根據(jù)發(fā)送系統(tǒng)添加到標(biāo)題的類型信息耻卡,將入站消息轉(zhuǎn)換為對(duì)象。
在1.6之前的版本中牲尺,如果類型信息不存在卵酪,轉(zhuǎn)換將失敗。從版本1.6開始谤碳,如果類型信息丟失溃卡,轉(zhuǎn)換器將使用Jackson默認(rèn)值(通常為map)轉(zhuǎn)換JSON。
此外蜒简,從版本1.6開始瘸羡,當(dāng)使用@RabbitListener注釋(在方法上)時(shí),推斷的類型信息將添加到MessageProperties中;這允許轉(zhuǎn)換器轉(zhuǎn)換為目標(biāo)方法的參數(shù)類型搓茬。這僅適用于沒有注釋的一個(gè)參數(shù)或@Payload注釋的單個(gè)參數(shù)犹赖。分析期間忽略消息類型的參數(shù)。
注意:
默認(rèn)情況下卷仑,推斷的類型信息將覆蓋由發(fā)送系統(tǒng)創(chuàng)建的入站__TypeId__和相關(guān)頭冷尉。這允許接收系統(tǒng)自動(dòng)轉(zhuǎn)換成不同的域?qū)ο蟆_@僅適用于參數(shù)類型為具體(不是抽象或接口)或來自java.util包的情況系枪。在所有其他情況下雀哨,將使用__TypeId__和相關(guān)的標(biāo)題。在某些情況下,您可能希望覆蓋默認(rèn)行為雾棺,并始終使用__TypeId__信息膊夹。例如,假設(shè)你有一個(gè)@RabbitListener捌浩,它接受一個(gè)Foo參數(shù)放刨,但該消息包含一個(gè)Bar,它是Foo的一個(gè)子類(這是具體的)尸饺。推斷的類型將不正確进统。為了處理這種情況,將Jackson2JsonMessageConverter上的TypePrecedence屬性設(shè)置為TYPE_ID浪听,而不是默認(rèn)的INFERRED螟碎。該屬性實(shí)際上是在轉(zhuǎn)換器的DefaultJackson2JavaTypeMapper上,但為了方便起見迹栓,轉(zhuǎn)換器上提供了一個(gè)setter掉分。如果您注入自定義類型的映射器,則應(yīng)該在映射器上設(shè)置屬性克伊。
從消息轉(zhuǎn)換時(shí)酥郭,傳入的MessageProperties.getContentType()必須符合JSON(使用邏輯contentType.contains(“json”))。否則愿吹,WARN日志消息無法轉(zhuǎn)換帶有content-type[...]的傳入消息不从,并且發(fā)送message.getBody()?- 作為byte[]返回。因此犁跪,為了滿足消費(fèi)者方面的Jackson2JsonMessageConverter要求椿息,生產(chǎn)者必須添加contentType消息屬性,例如作為application/json耘拇,text/x-json或者只是使用Jackson2JsonMessageConverter撵颊,它將自動(dòng)設(shè)置標(biāo)題宇攻。
@RabbitListener
publicvoidfoo(Foo foo){...}
@RabbitListener
publicvoidfoo(@Payload Foo foo, @Header("amqp_consumerQueue")String queue){...}
@RabbitListener
publicvoidfoo(Foo foo, o.s.amqp.core.Message message){...}
@RabbitListener
publicvoidfoo(Foo foo, o.s.messaging.Message message){...}
@RabbitListener
publicvoidfoo(Foo foo, String bar){...}
@RabbitListener
publicvoidfoo(Foo foo, o.s.messaging.Message message){...}
在前四個(gè)情況下惫叛,轉(zhuǎn)換器將嘗試轉(zhuǎn)換為Foo類型。第五個(gè)例子是無效的逞刷,因?yàn)槲覀儫o法確定哪個(gè)參數(shù)應(yīng)該接收消息有效載荷嘉涌。在第六個(gè)例子中,由于通用類型是通配符夸浅,Jackson的默認(rèn)值將會(huì)被應(yīng)用仑最。
但是,您可以創(chuàng)建一個(gè)自定義轉(zhuǎn)換器帆喇,并使用targetMethod消息屬性來決定將JSON轉(zhuǎn)換為哪種類型警医。
注意
只有在方法級(jí)別聲明@RabbitListener注釋時(shí),才能實(shí)現(xiàn)此類型推斷。使用類級(jí)別的@RabbitListener预皇,轉(zhuǎn)換后的類型用于選擇要調(diào)用哪個(gè)@RabbitHandler方法侈玄。因此,基礎(chǔ)架構(gòu)提供了可以由自定義轉(zhuǎn)換器用于確定類型的targetObject消息屬性吟温。
另一個(gè)選擇是MarshallingMessageConverter序仙。它委托Spring OXM庫(kù)的Marshaller和Unmarshaller策略接口的實(shí)現(xiàn)。在配置方面鲁豪,最常見的是提供構(gòu)造函數(shù)參數(shù)潘悼,因?yàn)镸arshaller的大多數(shù)實(shí)現(xiàn)也將實(shí)現(xiàn)Unmarshaller。
ContentTypeDelegatingMessageConverter
該類在1.4.2版本中引入爬橡,并允許根據(jù)MessageProperties中的內(nèi)容類型屬性委派給特定的MessageConverter治唤。默認(rèn)情況下,如果沒有contentType屬性堤尾,或者與沒有配置的轉(zhuǎn)換器匹配的值肝劲,它將委托給SimpleMessageConverter。
注意
從不受信任的源反序列化java對(duì)象時(shí)郭宝,可能存在一個(gè)漏洞辞槐。如果您使用content-type?application/x-java-serialized-object接受來自不受信任來源的郵件,則應(yīng)考慮配置哪些包/類被允許反序列化粘室。這適用于SimpleMessageConverter和SerializerMessageConverter榄檬,當(dāng)它被配置為使用DefaultDeserializer?- 隱式或通過配置。默認(rèn)情況下衔统,白名單為空鹿榜,表示所有類將被反序列化。您可以設(shè)置模式列表锦爵,如foo.*舱殿,foo.bar.Baz或*.MySafeClass。將按順序檢查模式险掀,直到找到匹配項(xiàng)沪袭。如果沒有匹配,則拋出SecurityException樟氢。使用這些轉(zhuǎn)換器上的whiteListPatterns屬性設(shè)置模式冈绊。
MessagePropertiesConverter策略接口用于在Rabbit Client BasicProperties和Spring AMQP MessageProperties之間進(jìn)行轉(zhuǎn)換。默認(rèn)實(shí)現(xiàn)(DefaultMessagePropertiesConverter)通常足以滿足大多數(shù)用途埠啃,但如果需要死宣,您可以實(shí)現(xiàn)自己的。當(dāng)大小不大于1024字節(jié)時(shí)碴开,默認(rèn)屬性轉(zhuǎn)換器將將LongString類型的BasicProperties元素轉(zhuǎn)換為String毅该。較大的LongString不會(huì)轉(zhuǎn)換(見下文)博秫。可以使用構(gòu)造函數(shù)參數(shù)覆蓋此限制眶掌。
從版本1.6開始台盯,長(zhǎng)度超過長(zhǎng)字符串限制的標(biāo)題(默認(rèn)1024)現(xiàn)在默認(rèn)由DefaultMessagePropertiesConverter保留為L(zhǎng)ongString。您可以通過getBytes[]畏线,toString()或getStream()方法訪問內(nèi)容静盅。
以前,DefaultMessagePropertiesConverter將這樣的頭部轉(zhuǎn)換為DataInputStream(實(shí)際上它只是引用了LongString的DataInputStream)寝殴。在輸出時(shí)蒿叠,該標(biāo)頭未被轉(zhuǎn)換(除了一個(gè)字符串,例如蚣常,通過調(diào)用流上的toString()市咽,java.io.DataInputStream@1d057a39)。
大量傳入的LongString headers現(xiàn)在在輸出上也被正確地“轉(zhuǎn)換”(默認(rèn)情況下)抵蚊。
提供了一個(gè)新的構(gòu)造函數(shù)施绎,允許您將轉(zhuǎn)換器配置為如前所述:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
*@paramlongStringLimit the limit.
*@paramconvertLongLongStrings LongString when false,
* DataInputStream when true.
*@since1.6
*/
publicDefaultMessagePropertiesConverter(intlongStringLimit,booleanconvertLongLongStrings){ ... }
從1.6版開始,一個(gè)新的屬性correlationIdString已經(jīng)添加到MessageProperties中贞绳。以前谷醉,當(dāng)轉(zhuǎn)換為RabbitMQ客戶端使用的BasicProperties時(shí),會(huì)執(zhí)行不必要的byte[]<->String轉(zhuǎn)換冈闭,因?yàn)镸essageProperties.correlationId是一個(gè)byte[]俱尼,但是BasicProperties使用一個(gè)String。 (最終萎攒,RabbitMQ客戶端使用UTF-8將String轉(zhuǎn)換為字節(jié)以輸入?yún)f(xié)議消息)遇八。
為了提供最大的向后兼容性,將新的屬性correlationIdPolicy添加到DefaultMessagePropertiesConverter中耍休。這需要一個(gè)DefaultMessagePropertiesConverter.CorrelationIdPolicy枚舉參數(shù)刃永。默認(rèn)情況下,它被設(shè)置為BYTES羊精,它復(fù)制以前的行為斯够。
入站消息:
STRING - 只有correlationIdString屬性被映射
BYTES - 只有correlationId屬性被映射
BOTH - 兩個(gè)屬性都被映射
出站消息:
STRING - 只有correlationIdString屬性被映射
BYTES - 只有correlationId屬性被映射
BOTH - 兩個(gè)屬性將被考慮,String屬性優(yōu)先
從版本1.6開始园匹,入站deliveryMode屬性不再映射到MessageProperties.deliveryMode雳刺,而是映射到MessageProperties.receivedDeliveryMode劫灶。此外裸违,入站userId屬性不再映射到MessageProperties.userId,而是映射到MessageProperties.receivedUserId本昏。如果將相同的MessageProperties對(duì)象用于出站消息供汛,則這些更改是為了避免這些屬性的意外傳播。
3.1.8 Modifying Messages - Compression and More(修改消息、壓縮)
可以在注解配置的消息接收之前或消息發(fā)送之后對(duì)消息內(nèi)容進(jìn)行修改
從第3.1.7節(jié)“消息轉(zhuǎn)換器”可以看出怔昨,在AmqpTemplate?convertAndReceive操作中雀久,您可以提供一個(gè)MessagePostProcessor。
例如趁舀,在您的POJO被轉(zhuǎn)換之后赖捌,MessagePostProcessor使您能夠在消息中設(shè)置自定義headers或properties。
從版本1.4.2開始矮烹,其他擴(kuò)展點(diǎn)已添加到RabbitTemplate?-?setBeforePublishPostProcessors()和setAfterReceivePostProcessors()中越庇。第一個(gè)是后臺(tái)處理器在發(fā)送到RabbitMQ之前立即運(yùn)行。當(dāng)使用批處理(參見“批處理”一節(jié))時(shí)奉狈,會(huì)在批量組裝之后并在發(fā)送批處理之前調(diào)用該批處理卤唉。第二個(gè)是在收到消息后立即調(diào)用。
這些擴(kuò)展點(diǎn)用于壓縮等功能仁期,為此桑驱,提供了幾個(gè)MessagePostProcessor:
用于消息發(fā)送之前處理的
GZipPostProcessor
ZipPostProcessor
用于消息接收之前處理的
GUnzipPostProcessor
UnzipPostProcessor
SimpleMessageListenerContainer也有一個(gè)setAfterReceivePostProcessors()方法,允許在容器接收到消息之后執(zhí)行解壓縮跛蛋。
3.1.9 Request/Reply Messaging(請(qǐng)求/回復(fù)消息)
AmqpTemplate還提供了各種sendAndReceive方法熬的,它們接受與單向發(fā)送操作(exchange,routingKey和Message)相同的參數(shù)選項(xiàng)赊级。這些方法對(duì)于請(qǐng)求/回復(fù)方案非常有用悦析,因?yàn)樗鼈冊(cè)诎l(fā)送之前處理必要的“回復(fù)”屬性的配置,并且可以在為此目的在內(nèi)部創(chuàng)建的排他隊(duì)列上監(jiān)聽回復(fù)消息此衅。
類似的請(qǐng)求/應(yīng)答MessageConverter的方法也可以應(yīng)用于請(qǐng)求和回復(fù)强戴。
這些方法是convertSendAndReceive命名。有關(guān)更多詳細(xì)信息挡鞍,請(qǐng)參閱AmqpTemplate的Javadoc骑歹。
默認(rèn)情況下,發(fā)送和接收方法將在5秒后超時(shí)墨微,并返回null道媚。這可以通過設(shè)置
replyTimeout屬性來修改。從版本1.5開始翘县,如果將mandatory屬性設(shè)置為true(或mandatory-expression對(duì)特定消息計(jì)算為true)最域,則如果消息無法傳遞到隊(duì)列,則將拋出AmqpMessageReturnedException锈麸。這個(gè)異常已經(jīng)返回了Message镀脂,replyCode忘伞,replyText屬性薄翅,以及用于發(fā)送的exchange和routingKey沙兰。
此功能使用發(fā)布者返回,并通過在CachingConnectionFactory上將publisherReturns設(shè)置為true來啟用(請(qǐng)參閱“Publisher Confirms and Returns”一節(jié))翘魄。此外鼎天,您不能在RabbitTemplate中注冊(cè)自己的ReturnCallback
RabbitMQ Direct reply-to(RabbitMQ直接回復(fù))
從版本3.4.0開始,RabbitMQ服務(wù)器現(xiàn)在支持直接回復(fù);這是取消固定應(yīng)答隊(duì)列的主要原因(以避免為每個(gè)請(qǐng)求創(chuàng)建一個(gè)臨時(shí)隊(duì)列)暑竟。從Spring AMQP版本1.4.1開始斋射,默認(rèn)情況下將使用直接回復(fù)(如果服務(wù)器支持),而不是創(chuàng)建臨時(shí)回復(fù)隊(duì)列但荤。當(dāng)沒有提供replyQueue(或者使用名稱amq.rabbitmq.reply-to設(shè)置)時(shí)绩鸣,RabbitTemplate將自動(dòng)檢測(cè)是否支持Direct reply-to,并使用它或回退到使用臨時(shí)回復(fù)隊(duì)列纱兑。當(dāng)使用直接回復(fù)時(shí)呀闻,不需要回復(fù)監(jiān)聽器,不需要配置潜慎。
命名隊(duì)列(不同于amq.rabbitmq.reply-to)支持回復(fù)監(jiān)聽器捡多,允許控制回復(fù)并發(fā)等。
從1.6版開始铐炫,如果由于某些原因希望為每個(gè)回復(fù)使用臨時(shí)的垒手、排他性的自動(dòng)刪除隊(duì)列時(shí),請(qǐng)將useTemporaryReplyQueues屬性設(shè)置為true倒信。如果您設(shè)置了replyAddress科贬,則此屬性將被忽略。
可以通過對(duì)RabbitTemplate進(jìn)行子類化并覆蓋useDirectReplyTo()來更改是否使用直接回復(fù)鳖悠。該方法只被調(diào)用一次;在發(fā)送第一個(gè)請(qǐng)求時(shí)榜掌。
Message Correlation With A Reply Queue(與回復(fù)隊(duì)列的消息相關(guān))
當(dāng)使用固定的應(yīng)答隊(duì)列(不同于amq.rabbitmq.reply-to)時(shí)陕靠,需要提供相關(guān)數(shù)據(jù)狼犯,以便可以將請(qǐng)求與請(qǐng)求相關(guān)聯(lián)钉赁。請(qǐng)參閱RabbitMQ遠(yuǎn)程過程調(diào)用(RPC)潦闲。默認(rèn)情況下,標(biāo)準(zhǔn)correlationId屬性將用于保存相關(guān)數(shù)據(jù)田柔。但是舌界,如果要使用自定義屬性來保存關(guān)聯(lián)數(shù)據(jù)综芥,可以在上設(shè)置correlation-key屬性九妈。將屬性顯式設(shè)置為correlationId與默認(rèn)屬性相同反砌。當(dāng)然,客戶端和服務(wù)器必須使用相同的頭相關(guān)數(shù)據(jù)萌朱。
Spring AMQP版本1.1對(duì)此數(shù)據(jù)使用了一個(gè)自定義屬性spring_reply_correlation宴树。如果您希望使用當(dāng)前版本恢復(fù)此行為,或許要使用1.1保持與其他應(yīng)用程序的兼容性嚷兔,則必須將屬性設(shè)置為spring_reply_correlation森渐。
Reply Listener Container(回復(fù)監(jiān)聽容器)
在3.4.0之前使用RabbitMQ版本時(shí),每個(gè)回復(fù)都使用一個(gè)新的臨時(shí)隊(duì)列冒晰。但是同衣,可以在模板上配置單個(gè)應(yīng)答隊(duì)列,這樣可以更有效壶运,并且還可以在該隊(duì)列中設(shè)置參數(shù)耐齐。但是,在這種情況下蒋情,您還必須提供一個(gè)子元素埠况。此元素為回復(fù)隊(duì)列提供監(jiān)聽器容器,模板為監(jiān)聽器棵癣。除了連接工廠和消息轉(zhuǎn)換器之外辕翰,元素上允許使用上允許的所有3.1.15節(jié)“Message Listener容器配置”屬性,該屬性從模板的配置繼承狈谊。
如果您運(yùn)行多個(gè)應(yīng)用程序?qū)嵗蚴褂枚鄠€(gè)RabbitTemplate喜命,則必須為每個(gè)RabbitTemplate使用唯一的應(yīng)答隊(duì)列,否則RabbitMQ無法從隊(duì)列中選擇消息河劝,因此壁榕,如果它們都使用相同的隊(duì)列,則每個(gè)實(shí)例將競(jìng)爭(zhēng)回復(fù)而不一定收到自己的赎瞎。
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
雖然容器和模板共享連接工廠牌里,但它們不共享通道,因此請(qǐng)求和回復(fù)不會(huì)在同一事務(wù)中執(zhí)行(如果是事務(wù)性的)务甥。
在版本1.5.0之前牡辽,reply-address屬性不可用,回復(fù)始終使用默認(rèn)exchage和reply-queue名稱作為routing key敞临。這仍然是默認(rèn)值催享,但您現(xiàn)在可以指定新的reply-address屬性。reply-address可以包含一個(gè)形式為?/?的地址哟绊,并且回復(fù)將被路由到指定的交換機(jī)并路由到與routing key綁定的隊(duì)列因妙。reply-address優(yōu)先于reply-queue。必須將配置為單獨(dú)的組件票髓,當(dāng)只有reply-address正在使用時(shí)攀涵,無論如何,reply-address和reply-queue(或上的隊(duì)列屬性)必須引用在邏輯上相同的隊(duì)列洽沟。
在這個(gè)配置中,我們使用SimpleListenerContainer收到回復(fù);當(dāng)使用定義模板時(shí)以故,如上所示,解析器將模板中的 container和wires定義為監(jiān)聽器裆操。
當(dāng)模板不使用固定的replyQueue(或正在使用Direct reply-to - 請(qǐng)參閱“RabbitMQ Direct reply-to”一節(jié))時(shí)怒详,不需要監(jiān)聽器容器炉媒。直接回復(fù)是使用RabbitMQ 3.4.0或更高版本的首選機(jī)制。
如果您將RabbitTemplate定義為昆烁,或者使用@Configuration類將其定義為@Bean吊骤,或者以編程方式創(chuàng)建模板,則需要自己定義并連接回復(fù)監(jiān)聽器容器静尼。如果您無法執(zhí)行此操作白粉,模板將永遠(yuǎn)不會(huì)收到回復(fù),并將最終超時(shí)并返回null作為對(duì)sendAndReceive方法的調(diào)用的回復(fù)鼠渺。
從1.5版開始鸭巴,RabbitTemplate將檢測(cè)它是否被配置為MessageListener以接收回復(fù)。如果沒有拦盹,嘗試發(fā)送和接收具有回復(fù)地址的消息將失敗鹃祖,并顯示IllegalStateException(因?yàn)椴粫?huì)收到回復(fù))。
此外普舆,如果使用一個(gè)簡(jiǎn)單的replyAddress(隊(duì)列名稱)惯豆,那么回復(fù)監(jiān)聽器容器將驗(yàn)證它正在監(jiān)聽具有相同名稱的隊(duì)列。如果回復(fù)地址是exchange和routing key奔害,并且調(diào)試日志消息將被寫入楷兽,則無法執(zhí)行此檢查。
當(dāng)您自己接收回復(fù)監(jiān)聽器和模板時(shí)华临,確保模板的replyQueue和容器的隊(duì)列(或queueNames)屬性引用相同的隊(duì)列是非常重要的芯杀。模板將回復(fù)隊(duì)列插入到出站消息replyTo屬性中。
以下是如何手動(dòng)連接bean的示例雅潭。
@Bean
publicRabbitTemplateamqpTemplate(){
RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory());
? ? rabbitTemplate.setMessageConverter(msgConv());
? ? rabbitTemplate.setReplyQueue(replyQueue());
rabbitTemplate.setReplyTimeout(60000);
returnrabbitTemplate;
}
@Bean
publicSimpleMessageListenerContainerreplyListenerContainer(){
SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
? ? container.setConnectionFactory(connectionFactory());
? ? container.setQueues(replyQueue());
? ? container.setMessageListener(amqpTemplate());
returncontainer;
}
@Bean
publicQueuereplyQueue(){
returnnewQueue("my.reply.queue");
}
該測(cè)試用例顯示了一個(gè)連接了固定應(yīng)答隊(duì)列的RabbitTemplate的完整示例揭厚,以及處理請(qǐng)求并返回答復(fù)的“遠(yuǎn)程”監(jiān)聽器容器。
當(dāng)回復(fù)超時(shí)(replyTimeout)時(shí)扶供,sendAndReceive()方法返回null筛圆。
在版本1.3.6之前,只是記錄了超時(shí)消息的遲到回復(fù)〈慌ǎ現(xiàn)在太援,如果收到遲到的回復(fù),它將被拒絕(模板拋出一個(gè)AmqpRejectAndDontRequeueException)扳碍。如果回復(fù)隊(duì)列被配置為將拒絕的消息發(fā)送到dead letter exchange提岔,則可以檢索回復(fù)以供稍后分析。只需使用等于回復(fù)queue名稱的routing key將隊(duì)列綁定到配置的dead letter exchange中笋敞。
有關(guān)配置dead lettering的更多信息碱蒙,請(qǐng)參閱?RabbitMQ Dead Letter Documentation。您還可以查看一個(gè)示例的FixedReplyQueueDeadLetterTests測(cè)試用例。
版本1.6引入了AsyncRabbitTemplate赛惩。這與AmqpTemplate中的sendAndReceive(和convertSendAndReceive)類似哀墓,但是它們返回一個(gè)ListenableFuture。
您可以稍后通過調(diào)用get()來同步檢索結(jié)果喷兼,也可以注冊(cè)一個(gè)將結(jié)果異步調(diào)用的回調(diào)盒齿。
@Autowired
privateAsyncRabbitTemplate template;
...
publicvoiddoSomeWorkAndGetResultLater(){
? ? ...
ListenableFuture future =this.template.convertSendAndReceive("foo");
// do some more work
String reply =null;
try{
? ? ? ? reply = future.get();
? ? }
catch(ExecutionException e) {
? ? ? ? ...
? ? }
? ? ...
}
publicvoiddoSomeWorkAndGetResultAsync(){
? ? ...
RabbitConverterFuture future =this.template.convertSendAndReceive("foo");
future.addCallback(newListenableFutureCallback() {
@Override
publicvoidonSuccess(String result){
? ? ? ? ? ? ...
? ? ? ? }
@Override
publicvoidonFailure(Throwable ex){
? ? ? ? ? ? ...
? ? ? ? }
? ? });
? ? ...
}
如果設(shè)置了mandatory,并且消息無法傳遞儒恋,則將來會(huì)拋出一個(gè)執(zhí)行錯(cuò)誤的ExecutionException衣洁,其原因是AmqpMessageReturnedException封裝返回的消息和有關(guān)返回的信息。
如果已設(shè)置enableConfirms欣喧,未來將具有一個(gè)屬性確認(rèn)鸽捻,它本身是一個(gè)ListenableFuture 多艇,表示成功發(fā)布黎侈。如果確認(rèn)的未來是false克饶,RabbitFuture將有另一個(gè)屬性nackCause(失敗的原因(如果有的話))。
如果在回復(fù)后收到發(fā)件人確認(rèn),則會(huì)被丟棄 - 因?yàn)榛貜?fù)意味著成功發(fā)布。
設(shè)置模板上的receiveTimeout屬性以配置超時(shí)回復(fù)(默認(rèn)為30000 - 30秒)鞍泉。如果發(fā)生超時(shí),將來將使用AmqpReplyTimeoutException完成肮帐。
該模板實(shí)現(xiàn)SmartLifecycle;在有待處理的回復(fù)的情況下停止模板將導(dǎo)致未處理的Future被取消咖驮。
Spring框架具有一般的遠(yuǎn)程處理能力边器,允許使用各種傳輸?shù)倪h(yuǎn)程過程調(diào)用(RPC)。 Spring-AMQP支持與客戶端上的AmqpProxyFactoryBean和服務(wù)器上的AmqpInvokerServiceExporter類似的機(jī)制托修。這提供了RPC over AMQP忘巧。在客戶端,如上所述使用RabbitTemplate;在服務(wù)器端睦刃,調(diào)用者(配置為MessageListener)接收消息砚嘴,調(diào)用配置的服務(wù),并使用入站消息的replyTo信息返回回復(fù)涩拙。
客戶端工廠bean可以注入任何bean(使用其serviceInterface);然后际长,客戶端可以在代理上調(diào)用方法,導(dǎo)致通過AMQP進(jìn)行遠(yuǎn)程執(zhí)行兴泥。
使用默認(rèn)MessageConverter工育,方法參數(shù)和返回值必須是Serializable的實(shí)例。
在服務(wù)器端搓彻,AmqpInvokerServiceExporter具有AmqpTemplate和MessageConverter屬性如绸。目前,模板的MessageConverter沒有被使用旭贬。如果您需要提供自定義消息轉(zhuǎn)換器怔接,那么您應(yīng)該使用messageConverter屬性提供它。在客戶端骑篙,可以將自定義消息轉(zhuǎn)換器添加到使用其amqpTemplate屬性提供給AmqpProxyFactoryBean的AmqpTemplate中蜕提。
客戶端和服務(wù)器配置示例如下所示森书。
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
routing-key="remoting.binding"exchange="remoting.exchange"/>
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
AmqpInvokerServiceExporter只能處理正確的消息靶端,例如從AmqpProxyFactoryBean發(fā)送的消息。如果收到不能解釋的消息凛膏,則將序列化的RuntimeException作為回復(fù)發(fā)送杨名。如果該消息沒有replyToAddress屬性,且未配置Dead Letter Exchange猖毫,該消息將被拒絕并永久丟失台谍。
默認(rèn)情況下,如果請(qǐng)求消息無法傳遞吁断,則調(diào)用線程將最終超時(shí)趁蕊,并將拋出RemoteProxyFailureException。超時(shí)時(shí)間默認(rèn)為5秒仔役,可以通過設(shè)置RabbitTemplate上的replyTimeout屬性進(jìn)行修改掷伙。從1.5版開始,將mandatory屬性設(shè)置為true又兵,并在連接工廠啟用返回(請(qǐng)參閱“發(fā)布者確認(rèn)和返回”一節(jié))任柜,調(diào)用線程將拋出一個(gè)AmqpMessageReturnedException卒废。有關(guān)詳細(xì)信息,請(qǐng)參閱“回復(fù)超時(shí)”一節(jié)宙地。
AMQP規(guī)范描述了如何使用協(xié)議來配置代理上的隊(duì)列摔认,交換和綁定。這些可從0.8規(guī)范和更高版本移植的操作存在于org.springframework.amqp.core包中的AmqpAdmin界面中宅粥。該類的RabbitMQ實(shí)現(xiàn)是位于org.springframework.amqp.rabbit.core包中的RabbitAdmin参袱。
AmqpAdmin接口基于使用Spring AMQP域抽象,如下所示:
publicinterfaceAmqpAdmin{
// Exchange Operations
voiddeclareExchange(Exchange exchange);
voiddeleteExchange(String exchangeName);
// Queue Operations
QueuedeclareQueue();
StringdeclareQueue(Queue queue);
voiddeleteQueue(String queueName);
voiddeleteQueue(String queueName,booleanunused,booleanempty);
voidpurgeQueue(String queueName,booleannoWait);
// Binding Operations
voiddeclareBinding(Binding binding);
voidremoveBinding(Binding binding);
PropertiesgetQueueProperties(String queueName);
}
getQueueProperties()方法返回有關(guān)隊(duì)列的一些有限信息(消息計(jì)數(shù)和消費(fèi)者計(jì)數(shù))秽梅。返回的屬性的鍵可用作RabbitTemplate(QUEUE_NAME蓖柔,QUEUE_MESSAGE_COUNT,QUEUE_CONSUMER_COUNT)中的常量风纠。 RabbitMQ REST API在QueueInfo對(duì)象中提供了更多的信息况鸣。
no-arg declareQueue()方法定義了一個(gè)自動(dòng)生成名稱的代理上的隊(duì)列。此自動(dòng)生成隊(duì)列的附加屬性為exclusive = true竹观,autoDelete = true和durable = false镐捧。
declareQueue(Queue queue)方法接受Queue對(duì)象并返回聲明隊(duì)列的名稱。如果提供的Queue的name屬性是空字符串臭增,則代理使用生成的名稱聲明隊(duì)列懂酱,并將該名稱返回給調(diào)用者。隊(duì)列對(duì)象本身沒有改變誊抛。此功能只能通過直接調(diào)用RabbitAdmin以編程方式使用列牺。管理員自動(dòng)聲明不支持在應(yīng)用程序上下文中聲明性地定義隊(duì)列。
這與AnonymousQueue形成對(duì)照拗窃,其中框架生成一個(gè)唯一的(UUID)名稱瞎领,并將持久性設(shè)置為false和exclusive,autoDelete為true随夸。具有空或缺少的name屬性的將始終創(chuàng)建AnonymousQueue九默。
請(qǐng)參閱“AnonymousQueue”部分了解為什么AnonymousQueue優(yōu)先于代理生成的隊(duì)列名稱,以及如何控制名稱的格式宾毒。聲明隊(duì)列必須具有固定名稱驼修,因?yàn)樗鼈兛赡茉谏舷挛闹械钠渌恢帽灰茫缭诒O(jiān)聽器中:
請(qǐng)參考“Automatic Declaration of Exchanges, Queues and Bindings”這一節(jié).
該接口的RabbitMQ實(shí)現(xiàn)是RabbitAdmin诈铛,當(dāng)使用Spring XML進(jìn)行配置時(shí)乙各,它將如下所示:
當(dāng)CachingConnectionFactory緩存模式是CHANNEL(默認(rèn)值)時(shí),RabbitAdmin實(shí)現(xiàn)會(huì)在同一個(gè)ApplicationContext中聲明的Queue幢竹,Exchanges和Bindings自動(dòng)延遲聲明耳峦。這些組件將被聲明為s0on,因?yàn)檫B接已打開到代理妨退。有一些命名空間功能使得這非常方便妇萄,例如:
xmlns="http://www.springframework.org/schema/rabbit">
xmlns="http://www.springframework.org/schema/rabbit">
在上面的例子中蜕企,我們使用匿名隊(duì)列(實(shí)際上內(nèi)部只是具有由框架生成的名稱的隊(duì)列,而不是broker)冠句,并通過ID引用它們轻掩。我們還可以使用顯式名稱聲明隊(duì)列,這也可以作為上下文中bean定義的標(biāo)識(shí)符懦底。例如唇牧。
您可以同時(shí)提供id和name屬性。這允許您通過獨(dú)立于隊(duì)列名稱的id來引用隊(duì)列(例如在綁定中)聚唐。它還允許標(biāo)準(zhǔn)的Spring功能丐重,如屬性占位符和隊(duì)列名稱的SpEL表達(dá)式;當(dāng)使用名稱作為bean標(biāo)識(shí)符時(shí),這些功能不可用杆查。
可以使用其他參數(shù)配置隊(duì)列扮惦,例如x-message-ttl或x-ha-policy。使用命名空間支持亲桦,使用元素以參數(shù)名稱/參數(shù)值對(duì)映射的形式提供它們崖蜜。
默認(rèn)情況下,這些參數(shù)被假定為字符串客峭。對(duì)于其他類型的參數(shù)豫领,需要提供類型。
當(dāng)提供混合類型的參數(shù)時(shí)舔琅,為每個(gè)條目元素提供類型:
100
使用Spring Framework 3.2及更高版本等恐,可以更簡(jiǎn)潔地聲明這一點(diǎn):
RabbitMQ代理將不允許聲明具有不匹配參數(shù)的隊(duì)列。例如,如果queue已經(jīng)存在关顷,沒有time to live參數(shù),并且嘗試使用key =“x-message-ttl”value =“100”進(jìn)行聲明债热,則會(huì)拋出異常砾嫉。
默認(rèn)情況下,當(dāng)發(fā)生任何異常時(shí)窒篱,RabbitAdmin將立即停止處理所有聲明;這可能會(huì)導(dǎo)致下游問題焕刮,例如監(jiān)聽器容器無法初始化,因?yàn)槲绰暶髁硪粋€(gè)隊(duì)列(在錯(cuò)誤之后定義)墙杯。
可以通過在RabbitAdmin上將ignore-declaration-exceptions屬性設(shè)置為true來修改此行為配并。此選項(xiàng)指示RabbitAdmin記錄異常,并繼續(xù)聲明其他元素高镐。當(dāng)使用java配置RabbitAdmin時(shí)溉旋,此屬性為ignoreDeclarationExceptions桃移。這是一個(gè)適用于所有元素苦银,隊(duì)列癌蚁,交換和綁定的全局設(shè)置浙垫,具有僅適用于這些元素的類似屬性。
在版本1.6之前梧油,此屬性僅在通道上發(fā)生IOException(例如當(dāng)前和所需屬性不匹配時(shí))才會(huì)生效∩凰剩現(xiàn)在,此屬性對(duì)任何異常生效婶溯,包括TimeoutException等鲸阔。
另外,任何聲明異常將導(dǎo)致發(fā)布DeclarationExceptionEvent迄委,它是可以由上下文中任何ApplicationListener使用的ApplicationEvent褐筛。該事件包含對(duì)管理員的引用,被聲明的元素和Throwable叙身。
從版本1.3開始渔扎,HeadersExchange可以配置為匹配多個(gè)標(biāo)頭;您還可以指定任何或所有標(biāo)題必須匹配:
從版本1.6開始 Exchanges可以使用internal標(biāo)志(默認(rèn)為false)進(jìn)行配置,并且Exchange將通過RabbitAdmin(如果存在于應(yīng)用程序上下文中)在代理上正確配置信轿。如果交換機(jī)的內(nèi)部標(biāo)志為true晃痴,則RabbitMQ將不允許客戶端使用交換機(jī)。這對(duì)于您不希望交易所直接由發(fā)布商使用的死信交換或交換到交換綁定非常有用财忽。
要了解如何使用Java來配置AMQP基礎(chǔ)設(shè)施倘核,請(qǐng)查看Stock示例應(yīng)用程序,其中有@Configuration類AbstractStockRabbitConfiguration即彪,它們又具有RabbitClientConfiguration和RabbitServerConfiguration子類紧唱。 AbstractStockRabbitConfiguration的代碼如下所示
@Configuration
publicabstractclassAbstractStockAppRabbitConfiguration{
@Bean
publicConnectionFactoryconnectionFactory(){
? ? ? ? CachingConnectionFactory connectionFactory =
newCachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
returnconnectionFactory;
? ? }
@Bean
publicRabbitTemplaterabbitTemplate(){
RabbitTemplate template =newRabbitTemplate(connectionFactory());
? ? ? ? template.setMessageConverter(jsonMessageConverter());
? ? ? ? configureRabbitTemplate(template);
returntemplate;
? ? }
@Bean
publicMessageConverterjsonMessageConverter(){
returnnewJsonMessageConverter();
? ? }
@Bean
publicTopicExchangemarketDataExchange(){
returnnewTopicExchange("app.stock.marketdata");
? ? }
// additional code omitted for brevity
}
在庫(kù)存應(yīng)用程序中,服務(wù)器使用以下@Configuration類進(jìn)行配置:
@Configuration
publicclassRabbitServerConfigurationextendsAbstractStockAppRabbitConfiguration{
@Bean
publicQueuestockRequestQueue(){
returnnewQueue("app.stock.request");
? ? }
}
這是@Configuration類的整個(gè)繼承鏈的結(jié)尾隶校。最終的結(jié)果是在應(yīng)用程序啟動(dòng)時(shí)漏益,TopicExchange和Queue將被聲明給代理。在服務(wù)器配置中沒有將TopicExchange綁定到隊(duì)列深胳,因?yàn)檫@在客戶端應(yīng)用程序中完成绰疤。然而,庫(kù)存請(qǐng)求隊(duì)列將自動(dòng)綁定到AMQP默認(rèn)交換 - 此行為由規(guī)范定義舞终。
客戶端@Configuration類有點(diǎn)有趣轻庆,如下所示。
@Configuration
publicclassRabbitClientConfigurationextendsAbstractStockAppRabbitConfiguration{
@Value("${stocks.quote.pattern}")
privateString marketDataRoutingKey;
@Bean
publicQueuemarketDataQueue(){
returnamqpAdmin().declareQueue();
? ? }
/**
? ? * Binds to the market data exchange.
? ? * Interested in any stock quotes
? ? * that match its routing key.
? ? */
@Bean
publicBindingmarketDataBinding(){
returnBindingBuilder.bind(
? ? ? ? ? ? ? ? marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
? ? }
// additional code omitted for brevity
}
客戶端通過AmqpAdmin上的declareQueue()方法聲明另一個(gè)隊(duì)列权埠,并通過在外部屬性文件中的routing pattern 將該隊(duì)列綁定到market data exchange榨了。
Builder API for Queues and Exchanges(構(gòu)建Queues和Exchanges的API)
版本1.6引入了方便的API,用于在使用Java配置時(shí)配置queue和Exchange對(duì)象:
@Bean
publicQueuequeue(){
returnQueueBuilder.nonDurable("foo")
? ? ? ? .autoDelete()
? ? ? ? .exclusive()
.withArgument("foo","bar")
? ? ? ? .build();
}
@Bean
publicExchangeexchange(){
returnExchangeBuilder.directExchange("foo")
? ? ? .autoDelete()
? ? ? .internal()
.withArgument("foo","bar")
? ? ? .build();
}
有關(guān)更多信息攘蔽,請(qǐng)參閱org.springframework.amqp.core.QueueBuilder和org.springframework.amqp.core.ExchangeBuilder的javadocs。
Declaring Collections of Exchanges, Queues, Bindings(聲明交換呐粘、隊(duì)列满俗、綁定的集合)
從1.5版開始转捕,現(xiàn)在可以通過重新生成一個(gè)集合來聲明一個(gè)@Bean的多個(gè)實(shí)體。
只考慮第一個(gè)元素是可聲明的集合唆垃,并且只處理來自這些集合的可聲明元素五芝。
@Configuration
publicstaticclassConfig{
@Bean
publicConnectionFactorycf(){
returnnewCachingConnectionFactory("localhost");
? ? }
@Bean
publicRabbitAdminadmin(ConnectionFactory cf){
returnnewRabbitAdmin(cf);
? ? }
@Bean
publicDirectExchangee1(){
returnnewDirectExchange("e1",false,true);
? ? }
@Bean
publicQueueq1(){
returnnewQueue("q1",false,false,true);
? ? }
@Bean
publicBindingb1(){
returnBindingBuilder.bind(q1()).to(e1()).with("k1");
? ? }
@Bean
publicListes(){
returnArrays.asList(
newDirectExchange("e2",false,true),
newDirectExchange("e3",false,true)
? ? );
? ? }
@Bean
publicListqs(){
returnArrays.asList(
newQueue("q2",false,false,true),
newQueue("q3",false,false,true)
? ? );
? ? }
@Bean
publicListbs(){
returnArrays.asList(
newBinding("q2", DestinationType.QUEUE,"e2","k2",null),
newBinding("q3", DestinationType.QUEUE,"e3","k3",null)
? ? );
? ? }
@Bean
publicListds(){
returnArrays.asList(
newDirectExchange("e4",false,true),
newQueue("q4",false,false,true),
newBinding("q4", DestinationType.QUEUE,"e4","k4",null)
? ? );
? ? }
}
默認(rèn)情況下,所有隊(duì)列辕万,交換和綁定都由應(yīng)用程序上下文中的所有RabbitAdmin實(shí)例(具有auto-startup =“true”)聲明枢步。
從1.2版本開始,可以有條件地聲明這些元素渐尿。當(dāng)應(yīng)用程序連接到多個(gè)代理程序并且需要指定哪個(gè)代理程序應(yīng)該聲明特定元素時(shí)醉途,這是特別有用的。
代表這些元素的類實(shí)現(xiàn)Declarable砖茸,它有兩個(gè)方法:shouldDeclare()和getDeclaringAdmins()隘擎。 RabbitAdmin使用這些方法來確定特定實(shí)例是否應(yīng)該實(shí)際處理其連接上的聲明。
這些屬性作為命名空間中的屬性凉夯,如以下示例所示货葬。
默認(rèn)情況下,auto-declare屬性為true劲够,如果未提供聲明(或?yàn)榭?震桶,則所有RabbitAdmin將聲明該對(duì)象(只要admin的auto-startup屬性為true;默認(rèn)值為true)。
同樣征绎,您也可以使用基于Java的@Configuration實(shí)現(xiàn)相同的效果蹲姐。在這個(gè)例子中,這些組件將由admin1聲明炒瘸,而不是admin2:
@Bean
publicRabbitAdminadmin(){
RabbitAdmin rabbitAdmin =newRabbitAdmin(cf1());
? ? rabbitAdmin.afterPropertiesSet();
returnrabbitAdmin;
}
@Bean
publicRabbitAdminadmin2(){
RabbitAdmin rabbitAdmin =newRabbitAdmin(cf2());
? ? rabbitAdmin.afterPropertiesSet();
returnrabbitAdmin;
}
@Bean
publicQueuequeue(){
Queue queue =newQueue("foo");
? ? queue.setAdminsThatShouldDeclare(admin());
returnqueue;
}
@Bean
publicExchangeexchange(){
DirectExchange exchange =newDirectExchange("bar");
? ? exchange.setAdminsThatShouldDeclare(admin());
returnexchange;
}
@Bean
publicBindingbinding(){
Binding binding =newBinding("foo", DestinationType.QUEUE, exchange().getName(),"foo",null);
? ? binding.setAdminsThatShouldDeclare(admin());
returnbinding;
}
通常淤堵,當(dāng)需要唯一命名的,排他性的自動(dòng)刪除隊(duì)列時(shí)顷扩,建議使用AnonymousQueue而不是代理定義的隊(duì)列名稱(使用“”作為隊(duì)列名稱將導(dǎo)致broker生成隊(duì)列名稱) 拐邪。
原因如下:
在建立到代理的連接時(shí),隊(duì)列實(shí)際上是被聲明的隘截;這是在bean被創(chuàng)建并連接在一起之后的很長(zhǎng)時(shí)間扎阶;使用隊(duì)列的bean需要知道它的名字。事實(shí)上婶芭,當(dāng)應(yīng)用程序啟動(dòng)時(shí)东臀,代理甚至可能無法運(yùn)行。
如果由于某種原因與代理的連接丟失犀农,則管理員將重新聲明具有相同名稱的AnonymousQueue惰赋。如果我們使用代理聲明的隊(duì)列,隊(duì)列名稱將會(huì)更改。
從1.5.3版開始赁濒,您可以控制AnonymousQueue使用的隊(duì)列名稱的格式轨奄。
默認(rèn)情況下,隊(duì)列名稱是UUID的String表示形式;例如:07afcfe9-fe77-4983-8645-0061ec61a47a拒炎。
您現(xiàn)在可以在構(gòu)造函數(shù)參數(shù)中提供AnonymousQueue.NamingStrategy實(shí)現(xiàn):
@Bean
publicQueueanon1(){
returnnewAnonymousQueue(newAnonymousQueue.Base64UrlNamingStrategy());
}
@Bean
publicQueueanon2(){
returnnewAnonymousQueue(newAnonymousQueue.Base64UrlNamingStrategy("foo-"));
}
第一個(gè)將生成一個(gè)前綴為spring.gen-的后綴為UUID的base64表示形式的隊(duì)列名稱挪拟,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二個(gè)將生成一個(gè)以foo為前綴的隊(duì)列名稱击你,后跟UUID的base64表示形式玉组。
Base64的編碼采用“URL和文件安全的字母“RFC 4648;尾部的填充字符(=)被移除丁侄。
您可以提供自己的命名策略惯雳,您可以將其他信息(例如應(yīng)用程序,客戶端主機(jī))包含在隊(duì)列名稱中绒障。
從版本1.6開始吨凑,可以在使用XML配置時(shí)指定命名策略;命名策略屬性存在于實(shí)現(xiàn)AnonymousQueue.NamingStrategy的bean引用的元素中。
第一個(gè)創(chuàng)建具有UUID的String表示形式的名稱户辱。第二個(gè)創(chuàng)建名稱如spring.gen-MRBv9sqISkuCiPfOYfpo4g鸵钝。第三個(gè)創(chuàng)建名稱,如custom.gen-MRBv9sqISkuCiPfOYfpo4g
當(dāng)然庐镐,您可以提供自己的命名策略bean恩商。
3.1.11 Delayed Message Exchange(延遲消息Exchange)
1.6版引入了對(duì)延遲消息exchange插件的支持
該插件目前被標(biāo)記為實(shí)驗(yàn)性,但已有一年以上(在撰寫本文時(shí))必逆。如果插件的更改需要怠堪,我們將盡快添加對(duì)這些更改的支持。因此名眉,Spring AMQP中的這種支持也應(yīng)該被認(rèn)為是實(shí)驗(yàn)性的粟矿。該功能使用RabbitMQ 3.6.0和版本0.0.1的插件進(jìn)行了測(cè)試。
要使用RabbitAdmin將延遲聲明為exchange损拢,只需將exchange bean上的delayed屬性設(shè)置為true即可陌粹。 RabbitAdmin將使用交換類型(Direct,F(xiàn)anout等)設(shè)置x-delayed-type參數(shù)福压,并使用x-delayed-message類型聲明交換掏秩。
使用XML配置交換bean時(shí),delayed(默認(rèn)為false)也可用荆姆。
要發(fā)送延遲的消息蒙幻,只需要通過MessageProperties設(shè)置x-delay:
MessageProperties properties =newMessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
或者
rabbitTemplate.convertAndSend(exchange, routingKey,"foo",newMessagePostProcessor() {
@Override
publicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setDelay(15000);
returnmessage;
? ? }
});
要檢查消息是否延遲,請(qǐng)?jiān)贛essageProperties上使用getReceivedDelay()方法胆筒。它是一個(gè)單獨(dú)的屬性邮破,以避免意外傳播到從輸入消息生成的輸出消息。
啟用管理插件后,RabbitMQ服務(wù)器公開一個(gè)REST API來監(jiān)視和配置代理【龊酰現(xiàn)在提供了API的Java綁定队询。一般來說派桩,您可以直接使用該API构诚,但是提供了一個(gè)方便的包裝器來使用熟悉的Spring AMQP隊(duì)列、Exchange和Binding域?qū)ο笈cAPI铆惑。當(dāng)直接使用com.rabbitmq.http.client.Client API(分別為QueueInfo范嘱,ExchangeInfo和BindingInfo)時(shí),這些對(duì)象可以獲得更多信息员魏。以下操作在RabbitManagementTemplate上可用:
publicinterfaceAmqpManagementOperations{
voidaddExchange(Exchange exchange);
voidaddExchange(String vhost, Exchange exchange);
voidpurgeQueue(Queue queue);
voidpurgeQueue(String vhost, Queue queue);
voiddeleteQueue(Queue queue);
voiddeleteQueue(String vhost, Queue queue);
QueuegetQueue(String name);
QueuegetQueue(String vhost, String name);
ListgetQueues();
ListgetQueues(String vhost);
voidaddQueue(Queue queue);
voidaddQueue(String vhost, Queue queue);
voiddeleteExchange(Exchange exchange);
voiddeleteExchange(String vhost, Exchange exchange);
ExchangegetExchange(String name);
ExchangegetExchange(String vhost, String name);
ListgetExchanges();
ListgetExchanges(String vhost);
ListgetBindings();
ListgetBindings(String vhost);
ListgetBindingsForExchange(String vhost, String exchange);
}
有關(guān)詳細(xì)信息丑蛤,請(qǐng)參閱javadocs。
3.1.13 Exception Handling(異常處理器)
使用RabbitMQ Java客戶端的許多操作可以拋出已檢查的異常撕阎。例如受裹,可能會(huì)拋出IOExceptions的情況很多。 RabbitTemplate虏束,SimpleMessageListenerContainer和其他Spring AMQP組件將捕獲這些異常并將其轉(zhuǎn)換為運(yùn)行時(shí)層次結(jié)構(gòu)中的一個(gè)異常棉饶。這些在org.springframework.amqp包中定義,AmqpException是層次結(jié)構(gòu)的基礎(chǔ)镇匀。
當(dāng)一個(gè)監(jiān)聽器拋出一個(gè)異常時(shí)照藻,它被包裝在一個(gè)ListenerExecutionFailedException中,通常這個(gè)消息被代理拒絕和重新排序汗侵。將defaultRequeueRejected設(shè)置為false將導(dǎo)致消息被丟棄(或路由到dead letter exchange)幸缕。如在“消息監(jiān)聽器和異步事件”一節(jié)中所討論的,監(jiān)聽器可以拋出一個(gè)AmqpRejectAndDontRequeueException來有條件地控制這種行為晰韵。
但是发乔,有一類錯(cuò)誤,監(jiān)聽器無法控制行為雪猪。當(dāng)遇到無法轉(zhuǎn)換的消息(例如無效的content_encoding標(biāo)頭)時(shí)栏尚,會(huì)在消息達(dá)到用戶代碼之前拋出一些異常。將defaultRequeueRejected設(shè)置為true(默認(rèn))浪蹂,這些消息將被重新傳遞抵栈。在版本1.3.2之前,用戶需要編寫一個(gè)自定義的ErrorHandler坤次,如3.1.13節(jié)“異常處理”所述古劲,以避免這種情況。
從版本1.3.2開始缰猴,默認(rèn)的ErrorHandler現(xiàn)在是一個(gè)ConditionalRejectingErrorHandler产艾,它將拒絕(而不是重新排序)消息,并發(fā)生不可恢復(fù)的錯(cuò)誤:
o.s.amqp…MessageConversionException
o.s.messaging…MessageConversionException
o.s.messaging…MethodArgumentNotValidException
o.s.messaging…MethodArgumentTypeMismatchException
java.lang.NoSuchMethodException
java.lang.ClassCastException
使用MessageConverter轉(zhuǎn)換傳入的消息有效負(fù)載時(shí),可以拋出第一個(gè)闷堡。如果在映射到@RabbitListener方法時(shí)需要額外的轉(zhuǎn)換隘膘,則轉(zhuǎn)換服務(wù)可能會(huì)拋出第二個(gè)。如果在監(jiān)聽器中使用驗(yàn)證(例如@Valid)杠览,并且驗(yàn)證失敗弯菊,則可能會(huì)拋出第三個(gè)。如果入站郵件轉(zhuǎn)換為目標(biāo)方法不正確的類型踱阿,則可能會(huì)拋出第四個(gè)郵件管钳。例如,該參數(shù)被聲明為Message软舌,但接收到Message才漆。
版本1.6.3中添加了第五和第六醇滥。
可以使用FatalExceptionStrategy配置此錯(cuò)誤處理程序的實(shí)例超营,以便用戶可以提供自己的條件消息拒絕規(guī)則鸳玩,例如。來自Spring Retry(稱為“消息監(jiān)聽器和異步情況”的部分)的BinaryExceptionClassifier的委托實(shí)現(xiàn)糟描。另外怀喉,ListenerExecutionFailedException現(xiàn)在有一個(gè)failMessage屬性可以在決定中使用。如果FatalExceptionStrategy.isFatal()方法返回true船响,則錯(cuò)誤處理程序?qū)伋鲆粋€(gè)AmqpRejectAndDontRequeueException異常躬拢。當(dāng)異常確定為致命時(shí),默認(rèn)的FatalExceptionStrategy會(huì)記錄一條警告消息见间。
自1.6.3版本以來聊闯,將用戶異常添加到致命列表中的方便方法是將ConditionalRejectingErrorHandler.DefaultExceptionStrategy子類化,并覆蓋方法isUserCauseFatal(Throwable cause)為致命異常返回true米诉。
Spring Rabbit框架支持在同步和異步使用情況下進(jìn)行自動(dòng)事務(wù)管理菱蔬,具有多種不同的語(yǔ)義,可以聲明式選擇史侣,正如Spring事務(wù)的現(xiàn)有用戶所熟悉的那樣拴泌。這使得許多如果不是最常見的消息傳遞模式非常容易實(shí)現(xiàn)。
有兩種方式將所需的事務(wù)語(yǔ)義信號(hào)發(fā)送到框架惊橱。在RabbitTemplate和SimpleMessageListenerContainer中都有一個(gè)標(biāo)志channelTransacted蚪腐,如果為true,則告知框架使用事務(wù)通道税朴,并根據(jù)結(jié)果結(jié)束提交或回滾以結(jié)束所有操作(發(fā)送或接收)回季,并發(fā)出異常指示回滾家制。另一個(gè)信號(hào)是提供一個(gè)外部事務(wù)與Spring的PlatformTransactionManager實(shí)現(xiàn)之一作為正在進(jìn)行的操作的上下文。如果在框架發(fā)送或接收消息時(shí)已經(jīng)有一個(gè)事務(wù)正在進(jìn)行泡一,并且channelTransacted標(biāo)志為真颤殴,那么消息傳遞事務(wù)的提交或回滾將被推遲到當(dāng)前事務(wù)結(jié)束。如果channelTransacted標(biāo)志為false鼻忠,則沒有事務(wù)語(yǔ)義適用于消息傳遞操作(它是自動(dòng)檢測(cè)的)涵但。
channelTransacted標(biāo)志是一個(gè)配置時(shí)間設(shè)置:當(dāng)AMQP組件被創(chuàng)建時(shí),它通常在應(yīng)用程序啟動(dòng)時(shí)被聲明和處理一次粥烁。外部事務(wù)原則上更動(dòng)態(tài)贤笆,因?yàn)橄到y(tǒng)在運(yùn)行時(shí)響應(yīng)當(dāng)前的Thread狀態(tài),但實(shí)際上當(dāng)事務(wù)按聲明方式分層到應(yīng)用程序時(shí)通常也是一個(gè)配置設(shè)置讨阻。
對(duì)于使用RabbitTemplate的同步用例,外部事務(wù)由調(diào)用者提供篡殷,無論是聲明性還是根據(jù)味道(通常的Spring事務(wù)模型)強(qiáng)制執(zhí)行钝吮。聲明性方法(通常是首選,因?yàn)樗欠乔秩胄缘?的示例板辽,其中模板已配置為channelTransacted = true:
@Transactional
publicvoiddoSomething(){
? ? String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
? ? String outgoing = processInDatabaseAndExtractReply(incoming);
? ? rabbitTemplate.convertAndSend(outgoing);
}
在一個(gè)標(biāo)記為@Transactional的方法內(nèi)接收奇瘦、轉(zhuǎn)換和發(fā)送一個(gè)字符串作為一個(gè)消息體,因此如果數(shù)據(jù)庫(kù)處理異常失敗劲弦,傳入的消息將返回給代理耳标,而傳出的消息將不會(huì)被發(fā)送。
對(duì)于使用SimpleMessageListenerContainer的異步使用情況邑跪,如果需要外部事務(wù)次坡,則必須在容器設(shè)置監(jiān)聽器時(shí)請(qǐng)求它。為了表明需要外部事務(wù)画畅,用戶在配置時(shí)向容器提供了PlatformTransactionManager的實(shí)現(xiàn)砸琅。例如:
@Configuration
publicclassExampleExternalTransactionAmqpConfiguration{
@Bean
publicSimpleMessageListenerContainermessageListenerContainer(){
SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
? ? ? ? container.setConnectionFactory(rabbitConnectionFactory());
? ? ? ? container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
? ? ? ? container.setMessageListener(exampleListener());
returncontainer;
? ? }
}
在上面的例子中,事務(wù)管理器被添加為從另一個(gè)bean定義(未顯示)注入的依賴關(guān)系轴踱,并且channelTransacted標(biāo)志也被設(shè)置為true。效果是淫僻,如果監(jiān)聽器失敗并發(fā)生異常诱篷,則事務(wù)將被回滾,并且該消息也將返回給代理雳灵。重要的是棕所,如果事務(wù)無法提交(例如數(shù)據(jù)庫(kù)約束錯(cuò)誤或連接性問題),則AMQP事務(wù)也將被回滾细办,并且該消息將被返回給代理橙凳。這有時(shí)被稱為最佳努力1階段提交蕾殴,并且是可靠消息傳遞的非常強(qiáng)大的模式。如果在上面的示例中將channelTransacted標(biāo)志設(shè)置為false岛啸,這是默認(rèn)值钓觉,則仍將為監(jiān)聽器提供外部事務(wù),但所有消息傳遞操作都將自動(dòng)檢測(cè)坚踩,因此其效果是即使提交消息傳遞操作在業(yè)務(wù)運(yùn)行的回滾荡灾。
在版本1.6.6之前,當(dāng)回收規(guī)則添加到容器的transactionAttribute中時(shí)瞬铸,使用外部事務(wù)管理器(例如JDBC)不起作用;異撑希總是回滾事務(wù)。
此外嗓节,當(dāng)在容器的建議鏈中使用事務(wù)建議時(shí)荧缘,條件回滾并不是非常有用,因?yàn)樗斜O(jiān)聽器異常都被包裝在ListenerExecutionFailedException中拦宣。
第一個(gè)問題已得到糾正诱渤,規(guī)則現(xiàn)在得到適當(dāng)應(yīng)用固灵。此外,現(xiàn)在提供了ListenerFailedRuleBasedTransactionAttribute;它是RuleBasedTransactionAttribute的一個(gè)子類,唯一的區(qū)別是它知道ListenerExecutionFailedException遗菠,并且使用規(guī)則的這種異常的原因场绿。此事務(wù)屬性可以直接在容器中使用扼仲,也可以通過事務(wù)建議使用寺董。
使用此規(guī)則的示例如下:
@Bean
publicAbstractMessageListenerContainercontainer(){
? ? ...
? ? container.setTransactionManager(transactionManager);
? ? RuleBasedTransactionAttribute transactionAttribute =
newListenerFailedRuleBasedTransactionAttribute();
? ? transactionAttribute.setRollbackRules(Collections.singletonList(
newNoRollbackRuleAttribute(DontRollBackException.class)));
? ? container.setTransactionAttribute(transactionAttribute);
? ? ...
}
A note on Rollback of Received Messages
AMQP事務(wù)只適用于發(fā)送到代理的消息和acks,所以當(dāng)回滾Spring事務(wù)并且收到一條消息時(shí)外驱,Spring AMQP必須做的不僅僅是回滾事務(wù)育灸,而且手動(dòng)拒絕消息(這是一個(gè)壞消息,但這不是規(guī)范所說的)略步。對(duì)消息拒絕采取的操作與事務(wù)無關(guān)描扯,并且取決于defaultRequeueRejected屬性(默認(rèn)為true)。有關(guān)拒絕失敗消息的更多信息趟薄,請(qǐng)參閱“消息監(jiān)聽器和異步事件”一節(jié)绽诚。
有關(guān)RabbitMQ事務(wù)的更多信息及其限制,請(qǐng)參閱RabbitMQ代理語(yǔ)法杭煎。
在RabbitMQ 2.7.0之前恩够,這樣的消息(以及任何在通道關(guān)閉或中斷時(shí)未被解除的消息)都會(huì)到達(dá)Rabbit代理隊(duì)列的后面,因?yàn)?.7.0羡铲,拒絕的消息到隊(duì)列的前面蜂桶,以與JMS相似的方式回滾消息。
事務(wù)回滾中的消息重新排序在本地事務(wù)和提供TransactionManager時(shí)不一致也切。在前一種情況下扑媚,適用正常的重新排序邏輯(AmqpRejectAndDontRequeueException或defaultRequeueRejected = false)(參見“消息監(jiān)聽器和異步情況”一節(jié));與一個(gè)事務(wù)管理器腰湾,該消息是無條件地回滾。從版本1.7.1開始疆股,您可以通過將容器的alwaysRequeueWithTxManagerRollback屬性設(shè)置為false來啟用一致的行為;在默認(rèn)情況下费坊,它將為false。請(qǐng)參見第3.1.15節(jié)“消息監(jiān)聽器容器配置”旬痹。
Using the RabbitTransactionManager
RabbitTransactionManager是在外部事務(wù)中執(zhí)行Rabbit操作并與外部事務(wù)同步的替代方法附井。此事務(wù)管理器是PlatformTransactionManager接口的實(shí)現(xiàn),應(yīng)與單個(gè)Rabbit ConnectionFactory一起使用两残。
此策略不能提供XA事務(wù)永毅,例如為了在消息傳遞和數(shù)據(jù)庫(kù)訪問之間共享事務(wù)。
需要應(yīng)用程序代碼通過ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory人弓,boolean)來檢索事務(wù)性Rabbit資源沼死,而不是后續(xù)通道創(chuàng)建時(shí)的標(biāo)準(zhǔn)Connection.createChannel()調(diào)用。當(dāng)使用Spring AMQP的RabbitTemplate時(shí)票从,它將自動(dòng)檢測(cè)線程綁定的通道并自動(dòng)參與其事務(wù)漫雕。
使用Java配置,您可以使用以下方式設(shè)置新的RabbitTransactionManager:
@Bean
publicRabbitTransactionManagerrabbitTransactionManager(){
returnnewRabbitTransactionManager(connectionFactory);
}
如果您喜歡使用XML配置峰鄙,請(qǐng)?jiān)赬ML應(yīng)用程序上下文文件中聲明以下bean:
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
3.1.15 Message Listener Container Configuration(Message Listener容器配置)
有很多的選項(xiàng)配置SimpleMessageListenerContainer相關(guān)事務(wù)和服務(wù),其中一些可以相互作用太雨。
下表顯示了使用命名空間配置時(shí)的容器屬性名稱及其等效屬性名稱(括號(hào)中)吟榴。
PS:其他內(nèi)容還在繼續(xù)翻譯當(dāng)中