基本概念
Amqp概念
amqp,既Advanced Message Queuing Protocol ,一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議
包括的要素
信道:多個線程間通過一個 tcp鏈接與服務端通訊,每個線程使用一個信道通訊偎漫,保證
交換器-隊列-綁定-路由鍵
- 路由鍵是發(fā)送者指定由交換機和隊列的綁定
- 生產(chǎn)者發(fā)送消息到交換機
- 消費者綁定隊列
- 到達了無人訂閱的隊列,消息會一直排隊等待
- 一個隊列有多個訂閱者---消息會循環(huán)方式 以此發(fā)個這幾個消費者
- 發(fā)送者發(fā)送一個不存在的路由鍵--消息會丟失
消息的確認
消費者收到的每一條消息都必須進行確認(手動或者自動)
- 消費者遲遲不確認,rabbitMQ 會一直會保持這個消息性昭,直到鏈接的斷開颖低,會將消息投遞到另一個消費者(前提是有多個消費者)
topic模式
可以使通配符 通過交換機&路由鍵是消息到多個隊列中去
虛擬主機
/service1
/service2
多個應用分區(qū)串纺,類似oracle - 表空間
每個用戶名只能連自己的虛擬主機
多個應用時可以很好地做服務隔離
基礎(chǔ)使用
使用RabbitMQ原生Java客戶端進行消息通信
客戶端需要amqp-client-5.0.0.jar和slf4j-api-1.6.1.jar
建議使用Maven:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具體的版本號到Maven的中央倉庫查)的版本染服。
使用RabbitMQ原生Java客戶端進行消息通信
Direct Exchange示例
簡單形式的 生產(chǎn)者-消費者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*direct類型交換器的生產(chǎn)者
*/
public class DirectProducer {
public final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args)
throws IOException, TimeoutException {
/* 創(chuàng)建連接,連接到RabbitMQ*/
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
/*創(chuàng)建信道*/
Channel channel = connection.createChannel();
/*創(chuàng)建交換器*/
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
/*日志消息級別,作為路由鍵使用*/
String[] serverities = {"error","info","warning"};
for(int i=0;i<3;i++){
String severity = serverities[i%3];
String msg = "Hellol,RabbitMq"+(i+1);
/*發(fā)布消息叨恨,需要參數(shù):交換器柳刮,路由鍵,其中以日志消息級別為路由鍵*/
channel.basicPublish(EXCHANGE_NAME,severity,null,
msg.getBytes());
System.out.println("Sent "+severity+":"+msg);
}
channel.close();
connection.close();
}
}
普通消費者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*普通的消費者
*/
public class NormalConsumer {
public static void main(String[] argv)
throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道痒钝,與發(fā)送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
/*聲明一個隊列*/
String queueName = "focuserror";
channel.queueDeclare(queueName,false,false,
false,null);
/*綁定诚亚,將隊列和交換器通過路由鍵進行綁定*/
String routekey = "info";/*表示只關(guān)注error級別的日志消息*/
channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,routekey);
System.out.println("waiting for message........");
/*聲明了一個消費者*/
//envelope 信封 可以獲取路由鍵,隊列名 等信息
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
+"]"+message);
}
};
/*消費者正式開始在指定隊列上消費消息*/
channel.basicConsume(queueName,true,consumer);
}
}
消費者綁定多個路由鍵
結(jié)果:這個消費者 會收到多個隊列的消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*隊列和交換器的多重綁定
*/
public class MulitBindConsumer {
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道午乓,與發(fā)送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
//聲明一個隨機隊列
String queueName = channel.queueDeclare().getQueue();
/*隊列綁定到交換器上時站宗,是允許綁定多個路由鍵的,也就是多重綁定*/
String[] severities={"error","info","warning"};
for(String serverity:severities){
channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,
serverity);
}
System.out.println(" [*] Waiting for messages:");
// 創(chuàng)建隊列消費者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" Received "
+ envelope.getRoutingKey() + ":'" + message
+ "'");
}
};
channel.basicConsume(queueName, true, consumerA);
}
}
一個連接多個信道
結(jié)果:每個消費者 都會收到所有的消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*一個連接多個信道
*/
public class MulitChannelConsumer {
private static class ConsumerWorker implements Runnable{
final Connection connection;
public ConsumerWorker(Connection connection) {
this.connection = connection;
}
public void run() {
try {
/*創(chuàng)建一個信道益愈,意味著每個線程單獨一個信道*/
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
// 聲明一個隨機隊列
String queueName = channel.queueDeclare().getQueue();
//消費者名字梢灭,打印輸出用
final String consumerName
= Thread.currentThread().getName()+"-all";
//所有日志嚴重性級別
String[] severities={"error","info","warning"};
for (String severity : severities) {
//關(guān)注所有級別的日志(多重綁定)
channel.queueBind(queueName,
DirectProducer.EXCHANGE_NAME, severity);
}
System.out.println("["+consumerName+"] Waiting for messages:");
// 創(chuàng)建隊列消費者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
String message =
new String(body, "UTF-8");
System.out.println(consumerName
+" Received " + envelope.getRoutingKey()
+ ":'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
Connection connection = factory.newConnection();
//一個連接多個信道
for(int i=0;i<2;i++){
/*將連接作為參數(shù)蒸其,傳遞給每個線程*/
Thread worker =new Thread(new ConsumerWorker(connection));
worker.start();
}
}
}
一個隊列多個消費者敏释,則會表現(xiàn)出消息在消費者之間的輪詢發(fā)送。
消費者會輪詢收到消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*一個隊列多個消費者摸袁,則會表現(xiàn)出消息在消費者之間的輪詢發(fā)送钥顽。
*/
public class MulitConsumerOneQueue {
private static class ConsumerWorker implements Runnable{
final Connection connection;
final String queueName;
public ConsumerWorker(Connection connection,String queueName) {
this.connection = connection;
this.queueName = queueName;
}
public void run() {
try {
/*創(chuàng)建一個信道,意味著每個線程單獨一個信道*/
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
/*聲明一個隊列,rabbitmq靠汁,如果隊列已存在蜂大,不會重復創(chuàng)建*/
channel.queueDeclare(queueName,
false,false,
false,null);
//消費者名字,打印輸出用
final String consumerName
= Thread.currentThread().getName();
//所有日志嚴重性級別
String[] severities={"error","info","warning"};
for (String severity : severities) {
//關(guān)注所有級別的日志(多重綁定)
channel.queueBind(queueName,
DirectProducer.EXCHANGE_NAME, severity);
}
System.out.println(" ["+consumerName+"] Waiting for messages:");
// 創(chuàng)建隊列消費者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
String message =
new String(body, "UTF-8");
System.out.println(consumerName
+" Received " + envelope.getRoutingKey()
+ ":'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道蝶怔,與發(fā)送端一樣
Connection connection = factory.newConnection();
//3個線程奶浦,線程之間共享隊列,一個隊列多個消費者
String queueName = "focusAll";
for(int i=0;i<3;i++){
/*將隊列名作為參數(shù),傳遞給每個線程*/
Thread worker =new Thread(new ConsumerWorker(connection,queueName));
worker.start();
}
}
}
個人理解:信道只是客戶端與服務端建立連接 踢星,消息消費時 是多個消費者輪詢收到消息 還是 每個消費者收到全部消息 取決于這些消費者是否監(jiān)聽同一個隊列
模式:
一個連接多個信道 -- 實際是 每個信道中有一個隨機產(chǎn)生的隊列名澳叉,此時是多個隊列,是不同的隊列,每個隊列 有一個消費者 那么這個消費者就會收到這個隊列里的所有消息成洗;在上一層 每個信道使用(邦定)的是同一個路由鍵(就是生產(chǎn)者發(fā)布的路由鍵)五督,所以此時每個隊列都會收到生產(chǎn)者發(fā)布的所有消息,進而每個消費者就可以收到所有消息
一個隊列多個消費者瓶殃,則會表現(xiàn)出消息在消費者之間的輪詢發(fā)送 -- 實際上是 多個信道使用(邦定)的同一個路由鍵充包,而這些信道使用的是同一個隊列,消息會發(fā)布到這個隊列中(此時只有這一個隊列)碌燕,所以多個消費者監(jiān)聽的是同一個隊列误证,此時消息會被這些消費者(輪詢,分發(fā))收到