KubeEdge Analysis: reliablesyncs
Preface
reliablesyncs was introduced in KubeEdge v1.2, mainly to support high availability of cloudcore: it prevents data loss when cloudcore fails over.
The design is at https://github.com/kubeedge/kubeedge/blob/master/docs/proposals/reliable-message-delivery.md
Judging from the design document, the goal is "At-Least-Once" message delivery, but the code suggests only part of this has been implemented so far; for example, the ACK mechanism is not done yet.
CRD
reliablesyncs introduces two new CRDs:
- ClusterObjectSync
- ObjectSync
From the code, only ObjectSync is implemented for now; ClusterObjectSync is not, so the discussion below focuses on ObjectSync.
Both CRDs are quite simple: the spec carries objectAPIVersion, objectKind, and objectName, and the status records objectResourceVersion.
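The shape of the CRD can be sketched as plain Go structs. This is illustrative only: the field names are taken from the code excerpts later in this post, not from the actual generated v1alpha1 types (which also carry TypeMeta/ObjectMeta and deepcopy methods).

```go
package main

import "fmt"

// ObjectSyncSpec: sketch of the spec fields seen in saveSuccessPoint below.
type ObjectSyncSpec struct {
	ObjectAPIVersion string // API version of the synced object
	ObjectKind       string // e.g. "pod", "configmap"
	ObjectName       string // name of the synced object
}

// ObjectSyncStatus records the last resourceVersion successfully
// delivered to the edge node.
type ObjectSyncStatus struct {
	ObjectResourceVersion string
}

type ObjectSync struct {
	Name   string // convention: "<nodeID>.<objectUID>" (see BuildObjectSyncName)
	Spec   ObjectSyncSpec
	Status ObjectSyncStatus
}

func main() {
	s := ObjectSync{
		Name: "edge-node-1.7d2f0c",
		Spec: ObjectSyncSpec{ObjectKind: "pod", ObjectName: "nginx"},
	}
	s.Status.ObjectResourceVersion = "12345"
	fmt.Println(s.Name, s.Spec.ObjectKind, s.Status.ObjectResourceVersion)
}
```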
Entry points
Several places in the code carry reliablesyncs-related logic, mainly under cloudhub and synccontroller.
cloudhub
cloudhub's Start method initializes a new ObjectSyncController via newObjectSyncController, then calls WaitForCacheSync to wait for the cache sync to complete.
Start
newObjectSyncController
func newObjectSyncController() *hubconfig.ObjectSyncController {
config, err := buildConfig()
if err != nil {
klog.Errorf("Failed to build config, err: %v", err)
os.Exit(1)
}
crdClient := versioned.NewForConfigOrDie(config)
crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)
clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()
sc := &hubconfig.ObjectSyncController{
CrdClient: crdClient,
ClusterObjectSyncInformer: clusterObjectSyncInformer,
ObjectSyncInformer: objectSyncInformer,
ClusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
ObjectSyncSynced: objectSyncInformer.Informer().HasSynced,
ClusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
ObjectSyncLister: objectSyncInformer.Lister(),
}
go sc.ClusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
go sc.ObjectSyncInformer.Informer().Run(beehiveContext.Done())
return sc
}
It first initializes a crdClient and a crdFactory.
crdClient is a set of clients implementing client-go's rest.Interface, wrapped into the Clientset struct.
Clientset contains three clients: devicesV1alpha1, reliablesyncsV1alpha1, and DiscoveryClient. Each adds its own methods on top of the rest client.
crdFactory is a sharedInformerFactory with its resync period set to 0. A period of 0 disables automatic resync, which makes sense here: these objects exist precisely for syncing, so letting client-go resync them automatically would be redundant.
The code below is all generated by client-go's code generators; a quick look:
${SCRIPT_ROOT}/cloud/hack/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/kubeedge/kubeedge/cloud/pkg/client github.com/kubeedge/kubeedge/cloud/pkg/apis \
"devices:v1alpha1 reliablesyncs:v1alpha1" \
--go-header-file ${SCRIPT_ROOT}/cloud/hack/boilerplate/boilerplate.txt
type sharedInformerFactory struct {
client versioned.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
}
Now look at sharedInformerFactory: this is essentially the standard structure defined by client-go, and the generated factory additionally implements the Devices() and Reliablesyncs() methods.
Devices() and Reliablesyncs() each call the New function of their group's informer package:
type version struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer
// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
Start(stopCh <-chan struct{})
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// TweakListOptionsFunc is a function that transforms a v1.ListOptions.
type TweakListOptionsFunc func(*v1.ListOptions)
The call chain here is fairly long; noting it down for now without tracing every step.
Next, the two informers clusterObjectSyncInformer and objectSyncInformer are created:
// Interface provides access to all the informers in this group version.
type Interface interface {
// ClusterObjectSyncs returns a ClusterObjectSyncInformer.
ClusterObjectSyncs() ClusterObjectSyncInformer
// ObjectSyncs returns a ObjectSyncInformer.
ObjectSyncs() ObjectSyncInformer
}
type version struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// ClusterObjectSyncs returns a ClusterObjectSyncInformer.
func (v *version) ClusterObjectSyncs() ClusterObjectSyncInformer {
return &clusterObjectSyncInformer{factory: v.factory, tweakListOptions: v.tweakListOptions}
}
// ObjectSyncs returns a ObjectSyncInformer.
func (v *version) ObjectSyncs() ObjectSyncInformer {
return &objectSyncInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
The methods above are all generated by client-go as well.
Then crdClient, clusterObjectSyncInformer, objectSyncInformer, etc. are saved into the ObjectSyncController object for later use.
Finally Informer().Run is called; let's look at this in detail.
Informer().Run
Informer() returns a cache.SharedIndexInformer, defined in client-go; Run starts the informer.
Run exits when beehiveContext.Done() is closed.
WaitForCacheSync
Back in the Start method, cache.WaitForCacheSync is called:
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
// callers should prefer WaitForNamedCacheSync()
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
err := wait.PollImmediateUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
if err != nil {
klog.V(2).Infof("stop requested")
return false
}
klog.V(4).Infof("caches populated")
return true
}
This simply keeps invoking the functions passed in, ClusterObjectSyncSynced and ObjectSyncSynced, until they all return true.
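In other words, WaitForCacheSync just polls every InformerSynced function until all report true. The same semantics in a self-contained sketch (no client-go dependency; poll period shortened for the demo):

```go
package main

import (
	"fmt"
	"time"
)

type InformerSynced func() bool

// waitForCacheSync polls every syncFunc until all return true, or until
// stopCh is closed, mirroring the client-go logic quoted above
// (PollImmediateUntil checks the condition once before the first wait).
func waitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		allSynced := true
		for _, syncFunc := range cacheSyncs {
			if !syncFunc() {
				allSynced = false
				break
			}
		}
		if allSynced {
			return true
		}
		select {
		case <-stopCh:
			return false // stop requested before caches populated
		case <-ticker.C:
		}
	}
}

func main() {
	calls := 0
	synced := func() bool { calls++; return calls >= 3 } // true on 3rd poll
	ok := waitForCacheSync(make(chan struct{}), synced)
	fmt.Println(ok) // true
}
```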
ClusterObjectSyncSynced was initialized in newObjectSyncController as clusterObjectSyncInformer.Informer().HasSynced:
func (f *clusterObjectSyncInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer)
}
So this ultimately calls into internalinterfaces.SharedInformerFactory's InformerFor method:
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
This is the key to understanding how the informer is initialized. InformerFor takes two arguments: the object to be watched, and a function that creates the informer.
Here the watched object is ClusterObjectSync, and the NewInformerFunc passed in is defaultInformer:
// NewFilteredClusterObjectSyncInformer constructs a new informer for ClusterObjectSync type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredClusterObjectSyncInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().Watch(options)
},
},
&reliablesyncsv1alpha1.ClusterObjectSync{},
resyncPeriod,
indexers,
)
}
So creating the NewSharedIndexInformer mainly means supplying the ListFunc and WatchFunc, which call client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options) and .Watch(options) respectively.
To recap the chain: WaitForCacheSync --> ClusterObjectSyncSynced --> clusterObjectSyncInformer.Informer().HasSynced --> f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer) --> defaultInformer --> NewFilteredClusterObjectSyncInformer
So ClusterObjectSyncSynced ultimately amounts to watching ClusterObjectSync objects.
Likewise, ObjectSyncSynced watches ObjectSync objects:
WaitForCacheSync --> ObjectSyncSynced --> objectSyncInformer.Informer().HasSynced --> f.factory.InformerFor(&reliablesyncsv1alpha1.ObjectSync{}, f.defaultInformer) --> defaultInformer --> NewFilteredObjectSyncInformer
Back in Start
So cloudhub's Start essentially syncs the caches for the two kinds ClusterObjectSync and ObjectSync.
It then creates a ChannelMessageQueue, which holds the ObjectSyncController object (this looks a bit odd at first; its use is analyzed below).
Then ChannelMessageQueue's DispatchMessage is called. This method dispatches cloud messages to the edge; now that an ObjectSyncController is attached, let's see what role it plays in dispatching.
DispatchMessage
DispatchMessage-->addMessageToQueue-->BuildObjectSyncName
if !isDeleteMessage(msg) {
// If the message doesn't exist in the store, then compare it with
// the version stored in the database
if !exist {
resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
resourceUID, err := GetMessageUID(*msg)
if err != nil {
klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
return
}
objectSync, err := q.ObjectSyncController.ObjectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
if err == nil && msg.GetResourceVersion() <= objectSync.ResourceVersion {
return
}
}
Here ObjectSyncController finally shows up: the version recorded in the objectSync object is compared with the version carried by the msg, and if the objectSync already records an equal or higher version, the message is skipped, since that version has already been synced.
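One detail worth flagging in the snippet above: msg.GetResourceVersion() appears to return a string, so `<=` is a lexicographic comparison, not a numeric one. A tiny sketch of where the two orderings diverge (whether upstream has since changed this, I have not verified):

```go
package main

import "fmt"

func main() {
	// The skip condition above uses Go string comparison on resourceVersions.
	// Strings compare byte-wise, which diverges from numeric order once the
	// versions have different digit counts:
	fmt.Println("9" <= "10") // false ('9' > '1' byte-wise), numerically true
	fmt.Println("10" <= "9") // true, numerically false
	// So with a string compare, a message carrying version "10" would be
	// skipped when version "9" is already recorded.
}
```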
saveSuccessPoint
When the websocket server starts, OnRegister is registered as the callback for new connections:
// OnRegister register node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
nodeID := connection.ConnectionState().Headers.Get("node_id")
projectID := connection.ConnectionState().Headers.Get("project_id")
if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
}
io := &hubio.JSONIO{Connection: connection}
go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}
OnRegister extracts node_id and project_id from the connection headers, then calls ServeConn.
ServeConn --> handler (InitHandler) --> ListMessageWriteLoop --> handleMessage --> sendMsg --> saveSuccessPoint
(a parallel branch goes through MessageWriteLoop into the same handleMessage path)
ServeConn eventually reaches saveSuccessPoint:
// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID)
if err != nil {
klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
stopServe <- messageQueueDisconnect
return
}
nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID)
if err != nil {
klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
stopServe <- messageQueueDisconnect
return
}
for {
select {
case <-stopSendMsg:
klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
return
default:
mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message")
}
}
}
HubInfo only carries projectID and nodeID. MessageWriteLoop uses the nodeID to fetch the node's queue and store from the MessageQueue, then hands each message to mh.handleMessage.
Note that these should only be cloud-to-edge messages (only DispatchMessage writes into the MessageQueue).
func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
if msg.GetGroup() == edgeconst.GroupResource {
resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
resourceName, _ := edgemessagelayer.GetResourceName(*msg)
resourceType, _ := edgemessagelayer.GetResourceType(*msg)
resourceUID, err := channelq.GetMessageUID(*msg)
if err != nil {
return
}
objectSyncName := synccontroller.BuildObjectSyncName(info.NodeID, resourceUID)
if msg.GetOperation() == beehiveModel.DeleteOperation {
nodeStore.Delete(msg)
mh.deleteSuccessPoint(resourceNamespace, objectSyncName)
return
}
objectSync, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
if err == nil {
objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
_, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSync)
if err != nil {
klog.Errorf("Failed to update objectSync: %v, resourceType: %s, resourceNamespace: %s, resourceName: %s",
err, resourceType, resourceNamespace, resourceName)
}
} else if err != nil && apierrors.IsNotFound(err) {
objectSync := &v1alpha1.ObjectSync{
ObjectMeta: metav1.ObjectMeta{
Name: objectSyncName,
},
Spec: v1alpha1.ObjectSyncSpec{
ObjectAPIVersion: "",
ObjectKind: resourceType,
ObjectName: resourceName,
},
}
_, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(objectSync)
if err != nil {
klog.Errorf("Failed to create objectSync: %s, err: %v", objectSyncName, err)
return
}
objectSyncStatus, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get objectSync: %s, err: %v", objectSyncName, err)
}
objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)
}
}
// TODO: save device info
if msg.GetGroup() == deviceconst.GroupTwin {
}
klog.Infof("saveSuccessPoint successfully for message: %s", msg.GetResource())
}
As the earlier analysis showed, saveSuccessPoint is likewise only used on the cloud-to-edge path.
It first checks the msg's Group. For native K8s resources the Group is "resource", which is what is handled here.
For device-related messages the group is "twin", which is not handled yet.
It then builds an objectSyncName from the NodeID and resourceUID; this name is the primary key for everything stored about the objectSync.
For a delete operation, the msg is removed from the nodeStore and the success point keyed by objectSyncName is deleted.
Otherwise, the ObjectSync object is looked up in K8s by objectSyncName:
- if found, the ObjectSync object's status is updated;
- if not found, a new ObjectSync object is created.
The new ObjectSync carries the objectSyncName, resourceType, and resourceName. After a successful create, the object is fetched again by name and its Status.ObjectResourceVersion is set to the msg's resourceVersion. (Why two steps instead of setting the version at creation time? Most likely because status is a subresource: anything placed in Status during Create is ignored by the API server, so a separate UpdateStatus call is required.)
Summary
Looking at the whole flow: ObjectSync persists each operation on a native K8s resource that the cloud needs to send to the edge into an ObjectSync object (which is itself stored in K8s).
During DispatchMessage, the version in the objectSync object is compared with the version in the msg; if the objectSync records a higher version, the operation is dropped, because a newer version has already been synced.
Note that this only prevents duplicate or out-of-order cloud-to-edge sends; it does not guarantee the operation actually reaches the edge and is executed correctly. (It is unclear whether this is a bug: if the operation is recorded in objectsync but the edge node is down at that moment, then by the current logic, absent any later operation on that resource, it would never be synced after the edge node recovers.)
synccontroller
Register
The Register method creates a SyncController:
func Register(ec *configv1alpha1.SyncController, kubeAPIConfig *configv1alpha1.KubeAPIConfig) {
config.InitConfigure(ec, kubeAPIConfig)
core.Register(newSyncController(ec.Enable))
}
func newSyncController(enable bool) *SyncController {
config, err := buildConfig()
if err != nil {
klog.Errorf("Failed to build config, err: %v", err)
os.Exit(1)
}
kubeClient := kubernetes.NewForConfigOrDie(config)
crdClient := versioned.NewForConfigOrDie(config)
kubeSharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)
crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)
podInformer := kubeSharedInformers.Core().V1().Pods()
configMapInformer := kubeSharedInformers.Core().V1().ConfigMaps()
...
sctl := &SyncController{
enable: enable,
podInformer: podInformer,
configMapInformer: configMapInformer,
...
podSynced: podInformer.Informer().HasSynced,
configMapSynced: configMapInformer.Informer().HasSynced,
...
podLister: podInformer.Lister(),
configMapLister: configMapInformer.Lister(),
...
}
return sctl
}
newSyncController initializes a batch of informers and saves them.
Start
The Start method runs each informer and waits for their caches to sync, then starts a goroutine:
go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())
That is, reconcile() runs every 5 seconds until the whole program exits.
reconcile
func (sctl *SyncController) reconcile() {
allClusterObjectSyncs, err := sctl.clusterObjectSyncLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list all the ClusterObjectSyncs: %v", err)
}
sctl.manageClusterObjectSync(allClusterObjectSyncs)
allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list all the ObjectSyncs: %v", err)
}
sctl.manageObjectSync(allObjectSyncs)
sctl.manageCreateFailedObject()
}
From the code, manageClusterObjectSync is an empty method, presumably a placeholder for future multi-tenancy support.
manageObjectSync is namespace-scoped and covers syncing Pods, ConfigMaps, Secrets, Services, and Endpoints:
// Compare the namespace scope objects that have been persisted to the edge with the namespace scope objects in K8s,
// and generate update and delete events to the edge
func (sctl *SyncController) manageObjectSync(syncs []*v1alpha1.ObjectSync) {
for _, sync := range syncs {
switch sync.Spec.ObjectKind {
case model.ResourceTypePod:
sctl.managePod(sync)
case model.ResourceTypeConfigmap:
sctl.manageConfigMap(sync)
case model.ResourceTypeSecret:
sctl.manageSecret(sync)
case commonconst.ResourceTypeService:
sctl.manageService(sync)
case commonconst.ResourceTypeEndpoints:
sctl.manageEndpoint(sync)
// TODO: add device here
default:
klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
}
}
}
managePod
func (sctl *SyncController) managePod(sync *v1alpha1.ObjectSync) {
pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)
nodeName := getNodeName(sync.Name)
if err != nil {
if apierrors.IsNotFound(err) {
pod = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: sync.Spec.ObjectName,
Namespace: sync.Namespace,
UID: types.UID(getObjectUID(sync.Name)),
},
}
} else {
klog.Errorf("Failed to manage pod sync of %s in namespace %s: %v", sync.Name, sync.Namespace, err)
return
}
}
sendEvents(err, nodeName, sync, model.ResourceTypePod, pod.ResourceVersion, pod)
}
It first queries the Pod object through the informer's lister and derives the node name from the sync name, then calls sendEvents.
sendEvents
func sendEvents(err error, nodeName string, sync *v1alpha1.ObjectSync, resourceType string,
objectResourceVersion string, obj interface{}) {
if err != nil && apierrors.IsNotFound(err) {
//trigger the delete event
klog.Infof("%s: %s has been deleted in K8s, send the delete event to edge", resourceType, sync.Spec.ObjectName)
msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.DeleteOperation, obj)
beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
return
}
if sync.Status.ObjectResourceVersion == "" {
klog.Errorf("The ObjectResourceVersion is empty in status of objectsync: %s", sync.Name)
return
}
if CompareResourceVersion(objectResourceVersion, sync.Status.ObjectResourceVersion) > 0 {
// trigger the update event
klog.V(4).Infof("The resourceVersion: %s of %s in K8s is greater than in edgenode: %s, send the update event", objectResourceVersion, resourceType, sync.Status.ObjectResourceVersion)
msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.UpdateOperation, obj)
beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
}
}
There are three checks here:
- if the object has been deleted in K8s, send a DeleteOperation to the edge node (delivered via cloudhub);
- if no version was recorded in the objectsync's status, log an error and do nothing (a failed resourceVersion update can leave this field empty);
- if the object's version is higher than the already-synced version, send an UpdateOperation.
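CompareResourceVersion is the helper behind the third check. Its body is not quoted here; a plausible sketch, assuming it parses the resourceVersions as unsigned integers (which holds for etcd-backed clusters, even though resourceVersion is formally an opaque string):

```go
package main

import (
	"fmt"
	"strconv"
)

// compareResourceVersion returns >0 if rvA is newer than rvB, 0 if equal,
// <0 if older. Sketch only: it assumes both versions parse as uint64.
func compareResourceVersion(rvA, rvB string) int {
	a, errA := strconv.ParseUint(rvA, 10, 64)
	b, errB := strconv.ParseUint(rvB, 10, 64)
	if errA != nil || errB != nil {
		return 0 // unparsable: treat as equal, so no event is sent
	}
	switch {
	case a > b:
		return 1
	case a < b:
		return -1
	}
	return 0
}

func main() {
	// "10" is numerically newer than "9", unlike a plain string compare.
	fmt.Println(compareResourceVersion("10", "9")) // 1
}
```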
manageCreateFailedObject
manageCreateFailedObject in turn calls manageCreateFailedCoreObject (device objects are not handled yet).
manageCreateFailedCoreObject
func (sctl *SyncController) manageCreateFailedCoreObject() {
allPods, err := sctl.podLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list all the pods: %v", err)
return
}
set := labels.Set{edgemgr.NodeRoleKey: edgemgr.NodeRoleValue}
selector := labels.SelectorFromSet(set)
allEdgeNodes, err := sctl.nodeLister.List(selector)
if err != nil {
klog.Errorf("Failed to list all the edge nodes: %v", err)
return
}
for _, pod := range allPods {
if !isFromEdgeNode(allEdgeNodes, pod.Spec.NodeName) {
continue
}
// Check whether the pod is successfully persisted to edge
_, err := sctl.objectSyncLister.ObjectSyncs(pod.Namespace).Get(BuildObjectSyncName(pod.Spec.NodeName, string(pod.UID)))
if err != nil && apierrors.IsNotFound(err) {
msg := buildEdgeControllerMessage(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name, model.InsertOperation, pod)
beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
}
// TODO: add send check for service and endpoint
}
}
This first lists the pods that should run on edge nodes, then checks whether each pod has a corresponding ObjectSync; if not, the send is considered unsuccessful, and an Insert message is sent to the edge again.
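Both the DispatchMessage check and this one hinge on BuildObjectSyncName, which ties a node to an object UID. A sketch of the assumed scheme: node name and UID joined by a separator, with getNodeName/getObjectUID (as seen in managePod) splitting it back. The exact "." separator is an assumption here.

```go
package main

import (
	"fmt"
	"strings"
)

// buildObjectSyncName derives the ObjectSync CR name from the target node
// and the UID of the synced object (sketch; "." separator assumed).
func buildObjectSyncName(nodeName, objectUID string) string {
	return nodeName + "." + objectUID
}

// getNodeName recovers the node part of an ObjectSync name.
func getNodeName(syncName string) string {
	return strings.SplitN(syncName, ".", 2)[0]
}

// getObjectUID recovers the object-UID part.
func getObjectUID(syncName string) string {
	parts := strings.SplitN(syncName, ".", 2)
	if len(parts) < 2 {
		return ""
	}
	return parts[1]
}

func main() {
	name := buildObjectSyncName("edge-node-1", "6c2e-uid")
	fmt.Println(name, getNodeName(name), getObjectUID(name))
}
```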
Summary
So SyncController is a periodic task that compares the resource versions in K8s against those in objectSync; whenever the K8s version is higher, the resource is synced to the edge node.
If an object has no objectSync record at all, it is synced once as well.
The problem noted earlier remains, though: if the ObjectSync update succeeded but the edge node never applied the change, nothing handles that case.
Conclusion
reliable sync persists the messages that the cloud sends down to the edge in the objectsync CRD, so that each message is synced to the edge node at least once.
The full flow is not completely implemented yet, however: from the current code, only the persistence of native K8s resources in the CRD is done; the ACK mechanism and reliable sync for devices and other CRDs remain unfinished.