CRD在AI部署中的應(yīng)用

概述

CRD作為MLOps部署中的必要技術(shù),需要重點(diǎn)學(xué)習(xí)和研究一下迎献。

CRD定義

沒有自定義CRD(類似數(shù)據(jù)庫(kù)表結(jié)構(gòu)),因此沒有生成自定義的clientSet,使用的dynamic-client:

# virtualmachines-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # name 必須匹配下面的spec字段:<plural>.<group>  
  name: virtualmachines.cloud.waibizi.com
spec:
   # name 必須匹配下面的spec字段:<plural>.<group>  
  group: cloud.waibizi.com
  # group 名用于 REST API 中的定義:/apis/<group>/<version>  
  versions:
  - name: v1 # 版本名稱穗椅,比如 v1、v2beta1 等等    
    served: true  # 版本名稱奶栖,比如 v1匹表、v2beta1 等等    
    storage: true  # 是否開啟通過 REST APIs 訪問 `/apis/<group>/<version>/...`    
    schema:  # 定義自定義對(duì)象的聲明規(guī)范 
      openAPIV3Schema:
        description: Define virtualMachine YAML Spec
        type: object
        properties:
          # 自定義CRD的字段類型
          spec:
            type: object
            properties:
              uuid:
                type: string
              name:
                type: string
              image:
                type: string
              memory:
                type: integer
              disk:
                type: integer
              status:
                type: string
  # 定義作用范圍:Namespaced(命名空間級(jí)別)或者 Cluster(整個(gè)集群)              
  scope: Namespaced
  names:
    # 定義作用范圍:Namespaced(命名空間級(jí)別)或者 Cluster(整個(gè)集群)
    kind: VirtualMachine
     # plural 名字用于 REST API 中的定義:/apis/<group>/<version>/<plural>    
    plural: virtualmachines
     # singular 名稱用于 CLI 操作或顯示的一個(gè)別名 
    singular: virtualmachines
# 這個(gè)地方就是平時(shí)使用kubectl get po 當(dāng)中這個(gè) po 是 pod的縮寫的定義,我們可以直接使用kubectl get vm查看
    shortNames:
    - vm

增加一條CR進(jìn)去(類似數(shù)據(jù)庫(kù)記錄):

# public-wx.yaml
apiVersion: "cloud.waibizi.com/v1"
kind: VirtualMachine
metadata:
  name: public-wx
spec:
  uuid: "2c4789b2-30f2-4d31-ab71-ca115ea8c199"
  name: "waibizi-wx-virtual-machine"
  image: "Centos-7.9"
  memory: 4096
  disk: 500
  status: "creating"

Operator實(shí)戰(zhàn)

導(dǎo)入包

mkdir dynamic-project && cd dynamic-project
go mod init dynamic-project
go get k8s.io/client-go@v0.22.1
go get k8s.io/apimachinery@v0.22.1

新建幾個(gè)文件夾與文件

tree
.
├── go.mod
├── go.sum
├── main.go
└── pkg
    └── apis
        ├── v1
        │   └── type.go
        └── vm_controller.go

3 directories, 5 files

main.go

package main

import (
    "dynamic-project/pkg/apis"
    "flag"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    "k8s.io/klog"
    "path/filepath"
)

func main() {
    var err error
    var config *rest.Config
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可選) kubeconfig 文件的絕對(duì)路徑")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的絕對(duì)路徑")
    }
    flag.Parse()
    if config, err = rest.InClusterConfig(); err != nil {
        // 使用 KubeConfig 文件創(chuàng)建集群配置 Config 對(duì)象
        if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
            panic(err.Error())
        }
    }
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        return
    }
    dynamicSharedInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, "default", nil)
    // 初始化controller宣鄙,傳入client袍镀、informer, 
    controller := apis.NewController(dynamicClient, dynamicSharedInformerFactory)
    // 直接啟動(dòng)controller
    err = controller.Run()
    if err != nil {
        klog.Errorln("fail run controller")
        return
    }
}

type.go

package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// VMGVR 定義資源的GVR,以供dynamic client識(shí)別資源
var VMGVR = schema.GroupVersionResource{
    Group:    "cloud.waibizi.com",
    Version:  "v1",
    Resource: "virtualmachines",
}

//VirtualMachine 根據(jù) CRD 定義 的 結(jié)構(gòu)體
type VirtualMachine struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec              VMSpec `json:"spec"`
}

type VMSpec struct {
    UUID   string `json:"uuid"`
    Name   string `json:"name"`
    Image  string `json:"image"`
    Memory int    `json:"memory"`
    Disk   int    `json:"disk"`
    Status string `json:"status"`
}

vm_controller.go

package apis

import (
    "context"
    v1 "dynamic-project/pkg/apis/v1"
    "encoding/json"
    "fmt"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/dynamic/dynamiclister"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"
    "time"
)

var stopCh chan struct{}

const controllerAgentName = "vm-dynamic-controller"

type Controller struct {
    dynamicClient dynamic.Interface
    workqueue     workqueue.RateLimitingInterface
    vmSynced      cache.InformerSynced
    vmInformer    cache.SharedIndexInformer
    vmLister      dynamiclister.Lister
    recorder      record.EventRecorder
}

func NewController(
    dynamicClient dynamic.Interface,
    factory dynamicinformer.DynamicSharedInformerFactory) *Controller {
    informer := factory.ForResource(v1.VMGVR).Informer()
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
    controller := &Controller{
        workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virtualmachines"),
        vmSynced:      informer.HasSynced,
        vmInformer:    informer,
        vmLister:      dynamiclister.New(informer.GetIndexer(), v1.VMGVR),
        recorder:      recorder,
        dynamicClient: dynamicClient,
    }
    // 添加回調(diào)事件
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            controller.enqueueFoo(obj)
        },
    })

    return controller
}

func (c *Controller) enqueueFoo(obj interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

func (c *Controller) Run() error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()
    go c.vmInformer.Run(stopCh)
    // 啟動(dòng)worker,每個(gè)worker一個(gè)goroutine
    go wait.Until(c.runWorker, time.Second, stopCh)
    stopCh = make(chan struct{})
    // 等待cache同步
    klog.Info("Waiting for informer caches to sync")
    if ok := cache.WaitForCacheSync(stopCh, c.vmSynced); !ok {
        klog.Errorln("failed to wait for caches to sync")
        return nil
    }
    // 等待退出信號(hào)
    <-stopCh
    klog.Infoln("Shutting down workers")
    return nil
}

// worker就是一個(gè)循環(huán)不斷調(diào)用processNextWorkItem
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *Controller) processNextWorkItem() bool {
    // 從工作隊(duì)列獲取對(duì)象
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    err := func(obj interface{}) error {
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        if key, ok = obj.(string); !ok {
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        if err := c.syncHandler(key); err != nil {
            // 處理失敗再次加入隊(duì)列
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }
        // 處理成功不入隊(duì)
        c.workqueue.Forget(obj)
        klog.Infoln("Successfully synced '%s'", key)
        return nil
    }(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return true
    }
    return true
}

func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return err
    }
    unStruct, err := c.vmLister.Namespace(namespace).Get(name)
    newBytes, err := json.Marshal(unStruct)
    if err != nil {
        utilruntime.HandleError(err)
        return err
    }
    vm := &v1.VirtualMachine{}
    if err = json.Unmarshal(newBytes, vm); err != nil {
        utilruntime.HandleError(err)
        return err
    }
    if vm.Spec.Status == "creating" {
        err = c.Creating(vm)
        if err != nil {
            utilruntime.HandleError(err)
            return err
        }
    }
    return nil
}

func (c *Controller) Creating(vm *v1.VirtualMachine) error {
    // 假設(shè)虛擬機(jī)都可以創(chuàng)建成功冻晤,不成功的話就直接更新為fail啥的就行了苇羡,或者返回err重新加入隊(duì)列當(dāng)中,不斷地去進(jìn)行創(chuàng)建操作
    patchData := []byte(`{"spec": {"status": "complete"}}`)
    // patch更新CR
    _, err := c.dynamicClient.Resource(v1.VMGVR).Namespace("default").Patch(context.Background(),vm.Name,types.MergePatchType,patchData,metav1.PatchOptions{})
    if err != nil {
        klog.Errorln(err)
        return err
    }
    return nil
}


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鼻弧,一起剝皮案震驚了整個(gè)濱河市设江,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌攘轩,老刑警劉巖叉存,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異撑刺,居然都是意外死亡鹉胖,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門够傍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甫菠,“玉大人,你說我怎么就攤上這事冕屯〖庞眨” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵安聘,是天一觀的道長(zhǎng)痰洒。 經(jīng)常有香客問我,道長(zhǎng)浴韭,這世上最難降的妖魔是什么丘喻? 我笑而不...
    開封第一講書人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮念颈,結(jié)果婚禮上泉粉,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好嗡靡,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開白布跺撼。 她就那樣靜靜地躺著,像睡著了一般讨彼。 火紅的嫁衣襯著肌膚如雪歉井。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,115評(píng)論 1 296
  • 那天哈误,我揣著相機(jī)與錄音哩至,去河邊找鬼。 笑死黑滴,一個(gè)胖子當(dāng)著我的面吹牛憨募,可吹牛的內(nèi)容都是我干的紧索。 我是一名探鬼主播袁辈,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼珠漂!你這毒婦竟也來了晚缩?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤媳危,失蹤者是張志新(化名)和其女友劉穎荞彼,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體待笑,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鸣皂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了暮蹂。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片寞缝。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖仰泻,靈堂內(nèi)的尸體忽然破棺而出荆陆,到底是詐尸還是另有隱情,我是刑警寧澤集侯,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布被啼,位于F島的核電站,受9級(jí)特大地震影響棠枉,放射性物質(zhì)發(fā)生泄漏浓体。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一辈讶、第九天 我趴在偏房一處隱蔽的房頂上張望命浴。 院中可真熱鬧,春花似錦荞估、人聲如沸咳促。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)跪腹。三九已至褂删,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冲茸,已是汗流浹背屯阀。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留轴术,地道東北人难衰。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像逗栽,于是被迫代替她去往敵國(guó)和親盖袭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容