(mqtt java客戶端整合Spring的參看這篇文章)
Paho Java客戶端是一個用Java編寫的MQTT客戶端庫粹断,用于開發(fā)在JVM或其他Java兼容平臺(如Android)上運(yùn)行的應(yīng)用程序忧换。
Paho Java客戶端提供了兩個API:MqttAsyncClient提供了一個完全異步的API膏萧,通過已注冊的回調(diào)通知完成活動。 MqttClient是MqttAsyncClient的一個同步包裝伞访,其中函數(shù)與應(yīng)用程序同步霞玄。
下載
將下面顯示的依賴性定義添加到maven pom文件中焕毫。
最新版本是1.2.0
和當(dāng)前的快照版本1.2.1-SNAPSHOT
。
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
入門
基類 | 介紹 |
---|---|
MqttClient | 同步調(diào)用客戶端航揉,使用阻塞方法與MQTT服務(wù)器通信塞祈。 |
MqttAsyncClient | 異步調(diào)用客戶端,使用非阻塞方法與MQTT服務(wù)器通信帅涂,允許操作在后臺運(yùn)行议薪。 |
MqttClientPersistence | 表示持久性數(shù)據(jù)存儲尤蛮,用于存儲正在傳輸?shù)某稣竞腿胝鞠ⅲ瑥亩鴮?shí)現(xiàn)向指定的QoS的傳遞斯议。 可以使用 MqttClient指定此接口的實(shí)現(xiàn)产捞,MqttClient將使用該實(shí)現(xiàn)來持久保存QoS為1和2消息。 |
MqttConnectOptions | 保存控制客戶端連接到服務(wù)器的方式的選項(xiàng)集哼御,包括用戶名坯临、密碼等。 |
MqttMessage | MQTT消息恋昼,保存應(yīng)用程序有效負(fù)載和指定消息如何傳遞的選項(xiàng)消息看靠。 |
下面包含的代碼是一個非常基本的示例焰雕,它連接到服務(wù)器并使用MqttClient同步API發(fā)布/訂閱消息衷笋。
- 發(fā)布端
/**
*發(fā)布端
*/
public class PublishSample {
public static void main(String[] args) {
String topic = "mqtt/test";
String content = "hello 哈哈";
int qos = 1;
String broker = "tcp://iot.eclipse.org:1883";
String userName = "test";
String password = "test";
String clientId = "pubClient";
// 內(nèi)存存儲
MemoryPersistence persistence = new MemoryPersistence();
try {
// 創(chuàng)建客戶端
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
// 創(chuàng)建鏈接參數(shù)
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新啟動和重新連接時記住狀態(tài)
connOpts.setCleanSession(false);
// 設(shè)置連接的用戶名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立連接
sampleClient.connect(connOpts);
// 創(chuàng)建消息
MqttMessage message = new MqttMessage(content.getBytes());
// 設(shè)置消息的服務(wù)質(zhì)量
message.setQos(qos);
// 發(fā)布消息
sampleClient.publish(topic, message);
// 斷開連接
sampleClient.disconnect();
// 關(guān)閉客戶端
sampleClient.close();
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
- 訂閱端
/**
*訂閱端
*/
public class SubscribeSample {
public static void main(String[] args) throws MqttException {
String HOST = "tcp://iot.eclipse.org:1883";
String TOPIC = "mqtt/test";
int qos = 1;
String clientid = "subClient";
String userName = "test";
String passWord = "test";
try {
// host為主機(jī)名,test為clientid即連接MQTT的客戶端ID矩屁,一般以客戶端唯一標(biāo)識符表示辟宗,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄吝秕,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
options.setCleanSession(true);
// 設(shè)置連接的用戶名
options.setUserName(userName);
// 設(shè)置連接的密碼
options.setPassword(passWord.toCharArray());
// 設(shè)置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線泊脐,但這個方法并沒有重連的機(jī)制
options.setKeepAliveInterval(20);
// 設(shè)置回調(diào)函數(shù)
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("topic:"+topic);
System.out.println("Qos:"+message.getQos());
System.out.println("message content:"+new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+ token.isComplete());
}
});
client.connect(options);
//訂閱消息
client.subscribe(TOPIC, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}
鏈接
- 項(xiàng)目網(wǎng)站:https://www.eclipse.org/paho
- MQTT協(xié)議概述:http://www.reibang.com/p/73d9c6668dfc
- Paho Java:https://eclipse.org/paho/clients/java/
- GitHub:https://github.com/eclipse/paho.mqtt.java
- Spring支持:http://www.reibang.com/p/6b60858b7d44
- Mosquitto搭建:http://www.reibang.com/p/9e3cb7042a2e