MqttManager:
import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import com.licheedev.myutils.LogPlus;
import com.simonfong.mqttdemo.listener.MqttCallback;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @author simonfong
* Created on 2019/6/6
*/
public class MqttManager {
private String serverUri;
private String userName;
private String passWord;
private final Context mContext;
private volatile MqttAndroidClient mqttAndroidClient;
private static volatile MqttManager mqttManager = null;
private MqttCallback callback;
private String[] mqttTopic;
private int[] qos;
private boolean autoReconnect = true;
/**
* 初始化
*
* @param serverUri mqtt域名
* @param userName 賬號(hào)
* @param passWord 密碼
* @param context
*/
public static MqttManager init(String serverUri, String userName, String passWord, Context context) {
return init(serverUri,userName,passWord,context,null,null);
}
/**
* 初始化
*
* @param serverUri mqtt域名
* @param userName 賬號(hào)
* @param passWord 密碼
* @param context
* @param topics 主題
* @param qos QOS = 0/1/2 最多一次 最少一次 多次
* @return
*/
public static MqttManager init(String serverUri, String userName, String passWord, Context context, String[] topics,
int[] qos) {
if (mqttManager == null) {
synchronized (MqttManager.class) {
if (mqttManager == null) {
mqttManager = new MqttManager(serverUri, userName, passWord, context, topics, qos);
}
}
}
return mqttManager;
}
/**
* 修改訂閱主題
*
* @param topics 主題
* @param qos QOS = 0/1/2 最多一次 最少一次 多次
*/
public void setTopicAndQos(String[] topics, int[] qos) {
if (mqttAndroidClient != null || !mqttAndroidClient.isConnected()) {
try {
mqttAndroidClient.unsubscribe(mqttTopic);
} catch (MqttException e) {
e.printStackTrace();
}
}
mqttTopic = topics;
this.qos = qos;
subscribeToTopic();
}
/**
* 設(shè)置是否自動(dòng)重連,默認(rèn)為true
*
* @param isAuto
*/
public void setAutoReconnect(boolean isAuto) {
autoReconnect = isAuto;
}
public MqttManager(String serverUri, String userName, String passWord, Context context, String[] topics, int[] qos) {
this.serverUri = serverUri;
this.userName = userName;
this.passWord = passWord;
this.mqttTopic = topics;
this.qos = qos;
mContext = context;
initConnect();
}
public void setCallback(MqttCallback callback) {
this.callback = callback;
}
private boolean isConnect() {
if (mqttAndroidClient != null) {
return mqttAndroidClient.isConnected();
}
return false;
}
private void initConnect() {
if (isConnect()) {
return;
}
mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, MqttClient.generateClientId());
mqttAndroidClient.registerResources(mContext);
mqttAndroidClient.setCallback(new MyMqttCallbackExtended());
//mqtt連接參數(shù)設(shè)置
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//設(shè)置自動(dòng)重連
mqttConnectOptions.setAutomaticReconnect(autoReconnect);
// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄
// 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
mqttConnectOptions.setCleanSession(false);
//設(shè)置連接的用戶名
mqttConnectOptions.setUserName(userName);
//設(shè)置連接的密碼
mqttConnectOptions.setPassword(passWord.toCharArray());
// 設(shè)置超時(shí)時(shí)間 單位為秒
mqttConnectOptions.setConnectionTimeout(10);
// 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線迟杂,但這個(gè)方法并沒(méi)有重連的機(jī)制
mqttConnectOptions.setKeepAliveInterval(20);
try {
mqttAndroidClient.connect(mqttConnectOptions);
} catch (Exception e) {
LogPlus.e("connect--onFailure:" + e.toString());
e.printStackTrace();
if (callback != null) {
callback.connectFail(e.toString());
}
}
}
private class MyMqttCallbackExtended implements MqttCallbackExtended {
/**
* 連接完成回調(diào)
*
* @param reconnect true 斷開(kāi)重連,false 首次連接
* @param serverURI 服務(wù)器URI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
LogPlus.e("連接成功connectComplete:是否重連+" + reconnect + "-----serverURI:" + serverURI);
if (callback != null) {
callback.connectSuccess(reconnect);
}
subscribeToTopic();
}
@Override
public void connectionLost(Throwable cause) {
LogPlus.e("connectionLost:斷開(kāi)");
if (callback != null) {
callback.connectLost("connectionLost:斷開(kāi)");
}
cause.printStackTrace();
}
/**
* 消息接收,如果在訂閱的時(shí)候沒(méi)有設(shè)置IMqttMessageListener缀蹄,那么收到消息則會(huì)在這里回調(diào)峭跳。
* 如果設(shè)置了IMqttMessageListener,則消息回調(diào)在IMqttMessageListener中
*
* @param topic
* @param message
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
LogPlus.e(message.getId() + "-->receive message: " + message.toString());
if (callback != null) {
callback.receiveMessage(topic, message.toString());
}
}
/**
* 交付完成回調(diào)缺前。在publish消息的時(shí)候會(huì)收到此回調(diào).
* qos:
* 0 發(fā)送完則回調(diào)
* 1 或 2 會(huì)在對(duì)方收到時(shí)候回調(diào)
*
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
if (callback != null) {
try {
callback.deliveryComplete(token.getMessage().toString());
} catch (MqttException e) {
e.printStackTrace();
}
}
LogPlus.e(token.toString());
}
}
private void subscribeToTopic() {
if (mqttAndroidClient == null || !mqttAndroidClient.isConnected() || mqttTopic == null || qos == null) {
return;
}
try {
//TODO 訂閱的主題蛀醉,和qos
mqttAndroidClient.subscribe(mqttTopic, qos, mContext.getApplicationContext(), new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
for (String s : mqttTopic) {
LogPlus.e("訂閱成功" + "topic:" + s);
}
if (callback != null) {
callback.subscribedSuccess(mqttTopic);
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogPlus.e("訂閱失敗");
if (callback != null) {
callback.subscribedFail(exception.toString());
}
}
});
} catch (MqttException e) {
e.printStackTrace();
if (callback != null) {
callback.subscribedFail(e.toString());
}
}
}
public static MqttManager getInstance() {
if (mqttManager == null) {
throw new NullPointerException("請(qǐng)先調(diào)用init方法進(jìn)行初始化");
}
return mqttManager;
}
/**
* 發(fā)送
*
* @param topic 發(fā)送的主題
* @param msg 發(fā)送的消息
* @param qos QOS = 0/1/2 最多一次 最少一次 多次
* @param retained 是否保留消息,為true時(shí),后來(lái)訂閱該主題的仍然收到該消息
*/
public void publishMessage(String topic, String msg, int qos, boolean retained) {
if (isConnect()) {
try {
mqttAndroidClient.publish(topic, msg.getBytes(), qos, retained);
LogPlus.e("Mqtt 發(fā)送消息:" + msg);
if (!mqttAndroidClient.isConnected()) {
LogPlus.e(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
}
} catch (MqttException e) {
LogPlus.e("Error Publishing: " + e.toString());
}
}
}
public void onDestroy() {
if (mqttAndroidClient == null) {
return;
}
try {
mqttAndroidClient.close();
mqttAndroidClient.disconnect();
mqttAndroidClient.unregisterResources();
mqttManager = null;
mqttAndroidClient = null;
} catch (Exception e) {
e.printStackTrace();
}
}
// 判斷網(wǎng)絡(luò)是否連接
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) mContext.getApplicationContext()
.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
LogPlus.e("MQTT當(dāng)前網(wǎng)絡(luò)名稱:" + name);
return true;
} else {
LogPlus.e("MQTT 沒(méi)有可用網(wǎng)絡(luò)");
return false;
}
}
}
image.gif
MqttCallback:
/**
 * Listener for MQTT events. Only {@link #receiveMessage(String, String)} has to be implemented;
 * every other hook has an empty default body and can be overridden selectively.
 */
public abstract class MqttCallback {

    /**
     * Hook for a received message.
     *
     * @param topic   topic the message arrived on
     * @param message message text
     */
    public abstract void receiveMessage(String topic, String message);

    /**
     * Hook for an established connection.
     *
     * @param reconnect flag describing whether this was a reconnect
     */
    public void connectSuccess(boolean reconnect) {
        // Optional hook: nothing to do by default.
    }

    /**
     * Hook for a failed connection attempt.
     *
     * @param message description of the failure
     */
    public void connectFail(String message) {
        // Optional hook: nothing to do by default.
    }

    /**
     * Hook for a lost connection.
     *
     * @param message description of the disconnect
     */
    public void connectLost(String message) {
        // Optional hook: nothing to do by default.
    }

    /**
     * Hook for a successful subscription.
     *
     * @param mqttTopic the subscribed topics
     */
    public void subscribedSuccess(String[] mqttTopic) {
        // Optional hook: nothing to do by default.
    }

    /**
     * Hook for a failed subscription.
     *
     * @param message description of the failure
     */
    public void subscribedFail(String message) {
        // Optional hook: nothing to do by default.
    }

    /**
     * Hook for a message that was sent successfully.
     *
     * @param message text of the delivered message
     */
    public void deliveryComplete(String message) {
        // Optional hook: nothing to do by default.
    }
}
image.gif
使用:
/**
 * Initializes the MQTT manager with two topics at QoS 0 and registers the event listener.
 */
private void initMqtt() {
    String[] topics = {myTopic, myTopic2};
    int[] qos = {0, 0};
    // MqttManager is a static singleton, so hand it the application context rather than this
    // component; otherwise the component would stay reachable after it is destroyed.
    MqttManager manager = MqttManager.init(host, userName, passWord, getApplicationContext(), topics, qos);
    manager.setCallback(new MyMqttCallback());
}
/** Sample listener that only logs every MQTT event it receives. */
private class MyMqttCallback extends MqttCallback {

    /** Logs each topic of a successful subscription on its own line. */
    @Override
    public void subscribedSuccess(String[] mqttTopic) {
        for (int i = 0; i < mqttTopic.length; i++) {
            String line = "subscribedSuccess:" + mqttTopic[i];
            LogPlus.e(line);
        }
    }

    /** Logs a failed subscription. */
    @Override
    public void subscribedFail(String message) {
        String line = "subscribedFail:" + message;
        LogPlus.e(line);
    }

    /** Logs a received message together with its topic. */
    @Override
    public void receiveMessage(String topic, String message) {
        String line = "topic:" + topic + "------receiveMessage:" + message;
        LogPlus.e(line);
    }

    /** Logs an established connection and its reconnect flag. */
    @Override
    public void connectSuccess(boolean reconnect) {
        String line = "connectSuccess:" + reconnect;
        LogPlus.e(line);
    }

    /** Logs a failed connection attempt. */
    @Override
    public void connectFail(String message) {
        String line = "connectFail:" + message;
        LogPlus.e(line);
    }

    /** Logs a lost connection. */
    @Override
    public void connectLost(String message) {
        String line = "connectLost:" + message;
        LogPlus.e(line);
    }
}
/** Tears down the MQTT manager after the superclass teardown has run. */
@Override
protected void onDestroy() {
    super.onDestroy();
    // getInstance() throws if the manager was never initialized or was already released.
    final MqttManager manager = MqttManager.getInstance();
    manager.onDestroy();
}
image.gif
注意:回调是在子线程中执行的,要进行 UI 操作必须先切换回主线程。
发消息:
// Publish `content` to myTopic2 with QoS 0 and retained = false.
MqttManager.getInstance().publishMessage(myTopic2, content, 0, false);
image.gif