什么是rabbitmq茶行?
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)阴挣。RabbitMQ服務(wù)器是用Erlang語言編寫的气堕,而群集和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺(tái)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
第一種方式:點(diǎn)對(duì)點(diǎn)
- P:生成者
- C:消費(fèi)者
- 紅色方塊代表信道
生成者
void producerSendMessing() {
//生產(chǎn)者代碼
//創(chuàng)建一個(gè)rabbitmq的連接工廠
ConnectionFactory connectionFactory =new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setConnectionTimeout(1000);
connectionFactory.setPort(5672);
//設(shè)置虛擬主機(jī)
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
//開啟一個(gè)server連接
Connection connection = connectionFactory.newConnection();
//server連接創(chuàng)建一個(gè)信道
Channel channel = connection.createChannel();
//通過信道綁定一個(gè)隊(duì)列,兩個(gè)重載方法
/*
* @param queue :隊(duì)列的名稱
* @param durable 是否持久化茎芭,
* @param exclusive 是否獨(dú)占這個(gè)隊(duì)列
* @param autoDelete 是否消費(fèi)之后揖膜,刪除隊(duì)列
* @param arguments 其他參數(shù)設(shè)置,是一個(gè)map
* */
channel.queueDeclare("hello",true,false,false,null);
/**
* 三個(gè)重載方法梅桩,
*
* @param exchange 交換器
* @param routingKey 路由密鑰
* @param支持消息的其他屬性-路由標(biāo)頭等
* @param正文消息正文
*/
channel.basicPublish("","hello",null,"hello world!".getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
消費(fèi)者
void consumerReceivingMessing() {
//生產(chǎn)者代碼
//創(chuàng)建一個(gè)rabbitmq的連接工廠
ConnectionFactory connectionFactory =new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setConnectionTimeout(1000);
connectionFactory.setPort(5672);
//設(shè)置虛擬主機(jī)
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
//開啟一個(gè)server連接
Connection connection = connectionFactory.newConnection();
//server連接創(chuàng)建一個(gè)信道
Channel channel = connection.createChannel();
//通過信道綁定一個(gè)隊(duì)列,兩個(gè)重載方法
/*
* @param queue :隊(duì)列的名稱
* @param durable 是否持久化壹粟,
* @param exclusive 是否獨(dú)占這個(gè)隊(duì)列
* @param autoDelete 是否消費(fèi)之后,刪除隊(duì)列
* @param arguments 其他參數(shù)設(shè)置宿百,是一個(gè)map
* */
channel.queueDeclare("hello",true,false,false,null);
/**
* @param queue隊(duì)列名稱
* @param autoAck如果服務(wù)器應(yīng)考慮消息趁仙,則為true。自動(dòng)確認(rèn)機(jī)制
*交付后確認(rèn)犀呼; 如果服務(wù)器應(yīng)該期望幸撕,則返回false
*明確的確認(rèn)
* @param回調(diào)用戶對(duì)象的接口Consumer,默認(rèn)實(shí)現(xiàn)類DefaultConsumer,需要傳入信道
*/
String hello = channel.basicConsume("hello", true,new DefaultConsumer(channel){
//最后一個(gè)參數(shù)是消息體
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
System.out.println(new String(body));
}
});
System.out.println(hello);
//連接關(guān)閉
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
總結(jié)
- 無論是生成者還是消費(fèi)者都需要連接到rabbitmq的server外臂。通過信道操作消息
work模型
Work queues, 也被稱為(Task queues)坐儿, 任務(wù)模型。 當(dāng)消息處理比較耗時(shí)的時(shí)候宋光,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度貌矿。長此以往,消息就會(huì)堆積越來越多,無法及時(shí)處理罪佳。此時(shí)就可以使用work模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列逛漫,共同消費(fèi)隊(duì)列中的消息。隊(duì)列中的消息一旦消費(fèi)赘艳, 就會(huì)消失酌毡,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的。
角色:
●P:生產(chǎn)者:任務(wù)的發(fā)布者
●C1:消費(fèi)者蕾管,領(lǐng)取任務(wù)并且完成任務(wù)枷踏,假設(shè)完成速度較慢
●C2:消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
總結(jié)
- 默認(rèn)情況下,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者掰曾。平均而言旭蠕,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為輪詢旷坦。
為了避免消費(fèi)者1處理消息的業(yè)務(wù)慢掏熬,消費(fèi)者2處理消息的業(yè)務(wù)快。但是因?yàn)檩喸儥C(jī)制秒梅,導(dǎo)致消息被消費(fèi)者1拿了過去進(jìn)行堵塞旗芬,從而導(dǎo)致系統(tǒng)宕機(jī)。消息確認(rèn)機(jī)制相應(yīng)出現(xiàn)捆蜀。
消費(fèi)者需要拿到消息之后回復(fù)隊(duì)列已處理才會(huì)拿到下一條消息疮丛,從而實(shí)現(xiàn)能者多勞
- 消費(fèi)者1:假設(shè)完成速度較慢
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建一個(gè)信道
Channel channel = mqConnection.createChannel();
//信道每次只傳遞1個(gè)消息
channel.basicQos(1);
//綁定到一個(gè)work隊(duì)列
channel.queueDeclare("work",true,false,false,null);
//獲取消息
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//模擬消費(fèi)者處理慢的場景
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費(fèi)者2:--->"+new String(body));
//參數(shù)1:根據(jù)標(biāo)簽回復(fù)那條消息辆琅,參數(shù)2:是否回復(fù)多條消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
- 消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建一個(gè)信道
Channel channel = mqConnection.createChannel();
//信道每次只傳遞1個(gè)消息
channel.basicQos(1);
//綁定到一個(gè)work隊(duì)列
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1:--->"+new String(body));
//參數(shù)1:根據(jù)標(biāo)簽回復(fù)那條消息,參數(shù)2:是否回復(fù)多條消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
注意
代碼中在消費(fèi)者里面設(shè)置了信道里面只消費(fèi)1條消息这刷,并且處理業(yè)務(wù)之后會(huì)回復(fù)消息已被消費(fèi)
fanout廣播模型
在廣播模式下婉烟,消息發(fā)送流程是這樣的:
●可以有多個(gè)消費(fèi)者
●每個(gè)消費(fèi)者有自己的queue (隊(duì)列)
●每個(gè)隊(duì)列都要綁定到Exchange (交換機(jī))
●生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī)暇屋,交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列似袁,生產(chǎn)者無法決定。
●交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
●隊(duì)列的消費(fèi)者都能拿到消息咐刨。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
生產(chǎn)者代碼
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建一個(gè)信道
Channel channel = mqConnection.createChannel();
//聲明一個(gè)交換機(jī)昙衅。參數(shù)1:交換機(jī)名稱,參數(shù)2:選擇交換機(jī)模式定鸟。fanout:廣播模式
channel.exchangeDeclare("logs","fanout");
channel.basicPublish("logs","",null,("fanout條消息").getBytes());
//關(guān)閉連接
RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
}
消費(fèi)者代碼
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建一個(gè)信道
Channel channel = mqConnection.createChannel();
//綁定一個(gè)交換機(jī)
channel.exchangeDeclare("logs","fanout");
//獲取臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//綁定交換機(jī)和隊(duì)列
channel.queueBind(queue,"logs","");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1:--->"+new String(body));
}
});
}
Routing模式:1direct(直連模型)
在Fanout模式中而涉,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)联予。但是在某些場景下啼县,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange沸久。
在Direct模型下:
●隊(duì)列與交換機(jī)的綁定季眷,不能是任意綁定了,而是要指定-個(gè)RoutingKey (路由key)
●消息的發(fā)送方在向Exchange發(fā)送消息時(shí)卷胯,也必須指定消息的RoutingKey子刮。
●Exchange不再把消息交給每- 個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷窑睁,只有隊(duì)列的Routingkey與消息的Routing key完全一致挺峡,才會(huì)接收到消息
生產(chǎn)者代碼
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建一個(gè)信道
Channel channel = mqConnection.createChannel();
//創(chuàng)建一個(gè)交換器,參數(shù)1:交換器名稱担钮,參數(shù)2:交換器模式:路由
channel.exchangeDeclare("log_direct","direct");
//聲明一個(gè)routingKey
String routingKey="error";
//發(fā)送消息
channel.basicPublish("log_direct",routingKey,null,("這是direc模型發(fā)送的消息:"+routingKey).getBytes());
//關(guān)閉鏈接
RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
}
消費(fèi)者1代碼
public static void main(String[] args) throws IOException {
//獲取鏈接
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建信道
Channel channel = mqConnection.createChannel();
//信道綁定交換器
channel.exchangeDeclare("log_direct","direct");
//獲取臨時(shí)隊(duì)列名稱
String queue = channel.queueDeclare().getQueue();
String routingKey="info";
//信道綁定交換器橱赠,路由鍵,隊(duì)列
channel.queueBind(queue,"log_direct",routingKey);
//獲取消息
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1:"+new String(body));
}
});
}
消費(fèi)者2代碼
public static void main(String[] args) throws IOException {
//創(chuàng)建一個(gè)連接
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建一個(gè)信道
Channel channel = mqConnection.createChannel();
//綁定一個(gè)交換機(jī)
channel.exchangeDeclare("log_direct","direct");
//獲取信道的一個(gè)臨時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
//定義三個(gè)路由鍵
String routingKey="error";
String routingKeyWarring="warring";
String routingKeyInfo="info";
//信道綁定隊(duì)列裳朋,交換機(jī)和路由鍵
channel.queueBind(queue,"log_direct",routingKey);
channel.queueBind(queue,"log_direct",routingKeyWarring);
channel.queueBind(queue,"log_direct",routingKeyInfo);
//獲取消息
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者2:消費(fèi)了"+new String(body));
}
});
}
Routing模式:topic(訂閱模型)
Topic類型的Exchange與Direct相比病线,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列吓著。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routing key的時(shí)候使用通配
符!這種模型Routingkey 一般都是由一個(gè)或多個(gè)單詞組成鲤嫡,多個(gè)單詞之間以"分割,例如: item. insert
- 通配符:
* 代表只匹配一個(gè)單詞绑莺,比如user.*的路由鍵可以接受user.add暖眼,user.delete,user.update
#代表只匹配多個(gè)單詞纺裁,比如user.#的路由鍵可以接受user.add.all诫肠,user.delete.all
生產(chǎn)者代碼
public static void main(String[] args) throws IOException {
//創(chuàng)建連接
Connection mqConnection = RabbitMqUtils.getMqConnection();
//創(chuàng)建信道
Channel channel = mqConnection.createChannel();
//綁定交換機(jī)
channel.exchangeDeclare("topic","topic");
//路由
String routingKey="user.save";
//發(fā)送消息
channel.basicPublish("topic",routingKey,null,("生產(chǎn)了"+routingKey+"消息").getBytes());
//關(guān)閉連接
RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
}
消費(fèi)者1代碼是 * 的通配符
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String queue = channel.queueDeclare().getQueue();
//路由鍵匹配一個(gè)單詞比如user.save
channel.queueBind(queue,"topic","user.*");
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1:"+new String(body));
}
});
}
消費(fèi)者2代碼是#號(hào)的通配符
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String queue = channel.queueDeclare().getQueue();
//路由鍵匹配多個(gè)單詞比如user.id.delete
channel.queueBind(queue,"topic","user.#");
//消費(fèi)消息
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者1:"+new String(body));
}
});
}