概述
Edgecontroller 是 Kubernetes apiserver 與 Edgecore 之間的橋梁氯迂,負(fù)責(zé)兩者間的通信與協(xié)議轉(zhuǎn)換扯旷。
其核心為:
- UpstreamController(上行消息控制器):負(fù)責(zé)從邊緣訂閱消息并同步到 kubernetes api server耸棒。
- DownstreamController(下行消息控制器):負(fù)責(zé)抓取 kubernetes api server 更改并將更改發(fā)送到邊緣。
結(jié)構(gòu)
EdgeController
對(duì)于 EdgeController 而言亡问,它所做的事情非常簡(jiǎn)單汞幢,它主要就是將自己注冊(cè)到 Core 中,并且使用 config 實(shí)例化 <font color="#ff0000">UpstreamController</font>舰罚,<font color="#ff0000">DownstreamController</font> 纽门。然后在 Core 啟動(dòng)該模塊時(shí)啟動(dòng) UpstreamController 與 DownstreamController。
Register
- 根據(jù)配置實(shí)例化上行控制器--UpstreamController
- 根據(jù)配置實(shí)例化下行控制器--DownstreamController
func Register(ec *v1alpha1.EdgeController) {
core.Register(newEdgeController(ec))
}
func newEdgeController(config *v1alpha1.EdgeController) *EdgeController {
ec := &EdgeController{config: *config}
if !ec.Enable() {
return ec
}
var err error
ec.upstream, err = controller.NewUpstreamController(config, informers.GetInformersManager().GetK8sInformerFactory())
...
ec.downstream, err = controller.NewDownstreamController(config, informers.GetInformersManager().GetK8sInformerFactory(), informers.GetInformersManager(), informers.GetInformersManager().GetCRDInformerFactory())
...
return ec
}
Start
- 啟動(dòng)上行控制器
- 啟動(dòng)下行控制器
func (ec *EdgeController) Start() {
if err := ec.upstream.Start(); err != nil {
klog.Exitf("start upstream failed with error: %s", err)
}
if err := ec.downstream.Start(); err != nil {
klog.Exitf("start downstream failed with error: %s", err)
}
}
UpstreamController
對(duì)于上下控制器而言营罢,它的主要作用就是訂閱邊緣的消息赏陵,然后進(jìn)行協(xié)議轉(zhuǎn)換和處理后,發(fā)送給 k8s apiserver饲漾。
創(chuàng)建上行控制器實(shí)例---NewUpstreamController
NewUpstreamController 方法主要作用就是創(chuàng)建一個(gè) UpstreamController 實(shí)例瘟滨,其主要流程如下:
- 獲取 k8s 相關(guān)客戶端
- 獲取消息傳輸層實(shí)例
- 獲取各類資源Lister---Node,Pod能颁,ConfigMap杂瘸,Secret,Lease
- 創(chuàng)建各類資源數(shù)據(jù)的處理通道
func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.SharedInformerFactory) (*UpstreamController, error) {
uc := &UpstreamController{
kubeClient: client.GetKubeClient(),
messageLayer: messagelayer.EdgeControllerMessageLayer(),
crdClient: client.GetCRDClient(),
config: *config,
}
//獲取各類資源Lister
uc.nodeLister = factory.Core().V1().Nodes().Lister()
...
//創(chuàng)建各類資源數(shù)據(jù)的處理通道
uc.nodeStatusChan = make(chan model.Message, config.Buffer.UpdateNodeStatus)
uc.podStatusChan = make(chan model.Message, config.Buffer.UpdatePodStatus)
...
return uc, nil
}
啟動(dòng)控制器---Start
Start 的主要作用就是啟動(dòng) UpstreamController 實(shí)例伙菊,開始接收來(lái)自消息層的消息败玉,然后分發(fā)給各個(gè)函數(shù)處理敌土,其主要流程如下:
- 開協(xié)程進(jìn)行消息分發(fā)處理---dispatchMessage
- 根據(jù)配置開協(xié)程處理各類資源消息
- updateNodeStatus
- updatePodStatus
- queryConfigMap
- querySecret
- processServiceAccountToken
- ... --- 太多了,不一一例舉了
func (uc *UpstreamController) Start() error {
klog.Info("start upstream controller")
//開協(xié)程進(jìn)行消息分發(fā)處理
go uc.dispatchMessage()
//根據(jù)配置開協(xié)程處理各類資源消息
for i := 0; i < int(uc.config.Load.UpdateNodeStatusWorkers); i++ {
go uc.updateNodeStatus()
}
for i := 0; i < int(uc.config.Load.UpdatePodStatusWorkers); i++ {
go uc.updatePodStatus()
}
...
return nil
}
分發(fā)消息---dispatchMessage
dispatchMessage 主要用于從消息層接收消息运翼,并根據(jù)資源類型的不同返干,通過(guò)不同的通道將消息分發(fā)不同的處理函數(shù)。其主要流程如下:
- 開啟循環(huán)
- 接收到結(jié)束信號(hào)就退出循環(huán)
- 接收到消息就獲取并判斷資源類型
- 根據(jù)資源類型和操作將消息通過(guò)不同的通道將消息分發(fā)不同的處理函數(shù)
資源類型
const (
ResourceTypePod = "pod"
ResourceTypeConfigmap = "configmap"
ResourceTypeServiceAccountToken = "serviceaccounttoken"
ResourceTypeSecret = "secret"
ResourceTypeNode = "node"
ResourceTypePodlist = "podlist"
ResourceTypePodStatus = "podstatus"
ResourceTypePodPatch = "podpatch"
ResourceTypeNodeStatus = "nodestatus"
ResourceTypeNodePatch = "nodepatch"
ResourceTypeRule = "rule"
ResourceTypeRuleEndpoint = "ruleendpoint"
ResourceTypeRuleStatus = "rulestatus"
ResourceTypeLease = "lease"
)
操作類型
const (
InsertOperation = "insert"
DeleteOperation = "delete"
QueryOperation = "query"
UpdateOperation = "update"
PatchOperation = "patch"
UploadOperation = "upload"
ResponseOperation = "response"
ResponseErrorOperation = "error"
)
方法
func (uc *UpstreamController) dispatchMessage() {
for {
select {
// 接收到結(jié)束信號(hào)就退出循環(huán)
case <-beehiveContext.Done():
return
default:
}
//接收到消息
msg, err := uc.messageLayer.Receive()
if err != nil {
continue
}
//獲取資源類型
resourceType, err := messagelayer.GetResourceType(msg)
if err != nil {
continue
}
//判斷資源類型
//根據(jù)資源類型和操作將消息通過(guò)不同的通道將消息分發(fā)不同的處理函數(shù)
switch resourceType {
case model.ResourceTypeNodeStatus:
uc.nodeStatusChan <- msg
case model.ResourceTypePodStatus:
uc.podStatusChan <- msg
case model.ResourceTypeConfigmap:
uc.configMapChan <- msg
case model.ResourceTypeSecret:
uc.secretChan <- msg
case model.ResourceTypeServiceAccountToken:
uc.serviceAccountTokenChan <- msg
case common.ResourceTypePersistentVolume:
uc.persistentVolumeChan <- msg
case common.ResourceTypePersistentVolumeClaim:
uc.persistentVolumeClaimChan <- msg
case common.ResourceTypeVolumeAttachment:
uc.volumeAttachmentChan <- msg
case model.ResourceTypeNode:
switch msg.GetOperation() {
case model.InsertOperation:
uc.createNodeChan <- msg
case model.QueryOperation:
uc.queryNodeChan <- msg
case model.UpdateOperation:
uc.updateNodeChan <- msg
default:
}
case model.ResourceTypeNodePatch:
uc.patchNodeChan <- msg
case model.ResourceTypePodPatch:
uc.patchPodChan <- msg
case model.ResourceTypePod:
if msg.GetOperation() == model.DeleteOperation {
uc.podDeleteChan <- msg
}
case model.ResourceTypeRuleStatus:
uc.ruleStatusChan <- msg
case model.ResourceTypeLease:
switch msg.GetOperation() {
case model.InsertOperation, model.UpdateOperation:
uc.createLeaseChan <- msg
case model.QueryOperation:
uc.queryLeaseChan <- msg
}
default:
}
}
}
資源處理函數(shù)
大部分資源處理函數(shù)都只是將消息進(jìn)行轉(zhuǎn)換后血淌,進(jìn)行了簡(jiǎn)單的判斷矩欠,然后就將實(shí)際的資源根據(jù)請(qǐng)求的資源類型和操作去請(qǐng)求 k8s apiserver 了。在這里我就不過(guò)多敘述了悠夯,只舉一個(gè)例子給大家看就懂了癌淮。
常規(guī)資源處理
常規(guī)資源請(qǐng)求方法--kubeClientGet
這個(gè)方法主要用于去請(qǐng)求一些常規(guī)的資源,在控制器中沦补,被多個(gè)地方調(diào)用它的上層封裝函數(shù) queryInner乳蓄。它的主要流程就非常簡(jiǎn)單:
- 判斷請(qǐng)求的什么資源
- 請(qǐng)求k8s apiserver 獲取資源
- 返回資源和錯(cuò)誤信息
func kubeClientGet(uc *UpstreamController, namespace string, name string, queryType string, msg model.Message) (metaV1.Object, error) {
var obj metaV1.Object
var err error
switch queryType {
case model.ResourceTypeConfigmap:
obj, err = uc.configMapLister.ConfigMaps(namespace).Get(name)
case model.ResourceTypeSecret:
obj, err = uc.secretLister.Secrets(namespace).Get(name)
case common.ResourceTypePersistentVolume:
obj, err = uc.kubeClient.CoreV1().PersistentVolumes().Get(context.Background(), name, metaV1.GetOptions{})
case common.ResourceTypePersistentVolumeClaim:
obj, err = uc.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.Background(), name, metaV1.GetOptions{})
case common.ResourceTypeVolumeAttachment:
obj, err = uc.kubeClient.StorageV1().VolumeAttachments().Get(context.Background(), name, metaV1.GetOptions{})
case model.ResourceTypeNode:
obj, err = uc.nodeLister.Get(name)
case model.ResourceTypeServiceAccountToken:
obj, err = uc.getServiceAccountToken(namespace, name, msg)
case model.ResourceTypeLease:
obj, err = uc.leaseLister.Leases(namespace).Get(name)
default:
err = stderrors.New("wrong query type")
}
if err != nil {
return nil, err
}
if err := util.SetMetaType(obj.(runtime.Object)); err != nil {
return nil, err
}
return obj, nil
}
資源請(qǐng)求方法--queryInner
它的主要作用是解析消息本體,獲取消息中包含的 k8s 資源信息夕膀,然后根據(jù)這些信息去調(diào)用下層的常規(guī)資源請(qǐng)求方法 kubeClientGet 去獲取資源數(shù)據(jù),請(qǐng)求成功就再次通過(guò)消息層將請(qǐng)求的數(shù)據(jù)包裝成私有協(xié)議后發(fā)送出去虚倒。其主要流程如下:
- 解析消息中包含的 k8s 資源信息
- 判斷是不是請(qǐng)求操作
- 是就調(diào)用下層的常規(guī)資源請(qǐng)求方法 kubeClientGet 獲取資源數(shù)據(jù)
- 獲取成功就調(diào)用消息層包裝資源數(shù)據(jù)成私有協(xié)議
- 調(diào)用消息層發(fā)送包裝后的消息
func queryInner(uc *UpstreamController, msg model.Message, queryType string) {
//解析消息中包含的 k8s 資源信息
namespace, err := messagelayer.GetNamespace(msg)
if err != nil {
return
}
name, err := messagelayer.GetResourceName(msg)
if err != nil {
return
}
//判斷是不是請(qǐng)求操作
switch msg.GetOperation() {
case model.QueryOperation:
// 調(diào)用下層的常規(guī)資源請(qǐng)求方法 kubeClientGet 獲取資源數(shù)據(jù)
object, err := kubeClientGet(uc, namespace, name, queryType, msg)
if errors.IsNotFound(err) {
return
}
if err != nil {
return
}
nodeID, err := messagelayer.GetNodeID(msg)
if err != nil {
return
}
//調(diào)用消息層包裝資源數(shù)據(jù)成私有協(xié)議
resource, err := messagelayer.BuildResource(nodeID, namespace, queryType, name)
if err != nil {
return
}
//調(diào)用消息層發(fā)送包裝后的消息
resMsg := model.NewMessage(msg.GetID()).
SetResourceVersion(object.GetResourceVersion()).
FillBody(object).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
err = uc.messageLayer.Response(*resMsg)
if err != nil {
return
}
default:
}
}
例子:查詢ConfigMap --- queryConfigMap
主要流程如下:
- 開啟循環(huán)
- 從通道接收消息
- 如果是結(jié)束通道傳來(lái)的消息就退出函數(shù)
- 如果是消息通道傳來(lái)的消息,就接收消息产舞,然后交給資源請(qǐng)求方法(queryInner)處理魂奥。
func (uc *UpstreamController) queryConfigMap() {
for {
select {
case <-beehiveContext.Done():
return
case msg := <-uc.configMapChan:
queryInner(uc, msg, model.ResourceTypeConfigmap)
}
}
}
更新節(jié)點(diǎn)狀態(tài) --- updateNodeStatus(已棄用)
主要作用就是更新節(jié)點(diǎn)的狀態(tài),其流程如下:
接收退出信號(hào)和消息信號(hào)就不再過(guò)多贅述了易猫,直接介紹處理流程耻煤。
- 獲取 k8s 資源相關(guān)的消息(命名空間,資源名稱等)
- 判斷需要對(duì)消息進(jìn)行的操作
- 新增操作:
- 先從 k8s apiserver 查詢?cè)摴?jié)點(diǎn)資源擦囊,查詢到就返回資源已存在,不用創(chuàng)建
- 查詢錯(cuò)誤如果不是資源不存在就返回錯(cuò)誤消息
- 該節(jié)點(diǎn)資源不存在就調(diào)用 k8s client 創(chuàng)建節(jié)點(diǎn)資源
- 創(chuàng)建失敗就返回失敗消息嘴办,成功就返回成功消息
- 更新操作:
- 先從 k8s apiserver 查詢?cè)摴?jié)點(diǎn)資源瞬场,不存在就打印錯(cuò)誤消息
- 存在就更新資源狀態(tài)里的心跳時(shí)間信息
- 為節(jié)點(diǎn)資源設(shè)置Annotations
- 更新資源狀態(tài)信息
- 調(diào)用 k8s client 更新節(jié)點(diǎn)狀態(tài)信息
- 調(diào)用消息層返回更新節(jié)點(diǎn)資源成功的消息
func (uc *UpstreamController) updateNodeStatus() {
for {
select {
case <-beehiveContext.Done():
return
case msg := <-uc.nodeStatusChan:
data, err := msg.GetContentData()
if err != nil {
continue
}
namespace, err := messagelayer.GetNamespace(msg)
if err != nil {
continue
}
name, err := messagelayer.GetResourceName(msg)
if err != nil {
continue
}
switch msg.GetOperation() {
case model.InsertOperation:
_, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
if err == nil {
uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)
continue
}
if !errors.IsNotFound(err) {
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}
node := &v1.Node{}
err = json.Unmarshal(data, node)
if err != nil {
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}
if _, err = uc.createNode(name, node); err != nil {
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}
uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)
case model.UpdateOperation:
nodeStatusRequest := &edgeapi.NodeStatusRequest{}
err := json.Unmarshal(data, nodeStatusRequest)
if err != nil {
continue
}
getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
if errors.IsNotFound(err) {
continue
}
if err != nil {
continue
}
for i := range nodeStatusRequest.Status.Conditions {
if time.Since(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(uc.config.NodeUpdateFrequency)*time.Second {
nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
}
}
if getNode.Annotations == nil {
getNode.Annotations = make(map[string]string)
}
for name, v := range nodeStatusRequest.ExtendResources {
if name == constants.NvidiaGPUScalarResourceName {
var gpuStatus []types.NvidiaGPUStatus
for _, er := range v {
gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
}
if len(gpuStatus) > 0 {
data, _ := json.Marshal(gpuStatus)
getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)
}
}
data, err := json.Marshal(v)
if err != nil {
continue
}
getNode.Annotations[string(name)] = string(data)
}
if getNode.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {
nodeStatusRequest.Status.DaemonEndpoints.KubeletEndpoint.Port = getNode.Status.DaemonEndpoints.KubeletEndpoint.Port
}
getNode.Status = nodeStatusRequest.Status
node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{})
if err != nil {
continue
}
nodeID, err := messagelayer.GetNodeID(msg)
if err != nil {
continue
}
resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
if err != nil {
continue
}
resMsg := model.NewMessage(msg.GetID()).
SetResourceVersion(node.ResourceVersion).
FillBody(common.MessageSuccessfulContent).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
if err = uc.messageLayer.Response(*resMsg); err != nil {
continue
}
default:
continue
}
}
}
}
其他的上行消息處理都是比較簡(jiǎn)單的,我在這里就不一一例舉了涧郊,其大概的處理流程如下:
- 獲取信息
- 判斷操作什么資源
- 判斷對(duì)資源需要做什么操作
- 進(jìn)行一定的處理
- 根據(jù)對(duì)應(yīng)的資源類型和操作類型請(qǐng)求 k8s api server
- 如果需要響應(yīng)請(qǐng)求贯被,就調(diào)用消息層構(gòu)建并響應(yīng)消息。
DownstreamController
DownstreamController 主要用于接收 k8s apiserver 的變更事件信息妆艘,然后經(jīng)過(guò)協(xié)議轉(zhuǎn)換彤灶,發(fā)送給邊緣。
創(chuàng)建下行控制器實(shí)例---NewDownstreamController
NewDownstreamController 主要作用是創(chuàng)建一個(gè) DownstreamController 實(shí)例批旺,其主要流程如下:
- 獲取一個(gè)本地緩存實(shí)例存儲(chǔ)節(jié)點(diǎn)和 ConfigMap,Secret 間的關(guān)系
- 獲取 k8s client
- 獲取各類 manager 管理各類資源的變化事件
- 獲取消息層用于發(fā)送消息給邊緣
- 初始化本地緩存
func NewDownstreamController(config *v1alpha1.EdgeController, k8sInformerFactory k8sinformers.SharedInformerFactory, keInformerFactory informers.KubeEdgeCustomInformer,
crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
//獲取一個(gè)本地緩存實(shí)例
lc := &manager.LocationCache{}
//獲取各類 manager 管理各類資源的變化事件
podInformer := k8sInformerFactory.Core().V1().Pods()
podManager, err := manager.NewPodManager(config, podInformer.Informer())
if err != nil {
return nil, err
}
configMapInformer := k8sInformerFactory.Core().V1().ConfigMaps()
configMapManager, err := manager.NewConfigMapManager(config, configMapInformer.Informer())
if err != nil {
return nil, err
}
secretInformer := k8sInformerFactory.Core().V1().Secrets()
secretManager, err := manager.NewSecretManager(config, secretInformer.Informer())
if err != nil {
return nil, err
}
nodeInformer := keInformerFactory.EdgeNode()
nodesManager, err := manager.NewNodesManager(nodeInformer)
if err != nil {
return nil, err
}
rulesInformer := crdInformerFactory.Rules().V1().Rules().Informer()
rulesManager, err := manager.NewRuleManager(config, rulesInformer)
if err != nil {
return nil, err
}
ruleEndpointsInformer := crdInformerFactory.Rules().V1().RuleEndpoints().Informer()
ruleEndpointsManager, err := manager.NewRuleEndpointManager(config, ruleEndpointsInformer)
if err != nil {
return nil, err
}
dc := &DownstreamController{
//獲取 k8s client
kubeClient: client.GetKubeClient(),
podManager: podManager,
configmapManager: configMapManager,
secretManager: secretManager,
nodeManager: nodesManager,
//獲取消息層用于發(fā)送消息給邊緣
messageLayer: messagelayer.EdgeControllerMessageLayer(),
lc: lc,
podLister: podInformer.Lister(),
rulesManager: rulesManager,
ruleEndpointsManager: ruleEndpointsManager,
}
//初始化本地緩存
if err := dc.initLocating(); err != nil {
return nil, err
}
return dc, nil
}
啟動(dòng)控制器---Start
下行控制器的啟動(dòng)和上行控制器的啟動(dòng)一樣非常簡(jiǎn)單幌陕,就是開啟協(xié)程處理各類事件:
func (dc *DownstreamController) Start() error {
klog.Info("start downstream controller")
// pod
go dc.syncPod()
// configmap
go dc.syncConfigMap()
// secret
go dc.syncSecret()
// nodes
go dc.syncEdgeNodes()
// rule
go dc.syncRule()
// ruleendpoint
go dc.syncRuleEndpoint()
return nil
}
本地緩存 --- LocationCache
在上游控制器中,我們提到過(guò) LocationCache 的作用主要就是用于本地緩存汽煮,緩存節(jié)點(diǎn)和 ConfigMap,Secret 間的關(guān)系搏熄,這樣做主要是為了能給在 ConfigMap 和 Secret 發(fā)生變更是能夠快速知道有哪些節(jié)點(diǎn)在使用棚唆,然后下發(fā)給對(duì)應(yīng)節(jié)點(diǎn)。其數(shù)據(jù)結(jié)構(gòu)如下:
type LocationCache struct {
//EdgeNodes 用于存儲(chǔ)邊緣 nodeName心例,把Map用作了Set
EdgeNodes sync.Map
// configMapNode 用于儲(chǔ)存 node 和 ConfigMap 間的關(guān)系宵凌,
// key 是 namespace/configMapName, value 是 nodeName
configMapNode sync.Map
// configMapNode 用于儲(chǔ)存 node 和 secret 間的關(guān)系,
// key 是 namespace/configMapName, value 是 nodeName
secretNode sync.Map
}
我們接下來(lái)看看它的幾個(gè)關(guān)鍵的處理方法:
初始化 --- initLocating
initLocating 函數(shù)用于下行控制器創(chuàng)建的時(shí)候初始化本地緩存這個(gè)實(shí)例止后,在初始化的時(shí)候瞎惫,其主要流程如下:
- 創(chuàng)建一個(gè)用于獲取邊緣節(jié)點(diǎn)的labelSelector
- 從 k8s client 查詢節(jié)點(diǎn)數(shù)據(jù)
- 更新本地緩存的邊緣節(jié)點(diǎn)信息
- 查詢 k8s 集群 所有 pod信息
- 判斷 pod 是不是邊緣節(jié)點(diǎn)的 pod
- 是就使用 pod 的里的信息更新本地緩存
func (dc *DownstreamController) initLocating() error {
//創(chuàng)建一個(gè)用于獲取邊緣節(jié)點(diǎn)的labelSelector
set := labels.Set{commonconstants.EdgeNodeRoleKey: commonconstants.EdgeNodeRoleValue}
selector := labels.SelectorFromSet(set)
//從 k8s client 查詢節(jié)點(diǎn)數(shù)據(jù)
nodes, err := dc.kubeClient.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
//更新本地緩存的邊緣節(jié)點(diǎn)信息
for _, node := range nodes.Items {
dc.lc.UpdateEdgeNode(node.ObjectMeta.Name)
}
//查詢 k8s 集群 所有 pod信息
pods, err := dc.kubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
if err != nil {
return err
}
for _, p := range pods.Items {
//判斷 pod 是不是邊緣節(jié)點(diǎn)的 pod
if dc.lc.IsEdgeNode(p.Spec.NodeName) {
//是就使用 pod 的里的信息更新本地緩存
dc.lc.AddOrUpdatePod(p)
}
}
return nil
}
緩存邊緣節(jié)點(diǎn)信息 --- UpdateEdgeNode
緩存節(jié)點(diǎn)信息則是直接將節(jié)點(diǎn)名稱存入 map 。
func (lc *LocationCache) UpdateEdgeNode(nodeName string) {
lc.EdgeNodes.Store(nodeName, struct{}{})
}
判斷節(jié)點(diǎn)是否是邊緣節(jié)點(diǎn) --- IsEdgeNode
對(duì)應(yīng)判斷是否是邊緣節(jié)點(diǎn)译株,則直接查詢 EdgeNodes 這個(gè) map瓜喇,如果能夠查出來(lái)就說(shuō)明是邊緣節(jié)點(diǎn),如果不能古戴,就說(shuō)明不是欠橘。
為什么可以這樣判斷?
因?yàn)槲覀兂跏蓟镜鼐彺娴臅r(shí)候就將所有的邊緣節(jié)點(diǎn)查詢出來(lái)加入了 EdgeNodes 這個(gè)map现恼。
func (lc *LocationCache) IsEdgeNode(nodeName string) bool {
_, ok := lc.EdgeNodes.Load(nodeName)
return ok
}
根據(jù) Pod 添加或更新緩存信息 --- AddOrUpdatePod
AddOrUpdatePod 方法主要作用是將 Pod 用到的 ConfigMap 和 Secret 信息從 Pod 中拿出來(lái)進(jìn)行緩存肃续,這樣的話,我們就能在 ConfigMap 或 Secret 發(fā)生變更的時(shí)候及時(shí)通知給對(duì)應(yīng)邊緣叉袍。
主要流程如下:
- 從 pod 中獲取用到的 ConfigMaps 和 Secrets.
- 判斷 configMapNode 中有沒(méi)有對(duì)應(yīng)的 configMapKey 和 nodeName
- 沒(méi)有就將缺失的數(shù)據(jù)將入進(jìn)去始锚,有就忽略。
- 判斷 secretNode 中有沒(méi)有對(duì)應(yīng)的 secretKey 和 nodeName
- 沒(méi)有就將缺失的數(shù)據(jù)將入進(jìn)去喳逛,有就忽略瞧捌。
func (lc *LocationCache) AddOrUpdatePod(pod v1.Pod) {
configMaps, secrets := lc.PodConfigMapsAndSecrets(pod)
for _, c := range configMaps {
configMapKey := fmt.Sprintf("%s/%s", pod.Namespace, c)
// update configMapPod
value, ok := lc.configMapNode.Load(configMapKey)
var newNodes []string
if ok {
nodes, _ := value.([]string)
newNodes = lc.newNodes(nodes, pod.Spec.NodeName)
} else {
newNodes = []string{pod.Spec.NodeName}
}
lc.configMapNode.Store(configMapKey, newNodes)
}
for _, s := range secrets {
secretKey := fmt.Sprintf("%s/%s", pod.Namespace, s)
// update secretPod
value, ok := lc.secretNode.Load(secretKey)
var newNodes []string
if ok {
nodes, _ := value.([]string)
newNodes = lc.newNodes(nodes, pod.Spec.NodeName)
} else {
newNodes = []string{pod.Spec.NodeName}
}
lc.secretNode.Store(secretKey, newNodes)
}
}
從 Pod 中獲取 ConfigMaps & Secrets --- PodConfigMapsAndSecrets
該函數(shù)主要用于從 Pod 中獲取該 Pod 用到的ConfigMaps & Secrets。 其主要流程如下:
- 遍歷 pod 的 Volumes 獲取 ConfigMaps & Secrets
- 遍歷 pod 的 Containers 的 envs 獲取 ConfigMaps & Secrets
- 遍歷 pod 的 ImagePullSecrets 獲取 Secrets
func (lc *LocationCache) PodConfigMapsAndSecrets(pod v1.Pod) (configMaps, secrets []string) {
for _, v := range pod.Spec.Volumes {
if v.ConfigMap != nil {
configMaps = append(configMaps, v.ConfigMap.Name)
}
if v.Secret != nil {
secrets = append(secrets, v.Secret.SecretName)
}
if v.Projected != nil {
for _, source := range v.Projected.Sources {
switch {
case source.ConfigMap != nil:
configMaps = append(configMaps, source.ConfigMap.Name)
case source.Secret != nil:
secrets = append(secrets, source.Secret.Name)
}
}
}
}
// used by envs
for _, s := range pod.Spec.Containers {
for _, ef := range s.EnvFrom {
if ef.ConfigMapRef != nil {
configMaps = append(configMaps, ef.ConfigMapRef.Name)
}
if ef.SecretRef != nil {
secrets = append(secrets, ef.SecretRef.Name)
}
}
for _, e := range s.Env {
if e.ValueFrom == nil {
continue
}
if e.ValueFrom.ConfigMapKeyRef != nil {
configMaps = append(configMaps, e.ValueFrom.ConfigMapKeyRef.Name)
} else if e.ValueFrom.SecretKeyRef != nil {
secrets = append(secrets, e.ValueFrom.SecretKeyRef.Name)
}
}
}
// used by ImagePullSecrets
for _, s := range pod.Spec.ImagePullSecrets {
secrets = append(secrets, s.Name)
}
return
}
公共事件管理器 --- CommonResourceEventHandler
公共事件管理器润文,主要用來(lái)接收來(lái)自 k8s apiserver informer 的事件消息姐呐,然后將該事件消息通過(guò)通道傳遞出去。
其數(shù)據(jù)結(jié)構(gòu)如下:
type CommonResourceEventHandler struct {
events chan watch.Event
}
它實(shí)現(xiàn)了 cache 包的 ResourceEventHandler 接口用來(lái)接收事件通知典蝌。
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
func (c *CommonResourceEventHandler) OnAdd(obj interface{}) {
c.obj2Event(watch.Added, obj)
}
func (c *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
c.obj2Event(watch.Modified, newObj)
}
func (c *CommonResourceEventHandler) OnDelete(obj interface{}) {
c.obj2Event(watch.Deleted, obj)
}
func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
eventObj, ok := obj.(runtime.Object)
if !ok {
klog.Warningf("unknown type: %T, ignore", obj)
return
}
err := util.SetMetaType(eventObj)
if err != nil {
klog.Warningf("failed to set meta type :%v", err)
}
c.events <- watch.Event{Type: t, Object: eventObj}
}
定義了 Manager 接口用來(lái)發(fā)送消息曙砂。用來(lái)傳遞事件消息。
type Manager interface {
Events() chan watch.Event
}
創(chuàng)建事件管理器
func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler {
return &CommonResourceEventHandler{events: events}
}
節(jié)點(diǎn)事件管理
節(jié)點(diǎn)事件管理器實(shí)現(xiàn)了公共事件管理器的接口骏掀。其代碼如下:
type NodesManager struct {
events chan watch.Event
}
// manager 接口實(shí)現(xiàn)
func (nm *NodesManager) Events() chan watch.Event {
return nm.events
}
// 創(chuàng)建一個(gè)節(jié)點(diǎn)事件管理器
func NewNodesManager(si cache.SharedIndexInformer) (*NodesManager, error) {
events := make(chan watch.Event)
//獲取一個(gè)公共事件管理器實(shí)例
rh := NewCommonResourceEventHandler(events)
//注冊(cè)到 Informer
si.AddEventHandler(rh)
return &NodesManager{events: events}, nil
}
我們可以看到它的實(shí)現(xiàn)非常簡(jiǎn)單鸠澈,主要就是利用 informer 和 CommonResourceEventHandler 的能力進(jìn)行事件傳遞。
其他的如:
- ConfigMapManager
- SecretManager
- RuleManager
- RuleEndpointManager
都是如上的實(shí)現(xiàn)截驮,只要 PodManager 有異同缰泡。
Pod 事件管理器 --- PodManager
PodManager 與其他事件管理器的不同是因?yàn)樗鼘?duì)事件消息進(jìn)行了合并和緩存后定枷,然后再將消息傳遞下行控制器處理云稚。
它擁有兩個(gè)通道个初,一個(gè)通道用來(lái)接收來(lái)自 api server 事件消息,一個(gè)通道用來(lái)傳遞合并后的事件信息坡锡。
type PodManager struct {
// 接收來(lái)自 api server 事件消息
realEvents chan watch.Event
// 用來(lái)傳遞合并后的事件信息
mergedEvents chan watch.Event
// pods, key 是 UID, value 是 *v1.Pod
pods sync.Map
}
觀察下面代碼,我們可以發(fā)現(xiàn)妹笆,創(chuàng)建 manager 的時(shí)候块请,創(chuàng)建了兩個(gè)通道:
- 一個(gè)通道 realEvents 傳遞給了CommonResourceEventHandler 用來(lái)接收來(lái)自 api server 事件消息
- 一個(gè)通道 mergedEvents 返回給下行控制器用來(lái)傳遞合并后的事件信息。
func (pm *PodManager) Events() chan watch.Event {
return pm.mergedEvents
}
func NewPodManager(config *v1alpha1.EdgeController, si cache.SharedIndexInformer) (*PodManager, error) {
realEvents := make(chan watch.Event, config.Buffer.PodEvent)
mergedEvents := make(chan watch.Event, config.Buffer.PodEvent)
rh := NewCommonResourceEventHandler(realEvents)
si.AddEventHandler(rh)
pm := &PodManager{realEvents: realEvents, mergedEvents: mergedEvents}
go pm.merge()
return pm, nil
}
消息合并
消息合并主要合并的是修改信息拳缠,如果 pod.spec 和 metadata 沒(méi)有改變墩新,那么這個(gè)事件就會(huì)被忽略:
- 接收事件信息
- 新增:
- 將數(shù)據(jù)緩存起來(lái)。
- 判斷是否是刪除窟坐,是就將事件類型變?yōu)樾薷?/li>
- 通過(guò)通道傳遞數(shù)據(jù)出去
- 刪除:
- 刪除緩存
- 通過(guò)通道傳遞數(shù)據(jù)出去
- 修改 :
- 取出緩存數(shù)據(jù)
- 更新緩存數(shù)據(jù)
- 沒(méi)有緩存就通過(guò)通道傳遞數(shù)據(jù)出去
- 有就判斷數(shù)據(jù)是否修改海渊,修改了就傳遞,沒(méi)有就不傳遞哲鸳。
func (pm *PodManager) isPodUpdated(old *CachePod, new *v1.Pod) bool {
// 忽略一些不關(guān)心的字段
old.ObjectMeta.ResourceVersion = new.ObjectMeta.ResourceVersion
old.ObjectMeta.Generation = new.ObjectMeta.Generation
// 深度比較 pod.spec 和 metadata 有沒(méi)有改變
return !reflect.DeepEqual(old.ObjectMeta, new.ObjectMeta) || !reflect.DeepEqual(old.Spec, new.Spec)
}
func (pm *PodManager) merge() {
for re := range pm.realEvents {
pod := re.Object.(*v1.Pod)
switch re.Type {
case watch.Added:
pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
if pod.DeletionTimestamp == nil {
pm.mergedEvents <- re
} else {
re.Type = watch.Modified
pm.mergedEvents <- re
}
case watch.Deleted:
pm.pods.Delete(pod.UID)
pm.mergedEvents <- re
case watch.Modified:
value, ok := pm.pods.Load(pod.UID)
pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
if ok {
cachedPod := value.(*CachePod)
if pm.isPodUpdated(cachedPod, pod) {
pm.mergedEvents <- re
}
} else {
pm.mergedEvents <- re
}
default:
klog.Warningf("event type: %s unsupported", re.Type)
}
}
}
事件處理
事件從事件管理器傳遞出來(lái)后臣疑,它的具體處理是由下行控制器里面的各個(gè)函數(shù)去處理。
Pod
對(duì)于 Pod 事件的處理:
- 判斷是否是邊緣節(jié)點(diǎn)上的pod
- 是就使用消息層構(gòu)建自定義消息
- 對(duì)于新增和修改事件徙菠,更新本地緩存
- 通過(guò)消息層發(fā)送自定義消息
func (dc *DownstreamController) syncPod() {
for {
select {
case <-beehiveContext.Done():
return
case e := <-dc.podManager.Events():
pod, ok := e.Object.(*v1.Pod)
if !ok {
continue
}
if !dc.lc.IsEdgeNode(pod.Spec.NodeName) {
continue
}
resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name)
if err != nil {
continue
}
msg := model.NewMessage("").
SetResourceVersion(pod.ResourceVersion).
FillBody(pod)
switch e.Type {
case watch.Added:
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)
dc.lc.AddOrUpdatePod(*pod)
case watch.Deleted:
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
case watch.Modified:
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)
dc.lc.AddOrUpdatePod(*pod)
default:
continue
}
dc.messageLayer.Send(*msg)
...
}
}
}
Nodes
對(duì)于 node 事件的處理:
- 向邊緣傳遞的消息只有刪除事件
- 對(duì)于新增事件會(huì)直接忽略
- 修改事件會(huì)修改本地緩存
之所以忽略新增事件讯沈,是因?yàn)樾略稣?qǐng)求是由邊緣傳遞到 apiserver 的,所以不需要再將新增事件傳遞回去婿奔。
func (dc *DownstreamController) syncEdgeNodes() {
for {
select {
case <-beehiveContext.Done():
return
case e := <-dc.nodeManager.Events():
node, ok := e.Object.(*v1.Node)
if !ok {
continue
}
switch e.Type {
case watch.Added:
fallthrough
case watch.Modified:
dc.lc.UpdateEdgeNode(node.ObjectMeta.Name)
case watch.Deleted:
dc.lc.DeleteNode(node.ObjectMeta.Name)
resource, err := messagelayer.BuildResource(node.Name, "namespace", constants.ResourceNode, node.Name)
if err != nil {
break
}
msg := model.NewMessage("").
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
err = dc.messageLayer.Send(*msg)
...
default:
}
}
}
}
configMap & Secret
對(duì)于 configMap 和 Secret缺狠, 其處理流程如下:
- 獲取 configMap/Secret 事件信息
- 根據(jù)事件類型修改自定義消息操作類型
- 從本地緩存獲取用到該資源的節(jié)點(diǎn)名稱
- 如果是刪除,刪除本地緩存信息
- 遍歷節(jié)點(diǎn)名稱列表構(gòu)建自定義消息
- 通過(guò)消息層發(fā)送自定義消息
func (dc *DownstreamController) syncXXX() {
for {
select {
case <-beehiveContext.Done():
return
case e := <-dc.XXXManager.Events():
XXX, ok := e.Object.(*v1.XXX)
if !ok {
continue
}
var operation string
switch e.Type {
case watch.Added:
operation = model.InsertOperation
case watch.Modified:
operation = model.UpdateOperation
case watch.Deleted:
operation = model.DeleteOperation
default:
continue
}
nodes := dc.lc.XXXNodes(XXX.Namespace, XXX.Name)
if e.Type == watch.Deleted {
dc.lc.DeleteXXX(configMap.Namespace, XXX.Name)
}
for _, n := range nodes {
resource, err := messagelayer.BuildResource(n, XXX.Namespace, model.ResourceTypeXXX, configMap.Name)
if err != nil {
continue
}
msg := model.NewMessage("").
SetResourceVersion(XXX.ResourceVersion).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, operation).
FillBody(XXX)
err = dc.messageLayer.Send(*msg)
}
}
}
}
Rule & RuleEndpoint
對(duì)于 Rule & RuleEndpoint萍摊, 其處理流程如下:
- 獲取 Rule / RuleEndpoint 事件信息
- 根據(jù)操作類型構(gòu)建自定義消息
- 通過(guò)消息層發(fā)送自定義消息
func (dc *DownstreamController) syncRuleEndpoint() {
for {
select {
case <-beehiveContext.Done():
return
case e := <-dc.ruleEndpointsManager.Events():
ruleEndpoint, ok := e.Object.(*routerv1.RuleEndpoint)
if !ok {
continue
}
resource, err := messagelayer.BuildResourceForRouter(model.ResourceTypeRuleEndpoint, ruleEndpoint.Name)
if err != nil {
continue
}
msg := model.NewMessage("").
SetResourceVersion(ruleEndpoint.ResourceVersion).
FillBody(ruleEndpoint)
switch e.Type {
case watch.Added:
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)
case watch.Deleted:
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
case watch.Modified:
continue
default:
continue
}
dc.messageLayer.Send(*msg)
...
}
}
}