一.MQTT
1.簡介
???????MQTT(Message Queuing Telemetry Transport 消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議证舟。它工作在 TCP/IP協(xié)議族上拣展,是為硬件性能低下的遠程設備以及網(wǎng)絡狀況糟糕的情況下而設計的發(fā)布/訂閱型消息協(xié)議傻铣,為此,它需要一個消息中間件 页响。
???????MQTT是IBM開發(fā)的一個基于客戶端-服務器的消息發(fā)布/訂閱傳輸協(xié)議。
???????MQTT協(xié)議是輕量段誊、簡單闰蚕、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛连舍。在很多情況下没陡,包括受限的環(huán)境中,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(IoT)索赏。其在盼玄,通過衛(wèi)星鏈路通信傳感器、偶爾撥號的醫(yī)療設備潜腻、智能家居强岸、及一些小型化設備中已廣泛使用。
2.特性
- 基于發(fā)布 / 訂閱范式的 “輕量級” 消息協(xié)議(頭部 2 字節(jié))
- 專為資源受限的設備砾赔、低帶寬占用高延時或者不可靠的網(wǎng)絡設計蝌箍,適用于 IoT 與 M2M
- 基于 TCP/IP 協(xié)議棧
- 實時的 IoT 通訊的標準協(xié)議
二.Mosquitto
1.簡介
???????Mosquitto是一款實現(xiàn)了消息推送協(xié)議 MQTT v3.1 的開源消息代理軟件,提供輕量級的暴心,支持可發(fā)布/可訂閱的的消息推送模式妓盲,使設備對設備之間的短消息通信變得簡單。
2.Broker
???????我們知道专普,網(wǎng)絡間進行通信需要有Server和Client悯衬,在Mqtt中Broker扮演了Server的角色,基于mosquitto源碼通過NDK進行編譯生成android系統(tǒng)端可執(zhí)行的bin文件檀夹,通過mosquitto -c mosquitto.conf來啟動Broker筋粗;
3.版本和名稱
???????Mosquitto會支持不同的協(xié)議版本號和名稱,通過PROTOCOL_NAME和PROTOCOL_VERSION來進行區(qū)分炸渡,比如本文用到的mosquitto源碼版本支持MQTTV3.1和MQTTV3.1.1娜亿,MQTTV3.1對應的協(xié)議name為"MQIsdp";
MQTTV3.1.1對應的協(xié)議name為"MQTT"蚌堵;版本和name必須匹配买决。
三.Client端實現(xiàn)
???????Client端實現(xiàn)主要分為三部分:Client端的創(chuàng)建沛婴;Client端連接;Client端消息注冊督赤;
1.Client端創(chuàng)建
??????在創(chuàng)建client時嘁灯,需要初始化一些指定的參數(shù),通過這些參數(shù)來處理與broker端的交互躲舌,包括連接丑婿,心跳,斷開重連及設置狀態(tài)回調(diào)等没卸。
//用來存儲Qos=1和2的消息
MemoryPersistence dataStore = new MemoryPersistence();
//保存著一些控制客戶端如何連接到服務器的選項
MqttConnectOptions mConOpt = new MqttConnectOptions();
//set mqtt version
mConOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
/**
* set cleanSession
* false:broker will save connection record for client
* true:As a new client to connect broker every time[每次連接上都是一個新的客戶端]
*/
mConOpt.setCleanSession(true);
// set heartbeat 30s[30S去檢測一下broker是否有效,如果無效羹奉,會回調(diào)connectionLost]
mConOpt.setKeepAliveInterval(30);
// set username
if (userName != null) {
mConOpt.setUserName(userName);
}
// set password
if (password != null) {
mConOpt.setPassword(password.toCharArray());
}
//when disconnect unexpectly, broker will send "close" to clients which subscribe this topic to announce the connection is lost
mConOpt.setWill(topic, "close".getBytes(), 2, true);
//client reconnect to broker automatically[與broker斷開后會去重連]
mConOpt.setAutomaticReconnect(true);
// create Mqtt client
if (sClient == null) {
sClient = new MqttClient(brokerUrl, clientId, dataStore);
// set callback[狀態(tài)回調(diào)]
mCallback = new MqttCallbackBus(sInstance);
sClient.setCallback(mCallback);
}
2.Client端連接Broker
??????上一步創(chuàng)建了Client,并且初始化了各種參數(shù)办悟,接下來調(diào)用connect進行連接尘奏,本文創(chuàng)建的是異步Client,設置了連接狀態(tài)的回調(diào)IMqttActionListener病蛉,連接成功后可以進行topic的訂閱炫加,失敗后可以進行重連。
// connect to broker
sClient.connect(mConOpt);
//異步的client铺然,同步連接沒有狀態(tài)回調(diào)
mClient = new MqttAsyncClient(brokerUrl, clientId, dataStore);
mClient.connect(mConOpt, null, mIMqttActionListener);
//連接狀態(tài)回調(diào)
private IMqttActionListener mIMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
try {
Log.i(TAG, "connect success");
......
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
try {
Log.e(TAG, "connect failure, reconnect");
......
} catch (Exception e) {
e.printStackTrace();
}
}
};
3.狀態(tài)回調(diào)
??????在第一步進行client創(chuàng)建時傳入了MqttCallback俗孝,在與broker斷開連接、新消息到達魄健、消息發(fā)送完成后赋铝,通過該MqttCallback會收到對應的回調(diào),具體如下:
public class MqttCallbackBus implements MqttCallback {
private static final String TAG = MqttCallbackBus.class.getSimpleName();
private MqttManager mMqttManager;
public MqttCallbackBus(MqttManager mqttManager) {
mMqttManager = mqttManager;
}
@Override
public void connectionLost(Throwable cause) {
Log.e(TAG, "cause : " + cause.toString());
//與broker斷開后回調(diào)沽瘦,[雖然上邊屬性中設置了自動重連革骨,但是連上后不會去訂閱topic,即使連上也接收不到topic析恋,因此選擇在此手動連接良哲,然后在連接成功后訂閱topic]
mMqttManager.reconnectBroker();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.e(TAG, "topic : " + topic + "\t MqttMessage : " + message.toString());
//訂閱的消息接收到后回調(diào)
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.e(TAG, "token : " + token.toString());
//消息publish完成后回調(diào)
}
}
四.Eclipse paho源碼分析
???????Client端是基于Eclipse Paho提供的mqtt開源庫進行實現(xiàn),接下來對Eclipse Paho源碼進行分析:
1.MqttConnectOptions.java
//設置是否重連
public void setAutomaticReconnect(boolean automaticReconnect) {
this.automaticReconnect = automaticReconnect;
}
//獲取是否設置了重連標志助隧,確定后續(xù)是否進行重連
public boolean isAutomaticReconnect() {
return automaticReconnect;
}
2.MqttAsyncClient.java
???????創(chuàng)建mqtt async client筑凫,包括一些實例初始化等,程序最重要的入口類并村。
2.1.構(gòu)造方法
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
....
....
MqttConnectOptions.validateURI(serverURI);
this.serverURI = serverURI;
this.clientId = clientId;
this.persistence = persistence;
if (this.persistence == null) {
}
this.persistence.open(clientId, serverURI);
//創(chuàng)建了ClientComms巍实,最終去跟broker建立連接
this.comms = new ClientComms(this, this.persistence, pingSender);
this.persistence.close();
this.topics = new Hashtable();
}
???????在構(gòu)造方法內(nèi),進行了一些變量賦值哩牍,然后創(chuàng)建ClientComms實例棚潦,該實例用來跟broker建立連接,后面會進行分析姐叁;
2.2.connect()
???????在創(chuàng)建完實例后瓦盛,調(diào)用connect()去跟broker建立連接洗显,看一下connect()方法的具體實現(xiàn):
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
....
this.connOpts = options;
this.userContext = userContext;
final boolean automaticReconnect = options.isAutomaticReconnect();
....
comms.setNetworkModules(createNetworkModules(serverURI, options));
comms.setReconnectCallback(new MqttCallbackExtended() {
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectComplete(boolean reconnect, String serverURI) {
}
public void connectionLost(Throwable cause) {
if(automaticReconnect){
// Automatic reconnect is set so make sure comms is in resting state
comms.setRestingState(true);
reconnecting = true;
//設置了重連外潜,在收到connectionLost后原环,進行重連
startReconnectCycle();
}
}
});
// Insert our own callback to iterate through the URIs till the connect succeeds
MqttToken userToken = new MqttToken(getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
......
comms.setNetworkModuleIndex(0);
connectActionListener.connect();
}
???????在connect()內(nèi)部主要做了以下幾件事:
???????1.通過createNetworkModules()創(chuàng)建NetworkModule,包含serverURl处窥,最終創(chuàng)建的URI_TYPE_TCP嘱吗,對應的是TCPNetworkModule;
???????2.調(diào)用setReconnectCallback()來設置重連滔驾,狀態(tài)斷開時會進行自動重連谒麦;
???????3.創(chuàng)建ConnectActionListener對象,傳入了comms哆致、callback等參數(shù)绕德,連接狀態(tài)onSuccess()和onFailure()是在ConnectActionListener里面進行回調(diào)的;
???????4.執(zhí)行ConnectActionListener的connect()進行連接摊阀;
2.3.startReconnectCycle()
???????前面講到耻蛇,如果設置了automaticReconnect,則在異常斷開后會調(diào)用startReconnectCycle()進行重連:
//1.重連循環(huán)
private void startReconnectCycle() {
....
reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
}
//2.重連task
private class ReconnectTask extends TimerTask {
private static final String methodName = "ReconnectTask.run";
public void run() {
attemptReconnect();
}
}
//3.重連入口
private void attemptReconnect(){
....
try {
//連接
connect(this.connOpts, this.userContext,new IMqttActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
....
comms.setRestingState(false);
//重連成功胞此,結(jié)束重連循環(huán)
stopReconnectCycle();
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
....
//繼續(xù)重連臣咖,下一次重連時間是上一次的兩倍,最高是128s
if(reconnectDelay < 128000){
reconnectDelay = reconnectDelay * 2;
}
rescheduleReconnectCycle(reconnectDelay);
}
});
....
}
//設置Mqttcallback
public void setCallback(MqttCallback callback) {
this.mqttCallback = callback;
//將MqttCallbackBus回調(diào)設置給ClientComms漱牵,后續(xù)的回調(diào)供client使用夺蛇,此處主要用到onConnectionLost()
comms.setCallback(callback);
}
3.ConnectActionListener.java
???????通過以上可以看到,connect()方法中酣胀,最終調(diào)用的是connectActionListener的connect()方法刁赦,一起看一下該方法的具體實現(xiàn):
3.1.connect()
public void connect() throws MqttPersistenceException {
//創(chuàng)建MqttToken
MqttToken token = new MqttToken(client.getClientId());
//設置callback,由于connectActionListener實現(xiàn)了IMqttActionListener闻镶,即把自己注冊進去
token.setActionCallback(this);
token.setUserContext(this);
......
try {
//調(diào)用comms的connect甚脉,comms是在創(chuàng)建client里面創(chuàng)建的,在connect時傳入connectActionListener里面
comms.connect(options, token);
} catch (MqttException e) {
onFailure(token, e);
}
}
3.2.onSuccess()和onFailure()
public void onSuccess(IMqttToken token) {
....
if (userCallback != null) {
//回調(diào)傳入的IActionListener回調(diào)儒溉,該userCallback是在AsyncClient.connect()是傳入的IMqttActionListener
userCallback.onSuccess(userToken);
}
....
}
public void onFailure(IMqttToken token, Throwable exception) {
....
if (userCallback != null) {
//回調(diào)傳入的IActionListener回調(diào)宦焦,該userCallback是在AsyncClient.connect()是傳入的IMqttActionListener
userCallback.onFailure(userToken, exception);
}
....
}
}
4.ClientComms.java
???????該類也是一個非常重要的類,主要創(chuàng)建了三個線程和ClientState實例顿涣,構(gòu)造方法中創(chuàng)建了CommsCallback線程和ClientState實例波闹,接著上步會調(diào)用到connect()方法,看一下該方法的實現(xiàn)邏輯:
4.1.connect()
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
final String methodName = "connect";
synchronized (conLock) {
if (isDisconnected() && !closePending) {
//設置狀態(tài)為連接中
conState = CONNECTING;
conOptions = options;
//創(chuàng)建MqttConnect涛碑,表示與broker連接的message
MqttConnect connect = new MqttConnect(client.getClientId(),
conOptions.getMqttVersion(),
conOptions.isCleanSession(),
conOptions.getKeepAliveInterval(),
conOptions.getUserName(),
conOptions.getPassword(),
conOptions.getWillMessage(),
conOptions.getWillDestination());
......
tokenStore.open();
ConnectBG conbg = new ConnectBG(this, token, connect);
conbg.start();
}
}
}
???????在connect()內(nèi)部創(chuàng)建了MqttConnect精堕,表示是連接message,然后創(chuàng)建ConnectBG實例并執(zhí)行start()蒲障;
4.2.ConnectBG
private class ConnectBG implements Runnable {
ClientComms clientComms = null;
Thread cBg = null;
MqttToken conToken;
MqttConnect conPacket;
ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) {
clientComms = cc;
conToken = cToken;
conPacket = cPacket;
cBg = new Thread(this, "MQTT Con: "+getClient().getClientId());
}
void start() {
cBg.start();
}
public void run() {
......
try {
........
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId());
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId());
//CommsCallback本身是一個線程歹篓,啟動
callback.start("MQTT Call: "+getClient().getClientId());
internalSend(conPacket, conToken);
}
.......
if (mqttEx != null) {
//不為空瘫证,說明進入了catch,則shut down
shutdownConnection(conToken, mqttEx);
}
}
}
//接著AsyncClient.setCallback庄撮,會調(diào)用ClientComms設置MqttCallbackBus回調(diào)
public void setCallback(MqttCallback mqttCallback) {
this.callback.setCallback(mqttCallback);
}
???????從上面的代碼可以看到背捌,在執(zhí)行connect()方法后,主要做了以下幾項工作:
??????1.networkModule.start():創(chuàng)建socket洞斯,與broker建立連接毡庆;
//TcpNetworkModule.java
public void start() throws IOException, MqttException
try {
......
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
......
}
.......
}
???????2.創(chuàng)建CommsReceiver()然后start(),不斷循環(huán)讀取Broker端來的消息烙如;
//CommsReceiver.java
public void run() {
......
while (running && (in != null)) {
try {
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token!=null) {
synchronized (token) {
clientState.notifyReceivedAck((MqttAck)message);
}
}
} else {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
}
........
........
}
???????3.創(chuàng)建CommsSender實例么抗,然后start(),主要用來向Broker發(fā)送消息亚铁;
//CommsSender.java
public void run() {
......
while (running && (out != null)) {
try {
message = clientState.get();
if (message != null) {
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} ......
clientState.notifySent(message);
}
}
}
}
}
}
......
}
??????4.CommsCallback.start():通過該類來實現(xiàn)broker返回消息的回調(diào)處理入口蝇刀,后面會講到。
??????5.internalSend(conPacket, conToken):發(fā)送連接action
5.ClientState.java
??????client在進行消息publish時徘溢,先經(jīng)過ClientComms吞琐,最終會調(diào)用到ClientState里面的send()方法,看一下send()方法的實現(xiàn)邏輯:
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
......
//message是publish型的message
if (message instanceof MqttPublish) {
synchronized (queueLock) {
......
//獲取到要發(fā)送的Message
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
//獲取到要發(fā)送Message的Qos
switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
//加入pendingMessages甸昏,CommsSender通過get()方法獲取到message后就會進行發(fā)送
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
//其他類型的message
if (message instanceof MqttConnect) {
synchronized (queueLock) {
// Add the connect action at the head of the pending queue ensuring it jumps
// ahead of any of other pending actions.
tokenStore.saveToken(token, message);
pendingFlows.insertElementAt(message,0);
queueLock.notifyAll();
}
} else {
if (message instanceof MqttPingReq) {
this.pingCommand = message;
}
else if (message instanceof MqttPubRel) {
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
}
else if (message instanceof MqttPubComp) {
persistence.remove(getReceivedPersistenceKey(message));
}
synchronized (queueLock) {
if ( !(message instanceof MqttAck )) {
tokenStore.saveToken(token, message);
}
pendingFlows.addElement(message);
queueLock.notifyAll();
}
}
}
}
//連接成功
public void connected() {
final String methodName = "connected";
//@TRACE 631=connected
log.fine(CLASS_NAME, methodName, "631");
this.connected = true;
//啟動pingSender顽分,來在keepAliveInterval內(nèi)發(fā)送心跳包
pingSender.start(); //Start ping thread when client connected to server.
}
??????在CommsReceiver收到broker的ack及普通消息后,會先經(jīng)過clientState施蜜,具體會調(diào)用以下兩個方法:
??????notifyReceivedAck():連接成功ack卒蘸、qos=1和2時publish消息后的ack、心跳相關ack等都會通過該方法調(diào)用CommsCallback的方法翻默。
??????notifyReceivedMsg():正常的publish消息等缸沃。
??????pingSender.start():表示client在連上broker后會在aliveInterval后發(fā)送心跳包,pingSender是在創(chuàng)建client時就創(chuàng)建了TimerPingSender實例修械,一步步先傳給clientComms趾牧,再傳給ClientState,執(zhí)行start()來創(chuàng)建Timer肯污,執(zhí)行TimerTask來最終clientState的checkForActivity()翘单,將PingReq加入pendingFlows后,queueLock.notifyAll()蹦渣,調(diào)用CommsSender來進行發(fā)送哄芜,然后加入下一輪執(zhí)行。如果正常的話柬唯,會收到PingResp认臊,修改lastInboundActivity和pingOutstanding的值來在下一輪執(zhí)行check時來判斷是否收到心跳,異常就拋出REASON_CODE_CLIENT_TIMEOUT(32000)碼锄奢。
6.CommsCallback.java
??????通過該類來實現(xiàn)broker返回消息的回調(diào)處理入口失晴,包括處理斷開剧腻、消息發(fā)送成功,消息到達涂屁、連接狀態(tài)回調(diào)等
public void start(String threadName) {
synchronized (lifecycle) {
if (!running) {
// Preparatory work before starting the background thread.
// For safety ensure any old events are cleared.
messageQueue.clear();
completeQueue.clear();
running = true;
quiescing = false;
callbackThread = new Thread(this, threadName);
callbackThread.start();
}
}
}
public void run() {
final String methodName = "run";
while (running) {
try {
//沒有work時书在,wait(),當有message來時胯陋,通過workAvailable.notifyAll()來喚醒
try {
synchronized (workAvailable) {
if (running && messageQueue.isEmpty()
&& completeQueue.isEmpty()) {
workAvailable.wait();
}
}
} catch (InterruptedException e) {
}
if (running) {
// Check for deliveryComplete callbacks...
MqttToken token = null;
synchronized (completeQueue) {
if (!completeQueue.isEmpty()) {
// First call the delivery arrived callback if needed
token = (MqttToken) completeQueue.elementAt(0);
completeQueue.removeElementAt(0);
}
}
if (null != token) {
handleActionComplete(token);
}
// Check for messageArrived callbacks...
MqttPublish message = null;
synchronized (messageQueue) {
if (!messageQueue.isEmpty()) {
// Note, there is a window on connect where a publish
// could arrive before we've
// finished the connect logic.
message = (MqttPublish) messageQueue.elementAt(0);
messageQueue.removeElementAt(0);
}
}
.......
} finally {
synchronized (spaceAvailable) {
spaceAvailable.notifyAll();
}
}
}
}
//連接回調(diào)方法入口蕊温,通過handleActionComplete()來調(diào)用
public void fireActionEvent(MqttToken token) {
final String methodName = "fireActionEvent";
if (token != null) {
//此處的asyncCB就是在MqttAsyncClient.connect()里面?zhèn)魅氲腃onnectActionListener,參考如下:
//ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
//userToken.setActionCallback(connectActionListener);
IMqttActionListener asyncCB = token.getActionCallback();
if (asyncCB != null) {
if (token.getException() == null) {
//回調(diào)ConnectActionListener的onSuccess的方法
asyncCB.onSuccess(token);
} else {
//回調(diào)ConnectActionListener的onFailure的方法
asyncCB.onFailure(token, token.getException());
}
}
}
}
//設置MqttCallbackBus回調(diào)
public void setCallback(MqttCallback mqttCallback) {
this.mqttCallback = mqttCallback;
}
//斷開回調(diào)MqttCallbackBus接口
public void connectionLost(MqttException cause) {
try {
if (mqttCallback != null && cause != null) {
//回調(diào)MqttCallbackBus的connectionLost接口
mqttCallback.connectionLost(cause);
....
}
....
}
....
}
7.客戶端連接流程圖
??????總結(jié)一下client端連接broker的流程圖袱箱,黑色的線代表client端從創(chuàng)建到跟broker進行connect()的調(diào)用流程遏乔,綠色的線代表從broker收到回復消息后的調(diào)用流程。
8.客戶端發(fā)送及訂閱接收消息流程圖
五.qos值及其含義
1.至多一次
???????消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡发笔。會發(fā)生消息丟失或重復盟萨。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù)了讨,丟失一次讀記錄無所謂捻激,因為不久后還會有第二次發(fā)送。
2.至少一次
???????確保消息到達前计,但消息可能會重復發(fā)生胞谭。
3.只有一次
???????確保消息到達一次。這一級別可用于如下情況男杈,在計費系統(tǒng)中丈屹,消息重復或丟失會導致不正確的結(jié)果。
4.源碼分析
???????從Broker來的ack消息是通過CommsReceiver來接收的伶棒,接收后會調(diào)用ClientState的notifyReceiveAck()方法旺垒,結(jié)合上面的圖及代碼一起看一下:
protected void notifyReceivedAck(MqttAck ack) throws MqttException {
final String methodName = "notifyReceivedAck";
this.lastInboundActivity = System.currentTimeMillis();
MqttToken token = tokenStore.getToken(ack);
MqttException mex = null;
if (token == null) {
} else if (ack instanceof MqttPubRec) {
MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
//收到MqttPubRec后,創(chuàng)建MqttPubRel進行回復
this.send(rel, token);
} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
//qos = 1或2時肤无,收到MqttPubAck或MqttPubComp來通知deliveryComplete回調(diào)先蒋,及刪除message
notifyResult(ack, token, mex);
} else if (ack instanceof MqttPingResp) {
synchronized (pingOutstandingLock) {
pingOutstanding = Math.max(0, pingOutstanding-1);
notifyResult(ack, token, mex);
if (pingOutstanding == 0) {
tokenStore.removeToken(ack);
}
}
//@TRACE 636=ping response received. pingOutstanding: {0}
log.fine(CLASS_NAME,methodName,"636",new Object[]{ new Integer(pingOutstanding)});
} else if (ack instanceof MqttConnack) {
......
//連接成功的回調(diào)
......
} else {
......
}
checkQuiesceLock();
}
???????Subscriber在收到message后,會執(zhí)行到ClientState.notifyReceivedMsg()方法宛渐,該方法會根據(jù)qos的值來做相應的處理竞漾,qos=0或1,直接會調(diào)用messageArrived窥翩,然后刪除persistence业岁,send(PubAck);
???????qos=2時鳍烁,先存儲叨襟,然后send(PubRec),接下來會收到broker的PubRel幔荒,如果能找到msg糊闽,則會調(diào)用messageArrived梳玫,然后send(PubComp),刪除persistence右犹;如果再次收到PubRel提澎,找不到msg,則直接send(PubComp)念链,確保只執(zhí)行一次messageArrived盼忌。
//ClientState.java
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
final String methodName = "notifyReceivedMsg";
this.lastInboundActivity = System.currentTimeMillis();
// @TRACE 651=received key={0} message={1}
log.fine(CLASS_NAME, methodName, "651", new Object[] {
new Integer(message.getMessageId()), message });
if (!quiescing) {
if (message instanceof MqttPublish) {
MqttPublish send = (MqttPublish) message;
switch (send.getMessage().getQos()) {
case 0:
case 1:
//Qos=1或0,直接執(zhí)行
if (callback != null) {
callback.messageArrived(send);
}
break;
case 2:
//Qos=2掂墓,先存儲谦纱,然后發(fā)送PubRec
persistence.put(getReceivedPersistenceKey(message),
(MqttPublish) message);
inboundQoS2.put(new Integer(send.getMessageId()), send);
this.send(new MqttPubRec(send), null);
break;
default:
//should NOT reach here
}
} else if (message instanceof MqttPubRel) {
//收到PubRel后,先從inboundQoS2找msg
MqttPublish sendMsg = (MqttPublish) inboundQoS2
.get(new Integer(message.getMessageId()));
//找到msg君编,表示還未執(zhí)行messageArrived跨嘉,先執(zhí)行
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
}
} else {
//找不到說明已經(jīng)執(zhí)行了messageArrived,直接發(fā)送PubComp
// Original publish has already been delivered.
MqttPubComp pubComp = new MqttPubComp(message
.getMessageId());
this.send(pubComp, null);
}
}
}
}
??????clientState的notifyResult()方法吃嘿,經(jīng)過一系列調(diào)用祠乃,最終會通過CommsCallback中的workAvailable.notifyAll()---->run()---->handleActionComplete()方法,看一下該方法的實現(xiàn):
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
if (token.isComplete()) {
//刪除message
clientState.notifyComplete(token);
}
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if ( mqttCallback != null
&& token instanceof MqttDeliveryToken
&& token.isComplete()) {
//回調(diào)deliveryComplete()方法
mqttCallback.deliveryComplete((MqttDeliveryToken) token);
}
//內(nèi)部邏輯只有在connect()時才會調(diào)用兑燥,publish()時亮瓷,Listener為null,不會執(zhí)行
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if ( token.isComplete() ){
if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
token.internalTok.setNotified(true);
}
}
}
}
??????以下是在收到MqttPubAck或MqttPubComp后降瞳,會將存儲的persistence刪除掉嘱支。
//ClientState.java
protected void notifyComplete(MqttToken token) throws MqttException {
final String methodName = "notifyComplete";
MqttWireMessage message = token.internalTok.getWireMessage();
if (message != null && message instanceof MqttAck) {
// @TRACE 629=received key={0} token={1} message={2}
log.fine(CLASS_NAME, methodName, "629", new Object[] {
new Integer(message.getMessageId()), token, message });
MqttAck ack = (MqttAck) message;
if (ack instanceof MqttPubAck) {
// QoS 1 - user notified now remove from persistence...
persistence.remove(getSendPersistenceKey(message));
outboundQoS1.remove(new Integer(ack.getMessageId()));
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 650=removed Qos 1 publish. key={0}
log.fine(CLASS_NAME, methodName, "650",
new Object[] { new Integer(ack.getMessageId()) });
} else if (ack instanceof MqttPubComp) {
// QoS 2 - user notified now remove from persistence...
persistence.remove(getSendPersistenceKey(message));
persistence.remove(getSendConfirmPersistenceKey(message));
outboundQoS2.remove(new Integer(ack.getMessageId()));
inFlightPubRels--;
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 645=removed QoS 2 publish/pubrel. key={0}, -1 inFlightPubRels={1}
log.fine(CLASS_NAME, methodName, "645", new Object[] {
new Integer(ack.getMessageId()),
new Integer(inFlightPubRels) });
}
checkQuiesceLock();
}
}
??????以上邏輯主要是發(fā)送或接收Qos=1和Qos=2的message后,publisher與Broker力崇、Broker與Subscriber之間的交互流程斗塘。
??????看一下總的流程圖:
5.總結(jié)
??????Qos0:消息不存persistence,publish后直接通過notifySent()后來complete亮靴;
??????Qos1:publisher:消息存persistence馍盟,publish后收到PubAck后來進行complete及persistence.remove();
??????????????????subscriber:notifyReceivedMsg后茧吊,先deliverMessage()贞岭,然后send(PubAck);
??????Qos2:publisher:消息存persistence搓侄,publish后會收到PubRec瞄桨,然后發(fā)送PubRel,再收到PubComp后進行complete及persistence.remove()讶踪;
??????????????????subscriber:notifyReceivedMsg后芯侥,先persistence.put(),inboundQos2.put(),然后send(PubRec)到Broker柱查,收到來自Broker的PubRel后廓俭,再次notifyReceivedMsg,執(zhí)行deliverMessage()唉工,send(PubComp)研乒,刪除persistence。如果再次收到PubRel淋硝,不會進行deliverMessage()雹熬,直接send(PubComp)。
六.心跳機制
??????1.Keep Alive指定連接最大空閑時間T谣膳,當客戶端檢測到連接空閑時間超過T時竿报,必須向Broker發(fā)送心跳報文PINGREQ,Broker收到心跳請求后返回心跳響應PINGRESP参歹。
??????2.若Broker超過1.5T時間沒收到心跳請求則斷開連接仰楚,并且投遞遺囑消息到訂閱方;同樣犬庇,若客戶端超過一定時間仍沒收到Broker心跳響應PINGRESP則斷開連接。
??????3.連接空閑時發(fā)送心跳報文可以降低網(wǎng)絡請求侨嘀,弱化對帶寬的依賴臭挽。
七.保留消息定義[retained]
??????如果Publish消息的retained標記位被設置為1,則稱該消息為“保留消息”咬腕;
??????Broker對保留消息的處理如下:
??????Broker會存儲每個Topic的最后一條保留消息及其Qos欢峰,當訂閱該Topic的客戶端上線后,Broker需要將該消息投遞給它涨共。
??????保留消息作用:
??????可以讓新訂閱的客戶端得到發(fā)布方的最新的狀態(tài)值纽帖,而不必要等待發(fā)送。
??????保留消息的刪除:
??????方式1:發(fā)送空消息體的保留消息举反;
??????方式2:發(fā)送最新的保留消息覆蓋之前的(推薦)懊直;
八.完全實現(xiàn)解耦
??????MQTT這種結(jié)構(gòu)替代了傳統(tǒng)的客戶端/服務器模型,可以實現(xiàn)以下解耦:
??????空間解耦:發(fā)布者和訂閱者不需要知道對方火鼻;
??????時間解耦:發(fā)布者和訂閱者不需要同時運行(離線消息, retained = 1的話室囊,可以實現(xiàn));
??????同步解耦:發(fā)布和接收都是異步通訊魁索,無需停止任何處理融撞;
九.與HTTP比較:
??????MQTT最長可以一次性發(fā)送256MB數(shù)據(jù);
??????HTTP是典型的C/S通訊模式:請求從客戶端發(fā)出粗蔚,服務端只能被動接收尝偎,一條連接只能發(fā)送一次請求,獲取響應后就斷開連接鹏控;
??????HTTP的請求/應答方式的會話都是客戶端發(fā)起的致扯,缺乏服務器通知客戶端的機制,客戶端應用需要不斷地輪詢服務器趁窃;
十.mosquitto.conf
??????可以通過以下輸出mosquitto日志:
log_dest file /storage/emulated/0/mosquitto.log //Android系統(tǒng)輸出到文件
log_dest stdout // linux系統(tǒng)直接輸出到工作臺
log_type all
十一.mqtt本地調(diào)試
1.啟動broker,mosquitto – 代理器主程序
./mosquitto &
??????其中:broker ip 為電腦端ip急前;port:默認1883醒陆;
2.mosquitto_pub – 用于發(fā)布消息的命令行客戶端,向已訂閱的topic發(fā)布消息
./mosquitto_pub -h host -p port -t topic -m message
??????其中:-h:broker ip裆针;-p:端口號刨摩,默認1883;-t:已訂閱的topic世吨;-m:發(fā)布的消息
??????舉例
./mosquitto_pub -h 10.10.20.10 -p 1883 -t baiduMap -m "{"origin": "西直門","end":"東直門"}"
3.mosquitto_sub – 用于訂閱消息的命令行客戶端澡刹,訂閱topic
./mosquitto_sub -h host -p port -t topic
??????其中:-h:broker ip;-p:端口號耘婚,默認1883罢浇;-t:需要訂閱的topic
4.運行環(huán)境
??????ubuntu系統(tǒng),將libmosquitto.so.1放入系統(tǒng)變量中:
export LD_LIBRARY_PATH= 附件libmosquitto.so.1路徑:$PATH
5.查看連接broker的客戶端
??????broker默認的端口號1883沐祷,可以通過端口號來查看已連接的客戶端
netstat -an | grep :1883
tcp 0 0 0.0.0.0:1883 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:1883 127.0.0.1:44618 ESTABLISHED
tcp 0 0 172.27.117.1:1883 172.27.117.1:45256 ESTABLISHED
tcp 0 0 172.27.117.1:1883 172.27.117.2:49612 ESTABLISHED
tcp 0 0 172.27.117.1:1883 172.27.117.2:49508 ESTABLISHED
??????可以看到連接broker的客戶端為4個嚷闭;
??????以上分別介紹了MQTT協(xié)議以及服務端mosquitto、客戶端Eclipse paho的使用及源碼介紹赖临!