tensorflow on kubernetes實(shí)戰(zhàn) 分布式深度學(xué)習(xí)

寫在前面

  • 態(tài)度決定高度朽缴!讓優(yōu)秀成為一種習(xí)慣!
  • 世界上沒有什么事兒是加一次班解決不了的,如果有樊卓,就加兩次!(- - -茂強(qiáng))

為什么是tensorflow on kubernetes杠河?

個(gè)人覺得最大的優(yōu)勢是:

  • 租戶隔離 保證不同的用戶能夠互不干擾
  • 資源包括GPU調(diào)度 能夠有效利用資源
  • 擴(kuò)展能力 能夠很容易橫向擴(kuò)展
  • 靈活 整個(gè)資源分配比較靈活 管理靈活
    等等

kubernetes集群的搭建

本文采用的是kubeadm安裝方式茧彤,這個(gè)安裝方式直接自動化安裝etcd等依賴的組件
首先我們介紹一下什么是kubernetes,我們先來理解一下幾個(gè)概念(一下內(nèi)容均來自官方中文文檔锚烦,版權(quán)歸其所有)

  • 基本概念


    kuberntes特點(diǎn)

    kubernetes能做什么

    kubernetes組件

    kubernetes總體結(jié)構(gòu)

    master節(jié)點(diǎn)

    node節(jié)點(diǎn)

    什么是pod

    什么是標(biāo)簽

    什么是注解

    什么是RC

    什么是服務(wù)

    什么是目錄

    以下是kubernetes常見的命令砚著,可參考官方文檔進(jìn)一步學(xué)習(xí)

  • 常見命里該


    命令

    命令

    命令

    命令
  • kubernetes架構(gòu)圖


    架構(gòu)圖-版權(quán)歸原作者所有
  • 集群安裝
    首先準(zhǔn)備環(huán)境包,我里邊才考了一個(gè)技術(shù)博客里邊的包走敌,版權(quán)歸原創(chuàng)所有
    https://pan.baidu.com/s/1FfO2saDPkXH7wO_5XTNqpw
  • 環(huán)境準(zhǔn)備
    centos7
    三臺機(jī)器:master,node1,node2
  • 解壓資源包

tar -xjvf k8s_images.tar.bz2

  • 安裝docker

yum install docker-ce-selinux-17.03.2.ce-1.el7.centos.noarch.rpm
yum install docker-ce-17.03.2.ce-1.el7.centos.x86_64.rpm

  • 檢測安裝情況

yum list | grep docker

  • 啟動docker

systemctl start docker && systemctl enable docker

  • 檢測docker是否啟動成功

ifconfig
查看是否有docker網(wǎng)絡(luò)

  • 修改docker鏡像源為阿里云

sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["[https://u9ea3fz9.mirror.aliyuncs.com]
(https://u9ea3fz9.mirror.aliyuncs.com)"]
}
EOF

  • 重新啟動docker

sudo systemctl daemon-reload
sudo systemctl restart docker

  • 關(guān)閉防火墻

systemctl stop firewalld && systemctl disable firewalld
setenforce 0

  • 配置路由表參數(shù)

echo "
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
" >> /etc/sysctl.conf
sysctl -p

  • 關(guān)閉交換設(shè)備

swapoff -a

  • 導(dǎo)入鏡像

docker load <./docker_images/etcd-amd64_v3.1.10.tar
docker load <./docker_images/flannel:v0.9.1-amd64.tar
docker load <./docker_images/k8s-dns-dnsmasq-nannyamd64_v1.14.7.tar
docker load <./docker_images/k8s-dns-kube-dns-amd64_1.14.7.tar
docker load <./docker_images/k8s-dns-sidecar-amd64_1.14.7.tar
docker load <./docker_images/kube-apiserver-amd64_v1.9.0.tar
docker load <./docker_images/kube-controller-manager-amd64_v1.9.0.tar
docker load <./docker_images/kube-scheduler-amd64_v1.9.0.tar
docker load < ./docker_images/kube-proxy-amd64_v1.9.0.tar
docker load <./docker_images/pause-amd64_3.0.tar
docker load < ./kubernetes-dashboard_v1.8.1.tar

  • 安裝安裝kubelet kubeadm kubectl

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh kubernetes-cni-0.6.0-0.x86_64.rpm kubelet-1.9.9-9.x86_64.rpm kubectl-1.9.0-0.x86_64.rpm
rpm -ivh kubectl-1.9.0-0.x86_64.rpm
rpm -ivh kubeadm-1.9.0-0.x86_64.rpm

  • 在master節(jié)點(diǎn)啟動kubelet

systemctl enable kubelet && sudo systemctl start kubelet

  • check一下kubelet默認(rèn)的cgroup的driver和docker的是否不一樣,docker默認(rèn)的cgroupfs,kubelet默認(rèn)為systemd熊镣,兩個(gè)要保證一致

vim /etc/systemd/system/kubelet.service.d/10-kubeadm.conf

修改其中的cgroupfs-driver的值systemd為cgroupfs

  • 重啟kubelet

systemctl daemon-reload && systemctl restart kubelet

以上操作在所有的節(jié)點(diǎn)上都需要操作,等操作完成后再進(jìn)行下邊的內(nèi)容

  • 初始化master

kubeadm init --kubernetes-version=v1.9.0 --pod-network-cidr=10.244.0.0/16

初始化master節(jié)點(diǎn)

注意:這里一定要記下

kubeadm join --token 26c210.acef208514aaf37f 10.255.164.31:6443 --discovery-token-ca-cert-hash-sha256:87ee8b74e3b937f82b5174fa64bd140071cbf9087f41f5b4bec38c22332e6137

這個(gè)命令募书,后續(xù)如果你想在集群中增加該節(jié)點(diǎn)绪囱,橫向擴(kuò)展的時(shí)候這個(gè)是必須的命令(記錄到你的加密文件里邊)
同時(shí),注意輸出的目錄里邊還有個(gè)提示


提示

這個(gè)是個(gè)授權(quán)的處理莹捡,你需要運(yùn)行以下鬼吵,否則kubecttl命令可能用不了

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

  • 創(chuàng)建網(wǎng)絡(luò)
    找到kube-flannel.yml文件

kubectl create -f kube-flannel.yml

然后運(yùn)行

kubectl get nodes

出現(xiàn)節(jié)點(diǎn)master狀態(tài)為ready表示成功

與此同時(shí)配置好環(huán)境變量:

echo "export KUBECONFIG=/etc/kubernetes/admin.conf" >> ~/.bash_profile
source ~/.bash_profile

  • 加入其他節(jié)點(diǎn)
    到此,該操作需要在node節(jié)點(diǎn)上進(jìn)行操作篮赢,有多少個(gè)node節(jié)點(diǎn)都需要操作

kubeadm join --token 99f58e.60c1ad95c0ac7dcd 10.255.164.31:6443 --discovery-token-ca-cert-hash sha256:7be50b18a3697bad6a0477db525a95e4db011f9f1f89384882b53eb85968eab5

請用你剛才初始化master的那個(gè)讓你保存的命令進(jìn)行齿椅,不要利用以上命令
加入完成以后查看是否已經(jīng)加入成功

kubectl get nodes

查看所有加入的節(jié)點(diǎn)

下面查看以下所有的命名空間


獲取所有的命名空間

如果出現(xiàn)所有的節(jié)點(diǎn)的狀態(tài)都是ready表明已經(jīng)成功建立 kubernetes集群了

  • kubernetes-dashborad的搭建
    這里我們選用官方最新的yaml文件

wget https://raw.githubusercontent.com/kubernetes/dashboard/master/src/deploy/recommended/kubernetes-dashboard.yaml

找到如下圖部分,進(jìn)行修改為如下


dashborad

其中的nodePort是鏈接到該dashborad的端口启泣,type: NodePort是一種端口暴露涣脚,表示暴漏給proxy層,外界可以通過該nodePort訪問到該服務(wù)
另外還要對其依賴的鏡像版本做個(gè)修改寥茫,因?yàn)樯瑁斑吋虞d到docker的kubernetes-dashborad的版本與改文件的版本有可能不一樣,所以要改動一下


dashborad文件鏡像修改

創(chuàng)建dashborad

kubectl create -f kubernetes-dashboard.yaml

然后訪問https://master_ip:32666
修改上邊的master_ip為你物理機(jī)的域名或者ip
通過瀏覽器可以訪問到

kubernetes-dashborad

這里我們采用令牌登陸模式
下邊我們一塊獲取令牌坠敷,執(zhí)行

kubectl get secret -n kube-system

安全訪問token列表

以紅框內(nèi)的controller-token為準(zhǔn)妙同,執(zhí)行

kubectl describe secret/namespace-controller-token-bw9jn -n kube-system

記得不要直接執(zhí)行,修改上邊的namespace-controller-token-bw9jn膝迎,因?yàn)槊總€(gè)集群都不一樣
這時(shí)就會拿到該token


獲取token

復(fù)制以上token到瀏覽器粥帚,就可以登陸了。


dashborad內(nèi)容展示

這樣限次,我們的dashborad已經(jīng)創(chuàng)建好了
到此,kubernetes集群已經(jīng)搭建完成芒涡,如果是正式環(huán)境的話,還請搭建HA機(jī)制的集群卖漫,這個(gè)在中文技術(shù)文檔中都有费尽,這里不再贅述,可參考中文官方文檔進(jìn)行正式環(huán)境HA搭建
下邊就讓我們一步步的來構(gòu)建tensorflow on kubernetes環(huán)境吧

tensorflow on kubernetes架構(gòu)圖

首先我們來了解以下整個(gè)平臺的架構(gòu)圖是什么樣子


tensorflow on kubernetes架構(gòu)圖

我們來解釋一下羊始,這個(gè)架構(gòu)圖是分布式tensorflow的實(shí)戰(zhàn)圖旱幼,其中有兩個(gè)參數(shù)服務(wù),多個(gè)worker服務(wù)突委,還有個(gè)shuffle和抽樣的服務(wù)柏卤,shuffle就是對樣根據(jù)其標(biāo)簽進(jìn)行混排冬三,然后對外提供batch抽樣服務(wù)(可以是有放回和無放回,抽樣是一門科學(xué)缘缚,詳情可以參考抽樣技術(shù)一書)勾笆,每個(gè)batch的抽樣是由每個(gè)worker去觸發(fā),worker拿到抽樣的數(shù)據(jù)樣本ID后就去基于kubernetes構(gòu)建的分布式數(shù)據(jù)庫里邊提取該batchSize的樣本數(shù)據(jù)桥滨,進(jìn)行訓(xùn)練計(jì)算窝爪,由于分布式的tensorflow能夠保證異步梯度下降算法,所以每次訓(xùn)練batch數(shù)據(jù)的時(shí)候都會基于最新的參數(shù)迭代齐媒,然而蒲每,更新參數(shù)操作就是兩個(gè)參數(shù)服務(wù)做的,架構(gòu)中模型(參數(shù))的存儲在NFS中里初,這樣以來啃勉,參數(shù)服務(wù)與worker就可以共享參數(shù)了,最后說明一下双妨,我們訓(xùn)練的所有數(shù)據(jù)都是存儲在分布式數(shù)據(jù)庫中(數(shù)據(jù)庫的選型可以根據(jù)具體的場景而定)淮阐。為什么需要一個(gè)shuffle和抽樣的服務(wù),因?yàn)楫?dāng)數(shù)據(jù)量很大的時(shí)候刁品,我們?nèi)绻麑λ械臉颖緮?shù)據(jù)進(jìn)行shuffle和抽樣計(jì)算的話會浪費(fèi)很大的資源泣特,因此需要一個(gè)這樣的服務(wù)專門提取數(shù)據(jù)的(id,label)來進(jìn)行混排和抽樣挑随,這里如果(id, label)的數(shù)據(jù)量也很大的時(shí)候我們可以考慮基于spark 來分布式的進(jìn)行shuffle和抽樣状您,目前spark2.3已經(jīng)原生支持kubernetes調(diào)度

NFS服務(wù)搭建

  • 什么是NFS(來自百度百科)
    NFS(Network File System)即網(wǎng)絡(luò)文件系統(tǒng),是FreeBSD支持的文件系統(tǒng)中的一種兜挨,它允許網(wǎng)絡(luò)中的計(jì)算機(jī)之間通過TCP/IP網(wǎng)絡(luò)共享資源膏孟。在NFS的應(yīng)用中,本地NFS的客戶端應(yīng)用可以透明地讀寫位于遠(yuǎn)端NFS服務(wù)器上的文件拌汇,就像訪問本地文件一樣柒桑。
  • NFS服務(wù)搭建

yum install nfs-utils rpcbind -y
mkdir -p data/nfs
vim /etc/exports
加入如下內(nèi)容
/data/nfs 192.168.86.0/24(rw,no_root_squash,no_all_squash,sync)
啟動
/bin/systemctl start rpcbind.service
/bin/systemctl start nfs.service

到此,NFS服務(wù)已經(jīng)搭建好了

tensorflow docker鏡像打包

首先我們準(zhǔn)備DockerFile

FROM ubuntu:16.04
MAINTAINER yahengsong yahengsong@foxmail.com
RUN apt-get update \
&& apt-get install -y wget \
&& apt-get install -y lrzsz \
&& apt-get install -y unzip \
&& apt-get install -y zip \
&& apt-get install -y vim \
&& apt-get install -y gcc \
&& apt-get install -y g++ \
&& apt-get install -y automake \
&& apt-get install -y autoconf \
&& apt-get install -y libtool \
&& apt-get install -y make \
&& apt-get install -y openssl \
&& apt-get install -y libssl-dev \
&& apt-get install -y ruby \
&& apt-get install -y zlib1g \
&& apt-get install zlib1g.dev \
&& apt-get install -y bzip2 \
&& apt-get install -y libncurses5-dev \
&& apt-get install -y sqlite sqlite3 \
&& apt-get install -y libgdbm-dev \
&& apt-get install -y libpcap-dev \
&& apt-get install -y xz-utils
RUN wget https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tar.xz \
&& tar -xvf Python-3.6.0.tar.xz \
&& cd Python-3.6.0 \
&& mkdir -p /usr/local/python3 \
&& ./configure --prefix=/usr/local/python3 \
&& make \
&& make install \
&& rm -rf Python-3.6.0* \
&& ln -s /usr/local/python3/bin/python3 /usr/bin/python3 \
&& ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
RUN pip install --upgrade pip \
&& pip --no-cache-dir install >https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-1.7.0-cp36-cp36m-linux_x86_64.whl
# TensorBoard
EXPOSE 6006
# IPython
EXPOSE 8888
WORKDIR /root

我們可以通過該DockerFile在阿里云上進(jìn)行鏡像打包
這樣我們就有了自己的環(huán)境了(注意噪舀,該版本沒有安裝jupyter)


阿里云鏡像

詳情可以參考如何在阿里云上打包自己的鏡像

tenssorflow on kubernetes實(shí)戰(zhàn)

以下將是如何在kubernetes集群上部署tensorflow環(huán)境
首先部署單機(jī)CPU版本的
下面我們來看線tensorflow.yaml文件

 apiVersion: extensions/v1beta1
 kind: Deployment
 metadata: 
   name: tensorflow
 spec:
   replicas: 1
   template:
     metadata:
       labels:
         k8s-app: tensorflow
     spec:
       containers:
       - name: tensorflow
         image: registry.cn- 
  hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
         ports:
         - containerPort: 8888
         resources:
           limits:
             cpu: 4
             memory: 2Gi
           requests:
             cpu: 2
             memory: 1Gi
---
apiVersion: v1
kind: Service
metadata:
    name: jupyter-service
spec:
  type: NodePort
  ports:
  - port: 80
    targetPort: 8888
    nodePort: 32001
    name: tensorflow
  selector:
    k8s-app: tensorflow

這里我們依賴的是阿里云的tensorflow的docker鏡像registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3(該版本里邊有jupyter)魁淳,我們向kubernetes集群申請2個(gè)CPU和1G內(nèi)存,kubernetes集群給該Deployment最大的CPU限制是4核和2G內(nèi)存与倡,需要注意的是該環(huán)境暴露到外網(wǎng)的端口是32001
有了改文件界逛,接下來創(chuàng)建環(huán)境

kubectl create -f tensorflow.yaml

這時(shí)候就可以通過,查看該環(huán)境了

kubectl get pods

到此我們可以通過http://master_ip:32001/來訪問該環(huán)境的jupyter了

jupyter

這時(shí)候我們可以通過如下獲取token


get token

然后通過token就可以登陸了


jupyter

然后你就可以愉快的編程了

分布式tensorflow on kubernetes

如之前所介紹的分布式深度學(xué)習(xí)架構(gòu)
我們首先創(chuàng)建一個(gè)參數(shù)服務(wù)tf-ps.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata: 
  name: tensorflow-ps
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: tensorflow-ps
        role: ps
    spec:
      containers:
      - name: ps
        image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
        ports:
        - containerPort: 2222
        resources:
          limits:
            cpu: 4
            memory: 2Gi
          requests:
            cpu: 2
            memory: 1Gi
        volumeMounts:
        - mountPath: /datanfs
          readOnly: false
          name: nfs
      volumes:
      - name: nfs
        nfs:
          server: 你的nfs服務(wù)地址
          path: "/data/nfs"   
---
apiVersion: v1
kind: Service
metadata:
  name: tensorflow-ps-service
  labels:
    name: tensorflow-ps
    role: service
spec:
  ports:
  - port: 2222
    targetPort: 2222
  selector:
    name: tensorflow-ps

執(zhí)行

kubectl create -f tf-ps.yaml

然后兩個(gè)參數(shù)節(jié)點(diǎn)就會被創(chuàng)建
下面我們創(chuàng)建2個(gè)worker節(jié)點(diǎn)tf-worker.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tensorflow-worker
spec:
  replicas: 3
  template:
    metadata:
      labels:
        name: tensorflow-worker
        role: worker
    spec:
      containers:
      - name: worker
        image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
        ports:
        - containerPort: 2222
        resources:
          limits:
            cpu: 4
            memory: 2Gi
          requests:
            cpu: 2
            memory: 1Gi
        volumeMounts:
        - mountPath: /datanfs
          readOnly: false
          name: nfs
      volumes:
      - name: nfs
        nfs:
          server: 你的nfs服務(wù)地址
          path: "/data/nfs"   
---
apiVersion: v1
kind: Service
metadata:
  name: tensorflow-wk-service
  labels:
    name: tensorflow-worker
spec:
  ports:
  - port: 2222
    targetPort: 2222
  selector:
    name: tensorflow-worker

執(zhí)行

kubectl create -f tf-worker.yaml

這時(shí)候2個(gè)worker節(jié)點(diǎn)就會被創(chuàng)建


節(jié)點(diǎn)
  • 訓(xùn)練
    查看每個(gè)pod的ip用于構(gòu)建集群訓(xùn)練代碼


    參數(shù)服務(wù)
worker

然后進(jìn)去每個(gè)節(jié)點(diǎn)環(huán)境

kubectl exec -ti tensorflow-ps-77b8d7bc89-87qgp bash
kubectl exec -ti tensorflow-worker-b7cc4dd66-94ntr bash
kubectl exec -ti tensorflow-worker-b7cc4dd66-mzqhb bash

創(chuàng)建以下代碼(在此之前請先準(zhǔn)備好mnist數(shù)據(jù)集的csv格式并放到nfs服務(wù)的data/nfs目錄下)

from __future__ import print_function

import math

import tensorflow as tf

import collections

import sys,os, time

import numpy as np

# TensorFlow集群描述信息纺座,ps_hosts表示參數(shù)服務(wù)節(jié)點(diǎn)信息息拜,worker_hosts表示worker節(jié)點(diǎn)信息
tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")

# TensorFlow Server模型描述信息,包括作業(yè)名稱,任務(wù)編號该溯,隱含層神經(jīng)元數(shù)量岛抄,MNIST數(shù)據(jù)目錄以及每次訓(xùn)練數(shù)據(jù)大斜鸬搿(默認(rèn)一個(gè)批次為100個(gè)圖片)
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100, "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/datanfs", "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
FLAGS = tf.app.flags.FLAGS
#圖片像素大小為28*28像素
IMAGE_PIXELS = 28

class DataSet(object):
    def __init__(self,
                 images,
                 labels,
                 reshape=True):
        """Construct a DataSet.
        one_hot arg is used only if fake_data is true.  `dtype` can be either
        `uint8` to leave the input as `[0, 255]`, or `float32` to rescale into
        `[0, 1]`.
        """

        self._num_examples = images.shape[0]

        # Convert shape from [num examples, rows, columns, depth]
        # to [num examples, rows*columns] (assuming depth == 1)
        images = images.astype(np.float32)
        images = np.multiply(images, 1.0 / 255.0)
        self._images = images
        self._labels = labels
        self._epochs_completed = 0
        self._index_in_epoch = 0

    @property
    def images(self):
        return self._images

    @property
    def labels(self):
        return self._labels

    @property
    def num_examples(self):
        return self._num_examples

    @property
    def epochs_completed(self):
        return self._epochs_completed

    def next_batch(self, batch_size, fake_data=False, shuffle=True):
        """Return the next `batch_size` examples from this data set."""
        start = self._index_in_epoch
        # Shuffle for the first epoch
        if self._epochs_completed == 0 and start == 0 and shuffle:
            perm0 = np.arange(self._num_examples)
            np.random.shuffle(perm0)
            self._images = self.images[perm0]
            self._labels = self.labels[perm0]
        # Go to the next epoch
        if start + batch_size > self._num_examples:
            # Finished epoch
            self._epochs_completed += 1
            # Get the rest examples in this epoch
            rest_num_examples = self._num_examples - start
            images_rest_part = self._images[start:self._num_examples]
            labels_rest_part = self._labels[start:self._num_examples]
            # Shuffle the data
            if shuffle:
                perm = np.arange(self._num_examples)
                np.random.shuffle(perm)
                self._images = self.images[perm]
                self._labels = self.labels[perm]
                # Start next epoch
            start = 0
            self._index_in_epoch = batch_size - rest_num_examples
            end = self._index_in_epoch
            images_new_part = self._images[start:end]
            labels_new_part = self._labels[start:end]
            return np.concatenate((images_rest_part, images_new_part), axis=0) , \
                   np.concatenate((labels_rest_part, labels_new_part), axis=0)
        else:
            self._index_in_epoch += batch_size
            end = self._index_in_epoch
            return self._images[start:end], self._labels[start:end]
def dense_to_one_hot(labels_dense, num_classes):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot


def read_data_sets(train_dir,
                   reshape=True,
                   validation_size=2000):
    trainfile = os.path.join(train_dir, "mnist_train.csv")
    testfile = os.path.join(train_dir, "mnist_test.csv")
    train_images = np.array([], dtype=np.uint8)
    train_labels = np.array([], dtype=np.uint8)
    test_images = np.array([], dtype=np.uint8)
    test_labels = np.array([], dtype=np.uint8)

    count = 0
    with open(trainfile) as f:
        for line in f.readlines():
            count+= 1
            line = line.strip()
            line = line.split(",")
            line = [int(x) for x in line]
            one_rray = np.array(line[1:], dtype=np.uint8)
            train_images = np.hstack((train_images, one_rray))
            train_labels = np.hstack((train_labels, np.array(line[0], dtype=np.uint8)))
            if count % 10000 == 0:
                print(str(count))
            if count == 20000:
                break
    train_images = train_images.reshape(20000, 28*28)
    train_labels = train_labels.reshape(20000, 1)
    train_labels = dense_to_one_hot(train_labels, 10)

    count = 0
    with open(testfile) as f:
        for line in f.readlines():
            count += 1
            line = line.strip()
            line = line.split(",")
            line = [int(x) for x in line]
            one_rray = np.array(line[1:], dtype=np.uint8)
            test_images = np.hstack((test_images, one_rray))
            test_labels = np.hstack((test_labels, np.array(line[0], dtype=np.uint8)))
            if count % 10000 == 0:
                print(str(count))
    test_images = test_images.reshape(10000, 28*28)
    test_labels = test_labels.reshape(10000, 1)
    test_labels = dense_to_one_hot(test_labels, 10)

    if not 0 <= validation_size <= len(train_images):
        raise ValueError(
            'Validation size should be between 0 and {}. Received: {}.'
                .format(len(train_images), validation_size))

    validation_images = train_images[:validation_size]
    validation_labels = train_labels[:validation_size]
    train_images = train_images[validation_size:]
    train_labels = train_labels[validation_size:]

    train = DataSet(train_images, train_labels, reshape=reshape)
    validation = DataSet(validation_images, validation_labels, reshape=reshape)
    test = DataSet(test_images, test_labels, reshape=reshape)


    Datasets = collections.namedtuple('Datasets', ['train', 'validation', 'test'])
    return Datasets(train=train, validation=validation, test=test)

def main(_):
    #從命令行參數(shù)中讀取TensorFlow集群描述信息
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    # 創(chuàng)建TensorFlow集群描述對象
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    # 為本地執(zhí)行Task狈茉,創(chuàng)建TensorFlow本地Server對象.
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    #如果是參數(shù)服務(wù),直接啟動即可
    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        #分配操作到指定的worker上執(zhí)行掸掸,默認(rèn)為該節(jié)點(diǎn)上的cpu0
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                ps_device="/job:ps/cpu:0",
                cluster=cluster)):
            # 定義TensorFlow隱含層參數(shù)變量氯庆,為全連接神經(jīng)網(wǎng)絡(luò)隱含層
            hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
            # 定義TensorFlow softmax回歸層的參數(shù)變量
            sm_w = tf.Variable(tf.truncated_normal([FLAGS.hidden_units, 10], stddev=1.0 / math.sqrt(FLAGS.hidden_units)), name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
            #定義模型輸入數(shù)據(jù)變量(x為圖片像素?cái)?shù)據(jù),y_為手寫數(shù)字分類)
            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])
            #定義隱含層及神經(jīng)元計(jì)算模型
            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)
            #定義softmax回歸模型扰付,及損失方程
            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
            #定義全局步長堤撵,默認(rèn)值為0
            global_step = tf.Variable(0, name="global_step", trainable=False)
            #定義訓(xùn)練模型,采用Adagrad梯度下降法
            train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
            #定義模型精確度驗(yàn)證模型羽莺,統(tǒng)計(jì)模型精確度
            correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
            #對模型定期做checkpoint实昨,通常用于模型回復(fù)
            saver = tf.train.Saver()
            #定義收集模型統(tǒng)計(jì)信息的操作
            summary_op = tf.summary.merge_all()
            #定義操作初始化所有模型變量
            init_op = tf.initialize_all_variables()
            #創(chuàng)建一個(gè)監(jiān)管程序,用于構(gòu)建模型檢查點(diǎn)以及計(jì)算模型統(tǒng)計(jì)信息盐固。
            is_chief = (FLAGS.task_index == 0)
            if is_chief:
                print("Worker %d: Initializing session..." % FLAGS.task_index)
            else:
                print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)

            sv = tf.train.Supervisor(
                is_chief= is_chief,
                logdir="/tmp/train_logs",
                init_op=init_op,
                summary_op=summary_op,
                saver=saver,
                global_step=global_step,
                save_model_secs=600)

            sess_config = tf.ConfigProto(
                allow_soft_placement=True,
                log_device_placement=False,
                device_filters=["/job:ps",
                                "/job:worker/task:%d" % FLAGS.task_index])

            #讀入MNIST訓(xùn)練數(shù)據(jù)集
            mnist = read_data_sets(FLAGS.data_dir)
            #創(chuàng)建TensorFlow session對象荒给,用于執(zhí)行TensorFlow圖計(jì)算
            with sv.managed_session(server.target, config=sess_config) as sess:
                print("Worker %d: Session initialization complete." % FLAGS.task_index)
                # Perform training
                time_begin = time.time()
                print("Training begins @ %f" % time_begin)
                local_step = 0
                step = 0
                while not sv.should_stop() and step < 10000:
                    # 讀入MNIST的訓(xùn)練數(shù)據(jù),默認(rèn)每批次為100個(gè)圖片
                    batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                    train_feed = {x: batch_xs, y_: batch_ys}
                    #執(zhí)行分布式TensorFlow模型訓(xùn)練
                    _, step = sess.run([train_op, global_step], feed_dict=train_feed)
                    local_step = local_step + 1
                    now = time.time()
                    print("%f: Worker %d: training step %d done (global step: %d)" %
                        (now, FLAGS.task_index, local_step, step))
                    #每隔100步長刁卜,驗(yàn)證模型精度
                    if step % 100 == 0:
                        print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
                        print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
            # 停止TensorFlow Session
            time_end = time.time()
            print("Training ends @ %f" % time_end)
            training_time = time_end - time_begin
            print("Training elapsed time: %f s" % training_time)
            print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
            print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
            sv.stop()
if __name__ == "__main__":
    tf.app.run()

然后在參數(shù)服務(wù)上執(zhí)行

python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="ps" --task_index=0

則會有志电,其實(shí)是啟動了GRPC服務(wù)


參數(shù)服務(wù)

在第一個(gè)worker節(jié)點(diǎn)上執(zhí)行

python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=0

在第二個(gè)worker節(jié)點(diǎn)上執(zhí)行

python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=1

這時(shí)候等數(shù)據(jù)加載完成就會有如下訓(xùn)練信息


第一個(gè)工作節(jié)點(diǎn)

第二個(gè)工作節(jié)點(diǎn)

兩個(gè)工作節(jié)點(diǎn)的迭代次數(shù)合起來就是我們設(shè)置的總的迭代次數(shù)
自后的模型都會存在nfs服務(wù)中,因?yàn)橹挥羞@樣參數(shù)節(jié)點(diǎn)和工作節(jié)點(diǎn)才能共享模型參數(shù)

  • GPU
    GPU的方案整體和以上差不多蛔趴,只是在原yaml文件中增加GPU支持
    模版:
apiVersion: v1
kind: ReplicationController
metadata:
  name: tensorflow-worker
spec:
  replicas: 1
  selector:
    name: tensorflow-worker
  template:
    metadata:
      labels:
        name: tensorflow-worker
        role: worker
    spec:  
      containers:
      - name: worker
        image: gcr.io/tensorflow/tensorflow:latest-gpu
        ports:
        - containerPort: 2222
        env:
        - name: PS_KEY
          valueFrom:
            configMapKeyRef:
              name: tensorflow-cluster-config
              key: ps
        - name: WORKER_KEY
          valueFrom:
            configMapKeyRef:
              name: tensorflow-cluster-config
              key: worker
        securityContext:
          privileged: true
        resources:
          requests:
            alpha.kubernetes.io/nvidia-gpu: 1
          limits:
            alpha.kubernetes.io/nvidia-gpu: 1
        volumeMounts:
        - mountPath: /dev/nvidia0
          name: nvidia0
        - mountPath: /dev/nvidiactl
          name: nvidiactl
        - mountPath: /dev/nvidia-uvm
          name: nvidia-uvm
        - mountPath: /datanfs
          name: tfstorage
        - name: libcuda-so
          mountPath: /usr/lib/x86_64-linux-gnu
        - name: cuda
          mountPath: /usr/local/cuda-8.0
      volumes:
      - name: nfs
        persistentVolumeClaim:
          claimName: nfs-pvc
      - hostPath:
          path: /dev/nvidia0
        name: nvidia0
      - hostPath:
          path: /dev/nvidiactl
        name: nvidiactl
      - hostPath:
          path: /dev/nvidia-uvm
        name: nvidia-uvm
      - name: libcuda-so
        hostPath:
          path: /usr/lib/x86_64-linux-gnu
      - name: cuda
        hostPath:
          path: /usr/local/cuda-8.0

到此我們的實(shí)戰(zhàn)已經(jīng)初步結(jié)束挑辆,當(dāng)然不排除其中有很多細(xì)節(jié),有很多坑要踩孝情,這些細(xì)節(jié)和坑在這里都不一一再說了鱼蝉,因?yàn)樘嗔耍瑳]發(fā)寫箫荡。
如果你們在實(shí)戰(zhàn)過程中遇到什么問題魁亦,歡迎隨時(shí)跟我溝通,我們共同成長菲茬,共同學(xué)習(xí)吉挣。

QQ:458798698
微信號:songwindwind

或者直接在簡書上聯(lián)系我。
有興趣的可以關(guān)注本人github
https://github.com/songyaheng

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末婉弹,一起剝皮案震驚了整個(gè)濱河市睬魂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌镀赌,老刑警劉巖氯哮,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異商佛,居然都是意外死亡喉钢,警方通過查閱死者的電腦和手機(jī)姆打,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肠虽,“玉大人幔戏,你說我怎么就攤上這事∷翱危” “怎么了闲延?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長韩玩。 經(jīng)常有香客問我垒玲,道長,這世上最難降的妖魔是什么找颓? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任合愈,我火速辦了婚禮,結(jié)果婚禮上击狮,老公的妹妹穿的比我還像新娘佛析。我一直安慰自己,他們只是感情好帘不,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布说莫。 她就那樣靜靜地躺著,像睡著了一般寞焙。 火紅的嫁衣襯著肌膚如雪储狭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天捣郊,我揣著相機(jī)與錄音辽狈,去河邊找鬼。 笑死呛牲,一個(gè)胖子當(dāng)著我的面吹牛刮萌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播娘扩,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼着茸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了琐旁?” 一聲冷哼從身側(cè)響起涮阔,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎灰殴,沒想到半個(gè)月后敬特,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年伟阔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了辣之。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡皱炉,死狀恐怖怀估,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情娃承,我是刑警寧澤奏夫,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布怕篷,位于F島的核電站历筝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏廊谓。R本人自食惡果不足惜梳猪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蒸痹。 院中可真熱鬧春弥,春花似錦、人聲如沸叠荠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽榛鼎。三九已至逃呼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間者娱,已是汗流浹背抡笼。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留黄鳍,地道東北人推姻。 一個(gè)月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像框沟,于是被迫代替她去往敵國和親藏古。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內(nèi)容