java client的使用
本篇博客介紹RabbitMQ java client的一些簡單的api使用,如聲明Exchange,Queue,發(fā)送消息特愿,消費消息,一些高級api會在下面的章節(jié)詳細(xì)的說明勾缭。
概述
首先加入RabbitMQ java client依賴:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
</dependencies>
RabbitMQ的java client使用com.rabbitmq.client
作為其頂級包揍障。關(guān)鍵的類和接口是:
com.rabbitmq.client.Channel
com.rabbitmq.client.Connection
com.rabbitmq.client.ConnectionFactory
com.rabbitmq.client.Consumer
通過Channel可以進行一系列的api操作。 Connection(連接)用于打開通道俩由,注冊連接生命周期事件處理程序毒嫡,并關(guān)閉不再需要的連接。 Connection(連接)通過ConnectionFactory實例化幻梯,ConnectionFactory可以設(shè)置一些Collection(連接)的一些配置兜畸,比如說vhost或者說username等等。
Connections(連接)和Channels(管道)
核心的類是Connections(連接)和Channels(管道)碘梢,分別代表著AMQP 0-9-1協(xié)議中的Connections(連接)和Channels(管道)咬摇,一般被導(dǎo)入
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
連接服務(wù)器
下面的代碼時使用給定的參數(shù)(host name,端口等等)連接AMQP的服務(wù)器煞躬。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
所有的這些參數(shù)RabbitMQ服務(wù)器都設(shè)置了默認(rèn)值肛鹏,可以在ConnectionFactory類中查看這些默認(rèn)值。
另外恩沛,URI可以以下面的方法進行連接都有默認(rèn)值在扰。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection(連接)接口可以被用作創(chuàng)建一個channel(管道):
Channel channel = conn.createChannel();
可以使用channel(管道)發(fā)送和接收消息,下面會有講到雷客。
關(guān)閉連接芒珠,只需要關(guān)閉channel(管道)和connection(連接):
channel.close();
conn.close();
注意,關(guān)閉管道是被認(rèn)為是最佳實踐搅裙,但是卻不是嚴(yán)格意義的必要的妓局。當(dāng)?shù)讓拥倪B接關(guān)閉時候,channel(管道)也就自動的被關(guān)閉了呈宇。
使用Exchanges和Queues
客戶端應(yīng)用必須應(yīng)用在exchanges和queues好爬,這些都是AMQP協(xié)議定義的。使用這些(exchanges和queues)首先必須“聲明”它(就是創(chuàng)建的意思)甥啄。
下面的代碼就是怎樣去"聲明"一個exchange和隊列存炮,并且將它們綁定在一起。
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
可以通過參數(shù)去設(shè)置exchange和queue的一些屬性,使用這些方法的一些重載方法進行相關(guān)設(shè)置穆桂。
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
發(fā)送消息(Publishing messages)
使用Channel.basicPublish方法將消息發(fā)送給一個exchange:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
為了更好的控制宫盔,你可以使用重載的參數(shù)來設(shè)置消息的一些屬性(比如說mandatory標(biāo)志,關(guān)于mandatory標(biāo)志享完,下面會講到)灼芭,或者在發(fā)送消息前設(shè)定一些消息屬性。
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
可以自己構(gòu)建BasicProperties的對象般又,如下面的代碼:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
發(fā)送消息指定頭信息:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
發(fā)送一個有過期時間的消息彼绷,下面的博客也會講到:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
通道和并發(fā)注意事項(線程安全)
根據(jù)經(jīng)驗,在線程間共享Channel(通道)是要避免的茴迁。應(yīng)用應(yīng)該優(yōu)先使用每個線程自己的Channel(通道)實例寄悯,而不是多個線程共享這個Channel(通道)實例。
雖然有些在Channel(通道)上的操作是可以并發(fā)安全的調(diào)用堕义,但是一些操作不行會導(dǎo)致一些邊界交錯猜旬,雙重確認(rèn)等等。
在共享(多線程)Channel(通道)上進行并發(fā)發(fā)布會導(dǎo)致一些邊界交錯倦卖,觸發(fā)連接協(xié)議異常和連接關(guān)閉洒擦。因此需要嚴(yán)格在應(yīng)用中同步調(diào)用(Channel#basicPublish必須在正確關(guān)鍵的地方調(diào)用)。線程之間的共享也會干擾生產(chǎn)者的消息確認(rèn)怕膛。我們強烈的推薦不應(yīng)該在通道上進行并發(fā)的發(fā)布消息秘遏。
在共享的Channel(通道)上一個線程生產(chǎn)(publish)消息,一個線程消費(consume)消息是線程安全的嘉竟。
服務(wù)器推送可以同時發(fā)送邦危,保證每通道的訂閱被保留。 調(diào)度機制使用java.util.concurrent.ExecutorService
舍扰。 可以使用單列的ConnectionFactory
調(diào)用ConnectionFactory#setSharedExecutor
去設(shè)置所有連接共用的executor
倦蚪。
當(dāng)我們手動確認(rèn)manual acknowledgements 的時候,很重要的是考慮什么線程去做這個ack確認(rèn)边苹。如果接收傳遞的線程(例如陵且,Consumer#handleDelivery委托給不同線程的傳遞處理)不同于手動確認(rèn)的線程,則將多個線程參數(shù)設(shè)置為true是線程不安全的并導(dǎo)致雙重確認(rèn)个束,因此導(dǎo)致通道協(xié)議異常導(dǎo)致Channel關(guān)閉慕购。一次確認(rèn)一條消息可以確保安全的。
訂閱消息("Push API")
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
最有效的接收消息的方法是使用Consumer接口去訂閱茬底。當(dāng)消息到達(dá)消費端的時候會自動的傳遞消費(delivered)沪悲,而不需要去請求。
當(dāng)我們調(diào)用Consumers(消費者)有關(guān)的api的時候阱表,會生成一個消費者標(biāo)識符(consumer tag)殿如。
不同的Consumer實例必須有不同的消費者標(biāo)簽贡珊。 強烈建議不要在連接上重復(fù)使用消費者標(biāo)簽,不然在監(jiān)視消費者時可能導(dǎo)致自動連接恢復(fù)和混淆監(jiān)控數(shù)據(jù)的問題涉馁。
實現(xiàn)Consumer的最簡單的方法是將便利(convenience)類DefaultConsumer子類化门岔。 該子類的對象可以在basicConsume方法調(diào)用中傳遞以設(shè)置訂閱:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
在這里,因為我們設(shè)置了自動確認(rèn)(autoAck
)的值為false烤送,所以有必要在傳遞給消費者的方法中進行自動確認(rèn)(handleDelivery
方法中)寒随。
更復(fù)雜的消費者將會重寫更多的方法。事實上帮坚,handleShutdownSignal
方法被調(diào)用當(dāng)Channel(通道)和連接關(guān)閉的時候妻往。并且在調(diào)用該消費者的任何回調(diào)方法之前將consumer tag
傳遞給handleConsumeOk
(com.rabbitmq.client.Consumer接口中定義的方法)方法
消費者還可以分別實現(xiàn)handleCancelOk
(com.rabbitmq.client.Consumer接口中定義的方法)和handleCancel
(com.rabbitmq.client.Consumer接口中定義的方法)方法來通知顯式和隱式取消。
你也可以使用Channel.basicCancel方法明確的取消一個特定的消費叶沛,傳遞consumer tag蒲讯,
channel.basicCancel(consumerTag);
和生產(chǎn)者一樣忘朝,對于消費者來說并發(fā)處理消息也要慎重考慮灰署。
回調(diào)給消費者是在與實例化其Channel
(管道)的線程分開的線程池中調(diào)度的。 這意味著消費者可以安全地在Connection
或Channel
上調(diào)用阻塞方法局嘁,例如Channel#queueDeclare
或Channel#basicCancel
溉箕。
每一個Channel
(管道)都有自己的調(diào)度線程。對于最常用的使用方式就是一個消費者一個Channel
(管道)悦昵,意味著一個消費者不會阻塞其他的消費肴茄。如果是一個Channel
(管道)多消費者必須明白一個長時間的消費調(diào)用可能會阻塞其他消費者的回調(diào)調(diào)度。
翻譯未完待續(xù)......
demo
通過ConnectionFactory獲得Connection但指,Connection得到Channel
public class ExchangeTest {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//創(chuàng)建exchange寡痰,類型是direct類型
channel.exchangeDeclare("zhihao.miao","direct");
//創(chuàng)建exchange,類型是direct類型
channel.exchangeDeclare("zhihao.miao.info", BuiltinExchangeType.DIRECT);
//第三個參數(shù)表示是否持久化棋凳,同步操作拦坠,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("zhihao.miao.debug",BuiltinExchangeType.DIRECT,true);
System.out.println(ok);
//設(shè)置屬性
Map<String,Object> argument = new HashMap<>();
argument.put("alternate-exchange","log");
channel.exchangeDeclare("zhihao.miao.warn",BuiltinExchangeType.TOPIC,true,false,argument);
//異步創(chuàng)建exchange,沒有返回值
channel.exchangeDeclareNoWait("zhihao.miao.log",BuiltinExchangeType.TOPIC,true,false,false,argument);
//判斷exchange是否存在,存在的返回ok剩岳,不存在的exchange則報錯
/*
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("zhihao.miao.info");
System.out.println(declareOk);
declareOk = channel.exchangeDeclarePassive("zhihao.miao.info2");
System.out.println(declareOk);
*/
//刪除exchange(可重復(fù)執(zhí)行)贞滨,刪除一個不存在的也不會報錯
channel.exchangeDelete("zhihao.miao");
channel.exchangeDelete("zhihao.miao.debug");
channel.exchangeDelete("zhihao.miao.info");
channel.exchangeDelete("zhihao.miao.warn");
//刪除exchange
channel.exchangeDelete("zhihao.miao.log");
channel.close();
connection.close();
}
}
隊列的api操作。
public class QueueTest {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二個參數(shù)表示是否持久化拍棕,第三個參數(shù)是判斷這個隊列是否在連接是否生效晓铆,為true表示連接關(guān)閉隊列刪除。
AMQP.Queue.DeclareOk ok = channel.queueDeclare("zhihao.info",true,false,false,null);
System.out.println(ok);
//異步?jīng)]有返回值的方法api
channel.queueDeclareNoWait("zhihao.info.miao",true,false,false,null);
//判斷queue是否存在绰播,不存在會拋出異常
//channel.exchangeDeclarePassive("zhihao.info");
//拋出錯誤
//channel.exchangeDeclarePassive("zhihao.info.miao2");
//exchange和queue進行綁定(可重復(fù)執(zhí)行骄噪,不會重復(fù)創(chuàng)建)
channel.queueBind("zhihao.info","zhihao.miao.order","info");
//異步進行綁定
channel.queueBindNoWait("zhihao.info.miao","zhihao.miao.pay","info",null);
//exchange與exchange進行綁定(可重復(fù)執(zhí)行,不會重復(fù)創(chuàng)建)
channel.exchangeBind("zhihao.miao.email","zhihao.miao.weixin","debug");
//exchange和queue進行解綁(可重復(fù)執(zhí)行)
channel.queueUnbind("zhihao.info","zhihao.miao.order","info");
//exchange和exchange進行解綁(可重復(fù)執(zhí)行)
channel.exchangeUnbind("zhihao.info.miao","zhihao.miao.pay","debug");
//刪除隊列
channel.queueDelete("zhihao.info");
channel.close();
connection.close();
}
}
消息的發(fā)送:
public class Sender {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
contentEncoding("UTF-8").build();
//第一個參數(shù)是exchange參數(shù)蠢箩,如果是為空字符串腰池,那么就會發(fā)送到(AMQP default)默認(rèn)的exchange尾组,而且routingKey
//便是所要發(fā)送到的隊列名
channel.basicPublish("","zhihao.info.miao",properties,"忘記密碼,驗證碼是1234".getBytes());
channel.basicPublish("","zhihao.miao",properties,"忘記密碼示弓,六位驗證密碼是343sdf".getBytes());
//direct類型的exchange類型的exchange讳侨,zhihao.miao.order綁定zhihao.info.miao隊列,route key是order
channel.basicPublish("zhihao.miao.order","order",properties,"愛奇藝會員到期了".getBytes());
//zhihao.miao.pay綁定zhihao.info.miao隊列奏属,route key是order
channel.basicPublish("zhihao.miao.pay","pay",properties,"優(yōu)酷會員到期了".getBytes());
//topic類型的exchange
channel.basicPublish("log","user.log",properties,"你的外賣已經(jīng)送達(dá)".getBytes());
channel.basicPublish("log","user.log.info",properties,"你的外賣正在配送中".getBytes());
channel.basicPublish("log","user",properties,"你的投訴已經(jīng)采納".getBytes());
channel.close();
connection.close();
}
}
消息消費:
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
//客戶端的消費消息
Map<String,Object> clientProperties = new HashMap<>();
clientProperties.put("desc","支付系統(tǒng)2.0");
clientProperties.put("author","zhihao.miao");
clientProperties.put("user","zhihao.miao@xxx.com");
connectionFactory.setClientProperties(clientProperties);
//給客戶端的connetction命名
Connection connection = connectionFactory.newConnection("log隊列的消費者");
//給channel起個編號
Channel channel = connection.createChannel(10);
//返回consumerTag跨跨,也可以通過重載方法進行設(shè)置consumerTag
String consumerTag = channel.basicConsume("user_log_queue",true,new SimpleConsumer(channel));
System.out.println(consumerTag);
TimeUnit.SECONDS.sleep(30);
channel.close();
connection.close();
}
}
具體的消息邏輯,繼承DefaultConsumer類重寫handleDelivery方法囱皿,如果是手工確認(rèn)消息勇婴,會在handleDelivery方法中進行相關(guān)的確認(rèn)(調(diào)用相關(guān)api),下面會在確認(rèn)消息博客中去詳細(xì)講解這個嘱腥。
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內(nèi)容為:"+new String(body));
}
}