MQTT消息通訊及Apollo通訊服務(wù)器的搭建

MQTT通訊及服務(wù)器的搭建

Apollo服務(wù)器的搭建

1:Apollo服務(wù)器下載

首先從http://activemq.apache.org/apollo/download.html官網(wǎng)上下載windows對應(yīng)的apollo版本羡宙,本文下載的是apache-apollo-1.7.1-windows-distro.zip 版本茶鉴。windows的版本為win10研铆,JDK版本1.8。


2.解壓到C:\apache-apollo下晦溪,此時會多出一個apache-apollo-1.7.1文件夾荧嵌。

3.然后以管理員的身份運行cmd,進(jìn)入到如下目錄C:\apache-apollo\apache-apollo-1.7.1\bin亭姥,如下圖所示:


4.然后就是要創(chuàng)建broker,這里是創(chuàng)建在C:\apache-apollo\broker

的目錄下顾稀,執(zhí)行如下命令:apollo create myapolloC:\apache-apollo\broker


5.broker創(chuàng)建成功的提示如下圖所示:


6.創(chuàng)建完broker之后就是要運行apollo达罗,進(jìn)入C:\apache-apollo\broker\bin目錄下,執(zhí)行如下命令:apollo-brokerrun


7.apollo運行成功的提示础拨,如下圖所示:


8.最后打開瀏覽器氮块,輸入網(wǎng)址http://127.0.0.1:61680/绍载,即可看到如下頁面诡宗,默認(rèn)

賬號:admin?? 密碼:password


9.登錄成功之后的頁面,控制臺頁面如下圖所示 :


測試demo—服務(wù)器端代碼---需要下載并引入mqttv3jar包

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

import org.eclipse.paho.client.mqttv3.MqttTopic;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MqttServerTest {

??? public static final String HOST = "tcp://127.0.0.1:61613";

??? public static final String TOPIC = "toclient/124";

??? public static final String TOPIC125 = "toclient/125";

??? private static final String clientid = "server-id-0";


??? private MqttClient client;

??? private MqttTopic topic;

??? private MqttTopic topic125;

??? private String userName = "admin";

??? private String passWord = "password";


??? private MqttMessage message;


??? public MqttServerTest() throws MqttException {

?????? // MemoryPersistence設(shè)置clientid的保存形式击儡,默認(rèn)為以內(nèi)存保存

?????? client = new MqttClient(HOST, clientid, newMemoryPersistence());

?????? connect();

??? }


??? private void connect() {

?????? MqttConnectOptions options = new MqttConnectOptions();

??? ??? options.setCleanSession(false);

?????? options.setUserName(userName);

?????? options.setPassword(passWord.toCharArray());

?????? // 設(shè)置超時時間

?????? options.setConnectionTimeout(10);

?????? // 設(shè)置會話心跳時間

?????? options.setKeepAliveInterval(20);

?????? try {

?????????? client.setCallback(new PushCallBack());

?????????? client.connect(options);

?????????? topic = client.getTopic(TOPIC);

?????????? topic125 = client.getTopic(TOPIC125);

?????? } catch (Exception e) {

?????????? e.printStackTrace();

?????? }

??? }


??? public void publish(MqttTopic topic, MqttMessage message)

?????????? throws MqttPersistenceException, MqttException {

?????? MqttDeliveryToken token = topic.publish(message);

?????? token.waitForCompletion();

?????? System.out.println("message is

published completely! "

????????????? + token.isComplete());

??? }


??? public static void main(String[] args) throws MqttException {

?????? MqttServerTest server = new MqttServerTest();


?????? server.message = new MqttMessage();

?????? server.message.setQos(2);

?????? server.message.setRetained(true);

?????? server.message.setPayload("給客戶端124推送的信息".getBytes());

?????? server.publish(server.topic, server.message);


?????? server.message = new MqttMessage();

?????? server.message.setQos(2);

?????? server.message.setRetained(true);

?????? server.message.setPayload("給客戶端125推送的信息".getBytes());

?????? server.publish(server.topic125, server.message);


?????? System.out.println(server.message.isRetained() + "------ratained狀態(tài)");

??? }

}

測試demo—客戶端代碼—需引入mqttv3jar包

1測試類一 代表 客戶端client124

importjava.util.concurrent.ScheduledExecutorService;


importorg.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

importorg.eclipse.paho.client.mqttv3.MqttException;

importorg.eclipse.paho.client.mqttv3.MqttTopic;

importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MqttClientTest {

?????? publicstatic final String HOST = "tcp://127.0.0.1:61613";

?????? publicstatic final String TOPIC = "toclient/124";

?????? privatestatic final String clientid = "client124";

?????? privateMqttClient client;

?????? privateMqttConnectOptions options;

?????? privateString userName = "admin";

?????? privateString passWord = "password";


?????? privateScheduledExecutorService scheduler;


?????? privatevoid start() {

????????????? try{

???????????????????? //host為主機名塔沃,clientid即連接MQTT的客戶端ID,一般以唯一標(biāo)識符表示阳谍,MemoryPersistence設(shè)置clientid的保存形式蛀柴,默認(rèn)為以內(nèi)存保存

???????????????????? client= new MqttClient(HOST, clientid, new MemoryPersistence());

???????????????????? //MQTT的連接設(shè)置

???????????????????? options= new MqttConnectOptions();

???????????????????? //設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接

???????????????????? options.setCleanSession(true);

???????????????????? //設(shè)置連接的用戶名

???????????????????? options.setUserName(userName);

???????????????????? //設(shè)置連接的密碼

???????????????????? options.setPassword(passWord.toCharArray());

???????????????????? //設(shè)置超時時間 單位為秒

???????????????????? options.setConnectionTimeout(10);

???????????????????? //設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線矫夯,但這個方法并沒有重連的機制

???????????????????? options.setKeepAliveInterval(20);

???????????????????? //設(shè)置回調(diào)

???????????????????? client.setCallback(newPushCallBack());

???????????????????? MqttTopictopic = client.getTopic(TOPIC);

???????????????????? //setWill方法鸽疾,如果項目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息

???????????????????? options.setWill(topic,"close".getBytes(), 2, true);


???????????????????? client.connect(options);

???????????????????? //訂閱消息

???????????????????? int[]Qos = { 1 };

???????????????????? String[]topic1 = { TOPIC };

???????????????????? client.subscribe(topic1,Qos);


????????????? }catch (Exception e) {

???????????????????? e.printStackTrace();

????????????? }

?????? }


?????? publicstatic void main(String[] args) throws MqttException {

????????????? MqttClientTestclient = new MqttClientTest();

????????????? client.start();

?????? }

}

2測試類二 代表 客戶端client125

回調(diào)類demo---需要引入mqttv3jar包

importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

importorg.eclipse.paho.client.mqttv3.MqttCallback;

importorg.eclipse.paho.client.mqttv3.MqttMessage;


public class PushCallBack implementsMqttCallback{

?????? publicvoid connectionLost(Throwable cause) {

????????????? //連接丟失后训貌,一般在這里面進(jìn)行重連

????????????? System.out.println("連接斷開制肮,可以做重連");

?????? }


?????? publicvoid deliveryComplete(IMqttDeliveryToken token) {

????????????? System.out.println("deliveryComplete---------"+ token.isComplete());

?????? }


?????? publicvoid messageArrived(String topic, MqttMessage message)

???????????????????? throwsException {

????????????? //subscribe后得到的消息會執(zhí)行到這里面

????????????? System.out.println("接收消息主題: " + topic);

????????????? System.out.println("接收消息Qos : " + message.getQos());

????????????? System.out.println("接收消息內(nèi)容: " + new String(message.getPayload()));

?????? }

}

Demo架構(gòu)--只演示客戶端其中一個


服務(wù)端運行結(jié)果


客戶端運行結(jié)果


說明

1.? ?客戶端和服務(wù)端的demo代碼中的userName和passWord為apollo服務(wù)器的登錄賬號和密碼

2.? ?Host地址根據(jù)自己所需要的連接方式選擇


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末冒窍,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子豺鼻,更是在濱河造成了極大的恐慌综液,老刑警劉巖,帶你破解...
    沈念sama閱讀 223,126評論 6 520
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件儒飒,死亡現(xiàn)場離奇詭異谬莹,居然都是意外死亡,警方通過查閱死者的電腦和手機桩了,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,421評論 3 400
  • 文/潘曉璐 我一進(jìn)店門附帽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人井誉,你說我怎么就攤上這事士葫。” “怎么了送悔?”我有些...
    開封第一講書人閱讀 169,941評論 0 366
  • 文/不壞的土叔 我叫張陵慢显,是天一觀的道長。 經(jīng)常有香客問我欠啤,道長荚藻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,294評論 1 300
  • 正文 為了忘掉前任洁段,我火速辦了婚禮应狱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘祠丝。我一直安慰自己疾呻,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,295評論 6 398
  • 文/花漫 我一把揭開白布写半。 她就那樣靜靜地躺著岸蜗,像睡著了一般。 火紅的嫁衣襯著肌膚如雪叠蝇。 梳的紋絲不亂的頭發(fā)上璃岳,一...
    開封第一講書人閱讀 52,874評論 1 314
  • 那天,我揣著相機與錄音悔捶,去河邊找鬼铃慷。 笑死,一個胖子當(dāng)著我的面吹牛蜕该,可吹牛的內(nèi)容都是我干的犁柜。 我是一名探鬼主播,決...
    沈念sama閱讀 41,285評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼堂淡,長吁一口氣:“原來是場噩夢啊……” “哼馋缅!你這毒婦竟也來了坛怪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,249評論 0 277
  • 序言:老撾萬榮一對情侶失蹤股囊,失蹤者是張志新(化名)和其女友劉穎袜匿,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體稚疹,經(jīng)...
    沈念sama閱讀 46,760評論 1 321
  • 正文 獨居荒郊野嶺守林人離奇死亡居灯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,840評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了内狗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怪嫌。...
    茶點故事閱讀 40,973評論 1 354
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖柳沙,靈堂內(nèi)的尸體忽然破棺而出岩灭,到底是詐尸還是另有隱情,我是刑警寧澤赂鲤,帶...
    沈念sama閱讀 36,631評論 5 351
  • 正文 年R本政府宣布噪径,位于F島的核電站,受9級特大地震影響数初,放射性物質(zhì)發(fā)生泄漏找爱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,315評論 3 336
  • 文/蒙蒙 一泡孩、第九天 我趴在偏房一處隱蔽的房頂上張望车摄。 院中可真熱鬧,春花似錦仑鸥、人聲如沸吮播。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,797評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽意狠。三九已至,卻和暖如春泵琳,著一層夾襖步出監(jiān)牢的瞬間摄职,已是汗流浹背誊役。 一陣腳步聲響...
    開封第一講書人閱讀 33,926評論 1 275
  • 我被黑心中介騙來泰國打工获列, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蛔垢。 一個月前我還...
    沈念sama閱讀 49,431評論 3 379
  • 正文 我出身青樓击孩,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鹏漆。 傳聞我的和親對象是個殘疾皇子巩梢,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,982評論 2 361

推薦閱讀更多精彩內(nèi)容