mqtt簡(jiǎn)單封裝類

mqttmanager

import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;

import com.licheedev.myutils.LogPlus;
import com.simonfong.mqttdemo.listener.MqttCallback;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author simonfong
 * Created  on 2019/6/6
 */
public class MqttManager {
    private String serverUri;
    private String userName;
    private String passWord;
    private final Context mContext;
    private volatile MqttAndroidClient mqttAndroidClient;
    private static volatile MqttManager mqttManager = null;
    private MqttCallback callback;
    private String[] mqttTopic;
    private int[] qos;
    private boolean autoReconnect = true;

    /**
     * 初始化
     *
     * @param serverUri mqtt域名
     * @param userName  賬號(hào)
     * @param passWord  密碼
     * @param context
     */
    public static MqttManager init(String serverUri, String userName, String passWord, Context context) {
        return init(serverUri,userName,passWord,context,null,null);
    }

    /**
     * 初始化
     *
     * @param serverUri mqtt域名
     * @param userName  賬號(hào)
     * @param passWord  密碼
     * @param context
     * @param topics    主題
     * @param qos       QOS = 0/1/2   最多一次  最少一次 多次
     * @return
     */
    public static MqttManager init(String serverUri, String userName, String passWord, Context context, String[] topics,
                                   int[] qos) {
        if (mqttManager == null) {
            synchronized (MqttManager.class) {
                if (mqttManager == null) {
                    mqttManager = new MqttManager(serverUri, userName, passWord, context, topics, qos);
                }
            }
        }
        return mqttManager;
    }

    /**
     * 修改訂閱主題
     *
     * @param topics 主題
     * @param qos    QOS = 0/1/2   最多一次  最少一次 多次
     */
    public void setTopicAndQos(String[] topics, int[] qos) {
        if (mqttAndroidClient != null || !mqttAndroidClient.isConnected()) {
            try {
                mqttAndroidClient.unsubscribe(mqttTopic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        mqttTopic = topics;
        this.qos = qos;
        subscribeToTopic();
    }

    /**
     * 設(shè)置是否自動(dòng)重連,默認(rèn)為true
     *
     * @param isAuto
     */
    public void setAutoReconnect(boolean isAuto) {
        autoReconnect = isAuto;
    }

    public MqttManager(String serverUri, String userName, String passWord, Context context, String[] topics, int[] qos) {
        this.serverUri = serverUri;
        this.userName = userName;
        this.passWord = passWord;
        this.mqttTopic = topics;
        this.qos = qos;
        mContext = context;
        initConnect();
    }

    public void setCallback(MqttCallback callback) {
        this.callback = callback;
    }

    private boolean isConnect() {
        if (mqttAndroidClient != null) {
            return mqttAndroidClient.isConnected();
        }
        return false;
    }

    private void initConnect() {

        if (isConnect()) {
            return;
        }
        mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, MqttClient.generateClientId());
        mqttAndroidClient.registerResources(mContext);
        mqttAndroidClient.setCallback(new MyMqttCallbackExtended());
        //mqtt連接參數(shù)設(shè)置
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        //設(shè)置自動(dòng)重連
        mqttConnectOptions.setAutomaticReconnect(autoReconnect);
        // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄
        // 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
        mqttConnectOptions.setCleanSession(false);
        //設(shè)置連接的用戶名
        mqttConnectOptions.setUserName(userName);
        //設(shè)置連接的密碼
        mqttConnectOptions.setPassword(passWord.toCharArray());
        // 設(shè)置超時(shí)時(shí)間 單位為秒
        mqttConnectOptions.setConnectionTimeout(10);
        // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線迟杂,但這個(gè)方法并沒(méi)有重連的機(jī)制
        mqttConnectOptions.setKeepAliveInterval(20);
        try {
            mqttAndroidClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            LogPlus.e("connect--onFailure:" + e.toString());
            e.printStackTrace();
            if (callback != null) {
                callback.connectFail(e.toString());
            }
        }
    }

    private class MyMqttCallbackExtended implements MqttCallbackExtended {

        /**
         * 連接完成回調(diào)
         *
         * @param reconnect true 斷開(kāi)重連,false 首次連接
         * @param serverURI 服務(wù)器URI
         */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            LogPlus.e("連接成功connectComplete:是否重連+" + reconnect + "-----serverURI:" + serverURI);
            if (callback != null) {
                callback.connectSuccess(reconnect);
            }
            subscribeToTopic();
        }

        @Override
        public void connectionLost(Throwable cause) {
            LogPlus.e("connectionLost:斷開(kāi)");
            if (callback != null) {
                callback.connectLost("connectionLost:斷開(kāi)");
            }
            cause.printStackTrace();
        }

        /**
         * 消息接收,如果在訂閱的時(shí)候沒(méi)有設(shè)置IMqttMessageListener缀蹄,那么收到消息則會(huì)在這里回調(diào)峭跳。
         * 如果設(shè)置了IMqttMessageListener,則消息回調(diào)在IMqttMessageListener中
         *
         * @param topic
         * @param message
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            LogPlus.e(message.getId() + "-->receive message: " + message.toString());
            if (callback != null) {
                callback.receiveMessage(topic, message.toString());
            }

        }

        /**
         * 交付完成回調(diào)缺前。在publish消息的時(shí)候會(huì)收到此回調(diào).
         * qos:
         * 0 發(fā)送完則回調(diào)
         * 1 或 2 會(huì)在對(duì)方收到時(shí)候回調(diào)
         *
         * @param token
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            if (callback != null) {
                try {
                    callback.deliveryComplete(token.getMessage().toString());
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
            LogPlus.e(token.toString());
        }
    }

    private void subscribeToTopic() {
        if (mqttAndroidClient == null || !mqttAndroidClient.isConnected() || mqttTopic == null || qos == null) {
            return;
        }
        try {
            //TODO  訂閱的主題蛀醉,和qos
            mqttAndroidClient.subscribe(mqttTopic, qos, mContext.getApplicationContext(), new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    for (String s : mqttTopic) {
                        LogPlus.e("訂閱成功" + "topic:" + s);
                    }
                    if (callback != null) {
                        callback.subscribedSuccess(mqttTopic);
                    }
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogPlus.e("訂閱失敗");
                    if (callback != null) {
                        callback.subscribedFail(exception.toString());
                    }
                }

            });
        } catch (MqttException e) {
            e.printStackTrace();
            if (callback != null) {
                callback.subscribedFail(e.toString());
            }
        }
    }

    public static MqttManager getInstance() {
        if (mqttManager == null) {
            throw new NullPointerException("請(qǐng)先調(diào)用init方法進(jìn)行初始化");
        }
        return mqttManager;
    }

    /**
     * 發(fā)送
     *
     * @param topic    發(fā)送的主題
     * @param msg      發(fā)送的消息
     * @param qos      QOS = 0/1/2   最多一次  最少一次 多次
     * @param retained 是否保留消息,為true時(shí),后來(lái)訂閱該主題的仍然收到該消息
     */
    public void publishMessage(String topic, String msg, int qos, boolean retained) {
        if (isConnect()) {
            try {
                mqttAndroidClient.publish(topic, msg.getBytes(), qos, retained);
                LogPlus.e("Mqtt 發(fā)送消息:" + msg);
                if (!mqttAndroidClient.isConnected()) {
                    LogPlus.e(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
                }
            } catch (MqttException e) {
                LogPlus.e("Error Publishing: " + e.toString());
            }
        }
    }

    public void onDestroy() {
        if (mqttAndroidClient == null) {
            return;
        }
        try {
            mqttAndroidClient.close();
            mqttAndroidClient.disconnect();
            mqttAndroidClient.unregisterResources();
            mqttManager = null;
            mqttAndroidClient = null;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 判斷網(wǎng)絡(luò)是否連接
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) mContext.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            LogPlus.e("MQTT當(dāng)前網(wǎng)絡(luò)名稱:" + name);
            return true;
        } else {
            LogPlus.e("MQTT 沒(méi)有可用網(wǎng)絡(luò)");
            return false;
        }
    }

}
image.gif

MqttCallback:

public abstract class MqttCallback {
    /**
     * 訂閱成功
     *
     * @param mqttTopic
     */
    public void subscribedSuccess(String[] mqttTopic) {

    }

    /**
     * 訂閱失敗
     *
     * @param message
     */
    public void subscribedFail(String message) {

    }

    /**
     * 發(fā)送成功
     * @param message
     */
    public void deliveryComplete(String message) {

    }

    /**
     * 接收的數(shù)據(jù)
     *
     * @param topic
     * @param message
     */
    public abstract void receiveMessage(String topic, String message);

    /**
     * 連接成功
     */
    public void connectSuccess(boolean reconnect) {

    }

    /**
     * 連接失敗
     *
     * @param message
     */
    public void connectFail(String message) {

    }

    /**
     * 斷開(kāi)連接
     *
     * @param message
     */
    public void connectLost(String message) {

    }
}

image.gif

使用:

  private void initMqtt() {
        String[] topics = {myTopic, myTopic2};
        int[] qos = {0, 0};
        MqttManager.init(host, userName, passWord, this, topics, qos);
        MqttManager.getInstance().setCallback(new MyMqttCallback());

    }

    private class MyMqttCallback extends MqttCallback {

        @Override
        public void subscribedSuccess(String[] mqttTopic) {
            for (String s : mqttTopic) {
                LogPlus.e("subscribedSuccess:" + s);
            }
        }

        @Override
        public void subscribedFail(String message) {
            LogPlus.e("subscribedFail:" + message);
        }

        @Override
        public void receiveMessage(String topic, String message) {
            LogPlus.e("topic:" + topic + "------receiveMessage:" + message);
        }

        @Override
        public void connectSuccess(boolean reconnect) {
            LogPlus.e("connectSuccess:" + reconnect);
        }

        @Override
        public void connectFail(String message) {
            LogPlus.e("connectFail:" + message);
        }

        @Override
        public void connectLost(String message) {
            LogPlus.e("connectLost:" + message);
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        MqttManager.getInstance().onDestroy();
    }
image.gif

注意衅码,回調(diào)是在子線程拯刁,要進(jìn)行ui操作必須要回到主線程

發(fā)消息

 MqttManager.getInstance().publishMessage(myTopic2, content, 0, false);
image.gif
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市肆良,隨后出現(xiàn)的幾起案子筛璧,更是在濱河造成了極大的恐慌,老刑警劉巖惹恃,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件夭谤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡巫糙,警方通過(guò)查閱死者的電腦和手機(jī)朗儒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)参淹,“玉大人醉锄,你說(shuō)我怎么就攤上這事≌阒担” “怎么了恳不?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)开呐。 經(jīng)常有香客問(wèn)我烟勋,道長(zhǎng),這世上最難降的妖魔是什么筐付? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任卵惦,我火速辦了婚禮,結(jié)果婚禮上瓦戚,老公的妹妹穿的比我還像新娘沮尿。我一直安慰自己,他們只是感情好较解,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布畜疾。 她就那樣靜靜地躺著赴邻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪庸疾。 梳的紋絲不亂的頭發(fā)上乍楚,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天当编,我揣著相機(jī)與錄音届慈,去河邊找鬼。 笑死忿偷,一個(gè)胖子當(dāng)著我的面吹牛金顿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播鲤桥,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼揍拆,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了茶凳?” 一聲冷哼從身側(cè)響起嫂拴,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎贮喧,沒(méi)想到半個(gè)月后筒狠,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡箱沦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年辩恼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谓形。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡灶伊,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出寒跳,到底是詐尸還是另有隱情聘萨,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布童太,位于F島的核電站米辐,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏康愤。R本人自食惡果不足惜儡循,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望征冷。 院中可真熱鬧择膝,春花似錦、人聲如沸检激。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至齿穗,卻和暖如春傲隶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背窃页。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工跺株, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人脖卖。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓乒省,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親畦木。 傳聞我的和親對(duì)象是個(gè)殘疾皇子袖扛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒(méi)有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,101評(píng)論 1 32
  • Object C中創(chuàng)建線程的方法是什么?如果在主線程中執(zhí)行代碼十籍,方法是什么蛆封?如果想延時(shí)執(zhí)行代碼、方法又是什么勾栗? 1...
    AlanGe閱讀 1,739評(píng)論 0 17
  • 美圖欣賞 Java惨篱、Android知識(shí)點(diǎn)匯集 Java集合類 ** Java集合相關(guān)的博客** java面試相關(guān) ...
    ElvenShi閱讀 1,740評(píng)論 0 2
  • 為了更好的理解 Looper 的工作原理,我們需要對(duì) ThreadLocal 進(jìn)行了解械姻,如果對(duì) ThreadLoc...
    墨染書(shū)閱讀 1,475評(píng)論 0 3
  • Android消息處理機(jī)制估計(jì)都被寫(xiě)爛了,但是依然還是要寫(xiě)一下欢揖,因?yàn)锳ndroid應(yīng)用程序是通過(guò)消息來(lái)驅(qū)動(dòng)的陶耍,An...
    一碼立程閱讀 4,469評(píng)論 4 36