Mqtt及mosquitto贸辈、eclipse paho詳解

一.MQTT

1.簡介

???????MQTT(Message Queuing Telemetry Transport 消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議证舟。它工作在 TCP/IP協(xié)議族上拣展,是為硬件性能低下的遠程設備以及網(wǎng)絡狀況糟糕的情況下而設計的發(fā)布/訂閱型消息協(xié)議傻铣,為此,它需要一個消息中間件 页响。
???????MQTT是IBM開發(fā)的一個基于客戶端-服務器的消息發(fā)布/訂閱傳輸協(xié)議。
???????MQTT協(xié)議是輕量段誊、簡單闰蚕、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛连舍。在很多情況下没陡,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(IoT)索赏。其在盼玄,通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設備潜腻、智能家居强岸、及一些小型化設備中已廣泛使用。

2.特性

  • 基于發(fā)布 / 訂閱范式的 “輕量級” 消息協(xié)議(頭部 2 字節(jié))
  • 專為資源受限的設備砾赔、低帶寬占用高延時或者不可靠的網(wǎng)絡設計蝌箍,適用于 IoT 與 M2M
  • 基于 TCP/IP 協(xié)議棧
  • 實時的 IoT 通訊的標準協(xié)議

二.Mosquitto

1.簡介

???????Mosquitto是一款實現(xiàn)了消息推送協(xié)議 MQTT v3.1 的開源消息代理軟件,提供輕量級的暴心,支持可發(fā)布/可訂閱的的消息推送模式妓盲,使設備對設備之間的短消息通信變得簡單。

2.Broker

???????我們知道专普,網(wǎng)絡間進行通信需要有Server和Client悯衬,在Mqtt中Broker扮演了Server的角色,基于mosquitto源碼通過NDK進行編譯生成android系統(tǒng)端可執(zhí)行的bin文件檀夹,通過mosquitto -c mosquitto.conf來啟動Broker筋粗;

3.版本和名稱

???????Mosquitto會支持不同的協(xié)議版本號和名稱,通過PROTOCOL_NAME和PROTOCOL_VERSION來進行區(qū)分炸渡,比如本文用到的mosquitto源碼版本支持MQTTV3.1和MQTTV3.1.1娜亿,MQTTV3.1對應的協(xié)議name為"MQIsdp";
MQTTV3.1.1對應的協(xié)議name為"MQTT"蚌堵;版本和name必須匹配买决。

三.Client端實現(xiàn)

???????Client端實現(xiàn)主要分為三部分:Client端的創(chuàng)建沛婴;Client端連接Client端消息注冊督赤;

1.Client端創(chuàng)建

??????在創(chuàng)建client時嘁灯,需要初始化一些指定的參數(shù),通過這些參數(shù)來處理與broker端的交互躲舌,包括連接丑婿,心跳,斷開重連及設置狀態(tài)回調(diào)等没卸。

  //用來存儲Qos=1和2的消息
  MemoryPersistence dataStore = new MemoryPersistence();
  //保存著一些控制客戶端如何連接到服務器的選項
  MqttConnectOptions mConOpt = new MqttConnectOptions();
  //set mqtt version
  mConOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
  /**
  * set cleanSession
  * false:broker will save connection record for client
  * true:As a new client to connect broker every time[每次連接上都是一個新的客戶端]
  */
  mConOpt.setCleanSession(true);
  // set heartbeat 30s[30S去檢測一下broker是否有效,如果無效羹奉,會回調(diào)connectionLost]
  mConOpt.setKeepAliveInterval(30);
  // set username
  if (userName != null) {
      mConOpt.setUserName(userName);
  }
  // set password
  if (password != null) {
      mConOpt.setPassword(password.toCharArray());
  }
  //when disconnect unexpectly, broker will send "close" to clients which subscribe this topic to announce the connection is lost
  mConOpt.setWill(topic, "close".getBytes(), 2, true);
  //client reconnect to broker automatically[與broker斷開后會去重連]
  mConOpt.setAutomaticReconnect(true);
  // create Mqtt client
  if (sClient == null) {
      sClient = new MqttClient(brokerUrl, clientId, dataStore);
      // set callback[狀態(tài)回調(diào)]
      mCallback = new MqttCallbackBus(sInstance);
      sClient.setCallback(mCallback);
  }

2.Client端連接Broker

??????上一步創(chuàng)建了Client,并且初始化了各種參數(shù)办悟,接下來調(diào)用connect進行連接尘奏,本文創(chuàng)建的是異步Client,設置了連接狀態(tài)的回調(diào)IMqttActionListener病蛉,連接成功后可以進行topic的訂閱炫加,失敗后可以進行重連。

// connect to broker
sClient.connect(mConOpt);

//異步的client铺然,同步連接沒有狀態(tài)回調(diào)
mClient = new MqttAsyncClient(brokerUrl, clientId, dataStore);
mClient.connect(mConOpt, null, mIMqttActionListener);
//連接狀態(tài)回調(diào)
private IMqttActionListener mIMqttActionListener = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
        try {
            Log.i(TAG, "connect success");
            ......
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        try {
            Log.e(TAG, "connect failure, reconnect");
            ......
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};

3.狀態(tài)回調(diào)

??????在第一步進行client創(chuàng)建時傳入了MqttCallback俗孝,在與broker斷開連接、新消息到達魄健、消息發(fā)送完成后赋铝,通過該MqttCallback會收到對應的回調(diào),具體如下:

public class MqttCallbackBus implements MqttCallback {
    private static final String TAG = MqttCallbackBus.class.getSimpleName();
    private MqttManager mMqttManager;
    public MqttCallbackBus(MqttManager mqttManager) {
        mMqttManager = mqttManager;
    }
    @Override
    public void connectionLost(Throwable cause) {
        Log.e(TAG, "cause : " + cause.toString());
        //與broker斷開后回調(diào)沽瘦,[雖然上邊屬性中設置了自動重連革骨,但是連上后不會去訂閱topic,即使連上也接收不到topic析恋,因此選擇在此手動連接良哲,然后在連接成功后訂閱topic]
        mMqttManager.reconnectBroker();
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        Log.e(TAG, "topic : " + topic + "\t MqttMessage : " + message.toString());
        //訂閱的消息接收到后回調(diào)
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.e(TAG, "token : " + token.toString());
        //消息publish完成后回調(diào)
    }
}

四.Eclipse paho源碼分析

???????Client端是基于Eclipse Paho提供的mqtt開源庫進行實現(xiàn),接下來對Eclipse Paho源碼進行分析:

1.MqttConnectOptions.java

//設置是否重連
public void setAutomaticReconnect(boolean automaticReconnect) {
    this.automaticReconnect = automaticReconnect;
}
//獲取是否設置了重連標志助隧,確定后續(xù)是否進行重連
public boolean isAutomaticReconnect() {
    return automaticReconnect;
}

2.MqttAsyncClient.java

???????創(chuàng)建mqtt async client筑凫,包括一些實例初始化等,程序最重要的入口類并村。

2.1.構(gòu)造方法

public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
    ....
    ....
    MqttConnectOptions.validateURI(serverURI);

    this.serverURI = serverURI;
    this.clientId = clientId;

    this.persistence = persistence;
    if (this.persistence == null) {
    }
    this.persistence.open(clientId, serverURI);
    //創(chuàng)建了ClientComms巍实,最終去跟broker建立連接
    this.comms = new ClientComms(this, this.persistence, pingSender);
    this.persistence.close();
    this.topics = new Hashtable();
}

???????在構(gòu)造方法內(nèi),進行了一些變量賦值哩牍,然后創(chuàng)建ClientComms實例棚潦,該實例用來跟broker建立連接,后面會進行分析姐叁;

2.2.connect()

???????在創(chuàng)建完實例后瓦盛,調(diào)用connect()去跟broker建立連接洗显,看一下connect()方法的具體實現(xiàn):

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
            throws MqttException, MqttSecurityException {
    ....
    this.connOpts = options;
    this.userContext = userContext;
    final boolean automaticReconnect = options.isAutomaticReconnect();
    ....
    comms.setNetworkModules(createNetworkModules(serverURI, options));
    comms.setReconnectCallback(new MqttCallbackExtended() {
        public void messageArrived(String topic, MqttMessage message) throws Exception {
        }
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
        public void connectComplete(boolean reconnect, String serverURI) {
        }
        public void connectionLost(Throwable cause) {
            if(automaticReconnect){
               // Automatic reconnect is set so make sure comms is in resting state
               comms.setRestingState(true);
               reconnecting = true;
               //設置了重連外潜,在收到connectionLost后原环,進行重連
               startReconnectCycle();
             }
         }
    });

    // Insert our own callback to iterate through the URIs till the connect succeeds
    MqttToken userToken = new MqttToken(getClientId());
    ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
    userToken.setActionCallback(connectActionListener);
    userToken.setUserContext(this);

    ......
    comms.setNetworkModuleIndex(0);
    connectActionListener.connect();
}

???????在connect()內(nèi)部主要做了以下幾件事:
???????1.通過createNetworkModules()創(chuàng)建NetworkModule,包含serverURl处窥,最終創(chuàng)建的URI_TYPE_TCP嘱吗,對應的是TCPNetworkModule;
???????2.調(diào)用setReconnectCallback()來設置重連滔驾,狀態(tài)斷開時會進行自動重連谒麦;
???????3.創(chuàng)建ConnectActionListener對象,傳入了comms哆致、callback等參數(shù)绕德,連接狀態(tài)onSuccess()和onFailure()是在ConnectActionListener里面進行回調(diào)的;
???????4.執(zhí)行ConnectActionListener的connect()進行連接摊阀;

2.3.startReconnectCycle()

???????前面講到耻蛇,如果設置了automaticReconnect,則在異常斷開后會調(diào)用startReconnectCycle()進行重連:

//1.重連循環(huán)
private void startReconnectCycle() {
    ....
    reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
    reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
}

//2.重連task
private class ReconnectTask extends TimerTask {
    private static final String methodName = "ReconnectTask.run";
    public void run() {
        attemptReconnect();
    }
}

//3.重連入口
private void attemptReconnect(){
    ....
    try {
        //連接
        connect(this.connOpts, this.userContext,new IMqttActionListener() {
            public void onSuccess(IMqttToken asyncActionToken) {
                ....
                comms.setRestingState(false);
                //重連成功胞此,結(jié)束重連循環(huán)
                stopReconnectCycle();
            }

            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                ....
                //繼續(xù)重連臣咖,下一次重連時間是上一次的兩倍,最高是128s
                if(reconnectDelay < 128000){
                   reconnectDelay = reconnectDelay * 2;
                }
                rescheduleReconnectCycle(reconnectDelay);
             }
         });
    ....
}

//設置Mqttcallback
public void setCallback(MqttCallback callback) {
     this.mqttCallback = callback;
     //將MqttCallbackBus回調(diào)設置給ClientComms漱牵,后續(xù)的回調(diào)供client使用夺蛇,此處主要用到onConnectionLost()
     comms.setCallback(callback);
}

3.ConnectActionListener.java

???????通過以上可以看到,connect()方法中酣胀,最終調(diào)用的是connectActionListener的connect()方法刁赦,一起看一下該方法的具體實現(xiàn):

3.1.connect()

public void connect() throws MqttPersistenceException {
    //創(chuàng)建MqttToken
    MqttToken token = new MqttToken(client.getClientId());
    //設置callback,由于connectActionListener實現(xiàn)了IMqttActionListener闻镶,即把自己注冊進去
    token.setActionCallback(this);
    token.setUserContext(this);

    ......

    try {
      //調(diào)用comms的connect甚脉,comms是在創(chuàng)建client里面創(chuàng)建的,在connect時傳入connectActionListener里面
      comms.connect(options, token);
    } catch (MqttException e) {
      onFailure(token, e);
    }
  }

3.2.onSuccess()和onFailure()

public void onSuccess(IMqttToken token) {
    ....
    if (userCallback != null) {
      //回調(diào)傳入的IActionListener回調(diào)儒溉,該userCallback是在AsyncClient.connect()是傳入的IMqttActionListener
      userCallback.onSuccess(userToken);
    }
    ....  
  }

public void onFailure(IMqttToken token, Throwable exception) {
     ....
     if (userCallback != null) {
       //回調(diào)傳入的IActionListener回調(diào)宦焦,該userCallback是在AsyncClient.connect()是傳入的IMqttActionListener
        userCallback.onFailure(userToken, exception);
      }
    ....
    }
}

4.ClientComms.java

???????該類也是一個非常重要的類,主要創(chuàng)建了三個線程和ClientState實例顿涣,構(gòu)造方法中創(chuàng)建了CommsCallback線程和ClientState實例波闹,接著上步會調(diào)用到connect()方法,看一下該方法的實現(xiàn)邏輯:

4.1.connect()

public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
    final String methodName = "connect";
    synchronized (conLock) {
        if (isDisconnected() && !closePending) {
            //設置狀態(tài)為連接中
            conState = CONNECTING;
            conOptions = options;
            //創(chuàng)建MqttConnect涛碑,表示與broker連接的message
            MqttConnect connect = new MqttConnect(client.getClientId(),
                        conOptions.getMqttVersion(),
                        conOptions.isCleanSession(),
                        conOptions.getKeepAliveInterval(),
                        conOptions.getUserName(),
                        conOptions.getPassword(),
                        conOptions.getWillMessage(),
                        conOptions.getWillDestination());
            ......
            
            tokenStore.open();
            ConnectBG conbg = new ConnectBG(this, token, connect);
            conbg.start();
        }
    }
}

???????在connect()內(nèi)部創(chuàng)建了MqttConnect精堕,表示是連接message,然后創(chuàng)建ConnectBG實例并執(zhí)行start()蒲障;

4.2.ConnectBG

private class ConnectBG implements Runnable {
        ClientComms     clientComms = null;
        Thread          cBg = null;
        MqttToken       conToken;
        MqttConnect     conPacket;

        ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) {
            clientComms = cc;
            conToken    = cToken;
            conPacket   = cPacket;
            cBg = new Thread(this, "MQTT Con: "+getClient().getClientId());
        }

        void start() {
            cBg.start();
        }

        public void run() {
            ......
            try {
                ........
                NetworkModule networkModule = networkModules[networkModuleIndex];
                networkModule.start();
                receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
                receiver.start("MQTT Rec: "+getClient().getClientId());
                sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
                sender.start("MQTT Snd: "+getClient().getClientId());
                //CommsCallback本身是一個線程歹篓,啟動
                callback.start("MQTT Call: "+getClient().getClientId());                
                internalSend(conPacket, conToken);
            }
            .......

            if (mqttEx != null) {
                 //不為空瘫证,說明進入了catch,則shut down
                shutdownConnection(conToken, mqttEx);
            }
        }
}

//接著AsyncClient.setCallback庄撮,會調(diào)用ClientComms設置MqttCallbackBus回調(diào)
public void setCallback(MqttCallback mqttCallback) {
    this.callback.setCallback(mqttCallback);
}

???????從上面的代碼可以看到背捌,在執(zhí)行connect()方法后,主要做了以下幾項工作:
??????1.networkModule.start():創(chuàng)建socket洞斯,與broker建立連接毡庆;

//TcpNetworkModule.java
public void start() throws IOException, MqttException 
    try {
        ......
        SocketAddress sockaddr = new InetSocketAddress(host, port);
        socket = factory.createSocket();
        socket.connect(sockaddr, conTimeout*1000);
        ......
    }
    .......
}

???????2.創(chuàng)建CommsReceiver()然后start(),不斷循環(huán)讀取Broker端來的消息烙如;

//CommsReceiver.java
public void run() {
    ......
    while (running && (in != null)) {
        try {
            receiving = in.available() > 0;
            MqttWireMessage message = in.readMqttWireMessage();
            receiving = false;
                
            if (message instanceof MqttAck) {
                token = tokenStore.getToken(message);
                if (token!=null) {
                    synchronized (token) {
                        clientState.notifyReceivedAck((MqttAck)message);
                    }
                }
            } else {
                // A new message has arrived
                clientState.notifyReceivedMsg(message);
            }
        }
        ........
    ........
}

???????3.創(chuàng)建CommsSender實例么抗,然后start(),主要用來向Broker發(fā)送消息亚铁;

//CommsSender.java 
public void run() {
    ......
    while (running && (out != null)) {
        try {
            message = clientState.get();
            if (message != null) {
                if (message instanceof MqttAck) {
                    out.write(message);
                    out.flush();
                } else {
                    MqttToken token = tokenStore.getToken(message);
                    if (token != null) {
                        synchronized (token) {
                            out.write(message);
                            try {
                                out.flush();
                            } ......
                            clientState.notifySent(message);
                        }
                    }
                }
            }
        }
    }
    ......
}

??????4.CommsCallback.start():通過該類來實現(xiàn)broker返回消息的回調(diào)處理入口蝇刀,后面會講到。
??????5.internalSend(conPacket, conToken):發(fā)送連接action

5.ClientState.java

??????client在進行消息publish時徘溢,先經(jīng)過ClientComms吞琐,最終會調(diào)用到ClientState里面的send()方法,看一下send()方法的實現(xiàn)邏輯:

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "send";
        ......
        //message是publish型的message
        if (message instanceof MqttPublish) {
            synchronized (queueLock) {
                ......
                //獲取到要發(fā)送的Message
                MqttMessage innerMessage = ((MqttPublish) message).getMessage();
               //獲取到要發(fā)送Message的Qos
                switch(innerMessage.getQos()) {
                    case 2:
                        outboundQoS2.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                    case 1:
                        outboundQoS1.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                }
                tokenStore.saveToken(token, message);
                //加入pendingMessages甸昏,CommsSender通過get()方法獲取到message后就會進行發(fā)送
                pendingMessages.addElement(message);
                queueLock.notifyAll();
            }
        } else {
            //其他類型的message
            if (message instanceof MqttConnect) {
                synchronized (queueLock) {
                    // Add the connect action at the head of the pending queue ensuring it jumps
                    // ahead of any of other pending actions.
                    tokenStore.saveToken(token, message);
                    pendingFlows.insertElementAt(message,0);
                    queueLock.notifyAll();
                }
            } else {
                if (message instanceof MqttPingReq) {
                    this.pingCommand = message;
                }
                else if (message instanceof MqttPubRel) {
                    outboundQoS2.put(new Integer(message.getMessageId()), message);
                    persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
                }
                else if (message instanceof MqttPubComp)  {
                    persistence.remove(getReceivedPersistenceKey(message));
                }
                
                synchronized (queueLock) {
                    if ( !(message instanceof MqttAck )) {
                        tokenStore.saveToken(token, message);
                    }
                    pendingFlows.addElement(message);
                    queueLock.notifyAll();
                }
            }
        }
    }

    //連接成功
    public void connected() {
        final String methodName = "connected";
        //@TRACE 631=connected
        log.fine(CLASS_NAME, methodName, "631");
        this.connected = true;
        //啟動pingSender顽分,來在keepAliveInterval內(nèi)發(fā)送心跳包
        pingSender.start(); //Start ping thread when client connected to server.
    }

??????在CommsReceiver收到broker的ack及普通消息后,會先經(jīng)過clientState施蜜,具體會調(diào)用以下兩個方法:
??????notifyReceivedAck():連接成功ack卒蘸、qos=1和2時publish消息后的ack、心跳相關ack等都會通過該方法調(diào)用CommsCallback的方法翻默。
??????notifyReceivedMsg():正常的publish消息等缸沃。
??????pingSender.start():表示client在連上broker后會在aliveInterval后發(fā)送心跳包,pingSender是在創(chuàng)建client時就創(chuàng)建了TimerPingSender實例修械,一步步先傳給clientComms趾牧,再傳給ClientState,執(zhí)行start()來創(chuàng)建Timer肯污,執(zhí)行TimerTask來最終clientState的checkForActivity()翘单,將PingReq加入pendingFlows后,queueLock.notifyAll()蹦渣,調(diào)用CommsSender來進行發(fā)送哄芜,然后加入下一輪執(zhí)行。如果正常的話柬唯,會收到PingResp认臊,修改lastInboundActivity和pingOutstanding的值來在下一輪執(zhí)行check時來判斷是否收到心跳,異常就拋出REASON_CODE_CLIENT_TIMEOUT(32000)碼锄奢。

6.CommsCallback.java

??????通過該類來實現(xiàn)broker返回消息的回調(diào)處理入口失晴,包括處理斷開剧腻、消息發(fā)送成功,消息到達涂屁、連接狀態(tài)回調(diào)等

    public void start(String threadName) {
        synchronized (lifecycle) {
            if (!running) {
                // Preparatory work before starting the background thread.
                // For safety ensure any old events are cleared.
                messageQueue.clear();
                completeQueue.clear();

                running = true;
                quiescing = false;
                callbackThread = new Thread(this, threadName);
                callbackThread.start();
            }
        }
    }

    public void run() {
        final String methodName = "run";
        while (running) {
            try {
                //沒有work時书在,wait(),當有message來時胯陋,通過workAvailable.notifyAll()來喚醒
                try {
                    synchronized (workAvailable) {
                        if (running && messageQueue.isEmpty()
                                && completeQueue.isEmpty()) {
                            workAvailable.wait();
                        }
                    }
                } catch (InterruptedException e) {
                }

                if (running) {
                    // Check for deliveryComplete callbacks...
                    MqttToken token = null;
                    synchronized (completeQueue) {
                        if (!completeQueue.isEmpty()) {
                            // First call the delivery arrived callback if needed
                            token = (MqttToken) completeQueue.elementAt(0);
                            completeQueue.removeElementAt(0);
                        }
                    }
                    if (null != token) {
                        handleActionComplete(token);
                    }
                    
                    // Check for messageArrived callbacks...
                    MqttPublish message = null;
                    synchronized (messageQueue) {
                        if (!messageQueue.isEmpty()) {
                            // Note, there is a window on connect where a publish
                            // could arrive before we've
                            // finished the connect logic.
                            message = (MqttPublish) messageQueue.elementAt(0);

                            messageQueue.removeElementAt(0);
                        }
                    }
                    .......
            } finally {
                synchronized (spaceAvailable) {
                    spaceAvailable.notifyAll();
                }
            }
        }
    }

//連接回調(diào)方法入口蕊温,通過handleActionComplete()來調(diào)用
public void fireActionEvent(MqttToken token) {
        final String methodName = "fireActionEvent";
        if (token != null) {
           //此處的asyncCB就是在MqttAsyncClient.connect()里面?zhèn)魅氲腃onnectActionListener,參考如下:
           //ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
           //userToken.setActionCallback(connectActionListener);
            IMqttActionListener asyncCB = token.getActionCallback();
            if (asyncCB != null) {
                if (token.getException() == null) {
                    //回調(diào)ConnectActionListener的onSuccess的方法
                    asyncCB.onSuccess(token);
                } else {
                    //回調(diào)ConnectActionListener的onFailure的方法
                    asyncCB.onFailure(token, token.getException());
                }
            }
        }
}

//設置MqttCallbackBus回調(diào)
public void setCallback(MqttCallback mqttCallback) {
    this.mqttCallback = mqttCallback;
}

//斷開回調(diào)MqttCallbackBus接口
public void connectionLost(MqttException cause) {
    try {
       if (mqttCallback != null && cause != null) {
            //回調(diào)MqttCallbackBus的connectionLost接口
            mqttCallback.connectionLost(cause);
      ....
       }
       ....
     } 
    ....
}

7.客戶端連接流程圖

??????總結(jié)一下client端連接broker的流程圖袱箱,黑色的線代表client端從創(chuàng)建到跟broker進行connect()的調(diào)用流程遏乔,綠色的線代表從broker收到回復消息后的調(diào)用流程。


image.png

8.客戶端發(fā)送及訂閱接收消息流程圖

image.png

五.qos值及其含義

1.至多一次

???????消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡发笔。會發(fā)生消息丟失或重復盟萨。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù)了讨,丟失一次讀記錄無所謂捻激,因為不久后還會有第二次發(fā)送。


Qos0.png

2.至少一次

???????確保消息到達前计,但消息可能會重復發(fā)生胞谭。


Qos1.png

3.只有一次

???????確保消息到達一次。這一級別可用于如下情況男杈,在計費系統(tǒng)中丈屹,消息重復或丟失會導致不正確的結(jié)果。


Qos2.png

4.源碼分析

???????從Broker來的ack消息是通過CommsReceiver來接收的伶棒,接收后會調(diào)用ClientState的notifyReceiveAck()方法旺垒,結(jié)合上面的圖及代碼一起看一下:

protected void notifyReceivedAck(MqttAck ack) throws MqttException {
        final String methodName = "notifyReceivedAck";
        this.lastInboundActivity = System.currentTimeMillis();

        MqttToken token = tokenStore.getToken(ack);
        MqttException mex = null;

        if (token == null) {
        } else if (ack instanceof MqttPubRec) {
            MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
            //收到MqttPubRec后,創(chuàng)建MqttPubRel進行回復
            this.send(rel, token);
        } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
            //qos = 1或2時肤无,收到MqttPubAck或MqttPubComp來通知deliveryComplete回調(diào)先蒋,及刪除message
            notifyResult(ack, token, mex);
        } else if (ack instanceof MqttPingResp) {
            synchronized (pingOutstandingLock) {
                pingOutstanding = Math.max(0,  pingOutstanding-1);
                notifyResult(ack, token, mex);
                if (pingOutstanding == 0) {
                    tokenStore.removeToken(ack);
                }
            }
            //@TRACE 636=ping response received. pingOutstanding: {0}                                                                                                                                                     
            log.fine(CLASS_NAME,methodName,"636",new Object[]{ new Integer(pingOutstanding)});
        } else if (ack instanceof MqttConnack) {
            ......
            //連接成功的回調(diào)
            ......
        } else {
            ......
        }
        
        checkQuiesceLock();
    }

???????Subscriber在收到message后,會執(zhí)行到ClientState.notifyReceivedMsg()方法宛渐,該方法會根據(jù)qos的值來做相應的處理竞漾,qos=0或1,直接會調(diào)用messageArrived窥翩,然后刪除persistence业岁,send(PubAck);
???????qos=2時鳍烁,先存儲叨襟,然后send(PubRec),接下來會收到broker的PubRel幔荒,如果能找到msg糊闽,則會調(diào)用messageArrived梳玫,然后send(PubComp),刪除persistence右犹;如果再次收到PubRel提澎,找不到msg,則直接send(PubComp)念链,確保只執(zhí)行一次messageArrived盼忌。

//ClientState.java
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
        final String methodName = "notifyReceivedMsg";
        this.lastInboundActivity = System.currentTimeMillis();

        // @TRACE 651=received key={0} message={1}
        log.fine(CLASS_NAME, methodName, "651", new Object[] {
                new Integer(message.getMessageId()), message });
        
        if (!quiescing) {
            if (message instanceof MqttPublish) {
                MqttPublish send = (MqttPublish) message;
                switch (send.getMessage().getQos()) {
                case 0:
                case 1:
                    //Qos=1或0,直接執(zhí)行
                    if (callback != null) {
                        callback.messageArrived(send);
                    }
                    break;
                case 2:
            //Qos=2掂墓,先存儲谦纱,然后發(fā)送PubRec
                    persistence.put(getReceivedPersistenceKey(message),
                            (MqttPublish) message);
                    inboundQoS2.put(new Integer(send.getMessageId()), send);
                    this.send(new MqttPubRec(send), null);
                    break;

                default:
                    //should NOT reach here
                }
            } else if (message instanceof MqttPubRel) {
                //收到PubRel后,先從inboundQoS2找msg
                MqttPublish sendMsg = (MqttPublish) inboundQoS2
                        .get(new Integer(message.getMessageId()));
              //找到msg君编,表示還未執(zhí)行messageArrived跨嘉,先執(zhí)行
                if (sendMsg != null) {
                    if (callback != null) {
                        callback.messageArrived(sendMsg);
                    }
                } else {
              //找不到說明已經(jīng)執(zhí)行了messageArrived,直接發(fā)送PubComp
                    // Original publish has already been delivered.
                    MqttPubComp pubComp = new MqttPubComp(message
                            .getMessageId());
                    this.send(pubComp, null);
                }
            }
        }
    }

??????clientState的notifyResult()方法吃嘿,經(jīng)過一系列調(diào)用祠乃,最終會通過CommsCallback中的workAvailable.notifyAll()---->run()---->handleActionComplete()方法,看一下該方法的實現(xiàn):

private void handleActionComplete(MqttToken token)
            throws MqttException {
        final String methodName = "handleActionComplete";
        synchronized (token) {
            if (token.isComplete()) {
                //刪除message
                clientState.notifyComplete(token);
            }
            
            // Unblock any waiters and if pending complete now set completed
            token.internalTok.notifyComplete();
            
            if (!token.internalTok.isNotified()) {
                // If a callback is registered and delivery has finished 
                // call delivery complete callback. 
                if ( mqttCallback != null 
                    && token instanceof MqttDeliveryToken 
                    && token.isComplete()) {
                        //回調(diào)deliveryComplete()方法
                        mqttCallback.deliveryComplete((MqttDeliveryToken) token);
                }
             //內(nèi)部邏輯只有在connect()時才會調(diào)用兑燥,publish()時亮瓷,Listener為null,不會執(zhí)行
                fireActionEvent(token);
            }
            
            // Set notified so we don't tell the user again about this action.
            if ( token.isComplete() ){
               if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
                    token.internalTok.setNotified(true);
                }
            }
        }
    }

??????以下是在收到MqttPubAck或MqttPubComp后降瞳,會將存儲的persistence刪除掉嘱支。

//ClientState.java
protected void notifyComplete(MqttToken token) throws MqttException {
        
        final String methodName = "notifyComplete";

        MqttWireMessage message = token.internalTok.getWireMessage();

        if (message != null && message instanceof MqttAck) {
            
            // @TRACE 629=received key={0} token={1} message={2}
            log.fine(CLASS_NAME, methodName, "629", new Object[] {
                     new Integer(message.getMessageId()), token, message });

            MqttAck ack = (MqttAck) message;

            if (ack instanceof MqttPubAck) {
                
                // QoS 1 - user notified now remove from persistence...
                persistence.remove(getSendPersistenceKey(message));
                outboundQoS1.remove(new Integer(ack.getMessageId()));
                decrementInFlight();
                releaseMessageId(message.getMessageId());
                tokenStore.removeToken(message);
                // @TRACE 650=removed Qos 1 publish. key={0}
                log.fine(CLASS_NAME, methodName, "650",
                        new Object[] { new Integer(ack.getMessageId()) });
            } else if (ack instanceof MqttPubComp) {
                // QoS 2 - user notified now remove from persistence...
                persistence.remove(getSendPersistenceKey(message));
                persistence.remove(getSendConfirmPersistenceKey(message));
                outboundQoS2.remove(new Integer(ack.getMessageId()));

                inFlightPubRels--;
                decrementInFlight();
                releaseMessageId(message.getMessageId());
                tokenStore.removeToken(message);

                // @TRACE 645=removed QoS 2 publish/pubrel. key={0}, -1 inFlightPubRels={1}
                log.fine(CLASS_NAME, methodName, "645", new Object[] {
                        new Integer(ack.getMessageId()),
                        new Integer(inFlightPubRels) });
            }

            checkQuiesceLock();
        }
    }

??????以上邏輯主要是發(fā)送或接收Qos=1和Qos=2的message后,publisher與Broker力崇、Broker與Subscriber之間的交互流程斗塘。
??????看一下總的流程圖:


image.png

5.總結(jié)

??????Qos0:消息不存persistence,publish后直接通過notifySent()后來complete亮靴;
??????Qos1:publisher:消息存persistence馍盟,publish后收到PubAck后來進行complete及persistence.remove();
??????????????????subscriber:notifyReceivedMsg后茧吊,先deliverMessage()贞岭,然后send(PubAck);
??????Qos2:publisher:消息存persistence搓侄,publish后會收到PubRec瞄桨,然后發(fā)送PubRel,再收到PubComp后進行complete及persistence.remove()讶踪;
??????????????????subscriber:notifyReceivedMsg后芯侥,先persistence.put(),inboundQos2.put(),然后send(PubRec)到Broker柱查,收到來自Broker的PubRel后廓俭,再次notifyReceivedMsg,執(zhí)行deliverMessage()唉工,send(PubComp)研乒,刪除persistence。如果再次收到PubRel淋硝,不會進行deliverMessage()雹熬,直接send(PubComp)。

六.心跳機制

??????1.Keep Alive指定連接最大空閑時間T谣膳,當客戶端檢測到連接空閑時間超過T時竿报,必須向Broker發(fā)送心跳報文PINGREQ,Broker收到心跳請求后返回心跳響應PINGRESP参歹。
??????2.若Broker超過1.5T時間沒收到心跳請求則斷開連接仰楚,并且投遞遺囑消息到訂閱方;同樣犬庇,若客戶端超過一定時間仍沒收到Broker心跳響應PINGRESP則斷開連接。
??????3.連接空閑時發(fā)送心跳報文可以降低網(wǎng)絡請求侨嘀,弱化對帶寬的依賴臭挽。

七.保留消息定義[retained]

??????如果Publish消息的retained標記位被設置為1,則稱該消息為“保留消息”咬腕;
??????Broker對保留消息的處理如下
??????Broker會存儲每個Topic的最后一條保留消息及其Qos欢峰,當訂閱該Topic的客戶端上線后,Broker需要將該消息投遞給它涨共。
??????保留消息作用
??????可以讓新訂閱的客戶端得到發(fā)布方的最新的狀態(tài)值纽帖,而不必要等待發(fā)送。
??????保留消息的刪除
??????方式1:發(fā)送空消息體的保留消息举反;
??????方式2:發(fā)送最新的保留消息覆蓋之前的(推薦)懊直;

八.完全實現(xiàn)解耦

??????MQTT這種結(jié)構(gòu)替代了傳統(tǒng)的客戶端/服務器模型,可以實現(xiàn)以下解耦:
??????空間解耦:發(fā)布者和訂閱者不需要知道對方火鼻;
??????時間解耦:發(fā)布者和訂閱者不需要同時運行(離線消息, retained = 1的話室囊,可以實現(xiàn));
??????同步解耦:發(fā)布和接收都是異步通訊魁索,無需停止任何處理融撞;

九.與HTTP比較:

??????MQTT最長可以一次性發(fā)送256MB數(shù)據(jù);
??????HTTP是典型的C/S通訊模式:請求從客戶端發(fā)出粗蔚,服務端只能被動接收尝偎,一條連接只能發(fā)送一次請求,獲取響應后就斷開連接鹏控;
??????HTTP的請求/應答方式的會話都是客戶端發(fā)起的致扯,缺乏服務器通知客戶端的機制,客戶端應用需要不斷地輪詢服務器趁窃;

十.mosquitto.conf

??????可以通過以下輸出mosquitto日志:

log_dest file /storage/emulated/0/mosquitto.log //Android系統(tǒng)輸出到文件
log_dest stdout // linux系統(tǒng)直接輸出到工作臺
log_type all

十一.mqtt本地調(diào)試

1.啟動broker,mosquitto – 代理器主程序

./mosquitto &

??????其中:broker ip 為電腦端ip急前;port:默認1883醒陆;

2.mosquitto_pub – 用于發(fā)布消息的命令行客戶端,向已訂閱的topic發(fā)布消息

./mosquitto_pub -h host -p port -t topic -m message

??????其中:-h:broker ip裆针;-p:端口號刨摩,默認1883;-t:已訂閱的topic世吨;-m:發(fā)布的消息
??????舉例

./mosquitto_pub -h 10.10.20.10 -p 1883 -t baiduMap -m "{"origin": "西直門","end":"東直門"}"

3.mosquitto_sub – 用于訂閱消息的命令行客戶端澡刹,訂閱topic

./mosquitto_sub -h host -p port -t topic

??????其中:-h:broker ip;-p:端口號耘婚,默認1883罢浇;-t:需要訂閱的topic

4.運行環(huán)境

??????ubuntu系統(tǒng),將libmosquitto.so.1放入系統(tǒng)變量中:

export LD_LIBRARY_PATH= 附件libmosquitto.so.1路徑:$PATH

5.查看連接broker的客戶端

??????broker默認的端口號1883沐祷,可以通過端口號來查看已連接的客戶端

netstat -an | grep :1883
tcp        0      0 0.0.0.0:1883            0.0.0.0:*               LISTEN     
tcp        0      0 127.0.0.1:1883             127.0.0.1:44618         ESTABLISHED
tcp        0      0 172.27.117.1:1883       172.27.117.1:45256      ESTABLISHED
tcp        0      0 172.27.117.1:1883       172.27.117.2:49612      ESTABLISHED
tcp        0      0 172.27.117.1:1883       172.27.117.2:49508      ESTABLISHED

??????可以看到連接broker的客戶端為4個嚷闭;

??????以上分別介紹了MQTT協(xié)議以及服務端mosquitto、客戶端Eclipse paho的使用及源碼介紹赖临!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末胞锰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子兢榨,更是在濱河造成了極大的恐慌嗅榕,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吵聪,死亡現(xiàn)場離奇詭異凌那,居然都是意外死亡,警方通過查閱死者的電腦和手機吟逝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門帽蝶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人澎办,你說我怎么就攤上這事嘲碱。” “怎么了局蚀?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵麦锯,是天一觀的道長。 經(jīng)常有香客問我琅绅,道長扶欣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮料祠,結(jié)果婚禮上骆捧,老公的妹妹穿的比我還像新娘。我一直安慰自己髓绽,他們只是感情好敛苇,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著顺呕,像睡著了一般枫攀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上株茶,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天来涨,我揣著相機與錄音,去河邊找鬼启盛。 笑死蹦掐,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的僵闯。 我是一名探鬼主播卧抗,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼棍厂!你這毒婦竟也來了颗味?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤牺弹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后时呀,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體张漂,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年谨娜,在試婚紗的時候發(fā)現(xiàn)自己被綠了航攒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡趴梢,死狀恐怖漠畜,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情坞靶,我是刑警寧澤憔狞,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站彰阴,受9級特大地震影響瘾敢,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一簇抵、第九天 我趴在偏房一處隱蔽的房頂上張望庆杜。 院中可真熱鬧,春花似錦碟摆、人聲如沸晃财。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽断盛。三九已至,卻和暖如春嘉裤,著一層夾襖步出監(jiān)牢的瞬間郑临,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工屑宠, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留厢洞,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓典奉,卻偏偏與公主長得像躺翻,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子卫玖,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 一:前言 最近在了解MQTT協(xié)議相關的內(nèi)容公你,內(nèi)容有點多,特此把MQTT協(xié)議假瞬,以及其從服務端到客戶端的流程整理出來...
    子夏的不語閱讀 69,895評論 9 92
  • 概述 今天來學習MQTT協(xié)議中關于connect部分陕靠,connect是很重要的部分,因為它是Client 與MQT...
    曾彪彪閱讀 4,014評論 0 2
  • MQTT總結(jié) 使用MQTT以訂閱消息的方式保持客戶端和服務端的通訊脱茉。MQTT 是IBM開發(fā)的一個即時通訊協(xié)議剪芥,有可...
    亮_ThomasXu閱讀 2,090評論 2 0
  • iOS開發(fā)中,關于MQTT的三方庫主要有兩種琴许。 基于C實現(xiàn)的Mosquitto庫税肪。當然直接去調(diào)用C的接口并不是特別...
    Noskthing閱讀 24,479評論 20 22
  • 隨著 5G 時代的來臨,萬物物聯(lián)的偉大構(gòu)想正在成為現(xiàn)實榜田。聯(lián)網(wǎng)的物聯(lián)網(wǎng)設備在 2018 年已經(jīng)達到了 70 億[1]...
    覺釋閱讀 312評論 0 2