MQTT簡介:
MQTT(Message Queuing Telemetry Transport宝惰,消息隊列遙測傳輸)是IBM開發(fā)的一個即時通訊協(xié)議!
MQTT消息的主要特點:
使用(publish/subscribe)消息模式,簡稱p/s模式,即發(fā)布/訂閱!提供一對多的發(fā)送方式!
MQTT根據(jù)QoS定義的等級來傳輸消息:
- level 0:最多一次的傳輸
消息是基于TCP/IP網(wǎng)絡傳輸?shù)摹]有回應泉蝌,在協(xié)議中也沒有定義重傳的語義。消息可能到達服務器1次,也可能根本不會到達汹胃。
- level 1:至少一次的傳輸
服務器接收到消息會被確認,通過傳輸一個PUBACK信息东臀。如果有一個可以辨認的傳輸失敗着饥,無論是通訊連接還是發(fā)送設備,還是過了一段時間確認信息沒有收到惰赋,發(fā)送方都會將消息頭的DUP位置1宰掉,然后再次發(fā)送消息呵哨。消息最少一次到達服務器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS轨奄。
如果客戶端沒有接收到PUBACK信息(無論是應用定義的超時孟害,還是檢測到失敗然后通訊session重啟),客戶端都會再次發(fā)送PUBLISH信息挪拟,并且將DUP位置1挨务。
當它從客戶端接收到重復的數(shù)據(jù),服務器重新發(fā)送消息給訂閱者玉组,并且發(fā)送另一個PUBACK消息谎柄。
- level 2: 只有一次的傳輸
在QoS level 1上附加的協(xié)議流保證了重復的消息不會傳送到接收的應用。這是最高級別的傳輸惯雳,當重復的消息不被允許的情況下使用朝巫。這樣增加了網(wǎng)絡流量,但是它通常是可以接受的石景,因為消息內(nèi)容很重要劈猿。
QoS level 2在消息頭有Message ID。
接下來開始我們的表演:
下載代理服務器
本文使用mqtt代理服務器是apache下的apollo代理服務器
- Paste_Image.png
創(chuàng)建代理服務器
下載完成然后解壓目錄
打開dos窗口進入到apache-apollo-1.7.1\bin目錄下
執(zhí)行apollo create testbroker命令創(chuàng)建一個名稱為testbroker的代理服務器
- Paste_Image.png
-
下面就是我們創(chuàng)建的代理服務器
Paste_Image.png
啟動代理服務器
使用dos進入testbroker目錄中的bin目錄下
執(zhí)行apollo-broker run命令啟動代理服務器
- Paste_Image.png
通過HTTP訪問代理服務器
現(xiàn)在我們可以打開瀏覽器看下我們的代理服務器
輸入網(wǎng)址http://127.0.0.1:61680/
- Paste_Image.png
用戶名密碼可到配置文件中查看
進入testbroker目錄下的etc目錄
Paste_Image.png
- users.properties中配置的用戶名和密碼
Paste_Image.png
默認有個用戶名為admin潮孽,密碼為password的用戶
我們也可以自己配置用戶
現(xiàn)在就用默認用戶登陸
- Paste_Image.png
OK登陸成功
接下來我們編寫Android客戶端
- 首先準備mqtt jar包
- 不容易呀,mqtt這個jar真難找
- https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/
Paste_Image.png
一定保證客戶端和服務端以及代理服務器所在的電腦在同一網(wǎng)段下
- 可以在電腦上生成wifi熱點,手機客戶端連接熱點即可
- 接下來直接貼Android客戶端代碼
MqttService.java
package com.example.jingwc.mqtt_demo;
import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.util.Log;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttService extends Service {
/**
* 代理服務器ip地址
*/
public static final String MQTT_BROKER_HOST = "tcp://192.168.1.107:61613";
/**
* 客戶端唯一標識
*/
public static final String MQTT_CLIENT_ID = "android-jingwc";
/**
* 訂閱標識
*/
public static final String MQTT_TOPIC = "jingwc";
/**
* 用戶名
*/
public static final String USERNAME = "admin";
/**
* 密碼
*/
public static final String PASSWORD = "password";
private MqttClient mqttClient;
public MqttService() {
}
@Override
public IBinder onBind(Intent intent) {
return binder;
}
/**
* 連接mqtt
*/
public void connect(){
try {
// host為主機名揪荣,clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示恩商,
// MemoryPersistence設置clientid的保存形式变逃,默認為以內(nèi)存保存
mqttClient = new MqttClient(MQTT_BROKER_HOST,MQTT_CLIENT_ID,new MemoryPersistence());
// 配置參數(shù)信息
MqttConnectOptions options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,
// 這里設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(true);
// 設置用戶名
options.setUserName(USERNAME);
// 設置密碼
options.setPassword(PASSWORD.toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線怠堪,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
// 連接
mqttClient.connect(options);
// 訂閱
mqttClient.subscribe(MQTT_TOPIC);
// 設置回調(diào)
mqttClient.setCallback(new MqttCallback() {
//連接丟失后揽乱,一般在這里面進行重連
@Override
public void connectionLost(Throwable throwable) {
Log.d("test","connectionLost");
}
//subscribe后得到的消息會執(zhí)行到這里面
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
Log.d("test","messageArrived"+mqttMessage.toString());
}
//publish后會執(zhí)行到這里
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.d("test","deliveryComplete");
}
});
} catch (MqttException e) {
e.printStackTrace();
}catch (Exception e) {
e.printStackTrace();
}
}
/**
* 斷開連接
*/
public void disconnect(){
if(mqttClient != null){
if(mqttClient.isConnected()){
try {
mqttClient.disconnect();
mqttClient = null;
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
private final Binder binder = new MyBinder();
class MyBinder extends Binder{
public MqttService getService(){
return MqttService.this;
}
}
}
MainActivity.java
package com.example.jingwc.mqtt_demo;
import android.content.ComponentName;
import android.content.Intent;
import android.content.ServiceConnection;
import android.os.IBinder;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
public class MainActivity extends AppCompatActivity {
MqttService service = null;
private ServiceConnection mConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
service = ((MqttService.MyBinder)iBinder).getService();
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
service = null;
}
};
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Button bt_connect = (Button) findViewById(R.id.bt_connect);
Button bt_disconnect = (Button) findViewById(R.id.bt_disconnect);
bt_connect.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
// 連接
service.connect();
}
});
bt_disconnect.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
// 斷開連接
service.disconnect();
}
});
bindService(new Intent(this,MqttService.class),mConnection,BIND_AUTO_CREATE);
}
}
服務端代碼
- 也可以在寫一個android程序當作服務端
- 我這里寫的是java項目
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttServer {
// 代理服務器ip地址
private static String host = "tcp://192.168.1.107:61613";
private static String userName = "admin";
private static String password = "password";
private static MqttClient client;
// 主題
private static MqttTopic topic;
private static MqttMessage message;
// 訂閱標識
private static String topicStr = "jingwc";
public static void main(String[] args) throws MqttException{
client = new MqttClient(host,"java-server-jingwc",new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
topic = client.getTopic(topicStr);
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
message.setPayload("from server message".getBytes());
client.connect(options);
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("token:"+token.isComplete());
}
}
服務端通過代理服務器發(fā)送客戶端訂閱的消息圖
- Paste_Image.png
Paste_Image.png