簡介
Kubernetes官方從2016年8月份開始问芬,將Kubernetes資源操作相關(guān)的核心源碼抽取出來标沪,獨立出來一個項目Client-go贮配,作為官方提供的Go client囊卜。Kubernetes的部分代碼也是基于這個client實現(xiàn)的,所以對這個client的質(zhì)量越妈、性能等方面還是非常有信心的季俩。
client-go是一個調(diào)用kubernetes集群資源對象API的客戶端,即通過client-go實現(xiàn)對kubernetes集群中資源對象(包括deployment叮称、service、ingress藐鹤、replicaSet瓤檐、pod、namespace娱节、node等)的增刪改查等操作挠蛉。大部分對kubernetes進(jìn)行前置API封裝的二次開發(fā)都通過client-go這個第三方包來實現(xiàn)
主要package
- kubernetes
訪問 Kubernetes API的一系列的clientset - discovery
通過Kubernetes API 進(jìn)行服務(wù)發(fā)現(xiàn) - dynamic
對任意Kubernetes對象執(zhí)行通用操作的動態(tài)client - plugin/pkg/client/auth
可選的身份驗證插件,用于從外部來源獲取憑據(jù)肄满。 - transport
啟動連接和鑒權(quán)auth - tools/cache
controllers控制器
關(guān)鍵組件
- Reflector
reflector可以通過ListAndWatch方法對指定的Kubernetes資源進(jìn)行監(jiān)聽谴古,資源可以是Kubernetes內(nèi)置的資源類型,也可以是CRD稠歉。當(dāng)一個reflector收到新資源創(chuàng)建的通知時掰担,會通知相應(yīng)的list API獲取對應(yīng)資源模型并將其寫入到一個Delta FIFO的工作隊列中 - ClientSet
提供了直接從Kubernetes API Server 中獲取Object的機制,當(dāng)然使用Lister從本地Cache中獲取Objects的方法優(yōu)先級會高于直接從API Server讀取怒炸,以最大程度上減輕API server的負(fù)載 - Indexer(線程安全)
indexer的作用是對目標(biāo)object生成相應(yīng)的存儲索引带饱,在內(nèi)部實現(xiàn)上,indexer可以支持多種索引方式阅羹,同時利用一個線程安全的本地存儲持久化對應(yīng)的object和索引鍵勺疼,默認(rèn)情況下indexer會使用/的組合形式作為目標(biāo)object的索引鍵 - Lister
從本地Index中獲取Objecrts的工作機制,可以在client-go中的tools/cache包中找到其具體實現(xiàn) - Informer
Client-go包中一個相對較為高端的設(shè)計在于Informer的設(shè)計捏鱼,我們知道我們可以直接通過Kubernetes API交互执庐,但是考慮一點就是交互的形式,Informer設(shè)計為List/Watch的方式导梆。Informer在初始化的時先通過List去從Kubernetes API中取出資源的全部object對象轨淌,并同時緩存迂烁,然后后面通過Watch的機制去監(jiān)控資源,這樣的話猿诸,通過Informer及其緩存婚被,我們就可以直接和Informer交互而不是每次都和Kubernetes API交互。
Informer另外一塊內(nèi)容在于提供了事件handler機制梳虽,并會觸發(fā)回調(diào)址芯,這樣上層應(yīng)用如Controller就可以基于回調(diào)處理具體業(yè)務(wù)邏輯。因為Informer通過List窜觉、Watch機制可以監(jiān)控到所有資源的所有事件谷炸,因此只要給Informer添加ResourceEventHandler 實例的回調(diào)函數(shù)實例取實現(xiàn)
這三個方法,就可以處理好資源的創(chuàng)建禀挫、更新和刪除操作OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) 和 OnDelete(obj interface{})
Kubernetes中都是各種controller的實現(xiàn)旬陡,各種controller都會用到Informer
Demo
集群內(nèi)
package main
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"time"
)
func main() {
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
for {
// get pods in all the namespaces by omitting namespace
// Or specify namespace to get pods in particular namespace
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
// Examples for error handling:
// - Use helper functions e.g. errors.IsNotFound()
// - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
_, err = clientset.CoreV1().Pods("default").Get(context.TODO(), "example-xxxxx", metav1.GetOptions{})
if errors.IsNotFound(err) {
fmt.Printf("Pod example-xxxxx not found in default namespace\n")
} else if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pod %v\n", statusError.ErrStatus.Message)
} else if err != nil {
panic(err.Error())
} else {
fmt.Printf("Found example-xxxxx pod in default namespace\n")
}
time.Sleep(10 * time.Second)
}
}
集群外
package main
import (
"context"
"flag"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"time"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
for {
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
// Examples for error handling:
// - Use helper functions like e.g. errors.IsNotFound()
// - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
namespace := "default"
pod := "example-xxxxx"
_, err = clientset.CoreV1().Pods(namespace).Get(context.TODO(), pod, metav1.GetOptions{})
if errors.IsNotFound(err) {
fmt.Printf("Pod %s in namespace %s not found\n", pod, namespace)
} else if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pod %s in namespace %s: %v\n",
pod, namespace, statusError.ErrStatus.Message)
} else if err != nil {
panic(err.Error())
} else {
fmt.Printf("Found pod %s in namespace %s\n", pod, namespace)
}
time.Sleep(10 * time.Second)
}
}