1 消費(fèi)者
1創(chuàng)建連接--------2創(chuàng)建信道---------3聲明交換機(jī)(名稱累提,類型)------4發(fā)送消息交換器(交換器,路由)
public class FanoutProducer {
public final static String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置MabbitMQ所在主機(jī)ip或者主機(jī)名
factory.setHost("127.0.0.1");
// 創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個(gè)信道
Channel channel = connection.createChannel();
// 指定轉(zhuǎn)發(fā)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = "producer_create";
channel.queueDeclare(queueName,false,false,
false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"test");
//所有日志嚴(yán)重性級(jí)別
String[] severities={"error","info","warning"};
for(int i=0;i<3;i++){
String severity = severities[i%3];//每一次發(fā)送一條不同嚴(yán)重性的日志
// 發(fā)送的消息
String message = "Hello World_"+(i+1);
//參數(shù)1:exchange name
//參數(shù)2:routing key
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");
}
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
2 消費(fèi)者
1創(chuàng)建連接------2創(chuàng)建信道----3聲明交換機(jī)類型(名稱,類型)-----創(chuàng)建一個(gè)隊(duì)列----隊(duì)列綁定到交換機(jī)(隊(duì)列名稱 交換機(jī)名稱 路由名稱)
public class Consumer1 {
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道踢匣,與發(fā)送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME, "direct");
// 聲明一個(gè)隨機(jī)隊(duì)列
String queueName = channel.queueDeclare().getQueue();
//所有日志嚴(yán)重性級(jí)別
String[] severities={"error","info","warning"};
for (String severity : severities) {
//關(guān)注所有級(jí)別的日志(多重綁定)
channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages:");
// 創(chuàng)建隊(duì)列消費(fèi)者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" Received " + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
}
}
image.png
信道:信道是生產(chǎn)消費(fèi)者與rabbit通信的渠道型凳,生產(chǎn)者publish或是消費(fèi)者subscribe一個(gè)隊(duì)列都是通過信道來通信的。信道是建立在TCP連接上的虛擬連接,什么意思呢匀们?就是說rabbitmq在一條TCP上建立成百上千個(gè)信道來達(dá)到多個(gè)線程處理,這個(gè)TCP被多個(gè)線程共享,每個(gè)線程對(duì)應(yīng)一個(gè)信道,信道在rabbit都有唯一的ID ,保證了信道私有性,對(duì)應(yīng)上唯一的線程使用
交換器 生產(chǎn)者發(fā)送消息的地方 通過信道發(fā)送
總結(jié)
生產(chǎn)者---------通過信道 按照某種標(biāo)識(shí)(路由) 發(fā)送給到到交換器
消費(fèi)者 --------- 創(chuàng)建隊(duì)列,通過某種標(biāo)識(shí)(路由)綁定到交換器,獲取指定的消息進(jìn)行消費(fèi)