先說一下安裝rabbitmq
1,安裝opt_win64_22.0.exe (Erlang )
2僚饭,安裝rabbitmq
3,計算機--管理--服務(wù) 找到rabbitmq服務(wù)右鍵屬性胧砰,更改為當前登錄人并填寫密碼鳍鸵,然后重啟服務(wù)
4,去安裝目錄C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin的路徑的cmd窗口 執(zhí)行:rabbitmq-plugins enable rabbitmq_management 打開管理窗口
5尉间,管理地址:http://localhost:15672 登錄名:guest偿乖,密碼:guest(注意:本地登錄才有效,遠程登錄不行)
概述:
rabbitmq是一個消息隊列哲嘲,主要用于系統(tǒng)之間的異步和解耦贪薪,同時也能起到消息緩存,消息分發(fā)的作用眠副。
一般的消息隊列有3個關(guān)鍵部分画切,生產(chǎn)者,隊列囱怕,消費者霍弹;但是rabbitmq還有一個exchange(交換機);交換機的作用是:生產(chǎn)者把消息給交換機然后根據(jù)策略路由到隊列上面去保存娃弓;交換機不能保存消息典格。
rabbitmq還有一個虛擬主機的概念:其作用就是權(quán)限隔離,A虛擬機的交換機和隊列跟B虛擬機的交換機和隊列是不能互相訪問(針對用戶)台丛,就是用戶僅有A虛擬機的權(quán)限就不能去訪問B虛擬機里的交換機和隊列钝计。
交換機四種模式(每個隊列中的消息只能被消費一次):
交換機的作用就是把消息路由到綁定的隊列中去。
direct:簡單模式齐佳,一個發(fā)私恬,另一個收
topic:主題模式,比direct靈活
fanout:廣播模式炼吴,綁定的隊列全部都能消費
headers:設(shè)置 header attribute 參數(shù)類型的交換機(使用的少本鸣,后期再琢磨)
特性:
ack:應(yīng)答機制,用于很重要的消息硅蹦,必須確保送到且消費荣德。默認是自動應(yīng)答;如需手動應(yīng)答需要自己設(shè)置配置文件童芹。
durable:持久化機制涮瞻,把隊列中的數(shù)據(jù)持久化到硬盤,避免rabbitmq服務(wù)器down機假褪,而數(shù)據(jù)不會丟失署咽。
貼代碼(springboot整合rabbitmq):
properties配置:
#rabbitmq配置
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
direct模式(不用配置交換機,有默認的)
配置隊列:
package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq的默認模式:direct生音,最簡單的隊列模式宁否,一個生產(chǎn)者,一個消費者缀遍,交換機也是默認的不用創(chuàng)建慕匠,路由key默認是隊列名
* 1個消息只能被消費一次,如果有多個消費者域醇,rabbitmq服務(wù)器會自己做負載均衡台谊,平均消費
*/
@Configuration
public class DirectConfig {
public static final String queue_direct_a = "queue.direct.a";
public static final String queue_direct_user = "queue.direct.user";
@Bean(name = queue_direct_a)
public Queue queue1(){
return new Queue(queue_direct_a);
}
@Bean(name = queue_direct_user)
public Queue queue2(){
return new Queue(queue_direct_user);
}
}
生產(chǎn)者:
package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 生產(chǎn)者
*/
@Component
public class DirectProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg(String info){
String now = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
String msg = "時間: " + now + ", info: " + info;
amqpTemplate.convertAndSend(DirectConfig.queue_direct_a, msg);
}
/**
* mq之間傳遞Java對象(Java對象要實現(xiàn)序列化接口)
* @param user
*/
public void sendMsg(User user){
amqpTemplate.convertAndSend(DirectConfig.queue_direct_user, user);
}
}
消費者:
package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = DirectConfig.queue_direct_user)
public class DirectConsumer3 {
@RabbitHandler
public void receive(User user){
// 如果是String類型,接收類型就改為String
System.out.println("-----------------------------user--------------------------");
System.out.println(user);
}
}
topic模式
配置隊列譬挚,交換機锅铅,互相綁定
package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* topic模式相比direct模式就是靈活一點(在同一個隊列中也是1個消息實體只能被消費1次)
* 一個交換機可以綁定到多個隊列上,也可以使用匹配模式來匹配符合規(guī)則的隊列
* 匹配模式:*代表一個元素殴瘦,#代表0個或多個元素
* 隊列命名原則:aaa.bbbb.ccc 以.隔開
*/
@Configuration
public class TopicConfig {
public static final String queue_topic_a = "queue.topicA"; //不支持queue.topic.a 三級模式(這個坑狠角,我剛踩完)
public static final String queue_topic_b = "queue.topicB";
public static final String exchange_topic = "exchange_topic";
@Bean(name = queue_topic_a)
public Queue queue1(){
return new Queue(queue_topic_a);
}
@Bean(name = queue_topic_b)
public Queue queue2(){
return new Queue(queue_topic_b);
}
@Bean(name = exchange_topic)
public TopicExchange exchange(){
return new TopicExchange(exchange_topic);
}
/**
* 把隊列綁定到交換機上面
* with方法中的參數(shù)是路由key
* 在使用的時候會根據(jù)交換和路由隊列名來推送消息,
* 如果隊列名符合路由key的規(guī)則就會把消息推送到綁定的隊列中去
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindingExchangeA(@Qualifier(queue_topic_a) Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with(queue_topic_a);
}
@Bean
public Binding bindingExchangeB(@Qualifier(queue_topic_b) Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("queue.*");
}
}
生產(chǎn)者:
package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg(String text){
amqpTemplate.convertAndSend(TopicConfig.exchange_topic, TopicConfig.queue_topic_b, text);
}
}
消費者:
package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
@RabbitListener(queues = TopicConfig.queue_topic_a)
public void receiveA(String info){
System.out.println("-----------------------隊列A:" + info);
}
@RabbitListener(queues = TopicConfig.queue_topic_b)
public void receiveB(String info){
System.out.println("-----------------------隊列B:" + info);
}
}
fanout模式就是廣播模式蚪腋,和topic類似丰歌,把不同點說說
@Bean
public FanoutExchange exchange() {
// 是fanout的交換機
return new FanoutExchange(exchange_fanout);
}
@Bean
public Binding bindingFanoutA(@Qualifier(queue_fanout_a) Queue queue, FanoutExchange fanoutExchange){
// 沒有路由key,(后面沒有with)
return BindingBuilder.bind(queue).to(fanoutExchange);
}
// 生產(chǎn)者屉凯,注意隊列名為null
public void sendMsg(String info){
// amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, info); 此用法無效
amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, null, info);
}
應(yīng)答機制(properties記得開啟配置):
package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.FanoutConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class FanoutConsumer {
@RabbitListener(queues = FanoutConfig.queue_fanout_a)
public void receive1(String info, Channel channel, Message message){
try {
// 應(yīng)答這條消息立帖,如果沒有應(yīng)答,下次連接消息依然會收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("----------------------------隊列A: "+info);
}catch (Exception e){
e.printStackTrace();
}
}
@RabbitListener(queues = FanoutConfig.queue_fanout_b)
public void receive2(String info, Channel channel, Message message){
try {
// 銷毀這條消息(在隊列中銷毀)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("----------------------------隊列B: "+info);
}
}
持久化機制:默認是持久化
看源碼的構(gòu)造方法:隊列和交換機
隊列:
public class Queue extends AbstractDeclarable {
public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
private final String name;
private final boolean durable;
private final boolean exclusive;
private final boolean autoDelete;
private final Map<String, Object> arguments;
private volatile String actualName;
public Queue(String name) {
this(name, true, false, false); //看第2個參數(shù)默認true
}
public Queue(String name, boolean durable) {
this(name, durable, false, false, (Map)null);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, (Map)null);
}
}
交換機:
public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
private final String name;
private final boolean durable;
private final boolean autoDelete;
private final Map<String, Object> arguments;
private volatile boolean delayed;
private boolean internal;
public AbstractExchange(String name) {
this(name, true, false); //看第2個參數(shù)默認為true
}
public AbstractExchange(String name, boolean durable, boolean autoDelete) {
this(name, durable, autoDelete, (Map)null);
}
}