In-depth Analysis of kubelet (7): How GPUs Are Picked for Mounting
An earlier post, 深入淺出kubernetes之device-plugins, mainly analyzed how a device plugin reports its resources; this article focuses on the allocation process.
device-plugin
kubelet itself is too complex to read top-down, so we work backwards from the device plugin.
interface
kubernetes\pkg\kubelet\apis\deviceplugin\v1beta1\api.pb.go
type DevicePluginServer interface {
    GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
    ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
    Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
    PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}
The most important methods are ListAndWatch() and Allocate(); the other two simply return fixed results and contain no real logic.
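For completeness, here is roughly what those two trivial methods look like in the NVIDIA plugin (a sketch; the exact bodies may differ between k8s-device-plugin versions):
// Both just return empty responses: the plugin needs no options and no pre-start hook.
func (m *NvidiaDevicePlugin) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    return &pluginapi.DevicePluginOptions{}, nil
}
func (m *NvidiaDevicePlugin) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
    return &pluginapi.PreStartContainerResponse{}, nil
}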
ListAndWatch
k8s-device-plugin\server.go
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    // Send the full device list once when the stream is opened.
    s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
    for {
        select {
        case <-m.stop:
            return nil
        case d := <-m.health:
            // A device was reported unhealthy: mark it and re-send the whole list.
            d.Health = pluginapi.Unhealthy
            s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
        }
    }
}
An old friend: list all devices once, then push updates to the client over the long-lived gRPC stream whenever a device's health changes.
// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    State: "Healthy",
// }
type Device struct {
    ID     string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
    Health string `protobuf:"bytes,2,opt,name=health,proto3" json:"health,omitempty"`
}
At the moment the device info carries only a device ID and a health state and cannot be extended, so the GPU topology is invisible; in practice only the GPU count can be scheduled on.
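From the user's side that means only a count can be requested. A minimal sketch (not from kubelet; the image and names are just examples) of the kind of container limit that the allocation loop later iterates over:
// Packages assumed: k8s.io/api/core/v1 (as v1) and k8s.io/apimachinery/pkg/api/resource.
cudaContainer := v1.Container{
    Name:  "cuda-job",
    Image: "nvidia/cuda:10.0-base",
    Resources: v1.ResourceRequirements{
        Limits: v1.ResourceList{
            // Only "how many" can be expressed: two GPUs, nothing about model,
            // NUMA placement, or interconnect.
            "nvidia.com/gpu": resource.MustParse("2"),
        },
    },
}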
Allocate
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    devs := m.devs
    responses := pluginapi.AllocateResponse{}
    for _, req := range reqs.ContainerRequests {
        // The only thing handed back is an env var listing the granted device IDs.
        response := pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "NVIDIA_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
            },
        }
        // Reject any ID the plugin has never advertised.
        for _, id := range req.DevicesIDs {
            if !deviceExists(devs, id) {
                return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
            }
        }
        responses.ContainerResponses = append(responses.ContainerResponses, &response)
    }
    return &responses, nil
}
Allocate does two things: it checks that every requested device ID actually exists, and it returns the NVIDIA_VISIBLE_DEVICES environment variable listing the granted devices.
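For a request of two devices the response effectively boils down to a single env var (device IDs made up here); nvidia-container-runtime reads it to decide which GPUs to expose inside the container:
resp := &pluginapi.ContainerAllocateResponse{
    Envs: map[string]string{
        // nvidia-container-runtime exposes only these GPUs to the container.
        "NVIDIA_VISIBLE_DEVICES": "GPU-aaa,GPU-bbb",
    },
}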
Note:
- This already reveals the allocation logic: kubelet chooses the concrete GPU cards according to the limit, sends their device IDs to the device plugin, and gets the env back.
- Selecting GPU cards by topology inside the scheduler later on would be hard to implement: the scheduler's own logic only creates a binding and fills in the node name, and squeezing device IDs into that flow would be difficult.
kubelet
From the above, the key entry point is the Allocate method, so the first thing to find is where kubelet calls Allocate.
kubernetes\pkg\kubelet\cm\devicemanager\endpoint.go
type endpoint interface {
    run()
    stop()
    allocate(devs []string) (*pluginapi.AllocateResponse, error)
    preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
    callback(resourceName string, devices []pluginapi.Device)
    isStopped() bool
    stopGracePeriodExpired() bool
}
The most important of these are run and allocate, which call the device plugin's ListAndWatch() and Allocate() respectively.
run
func (e *endpointImpl) run() {
    // Simplified excerpt: error handling is elided.
    stream, _ := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
    for {
        response, _ := stream.Recv()
        devs := response.Devices
        var newDevs []pluginapi.Device
        for _, d := range devs {
            newDevs = append(newDevs, *d)
        }
        // Hand every device update to the manager's callback.
        e.callback(e.resourceName, newDevs)
    }
}
run opens the ListAndWatch stream, then calls callback to process each batch of devices.
kubernetes\pkg\kubelet\cm\devicemanager\manager.go
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
    m.mutex.Lock()
    // Rebuild both sets from scratch on every update from the plugin.
    m.healthyDevices[resourceName] = sets.NewString()
    m.unhealthyDevices[resourceName] = sets.NewString()
    for _, dev := range devices {
        if dev.Health == pluginapi.Healthy {
            m.healthyDevices[resourceName].Insert(dev.ID)
        } else {
            m.unhealthyDevices[resourceName].Insert(dev.ID)
        }
    }
    m.mutex.Unlock()
    m.writeCheckpoint()
}
Here we see that the device IDs are cached in kubelet.ContainerManager.deviceManager, in maps of type map[string]sets.String keyed by resource name.
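sets.String comes from k8s.io/apimachinery/pkg/util/sets. A small standalone sketch (device IDs made up) of what the callback leaves behind for one resource; since both sets are rebuilt on every update, devices that disappear from the stream simply drop out:
package main
import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/sets"
)
func main() {
    healthyDevices := map[string]sets.String{}
    unhealthyDevices := map[string]sets.String{}
    // One ListAndWatch update for nvidia.com/gpu: three healthy cards, one broken.
    healthyDevices["nvidia.com/gpu"] = sets.NewString("GPU-aaa", "GPU-bbb", "GPU-ccc")
    unhealthyDevices["nvidia.com/gpu"] = sets.NewString("GPU-ddd")
    fmt.Println(healthyDevices["nvidia.com/gpu"].Len())    // 3
    fmt.Println(unhealthyDevices["nvidia.com/gpu"].List()) // [GPU-ddd]
}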
allocate
kubernetes\pkg\kubelet\cm\devicemanager\endpoint.go
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
    return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
        ContainerRequests: []*pluginapi.ContainerAllocateRequest{
            {DevicesIDs: devs},
        },
    })
}
allocate just fires the gRPC request directly; the interesting part is the caller, which decides which device IDs go into the request.
kubernetes\pkg\kubelet\cm\devicemanager\manager.go
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
    // Simplified excerpt: error handling and metrics are elided.
    podUID := string(pod.UID)
    contName := container.Name
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        // Pick the device IDs to hand out for this resource.
        allocDevices, _ := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
        m.mutex.Lock()
        e := m.endpoints[resource]
        m.mutex.Unlock()
        // Ask the device plugin for the runtime settings (env, mounts, ...).
        devs := allocDevices.UnsortedList()
        resp, _ := e.allocate(devs)
        // Update internal cached podDevices state.
        m.mutex.Lock()
        m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
        m.mutex.Unlock()
    }
    // Checkpoints device to container allocation information.
    return m.writeCheckpoint()
}
- devicesToAllocate picks the device IDs to hand out
- allocate sends them to the device plugin and gets the response (the env) back
- the result is written into devicemanager.podDevices
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    needed := required
    devices := sets.NewString()
    // Simplified excerpt: the real code also reuses devices already assigned to
    // this pod/container and errors out when not enough devices are available.
    devicesInUse := m.allocatedDevices[resource]
    available := m.healthyDevices[resource].Difference(devicesInUse)
    // UnsortedList returns the set in map-iteration order, i.e. effectively random.
    allocated := available.UnsortedList()[:needed]
    for _, device := range allocated {
        m.allocatedDevices[resource].Insert(device)
        devices.Insert(device)
    }
    return devices, nil
}
The allocation logic:
- get the resources already allocated to the container
- get the devices already in use from the cache
- diff all healthy devices against the in-use devices to get the available set
- pick the needed device IDs from the available set at random
- update the in-use device cache
- return the selected device IDs
With that, everything is clear: kubelet picks which GPUs to mount at random.
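The randomness comes from UnsortedList(): sets.String is backed by a Go map, and Go map iteration order is deliberately not stable. A standalone sketch (device IDs made up) mimicking the selection above:
package main
import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/sets"
)
func main() {
    healthy := sets.NewString("GPU-aaa", "GPU-bbb", "GPU-ccc", "GPU-ddd")
    inUse := sets.NewString("GPU-bbb")
    needed := 2
    // Same steps as devicesToAllocate: diff, then take the first N entries of
    // an unsorted (map-ordered, effectively random) listing.
    available := healthy.Difference(inUse)
    allocated := available.UnsortedList()[:needed]
    // Repeated runs can print different pairs, e.g. [GPU-ccc GPU-aaa] or [GPU-aaa GPU-ddd].
    fmt.Println(allocated)
}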
Saving the allocation state
kubernetes\pkg\kubelet\cm\devicemanager\pod_devices.go
func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
    if _, podExists := pdev[podUID]; !podExists {
        pdev[podUID] = make(containerDevices)
    }
    if _, contExists := pdev[podUID][contName]; !contExists {
        pdev[podUID][contName] = make(resourceAllocateInfo)
    }
    pdev[podUID][contName][resource] = deviceAllocateInfo{
        deviceIds: devices,
        allocResp: resp,
    }
}
This records, for each Pod, which devices each container holds for each resource.
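The nesting behind pdev[podUID][contName][resource] is just a stack of maps; the types in pod_devices.go look roughly like this (comments mine, details may differ slightly between versions):
type deviceAllocateInfo struct {
    deviceIds sets.String                          // which device IDs this container was given
    allocResp *pluginapi.ContainerAllocateResponse // the env/mounts the plugin returned for them
}
type resourceAllocateInfo map[string]deviceAllocateInfo // keyed by resource name
type containerDevices map[string]resourceAllocateInfo   // keyed by container name
type podDevices map[string]containerDevices             // keyed by pod UID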
// Returns combined container runtime settings to consume the container's allocated devices.
func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {}
The deviceRunContainerOptions method returns the device-related configuration parameters needed when the container is created.
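For reference, DeviceRunContainerOptions in the device manager bundles roughly the following (abbreviated sketch of devicemanager's types; kubecontainer is the kubelet container-runtime package):
type DeviceRunContainerOptions struct {
    Envs        []kubecontainer.EnvVar     // e.g. NVIDIA_VISIBLE_DEVICES=...
    Mounts      []kubecontainer.Mount      // host paths to mount into the container
    Devices     []kubecontainer.DeviceInfo // device nodes to expose
    Annotations []kubecontainer.Annotation // runtime annotations to pass through
}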
ps. As a rule the same information is not stored twice, so the allocation state lives only in the deviceManager; whenever it is needed, the corresponding options are simply generated from there.