package com.realtop.mqttutils;
import android.content.Context;
import android.content.SharedPreferences;
import android.os.Handler;
import android.os.Looper;
import android.text.TextUtils;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.lang.ref.WeakReference;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
public class MQTTHelper implements Runnable {
private final static class Inner {
public static final MQTTHelper OBJ = new MQTTHelper();
}
public static MQTTHelper getInstance() {
return Inner.OBJ;
}
private MQTTHelper() {
Log.i(TAG, "MQTTHelper_init_do_nothing: ");
}
public static final String TAG = "mqtt_helper";
public static final String INTENT_ACTION_MQTT_MSG = "INTENT_ACTION_MQTT_MSG";
public static final String MQTT_MSG = "MQTT_MSG";
/**
* 參數(shù)部分
*/
private String receiveSubject = "";
private String sendSubject = "";
private String host = "";
private String clientId = "";
private String userName = "";
private String password = "";
private boolean needAutoConnect = true;
private Context mContext;
private MqttAndroidClient mMqtt;
private Handler mHandler;
private final ConcurrentHashMap<Integer, WeakReference<OnSendMsgListener>> mListeners
= new ConcurrentHashMap<>();
private final Set<MQTTReceiver> mReceivers=new CopyOnWriteArraySet<>();
public Context getContext() {
return mContext;
}
public static SharedPreferences getConfig(Context context){
return context.getSharedPreferences("mqtt_config_prefer", Context.MODE_PRIVATE);
}
public boolean isNeedAutoConnect() {
return needAutoConnect;
}
public void setNeedAutoConnect(boolean needAutoConnect) {
this.needAutoConnect = needAutoConnect;
}
public String getSendSubject() {
return sendSubject;
}
public MQTTHelper setSendSubject(String sendSubject) {
this.sendSubject = sendSubject;
return this;
}
public String getReceiveSubject() {
return receiveSubject;
}
public MQTTHelper setReceiveSubject(String receiveSubject) {
this.receiveSubject = receiveSubject;
return this;
}
public String getHost() {
return host;
}
public MQTTHelper setHost(String host) {
this.host = host;
return this;
}
public String getClientId() {
return clientId;
}
public MQTTHelper setClientId(String clientId) {
this.clientId = clientId;
return this;
}
public String getUserName() {
return userName;
}
public MQTTHelper setUserName(String userName) {
this.userName = userName;
return this;
}
public String getPassword() {
return password;
}
public MQTTHelper setPassword(String password) {
this.password = password;
return this;
}
public void init(Context context) {
mContext = context.getApplicationContext();
mHandler = new Handler(Looper.getMainLooper());
if (mMqtt != null)
return;
// todo id 服務(wù)器提前輸入的id壹罚, 設(shè)備出場(chǎng)時(shí)刻錄
String id = MUtils.getFactoryMacAddress(context);
if (TextUtils.isEmpty(id)){
id = System.currentTimeMillis()+"_id";
}
if (TextUtils.isEmpty(clientId)) {
clientId = id;
}
if (TextUtils.isEmpty(receiveSubject)) {
receiveSubject = "$thing/down/";
}
if (TextUtils.isEmpty(sendSubject)) {
sendSubject = "$thing/up/";
}
mMqtt = new MqttAndroidClient(mContext, host, clientId);
mMqtt.setCallback(new MqttCallback());
Log.i(TAG, "init_" + clientId + ": " + receiveSubject + ": " + sendSubject);
}
@Override
public void run() {
startConnect();
}
/**
* 發(fā)送上行信息
* @param str 消息json
* @param listener 發(fā)送是否成功回調(diào)
*/
public void sendMsg(String str, OnSendMsgListener listener) {
sendMsg(getSendSubject(), str, listener);
}
public void sendMsg(String sendSubject, String str, OnSendMsgListener listener) {
if (mMqtt == null)
return;
new Handler(Looper.getMainLooper()).post(() -> {
try {
MqttMessage msg = new MqttMessage();
msg.setPayload(str.getBytes());
IMqttDeliveryToken publish = mMqtt.publish(sendSubject, msg);
Log.i(TAG, "sendMsg_send_msg_id: " + publish.getMessageId()+"; "+str);
if (listener == null)
return;
if (publish.getMessageId() == 0) {
listener.onSendFinish(false);
return;
}
WeakReference<OnSendMsgListener> reference = new WeakReference<>(listener);
mListeners.put(publish.getMessageId(), reference);
} catch (Exception e) {
Log.i(TAG, "sendMsg_error_" + e.getMessage());
}
});
}
/**
* 關(guān)閉自動(dòng)重連 默認(rèn)打開
*/
public void cancelAutoConnect() {
needAutoConnect = false;
mHandler.removeCallbacksAndMessages(null);
}
public void startConnect() {
if (mMqtt == null)
return;
mHandler.removeCallbacksAndMessages(null);
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(false);
options.setCleanSession(true);
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
options.setMaxInflight(10);
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
Log.i(TAG, "startConnect_username_passwd: " +
userName + "; " + password + "; " + mMqtt.getClientId() + "; " + options.getUserName());
mMqtt.connect(options, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG, "onSuccess_");
mHandler.removeCallbacksAndMessages(null);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG, "onFailure_" + exception.getMessage());
exception.printStackTrace();
mHandler.removeCallbacksAndMessages(null);
if (needAutoConnect) {
mHandler.postDelayed(getInstance(), 16000);
}
}
});
} catch (Exception e) {
e.printStackTrace();
Log.i(TAG, "onCreate_error_" + e.getMessage());
}
}
public void release() {
mReceivers.clear();
clearListener();
cancelAutoConnect();
mHandler.removeCallbacksAndMessages(null);
if (mMqtt == null)
return;
if (mMqtt.isConnected()) {
try {
mMqtt.disconnect();
} catch (Exception e) {
Log.i(TAG, "onDestroy_error_" + e.getMessage());
}
}
mMqtt = null;
Log.i(TAG, "release_end_now: ");
}
public void clearListener() {
Set<Integer> integers = mListeners.keySet();
for (Integer index : integers) {
try {
Objects.requireNonNull(mListeners.get(index)).clear();
} catch (Exception e) {
Log.i(TAG, "clearListener_item_error: " + e.getMessage());
}
}
mListeners.clear();
}
public void registerCallback(MQTTReceiver callback) {
mReceivers.add(callback);
}
public void unRegisterCallback(MQTTReceiver registerReceiver) {
mReceivers.remove(registerReceiver);
}
public static class MQTTReceiver {
protected void receiveMsg(String msg) {
}
}
private class MqttCallback implements MqttCallbackExtended {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
mHandler.removeCallbacksAndMessages(null);
try {
mMqtt.subscribe(receiveSubject, 0);
Log.i(TAG, "connectComplete_start_subscribe");
} catch (Exception e) {
Log.i(TAG, "onSuccess_error_" + e.getMessage());
}
}
@Override
public void connectionLost(Throwable cause) {
try {
if (mMqtt != null)
mMqtt.unsubscribe(receiveSubject);
Log.i(TAG, "connectionLost_unsubscribe");
} catch (Exception e) {
Log.i(TAG, "connectionLost_error_" + e.getMessage());
}
mHandler.removeCallbacksAndMessages(null);
if (needAutoConnect) {
mHandler.postDelayed(getInstance(), 68000);
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
byte[] payload = message.getPayload();
String msg = new String(payload);
Log.i(TAG, "messageArrived_msg_" + msg);
for (MQTTReceiver item : mReceivers) {
item.receiveMsg(msg);
}
} catch (Exception e) {
Log.i(TAG, "messageArrived_error:" + e.getMessage());
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
WeakReference<OnSendMsgListener> reference = mListeners.get(token.getMessageId());
if (reference!=null){
OnSendMsgListener onSendMsgListener = reference.get();
if (onSendMsgListener != null) {
onSendMsgListener.onSendFinish(token.isComplete());
}
reference.clear();
mListeners.remove(token.getMessageId());
}
} catch (Exception e) {
Log.i(TAG, "deliveryComplete_error: " + e.getMessage());
}
Log.i(TAG, "deliveryComplete_send_msg_id: "
+ token.getMessageId() + "; " + token.isComplete());
}
}
public interface OnSendMsgListener {
void onSendFinish(boolean isComplete);
}
}
mqtt 工具類
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
- 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)暇韧,“玉大人勾习,你說(shuō)我怎么就攤上這事⌒覆#” “怎么了巧婶?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)涂乌。 經(jīng)常有香客問(wèn)我艺栈,道長(zhǎng),這世上最難降的妖魔是什么湾盒? 我笑而不...
- 正文 為了忘掉前任湿右,我火速辦了婚禮,結(jié)果婚禮上罚勾,老公的妹妹穿的比我還像新娘毅人。我一直安慰自己,他們只是感情好尖殃,可當(dāng)我...
- 文/花漫 我一把揭開白布丈莺。 她就那樣靜靜地躺著,像睡著了一般送丰。 火紅的嫁衣襯著肌膚如雪缔俄。 梳的紋絲不亂的頭發(fā)上,一...
- 那天,我揣著相機(jī)與錄音俐载,去河邊找鬼铐懊。 笑死,一個(gè)胖子當(dāng)著我的面吹牛瞎疼,可吹牛的內(nèi)容都是我干的科乎。 我是一名探鬼主播,決...
- 文/蒼蘭香墨 我猛地睜開眼贼急,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼茅茂!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起太抓,我...
- 序言:老撾萬(wàn)榮一對(duì)情侶失蹤空闲,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后走敌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碴倾,經(jīng)...
- 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
- 正文 我和宋清朗相戀三年掉丽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了跌榔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
- 正文 年R本政府宣布,位于F島的核電站锭部,受9級(jí)特大地震影響暂论,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拌禾,卻給世界環(huán)境...
- 文/蒙蒙 一取胎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蹋砚,春花似錦扼菠、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至墨坚,卻和暖如春秧饮,著一層夾襖步出監(jiān)牢的瞬間映挂,已是汗流浹背。 一陣腳步聲響...
- 正文 我出身青樓泼各,卻偏偏與公主長(zhǎng)得像鞍时,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扣蜻,可洞房花燭夜當(dāng)晚...
推薦閱讀更多精彩內(nèi)容
- 介紹 MQTT X 是一款開源的MQTT 5.0 桌面測(cè)試客戶端逆巍,相比其他mqtt的桌面客戶端工具,比如paho莽使、...
- 介紹 多標(biāo)簽頁(yè)管理锐极,同時(shí)打開多個(gè)連接 提供原生性能,并且比使用 Electron 等 Web 技術(shù)開發(fā)的同等應(yīng)用程...
- MQTT Spy 這是一個(gè)用java開發(fā)的開源MQTT客戶端芳肌,可以到他們的GitHub上下載灵再,用起來(lái)感覺(jué)得行但是如...
- 在通過(guò)MQTT協(xié)議接入騰訊云物聯(lián)網(wǎng)平臺(tái)時(shí),需要根據(jù)控制臺(tái)的相關(guān)產(chǎn)品信息計(jì)算用戶名和密碼等配置參數(shù)亿笤,操作上對(duì)于新手不...