概述
首先奶浦,明確DeviceTwin模塊在KubeEdge架構(gòu)中所處的位置,如下圖所示:
其中,黃色部分是KubEedge設(shè)備管理鏈路相關(guān)的組件。
DeviceTwin組件主要負(fù)責(zé)設(shè)備相關(guān)的處理工作联逻,比如同步設(shè)備的收集的實時數(shù)據(jù)、綁定設(shè)備與邊緣節(jié)點的關(guān)系检痰,等等包归。其內(nèi)部又分為四個子模塊,分別是membership攀细、communication箫踩、device和twin。如下圖所示:
在進一步理解DeviceTwin之前谭贪,這里需要讀者先明確的幾個概念,即:
- 設(shè)備屬性(device attribute/property):設(shè)備屬性可以理解為設(shè)備的元數(shù)據(jù)(或稱之為靜態(tài)屬性)锦担,負(fù)責(zé)描述設(shè)備的詳細(xì)信息俭识,定義好之后一般是不會變的。比如說溫度計設(shè)備洞渔,它的作用是負(fù)責(zé)采集環(huán)境溫度套媚。顯然,這種設(shè)備一般會有一個名為“temperature”的屬性磁椒,該屬性的類型也許是“float”類型堤瘤,表示溫度的數(shù)據(jù)類型,等等浆熔。
- 設(shè)備狀態(tài)(device status):設(shè)備狀態(tài)指的是設(shè)備是否在線本辐,一般有“online”、“offline”和“unknown”3種定義医增。
- 設(shè)備孿生(device twin):設(shè)備孿生則是設(shè)備的動態(tài)屬性慎皱,表示具體設(shè)備的專有實時數(shù)據(jù),例如燈的開/關(guān)狀態(tài)叶骨、溫度計真實采集到的溫度值茫多,等等。在設(shè)備孿生(twin)中忽刽,進一步定義了“desired value(期望值)”和“reported value(真實值)”天揖。其中夺欲,“desired value”指的是控制面希望設(shè)備達(dá)到的狀態(tài),比如今膊,用戶遠(yuǎn)程打開燈洁闰,由用戶發(fā)出的指令即屬于期望值;而“reported value”指的是設(shè)備上報給控制面的真實值万细,比如扑眉,溫度計采集的溫度值。需要注意的是赖钞,并不是每種設(shè)備都必須存在“desired value”腰素,但一般都會有“reported value”。對于燈這類讀寫(readwrite)設(shè)備而言雪营,我們既可以讀取其上報的真實值(開或關(guān))弓千,也可以控制它的狀態(tài)(開或關(guān));而對于溫度計這類只讀(readonly)設(shè)備而言献起,我們只能讀取其上報的真實值洋访,而無需設(shè)置其期望值。
明確以上3個概念是理解KubeEdge中Device API設(shè)計的核心谴餐,也有助于理解DeviceTwin組件的源碼姻政。
總的來說,DeviceTwin組件負(fù)責(zé)溝通協(xié)調(diào)四個子模塊的工作岂嗓,具體包含4個方面汁展,即:1)同步設(shè)備數(shù)據(jù);2)注冊并啟動子模塊厌殉;3)根據(jù)消息類型分別向子模塊進行消息分發(fā)食绿;4)對子模塊進行健康檢查。更具體的公罕,四個子模塊的作用分別如下:
- Membership Module:該模塊主要負(fù)責(zé)綁定新加入的設(shè)備與指定的邊緣節(jié)點(其實就是NodeSelector的體現(xiàn))器紧。比如溫度傳感器關(guān)聯(lián)在邊緣節(jié)點node-A上,藍(lán)牙音箱關(guān)聯(lián)在了節(jié)點node-B上楼眷,如果云端要控制藍(lán)牙音箱铲汪,那就要把數(shù)據(jù)準(zhǔn)確的推到node-B上。
- Twin Module:該模塊主要負(fù)責(zé)所有設(shè)備孿生相關(guān)的操作摩桶。比如桥状,設(shè)備孿生更新(device twin update)、設(shè)備孿生獲认跚濉(device twin get)和設(shè)備孿生同步至云端(device twin sync-to-cloud)辅斟。
- Communication Module:該模塊主要負(fù)責(zé)各個子模塊之間的通信。
- Device Module:該模塊主要負(fù)責(zé)執(zhí)行設(shè)備相關(guān)的操作芦拿,比如處理設(shè)備狀態(tài)(device status)更新和設(shè)備屬性(device attribute)更新士飒。
(PS. 如果不搞清楚“設(shè)備屬性”查邢、“設(shè)備狀態(tài)”和“設(shè)備孿生”的概念,那么初學(xué)者很容易混淆Twin Module和Device Module這兩個子模塊的功能職責(zé)酵幕。事實上扰藕,我不太明白如此設(shè)計的原因是什么,總覺得這部分的設(shè)計稍有欠妥)
此外芳撒,DeviceTwin模塊會在本地數(shù)據(jù)庫(SQLite)中創(chuàng)建3張表邓深,即Device Table、Device Attribute Table和Device Twin Table笔刹,分別記錄設(shè)備的基本信息芥备、設(shè)備的屬性信息和設(shè)備孿生信息。表的具體字段設(shè)計詳見這里舌菜。
源碼分析
本節(jié)從源碼出發(fā)萌壳,剖析DeviceTwin模塊的內(nèi)部實現(xiàn)。為了避免篇幅冗長日月,在不影響說明的前提下袱瓮,會適當(dāng)刪減代碼。
為了方便理解爱咬,讀者需要提前了解:
從前一小節(jié)我們已經(jīng)知道尺借,DeviceTwin模塊會在本地數(shù)據(jù)庫中創(chuàng)建3張表,即Device Table台颠、Device Attribute Table和Device Twin Table褐望,分別記錄設(shè)備的基本信息、設(shè)備的屬性信息和設(shè)備孿生信息串前。這里需要注意,Device Table中存儲的Device并不是云端基于Kubernetes CRD機制定義的Device API实蔽,而是重新定義了一個名字同為Device的結(jié)構(gòu)體荡碾,用以存儲標(biāo)準(zhǔn)Device API中的部分信息。
當(dāng)前局装,CloudHub與EdgeHub中傳遞的設(shè)備并不是完整的Device API坛吁,而是另外重新定義了一個同名的結(jié)構(gòu)體,用于云邊的設(shè)備信息同步铐尚。具體如下所示:
(1)基于Kubernetes CRD機制定義的Device API拨脉,位于cloud/pkg/apis/devices/v1alpha2/device_instance_types.go:
(2)用于云邊消息同步的Device結(jié)構(gòu)體,分別位于cloud/pkg/devicecontroller/types/device.go和edge/pkg/devicetwin/dttype/types.go:
//Device the struct of device
type Device struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
State string `json:"state,omitempty"`
LastOnline string `json:"last_online,omitempty"`
Attributes map[string]*MsgAttr `json:"attributes,omitempty"`
Twin map[string]*MsgTwin `json:"twin,omitempty"`
}
//MsgAttr the struct of device attr
type MsgAttr struct {
Value string `json:"value"`
Optional *bool `json:"optional,omitempty"`
Metadata *TypeMetadata `json:"metadata,omitempty"`
}
//MsgTwin the struct of device twin
type MsgTwin struct {
Expected *TwinValue `json:"expected,omitempty"`
Actual *TwinValue `json:"actual,omitempty"`
Optional *bool `json:"optional,omitempty"`
Metadata *TypeMetadata `json:"metadata,omitempty"`
ExpectedVersion *TwinVersion `json:"expected_version,omitempty"`
ActualVersion *TwinVersion `json:"actual_version,omitempty"`
}
- 對于邊緣側(cè)存儲的Device Table(對應(yīng)Device結(jié)構(gòu)體)宣增、Device Attribute Table(對應(yīng)MsgAttr結(jié)構(gòu)體)和Device Twin Table(對應(yīng)MsgTwin結(jié)構(gòu)體)玫膀,可以理解為,它們將Device CRD中的字段拆開并分別存儲爹脾。目前帖旨,對Device這一自定義資源的處理方式與對內(nèi)置資源(比如Pod箕昭、Service)的處理方式不同,后者將一個完整的API對象存儲在名為Meta的表中解阅。(至于為何這樣設(shè)計落竹,而不統(tǒng)一起來,原因不得而知)
- 另外需要指出的是货抄,在當(dāng)前的Device CRD設(shè)計中述召,并不包含State(表示設(shè)備的狀態(tài),即“online”蟹地、“offline”和“unknown”)和LastOnline(設(shè)備的最后在線時間戳)這兩個字段积暖;而在云與邊的設(shè)備信息同步中(即Device結(jié)構(gòu)體)卻又包含了這兩個字段。后期是否要考慮把這兩個字段加入到Device CRD中呢锈津?(此處存疑)
云邊通信(即CloudHub與EdgeHub之間)的消息結(jié)構(gòu)體是model.Message呀酸,如下所示:
// Message struct
type Message struct {
Header MessageHeader `json:"header"`
Router MessageRoute `json:"route,omitempty"`
Content interface{} `json:"content"`
}
但在DeviceTwin模塊中,傳輸?shù)南⒔Y(jié)構(gòu)體為DTMessage琼梆,如下所示:
//DTMessage the struct of message for communicating between cloud and edge
type DTMessage struct {
Msg *model.Message
Identity string //表示節(jié)點ID
Action string //代表該條消息執(zhí)行的動作性誉,根據(jù)不同的Action將消息轉(zhuǎn)發(fā)至對應(yīng)的DeviceTwin子模塊
Type string //這個貌似也沒啥用
}
代碼入口
代碼入口:edge/pkg/devicetwin/devicetwin.go
// Start run the module
func (dt *DeviceTwin) Start() {
dtContexts, _ := dtcontext.InitDTContext()
dt.DTContexts = dtContexts
err := SyncSqlite(dt.DTContexts) //將存儲在本地數(shù)據(jù)庫中的device信息載入DTContexts
...
dt.runDeviceTwin() //運行各個子模塊
}
可以看到,DeviceTwin模塊的啟動主要做了兩件事情茎杂,一是將存儲在本地數(shù)據(jù)庫(sqlite)中的設(shè)備信息載入至DTContexts中错览;二是運行各個子模塊。
- SyncSqlite()負(fù)責(zé)同步設(shè)備元數(shù)據(jù)煌往,即從本地數(shù)據(jù)庫中查詢所有設(shè)備倾哺,并將這些設(shè)備載入至DTContext中。DTContext是一個關(guān)鍵的數(shù)據(jù)結(jié)構(gòu)刽脖,負(fù)責(zé)存儲在DeviceTwin模塊中的上下文信息羞海。具體在之后的討論涉及。
// SyncSqlite sync sqlite
func SyncSqlite(context *dtcontext.DTContext) error {
...
rows, queryErr := dtclient.QueryDeviceAll() //從本地數(shù)據(jù)庫中查詢所有設(shè)備
...
for _, device := range *rows {
err := SyncDeviceFromSqlite(context, device.ID) //將設(shè)備保存至DTContext中
...
}
return nil
}
-
runDeviceTwin()則包含3個方面的內(nèi)容曲管,分別是 1)注冊并啟動子模塊却邓;2)向子模塊分發(fā)消息;3)健康檢查院水。
消息轉(zhuǎn)發(fā)
函數(shù)入口:edge/pkg/devicetwin/process.go
具體來看classifyMsg()這一方法腊徙,搞懂它至關(guān)重要。
首先檬某,會根據(jù)消息源(msg.Router.Source)判斷進入哪個分支全蝶。比如裂垦,設(shè)備的信息是由云端的devicecontroller組件發(fā)送過來的限番,因此msg.Router.Resource就等于devicecontroller弓熏。緊接著,會根據(jù)消息體所傳送的資源類型(msg.Router.Resource)進一步分類,從而確定對應(yīng)的Action是什么饰潜,最后根據(jù)不同的Action將消息轉(zhuǎn)發(fā)至不同的子模塊中初坠。
我們以云端DeviceController與邊緣端通信作為示例,加以說明彭雾。(DeviceController分析詳見xx)
假設(shè)云端新增一個設(shè)備碟刺,就會構(gòu)造這樣一條消息,并發(fā)送至邊緣端薯酝。
//cloud/pkg/devicecontroller/controller/downstream.go -> deviceAdded()
msg := model.NewMessage("")
resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "")
msg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)
msg.Content = xxx //這里存放消息傳遞的有效數(shù)據(jù)
其中半沽,device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0]
表示在創(chuàng)建設(shè)備實例時通過NodeSelector所綁定的邊緣節(jié)點,假設(shè)為”edgenode_1“吴菠,那么者填,就會構(gòu)造出:
resource = "node/edgenode_1/membership"
而msg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)
則定義了消息的路由信息,如下:
msg.Router.Source = "devicecontroller" //表示消息源
msg.Router.Group = "twin" //表示消息發(fā)送的組別
msg.Router.Resource = "node/edgenode_1/membership" //表示消息的資源類型做葵,也表示topic類型
msg.Router.Operation = "update"
基于此占哟,消息的路由信息就很明確了。我們再回過頭看上邊的classifyMsg()方法酿矢,不難發(fā)現(xiàn)榨乎,這條消息會進入到「標(biāo)記1」所處的分支,從而標(biāo)記該條消息對應(yīng)的Action是dtcommon.MemUpdated瘫筐。由于在ActionModuleMap中已經(jīng)定義了Action與子模塊之間的映射關(guān)系蜜暑,因此,接下來就會根據(jù)這條消息的Action轉(zhuǎn)發(fā)至對應(yīng)的子模塊策肝,然后由那個子模塊來執(zhí)行具體的邏輯肛捍。
子模塊邏輯
本節(jié)重點分析四個子模塊的具體執(zhí)行邏輯。
membership
membership模塊實際上就是負(fù)責(zé)管理邊緣節(jié)點上的設(shè)備之众。當(dāng)有新增設(shè)備或刪除設(shè)備拙毫,都是由這一組件負(fù)責(zé)處理。
membership模塊的啟動邏輯非常簡單棺禾。首先恬偷,它會初始化消息的Action與回調(diào)函數(shù)的映射關(guān)系,即initMemActionCallBack()帘睦;隨后,根據(jù)接收到的消息的Action調(diào)用不同的回調(diào)函數(shù)坦康,也就是去執(zhí)行具體的動作竣付。同時,還需要收發(fā)心跳信息滞欠。這里主要關(guān)注該模塊可執(zhí)行哪些動作:
- dealMembershipGet:該方法主要用于獲取內(nèi)存中(即DTContext)存儲的設(shè)備信息古胆。主要負(fù)責(zé)與eventbus組件的交互,當(dāng)eventbus組件訂閱了指定的topic后,就是通過該方法獲取到設(shè)備信息的逸绎。
-
dealMembershipUpdate:該方法負(fù)責(zé)更新邊緣節(jié)點的設(shè)備信息惹恃。比如云端控制面新增或刪除了一個設(shè)備,會下發(fā)更新信息棺牧,最終就是通過這個方法來執(zhí)行的巫糙。并且在更新完成后,會把最新的結(jié)果發(fā)送給eventbus颊乘。
-
dealMembershipDetail:獲取一個device 的詳細(xì)信息参淹。
這里需要再強調(diào)一下!前面已經(jīng)提過乏悄,邊緣端拿到的Device浙值,并不是云端基于Kubernetes CRD機制定義的Device API。為了獲取到設(shè)備基本信息檩小,設(shè)備屬性信息和設(shè)備孿生信息开呐,又額外定義了許多的結(jié)構(gòu)體用于存儲這些信息,代碼中有很多序列化與反序列的操作规求。不要被繞暈了筐付。
twin
twin模塊主要負(fù)責(zé)所有設(shè)備孿生相關(guān)的操作。比如颓哮,設(shè)備孿生更新(device twin update)家妆、設(shè)備孿生獲取(device twin get)和設(shè)備孿生同步至云端(device twin sync-to-cloud)冕茅。該模塊的執(zhí)行邏輯與membership模塊的邏輯是一模一樣的伤极,我們主要看一下它支持執(zhí)行哪些動作:
-
dealTwinUpdate:該方法更新指定設(shè)備的設(shè)備孿生信息。更新信息可以來自云端(比如云端控制面想要改變等的開/關(guān)狀態(tài))姨伤,通過edgehub接收到哨坪;也可以來自MQTT,通過eventbus接收到(mapper會在devicetwin update topic上發(fā)布信息)乍楚。
-
dealTwinGet:獲取指定設(shè)備的設(shè)備孿生信息当编。該方法負(fù)責(zé)與eventbus交互。
-
dealTwinSync:該方法負(fù)責(zé)將設(shè)備孿生信息同步至云端徒溪。
device
device 模塊主要負(fù)責(zé)執(zhí)行設(shè)備相關(guān)的操作忿偷,比如處理設(shè)備狀態(tài)更新和設(shè)備屬性更新。該模塊可執(zhí)行的動作有如下幾個:
-
dealDeviceAttrUpdate:更新設(shè)備的屬性臊泌。設(shè)備屬性的更新由云端發(fā)起鲤桥。不過就目前(2021-01-04)來看,這個方法還沒用起來渠概,云端devicecontroller的downstream部分并沒有看到更新設(shè)備屬性的操作茶凳。
-
dealDeviceStateUpdate:更新設(shè)備的狀態(tài)(state)嫂拴、最后一次在線時間(last online)。設(shè)備狀態(tài)的更新是由eventbus主動發(fā)起的贮喧。更新后筒狠,會把更新結(jié)果同步至云端和邊緣端的eventbus。
Communication
模塊主要負(fù)責(zé)DeviceTwin和其他組件的通信箱沦。該模塊可執(zhí)行的動作有如下幾個:
- dealSendToCloud:用于向云端發(fā)送數(shù)據(jù)辩恼。在該函數(shù)內(nèi)部,首先確保邊緣側(cè)與云端的連接是否正常饱普,隨后向edgehub發(fā)送消息(基于beehive框架)运挫,再由edgehub向云端發(fā)送。
- dealSendToEdge:內(nèi)部調(diào)用beehiveContext.Send()方法套耕,直接向eventbus組件發(fā)送消息谁帕。
- dealLifeCycle: 負(fù)責(zé)檢查邊緣與云端的狀態(tài)是否正常連接,如果正常連接冯袍,則設(shè)置DTContext.state的狀態(tài)為connected匈挖;反之則標(biāo)志其為unconnected 。
- dealConfirm: 用于處理消息的確認(rèn)康愤。源碼中我看到只有在這里使用到了儡循。
總結(jié)
DeviceTwin模塊源碼整理如下: