KubeEdge分析-cloudcore-cloudhub

概述

cloudhub是負責云端和邊端的通信的议惰,目前支持Websocket和QUIC兩種方式妙痹。

Register

初始化配置寇荧,并在beehive中注冊最易。

Start

func (a *cloudHub) Start() {
    messageq := channelq.NewChannelMessageQueue()

    // start dispatch message from the cloud to edge node
    go messageq.DispatchMessage()

    // start the cloudhub server
    if hubconfig.Get().ProtocolWebsocket {
        // TODO delete second param  @kadisi
        go servers.StartCloudHub(api.ProtocolTypeWS, hubconfig.Get(), messageq)
    }

    if hubconfig.Get().ProtocolQuic {
        // TODO delete second param  @kadisi
        go servers.StartCloudHub(api.ProtocolTypeQuic, hubconfig.Get(), messageq)
    }

    if hubconfig.Get().ProtocolUDS {
        // The uds server is only used to communicate with csi driver from kubeedge on cloud.
        // It is not used to communicate between cloud and edge.
        go udsserver.StartServer(hubconfig.Get())
    }
}

這里就干了3件事情,DispatchMessage粒没、StartCloudHub和udsserver.StartServer筛婉。

DispatchMessage

messageq(ChannelMessageQueue)中只有一個同步的map

// DispatchMessage gets the message from the cloud, extracts the
// node id from it, gets the channel associated with the node
// and pushes the event on the channel
func (q *ChannelMessageQueue) DispatchMessage() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
            return
        default:
        }
        msg, err := beehiveContext.Receive(model.SrcCloudHub)
        if err != nil {
            klog.Info("receive not Message format message")
            continue
        }
        resource := msg.Router.Resource
        tokens := strings.Split(resource, "/")
        numOfTokens := len(tokens)
        var nodeID string
        for i, token := range tokens {
            if token == model.ResNode && i+1 < numOfTokens {
                nodeID = tokens[i+1]
                break
            }
        }
        if nodeID == "" {
            klog.Warning("node id is not found in the message")
            continue
        }
        rChannel, err := q.getRChannel(nodeID)
        if err != nil {
            klog.Infof("fail to get dispatch channel for %s", nodeID)
            continue
        }
        rChannel <- msg
    }
}

從beehiveContext收取group名稱為“cloudhub”的message,也就是之前分析中癞松,edgecontroller和devicecontroller的downstreamcontroller發(fā)下來的消息爽撒。

取出消息后,根據(jù)消息中的Router信息响蓉,取出目標的nodeId硕勿,如果nodeId沒取到,則只記錄日志枫甲。 然后根據(jù)NodeId從ChannelMessageQueue的map中取出通道源武,然后把消息放到通道中扼褪。

整個流程是在一個for循環(huán)中,因此這個流程會不斷重復執(zhí)行粱栖,直到beehiveContext被關閉迎捺。

StartCloudHub

// StartCloudHub starts the cloud hub service
func StartCloudHub(protocolType string, config *hubconfig.Configure, messageq *channelq.ChannelMessageQueue) {
    // init certificate
    pool := x509.NewCertPool()
    ok := pool.AppendCertsFromPEM(config.Ca)
    if !ok {
        panic(fmt.Errorf("fail to load ca content"))
    }
    cert, err := tls.X509KeyPair(config.Cert, config.Key)
    if err != nil {
        panic(err)
    }
    tlsConfig := tls.Config{
        ClientCAs:    pool,
        ClientAuth:   tls.RequireAndVerifyClientCert,
        Certificates: []tls.Certificate{cert},
        MinVersion:   tls.VersionTLS12,
        CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
    }

    handler.InitHandler(config, messageq)

    svc := server.Server{
        Type:       protocolType,
        TLSConfig:  &tlsConfig,
        AutoRoute:  true,
        ConnNotify: handler.CloudhubHandler.OnRegister,
    }

    switch protocolType {
    case api.ProtocolTypeWS:
        svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.Port)
        svc.ExOpts = api.WSServerOption{Path: "/"}
    case api.ProtocolTypeQuic:
        svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.QuicPort)
        svc.ExOpts = api.QuicServerOption{MaxIncomingStreams: config.MaxIncomingStreams}
    default:
        panic(fmt.Errorf("invalid protocol, should be websocket or quic"))
    }

    klog.Infof("Start cloud hub %s server", protocolType)
    svc.ListenAndServeTLS("", "")
}

前幾行是初始化證書(用于https)的相關配置。然后初始化了一個handler

InitHandler

func InitHandler(config *hubconfig.Configure, eventq *channelq.ChannelMessageQueue) {
    once.Do(func() {
        CloudhubHandler = &MessageHandle{
            KeepaliveInterval: config.KeepaliveInterval,
            WriteTimeout:      config.WriteTimeout,
            MessageQueue:      eventq,
            NodeLimit:         config.NodeLimit,
        }

        CloudhubHandler.KeepaliveChannel = make(map[string]chan struct{})
        CloudhubHandler.Handlers = []HandleFunc{CloudhubHandler.KeepaliveCheckLoop, CloudhubHandler.MessageWriteLoop}

        CloudhubHandler.initServerEntries()
    })
}

這個eventq就是之前DispatchMessage的messageq, 這里設置了2個HandleFunc查排,分別是KeepaliveCheckLoop和MessageWriteLoop。KeepaliveCheckLoop是維持心跳的抄沮,這里就不仔細看了跋核。
MessageWriteLoop是處理云端發(fā)給邊端消息的。


// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stop chan ExitCode) {
    messages, err := mh.MessageQueue.Consume(info)
    if err != nil {
        klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
        stop <- messageQueueDisconnect
        return
    }
    for {
        msg, err := messages.Get()
        if err != nil {
            klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
            if err.Error() == MsgFormatError {
                // error format message should not impact other message
                messages.Ack()
                continue
            }
            stop <- messageQueueDisconnect
            return
        }

        if model.IsNodeStopped(msg) {
            klog.Infof("node %s is stopped, will disconnect", info.NodeID)
            messages.Ack()
            stop <- nodeStop
            return
        }
        if !model.IsToEdge(msg) {
            klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
            messages.Ack()
            continue
        }
        klog.Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)

        trimMessage(msg)
        err = hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second))
        if err != nil {
            klog.Errorf("SetWriteDeadline error, %s", err.Error())
            stop <- hubioWriteFail
            return
        }
        err = mh.hubIoWrite(hi, info.NodeID, msg)
        if err != nil {
            klog.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
                info.NodeID, dumpMessageMetadata(msg), err.Error())
            stop <- hubioWriteFail
            return
        }
        messages.Ack()
    }

從MessageQueue取出消息叛买,然后判斷需要發(fā)往的節(jié)點是否停止砂代、是否是發(fā)往邊緣節(jié)點的。然后調用hubIoWrite向邊緣的node上寫數(shù)據(jù)率挣。

InitHandler的最后調用initServerEntries刻伊,這個是viaduct模塊中的功能,viduct是用來云端和邊緣端進行通信的底層模塊椒功,這里先不仔細分析了捶箱。

Server

回到StartCloudHub中,接下來就起了一個viaduct的server动漾,并執(zhí)行ListenAndServeTLS開始監(jiān)聽和邊緣節(jié)點的交互請求丁屎。

server在啟動的時候,注冊了一個OnResigter方法

// OnRegister regist node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
    nodeID := connection.ConnectionState().Headers.Get("node_id")
    projectID := connection.ConnectionState().Headers.Get("project_id")

    if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
        mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
    }

    io := &hubio.JSONIO{Connection: connection}
    go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}

用于首次收到邊緣側發(fā)來的請求后旱眯,創(chuàng)建好相關的通道晨川,并調用ServeConn處理消息,這里主要涉及到了viaduct這個中間件的處理流程删豺,這里就不仔細看了共虑。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市呀页,隨后出現(xiàn)的幾起案子妈拌,更是在濱河造成了極大的恐慌,老刑警劉巖赔桌,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件供炎,死亡現(xiàn)場離奇詭異,居然都是意外死亡疾党,警方通過查閱死者的電腦和手機音诫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雪位,“玉大人竭钝,你說我怎么就攤上這事。” “怎么了香罐?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵卧波,是天一觀的道長。 經常有香客問我庇茫,道長港粱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任旦签,我火速辦了婚禮查坪,結果婚禮上,老公的妹妹穿的比我還像新娘宁炫。我一直安慰自己偿曙,他們只是感情好,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布羔巢。 她就那樣靜靜地躺著望忆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪竿秆。 梳的紋絲不亂的頭發(fā)上启摄,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天,我揣著相機與錄音袍辞,去河邊找鬼鞋仍。 笑死,一個胖子當著我的面吹牛搅吁,可吹牛的內容都是我干的威创。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼谎懦,長吁一口氣:“原來是場噩夢啊……” “哼肚豺!你這毒婦竟也來了?” 一聲冷哼從身側響起界拦,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤吸申,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后享甸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體截碴,經...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年蛉威,在試婚紗的時候發(fā)現(xiàn)自己被綠了日丹。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡蚯嫌,死狀恐怖哲虾,靈堂內的尸體忽然破棺而出丙躏,到底是詐尸還是另有隱情,我是刑警寧澤束凑,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布晒旅,位于F島的核電站,受9級特大地震影響汪诉,放射性物質發(fā)生泄漏废恋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一扒寄、第九天 我趴在偏房一處隱蔽的房頂上張望拴签。 院中可真熱鬧,春花似錦旗们、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至喜颁,卻和暖如春稠氮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背半开。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工隔披, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人寂拆。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓奢米,卻偏偏與公主長得像,于是被迫代替她去往敵國和親纠永。 傳聞我的和親對象是個殘疾皇子鬓长,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349