介紹
MQTT是一種機(jī)器到機(jī)器消息協(xié)議萤晴,旨在為“物聯(lián)網(wǎng)”設(shè)備提供輕量級(jí)發(fā)布/訂閱通信吐句。它通常用于車(chē)輛的地理跟蹤車(chē)隊(duì),家庭自動(dòng)化店读,環(huán)境傳感器網(wǎng)絡(luò)和公用事業(yè)規(guī)模的數(shù)據(jù)收集嗦枢。
Mosquitto是一個(gè)實(shí)現(xiàn)了MQTT3.1協(xié)議的MQTT服務(wù)器(或代理 ,在MQTT中的用法)屯断,具有良好的社區(qū)支持文虏,易于安裝和配置。
在我的應(yīng)用場(chǎng)景中殖演,云服務(wù)器和食堂消費(fèi)終端需要數(shù)據(jù)交互氧秘,終端拉取服務(wù)器數(shù)據(jù)使用HTTP,服務(wù)器通知終端使用MQTT趴久。 本文的主旨在于記錄Mosquitto服務(wù)的安裝和使用丸相、基于Golang實(shí)現(xiàn)發(fā)布訂閱,以備日后查閱彼棍。
一灭忠、服務(wù)器安裝Mosquitto
服務(wù)器 CentOS Linux release 7.3.1611 (Core)
檢查系統(tǒng)是否已安裝EPEL(Extra Packages for Enterprise Linux)軟件源。
檢查安裝epel源
檢查 epel
是否已安裝
// 如下已存在epel源座硕,則不需要安裝
? yum repolist | grep epel
!epel/x86_64 Extra Packages for Enterprise Linux 7 - x86_64 13,242
如果不存在弛作,則需要安裝 epel
? yum -y install epel-release
安裝mosquitto
? yum -y install mosquitto
啟動(dòng)mosquitto
// 啟動(dòng)
? systemctl start mosquitto
// 停止
? systemctl stop mosquitto
// 重新啟動(dòng)
? systemctl restart mosquitto
// 查看運(yùn)行狀態(tài)
? systemctl status mosquitto
// 設(shè)為開(kāi)機(jī)啟動(dòng)
? systemctl enable mosquitto
測(cè)試Mosquitto
開(kāi)啟兩個(gè)終端窗口,一個(gè)負(fù)責(zé)訂閱华匾,另一個(gè)負(fù)責(zé)發(fā)布映琳。
// 訂閱
? mosquitto_sub -h localhost -t test
// 發(fā)布
? mosquitto_pub -h localhost -t test -m "hello world"
-
-h
host, 服務(wù)器的地址 -
-t
topic,發(fā)布的主題瘦真,訂閱的主題和發(fā)布的一致才會(huì)收到信息 -
-m
message, 消息內(nèi)容
mosquitto_pub
和 mosquitto_sub
參數(shù)詳解可以使用 man
命令查看刊头,或訪(fǎng)問(wèn)官方地址: mosquitto_pub命令詳解 mosquitto_sub命令詳解
修改Mosquitto配置
配置文件位置/etc/mosquitto/mosquitto.conf
,默認(rèn)無(wú)密碼可以訪(fǎng)問(wèn)诸尽,端口號(hào)為1883原杂。設(shè)置一下用戶(hù)名和密碼,mqtt 才會(huì)比較安全您机。
原配置文件內(nèi)容過(guò)多穿肄,為方便查看,新建一個(gè)空白文件代替默認(rèn)的际看。
- 新建用戶(hù)名密碼
// uname1 為你的用戶(hù)名
? mosquitto_passwd -c /etc/mosquitto/passwd username1
// 如果要添加多個(gè)用戶(hù)使用 -b 參數(shù)
// 必須在控制臺(tái)輸入明文的密碼淑玫,且文件不會(huì)覆蓋之前的
? mosquitto_passwd -b /etc/mosquitto/passwd username2 password2
- 備份默認(rèn)配置
? mv /etc/mosquitto/mosquitto.conf /etc/mosquitto/mosquitto.conf.example
- 新建文件
vim /etc/mosquitto/mosquitto.conf
史汗,寫(xiě)入如下內(nèi)容
# 禁止匿名訪(fǎng)問(wèn)
allow_anonymous false
# 用戶(hù)及密碼存儲(chǔ)文件
password_file /etc/mosquitto/passwd
# 端口號(hào)
port 8810
重啟測(cè)試
- 重啟mosquitto
? systemctl restart mosquitto
- 不帶用戶(hù)名和密碼進(jìn)行測(cè)試
// 指定端口號(hào)8810
// 不帶用戶(hù)名和密碼腥刹,訪(fǎng)問(wèn)被拒絕
? mosquitto_pub -h localhost -t test -p 8810 -m "hello world"
Connection error: Connection Refused: not authorised.
- 使用用戶(hù)名和密碼測(cè)試發(fā)布訂閱
在一個(gè)窗口執(zhí)行訂閱:
// -u 指定用戶(hù)名
// -P 指定密碼
? mosquitto_sub -h localhost -t test -p 8810 -u 'username1' -P 'password1'
在另一個(gè)終端窗口發(fā)布消息:
? mosquitto_pub -h localhost -t test -p 8810 -u 'username1' -P 'password1' -m 'hello world'
防火墻開(kāi)放端口
服務(wù)器使用的firewall喻奥,如果需要遠(yuǎn)程測(cè)試,需要打開(kāi)mosquitto端口
// 查看所有打開(kāi)的端口
? firewall-cmd --zone=public --list-ports
// 打開(kāi)8810端口
? firewall-cmd --zone=public --add-port=8810/tcp --permanent
// 重啟防火墻
? firewall-cmd --reload
二、Golang發(fā)布訂閱MQTT
項(xiàng)目目錄結(jié)構(gòu):
|____mqtt
| |____mqtt.go
|____mqtt_pub.go
|____mqtt_sub.go
|____service
| |____lib.go
封裝mqtt
使用包 github.com/eclipse/paho.mqtt.golang
封裝屑彻,修改其中的配置 Host验庙、UserName、Password
社牲。
新建文件mqtt/mqtt.go
:
package mqtt
import (
"encoding/json"
"errors"
"fmt"
gomqtt "github.com/eclipse/paho.mqtt.golang"
"strings"
"sync"
"time"
)
// mqtt服務(wù)器配置
const (
Host = "127.0.0.1:8810"
UserName = "test"
Password = "123456"
)
type Client struct {
nativeClient gomqtt.Client
clientOptions *gomqtt.ClientOptions
locker *sync.Mutex
// 消息收到之后處理函數(shù)
observer func(c *Client, msg *Message)
}
type Message struct {
// client_id
ClientID string `json:"client_id"`
// 接口名粪薛,訂閱號(hào)通過(guò)識(shí)別接口名處理相應(yīng)業(yè)務(wù)
Action string `json:"action"`
// 數(shù)據(jù)類(lèi)型
Type string `json:"type"`
// 發(fā)布時(shí)間
Time int64 `json:"time"`
// 業(yè)務(wù)數(shù)據(jù)的header,可以攜帶一些系統(tǒng)參數(shù)
Header interface{} `json:"header"`
// 業(yè)務(wù)數(shù)據(jù)的body搏恤,業(yè)務(wù)參數(shù)
Body interface{} `json:"body"`
}
func NewClient(clientId string) *Client {
clientOptions := gomqtt.NewClientOptions().
AddBroker(Host).
SetUsername(UserName).
SetPassword(Password).
SetClientID(clientId).
SetCleanSession(false).
SetAutoReconnect(true).
SetKeepAlive(120 * time.Second).
SetPingTimeout(10 * time.Second).
SetWriteTimeout(10 * time.Second).
SetOnConnectHandler(func(client gomqtt.Client) {
// 連接被建立后的回調(diào)函數(shù)
fmt.Println("Mqtt is connected!", "clientId", clientId)
}).
SetConnectionLostHandler(func(client gomqtt.Client, err error) {
// 連接被關(guān)閉后的回調(diào)函數(shù)
fmt.Println("Mqtt is disconnected!", "clientId", clientId, "reason", err.Error())
})
nativeClient := gomqtt.NewClient(clientOptions)
return &Client{
nativeClient: nativeClient,
clientOptions: clientOptions,
locker: &sync.Mutex{},
}
}
func (client *Client) GetClientID() string {
return client.clientOptions.ClientID
}
func (client *Client) Connect() error {
return client.ensureConnected()
}
// 確保連接
func (client *Client) ensureConnected() error {
if !client.nativeClient.IsConnected() {
client.locker.Lock()
defer client.locker.Unlock()
if !client.nativeClient.IsConnected() {
if token := client.nativeClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
}
}
return nil
}
// 發(fā)布消息
// retained: 是否保留信息
func (client *Client) Publish(topic string, qos byte, retained bool, data []byte) error {
if err := client.ensureConnected(); err != nil {
return err
}
token := client.nativeClient.Publish(topic, qos, retained, data)
if err := token.Error(); err != nil {
return err
}
// return false is the timeout occurred
if !token.WaitTimeout(time.Second * 10) {
return errors.New("mqtt publish wait timeout")
}
return nil
}
// 消費(fèi)消息
func (client *Client) Subscribe(observer func(c *Client, msg *Message), qos byte, topics ...string) error {
if len(topics) == 0 {
return errors.New("the topic is empty")
}
if observer == nil {
return errors.New("the observer func is nil")
}
if client.observer != nil {
return errors.New("an existing observer subscribed on this client, you must unsubscribe it before you subscribe a new observer")
}
client.observer = observer
filters := make(map[string]byte)
for _, topic := range topics {
filters[topic] = qos
}
client.nativeClient.SubscribeMultiple(filters, client.messageHandler)
return nil
}
func (client *Client) messageHandler(c gomqtt.Client, msg gomqtt.Message) {
if client.observer == nil {
fmt.Println("not subscribe message observer")
return
}
message, err := decodeMessage(msg.Payload())
if err != nil {
fmt.Println("failed to decode message")
return
}
client.observer(client, message)
}
func decodeMessage(payload []byte) (*Message, error) {
message := new(Message)
decoder := json.NewDecoder(strings.NewReader(string(payload)))
decoder.UseNumber()
if err := decoder.Decode(&message); err != nil {
return nil, err
}
return message, nil
}
func (client *Client) Unsubscribe(topics ...string) {
client.observer = nil
client.nativeClient.Unsubscribe(topics...)
}
map解析成struct
基于包github.com/goinggo/mapstructure
將訂閱者收到的map解析為相應(yīng)的struct违寿。
新建文件 service/lib.go
:
package service
import "github.com/goinggo/mapstructure"
// User 一個(gè)用于測(cè)試的struct
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}
// MapToStruct map轉(zhuǎn)struct.
// mqtt解析字段直接使用項(xiàng)目通用的json標(biāo)簽
// mqtt傳輸int會(huì)轉(zhuǎn)為string(int),需要開(kāi)啟 WeaklyTypedInput
func MapToStruct(m interface{}, structPointer interface{}) error {
// https://godoc.org/github.com/mitchellh/mapstructure#DecoderConfig
config := &mapstructure.DecoderConfig{
TagName: "json",
Result: structPointer,
WeaklyTypedInput: true,
}
newDecode, _ := mapstructure.NewDecoder(config)
if err := newDecode.Decode(m); err != nil {
return err
}
return nil
}
發(fā)布者
新建文件 mqtt_pub.go
:
package main
import (
"encoding/json"
"github.com/astaxie/beego/logs"
"github.com/gushasha/go-mqtt/mqtt"
"github.com/gushasha/go-mqtt/service"
"time"
)
func main() {
const (
clientId = "pub-001"
// topic規(guī)則:設(shè)備編號(hào)/接口名
topicName = "device001/user"
actionNameUser = "user/detail"
)
client := mqtt.NewClient(clientId)
err := client.Connect()
if err != nil {
logs.Error(err.Error())
}
// 發(fā)布一個(gè) user 消息
body := service.User{1, "小寶", 2}
msg := &mqtt.Message{
ClientID: clientId,
Action: actionNameUser,
Type: "json",
Time: time.Now().Unix(),
Body: body,
}
data, _ := json.Marshal(msg)
err = client.Publish(topicName, 0, false, data)
if err != nil {
panic(err)
}
}
訂閱者
新建文件 mqtt_sub.go
:
package main
import (
"fmt"
"github.com/gushasha/go-mqtt/mqtt"
"github.com/gushasha/go-mqtt/service"
"sync"
)
var (
wg sync.WaitGroup
)
const (
clientId = "client-001"
topicName = "device001/#"
actionNameUser = "user/detail"
)
func main() {
client := mqtt.NewClient(clientId)
err := client.Connect()
if err != nil {
panic(err.Error())
}
wg.Add(1)
err = client.Subscribe(userHandler, 0, topicName)
if err != nil {
panic(err)
}
wg.Wait()
}
var userHandler = func(c *mqtt.Client, msg *mqtt.Message) {
switch msg.Action {
case actionNameUser:
// map 轉(zhuǎn) service.User
user := &service.User{}
err := service.MapToStruct(msg.Body, user);
if err != nil {
fmt.Println("error:", err)
} else {
HandleUser(user)
}
default:
fmt.Printf("unkonwn action %s \n", msg.Action)
}
}
func HandleUser(user *service.User) {
fmt.Printf("user: %#v \n", user)
}
執(zhí)行測(cè)試
// 在一個(gè)終端窗口執(zhí)行訂閱
? go run mqtt_sub.go
// 另一個(gè)終端窗口執(zhí)行發(fā)布
? go run mqtt_pub.go
// 訂閱端收到信息熟空,解析為struct:
// mqttuser: &service.User{ID:1, Name:"小寶", Age:2}