Reading the Optimus Paper and Source Code

Code repository

https://github.com/pengyanghua/optimus

Paper overview

Reading notes on "Optimus: An Efficient Dynamic Resource Scheduler for Deep Learning Clusters"

Goal

Dynamically allocate cluster resources so as to reduce the total training time of all jobs.

Conclusion

The paper implements Optimus on Kubernetes and evaluates it on a deep learning cluster with 7 CPU servers and 6 GPU servers, running 9 training jobs with the MXNet framework. The results show that Optimus outperforms a representative cluster scheduler by about 139% and 63% in terms of job completion time and makespan, respectively.

Background and assumptions

Distributed training (MXNet [59], TensorFlow [23], PaddlePaddle [17], Angel [43], Petuum [67])

  • PS (parameter server): holds the model parameters; receives gradients, updates the parameters, and sends the updated parameters back to the workers
  • W (worker): training node; computes gradients


    [Figure: parameter server / worker architecture]

    The numbers of PS and worker tasks determine the training speed. Increasing them can speed training up, but adding too many may slow it down because of communication overhead; figure (a) shows the effect of the PS/worker ratio, figure (b) the effect of their absolute numbers.


    [Figure: training speed vs. PS/worker ratio (a) and numbers (b)]

Convergence

  • 實(shí)驗(yàn)?zāi)P筒⒉皇撬卸际諗勘热鏒NN,好在生產(chǎn)模型是成熟的可以更好收斂的
    • 一個(gè)epoch訓(xùn)練周期是所有mini-batches都處理一次备埃。
    • 數(shù)十到數(shù)百個(gè)epoch姓惑,模型會(huì)收斂
    • 訓(xùn)練是一個(gè)迭代過程,將數(shù)據(jù)集劃分為chunks按脚,每個(gè)chunk進(jìn)一步劃分為mini-batches于毙。訓(xùn)練步驟處理一個(gè)mini-batches并更新梯度(參數(shù)值)。我們還可以在每個(gè)mini-batches結(jié)束時(shí)計(jì)算訓(xùn)練性能指標(biāo)辅搬。當(dāng)所有 mini-batch 都處理完畢后唯沮,就完成了一個(gè)epoch。通常一個(gè)模型會(huì)被訓(xùn)練很多個(gè) epoch(幾十到幾百個(gè))直到它收斂(性能指標(biāo)穩(wěn)定)伞辛。


      [Figure: training loss converging over epochs]
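
To make the epoch/mini-batch structure concrete, here is a minimal, self-contained toy example (plain NumPy SGD on a linear model; illustrative only, not the repo's training code, which uses MXNet):

    # Toy illustration of epochs and mini-batches (not repo code)
    import numpy as np

    np.random.seed(0)
    X = np.random.randn(1024, 4)                       # toy dataset
    y = X @ np.array([1.0, -2.0, 0.5, 3.0])            # toy targets
    w = np.zeros(4)                                    # model parameters
    batch_size, lr = 32, 0.01

    for epoch in range(50):                            # tens of epochs until convergence
        for start in range(0, len(X), batch_size):     # one pass over all mini-batches = one epoch
            xb = X[start:start + batch_size]
            yb = y[start:start + batch_size]
            grad = 2 * xb.T @ (xb @ w - yb) / len(xb)  # one training step on one mini-batch
            w -= lr * grad                             # parameter update
        loss = np.mean((X @ w - y) ** 2)               # per-epoch training loss
        # an Optimus-style scheduler logs this loss and fits a convergence curve to it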

Existing schedulers have the following characteristics

1. Static resource allocation: idle resources cannot be used unless the configuration is changed manually or the job is resubmitted
2. Scheduling decisions ignore job characteristics, so short jobs may starve while waiting for resources
3. FIFO scheduling, which leaves many jobs waiting too long

The scheduler proposed in the paper

Models used:
https://github.com/apache/incubator-mxnet/tree/master/example

Steps

  • Job convergence model [estimating each job's remaining training time]

    • For each type of model, fit the relationship between loss and the number of iterations, so that the remaining iterations can be predicted from the current loss
    • The first model is used to estimate how many steps/epochs a job still needs to complete. Given the number of steps k, SGD converges at a rate of O(1/k), so the training loss curve can be approximated with the following model:


      [Equations from the paper: model approximating the training loss curve]
  • As more data points are collected after each step, the model fit improves (the prediction error shrinks), as shown below:


    [Figure: prediction error decreases as more data points are collected]
  • Resource–speed model [the key to speedup]

    • The training speed is determined by the forward (loss) computation time, the backward (gradient) computation time, communication time, and other factors (the formulas differ for synchronous and asynchronous training)
    • In asynchronous training, the workers of a job do not synchronize: the PS updates as soon as any worker reports progress; in synchronous training, the PS updates only after collecting updates from all workers
    • The numbers of PS and workers determine the factors above
    • So the training speed can be predicted from the numbers of PS and workers


      [Equations from the paper: training speed as a function of the numbers of PS and workers, for async and sync training]
  • Resource allocation

    • Combining the two models above determines the numbers of PS and workers for every job currently in the system
    • Since this is an NP-hard problem, a greedy heuristic is used to decide the numbers of PS and workers (a simplified sketch follows this list):
      • Initialize every job with one PS and one worker
      • Compute the utility of adding one PS or one worker to each job, and greedily grant the single addition with the largest gain
      • Repeat until resources are exhausted or adding resources yields negative gain
  • Task placement

    • Once each job's numbers of PS and workers are decided, it must be decided how to place them onto servers
    • Placing a job's PS and workers on the same servers reduces communication time
    • Theorem: use the minimum number of servers to place the workers and PS, such that each server hosts the same number of workers and the same number of PS.


      [Figure: task placement example]
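
Before diving into the source, here is a simplified, runnable sketch of the greedy allocation heuristic described above (toy numbers; only CPU is modeled and remaining_time stands in for rem_epoch / est_speed — the repo's real implementation, walked through below, also handles memory, bandwidth, GPUs, dominant-resource shares and placement):

    # Simplified sketch of the greedy PS/worker allocation heuristic (not repo code)
    import heapq

    CLUSTER_CPU = 40                                   # toy cluster capacity (CPU only)

    def remaining_time(job, num_ps, num_worker):
        # stand-in for rem_epoch / est_speed: any function decreasing in resources works
        speed = num_worker / (1.0 + float(num_worker) / num_ps)
        return job["rem_epochs"] / speed

    jobs = [{"name": "resnet50", "rem_epochs": 60, "ps": 1, "worker": 1, "cpu_per_task": 3},
            {"name": "vgg16",    "rem_epochs": 90, "ps": 1, "worker": 1, "cpu_per_task": 4}]
    used_cpu = sum(2 * j["cpu_per_task"] for j in jobs)   # start: 1 ps + 1 worker per job

    def utilities():
        # marginal reduction of remaining time per unit of CPU, for adding 1 ps or 1 worker
        heap = []
        for j in jobs:
            base = remaining_time(j, j["ps"], j["worker"])
            for kind, dp, dw in (("ps", 1, 0), ("worker", 0, 1)):
                gain = base - remaining_time(j, j["ps"] + dp, j["worker"] + dw)
                heapq.heappush(heap, (-gain / j["cpu_per_task"], j["name"], kind))
        return heap

    heap = utilities()
    while heap and used_cpu < CLUSTER_CPU:
        neg_util, name, kind = heapq.heappop(heap)     # job/task type with the largest gain
        if -neg_util <= 0:
            break                                      # adding more resources no longer helps
        job = next(j for j in jobs if j["name"] == name)
        if used_cpu + job["cpu_per_task"] > CLUSTER_CPU:
            continue                                   # cannot afford this addition, try the next
        job[kind] += 1                                 # grant one ps or one worker
        used_cpu += job["cpu_per_task"]
        heap = utilities()                             # recompute utilities after each grant

    print([(j["name"], j["ps"], j["worker"]) for j in jobs])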

Source code reading

Steps from the paper

1. Job convergence
For each type of model, fit the relationship between loss and the number of iterations, so the remaining iterations can be predicted from the current loss
2. Resource–speed model
3. Resource allocation
4. Task placement

Questions to answer

1. How is the remaining training time computed?
2. How are resources allocated?
3. How are tasks placed?
4. How does it interact with K8S?

Starting from the directory layout

[Screenshot: repository directory layout]

estimator.py: estimates training speed and remaining epochs
experimentor.py: entry point; the components communicate across multiple threads
job.py: handling of each individual job
jobrepo.py: repository of the three predefined jobs:

ResNet-50_ImageNet
VGG-16_ImageNet
ResNext-110_Cifar10

optimus_scheduler.py: the main scheduler
params.py: parameter values

Entry point: experimentor.py

[Screenshot: experimentor.py overview]
  • Clear all existing jobs:
def clear():
    os.system('kubectl delete jobs --all')
  • A separate thread is started for each of the following components:
    Timer: drives the system with time ticks
    Hub: message forwarding
    Generator: randomly generates and submits simulated jobs; this code uses the three jobs defined in jobrepo.py
    Progressor: job executor / progress tracker
    Statsor: collects job status and statistics
    UTIL_Scheduler: the job scheduler, where the core logic lives (a minimal sketch of this thread-and-message-queue pattern follows)
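
The components above run as separate threads and exchange messages through queues. A minimal sketch of that pattern (written for Python 3 for readability; the repo itself is Python 2, and its actual message formats and component logic differ):

    # Minimal thread + message-queue sketch (illustrative, not the repo's Hub/Timer code)
    import threading
    import queue
    import time

    hub = queue.Queue()                       # Hub: a shared queue that forwards messages

    def timer():                              # Timer: drives the system with periodic ticks
        for t in range(3):
            hub.put(("timer", t))
            time.sleep(0.1)
        hub.put(("stop", None))

    def scheduler():                          # Scheduler: handles each message by type
        while True:
            msg_type, payload = hub.get()
            if msg_type == "stop":
                break
            print("scheduling round", payload)

    threads = [threading.Thread(target=timer), threading.Thread(target=scheduler)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()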

Main scheduler: optimus_scheduler.py

[Screenshot: optimus_scheduler.py overview]

1剪撬、在init函數(shù)中摄乒,除了初始化參數(shù)還有一句

self.msg_handler = threading.Thread(target=self._msg_handle, args=())

The key function to read is _msg_handle.


[Screenshot: _msg_handle]

In the while loop, each received message is handled according to its type.

First, data points are collected and an initial speed estimate is made; every uncompleted job is allocated one PS and one worker:

       for job in self.uncompleted_jobs:
            cpu_req = job.worker_cpu + job.ps_cpu
            mem_req = job.worker_mem + job.ps_mem
            bw_req = job.worker_bw + job.ps_bw
            gpu_req = job.worker_gpu

            suff_resr = self.__check_cluster_resource_full(cpu_req, mem_req, bw_req, gpu_req)
            if suff_resr:
                job.num_worker = 1
                job.num_ps = 1
                self.cluster_used_cpu += cpu_req
                self.cluster_used_mem += mem_req
                self.cluster_used_bw += bw_req
                self.cluster_used_gpu += gpu_req
                # compute initial utility
                self.__update_util_queue(job, util_queue)
            else:
                continue

While resources remain, tasks keep being added one at a time, incrementing PS or worker depending on task_type:

            if suff_resr:
                # currently no mechanism to reduce resources
                if task_type == "ps":
                    job.num_ps += 1
                elif task_type == "worker":
                    job.num_worker += 1
                self.cluster_used_cpu += cpu_req
                self.cluster_used_mem += mem_req
                self.cluster_used_bw += bw_req
                self.cluster_used_gpu += gpu_req

                self.__update_util_queue(job, util_queue)
            else:
                # no enough resource
                break
        # how to handle not_ready_jobs

__update_util_queue

  • Compute the remaining time:
        # compute utility
        # allocate 1 ps or 1 worker each time.
        # sometimes can allocate multiple ps or worker for optimization, to avoid stuck in local optimal.
        end_epoch = self.estimator.est_epoch(job)
        if end_epoch <= 0:
            # error when estimating epoch
            end_epoch = job.progress + 20

        rem_epoch = end_epoch - job.progress  # the rem_epoch is negative if estimated epoch return -1
        est_speed = self.estimator.est_speed(job, job.num_ps, job.num_worker)
        self.logger.debug("estimated speed: " + str(est_speed))
        if est_speed <= 0:
            self.not_ready_jobs.add(job)
            return
        rem_time = rem_epoch / est_speed
  • Compute the remaining time if one PS is added (the worker case is analogous):
    # if add ps 1
        est_speed = self.estimator.est_speed(job, job.num_ps + 1, job.num_worker)
        if est_speed <= 0:
            self.not_ready_jobs.add(job)
            return
        ps_rem_time = rem_epoch / est_speed
        resource_reqs = (job.ps_cpu, job.ps_mem, job.ps_bw)
        shares = (1.0 * job.ps_cpu / self.cluster_num_cpu, 1.0 * job.ps_mem / self.cluster_num_mem,
                  1.0 * job.ps_bw / self.cluster_num_bw)
        dom_res = shares.index(max(shares))
        ps_util = (rem_time - ps_rem_time)/ resource_reqs[dom_res]
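
est_epoch itself is not quoted here; it is where the paper's convergence model comes in, fitting the observed (epoch, loss) points and predicting when the loss will flatten out. A minimal sketch of that kind of fit, assuming the 1/(β0·k + β1) + β2 loss-curve form the paper derives from SGD's O(1/k) convergence (the repo's exact fitting code and termination criterion may differ):

    # Sketch of loss-curve fitting for epoch estimation (assumed model form, not repo code)
    import numpy as np
    from scipy.optimize import curve_fit

    def loss_model(k, b0, b1, b2):
        return 1.0 / (b0 * k + b1) + b2

    epochs = np.arange(1, 9, dtype=float)              # epochs observed so far
    losses = loss_model(epochs, 0.5, 1.0, 0.1)         # toy measured training losses

    (b0, b1, b2), _ = curve_fit(loss_model, epochs, losses, p0=[1.0, 1.0, 0.0])

    # estimate the epoch at which the loss is within a small margin of its asymptote b2
    margin = 0.01
    end_epoch = (1.0 / margin - b1) / b0
    print("estimated end epoch:", int(np.ceil(end_epoch)))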

est_speed: given the numbers of PS and workers, predict the training speed; if that combination already exists in the dict, the measured value is used:

  def est_speed(self, job, num_ps, num_worker):
        """Give the number of ps and the number of worker, predict the training speed.
        Use the real one if already exists in the dict
        """
        if (num_ps, num_worker) in job.training_speeds:
            return job.training_speeds[(num_ps, num_worker)]
        else:
            # do training speed curve fitting here
            if 'async' in job.kv_store:
                if len(job.training_speeds) >= 4:
                    # do not need curve fitting each time, can be further optimized. future work
                    ps_list = []
                    worker_list = []
                    speed_list = []
                    for key, value in job.training_speeds.items():
                        (ps, worker) = key
                        ps_list.append(float(ps))
                        worker_list.append(float(worker))
                        speed_list.append(value)
                    params = self._async_speed_curve_fitting(np.array(ps_list), np.array(worker_list), np.array(speed_list))
                    if params is None:
                        self.logger.error(self.name+":: " + job.name + " " + str((num_ps, num_worker)) + " speed estimation error")
                        return -1
                    else:
                        [a, b, c, d] = params
                        est_speed = self.__async_speed_fit_func((num_ps, num_worker), a, b, c, d)
                        return est_speed
                else:
                    return -1
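
_async_speed_curve_fitting and __async_speed_fit_func are not quoted above. A minimal sketch of what such a four-parameter fit could look like; the functional form f(p, w) = w / (a + b·w/p + c·w + d·p) is an assumption for illustration only — check the repo for the exact form it fits:

    # Hypothetical 4-parameter async speed model with curve fitting (form assumed, not repo code)
    import numpy as np
    from scipy.optimize import curve_fit

    def async_speed_fit_func(ps_worker, a, b, c, d):
        p, w = ps_worker
        return w / (a + b * w / p + c * w + d * p)

    def async_speed_curve_fitting(ps_arr, worker_arr, speed_arr):
        try:
            params, _ = curve_fit(async_speed_fit_func, (ps_arr, worker_arr), speed_arr,
                                  p0=[1.0, 0.1, 0.01, 0.01], maxfev=10000)
            return params
        except RuntimeError:
            return None                      # fitting failed; est_speed above would return -1

    # toy measured points: (num_ps, num_worker) -> observed speed
    observed = {(1, 1): 0.40, (1, 2): 0.55, (2, 2): 0.70, (2, 4): 0.95, (3, 4): 1.00}
    ps = np.array([float(p) for p, _ in observed])
    wk = np.array([float(w) for _, w in observed])
    sp = np.array(list(observed.values()))

    params = async_speed_curve_fitting(ps, wk, sp)
    if params is not None:
        print("predicted speed for (3 ps, 6 workers):", async_speed_fit_func((3.0, 6.0), *params))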

Back in the scheduler: once the resource demands of all uncompleted jobs for this scheduling interval are determined, placement begins:

   # placement
        ps_placements, worker_placements = self.__place(self.uncompleted_jobs)

Sort the jobs by the amount of resources they request:

  # sort jobs based on num_ps and num_worker
        job_sort_queue = Queue.PriorityQueue()
        for job in jobs:
            job_sort_queue.put((job.num_ps + job.num_worker, job))

Sort the nodes:

  cpu_avail_queue = Queue.PriorityQueue()
        # sort nodes based on available cpus, since cpu is usually the bottleneck
        for i in range(len(params.NODE_LIST)):
            cpu_avail_queue.put((self.node_used_cpu_list[i], i))

Then place the jobs' tasks onto the sorted nodes:

    for i in range(job.num_ps):
        # place ps evenly
        node = cand_place_nodes[i % len(cand_place_nodes)]
        # check whether resource is enough to place this ps
        suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
        if suff_resr:
            ps_nodes.append(node)
            # minus temporary resources
            self.__deduct_resr(job, "ps", 1, node)
        else:
            # since node is already sorted based on resources,
            # if a larger node can not place the task, the following one can not too
            fit_flag = False
            # add the deducted resource back
            for node in ps_nodes:
                self.__add_back_resr(job, "ps", 1, node)
            ps_already_deduct = True
            break
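
For example, with cand_place_nodes = [2, 5, 7] and job.num_ps = 4, the modulo indexing above puts the PS tasks on nodes 2, 5, 7 and then 2 again, i.e., round-robin over the candidate nodes for as long as each node still has enough resources.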

Finally, for jobs that still could not be placed and for the remaining resources: their PS and worker tasks are split up and placed individually, alternating PS and worker, while checking whether this would slow training down:

    # have try all nodes, but still can not place, then check if we can place some tasks
    # and place ps and worker alternatively
    self.logger.debug("last placed job: " + job.name)
    ps_nodes = []
    worker_nodes = []
    flag_place_ps = True
    for i in range(job.num_ps + job.num_worker):
        flag_no_resource = True
        if flag_place_ps:
            # place ps task
            for node in range(len(params.NODE_LIST)):
                suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
                if suff_resr:
                    ps_nodes.append(node)
                    self.__deduct_resr(job, "ps", 1, node)
                    flag_no_resource = False
                    break
        else:
            # place worker task
            for node in range(len(params.NODE_LIST)):
                suff_resr = self.__check_node_resource_full(node, job.worker_cpu, job.worker_mem,
                                                            job.worker_bw, job.worker_gpu)
                if suff_resr:
                    worker_nodes.append(node)
                    self.__deduct_resr(job, "worker", 1, node)
                    flag_no_resource = False
                    break

Interaction between job resources and K8S

Creating a job

    def start(self):
        # start the job in k8s
        self.logger.info("starting job " + self.name + "...")

        # job working dir
        os.system('mkdir -p ' + self.dir)
        self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix)  # ps container mount
        self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix)  # worker container mount
        self.__set_batch_size()

        # create job yamls
        self._create()

        # prepare data
        self._read_data()

        # start pods in k8s
        subprocess.check_output("kubectl create -f " + self.yaml, shell=True)

How the YAML file is generated

        # copy template file
        self.jinja = self.dir + self.name + '.jinja'
        os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)

        # replace variables in jinja file
        temp_file = self.jinja + '.temp'
        for key, value in variables.items():
            os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
            os.system('rm ' + self.jinja)
            os.system('mv ' + temp_file + ' ' + self.jinja)

        # generate yaml file
        self.yaml = self.dir + self.name + '.yaml'
        os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)
[Screenshot: generated YAML files]

Other related interactions

 cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
# get heapster cluster ip
# heapster               192.168.192.16    <none>        80/TCP              5d
cmd = "kubectl get services --namespace=kube-system | grep heapster |awk '{print $2}'"
# in case not delete all
subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)

The paper uses Kubernetes' built-in load balancing for placement:

The workers/parameter servers are placed in a load balancing way, according to the default behavior of Kubernetes.
Two of its features are used: FitPredicate and PriorityFunction.

candidates = []
for pod in pods:
    for node in nodes:
        if pod.label-selector == node.label or
           pod.resources.limits + node.allocated_resources < node.resources.capacity:   // fit predicate
            candidates.append(node)
    for candidate in candidates:
        select one according to priorities (e.g., least used resources)   // priority function

General steps: (1) filter the nodes, (2) prioritize the filtered node list, (3) select the best-fit node.
Available Predicates

  • Static predicates:
    PodFitsPorts,
    PodFitsResources,
    NoDiskConflict,
    MatchNodeSelector,
    HostName
  • Configurable predicates:
    ServiceAffinity,
    LabelsPresence

Available Priority Functions

Reference example: ResNet-50_ImageNet

'''
ResNet-50_ImageNet
'''
def _set_resnet50_job(job):
    num_ps = params.DEFAULT_NUM_PS
    num_worker = params.DEFAULT_NUM_WORKER
    ps_cpu = 3
    ps_mem = 9
    ps_bw = 0
    worker_cpu = 2
    worker_mem = 8
    worker_gpu = 1
    worker_bw = 0

    job.set_ps_resources(num_ps, ps_cpu, ps_mem, ps_bw)
    job.set_worker_resources(num_worker, worker_cpu, worker_mem, worker_bw, worker_gpu)

    image = 'xxx'
    script = '/init.sh'

    # must end with /, save everything including training data, validation data,
    # training log and training model into this dir
    work_dir = '/mxnet/example/image-classification/data/'
    host_workdir_prefix = '/data/k8s-workdir/experiment/'
    job.set_container(image, script, work_dir, host_workdir_prefix)

    prog = 'python train_imagenet.py --network resnet --num-layers 50 --disp-batches 5 --num-epochs 100 --data-train /data/imagenet-train.rec'
    kv_store = 'dist_sync'
    prog += ' --kv-store ' + kv_store
    if worker_gpu > 0:
        prog += " --gpus" + " " + ",".join([str(i) for i in range(int(worker_gpu))])

    job.set_train(prog=prog, batch_size=32, kv_store=kv_store, scale_bs=True)
    hdfs_data = ['/k8s-mxnet/imagenet/imagenet-train.rec']
    data_dir = '/data/'
    host_data_dir = '/data/mxnet-data/imagenet/'
    job.set_data(hdfs_data=hdfs_data, data_dir=data_dir, host_data_dir=host_data_dir, data_mounted=True)
    job.set_mxnet(kv_store_big_array_bound=1000 * 1000, ps_verbose='')
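
With the defaults above (worker_gpu = 1, kv_store = 'dist_sync'), the assembled training command therefore ends up as:

    python train_imagenet.py --network resnet --num-layers 50 --disp-batches 5 --num-epochs 100 --data-train /data/imagenet-train.rec --kv-store dist_sync --gpus 0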

Deploying the source code

How a job gets onto the cluster

[Screenshot: job deployment flow]
The scheduler computes how many PS and how many workers each job needs, and these are then set via:

set_ps_resources
set_worker_resources

To place these PS and worker tasks, the resources are read from environment variables.

YARN-based distributed MXNet training requires configuring the MXNet distributed cluster environment (not configured here yet).
set_worker_placement
set_ps_placement

Directories on the host machines that are mounted into the containers

_set_mount_dirs

    mount_dirs = []
    if type == 'ps':
        for i in xrange(self.num_ps):
            postfix = self.name + '-ps-' + str(i) + '/'
            mount_dir = host_workdir_prefix + postfix
            mount_dirs.append(mount_dir)
            cmd = 'ssh ' + self.ps_placement[i] + ' "sudo rm -rf ' + mount_dir + '; mkdir -p ' + mount_dir + '"'
            os.system(cmd)
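
For example, with the ResNet-50 job's host_workdir_prefix of '/data/k8s-workdir/experiment/' and a job named, say, resnet50-experiment-0 (the actual naming comes from the job generator), the first PS task would get the host directory '/data/k8s-workdir/experiment/resnet50-experiment-0-ps-0/', created over SSH on the node where that PS is placed.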
Setting up the container

set_container

    # container description
    self.image = image
    self.script = script
    self.work_dir = work_dir
    self.host_workdir_prefix = host_workdir_prefix
    self.work_volume = work_volume
If the data is not on the local host, it is fetched from HDFS; the dataset list includes both training data and validation data.

The data can also be downloaded manually and placed in the corresponding directory; so far I have found a CIFAR-10 validation set and training set:
'http://data.mxnet.io/data/cifar10/cifar10_val.rec'
'http://data.mxnet.io/data/cifar10/cifar10_train.rec'
set_data: configure the data
_read_data: read the data from HDFS (not executed if the data is already local)

        self.hdfs_data = hdfs_data
        self.data_dir = data_dir
        self.host_data_dir = host_data_dir
        self.data_mounted = data_mounted
        self.data_volume = data_volume
Configuring training

set_train
__set_batch_size
set_mxnet

Creating the job: the parameters are substituted into a Jinja template to generate the corresponding YAML file.

_create


        # copy template file
        self.jinja = self.dir + self.name + '.jinja'
        os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)

        # replace variables in jinja file
        temp_file = self.jinja + '.temp'
        for key, value in variables.items():
            os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
            os.system('rm ' + self.jinja)
            os.system('mv ' + temp_file + ' ' + self.jinja)

        # generate yaml file
        self.yaml = self.dir + self.name + '.yaml'
        os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)

When I tried running it, some YAML files were generated successfully, but the IP parameters inside are wrong and other parameters may also be off, so the jobs cannot actually be started.


[Screenshot: generated YAML file]
Getting the training status: mainly the progress list and the validation loss list of running jobs:

_read_progress_stats
get_training_progress_stats

 self._read_progress_stats()
        return (list(self.progress_list), list(self.val_loss_list))
Getting the training speed, by reading a local file on the node:

_read_training_speed
get_training_speed

     '''
            cmd = 'scp ' + node + ':' + local_file + ' ' + self.dir # the new txt will replace the old one, no need to delete
            os.system(cmd)
            try:
                with open(self.dir+speed_fn, 'r') as fh:
                    stb_speed = float(fh.readline().replace('\n', '').split(' ')[1])
                    self.speed_list[i] = float('%.3f'%(stb_speed))
            except Exception as e:
                print e
                continue
            '''
            cmd = "ssh " + node + " 'cat " + local_file + "'"
Getting the K8S pods

__get_pods

        cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
        output = subprocess.check_output(cmd, shell=True)
Getting the job's metrics

_read_metrics
get_metrics

   # cpu: milli core, mem: bytes, net: bytes/second
        metric_keys = ['cpu/usage_rate', 'memory/usage', 'network/tx_rate', 'network/rx_rate']
The start function

start

    def start(self):
        # start the job in k8s
        self.logger.info("starting job " + self.name + "...")

        # job working dir
        os.system('mkdir -p ' + self.dir)
        self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix)  # ps container mount
        self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix)  # worker container mount
        self.__set_batch_size()

        # create job yamls
        self._create()

        # prepare data
        self._read_data()

        # start pods in k8s
        subprocess.check_output("kubectl create -f " + self.yaml, shell=True)
Deleting a job (or all jobs):
        subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)

Problems encountered during setup

1. Configuring the ssh node list for the cluster: the ssh commands failed because access goes through a bastion host that requires a jump.


[Screenshot: ssh failure]

2. The environment variables read via getenv were not fully configured; they need to be set one by one, or some removed before retrying.


[Screenshots: errors from missing environment variables]

3. The datasets and training data for all the relevant training jobs still need to be collected.

Conclusion

Dynamic deployment can follow the approach in this paper: generate configuration files, and handle interaction with the cluster and the outside environment directly from Python by invoking shell commands.
