各消息組件的適用場(chǎng)景
- ActiveMQ
ActiveMQ 是一款比較早期的消息組件爸邢,由Apache開源出來的,它能滿足吞吐量一般的業(yè)務(wù)場(chǎng)景拿愧,但是對(duì)于高并發(fā)場(chǎng)景杠河,性能較差。 - Kafka
Kafka追求高吞吐量的特性浇辜,它一開始使用于日志的收集券敌。缺點(diǎn)是消息可靠性支持較少,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)柳洋。 - RocketMQ
RocketMQ 早期由阿里團(tuán)隊(duì)開發(fā)的待诅,現(xiàn)在升級(jí)為Apache的頂級(jí)項(xiàng)目。純 Java 開發(fā)熊镣,具有高吞吐量卑雁、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)轧钓。缺點(diǎn)是序厉,有些功能不是開源的锐膜,如消息事務(wù)毕箍。 - RabbitMQ
RabbitMQ 是由 Erlang 語言編寫的,適合對(duì)數(shù)據(jù)一致性道盏、穩(wěn)定性和可靠性要求很高的場(chǎng)景而柑,對(duì)性能和吞吐量的要求還在其次。
AMQP協(xié)議
AMQP荷逞,即Advanced Message Queuing Protocol媒咳,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)种远,為面向消息的中間件設(shè)計(jì)涩澡。
AMQP的主要特征是面向消息、隊(duì)列坠敷、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)妙同、可靠性、安全膝迎。
1粥帚、AMQP模型
消息是直接由生產(chǎn)者發(fā)送到Exchange中的,然后Exchange和Message Queue之間通過某種規(guī)則關(guān)聯(lián)起來(后面會(huì)講)限次;消費(fèi)者是直接訂閱Message Queue的芒涡,只要Queue里面有消息,就會(huì)被消費(fèi)者消費(fèi)。這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的低耦合性费尽。
2赠群、AMQP核心概念
- Channel:網(wǎng)絡(luò)通信,幾乎所有的操作都在Channel中進(jìn)行旱幼,Channel是進(jìn)行消息讀寫的通道乎串。客戶端可建立多個(gè)Channel速警,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)叹誉。
- Virtual host:虛擬地址,用于進(jìn)行邏輯隔離闷旧,最上層的消息路由长豁。
- Exchange:交換機(jī),接收消息忙灼,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列匠襟。
- Routing key:一個(gè)路由規(guī)則,虛擬機(jī)可用它來確定如何路由一個(gè)特定消息
- Queue:也稱 Message Queue该园,消息隊(duì)列酸舍,保存消息,并將它們轉(zhuǎn)發(fā)給消費(fèi)者里初。
- Binding:Exchange和Queue之間的虛擬連接啃勉,binding 中可以指定 routing key。
RabbitMQ
RabbitMQ 是基于AMQP協(xié)議開發(fā)的消息組件.双妨。
1淮阐、RabbitMQ的整體架構(gòu)
生產(chǎn)者發(fā)送消息到Exchange中的,然后Exchange和Message Queue之間通過Routing key建立路由規(guī)則刁品,把消息發(fā)送給特定的Queue泣特,然后消息推送給訂閱了該Queue的消費(fèi)者。
生產(chǎn)者發(fā)送消息時(shí)挑随,需要指定兩個(gè)參數(shù)状您,Exchange和Routing key,如果Exchange不指定(即為空)兜挨,則會(huì)采用默認(rèn)的AMQP default的Exchange膏孟,其會(huì)根據(jù)Routing key的值,路由到對(duì)應(yīng)的 Queue暑劝。
2骆莹、Exchange
Exchange有四種類型,分別是Direct担猛,Topic幕垦,F(xiàn)anout 丢氢,Header。
- Direct Exchange
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定的Queue先改。
注意:Direct模式可以使用RabbitMQ自帶的Exchange:AMQP default疚察,所以不需要將Exchange進(jìn)行任何綁定(binding)操作,消息傳遞時(shí)仇奶,RouteKey必須完全匹配才會(huì)被隊(duì)列接收貌嫡,否則該消息會(huì)被拋棄。
exchange在和queue進(jìn)行binding時(shí)會(huì)設(shè)置routingkey:
channel.QueueBind(queue: "create_pdf_queue",
exchange: "pdf_events",
routingKey: "pdf_create",
arguments: null);
然后我們?cè)趯⑾l(fā)送到exchange時(shí)會(huì)設(shè)置對(duì)應(yīng)的routingkey:
channel.BasicPublish(exchange: "pdf_events",
routingKey: "pdf_create",
basicProperties: properties,
body: body);
在direct類型的exchange中该溯,只有這兩個(gè)routingkey完全相同岛抄,exchange才會(huì)選擇對(duì)應(yīng)的binging進(jìn)行消息路由。
具體流程如下:
-
Topic Exchange
此類型exchange和上面的direct類型差不多狈茉,但direct類型要求routingkey完全相等夫椭,這里的 bindings 中的 routingkey 可以有通配符:“*”,“#”,其中 “*” 表示匹配一個(gè)單詞, “#”則表示匹配沒有或者多個(gè)單詞氯庆。
舉個(gè)栗子:
-
Fanout Exchange
此exchange的路由規(guī)則很簡(jiǎn)單蹭秋,直接將消息路由到所有綁定的隊(duì)列中,無須對(duì)消息的routingkey進(jìn)行匹配操作堤撵。
- Header Exchange
此類型的exchange和以上三個(gè)都不一樣仁讨,其路由的規(guī)則是根據(jù)header來判斷,其中的header就是以下方法的arguments參數(shù):
Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",
exchange: "agreements",
routingKey: string.Empty,
arguments: aHeader);
其中的x-match為特殊的header实昨,可以為all則表示要匹配所有的header洞豁,如果為any則表示只要匹配其中的一個(gè)header即可。
在發(fā)布消息的時(shí)候就需要傳入header值:
Properties properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;
以上就是exchange 類型的總結(jié)屠橄,一般來說direct和topic用來具體的路由消息族跛,如果要用廣播的消息一般用fanout的exchange闰挡。
header類型用的比較少锐墙,但還是知道一點(diǎn)好。
RabbitMQ與SpringBoot整合
- 添加 amqp 依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置文件 application.properties
# rabbitmq
spring.rabbitmq.host=node2
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
# 對(duì) rabbitmqTemplate 進(jìn)行監(jiān)聽,當(dāng)消息由于server的原因無法到達(dá)queue時(shí)长酗,就會(huì)被監(jiān)聽到溪北,以便執(zhí)行ReturnCallback方法
# 默認(rèn)為false,Server端會(huì)自動(dòng)刪除不可達(dá)消息
spring.rabbitmq.template.mandatory=true
# 消費(fèi)端手動(dòng)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 并發(fā)消費(fèi)
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
- 生產(chǎn)者
/**
* @author K. L. Mao
* @create 2018/9/20
*/
@Service
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 用于監(jiān)聽Server端給我們返回的確認(rèn)請(qǐng)求,消息到了exchange,ack 就返回true
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
System.out.println("correlationData:" + correlationData);
System.out.println("ack:" + ack);
if (ack){
System.out.println("將msg-db數(shù)據(jù)更新為處理成功");
} else {
System.out.println("記錄異常日志...夺脾,后續(xù)會(huì)有補(bǔ)償機(jī)制(定時(shí)器)");
}
};
/**
* 監(jiān)聽對(duì)不可達(dá)的消息進(jìn)行后續(xù)處理;
* 不可達(dá)消息:指定的路由key路由不到之拨。
*/
private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText,
exchange, routingKey) -> System.out.println("return exchange:" + exchange + ", routingKey:" + routingKey +
", replyText:" + replyText);
/**
* 發(fā)送消息
* @param order
*/
public void sendOrder(Order order) {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData cd = new CorrelationData();
// 消息唯一標(biāo)識(shí)
cd.setId(UUID.randomUUID().toString().replace("-","") + DateUtils.formatDate(new Date(), "yyyyMMdd"));
rabbitTemplate.convertAndSend("exchange-2", "springboot.abc", order, cd);
}
}
- 消費(fèi)者
/**
* @author K. L. Mao
* @create 2018/9/20
*/
@Service
public class RabbitConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-2", durable = "true"),
exchange = @Exchange(value = "exchange-2",
durable = "true", type = "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.#")
)
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> properties, Channel channel) throws IOException {
System.out.println("消費(fèi)端 order:" + order);
// deliveryTag: 對(duì)于每個(gè)Channel來說,每個(gè)消息都會(huì)有一個(gè)DeliveryTag咧叭,一般用接收消息的順序(index)來表示蚀乔,一條消息就為1
Long deliveryTag = (Long) properties.get(AmqpHeaders.DELIVERY_TAG);
System.out.println("deliveryTag:" + deliveryTag);
// 限流處理:消息體大小不限制,每次限制消費(fèi)一條菲茬,只作用于該Consumer層吉挣,不作用于Channel
channel.basicQos(0, 1, false);
// 手工ACK,不批量ack派撕,multiple:當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息
channel.basicAck(deliveryTag, false);
}
可以直接使用@RabbitListener注解睬魂,聲明Queue和Exchange以及Binding關(guān)系终吼。
消費(fèi)端接收的消息是Message對(duì)象,結(jié)構(gòu)為:
package org.springframework.messaging;
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
我們可以直接通過注解@Payload 獲取我們傳輸?shù)臄?shù)據(jù)氯哮,通過注解@Headers 獲取消息請(qǐng)求頭际跪。
這里我們?cè)黾恿讼⑾蘖鞯墓δ埽乐股a(chǎn)過多喉钢,導(dǎo)致消費(fèi)者消費(fèi)吃力的情況:
channel.basicQos(0, 1, false):0表示對(duì)消息的大小無限制姆打,1表示每次只允許消費(fèi)一條,false表示該限制不作用于channel肠虽。
同時(shí)穴肘,我們這里采用手工ACK的方式,因?yàn)槲覀兣渲梦募渲昧藄pring.rabbitmq.listener.simple.acknowledge-mode=manual:
channel.basicAck(deliveryTag, false):deliveryTag表示處理的消息條數(shù)(一般為1)舔痕,從heaers中取评抚,false表示不批量ack。
DLX(死信隊(duì)列)
- DLX定義
DLX為Dead Letter Exchange伯复,死信隊(duì)列慨代。當(dāng)一個(gè)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能重新publish到另一個(gè)Exchange啸如,那么這個(gè)讓消息變?yōu)樗佬诺腅xchange就是DLX(死信隊(duì)列)侍匙。 - 消息變成死信的幾種情況
1、消息被拒絕叮雳,ack為false想暗,并且 requeue=false;
2、消息TTL(Time To Live)過期帘不,指消息達(dá)到了過期時(shí)間说莫;
3、隊(duì)列達(dá)到最大長(zhǎng)度寞焙。 - 死信隊(duì)列代碼演示:
1储狭、聲明一個(gè)死信Exchange、Queue以及Binding
/**
* 聲明一個(gè)死信交換機(jī),不一定為Topic Exchange, 和交換機(jī)類型無關(guān)
* @return
*/
@Bean
public Exchange deadLetterExchange(){
return ExchangeBuilder.topicExchange("DL_EXCHANGE").durable(true).build();
}
/**
* 聲明一個(gè)死信隊(duì)列捣郊,并且配置轉(zhuǎn)發(fā)交換機(jī)
* @return
*/
@Bean
public Queue deadLetterQueue(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 聲明 死信需要轉(zhuǎn)發(fā)的交換機(jī)
args.put("x-dead-letter-exchange", "FORWARD_EXCHANGE");
// x-dead-letter-routing-key 如果沒有指定辽狈,那么消息本身使用的 routing key 將被使用。即DL.xxx
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
/**
* 死信隊(duì)列進(jìn)行綁定呛牲,
* @return
*/
@Bean
public Binding deadLetterBinding(){
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("DL.#").noargs();
}
2刮萌、聲明一個(gè)轉(zhuǎn)發(fā)隊(duì)列,來接收死信消息
/**
* 聲明一個(gè)轉(zhuǎn)發(fā)交換機(jī)
* @return
*/
@Bean
public Exchange forwardExchange(){
return ExchangeBuilder.topicExchange("FORWARD_EXCHANGE").durable(true).build();
}
/**
* 轉(zhuǎn)發(fā)隊(duì)列,即requeue娘扩、過期(TTL)着茸、超過隊(duì)列容量僧凤,就會(huì)被轉(zhuǎn)發(fā)到此隊(duì)列
* @return
*/
@Bean
public Queue forwardQueue(){
return QueueBuilder.durable("FORWARD_QUEUE").build();
}
/**
* 綁定轉(zhuǎn)發(fā)交換機(jī)和隊(duì)列,這里的routing一般為"#"元扔,因?yàn)槲覀兩厦嫠佬抨?duì)列沒有設(shè)置x-dead-letter-routing-key躯保,
* 故被轉(zhuǎn)發(fā)的消息會(huì)攜帶原消息的routingKey,即DL.xxx澎语,為了能夠被路由到FORWARD_QUEUE途事,故最好"#"
* @return
*/
@Bean
public Binding forwardBinding(){
return BindingBuilder.bind(forwardQueue()).to(forwardExchange()).with("#").noargs();
}
3、對(duì)發(fā)送的消息設(shè)置TTL擅羞,模擬DLX場(chǎng)景
/**
* 發(fā)送死信隊(duì)列
* @param msg
*/
public void sendDlx(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 設(shè)置編碼
messageProperties.setContentEncoding("UTF-8");
// 設(shè)置過期時(shí)間 10s,到了過期時(shí)間還沒被消費(fèi)尸变,則會(huì)進(jìn)入死信隊(duì)列
messageProperties.setExpiration("10000");
return message;
};
rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL.AAA", msg,
messagePostProcessor, correlationData);
}
這里設(shè)置了10s的過期時(shí)間,消息一開始是在DL_QUEUE隊(duì)列上减俏,如果10s之內(nèi)還沒被消費(fèi)召烂,則會(huì)進(jìn)入FORWARD_QUEUE(轉(zhuǎn)發(fā))隊(duì)列。
以上的代碼示例娃承,通過DLX實(shí)現(xiàn)了延遲隊(duì)列的功能奏夫,即將消息發(fā)送至DL_QUEUE隊(duì)列上,10s之后才會(huì)被FORWARD_QUEUE消費(fèi)(消費(fèi)者監(jiān)聽該隊(duì)列即可)历筝,從而起到了延遲消費(fèi)的功能酗昼。
可靠性投遞解決方案
可靠性投遞,即保證消息的100%被消費(fèi)梳猪。目前麻削,互聯(lián)網(wǎng)大廠主要的解決方案,有兩種:
1春弥、消息落庫(kù)呛哟,對(duì)消息狀態(tài)進(jìn)行打標(biāo)
2、消息的延遲投遞匿沛,做二次確認(rèn)扫责,回調(diào)檢查
-
消息落庫(kù)
-
延遲投遞
延遲投遞方案相比消息落庫(kù)方案,優(yōu)勢(shì)是在于把msg db剝離了核心業(yè)務(wù)俺祠,在大業(yè)務(wù)量的場(chǎng)景中公给,會(huì)減少核心業(yè)務(wù)的數(shù)據(jù)庫(kù)壓力(少了一次msg db的數(shù)據(jù)插入)。
消息冪等性解決方案
冪等性指的是蜘渣,使用相同參數(shù)對(duì)同一資源重復(fù)調(diào)用某個(gè)接口的結(jié)果與調(diào)用一次的結(jié)果相同。
-
可能導(dǎo)致消息出現(xiàn)非冪等的原因:
主要的原因是第三個(gè)肺然,當(dāng)消費(fèi)端拋出異常蔫缸,并且requeue=true(默認(rèn)為true),消息會(huì)一直重新入隊(duì)進(jìn)行消費(fèi),這樣就導(dǎo)致了重復(fù)消費(fèi)际起。
-
冪等性解決方案
首先拾碌,給每條消息生成一個(gè)全局唯一的ID標(biāo)識(shí)(messageId)吐葱,然后在到達(dá)消費(fèi)端時(shí),先將消息(messageId作為主鍵)插入MSG DB數(shù)據(jù)庫(kù)校翔,然后再執(zhí)行業(yè)務(wù)弟跑。這樣如果消費(fèi)端中途拋出了異常,重新入隊(duì)消費(fèi)時(shí)防症,由于還是同一個(gè)消息孟辑,則meesageId還是不變的,當(dāng)插入MSG DB數(shù)據(jù)庫(kù)時(shí)蔫敲,會(huì)因?yàn)橹麈I沖突而拋異常饲嗽,此時(shí)我們捕捉該異常,并且拒絕該消息(channel.basicReject)即可奈嘿。
上面是通過阻止消息的多次消費(fèi)來保證冪等性的貌虾,但是有些場(chǎng)景,我們就需要消息的重試機(jī)制裙犹。比如:消費(fèi)端接收到消息更新了 table1 的記錄尽狠,但是更新 table2 的記錄的時(shí)候報(bào)錯(cuò)了,這時(shí)候消息就會(huì)重新發(fā)送叶圃。我們希望晚唇,下次消息過來,不再去更新已經(jīng)處理了的 table1盗似,而只更新 table2 的數(shù)據(jù)哩陕。
這時(shí)候我們就需要一個(gè)操作流水表了,來記錄每個(gè)訂單下的操作表記錄赫舒,當(dāng)更新 table1 時(shí)悍及,同時(shí)在流水表插入一條該訂單的操作記錄(同一個(gè)事務(wù))。于是下次消息重發(fā)過來的時(shí)候接癌,先判斷流水表中是否有該訂單對(duì) table1 的操作記錄心赶,如果有則跳過,直接執(zhí)行下面的操作缺猛。