需求:安卓手機通過app遠程監(jiān)控臥室溫濕度,控制電源開關(guān)盟庞。
流程概述:安卓手機通過MQTT協(xié)議服務(wù)器與ESP32(Arduino)開發(fā)板連接并進行數(shù)據(jù)交互
1. MTQQ協(xié)議
轉(zhuǎn)載自(https://baijiahao.baidu.com/s?id=1608411516249221334&wfr=spider&for=pc)
1.1 簡介:
MQTT協(xié)議(Message Queuing Telemetry Transport),翻譯過來就是遙信消息隊列傳輸罩息,是IBM公司于1999年提出的嗤详,現(xiàn)在最新版本是3.1.1。MQTT是一個基于TCP的發(fā)布訂閱協(xié)議瓷炮,設(shè)計的初始目的是為了極有限的內(nèi)存設(shè)備和網(wǎng)絡(luò)帶寬很低的網(wǎng)絡(luò)不可靠的通信葱色,非常適合物聯(lián)網(wǎng)通信。
MQTT的網(wǎng)絡(luò)層級:
1.2 工作原理:
發(fā)布訂閱示意圖
如上圖所示娘香,客戶端A連接到消息代理(message broker),消息代理返回確認消息苍狰。客戶B發(fā)布消息溫度25度烘绽,客戶A訂閱‘溫度’淋昭,消息代理吧消息推給客戶A,客戶A發(fā)布溫度20度安接,但客戶B沒有訂閱翔忽,消息代理不推送。消息B又發(fā)布了溫度38度盏檐,客戶A就再次收到訂閱的消息38度歇式。最后客戶端斷開連接。整個過程非常簡單清晰胡野,容易理解材失。
1.3 MQTT消息的QOS
MQTT支持三種QOS等級:
QoS 0:“最多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)硫豆。分發(fā)的消息可能丟失或重復龙巨。例如,這個等級可用于環(huán)境傳感器數(shù)據(jù)熊响,單次的數(shù)據(jù)丟失沒關(guān)系旨别,因為不久后還會有第二次發(fā)送。
QoS 1:“至少一次”汗茄,確保消息可以到達昼榛,但消息可能會重復。
QoS 2:“只有一次”剔难,確保消息只到達一次胆屿。例如,這個等級可用在一個計費系統(tǒng)中偶宫,這里如果消息重復或丟失會導致不正確的收費非迹。
1.4 MQTT的消息類型
1 CONNECT – 連接服務(wù)端:客戶端到服務(wù)端的網(wǎng)絡(luò)連接建立后, 客戶端發(fā)送給服務(wù)端的第一個報文必須是CONNECT報文
2 CONNACK – 確認連接請求:服務(wù)端發(fā)送CONNACK報文響應(yīng)從客戶端收到的CONNECT報文纯趋。 服務(wù)端發(fā)送給客戶端的第一個報文必須是CONNACK憎兽。如果客戶端在合理的時間內(nèi)沒有收到服務(wù)端的CONNACK報文冷离, 客戶端應(yīng)該關(guān)閉網(wǎng)絡(luò)連接。合理的時間取決于應(yīng)用的類型和通信基礎(chǔ)設(shè)施纯命。
3 PUBLISH – 發(fā)布消息:PUBLISH控制報文是指從客戶端向服務(wù)端或者服務(wù)端向客戶端傳輸一個應(yīng)用消息西剥。
4 PUBACK –發(fā)布確認:PUBACK報文是對QoS 1等級的PUBLISH報文的響應(yīng)。
5 PUBREC – 發(fā)布收到( QoS 2亿汞, 第一步):PUBREC報文是對QoS等級2的PUBLISH報文的響應(yīng)瞭空。 它是QoS 2等級協(xié)議交換的第二個報文。
6 PUBREL – 發(fā)布釋放( QoS 2疗我, 第二步):PUBREL報文是對PUBREC報文的響應(yīng)咆畏。 它是QoS 2等級協(xié)議交換的第三個報文。
7 PUBCOMP – 發(fā)布完成( QoS 2吴裤, 第三步):PUBCOMP報文是對PUBREL報文的響應(yīng)旧找。 它是QoS 2等級協(xié)議交換的第四個也是最后一個報文。
8 SUBSCRIBE - 訂閱主題:客戶端向服務(wù)端發(fā)送SUBSCRIBE報文用于創(chuàng)建一個或多個訂閱麦牺。 每個訂閱注冊客戶端關(guān)心的一個或多個主題钮蛛。 為了將應(yīng)用消息轉(zhuǎn)發(fā)給與那些訂閱匹配的主題, 服務(wù)端發(fā)送PUBLISH報文給客戶端剖膳。 SUBSCRIBE報文也( 為每個訂閱) 指定了最大的QoS等級魏颓, 服務(wù)端根據(jù)這個發(fā)送應(yīng)用消息給客戶端。
9 SUBACK – 訂閱確認:服務(wù)端發(fā)送SUBACK報文給客戶端潮秘, 用于確認它已收到并且正在處理SUBSCRIBE報文琼开。
10 UNSUBSCRIBE –取消訂閱:客戶端發(fā)送UNSUBSCRIBE報文給服務(wù)端易结, 用于取消訂閱主題枕荞。
11 UNSUBACK – 取消訂閱確認:服務(wù)端發(fā)送UNSUBACK報文給客戶端用于確認收到UNSUBSCRIBE報文。
12 PINGREQ – 心跳請求:客戶端發(fā)送PINGREQ報文給服務(wù)端的搞动。 用于:1. 在沒有任何其它控制報文從客戶端發(fā)給服務(wù)的時躏精, 告知服務(wù)端客戶端還活著。2. 請求服務(wù)端發(fā)送 響應(yīng)確認它還活著鹦肿。3. 使用網(wǎng)絡(luò)以確認網(wǎng)絡(luò)連接沒有斷開矗烛。
13 PINGRESP – 心跳響應(yīng):服務(wù)端發(fā)送PINGRESP報文響應(yīng)客戶端的PINGREQ報文。 表示服務(wù)端還活著箩溃。
14 DISCONNECT –斷開連接:DISCONNECT報文是客戶端發(fā)給服務(wù)端的最后一個控制報文瞭吃。 表示客戶端正常斷開連接。
1.5 MQTT控制報文格式
2. MQTT服務(wù)器搭建
MQTT服務(wù)器非常多涣旨,如apache的ActiveMQ歪架,emtqqd,HiveMQ霹陡,Emitter和蚪,Mosquitto止状,Moquette等等
在這里我們使用apache-apollo-1.7.1作為MQTT服務(wù)器
運行環(huán)境:阿里云ubuntu1604
前往下載壓縮包:(http://activemq.apache.org/apollo/download.html)
apache-apollo-1.7.1-unix-distro.tar.gz
下載后拷貝到服務(wù)器,使用linux命令解壓:
tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
解壓完成后進入文件夾
cd apache-apollo-1.7.1
新建broker(MQTT的服務(wù)器被稱作broker攒霹,broker即一個MQTT服務(wù)器項目)
./bin/apollo create mybroker
配置
參考:(https://blog.csdn.net/qq_27109703/article/details/78789494)
cd mybroker
vim etc/apollo.xml
部分配置按照上圖修改怯疤,確保外網(wǎng)可以訪問http監(jiān)控頁面(http://ip:61680),如果依然不能訪問可能是因為以下問題:
ubuntu防火墻啟用中(參考解決:https://www.cnblogs.com/EasonJim/p/7595213.html)
阿里云安全組未配置61680與61613端口
vim etc/users.properties
配置用戶名密碼:
admin=admin
配置完成后即可啟動服務(wù)器:
./bin/apollo-broker run
或
./bin/apollo-broker-service start
腳本參數(shù):apollo-broker-service {start|stop|restart|force-stop|status}
啟動后可以訪問http監(jiān)控頁面(http://ip:61680)催束,輸入用戶名密碼即可進入監(jiān)控集峦。
3. Android端接入MQTT服務(wù)器
使用AndroidStudio新建項目
app的build.gradle:
//eclipse的mqtt協(xié)議開發(fā)包
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
AndroidManifest.xml添加權(quán)限
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
Client.java:MQTT連接客戶端類
package com.myhuanghai.mymqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;
public class Client {
private static final String HOST = "tcp://47.104.142.113:61613";
private static final String clientid = "android";
private static final String userName = "admin";
private static final String passWord = "admin";
private HashMap<String, MqttTopic> topicList = new HashMap<>();
void start(String[] publicTopics, String[] subscribeTopics, PushCallback pushCallback) {
try {
// host為主機名,clientid即連接MQTT的客戶端ID泣崩,一般以唯一標識符表示少梁,MemoryPersistence設(shè)置clientid的保存形式,默認為以內(nèi)存保存
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄矫付,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
options.setCleanSession(true);
// 設(shè)置連接的用戶名
options.setUserName(userName);
// 設(shè)置連接的密碼
options.setPassword(passWord.toCharArray());
// 設(shè)置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線凯沪,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
// 設(shè)置回調(diào)
client.setCallback(pushCallback);
//MqttTopic topic = client.getTopic(TOPIC);
//setWill方法,如果項目中需要知道客戶端是否掉線可以調(diào)用該方法买优。設(shè)置最終端口的通知消息
//options.setWill(topic, "close".getBytes(), 2, true);
client.connect(options);
//訂閱消息
int[] Qos = new int[subscribeTopics.length];
for (int i=0;i<Qos.length;i++){
Qos[i] = 1;
}
client.subscribe(subscribeTopics, Qos);
for (String publicTopic : publicTopics) {
topicList.put(publicTopic, client.getTopic(publicTopic));
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void publish(String topic, String message) throws MqttException, UnsupportedEncodingException {
MqttMessage msg = new MqttMessage();
msg.setQos(2);
msg.setRetained(true);
msg.setPayload(ByteUtils.stringToByte(message));
MqttTopic mqttTopic = topicList.get(topic);
MqttDeliveryToken token = mqttTopic.publish(msg);
token.waitForCompletion();
System.out.println("message is published completely! "
+ token.isComplete());
}
public static void main(String[] args) {
}
}
PushCallback.java
package com.myhuanghai.mymqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 發(fā)布消息的回調(diào)類
*
* 必須實現(xiàn)MqttCallback的接口并實現(xiàn)對應(yīng)的相關(guān)接口方法CallBack 類將實現(xiàn) MqttCallBack妨马。
* 每個客戶機標識都需要一個回調(diào)實例。在此示例中杀赢,構(gòu)造函數(shù)傳遞客戶機標識以另存為實例數(shù)據(jù)烘跺。
* 在回調(diào)中,將它用來標識已經(jīng)啟動了該回調(diào)的哪個實例脂崔。
* 必須在回調(diào)類中實現(xiàn)三個方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已經(jīng)預(yù)訂的發(fā)布滤淳。
*
* public void connectionLost(Throwable cause)在斷開連接時調(diào)用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已經(jīng)發(fā)布的 QoS 1 或 QoS 2 消息的傳遞令牌時調(diào)用砌左。
* 由 MqttClient.connect 激活此回調(diào)脖咐。
*
*/
public interface PushCallback extends MqttCallback {
void connectionLost(Throwable cause);
void deliveryComplete(IMqttDeliveryToken token);
void messageArrived(String topic, MqttMessage message) throws Exception;
}
byte工具類源碼ByteUtils.java
package com.myhuanghai.mymqtt;
import java.io.UnsupportedEncodingException;
/**
* Created by huang on 2017/6/30.
*/
public class ByteUtils {
/**
* string到字節(jié)數(shù)組的轉(zhuǎn)換.
*/
public static byte[] stringToByte(String str) throws UnsupportedEncodingException {
return str.getBytes("UTF-8");
}
/**
* 字節(jié)數(shù)組到String的轉(zhuǎn)換.
*/
public static String bytesToString(byte[] str) {
String keyword = null;
try {
keyword = new String(str,"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return keyword;
}
}
activity_main.xml
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
tools:context=".MainActivity">
<TextView
android:id="@+id/tv"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="正在連接"
/>
<Button
android:id="@+id/btn"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="電源:ON"/>
</LinearLayout>
MainActivity.java
package com.myhuanghai.mymqtt;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.UnsupportedEncodingException;
public class MainActivity extends AppCompatActivity {
private TextView textView;
private Button button;
boolean state = true;//當前繼電器開關(guān)狀態(tài)
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
textView = findViewById(R.id.tv);
button = findViewById(R.id.btn);
final Client client = new Client();
client.start(new String[]{"power"},new String[]{"temperature","state"},new PushCallback() {
@Override
public void connectionLost(Throwable cause) {
runOnUiThread(new Runnable() {
@Override
public void run() {
textView.setText("連接丟失");
}
});
cause.getCause().printStackTrace();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void messageArrived(final String topic, final MqttMessage message) throws Exception {
runOnUiThread(new Runnable() {
@Override
public void run() {
if (topic.equals("temperature")){
textView.setText(""+ByteUtils.bytesToString(message.getPayload()));
}else if (topic.equals("state")){
state = ByteUtils.bytesToString(message.getPayload()).equals("state:1");
button.setText("電源:"+(state?"on":"off"));
}
}
});
}
});
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
try {
client.publish("power",state?"0":"1");
state = !state;
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
});
}
}
4. ESP32開發(fā)板接入MQTT
4.1 接線
ESP使用Arduino兼容模式開發(fā),開發(fā)請安裝Arduino工具
4.2 開發(fā)板配置:
將ESP32開發(fā)板帶的類庫解壓復制到Arduino目錄下的hardware文件夾下并重啟
Arduino代碼使用以下類庫:DHTesp.h汇歹、WiFi.h屁擅、WiFiClient.h、PubSubClient.h
下載類庫:
4.3 Arduino代碼:
#include <DHTesp.h>
#include <WiFi.h>
#include <WiFiClient.h>
#include <PubSubClient.h>
#define DHT11PIN 18
#define RELAYPIN 27
const char* ssid = "ASUS";
const char* password = "jingai.love";
const char* mqtt_server = "47.104.142.113"; // 使用HIVEMQ 的信息中轉(zhuǎn)服務(wù)
const char* mqtt_username = "admin";
const char* mqtt_password = "admin";
const char* sub_topic = "power"; // 訂閱信息主題
const char* pub_topic_1 = "temperature"; // 發(fā)布信息主題
const char* pub_topic_2 = "state"; // 發(fā)布信息主題
const char* client_id = "esp32"; // 標識當前設(shè)備的客戶端編號
int state = 0;
DHTesp dht;
WiFiClient espClient; // 定義wifiClient實例
PubSubClient client(espClient); // 定義PubSubClient的實例
long lastMsg = 0; // 記錄上一次發(fā)送信息的時長
void setup() {
pinMode(RELAYPIN, OUTPUT); // 定義繼電器輸出引腳
Serial.begin(115200);
dht.setup(DHT11PIN, DHTesp::DHT11);
setup_wifi(); //執(zhí)行Wifi初始化产弹,下文有具體描述
client.setServer(mqtt_server, 61613); //設(shè)定MQTT服務(wù)器與使用的端口派歌,1883是默認的MQTT端口
client.setCallback(callback); //設(shè)定回調(diào)方式,當ESP8266收到訂閱消息時會調(diào)用此方法
}
void setup_wifi() {
delay(10);
// 板子通電后要啟動痰哨,稍微等待一下讓板子點亮
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic); // 打印主題信息
Serial.print("] ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]); // 打印主題內(nèi)容
}
Serial.println();
if ((char)payload[0] == '1') {
digitalWrite(RELAYPIN, HIGH); // 亮燈
state = 1;
} else {
digitalWrite(RELAYPIN, LOW); // 熄燈
state = 0;
}
//發(fā)布電源狀態(tài)消息
char pub2[20];
sprintf(pub2, "state:%d",state);
client.publish(pub_topic_2, pub2);
}
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
// Attempt to connect
if (client.connect(client_id,mqtt_username,mqtt_password)) {
Serial.println("connected");
// 連接成功時訂閱主題
client.subscribe(sub_topic);
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
// Wait 5 seconds before retrying
delay(5000);
}
}
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
long now = millis();
if (now - lastMsg > 2000) {
char pub1[20];
TempAndHumidity lastValues = dht.getTempAndHumidity();
Serial.println("Temperature: " + String(lastValues.temperature,0));
Serial.println("Humidity: " + String(lastValues.humidity,0));
lastMsg = now;
sprintf(pub1, "Temperature:%f#Humidity:%f",lastValues.temperature,lastValues.humidity);
client.publish(pub_topic_1, pub1);
}
}
連接開發(fā)板并進行數(shù)據(jù)燒錄
結(jié)果展示
點擊電源按鈕胶果,繼電器會會開關(guān),并把狀態(tài)返回給客戶端
Github地址:https://github.com/FlyMantou/android_mqtt.git
部分資源百度云:鏈接:https://pan.baidu.com/s/1UmBtOiWXpVtTOVMi8gFhrA
提取碼:bgpc
如果這篇文章能夠幫到你斤斧,請幫我在Github點一個follow早抠,一起學習,一起加油折欠!