代碼地址
https://github.com/pengyanghua/optimus
論文思路
《Optimus: An Efficient Dynamic Resource Scheduler for Deep Learning Clusters》閱讀筆記
目標(biāo)
通過動(dòng)態(tài)分配集群資源,減少所有任務(wù)訓(xùn)練總時(shí)間
結(jié)論
本文在Kubernetes上實(shí)現(xiàn)了Optimus蜜徽,并在一個(gè)有7個(gè)CPU服務(wù)器和6個(gè)GPU服務(wù)器的深度學(xué)習(xí)集群上進(jìn)行了實(shí)驗(yàn)背蟆,使用MXNet框架運(yùn)行了9個(gè)訓(xùn)練任務(wù)。結(jié)果表明狮杨,Optimus在作業(yè)完成時(shí)間和完成時(shí)間方面分別比典型的集群調(diào)度器高出139%和63%。
背景知識(shí)和假設(shè)
分布式訓(xùn)練 (MXNet [59], Tensor- Flow [23], PaddlePaddle [17], Angel [43], Petuum [67])
- Ps(paramter server):參數(shù)服務(wù),接受參數(shù)甲葬,更新參數(shù)鸡挠,下發(fā)參數(shù)
-
W(worker):訓(xùn)練節(jié)點(diǎn)辉饱,計(jì)算梯度
ps和w數(shù)量決定了訓(xùn)練速度,ps和w的數(shù)量增多可以加速拣展,但是過多可能因?yàn)橥ㄐ砰_銷導(dǎo)致變慢彭沼,圖A是比例關(guān)系,圖B是數(shù)量關(guān)系
收斂
- 實(shí)驗(yàn)?zāi)P筒⒉皇撬卸际諗勘热鏒NN,好在生產(chǎn)模型是成熟的可以更好收斂的
- 一個(gè)epoch訓(xùn)練周期是所有mini-batches都處理一次备埃。
- 數(shù)十到數(shù)百個(gè)epoch姓惑,模型會(huì)收斂
-
訓(xùn)練是一個(gè)迭代過程,將數(shù)據(jù)集劃分為chunks按脚,每個(gè)chunk進(jìn)一步劃分為mini-batches于毙。訓(xùn)練步驟處理一個(gè)mini-batches并更新梯度(參數(shù)值)。我們還可以在每個(gè)mini-batches結(jié)束時(shí)計(jì)算訓(xùn)練性能指標(biāo)辅搬。當(dāng)所有 mini-batch 都處理完畢后唯沮,就完成了一個(gè)epoch。通常一個(gè)模型會(huì)被訓(xùn)練很多個(gè) epoch(幾十到幾百個(gè))直到它收斂(性能指標(biāo)穩(wěn)定)伞辛。
目前的調(diào)度系統(tǒng)具有以下特點(diǎn)
1.靜態(tài)的資源分配烂翰,除非手動(dòng)修改配置或者重新提交任務(wù),否則不能利用閑置資源
2.調(diào)度決策不考慮任務(wù)本身特性蚤氏,導(dǎo)致短時(shí)間任務(wù)因?yàn)闊o(wú)資源餓死
3甘耿、用FIFO模式,導(dǎo)致很多任務(wù)等待過長(zhǎng)
本文調(diào)度系統(tǒng)
用的模型
https://github.com/apache/incubator-mxnet/tree/master/example
步驟
-
任務(wù)收斂模型【估算每個(gè)模型的訓(xùn)練時(shí)間】
- 針對(duì)不同類型模型竿滨,對(duì)損失和迭代次數(shù)的關(guān)系進(jìn)行建模佳恬,使得可以通過當(dāng)前損失預(yù)測(cè)剩余迭代次數(shù)
-
第一個(gè)模型用于估計(jì)作業(yè)需要完成的步驟/時(shí)期數(shù)。在給定步數(shù)k的情況下于游,SGD 以 O(1/k) 的速率收斂毁葱。因此我們可以使用以下模型來(lái)近似訓(xùn)練損失曲線:
-
在每一步后獲得更多數(shù)據(jù)點(diǎn),模型擬合(預(yù)測(cè)誤差)得到改善贰剥,如下所示:
-
資源-速度模型【加速關(guān)鍵】
- 訓(xùn)練速度由前向損失計(jì)算時(shí)間倾剿,反向梯度計(jì)算時(shí)間,通訊時(shí)間等因素決定(同步訓(xùn)練和異步訓(xùn)練公式會(huì)不一樣)
- 異步訓(xùn)練,一個(gè)job中多個(gè)worker不同步前痘,每個(gè)work有訓(xùn)練進(jìn)度PS就更新凛捏;同步訓(xùn)練,PS搜集到所有work更新后才更新
- ps和和w的數(shù)量可以決定上述因素
-
可以通過ps和w數(shù)量來(lái)預(yù)測(cè)訓(xùn)練速度
-
資源分配
- 以上兩個(gè)模型結(jié)合可以決定系統(tǒng)中當(dāng)前的任務(wù)的每個(gè)任務(wù)的ps和w數(shù)量
- 因?yàn)槭莕p問題芹缔,使用一個(gè)啟發(fā)式算法來(lái)確定ps和w數(shù)量
- 為每個(gè)任務(wù)分配一個(gè)ps和一個(gè)w作為初始化
- 計(jì)算每個(gè)任務(wù)增加一個(gè)ps或者1個(gè)w的坯癣,取一個(gè)最大收益的進(jìn)行分配,貪心
- 重復(fù)上述步驟直到資源耗盡或者增加資源成為負(fù)收益
-
任務(wù)放置
- 決定好每個(gè)任務(wù)的ps和w后要確定ps和w怎么放在服務(wù)器上
- ps和w放在相同服務(wù)器上可以減少通信時(shí)間
-
定理:采用最少的服務(wù)器來(lái)放置這些 w 和 ps最欠,使得每個(gè)服務(wù)器內(nèi)部署相同數(shù)量的 w 和 ps示罗。
源碼閱讀
論文步驟
1、任務(wù)收斂
針對(duì)不同類型模型芝硬,對(duì)損失和迭代次數(shù)的關(guān)系進(jìn)行建模蚜点,使得可以通過當(dāng)前損失預(yù)測(cè)剩余迭代次數(shù)
2、資源-速度
3吵取、資源分配
4禽额、任務(wù)放置
提出問題
1、如何計(jì)算時(shí)間
2皮官、資源如何分配
3、任務(wù)如何放置
4实辑、如何與K8S做交互
從目錄開始
estimator.py:估算速度和epoch
experimentor.py:代碼入口, 代碼使用多線程通信
job.py: 每個(gè)任務(wù)具體調(diào)度
jobrepo.py: 三個(gè)任務(wù)倉(cāng)庫(kù)
ResNet-50_ImageNet
VGG-16_ImageNet
ResNext-110_Cifar10
optimus_scheduler.py:主調(diào)度器
params.py: 參數(shù)值
代碼入口 experimentor.py
- 清除所有job
def clear():
os.system('kubectl delete jobs --all')
- 為每個(gè)任務(wù)分配單獨(dú)的線程
Timer:時(shí)間驅(qū)動(dòng)
Hub:消息轉(zhuǎn)發(fā)
Generator:任務(wù)隨機(jī)生成和模擬提交任務(wù)捺氢,本代碼用了三個(gè)任務(wù)在jobrepo.py
Progressor: 任務(wù)執(zhí)行器
Statsor: 任務(wù)狀態(tài)收集
UTIL_Scheduler 任務(wù)調(diào)度器,重點(diǎn)邏輯
主調(diào)度器optimus_scheduler.py
1剪撬、在init函數(shù)中摄乒,除了初始化參數(shù)還有一句
self.msg_handler = threading.Thread(target=self._msg_handle, args=())
重點(diǎn)為閱讀_msg_handle這個(gè)函數(shù)
while循環(huán)中每次收到一條消息,按照類型進(jìn)行處理
第一次收集收集數(shù)據(jù)點(diǎn)残黑,第一次估算速度馍佑,為所有未完成的job分配一個(gè)ps一個(gè)worker
for job in self.uncompleted_jobs:
cpu_req = job.worker_cpu + job.ps_cpu
mem_req = job.worker_mem + job.ps_mem
bw_req = job.worker_bw + job.ps_bw
gpu_req = job.worker_gpu
suff_resr = self.__check_cluster_resource_full(cpu_req, mem_req, bw_req, gpu_req)
if suff_resr:
job.num_worker = 1
job.num_ps = 1
self.cluster_used_cpu += cpu_req
self.cluster_used_mem += mem_req
self.cluster_used_bw += bw_req
self.cluster_used_gpu += gpu_req
# compute initial utility
self.__update_util_queue(job, util_queue)
else:
continue
還有剩余的資源繼續(xù)增加,看task_type
if suff_resr:
# currently no mechanism to reduce resources
if task_type == "ps":
job.num_ps += 1
elif task_type == "worker":
job.num_worker += 1
self.cluster_used_cpu += cpu_req
self.cluster_used_mem += mem_req
self.cluster_used_bw += bw_req
self.cluster_used_gpu += gpu_req
self.__update_util_queue(job, util_queue)
else:
# no enough resource
break
# how to handle not_ready_jobs
__update_util_queue
- 計(jì)算剩余時(shí)間
# compute utility
# allocate 1 ps or 1 worker each time.
# sometimes can allocate multiple ps or worker for optimization, to avoid stuck in local optimal.
end_epoch = self.estimator.est_epoch(job)
if end_epoch <= 0:
# error when estimating epoch
end_epoch = job.progress + 20
rem_epoch = end_epoch - job.progress # the rem_epoch is negative if estimated epoch return -1
est_speed = self.estimator.est_speed(job, job.num_ps, job.num_worker)
self.logger.debug("estimated speed: " + str(est_speed))
if est_speed <= 0:
self.not_ready_jobs.add(job)
return
rem_time = rem_epoch / est_speed
- 當(dāng)ps+1計(jì)算時(shí)間梨水,同理worker
# if add ps 1
est_speed = self.estimator.est_speed(job, job.num_ps + 1, job.num_worker)
if est_speed <= 0:
self.not_ready_jobs.add(job)
return
ps_rem_time = rem_epoch / est_speed
resource_reqs = (job.ps_cpu, job.ps_mem, job.ps_bw)
shares = (1.0 * job.ps_cpu / self.cluster_num_cpu, 1.0 * job.ps_mem / self.cluster_num_mem,
1.0 * job.ps_bw / self.cluster_num_bw)
dom_res = shares.index(max(shares))
ps_util = (rem_time - ps_rem_time)/ resource_reqs[dom_res]
給出ps數(shù)和工人數(shù)拭荤,預(yù)測(cè)訓(xùn)練速度。如果字典中已經(jīng)存在疫诽,使用真正的一個(gè)
def est_speed(self, job, num_ps, num_worker):
"""Give the number of ps and the number of worker, predict the training speed.
Use the real one if already exists in the dict
"""
if (num_ps, num_worker) in job.training_speeds:
return job.training_speeds[(num_ps, num_worker)]
else:
# do training speed curve fitting here
if 'async' in job.kv_store:
if len(job.training_speeds) >= 4:
# do not need curve fitting each time, can be further optimized. future work
ps_list = []
worker_list = []
speed_list = []
for key, value in job.training_speeds.items():
(ps, worker) = key
ps_list.append(float(ps))
worker_list.append(float(worker))
speed_list.append(value)
params = self._async_speed_curve_fitting(np.array(ps_list), np.array(worker_list), np.array(speed_list))
if params is None:
self.logger.error(self.name+":: " + job.name + " " + str((num_ps, num_worker)) + " speed estimation error")
return -1
else:
[a, b, c, d] = params
est_speed = self.__async_speed_fit_func((num_ps, num_worker), a, b, c, d)
return est_speed
else:
return -1
繼續(xù)回到調(diào)度器舅世,計(jì)算好這個(gè)時(shí)間周期所有未完成job需要多少資源后開始放置
# placement
ps_placements, worker_placements = self.__place(self.uncompleted_jobs)
對(duì)任務(wù)需求資源進(jìn)行排序
# sort jobs based on num_ps and num_worker
job_sort_queue = Queue.PriorityQueue()
for job in jobs:
job_sort_queue.put((job.num_ps + job.num_worker, job))
對(duì)節(jié)點(diǎn)進(jìn)行排序
cpu_avail_queue = Queue.PriorityQueue()
# sort nodes based on available cpus, since cpu is usually the bottleneck
for i in range(len(params.NODE_LIST)):
cpu_avail_queue.put((self.node_used_cpu_list[i], i))
從大到小把任務(wù)放入從小到大的節(jié)點(diǎn)
for i in range(job.num_ps):
# place ps evenly
node = cand_place_nodes[i % len(cand_place_nodes)]
# check whether resource is enough to place this ps
suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
if suff_resr:
ps_nodes.append(node)
# minus temporary resources
self.__deduct_resr(job, "ps", 1, node)
else:
# since node is already sorted based on resources,
# if a larger node can not place the task, the following one can not too
fit_flag = False
# add the deducted resource back
for node in ps_nodes:
self.__add_back_resr(job, "ps", 1, node)
ps_already_deduct = True
break
最后一些放不進(jìn)去的節(jié)點(diǎn),和剩余的資源奇徒。拆開他們的ps和work單獨(dú)放置雏亚,同時(shí)計(jì)算會(huì)不會(huì)速度變慢
# have try all nodes, but still can not place, then check if we can place some tasks
# and place ps and worker alternatively
self.logger.debug("last placed job: " + job.name)
ps_nodes = []
worker_nodes = []
flag_place_ps = True
for i in range(job.num_ps + job.num_worker):
flag_no_resource = True
if flag_place_ps:
# place ps task
for node in range(len(params.NODE_LIST)):
suff_resr = self.__check_node_resource_full(node, job.ps_cpu, job.ps_mem, job.ps_bw)
if suff_resr:
ps_nodes.append(node)
self.__deduct_resr(job, "ps", 1, node)
flag_no_resource = False
break
else:
# place worker task
for node in range(len(params.NODE_LIST)):
suff_resr = self.__check_node_resource_full(node, job.worker_cpu, job.worker_mem,
job.worker_bw, job.worker_gpu)
if suff_resr:
worker_nodes.append(node)
self.__deduct_resr(job, "worker", 1, node)
flag_no_resource = False
break
任務(wù)資源和K8S的交互
創(chuàng)建任務(wù)
def start(self):
# start the job in k8s
self.logger.info("starting job " + self.name + "...")
# job working dir
os.system('mkdir -p ' + self.dir)
self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix) # ps container mount
self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix) # worker container mount
self.__set_batch_size()
# create job yamls
self._create()
# prepare data
self._read_data()
# start pods in k8s
subprocess.check_output("kubectl create -f " + self.yaml, shell=True)
如何獲得yaml文件
# copy template file
self.jinja = self.dir + self.name + '.jinja'
os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)
# replace variables in jinja file
temp_file = self.jinja + '.temp'
for key, value in variables.items():
os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
os.system('rm ' + self.jinja)
os.system('mv ' + temp_file + ' ' + self.jinja)
# generate yaml file
self.yaml = self.dir + self.name + '.yaml'
os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)
其它相關(guān)交互
cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
# get heapster cluster ip
# heapster 192.168.192.16 <none> 80/TCP 5d
cmd = "kubectl get services --namespace=kube-system | grep heapster |awk '{print $2}'"
# in case not delete all
subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)
本論文使用了K8S自帶的均衡負(fù)載
The workers/parameter servers are placed in a load balancing way, according to the default behavior of Kubernetes.
用了其中兩個(gè)特性 FitPredicate和PriorityFunction
candidates = []
for pod in pods:
for node in nodes:
if pod.label-selector == node.label or pod.resources.limits + node.allocated_resources < node.resources.capacity: // fit predicate
candidates.append(node)
for candidate in candidates:
select one according to priorities (e.g., least used resources) // priority function
general steps:
一般步驟:(1)過濾節(jié)點(diǎn),(2)對(duì)過濾后的節(jié)點(diǎn)列表進(jìn)行優(yōu)先級(jí)排序(3)選擇最適合的節(jié)點(diǎn)
Available Predicates
- Static predicates:
PodFitsPorts,
PodFitsResources,
NoDiskConflict,
MatchNodeSelector,
HostName - Configurable predicates:
ServiceAffinity,
LabelsPresence
Available Priority Functions
- Static priority functions:
LeastRequestedPriority,
BalancedResourceAllocation,
ServiceSpreadingPriority,
EqualPriority - Configurable priority functions:
ServiceAntiAffinity,
LabelPrerference
參考地址:https://kubernetes.io/blog/2017/03/advanced-scheduling-in-kubernetes/
參考例子ResNet-50_ImageNet
'''
ResNet-50_ImageNet
'''
def _set_resnet50_job(job):
num_ps = params.DEFAULT_NUM_PS
num_worker = params.DEFAULT_NUM_WORKER
ps_cpu = 3
ps_mem = 9
ps_bw = 0
worker_cpu = 2
worker_mem = 8
worker_gpu = 1
worker_bw = 0
job.set_ps_resources(num_ps, ps_cpu, ps_mem, ps_bw)
job.set_worker_resources(num_worker, worker_cpu, worker_mem, worker_bw, worker_gpu)
image = 'xxx'
script = '/init.sh'
# must end with /, save everything including training data, validation data,
# training log and training model into this dir
work_dir = '/mxnet/example/image-classification/data/'
host_workdir_prefix = '/data/k8s-workdir/experiment/'
job.set_container(image, script, work_dir, host_workdir_prefix)
prog = 'python train_imagenet.py --network resnet --num-layers 50 --disp-batches 5 --num-epochs 100 --data-train /data/imagenet-train.rec'
kv_store = 'dist_sync'
prog += ' --kv-store ' + kv_store
if worker_gpu > 0:
prog += " --gpus" + " " + ",".join([str(i) for i in range(int(worker_gpu))])
job.set_train(prog=prog, batch_size=32, kv_store=kv_store, scale_bs=True)
hdfs_data = ['/k8s-mxnet/imagenet/imagenet-train.rec']
data_dir = '/data/'
host_data_dir = '/data/mxnet-data/imagenet/'
job.set_data(hdfs_data=hdfs_data, data_dir=data_dir, host_data_dir=host_data_dir, data_mounted=True)
job.set_mxnet(kv_store_big_array_bound=1000 * 1000, ps_verbose='')
源碼部署
JOB是如何跑到集群的
由調(diào)度器計(jì)算出每個(gè)job需要用多少ps和多少worker摩钙,然后設(shè)置這個(gè)參數(shù)
set_ps_resources
set_worker_resources
放置這些ps和work罢低,需要從環(huán)境變量中獲取這些資源
基于yarn的mxnet深度學(xué)習(xí),需要配置mxnet分布式集群環(huán)境(暫時(shí)還沒有配置這個(gè))
set_worker_placement
set_ps_placement
掛載到容器的主機(jī)上的目錄
_set_mount_dirs
mount_dirs = []
if type == 'ps':
for i in xrange(self.num_ps):
postfix = self.name + '-ps-' + str(i) + '/'
mount_dir = host_workdir_prefix + postfix
mount_dirs.append(mount_dir)
cmd = 'ssh ' + self.ps_placement[i] + ' "sudo rm -rf ' + mount_dir + '; mkdir -p ' + mount_dir + '"'
os.system(cmd)
設(shè)置容器
set_container
# container description
self.image = image
self.script = script
self.work_dir = work_dir
self.host_workdir_prefix = host_workdir_prefix
self.work_volume = work_volume
如果數(shù)據(jù)不在本地主機(jī)胖笛,從HDFS中獲取网持,數(shù)據(jù)集列表宜肉,包括訓(xùn)練數(shù)據(jù)和驗(yàn)證數(shù)據(jù)
可以下載到本地,放到相應(yīng)目錄翎碑,目前找到一個(gè)cifar10的數(shù)據(jù)集和訓(xùn)練集
'http://data.mxnet.io/data/cifar10/cifar10_val.rec'
'http://data.mxnet.io/data/cifar10/cifar10_train.rec'
set_data 設(shè)置數(shù)據(jù)
_read_data 讀取HDFS數(shù)據(jù)谬返,如果本地有數(shù)據(jù)不會(huì)執(zhí)行
self.hdfs_data = hdfs_data
self.data_dir = data_dir
self.host_data_dir = host_data_dir
self.data_mounted = data_mounted
self.data_volume = data_volume
設(shè)置訓(xùn)練集
set_train
__set_batch_size
set_mxnet
創(chuàng)建任務(wù),利用jinja模版日杈,傳入?yún)?shù)遣铝,生成相應(yīng)的yaml文件
_create
# copy template file
self.jinja = self.dir + self.name + '.jinja'
os.system("cp ../templates/k8s-mxnet-template.jinja " + self.jinja)
# replace variables in jinja file
temp_file = self.jinja + '.temp'
for key, value in variables.items():
os.system('sed -e "s@\$' + key + '@' + value + '@g" "' + self.jinja + '"' + ' > ' + temp_file)
os.system('rm ' + self.jinja)
os.system('mv ' + temp_file + ' ' + self.jinja)
# generate yaml file
self.yaml = self.dir + self.name + '.yaml'
os.system("python ../templates/render-template.py " + self.jinja + " > " + self.yaml)
嘗試運(yùn)行有成功生成一些yaml文件,里面ip參數(shù)不太對(duì)莉擒,其它參數(shù)也可能有問題酿炸,所以執(zhí)行不起來(lái)。
獲取訓(xùn)練狀態(tài)涨冀,主要是獲取正在進(jìn)行的任務(wù)和值損失列表
_read_progress_stats
get_training_progress_stats
self._read_progress_stats()
return (list(self.progress_list), list(self.val_loss_list))
獲取訓(xùn)練速度填硕, 打卡本地文件獲取
_read_training_speed
get_training_speed
'''
cmd = 'scp ' + node + ':' + local_file + ' ' + self.dir # the new txt will replace the old one, no need to delete
os.system(cmd)
try:
with open(self.dir+speed_fn, 'r') as fh:
stb_speed = float(fh.readline().replace('\n', '').split(' ')[1])
self.speed_list[i] = float('%.3f'%(stb_speed))
except Exception as e:
print e
continue
'''
cmd = "ssh " + node + " 'cat " + local_file + "'"
獲取k8s pods
__get_pods
cmd = 'kubectl get pods --selector=' + 'name=' + self.name + ',' + 'job=' + task + ' --namespace=default' + ' |grep ' + task
output = subprocess.check_output(cmd, shell=True)
獲取job的各項(xiàng)指標(biāo)
_read_metrics
get_metrics
# cpu: milli core, mem: bytes, net: bytes/second
metric_keys = ['cpu/usage_rate', 'memory/usage', 'network/tx_rate', 'network/rx_rate']
運(yùn)行開始函數(shù)
start
def start(self):
# start the job in k8s
self.logger.info("starting job " + self.name + "...")
# job working dir
os.system('mkdir -p ' + self.dir)
self.ps_mount_dirs = self._set_mount_dirs('ps', self.host_workdir_prefix) # ps container mount
self.worker_mount_dirs = self._set_mount_dirs('worker', self.host_workdir_prefix) # worker container mount
self.__set_batch_size()
# create job yamls
self._create()
# prepare data
self._read_data()
# start pods in k8s
subprocess.check_output("kubectl create -f " + self.yaml, shell=True)
刪除任務(wù)和刪除所有任務(wù)
subprocess.check_output('kubectl delete jobs --selector=name=' + self.name, shell=True)
安裝過程中遇到的問題
1、ssh nodelist配置到集群鹿鳖,ssh命令運(yùn)行失敗扁眯,堡壘機(jī)需要跳轉(zhuǎn)
2、getenv沒有配置完全翅帜,需要一個(gè)個(gè)配置姻檀,或者刪掉一些再做嘗試
3、還需要收集所有相關(guān)訓(xùn)練任務(wù)的數(shù)據(jù)集和訓(xùn)練集
結(jié)論
動(dòng)態(tài)部署可以利用論文這種方法涝滴,生成配置文件绣版,與集群和外界環(huán)境的交互可以直接在python命令加shell腳本來(lái)獲取和運(yùn)行。