Centos7搭建Mosquitto难述,Golang使用MQTT

介紹

MQTT是一種機(jī)器到機(jī)器消息協(xié)議萤晴,旨在為“物聯(lián)網(wǎng)”設(shè)備提供輕量級(jí)發(fā)布/訂閱通信吐句。它通常用于車(chē)輛的地理跟蹤車(chē)隊(duì),家庭自動(dòng)化店读,環(huán)境傳感器網(wǎng)絡(luò)和公用事業(yè)規(guī)模的數(shù)據(jù)收集嗦枢。
Mosquitto是一個(gè)實(shí)現(xiàn)了MQTT3.1協(xié)議的MQTT服務(wù)器(或代理 ,在MQTT中的用法)屯断,具有良好的社區(qū)支持文虏,易于安裝和配置。
在我的應(yīng)用場(chǎng)景中殖演,云服務(wù)器和食堂消費(fèi)終端需要數(shù)據(jù)交互氧秘,終端拉取服務(wù)器數(shù)據(jù)使用HTTP,服務(wù)器通知終端使用MQTT趴久。 本文的主旨在于記錄Mosquitto服務(wù)的安裝和使用丸相、基于Golang實(shí)現(xiàn)發(fā)布訂閱,以備日后查閱彼棍。

一灭忠、服務(wù)器安裝Mosquitto

服務(wù)器 CentOS Linux release 7.3.1611 (Core)
檢查系統(tǒng)是否已安裝EPEL(Extra Packages for Enterprise Linux)軟件源。

檢查安裝epel源

檢查 epel 是否已安裝

// 如下已存在epel源座硕,則不需要安裝
? yum repolist | grep epel
!epel/x86_64               Extra Packages for Enterprise Linux 7 - x86_64 13,242

如果不存在弛作,則需要安裝 epel

? yum -y install epel-release

安裝mosquitto

? yum -y install mosquitto

啟動(dòng)mosquitto

// 啟動(dòng)
? systemctl start mosquitto
// 停止
? systemctl stop mosquitto
// 重新啟動(dòng)
? systemctl restart mosquitto
// 查看運(yùn)行狀態(tài) 
? systemctl status mosquitto
// 設(shè)為開(kāi)機(jī)啟動(dòng)
? systemctl enable mosquitto

測(cè)試Mosquitto

開(kāi)啟兩個(gè)終端窗口,一個(gè)負(fù)責(zé)訂閱华匾,另一個(gè)負(fù)責(zé)發(fā)布映琳。


// 訂閱
? mosquitto_sub -h localhost -t test

// 發(fā)布
? mosquitto_pub -h localhost -t test -m "hello world"
  • -h host, 服務(wù)器的地址
  • -t topic,發(fā)布的主題瘦真,訂閱的主題和發(fā)布的一致才會(huì)收到信息
  • -m message, 消息內(nèi)容

mosquitto_pubmosquitto_sub 參數(shù)詳解可以使用 man命令查看刊头,或訪(fǎng)問(wèn)官方地址: mosquitto_pub命令詳解 mosquitto_sub命令詳解

修改Mosquitto配置

配置文件位置/etc/mosquitto/mosquitto.conf,默認(rèn)無(wú)密碼可以訪(fǎng)問(wèn)诸尽,端口號(hào)為1883原杂。設(shè)置一下用戶(hù)名和密碼,mqtt 才會(huì)比較安全您机。
原配置文件內(nèi)容過(guò)多穿肄,為方便查看,新建一個(gè)空白文件代替默認(rèn)的际看。

  • 新建用戶(hù)名密碼
// uname1 為你的用戶(hù)名
? mosquitto_passwd -c /etc/mosquitto/passwd username1

// 如果要添加多個(gè)用戶(hù)使用 -b 參數(shù) 
// 必須在控制臺(tái)輸入明文的密碼淑玫,且文件不會(huì)覆蓋之前的
? mosquitto_passwd -b /etc/mosquitto/passwd username2 password2
  • 備份默認(rèn)配置
? mv /etc/mosquitto/mosquitto.conf /etc/mosquitto/mosquitto.conf.example
  • 新建文件 vim /etc/mosquitto/mosquitto.conf史汗,寫(xiě)入如下內(nèi)容
# 禁止匿名訪(fǎng)問(wèn)
allow_anonymous false
# 用戶(hù)及密碼存儲(chǔ)文件
password_file /etc/mosquitto/passwd
# 端口號(hào)
port 8810

重啟測(cè)試

  • 重啟mosquitto
? systemctl restart mosquitto
  • 不帶用戶(hù)名和密碼進(jìn)行測(cè)試
// 指定端口號(hào)8810
// 不帶用戶(hù)名和密碼腥刹,訪(fǎng)問(wèn)被拒絕
? mosquitto_pub -h localhost -t test -p 8810 -m "hello world"
Connection error: Connection Refused: not authorised.
  • 使用用戶(hù)名和密碼測(cè)試發(fā)布訂閱

在一個(gè)窗口執(zhí)行訂閱:

// -u 指定用戶(hù)名
// -P 指定密碼
? mosquitto_sub -h localhost -t test -p 8810  -u 'username1' -P 'password1'

在另一個(gè)終端窗口發(fā)布消息:

? mosquitto_pub -h localhost -t test -p 8810  -u 'username1' -P 'password1' -m 'hello world'

防火墻開(kāi)放端口

服務(wù)器使用的firewall喻奥,如果需要遠(yuǎn)程測(cè)試,需要打開(kāi)mosquitto端口

// 查看所有打開(kāi)的端口
? firewall-cmd --zone=public --list-ports
// 打開(kāi)8810端口
? firewall-cmd --zone=public --add-port=8810/tcp --permanent
// 重啟防火墻
? firewall-cmd --reload

二、Golang發(fā)布訂閱MQTT

項(xiàng)目目錄結(jié)構(gòu):

|____mqtt
| |____mqtt.go
|____mqtt_pub.go
|____mqtt_sub.go
|____service
| |____lib.go

封裝mqtt

使用包 github.com/eclipse/paho.mqtt.golang 封裝屑彻,修改其中的配置 Host验庙、UserName、Password社牲。
新建文件mqtt/mqtt.go

package mqtt

import (
    "encoding/json"
    "errors"
    "fmt"
    gomqtt "github.com/eclipse/paho.mqtt.golang"
    "strings"
    "sync"
    "time"
)

// mqtt服務(wù)器配置
const (
    Host     = "127.0.0.1:8810"
    UserName = "test"
    Password = "123456"
)

type Client struct {
    nativeClient  gomqtt.Client
    clientOptions *gomqtt.ClientOptions
    locker        *sync.Mutex
    // 消息收到之后處理函數(shù)
    observer func(c *Client, msg *Message)
}

type Message struct {
    // client_id
    ClientID string `json:"client_id"`
    // 接口名粪薛,訂閱號(hào)通過(guò)識(shí)別接口名處理相應(yīng)業(yè)務(wù)
    Action string `json:"action"`
    // 數(shù)據(jù)類(lèi)型
    Type string `json:"type"`
    // 發(fā)布時(shí)間
    Time int64 `json:"time"`
    // 業(yè)務(wù)數(shù)據(jù)的header,可以攜帶一些系統(tǒng)參數(shù)
    Header interface{} `json:"header"`
    // 業(yè)務(wù)數(shù)據(jù)的body搏恤,業(yè)務(wù)參數(shù)
    Body interface{} `json:"body"`
}

func NewClient(clientId string) *Client {
    clientOptions := gomqtt.NewClientOptions().
        AddBroker(Host).
        SetUsername(UserName).
        SetPassword(Password).
        SetClientID(clientId).
        SetCleanSession(false).
        SetAutoReconnect(true).
        SetKeepAlive(120 * time.Second).
        SetPingTimeout(10 * time.Second).
        SetWriteTimeout(10 * time.Second).
        SetOnConnectHandler(func(client gomqtt.Client) {
            // 連接被建立后的回調(diào)函數(shù)
            fmt.Println("Mqtt is connected!", "clientId", clientId)
        }).
        SetConnectionLostHandler(func(client gomqtt.Client, err error) {
            // 連接被關(guān)閉后的回調(diào)函數(shù)
            fmt.Println("Mqtt is disconnected!", "clientId", clientId, "reason", err.Error())
        })

    nativeClient := gomqtt.NewClient(clientOptions)

    return &Client{
        nativeClient:  nativeClient,
        clientOptions: clientOptions,
        locker:        &sync.Mutex{},
    }
}

func (client *Client) GetClientID() string {
    return client.clientOptions.ClientID
}

func (client *Client) Connect() error {
    return client.ensureConnected()
}

// 確保連接
func (client *Client) ensureConnected() error {
    if !client.nativeClient.IsConnected() {
        client.locker.Lock()
        defer client.locker.Unlock()
        if !client.nativeClient.IsConnected() {
            if token := client.nativeClient.Connect(); token.Wait() && token.Error() != nil {
                return token.Error()
            }
        }
    }
    return nil
}

// 發(fā)布消息
// retained: 是否保留信息
func (client *Client) Publish(topic string, qos byte, retained bool, data []byte) error {
    if err := client.ensureConnected(); err != nil {
        return err
    }

    token := client.nativeClient.Publish(topic, qos, retained, data)
    if err := token.Error(); err != nil {
        return err
    }

    // return false is the timeout occurred
    if !token.WaitTimeout(time.Second * 10) {
        return errors.New("mqtt publish wait timeout")
    }

    return nil
}

// 消費(fèi)消息
func (client *Client) Subscribe(observer func(c *Client, msg *Message), qos byte, topics ...string) error {
    if len(topics) == 0 {
        return errors.New("the topic is empty")
    }

    if observer == nil {
        return errors.New("the observer func is nil")
    }

    if client.observer != nil {
        return errors.New("an existing observer subscribed on this client, you must unsubscribe it before you subscribe a new observer")
    }
    client.observer = observer

    filters := make(map[string]byte)
    for _, topic := range topics {
        filters[topic] = qos
    }
    client.nativeClient.SubscribeMultiple(filters, client.messageHandler)

    return nil
}

func (client *Client) messageHandler(c gomqtt.Client, msg gomqtt.Message) {
    if client.observer == nil {
        fmt.Println("not subscribe message observer")
        return
    }
    message, err := decodeMessage(msg.Payload())
    if err != nil {
        fmt.Println("failed to decode message")
        return
    }
    client.observer(client, message)
}

func decodeMessage(payload []byte) (*Message, error) {
    message := new(Message)
    decoder := json.NewDecoder(strings.NewReader(string(payload)))
    decoder.UseNumber()
    if err := decoder.Decode(&message); err != nil {
        return nil, err
    }
    return message, nil
}

func (client *Client) Unsubscribe(topics ...string) {
    client.observer = nil
    client.nativeClient.Unsubscribe(topics...)
}

map解析成struct

基于包github.com/goinggo/mapstructure 將訂閱者收到的map解析為相應(yīng)的struct违寿。
新建文件 service/lib.go

package service

import "github.com/goinggo/mapstructure"

// User 一個(gè)用于測(cè)試的struct
type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// MapToStruct map轉(zhuǎn)struct.
// mqtt解析字段直接使用項(xiàng)目通用的json標(biāo)簽
// mqtt傳輸int會(huì)轉(zhuǎn)為string(int),需要開(kāi)啟 WeaklyTypedInput
func MapToStruct(m interface{}, structPointer interface{}) error {
    // https://godoc.org/github.com/mitchellh/mapstructure#DecoderConfig
    config := &mapstructure.DecoderConfig{
        TagName:          "json",
        Result:           structPointer,
        WeaklyTypedInput: true,
    }
    newDecode, _ := mapstructure.NewDecoder(config)

    if err := newDecode.Decode(m); err != nil {
        return err
    }
    return nil
}

發(fā)布者

新建文件 mqtt_pub.go

package main

import (
    "encoding/json"
    "github.com/astaxie/beego/logs"
    "github.com/gushasha/go-mqtt/mqtt"
    "github.com/gushasha/go-mqtt/service"
    "time"
)

func main() {

    const (
        clientId = "pub-001"
        // topic規(guī)則:設(shè)備編號(hào)/接口名
        topicName      = "device001/user"
        actionNameUser = "user/detail"
    )

    client := mqtt.NewClient(clientId)
    err := client.Connect()
    if err != nil {
        logs.Error(err.Error())
    }

    // 發(fā)布一個(gè) user 消息
    body := service.User{1, "小寶", 2}

    msg := &mqtt.Message{
        ClientID: clientId,
        Action:   actionNameUser,
        Type:     "json",
        Time:     time.Now().Unix(),
        Body:     body,
    }
    data, _ := json.Marshal(msg)
    err = client.Publish(topicName, 0, false, data)
    if err != nil {
        panic(err)
    }
}

訂閱者

新建文件 mqtt_sub.go

package main

import (
    "fmt"
    "github.com/gushasha/go-mqtt/mqtt"
    "github.com/gushasha/go-mqtt/service"
    "sync"
)

var (
    wg sync.WaitGroup
)

const (
    clientId = "client-001"
    topicName      = "device001/#"
    actionNameUser = "user/detail"
)

func main() {

    client := mqtt.NewClient(clientId)
    err := client.Connect()
    if err != nil {
        panic(err.Error())
    }

    wg.Add(1)
    err = client.Subscribe(userHandler, 0, topicName)
    if err != nil {
        panic(err)
    }
    wg.Wait()
}

var userHandler = func(c *mqtt.Client, msg *mqtt.Message) {
    switch msg.Action {
    case actionNameUser:
        // map 轉(zhuǎn) service.User
        user := &service.User{}
        err := service.MapToStruct(msg.Body, user);
        if err != nil {
            fmt.Println("error:", err)
        } else {
            HandleUser(user)
        }
    default:
        fmt.Printf("unkonwn action %s \n", msg.Action)
    }
}

func HandleUser(user *service.User) {
    fmt.Printf("user: %#v \n", user)
}

執(zhí)行測(cè)試

// 在一個(gè)終端窗口執(zhí)行訂閱
? go run mqtt_sub.go

// 另一個(gè)終端窗口執(zhí)行發(fā)布
? go run mqtt_pub.go

// 訂閱端收到信息熟空,解析為struct:
// mqttuser: &service.User{ID:1, Name:"小寶", Age:2}

參考資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末藤巢,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子息罗,更是在濱河造成了極大的恐慌菌瘪,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件阱当,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡糜工,警方通過(guò)查閱死者的電腦和手機(jī)弊添,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)捌木,“玉大人油坝,你說(shuō)我怎么就攤上這事∨亳桑” “怎么了澈圈?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)帆啃。 經(jīng)常有香客問(wèn)我瞬女,道長(zhǎng),這世上最難降的妖魔是什么努潘? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任诽偷,我火速辦了婚禮,結(jié)果婚禮上疯坤,老公的妹妹穿的比我還像新娘报慕。我一直安慰自己,他們只是感情好压怠,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布眠冈。 她就那樣靜靜地躺著,像睡著了一般菌瘫。 火紅的嫁衣襯著肌膚如雪蜗顽。 梳的紋絲不亂的頭發(fā)上布卡,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音诫舅,去河邊找鬼羽利。 笑死,一個(gè)胖子當(dāng)著我的面吹牛刊懈,可吹牛的內(nèi)容都是我干的这弧。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼虚汛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼匾浪!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起卷哩,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蛋辈,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后将谊,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體冷溶,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年尊浓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了逞频。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡栋齿,死狀恐怖苗胀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情瓦堵,我是刑警寧澤基协,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站菇用,受9級(jí)特大地震影響澜驮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜刨疼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一泉唁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧揩慕,春花似錦亭畜、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至,卻和暖如春劲藐,著一層夾襖步出監(jiān)牢的瞬間八堡,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工聘芜, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留兄渺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓汰现,卻偏偏與公主長(zhǎng)得像挂谍,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瞎饲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345