說明
RabbitMQ中MQ是MessageQueue的簡寫掂碱,整體的意思應該是像兔子一樣快的高效消息中間件鼻百,組件本質上就是生產者消費者模式愁茁,由一邊接受消息岗照,另一邊轉發(fā)消息村象,其中可以對消息進行緩存,轉發(fā)或清除攒至。
一般實際中使用情況是推送和IM(及時通訊)
如何使用
首先要含有封裝底層的jar包厚者,下載RabbitMQ-java-Client,下載之后放在AndroidStudio項目中l(wèi)ibs包中編譯迫吐,或者在module中build.gradle中添加依賴:
dependencies {
compile 'com.rabbitmq:amqp-client:4.2.0'
}
RabbitMQ是由pivotal公司開源的一個項目库菲,在Github上可以查看的源碼。
需要訪問網(wǎng)絡志膀,不要忘記在清單文件中添加網(wǎng)絡權限:
<uses-permission android:name="android.permission.INTERNET"/>
一張簡易的圖表示工作原理:
連接
首先配置連接服務端基本信息:
ConnectionFactory factory = new ConnectionFactory(); //創(chuàng)建連接工廠
//設置服務端連接認證:ip號熙宇,端口號鳖擒,登錄名,密碼
factory.setHost(“HOST_ID");
factory.setPort(PORT);
factory.setUsername("USERNAME");
factory.setPassword("PASSWORD")
factory.setAutomaticRecoveryEnabled(true); //設置自動恢復連接
factory.setNetworkRecoveryInterval(5000); //設置自動連接間隔(毫秒)
以上都是一些基本配置奇颠,設置服務端上認證信息败去,設置恢復連接機制,在已經(jīng)連接過之后斷開連接烈拒,會主動嘗試連接服務圆裕,直至連接上服務或者關閉連接工廠。
如果第一沒有連接上荆几,則會報錯吓妆。需要循環(huán)創(chuàng)建連接,直至第一次連接很成功之后才會自動連接吨铸,包括所有的通道(Channel)行拢。
現(xiàn)在有了連接工廠,創(chuàng)建一個連接就夠多個通道使用诞吱,也是高效率的方式:
Connection connection = factory.newConnection();
注意這個需要在子線程中執(zhí)行舟奠,否則會報錯誤: Android.os.NetworkOnMainThreadException。
發(fā)送消息
- 建立通道及配置
- 發(fā)送消息
Channel channelSend = connection.createChannel();
//第一種方式房维,直接指定消費隊列
1.channelSend.queueDeclare(queueName, false, false, false, null);
channelSend.basicPublish("", queueName, null, message.getBytes());
//第二種方式沼瘫,指定轉換器和綁定密鑰(routingkey)
2.channelSend.exchangeDeclare(EXCHANGENAME, "topic"); //類型四種:fanout,direct咙俩,topic耿戚,handers
channelSend.basicPublish(EXCHANGENAME, routingkey, null, message.getBytes());
就是這么簡單,但還是需要簡單說明阿趁。
完整的流程:發(fā)送者->轉換器->隊列->接受者
第一種情況膜蛔,雖然沒有聲明轉換器,但是會使用匿名轉換器脖阵,發(fā)送到routingKey為隊列名的隊列中皂股。
第二種情況,只說明轉換器的四種類型特點:
- fanout類型:忽略routingkey的值發(fā)送給所有綁定該類型的隊列中,
- direct類型:根據(jù)routingkey的值發(fā)送給匹配該值的隊列中命黔,
- topic類型:它的routingKey可以使用*和#來表示呜呐,前者表示一個連著的單詞,如work纷铣;后者表示零個或多個連著的單詞卵史,如work.first战转。只要發(fā)送消息的routingKey匹配接受者設置綁定exchange的routingKey的值搜立,接受者隊列就可以收到到由exchange發(fā)送的消息。
- headers類型:以匹配鍵值對的形式發(fā)送和接受消息槐秧。匹配有兩種方式all和any啄踊。用的比較少忧设,所以就沒有深入了解。
接受消息
- 建立通道及配置
- 設置監(jiān)聽通道消息
Channel channel = connection.createChannel();
//不定義裝換器(默認匿名裝換器)颠通,命名隊列址晕,獲取實時和緩存在隊列中的消息
1. final Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Log.d(TAG, queueName + "接受消息->" + message);
channel.basicAck(envelope.getDeliveryTag(), false); //消息應答
}
});
//自定義裝換器名稱和類型,匿名隊列顿锰,監(jiān)聽符合routingKey的消息谨垃,只能獲取實時消息
2. final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Log.d(TAG, routingKey + "接受消息->" + message);
channel.basicAck(envelope.getDeliveryTag(), false); //消息應答
}
});
//自定義裝換器名稱和類型,命名隊列硼控,監(jiān)聽符合routingKey的消息刘陶,獲取實時和緩存在隊列中的消息
3. final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey); //設置綁定
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Log.d(TAG, routingKey + queueName + "接受消息->" + message);
channel.basicAck(envelope.getDeliveryTag(), false); //消息應答
}
});
以上是主要的三種監(jiān)聽消息方式:
第一種,匿名轉換器牢撼,命名隊列匙隔,監(jiān)聽隊列中的消息。只能一對一發(fā)送并且接受熏版,局限比較大纷责。
第二種,指定轉換器類型撼短,匿名隊列再膳,監(jiān)聽實時routingKey的消息,篩選出匹配routingKey消息進行發(fā)送阔加,所以有可能發(fā)送到多個隊列中饵史。每次連接都是獲取新的空的隊列,并且失去連接后隊列將被刪除胜榔,所以只能獲取實時的消息胳喷。
第三種,指定裝換器類型夭织,命名隊列吭露,監(jiān)聽隊列中routingKey的消息,篩選出匹配routingKey消息進行發(fā)送尊惰,所以有可能發(fā)送到多個隊列中讲竿。獲取實時和緩存在隊列中的消息
匿名隊列的特點:
- 一旦被創(chuàng)建就是一個新的空的隊列
- 一旦失去消費者連接該隊列就會被服務端刪除
命名隊列特點:
- 默認循環(huán)均勻發(fā)送消息給多個消費者,但是可以設置channel.basicQos(1)高效的發(fā)送消息給多個消費者弄屡。
- 消息應答機制题禀,服務端確認消息被銷毀才會移除該消息。
- 隊列和消息持久化膀捷,在服務端奔潰或者重啟是可以保存隊列和消息迈嘹。
- 一個消息只能被一個消費者消費一次。
關閉
if (channelSend != null && channelSend.isOpen()) {
try {
channelSend.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (channelReceive != null && channelReceive.isOpen()) {
try {
channelReceive.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
封裝RabbitMQ-Android的簡單使用,歡迎下載使用秀仲,并且反饋融痛。
注:這是RabbitMQ-java版Client的指導教程翻譯系列文章,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊列的使用
第三篇Publish/Subscribe介紹轉換器以及其中fanout類型
第四篇Routing介紹direct類型轉換器
第五篇Topics介紹topic類型轉換器
第六篇RPC介紹遠程調用