Android mqtt 客戶端實現(xiàn)一般使用以下兩個庫:
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
這兩個庫是eclipse 公司開發(fā)維護的粘咖。
github地址:https://github.com/eclipse/paho.mqtt.android
一般Android 客戶端的連接代碼可以這樣寫:
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttClient {
private static MqttClient instance;
private Context context;
//單例模式
public static MqttClient getInstance(Context context) {
if (null == instance) {
synchronized (MqttClient.class) {
instance = new MqttClient(context);
}
}
return instance;
}
private MqttClient(Context context) {
this.context = context.getApplicationContext();
}
//聲明一個MQTT客戶端對象
private MqttAndroidClient mMqttClient;
private static final String TAG = "MqttClient";
//連接到服務器
private void connectMQTT() {
//連接時使用的clientId, 必須唯一, 一般加時間戳
String clientId = "xxxx" + System.currentTimeMillis();
mMqttClient = new MqttAndroidClient(context, "tcp://xxxxhost:xxxxport", clientId);
//連接參數(shù)
MqttConnectOptions options;
options = new MqttConnectOptions();
//設置自動重連
options.setAutomaticReconnect(true);
// 緩存,
options.setCleanSession(true);
// 設置超時時間楣铁,單位:秒
options.setConnectionTimeout(15);
// 心跳包發(fā)送間隔,單位:秒
options.setKeepAliveInterval(15);
// 用戶名
options.setUserName("username");
// 密碼
options.setPassword("password".toCharArray());
// 設置MQTT監(jiān)聽
mMqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.d(TAG, "connectionLost: 連接斷開");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "消息到達");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
try {
//進行連接
mMqttClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "onSuccess: 連接成功");
try {
//連接成功后訂閱主題
mMqttClient.subscribe("some topic", 2);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.d(TAG, "onFailure: 連接失敗");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}
幾個重要參數(shù)的介紹:
這里最重要的是對幾個關鍵參數(shù)的理解缀旁。
-
options.setAutomaticReconnect(true);
先看官方的解釋:
Sets whether the client will automatically attempt to reconnect to the server if the connection is lost.
If set to false, the client will not attempt to automatically reconnect to the server in the event that the connection is lost.
If set to true, in the event that the connection is lost, the client will attempt to reconnect to the server. It will initially wait 1 second before it attempts to reconnect, for every failed reconnect attempt, the delay will double until it is at 2 minutes at which point the delay will stay at 2 minutes.
設置為true表示支持自動重連记劈。 這里的重連機制是:先1秒之后嘗試重連,然后翻倍2秒后再嘗試重連,最后一直穩(wěn)定重連間隔時間為2分鐘。
-
options.setCleanSession(true);
這個標志是標志客戶端并巍,服務端是否要保持持久化的一個標志目木。默認是
true
。這個標志理解起來并不容易先看官方的解釋:
Sets whether the client and server should remember state across restarts and reconnects.
If set to false both the client and server will maintain state across restarts of the client, the server and the connection. As state is maintained:
Message delivery will be reliable meeting the specified QOS even if the client, server or connection are restarted.
The server will treat a subscription as durable.
If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
The server will treat a subscription as non-durable
設置客戶端和服務端重啟或重連后是否需要記住之前的狀態(tài)懊渡。
當setCleanSession為true
時刽射,客戶端掉線或服務端重啟后,服務端和客戶端會清掉之前的 session剃执, 重連后客戶端會有一個新的session誓禁。離線期間發(fā)來QoS=0,1,2的消息一律接收不到,且所有之前訂閱的topic需要重新訂閱忠蝗。
當setCleanSession為false
時, 客戶端掉線或服務端重啟后现横,服務端和客戶端不會清除之前的session。重連后session將會恢復阁最,客戶端不用重新訂閱主題戒祠。且離線期間發(fā)來QoS=0,1,2的消息仍然可以接收到。
這里有個需要注意的地方速种,即setCleanSession為true時姜盈,掉線后客戶端設置了setAutomaticReconnect
為true才會自動重連。為當setCleanSession為false時配阵。不管setAutomaticReconnect
為true或者false都會自動重連馏颂。
-
options.setKeepAliveInterval(15);
Sets the "keep alive" interval. This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, the client sends a very small "ping" message, which the server will acknowledge. A value of 0 disables keepalive processing in the client.
這個字段是設置keepalive
時間間隔的示血。
MQTT客戶端(client)在沒有收到或發(fā)布任何消息時仍然是保持連接的。服務端(the broker
)需要跟蹤客戶端的連接狀態(tài)救拉。 所有需要發(fā)送心跳包來確定客戶端是否是連接狀態(tài)难审。心跳包發(fā)送的時間間隔就是keepalive
設置的。
服務端都會維持一個timer亿絮。當這個timer記錄的時間超過1.5倍keepalive
值時告喊,服務端會將這個客戶端標記為斷開連接,并發(fā)送Last Will and Testament (LWT)
遺言廣播派昧。
以下情況下會重置這個timer:
- 每次客戶端發(fā)送或接收一個消息, 服務端會重置這個timer黔姜。
- 一個客戶端可以在任何時間發(fā)送一個
PINGREQ
的消息到服務器,服務器如果收到這個消息后蒂萎,會回一個PINGRESP
消息秆吵,然后服務端會重置這個timer。
paho client在一個keepalive
時間間隔內沒有向 Broker 發(fā)送任何數(shù)據(jù)包五慈,比如 PUBLISH 和 SUBSCRIBE 的時候纳寂,它會向 Broker 發(fā)送PINGREQ
數(shù)據(jù)包,告訴服務器自己仍然是連接的豺撑。
遇到的問題及解決方法
1.斷線重連后收不到消息烈疚。
一般是由于我們設置了setCleanSession=true
時,且setAutomaticReconnect(true)
聪轿,這樣mqtt客戶端斷線后會啟動自動重連機制爷肝。但是由于CleanSession=true
會啟動一個新的session,這樣需要重新訂閱topic陆错。如果我們沒有重新訂閱topic灯抛,就會導致斷線重連后收不到消息。
此時
我們需要將Callback替換成MqttCallbackExtended音瓷,并在重寫方法connectComplete
重新訂閱即可对嚼。
public void connectComplete(boolean reconnect, String serverURI){
client.subscribe(topics,Qos);//具體訂閱代碼
}```