Foreword
- Attitude determines altitude! Make excellence a habit!
- There is nothing in this world that one extra late shift can't solve; and if there is, work two! (-- Maoqiang)
Why TensorFlow on Kubernetes?
In my view, the biggest advantages are:
- Tenant isolation: different users are guaranteed not to interfere with each other
- Resource scheduling, including GPUs: resources are used efficiently
- Scalability: easy to scale out horizontally
- Flexibility: resource allocation and management are both flexible
and so on.
Setting up the Kubernetes cluster
This article uses the kubeadm installation method, which automatically installs etcd and the other required components.
First, a brief introduction to Kubernetes and a few core concepts (the material below comes from the official Chinese documentation; copyright belongs to its authors).
- Basic concepts
Below are common Kubernetes commands; refer to the official documentation to go further.
- Common commands
- Kubernetes architecture diagram
- Cluster installation
First prepare the environment packages. I reference a package from another technical blog here; copyright belongs to the original author:
https://pan.baidu.com/s/1FfO2saDPkXH7wO_5XTNqpw
- Environment
CentOS 7, three machines: master, node1, node2
- Unpack the resource package
tar -xjvf k8s_images.tar.bz2
- Install Docker
yum install docker-ce-selinux-17.03.2.ce-1.el7.centos.noarch.rpm
yum install docker-ce-17.03.2.ce-1.el7.centos.x86_64.rpm
- Verify the installation
yum list | grep docker
- Start Docker
systemctl start docker && systemctl enable docker
- Check that Docker started successfully
ifconfig
and confirm that the docker bridge network interface is present.
- Switch the Docker registry mirror to Aliyun
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://u9ea3fz9.mirror.aliyuncs.com"]
}
EOF
- Restart Docker
sudo systemctl daemon-reload
sudo systemctl restart docker
- Disable the firewall and SELinux enforcement
systemctl stop firewalld && systemctl disable firewalld
setenforce 0
- Configure bridge netfilter parameters
echo "
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
" >> /etc/sysctl.conf
sysctl -p
- Disable swap
swapoff -a
- Load the saved images
docker load <./docker_images/etcd-amd64_v3.1.10.tar
docker load <./docker_images/flannel:v0.9.1-amd64.tar
docker load <./docker_images/k8s-dns-dnsmasq-nanny-amd64_v1.14.7.tar
docker load <./docker_images/k8s-dns-kube-dns-amd64_1.14.7.tar
docker load <./docker_images/k8s-dns-sidecar-amd64_1.14.7.tar
docker load <./docker_images/kube-apiserver-amd64_v1.9.0.tar
docker load <./docker_images/kube-controller-manager-amd64_v1.9.0.tar
docker load <./docker_images/kube-scheduler-amd64_v1.9.0.tar
docker load < ./docker_images/kube-proxy-amd64_v1.9.0.tar
docker load <./docker_images/pause-amd64_3.0.tar
docker load < ./kubernetes-dashboard_v1.8.1.tar
- Install kubelet, kubeadm and kubectl
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh kubernetes-cni-0.6.0-0.x86_64.rpm kubelet-1.9.0-0.x86_64.rpm kubectl-1.9.0-0.x86_64.rpm
rpm -ivh kubectl-1.9.0-0.x86_64.rpm
rpm -ivh kubeadm-1.9.0-0.x86_64.rpm
- Start kubelet on the master node
systemctl enable kubelet && sudo systemctl start kubelet
- Check whether kubelet's default cgroup driver matches Docker's: Docker defaults to cgroupfs while this kubelet package defaults to systemd, and the two must be identical.
vim /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
Change the cgroup-driver value from systemd to cgroupfs.
- Restart kubelet
systemctl daemon-reload && systemctl restart kubelet
All of the steps above must be performed on every node; only continue once they are done everywhere.
- Initialize the master
kubeadm init --kubernetes-version=v1.9.0 --pod-network-cidr=10.244.0.0/16
Note: be sure to record the command printed here:
kubeadm join --token 26c210.acef208514aaf37f 10.255.164.31:6443 --discovery-token-ca-cert-hash sha256:87ee8b74e3b937f82b5174fa64bd140071cbf9087f41f5b4bec38c22332e6137
You will need it later whenever you add a node to the cluster to scale out (store it in your secrets file).
Also note the hint further down in the output: it concerns authorization, and you must run the following, or kubectl may not work at all.
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
- Create the pod network
Locate the kube-flannel.yml file and run
kubectl create -f kube-flannel.yml
then run
kubectl get nodes
When the master node's status shows Ready, it succeeded.
At the same time, configure the environment variable:
echo "export KUBECONFIG=/etc/kubernetes/admin.conf" >> ~/.bash_profile
source ~/.bash_profile
- Join the other nodes
From here on, the commands run on the node machines; repeat them on every node.
kubeadm join --token 99f58e.60c1ad95c0ac7dcd 10.255.164.31:6443 --discovery-token-ca-cert-hash sha256:7be50b18a3697bad6a0477db525a95e4db011f9f1f89384882b53eb85968eab5
Use the join command you were told to save when initializing the master, not the literal command above.
Once a node has joined, check that it succeeded:
kubectl get nodes
Then take a look at all the namespaces.
If every node's status is Ready, the Kubernetes cluster has been set up successfully.
- Setting up kubernetes-dashboard
Here we use the latest official yaml file.
Find the section shown in the screenshot and modify it as follows.
The nodePort is the port used to reach the dashboard; type: NodePort exposes the service at the proxy layer so that the outside world can access it through that nodePort.
You also need to adjust the image version it depends on, because the kubernetes-dashboard image loaded into Docker earlier may not match the version in the file.
Create the dashboard:
kubectl create -f kubernetes-dashboard.yaml
Then visit https://master_ip:32666
replacing master_ip with your machine's domain name or IP.
It is now reachable from a browser.
Here we use token-based login.
To fetch a token, run
kubectl get secret -n kube-system
Take the controller token (highlighted in the red box) and run
kubectl describe secret/namespace-controller-token-bw9jn -n kube-system
Do not run this verbatim: replace namespace-controller-token-bw9jn, since the name differs in every cluster.
This returns the token.
Paste the token into the browser and you can log in.
With that, our dashboard is ready.
At this point the Kubernetes cluster is complete. For a production environment, please build an HA cluster; the official Chinese documentation covers this, so it is not repeated here.
Now let's build the TensorFlow on Kubernetes environment step by step.
TensorFlow on Kubernetes architecture
First, let's look at the overall architecture of the platform.
A few words of explanation. This is the architecture of the hands-on distributed TensorFlow setup. It has two parameter services, several worker services, and a shuffle-and-sampling service. The shuffle service shuffles the samples by their labels and then serves batch sampling (with or without replacement; sampling is a science in itself, see the book Sampling Techniques for details). Each batch draw is triggered by a worker: once a worker receives the sampled batch of sample IDs, it fetches that batchSize of sample rows from a distributed database built on Kubernetes and runs the training computation. Because distributed TensorFlow supports asynchronous gradient descent, every batch is trained against the latest parameters, and the parameter updates themselves are performed by the two parameter services. The model (its parameters) is stored on NFS, so the parameter services and workers can share parameters. Finally, note that all training data lives in the distributed database (choose the database to suit your scenario).
Why is a dedicated shuffle-and-sampling service needed? When the data volume is large, shuffling and sampling over the full sample rows wastes a lot of resources, so a dedicated service extracts only the (id, label) pairs for shuffling and sampling. If even the (id, label) set is very large, the shuffling and sampling can be distributed on Spark; Spark 2.3 already supports Kubernetes scheduling natively.
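To make the sampling side concrete, here is a minimal sketch (my own illustration, not the production service; the class and method names are hypothetical) of a shuffle-and-sampling service over (id, label) pairs, supporting batch draws both with and without replacement:

```python
import random

class SampleService:
    """Toy sketch of a shuffle-and-sampling service over (id, label) pairs."""

    def __init__(self, id_label_pairs, seed=0):
        self._rng = random.Random(seed)
        self._pairs = list(id_label_pairs)
        self._rng.shuffle(self._pairs)   # shuffle once up front
        self._cursor = 0

    def next_batch(self, batch_size, replacement=False):
        """Return batch_size (id, label) pairs; a worker then fetches the
        full rows for these ids from the distributed database."""
        if replacement:
            # Sampling with replacement: independent uniform draws
            return [self._rng.choice(self._pairs) for _ in range(batch_size)]
        # Sampling without replacement: walk the permutation, reshuffling each epoch
        if self._cursor + batch_size > len(self._pairs):
            self._rng.shuffle(self._pairs)
            self._cursor = 0
        batch = self._pairs[self._cursor:self._cursor + batch_size]
        self._cursor += batch_size
        return batch
```

Each worker would call next_batch to get sample IDs, then pull the corresponding rows from the database for training.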
Setting up NFS
- What is NFS (from Baidu Baike)
NFS (Network File System) is one of the file systems supported by FreeBSD. It lets computers on a network share resources over TCP/IP: a local NFS client can transparently read and write files located on a remote NFS server, just as if they were local files.
- Setting up the NFS service
yum install nfs-utils rpcbind -y
mkdir -p /data/nfs
vim /etc/exports
and add the following line:
/data/nfs 192.168.86.0/24(rw,no_root_squash,no_all_squash,sync)
Start the services:
/bin/systemctl start rpcbind.service
/bin/systemctl start nfs.service
The NFS service is now up.
Building the TensorFlow Docker image
First we prepare the Dockerfile:
FROM ubuntu:16.04
MAINTAINER yahengsong yahengsong@foxmail.com
RUN apt-get update \
&& apt-get install -y wget \
&& apt-get install -y lrzsz \
&& apt-get install -y unzip \
&& apt-get install -y zip \
&& apt-get install -y vim \
&& apt-get install -y gcc \
&& apt-get install -y g++ \
&& apt-get install -y automake \
&& apt-get install -y autoconf \
&& apt-get install -y libtool \
&& apt-get install -y make \
&& apt-get install -y openssl \
&& apt-get install -y libssl-dev \
&& apt-get install -y ruby \
&& apt-get install -y zlib1g \
&& apt-get install -y zlib1g-dev \
&& apt-get install -y bzip2 \
&& apt-get install -y libncurses5-dev \
&& apt-get install -y sqlite sqlite3 \
&& apt-get install -y libgdbm-dev \
&& apt-get install -y libpcap-dev \
&& apt-get install -y xz-utils
RUN wget https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tar.xz \
&& tar -xvf Python-3.6.0.tar.xz \
&& cd Python-3.6.0 \
&& mkdir -p /usr/local/python3 \
&& ./configure --prefix=/usr/local/python3 \
&& make \
&& make install \
&& rm -rf Python-3.6.0* \
&& ln -s /usr/local/python3/bin/python3 /usr/bin/python3 \
&& ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
RUN pip install --upgrade pip \
&& pip --no-cache-dir install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-1.7.0-cp36-cp36m-linux_x86_64.whl
# TensorBoard
EXPOSE 6006
# IPython
EXPOSE 8888
WORKDIR /root
We can build this image on Aliyun from the Dockerfile above,
which gives us our own environment (note: this version does not install Jupyter).
See "how to build your own image on Aliyun" for details.
TensorFlow on Kubernetes in practice
The following shows how to deploy a TensorFlow environment on the Kubernetes cluster.
First, deploy the single-machine CPU version.
Let's look at the tensorflow.yaml file:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tensorflow
spec:
  replicas: 1
  template:
    metadata:
      labels:
        k8s-app: tensorflow
    spec:
      containers:
      - name: tensorflow
        image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
        ports:
        - containerPort: 8888
        resources:
          limits:
            cpu: 4
            memory: 2Gi
          requests:
            cpu: 2
            memory: 1Gi
---
apiVersion: v1
kind: Service
metadata:
  name: jupyter-service
spec:
  type: NodePort
  ports:
  - port: 80
    targetPort: 8888
    nodePort: 32001
    name: tensorflow
  selector:
    k8s-app: tensorflow
Here we rely on Aliyun's TensorFlow Docker image registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3 (this version includes Jupyter). We request 2 CPUs and 1Gi of memory from the Kubernetes cluster, and the cluster caps this Deployment at 4 CPUs and 2Gi of memory. Note that the environment is exposed externally on port 32001.
With that file in hand, create the environment:
kubectl create -f tensorflow.yaml
You can then inspect it with
kubectl get pods
We can now reach the environment's Jupyter at http://master_ip:32001/.
At this point we can obtain the token as follows,
log in with the token,
and happily start coding.
Distributed TensorFlow on Kubernetes
Following the distributed deep learning architecture introduced earlier,
we first create a parameter service, tf-ps.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tensorflow-ps
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: tensorflow-ps
        role: ps
    spec:
      containers:
      - name: ps
        image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
        ports:
        - containerPort: 2222
        resources:
          limits:
            cpu: 4
            memory: 2Gi
          requests:
            cpu: 2
            memory: 1Gi
        volumeMounts:
        - mountPath: /datanfs
          readOnly: false
          name: nfs
      volumes:
      - name: nfs
        nfs:
          server: your-nfs-server-address
          path: "/data/nfs"
---
apiVersion: v1
kind: Service
metadata:
  name: tensorflow-ps-service
  labels:
    name: tensorflow-ps
    role: service
spec:
  ports:
  - port: 2222
    targetPort: 2222
  selector:
    name: tensorflow-ps
Run
kubectl create -f tf-ps.yaml
and the parameter-server pod will be created.
Next we create 2 worker nodes with tf-worker.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tensorflow-worker
spec:
  replicas: 2
  template:
    metadata:
      labels:
        name: tensorflow-worker
        role: worker
    spec:
      containers:
      - name: worker
        image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
        ports:
        - containerPort: 2222
        resources:
          limits:
            cpu: 4
            memory: 2Gi
          requests:
            cpu: 2
            memory: 1Gi
        volumeMounts:
        - mountPath: /datanfs
          readOnly: false
          name: nfs
      volumes:
      - name: nfs
        nfs:
          server: your-nfs-server-address
          path: "/data/nfs"
---
apiVersion: v1
kind: Service
metadata:
  name: tensorflow-wk-service
  labels:
    name: tensorflow-worker
spec:
  ports:
  - port: 2222
    targetPort: 2222
  selector:
    name: tensorflow-worker
Run
kubectl create -f tf-worker.yaml
and the 2 worker pods will be created.
- Training
Check each pod's IP; they are used to build the cluster spec in the training code.
Then enter each pod:
kubectl exec -ti tensorflow-ps-77b8d7bc89-87qgp bash
kubectl exec -ti tensorflow-worker-b7cc4dd66-94ntr bash
kubectl exec -ti tensorflow-worker-b7cc4dd66-mzqhb bash
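The training script below expects mnist_train.csv and mnist_test.csv, where each row is label,pixel_0,...,pixel_783. A minimal sketch of writing that layout (the helper name is mine; real MNIST data would replace the synthetic arguments):

```python
def mnist_rows_to_csv(labels, images):
    """Render (label, 784-pixel) pairs as CSV lines: label,p0,p1,...,p783."""
    lines = []
    for label, pixels in zip(labels, images):
        lines.append(",".join(str(int(v)) for v in [label] + list(pixels)))
    return "\n".join(lines)
```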
Create the following code (before this, prepare the MNIST dataset in CSV format and place it under the NFS export /data/nfs):
from __future__ import print_function
import math
import tensorflow as tf
import collections
import sys, os, time
import numpy as np

# Cluster description: ps_hosts lists the parameter-server nodes,
# worker_hosts lists the worker nodes
tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
# Server description: job name, task index, hidden-layer size,
# MNIST data directory, and training batch size (default 100 images per batch)
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100, "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/datanfs", "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
FLAGS = tf.app.flags.FLAGS

# Images are 28*28 pixels
IMAGE_PIXELS = 28


class DataSet(object):
    def __init__(self, images, labels, reshape=True):
        """Construct a DataSet.

        Input pixel values in `[0, 255]` are rescaled into `[0, 1]`.
        """
        self._num_examples = images.shape[0]
        # Convert shape from [num examples, rows, columns, depth]
        # to [num examples, rows*columns] (assuming depth == 1)
        images = images.astype(np.float32)
        images = np.multiply(images, 1.0 / 255.0)
        self._images = images
        self._labels = labels
        self._epochs_completed = 0
        self._index_in_epoch = 0

    @property
    def images(self):
        return self._images

    @property
    def labels(self):
        return self._labels

    @property
    def num_examples(self):
        return self._num_examples

    @property
    def epochs_completed(self):
        return self._epochs_completed

    def next_batch(self, batch_size, fake_data=False, shuffle=True):
        """Return the next `batch_size` examples from this data set."""
        start = self._index_in_epoch
        # Shuffle for the first epoch
        if self._epochs_completed == 0 and start == 0 and shuffle:
            perm0 = np.arange(self._num_examples)
            np.random.shuffle(perm0)
            self._images = self.images[perm0]
            self._labels = self.labels[perm0]
        # Go to the next epoch
        if start + batch_size > self._num_examples:
            # Finished epoch
            self._epochs_completed += 1
            # Get the rest examples in this epoch
            rest_num_examples = self._num_examples - start
            images_rest_part = self._images[start:self._num_examples]
            labels_rest_part = self._labels[start:self._num_examples]
            # Shuffle the data
            if shuffle:
                perm = np.arange(self._num_examples)
                np.random.shuffle(perm)
                self._images = self.images[perm]
                self._labels = self.labels[perm]
            # Start next epoch
            start = 0
            self._index_in_epoch = batch_size - rest_num_examples
            end = self._index_in_epoch
            images_new_part = self._images[start:end]
            labels_new_part = self._labels[start:end]
            return np.concatenate((images_rest_part, images_new_part), axis=0), \
                   np.concatenate((labels_rest_part, labels_new_part), axis=0)
        else:
            self._index_in_epoch += batch_size
            end = self._index_in_epoch
            return self._images[start:end], self._labels[start:end]


def dense_to_one_hot(labels_dense, num_classes):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot


def read_data_sets(train_dir, reshape=True, validation_size=2000):
    trainfile = os.path.join(train_dir, "mnist_train.csv")
    testfile = os.path.join(train_dir, "mnist_test.csv")
    train_images = np.array([], dtype=np.uint8)
    train_labels = np.array([], dtype=np.uint8)
    test_images = np.array([], dtype=np.uint8)
    test_labels = np.array([], dtype=np.uint8)
    count = 0
    with open(trainfile) as f:
        for line in f.readlines():
            count += 1
            line = line.strip()
            line = line.split(",")
            line = [int(x) for x in line]
            one_rray = np.array(line[1:], dtype=np.uint8)
            train_images = np.hstack((train_images, one_rray))
            train_labels = np.hstack((train_labels, np.array(line[0], dtype=np.uint8)))
            if count % 10000 == 0:
                print(str(count))
            if count == 20000:
                break
    train_images = train_images.reshape(20000, 28 * 28)
    train_labels = train_labels.reshape(20000, 1)
    train_labels = dense_to_one_hot(train_labels, 10)
    count = 0
    with open(testfile) as f:
        for line in f.readlines():
            count += 1
            line = line.strip()
            line = line.split(",")
            line = [int(x) for x in line]
            one_rray = np.array(line[1:], dtype=np.uint8)
            test_images = np.hstack((test_images, one_rray))
            test_labels = np.hstack((test_labels, np.array(line[0], dtype=np.uint8)))
            if count % 10000 == 0:
                print(str(count))
    test_images = test_images.reshape(10000, 28 * 28)
    test_labels = test_labels.reshape(10000, 1)
    test_labels = dense_to_one_hot(test_labels, 10)
    if not 0 <= validation_size <= len(train_images):
        raise ValueError(
            'Validation size should be between 0 and {}. Received: {}.'
            .format(len(train_images), validation_size))
    validation_images = train_images[:validation_size]
    validation_labels = train_labels[:validation_size]
    train_images = train_images[validation_size:]
    train_labels = train_labels[validation_size:]
    train = DataSet(train_images, train_labels, reshape=reshape)
    validation = DataSet(validation_images, validation_labels, reshape=reshape)
    test = DataSet(test_images, test_labels, reshape=reshape)
    Datasets = collections.namedtuple('Datasets', ['train', 'validation', 'test'])
    return Datasets(train=train, validation=validation, test=test)


def main(_):
    # Read the cluster description from the command-line flags
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    # Build the TensorFlow cluster spec
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    # Create a local Server object to run this task
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    # A parameter server simply starts and serves
    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        # Pin ops onto the designated worker; by default they run on its cpu0
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                ps_device="/job:ps/cpu:0",
                cluster=cluster)):
            # Hidden-layer variables of the fully connected network
            hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
            # Softmax regression layer variables
            sm_w = tf.Variable(tf.truncated_normal([FLAGS.hidden_units, 10], stddev=1.0 / math.sqrt(FLAGS.hidden_units)), name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
            # Model inputs (x: image pixels, y_: digit class)
            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])
            # Hidden layer computation
            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)
            # Softmax regression model and cross-entropy loss
            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
            # Global step, starting at 0
            global_step = tf.Variable(0, name="global_step", trainable=False)
            # Training op, using Adagrad gradient descent
            train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
            # Accuracy op for model validation
            correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
            # Periodic checkpoints, normally used for model recovery
            saver = tf.train.Saver()
            # Op that collects model summaries
            summary_op = tf.summary.merge_all()
            # Op that initializes all model variables
            init_op = tf.initialize_all_variables()
        # Create a supervisor to manage checkpoints and summaries
        is_chief = (FLAGS.task_index == 0)
        if is_chief:
            print("Worker %d: Initializing session..." % FLAGS.task_index)
        else:
            print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)
        sv = tf.train.Supervisor(
            is_chief=is_chief,
            logdir="/tmp/train_logs",
            init_op=init_op,
            summary_op=summary_op,
            saver=saver,
            global_step=global_step,
            save_model_secs=600)
        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=False,
            device_filters=["/job:ps",
                            "/job:worker/task:%d" % FLAGS.task_index])
        # Load the MNIST training data
        mnist = read_data_sets(FLAGS.data_dir)
        # Create the session used to run the TensorFlow graph
        with sv.managed_session(server.target, config=sess_config) as sess:
            print("Worker %d: Session initialization complete." % FLAGS.task_index)
            # Perform training
            time_begin = time.time()
            print("Training begins @ %f" % time_begin)
            local_step = 0
            step = 0
            while not sv.should_stop() and step < 10000:
                # Read a batch of MNIST training data (default 100 images)
                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}
                # Run one distributed training step
                _, step = sess.run([train_op, global_step], feed_dict=train_feed)
                local_step = local_step + 1
                now = time.time()
                print("%f: Worker %d: training step %d done (global step: %d)" %
                      (now, FLAGS.task_index, local_step, step))
                # Evaluate model accuracy every 100 steps
                if step % 100 == 0:
                    print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
                    print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
            # Training finished; stop the session
            time_end = time.time()
            print("Training ends @ %f" % time_end)
            training_time = time_end - time_begin
            print("Training elapsed time: %f s" % training_time)
            print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
            print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
        sv.stop()


if __name__ == "__main__":
    tf.app.run()
Then run on the parameter server:
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="ps" --task_index=0
The process then sits waiting; what it has actually started is a gRPC server.
On the first worker node, run
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=0
and on the second worker node, run
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=1
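All three commands pass the same host lists and differ only in job_name and task_index; each process derives the same cluster map that tf.train.ClusterSpec is built from. A dependency-free sketch of that flag-to-cluster mapping (the function name is mine):

```python
def build_cluster(ps_hosts, worker_hosts):
    """Parse the comma-separated --ps_hosts/--worker_hosts flag values into
    the dict handed to tf.train.ClusterSpec."""
    return {"ps": ps_hosts.split(","), "worker": worker_hosts.split(",")}

cluster = build_cluster("10.244.2.140:2222", "10.244.1.134:2222,10.244.2.141:2222")
# Every process shares this map; job_name/task_index select its own entry.
```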
Once the data has finished loading, you will see training progress in the logs.
The two workers' iteration counts together add up to the total number of steps we configured.
The resulting models are all stored on the NFS service, because only then can the parameter nodes and worker nodes share the model parameters.
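The claim that the workers' local iteration counts sum to the configured total can be mimicked with a shared counter standing in for global_step; this is only a toy model of the asynchronous updates, not TensorFlow itself:

```python
import threading

def run_workers(total_steps, num_workers):
    """Each worker bumps a shared 'global step' until the budget is spent;
    the sum of the local step counts equals the total."""
    lock = threading.Lock()
    state = {"global_step": 0}
    local_steps = [0] * num_workers

    def worker(i):
        while True:
            with lock:
                if state["global_step"] >= total_steps:
                    return
                state["global_step"] += 1
            local_steps[i] += 1  # one local step per successful global increment

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return local_steps, state["global_step"]
```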
- GPU
The GPU setup is essentially the same as above; we only add GPU support to the yaml file.
Template:
apiVersion: v1
kind: ReplicationController
metadata:
  name: tensorflow-worker
spec:
  replicas: 1
  selector:
    name: tensorflow-worker
  template:
    metadata:
      labels:
        name: tensorflow-worker
        role: worker
    spec:
      containers:
      - name: worker
        image: gcr.io/tensorflow/tensorflow:latest-gpu
        ports:
        - containerPort: 2222
        env:
        - name: PS_KEY
          valueFrom:
            configMapKeyRef:
              name: tensorflow-cluster-config
              key: ps
        - name: WORKER_KEY
          valueFrom:
            configMapKeyRef:
              name: tensorflow-cluster-config
              key: worker
        securityContext:
          privileged: true
        resources:
          requests:
            alpha.kubernetes.io/nvidia-gpu: 1
          limits:
            alpha.kubernetes.io/nvidia-gpu: 1
        volumeMounts:
        - mountPath: /dev/nvidia0
          name: nvidia0
        - mountPath: /dev/nvidiactl
          name: nvidiactl
        - mountPath: /dev/nvidia-uvm
          name: nvidia-uvm
        - mountPath: /datanfs
          name: tfstorage
        - name: libcuda-so
          mountPath: /usr/lib/x86_64-linux-gnu
        - name: cuda
          mountPath: /usr/local/cuda-8.0
      volumes:
      - name: tfstorage
        persistentVolumeClaim:
          claimName: nfs-pvc
      - hostPath:
          path: /dev/nvidia0
        name: nvidia0
      - hostPath:
          path: /dev/nvidiactl
        name: nvidiactl
      - hostPath:
          path: /dev/nvidia-uvm
        name: nvidia-uvm
      - name: libcuda-so
        hostPath:
          path: /usr/lib/x86_64-linux-gnu
      - name: cuda
        hostPath:
          path: /usr/local/cuda-8.0
This concludes the hands-on walkthrough. There are of course many details to get right and pitfalls to step around; they are too numerous to cover one by one here.
If you run into problems while following along, feel free to contact me at any time; let's learn and grow together.
QQ: 458798698
WeChat: songwindwind
Or reach me directly on Jianshu.
If you are interested, follow me on GitHub:
https://github.com/songyaheng