1.MQ的作用
1)解耦:在項目啟動之初是很難預(yù)測未來會遇到什么困難的睦霎,消息中間件在處理過程中插入了一個隱含的梢卸,基于數(shù)據(jù)的接口層,兩邊都實現(xiàn)這個接口副女,這樣就允許獨立的修改或者擴展兩邊的處理過程蛤高,只要兩邊遵守相同的接口約束即可。
2)冗余(存儲):在某些情況下處理數(shù)據(jù)的過程中會失敗碑幅,消息中間件允許把數(shù)據(jù)持久化知道他們完全被處理
擴展性:消息中間件解耦了應(yīng)用的過程戴陡,所以提供消息入隊和處理的效率是很容易的,只需要增加處理流程就可以了沟涨。
3)削峰:在訪問量劇增的情況下恤批,但是應(yīng)用仍然需要發(fā)揮作用,但是這樣的突發(fā)流量并不常見裹赴。而使用消息中間件采用隊列的形式可以減少突發(fā)訪問壓力喜庞,不會因為突發(fā)的超時負荷要求而崩潰
4)可恢復(fù)性:當(dāng)系統(tǒng)一部分組件失效時,不會影響到整個系統(tǒng)棋返。消息中間件降低了進程間的耦合性延都,當(dāng)一個處理消息的進程掛掉后,加入消息中間件的消息仍然可以在系統(tǒng)恢復(fù)后重新處理
5)順序保證:在大多數(shù)場景下睛竣,處理數(shù)據(jù)的順序也很重要晰房,大部分消息中間件支持一定的順序性
6)緩沖:消息中間件通過一個緩沖層來幫助任務(wù)最高效率的執(zhí)行
7)異步通信:通過把把消息發(fā)送給消息中間件,消息中間件并不立即處。
本文只討論削峰填谷的應(yīng)用場景:
舉個業(yè)務(wù)場景的栗子殊者,秒殺業(yè)務(wù):
上游發(fā)起下單操作
下游完成秒殺業(yè)務(wù)邏輯(庫存檢查与境,庫存凍結(jié),余額檢查猖吴,余額凍結(jié)摔刁,訂單生成,余額扣減海蔽,庫存扣減簸搞,生成流水,余額解凍准潭,庫存解凍)
上游下單業(yè)務(wù)簡單,每秒發(fā)起了10000個請求域仇,下游秒殺業(yè)務(wù)復(fù)雜刑然,每秒只能處理2000個請求,很有可能上游不限速的下單暇务,導(dǎo)致下游系統(tǒng)被壓垮泼掠,引發(fā)雪崩。
為了避免雪崩垦细,常見的優(yōu)化方案有兩種:
1)業(yè)務(wù)上游隊列緩沖择镇,限速發(fā)送
2)業(yè)務(wù)下游隊列緩沖,限速執(zhí)行
本文只討論下游隊列括改,就是消費端的限速執(zhí)行
rabbitmq提供了一種服務(wù)質(zhì)量保障功能腻豌,即在非自動確認消息的前提下,如果一定數(shù)目的消息未被確認嘱能,不進行消費新的消息吝梅。
使用 basicqos方法:
在消費端進行使用。 0 1 false
prefetSize:0
prefetCount:這個值一般在設(shè)置為非自動ack的情況下生效惹骂,一般大小為1
global: true是channel級別苏携, false是消費者級別
注意:我們要使用非自動ack
消費者代碼:
package com.bfxy.rabbitmq.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.136.197.244");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck設(shè)置為 false
channel.basicQos(0,3,false);
channel.basicConsume(queueName,false,new MyConsumer(channel));
}
}
自定義消費者代碼:
package com.bfxy.rabbitmq.api.limit;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
生產(chǎn)者代碼:
package com.bfxy.rabbitmq.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.136.197.244");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
調(diào)試步驟:
1)啟動消費者類,效果如圖:
2)在自定義消費者類中注釋掉channel.basicAck(envelope.getDeliveryTag(), false);
啟動生產(chǎn)者類对粪,mq管控臺信息
可以看到1個待確認的右冻,4個準備好的消息,
3)放開代碼channel.basicAck(envelope.getDeliveryTag(), false);
啟動生產(chǎn)者類著拭,mq管控臺信息
總結(jié):消費者消費成功一個消息后纱扭,需要設(shè)置成手動確認,當(dāng)返回確認成功后茫死,再去消費下一個消息跪但,這樣可以實現(xiàn)消費端的削峰限流,不至于讓消費端服務(wù)崩潰。
到這里是不是以為結(jié)束了呢屡久,其實還有一個知識點忆首,就是消費端對沒有消費成功的消息,可以不進行確認被环,讓其重回隊列糙及,再次消費,與上面的代碼相比筛欢,只需修改自定義的消費者浸锨,設(shè)置如果滿足我們自己設(shè)置的條件就認為是沒有消費成功,讓其重回隊列版姑,這個時候broker端會再此發(fā)出這條消息柱搜。
修改如下:
啟動生產(chǎn)者和消費者,消費者控制臺信息如下: