MQTT通訊及服務(wù)器的搭建
Apollo服務(wù)器的搭建
1:Apollo服務(wù)器下載
首先從http://activemq.apache.org/apollo/download.html官網(wǎng)上下載windows對應(yīng)的apollo版本羡宙,本文下載的是apache-apollo-1.7.1-windows-distro.zip 版本茶鉴。windows的版本為win10研铆,JDK版本1.8。
2.解壓到C:\apache-apollo下晦溪,此時會多出一個apache-apollo-1.7.1文件夾荧嵌。
3.然后以管理員的身份運行cmd,進(jìn)入到如下目錄C:\apache-apollo\apache-apollo-1.7.1\bin亭姥,如下圖所示:
4.然后就是要創(chuàng)建broker,這里是創(chuàng)建在C:\apache-apollo\broker
的目錄下顾稀,執(zhí)行如下命令:apollo create myapolloC:\apache-apollo\broker
5.broker創(chuàng)建成功的提示如下圖所示:
6.創(chuàng)建完broker之后就是要運行apollo达罗,進(jìn)入C:\apache-apollo\broker\bin目錄下,執(zhí)行如下命令:apollo-brokerrun
7.apollo運行成功的提示础拨,如下圖所示:
8.最后打開瀏覽器氮块,輸入網(wǎng)址http://127.0.0.1:61680/绍载,即可看到如下頁面诡宗,默認(rèn)
賬號:admin?? 密碼:password
9.登錄成功之后的頁面,控制臺頁面如下圖所示 :
測試demo—服務(wù)器端代碼---需要下載并引入mqttv3jar包
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttServerTest {
??? public static final String HOST = "tcp://127.0.0.1:61613";
??? public static final String TOPIC = "toclient/124";
??? public static final String TOPIC125 = "toclient/125";
??? private static final String clientid = "server-id-0";
??? private MqttClient client;
??? private MqttTopic topic;
??? private MqttTopic topic125;
??? private String userName = "admin";
??? private String passWord = "password";
??? private MqttMessage message;
??? public MqttServerTest() throws MqttException {
?????? // MemoryPersistence設(shè)置clientid的保存形式击儡,默認(rèn)為以內(nèi)存保存
?????? client = new MqttClient(HOST, clientid, newMemoryPersistence());
?????? connect();
??? }
??? private void connect() {
?????? MqttConnectOptions options = new MqttConnectOptions();
??? ??? options.setCleanSession(false);
?????? options.setUserName(userName);
?????? options.setPassword(passWord.toCharArray());
?????? // 設(shè)置超時時間
?????? options.setConnectionTimeout(10);
?????? // 設(shè)置會話心跳時間
?????? options.setKeepAliveInterval(20);
?????? try {
?????????? client.setCallback(new PushCallBack());
?????????? client.connect(options);
?????????? topic = client.getTopic(TOPIC);
?????????? topic125 = client.getTopic(TOPIC125);
?????? } catch (Exception e) {
?????????? e.printStackTrace();
?????? }
??? }
??? public void publish(MqttTopic topic, MqttMessage message)
?????????? throws MqttPersistenceException, MqttException {
?????? MqttDeliveryToken token = topic.publish(message);
?????? token.waitForCompletion();
?????? System.out.println("message is
published completely! "
????????????? + token.isComplete());
??? }
??? public static void main(String[] args) throws MqttException {
?????? MqttServerTest server = new MqttServerTest();
?????? server.message = new MqttMessage();
?????? server.message.setQos(2);
?????? server.message.setRetained(true);
?????? server.message.setPayload("給客戶端124推送的信息".getBytes());
?????? server.publish(server.topic, server.message);
?????? server.message = new MqttMessage();
?????? server.message.setQos(2);
?????? server.message.setRetained(true);
?????? server.message.setPayload("給客戶端125推送的信息".getBytes());
?????? server.publish(server.topic125, server.message);
?????? System.out.println(server.message.isRetained() + "------ratained狀態(tài)");
??? }
}
測試demo—客戶端代碼—需引入mqttv3jar包
1測試類一 代表 客戶端client124
importjava.util.concurrent.ScheduledExecutorService;
importorg.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
importorg.eclipse.paho.client.mqttv3.MqttException;
importorg.eclipse.paho.client.mqttv3.MqttTopic;
importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttClientTest {
?????? publicstatic final String HOST = "tcp://127.0.0.1:61613";
?????? publicstatic final String TOPIC = "toclient/124";
?????? privatestatic final String clientid = "client124";
?????? privateMqttClient client;
?????? privateMqttConnectOptions options;
?????? privateString userName = "admin";
?????? privateString passWord = "password";
?????? privateScheduledExecutorService scheduler;
?????? privatevoid start() {
????????????? try{
???????????????????? //host為主機名塔沃,clientid即連接MQTT的客戶端ID,一般以唯一標(biāo)識符表示阳谍,MemoryPersistence設(shè)置clientid的保存形式蛀柴,默認(rèn)為以內(nèi)存保存
???????????????????? client= new MqttClient(HOST, clientid, new MemoryPersistence());
???????????????????? //MQTT的連接設(shè)置
???????????????????? options= new MqttConnectOptions();
???????????????????? //設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
???????????????????? options.setCleanSession(true);
???????????????????? //設(shè)置連接的用戶名
???????????????????? options.setUserName(userName);
???????????????????? //設(shè)置連接的密碼
???????????????????? options.setPassword(passWord.toCharArray());
???????????????????? //設(shè)置超時時間 單位為秒
???????????????????? options.setConnectionTimeout(10);
???????????????????? //設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線矫夯,但這個方法并沒有重連的機制
???????????????????? options.setKeepAliveInterval(20);
???????????????????? //設(shè)置回調(diào)
???????????????????? client.setCallback(newPushCallBack());
???????????????????? MqttTopictopic = client.getTopic(TOPIC);
???????????????????? //setWill方法鸽疾,如果項目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息
???????????????????? options.setWill(topic,"close".getBytes(), 2, true);
???????????????????? client.connect(options);
???????????????????? //訂閱消息
???????????????????? int[]Qos = { 1 };
???????????????????? String[]topic1 = { TOPIC };
???????????????????? client.subscribe(topic1,Qos);
????????????? }catch (Exception e) {
???????????????????? e.printStackTrace();
????????????? }
?????? }
?????? publicstatic void main(String[] args) throws MqttException {
????????????? MqttClientTestclient = new MqttClientTest();
????????????? client.start();
?????? }
}
2測試類二 代表 客戶端client125
回調(diào)類demo---需要引入mqttv3jar包
importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
importorg.eclipse.paho.client.mqttv3.MqttCallback;
importorg.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallBack implementsMqttCallback{
?????? publicvoid connectionLost(Throwable cause) {
????????????? //連接丟失后训貌,一般在這里面進(jìn)行重連
????????????? System.out.println("連接斷開制肮,可以做重連");
?????? }
?????? publicvoid deliveryComplete(IMqttDeliveryToken token) {
????????????? System.out.println("deliveryComplete---------"+ token.isComplete());
?????? }
?????? publicvoid messageArrived(String topic, MqttMessage message)
???????????????????? throwsException {
????????????? //subscribe后得到的消息會執(zhí)行到這里面
????????????? System.out.println("接收消息主題: " + topic);
????????????? System.out.println("接收消息Qos : " + message.getQos());
????????????? System.out.println("接收消息內(nèi)容: " + new String(message.getPayload()));
?????? }
}
Demo架構(gòu)--只演示客戶端其中一個
服務(wù)端運行結(jié)果
客戶端運行結(jié)果
說明
1.? ?客戶端和服務(wù)端的demo代碼中的userName和passWord為apollo服務(wù)器的登錄賬號和密碼
2.? ?Host地址根據(jù)自己所需要的連接方式選擇