消息消費(fèi)失敗處理方式:
一 進(jìn)入死信隊(duì)列(進(jìn)入死信的三種方式)
1.消息被拒絕(basic.reject or basic.nack)并且requeue=false
2.消息TTL過期過期時(shí)間
3.隊(duì)列達(dá)到最大長(zhǎng)度
DLX也是一下正常的Exchange同一般的Exchange沒有區(qū)別姊扔,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性挠羔,當(dāng)這個(gè)隊(duì)列中有死信時(shí)申鱼,RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange中去赘理,進(jìn)而被路由到另一個(gè)隊(duì)列蛔翅, publish可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理乘盖, 這個(gè)特性可以彌補(bǔ)R abbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認(rèn)的功能呐赡。
rabbitmq的三種模式:
一. Fanout Exchange ?廣播
所有發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange 綁定(Binding)的所有Queue上娃豹。Fanout Exchange ?不需要處理RouteKey 焚虱。只需要簡(jiǎn)單的將隊(duì)列綁定到exchange 上。這樣發(fā)送到exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上懂版。類似子網(wǎng)廣播鹃栽,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。所以,F(xiàn)anout Exchange?轉(zhuǎn)發(fā)消息是最快的民鼓。
二. Direct Exchange ?點(diǎn)對(duì)點(diǎn)
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定的Queue薇芝。Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進(jìn)行任何綁定(binding)操作 丰嘉。消息傳遞時(shí)夯到,RouteKey必須完全匹配,才會(huì)被隊(duì)列接收饮亏,否則該消息會(huì)被拋棄耍贾。
三. Topic Exchange ?模糊匹配
所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上,Exchange?將RouteKey?和某Topic?進(jìn)行模糊匹配路幸。此時(shí)隊(duì)列需要綁定一個(gè)Topic荐开。可以使用通配符進(jìn)行模糊匹配简肴,符號(hào)“#”匹配一個(gè)或多個(gè)詞晃听,符號(hào)“*”匹配不多不少一個(gè)詞。因此“l(fā)og.#”能夠匹配到“l(fā)og.info.oa”砰识,但是“l(fā)og.*” 只會(huì)匹配到“l(fā)og.error”能扒。所以,Topic Exchange?使用非常靈活辫狼。
mq也支持重發(fā)機(jī)制:
rabbitmq的消息確認(rèn)機(jī)制分兩部分一部分是生產(chǎn)端赫粥,一部分是消費(fèi)端生產(chǎn)端有兩種選擇,transaction ? 和 ? confirm予借。confirm ?的性能要好于transaction
//transaction?機(jī)制
channel.txSelect();
String?msg?="msg??test?!!!";
for(inti=0;i<10000;i++){
? ?msg?=?i+"?:?msg??test?!!!";
? channel.basicPublish(EXCHAGE,?QUEUE_NAME,null,msg.getBytes());
? System.out.println("publish?msg?"+msg);
? if(i>0&&i%100==0){
? ? //批量提交
? ? channel.txCommit();
? ?}
? }//?若出現(xiàn)異常?進(jìn)行?channel.txRollback(),對(duì)相應(yīng)批次的msg進(jìn)行重發(fā)或記錄
channel.txCommit();
生產(chǎn)者配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
? ? ? ? http://www.springframework.org/schema/beans
? ? ? ? http://www.springframework.org/schema/beans/spring-beans.xsd
? ? ? ? http://www.springframework.org/schema/rabbit
? ? ? ? ?http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factoryid="connectionFactory"
? ? host="10.153.25.15"username="insurance"password="insurance"port="5672"/>
<rabbit:adminconnection-factory="connectionFactory"/>
<rabbit:queueid="queue_insurance"durable="true"auto-delete="false"
? ? ? ? exclusive="false"name="queue_insurance">正常隊(duì)列當(dāng)中指向死信
<rabbit:queue-arguments>
? ? ? ? <entrykey="x-message-ttl">設(shè)置超時(shí)
? ? ? ? ? ? <valuetype="java.lang.Long">30000
? ? ? ? </entry>
? ? ? ? <entrykey="x-dead-letter-exchange">指定交換機(jī)
? ? ? ? ? ? <valuetype="java.lang.String">alter</value>
? ? ? ? </entry>
? </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queueid="alter_queue"durable="true"auto-delete="false"exclusive="false"name="alter_queue"/>死信隊(duì)列
<rabbit:direct-exchangename="alter"
? ? ? ? durable="true"auto-delete="false"id="alter">死信交換機(jī)
? ? ? ? <rabbit:bindings>
? ? ? ? ? ? ?<rabbit:bindingqueue="alter_queue"key="queue_key_insurance"/>
? ? ? ? </rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:direct-exchangename="exchange_insurance"
? ? ? durable="true"auto-delete="false"id="exchange_insurance">正常交換機(jī)
? ? ?<rabbit:bindings>
? ? ? ? ? ?<rabbit:bindingqueue="queue_insurance"key="queue_key_insurance"/>
? ? ?</rabbit:bindings>
</rabbit:direct-exchange>
<!-- (5)客戶端投遞消息到exchange频蛔。 -->
<rabbit:templateid="amqpTemplate"exchange="exchange_insurance"
? ? ? connection-factory="connectionFactory"/>
</beans>
消費(fèi)者配置:
<?xml version="1.0"encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
? ? ?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? ? ? xmlns:rabbit="http://www.springframework.org/schema/rabbit"
? ? ? xsi:schemaLocation="
? ? ? ? ? ? ? http://www.springframework.org/schema/beans
? ? ? ? ? ? ? http://www.springframework.org/schema/beans/spring-beans.xsd
? ? ? ? ? ? ? http://www.springframework.org/schema/rabbit
? ? ? ? ? ? ? ?http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 連接服務(wù)配置 -->
<rabbit:connection-factoryid="connectionFactory2"host="10.153.25.15"
? ? ? ? ? ? ? ?username="insurance"password="insurance"port="5672"/>
<rabbit:adminconnection-factory="connectionFactory2"/>
<!-- queue 隊(duì)列聲明 -->
<!-- queue 隊(duì)列聲明? name 隊(duì)里的額name 是關(guān)聯(lián)生產(chǎn)表和消費(fèi)表的為唯一線索? -->
<rabbit:queueid="queue_insurance"name="queue_insurance">
? ? ? ? <rabbit:queue-argumentsvalue-type="java.lang.Long">
? ? ? ? ? ? ?<entrykey="x-message-ttl"value="30000"/>
? ? ? ? </rabbit:queue-arguments>
</rabbit:queue>
<!-- 定義消費(fèi)者監(jiān)聽器 -->
<!-- 創(chuàng)建一個(gè)bean實(shí)例灵迫,bean實(shí)例中聲明處理請(qǐng)求的類 -->
<beanid="consumerLitener2"class="com.insurance.mq.CommissionController"></bean>
<rabbit:listener-container connection-factory="connectionFactory2"acknowledge="auto"concurrency="8">
<!-- queues屬性從那個(gè)隊(duì)列中接收消息,ref屬性是當(dāng)存在消息是使用哪個(gè)類去處理 -->
<rabbit:listenerqueues="queue_insurance"ref="consumerLitener2"/>
</rabbit:listener-container>
</beans>