概述
CRD作為MLOps部署中的必要技術(shù),需要重點(diǎn)學(xué)習(xí)和研究一下迎献。
CRD定義
沒有自定義CRD(類似數(shù)據(jù)庫(kù)表結(jié)構(gòu)),因此沒有生成自定義的clientSet,使用的dynamic-client:
# virtualmachines-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
# name 必須匹配下面的spec字段:<plural>.<group>
name: virtualmachines.cloud.waibizi.com
spec:
# name 必須匹配下面的spec字段:<plural>.<group>
group: cloud.waibizi.com
# group 名用于 REST API 中的定義:/apis/<group>/<version>
versions:
- name: v1 # 版本名稱穗椅,比如 v1、v2beta1 等等
served: true # 版本名稱奶栖,比如 v1匹表、v2beta1 等等
storage: true # 是否開啟通過 REST APIs 訪問 `/apis/<group>/<version>/...`
schema: # 定義自定義對(duì)象的聲明規(guī)范
openAPIV3Schema:
description: Define virtualMachine YAML Spec
type: object
properties:
# 自定義CRD的字段類型
spec:
type: object
properties:
uuid:
type: string
name:
type: string
image:
type: string
memory:
type: integer
disk:
type: integer
status:
type: string
# 定義作用范圍:Namespaced(命名空間級(jí)別)或者 Cluster(整個(gè)集群)
scope: Namespaced
names:
# 定義作用范圍:Namespaced(命名空間級(jí)別)或者 Cluster(整個(gè)集群)
kind: VirtualMachine
# plural 名字用于 REST API 中的定義:/apis/<group>/<version>/<plural>
plural: virtualmachines
# singular 名稱用于 CLI 操作或顯示的一個(gè)別名
singular: virtualmachines
# 這個(gè)地方就是平時(shí)使用kubectl get po 當(dāng)中這個(gè) po 是 pod的縮寫的定義,我們可以直接使用kubectl get vm查看
shortNames:
- vm
增加一條CR進(jìn)去(類似數(shù)據(jù)庫(kù)記錄):
# public-wx.yaml
apiVersion: "cloud.waibizi.com/v1"
kind: VirtualMachine
metadata:
name: public-wx
spec:
uuid: "2c4789b2-30f2-4d31-ab71-ca115ea8c199"
name: "waibizi-wx-virtual-machine"
image: "Centos-7.9"
memory: 4096
disk: 500
status: "creating"
Operator實(shí)戰(zhàn)
導(dǎo)入包
mkdir dynamic-project && cd dynamic-project
go mod init dynamic-project
go get k8s.io/client-go@v0.22.1
go get k8s.io/apimachinery@v0.22.1
新建幾個(gè)文件夾與文件
tree
.
├── go.mod
├── go.sum
├── main.go
└── pkg
└── apis
├── v1
│ └── type.go
└── vm_controller.go
3 directories, 5 files
main.go
package main
import (
"dynamic-project/pkg/apis"
"flag"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/klog"
"path/filepath"
)
func main() {
var err error
var config *rest.Config
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可選) kubeconfig 文件的絕對(duì)路徑")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的絕對(duì)路徑")
}
flag.Parse()
if config, err = rest.InClusterConfig(); err != nil {
// 使用 KubeConfig 文件創(chuàng)建集群配置 Config 對(duì)象
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return
}
dynamicSharedInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, "default", nil)
// 初始化controller宣鄙,傳入client袍镀、informer,
controller := apis.NewController(dynamicClient, dynamicSharedInformerFactory)
// 直接啟動(dòng)controller
err = controller.Run()
if err != nil {
klog.Errorln("fail run controller")
return
}
}
type.go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// VMGVR 定義資源的GVR,以供dynamic client識(shí)別資源
var VMGVR = schema.GroupVersionResource{
Group: "cloud.waibizi.com",
Version: "v1",
Resource: "virtualmachines",
}
//VirtualMachine 根據(jù) CRD 定義 的 結(jié)構(gòu)體
type VirtualMachine struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec VMSpec `json:"spec"`
}
type VMSpec struct {
UUID string `json:"uuid"`
Name string `json:"name"`
Image string `json:"image"`
Memory int `json:"memory"`
Disk int `json:"disk"`
Status string `json:"status"`
}
vm_controller.go
package apis
import (
"context"
v1 "dynamic-project/pkg/apis/v1"
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/dynamic/dynamiclister"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"time"
)
var stopCh chan struct{}
const controllerAgentName = "vm-dynamic-controller"
type Controller struct {
dynamicClient dynamic.Interface
workqueue workqueue.RateLimitingInterface
vmSynced cache.InformerSynced
vmInformer cache.SharedIndexInformer
vmLister dynamiclister.Lister
recorder record.EventRecorder
}
func NewController(
dynamicClient dynamic.Interface,
factory dynamicinformer.DynamicSharedInformerFactory) *Controller {
informer := factory.ForResource(v1.VMGVR).Informer()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virtualmachines"),
vmSynced: informer.HasSynced,
vmInformer: informer,
vmLister: dynamiclister.New(informer.GetIndexer(), v1.VMGVR),
recorder: recorder,
dynamicClient: dynamicClient,
}
// 添加回調(diào)事件
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controller.enqueueFoo(obj)
},
})
return controller
}
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(key)
}
func (c *Controller) Run() error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
go c.vmInformer.Run(stopCh)
// 啟動(dòng)worker,每個(gè)worker一個(gè)goroutine
go wait.Until(c.runWorker, time.Second, stopCh)
stopCh = make(chan struct{})
// 等待cache同步
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.vmSynced); !ok {
klog.Errorln("failed to wait for caches to sync")
return nil
}
// 等待退出信號(hào)
<-stopCh
klog.Infoln("Shutting down workers")
return nil
}
// worker就是一個(gè)循環(huán)不斷調(diào)用processNextWorkItem
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
// 從工作隊(duì)列獲取對(duì)象
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.syncHandler(key); err != nil {
// 處理失敗再次加入隊(duì)列
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// 處理成功不入隊(duì)
c.workqueue.Forget(obj)
klog.Infoln("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return err
}
unStruct, err := c.vmLister.Namespace(namespace).Get(name)
newBytes, err := json.Marshal(unStruct)
if err != nil {
utilruntime.HandleError(err)
return err
}
vm := &v1.VirtualMachine{}
if err = json.Unmarshal(newBytes, vm); err != nil {
utilruntime.HandleError(err)
return err
}
if vm.Spec.Status == "creating" {
err = c.Creating(vm)
if err != nil {
utilruntime.HandleError(err)
return err
}
}
return nil
}
func (c *Controller) Creating(vm *v1.VirtualMachine) error {
// 假設(shè)虛擬機(jī)都可以創(chuàng)建成功冻晤,不成功的話就直接更新為fail啥的就行了苇羡,或者返回err重新加入隊(duì)列當(dāng)中,不斷地去進(jìn)行創(chuàng)建操作
patchData := []byte(`{"spec": {"status": "complete"}}`)
// patch更新CR
_, err := c.dynamicClient.Resource(v1.VMGVR).Namespace("default").Patch(context.Background(),vm.Name,types.MergePatchType,patchData,metav1.PatchOptions{})
if err != nil {
klog.Errorln(err)
return err
}
return nil
}