轉(zhuǎn)載地址:https://www.cnblogs.com/hlhdidi/p/6535677.html
springboot學習筆記-6 springboot整合RabbitMQ
一 RabbitMQ的介紹
RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).
消息中間件最主要的作用是解耦乳附,中間件最標準的用法是生產(chǎn)者生產(chǎn)消息傳送到隊列绞灼,消費者從隊列中拿取消息并處理技俐,生產(chǎn)者不用關心是誰來消費懈息,消費者不用關心誰在生產(chǎn)消息,從而達到解耦的目的;在分布式的系統(tǒng)中覆致,消息隊列也會被用在很多其它的方面粤攒,比如:分布式事務的支持,RPC的調(diào)用等等;
RabbitMQ是實現(xiàn)AMQP(高級消息隊列協(xié)議)的消息中間件的一種,主要是為了實現(xiàn)系統(tǒng)之間的雙向解耦而實現(xiàn)的庐完。當生產(chǎn)者大量產(chǎn)生數(shù)據(jù)時,消費者無法快速消費徘熔,那么需要一個中間層。保存這個數(shù)據(jù);AMQP淆党,即Advanced Message Queuing Protocol酷师,高級消息隊列協(xié)議讶凉,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計山孔。消息中間件主要用于組件之間的解耦懂讯,消息的發(fā)送者無需知道消息使用者的存在,反之亦然台颠。AMQP的主要特征是面向消息褐望、隊列、路由(包括點對點和發(fā)布/訂閱)串前、可靠性瘫里、安全;RabbitMQ是一個開源的AMQP實現(xiàn),服務器端用Erlang語言編寫荡碾,支持多種客戶端谨读,如:Python、Ruby坛吁、.NET劳殖、Java、JMS拨脉、C哆姻、PHP、ActionScript玫膀、XMPP矛缨、STOMP等,支持AJAX匆骗。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息;
通常隊列服務, 會有三個概念: 發(fā)消息者劳景、隊列、收消息者碉就,RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在發(fā)消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發(fā)消息者和隊列就沒有直接聯(lián)系, 轉(zhuǎn)而變成發(fā)消息者把消息給交換器, 交換器根據(jù)調(diào)度策略再把消息再給隊列盟广。
左側(cè) P 代表 生產(chǎn)者,也就是往 RabbitMQ 發(fā)消息的程序瓮钥。
中間即是 RabbitMQ筋量,其中包括了 交換機 和 隊列。
右側(cè) C 代表 消費者碉熄,也就是往 RabbitMQ 拿消息的程序
虛擬主機:一個虛擬主機持有一組交換機桨武、隊列和綁定。為什么需要多個虛擬主機呢锈津?很簡單呀酸,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制琼梆。?因此性誉,如果需要禁止A組訪問B組的交換機/隊列/綁定窿吩,必須為A和B分別創(chuàng)建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”错览。
交換機:Exchange 用于轉(zhuǎn)發(fā)消息纫雁,但是它不會做存儲?,如果沒有 Queue bind 到 Exchange 的話倾哺,它會直接丟棄掉 Producer 發(fā)送過來的消息轧邪。
這里有一個比較重要的概念:路由鍵?。消息到交換機的時候羞海,交互機會轉(zhuǎn)發(fā)到對應的隊列中忌愚,那么究竟轉(zhuǎn)發(fā)到哪個隊列,就要根據(jù)該路由鍵扣猫。
綁定:也就是交換機需要和隊列相綁定菜循,這其中如上圖所示,是多對多的關系
交換機(Exchange)
交換機的功能主要是接收消息并且轉(zhuǎn)發(fā)到綁定的隊列申尤,交換機不存儲消息癌幕,在啟用ack模式后,交換機找不到隊列會返回錯誤昧穿。交換機有四種類型:Direct, topic, Headers and Fanout
Direct:direct 類型的行為是"先匹配, 再投送". 即在綁定時設定一個?routing_key, 消息的routing_key?匹配時, 才會被交換器投送到綁定的隊列中去.
Topic:按規(guī)則轉(zhuǎn)發(fā)消息(最靈活)
Headers:設置header attribute參數(shù)類型的交換機
Fanout:轉(zhuǎn)發(fā)消息到所有綁定隊列
交換機類型的介紹如下
Direct Exchange
Direct Exchange是RabbitMQ默認的交換機模式勺远,也是最簡單的模式,根據(jù)key全文匹配去尋找隊列时鸵。
第一個 X - Q1 就有一個 binding key胶逢,名字為 orange; X - Q2 就有 2 個 binding key饰潜,名字為 black 和 green初坠。當消息中的 路由鍵 和 這個 binding key 對應上的時候,那么就知道了該消息去到哪一個隊列中彭雾。
Ps:為什么 X 到 Q2 要有 black碟刺,green,2個 binding key呢薯酝,一個不就行了嗎半沽? - 這個主要是因為可能又有 Q3,而Q3只接受 black 的信息吴菠,而Q2不僅接受black 的信息者填,還接受 green 的信息。
Topic Exchange
Topic Exchange 轉(zhuǎn)發(fā)消息主要是根據(jù)通配符做葵。?在這種交換機下占哟,隊列和交換機的綁定會定義一種路由模式,那么,通配符就要在這種路由模式和路由鍵之間匹配后交換機才能轉(zhuǎn)發(fā)消息;
在這種交換機模式下:
路由鍵必須是一串字符榨乎,用句號(.) 隔開嗓化,比如說 agreements.us,或者 agreements.eu.stockholm 等谬哀。
路由模式必須包含一個 星號(*),主要用于匹配路由鍵指定位置的一個單詞严肪,比如說史煎,一個路由模式是這樣子:agreements..b.*,那么就只能匹配路由鍵是這樣子的:第一個單詞是 agreements驳糯,第四個單詞是 b吉执。 井號(#)就表示相當于一個或者多個單詞恨溜,例如一個匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin開頭的路由鍵都是可以的食茎。
具體代碼發(fā)送的時候還是一樣,第一個參數(shù)表示交換機疾党,第二個參數(shù)表示routing key间狂,第三個參數(shù)即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 類似, 只是匹配上支持了"模式", 在"點分"的 routing_key 形式中, 可以使用兩個通配符:
*表示一個詞.
#表示零個或多個詞
Headers Exchange
headers 也是根據(jù)規(guī)則匹配, 相較于 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規(guī)則的類型.
在隊列與交換器綁定時, 會設定一組鍵值對規(guī)則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 消息被投送到對應隊列.
Fanout Exchange
Fanout Exchange 消息廣播的模式竣付,不管路由鍵或者是路由模式诡延,會把消息發(fā)給綁定給它的全部隊列,如果配置了routing_key會被忽略古胆。
消息中間件的工作過程可以用生產(chǎn)者消費者模型來表示.即,生產(chǎn)者不斷的向消息隊列發(fā)送信息,而消費者從消息隊列中消費信息.具體過程如下:
從上圖可看出,對于消息隊列來說,生產(chǎn)者,消息隊列,消費者是最重要的三個概念,生產(chǎn)者發(fā)消息到消息隊列中去,消費者監(jiān)聽指定的消息隊列,并且當消息隊列收到消息之后,接收消息隊列傳來的消息,并且給予相應的處理.消息隊列常用于分布式系統(tǒng)之間互相信息的傳遞.
對于RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產(chǎn)者和消息隊列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機,而交換機則根據(jù)調(diào)度策略把相應的消息轉(zhuǎn)發(fā)給對應的消息隊列.那么RabitMQ的工作流程如下所示:
緊接著說一下交換機.交換機的主要作用是接收相應的消息并且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout.
Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即創(chuàng)建消息隊列的時候,指定一個BindingKey.當發(fā)送者發(fā)送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被發(fā)送到該消息隊列中.
topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊列和交換機的綁定主要是依據(jù)一種模式(通配符+字符串),而當發(fā)送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發(fā)送到該消息隊列中.
headers也是根據(jù)一個規(guī)則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規(guī)則,而發(fā)送消息的時候也會指定一組鍵值對規(guī)則,當兩組鍵值對規(guī)則相匹配的時候,消息會被發(fā)送到匹配的消息隊列中.
Fanout是路由廣播的形式,將會把消息發(fā)給綁定它的全部隊列,即便設置了key,也會被忽略.
二.SpringBoot整合RabbitMQ(Direct模式)
SpringBoot整合RabbitMQ非常簡單!感覺SpringBoot真的極大簡化了開發(fā)的搭建環(huán)境的時間..這樣我們程序員就可以把更多的時間用在業(yè)務上了,下面開始搭建環(huán)境:
首先創(chuàng)建兩個maven工程,這是為了模擬分布式應用系統(tǒng)中,兩個應用之間互相交流的過程,一個發(fā)送者(Sender),一個接收者(Receiver)
緊接著,配置pom.xml文件,注意其中用到了springboot對于AMQP(高級消息隊列協(xié)議,即面向消息的中間件的設計)
<parent>
? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? <artifactId>spring-boot-starter-parent</artifactId>
? ? ? ? <version>1.4.0.RELEASE</version>
? ? </parent>
? ? <properties>
? ? ? ? <java.version>1.7</java.version>
? ? ? ? <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
? ? </properties>
? ? <dependencies>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter</artifactId>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-devtools</artifactId>
? ? ? ? ? ? <optional>true</optional>
? ? ? ? ? ? <scope>true</scope>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-test</artifactId>
? ? ? ? ? ? <scope>test</scope>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-actuator</artifactId>
? ? ? ? </dependency>
? ? ? ? <!-- 添加springboot對amqp的支持 -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-amqp</artifactId>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-tomcat</artifactId>
? ? ? ? ? ? <scope>provided</scope>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.tomcat.embed</groupId>
? ? ? ? ? ? <artifactId>tomcat-embed-jasper</artifactId>
? ? ? ? ? ? <scope>provided</scope>
? ? ? ? </dependency>
? ? </dependencies>
緊接著,我們編寫發(fā)送者相關的代碼.首先毫無疑問,要書寫啟動類:
@SpringBootApplication
public class App{
? ? public static void main(String[] args) {
? ? ? ? SpringApplication.run(App.class, args);
? ? }
}
接著在application.properties中,去編輯和RabbitMQ相關的配置信息,配置信息的代表什么內(nèi)容根據(jù)鍵就能很直觀的看出了.這里端口是5672,不是15672...15672是管理端的端口!
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
隨后,配置Queue(消息隊列).那注意由于采用的是Direct模式,需要在配置Queue的時候,指定一個鍵,使其和交換機綁定.
@Configuration
public class SenderConf {
? ? @Bean
? ? public Queue queue() {
? ? ? ? ? return new Queue("queue");
? ? }
}
接著就可以發(fā)送消息啦!在SpringBoot中,我們使用AmqpTemplate去發(fā)送消息!代碼如下:
@Component
public class HelloSender {
? ? @Autowired
? ? private AmqpTemplate template;
? ? public void send() {
? ? template.convertAndSend("queue","hello,rabbit~");
? ? }
}
編寫測試類!這樣我們的發(fā)送端代碼就編寫完了~
@SpringBootTest(classes=App.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class TestRabbitMQ {
? ? @Autowired
? ? private HelloSender helloSender;
? ? @Test
? ? public void testRabbit() {
? ? ? ? helloSender.send();
? ? }
}
接著我們編寫接收端.接收端的pom文件,application.properties(修改spring.application.name),Queue配置類,App啟動類都是一致的!這里省略不計.主要在于我們需要配置監(jiān)聽器去監(jiān)聽綁定到的消息隊列,當消息隊列有消息的時候,予以接收,代碼如下:
@Component
public class HelloReceive {
? ? @RabbitListener(queues="queue")? ? //監(jiān)聽器監(jiān)聽指定的Queue
? ? public void processC(String str) {
? ? ? ? System.out.println("Receive:"+str);
? ? }
}
接下來就可以測試啦,首先啟動接收端的應用,緊接著運行發(fā)送端的單元測試,接收端應用打印出來接收到的消息,測試即成功!
需要注意的地方,Direct模式相當于一對一模式,一個消息被發(fā)送者發(fā)送后,會被轉(zhuǎn)發(fā)到一個綁定的消息隊列中,然后被一個接收者接收!
實際上RabbitMQ還可以支持發(fā)送對象:當然由于涉及到序列化和反序列化,該對象要實現(xiàn)Serilizable接口.HelloSender做出如下改寫:
public void send() {
? ? ? ? User user=new User();? ? //實現(xiàn)Serializable接口
? ? ? ? user.setUsername("hlhdidi");
? ? ? ? user.setPassword("123");
? ? ? ? template.convertAndSend("queue",user);
}
HelloReceiver做出如下改寫:
@RabbitListener(queues="queue")//監(jiān)聽器監(jiān)聽指定的Queuepublicvoidprocess1(User user) {//用User作為參數(shù)System.out.println("Receive1:"+user);
? ? }
三.SpringBoot整合RabbitMQ(Topic轉(zhuǎn)發(fā)模式)
首先我們看發(fā)送端,我們需要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規(guī)則綁定到交換機上:
@Configuration
public class SenderConf {
? ? ? ? @Bean(name="message")
? ? ? ? public Queue queueMessage() {
? ? ? ? ? ? return new Queue("topic.message");
? ? ? ? }
? ? ? ? @Bean(name="messages")
? ? ? ? public Queue queueMessages() {
? ? ? ? ? ? return new Queue("topic.messages");
? ? ? ? }
? ? ? ? @Bean
? ? ? ? public TopicExchange exchange() {
? ? ? ? ? ? return new TopicExchange("exchange");
? ? ? ? }
? ? ? ? @Bean
? ? ? ? Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
? ? ? ? ? ? return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
? ? ? ? }
? ? ? ? @Bean
? ? ? ? Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
? ? ? ? ? ? return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞
? ? ? ? }
}
而在接收端,我們配置兩個監(jiān)聽器,分別監(jiān)聽不同的隊列:
@RabbitListener(queues="topic.message") //監(jiān)聽器監(jiān)聽指定的Queue
? ? public void process1(String str) {? ?
? ? ? ? System.out.println("message:"+str);
? ? }
? ? @RabbitListener(queues="topic.messages")? ? //監(jiān)聽器監(jiān)聽指定的Queue
? ? public void process2(String str) {
? ? ? ? System.out.println("messages:"+str);
? ? }
好啦!接著我們可以進行測試了!首先我們發(fā)送如下內(nèi)容:
templateConvertAndSend("exchange","topic.message","hello,rabbit!");
方法的第一個參數(shù)是交換機名稱,第二個參數(shù)是發(fā)送的key,第三個參數(shù)是內(nèi)容,RabbitMQ將會根據(jù)第二個參數(shù)去尋找有沒有匹配此規(guī)則的隊列,如果有,則把消息給它,如果有不止一個,則把消息分發(fā)給匹配的隊列(每個隊列都有消息!),顯然在我們的測試中,參數(shù)2匹配了兩個隊列,因此消息將會被發(fā)放到這兩個隊列中,而監(jiān)聽這兩個隊列的監(jiān)聽器都將收到消息!那么如果把參數(shù)2改為topic.messages呢?顯然只會匹配到一個隊列,那么process2方法對應的監(jiān)聽器收到消息!
四.SpringBoot整合RabbitMQ(Fanout Exchange形式)
那前面已經(jīng)介紹過了,Fanout Exchange形式又叫廣播形式,因此我們發(fā)送到路由器的消息會使得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規(guī)則(即上文中convertAndSend方法的參數(shù)2),也會被忽略!那么直接上代碼,發(fā)送端配置如下:
@Configuration
public class SenderConf {
? ? ? ? @Bean(name="Amessage")
? ? ? ? public Queue AMessage() {
? ? ? ? ? ? return new Queue("fanout.A");
? ? ? ? }
? ? ? ? @Bean(name="Bmessage")
? ? ? ? public Queue BMessage() {
? ? ? ? ? ? return new Queue("fanout.B");
? ? ? ? }
? ? ? ? @Bean(name="Cmessage")
? ? ? ? public Queue CMessage() {
? ? ? ? ? ? return new Queue("fanout.C");
? ? ? ? }
? ? ? ? @Bean
? ? ? ? FanoutExchange fanoutExchange() {
? ? ? ? ? ? return new FanoutExchange("fanoutExchange");//配置廣播路由器
? ? ? ? }
? ? ? ? @Bean
? ? ? ? Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
? ? ? ? ? ? return BindingBuilder.bind(AMessage).to(fanoutExchange);
? ? ? ? }
? ? ? ? @Bean
? ? ? ? Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
? ? ? ? ? ? return BindingBuilder.bind(BMessage).to(fanoutExchange);
? ? ? ? }
? ? ? ? @Bean
? ? ? ? Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
? ? ? ? ? ? return BindingBuilder.bind(CMessage).to(fanoutExchange);
? ? ? ? }
}
發(fā)送端使用如下代碼發(fā)送:
templateConvertAndSend("fanoutExchange","","xixi,rabbit!");//參數(shù)2將被忽略
接收端監(jiān)聽器配置如下:
@Component
public class HelloReceive {
? ? @RabbitListener(queues="fanout.A")
? ? public void processA(String str1) {
? ? ? ? System.out.println("ReceiveA:"+str1);
? ? }
? ? @RabbitListener(queues="fanout.B")
? ? public void processB(String str) {
? ? ? ? System.out.println("ReceiveB:"+str);
? ? }
? ? @RabbitListener(queues="fanout.C")
? ? public void processC(String str) {
? ? ? ? System.out.println("ReceiveC:"+str);
? ? }
}
運行測試代碼,發(fā)現(xiàn)三個監(jiān)聽器都接收到了數(shù)據(jù),測試成功!
第一個 X - Q1 就有一個 binding key肆良,名字為 orange; X - Q2 就有 2 個 binding key逸绎,名字為 black 和 green惹恃。當消息中的 路由鍵 和 這個 binding key 對應上的時候,那么就知道了該消息去到哪一個隊列中棺牧。