MQTT JAVA paho實例

上面兩篇文章介紹了mqtt服務的安裝拆又,

http://www.reibang.com/p/d421bb32fe4c

http://www.reibang.com/p/879bd7f2db92

這里介紹一下用java來實現(xiàn)mqtt的訂閱和消息分發(fā)北启,我們采用eclipse paho的jar包。

1熬的、創(chuàng)建java工程,添加依賴

<dependency>

? ? ? ? <groupId>org.eclipse.paho</groupId>

? ? ? ? <artifactId>org.eclipse.paho.client.mqttv3<artifactId>

? ? ? ? <version>1.2.0</version>

</dependency>

2赊级、訂閱者

package com.chen.mqtt.mosquitto;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttTopic;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.concurrent.ScheduledExecutorService;

/**

* @author: ChenJie

* @date 2018/8/10

*/

public class Subscriber {

????public static final StringHOST ="tcp://172.16.1.86:1883";

? ? public static final StringTOPIC ="speedTopic";

? ? private static final Stringclientid ="client83";

? ? private MqttClientclient;

? ? private MqttConnectOptionsoptions;

//? ? private String userName = "admin";

//? ? private String passWord = "password";

? ? private ScheduledExecutorServicescheduler;

? ? public static void main(String[] args) {

????????Subscriber client =new Subscriber();

? ? ? ? client.start();

? ? }

????private void start() {

????????try {

????????????client =new MqttClient(HOST,clientid,new MemoryPersistence());

? ? ? ? ? ? options =new MqttConnectOptions();

? ? ? ? ? ? options.setCleanSession(true);

? ? ? ? ? ? options.setConnectionTimeout(10);

? ? ? ? ? ? options.setKeepAliveInterval(20);

? ? ? ? ? ? client.setCallback(new PushCallback());

? ? ? ? ? ? MqttTopic topic =client.getTopic(TOPIC);

? ? ? ? ? ? //遺言

? ? ? ? ? ? options.setWill(topic,"close".getBytes(),2,true);

? ? ? ? ? ? client.connect(options);

? ? ? ? ? ? int [] qos = {1};

? ? ? ? ? ? String? [] topics = {TOPIC};

? ? ? ? ? ? client.subscribe(topics,qos);

? ? ? ? }catch (MqttException e) {

????????????e.printStackTrace();

? ? ? ? }

????}

}

3押框、消息發(fā)布者

package com.chen.mqtt.mosquitto;

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**

* @author: ChenJie

* @date 2018/8/10

*/

public class Publisher {

????//tcp://MQTT安裝的服務器地址:MQTT定義的端口號

? ? public static final StringHOST ="tcp://172.16.1.86:1883";

? ? //定義一個主題

? ? public static final StringTOPIC ="speedTopic";

? ? //定義MQTT的ID,可以在MQTT服務配置中指定

? ? private static final Stringclientid ="server84";

? ? private MqttClientclient;

? ? private MqttTopictopic;

? ? private StringuserName ="mosquitto";

? ? private Stringpassword ="";

? ? private MqttMessagemessage;

? ? /**

? ? * 構造函數(shù)

? ? * @throws MqttException

????*/

? ? public Publisher()throws MqttException {

????????// MemoryPersistence設置clientid的保存形式理逊,默認為以內存保存

? ? ? ? client =new MqttClient(HOST, clientid, new MemoryPersistence());

? ? ? ? connect();

? ? }

private void connect() {

MqttConnectOptions options =new MqttConnectOptions();

? ? ? ? options.setCleanSession(false);

//? ? ? ? options.setUserName(userName);

//? ? ? ? options.setPassword(password.toCharArray());

? ? ? ? //超時時長

? ? ? ? options.setConnectionTimeout(100);

? ? ? ? //心跳時長

? ? ? ? options.setKeepAliveInterval(20);

? ? ? ? options.setServerURIs(new String[]{HOST}? );

? ? ? ? try{

????????????client.setCallback(new PushCallback() );

? ? ? ? ? ? client.connect(options);

? ? ? ? ? ? topic =client.getTopic(TOPIC);

? ? ? ? }

????????catch(Exception e){

????????????e.printStackTrace();

? ? ? ? }

}

public void publish(MqttTopic topic,MqttMessage message)throws MqttException {

MqttDeliveryToken token = topic.publish(message);

? ? ? ? System.out.println("等待發(fā)送成功:"+token.isComplete());

? ? ? ? token.waitForCompletion();

? ? ? ? System.out.println("已經(jīng)發(fā)送成功:"+token.isComplete());

? ? }

public static void main(String[] args)throws MqttException {

????????Publisher server =new Publisher();

? ? ? ? server.message =new MqttMessage();

? ? ? ? server.message.setQos(1);

? ? ? ? server.message.setRetained(true);

? ? ? ? for(int i=0;i<10;i++){

? ? ? ? ? ? server.message.setPayload(("hello,topic speed "+i).getBytes());

? ? ? ? ? ? server.publish(server.topic,server.message);

? ? ? ? }

????}

}

4橡伞、PushCallback

package com.chen.mqtt.mosquitto;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.time.LocalDateTime;

/**

* 發(fā)布消息的回調類

*

* 必須實現(xiàn)MqttCallback的接口并實現(xiàn)對應的相關接口方法CallBack 類將實現(xiàn) MqttCallBack。

* 每個客戶機標識都需要一個回調實例晋被。在此示例中兑徘,構造函數(shù)傳遞客戶機標識以另存為實例數(shù)據(jù)。

* 在回調中羡洛,將它用來標識已經(jīng)啟動了該回調的哪個實例挂脑。

* 必須在回調類中實現(xiàn)三個方法:

*

*? public void messageArrived(MqttTopic topic, MqttMessage message)接收已經(jīng)預訂的發(fā)布。

*

*? public void connectionLost(Throwable cause)在斷開連接時調用欲侮。

*

*? public void deliveryComplete(MqttDeliveryToken token))

*? 接收到已經(jīng)發(fā)布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用崭闲。

*? 由 MqttClient.connect 激活此回調。

*/

public class PushCallbackimplements MqttCallback{

@Override

? ? public void connectionLost(Throwable throwable) {

????????System.out.println("連接斷開威蕉!");

? ? ? ? System.out.println(LocalDateTime.now());

? ? }

@Override

? ? public void messageArrived(String topic, MqttMessage message)throws Exception {

????????System.out.println("接收消息主題 : " + topic);

? ? ? ? System.out.println("接收消息Qos : " + message.getQos());

? ? ? ? System.out.println("接收消息內容 : " +new String(message.getPayload()));

? ? }

????@Override

? ? public void deliveryComplete(IMqttDeliveryToken token) {

????????System.out.println("分發(fā)完成---------" + token.isComplete());

? ? ? ? System.out.println(LocalDateTime.now());

? ? }

}

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末刁俭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子韧涨,更是在濱河造成了極大的恐慌牍戚,老刑警劉巖侮繁,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異如孝,居然都是意外死亡宪哩,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門暑竟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來斋射,“玉大人,你說我怎么就攤上這事但荤÷掎” “怎么了?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵腹躁,是天一觀的道長桑包。 經(jīng)常有香客問我,道長纺非,這世上最難降的妖魔是什么哑了? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮烧颖,結果婚禮上弱左,老公的妹妹穿的比我還像新娘。我一直安慰自己炕淮,他們只是感情好拆火,可當我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著涂圆,像睡著了一般们镜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上润歉,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天模狭,我揣著相機與錄音,去河邊找鬼踩衩。 笑死嚼鹉,一個胖子當著我的面吹牛,可吹牛的內容都是我干的驱富。 我是一名探鬼主播锚赤,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼萌朱!你這毒婦竟也來了?” 一聲冷哼從身側響起策菜,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤晶疼,失蹤者是張志新(化名)和其女友劉穎酒贬,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體翠霍,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡锭吨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了寒匙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片零如。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖锄弱,靈堂內的尸體忽然破棺而出考蕾,到底是詐尸還是另有隱情,我是刑警寧澤会宪,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布肖卧,位于F島的核電站,受9級特大地震影響掸鹅,放射性物質發(fā)生泄漏塞帐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一巍沙、第九天 我趴在偏房一處隱蔽的房頂上張望葵姥。 院中可真熱鬧,春花似錦句携、人聲如沸榔幸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽牡辽。三九已至,卻和暖如春敞临,著一層夾襖步出監(jiān)牢的瞬間态辛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工挺尿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留奏黑,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓编矾,卻偏偏與公主長得像熟史,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子窄俏,可洞房花燭夜當晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理蹂匹,服務發(fā)現(xiàn),斷路器凹蜈,智...
    卡卡羅2017閱讀 134,601評論 18 139
  • 1. Java基礎部分 基礎部分的順序:基本語法限寞,類相關的語法忍啸,內部類的語法,繼承相關的語法履植,異常的語法计雌,線程的語...
    子非魚_t_閱讀 31,587評論 18 399
  • MQTT通訊及服務器的搭建 Apollo服務器的搭建 1:Apollo服務器下載 首先從http://active...
    惡人mvp閱讀 3,034評論 1 2
  • 地平線切開黑夜的肚皮, 生出一個金燦燦的紅孩兒玫霎。
    青鵝閱讀 243評論 0 0
  • 父親,你寬闊的肩膀,依偎在你的懷里,那時我還是孩子,不知道天高地厚,在你的懷里,我成了我的英雄,在那個世界,我...
    掃地專業(yè)高級研究生閱讀 379評論 0 0