% rabbitMQ learn
% qijun
% 19/01/2018
mq 的一些概念
- mq: mq 是一個message broker (消息中介)
- AMQP (Advanced Message Queue ) 一個標(biāo)準(zhǔn)的消息隊列標(biāo)準(zhǔn)
- RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的開源實現(xiàn)
rabbit mq 的一些概念
rabbit mq 的適用場景架構(gòu)圖
- Client A &Client B 為消息的producer 消息由payload 和 label 組成晰洒,label是exchange的名字或者說是一個tag闺金,它描述了payload湃崩,而且RabbitMQ也是通過這個label來決定把這個Message發(fā)給哪個Consumer
- client 1 & client2 & client3 消息的consumer, 消息的接受者 接收到的消息是去除label 的消息艇纺,緊包含消息的內(nèi)容苔严,消費者通過訂閱隊列獲取消息刀荒。
- 中間是的 rabbit server 由 交換器,routingKey 和queue 組成首懈,交換器和queue 通過routingKey 綁定看成,消息通過交換器和routingKey 路由到相應(yīng)的queue
- Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的鹏秋。程序的起始處就是建立這個TCP連接尊蚁。
- Channels: 虛擬連接。它建立在上述的TCP連接中侣夷。數(shù)據(jù)流動都是在Channel中進(jìn)行的横朋。也就是說,一般情況是程序起始建立TCP連接百拓,第二步就是建立這個Channel琴锭。
四種交換器
由上面可知,消息通過交換器衙传,通過對應(yīng)的routekey 路由到queue, 交換器的類型一共有三種
- direct 如果 routing key 匹配, 那么Message就會被傳遞到相應(yīng)的queue中
- fanout 廣播到所有綁定的queue(假設(shè)你有一個消息需要發(fā)送給a和b,如果現(xiàn)在還需要發(fā)送給c决帖,使用fanout 交換器,只需要在c的代碼中創(chuàng)建一個隊列蓖捶,然后綁定到fanout 交換器即可)
- topic 對key進(jìn)行模式匹配地回,比如ab.1,ab.2都可以傳遞到所有routingkey 為ab.*的queue
基于topic類型交換器的routing key不是唯一的,而是一系列詞俊鱼,基于點區(qū)分刻像。
例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"
binding key也是。*表示只匹配一個關(guān)鍵字 #可以匹配0或者多個關(guān)鍵字并闲。
比如*.a.b的隊列接受1.a.b 或者2.a.b等等 - header header交換器和 direct幾乎一樣细睡,性能更差,基本不會用到
匿名交換器(默認(rèn))
事實上焙蚓,你在代碼中不創(chuàng)建交換器也是可以通過rabbit mq 發(fā)送消息的纹冤,因為rabbit 提供了默認(rèn)的交換器。
如圖中空白字符串名字的交換器為默認(rèn)的交換器购公,類型為direct
本質(zhì)上所有的消息發(fā)送都要送往exchange(可以沒有隊列萌京,但不能沒有交換機(jī),沒有隊列時消息直接被丟棄)宏浩。
RabbitMQ提供了一種直接向Queue發(fā)送消息的快捷方法:直接使用未命名的exchange知残,不用綁定routing_key,直接用它指定隊列名比庄。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 發(fā)送消息
String message = "Hello World!";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
// 參數(shù)1 exchange :交換器
// 參數(shù)2 routingKey : 路由鍵
// 參數(shù)3 props : 消息的其他參數(shù)
// 參數(shù)4 body : 消息體
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
消息的確認(rèn)和拒絕
使用ack確認(rèn)Message的正確傳遞
默認(rèn)情況下求妹,如果Message 已經(jīng)被某個Consumer正確的接收到了,那么該Message就會被從queue中移除佳窑。當(dāng)然也可以讓同一個Message發(fā)送到很多的Consumer
如果一個queue沒被任何的Consumer Subscribe(訂閱)制恍,那么础废,如果這個queue有數(shù)據(jù)到達(dá)韧掩,那么這個數(shù)據(jù)會被cache,不會被丟棄铅乡。當(dāng)有Consumer時,這個數(shù)據(jù)會被立即發(fā)送到這個Consumer鹃唯,這個數(shù)據(jù)被Consumer正確收到時爱榕,這個數(shù)據(jù)就被從queue中刪除。
那么什么是正確收到呢坡慌?通過ack黔酥。每個Message都要被acknowledged(確認(rèn),ack)洪橘。我們可以顯示的在程序中去ack跪者,也可以自動的ack。
如果在收到數(shù)據(jù)后處理數(shù)據(jù)時程序發(fā)生錯誤梨树,無法正確處理數(shù)據(jù)坑夯,而是被reject岖寞。reject 參數(shù)設(shè)為true時RabbitMQ Server會把這個信息發(fā)送到下一個Consumer抡四,設(shè)為false也可以從隊列中把這條消息刪除。
如果這個app有bug仗谆,忘記了ack指巡,那么RabbitMQ Server不會再發(fā)送數(shù)據(jù)給它,因為Server認(rèn)為這個Consumer處理能力有限隶垮。
而且ack的機(jī)制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數(shù)據(jù)后發(fā)送ack藻雪,甚至在額外的延時后發(fā)送ack,將有效的balance Consumer的load狸吞。
在什么地方創(chuàng)建queue
Consumer和Procuder都可以通過 queue.declare 創(chuàng)建queue勉耀。對于某個Channel來說,Consumer不能declare一個queue蹋偏,卻訂閱其他的queue便斥。當(dāng)然也可以創(chuàng)建私有的queue。這樣只有app本身才可以使用這個queue威始。queue也可以自動刪除枢纠,被標(biāo)為auto-delete的queue在最后一個Consumer unsubscribe后就會被自動刪除。那么如果是創(chuàng)建一個已經(jīng)存在的queue呢黎棠?那么不會有任何的影響晋渺。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣脓斩,那么該操作雖然成功木西,但是queue的屬性并不會被修改。
那么誰應(yīng)該負(fù)責(zé)創(chuàng)建這個queue呢随静?是Consumer八千,還是Producer?
如果queue不存在,當(dāng)然Consumer不會得到任何的Message叼丑。但是如果queue不存在关翎,那么Producer Publish的Message會被丟棄。所以鸠信,還是為了數(shù)據(jù)不丟失纵寝,Consumer和Producer都try to create the queue!反正不管怎么樣星立,這個接口都不會出問題爽茴。
queue對load balance的處理是完美的。對于多個Consumer來說绰垂,RabbitMQ 使用循環(huán)的方式(round-robin)的方式均衡的發(fā)送給不同的Consumer室奏。
VirtualHost
在RabbitMQ中可以虛擬消息服務(wù)器VirtualHost,每個VirtualHost相當(dāng)月一個相對獨立的RabbitMQ服務(wù)器劲装,每個VirtualHost之間是相互隔離的胧沫。exchange、queue占业、message不能互通绒怨。
在RabbitMQ中無法通過AMQP創(chuàng)建VirtualHost,可以通過以下命令來創(chuàng)建谦疾。
rabbitmqctl add_vhost [vhostname]
windows下如何安裝rabbit mq
- rabbit mq 運行于erlang之上南蹂,需要先安裝erlang http://www.erlang.org/downloads 下載,并使用管理員運行安裝
- 安裝rabbit mq https://www.rabbitmq.com/download.html
- 新增環(huán)境變量 ERLANG_HOME= C:\Program Files\erl9.2
RABBITMQ_SERVER = C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.2
配置環(huán)境變量
Path=%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin - 替換 erlang cookie
拷貝C:\WINDOWS 下的.erlang.cookie (還有可能在C:\Windows\System32\config\systemprofile)文件替換 C:\Users%USERNAME%.erlang.cookie 或者 C:\Documents and
Settings%USERNAME%.erlang.cookie - 通過startMenu 啟動erlang 服務(wù) 和停止 rabbit mq 可以以服務(wù)的方式和按進(jìn)程的方式啟動念恍,建議使用服務(wù)方式啟動,然后在rabbit mq的命令行(RabbitMQ Command Prompt 開始菜單中) 執(zhí)行 rabbitmq-plugins enable rabbitmq_management
最后就可以通過 http://localhost:15672/ 賬號guest 密碼guest 訪問rabbit mq的控制臺 /是默認(rèn)的VirtualHost
常用命令
停止 broker
查詢 broker 狀態(tài) rabbitmqctl status
更多的命令請查閱 https://www.rabbitmq.com/man/rabbitmqctl.8.html
實戰(zhàn)
下面會通過兩個例子六剥,演示如何使用rabbitmq,第一個原生的java api 使用direct 交換器演示 routing,第二個例子使用topic 交換器峰伙。spring mvc疗疟,spring boot 中的注解和接口本質(zhì)上是對原生接口的包裝,spring 會隱藏一些操作词爬,對理解rabbit mq的工作流程會造成阻礙秃嗜,先使用原生api做演示一般的工作流程,而后結(jié)合springboot 演示在項目中如何使用rabbit mq顿膨。
rabbitmq 消費者和生產(chǎn)者兩端的在處理消息時經(jīng)歷的步驟
- 創(chuàng)建連接工廠ConnectionFactory
- 通過連接獲取通信通道Channel
- 聲明交換機(jī)Exchange(可選)
- 申明隊列(可選)
- 綁定交換機(jī)和隊列(可選)
之后生產(chǎn)者通過channel發(fā)送消息锅锨,消費者獲取并處理消息
rabbitmq comsumer 消息獲取方式
rabbitMQ中consumer通過建立到queue的連接,創(chuàng)建channel對象恋沃,通過channel通道獲取message,
Consumer可以聲明式的以API輪詢poll的方式主動從queue的獲取消息必搞,也可以通過訂閱的方式被動的從Queue中消費消息。
使用原生rabbitmq api 的例子
代碼發(fā)送三種類型的日志到交換器囊咏,交換器通過routingkey 分發(fā)到不同的queue
maven 依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.3</version>
</dependency>
消息發(fā)送
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置 RabbitMQ 的主機(jī)名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 指定一個交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 發(fā)送消息
for (int i = 0; i < 10; i++) {
int rand = new Random().nextInt(3);
String severity = LOG_LEVEL_ARR[rand];
String message = "Qijun-MSG log : [" +severity+ "]" + UUID.randomUUID().toString();
// 發(fā)布消息至交換器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
消息接收
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置 RabbitMQ 的主機(jī)名
factory.setHost("localhost");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個通道
Channel channel = connection.createChannel();
// 指定一個交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 設(shè)置日志級別
int rand = new Random().nextInt(3);
// 創(chuàng)建三個非持久的恕洲、唯一的塔橡、自動刪除的隊列,分別接收不同的日志信息
String debugQueueName = channel.queueDeclare().getQueue();
String InfoQueueName = channel.queueDeclare().getQueue();
String ErrorQueueName = channel.queueDeclare().getQueue();
// 綁定交換器和隊列
// queueBind(String queue, String exchange, String routingKey)
// 參數(shù)1 queue :隊列名
// 參數(shù)2 exchange :交換器名
// 參數(shù)3 routingKey :路由鍵名
channel.queueBind(debugQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[0]);
channel.queueBind(InfoQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[1]);
channel.queueBind(ErrorQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[2]);
// rabbit mq 消息的推送支持poll 也支持訂閱霜第,先創(chuàng)建一個poll 方式的comsumer
QueueingConsumer pollConsumer = new QueueingConsumer(channel);
channel.basicConsume(ErrorQueueName, true, pollConsumer);
// 創(chuàng)建訂閱類型的消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received '" + message + "' from "+envelope.getRoutingKey()+ " by subscribe" );
}
};
channel.basicConsume(debugQueueName, true, consumer);
channel.basicConsume(InfoQueueName, true, consumer);
// 通過 循環(huán)poll 獲取隊列中的所有消息
while (true) {
QueueingConsumer.Delivery delivery = null;
try {
delivery = pollConsumer.nextDelivery();
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("Received '" + message + "' from "+routingKey +" by poll");
}
}
}
springboot 中使用rabbit mq 的例子
maven 依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
ConnectionFactory配置
// 項目中可通過配置文件讀取來獲取 connect 參數(shù)
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost("localhost");
cachingConnectionFactory.setPort(5672);
cachingConnectionFactory.setUsername("guest");
cachingConnectionFactory.setPassword("guest");
cachingConnectionFactory.setVirtualHost("/");
return cachingConnectionFactory;
}
CachingConnectionFactory 內(nèi)部通過com.rabbitmq.client.ConnectionFactory 去設(shè)置 connect的參數(shù)
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware {
private static final String BAD_URI = "setUri() was passed an invalid URI; it is ignored";
protected final Log logger = LogFactory.getLog(this.getClass());
private final com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory;
通過 RabbitAdmin 配置隊列葛家,交換機(jī)和binding
public static final String ROUTER_KEY_1 = "*.orange.*";
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
//申明一個 一個topic類型的交換機(jī),routingkey 使用通配符
TopicExchange topicExchange =(TopicExchange)ExchangeBuilder.topicExchange(QUEUE_EXCHANGE_NAME).durable(true).build();
rabbitAdmin.declareExchange(topicExchange);
Queue firstQueue = new Queue(QUEUE_NAME);
rabbitAdmin.declareQueue(firstQueue);
rabbitAdmin.declareBinding(BindingBuilder.bind(firstQueue).to(topicExchange).with(ROUTER_KEY_1));
return rabbitAdmin;
}
消息消費的兩種方法(推薦使用第二種泌类,更靈活)
- 通過SimpleMessageListenerContainer 綁定特定的messageListener
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receive2");
}
@Bean
SimpleMessageListenerContainer container(MessageListenerAdapter messageListenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(messageListenerAdapter);
return container;
}
@Service
public class Receiver {
public void receiveMessage(String message) {
System.out.println("Received<" + message + ">");
}
public void receive2(String in) throws InterruptedException {
System.out.println("in message"+in);
}
}
- 使用 SimpleRabbitListenerContainerFactory 和 @RabbitListener 方式接收mq 的消息
@Bean
public SimpleRabbitListenerContainerFactory myContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//設(shè)置了每個消費者再不回ack的情況下最大可接收消息的條數(shù)
factory.setPrefetchCount(100);
configurer.configure(factory, connectionFactory);
return factory;
}
/**
* @author 祁軍
* 使用 SimpleRabbitListenerContainerFactory 和 @RabbitListener 方式接收mq 的消息
*/
@Service
public class Receiver1 {
@RabbitListener(queues = "${rabbitConfiguration.queue}", containerFactory = "myContainerFactory")
public void processMessage(String msg){
System.out.println("Receiver1 got message" + msg);
}
}
sender
@Service
public class Sender {
private RabbitTemplate rabbitTemplate;
@Autowired
public Sender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send() {
// 發(fā)送兩次routing key不同 由于 是topic exchange routing key 為通配符可達(dá)到同一隊列
System.out.println("sender is sending message");
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_EXCHANGE_NAME,"aaa.orange.bbb", "hello,world1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_EXCHANGE_NAME,"aaa.orange.ccc", "hello,world2");
}
}
測試
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitMQTest {
@Autowired
private Sender sender;
@Test
public void send() throws Exception {
sender.send();
}
}
rabbit mq 的其他應(yīng)用場景
working queue
當(dāng)有Consumer需要大量的運算時癞谒,RabbitMQ Server需要一定的分發(fā)機(jī)制來balance每個Consumer的load。試想一下刃榨,對于web application來說弹砚,在一個很多的HTTP request里是沒有時間來處理復(fù)雜的運算的,只能通過后臺的一些工作線程來完成枢希。應(yīng)用場景就是RabbitMQ Server會將queue的Message分發(fā)給不同的Consumer以處理計算密集型的任務(wù)桌吃。
RPC
MQ本身是基于異步的消息處理,前面的示例中所有的生產(chǎn)者(P)將消息發(fā)送到RabbitMQ后不會知道消費者(C)處理成功或者失敯巍(甚至連有沒有消費者來處理這條消息都不知道)茅诱。
但實際的應(yīng)用場景中,我們很可能需要一些同步處理呕屎,需要同步等待服務(wù)端將我的消息處理完成后再進(jìn)行下一步處理让簿。這相當(dāng)于RPC(Remote Procedure Call敬察,遠(yuǎn)程過程調(diào)用)秀睛。在RabbitMQ中也支持RPC。
RabbitMQ中實現(xiàn)RPC的機(jī)制是:
- 客戶端發(fā)送請求(消息)時莲祸,在消息的屬性(MessageProperties蹂安,在AMQP協(xié)議中定義了14中properties,這些屬性會隨著消息一起發(fā)送)中設(shè)置兩個值replyTo(一個Queue名稱锐帜,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到這個Queue中)和correlationId(此次請求的標(biāo)識號田盈,服務(wù)器處理完成后需要將此屬性返還,客戶端將根據(jù)這個id了解哪條請求被成功執(zhí)行了或執(zhí)行失斀裳帧)
- 服務(wù)器端收到消息并處理
- 服務(wù)器端處理完消息后允瞧,將生成一條應(yīng)答消息到replyTo指定的Queue,同時帶上correlationId屬性
- 客戶端之前已訂閱replyTo指定的Queue蛮拔,從中收到服務(wù)器的應(yīng)答消息后述暂,根據(jù)其中的correlationId屬性分析哪條請求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理
rabbitmq 消息的可靠性
- 發(fā)送端的comfirm 機(jī)制建炫,通過注冊回調(diào)畦韭,我們可以知道消息是否已經(jīng)發(fā)送到exchange 或者queue,如果沒有正確發(fā)送,我們可以通過replycode來判斷進(jìn)行后續(xù)什么操作肛跌,然后根據(jù)業(yè)務(wù)場景
比如發(fā)送告警艺配,或者重發(fā)來應(yīng)對察郁。 - 消息的持久化,通過交換機(jī)转唉,隊列和消息的持久化來實現(xiàn)
- rabbitmq 從queue 發(fā)消息給消費者皮钠,如果消費者選擇no ack 則queue每發(fā)一條消息,rabbitmq 就會把消息刪除赠法,如果cosumer 由于某種問題消費消息出錯鳞芙,rabbitmq也會把消息刪除。
我們需要在comsumer 關(guān)閉自動ack期虾,使用basic ack 手工應(yīng)答保證消息被正確消費原朝,如果消費失敗,basic nack 可以刪除隊列消息或者重新入原隊列镶苞,可能導(dǎo)致死循環(huán)
如果不希望把有問題的消息刪除或者重新入原來的隊列喳坠,可以指定一個死信隊列,錯誤的消息重新入死信對列茂蚓,然后再次被消費壕鹉。
發(fā)送端的ack
rabbitmq提供了確認(rèn)ack機(jī)制,可以用來確認(rèn)消息是否到broker 或者queue聋涨。
/**confirmcallback用來確認(rèn)消息是否到達(dá)broker*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
//log error
} else {
//maybe delete msg in db
}
});
/**若消息不能正確的達(dá)到指定的隊列會調(diào)用 */
rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
log.info("send message failed: " + replyCode + " " + replyText);
// resend message
});
消息的持久化
// 交換機(jī)的持久化
// 參數(shù)1 name :交互器名
// 參數(shù)2 durable :是否持久化
// 參數(shù)3 autoDelete :當(dāng)所有消費客戶端連接斷開后晾浴,是否自動刪除隊列
new TopicExchange(name, durable, autoDelete)
// 隊列是持久化
// 參數(shù)1 name :隊列名
// 參數(shù)2 durable :是否持久化
// 參數(shù)3 exclusive :僅創(chuàng)建者可以使用的私有隊列,斷開后自動刪除
// 參數(shù)4 autoDelete : 當(dāng)所有消費客戶端連接斷開后牍白,是否自動刪除隊列
new Queue(name, durable, exclusive, autoDelete);
springAMQP 的消息持久化是默認(rèn)的
消費者端的手工確認(rèn)
如果一直不回ack脊凰,mq會block 這個消費者
@Bean
SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(QUEUE_NAME);
//設(shè)定單次可分發(fā)給消費則的消息個數(shù)
container.setPrefetchCount(1);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
try {
log.info("receive msg: " + new String(body));
//do something
} catch (Exception e) {
} finally {
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費
}
}
});
return container;
}
springAMQP 提供的確認(rèn)方式
很明顯上述代碼提供的手工確認(rèn)方式(使用ChannelAwareMessageListener)很不優(yōu)雅,你需要創(chuàng)建多個bean 然后綁定queue茂腥。
當(dāng)setDefaultRequeueRejected(true) (默認(rèn)情況下)狸涌,如果消息被正常消費,container 會ack最岗,然后隊列刪除消息帕胆,如果消費者拋出異常,container會reject這個消息般渡,然后這個消息會requeue到原來的消息隊列懒豹,如果業(yè)務(wù)一直處在這個異常情況下,requeue的消息會再次回到消費者驯用,然后死循環(huán)脸秽,這種情況很顯然不行,spring AMQP 提供的替代方式:listener拋出AmqpRejectAndDontRequeueException晨汹,則這個消息會被拋棄豹储,或者進(jìn)入死信隊列,Listener拋出AmqpRejectAndDontRequeueException還可以通過配置factory 的ErrorHandler 把你拋出的異常 轉(zhuǎn)換為AmqpRejectAndDontRequeueException,如下式例淘这,如果你的listener 拋出了XMLException 則這個消息會被discard(在沒有配置死信隊列的情況下)剥扣。
factory.setErrorHandler(new ConditionalRejectingErrorHandler(
t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof XMLException));
factory.setDefaultRequeueRejected(false); 則只要listener 拋出異常巩剖,message就會被discard或者轉(zhuǎn)入死信隊列,如果需要針對不同的異常(比如可短時間內(nèi)恢復(fù)的異常钠怯,需要重入原隊列佳魔,不可恢復(fù)的異常discard 或者入死信隊列)建議設(shè)置成true,然后配置ErrorHandler 如上
springAMQP 如何配置死信隊列
當(dāng)然你可以通過創(chuàng)建一個死信隊列晦炊,然后在listener端消費時重新發(fā)送到死信隊列鞠鲜,但springAMQP 提供了更好的方式如下
@Bean
TopicExchange exchange()
{
return new TopicExchange(DEFAULT_EXCHANGE);
}
@Bean
Queue deadLetterQueue()
{
return new Queue(DEAD_LETTER_QUEUE,true);
}
@Bean
Queue queue()
{
// 通過args參數(shù)為當(dāng)前隊列綁定一個死信隊列
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE);
return new Queue(WORKORDER_QUEUE,true,false,false,args);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange)
{
return BindingBuilder.bind(queue).to(exchange).with(WORKORDER_QUEUE);
}
@Bean
Binding bindingDeadLetter(Queue deadLetterQueue, TopicExchange exchange)
{
return BindingBuilder.bind(deadLetterQueue).to(exchange).with(DEAD_LETTER_QUEUE);
}
消費者拋出AmqpRejectAndDontRequeueException 異常時則會進(jìn)入死信隊列
@RabbitListener(queues = RabbitConfig.WORKORDER_QUEUE)
public void processMessage(String msg) throws Exception
{
throw new AmqpRejectAndDontRequeueException("to dead-letter");
}
死信隊列的消費者
@Service
public class ErrorHandler {
@RabbitListener(queues = "dead_queue", containerFactory = "myContainerFactory")
public void handleError(Object message){
System.out.println("XXXXXXX"+message);
}
}
其他高級主題
rabbit mq的消息確認(rèn)機(jī)制(包括producer到broker 和broker 到 consumer的確認(rèn)),集群等等。
參考
https://www.rabbitmq.com/getstarted.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/spring-amqp
https://docs.spring.io/spring-amqp/reference/html/
http://blog.720ui.com/2017/springboot_06_mq_rabbitmq/
http://www.cnblogs.com/xingzc/p/5945030.html
https://www.cnblogs.com/diegodu/p/4971586.html
http://blog.csdn.net/column/details/rabbitmq.html
http://blog.csdn.net/u013256816/article/category/6532725/1