Android端基于Mqtt協(xié)議數(shù)據(jù)傳輸
architecture:
server端參考:SpringBoot 集成Mqtt,protobuf服務端搭建
關(guān)于Mqtt
MQTT是一個機器對機器(M2M)/“物聯(lián)網(wǎng)”連接協(xié)議宋下。它被設(shè)計為一種非常輕量級的發(fā)布/訂閱消息傳輸塑娇。它適用于需要少量代碼占用和/或網(wǎng)絡(luò)帶寬昂貴的遠程位置的連接。
例如,它已被用于通過衛(wèi)星鏈路與代理通信的傳感器脂新、通過與醫(yī)療服務提供商的偶爾撥號連接的傳感器,以及一系列家庭自動化和小型設(shè)備場景粗梭。
它也是移動應用程序的理想選擇争便,因為優(yōu)點:
- 體積小、
- 功耗低断医、
- 數(shù)據(jù)包最少滞乙,
- 并且可以有效地將信息分發(fā)給一個或多個接收者
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also ideal for mobile applications because of its small size, low power usage, minimised data packets, and efficient distribution of information to one or many receivers
關(guān)于 protobuf
Protobuf (全稱 Protocol Buffers)奏纪,是Google公司開發(fā)的一種數(shù)據(jù)描述語言,類似于XML能夠?qū)⒔Y(jié)構(gòu)化數(shù)據(jù)序列化斩启,可用于數(shù)據(jù)存儲序调、通信協(xié)議等方面。
簡單點來說就是類似于Json兔簇、Xml发绢,最主要的優(yōu)點是比Json、Xml速度快男韧,相信不久的將來應用會更加廣泛朴摊。
android中使用protobuf默垄,過程是這樣的:
- 1此虑、定義proto文件;
- 2口锭、使用該文件生成對應的java類朦前;
- 3、利用該java類實現(xiàn)數(shù)據(jù)傳輸鹃操;
背景介紹
項目中涉及面板機與服務器之間進行通訊韭寸。在協(xié)議選擇上,選擇了輕量級的Mqtt + protobuf荆隘。旨在解決數(shù)據(jù)上的輕量級傳輸恩伺,和后臺服務器的動態(tài)拓展。
因為是從0到1的開發(fā)過程椰拒,以此進行總結(jié)晶渠。
建議:由于protobuf包后的數(shù)據(jù)不方便展示,所以先調(diào)通Mqtt通信燃观,在添加protobuf協(xié)議褒脯。
由于服務端API較復雜不適合一開始Mqtt協(xié)議的調(diào)試,所以構(gòu)建了輕量級的服務端Mqtt-server用于debug缆毁,具體過程參考我的另外一篇文章:SpringBoot 集成Mqtt番川,protobuf服務端搭建
實現(xiàn)簡單通信
要通信,先協(xié)議脊框。通訊前先定義好通訊協(xié)議颁督。好比需要溝通前,我們必須先商量好說哪國的語言是一樣的浇雹。
參考tag:v1.0
簡單協(xié)議
實現(xiàn)消息回環(huán)
服務端訂閱
[topic]: in/mqtt/loop/message
服務端發(fā)布
[topic]: out/mqtt/loop/message
客戶端訂閱
[topic]: out/mqtt/loop/message
客戶端發(fā)布
[topic]: in/mqtt/loop/message
服務端和客戶端都需要訂閱彼此的topic适篙,并向?qū)Ψ降膖opic 發(fā)送消息
注意:不能同時訂閱和發(fā)布到相同topic,會產(chǎn)生回環(huán)箫爷,消息會不斷接收發(fā)送嚷节,[自激震蕩]聂儒。
為簡單起見這里只實現(xiàn):客戶端發(fā)送消息,服務端接受消息
服務端訂閱
[topic]: mqtt/loop/message
客戶端發(fā)布:
[topic]: mqtt/loop/message
創(chuàng)建客戶端
使用Service包裝客戶端MqttAndroidClient
依賴Paho庫硫痰,創(chuàng)建Paho 的 MqttAndroidClient
衩婚,這是所有通訊的核心:
Paho MqttAndroidClient
構(gòu)造方法如下:
- 傳入context:Android上下文
- 傳入 serverURI: broker地址
IP地址+端口號
- 傳入clientID:需要具有唯一標識,這里使用
前綴+UUID
的方式
package org.eclipse.paho.android.service;
//...
/**
* Constructor - create an MqttAndroidClient that can be used to communicate with an MQTT server on android
*
* @param context
* object used to pass context to the callback.
* @param serverURI
* specifies the protocol, host name and port to be used to
* connect to an MQTT server
* @param clientId
* specifies the name by which this connection should be
* identified to the server
*/
public MqttAndroidClient(Context context, String serverURI,
String clientId) {
this(context, serverURI, clientId, null, Ack.AUTO_ACK);
}
初始化客戶端效斑,在Service的 onCreate()
調(diào)用
private void init() {
String serverURI = HOST; //服務器地址(協(xié)議+地址+端口號)
Log.i(TAG, "CLIENTID = " + CLIENTID);
char prefix = PRODUCT_NAME.charAt(0);
prefix = Character.toUpperCase(prefix);
String DeviceID = CLIENTID + "_" + UUID.randomUUID().toString();
Log.i(TAG, "DeviceID:" + DeviceID);
Log.i(TAG, "serverURI:" + serverURI);
mqttAndroidClient = new MqttAndroidClient(this, serverURI, DeviceID);
mqttAndroidClient.setCallback(mqttCallback); //設(shè)置監(jiān)聽訂閱消息的回調(diào)
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(true); //設(shè)置是否清除緩存
mMqttConnectOptions.setConnectionTimeout(10); //設(shè)置超時時間非春,單位:秒
// mMqttConnectOptions.setKeepAliveInterval(20); //設(shè)置心跳包發(fā)送間隔,單位:秒
mMqttConnectOptions.setUserName(USERNAME); //設(shè)置用戶名
mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //設(shè)置密碼
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + DeviceID + "\"}";
// 最后的遺囑last_will
try {
mMqttConnectOptions.setWill(LAST_WILL_PANEL_SATAUS, message.getBytes(), MQTT_QOS_HIGH, MQTT_RETAINED);
mMqttConnectOptions.setWill(LAST_WILL_PANEL_CHECK, message.getBytes(), MQTT_QOS_HIGH, MQTT_RETAINED);
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
if (doConnect) {
doClientConnection();
}
}
設(shè)置監(jiān)聽缓屠,
- 當連接成功后奇昙,設(shè)置客戶端訂閱消息
- 當連接失敗后,嘗試重新建立連接
//MQTT是否連接成功的監(jiān)聽
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "連接成功 ");
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
Log.i(TAG, "連接失敗 ");
/*沒有可用網(wǎng)絡(luò)的時候敌完,延遲3秒再嘗試重連*/
mHandler.postDelayed(new Runnable() {
@Override
public void run() {
doClientConnection();//連接失敗储耐,重連(可關(guān)閉服務器進行模擬)
}
}, 5000);
// doClientConnection();//連接失敗,重連(可關(guān)閉服務器進行模擬)
}
};
通過客戶端發(fā)布消息
為方便起見滨溉,設(shè)置為靜態(tài)方法
通過創(chuàng)建的mqttAndroidClient客戶端發(fā)布消息
- topic:與服務端監(jiān)聽的topic一致什湘,并非是客戶端監(jiān)聽的topic
- message:要發(fā)布的String
- qos:服務質(zhì)量 0-2
- retained:是否message被服務器保持
/**
* 發(fā)布 (模擬其他客戶端發(fā)布消息)
*
* @param message 消息
*/
public static void publish(String topic, String message, int qos, boolean retained) {
try {
//參數(shù)分別為:主題、消息的字節(jié)數(shù)組晦攒、服務質(zhì)量闽撤、是否在服務器保留斷開連接后的最后一條消息
mqttAndroidClient.publish(topic, message.getBytes(), qos, retained);
} catch (MqttException e) {
e.printStackTrace();
}
}
服務端搭建
參考我的這篇文章 SpringBoot 集成Mqtt,protobuf服務端搭建
run springboot 項目脯颜,進行sub監(jiān)聽topic:mqtt/loop/message
***********************************************************************
Message Arrived at Time: 2019-12-10 12:01:52.724 Topic: mqtt/loop/message Message: test-mqtt
***********************************************************************
完整版Mqtt通訊
項目實現(xiàn)面板機與服務器間的通信
具體業(yè)務:
- 啟動哟旗、停止面板機
- 心跳
- 加組、刪組(人臉組)
- 加臉栋操、刪臉(人臉底庫)
- 抓拍消息
- 識別消息
完整協(xié)議
use: mqtt & protobuf
[topic] 定義規(guī)范:產(chǎn)品類別/產(chǎn)品名/產(chǎn)品類別id/消息流向/功能
[消息流向] 規(guī)范:流向服務器端為 in闸餐, 流向客戶端為 out
open camera:
// request
[topic]: panel/m5/12345/out/start
message Mode {
enum Mode {
capture = 0;
recognize = 1;
}
Mode mode = 1;
int32 health_check_interval = 2;
}
[last will]: panel/m5/12345/out/stop
// response (LOOP)
[topic]: panel/m5/12345/in/check
[last will]: panel/m5/12345/in/disconnect
recognize message:
// response (LOOP)
[topic]: panel/m5/12345/in/recognize
message Recognize {
message Result {
string group = 1;
string face = 2;
float score = 3;
}
int id = 1; // message id
Result top = 2;
required bytes image = 100; //crop face image binary ...
optional bytes full = 101; //full face image binary ...
}
add group:
// request
[topic]: panel/m5/12345/out/add_group
message Group {
string group = 1;
int32 top = 2;
float threshold = 3;
}
// response
[topic]: panel/m5/12345/int/add_group/:group
message Response {
required int32 success = 1; // 0: success others: error code
optional string err = 2; // "error: delete face falied. reason: xxxxxxx"
}
del group:
// request
[topic]: panel/m5/12345/out/del_group/:group
//response
[topic]: panel/m5/12345/in/del_group/:group
message Response {
required int32 success = 1; // 0: success others: error code
optional string err = 2; // "error: delete face falied. reason: xxxxxxx"
}
add face:
// request
[topic]: panel/m5/12345/out/add_face
message Face {
string group
string face
bytes image binary ...
}
// response
[topic]: panel/m5/12345/in/add_face/:group/:face
message Response {
required int32 success = 1; // 0: success others: error code
optional string err = 2; // "error: delete face falied. reason: xxxxxxx"
}
del face:
// request
[topic]: panel/m5/12345/out/del_face/:group/:face
// response
[topic]: panel/m5/12345/in/del_face/:group/:face
message Response {
required int32 success = 1; // 0: success others: error code
optional string err = 2; // "error: delete face falied. reason: xxxxxxx"
}
manual snapshot:
// request
[topic]: panel/m5/12345/out/snapshot
// response
[topic]: panel/m5/12345/in/snapshot
message Snapshot {
bytes image = 1; // snapshot image binary ...
}
創(chuàng)建 protocol 模塊封裝mqtt通信
依賴
創(chuàng)建 Android Library 命名:protocol. 此庫用于實現(xiàn)mqtt通訊相關(guān)業(yè)務邏輯
依賴庫
基于 Eclipse的paho庫實現(xiàn)mqtt協(xié)議
基于rxjava 實現(xiàn)調(diào)度
基于retrofit2 實現(xiàn)網(wǎng)絡(luò)請求
基于 Google protobuf實現(xiàn)數(shù)據(jù)傳輸過程的包裝
protocol/gradle.build 中添加依賴
dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
// 定義protobuf依賴,//使用精簡版
api 'com.google.protobuf:protobuf-java:3.1.0'
api 'com.google.protobuf:protoc:3.1.0'
api ('com.squareup.retrofit2:converter-protobuf:2.2.0') {
exclude group: 'com.google.protobuf', module: 'protobuf-java'
}
//mqtt support
api 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
api 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
//rxjava
api 'io.reactivex.rxjava2:rxjava:2.2.8'
api 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'com.android.support:appcompat-v7:28.0.0'
testImplementation 'junit:junit:4.12'
androidTestImplementation 'com.android.support.test:runner:1.0.2'
androidTestImplementation 'com.android.support.test.espresso:espresso-core:3.0.2'
}
搭建.proto文件生成java類的環(huán)境
參考:http://www.reibang.com/p/1b78eb36ded7
1讼庇、 在 src/main
下創(chuàng)建protobuf文件夾創(chuàng)建mqtt.proto
文件
依據(jù)協(xié)議绎巨,映射到proto文件
syntax = "proto3";
option java_package = "com.braincs.attrsc.protocol.protobuf";
option java_outer_classname = "Protocol";
package pb;
message Rect {
int32 left = 1;
int32 top = 2;
int32 width = 3;
int32 height = 4;
}
message RectF {
float left = 1;
float top = 2;
float width = 3;
float height = 4;
}
message Response {
int32 success = 1;
string err = 2;
}
message Start {
enum Mode {
capture = 0;
recognize = 1;
}
Mode mode = 1;
bool is_open = 2;
int32 health_check_interval = 3;
}
message Group {
string group = 1;
int32 top = 2;
float threshold = 3;
}
message Face {
string group = 1;
string face = 2;
string name = 3;
string url = 10;
bytes image = 100;
}
message Capture {
int32 track = 1;
int32 seq_num = 2;
int64 timestamp = 10;
float quality = 11;
bytes crop = 100;
RectF crop_rect = 101;
bytes full = 200;
}
message Recognize {
message Result {
string face = 1;
string name = 2;
float score = 11;
}
repeated Result top = 1;
string group = 2;
bytes crop = 100;
bytes full = 101;
}
message SnapShot {
bytes image = 100;
}
message Upgrade {
int32 timeout = 1;
string url = 10;
}
message UpgradeProgress {
int32 progress = 1;
}
message Status {
string version = 1;
string algorithm = 2;
string local_ip = 10;
}
2、 在根Project/build.gradle中加入protobuf插件
buildscript {
repositories {
google()
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:3.2.0'
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.6' //添加這行
}
}
3蠕啄、在protocol/build.gradle中加入如下配置场勤,
頂部加上
apply plugin: 'com.android.library'
apply plugin: 'com.google.protobuf' //添加
android{}中加入
sourceSets {
main {
// 自動生成的java資源路徑
java {
srcDir 'src/main/java'
}
// 定義proto文件目錄
proto {
srcDir 'src/main/protobuf'
include '**/*.proto'
}
}
}
android{}同級加入:
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.1.0'
}
generateProtoTasks {
all().each { task ->
task.builtins {
remove java
}
task.builtins {
java {}
// Add cpp output without any option.
// DO NOT omit the braces if you want this builtin to be added.
cpp {}
}
}
}
//生成目錄
generatedFilesBaseDir = "$projectDir/src/generated"
}
同步項目即可生成proto對應的java類
如果未能正常生成:
- 安裝proto支持插件,Settings-->Plugins-->搜索protobuf-->找到Protobuf Support點擊安裝
- 重啟as此時porto文件會有一個彩環(huán)歼跟,并且編寫proto文件時也會有相應的提示
加入 Rxjava
承接Mqtt消息
Android內(nèi)部使用Rxjava進行消息通訊和媳,由于篇幅原因,在這里不在贅述 Rxjava
的具體實現(xiàn)
以啟動消息為例:
1哈街、創(chuàng)建ObservableQueue 消息隊列留瞳,用于MqttMessage的隊列
ObservableOnSubscribe<MqttMessage> messageQueueObservable = new ObservableOnSubscribe<MqttMessage>() {
@Override
public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception {
msqStartEmitter = emitter;
}
};
2、創(chuàng)建Observable
Observable<Protocol.Start> msqStartObservable = Observable.create(messageQueueObservable)
.map(mqttStartFilter)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io());
3骚秦、關(guān)聯(lián)對應的Observer
if (observerStart != null)
msqStartObservable.subscribe(observerStart);
4她倘、創(chuàng)建消息轉(zhuǎn)換器mqttStartFilter
Observable提供map功能璧微,能自定義將消息轉(zhuǎn)換成protobuf包裝的消息體
private static Function<MqttMessage, Protocol.Start> mqttStartFilter = new Function<MqttMessage, Protocol.Start>() {
@Override
public Protocol.Start apply(MqttMessage mqttMessage) throws Exception {
return Protocol.Start.parseFrom(mqttMessage.getPayload());
}
};
5、關(guān)聯(lián)mqtt消息與rxjava 消息
初始化時候硬梁,設(shè)置了mqtt消息回調(diào)
private void init(){
//...
mqttAndroidClient.setCallback(mqttCallback); //設(shè)置監(jiān)聽訂閱消息的回調(diào)
//...
}
接受mqtt消息前硫,并通過msqStartEmitter.onNext(message);
將消息傳入rxjava
隊列
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "》》》》》》》》》》》" + topic);
Log.i(TAG, "收到消息: " + new String(message.getPayload()));
//...
if (topic.equals(TOPIC_PANEL_SATAUS)) {
msqStartEmitter.onNext(message);
}
//...
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
Log.i(TAG, "連接斷開 ");
// doClientConnection();//連接斷開,重連
}
};
創(chuàng)建 Activity 使用rxjava與底層 MqttService進行通信
1荧止、在 onCreate()
中啟動Service屹电,并傳入rxjava的 Observer
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_protocal);
mView = this;
m5IsOpen = false;
initView();
initData();
getPermissions();
Config config = new Config.Builder().setServerUrl(HOST)
.setClientID(clientID)
.setStartObserver(observerStart)
.setSnapShotObserver(observerSnapshot)
.setAddGroupObserver(observerAddGroup)
.setAddFaceObserver(observerAddFace)
.setDelGroupObserver(observerDelGroup)
.setDelFaceObserver(observerDelFace)
.create();
MqttService.startService(this, config);
}
2、實現(xiàn)Observer跃巡,監(jiān)聽Mqtt消息 [Client-Sub]
以打開相機消息為例:
private Observer<Protocol.Start> observerStart = new Observer<Protocol.Start>() {
@Override
public void onSubscribe(Disposable d) {
disposerStart = d;
}
@Override
public void onNext(Protocol.Start start) {
boolean statusIsOpen = start.getIsOpen();
int statusCheckInterval = start.getHealthCheckInterval();
Protocol.Start.Mode statusMode = start.getMode();
Log.d(TAG, start.toString());
setReceiveMessage("Receive start request\n是否打開相機:" + statusIsOpen + "危号,相機檢測周期:" + statusCheckInterval + "相機的模式:" + statusMode.toString());
if (statusIsOpen && !m5IsOpen) {
m5IsOpen = true;
Log.d(TAG, "是否打開相機:" + statusIsOpen + ",相機檢測周期:" + statusCheckInterval);
Log.d(TAG, "相機的模式:" + statusMode.toString());
// MockM5Activity.start(ProtocalActivity.this);
setStatusMessage("Running");
MqttService.startChecking(statusCheckInterval, "meg-v1", "meg-v2", "192.168.1.10");
setSendMessage("Send check message every: " + statusCheckInterval + " s");
} else if (!statusIsOpen && m5IsOpen) {
m5IsOpen = false;
Log.d(TAG, "是否打開相機:" + statusIsOpen + "素邪,相機檢測周期:" + statusCheckInterval);
Log.d(TAG, "相機的模式:" + statusMode.toString());
// MockM5Activity.close();
setStatusMessage("Pause");
MqttService.stopChecking();
setSendMessage("Send stop checking");
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
注意:在Activity退出時候外莲,需要釋放對應的disposer,否則會內(nèi)存泄漏
3娘香、實現(xiàn)pub消息 [Client-Pub]
public void onClickSnapshot(View view) {
//test snapshot response
Protocol.SnapShot.Builder builder = Protocol.SnapShot.newBuilder();
Protocol.SnapShot snapShot = builder.setImage(ByteString.copyFrom(data))
.build();
Log.d(TAG, "pub an snapshot");
MqttService.responseSnapshot(snapShot);
}
與服務端聯(lián)調(diào)
在pc端啟動 springboot工程苍狰,使用以下curl命令進行調(diào)試
流程:使用curl命令办龄,觸發(fā)服務端的pub消息烘绽,客戶端接收sub消息,并返回response俐填。
1安接、啟動相機
? curl -X POST "http://127.0.0.1:8080/start" -d "modeStr=recognize&health_check_interval=20"
2、添加組
組名:test1
閾值:92
top值:1
? curl -XPOST "http://127.0.0.1:8080/group/add?url=mqtt://admin:admin@panel/m5/12345&group_name=test1&threshold=92.&top=1"
3英融、向組中添加人臉
組名:test1
cert_no:3333333
image_url:圖片地址
注意: image_url 需填寫本機的ip地址盏檐,確保Client和Server都在一個局域網(wǎng)內(nèi)
? curl -XPOST "http://127.0.0.1:8080/face/add?url=mqtt://admin:admin@panel/m5/12345&group_name=test1&cert_no=3333333&name=xxx&image_url=http://10.156.2.75:8080/static/test.png"
效果圖
工程開源地址 github 歡迎star,一起探討驶悟,提issue胡野,等等等~