業務場景
硬件采集的數據傳入MQTT(這邊MQTT的服務器用的是EMQX,有興趣的可以自己去了解一下),JAVA通過代碼連接MQTT服務器,對數據進行處理
新建SpringBoot項目,pom文件中直接引入下面MQTT的依賴
<!-- MQTT start -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- MQTT end -->
MQTT工具類
MqttServerData
因為是多連接,所以說MQTT的配置需要存放在數據庫中,需要新增MQTT基礎表MqttServerData,我這邊用的是Spring Data JPA,大家可以用自己的ORM框架生成這些基礎表的信息
package com.huibo.xx.xx.entity;
import com.huibo.core.entity.BaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import javax.persistence.Entity;
/**
* JPA entity holding the connection settings of one MQTT broker (the "MQTT configuration table").
*
* <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated by
* Lombok; equality also takes the {@code BaseEntity} superclass state into account
* ({@code callSuper = true}).
*
* @author cola
* @date 2023/7/18 8:51
*/
@Data
@Entity
@EqualsAndHashCode(callSuper = true)
public class MqttServerData extends BaseEntity {
// Broker host; the service layer combines it with serverPort into "tcp://host:port".
private String serverHost;
// Broker port, stored as text.
private String serverPort;
// User name presented to the broker when connecting.
private String userName;
// Password presented to the broker when connecting.
private String password;
// Configured MQTT client id.
// NOTE(review): the connection code visible in this file generates a random UUID as client id
// and does not read this field - confirm whether it is still needed.
private String clientId;
/**
* Topics this client subscribes to, held as one delimiter-separated string
* (the original comment indicates a comma as the separator).
*/
private String clientTopic;
/**
* Whether the session is cleared every time the client reconnects.
* Stored as text rather than as a boolean.
*/
private String clientCleanSession;
// Remark attached to this configuration.
private String remark;
// Whether this configuration is enabled; a connection is only opened for enabled rows.
private Boolean enableFlag;
}
MqttServerDataRepository
package com.xx.xx.mqtt.repository;
import com.xx.xx.mqtt.entity.MqttServerData;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
public interface MqttServerDataRepository extends JpaRepository<MqttServerData, Long>, JpaSpecificationExecutor<MqttServerData{}
MqttServerDataService
package com.xx.xx.mqtt.service;
import com.xx.xx.entity.Device;
import com.xx.xx.mqtt.controller.dto.MqttServerDataAddDto;
import com.xx.xx.mqtt.controller.dto.MqttServerDataQueryDto;
import com.xx.xx.mqtt.controller.dto.MqttServerDataUpdateDto;
import com.xx.xx.mqtt.entity.MqttServerData;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
/**
 * Management of MQTT broker configurations and of the live connections created from them.
 */
public interface MqttServerDataService {

    /**
     * Persists a new broker configuration.
     *
     * @param addDto values of the configuration to create
     * @return the persisted configuration
     */
    MqttServerData add(MqttServerDataAddDto addDto);

    /**
     * Updates an existing broker configuration.
     *
     * @param updateDto new values, including the id of the configuration to change
     * @return {@code true} when the update was applied
     * @throws MqttException if an MQTT connection cannot be closed
     */
    boolean update(MqttServerDataUpdateDto updateDto) throws MqttException;

    /**
     * Loads one configuration by its id.
     *
     * @param id primary key of the configuration
     * @return the matching configuration
     */
    MqttServerData findById(Long id);

    /**
     * Deletes a configuration.
     *
     * @param id primary key of the configuration
     * @return {@code true} when the deletion was applied
     * @throws MqttException if the MQTT connection cannot be closed
     */
    boolean delete(Long id) throws MqttException;

    /**
     * Returns all configurations matching the given filter.
     *
     * @param queryDto filter criteria
     * @return the matching configurations
     */
    List<MqttServerData> findByList(MqttServerDataQueryDto queryDto);

    /**
     * Returns one page of the configurations matching the given filter.
     *
     * @param queryDto filter criteria
     * @param page     paging and sorting request
     * @return the requested page
     */
    Page<MqttServerData> findByPage(MqttServerDataQueryDto queryDto, Pageable page);

    /**
     * Initializes the MQTT connection for one broker configuration.
     *
     * @param mqttServerData configuration describing the broker to connect to
     * @throws MqttException if the connection cannot be established
     */
    void initMqttServer(MqttServerData mqttServerData) throws MqttException;

    /**
     * Subscribes the open MQTT connections to the topic belonging to the given device.
     *
     * @param device device whose topic should be subscribed
     */
    void addSubTopicForDevice(Device device);
}
MqttServerDataServiceImpl
/**
* @author cola
* @date 2023/7/13 16:23
*/
@Service
@Slf4j
public class MqttServerDataServiceImpl implements MqttServerDataService {
@Autowired
private MqttServerDataRepository mqttServerDataRepository;
@Autowired
private ExcelHandler excelHandler;
@Override
public MqttServerData add(MqttServerDataAddDto addDto) {
MqttServerData mqttServerData = mqttServerDataRepository.save(from(addDto));
if (mqttServerData.getEnableFlag()) {
this.initMqttServer(mqttServerData);
}
return mqttServerData;
}
@Override
@Transactional
public boolean update(MqttServerDataUpdateDto updateDto) throws MqttException {
MqttServerData entity = findById(updateDto.getId());
assemble(entity, updateDto);
if (!entity.getEnableFlag() && updateDto.getEnableFlag()) {
if (!MqttClientListener.mqttClients.containsKey(entity.getId().toString())) {
this.initMqttServer(entity);
} else {
}
}
if (entity.getEnableFlag() && !updateDto.getEnableFlag()) {
MqttClientConnect mqttClientConnect = MqttClientListener.mqttClients.get(entity.getId().toString());
mqttClientConnect.close();
}
return true;
}
@Override
public MqttServerData findById(Long id) {
return mqttServerDataRepository.findById(id).orElseThrow(() -> new NoEntityFoundException("No record found by id = " + id));
}
@Override
@Transactional
public boolean delete(Long id) throws MqttException {
MqttServerData entity = findById(id);
entity.setDeleted(true);
MqttClientConnect mqttClientConnect = MqttClientListener.mqttClients.get(entity.getId().toString());
if (Objects.nonNull(mqttClientConnect)) {
mqttClientConnect.close();
}
return true;
}
@Override
public List<MqttServerData> findByList(MqttServerDataQueryDto queryDto) {
return mqttServerDataRepository.findAll(getSpecification(queryDto));
}
@Override
public Page<MqttServerData> findByPage(MqttServerDataQueryDto queryDto, Pageable page) {
return mqttServerDataRepository.findAll(getSpecification(queryDto), page);
}
@Override
public void initMqttServer(MqttServerData mqttServerData) {
try {
MqttClientConnect mqttClientConnect = new MqttClientConnect();
mqttClientConnect.setMqttClientId(UUID.randomUUID().toString());
String host = "tcp://" + mqttServerData.getServerHost() + ":" + mqttServerData.getServerPort();
mqttClientConnect.setMqttClient(host, UUID.randomUUID().toString(), mqttServerData.getUserName(), mqttServerData.getPassword(), new MqttClientCallback(String.valueOf(mqttServerData.getId())));
//訂閱CPUId
mqttClientConnect.sub(String.format(DEVICE_DOWN_TOPIC, ServerUtil.getCPUSerial()));
MqttClientListener.mqttClients.put(String.valueOf(mqttServerData.getId()), mqttClientConnect);
log.info("{}--連接成功!砰左!訂閱主題--{}", mqttServerData.getId(), mqttServerData.getClientTopic());
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void addSubTopicForDevice(Device device) {
MqttClientListener.mqttClients.forEach((key, value) -> {
try {
value.sub(String.format(DEVICE_DOWN_TOPIC, device.getName()));
} catch (MqttException e) {
e.printStackTrace();
}
});
}
private MqttServerData from(MqttServerDataAddDto dto) {
MqttServerData entity = new MqttServerData();
BeanUtils.copyProperties(dto, entity);
entity.setClientTopic("/topic/default");
return entity;
}
private void assemble(MqttServerData entity, MqttServerDataUpdateDto updateDto) {
BeanUtils.copyProperties(updateDto, entity);
}
private Specification<MqttServerData> getSpecification(MqttServerDataQueryDto queryDto) {
return (root, query, builder) -> {
List<Predicate> predicates = Lists.newArrayList();
if (!StringUtils.isEmpty(queryDto.getServerHost())) {
predicates.add(builder.like(root.get("serverHost"), "%".concat(queryDto.getServerHost()).concat("%")));
}
if (Objects.nonNull(queryDto.getEnableFlag())) {
predicates.add(builder.isTrue(root.get("enableFlag")));
}
predicates.add(builder.isFalse(root.get("isDeleted")));
return builder.and(predicates.toArray(new Predicate[0]));
};
}
}
MqttClientListener
項目啟動,監聽主題
package com.huibo.acquisition.mqtt.listener;
import com.xx.xx.mqtt.config.MqttClientConnect;
import com.xx.xx.acquisition.mqtt.controller.dto.MqttServerDataQueryDto;
import com.xx.xx.acquisition.mqtt.entity.MqttServerData;
import com.xx.xx.acquisition.mqtt.service.MqttServerDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Opens the MQTT connections of all enabled broker configurations when the application context
 * has been refreshed, and holds the registry of open connections.
 *
 * @author cola
 * @date 2023/7/18 10:53
 */
@Slf4j
@Component
public class MqttClientListener implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * Open client connections. The rest of this code base registers them under the broker
     * configuration id rendered as a string.
     */
    public static final ConcurrentHashMap<String, MqttClientConnect> mqttClients = new ConcurrentHashMap<>();

    @Autowired
    private MqttServerDataService mqttServerDataService;

    /** Flipped on the first refresh event so that later events do not connect again. */
    private final AtomicBoolean isInit = new AtomicBoolean(false);

    /**
     * Connects to every enabled broker configuration, once per application lifetime.
     *
     * @param event the context-refreshed event that triggered the call
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Guard against the event being delivered more than once.
        if (!isInit.compareAndSet(false, true)) {
            return;
        }
        MqttServerDataQueryDto enabledOnly = new MqttServerDataQueryDto();
        enabledOnly.setEnableFlag(Boolean.TRUE);
        List<MqttServerData> mqttServers = mqttServerDataService.findByList(enabledOnly);
        if (CollectionUtils.isEmpty(mqttServers)) {
            return;
        }
        for (MqttServerData mqttServerData : mqttServers) {
            // Handle failures per broker so one broken configuration cannot prevent the
            // remaining brokers from being connected.
            try {
                mqttServerDataService.initMqttServer(mqttServerData);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}
MqttClientConnect
package com.xx.xx.mqtt.config;
import com.huibo.acquisition.mqtt.callback.MqttClientCallback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
 * Thin wrapper around one Paho {@link MqttClient}: connect, publish, subscribe and close.
 *
 * <p>NOTE(review): the class carries per-connection state, so a shared singleton bean is
 * probably not what is wanted - confirm whether {@code @Component} is still needed.
 *
 * @author cola
 * @date 2023/7/18 10:33
 */
@Slf4j
@Component
public class MqttClientConnect {

    private MqttClient mqttClient;

    /**
     * Id this application uses to identify the client (used in the will message and in logs).
     */
    private String mqttClientId;

    /**
     * Creates the underlying client and connects it to the broker.
     *
     * @param host         broker URI, e.g. {@code tcp://host:port}
     * @param clientId     client id presented to the broker
     * @param userName     user name, may be {@code null} when the broker needs none
     * @param passWord     password, may be {@code null} when the broker needs none
     * @param mqttCallback callback to install; when {@code null} a {@link MqttClientCallback}
     *                     built from {@link #getMqttClientId()} is used
     * @throws MqttException if the client cannot be created or the connection fails
     */
    public void setMqttClient(String host, String clientId, String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
        mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
        MqttConnectOptions options = buildConnectOptions(userName, passWord);
        if (mqttCallback == null) {
            mqttClient.setCallback(new MqttClientCallback(mqttClientId));
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
    }

    /**
     * Builds the connection options shared by all connections.
     */
    private MqttConnectOptions buildConnectOptions(String userName, String passWord) {
        MqttConnectOptions options = new MqttConnectOptions();
        // Credentials are optional; skipping null avoids a NullPointerException on toCharArray().
        if (userName != null) {
            options.setUserName(userName);
        }
        if (passWord != null) {
            options.setPassword(passWord.toCharArray());
        }
        // Connection timeout in seconds.
        options.setConnectionTimeout(30);
        // Keep-alive interval in seconds.
        options.setKeepAliveInterval(30);
        // Will message: published by the broker if the connection drops unexpectedly.
        options.setWill("willTopic", (mqttClientId + "與服務(wù)器斷開連接").getBytes(StandardCharsets.UTF_8), 0, false);
        // Reconnect automatically after a connection loss.
        options.setAutomaticReconnect(true);
        // Clean session: the broker keeps no state for this client between connections, so
        // subscriptions have to be re-established after a reconnect.
        options.setCleanSession(true);
        return options;
    }

    /**
     * Disconnects from the broker and releases the client's resources.
     *
     * <p>The client is disconnected first: Paho refuses to close a client that is still
     * connected. Does nothing when no client has been created yet.
     *
     * @throws MqttException if disconnecting or closing fails
     */
    public void close() throws MqttException {
        if (mqttClient == null) {
            return;
        }
        if (mqttClient.isConnected()) {
            mqttClient.disconnect();
        }
        mqttClient.close();
    }

    /**
     * Publishes a message with the message's default QoS (1) and waits for delivery.
     *
     * @param topic topic to publish to
     * @param msg   message text, sent as UTF-8
     * @throws MqttException if publishing fails
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
        publishAndWait(topic, mqttMessage);
    }

    /**
     * Publishes a message with an explicit QoS and waits for delivery.
     *
     * @param topic topic to publish to
     * @param msg   message text, sent as UTF-8
     * @param qos   quality of service: 0, 1 or 2
     * @throws MqttException if publishing fails
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
        publishAndWait(topic, mqttMessage);
    }

    /**
     * Publishes the message on the topic and blocks until the delivery token completes.
     */
    private void publishAndWait(String topic, MqttMessage mqttMessage) throws MqttException {
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * Subscribes to several topics with the client's default QoS (1).
     *
     * @param topic topics to subscribe to
     * @throws MqttException if subscribing fails
     */
    public void sub(String[] topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * Subscribes to a single topic with the client's default QoS (1).
     *
     * @param topic topic to subscribe to
     * @throws MqttException if subscribing fails
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * Subscribes to a single topic with an explicit QoS.
     *
     * @param topic topic to subscribe to
     * @param qos   quality of service: 0, 1 or 2
     * @throws MqttException if subscribing fails
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }

    public String getMqttClientId() {
        return mqttClientId;
    }

    public void setMqttClientId(String mqttClientId) {
        this.mqttClientId = mqttClientId;
    }
}
MqttClientCallback
Mqtt的回調
package com.huibo.acquisition.mqtt.callback;
import com.huibo.acquisition.mqtt.listener.MqttClientListener;
import com.huibo.acquisition.service.DeviceService;
import com.huibo.acquisition.utils.ApplicationContextUtil;
import com.huibo.acquisition.utils.ServerUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.nio.charset.StandardCharsets;
import static com.huibo.acquisition.constant.GlobalConstant.DEVICE_DOWN_TOPIC;
/**
* @author :cola
* @description:Mqtt的回調(diào)
* @date :2023/7/18 10:43
*/
@Slf4j
public class MqttClientCallback implements MqttCallbackExtended {
/**
* 系統(tǒng)的mqtt客戶端id
*/
private String mqttClientId;
public MqttClientCallback(String mqttClientId) {
this.mqttClientId = mqttClientId;
}
/**
* TODO subscribe訂閱后得到的消息會執(zhí)行到這里
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
String data = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.printf("接收消息主題: %s%n", topic);
System.out.printf("接收消息Qos: %d%n", message.getQos());
// message.toString() 也可以拿到消息內(nèi)容
System.out.printf("接收消息內(nèi)容: %s%n", data);
System.out.printf("接收消息retained: %b%n", message.isRetained());
//處理消息
ApplicationContextUtil.getBean(DeviceService.class).handleMqttMessage(data);
}
/**
* MQTT 斷開連接會執(zhí)行此方法
*/
@Override
public void connectionLost(Throwable cause) {
log.info("斷開了MQTT連接 :{}", cause.getMessage());
}
/**
* publish發(fā)布成功后會執(zhí)行到這里
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("發(fā)布消息成功");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
MqttClientListener.mqttClients.forEach((k, v) -> {
try {
log.info("Mqtt重連成功:" + v.getMqttClientId() + " 已連接");
v.sub(String.format(DEVICE_DOWN_TOPIC, ServerUtil.getCPUSerial()));
} catch (MqttException e) {
e.printStackTrace();
}
});
}
}