KubeEdge Analysis: reliablesyncs

Preface

reliablesyncs was added in KubeEdge v1.2. It mainly supports high availability of cloudcore, preventing data loss during a cloudcore failover.

The design doc is at https://github.com/kubeedge/kubeedge/blob/master/docs/proposals/reliable-message-delivery.md

從設(shè)計(jì)文檔看,應(yīng)該是要保證消息的“At-Least-Once”,不過(guò)從代碼實(shí)現(xiàn)上看警绩,目前應(yīng)該是只完成了一部分,還沒(méi)實(shí)現(xiàn)完整盅称,比如ack機(jī)制還沒(méi)做肩祥。


(Figure: reliablemessage-workflow.png)

CRD

reliablesyncs引入了2個(gè)新的crd:

  • ClusterObjectSync
  • ObjectSync
    從代碼中看,目前只實(shí)現(xiàn)了ObjectSync缩膝,ClusterObjectSync是沒(méi)有實(shí)現(xiàn)的混狠,所以下面主要看ObjectSync

這兩個(gè)crd的內(nèi)容比較簡(jiǎn)單,只有objectType逞盆、objectName以及objectResourceVersion3個(gè)字段

Entry points

From the code, reliablesyncs-related logic lives in a few places, under cloudhub and synccontroller.

cloudhub

cloudhub's Start method initializes a newObjectSyncController, then calls WaitForCacheSync to wait for the cache sync to complete.

Start

newObjectSyncController

func newObjectSyncController() *hubconfig.ObjectSyncController {
    config, err := buildConfig()
    if err != nil {
        klog.Errorf("Failed to build config, err: %v", err)
        os.Exit(1)
    }

    crdClient := versioned.NewForConfigOrDie(config)
    crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)

    clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
    objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()

    sc := &hubconfig.ObjectSyncController{
        CrdClient: crdClient,

        ClusterObjectSyncInformer: clusterObjectSyncInformer,
        ObjectSyncInformer:        objectSyncInformer,

        ClusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
        ObjectSyncSynced:        objectSyncInformer.Informer().HasSynced,

        ClusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
        ObjectSyncLister:        objectSyncInformer.Lister(),
    }

    go sc.ClusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
    go sc.ObjectSyncInformer.Informer().Run(beehiveContext.Done())

    return sc
}

First, a crdClient and a crdFactory are initialized.
crdClient is a set of clients implementing client-go's rest.Interface, wrapped in the Clientset struct.
Clientset contains three clients: devicesV1alpha1, reliablesyncsV1alpha1 and DiscoveryClient; each adds some interfaces on top of the REST client.
crdFactory is a sharedInformerFactory configured with a resync period of 0, i.e. no automatic resync. That makes sense: these objects exist precisely to drive syncing, so having client-go resync them periodically as well would be redundant.

The code below is generated by client-go; let's take a quick look.

${SCRIPT_ROOT}/cloud/hack/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/kubeedge/kubeedge/cloud/pkg/client github.com/kubeedge/kubeedge/cloud/pkg/apis \
"devices:v1alpha1 reliablesyncs:v1alpha1" \
--go-header-file ${SCRIPT_ROOT}/cloud/hack/boilerplate/boilerplate.txt

type sharedInformerFactory struct {
    client           versioned.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}

Now look at sharedInformerFactory: this is essentially the standard structure defined by client-go, and sharedInformerFactory additionally implements the Devices() and Reliablesyncs() methods.

Devices() and Reliablesyncs() each call a New method:

type version struct {
    factory          internalinterfaces.SharedInformerFactory
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
    return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}


// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer

// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

// TweakListOptionsFunc is a function that transforms a v1.ListOptions.
type TweakListOptionsFunc func(*v1.ListOptions)

The call chain here is fairly long; I'll just note it for now and not dig deeper.

Next, the two informers clusterObjectSyncInformer and objectSyncInformer are initialized:

// Interface provides access to all the informers in this group version.
type Interface interface {
    // ClusterObjectSyncs returns a ClusterObjectSyncInformer.
    ClusterObjectSyncs() ClusterObjectSyncInformer
    // ObjectSyncs returns a ObjectSyncInformer.
    ObjectSyncs() ObjectSyncInformer
}

type version struct {
    factory          internalinterfaces.SharedInformerFactory
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
    return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// ClusterObjectSyncs returns a ClusterObjectSyncInformer.
func (v *version) ClusterObjectSyncs() ClusterObjectSyncInformer {
    return &clusterObjectSyncInformer{factory: v.factory, tweakListOptions: v.tweakListOptions}
}

// ObjectSyncs returns a ObjectSyncInformer.
func (v *version) ObjectSyncs() ObjectSyncInformer {
    return &objectSyncInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

The methods above are all generated by client-go.

Then crdClient, ClusterObjectSyncInformer, objectSyncInformer, etc. are saved in the ObjectSyncController object for later use.
After that, Informer().Run is called; let's look at this in detail.

Informer().Run

Informer() returns a cache.SharedIndexInformer, defined in client-go; Run starts the informer.
Run exits when beehiveContext.Done() is closed.

WaitForCacheSync

Back in the Start method, cache.WaitForCacheSync is called:

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shutdown
// callers should prefer WaitForNamedCacheSync()
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollImmediateUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}

這里實(shí)際上就是分別調(diào)用參數(shù)列表中傳入的方法ClusterObjectSyncSynced和ObjectSyncSynced署浩,直到這些方法都返回true揉燃。
而ClusterObjectSyncSynced在之前的newObjectSyncController中被初始化為clusterObjectSyncInformer.Informer().HasSynced

func (f *clusterObjectSyncInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer)
}

So this eventually reaches the InformerFor method of internalinterfaces.SharedInformerFactory:

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

This is the key to understanding the whole informer initialization. InformerFor takes two parameters: the object to be watched and a function that creates the informer.
Here, for example, the watched object is ClusterObjectSync and the NewInformerFunc passed in is defaultInformer:

// NewFilteredClusterObjectSyncInformer constructs a new informer for ClusterObjectSync type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredClusterObjectSyncInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().List(options)
            },
            WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.ReliablesyncsV1alpha1().ClusterObjectSyncs().Watch(options)
            },
        },
        &reliablesyncsv1alpha1.ClusterObjectSync{},
        resyncPeriod,
        indexers,
    )
}

這里可以看到創(chuàng)建NewSharedIndexInformer主要就是創(chuàng)建ListFunc和WatchFunc兩個(gè)方法姑曙,而這兩個(gè)方法又調(diào)用了client.DevicesV1alpha1().Devices(namespace).List(options)方法。

To recap the chain: WaitForCacheSync-->ClusterObjectSyncSynced-->clusterObjectSyncInformer.Informer().HasSynced-->f.factory.InformerFor(&reliablesyncsv1alpha1.ClusterObjectSync{}, f.defaultInformer)-->defaultInformer-->NewFilteredClusterObjectSyncInformer

所以對(duì)ClusterObjectSyncSynced來(lái)說(shuō)伤靠,實(shí)際是對(duì)ClusterObjectSyncs進(jìn)行watch。
對(duì)ObjectSyncSynced來(lái)說(shuō)啼染,則對(duì)ObjectSync進(jìn)行watch宴合。
WaitForCacheSync-->ObjectSyncSynced-->objectSyncInformer.Informer().HasSynced-->f.factory.InformerFor(&reliablesyncsv1alpha1.ObjectSync{}, f.defaultInformer)-->defaultInformer-->NewFilteredObjectSyncInformer

Back to Start

So cloudhub's Start essentially syncs the two object kinds ClusterObjectSync and ObjectSync.
It then initializes a ChannelMessageQueue, which holds the ObjectSyncController object (this looks a bit odd at first; its purpose is analyzed below).

Then ChannelMessageQueue's DispatchMessage is called. This method itself dispatches cloud messages to the edge; now that an ObjectSyncController is attached, we need to see what role it plays during dispatch.

DispatchMessage

DispatchMessage-->addMessageToQueue-->BuildObjectSyncName

    if !isDeleteMessage(msg) {
        // If the message doesn't exist in the store, then compare it with
        // the version stored in the database
        if !exist {
            resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
            resourceUID, err := GetMessageUID(*msg)
            if err != nil {
                klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
                return
            }

            objectSync, err := q.ObjectSyncController.ObjectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
            if err == nil && msg.GetResourceVersion() <= objectSync.ResourceVersion {
                return
            }
        }

Here the ObjectSyncController finally appears: the code compares the version in the objectSync object with the version in the msg, and if the objectsync version is higher the message is skipped (a newer version has already been synced).

saveSuccessPoint

在啟動(dòng)websocket server的時(shí)候蚤霞,注冊(cè)了新連接的回調(diào)方法OnRegister酗失,

// OnRegister register node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
    nodeID := connection.ConnectionState().Headers.Get("node_id")
    projectID := connection.ConnectionState().Headers.Get("project_id")

    if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
        mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
    }

    io := &hubio.JSONIO{Connection: connection}
    go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}

OnRegister extracts node_id and project_id from the connection info, then calls ServeConn.

ServeConn-->handler->(InitHandler)-->ListMessageWriteLoop-->handleMessage-->sendMsg-->saveSuccessPoint
                                  | 
                                  |->MessageWriteLoop-->

ServeConn最終調(diào)用到了saveSuccessPoint方法级零,

// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
    nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID)
    if err != nil {
        klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
        stopServe <- messageQueueDisconnect
        return
    }
    nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID)
    if err != nil {
        klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
        stopServe <- messageQueueDisconnect
        return
    }

    for {
        select {
        case <-stopSendMsg:
            klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
            return
        default:
            mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message")
        }
    }
}

HubInfo contains only projectID and nodeID. MessageWriteLoop uses the nodeID to take cached messages from the MessageQueue and hands them to mh.handleMessage.
Note that these should only be cloud-to-edge messages (only DispatchMessage writes into the MessageQueue).

func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
    if msg.GetGroup() == edgeconst.GroupResource {
        resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
        resourceName, _ := edgemessagelayer.GetResourceName(*msg)
        resourceType, _ := edgemessagelayer.GetResourceType(*msg)
        resourceUID, err := channelq.GetMessageUID(*msg)
        if err != nil {
            return
        }

        objectSyncName := synccontroller.BuildObjectSyncName(info.NodeID, resourceUID)

        if msg.GetOperation() == beehiveModel.DeleteOperation {
            nodeStore.Delete(msg)
            mh.deleteSuccessPoint(resourceNamespace, objectSyncName)
            return
        }

        objectSync, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
        if err == nil {
            objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
            _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSync)
            if err != nil {
                klog.Errorf("Failed to update objectSync: %v, resourceType: %s, resourceNamespace: %s, resourceName: %s",
                    err, resourceType, resourceNamespace, resourceName)
            }
        } else if err != nil && apierrors.IsNotFound(err) {
            objectSync := &v1alpha1.ObjectSync{
                ObjectMeta: metav1.ObjectMeta{
                    Name: objectSyncName,
                },
                Spec: v1alpha1.ObjectSyncSpec{
                    ObjectAPIVersion: "",
                    ObjectKind:       resourceType,
                    ObjectName:       resourceName,
                },
            }
            _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(objectSync)
            if err != nil {
                klog.Errorf("Failed to create objectSync: %s, err: %v", objectSyncName, err)
                return
            }

            objectSyncStatus, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
            if err != nil {
                klog.Errorf("Failed to get objectSync: %s, err: %v", objectSyncName, err)
            }
            objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
            mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)
        }
    }

    // TODO: save device info
    if msg.GetGroup() == deviceconst.GroupTwin {
    }
    klog.Infof("saveSuccessPoint successfully for message: %s", msg.GetResource())
}

From the earlier analysis, saveSuccessPoint is likewise only used in the cloud-to-edge messaging scenario.

First it checks the msg's Group. For native K8s resources the Group is "resource", which is what's handled here.
For device-related messages the group is "twin", which is not handled yet.

然后根據(jù)NodeID和resourceUID創(chuàng)建一個(gè)objectSyncName狸窘,這個(gè)objectSyncName是各類(lèi)和objectSync相關(guān)的存儲(chǔ)的主鍵墩朦。

對(duì)于刪除操作,從nodeStore中刪除msg翻擒,并刪除以objectSyncName為key的SuccessPoint

Then the ObjectSync object is looked up in K8s with objectSyncName as the key:

  • If found, the ObjectSync object is updated
  • If not found, a new ObjectSync object is created

新的ObjectSync對(duì)象包含了objectSyncName陋气、resourceType劳吠、resourceName。創(chuàng)建成功后巩趁,再根據(jù)名字查詢出objectSync對(duì)象痒玩,然后將objectSyncStatus.Status.ObjectResourceVersion更新為msg中的resourceVersion(這里為啥要分兩步,而不是在創(chuàng)建的時(shí)候就把ResourceVersion放進(jìn)去?)

Summary

從整個(gè)流程看蠢古,ObjectSync就是將云端需要發(fā)送給邊緣端的K8S原生資源的操作燃观,保存到ObjectSync對(duì)象中(ObjectSync對(duì)象本身也是存到K8S中的)

在DispatchMessage的時(shí)候,首先判斷objectSync對(duì)象的版本和msg中的版本便瑟,如果objectsync中的版本更高缆毁,這個(gè)操作就會(huì)被丟棄,因?yàn)橐呀?jīng)同步了更高的版本到涂。

Note that this only guarantees that cloud-to-edge delivery avoids duplicate or out-of-order sends; it does not guarantee that an operation actually reaches the edge and is executed correctly. (I'm not sure whether this is a bug: if an operation is recorded in the objectsync but the edge node is down at that moment, then by the current logic, if no further operations touch that resource, it will not be synced after the edge node recovers.)

synccontroller

Register

The Register method creates a synccontroller:

func Register(ec *configv1alpha1.SyncController, kubeAPIConfig *configv1alpha1.KubeAPIConfig) {
    config.InitConfigure(ec, kubeAPIConfig)
    core.Register(newSyncController(ec.Enable))
}

func newSyncController(enable bool) *SyncController {
    config, err := buildConfig()
    if err != nil {
        klog.Errorf("Failed to build config, err: %v", err)
        os.Exit(1)
    }
    kubeClient := kubernetes.NewForConfigOrDie(config)
    crdClient := versioned.NewForConfigOrDie(config)

    kubeSharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)
    crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)

    podInformer := kubeSharedInformers.Core().V1().Pods()
    configMapInformer := kubeSharedInformers.Core().V1().ConfigMaps()
    ...

    sctl := &SyncController{
        enable: enable,

        podInformer:               podInformer,
        configMapInformer:         configMapInformer,
        ...

        podSynced:               podInformer.Informer().HasSynced,
        configMapSynced:         configMapInformer.Informer().HasSynced,
        ...

        podLister:               podInformer.Lister(),
        configMapLister:         configMapInformer.Lister(),
        ...
    }

    return sctl
}

newSyncController initializes a batch of informers and saves them all.

Start

The Start method runs each of the informers and waits for them to finish syncing, then starts a goroutine:

go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())

This runs the reconcile() method every 5 seconds until the program exits.

reconcile

func (sctl *SyncController) reconcile() {
    allClusterObjectSyncs, err := sctl.clusterObjectSyncLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the ClusterObjectSyncs: %v", err)
    }
    sctl.manageClusterObjectSync(allClusterObjectSyncs)

    allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the ObjectSyncs: %v", err)
    }
    sctl.manageObjectSync(allObjectSyncs)

    sctl.manageCreateFailedObject()
}

From the code, manageClusterObjectSync is an empty method, presumably in preparation for future multi-tenancy.
manageObjectSync is namespace-scoped and covers syncing of Pods, ConfigMaps, Secrets, Services and Endpoints:

// Compare the namespace scope objects that have been persisted to the edge with the namespace scope objects in K8s,
// and generate update and delete events to the edge
func (sctl *SyncController) manageObjectSync(syncs []*v1alpha1.ObjectSync) {
    for _, sync := range syncs {
        switch sync.Spec.ObjectKind {
        case model.ResourceTypePod:
            sctl.managePod(sync)
        case model.ResourceTypeConfigmap:
            sctl.manageConfigMap(sync)
        case model.ResourceTypeSecret:
            sctl.manageSecret(sync)
        case commonconst.ResourceTypeService:
            sctl.manageService(sync)
        case commonconst.ResourceTypeEndpoints:
            sctl.manageEndpoint(sync)
        // TODO: add device here
        default:
            klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
        }
    }
}

managePod

func (sctl *SyncController) managePod(sync *v1alpha1.ObjectSync) {
    pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)

    nodeName := getNodeName(sync.Name)

    if err != nil {
        if apierrors.IsNotFound(err) {
            pod = &v1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      sync.Spec.ObjectName,
                    Namespace: sync.Namespace,
                    UID:       types.UID(getObjectUID(sync.Name)),
                },
            }
        } else {
            klog.Errorf("Failed to manage pod sync of %s in namespace %s: %v", sync.Name, sync.Namespace, err)
            return
        }
    }
    sendEvents(err, nodeName, sync, model.ResourceTypePod, pod.ResourceVersion, pod)
}

先通過(guò)Informer查詢pod對(duì)象以及node的名字长赞,然后調(diào)用sendEvents

sendEvents

func sendEvents(err error, nodeName string, sync *v1alpha1.ObjectSync, resourceType string,
    objectResourceVersion string, obj interface{}) {
    if err != nil && apierrors.IsNotFound(err) {
        //trigger the delete event
        klog.Infof("%s: %s has been deleted in K8s, send the delete event to edge", resourceType, sync.Spec.ObjectName)
        msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.DeleteOperation, obj)
        beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
        return
    }

    if sync.Status.ObjectResourceVersion == "" {
        klog.Errorf("The ObjectResourceVersion is empty in status of objectsync: %s", sync.Name)
        return
    }

    if CompareResourceVersion(objectResourceVersion, sync.Status.ObjectResourceVersion) > 0 {
        // trigger the update event
        klog.V(4).Infof("The resourceVersion: %s of %s in K8s is greater than in edgenode: %s, send the update event", objectResourceVersion, resourceType, sync.Status.ObjectResourceVersion)
        msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.UpdateOperation, obj)
        beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
    }
}

這里就是3個(gè)判斷晦攒,

  • 如果對(duì)象被刪除了闽撤,那么向Edge節(jié)點(diǎn)發(fā)送DeleteOperation(通過(guò)cloudhub來(lái)發(fā)送)
  • 如果對(duì)象沒(méi)有取到版本,則報(bào)錯(cuò)脯颜,不處理(update resourceversion失敗會(huì)導(dǎo)致這里是空的)
  • 如果object的版本比已經(jīng)sync的版本高哟旗,,那么發(fā)送UpdateOperation操作

manageCreateFailedObject

manageCreateFailedObject in turn calls manageCreateFailedCoreObject (device objects are not handled yet).

manageCreateFailedCoreObject

func (sctl *SyncController) manageCreateFailedCoreObject() {
    allPods, err := sctl.podLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("Filed to list all the pods: %v", err)
        return
    }

    set := labels.Set{edgemgr.NodeRoleKey: edgemgr.NodeRoleValue}
    selector := labels.SelectorFromSet(set)
    allEdgeNodes, err := sctl.nodeLister.List(selector)
    if err != nil {
        klog.Errorf("Filed to list all the edge nodes: %v", err)
        return
    }

    for _, pod := range allPods {
        if !isFromEdgeNode(allEdgeNodes, pod.Spec.NodeName) {
            continue
        }
        // Check whether the pod is successfully persisted to edge
        _, err := sctl.objectSyncLister.ObjectSyncs(pod.Namespace).Get(BuildObjectSyncName(pod.Spec.NodeName, string(pod.UID)))
        if err != nil && apierrors.IsNotFound(err) {
            msg := buildEdgeControllerMessage(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name, model.InsertOperation, pod)
            beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
        }

        // TODO: add send check for service and endpoint
    }
}

This first lists the pods meant to run on edge nodes, then checks whether each pod has an entry in ObjectSync; if not, the message is assumed not to have been delivered, and it is synced to the edge again.

Summary

So SyncController is just a periodic task that compares the resource versions in K8s against those recorded in objectSync; when the K8s version is higher, it syncs the resource to the edge node.
If an object has no objectSync entry at all, it is synced once as well.
The earlier question still applies, though: what happens if the ObjectSync write succeeded but delivery to the edge node did not?

Conclusion

reliable sync persists the messages that cloud sends down to edge in the objectsync CRD, so that each message is synced to the edge node at least once.

但是目前整個(gè)流程還沒(méi)完整的實(shí)現(xiàn),從當(dāng)前的代碼看黔牵,目前只完成了K8S原生resource在CRD中的保存前硫,ACK機(jī)制以及device等CRD的reliable sync還未完成。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末荧止,一起剝皮案震驚了整個(gè)濱河市屹电,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌跃巡,老刑警劉巖危号,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異素邪,居然都是意外死亡外莲,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)兔朦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)偷线,“玉大人,你說(shuō)我怎么就攤上這事沽甥∩睿” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵摆舟,是天一觀的道長(zhǎng)亥曹。 經(jīng)常有香客問(wèn)我邓了,道長(zhǎng),這世上最難降的妖魔是什么媳瞪? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任骗炉,我火速辦了婚禮,結(jié)果婚禮上蛇受,老公的妹妹穿的比我還像新娘句葵。我一直安慰自己,他們只是感情好兢仰,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布笼呆。 她就那樣靜靜地躺著,像睡著了一般旨别。 火紅的嫁衣襯著肌膚如雪诗赌。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,842評(píng)論 1 290
  • 那天秸弛,我揣著相機(jī)與錄音铭若,去河邊找鬼。 笑死递览,一個(gè)胖子當(dāng)著我的面吹牛叼屠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绞铃,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼镜雨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了儿捧?” 一聲冷哼從身側(cè)響起荚坞,我...
    開(kāi)封第一講書(shū)人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎菲盾,沒(méi)想到半個(gè)月后颓影,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡懒鉴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年诡挂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片临谱。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡璃俗,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出悉默,到底是詐尸還是另有隱情城豁,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布麦牺,位于F島的核電站钮蛛,受9級(jí)特大地震影響鞭缭,放射性物質(zhì)發(fā)生泄漏剖膳。R本人自食惡果不足惜魏颓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望吱晒。 院中可真熱鬧甸饱,春花似錦、人聲如沸仑濒。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)墩瞳。三九已至驼壶,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間喉酌,已是汗流浹背热凹。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留泪电,地道東北人般妙。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像相速,于是被迫代替她去往敵國(guó)和親碟渺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349