前言
主要講下Android如何使用MQTT通訊。用到的軟件或者框架有:
EMQ:https://www.emqx.io/cn/
org.eclipse.paho的MQTT通訊框架:https://github.com/eclipse/paho.mqtt.android
如果已經(jīng)有MQTT相關(guān)服務(wù)胞谭,可以跳過(guò)第一項(xiàng)抵乓,從第二項(xiàng)開(kāi)始看听绳。
一失暴、安裝EMQ及啟動(dòng)EMQ
1.安裝所需要的依賴(lài)包
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
2.使用以下命令設(shè)置穩(wěn)定存儲(chǔ)庫(kù),以 CentOS7 為例
$ sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
3.安裝最新版本的 EMQ X
$ sudo yum install emqx
4.安裝特定版本的 EMQ X
- 查詢(xún)可用版本
$ yum list emqx --showduplicates | sort -r
emqx.x86_64 3.1.0-1.el7 emqx-stable
emqx.x86_64 3.0.1-1.el7 emqx-stable
emqx.x86_64 3.0.0-1.el7 emqx-stable
- 根據(jù)第二列中的版本字符串安裝特定版本棕洋,例如 3.1.0
$ sudo yum install emqx-3.1.0
5.啟動(dòng) EMQ X
- 直接啟動(dòng)
$ emqx start
emqx 3.1.0 is started successfully!
$ emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v3.1.0 is running
- systemctl 啟動(dòng)
$ sudo systemctl start emqx
- service 啟動(dòng)
$ sudo service emqx start
EMQ管理后臺(tái)
地址:xxx.xxx.xxx:18083挡闰,地址為服務(wù)器ip或者域名,端口為18083端口
二掰盘、Android使用MQTT
1.在Android中導(dǎo)入依賴(lài)
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
項(xiàng)目地址:https://github.com/eclipse/paho.mqtt.android
2.創(chuàng)建MQTT連接的一個(gè)Service
public class MqttMessageService extends Service {
/**
* 訂閱的主題
*/
private String subTopic;
@Override
public void onCreate() {
super.onCreate();
//初始化mqtt配置
initMqtt();
//連接mqtt
connectMqtt();
}
/**
* 連接mqtt
*/
private void connectMqtt() {
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogUtils.e("MQTT連接失斏忝酢!@⒉丁I菅薄!" + exception.getCause());
new Thread(new Runnable() {
@Override
public void run() {
LogUtils.e("30s后次绘,嘗試重新連接" );
try {
Thread.sleep(1000*30);
} catch (InterruptedException e) {
e.printStackTrace();
}
connectMqtt();
}
}).start();
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* mqtt初始化
*/
private void initMqtt() {
try {
//寫(xiě)上自己的url
String uri = "";
subTopic = "topic_test" ;
mqttAndroidClient = new MqttAndroidClient(
getApplicationContext(),
uri, "test");
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
LogUtils.i("MQTT重新連接成功瘪阁!serverURI:" + serverURI);
} else {
LogUtils.i("MQTT連接成功撒遣!serverURI:" + serverURI);
}
subscribeAllTopics();
}
@Override
public void connectionLost(Throwable cause) {
LogUtils.i("MQTT連接斷開(kāi)!" + cause.getCause());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
LogUtils.i("收到了消息:"+topic+ message.toString()+ message.getQos());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
// MqttMessage mqttMessage = token.getMessage();
// LogUtils.i("消息發(fā)送成功:" + mqttMessage.toString());
} catch (MqttException e) {
e.printStackTrace();
}
}
});
// 新建連接設(shè)置
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(MqttConfig.EMQ_USERNAME);
mqttConnectOptions.setPassword(MqttConfig.EMQ_PASSWORD.toCharArray());
//斷開(kāi)后管跺,是否自動(dòng)連接
mqttConnectOptions.setAutomaticReconnect(true);
//是否清空客戶(hù)端的連接記錄义黎。若為true,則斷開(kāi)后豁跑,broker將自動(dòng)清除該客戶(hù)端連接信息
mqttConnectOptions.setCleanSession(true);
//設(shè)置超時(shí)時(shí)間廉涕,單位為秒
mqttConnectOptions.setConnectionTimeout(15);
//心跳時(shí)間,單位為秒艇拍。即多長(zhǎng)時(shí)間確認(rèn)一次Client端是否在線(xiàn)
mqttConnectOptions.setKeepAliveInterval(30);
//允許同時(shí)發(fā)送幾條消息(未收到broker確認(rèn)信息)
mqttConnectOptions.setMaxInflight(30);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 訂閱所有主題
*/
private void subscribeAllTopics() {
//訂閱主消息主題和更新消息主題
subscribeToTopic(subTopic , 2);
}
/**
* 訂閱一個(gè)主主題
*
* @param subTopic 主題名稱(chēng)
*/
private void subscribeToTopic(String subTopic, int qos) {
try {
mqttAndroidClient.subscribe(subTopic, qos, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogUtils.i("MQTT訂閱消息成功:" + subTopic);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogUtils.d("MQTT訂閱消息失敽伞!" + subTopic);
}
});
} catch (MqttException ex) {
LogUtils.d("subscribeToTopic: Exception whilst subscribing");
ex.printStackTrace();
}
}
/**
* 發(fā)布主題
*
* @param topic 主題
* @param msg 內(nèi)容
* @param qos qos
*/
public void publishMessage(String topic, String msg, int qos) {
if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
try {
LogUtils.d("publishMessage: 發(fā)送" + msg);
mqttAndroidClient.publish(topic, msg.getBytes(), qos, false);
} catch (Exception e) {
LogUtils.e("publishMessage: Error Publishing: " + e.getMessage());
e.printStackTrace();
}
} else {
LogUtils.e("publishMessage失敗卸夕,MQTT未連接 ");
}
}
public void publishMessage(String topic, String msg) {
publishMessage(topic, msg, 2);
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
return Service.START_NOT_STICKY;
}
@Override
public void onDestroy() {
LogUtils.e("關(guān)閉MQTT");
//斷開(kāi)mqtt連接
try {
if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
mqttAndroidClient.disconnect();
mqttAndroidClient.unregisterResources();
}
} catch (Exception e) {
e.printStackTrace();
}
super.onDestroy();
}
}