整體架構(gòu)
官方文檔 https://github.com/kubeedge/kubeedge/blob/master/docs/mappers/modbus_mapper.md
從架構(gòu)圖上可以看出隐圾,mapper和外界,包括DeviceTwin唯一的交互途徑就是通過edge node內(nèi)部的MQTT broker.
mapper通過訂閱值朋、發(fā)布相關(guān)的topic砌庄,實(shí)現(xiàn)與EdgeCore或者其他自己定義的APP進(jìn)行交互
topic
先看下網(wǎng)關(guān)文檔中的說明(https://docs.kubeedge.io/en/latest/guides/message_topics.html)
1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"
We will focus on the message expected on the first 3 topics.
"$hw/events/node/+/membership/get": This topics is used to get membership details of a node i.e the devices that are associated with the node. The response of the message is published on "$hw/events/node/+/membership/get/result" topic.
"$hw/events/device/+/state/update”: This topic is used to update the state of the device. + symbol can be replaced with ID of the device whose state is to be updated.
"$hw/events/device/+/twin/+": The two + symbols can be replaced by the deviceID on whose twin the operation is to be performed and any one of(update,cloud_updated,get) respectively.
Following is the explanation of the three suffix used:
update: this suffix is used to update the twin for the deviceID.
cloud_updated: this suffix is used to sync the twin status between edge and cloud.
get: is used to get twin status of a device. The response is published on "$hw/events/device/+/twin/get/result" topic.
另外绍撞,在eventbus的文檔中也有一段描述
- $hw/events/upload/#
- SYS/dis/upload_records
- SYS/dis/upload_records/+
- $hw/event/node/+/membership/get
- $hw/event/node/+/membership/get/+
- $hw/events/device/+/state/update
- $hw/events/device/+/state/update/+
- $hw/event/device/+/twin/+
在看下代碼,從modbus_mapper的constant.js中,可以看到mapper中的所有topic华烟。
const defaultTopicPrefix = '$hw/events/device/';
const defaultDirectTopicPrefix = '$hw/devices/';
const twinDeltaTopic = defaultTopicPrefix + '+/twin/update/delta';
const twinUpdateTopic = '/twin/update';
const twinGetResTopic = defaultTopicPrefix + '+/twin/get/result';
const twinGetTopic = '/twin/get';
const directGetTopic = '/events/properties/get';
從代碼中可以看出心墅,有兩類topic酿矢,一類是類似廣播類型的hw/devices/。
另外棠涮,文檔和代碼中的topic是有區(qū)別的谬哀,代碼中并沒有node、SYS相關(guān)的topic
topic詳細(xì)分析
基本概念
mqtt的topic是支持通配符的(wildcard)严肪,有3類通配符史煎,分別是
- + 加號(hào),匹配一段
- # 井號(hào)驳糯,匹配多段
- $ 美元符篇梭,用在首位,防止被+酝枢、#通配符匹配
mqtt client是可以訂閱多個(gè)topic的(傳入topic列表)恬偷,不過modbus_mapper的實(shí)現(xiàn)中,并沒有用這種方式帘睦,而是每個(gè)client只用來發(fā)布袍患、訂閱一個(gè)topic。
這種方式下竣付,on_message就不需要在判斷收到的msg是相應(yīng)哪個(gè)topic的了诡延,可以直接進(jìn)入處理流程。
node/+/membership
在server.go中古胆,有相關(guān)的訂閱肆良,但是在mapper中沒看到有推送,這里后續(xù)再研究
// onSubscribe will be called if the topic is matched in topic tree.
func (m *Server) onSubscribe(msg *packet.Message) {
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for "SYS/dis/upload_records", no need to base64 topic
var target string
resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic))
if strings.HasPrefix(msg.Topic, "$hw/events/device") || strings.HasPrefix(msg.Topic, "$hw/events/node") {
target = modules.TwinGroup
} else {
target = modules.HubGroup
if msg.Topic == "SYS/dis/upload_records" {
resource = "SYS/dis/upload_records"
}
}
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
message := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
resource, "response").FillBody(string(msg.Payload))
klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, resource))
beehiveContext.SendToGroup(target, *message)
}
twinGetResTopic
$hw/events/device/+/twin/get/result
Mapper訂閱這個(gè)twinGetResTopic逸绎,注意中間的通配符加號(hào)惹恃,這里匹配的應(yīng)該是任意設(shè)備ID
mqtt_client.on('connect', ()=>{
logger.info('connetced to edge mqtt with topic twinGet');
mqtt_client.subscribe(constant.twinGetResTopic);
for (let instance of devIns) {
dt.getActuals(instance[0]);
}
});
對(duì)這個(gè)訂閱,收到的消息處理代碼如下
mqtt_client.on('message', (topic, message)=>{
try {
var msgGet = JSON.parse(message.toString());
} catch (err) {
logger.error('unmarshal error');
return;
}
let resources = topic.toString().split('/');
let deviceID = resources[3];
let dt = new DeviceTwin(mqtt_client);
let devProtocol, devInstance;
if (devPro.has(deviceID) && devIns.has(deviceID)) {
devProtocol = devPro.get(deviceID);
devInstance = devIns.get(deviceID);
} else {
logger.error('match visitor failed');
}
logger.info('recieve twinGet msg, set properties actual value map');
if (resources.length === 7 && resources[5] === 'get' && msgGet != null && msgGet.code != 404 && typeof(devProtocol) != 'undefined' && typeof(devInstance) != 'undefined') {
dt.setActuals(msgGet, (PropActuals)=>{
for (let actual of PropActuals) {
ActualVal.set(util.format('%s-%s', deviceID, actual[0]), actual[1]);
}
});
dt.setExpecteds(msgGet, (PropExpecteds)=>{
for (let expected of PropExpecteds) {
modbusProtocolTransfer(devProtocol.protocol, (transferedProtocol)=>{
if (modVistr.has(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol))) {
let visitor = modVistr.get(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol));
dealDeltaMsg(msgGet, expected[0], visitor, devProtocol, expected[1]);
}
});
}
});
}
}
代碼中首先根據(jù)topic的名稱取出device id棺牧,然后根據(jù)本地緩存的device profile檢查是否存在這個(gè)id巫糙,如果存在,那么取出改設(shè)備對(duì)應(yīng)的協(xié)議陨帆、實(shí)例等信息曲秉,然后依次調(diào)用
setActuals和setExpecteds方法。
setActuals
取出topic消息體中個(gè)屬性的actual的值疲牵,然后放到ActualVal這個(gè)全局的map中保存承二。
(從代碼中看,setActuals并不會(huì)改變?cè)O(shè)備上的數(shù)據(jù)纲爸,只會(huì)記錄在ActualVal亥鸠。而ActualVal是會(huì)定期和設(shè)備上實(shí)際的數(shù)據(jù)進(jìn)行同步的,并且以設(shè)備數(shù)據(jù)為準(zhǔn),所以這里setActual的目的不明)
setExpecteds
取出topic消息體中個(gè)屬性的expected的值(不能同時(shí)有actual和expected负蚊,否則是不做處理的)神妹,然后調(diào)用協(xié)議驅(qū)動(dòng)(比如modbus協(xié)議驅(qū)動(dòng)),將值寫入到設(shè)備中家妆。
twinGetTopic
除了訂閱和處理twinGetResTopic消息鸵荠,index.js在初始化的時(shí)候,還干了一件事:遍歷設(shè)備伤极,調(diào)用deviceTwin的getActuals
// getActuals publish get devicetwin msg to edge mqtt
getActuals(deviceID) {
let payload_msg = {
event_id: "",
timestamp: new Date().getTime()
};
this.mqttClient.publish(constant.defaultTopicPrefix + deviceID + constant.twinGetTopic, JSON.stringify(payload_msg));
}
topic由具體的deviceID一起組成蛹找,'$hw/events/device/123456789/twin/get
device的列表是由configMap傳入并維護(hù)的,mapper會(huì)為每個(gè)device都發(fā)布一個(gè)獨(dú)立的twinGetTopic
上一節(jié)分析的twinGetResTopic哨坪,實(shí)際就是對(duì)twinGetTopic的response庸疾。deviceID是從configMap中取出來的,因此当编,mapper是根據(jù)configMap中配置的device instance列表届慈,發(fā)布twinGetTopic,edgecore收到topic后忿偷,發(fā)布twinGetResTopic金顿。mapper通過訂閱twinGetResTopic就可以獲得所有device instance從云端發(fā)來的數(shù)據(jù)。
twinDeltaTopic
$hw/events/device/+/twin/update/delta
twinDeltaTopic是在一個(gè)新的client中訂閱的牵舱,和之前twinGetResTopic處理形式類似串绩,,也是從topic中取出device的id芜壁,然后再緩存中找到該設(shè)備對(duì)應(yīng)的profile信息
然后從topic中的msg中取出數(shù)據(jù),遍歷每個(gè)key(也就是property的名稱)通過如下方法進(jìn)行處理:
DeviceTwin.syncExpected(msg, key, (value)=>{
dealDeltaMsg(msg, key, visitor, devProtocol, value);
});
// syncExpected check whether expected value should be update to device
static syncExpected(delta, key, callback) {
let deviceTwin = delta.twin[key];
if (!delta.twin.hasOwnProperty(key)) {
logger.error("Invalid device twin ", key);
return;
}
if (!deviceTwin.hasOwnProperty('actual') ||
(deviceTwin.hasOwnProperty('expected') && deviceTwin.expected.hasOwnProperty('metadata') && deviceTwin.actual.hasOwnProperty('metadata') &&
deviceTwin.expected.metadata.timestamp > deviceTwin.actual.metadata.timestamp &&
deviceTwin.expected.value !== deviceTwin.actual.value)) {
callback(deviceTwin.expected.value);
}
}
這里邏輯還比較復(fù)雜高氮,總體上就是deviceTwin這個(gè)對(duì)象中需要有expected慧妄,但沒有actual,并且expected的更新時(shí)間晚于actual剪芍,并且expected和actual值不等的時(shí)候塞淹,調(diào)用dealDeltaMsg對(duì)實(shí)際設(shè)備上的寄存器進(jìn)行更新。
deviceProfile更新
index.js會(huì)監(jiān)聽deviceProfile更新罪裹,當(dāng)cloud通過configmap更新deviceProfile時(shí)饱普,會(huì)重新創(chuàng)建一個(gè)mqtt的client,來監(jiān)聽twinGetResTopic的topic状共。
對(duì)topic的處理和之前寫的twinGetResTopic處理邏輯基本一致套耕,只是少了setActuals的步驟,而只調(diào)用setExpecteds峡继。
從這里也可以看出冯袍,setActuals只是mapper第一次啟動(dòng)的時(shí)候進(jìn)行一下初始化。
syncDeviceTwin定時(shí)任務(wù)
syncDeviceTwin這個(gè)定時(shí)任務(wù)每?jī)擅雸?zhí)行一次
定時(shí)任務(wù)中,遍歷本地緩存總的所有設(shè)備康愤,通過modbus讀取設(shè)備上最新的數(shù)據(jù)儡循,然后更新本地的ActualVals緩存,
twinUpdateTopic
然后調(diào)用updateActual方法征冷,通過$hw/events/device/123456789/twin/update topic推送device twin的更新
directGetTopic
接著調(diào)用UpdateDirectActuals方法择膝,往$hw/devices/123456789/events/properties/get 的topic中推送最新的設(shè)備屬性數(shù)據(jù)
兩者區(qū)別
從上述分析可以看出,定時(shí)任務(wù)會(huì)通過mqtt推送兩類數(shù)據(jù)检激,看一下他們的區(qū)別调榄。
- directGetTopic只在每次定時(shí)任務(wù)結(jié)束的時(shí)候調(diào)用一次,而twinUpdateTopic是每個(gè)設(shè)備屬性的值發(fā)生變化時(shí)呵扛,就調(diào)用一次每庆。
- directGetTopic每次更新這個(gè)設(shè)備的所有屬性,twinUpdateTopic每次只更新一個(gè)設(shè)備的一個(gè)屬性
- directGetTopic有些額外的字段今穿,比如route缤灵、content與header字段(具體用途見edge core的device twin模塊分析)
Edge端
edged的eventbus會(huì)監(jiān)控邊緣MQTT的topic,根據(jù)topic名稱和內(nèi)容進(jìn)行相應(yīng)處理蓝晒。
eventbus
首先確認(rèn)一點(diǎn)腮出,所有與MQTT broker的交互,都是通過eventbus來做的芝薇,eventbus從MQTT訂閱消息以后胚嘲,再通過beehive發(fā)送到其他模塊,如devicetwin模塊
// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, message MQTT.Message) {
klog.Infof("OnSubMessageReceived receive msg from topic: %s", message.Topic())
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for "SYS/dis/upload_records", no need to base64 topic
var target string
resource := base64.URLEncoding.EncodeToString([]byte(message.Topic()))
if strings.HasPrefix(message.Topic(), "$hw/events/device") || strings.HasPrefix(message.Topic(), "$hw/events/node") {
target = modules.TwinGroup
} else {
target = modules.HubGroup
if message.Topic() == "SYS/dis/upload_records" {
resource = "SYS/dis/upload_records"
}
}
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
msg := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
resource, "response").FillBody(string(message.Payload()))
klog.Info(fmt.Sprintf("received msg from mqttserver, deliver to %s with resource %s", target, resource))
beehiveContext.SendToGroup(target, *msg)
}
// BuildRouter sets route and resource operation in message
func (msg *Message) BuildRouter(source, group, res, opr string) *Message {
msg.SetRoute(source, group)
msg.SetResourceOperation(res, opr)
return msg
}
resource是通過base64編碼的完整的topic的名字
target對(duì)于"hw/events/node"這兩個(gè)類型的topic洛二,target是 twinGroup馋劈,否則是HubGroup
router key是
routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
根據(jù)BuildRouter的參數(shù)可以看出,這里的source是bus晾嘶,group是user
devicetwin
參考官方文檔 https://github.com/kubeedge/kubeedge/blob/master/docs/modules/edge/devicetwin.md
devicetwin從beehive中取到group為twin的消息妓雾,
然后調(diào)用classifyMsg來對(duì)消息進(jìn)行分類。這里的分類方式垒迂,對(duì)devicetwin的處理流程有比較大的幫助械姻,所以仔細(xì)看下
func classifyMsg(message *dttype.DTMessage) bool {
if EventActionMap == nil {
initEventActionMap()
}
var identity string
var action string
msgSource := message.Msg.GetSource()
if strings.Compare(msgSource, "bus") == 0 {
idLoc := 3
topic := message.Msg.GetResource()
topicByte, err := base64.URLEncoding.DecodeString(topic)
if err != nil {
return false
}
topic = string(topicByte)
klog.Infof("classify the msg with the topic %s", topic)
splitString := strings.Split(topic, "/")
if len(splitString) == 4 {
if strings.HasPrefix(topic, dtcommon.LifeCycleConnectETPrefix) {
action = dtcommon.LifeCycle
} else if strings.HasPrefix(topic, dtcommon.LifeCycleDisconnectETPrefix) {
action = dtcommon.LifeCycle
} else {
return false
}
} else {
identity = splitString[idLoc]
loc := strings.Index(topic, identity)
nextLoc := loc + len(identity)
prefix := topic[0:loc]
suffix := topic[nextLoc:]
klog.Infof("%s %s", prefix, suffix)
if v, exist := EventActionMap[prefix][suffix]; exist {
action = v
} else {
return false
}
}
message.Msg.Content = []byte((message.Msg.Content).(string))
message.Identity = identity
message.Action = action
klog.Infof("Classify the msg to action %s", action)
return true
} else if (strings.Compare(msgSource, "edgemgr") == 0) || (strings.Compare(msgSource, "devicecontroller") == 0) {
switch message.Msg.Content.(type) {
case []byte:
klog.Info("Message content type is []byte, no need to marshal again")
default:
content, err := json.Marshal(message.Msg.Content)
if err != nil {
return false
}
message.Msg.Content = content
}
if strings.Contains(message.Msg.Router.Resource, "membership/detail") {
message.Action = dtcommon.MemDetailResult
return true
} else if strings.Contains(message.Msg.Router.Resource, "membership") {
message.Action = dtcommon.MemUpdated
return true
} else if strings.Contains(message.Msg.Router.Resource, "twin/cloud_updated") {
message.Action = dtcommon.TwinCloudSync
resources := strings.Split(message.Msg.Router.Resource, "/")
message.Identity = resources[1]
return true
} else if strings.Contains(message.Msg.Router.Operation, "updated") {
resources := strings.Split(message.Msg.Router.Resource, "/")
if len(resources) == 2 && strings.Compare(resources[0], "device") == 0 {
message.Action = dtcommon.DeviceUpdated
message.Identity = resources[1]
}
return true
}
return false
} else if strings.Compare(msgSource, "edgehub") == 0 {
if strings.Compare(message.Msg.Router.Resource, "node/connection") == 0 {
message.Action = dtcommon.LifeCycle
return true
}
return false
}
return false
}
classifyMsg中首先判斷的是msg的source,從代碼中看机断,分別有bus楷拳、edgemgr、devicecontroller吏奸、edgehub四種欢揖。
從eventbus中過來的消息,就是bus苦丁;從edgehub過來的消息目前看就只有心跳浸颓;edgemgr的source從代碼中沒看到,目前應(yīng)該是用不到;devicecontroller過來的消息應(yīng)該是云端對(duì)device的操作产上;
這里在分析mapper與devicetwin的交互棵磷,所以先看bus的類型的消息。
對(duì)于"$hw/events/connected/%s"類型的消息晋涣,是心跳類型仪媒,返回false,也就是不需要處理
對(duì)于其他消息谢鹊,也就是"$hw/events/device/123456789/twin/update"的消息算吩,identity就是取的第四個(gè)字段,也就是device的ID
action則是根據(jù)前綴$hw/events/device/和后綴twin/update這兩部分查表(EventActionMap)查出來的佃扼,
通過這兩部分進(jìn)行查表偎巢,最終進(jìn)入到處理方法dealTwinUpdate方法中
遺留問題
twinDeltaTopic和twinGetResTopic區(qū)別
twinDeltaTopic和twinGetResTopic都可以更新設(shè)備上的值,兩者的區(qū)別在哪里兼耀?
- BuildDeviceTwinResult
- BuildDeviceTwinDelta
directGetTopic沒有接受方
directGetTopic從代碼中看压昼,是沒有接收方到,也就是說這個(gè)消息不會(huì)被處理
mapper更新了所有屬性
從modbus mapper中可以看到msg twin的一個(gè)結(jié)構(gòu)瘤运,即一個(gè)key是Property名稱的字典
let reply_msg = {
event_id: "",
timestamp: new Date().getTime()
};
let twin = {};
twin[property.name] = {
actual: {
value: String(value),
metadata: {
timestamp: new Date().getTime()
}
},
metadata: {
tyep: property.dataType
}
};
reply_msg.twin = twin;
所以窍霞,最終eventbus就是從modbus mapper中取出msg twin來進(jìn)行后續(xù)處理。
但這里其實(shí)是有個(gè)問題的拯坟,modbus mapper的當(dāng)前實(shí)現(xiàn)上但金,沒有判斷一個(gè)屬性是否是twin屬性,
直接將所有屬性都做為twin屬性給publish了郁季,這個(gè)問題在后續(xù)應(yīng)該會(huì)進(jìn)行修復(fù)冷溃。