概述
cloudhub 负责云端和边缘端之间的通信，目前支持 WebSocket 和 QUIC 两种方式。
Register
初始化配置，并在 beehive 中注册。
Start
// Start launches CloudHub in the background: one goroutine that dispatches
// cloud-originated messages to per-node channels, one server goroutine per
// enabled cloud<->edge transport, and optionally the UDS server.
// It does not block; every component runs in its own goroutine.
func (a *cloudHub) Start() {
	messageq := channelq.NewChannelMessageQueue()

	// Route messages coming from the cloud side to the per-node channels.
	go messageq.DispatchMessage()

	// Cloud<->edge transports, in start order; each enabled one gets a server.
	transports := []struct {
		enabled  bool
		protocol string
	}{
		{enabled: hubconfig.Get().ProtocolWebsocket, protocol: api.ProtocolTypeWS},
		{enabled: hubconfig.Get().ProtocolQuic, protocol: api.ProtocolTypeQuic},
	}
	for _, t := range transports {
		if !t.enabled {
			continue
		}
		// TODO delete second param @kadisi
		go servers.StartCloudHub(t.protocol, hubconfig.Get(), messageq)
	}

	// The UDS server only talks to the KubeEdge CSI driver on the cloud side;
	// it is not used for cloud<->edge communication.
	if hubconfig.Get().ProtocolUDS {
		go udsserver.StartServer(hubconfig.Get())
	}
}
这里主要做了 3 件事情：启动 DispatchMessage、StartCloudHub 和 udsserver.StartServer。其中 StartCloudHub 会按配置分别为 WebSocket 和 QUIC 各启动一次，udsserver 仅在启用 UDS 时启动。
DispatchMessage
messageq（ChannelMessageQueue）中只有一个同步的 map，以节点 ID 为键保存各节点的消息通道。
// DispatchMessage gets the message from the cloud, extracts the
// node id from it, gets the channel associated with the node
// and pushes the event on the channel.
//
// It loops until beehiveContext is done. Messages whose resource carries no
// node ID, or whose node has no dispatch channel, are logged and dropped.
func (q *ChannelMessageQueue) DispatchMessage() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
			return
		default:
		}
		msg, err := beehiveContext.Receive(model.SrcCloudHub)
		if err != nil {
			// Keep going, but surface the underlying error instead of dropping it.
			klog.Infof("receive not Message format message: %v", err)
			continue
		}
		// The node ID is the path segment right after the first
		// model.ResNode segment of the "/"-separated resource.
		tokens := strings.Split(msg.Router.Resource, "/")
		var nodeID string
		for i, token := range tokens {
			if token == model.ResNode && i+1 < len(tokens) {
				nodeID = tokens[i+1]
				break
			}
		}
		if nodeID == "" {
			klog.Warning("node id is not found in the message")
			continue
		}
		rChannel, err := q.getRChannel(nodeID)
		if err != nil {
			klog.Infof("fail to get dispatch channel for %s: %v", nodeID, err)
			continue
		}
		// A bare send would block forever if this node's channel is full and
		// nobody drains it; the loop would then never observe shutdown.
		select {
		case rChannel <- msg:
		case <-beehiveContext.Done():
			klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
			return
		}
	}
}
从 beehiveContext 接收模块名为 model.SrcCloudHub（即 “cloudhub”）的消息，也就是之前分析中 edgecontroller 和 devicecontroller 的 downstreamcontroller 下发的消息。
取出消息后，根据消息 Router 中的 Resource 字段（按 “/” 切分，取 model.ResNode 之后的那一段）解析出目标 nodeID；如果没有取到 nodeID，则只记录日志并跳过该消息。然后根据 nodeID 从 ChannelMessageQueue 的 map 中取出对应的通道，把消息放入通道中。
整个流程位于一个 for 循环中，因此会不断重复执行，直到 beehiveContext 被关闭。
StartCloudHub
// StartCloudHub starts the cloud hub service for one transport.
//
// protocolType selects the transport (api.ProtocolTypeWS or
// api.ProtocolTypeQuic); any other value panics. config supplies the PEM
// encoded CA, certificate and key, the listen address/ports and transport
// options. messageq is handed to the shared message handler.
//
// The server uses TLS 1.2+ with a single ECDHE-RSA AES-128-GCM suite and
// requires clients to present a certificate signed by config.Ca. Invalid
// certificate material panics. The call blocks while the server is running.
func StartCloudHub(protocolType string, config *hubconfig.Configure, messageq *channelq.ChannelMessageQueue) {
	// init certificate
	pool := x509.NewCertPool()
	ok := pool.AppendCertsFromPEM(config.Ca)
	if !ok {
		panic(fmt.Errorf("fail to load ca content"))
	}
	cert, err := tls.X509KeyPair(config.Cert, config.Key)
	if err != nil {
		panic(err)
	}
	tlsConfig := tls.Config{
		ClientCAs:    pool,
		ClientAuth:   tls.RequireAndVerifyClientCert,
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
		CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
	}
	handler.InitHandler(config, messageq)
	svc := server.Server{
		Type:       protocolType,
		TLSConfig:  &tlsConfig,
		AutoRoute:  true,
		ConnNotify: handler.CloudhubHandler.OnRegister,
	}
	// NOTE(review): "%s:%d" does not bracket IPv6 literals (e.g. "::");
	// net.JoinHostPort would, if IPv6 bind addresses must be supported.
	switch protocolType {
	case api.ProtocolTypeWS:
		svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.Port)
		svc.ExOpts = api.WSServerOption{Path: "/"}
	case api.ProtocolTypeQuic:
		svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.QuicPort)
		svc.ExOpts = api.QuicServerOption{MaxIncomingStreams: config.MaxIncomingStreams}
	default:
		panic(fmt.Errorf("invalid protocol, should be websocket or quic"))
	}
	klog.Infof("Start cloud hub %s server", protocolType)
	// Empty cert/key paths: the certificates are presumably taken from
	// TLSConfig above. Report listen/serve failures (e.g. port already in use)
	// instead of letting this goroutine exit silently.
	if err = svc.ListenAndServeTLS("", ""); err != nil {
		klog.Errorf("cloud hub %s server on %s stopped: %v", protocolType, svc.Addr, err)
	}
}
前几行是初始化证书的相关配置（用于 TLS 双向认证，要求并校验客户端证书），然后初始化了一个 handler。
InitHandler
// InitHandler builds the package-level CloudhubHandler from config, wiring in
// eventq as its message queue. The construction runs inside once.Do, so only
// the first call has any effect (assuming `once` is a sync.Once).
//
// The handler runs two per-connection loops: KeepaliveCheckLoop and
// MessageWriteLoop. initServerEntries is called after CloudhubHandler is set.
func InitHandler(config *hubconfig.Configure, eventq *channelq.ChannelMessageQueue) {
	once.Do(func() {
		h := &MessageHandle{
			KeepaliveInterval: config.KeepaliveInterval,
			WriteTimeout:      config.WriteTimeout,
			MessageQueue:      eventq,
			NodeLimit:         config.NodeLimit,
			KeepaliveChannel:  make(map[string]chan struct{}),
		}
		h.Handlers = []HandleFunc{h.KeepaliveCheckLoop, h.MessageWriteLoop}

		// Publish the handler before registering entries, as the original order did.
		CloudhubHandler = h
		h.initServerEntries()
	})
}
这个 eventq 就是之前 DispatchMessage 所用的 messageq。这里设置了 2 个 HandleFunc，分别是 KeepaliveCheckLoop 和 MessageWriteLoop。KeepaliveCheckLoop 用于维持心跳，这里就不仔细看了。
MessageWriteLoop 负责处理云端发往边缘端的消息。
// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stop chan ExitCode) {
messages, err := mh.MessageQueue.Consume(info)
if err != nil {
klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
stop <- messageQueueDisconnect
return
}
for {
msg, err := messages.Get()
if err != nil {
klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
if err.Error() == MsgFormatError {
// error format message should not impact other message
messages.Ack()
continue
}
stop <- messageQueueDisconnect
return
}
if model.IsNodeStopped(msg) {
klog.Infof("node %s is stopped, will disconnect", info.NodeID)
messages.Ack()
stop <- nodeStop
return
}
if !model.IsToEdge(msg) {
klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
messages.Ack()
continue
}
klog.Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
trimMessage(msg)
err = hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second))
if err != nil {
klog.Errorf("SetWriteDeadline error, %s", err.Error())
stop <- hubioWriteFail
return
}
err = mh.hubIoWrite(hi, info.NodeID, msg)
if err != nil {
klog.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
info.NodeID, dumpMessageMetadata(msg), err.Error())
stop <- hubioWriteFail
return
}
messages.Ack()
}
从 MessageQueue 取出消息，然后判断目标节点是否已停止、消息是否需要发往边缘节点，接着调用 hubIoWrite 向边缘节点写数据；只有写入成功后才会对消息执行 Ack。
InitHandler 的最后调用了 initServerEntries，这里涉及 viaduct 模块的功能。viaduct 是云端和边缘端进行通信的底层模块，这里先不仔细分析了。
Server
回到 StartCloudHub 中，接下来启动了一个 viaduct 的 server，并执行 ListenAndServeTLS，开始监听来自边缘节点的请求。
server 在启动时注册了一个 OnRegister 方法（即 Server 的 ConnNotify 回调）：
// OnRegister registers a node on its first connection. It reads the node and
// project IDs from the connection headers, makes sure a keepalive channel
// exists for the node, and serves the connection in a new goroutine.
//
// NOTE(review): KeepaliveChannel is a plain map written here without visible
// locking; if the server invokes this callback concurrently for different
// connections, this is a data race — confirm.
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
	nodeID := connection.ConnectionState().Headers.Get("node_id")
	projectID := connection.ConnectionState().Headers.Get("project_id")

	// Create the keepalive channel (capacity 1) only when the node has none,
	// so a reconnecting node keeps its existing channel.
	_, exists := mh.KeepaliveChannel[nodeID]
	if !exists {
		mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
	}

	hubInfo := &model.HubInfo{ProjectID: projectID, NodeID: nodeID}
	go mh.ServeConn(&hubio.JSONIO{Connection: connection}, hubInfo)
}
该方法在首次收到边缘侧发来的连接后，创建好相关的通道，并调用 ServeConn 处理消息。这里主要涉及 viaduct 这个中间件的处理流程，就不仔细看了。