目錄
MQTT簡(jiǎn)介
MQTT是機(jī)器對(duì)機(jī)器(M2M)/物聯(lián)網(wǎng)(IoT)連接協(xié)議。它被設(shè)計(jì)為一個(gè)極其輕量級(jí)的發(fā)布/訂閱消息傳輸協(xié)議。對(duì)于需要較小代碼占用空間和/或網(wǎng)絡(luò)帶寬非常寶貴的遠(yuǎn)程連接非常有用绊谭,是專為受限設(shè)備和低帶寬、高延遲或不可靠的網(wǎng)絡(luò)而設(shè)計(jì)尉辑。這些原則也使該協(xié)議成為新興的“機(jī)器到機(jī)器”(M2M)或物聯(lián)網(wǎng)(IoT)世界的連接設(shè)備积瞒,以及帶寬和電池功率非常高的移動(dòng)應(yīng)用的理想選擇。例如丐一,它已被用于通過衛(wèi)星鏈路與代理通信的傳感器藻糖、與醫(yī)療服務(wù)提供者的撥號(hào)連接,以及一系列家庭自動(dòng)化和小型設(shè)備場(chǎng)景库车。它也是移動(dòng)應(yīng)用的理想選擇巨柒,因?yàn)樗w積小,功耗低,數(shù)據(jù)包最小洋满,并且可以有效地將信息分配給一個(gè)或多個(gè)接收器晶乔。
效果演示
這是我連接的MQTT中文網(wǎng)的公共服務(wù)器
基礎(chǔ)知識(shí)
這里開發(fā)客戶端需要的知識(shí)不多,paho的核心庫(kù)都封裝好了芦岂,我們只需要了解下基礎(chǔ)的知識(shí)就行了瘪弓。
1.連接
這里我使用的MQTT3.1.1協(xié)議垫蛆,服務(wù)器地址是以tcp開頭的末尾加上端口號(hào)1883
val server = "tcp://mqtt.p2hp.com:1883" //服務(wù)端地址
其他的一些參數(shù)如下:
clientId:(作為客戶端的標(biāo)識(shí))禽最,這里我使用的是AndroidID,獲取不到的話就使用生成的一個(gè)UUID
CleanSession:設(shè)置不持久化的話袱饭,每次都是一次新會(huì)話
keepAliveInterval:發(fā)送心跳包的時(shí)間
val androidId = DeviceUtils.getAndroidID()
val clientId = if(!TextUtils.isEmpty(androidId)){
androidId
}else{
UUID.randomUUID().toString()
}
mqttClient = MqttAndroidClient(context,serverUrl,clientId)
connectOptions = MqttConnectOptions().apply {
isCleanSession = false //是否會(huì)話持久化
connectionTimeout = 30 //連接超時(shí)時(shí)間
keepAliveInterval = 10 //發(fā)送心跳時(shí)間
userName = name //如果設(shè)置了認(rèn)證川无,填的用戶名
password = pass.toCharArray() //用戶密碼
}
2.訂閱和發(fā)布
(1) 訂閱
這里的概念就好像你微博關(guān)注了一個(gè)博主,然后當(dāng)博主發(fā)布新的動(dòng)態(tài)虑乖,你這就可以收到懦趋,而這里的訂閱就是類似關(guān)注,訂閱的主題格式跟文件路徑差不多疹味,比如訂閱一個(gè)topic/1仅叫,當(dāng)然這里也有帶通配符的訂閱方式比如topic/#,#的意思就是匹配所有糙捺,也就是當(dāng)你訂閱了topic/#的主題你就可以收到所有topic開頭的主題消息诫咱,像topic/1、topic/2洪灯、topic/3等坎缭。
(2) 發(fā)布
發(fā)布消息的時(shí)候也需要指定一個(gè)主題,比如topic/1签钩,但是不能指定帶通配符的主題掏呼。
(3) QOS
另外還有一個(gè)比較重要的概念就是QOS(服務(wù)質(zhì)量),這里有3個(gè)值(0,1,2)铅檩,代表的意義如下:
0:代表憎夷,Sender 發(fā)送的一條消息,Receiver 最多能收到一次昧旨,也就是說(shuō) Sender 盡力向 Receiver 發(fā)送消息岭接,如果發(fā)送失敗,也就算了臼予。
1:代表鸣戴,Sender 發(fā)送的一條消息,Receiver 至少能收到一次粘拾,也就是說(shuō) Sender 向 Receiver 發(fā)送消息窄锅,如果發(fā)送失敗,會(huì)繼續(xù)重試,直到 Receiver 收到消息為止入偷,但是因?yàn)橹貍鞯脑蜃仿浚琑eceiver 有可能會(huì)收到重復(fù)的消息。
2:代表疏之,Sender 發(fā)送的一條消息殿雪,Receiver 確保能收到而且只收到一次,也就是說(shuō) Sender 盡力向 Receiver 發(fā)送消息锋爪,如果發(fā)送失敗丙曙,會(huì)繼續(xù)重試,直到 Receiver 收到消息為止其骄,同時(shí)保證 Receiver 不會(huì)因?yàn)橄⒅貍鞫盏街貜?fù)的消息亏镰。
實(shí)現(xiàn)步驟
1.引入依賴
這里本來(lái)我是用的這個(gè)庫(kù)https://github.com/eclipse/paho.mqtt.android,但是這個(gè)庫(kù)不適配Android12因此我下載了源碼調(diào)整了下拯爽,重新自己封裝了一個(gè)索抓,如下:
allprojects {
repositories {
...
maven { url 'https://jitpack.io' }
}
}
dependencies {
implementation 'com.github.itfitness:MQTTAndroid:1.0.0'
}
2.封裝方法
我對(duì)這個(gè)庫(kù)進(jìn)行了一些封裝,如下:
class MQTTHelper{
private val mqttClient: MqttAndroidClient
private val connectOptions: MqttConnectOptions
private var mqttActionListener: IMqttActionListener? = null
constructor(context: Context, serverUrl:String, name:String, pass:String){
val macAddress = DeviceUtils.getAndroidID()
val clientId = if(!TextUtils.isEmpty(macAddress)){
macAddress
}else{
UUID.randomUUID().toString()
}
mqttClient = MqttAndroidClient(context,serverUrl,clientId)
connectOptions = MqttConnectOptions().apply {
isCleanSession = false
connectionTimeout = 30
keepAliveInterval = 10
userName = name
password = pass.toCharArray()
}
}
/**
* 連接
* @param mqttCallback 接到訂閱的消息的回調(diào)
* @param isFailRetry 失敗是否重新連接
*/
fun connect(topic: Topic, qos: Qos, isFailRetry:Boolean, mqttCallback: MqttCallback){
mqttClient.setCallback(mqttCallback)
if(mqttActionListener == null){
mqttActionListener = object :IMqttActionListener{
override fun onSuccess(asyncActionToken: IMqttToken?) {
LogUtils.eTag("連接","連接成功")
subscribe(topic,qos)
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
//失敗重連
LogUtils.eTag("連接","連接失敗重試${exception?.message}")
if (isFailRetry){
mqttClient.connect(connectOptions,null,mqttActionListener)
}
}
}
}
mqttClient.connect(connectOptions,null,mqttActionListener)
}
/**
* 訂閱
*/
private fun subscribe(topic: Topic,qos:Qos){
mqttClient.subscribe(topic.value(),qos.value())
}
/**
* 發(fā)布
*/
fun publish(topic:Topic,message:String,qos:Qos){
val msg = MqttMessage()
msg.isRetained = false
msg.payload = message.toByteArray()
msg.qos = qos.value()
mqttClient.publish(topic.value(),msg)
}
/**
* 斷開連接
*/
fun disconnect(){
mqttClient.disconnect()
}
}
enum class Qos{
QOS_ZERO{
override fun value():Int{
return 0
}
},
QOS_ONE{
override fun value():Int{
return 1
}
},
QOS_TWO{
override fun value():Int{
return 2
}
};
abstract fun value(): Int
}
enum class Topic{
//訂閱主題
TOPIC_MSG{
override fun value():String{
return "testtopic/#"
}
},
//發(fā)布主題
TOPIC_SEND{
override fun value():String{
return "testtopic/1"
}
};
abstract fun value(): String
}
3.使用
在Activity中的使用如下:
class MainActivity : AppCompatActivity() {
@RequiresApi(Build.VERSION_CODES.N)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val server = "tcp://mqtt.p2hp.com:1883" //服務(wù)端地址
val mqttHelper = MQTTHelper(this,server,"123","123")
mqttHelper.connect(Topic.TOPIC_MSG, Qos.QOS_TWO,false,object : MqttCallback {
override fun connectionLost(cause: Throwable?) {
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
//收到消息
message?.payload?.let { ToastUtils.showShort(String(it)) }
LogUtils.eTag("消息", message?.payload?.let { String(it) })
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
}
})
val etMsg = findViewById<EditText>(R.id.et_msg)
findViewById<Button>(R.id.tv_send).setOnClickListener {
//發(fā)送消息
mqttHelper.publish(Topic.TOPIC_SEND,etMsg.text.toString(),Qos.QOS_TWO)
}
}
}