Having covered RabbitMQ's Publish/Subscribe mode, this section explains how to use its Routing mode.
1. Prerequisites
- You have already completed the first simple RabbitMQ test program:
http://www.reibang.com/p/77bfc4fe5a1a
2. Steps
The RabbitMQ working mode we are about to test is as follows:
[Figure: Routing]
- Create the package net.wanho.rabbitmq.routing under the src/main/java folder
- Create Producer.java in net.wanho.rabbitmq.routing
package net.wanho.rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class Producer {
    public static void main(String[] argv) throws Exception {
        String queue_inform_email = "queue_inform_email";
        String queue_inform_sms = "queue_inform_sms";
        String exchange_routing_inform = "exchange_routing_inform";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Use the default virtual host, as the consumers do
        factory.setVirtualHost("/");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel to the exchange; each channel represents one session
        Channel channel = connection.createChannel();
        /**
         * Declare the exchange: String exchange, BuiltinExchangeType type
         * Parameters:
         * 1. Exchange name
         * 2. Exchange type: fanout, topic, direct, headers
         */
        channel.exchangeDeclare(exchange_routing_inform, BuiltinExchangeType.DIRECT);
        // Declare both queues as durable, non-exclusive and non-auto-delete
        channel.queueDeclare(queue_inform_email, true, false, false, null);
        channel.queueDeclare(queue_inform_sms, true, false, false, null);
        // Bind each queue to the exchange, using the queue name as the routing key
        channel.queueBind(queue_inform_email, exchange_routing_inform, queue_inform_email);
        channel.queueBind(queue_inform_sms, exchange_routing_inform, queue_inform_sms);
        // Publish 10 messages with the email routing key
        for (int i = 0; i < 10; i++) {
            String message = "email inform to user" + i;
            channel.basicPublish(exchange_routing_inform, queue_inform_email, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Send Message is:'" + message + "'");
        }
        // Publish 10 messages with the sms routing key
        for (int i = 0; i < 10; i++) {
            String message = "sms inform to user" + i;
            channel.basicPublish(exchange_routing_inform, queue_inform_sms, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Send Message is:'" + message + "'");
        }
        // Close the channel and the connection to release resources
        channel.close();
        connection.close();
    }
}
- Create Consumer1.java in net.wanho.rabbitmq.routing
package net.wanho.rabbitmq.routing;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        String QUEUE_INFORM_EMAIL = "queue_inform_email";
        // Must match the exchange name declared in Producer
        String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange and queue, and bind them with the queue name as the routing key
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Delivery metadata, available if needed (not used in this test)
                long deliveryTag = envelope.getDeliveryTag();
                String exchange = envelope.getExchange();
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println(message);
            }
        };
        // Start consuming with automatic acknowledgement (autoAck = true)
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
    }
}
- Create Consumer2.java in net.wanho.rabbitmq.routing
package net.wanho.rabbitmq.routing;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        String QUEUE_INFORM_SMS = "queue_inform_sms";
        // Must match the exchange name declared in Producer
        String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange and queue, and bind them with the queue name as the routing key
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Delivery metadata, available if needed (not used in this test)
                long deliveryTag = envelope.getDeliveryTag();
                String exchange = envelope.getExchange();
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println(message);
            }
        };
        // Start consuming with automatic acknowledgement (autoAck = true)
        channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
    }
}
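- Test
Start Consumer1 twice and Consumer2 once, then start Producer. The messages routed to queue_inform_email are consumed jointly by the two Consumer1 instances, while the messages routed to queue_inform_sms are all consumed by the single Consumer2 instance.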
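The outcome of this test can be reproduced without a broker. What follows is a minimal in-memory sketch, not RabbitMQ client code: the class DirectRoutingSketch and its methods bind, consume and publish are hypothetical names invented for illustration. It assumes exact-match routing keys (as in a direct exchange) and strict round-robin dispatch among the consumers of one queue; a real broker's distribution also depends on prefetch settings and consumer timing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; not part of the RabbitMQ client API.
public class DirectRoutingSketch {
    // routing key -> queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> inboxes of the consumers attached to that queue
    private final Map<String, List<List<String>>> consumers = new HashMap<>();
    // queue name -> number of messages dispatched so far (drives the round-robin)
    private final Map<String, Integer> dispatched = new HashMap<>();

    // Bind a queue to the exchange under the given routing key.
    public void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Attach a consumer to a queue; the returned list collects what it receives.
    public List<String> consume(String queue) {
        List<String> inbox = new ArrayList<>();
        consumers.computeIfAbsent(queue, k -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Route a message to every queue whose binding key equals the routing key.
    // Messages with no matching binding are silently dropped.
    public void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, Collections.emptyList())) {
            List<List<String>> attached =
                    consumers.getOrDefault(queue, Collections.emptyList());
            if (attached.isEmpty()) {
                continue; // simplification: this sketch does not buffer messages
            }
            // Assumption: strict round-robin among the consumers of one queue
            int index = dispatched.merge(queue, 1, Integer::sum) - 1;
            attached.get(index % attached.size()).add(message);
        }
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        // Same bindings as in Producer: the queue name doubles as the routing key
        exchange.bind("queue_inform_email", "queue_inform_email");
        exchange.bind("queue_inform_sms", "queue_inform_sms");

        // Two Consumer1 instances and one Consumer2 instance
        List<String> email1 = exchange.consume("queue_inform_email");
        List<String> email2 = exchange.consume("queue_inform_email");
        List<String> sms = exchange.consume("queue_inform_sms");

        for (int i = 0; i < 10; i++) {
            exchange.publish("queue_inform_email", "email inform to user" + i);
        }
        for (int i = 0; i < 10; i++) {
            exchange.publish("queue_inform_sms", "sms inform to user" + i);
        }

        // prints "email1=5, email2=5, sms=10"
        System.out.println("email1=" + email1.size()
                + ", email2=" + email2.size() + ", sms=" + sms.size());
    }
}
```

In this model the two consumers on queue_inform_email split the ten email messages between them, while the single consumer on queue_inform_sms receives all ten sms messages, which mirrors what the test above shows.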
That completes our test of RabbitMQ's Routing mode.