創(chuàng)建生產(chǎn)者代碼示例:
public class Producer {
//RabbitMQ服務(wù)器地址
public final static String host="192.168.1.1";
//RabbitMQ端口
public final static int port=5672;
//RabbitMQ虛擬主機(jī)
public static final String virtualHost="/";
//RabbitMQ用戶名
public final static String username="admin";
//RabbitMQ密碼
public final static String password="123456";
//隊(duì)列名稱
public final static String queue_name="serviceNotice.queue";
public static void main(String[] args) throws IOException{
//創(chuàng)建連接工廠乱投,此部分可以單獨(dú)抽出作為一個(gè)靜態(tài)抽象方法以便調(diào)用
ConnectionFactory factory=new ConnectionFactory();
//設(shè)置服務(wù)器地址
factory.setHost(host);
//設(shè)置服務(wù)器端口
factory.setPort(port);
//設(shè)置虛擬主機(jī)
factory.setVirtualHost(virtualHost);
//設(shè)置用戶名
factory.setUsername(userName);
//設(shè)置密碼
factory.setPassword(password);
//獲取連接
Connection connection=factory.newConnection();
//創(chuàng)建信道
Channel channel=connection.createChannel();
//信道指定隊(duì)列設(shè)置,如果在Rabbit管理工具中創(chuàng)建了隊(duì)列,則不需要調(diào)用此方法
//參數(shù)(名字散劫,是否持久化诗鸭,獨(dú)占的隊(duì)列西疤,不使用時(shí)是否自動刪除,其他參數(shù))
channel.queueDeclare(queue_name,true,false,true,null);
String message="這是一個(gè)測試消息";
//發(fā)布消息
//參數(shù)(交換器名稱,隊(duì)列名稱,屬性,參數(shù)的字節(jié)數(shù)據(jù))
channel.basicPublish("",queue_name,null,message.getBytes());
//關(guān)閉信道
channel.close();
//關(guān)閉連接
connection.close();
}
}
創(chuàng)建消費(fèi)者代碼示例:
public class Consumer {
//RabbitMQ服務(wù)器地址
public final static String host="192.168.1.1";
//RabbitMQ端口
public final static int port=5672;
//RabbitMQ虛擬主機(jī)
public static final String virtualHost="/"蜡秽;
//RabbitMQ用戶名
public final static String username="admin";
//RabbitMQ密碼
public final static String password="123456";
//隊(duì)列名稱
public final static String queue_name="serviceNotice.queue";
public static void main(String[] args) throws IOException {
//創(chuàng)建連接工廠,此部分可以單獨(dú)抽出作為一個(gè)靜態(tài)抽象方法以便調(diào)用
ConnectionFactory factory=new ConnectionFactory();
//設(shè)置服務(wù)器地址
factory.setHost(host);
//設(shè)置服務(wù)器端口
factory.setPort(port);
//設(shè)置虛擬主機(jī)
factory.setVirtualHost(virtualHost);
//設(shè)置用戶名
factory.setUsername(userName);
//設(shè)置密碼
factory.setPassword(password);
//獲取連接
Connection connection = factory.newConnection();
//創(chuàng)建信道
Channel channel = connection.createChannel();
//信道設(shè)置,如果在Rabbit管理工具中創(chuàng)建了隊(duì)列缆镣,則不需要調(diào)用此方法
channel.queueDeclare(queue_name, true, false, true, null);
//創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
//消費(fèi)消息芽突,false表示需手動確認(rèn)消息已成功獲取
channel.basicConsume(queue_name,false,queueingConsumer);
while (true) { //消費(fèi)者程序運(yùn)行開著 如果生產(chǎn)者新增了數(shù)據(jù)會自動獲取
// nextDelivery是一個(gè)阻塞方法(內(nèi)部實(shí)現(xiàn)其實(shí)是阻塞隊(duì)列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收消息:" + message);
//消息確認(rèn)為成功獲取,false表示不重新入隊(duì)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}