Python操作Kubernetes集群完全指南
目錄
- 基礎環境準備
- Python Kubernetes客戶端介紹
- 連接Kubernetes集群
- Pod操作實戰
- Deployment管理
- Service資源操作
- ConfigMap和Secret管理
- 自定義資源定義(CRD)操作
- 事件監聽和Watch操作
- 高級應用場景
基礎環境準備
1. 安裝必要的包
首先,我們需要安裝Python的Kubernetes客戶端庫:
pip install kubernetes
pip install openshift # 可選,用于OpenShift集群
2. 配置文件準備
import os
from kubernetes import client, config
# Load the kubeconfig configuration.
config.load_kube_config()
Python Kubernetes客戶端介紹
1. 主要模塊說明
from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException
主要模塊功能:
- client: 提供各種API操作接口
- config: 處理配置文件加載
- watch: 用于監控資源變化
- ApiClient: 底層API客戶端
- ApiException: 異常處理
連接Kubernetes集群
示例1:基礎連接配置
from kubernetes import client, config
def connect_kubernetes():
    """Load the local kubeconfig and return a verified ``CoreV1Api`` client.

    The connection is probed by listing Pods across all namespaces with
    ``limit=1``, so the count printed on success is at most 1.

    Returns:
        The ``client.CoreV1Api`` instance on success, or ``None`` if loading
        the configuration or the probe request raised an exception.
    """
    try:
        # Load the local kubeconfig.
        config.load_kube_config()
        v1 = client.CoreV1Api()
        # Probe request: confirms the cluster is actually reachable.
        ret = v1.list_pod_for_all_namespaces(limit=1)
        print(f"連接成功!發現 {len(ret.items)} 個Pod")
        return v1
    except Exception as e:
        # Deliberately broad: this is the top-level connection boundary and
        # both configuration loading and the HTTP call may fail.
        print(f"連接失敗:{e}")
        return None
# Test the connection; connect_kubernetes() returns the API client, or None on failure.
api = connect_kubernetes()
示例2:多集群配置
def connect_multiple_clusters(clusters=None):
    """Create one ``CoreV1Api`` client per cluster.

    Args:
        clusters: Optional mapping of cluster name -> kubeconfig file path.
            When omitted, the two placeholder entries ``'prod'`` and
            ``'dev'`` are used.

    Returns:
        A dict mapping cluster name to its ``CoreV1Api`` client. Clusters
        whose configuration could not be loaded are left out.
    """
    if clusters is None:
        clusters = {
            'prod': '/path/to/prod-kubeconfig',
            'dev': '/path/to/dev-kubeconfig',
        }

    apis = {}
    for cluster_name, config_file in clusters.items():
        try:
            # Build a dedicated ApiClient per kubeconfig rather than
            # repeatedly overwriting the process-wide default configuration,
            # so each returned client is explicitly bound to its own cluster.
            api_client = config.new_client_from_config(config_file=config_file)
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"成功連接到{cluster_name}集群")
        except Exception as e:
            # Keep going: one unreachable cluster must not block the others.
            print(f"連接{cluster_name}集群失敗:{e}")
    return apis
Pod操作實戰
示例3:創建Pod
from kubernetes import client, config
def create_pod(name, image, namespace="default", container_port=80):
    """Create a single-container Pod and return the API server's response.

    Args:
        name: Used as both the Pod name and the container name.
        image: Container image reference, e.g. ``"nginx:latest"``.
        namespace: Namespace in which the Pod is created.
        container_port: Port declared on the container. Defaults to 80,
            the value that used to be hard-coded.

    Returns:
        The object returned by ``create_namespaced_pod``, or ``None`` if the
        call raised ``ApiException``.
    """
    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=container_port)],
    )
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PodSpec(containers=[container]),
    )

    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_pod(namespace=namespace, body=pod)
    except ApiException as e:
        print(f"Pod創建失敗:{e}")
        return None
    print(f"Pod {name} 創建成功")
    return api_response
# Usage example: create a Pod named "nginx-pod" from the nginx:latest image.
create_pod("nginx-pod", "nginx:latest")
示例4:查詢Pod狀態
def get_pod_status(name, namespace="default"):
    """Read one Pod and summarise its status as a plain dict.

    Args:
        name: Name of the Pod to read.
        namespace: Namespace the Pod lives in.

    Returns:
        A dict with the keys ``name``, ``status`` (the phase), ``pod_ip``,
        ``host_ip``, ``start_time`` and ``conditions`` (a list of
        ``{"type", "status"}`` dicts, empty when the Pod reports none),
        or ``None`` if the read raised ``ApiException``.
    """
    v1 = client.CoreV1Api()
    try:
        pod = v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        print(f"獲取Pod狀態失敗:{e}")
        return None

    status = pod.status
    return {
        "name": pod.metadata.name,
        "status": status.phase,
        "pod_ip": status.pod_ip,
        "host_ip": status.host_ip,
        "start_time": status.start_time,
        "conditions": [
            {"type": condition.type, "status": condition.status}
            # conditions may be None, so fall back to an empty list.
            for condition in status.conditions or []
        ],
    }
# Usage example: fetch and print the status summary of "nginx-pod".
status = get_pod_status("nginx-pod")
print(status)
Deployment管理
示例5:創建Deployment
def create_deployment(name, image, replicas=3, namespace="default",
                      container_port=80):
    """Create a single-container Deployment labelled ``app=<name>``.

    Args:
        name: Used as the Deployment name, the container name and the value
            of the ``app`` label in both the selector and the Pod template.
        image: Container image reference.
        replicas: Desired number of Pod replicas.
        namespace: Namespace in which the Deployment is created.
        container_port: Port declared on the container. Defaults to 80,
            the value that used to be hard-coded.

    Returns:
        The object returned by ``create_namespaced_deployment``, or ``None``
        if the call raised ``ApiException``.
    """
    # Selector and template labels must agree, so build them from one value.
    labels = {"app": name}
    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=container_port)],
    )
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels=dict(labels)),
        spec=client.V1PodSpec(containers=[container]),
    )
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1DeploymentSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(match_labels=dict(labels)),
            template=pod_template,
        ),
    )

    apps_v1 = client.AppsV1Api()
    try:
        api_response = apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment,
        )
    except ApiException as e:
        print(f"Deployment創建失敗:{e}")
        return None
    print(f"Deployment {name} 創建成功")
    return api_response
# Usage example: create "nginx-deployment" with the default replica count.
create_deployment("nginx-deployment", "nginx:latest")
示例6:更新Deployment
def update_deployment(name, new_image, namespace="default"):
    """Change the image of a Deployment's first container.

    Args:
        name: Name of the Deployment to update.
        new_image: Image reference to set on the first container.
        namespace: Namespace the Deployment lives in.

    Returns:
        The object returned by ``patch_namespaced_deployment``, or ``None``
        if reading or patching raised ``ApiException``.
    """
    apps_v1 = client.AppsV1Api()
    try:
        # Fetch the existing Deployment.
        deployment = apps_v1.read_namespaced_deployment(name, namespace)
        # NOTE: only the first container in the Pod template is updated;
        # any additional containers keep their current image.
        deployment.spec.template.spec.containers[0].image = new_image
        # Send the modified object back as the patch body.
        api_response = apps_v1.patch_namespaced_deployment(
            name=name,
            namespace=namespace,
            body=deployment,
        )
    except ApiException as e:
        print(f"Deployment更新失敗:{e}")
        return None
    print(f"Deployment {name} 更新成功")
    return api_response
# Usage example: switch "nginx-deployment" to the nginx:1.19 image.
update_deployment("nginx-deployment", "nginx:1.19")
Service資源操作
示例7:創建Service
def create_service(name, selector, port, target_port, namespace="default"):
    """Create a Service with a single port.

    Args:
        name: Name of the Service.
        selector: Label dict selecting the Pods that back the Service.
        port: Port exposed by the Service.
        target_port: Port on the selected Pods that traffic is sent to.
        namespace: Namespace in which the Service is created.

    Returns:
        The object returned by ``create_namespaced_service``, or ``None`` if
        the call raised ``ApiException``.
    """
    service_port = client.V1ServicePort(port=port, target_port=target_port)
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1ServiceSpec(selector=selector, ports=[service_port]),
    )

    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_service(
            namespace=namespace,
            body=service,
        )
    except ApiException as e:
        print(f"Service創建失敗:{e}")
        return None
    print(f"Service {name} 創建成功")
    return api_response
# Usage example: expose Pods labelled app=nginx-deployment on port 80.
create_service(
"nginx-service",
{"app": "nginx-deployment"},
80,
80
)
ConfigMap和Secret管理
示例8:創建ConfigMap
def create_configmap(name, data, namespace="default"):
    """Create a ConfigMap holding *data*.

    Args:
        name: Name of the ConfigMap.
        data: Dict of key -> string content stored in the ConfigMap.
        namespace: Namespace in which the ConfigMap is created.

    Returns:
        The object returned by ``create_namespaced_config_map``, or ``None``
        if the call raised ``ApiException``.
    """
    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name=name),
        data=data,
    )

    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_config_map(
            namespace=namespace,
            body=configmap,
        )
    except ApiException as e:
        print(f"ConfigMap創建失敗:{e}")
        return None
    print(f"ConfigMap {name} 創建成功")
    return api_response
# Usage example: store a small properties file under the key "app.properties".
config_data = {
"app.properties": """
app.name=myapp
app.env=production
"""
}
create_configmap("app-config", config_data)
示例9:創建Secret
import base64
def create_secret(name, data, namespace="default"):
    """Create an ``Opaque`` Secret from plain (un-encoded) values.

    Args:
        name: Name of the Secret.
        data: Dict of key -> value. Values may be ``str`` (encoded with the
            default ``str.encode()`` codec) or ``bytes``; each is
            base64-encoded before being sent.
        namespace: Namespace in which the Secret is created.

    Returns:
        The object returned by ``create_namespaced_secret``, or ``None`` if
        the call raised ``ApiException``.
    """
    # The Secret's `data` field is populated with base64 text, so encode
    # every value here. bytes values are accepted as-is instead of failing
    # on `.encode()`.
    encoded_data = {
        key: base64.b64encode(
            value if isinstance(value, bytes) else value.encode()
        ).decode()
        for key, value in data.items()
    }
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        type="Opaque",
        data=encoded_data,
    )

    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_secret(
            namespace=namespace,
            body=secret,
        )
    except ApiException as e:
        print(f"Secret創建失敗:{e}")
        return None
    print(f"Secret {name} 創建成功")
    return api_response
# Usage example: values are passed in plain text; create_secret() base64-encodes them.
secret_data = {
"username": "admin",
"password": "secret123"
}
create_secret("app-secrets", secret_data)
自定義資源定義(CRD)操作
示例10:操作CRD資源
def create_custom_resource(group, version, plural, namespace, body):
    """Create a namespaced custom resource.

    Args:
        group: API group of the custom resource.
        version: API version within that group.
        plural: Plural resource name.
        namespace: Namespace in which the object is created.
        body: The custom object to create, as a dict.

    Returns:
        The object returned by ``create_namespaced_custom_object``, or
        ``None`` if the call raised ``ApiException``.
    """
    custom_api = client.CustomObjectsApi()
    try:
        api_response = custom_api.create_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace,
            plural=plural,
            body=body,
        )
    except ApiException as e:
        print(f"自定義資源創建失敗:{e}")
        return None
    print("自定義資源創建成功")
    return api_response
# Usage example: a CronTab object in the stable.example.com/v1 API group.
custom_resource = {
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"metadata": {
"name": "my-crontab"
},
"spec": {
"cronSpec": "* * * * */5",
"image": "my-cron-image"
}
}
# group/version here match the "apiVersion" field of the body above.
create_custom_resource(
group="stable.example.com",
version="v1",
plural="crontabs",
namespace="default",
body=custom_resource
)
事件監聽和Watch操作
示例11:監聽Pod事件
from kubernetes import watch
def watch_pods(namespace="default"):
    """Print every Pod event in *namespace* until interrupted.

    The function blocks while streaming events. It returns after an
    ``ApiException`` (which is printed) or a ``KeyboardInterrupt``; in every
    case the underlying watch is stopped before returning.

    Args:
        namespace: Namespace whose Pods are watched.
    """
    v1 = client.CoreV1Api()
    w = watch.Watch()
    try:
        for event in w.stream(v1.list_namespaced_pod, namespace=namespace):
            pod = event['object']
            print(f"事件類型: {event['type']}")
            print(f"Pod名稱: {pod.metadata.name}")
            print(f"Pod狀態: {pod.status.phase}")
            print("-------------------")
    except ApiException as e:
        print(f"監聽失敗:{e}")
    except KeyboardInterrupt:
        print("監聽已停止")
    finally:
        # Stop the watch on every exit path, not only on Ctrl-C.
        w.stop()
# 使用示例
# watch_pods()  # 此函數會持續運行直到被中斷
高級應用場景
示例12:批量操作和錯誤處理
def batch_create_resources(resources):
    """Create several Deployments/Services and report per-resource outcome.

    Args:
        resources: Iterable of dicts with the keys ``'kind'``
            (``'Deployment'`` or ``'Service'``), ``'namespace'`` and
            ``'spec'`` (the client model object to create; its
            ``metadata.name`` is used in the report).

    Returns:
        ``{'success': [...], 'failed': [...]}``. Success entries carry
        ``kind`` and ``name``; failed entries additionally carry ``error``.
        A resource of an unsupported kind is reported under ``'failed'``
        instead of being silently skipped.
    """
    results = {'success': [], 'failed': []}

    # kind -> factory returning the bound "create" call for that kind.
    api_factories = {
        'Deployment': lambda: client.AppsV1Api().create_namespaced_deployment,
        'Service': lambda: client.CoreV1Api().create_namespaced_service,
    }
    # Bound create calls, built lazily and reused across resources.
    create_calls = {}

    for resource in resources:
        kind = resource['kind']
        name = resource['spec'].metadata.name

        if kind not in api_factories:
            results['failed'].append({
                'kind': kind,
                'name': name,
                'error': f"不支持的資源類型: {kind}",
            })
            continue

        if kind not in create_calls:
            create_calls[kind] = api_factories[kind]()

        try:
            create_calls[kind](
                namespace=resource['namespace'],
                body=resource['spec'],
            )
        except ApiException as e:
            results['failed'].append({
                'kind': kind,
                'name': name,
                'error': str(e),
            })
        else:
            results['success'].append({'kind': kind, 'name': name})

    return results
# Usage example: input list for batch_create_resources(); each entry names the
# kind, the target namespace and the client model object to create.
# NOTE(review): the visible example only builds this list and never passes it
# to batch_create_resources().
resources = [
{
'kind': 'Deployment',
'namespace': 'default',
'spec': client.V1Deployment(
metadata=client.V1ObjectMeta(name="nginx-deployment"),
spec=client.V1DeploymentSpec(
replicas=3,
selector=client.V1LabelSelector(
match_labels={"app": "nginx"}
),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={"app": "nginx"}
),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name="nginx",
image="nginx:latest"
)
]
)
)
)
)
}
]
示例13:資源清理和垃圾回收
def cleanup_resources(namespace="default", label_selector=None):
    """Delete Deployments, Pods and Services in *namespace*.

    Args:
        namespace: Namespace to clean up.
        label_selector: Optional label selector string (e.g. ``"app=nginx"``)
            passed to every list call; ``None`` applies no filter.

    Returns:
        A dict with the keys ``'pods'``, ``'deployments'`` and ``'services'``
        (names whose delete call succeeded) and ``'errors'`` (one message per
        object whose delete call raised ``ApiException``). Returns ``None``
        if one of the list calls itself raised ``ApiException``.
    """
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()
    cleanup_results = {
        'pods': [],
        'deployments': [],
        'services': [],
        'errors': [],
    }

    def _delete_matching(kind, result_key, list_call, delete_call):
        """Delete every listed object of one kind, recording the outcome."""
        listed = list_call(namespace=namespace, label_selector=label_selector)
        for item in listed.items:
            item_name = item.metadata.name
            try:
                delete_call(name=item_name, namespace=namespace)
            except ApiException as e:
                cleanup_results['errors'].append(f"{kind} {item_name}: {e}")
            else:
                cleanup_results[result_key].append(item_name)

    try:
        # Deployments go first: deleting their Pods while the Deployment
        # still exists only makes the controller recreate them.
        _delete_matching(
            'Deployment', 'deployments',
            apps_v1.list_namespaced_deployment,
            apps_v1.delete_namespaced_deployment,
        )
        _delete_matching(
            'Pod', 'pods',
            v1.list_namespaced_pod,
            v1.delete_namespaced_pod,
        )
        _delete_matching(
            'Service', 'services',
            v1.list_namespaced_service,
            v1.delete_namespaced_service,
        )
    except ApiException as e:
        print(f"清理資源時發生錯誤:{e}")
        return None
    return cleanup_results
# Usage example: remove everything labelled app=nginx from the default namespace.
cleanup_result = cleanup_resources(namespace="default", label_selector="app=nginx")
# The printed label was garbled in the original text and is repaired here.
print("清理結果:", cleanup_result)
示例14:資源健康檢查和自動修復
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional
class ResourceHealthChecker:
    """Find unhealthy Pods and Deployments in one namespace and repair them."""

    def __init__(self, namespace: str = "default"):
        """Bind the checker to *namespace* and create the API clients it uses."""
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def check_pod_health(self) -> Optional[Dict[str, List[str]]]:
        """Classify the namespace's Pods by phase.

        Returns:
            ``{'unhealthy': [...], 'pending': [...]}`` holding the names of
            Pods in phase ``Failed`` and ``Pending`` respectively, or
            ``None`` if listing Pods raised ``ApiException``.
        """
        try:
            pods = self.v1.list_namespaced_pod(namespace=self.namespace)
        except ApiException as e:
            print(f"檢查Pod健康狀態時發生錯誤:{e}")
            return None

        unhealthy_pods = []
        pending_pods = []
        for pod in pods.items:
            phase = pod.status.phase
            if phase == 'Failed':
                unhealthy_pods.append(pod.metadata.name)
            elif phase == 'Pending':
                pending_pods.append(pod.metadata.name)
        return {'unhealthy': unhealthy_pods, 'pending': pending_pods}

    def check_deployment_health(self) -> Optional[Dict[str, List[str]]]:
        """Find Deployments whose ready replica count differs from the desired one.

        Returns:
            ``{'unhealthy': [...]}`` with the names of such Deployments, or
            ``None`` if listing Deployments raised ``ApiException``.
        """
        try:
            deployments = self.apps_v1.list_namespaced_deployment(
                namespace=self.namespace
            )
        except ApiException as e:
            print(f"檢查Deployment健康狀態時發生錯誤:{e}")
            return None

        unhealthy_deployments = []
        for deployment in deployments.items:
            # Compare against the desired count from the spec rather than
            # status.replicas, which only counts Pods that already exist.
            desired = deployment.spec.replicas
            if desired is None:
                # Kubernetes documents 1 as the default for spec.replicas.
                desired = 1
            # ready_replicas is None (not 0) when no replica is ready.
            ready = deployment.status.ready_replicas or 0
            if ready != desired:
                unhealthy_deployments.append(deployment.metadata.name)
        return {'unhealthy': unhealthy_deployments}

    def auto_repair(self):
        """Delete failed Pods and trigger a restart of unhealthy Deployments.

        Returns:
            A list of human-readable messages, one per attempted action,
            describing either the action taken or why it failed.
        """
        repair_actions = []

        # Failed Pods are deleted.
        pod_health = self.check_pod_health()
        if pod_health:
            for unhealthy_pod in pod_health['unhealthy']:
                try:
                    self.v1.delete_namespaced_pod(
                        name=unhealthy_pod,
                        namespace=self.namespace,
                    )
                    repair_actions.append(f"刪除不健康的Pod: {unhealthy_pod}")
                except ApiException as e:
                    repair_actions.append(f"修復Pod {unhealthy_pod} 失敗: {e}")

        # Unhealthy Deployments are restarted by stamping a fresh
        # restartedAt annotation onto the Pod template.
        deployment_health = self.check_deployment_health()
        if deployment_health:
            for unhealthy_deployment in deployment_health['unhealthy']:
                patch = {
                    "spec": {
                        "template": {
                            "metadata": {
                                "annotations": {
                                    "kubectl.kubernetes.io/restartedAt":
                                        datetime.now(timezone.utc).isoformat()
                                }
                            }
                        }
                    }
                }
                try:
                    self.apps_v1.patch_namespaced_deployment(
                        name=unhealthy_deployment,
                        namespace=self.namespace,
                        body=patch,
                    )
                    repair_actions.append(f"重啟Deployment: {unhealthy_deployment}")
                except ApiException as e:
                    repair_actions.append(
                        f"修復Deployment {unhealthy_deployment} 失敗: {e}"
                    )

        return repair_actions
# Usage example: run one check-and-repair pass over the default namespace.
health_checker = ResourceHealthChecker("default")
repair_results = health_checker.auto_repair()
# The printed label was garbled in the original text and is repaired here.
print("修復操作:", repair_results)
示例15:自定義控制器實現
from kubernetes import watch
import threading
import queue
class CustomController:
    """Minimal controller: watches Pods and Deployments and reacts to events.

    Two watcher threads push ``(resource_type, event)`` tuples onto a queue;
    a third thread pops them and dispatches to the ``_handle_*`` methods.
    """

    # Seconds to wait before re-opening a watch after an error.
    _RETRY_DELAY = 5

    def __init__(self, namespace="default"):
        """Bind the controller to *namespace* and create its API clients."""
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.event_queue = queue.Queue()
        self.running = False
        # Set by stop() so a watcher waiting out its retry delay wakes at once.
        self._stop_event = threading.Event()
        # Active Watch objects, kept so stop() can ask them to end.
        self._watchers = []
        self._threads = []

    def start(self):
        """Start the event-processing thread and both watcher threads.

        Calling ``start()`` on a controller that is already running is a
        no-op, so duplicate threads are never spawned.
        """
        if self.running:
            return
        self.running = True
        self._stop_event.clear()
        self._watchers = []
        self._threads = [
            threading.Thread(target=self._process_events),
            threading.Thread(target=self._watch_pods),
            threading.Thread(target=self._watch_deployments),
        ]
        for thread in self._threads:
            thread.start()

    def stop(self):
        """Ask all controller threads to finish; does not wait for them."""
        self.running = False
        self._stop_event.set()
        # Without this the watchers only notice `running` after an event arrives.
        for watcher in list(self._watchers):
            watcher.stop()

    def _watch_loop(self, resource_type, list_func):
        """Stream events from *list_func* into the queue until stopped."""
        w = watch.Watch()
        self._watchers.append(w)
        while self.running:
            try:
                for event in w.stream(list_func, namespace=self.namespace):
                    if not self.running:
                        break
                    self.event_queue.put((resource_type, event))
            except Exception as e:
                # Broad on purpose: any failure must not kill the watcher
                # thread; report it and retry after a delay.
                print(f"{resource_type}監控異常:{e}")
                if self.running:
                    self._stop_event.wait(self._RETRY_DELAY)

    def _watch_pods(self):
        """Watch Pod changes in the controller's namespace."""
        self._watch_loop('Pod', self.v1.list_namespaced_pod)

    def _watch_deployments(self):
        """Watch Deployment changes in the controller's namespace."""
        self._watch_loop('Deployment', self.apps_v1.list_namespaced_deployment)

    def _process_events(self):
        """Pop queued events and dispatch them while the controller runs."""
        while self.running:
            try:
                # The 1s timeout lets the loop re-check `running` regularly.
                resource_type, event = self.event_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self._handle_event(resource_type, event)
            except Exception as e:
                # A failing handler must not stop event processing.
                print(f"事件處理異常:{e}")

    def _handle_event(self, resource_type, event):
        """Log one event and route it to the handler for its resource type."""
        event_type = event['type']
        obj = event['object']
        print(f"收到{resource_type}事件:")
        print(f" 類型: {event_type}")
        print(f" 名稱: {obj.metadata.name}")
        if resource_type == 'Pod':
            self._handle_pod_event(event_type, obj)
        elif resource_type == 'Deployment':
            self._handle_deployment_event(event_type, obj)

    def _handle_pod_event(self, event_type, pod):
        """Delete a Pod that was modified into the ``Failed`` phase."""
        if event_type != 'MODIFIED' or pod.status.phase != 'Failed':
            return
        print(f"檢測到Pod {pod.metadata.name} 失敗,嘗試重啟")
        try:
            self.v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=self.namespace,
            )
        except ApiException as e:
            print(f"重啟Pod失敗:{e}")

    def _handle_deployment_event(self, event_type, deployment):
        """Report a modified Deployment whose ready count is off target."""
        if event_type != 'MODIFIED':
            return
        desired = deployment.spec.replicas
        if desired is None:
            # Kubernetes documents 1 as the default for spec.replicas.
            desired = 1
        # ready_replicas is None (not 0) when no replica is ready.
        ready = deployment.status.ready_replicas or 0
        if ready != desired:
            print(f"檢測到Deployment {deployment.metadata.name} 副本不一致")
            # Custom handling logic can be added here.
# Usage example: start a controller for the default namespace.
controller = CustomController("default")
controller.start()
# Stop after running for a while:
# time.sleep(3600)
# controller.stop()
示例16:資源指標監控
from kubernetes.client import CustomObjectsApi
import time
class MetricsCollector:
    """Read node and Pod usage figures from the ``metrics.k8s.io`` API."""

    def __init__(self):
        """Create the custom-objects client used for all metrics queries."""
        self.custom_api = CustomObjectsApi()

    def get_node_metrics(self):
        """Return CPU and memory usage per node.

        Returns:
            ``{node_name: {'cpu': ..., 'memory': ...}}`` with the usage
            values exactly as reported by the API, or ``None`` if the
            request raised ``ApiException``.
        """
        try:
            metrics = self.custom_api.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes",
            )
        except ApiException as e:
            print(f"獲取節點指標失敗:{e}")
            return None
        return {
            item['metadata']['name']: {
                'cpu': item['usage']['cpu'],
                'memory': item['usage']['memory'],
            }
            for item in metrics['items']
        }

    def get_pod_metrics(self, namespace="default"):
        """Return CPU and memory usage per container for each Pod.

        Args:
            namespace: Namespace whose Pods are queried.

        Returns:
            ``{pod_name: {container_name: {'cpu': ..., 'memory': ...}}}``,
            or ``None`` if the request raised ``ApiException``.
        """
        try:
            metrics = self.custom_api.list_namespaced_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                namespace=namespace,
                plural="pods",
            )
        except ApiException as e:
            print(f"獲取Pod指標失敗:{e}")
            return None
        return {
            item['metadata']['name']: {
                container['name']: {
                    'cpu': container['usage']['cpu'],
                    'memory': container['usage']['memory'],
                }
                for container in item['containers']
            }
            for item in metrics['items']
        }

    def monitor_resources(self, interval=30, namespace="default",
                          iterations=None):
        """Print node and Pod usage repeatedly.

        Args:
            interval: Seconds to sleep between two rounds.
            namespace: Namespace whose Pod metrics are printed.
            iterations: Number of rounds to run; ``None`` (the default)
                keeps monitoring forever.
        """
        completed = 0
        while iterations is None or completed < iterations:
            print("\n=== 資源使用情況 ===")

            node_metrics = self.get_node_metrics()
            if node_metrics:
                print("\n節點資源使用情況:")
                for node_name, metrics in node_metrics.items():
                    print(f"\n節點: {node_name}")
                    print(f"CPU使用: {metrics['cpu']}")
                    print(f"內存使用: {metrics['memory']}")

            pod_metrics = self.get_pod_metrics(namespace)
            if pod_metrics:
                print("\nPod資源使用情況:")
                for pod_name, containers in pod_metrics.items():
                    print(f"\nPod: {pod_name}")
                    for container_name, metrics in containers.items():
                        print(f"容器: {container_name}")
                        print(f"CPU使用: {metrics['cpu']}")
                        print(f"內存使用: {metrics['memory']}")

            completed += 1
            # No point sleeping after the final round of a bounded run.
            if iterations is None or completed < iterations:
                time.sleep(interval)
# Usage example
collector = MetricsCollector()
# collector.monitor_resources() # continuous monitoring
最佳實踐和注意事項
- 錯誤處理
  - 始終使用try-except塊處理API調用
  - 實現重試機制處理臨時性故障
  - 記錄詳細的錯誤信息便于調試
- 性能優化
  - 使用批量操作代替單個操作
  - 實現合適的緩存機制
  - 避免頻繁的API調用
- 安全考慮
  - 使用最小權限原則
  - 保護敏感信息(如密鑰和證書)
  - 實現適當的認證和授權機制
- 可維護性
  - 模塊化代碼結構
  - 完善的日志記錄
  - 清晰的代碼注釋
總結
本文詳細介紹了如何使用Python操作Kubernetes集群,包括:
- 基礎環境配置
- 常見資源操作
- 高級應用場景
- 自動化運維實踐
- 監控和告警實現
通過這些示例和最佳實踐,可以構建強大的Kubernetes自動化工具和運維系統。
本文使用 文章同步助手 同步