Thingsboard源碼探索(1)

  • 該系列文章基于Thingsboard release-3.0分支的源碼進(jìn)行分析扭粱,可能與最新的特性有所區(qū)別糕簿。

初識項目

拉代碼

把我們把代碼pull下來探入,打開IDEA,相信大多數(shù)人的反應(yīng)會是懂诗,臥槽蜂嗽,這什么項目,怎么這么多模塊殃恒!

我剛把項目拉下來的時候也是一臉懵逼植旧,完全不知道這么多module是怎么劃分的,臉上大寫的兩個字离唐,臥槽病附!

整理思路

既來之則安之,雖然是滿臉的臥槽亥鬓,但也只能靜下心來慢慢分析了完沪。其實tb的源碼模塊化做的還是不錯的,起碼讓我寫這代碼我寫不到這程度嵌戈,不理解還是由于模塊之間的依賴關(guān)系不清晰導(dǎo)致的覆积。

找官方文檔

根據(jù)tb的官網(wǎng)我們知道tb主要由一下這幾種微服務(wù)

tb-http-transport 
tb-mqtt-transport
tb-coap-transport
tb-core
tb-rule-engine
tb-js-executor
tb-web-ui

官網(wǎng)鏈接:[https://thingsboard.io/docs/reference/](https://thingsboard.io/docs/reference/)

從源碼里面找到對應(yīng)的服務(wù)

既然都已經(jīng)知道有哪些服務(wù)了,那我只要在這一堆模塊中找到對應(yīng)服務(wù)的啟動類就完事兒了熟呛,然后按照這個邏輯不就一下子就能找到對應(yīng)服務(wù)的代碼了嘛宽档。照著這個思路,一番查找之后找到了一下幾個服務(wù)

tb-http-transport---./transport/http
tb-mqtt-transport---./transport/mqtt
tb-coap-transport---./transport/coap
tb-js-executor---./msa/js-executor
tb-web-ui---./ui-ngx

其中js-executor和web-ui因為是ng和前端寫的庵朝,所以可以分開來看吗冤,但是這幾個transport就一個啟動類是個什么鬼,而且也沒有找到tb-core以及tb-rule-engine九府,只有ThingsboardInstallApplication和ThingsboardServerApplication這倆莫名其妙的啟動類椎瘟,這時候心里已經(jīng)開始罵娘了。

雖然沒法一下子就搞清楚項目昔逗,但起碼還是通過幾個啟動類把服務(wù)大致劃分了一下降传,接下來那就只能看一下這幾個啟動類對應(yīng)服務(wù)的依賴關(guān)系了,看看能不能找到一些線索勾怒。

分析服務(wù)引用

分析transport服務(wù)的引用

由于transport是一個比較獨立的模塊婆排,且不同的transport功能基本一致,所以先從這個開始分析笔链,我先選取了mqtt-transport這個模塊進(jìn)行分析段只,它的pom文件展示了它的依賴關(guān)系

        <dependency>
            <groupId>org.thingsboard.common.transport</groupId>
            <artifactId>mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.thingsboard.common</groupId>
            <artifactId>queue</artifactId>
        </dependency>

可以看到這兩個主要的依賴,從名字上基本上可以猜到queue肯定是transport的管道服務(wù)鉴扫,因為之前搭建過tb的集群赞枕,所以基本猜到里面可能會有與kafka的管道相關(guān)的代碼,而mqtt這個依賴基本上可以猜到是一個mqttserver服務(wù),接下來就是分析這兩個模塊的源碼炕婶。

完整的依賴

transport-dependency.png

從這張圖可以大致可以看出mqtt-transport這個服務(wù)通過transport-mqtt模塊實現(xiàn)一個mqttserver的功能姐赡,然后通過queue模塊與建立管道服務(wù)從而與其他服務(wù)進(jìn)行交互,接下來就是具體看這兩個模塊的代碼柠掂。

分析mqtt模塊源碼

源碼路徑位于 ./common/transport/mqtt

果然這個模塊就是一個用netty封裝的mqttserver项滑,而且還通過配置文件的條件來判斷是否要初始化這個mqttserver服務(wù)。

package org.thingsboard.server.transport.mqtt;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @author Andrew Shvayka
 */
@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;

    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        log.info("Mqtt transport started!");
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        log.info("Stopping MQTT transport!");
        try {
            serverChannel.close().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        log.info("MQTT transport stopped!");
    }
}

對于熟悉netty的朋友來說這個就比較簡單了涯贞,就是根據(jù)handler來處理接收到的消息枪狂,這個其實就沒什么好多說了。不熟悉netty的朋友可以自己去百度一下就好了宋渔,就是簡單的接收到消息之后丟給org.thingsboard.server.transport.mqtt.MqttTransportHandler這個類來處理而已州疾,具體的消息處理則都是通過handler來進(jìn)行。

分析MqttTransportHandler

消息都是從channelRead方法進(jìn)入到handler皇拣,然后丟到processMqttMsg這個方法進(jìn)行具體的分類處理严蓖。

    private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            processDisconnect(ctx);
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx, msg)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                    transportService.reportActivity(sessionInfo);
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx, msg)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;
        }
    }

通過這個handler,消息會按照類型來做不通的處理审磁。越往下面看就會有更多的模塊之間的依賴谈飒,將模塊獨立出來分析反而不是很方便,所以接下來換一種思路态蒂,從一個完整的鑒權(quán)流程來分析源碼以及模塊之間的依賴關(guān)系。

分析連接鑒權(quán)流程

tb默認(rèn)的設(shè)備鑒權(quán)方式是token的方式费什,所以先挑這個來分析整個的數(shù)據(jù)上下行流程钾恢,因此我們詳細(xì)分析一下processAuthTokenConnect這個方法,源碼如下鸳址。

    private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        String userName = msg.payload().userName();
        log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
        if (StringUtils.isEmpty(userName)) {
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
            ctx.close();
        } else {
            transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                    new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                        @Override
                        public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                            onValidateDeviceResponse(msg, ctx);
                        }

                        @Override
                        public void onError(Throwable e) {
                            log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                            ctx.close();
                        }
                    });
        }
    }

通過將token提取出來瘩蚪,然后拼裝成一個ValidateDeviceTokenRequestMsg對象之后調(diào)用transportService服務(wù)。其中transportService服務(wù)的實現(xiàn)類為org.thingsboard.server.common.transport.service.DefaultTransportService稿黍,該類屬于transport-api模塊疹瘦。

@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport'")
public class DefaultTransportService implements TransportService {

    @Value("${transport.rate_limits.enabled}")
    private boolean rateLimitEnabled;
    @Value("${transport.rate_limits.tenant}")

通過process將消息發(fā)送出去,process的時候又調(diào)用了transportApiRequestTemplate這個對象

@Override
    public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
        log.trace("Processing msg: {}", msg);
        TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
        AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
                response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
    }

這個對象對應(yīng)的實現(xiàn)類位于queue模塊巡球,org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate言沐,他的send方法會返回一個ListenableFuture

@Override
    public ListenableFuture<Response> send(Request request) {
        if (tickSize > maxPendingRequests) {
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
        request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
        request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
        pendingRequests.putIfAbsent(requestId, responseMetaData);
        log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
        requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                log.trace("[{}] Request sent: {}", requestId, metadata);
            }

            @Override
            public void onFailure(Throwable t) {
                pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
        return future;
    }

send方法通過requestTemplate進(jìn)行消息的發(fā)送,因為使用的是Kafka作為消息管道酣栈,所以其實現(xiàn)類為queue模塊的org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate類险胰,調(diào)用其send方法將消息真實發(fā)送出去,其中producer就是一個kafka的producer實現(xiàn)矿筝,最后將信息發(fā)送到tb_transport.api.requests這個topical中起便。

@Override
    public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
        createTopicIfNotExist(tpi);
        String key = msg.getKey().toString();
        byte[] data = msg.getData();
        ProducerRecord<String, byte[]> record;
        Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
        record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                if (callback != null) {
                    callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
                }
            } else {
                if (callback != null) {
                    callback.onFailure(exception);
                } else {
                    log.warn("Producer template failure: {}", exception.getMessage(), exception);
                }
            }
        });
    }

乍看至下認(rèn)證的流程已經(jīng)完成了,其實仔細(xì)一看里面問題還很多,首先需要問的是這個認(rèn)證信息是如何返回給mqtt-transport的榆综?

像kafka這類消息管道都是異步進(jìn)行數(shù)據(jù)的傳輸?shù)拿畋裕园l(fā)送消息之后最多會收到一個是否發(fā)送成功的信息,實際上是沒有認(rèn)證的返回信息的鼻疮,但是在MqttTransportHandler的processAuthTokenConnect方法里面明確是有對返回信息做進(jìn)一步核實的代碼的细诸,那這個返回信息到底是怎么來的呢?

如何拿到認(rèn)證的返回信息

在tb的代碼中有很多類似的異步操作陋守,并且都是要對返回信息進(jìn)行確認(rèn)的震贵。比如這個認(rèn)證信息,其實是通過queue模塊的org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate方法來實現(xiàn)的水评,下面是處理的源碼

    @Override
    public void init() {
        queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
        this.requestTemplate.init();
        tickTs = System.currentTimeMillis();
        responseTemplate.subscribe();
        executor.submit(() -> {
            long nextCleanupMs = 0L;
            while (!stopped) {
                try {
                    List<Response> responses = responseTemplate.poll(pollInterval);
                    if (responses.size() > 0) {
                        log.trace("Polling responses completed, consumer records count [{}]", responses.size());
                    } else {
                        continue;
                    }
                    responses.forEach(response -> {
                        byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
                        UUID requestId;
                        if (requestIdHeader == null) {
                            log.error("[{}] Missing requestId in header and body", response);
                        } else {
                            requestId = bytesToUuid(requestIdHeader);
                            log.trace("[{}] Response received: {}", requestId, response);
                            ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
                            if (expectedResponse == null) {
                                log.trace("[{}] Invalid or stale request", requestId);
                            } else {
                                expectedResponse.future.set(response);
                            }
                        }
                    });
                    responseTemplate.commit();
                    tickTs = System.currentTimeMillis();
                    tickSize = pendingRequests.size();
                    if (nextCleanupMs < tickTs) {
                        //cleanup;
                        pendingRequests.forEach((key, value) -> {
                            if (value.expTime < tickTs) {
                                ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
                                if (staleRequest != null) {
                                    log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
                                    staleRequest.future.setException(new TimeoutException());
                                }
                            }
                        });
                        nextCleanupMs = tickTs + maxRequestTimeout;
                    }
                } catch (Throwable e) {
                    log.warn("Failed to obtain responses from queue.", e);
                    try {
                        Thread.sleep(pollInterval);
                    } catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new responses", e2);
                    }
                }
            }
        });
    }

服務(wù)啟動的時候會創(chuàng)建一個response對應(yīng)topical的消費者猩系,對應(yīng)的topical格式為tb_transport.api.responses.hostname,然后從這個topical拉取消息中燥。

DefaultTbQueueRequestTemplate.send()方法在發(fā)送的時候會將當(dāng)前需要發(fā)送的消息存入pendingRequests對象寇甸,以requestId為唯一ID,將返回的Future裝箱之后作為value疗涉。當(dāng)從topical中獲取到消息拿霉,且ID可以對應(yīng)上的時候會將pendingRequests對應(yīng)key的value的狀態(tài)設(shè)置為已完成,然后出發(fā)回調(diào)咱扣。

回調(diào)之后就可以拿到core返回的認(rèn)證信息了绽淘,這樣就完成了一個完整的認(rèn)證流程。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末闹伪,一起剝皮案震驚了整個濱河市沪铭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌偏瓤,老刑警劉巖杀怠,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異厅克,居然都是意外死亡赔退,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門证舟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來硕旗,“玉大人,你說我怎么就攤上這事褪储÷芽剩” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵鲤竹,是天一觀的道長浪读。 經(jīng)常有香客問我昔榴,道長,這世上最難降的妖魔是什么碘橘? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任互订,我火速辦了婚禮,結(jié)果婚禮上痘拆,老公的妹妹穿的比我還像新娘仰禽。我一直安慰自己,他們只是感情好纺蛆,可當(dāng)我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布吐葵。 她就那樣靜靜地躺著,像睡著了一般桥氏。 火紅的嫁衣襯著肌膚如雪温峭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天字支,我揣著相機與錄音凤藏,去河邊找鬼。 笑死堕伪,一個胖子當(dāng)著我的面吹牛揖庄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播欠雌,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼蹄梢,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了桨昙?” 一聲冷哼從身側(cè)響起检号,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蛙酪,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體翘盖,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡桂塞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了馍驯。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阁危。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖汰瘫,靈堂內(nèi)的尸體忽然破棺而出狂打,到底是詐尸還是另有隱情,我是刑警寧澤混弥,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布趴乡,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏晾捏。R本人自食惡果不足惜蒿涎,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望惦辛。 院中可真熱鬧劳秋,春花似錦、人聲如沸胖齐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽呀伙。三九已至补履,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間区匠,已是汗流浹背干像。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留驰弄,地道東北人麻汰。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像戚篙,于是被迫代替她去往敵國和親五鲫。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容