SpringBoot整合MQTT實現(xiàn)MQTT多連接

業(yè)務(wù)場景

硬件采集的數(shù)據(jù)傳入MQTT(這邊MQTT的服務(wù)器用的是EMQX,有興趣的可以自己去了解一下)单刁,JAVA通過代碼連接MQTT服務(wù)器捐祠,對數(shù)據(jù)進行處理

新建SpringBoot項目,pom文件中直接引入下面MQTT的依賴

        <!-- MQTT start -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!-- MQTT end -->

MQTT工具類

MqttServerData

因為是多連接悼沈,所以說MQTT的配置需要存放在數(shù)據(jù)庫中屈溉,需要新增MQTT基礎(chǔ)表MqttServerData塞关,我這邊用的是Spring Data JPA,大家可以用自己的ORM框架生成這些基礎(chǔ)表的信息

package com.huibo.xx.xx.entity;

import com.huibo.core.entity.BaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;

import javax.persistence.Entity;

/**
 * @author :cola
 * @description: Mqtt配置表
 * @date :2023/7/18 8:51
 */
@Data
@Entity
@EqualsAndHashCode(callSuper = true)
public class MqttServerData extends BaseEntity {

    private String serverHost;

    private String serverPort;

    private String userName;

    private String password;

    private String clientId;

    /**
     * mqtt訂閱的主體语婴,以“描孟,”隔開
     */
    private String clientTopic;

    /**
     * 客戶端每次重連是否清除session
     */
    private String clientCleanSession;

    private String remark;

    private Boolean enableFlag;
}

MqttServerDataRepository

package com.xx.xx.mqtt.repository;

import com.xx.xx.mqtt.entity.MqttServerData;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

public interface MqttServerDataRepository extends JpaRepository<MqttServerData, Long>, JpaSpecificationExecutor<MqttServerData{}

MqttServerDataService

package com.xx.xx.mqtt.service;


import com.xx.xx.entity.Device;
import com.xx.xx.mqtt.controller.dto.MqttServerDataAddDto;
import com.xx.xx.mqtt.controller.dto.MqttServerDataQueryDto;
import com.xx.xx.mqtt.controller.dto.MqttServerDataUpdateDto;
import com.xx.xx.mqtt.entity.MqttServerData;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.web.multipart.MultipartFile;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;

public interface MqttServerDataService {

    MqttServerData add(MqttServerDataAddDto addDto);

    boolean update(MqttServerDataUpdateDto updateDto) throws MqttException;

    MqttServerData findById(Long id);

    boolean delete(Long id) throws MqttException;

    List<MqttServerData> findByList(MqttServerDataQueryDto queryDto);

    Page<MqttServerData> findByPage(MqttServerDataQueryDto queryDto, Pageable page);

    /**
     * 初始化MQTT連接
     *
     * @param response 響應(yīng)體
     */
    void initMqttServer(MqttServerData mqttServerData) throws MqttException;
}

MqttServerDataServiceImpl

/**
 * @author cola
 * @date 2023/7/13 16:23
 */
@Service
@Slf4j
public class MqttServerDataServiceImpl implements MqttServerDataService {

    @Autowired
    private MqttServerDataRepository mqttServerDataRepository;

    @Autowired
    private ExcelHandler excelHandler;

    @Override
    public MqttServerData add(MqttServerDataAddDto addDto) {
        MqttServerData mqttServerData = mqttServerDataRepository.save(from(addDto));
        if (mqttServerData.getEnableFlag()) {
            this.initMqttServer(mqttServerData);
        }
        return mqttServerData;
    }

    @Override
    @Transactional
    public boolean update(MqttServerDataUpdateDto updateDto) throws MqttException {
        MqttServerData entity = findById(updateDto.getId());
        assemble(entity, updateDto);

        if (!entity.getEnableFlag() && updateDto.getEnableFlag()) {
            if (!MqttClientListener.mqttClients.containsKey(entity.getId().toString())) {
                this.initMqttServer(entity);
            } else {

            }
        }
        if (entity.getEnableFlag() && !updateDto.getEnableFlag()) {
            MqttClientConnect mqttClientConnect = MqttClientListener.mqttClients.get(entity.getId().toString());
            mqttClientConnect.close();
        }


        return true;
    }

    @Override
    public MqttServerData findById(Long id) {
        return mqttServerDataRepository.findById(id).orElseThrow(() -> new NoEntityFoundException("No record found by id = " + id));
    }

    @Override
    @Transactional
    public boolean delete(Long id) throws MqttException {
        MqttServerData entity = findById(id);
        entity.setDeleted(true);

        MqttClientConnect mqttClientConnect = MqttClientListener.mqttClients.get(entity.getId().toString());
        if (Objects.nonNull(mqttClientConnect)) {
            mqttClientConnect.close();
        }

        return true;
    }

    @Override
    public List<MqttServerData> findByList(MqttServerDataQueryDto queryDto) {
        return mqttServerDataRepository.findAll(getSpecification(queryDto));
    }

    @Override
    public Page<MqttServerData> findByPage(MqttServerDataQueryDto queryDto, Pageable page) {
        return mqttServerDataRepository.findAll(getSpecification(queryDto), page);
    }

    @Override
    public void initMqttServer(MqttServerData mqttServerData) {
        try {
            MqttClientConnect mqttClientConnect = new MqttClientConnect();
            mqttClientConnect.setMqttClientId(UUID.randomUUID().toString());
            String host = "tcp://" + mqttServerData.getServerHost() + ":" + mqttServerData.getServerPort();
            mqttClientConnect.setMqttClient(host, UUID.randomUUID().toString(), mqttServerData.getUserName(), mqttServerData.getPassword(), new MqttClientCallback(String.valueOf(mqttServerData.getId())));
            //訂閱CPUId
            mqttClientConnect.sub(String.format(DEVICE_DOWN_TOPIC, ServerUtil.getCPUSerial()));
            MqttClientListener.mqttClients.put(String.valueOf(mqttServerData.getId()), mqttClientConnect);
            log.info("{}--連接成功!砰左!訂閱主題--{}", mqttServerData.getId(), mqttServerData.getClientTopic());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void addSubTopicForDevice(Device device) {
        MqttClientListener.mqttClients.forEach((key, value) -> {
            try {
                value.sub(String.format(DEVICE_DOWN_TOPIC, device.getName()));
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });

    }

    private MqttServerData from(MqttServerDataAddDto dto) {
        MqttServerData entity = new MqttServerData();
        BeanUtils.copyProperties(dto, entity);
        entity.setClientTopic("/topic/default");
        return entity;
    }

    private void assemble(MqttServerData entity, MqttServerDataUpdateDto updateDto) {
        BeanUtils.copyProperties(updateDto, entity);
    }

    private Specification<MqttServerData> getSpecification(MqttServerDataQueryDto queryDto) {
        return (root, query, builder) -> {
            List<Predicate> predicates = Lists.newArrayList();
            if (!StringUtils.isEmpty(queryDto.getServerHost())) {
                predicates.add(builder.like(root.get("serverHost"), "%".concat(queryDto.getServerHost()).concat("%")));
            }
            if (Objects.nonNull(queryDto.getEnableFlag())) {
                predicates.add(builder.isTrue(root.get("enableFlag")));
            }
            predicates.add(builder.isFalse(root.get("isDeleted")));
            return builder.and(predicates.toArray(new Predicate[0]));
        };
    }
}

MqttClientListener

項目啟動匿醒,監(jiān)聽主題

package com.huibo.acquisition.mqtt.listener;

import com.xx.xx.mqtt.config.MqttClientConnect;
import com.xx.xx.acquisition.mqtt.controller.dto.MqttServerDataQueryDto;
import com.xx.xx.acquisition.mqtt.entity.MqttServerData;
import com.xx.xx.acquisition.mqtt.service.MqttServerDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author :cola
 * @description: 項目啟動 監(jiān)聽主題
 * @date :2023/7/18 10:53
 */
@Slf4j
@Component
public class MqttClientListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private MqttServerDataService mqttServerDataService;

    /**
     * 客戶端
     */
    public static ConcurrentHashMap<String, MqttClientConnect> mqttClients = new ConcurrentHashMap();

    private final AtomicBoolean isInit = new AtomicBoolean(false);

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        //防止重復(fù)觸發(fā)
        if (!isInit.compareAndSet(false, true)) {
            return;
        }
        //TODO 需要傳入啟用參數(shù)
        List<MqttServerData> mqttServers = mqttServerDataService.findByList(new MqttServerDataQueryDto() {{
            setEnableFlag(Boolean.TRUE);
        }});
        try {
            if (!CollectionUtils.isEmpty(mqttServers)) {
                for (MqttServerData mqttServerData : mqttServers) {
                    mqttServerDataService.initMqttServer(mqttServerData);
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

MqttClientConnect

package com.xx.xx.mqtt.config;

import com.huibo.acquisition.mqtt.callback.MqttClientCallback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * @author :cola
 * @description: Mqtt
 * @date :2023/7/18 10:33
 */
@Slf4j
@Component
public class MqttClientConnect {

    private MqttClient mqttClient;
    /**
     * 系統(tǒng)的mqtt客戶端id
     */
    private String mqttClientId;

    /**
     * 客戶端connect連接mqtt服務(wù)器
     *
     * @param userName     用戶名
     * @param passWord     密碼
     * @param mqttCallback 回調(diào)函數(shù)
     **/
    public void setMqttClient(String host, String clientId, String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
        // 連接設(shè)置
        MqttConnectOptions options = mqttConnectOptions(host, clientId, userName, passWord);
        if (mqttCallback == null) {
            mqttClient.setCallback(new MqttClientCallback(mqttClientId));
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
    }

    /**
     * MQTT連接參數(shù)設(shè)置
     */
    private MqttConnectOptions mqttConnectOptions(String host, String clientId, String userName, String passWord) throws MqttException {
        // 創(chuàng)建 MQTT 客戶端對象
        mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
        // 連接設(shè)置
        MqttConnectOptions options = new MqttConnectOptions();
        // 設(shè)置連接用戶名
        options.setUserName(userName);
        // 設(shè)置連接密碼
        options.setPassword(passWord.toCharArray());
        // 設(shè)置超時時間,單位為秒
        options.setConnectionTimeout(30);
        // 設(shè)置心跳時間 單位為秒缠导,表示服務(wù)器每 30 秒向客戶端發(fā)送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(30);
        // 設(shè)置遺囑消息的話題廉羔,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
        options.setWill("willTopic", (mqttClientId + "與服務(wù)器斷開連接").getBytes(), 0, false);
        // 斷線自動重連
        options.setAutomaticReconnect(true);
        // 是否清空 session僻造,設(shè)置為 false 表示服務(wù)器會保留客戶端的連接記錄憋他,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
        // 設(shè)置為 true 表示每次連接到服務(wù)端都是以新的身份
        options.setCleanSession(true);
        return options;
    }

    /**
     * 關(guān)閉MQTT連接
     */
    public void close() throws MqttException {
        mqttClient.close();
        mqttClient.disconnect();
    }

    /**
     * 向某個主題發(fā)布消息 默認(rèn)qos:1
     *
     * @param topic:發(fā)布的主題
     * @param msg:發(fā)布的消息
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //mqttMessage.setQos(2);
        try {
            mqttMessage.setPayload(msg.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            log.error(e.getMessage());
        }
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 向某個主題發(fā)布消息
     *
     * @param topic: 發(fā)布的主題
     * @param msg:   發(fā)布的消息
     * @param qos:   消息質(zhì)量    Qos:0、1髓削、2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 訂閱多個主題 竹挡,此方法默認(rèn)的的Qos等級為:1
     *
     * @param topic 主題
     */
    public void sub(String[] topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * 訂閱一個個主題 ,此方法默認(rèn)的的Qos等級為:1
     *
     * @param topic 主題
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * 訂閱某一個主題立膛,可攜帶Qos
     *
     * @param topic 所要訂閱的主題
     * @param qos   消息質(zhì)量:0揪罕、1梯码、2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }

    public String getMqttClientId() {
        return mqttClientId;
    }

    public void setMqttClientId(String mqttClientId) {
        this.mqttClientId = mqttClientId;
    }
}

MqttClientCallback

Mqtt的回調(diào)

package com.huibo.acquisition.mqtt.callback;

import com.huibo.acquisition.mqtt.listener.MqttClientListener;
import com.huibo.acquisition.service.DeviceService;
import com.huibo.acquisition.utils.ApplicationContextUtil;
import com.huibo.acquisition.utils.ServerUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

import static com.huibo.acquisition.constant.GlobalConstant.DEVICE_DOWN_TOPIC;

/**
 * @author :cola
 * @description:Mqtt的回調(diào)
 * @date :2023/7/18 10:43
 */

@Slf4j
public class MqttClientCallback implements MqttCallbackExtended {

    /**
     * 系統(tǒng)的mqtt客戶端id
     */
    private String mqttClientId;

    public MqttClientCallback(String mqttClientId) {
        this.mqttClientId = mqttClientId;
    }

    /**
     * TODO subscribe訂閱后得到的消息會執(zhí)行到這里
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        String data = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.printf("接收消息主題: %s%n", topic);
        System.out.printf("接收消息Qos: %d%n", message.getQos());
        // message.toString() 也可以拿到消息內(nèi)容
        System.out.printf("接收消息內(nèi)容: %s%n", data);
        System.out.printf("接收消息retained: %b%n", message.isRetained());

        //處理消息
        ApplicationContextUtil.getBean(DeviceService.class).handleMqttMessage(data);
    }

    /**
     * MQTT 斷開連接會執(zhí)行此方法
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.info("斷開了MQTT連接 :{}", cause.getMessage());
    }

    /**
     * publish發(fā)布成功后會執(zhí)行到這里
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("發(fā)布消息成功");
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
            MqttClientListener.mqttClients.forEach((k, v) -> {
                try {
                    log.info("Mqtt重連成功:" + v.getMqttClientId() + " 已連接");
                    v.sub(String.format(DEVICE_DOWN_TOPIC, ServerUtil.getCPUSerial()));
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            });
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市好啰,隨后出現(xiàn)的幾起案子轩娶,更是在濱河造成了極大的恐慌,老刑警劉巖框往,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鳄抒,死亡現(xiàn)場離奇詭異,居然都是意外死亡椰弊,警方通過查閱死者的電腦和手機许溅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來男应,“玉大人闹司,你說我怎么就攤上這事娱仔°迤” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵牲迫,是天一觀的道長耐朴。 經(jīng)常有香客問我,道長盹憎,這世上最難降的妖魔是什么筛峭? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮陪每,結(jié)果婚禮上影晓,老公的妹妹穿的比我還像新娘。我一直安慰自己檩禾,他們只是感情好挂签,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著盼产,像睡著了一般饵婆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上戏售,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天侨核,我揣著相機與錄音,去河邊找鬼灌灾。 笑死搓译,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的锋喜。 我是一名探鬼主播些己,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了轴总?” 一聲冷哼從身側(cè)響起直颅,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎怀樟,沒想到半個月后功偿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡往堡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年械荷,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片虑灰。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡吨瞎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出穆咐,到底是詐尸還是另有隱情颤诀,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布对湃,位于F島的核電站崖叫,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏拍柒。R本人自食惡果不足惜心傀,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望拆讯。 院中可真熱鬧脂男,春花似錦、人聲如沸种呐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽陕贮。三九已至堕油,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肮之,已是汗流浹背掉缺。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留戈擒,地道東北人眶明。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像筐高,于是被迫代替她去往敵國和親搜囱。 傳聞我的和親對象是個殘疾皇子丑瞧,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容