基于 MQTT 協(xié)議的推送服務(wù)

一略就、簡述

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議)藏澳,是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布颈嚼。MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬饭寺,為連接遠(yuǎn)程設(shè)備提供實時可靠的消息服務(wù)阻课。作為一種低開銷、低帶寬占用的即時通訊協(xié)議艰匙,使其在物聯(lián)網(wǎng)限煞、小型設(shè)備、移動應(yīng)用等方面有較廣泛的應(yīng)用员凝。

MQTT是一個基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議署驻。MQTT協(xié)議是輕量、簡單健霹、開放和易于實現(xiàn)的旺上,這些特點使它適用范圍非常廣泛。在很多情況下糖埋,包括受限的環(huán)境中宣吱,如:機器與機器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在瞳别,通過衛(wèi)星鏈路通信傳感器征候、偶爾撥號的醫(yī)療設(shè)備、智能家居洒试、及一些小型化設(shè)備中已廣泛使用倍奢。

二、apache-Apollo 服務(wù)器搭建

下載地址:http://activemq.apache.org/apollo/download.html

  • 1.將下載后的 壓縮包進(jìn)行解壓:


    apollo.png
  • 2.打開 cmd 運行 bin/apollo.cmd


    apollo.png
  • 3.創(chuàng)建一個服務(wù)器實例


    image.png
  • 4.在服務(wù)中打開 apollo 服務(wù)


    image.png
  • 5.啟動服務(wù)后垒棋,在瀏覽器上面輸入:https://127.0.0.1:61681/http://127.0.0.1:61680/

    apollo控制臺

初始賬號: admin 密碼:password
  • 6.進(jìn)入apollo控制臺


    image.png

    進(jìn)入此頁面說明服務(wù)器成功搭建

三卒煞、服務(wù)端推送和客戶端監(jiān)聽
  • SeverClass
package com.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ServerMQTT {
    //tcp://MQTT安裝的服務(wù)器地址:MQTT定義的端口號
    public static final String HOST = "tcp://0.0.0.0:61613";
    //定義一個主題
    public static final String TOPIC = "message";
    //定義MQTT的ID,可以在MQTT服務(wù)配置中指定
    private static final String clientid = "server11";

    private MqttClient client;
    private MqttTopic topic11;
    private String userName = "admin";  //非必須
    private String passWord = "password";  //非必須

    private MqttMessage message;

    /**
     * 構(gòu)造函數(shù)
     * @throws MqttException
     */
    public ServerMQTT() throws MqttException {
        // MemoryPersistence設(shè)置clientid的保存形式叼架,默認(rèn)為以內(nèi)存保存
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }

    /**
     *  用來連接服務(wù)器
     */
    private void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 設(shè)置超時時間
        options.setConnectionTimeout(10);
        // 設(shè)置會話心跳時間
        options.setKeepAliveInterval(20);
        try {
            client.setCallback(new PushCallback());
            client.connect(options);

            topic11 = client.getTopic(TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete());
    }

    /**
     *  啟動入口
     * @param args
     * @throws MqttException
     */
    public static void main(String[] args) throws MqttException {
        ServerMQTT server = new ServerMQTT();

        server.message = new MqttMessage();
        server.message.setQos(1);  //保證消息能到達(dá)一次
        server.message.setRetained(true);
        server.message.setPayload("這是服務(wù)器發(fā)出的信息的內(nèi)容".getBytes());
        server.publish(server.topic11 , server.message);
        System.out.println(server.message.isRetained() + "------ratained狀態(tài)");

    }

}
  • ClientClass
package com.mqtt;

import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientMQTT {

    public static final String HOST = "tcp://0.0.0.0:61613";
    public static final String TOPIC1 = "message";
    private static final String clientid = "client11";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "admin";    //非必須
    private String passWord = "password";  //非必須
    @SuppressWarnings("unused")
    private ScheduledExecutorService scheduler;

    private void start() {
        try {
            // host為主機名畔裕,clientid即連接MQTT的客戶端ID,一般以唯一標(biāo)識符表示乖订,MemoryPersistence設(shè)置clientid的保存形式扮饶,默認(rèn)為以內(nèi)存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的連接設(shè)置
            options = new MqttConnectOptions();
            // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
            options.setCleanSession(false);
            // 設(shè)置連接的用戶名
            options.setUserName(userName);
            // 設(shè)置連接的密碼
            options.setPassword(passWord.toCharArray());
            // 設(shè)置超時時間 單位為秒
            options.setConnectionTimeout(10);
            // 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線乍构,但這個方法并沒有重連的機制
            options.setKeepAliveInterval(20);
            // 設(shè)置回調(diào)
            client.setCallback(new PushCallback());
            MqttTopic topic = client.getTopic(TOPIC1);
            //setWill方法甜无,如果項目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息
            //options.setWill(topic, "close".getBytes(), 2, true);
            client.connect(options);
            //訂閱消息
            //訂閱消息

            client.subscribe(TOPIC1, 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MqttException {
        ClientMQTT client = new ClientMQTT();
        client.start();
    }
}
  • server


    server
  • client


    client
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市岂丘,隨后出現(xiàn)的幾起案子陵究,更是在濱河造成了極大的恐慌,老刑警劉巖奥帘,帶你破解...
    沈念sama閱讀 223,207評論 6 521
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件铜邮,死亡現(xiàn)場離奇詭異,居然都是意外死亡寨蹋,警方通過查閱死者的電腦和手機松蒜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,455評論 3 400
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來已旧,“玉大人秸苗,你說我怎么就攤上這事≡送剩” “怎么了难述?”我有些...
    開封第一講書人閱讀 170,031評論 0 366
  • 文/不壞的土叔 我叫張陵,是天一觀的道長吐句。 經(jīng)常有香客問我,道長店读,這世上最難降的妖魔是什么嗦枢? 我笑而不...
    開封第一講書人閱讀 60,334評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮屯断,結(jié)果婚禮上文虏,老公的妹妹穿的比我還像新娘。我一直安慰自己殖演,他們只是感情好氧秘,可當(dāng)我...
    茶點故事閱讀 69,322評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著趴久,像睡著了一般丸相。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上彼棍,一...
    開封第一講書人閱讀 52,895評論 1 314
  • 那天灭忠,我揣著相機與錄音,去河邊找鬼座硕。 笑死弛作,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的华匾。 我是一名探鬼主播映琳,決...
    沈念sama閱讀 41,300評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了萨西?” 一聲冷哼從身側(cè)響起有鹿,我...
    開封第一講書人閱讀 40,264評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎原杂,沒想到半個月后印颤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,784評論 1 321
  • 正文 獨居荒郊野嶺守林人離奇死亡穿肄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,870評論 3 343
  • 正文 我和宋清朗相戀三年年局,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咸产。...
    茶點故事閱讀 40,989評論 1 354
  • 序言:一個原本活蹦亂跳的男人離奇死亡矢否,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出脑溢,到底是詐尸還是另有隱情僵朗,我是刑警寧澤,帶...
    沈念sama閱讀 36,649評論 5 351
  • 正文 年R本政府宣布屑彻,位于F島的核電站验庙,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏社牲。R本人自食惡果不足惜粪薛,卻給世界環(huán)境...
    茶點故事閱讀 42,331評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望搏恤。 院中可真熱鬧违寿,春花似錦、人聲如沸熟空。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,814評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽息罗。三九已至掂咒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間迈喉,已是汗流浹背俏扩。 一陣腳步聲響...
    開封第一講書人閱讀 33,940評論 1 275
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留弊添,地道東北人录淡。 一個月前我還...
    沈念sama閱讀 49,452評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像油坝,于是被迫代替她去往敵國和親嫉戚。 傳聞我的和親對象是個殘疾皇子刨裆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,995評論 2 361

推薦閱讀更多精彩內(nèi)容