- This series is based on the source code of the ThingsBoard release-3.0 branch, so some details may differ from the latest releases.
## First look at the project
### Pulling the code
After pulling the code and opening it in IDEA, most people's first reaction is probably: what on earth is this project, and why does it have so many modules?
When I first cloned it I was just as lost — I had no idea how all these modules were supposed to fit together.
### Getting my bearings
Since I was already in, there was nothing for it but to calm down and work through the code. In fairness, ThingsBoard's modularization is actually well done — better than I could manage myself. The confusion mostly comes from the dependency relationships between modules not being obvious at first glance.
### Checking the official docs
According to the official ThingsBoard docs, tb is mainly composed of the following microservices:
- tb-http-transport
- tb-mqtt-transport
- tb-coap-transport
- tb-core
- tb-rule-engine
- tb-js-executor
- tb-web-ui
Official docs: [https://thingsboard.io/docs/reference/](https://thingsboard.io/docs/reference/)
### Locating each service in the source
Now that the services are known, the plan was simple: find the launcher class for each service among this pile of modules, and the corresponding code should fall into place. Following that idea, some searching turned up the following services:
- tb-http-transport → ./transport/http
- tb-mqtt-transport → ./transport/mqtt
- tb-coap-transport → ./transport/coap
- tb-js-executor → ./msa/js-executor
- tb-web-ui → ./ui-ngx
js-executor and web-ui are written in Node.js and Angular respectively, so they can be set aside. But the three transports each map to just one launcher class — what is going on there? And there was no trace of tb-core or tb-rule-engine at all, only two cryptically named launchers, ThingsboardInstallApplication and ThingsboardServerApplication. At this point I was getting genuinely frustrated.
Even though the project could not be untangled in one go, the launcher classes at least gave a rough partition of the services. The next step was to examine the dependencies of each launcher's service and look for clues.
## Analyzing service dependencies
### Dependencies of the transport services
The transports are relatively self-contained modules, and the different transports are functionally similar, so they make a good starting point. I picked the mqtt-transport module; its pom file shows its dependencies:
```xml
<dependency>
    <groupId>org.thingsboard.common.transport</groupId>
    <artifactId>mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.thingsboard.common</groupId>
    <artifactId>queue</artifactId>
</dependency>
```
These are the two main dependencies. From the names alone, queue is almost certainly the transport's message pipeline — having set up a tb cluster before, I guessed it would contain the Kafka pipeline code — and mqtt is presumably an MQTT server implementation. The next step is to read the source of these two modules.
### The full dependency picture
The dependency diagram (not reproduced here) shows roughly that the mqtt-transport service implements an MQTT server through the transport-mqtt module and talks to the other services through the pipeline established by the queue module. Next, let's look at the code of these two modules.
## Analyzing the mqtt module source
The source lives under ./common/transport/mqtt.
As expected, this module is an MQTT server built on Netty, and it uses a configuration-driven condition to decide whether the server should be initialized at all.
```java
package org.thingsboard.server.transport.mqtt;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @author Andrew Shvayka
 */
@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;
    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
        serverChannel = b.bind(host, port).sync().channel();
        log.info("Mqtt transport started!");
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        log.info("Stopping MQTT transport!");
        try {
            serverChannel.close().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        log.info("MQTT transport stopped!");
    }
}
```
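The `@ConditionalOnExpression` annotation above decides whether the bean is created at all. Stripped of the SpEL syntax, it boils down to a simple boolean check; here is a plain-Java paraphrase (the class and method names are mine, not ThingsBoard's):

```java
public class MqttTransportCondition {
    // Paraphrase of the SpEL condition on MqttTransportService:
    // '${service.type}'=='tb-transport'
    //   || ('${service.type}'=='monolith' && '${transport.mqtt.enabled}'=='true')
    static boolean shouldStartMqttTransport(String serviceType, String mqttEnabled) {
        return "tb-transport".equals(serviceType)
                || ("monolith".equals(serviceType) && "true".equals(mqttEnabled));
    }

    public static void main(String[] args) {
        // A dedicated transport node always starts the MQTT server...
        System.out.println(shouldStartMqttTransport("tb-transport", "false")); // true
        // ...while a monolith only starts it when transport.mqtt.enabled=true.
        System.out.println(shouldStartMqttTransport("monolith", "false"));    // false
        System.out.println(shouldStartMqttTransport("monolith", "true"));     // true
    }
}
```

This is how the same codebase serves both deployment modes: in a monolith every conditional service lives in one JVM, while a dedicated transport node only instantiates its own transport beans.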
If you already know Netty, this is straightforward: there is not much more to say beyond the fact that every received message is handed to org.thingsboard.server.transport.mqtt.MqttTransportHandler, and all of the actual message processing happens inside that handler. If Netty is new to you, it is worth reading up on its channel/handler model first.
### Analyzing MqttTransportHandler
Messages enter the handler through the channelRead method and are passed on to processMqttMsg, which dispatches them by type:
```java
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
    address = (InetSocketAddress) ctx.channel().remoteAddress();
    if (msg.fixedHeader() == null) {
        log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
        processDisconnect(ctx);
        return;
    }
    deviceSessionCtx.setChannel(ctx);
    switch (msg.fixedHeader().messageType()) {
        case CONNECT:
            processConnect(ctx, (MqttConnectMessage) msg);
            break;
        case PUBLISH:
            processPublish(ctx, (MqttPublishMessage) msg);
            break;
        case SUBSCRIBE:
            processSubscribe(ctx, (MqttSubscribeMessage) msg);
            break;
        case UNSUBSCRIBE:
            processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
            break;
        case PINGREQ:
            if (checkConnected(ctx, msg)) {
                ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                transportService.reportActivity(sessionInfo);
            }
            break;
        case DISCONNECT:
            if (checkConnected(ctx, msg)) {
                processDisconnect(ctx);
            }
            break;
        default:
            break;
    }
}
```
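Stripped of the Netty types, processMqttMsg is just a dispatch table keyed on the MQTT fixed-header message type. A plain-Java stand-in (the enum and the returned step names are purely illustrative):

```java
public class MqttDispatchSketch {
    // Illustrative subset of the MQTT message types handled above.
    enum MsgType { CONNECT, PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PINGREQ, DISCONNECT, OTHER }

    // Returns the name of the processing step each type is routed to,
    // mirroring the switch in MqttTransportHandler.processMqttMsg.
    static String dispatch(MsgType type) {
        switch (type) {
            case CONNECT:     return "processConnect";
            case PUBLISH:     return "processPublish";
            case SUBSCRIBE:   return "processSubscribe";
            case UNSUBSCRIBE: return "processUnsubscribe";
            case PINGREQ:     return "pingResponse";
            case DISCONNECT:  return "processDisconnect";
            default:          return "ignored";
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(MsgType.CONNECT)); // processConnect
        System.out.println(dispatch(MsgType.OTHER));   // ignored
    }
}
```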
Through this handler, each message is routed to a type-specific processor. Going deeper from here pulls in more and more cross-module dependencies, so analyzing the modules in isolation stops being productive. Let's switch strategy and instead trace one complete flow — device authentication — through the source code and the module boundaries it crosses.
## Tracing the connection-authentication flow
tb's default device authentication is token-based, so that is the flow to follow for the full uplink/downlink path. Let's look closely at the processAuthTokenConnect method:
```java
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String userName = msg.payload().userName();
    log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
    if (StringUtils.isEmpty(userName)) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        ctx.close();
    } else {
        transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                        onValidateDeviceResponse(msg, ctx);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                        ctx.close();
                    }
                });
    }
}
```
The handler extracts the token, wraps it in a ValidateDeviceTokenRequestMsg, and hands it to transportService. The implementation behind transportService is org.thingsboard.server.common.transport.service.DefaultTransportService, which lives in the transport-api module.
```java
@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport'")
public class DefaultTransportService implements TransportService {

    @Value("${transport.rate_limits.enabled}")
    private boolean rateLimitEnabled;
    @Value("${transport.rate_limits.tenant}")
    // ... (excerpt truncated in the original)
```
The process method publishes the message, delegating to the transportApiRequestTemplate object:
```java
@Override
public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
    log.trace("Processing msg: {}", msg);
    TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
    AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
            response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
}
```
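`AsyncCallbackTemplate.withCallback` is essentially a small bridge that turns a future into the success/error callback pair the transport layer works with. The same idea can be sketched in plain JDK terms — a hypothetical reimplementation using `CompletableFuture` instead of Guava's `ListenableFuture`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class CallbackBridge {
    // Runs onSuccess/onError on the given executor once the future completes,
    // mirroring the role AsyncCallbackTemplate.withCallback plays in ThingsBoard.
    static <T> void withCallback(CompletableFuture<T> future,
                                 Consumer<T> onSuccess,
                                 Consumer<Throwable> onError,
                                 Executor executor) {
        future.whenCompleteAsync((result, error) -> {
            if (error != null) {
                onError.accept(error);
            } else {
                onSuccess.accept(result);
            }
        }, executor);
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        // Runnable::run acts as a same-thread Executor for the demo.
        withCallback(f, r -> System.out.println("ok: " + r),
                e -> System.out.println("err: " + e), Runnable::run);
        f.complete("validated"); // prints "ok: validated"
    }
}
```

The design point is that the MQTT handler never blocks a Netty event-loop thread waiting for the validation result; the callback simply fires later on the transport's callback executor.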
The implementation behind that template lives in the queue module: org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate. Its send method returns a ListenableFuture:
```java
@Override
public ListenableFuture<Response> send(Request request) {
    if (tickSize > maxPendingRequests) {
        return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
    }
    UUID requestId = UUID.randomUUID();
    request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
    request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
    request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
    SettableFuture<Response> future = SettableFuture.create();
    ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
    pendingRequests.putIfAbsent(requestId, responseMetaData);
    log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
    requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
        @Override
        public void onSuccess(TbQueueMsgMetadata metadata) {
            log.trace("[{}] Request sent: {}", requestId, metadata);
        }

        @Override
        public void onFailure(Throwable t) {
            pendingRequests.remove(requestId);
            future.setException(t);
        }
    });
    return future;
}
```
send pushes the message out through requestTemplate. Since Kafka is used as the message pipeline here, the concrete implementation is the queue module's org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate: its send method performs the actual publishing, producer being an ordinary Kafka producer, and the message ends up on the tb_transport.api.requests topic.
```java
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
    createTopicIfNotExist(tpi);
    String key = msg.getKey().toString();
    byte[] data = msg.getData();
    ProducerRecord<String, byte[]> record;
    Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
    record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            if (callback != null) {
                callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
            }
        } else {
            if (callback != null) {
                callback.onFailure(exception);
            } else {
                log.warn("Producer template failure: {}", exception.getMessage(), exception);
            }
        }
    });
}
```
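Note that the request id rides in the Kafka message headers as raw bytes — that is what the `uuidToBytes`/`bytesToUuid` calls seen earlier are for. Those helpers are presumably a straightforward `ByteBuffer` round-trip; here is a self-contained guess (ThingsBoard's actual implementation may differ in detail):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidCodec {
    // Serializes a UUID into 16 big-endian bytes, most significant half first.
    static byte[] uuidToBytes(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return buf.array();
    }

    // Reads the two longs back in the same order to reconstruct the UUID.
    static UUID bytesToUuid(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new UUID(buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        UUID requestId = UUID.randomUUID();
        // The round-trip is lossless, so the consumer side can match
        // responses back to the originating requests.
        System.out.println(bytesToUuid(uuidToBytes(requestId)).equals(requestId)); // true
    }
}
```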
At first glance the authentication flow looks complete, but a closer look raises several questions — above all: how does the authentication result get back to mqtt-transport?
Pipelines like Kafka move data asynchronously: after publishing a message, you receive at most an acknowledgement that the send succeeded, not the authentication result itself. Yet processAuthTokenConnect in MqttTransportHandler clearly contains code that inspects a returned response. So where does that response come from?
### How the authentication response comes back
The tb codebase is full of asynchronous operations like this that still need a confirmed response. For this authentication request, the mechanism lives in org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate in the queue module; here is its init method:
```java
@Override
public void init() {
    queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
    this.requestTemplate.init();
    tickTs = System.currentTimeMillis();
    responseTemplate.subscribe();
    executor.submit(() -> {
        long nextCleanupMs = 0L;
        while (!stopped) {
            try {
                List<Response> responses = responseTemplate.poll(pollInterval);
                if (responses.size() > 0) {
                    log.trace("Polling responses completed, consumer records count [{}]", responses.size());
                } else {
                    continue;
                }
                responses.forEach(response -> {
                    byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
                    UUID requestId;
                    if (requestIdHeader == null) {
                        log.error("[{}] Missing requestId in header and body", response);
                    } else {
                        requestId = bytesToUuid(requestIdHeader);
                        log.trace("[{}] Response received: {}", requestId, response);
                        ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
                        if (expectedResponse == null) {
                            log.trace("[{}] Invalid or stale request", requestId);
                        } else {
                            expectedResponse.future.set(response);
                        }
                    }
                });
                responseTemplate.commit();
                tickTs = System.currentTimeMillis();
                tickSize = pendingRequests.size();
                if (nextCleanupMs < tickTs) {
                    // cleanup
                    pendingRequests.forEach((key, value) -> {
                        if (value.expTime < tickTs) {
                            ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
                            if (staleRequest != null) {
                                log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
                                staleRequest.future.setException(new TimeoutException());
                            }
                        }
                    });
                    nextCleanupMs = tickTs + maxRequestTimeout;
                }
            } catch (Throwable e) {
                log.warn("Failed to obtain responses from queue.", e);
                try {
                    Thread.sleep(pollInterval);
                } catch (InterruptedException e2) {
                    log.trace("Failed to wait until the server has capacity to handle new responses", e2);
                }
            }
        }
    });
}
```
At startup the template creates a consumer for the response topic — named tb_transport.api.responses.hostname — and continuously polls it for messages.
DefaultTbQueueRequestTemplate.send() stores every outgoing request in the pendingRequests map, keyed by requestId, with the boxed returned future as the value. When a message with a matching id is later pulled from the response topic, the corresponding future is marked completed, which triggers the callback.
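The whole request/response trick therefore reduces to a pending-request map plus a consumer loop that completes futures by id. A minimal stand-in using only the JDK — no Kafka; `deliver` plays the role of the response-topic poller, and all names here are mine:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RequestCorrelator {
    // requestId -> future the caller is waiting on,
    // like pendingRequests in DefaultTbQueueRequestTemplate.
    private final Map<UUID, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Exposed only so the demo can answer the last request.
    UUID lastRequestId;

    // "send": register a future under a fresh request id; in the real code the
    // id travels in the headers of a message published to tb_transport.api.requests.
    CompletableFuture<String> send(String payload) {
        UUID requestId = UUID.randomUUID();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        lastRequestId = requestId;
        return future;
    }

    // "consumer loop": a response arrives on the per-host response topic;
    // look up the pending future by request id and complete it.
    void deliver(UUID requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        } // else: stale or unknown request, dropped as in the real template
    }

    public static void main(String[] args) {
        RequestCorrelator c = new RequestCorrelator();
        CompletableFuture<String> f = c.send("validate-token");
        c.deliver(c.lastRequestId, "credentials-ok");
        System.out.println(f.join()); // credentials-ok
    }
}
```

The timeout cleanup in init() covers the remaining case: if no response ever arrives, the stale entry is removed from the map and its future fails with a TimeoutException, so the caller is never left waiting forever.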
Once the callback fires, the transport has the authentication result returned by core — and that closes the loop on a complete authentication flow.