MQTT從搭建代理服務器到推送消息過程

MQTT簡介:

MQTT(Message Queuing Telemetry Transport宝惰,消息隊列遙測傳輸)是IBM開發(fā)的一個即時通訊協(xié)議!

MQTT消息的主要特點:

使用(publish/subscribe)消息模式,簡稱p/s模式,即發(fā)布/訂閱!提供一對多的發(fā)送方式!
MQTT根據(jù)QoS定義的等級來傳輸消息:
  • level 0:最多一次的傳輸
消息是基于TCP/IP網(wǎng)絡傳輸?shù)摹]有回應泉蝌,在協(xié)議中也沒有定義重傳的語義。消息可能到達服務器1次,也可能根本不會到達汹胃。
  • level 1:至少一次的傳輸
服務器接收到消息會被確認,通過傳輸一個PUBACK信息东臀。如果有一個可以辨認的傳輸失敗着饥,無論是通訊連接還是發(fā)送設備,還是過了一段時間確認信息沒有收到惰赋,發(fā)送方都會將消息頭的DUP位置1宰掉,然后再次發(fā)送消息呵哨。消息最少一次到達服務器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS轨奄。
如果客戶端沒有接收到PUBACK信息(無論是應用定義的超時孟害,還是檢測到失敗然后通訊session重啟),客戶端都會再次發(fā)送PUBLISH信息挪拟,并且將DUP位置1挨务。
當它從客戶端接收到重復的數(shù)據(jù),服務器重新發(fā)送消息給訂閱者玉组,并且發(fā)送另一個PUBACK消息谎柄。
  • level 2: 只有一次的傳輸
在QoS level 1上附加的協(xié)議流保證了重復的消息不會傳送到接收的應用。這是最高級別的傳輸惯雳,當重復的消息不被允許的情況下使用朝巫。這樣增加了網(wǎng)絡流量,但是它通常是可以接受的石景,因為消息內(nèi)容很重要劈猿。
QoS level 2在消息頭有Message ID。

接下來開始我們的表演:

下載代理服務器
創(chuàng)建代理服務器
  • 下載完成然后解壓目錄

  • 打開dos窗口進入到apache-apollo-1.7.1\bin目錄下

  • 執(zhí)行apollo create testbroker命令創(chuàng)建一個名稱為testbroker的代理服務器

  • Paste_Image.png
  • 下面就是我們創(chuàng)建的代理服務器


    Paste_Image.png
啟動代理服務器
  • 使用dos進入testbroker目錄中的bin目錄下

  • 執(zhí)行apollo-broker run命令啟動代理服務器

  • Paste_Image.png
通過HTTP訪問代理服務器

現(xiàn)在我們可以打開瀏覽器看下我們的代理服務器
輸入網(wǎng)址http://127.0.0.1:61680/

  • Paste_Image.png
  • 用戶名密碼可到配置文件中查看

  • 進入testbroker目錄下的etc目錄

Paste_Image.png
  • users.properties中配置的用戶名和密碼
Paste_Image.png
  • 默認有個用戶名為admin潮孽,密碼為password的用戶

  • 我們也可以自己配置用戶

  • 現(xiàn)在就用默認用戶登陸

  • Paste_Image.png

OK登陸成功

接下來我們編寫Android客戶端

Paste_Image.png
一定保證客戶端和服務端以及代理服務器所在的電腦在同一網(wǎng)段下
  • 可以在電腦上生成wifi熱點,手機客戶端連接熱點即可
  • 接下來直接貼Android客戶端代碼
MqttService.java
package com.example.jingwc.mqtt_demo;

import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.util.Log;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttService extends Service {

    /**
     * 代理服務器ip地址
     */
    public static final String MQTT_BROKER_HOST = "tcp://192.168.1.107:61613";

    /**
     * 客戶端唯一標識
     */
    public static final String MQTT_CLIENT_ID = "android-jingwc";

    /**
     * 訂閱標識
     */
    public static final String MQTT_TOPIC = "jingwc";

    /**
     * 用戶名
     */
    public static final String USERNAME = "admin";
    /**
     *  密碼
     */
    public static final String PASSWORD = "password";

    private MqttClient mqttClient;

    public MqttService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
        return binder;
    }

    /**
     * 連接mqtt
     */
    public void connect(){
        try {
            // host為主機名揪荣,clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示恩商,
            // MemoryPersistence設置clientid的保存形式变逃,默認為以內(nèi)存保存
            mqttClient = new MqttClient(MQTT_BROKER_HOST,MQTT_CLIENT_ID,new MemoryPersistence());
            // 配置參數(shù)信息
            MqttConnectOptions options = new MqttConnectOptions();
            // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,
            // 這里設置為true表示每次連接到服務器都以新的身份連接
            options.setCleanSession(true);
            // 設置用戶名
            options.setUserName(USERNAME);
            // 設置密碼
            options.setPassword(PASSWORD.toCharArray());
            // 設置超時時間 單位為秒
            options.setConnectionTimeout(10);
            // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線怠堪,但這個方法并沒有重連的機制
            options.setKeepAliveInterval(20);
            // 連接
            mqttClient.connect(options);

            // 訂閱
            mqttClient.subscribe(MQTT_TOPIC);

            // 設置回調(diào)
            mqttClient.setCallback(new MqttCallback() {
                //連接丟失后揽乱,一般在這里面進行重連
                @Override
                public void connectionLost(Throwable throwable) {
                    Log.d("test","connectionLost");
                }


                //subscribe后得到的消息會執(zhí)行到這里面
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    Log.d("test","messageArrived"+mqttMessage.toString());
                }

                //publish后會執(zhí)行到這里
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    Log.d("test","deliveryComplete");
                }
            });

        } catch (MqttException e) {
            e.printStackTrace();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 斷開連接
     */
    public void disconnect(){
        if(mqttClient != null){
            if(mqttClient.isConnected()){
                try {
                    mqttClient.disconnect();
                    mqttClient = null;
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private final Binder binder = new MyBinder();

    class MyBinder extends Binder{

        public MqttService getService(){
            return MqttService.this;
        }
    }
}

MainActivity.java
package com.example.jingwc.mqtt_demo;

import android.content.ComponentName;
import android.content.Intent;
import android.content.ServiceConnection;
import android.os.IBinder;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;

public class MainActivity extends AppCompatActivity {

    MqttService service = null;

    private ServiceConnection mConnection = new ServiceConnection() {
        @Override
        public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
            service = ((MqttService.MyBinder)iBinder).getService();
        }

        @Override
        public void onServiceDisconnected(ComponentName componentName) {
            service = null;
        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Button bt_connect = (Button) findViewById(R.id.bt_connect);
        Button bt_disconnect = (Button) findViewById(R.id.bt_disconnect);

        bt_connect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                // 連接
                service.connect();
            }
        });

        bt_disconnect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                // 斷開連接
                service.disconnect();
            }
        });

        bindService(new Intent(this,MqttService.class),mConnection,BIND_AUTO_CREATE);
    }
}

服務端代碼

  • 也可以在寫一個android程序當作服務端
  • 我這里寫的是java項目
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttServer {
    
    // 代理服務器ip地址
    private static String host = "tcp://192.168.1.107:61613";
    
    private static String userName = "admin";
    private static String password = "password";
    
    private static MqttClient client;
    
    // 主題
    private static MqttTopic topic;
    
    private static MqttMessage message;
    
    // 訂閱標識
    private static String topicStr = "jingwc";
    
    public static void main(String[] args) throws MqttException{
        client = new MqttClient(host,"java-server-jingwc",new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        
        topic = client.getTopic(topicStr);
        
        message = new MqttMessage();
        message.setQos(1);
        message.setRetained(true);
        message.setPayload("from server message".getBytes());
        client.connect(options);
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("token:"+token.isComplete());
        
    }

}

服務端通過代理服務器發(fā)送客戶端訂閱的消息圖

  • Paste_Image.png
Paste_Image.png
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市粟矿,隨后出現(xiàn)的幾起案子凰棉,更是在濱河造成了極大的恐慌,老刑警劉巖陌粹,帶你破解...
    沈念sama閱讀 223,126評論 6 520
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件撒犀,死亡現(xiàn)場離奇詭異,居然都是意外死亡掏秩,警方通過查閱死者的電腦和手機或舞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,421評論 3 400
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蒙幻,“玉大人映凳,你說我怎么就攤上這事∮势疲” “怎么了诈豌?”我有些...
    開封第一講書人閱讀 169,941評論 0 366
  • 文/不壞的土叔 我叫張陵仆救,是天一觀的道長。 經(jīng)常有香客問我矫渔,道長彤蔽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,294評論 1 300
  • 正文 為了忘掉前任庙洼,我火速辦了婚禮顿痪,結果婚禮上,老公的妹妹穿的比我還像新娘送膳。我一直安慰自己员魏,他們只是感情好丑蛤,可當我...
    茶點故事閱讀 69,295評論 6 398
  • 文/花漫 我一把揭開白布叠聋。 她就那樣靜靜地躺著,像睡著了一般受裹。 火紅的嫁衣襯著肌膚如雪碌补。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,874評論 1 314
  • 那天棉饶,我揣著相機與錄音厦章,去河邊找鬼。 笑死照藻,一個胖子當著我的面吹牛袜啃,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播幸缕,決...
    沈念sama閱讀 41,285評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼群发,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了发乔?” 一聲冷哼從身側(cè)響起熟妓,我...
    開封第一講書人閱讀 40,249評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎栏尚,沒想到半個月后起愈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,760評論 1 321
  • 正文 獨居荒郊野嶺守林人離奇死亡译仗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,840評論 3 343
  • 正文 我和宋清朗相戀三年抬虽,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纵菌。...
    茶點故事閱讀 40,973評論 1 354
  • 序言:一個原本活蹦亂跳的男人離奇死亡阐污,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出产艾,到底是詐尸還是另有隱情疤剑,我是刑警寧澤滑绒,帶...
    沈念sama閱讀 36,631評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站隘膘,受9級特大地震影響疑故,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜弯菊,卻給世界環(huán)境...
    茶點故事閱讀 42,315評論 3 336
  • 文/蒙蒙 一纵势、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧管钳,春花似錦钦铁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,797評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至醇滥,卻和暖如春黎比,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背鸳玩。 一陣腳步聲響...
    開封第一講書人閱讀 33,926評論 1 275
  • 我被黑心中介騙來泰國打工阅虫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人不跟。 一個月前我還...
    沈念sama閱讀 49,431評論 3 379
  • 正文 我出身青樓颓帝,卻偏偏與公主長得像,于是被迫代替她去往敵國和親窝革。 傳聞我的和親對象是個殘疾皇子购城,可洞房花燭夜當晚...
    茶點故事閱讀 45,982評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn)聊闯,斷路器工猜,智...
    卡卡羅2017閱讀 134,720評論 18 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,348評論 25 707
  • MQTT簡介 MQ 遙測傳輸 (MQTT) 是輕量級基于代理的發(fā)布/訂閱的消息傳輸協(xié)議,設計思想是開放菱蔬、簡單篷帅、輕量...
    小熊_c37d閱讀 11,280評論 0 3
  • 本來有一顆水晶般的玻璃心,現(xiàn)在被蹂躪成了鹽拴泌,加點兒水稀釋一下就是淚了
    你說我聽好么閱讀 241評論 0 0
  • 昨夜一場狂風暴雨魏身,早上天氣倒是晴朗起來,雨后空氣濕潤清爽蚪腐,由不得心動箭昵。送下女兒回家,推出自行車回季,去轉(zhuǎn)轉(zhuǎn)家制。 雨后的一...
    清和qinghe閱讀 203評論 2 2