代碼已開源在GitHub,如果有幫助歡迎star班眯。
Architecture
Server side 構(gòu)成
- broker (mqtt核心:用于消息的發(fā)送管理) 類似
pub-sub
隊(duì)列 - Application Server用于處理RestFul的請(qǐng)求希停,轉(zhuǎn)發(fā)為Mqtt消息
- Publisher 本質(zhì)是Mqtt client,用于發(fā)布server端消息到broker
- Subscriber 本質(zhì)是Mqtt client署隘,用于從broker訂閱client端消息
- Client side
- Publisher用于發(fā)布client端消息到broker
- Subscriber用于從broker訂閱server端的消息
- Client 用于發(fā)送RestFul 請(qǐng)求給Application Server觸發(fā)消息pub/sub
總結(jié):從結(jié)構(gòu)上Broker算是Mqtt的本質(zhì)上的Server端宠能,從業(yè)務(wù)上講封裝了Mqtt Client pub/sub的Application server和Broker共同構(gòu)成了業(yè)務(wù)上的Server端
安裝mosquitto及基本使用
安裝
# Install Mosquitto Broker
sudo apt-get update
sudo apt-get install mosquitto
# Install the Clients
sudo apt-get install mosquitto-clients
開啟、停止查看狀態(tài)
# 查看狀態(tài)
sudo service mosquitto status
# 使用默認(rèn)配置打開mosquitto, 使用-v打開log功能
sudo mosquitto -c /etc/mosquitto/mosquitto.conf -v
# 停止
sudo service mosquitto stop
#開啟
sudo service mosquitto start
使用mosquitto測(cè)試pub/sub
注意
pub和sub的clientid不能相同定踱,相同會(huì)刷屏棍潘。
# 簡單測(cè)試發(fā)布。 -h host -t topic -m message
mosquitto_pub -h localhost -t mqtt-test -m 'hello mqtt'
# 簡單測(cè)試訂閱崖媚。
mosquitto_sub -h localhost -t mqtt-test
# 發(fā)布設(shè)置用戶密碼 -u user -P password
mosquitto_pub -u admin -P admin -h localhost -t mqtt/loop/message -m 'test mqtt'
mosquitto_sub -u admin -P admin -h localhost -t mqtt/loop/message
# 指定發(fā)布clientid -i (id to use for this client)
mosquitto_sub -u admin -P admin -i shuai-ubuntu-test -h localhost -t mqtt/loop/message
mosquitto_pub -u admin -P admin -i shuai-ubuntu-test-client -h localhost -t mqtt/loop/message -m 'test mqtt client'
查看broker的log
mosquitto的默認(rèn)log 地址是:/var/log/mosquitto/xxx.log
tailf /var/log/mosquitto/mosquitto.log
構(gòu)建Java-Mqtt-Server(Springboot + Mqtt)
requirement依賴
-
mosquitto broker
可以使用Eclipse公開的broker,據(jù)說底層也是mosquitto恤浪。地址為
iot.eclipse.org
可以部署安裝mosquitto(本文方案)
springboot (2.1.5.RELEASE)
Eclipse Paho
curl/postman
構(gòu)建springboot項(xiàng)目
1. 使用idea springboot initializer 初始化springboot工程
使用springboot版本2.1.5.RELEASE
2. pom中添加
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
3. MQTT Configuration
- 配置broker地址畅哑,
- 端口號(hào),
- 是否使用ssl水由,
- 用戶名
- 密碼
public abstract class MQTTConfig {
protected final String broker = "10.156.2.132";
protected final int qos = 2;
protected Boolean hasSSL = false; /* By default SSL is disabled */
protected Integer port = 1883; /* Default port */
protected final String userName = "admin";
protected final String password = "admin";
protected final String TCP = "tcp://";
protected final String SSL = "ssl://";
/**
* Custom Configuration
*
* @param broker
* @param port
* @param ssl
* @param withUserNamePass
*/
protected abstract void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass);
/**
* Default Configuration
*/
protected abstract void config();
}
4. Publisher推送者
定義接口
public interface IMQTTPublisher {
/**
* Publish message
*
* @param topic
* @param String Message
*/
public void publishMessage(String topic, String message);
/**
* Disconnect MQTT Client
*/
public void disconnect();
}
定義類
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class MQTTPublisher extends MQTTConfig implements MqttCallback, IMQTTPublisher {
private String brokerUrl = null;
final private String colon = ":";
final private String clientId = "mqtt_server_pub";
private MqttClient mqttClient = null;
private MqttConnectOptions connectionOptions = null;
private MemoryPersistence persistence = null;
private static final Logger logger = LoggerFactory.getLogger(MQTTPublisher.class);
/**
* Private default constructor
*/
private MQTTPublisher() {
this.config();
}
/**
* Private constructor
*/
private MQTTPublisher(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
this.config(broker, port, ssl, withUserNamePass);
}
/**
* Factory method to get instance of MQTTPublisher
*
* @return MQTTPublisher
*/
public static MQTTPublisher getInstance() {
return new MQTTPublisher();
}
/**
* Factory method to get instance of MQTTPublisher
*
* @param broker
* @param port
* @param ssl
* @param withUserNamePass
* @return MQTTPublisher
*/
public static MQTTPublisher getInstance(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
return new MQTTPublisher(broker, port, ssl, withUserNamePass);
}
/*
* (non-Javadoc)
*
* @see
* com.bjitgroup.jasmysp.mqtt.publisher.MQTTPublisherBase#configurePublisher()
*/
@Override
protected void config() {
this.brokerUrl = this.TCP + this.broker + colon + this.port;
this.persistence = new MemoryPersistence();
this.connectionOptions = new MqttConnectOptions();
try {
this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
this.connectionOptions.setCleanSession(true);
this.mqttClient.connect(this.connectionOptions);
this.mqttClient.setCallback(this);
} catch (MqttException me) {
logger.error("ERROR", me);
}
}
/*
* (non-Javadoc)
*
* @see
* com.bjitgroup.jasmysp.mqtt.publisher.MQTTPublisherBase#configurePublisher(
* java.lang.String, java.lang.Integer, java.lang.Boolean, java.lang.Boolean)
*/
@Override
protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
String protocal = this.TCP;
if (true == ssl) {
protocal = this.SSL;
}
this.brokerUrl = protocal + this.broker + colon + port;
this.persistence = new MemoryPersistence();
this.connectionOptions = new MqttConnectOptions();
try {
this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
this.connectionOptions.setCleanSession(true);
if (true == withUserNamePass) {
if (password != null) {
this.connectionOptions.setPassword(this.password.toCharArray());
}
if (userName != null) {
this.connectionOptions.setUserName(this.userName);
}
}
this.mqttClient.connect(this.connectionOptions);
this.mqttClient.setCallback(this);
} catch (MqttException me) {
logger.error("ERROR", me);
}
}
/*
* (non-Javadoc)
* @see com.monirthought.mqtt.publisher.MQTTPublisherBase#publishMessage(java.lang.String, java.lang.String)
*/
@Override
public void publishMessage(String topic, String message) {
try {
MqttMessage mqttmessage = new MqttMessage(message.getBytes());
mqttmessage.setQos(this.qos);
this.mqttClient.publish(topic, mqttmessage);
} catch (MqttException me) {
logger.error("ERROR", me);
}
}
/*
* (non-Javadoc)
* @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.Throwable)
*/
@Override
public void connectionLost(Throwable arg0) {
logger.info("Connection Lost");
}
/*
* (non-Javadoc)
* @see org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
logger.info("delivery completed");
}
/*
* (non-Javadoc)
* @see org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage)
*/
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// Leave it blank for Publisher
}
/*
* (non-Javadoc)
* @see com.monirthought.mqtt.publisher.MQTTPublisherBase#disconnect()
*/
@Override
public void disconnect() {
try {
this.mqttClient.disconnect();
} catch (MqttException me) {
logger.error("ERROR", me);
}
}
}
5. Subscriber 訂閱者
定義接口
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface IMQTTSubscriber {
public static final Logger logger = LoggerFactory.getLogger(IMQTTSubscriber.class);
/**
* Subscribe message
*
* @param topic
* @param jasonMessage
*/
public void subscribeMessage(String topic);
/**
* Disconnect MQTT Client
*/
public void disconnect();
}
類定義
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
@Component
public class MQTTSubscriber extends MQTTConfig implements MqttCallback, IMQTTSubscriber {
private String brokerUrl = null;
final private String colon = ":";
final private String clientId = "mqtt_server_sub";
private MqttClient mqttClient = null;
private MqttConnectOptions connectionOptions = null;
private MemoryPersistence persistence = null;
private static final Logger logger = LoggerFactory.getLogger(MQTTSubscriber.class);
public MQTTSubscriber() {
this.config();
}
/*
* (non-Javadoc)
*
* @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.
* Throwable)
*/
@Override
public void connectionLost(Throwable cause) {
logger.info("Connection Lost");
}
/*
* (non-Javadoc)
*
* @see
* org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String,
* org.eclipse.paho.client.mqttv3.MqttMessage)
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// Called when a message arrives from the server that matches any
// subscription made by the client
String time = new Timestamp(System.currentTimeMillis()).toString();
System.out.println();
System.out.println("***********************************************************************");
System.out.println("Message Arrived at Time: " + time + " Topic: " + topic + " Message: "
+ new String(message.getPayload()));
System.out.println("***********************************************************************");
System.out.println();
}
/*
* (non-Javadoc)
*
* @see
* org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho
* .client.mqttv3.IMqttDeliveryToken)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Leave it blank for subscriber
}
/*
* (non-Javadoc)
*
* @see
* com.monirthought.mqtt.subscriber.MQTTSubscriberBase#subscribeMessage(java.
* lang.String)
*/
@Override
public void subscribeMessage(String topic) {
try {
this.mqttClient.subscribe(topic, this.qos);
} catch (MqttException me) {
me.printStackTrace();
}
}
/*
* (non-Javadoc)
*
* @see com.monirthought.mqtt.subscriber.MQTTSubscriberBase#disconnect()
*/
public void disconnect() {
try {
this.mqttClient.disconnect();
} catch (MqttException me) {
logger.error("ERROR", me);
}
}
/*
* (non-Javadoc)
*
* @see com.monirthought.config.MQTTConfig#config(java.lang.String,
* java.lang.Integer, java.lang.Boolean, java.lang.Boolean)
*/
@Override
protected void config(String broker, Integer port, Boolean ssl, Boolean withUserNamePass) {
String protocal = this.TCP;
if (true == ssl) {
protocal = this.SSL;
}
this.brokerUrl = protocal + this.broker + colon + port;
this.persistence = new MemoryPersistence();
this.connectionOptions = new MqttConnectOptions();
try {
this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
this.connectionOptions.setCleanSession(true);
if (true == withUserNamePass) {
if (password != null) {
this.connectionOptions.setPassword(this.password.toCharArray());
}
if (userName != null) {
this.connectionOptions.setUserName(this.userName);
}
}
this.mqttClient.connect(this.connectionOptions);
this.mqttClient.setCallback(this);
} catch (MqttException me) {
me.printStackTrace();
}
}
/*
* (non-Javadoc)
*
* @see com.monirthought.config.MQTTConfig#config()
*/
@Override
protected void config() {
this.brokerUrl = this.TCP + this.broker + colon + this.port;
this.persistence = new MemoryPersistence();
this.connectionOptions = new MqttConnectOptions();
try {
this.mqttClient = new MqttClient(brokerUrl, clientId, persistence);
this.connectionOptions.setCleanSession(true);
this.mqttClient.connect(this.connectionOptions);
this.mqttClient.setCallback(this);
} catch (MqttException me) {
me.printStackTrace();
}
}
}
6. 構(gòu)建 RestFul接口
構(gòu)建Controller
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.PostConstruct;
@Slf4j
@RestController
public class DemoRestController {
public static String TOPIC_LOOP_TEST = "mqtt/loop/message";
@Autowired
IMQTTPublisher publisher;
@Autowired
IMQTTSubscriber subscriber;
@PostConstruct
public void init() {
subscriber.subscribeMessage(TOPIC_LOOP_TEST);
}
@RequestMapping(value = "/mqtt/loop/message", method = RequestMethod.POST)
public String index(@RequestBody String data) {
publisher.publishMessage(TOPIC_LOOP_TEST, data);
return "Success";
}
}
7. 使用curl命令進(jìn)行api調(diào)用測(cè)試
? curl -X POST "http://127.0.0.1:8080/mqtt/loop/message" -d "test"
Success%
# springboot 窗口中可以看到自己sub的回顯
***********************************************************************
Message Arrived at Time: 2019-05-21 16:11:13.675 Topic: mqtt/loop/message Message: test=
***********************************************************************
也可以使用postman 調(diào)用8080 端口調(diào)試荠呐。
構(gòu)建Java-Mqtt-Server (Springboot + Mqtt +protobuf)
在現(xiàn)有基礎(chǔ)上添加protobuf包裝pub/sub 消息
1. proto文件
將.proto文件放到src/main/proto/
下
2. 使用maven生成protobuf java代碼
pom中properties中添加
<properties>
<java.version>1.8</java.version>
<grpc.version>1.6.1</grpc.version>
<protobuf.version>3.3.0</protobuf.version>
</properties>
pom dependencies中添加
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
pom build中添加,pom plugins中添加
<build>
<!--protobuf ext-->
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
使用IDE中右側(cè)Maven Projects -> Lifecycle ->compile 生成java對(duì)應(yīng)的protobuf文件
生成的路徑在:target/generated-sources/protobuf/java/對(duì)應(yīng)的包名下
3. 使用proto封裝mqtt message
使用Proto中的newBuilder構(gòu)建builder砂客。使用builder中的set方法設(shè)置proto中的參數(shù)泥张,例如:
KylinProto.Group.Builder builder = KylinProto.Group.newBuilder();
KylinProto.Group group = builder.setThreshold(85.f)
.setTop(1)
.setGroup(group_name).build();
publisher.publish(topic, group.toByteArray(), 2, false);