關(guān)鍵詞
JMS、ActiveMQ(ActivityMQ)寂诱、Apollo癣缅、MQTT、Android
摘要
由于項(xiàng)目開發(fā)需要岂傲,涉及到 Android 客戶端接收來自 JMS 中間件的消息推送难裆,本文以學(xué)習(xí)過程為線索,進(jìn)行記錄镊掖。
目錄
一乃戈、簡述 JMS (引出 ActiveMQ、Apollo)
二亩进、MQTT 協(xié)議
三症虑、MQTT 服務(wù)器搭建
四、 MQTT Android 客戶端具體實(shí)現(xiàn)
—— 1归薛、添加依賴
—— 2谍憔、添加權(quán)限
—— 3、基本字段說明
—— 4主籍、注冊Service
—— 5习贫、Android 端具體實(shí)現(xiàn)
正文
一、簡述 JMS (引出 ActiveQM千元、Apollo)
企業(yè)消息系統(tǒng)(Java Message Service)
又稱之為面向消息的中間件(MOM)苫昌,提供了應(yīng)用程序之間,異步數(shù)據(jù)的存儲和轉(zhuǎn)發(fā)幸海,即應(yīng)用程序彼此不直接通信祟身,而是與作為中介的 MOM 通信屋厘。應(yīng)用程序開發(fā)人員無需了解消息的發(fā)送和協(xié)議的細(xì)節(jié)。
過程如下圖月而,應(yīng)用程序 A 只需要將 Message 發(fā)送到服務(wù)器上汗洒,然后應(yīng)用程序 B 即可從服務(wù)器中接收到 A 發(fā)來的消息,由此可知 JMS 具有提供消息靈活性父款,松散耦合等優(yōu)點(diǎn)溢谤。
- JMS 由 SUN 提出,是一系列的接口及相關(guān)語義的集合憨攒,通過這些接口和和其中的方法世杀,JMS 客戶端可以訪問消息系統(tǒng),完成消息的創(chuàng)建肝集、發(fā)送瞻坝、接收及讀取。
- JMS 通過 MOM 為 Java 程序提供了一個發(fā)送和接收消息的標(biāo)準(zhǔn)杏瞻。根據(jù) JMS 編寫的客戶端程序可以訪問任何實(shí)現(xiàn) JMS 標(biāo)準(zhǔn)的 MOM所刀。
JMS兩種消息模型:
發(fā)送消息的客戶端:生產(chǎn)者, 接收消息的客戶端:消費(fèi)者捞挥, MOM :消息傳遞系統(tǒng)
點(diǎn)到點(diǎn)(P2P)消息傳遞模型
- 生產(chǎn)者發(fā)送消息到MOM的一個特定隊列(Queue)中浮创,而消費(fèi)者從一個消息隊列中得到消息。
- 每條消息只有一個消費(fèi)者砌函,如果一條消息被消息者接收斩披,那么其他的消費(fèi)者就不能得到這條消息了。
- 收到消息后消費(fèi)者必須確認(rèn)消息已被接收讹俊,否則消息傳遞系統(tǒng)會認(rèn)為該消息沒有被接收垦沉,那么這條消息仍然可以被其他人接收(程序可以自動進(jìn)行確認(rèn),不需要人工干預(yù))仍劈。
發(fā)布/訂閱(Pub/Sub)消息傳遞模型
- 發(fā)布/訂閱傳遞消息類型與主題(Topic)有關(guān)厕倍。生產(chǎn)者發(fā)布消息,而消費(fèi)者訂閱感興趣的消息耳奕,生產(chǎn)者將消息和一個特定的主題(Topic)連在一起绑青,消息傳遞系統(tǒng)根據(jù)消費(fèi)者注冊的興趣,將消息傳遞給消費(fèi)者屋群。這種類型非常類似出版報紙住闯、雜志的形式煌珊。
- 每個消息都可以有多個(0遭商,1宪巨,……)訂閱者。即每條消息可以有多個消費(fèi)者,如果報紙和雜志一樣庇楞。
- 訂閱者只能消費(fèi)他們訂閱之后出版的消息榜配。這就要求訂閱者必須先訂閱,再等待生產(chǎn)者的運(yùn)行吕晌,發(fā)布蛋褥。
- 訂閱者必須保持為活動狀態(tài)才能獲得這些消息,即訂閱者必須保持活動狀態(tài)等待發(fā)布者發(fā)布的消息睛驳。
關(guān)于 JMS 更詳細(xì)的介紹烙心,推薦閱讀下面的貼,更加深入淺出乏沸,通俗易懂
深入淺出JMS(一)——JMS簡介
深入淺出JMS(二)——JMS的組成
ActiveMQ (或ActivityMQ)
由 Apache 貢獻(xiàn)的 ActiveMQ 正是 MOM 中優(yōu)秀的一員淫茵。它是一個非常流行、強(qiáng)大蹬跃、開源的消息和集成模式服務(wù)器匙瘪,速度快、支持多種跨語言客戶端和協(xié)議蝶缀,易于使用企業(yè)集成模式丹喻,擁有許多先進(jìn)的特性,完全支持 JMS 1.1和 J2EE 1.4 規(guī)范扼劈。
Apollo
是以 ActiveMQ5.x 為基礎(chǔ)驻啤,采用全新的線程和消息調(diào)度架構(gòu)重新實(shí)現(xiàn)的 MOM,針對多核處理器進(jìn)行了優(yōu)化處理荐吵,它的速度更快、更可靠赊瞬、更易于維護(hù)先煎。Apollo 與 ActiveMQ 一樣支持多協(xié)議:STOMP、AMQP巧涧、MQTT薯蝎、Openwire、 SSL谤绳、WebSockets占锯,本文只介紹 MQTT 協(xié)議的 Android 客戶端使用。
二缩筛、MQTT 協(xié)議
1消略、 Android 端實(shí)現(xiàn)消息推送的幾種方式
- 輪詢:客戶端定時向服務(wù)器請求數(shù)據(jù),屬于偽推送瞎抛。缺點(diǎn):費(fèi)電艺演,費(fèi)流量。
- XMPP:是基于可擴(kuò)展標(biāo)記語言(XML)的協(xié)議。缺點(diǎn):XML 格式的胎撤,冗余很大晓殊,花費(fèi)流量。
- MQTT:由 IBM 開發(fā)的傳輸協(xié)議伤提,它被設(shè)計用于輕量級的發(fā)布/訂閱式消息傳輸巫俺,旨在為低帶寬和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境中的設(shè)備提供可靠的網(wǎng)絡(luò)服務(wù)。相比于 XMPP 等傳統(tǒng)協(xié)議肿男,MQTT 是專門針對移動互聯(lián)網(wǎng)開發(fā)的輕量級傳輸協(xié)議介汹,這種傳輸協(xié)議連接穩(wěn)定、心跳數(shù)據(jù)包小次伶,所以具備耗電量低痴昧、耗流量低的優(yōu)勢。推送服務(wù)的最佳協(xié)議冠王!
2赶撰、 MQTT 協(xié)議簡介
MQTT官網(wǎng):http://mqtt.org/
MQTT介紹:http://www.ibm.com
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html
3、 Apollo + MQTT 協(xié)議的消息推送特征
- Apollo 使用 MQTT 協(xié)議柱彻,加上 Android 上的 paho 包豪娜,即可簡單實(shí)現(xiàn)消息通知功能,但是 MQTT 協(xié)議僅支持主題消息廣播(Topic)哟楷,即發(fā)布/訂閱消息傳遞模式瘤载,而且不能用Selector,不能直接實(shí)現(xiàn)點(diǎn)對點(diǎn)的消息投遞卖擅。
- 利用上述的特征鸣奔,可將 Android 客戶端劃分為眾多的部落,創(chuàng)建不同的主題惩阶, 針對特定訂閱者群體進(jìn)行消息傳遞挎狸。該特征特別適用于智能家居等物聯(lián)網(wǎng)系統(tǒng)。
三断楷、MQTT 服務(wù)器搭建
1锨匆、下載Apollo服務(wù)器,解壓(免安裝的)冬筒。
2恐锣、進(jìn)入解壓后文件夾的 bin 目錄下(例: E:\MQTT\apache-apollo-1.7.1\bin),按住 Shift 鍵舞痰,點(diǎn)擊鼠標(biāo)右鍵選擇 "在此處打開命令窗口"土榴;
3、在命令窗口匀奏,輸入 apollo create 服務(wù)器實(shí)例名稱(例:apollo create mybroker)鞭衩,之后會在 bin 目錄下創(chuàng)建該名稱的文件夾学搜。該文件夾中, etc\apollo.xml 文件是配置服務(wù)器信息的文件论衍。etc\users.properties 文件包含連接 MQTT 服務(wù)器時用到的用戶名和密碼瑞佩,默認(rèn)為 admin=password,即賬號為admin坯台,密碼為 password炬丸,可自行更改。
4蜒蕾、在命令窗口稠炬,輸入 cd xxx/bin, 進(jìn)入該實(shí)例的bin目錄下咪啡,執(zhí)行 apollo-broker.cmd run 命令首启,開啟服務(wù)器,看到如下界面代表搭建完成撤摸。
之后在瀏覽器輸入 http://127.0.0.1:61680/毅桃,查看是否安裝成功。
四准夷、 MQTT Android 客戶端具體實(shí)現(xiàn)
1钥飞、在 module 的 build.gradle 添加依賴
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-releases/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
}
2、添加權(quán)限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
3衫嵌、基本字段說明
topic:中文意思是“話題”读宙。在 MQTT 中訂閱了( subscribe )同一話題(topic)的客戶端會同時收到消息推送。直接實(shí)現(xiàn)了“群聊”功能楔绞。
clientId:客戶身份唯一標(biāo)識结闸。
qos:服務(wù)質(zhì)量。
retained:要保留最后的斷開連接信息酒朵。
MqttAndroidClient#subscribe():訂閱某個話題膀估。
MqttAndroidClient#publish(): 向某個話題發(fā)送消息,之后服務(wù)器會推送給所有訂閱了此話題的客戶耻讽。
userName:連接到MQTT服務(wù)器的用戶名。
passWord :連接到MQTT服務(wù)器的密碼帕棉。
4针肥、注冊Service
<!-- Mqtt Service -->
<service android:name="org.eclipse.paho.android.service.MqttService" />
<service android:name="com.mqtt.demo.MQTTService"/>
5、Android 端具體實(shí)現(xiàn)
public class MQTTService extends Service {
public static final String TAG = MQTTService.class.getSimpleName();
private static MQTTService instance;
private static MqttAndroidClient client;
private MqttConnectOptions conOpt;
private String host = "tcp://192.168.7.31:61613";
private String userName = "admin";
private String passWord = "password";
private static String myTopic = "topic";
private String clientId = "test2";
@Override
public void onCreate() {
super.onCreate();
instance = this;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onDestroy() {
instance = null;
try {
client.disconnect();
client.unregisterResources();
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
public static void publish(String msg){
String topic = myTopic;
Integer qos = 0;
Boolean retained = false;
try {
client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
// MQTT監(jiān)聽連接情況
private IMqttActionListener iMqttActionListener = new ActionListener();
// MQTT監(jiān)聽并且接受消息
private MqttCallback mqttCallback = new CallBack();
private void init() {
clientId = MacAddressUtil.getLocalMacAddress(this);
// 服務(wù)器地址(協(xié)議+地址+端口號)
String uri = host;
client = new MqttAndroidClient(this, uri, clientId);
// 設(shè)置MQTT監(jiān)聽并且接受消息
client.setCallback(mqttCallback);
conOpt = new MqttConnectOptions();
// 清除緩存
conOpt.setCleanSession(true);
// 設(shè)置超時時間香伴,單位:秒
conOpt.setConnectionTimeout(10);
// 心跳包發(fā)送間隔慰枕,單位:秒
conOpt.setKeepAliveInterval(20);
// 用戶名
conOpt.setUserName(userName);
// 密碼
conOpt.setPassword(passWord.toCharArray());
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + clientId + "\"}";
String topic = myTopic;
Integer qos = 0;
Boolean retained = false;
if (TextUtils.isEmpty(message) || TextUtils.isEmpty(topic)) {
// 最后發(fā)送的消息
try {
conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}
/** 連接MQTT服務(wù)器 */
private void doClientConnection() {
if (!client.isConnected() && isConnectIsNomarl()) {
try {
client.connect(conOpt, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/** 判斷網(wǎng)絡(luò)是否連接 */
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "MQTT當(dāng)前網(wǎng)絡(luò)名稱:" + name);
return true;
} else {
Log.i(TAG, "MQTT 沒有可用網(wǎng)絡(luò)");
return false;
}
}
public static boolean isConnect(){
if (instance != null && client != null){
return instance.client.isConnected();
}
return false;
}
public static void connect(){
if (instance != null && client != null){
instance.doClientConnection();
}
}
public static void disconnect(){
if (instance != null){
try {
instance.client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
public static class ActionListener implements IMqttActionListener {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "連接成功 ");
try {
// 訂閱myTopic話題
client.subscribe(myTopic,1);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
// 連接失敗,重連
Log.i("mtqq", "onFailure");
}
}
public static class CallBack implements MqttCallback{
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String str1 = new String(message.getPayload());
MQTTMessage msg = new MQTTMessage();
msg.setMessage(str1);
EventBus.getDefault().post(msg);
String str2 = "topic:" + topic + ", qos:" + message.getQos() + ", retained:" + message.isRetained();
Log.i(TAG, "messageArrived:" + str1);
Log.i(TAG, str2);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
Log.i("mtqq", "deliveryComplete");
}
@Override
public void connectionLost(Throwable arg0) {
// 失去連接即纲,重連
String message = "null";
if (arg0 != null) {
message = arg0.getMessage();
}
Log.i("mtqq", "connectionLost:"+message);
}
}
}
5具帮、手機(jī)顯示
6、服務(wù)端顯示
打開服務(wù)端http://127.0.0.1:61680/,看到的是這個樣子
7蜂厅、例子代碼下載
https://github.com/jkjk66/AndroidMQTT.git