使用partialmethod統(tǒng)一接入第三方的API調(diào)用开伏,包括 kruise-rollout,Alibaba Cgroups
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2023-09-25 19:10:37
import datetime
import yaml
import time, arrow
from kubernetes import client, utils
from kubernetes.stream import stream
from functools import partialmethod
class MyK8sError(Exception):
pass
class MyK8s:
def __init__(
self,
host,
token,
verify_ssl=False,
):
"""
參考 https://github.com/kubernetes-client/python/blob/release-24.0/examples/remote_cluster.py
"""
aConfiguration = client.Configuration()
aConfiguration.host = host
aConfiguration.verify_ssl = verify_ssl
aConfiguration.api_key = {"authorization": "Bearer " + token}
aApiClient = client.ApiClient(aConfiguration)
self.api_client = aApiClient
self.corev1 = client.CoreV1Api(aApiClient)
self.appv1 = client.AppsV1Api(aApiClient)
self.netv1 = client.NetworkingV1Api(aApiClient)
self.co_api = client.CustomObjectsApi(aApiClient)
self.batchv1 = client.BatchV1Api(aApiClient)
# 接入第三方對(duì)象在此處接入
custom_objects = dict(
cgroup=("resources.alibabacloud.com", "v1alpha1", "cgroups"),
rollout=("rollouts.kruise.io", "v1alpha1", "rollouts"),
)
for object_name, object_attrs in custom_objects.items():
for op in ["get", "create", "patch", "delete", "list"]:
setattr(
MyK8s,
"{0}_{1}{2}".format(op, object_name,"s" if op == "list" else ""),
partialmethod(
getattr(MyK8s, "{}_custom_object".format(op)),
group=object_attrs[0],
api_version=object_attrs[1],
plural=object_attrs[2],
),
)
def _client_api(self, kind):
if kind in (
"controller_revision",
"daemon_set",
"deployment",
"replica_set",
"stateful_set",
):
return self.appv1
elif kind in ("pod", "service"):
return self.corev1
elif kind == "ingress":
return self.netv1
elif kind == "rollout":
return self.co_api
elif kind == "job":
return self.batchv1
else:
raise MyK8sError("不支持操作類型為{}的對(duì)象".format(kind))
def get_custom_object(self, group, api_version, plural, namespace, name, **kw):
func = getattr(self.co_api, "get_namespaced_custom_object")
return func(group, api_version, namespace, plural, name, **kw)
def delete_custom_object(self, group, api_version, plural, namespace, name, **kw):
func = getattr(self.co_api, "delete_namespaced_custom_object")
return func(group, api_version, namespace, plural, name, **kw)
def list_custom_object(
self, group, api_version, plural, namespace, name, null=False, **kw
):
selector = "metadata.name={}".format(name)
func = getattr(self.co_api, "list_namespaced_custom_object")
items = func(
group, api_version, namespace, plural, field_selector=selector, **kw
)["items"]
if not items:
if null:
return None
else:
raise MyK8sError("{0} namespace has not found {1} {2}".format(namespace, name, plural))
if len(items) > 1:
raise MyK8sError("{0} namespace has found multi {1} {2}".format(namespace, name, plural))
return items[0]
def patch_custom_object(
self, group, api_version, plural, namespace, name, json_data, **kw
):
func = getattr(self.co_api, "patch_namespaced_custom_object")
return func(group, api_version, namespace, plural, name, json_data, **kw)
def create_custom_object(
self, group, api_version, plural, namespace, json_data, **kw
):
func = getattr(self.co_api, "create_namespaced_custom_object")
return func(group, api_version, namespace, plural, json_data, **kw)
myk8s = MyK8s(
"https://xxxxxx",
"tokenxxxxxxxxxxx"
)
if __name__ == "__main__":
object_name = "deploy_name"
namespace = "basic"
object_type = "deployment"
container_name = "app_name"
cpu_limit = "1000m"
memory_limit = "1024Mi"
data = {
"apiVersion": "resources.alibabacloud.com/v1alpha1",
"kind": "Cgroups",
"metadata": {
"name": object_name,
"namespace": namespace,
},
"spec": {
object_type: {
"containers": [{"name": container_name}],
"name": object_name,
"namespace": namespace,
}
},
}
data["spec"][object_type]["containers"][0]["cpu"] = cpu_limit
data["spec"][object_type]["containers"][0]["memory"] = memory_limit
res = myk8s.create_cgroup(namespace=namespace, json_data=data)
print(res)
# res = myk8s.delete_cgroup(namespace=namespace, name="scm-dubbodemo-2022.1000.233")
# print(res)
try:
res = myk8s.get_cgroup(namespace=namespace, name=object_name)
except Exception as e:
if e.reason == "Not Found":
pass
raise
res["spec"][object_type]["containers"][0]["cpu"] = "1200m"
res["spec"][object_type]["containers"][0]["memory"] = "1500Mi"
res = myk8s.patch_cgroup(namespace=namespace, name=object_name, json_data= res)
print(res)
Ref: CustomObjectsApi