一奢赂、pom引入包
<dependency>
? ? ? ? ? ? <groupId>com.rabbitmq</groupId>
? ? ? ? ? ? <artifactId>amqp-client</artifactId>
? ? ? ? ? ? <version>3.6.5</version>
</dependency>
二、創(chuàng)建消息發(fā)送者
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生成者
*/
public class Producer {
????public final static String QUEUE_NAME = "rabbitMQ.test";
????public static void main(String[] args) throws IOException, TimeoutException {
????????// 創(chuàng)建連接工廠
????????ConnectionFactory factory = new ConnectionFactory();
????????// 設(shè)置RabbitMQ相關(guān)信息
????????factory.setHost("localhost");
????????// factory.setUsername("guest");
????????// factory.setPassword("guest");
????????// factory.setPort(5672);
????????// 創(chuàng)建一個(gè)新的連接
????????Connection connection = factory.newConnection();
????????// 創(chuàng)建一個(gè)通道
????????Channel channel = connection.createChannel();
????????// 聲明一個(gè)隊(duì)列
????????// 1.String queue表示隊(duì)列名稱
????????// 2.boolean durable是否持久化
????????// 3.exclusive是否是獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列觅彰,斷開(kāi)后自動(dòng)刪除)
????????// 4.boolean autoDelete當(dāng)所有消費(fèi)者客戶端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
????????// 5.Map<String, Object> arguments隊(duì)列的其他參數(shù)
????????channel.queueDeclare(QUEUE_NAME, false, false, false, null);
????????String message = "Hello RabbitMQ";
????????// 發(fā)送消息到隊(duì)列中
????????// 1.String exchange交換機(jī)名稱
????????// 2.String routingKey隊(duì)列映射的路由key
????????// 3.BasicProperties props消息的其他屬性
????????// 4.byte[] body發(fā)送信息的主體
????????channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
????????System.out.println("Producer Send +'" + message + "'");
????????// 關(guān)閉通道和連接
????????channel.close();
????????connection.close();
????}
}
三、創(chuàng)建消息接收者
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 消息接收者
*/
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
????????// 創(chuàng)建連接工廠
????????ConnectionFactory factory = new ConnectionFactory();
????????// 設(shè)置RabbitMQ地址
????????factory.setHost("localhost");
????????// 創(chuàng)建一個(gè)新的連接
????????Connection connection = factory.newConnection();
????????// 創(chuàng)建一個(gè)通道
????????Channel channel = connection.createChannel();
????????// 聲明要關(guān)注的隊(duì)列
????????// channel.queueDeclare(QUEUE_NAME, false, false, true, null);
????????System.out.println("Customer Waiting Received messages");
????????// DefaultConsumer類實(shí)現(xiàn)了Consumer接口,通過(guò)傳入一個(gè)頻道,
????????// 告訴服務(wù)器我們需要那個(gè)頻道的消息六剥,如果頻道中有消息,就會(huì)執(zhí)行回調(diào)函數(shù)handleDelivery
????????Consumer consumer = new DefaultConsumer(channel) {
????????@Override
????????public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
????????????byte[] body) throws IOException {
????????????????String message = new String(body, "UTF-8");
????????????????System.out.println("Customer Received '" + message + "'");
????????????}
????????};
????????// 自動(dòng)回復(fù)隊(duì)列應(yīng)答 -- RabbitMQ中的消息確認(rèn)機(jī)制
????????channel.basicConsume(QUEUE_NAME, true, consumer);
????}
}
四峰伙、測(cè)試
? ? 1.運(yùn)行Customer消息接收者開(kāi)啟服務(wù)
? ? 2.運(yùn)行Producer消息發(fā)送者
? ? 3.可以看到Customer打印出Customer Received 'Hello RabbitMQ'