1改基,導(dǎo)入依賴包pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.edu.mq</groupId>
<artifactId>rabbitmq-java-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-java-client</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
</dependencies>
</project>
2,通過rabbitClient創(chuàng)建一個(gè)新的exchange
public static void main(String[] args) throws IOException, TimeoutException {
//常見連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//上述創(chuàng)建工廠的時(shí)候咖为,配置factory信息比較繁瑣秕狰,有多種方式
//可以配置。
//獲取連接
Connection connection = factory.newConnection();
//獲取channel躁染,所有的操作都是在channel鸣哀,一個(gè)connection可以創(chuàng)建多個(gè)channel
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","direct");
channel.close();
connection.close();
}
結(jié)果如圖:
上圖的logs 是一個(gè)沒有任何features的exchange。
通過rabbitMQ的channel吞彤,exchange我衬,queue,binding的創(chuàng)建饰恕,刪除挠羔,屬性等操作
package com.edu.mq.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* 類說明:
*
* @author zhangkewei
* @date 2018/11/21下午4:40
*/
public class App {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//常見連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//獲取連接
Connection connection = factory.newConnection();
//獲取channel,所有的操作都是在channel埋嵌,一個(gè)connection可以創(chuàng)建多個(gè)channel
Channel channel = connection.createChannel();
/***************************************** exchange開始 ********************************************/
/**
* exchange的一些操作破加,創(chuàng)建,刪除雹嗦,是否存在范舀,添加是否持久化,自動(dòng)刪除俐银,類型尿背,alternate-exchange,其中重復(fù)創(chuàng)建
* 已經(jīng)存在的exchange是不會(huì)有問題的捶惜,當(dāng)重復(fù)創(chuàng)建的時(shí)候田藐,如果已經(jīng)存在,不會(huì)在創(chuàng)建
*/
channel.exchangeDeclare("logs", "direct");
//通過枚舉的方式寫type,防止自己寫的時(shí)候汽久,拼寫錯(cuò)誤鹤竭。
channel.exchangeDeclare("log.info.first", BuiltinExchangeType.DIRECT);
//其中的第三個(gè)參數(shù)表示是否持久化。
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare("log.info.two", BuiltinExchangeType.DIRECT, true);
//其中的第四個(gè)參數(shù)表示是否自動(dòng)刪除景醇。第五個(gè)參數(shù)表示是否給exchange 添加額外的exchange臀稚,當(dāng)沒有路由到隊(duì)列的時(shí)候,消息轉(zhuǎn)發(fā)到log的exchange中三痰。
Map<String, Object> arguments = new HashMap();
arguments.put("alternate-exchange", "log");
channel.exchangeDeclare("log.warn", BuiltinExchangeType.DIRECT, true, false, arguments);
//判斷某個(gè)exchange是否存在吧寺。
channel.exchangeDeclare("log", BuiltinExchangeType.DIRECT);
AMQP.Exchange.DeclareOk log = channel.exchangeDeclarePassive("log");
System.out.println(log);
//如果沒有會(huì)報(bào)錯(cuò)
// log = channel.exchangeDeclarePassive("log2");
// System.out.println(log);
// 刪除某個(gè)exchange,如果沒有這個(gè)這個(gè)exchange會(huì)報(bào)錯(cuò)
AMQP.Exchange.DeleteOk log1 = channel.exchangeDelete("log");
/**************************************** exchange結(jié)束 *******************************************/
/*———————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————*/
/***************************************** queue開始 ********************************************/
//創(chuàng)建隊(duì)列(可以重復(fù)執(zhí)行).如果已經(jīng)有該隊(duì)列散劫,而且該隊(duì)列有消息稚机,這個(gè)時(shí)候創(chuàng)建也沒有問題,不會(huì)刪除原有的隊(duì)列中的消息
//其中第一個(gè)參數(shù)是隊(duì)列名稱获搏,第二個(gè)是是否持久化赖条,第三個(gè)參數(shù)是是否是排他的(ture的時(shí)候,如果connection斷開常熙,就會(huì)消失)
//第四個(gè)參數(shù)表示是否持久化纬乍,第五個(gè)是隊(duì)列的一些屬性的設(shè)置,可以通過map設(shè)置
channel.queueDeclare("debug_queue", true, false, false, null);
//隊(duì)列是否存在裸卫,如果不存在會(huì)報(bào)錯(cuò)仿贬。
channel.queueDeclarePassive("debug_queue");
/***************************************** queue結(jié)束 ********************************************/
/*———————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————*/
/***************************************** binding開始 ********************************************/
//綁定分為兩種,1彼城,exchange和queue綁定诅蝶。2退个,exchange和exchange綁定募壕。都可以重復(fù)執(zhí)行
//將隊(duì)列綁定到exchange中,第一個(gè)參數(shù)是隊(duì)列名稱语盈,第二個(gè)參數(shù)表示exchange名稱舱馅,第三個(gè)參數(shù)表示routingKey
channel.queueBind("debug_queue","log.warn","info");
channel.exchangeBind("log.warn","log.info","log");
//逆向操作,解綁刀荒,可以重復(fù)執(zhí)行
// channel.queueUnbind("debug_queue","log.warn","info");
// channel.exchangeUnbind("log.warn","log.info","log");
/***************************************** binding結(jié)束 ********************************************/
/**
* 以上的exchange代嗤,queue,binding都有異步的操作XXXNoWait方法缠借,不等待執(zhí)行結(jié)果干毅。
*/
Thread.sleep(5000L);
channel.close();
connection.close();
}
}
執(zhí)行上面代碼之后,結(jié)果如下:
提示:有些說明在代碼中泼返。