上面兩篇文章介紹了mqtt服務的安裝，
http://www.reibang.com/p/d421bb32fe4c
http://www.reibang.com/p/879bd7f2db92
這裡介紹一下用java來實現mqtt的訂閱和消息分發，我們採用eclipse paho的jar包。
1、創建java工程，添加依賴
<!-- Eclipse Paho MQTT v3 Java client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>
2、訂閱者
package com.chen.mqtt.mosquitto;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ScheduledExecutorService;
/**
* @author: ChenJie
* @date 2018/8/10
*/
public class Subscriber {
????public static final StringHOST ="tcp://172.16.1.86:1883";
? ? public static final StringTOPIC ="speedTopic";
? ? private static final Stringclientid ="client83";
? ? private MqttClientclient;
? ? private MqttConnectOptionsoptions;
//? ? private String userName = "admin";
//? ? private String passWord = "password";
? ? private ScheduledExecutorServicescheduler;
? ? public static void main(String[] args) {
????????Subscriber client =new Subscriber();
? ? ? ? client.start();
? ? }
????private void start() {
????????try {
????????????client =new MqttClient(HOST,clientid,new MemoryPersistence());
? ? ? ? ? ? options =new MqttConnectOptions();
? ? ? ? ? ? options.setCleanSession(true);
? ? ? ? ? ? options.setConnectionTimeout(10);
? ? ? ? ? ? options.setKeepAliveInterval(20);
? ? ? ? ? ? client.setCallback(new PushCallback());
? ? ? ? ? ? MqttTopic topic =client.getTopic(TOPIC);
? ? ? ? ? ? //遺言
? ? ? ? ? ? options.setWill(topic,"close".getBytes(),2,true);
? ? ? ? ? ? client.connect(options);
? ? ? ? ? ? int [] qos = {1};
? ? ? ? ? ? String? [] topics = {TOPIC};
? ? ? ? ? ? client.subscribe(topics,qos);
? ? ? ? }catch (MqttException e) {
????????????e.printStackTrace();
? ? ? ? }
????}
}
3、消息發布者
package com.chen.mqtt.mosquitto;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author: ChenJie
* @date 2018/8/10
*/
public class Publisher {
????//tcp://MQTT安裝的服務器地址:MQTT定義的端口號
? ? public static final StringHOST ="tcp://172.16.1.86:1883";
? ? //定義一個主題
? ? public static final StringTOPIC ="speedTopic";
? ? //定義MQTT的ID,可以在MQTT服務配置中指定
? ? private static final Stringclientid ="server84";
? ? private MqttClientclient;
? ? private MqttTopictopic;
? ? private StringuserName ="mosquitto";
? ? private Stringpassword ="";
? ? private MqttMessagemessage;
? ? /**
? ? * 構造函數(shù)
? ? * @throws MqttException
????*/
? ? public Publisher()throws MqttException {
????????// MemoryPersistence設置clientid的保存形式理逊,默認為以內存保存
? ? ? ? client =new MqttClient(HOST, clientid, new MemoryPersistence());
? ? ? ? connect();
? ? }
private void connect() {
MqttConnectOptions options =new MqttConnectOptions();
? ? ? ? options.setCleanSession(false);
//? ? ? ? options.setUserName(userName);
//? ? ? ? options.setPassword(password.toCharArray());
? ? ? ? //超時時長
? ? ? ? options.setConnectionTimeout(100);
? ? ? ? //心跳時長
? ? ? ? options.setKeepAliveInterval(20);
? ? ? ? options.setServerURIs(new String[]{HOST}? );
? ? ? ? try{
????????????client.setCallback(new PushCallback() );
? ? ? ? ? ? client.connect(options);
? ? ? ? ? ? topic =client.getTopic(TOPIC);
? ? ? ? }
????????catch(Exception e){
????????????e.printStackTrace();
? ? ? ? }
}
public void publish(MqttTopic topic,MqttMessage message)throws MqttException {
MqttDeliveryToken token = topic.publish(message);
? ? ? ? System.out.println("等待發(fā)送成功:"+token.isComplete());
? ? ? ? token.waitForCompletion();
? ? ? ? System.out.println("已經(jīng)發(fā)送成功:"+token.isComplete());
? ? }
public static void main(String[] args)throws MqttException {
????????Publisher server =new Publisher();
? ? ? ? server.message =new MqttMessage();
? ? ? ? server.message.setQos(1);
? ? ? ? server.message.setRetained(true);
? ? ? ? for(int i=0;i<10;i++){
? ? ? ? ? ? server.message.setPayload(("hello,topic speed "+i).getBytes());
? ? ? ? ? ? server.publish(server.topic,server.message);
? ? ? ? }
????}
}
4、PushCallback
package com.chen.mqtt.mosquitto;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.time.LocalDateTime;
/**
* 發(fā)布消息的回調類
*
* 必須實現(xiàn)MqttCallback的接口并實現(xiàn)對應的相關接口方法CallBack 類將實現(xiàn) MqttCallBack。
* 每個客戶機標識都需要一個回調實例晋被。在此示例中兑徘,構造函數(shù)傳遞客戶機標識以另存為實例數(shù)據(jù)。
* 在回調中羡洛,將它用來標識已經(jīng)啟動了該回調的哪個實例挂脑。
* 必須在回調類中實現(xiàn)三個方法:
*
*? public void messageArrived(MqttTopic topic, MqttMessage message)接收已經(jīng)預訂的發(fā)布。
*
*? public void connectionLost(Throwable cause)在斷開連接時調用欲侮。
*
*? public void deliveryComplete(MqttDeliveryToken token))
*? 接收到已經(jīng)發(fā)布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用崭闲。
*? 由 MqttClient.connect 激活此回調。
*/
public class PushCallbackimplements MqttCallback{
@Override
? ? public void connectionLost(Throwable throwable) {
????????System.out.println("連接斷開威蕉!");
? ? ? ? System.out.println(LocalDateTime.now());
? ? }
@Override
? ? public void messageArrived(String topic, MqttMessage message)throws Exception {
????????System.out.println("接收消息主題 : " + topic);
? ? ? ? System.out.println("接收消息Qos : " + message.getQos());
? ? ? ? System.out.println("接收消息內容 : " +new String(message.getPayload()));
? ? }
????@Override
? ? public void deliveryComplete(IMqttDeliveryToken token) {
????????System.out.println("分發(fā)完成---------" + token.isComplete());
? ? ? ? System.out.println(LocalDateTime.now());
? ? }
}