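As machine learning and deep learning keep gaining popularity, TensorFlow, the open-source deep learning framework released by Google (home of the legendary Jeff Dean), has attracted a lot of attention. TensorFlow is highly flexible and lets users run on multiple devices (such as different CPUs and GPUs) across multiple machines. However, distributed TensorFlow requires the user to explicitly specify the cluster information on the client side and to launch the ps, worker and other tasks by hand, which makes resource management and usage quite inconvenient. Yahoo therefore open-sourced TensorFlowOnSpark, which runs TensorFlow on Spark and uses executors to run the worker and ps tasks. The project is at: https://github.com/yahoo/TensorFlowOnSpark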
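In plain distributed TensorFlow, "explicitly specifying the cluster information" means that every process must be given the full list of ps and worker addresses plus its own role. The sketch below only builds that information as Python data. It serializes it in the TF_CONFIG JSON layout that TensorFlow's Estimator-style tooling reads from the environment. The host names and ports are hypothetical. In a TF 1.x script the same cluster dict would typically be passed to tf.train.ClusterSpec and then, together with the job name and task index, to tf.train.Server.

```python
import json

# Hypothetical hosts: in plain distributed TensorFlow this list is written
# by hand and must be identical in every process of the job.
CLUSTER = {
    "ps": ["host1:2222"],
    "worker": ["host2:2222", "host3:2222", "host4:2222"],
}


def tf_config_for(cluster, job_name, task_index):
    """Return the TF_CONFIG JSON string for one task of the cluster."""
    if job_name not in cluster:
        raise ValueError("unknown job: %s" % job_name)
    if not 0 <= task_index < len(cluster[job_name]):
        raise ValueError("task_index out of range for job %s" % job_name)
    return json.dumps({
        "cluster": cluster,
        "task": {"type": job_name, "index": task_index},
    }, sort_keys=True)


def all_task_configs(cluster):
    """One (job, index, TF_CONFIG) triple per process that must be launched."""
    return [(job, i, tf_config_for(cluster, job, i))
            for job in sorted(cluster)
            for i in range(len(cluster[job]))]


if __name__ == "__main__":
    for job, index, cfg in all_task_configs(CLUSTER):
        # Each of these processes would have to be started manually on the
        # right machine, with this TF_CONFIG exported in its environment.
        print(job, index, cfg)
```

Each triple stands for a process someone has to start by hand. TensorFlowOnSpark removes that step: Spark executors take on the ps and worker roles, and the cluster spec is assembled automatically.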
A word up front: rough road ahead, you have been warned!
Yahoo does document how to run TensorFlow on a Spark cluster, but the guideline is too terse. In most cases, following it alone will not get things running. :(
Introduction to TensorFlow on Spark
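TensorFlowOnSpark runs TensorFlow in a distributed fashion on a Spark/Hadoop cluster and claims to support all TensorFlow operations. Note that users must make some simple changes to their existing TF programs before those programs can run on a Spark cluster.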
How to get TensorFlow on Spark running
Yahoo explains how to install and deploy TFS on GitHub (https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_YARN). In my experience, though, if you manage to get it running from that document alone, consider yourself lucky: in practice you will hit unexpected errors caused by environment issues. Below is a summary of the problems I ran into along the way.
- Compiling Python and pip
Yahoo's build steps are:
# download and extract Python 2.7
export PYTHON_ROOT=~/Python
curl -O https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgz
tar -xvf Python-2.7.12.tgz
rm Python-2.7.12.tgz
# compile into local PYTHON_ROOT
pushd Python-2.7.12
./configure --prefix="${PYTHON_ROOT}" --enable-unicode=ucs4
make
make install
popd
rm -rf Python-2.7.12
# install pip
pushd "${PYTHON_ROOT}"
curl -O https://bootstrap.pypa.io/get-pip.py
bin/python get-pip.py
rm get-pip.py
# install tensorflow (and any custom dependencies)
${PYTHON_ROOT}/bin/pip install pydoop
# Note: add any extra dependencies here
popd
When compiling on CentOS 7.2, which is what I used, you may run into the following problems:
- Error when installing pip
bin/python get-pip.py
ERROR:root:code for hash sha224 was not found.
Traceback (most recent call last):
This error is usually caused by the compiled Python lacking the _ssl.so and _hashlib.so libraries. You can copy the corresponding files from the system Python of a matching version into the new Python's directory (for example lib/python2.7/lib-dynload).
- zlib missing
bin/python get-pip.py
Traceback (most recent call last):
File "get-pip.py", line 20061, in <module>
main()
File "get-pip.py", line 194, in main
bootstrap(tmpdir=tmpdir)
File "get-pip.py", line 82, in bootstrap
import pip
zipimport.ZipImportError: can't decompress data; zlib not available
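To fix this, install the zlib* packages with yum and then recompile Python. The headers in zlib-devel are what the build needs in order to include the zlib module.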
- SSL error
bin/python get-pip.py
pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available.
Collecting pip
Could not fetch URL https://pypi.python.org/simple/pip/: There was a problem confirming the ssl certificate: Can't connect to HTTPS URL because the SSL module is not available. - skipping
Could not find a version that satisfies the requirement pip (from versions: )
No matching distribution found for pip
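Fix: in the Python installation directory, open lib/python2.7/ssl.py and comment out ", HAS_ALPN" in the import line. This is likely needed because an _ssl.so copied from the older system Python does not export HAS_ALPN.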
from _ssl import HAS_SNI, HAS_ECDH, HAS_NPN#, HAS_ALPN
- Error from pip install pydoop
gcc: error trying to exec 'cc1plus': execvp:
Fix: install the g++ compiler on the machine.
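Most of the problems above come down to the self-compiled Python missing a C extension module. A small check run with the new interpreter can catch them before get-pip.py does, for example ${PYTHON_ROOT}/bin/python check_build.py, where check_build.py is a hypothetical file name. The sketch below runs under both Python 2.7 and 3. The list of modules and attributes is my own selection based on the errors listed above.

```python
from __future__ import print_function

import importlib
import sys

# (module, attributes it must expose), chosen from the errors listed above
REQUIRED = [
    ("zlib", ()),                       # "can't decompress data; zlib not available"
    ("hashlib", ("sha224",)),           # "code for hash sha224 was not found"
    ("_ssl", ("HAS_SNI", "HAS_ALPN")),  # ssl.py in 2.7.12 imports HAS_ALPN from _ssl
    ("ssl", ()),                        # "the ssl module in Python is not available"
]


def check_module(name, attrs=()):
    """Return None if `name` imports and exposes all `attrs`, else a message."""
    try:
        module = importlib.import_module(name)
    except ImportError as exc:
        return "%s: import failed (%s)" % (name, exc)
    missing = [attr for attr in attrs if not hasattr(module, attr)]
    if missing:
        return "%s: missing %s" % (name, ", ".join(missing))
    return None


def check_build(required=REQUIRED):
    """Return a list of problems found in the running interpreter."""
    problems = [msg for msg in (check_module(n, a) for n, a in required) if msg]
    # ./configure --enable-unicode=ucs4 should produce a wide (UCS-4) build
    if sys.maxunicode != 0x10FFFF:
        problems.append("narrow unicode build (expected --enable-unicode=ucs4)")
    return problems


if __name__ == "__main__":
    found = check_build()
    for msg in found:
        print("PROBLEM:", msg)
    print("OK" if not found else "%d problem(s) found" % len(found))
```

Running the check right after make install keeps the feedback loop short. The script reports every missing piece in one go, so you do not have to discover them one get-pip.py failure at a time.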
2. Build and install TensorFlow w/ RDMA Support
git clone git@github.com:yahoo/tensorflow.git
# follow build instructions to install into ${PYTHON_ROOT}
Note that the build requires Google's bazel and protoc; install both tools beforehand.
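The TensorFlow build needs bazel and protoc, and pip install pydoop needs g++ (see the last problem in step 1). A quick preflight check for these executables can therefore save a failed build. Below is a minimal, hypothetical sketch. It only checks that the tools are on PATH, not which versions they are.

```python
import os

# bazel and protoc are needed to build TensorFlow; g++ for "pip install pydoop"
BUILD_TOOLS = ("bazel", "protoc", "g++")


def find_executable(name, path=None):
    """Return the full path of an executable called `name` on PATH, or None."""
    if path is None:
        path = os.environ.get("PATH", "")
    for directory in path.split(os.pathsep):
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None


def missing_tools(tools=BUILD_TOOLS, path=None):
    """List the tools that cannot be found on PATH."""
    return [tool for tool in tools if find_executable(tool, path) is None]


if __name__ == "__main__":
    missing = missing_tools()
    if missing:
        print("missing build tools: " + ", ".join(missing))
    else:
        print("all build tools found")
```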
3. Complete the remaining steps by following https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_YARN.
4. Run TensorFlow on a Spark on YARN environment deployed with HDP 2.5.
- In yarn-env.sh, set the environment variable by adding: export HADOOP_HDFS_HOME=/usr/hdp/2.5.0.0-1245/hadoop-hdfs/
This variable is used when the TensorFlow tasks run; without the export, the job fails with an error.
- Restart YARN so that the change takes effect.
- To run the MNIST example following the steps on Yahoo's GitHub, submit the job with the command below:
export PYTHON_ROOT=/data2/Python/
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=default
spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 1G \
--py-files /data2/tesorflowonSpark/TensorFlowOnSpark/tfspark.zip,/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/jdk64/jdk1.8.0_77/jre/lib/amd64/server/" \
/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/test/images \
--labels mnist/csv/test/labels \
--mode inference \
--model mnist_model \
--output predictions
At this point, the Spark UI shows that worker0 is blocked.
17/03/21 18:17:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.4 KB, free 542.6 KB)
17/03/21 18:17:18 INFO TorrentBroadcast: Reading broadcast variable 1 took 17 ms
17/03/21 18:17:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 440.6 KB, free 983.3 KB)
2017-03-21 18:17:18,404 INFO (MainThread-14872) Connected to TFSparkNode.mgr on ochadoop03, ppid=14685, state='running'
2017-03-21 18:17:18,411 INFO (MainThread-14872) mgr.state='running'
2017-03-21 18:17:18,411 INFO (MainThread-14872) Feeding partition <generator object load_stream at 0x7f447f120960> into input queue <multiprocessing.queues.JoinableQueue object at 0x7f447f129890>
17/03/21 18:17:20 INFO PythonRunner: Times: total = 2288, boot = -5387, init = 5510, finish = 2165
17/03/21 18:17:20 INFO PythonRunner: Times: total = 101, boot = 3, init = 21, finish = 77
2017-03-21 18:17:20.587060: I tensorflow/core/distributed_runtime/master_session.cc:1011] Start master session b5d9a21a16799e0b with config:
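After some analysis, I found that the MNIST example sets logdir to an HDFS path. This may be because TensorFlow's HDFS support is limited or buggy (admittedly, I did not dig into it :)). After changing logdir to a local directory, the job runs normally. However, this creates another problem. Each time Spark starts the job, worker0 may land on a different machine, so at inference time there is no way to get at the trained model.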
One workaround is to sync the model to HDFS after worker 0 finishes training. Before inference, the checkpoints folder is then pulled from HDFS back to the local disk. Below is a similar modification I made to Yahoo's MNIST example.
import os
from pyhdfs import HdfsClient  # pip install pyhdfs

def writeFileToHDFS():
    # Copy worker 0's local checkpoint directory to HDFS
    rootdir = '/tmp/mnist_model'
    client = HdfsClient(hosts='localhost:50070')
    client.mkdirs('/user/root/mnist_model')
    for parent, dirnames, filenames in os.walk(rootdir):
        for dirname in dirnames:
            print("parent is:{0}".format(parent))
        for filename in filenames:
            # flat copy: every file lands directly in /user/root/mnist_model
            client.copy_from_local(os.path.join(parent, filename),
                                   os.path.join('/user/root/mnist_model', filename),
                                   overwrite=True)
# logdir = TFNode.hdfs_path(ctx, args.model)   # original: HDFS logdir, on which worker0 blocked
logdir = "/tmp/" + args.model                  # changed: local logdir on each executor

# ... (rest of the setup unchanged)

while not sv.should_stop() and step < args.steps:
    # Run a training step asynchronously.
    # See `tf.train.SyncReplicasOptimizer` for additional details on how to
    # perform *synchronous* training.

    # using feed_dict
    batch_xs, batch_ys = feed_dict()
    feed = {x: batch_xs, y_: batch_ys}

    if len(batch_xs) != batch_size:
        print("done feeding")
        break
    else:
        if args.mode == "train":
            _, step = sess.run([train_op, global_step], feed_dict=feed)
            # print accuracy every 100 steps
            if (step % 100 == 0):
                print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy, {x: batch_xs, y_: batch_ys})))
        else:  # args.mode == "inference"
            labels, preds, acc = sess.run([label, prediction, accuracy], feed_dict=feed)
            results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)]
            TFNode.batch_results(ctx.mgr, results)
            print("acc: {0}".format(acc))

# after the loop: worker 0 pushes its local checkpoints to HDFS
if task_index == 0:
    writeFileToHDFS()
Of course, this code only illustrates the idea and is not rigorous. When uploading to HDFS you should also handle cases such as whether the target directory already exists.
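The snippet above only covers the upload half. Below is a slightly more careful sketch of both halves. First, it mirrors the local model directory into HDFS, creating directories as needed and keeping subdirectories instead of flattening them. Second, it pulls the directory back to local disk before inference. Finally, it reads the checkpoint index file to confirm that the newest checkpoint's files all arrived. The sketch assumes client is the same pyhdfs HdfsClient as above and relies on its exists, mkdirs, copy_from_local, copy_to_local and walk methods. The function names are hypothetical. The per-prefix file layout (.index, .meta, .data-*) is taken from the HDFS listing in the next step.

```python
import os
import posixpath
import re

# Assumes `client` is a pyhdfs.HdfsClient, as in the snippet above.


def to_remote(local_root, local_path, remote_root):
    """Map a path under local_root to the same relative path under remote_root."""
    rel = os.path.relpath(local_path, local_root)
    if rel == os.curdir:
        return remote_root
    return posixpath.join(remote_root, *rel.split(os.sep))


def to_local(remote_root, remote_path, local_root):
    """Map an HDFS path under remote_root to the same relative path locally."""
    rel = posixpath.relpath(remote_path, remote_root)
    if rel == posixpath.curdir:
        return local_root
    return os.path.join(local_root, *rel.split("/"))


def upload_dir(client, local_root, remote_root):
    """Mirror local_root into remote_root on HDFS (run on worker 0 after training)."""
    for parent, _dirnames, filenames in os.walk(local_root):
        remote_parent = to_remote(local_root, parent, remote_root)
        if not client.exists(remote_parent):
            client.mkdirs(remote_parent)
        for name in filenames:
            local_path = os.path.join(parent, name)
            client.copy_from_local(local_path,
                                   to_remote(local_root, local_path, remote_root),
                                   overwrite=True)


def download_dir(client, remote_root, local_root):
    """Copy remote_root from HDFS into local_root (run before inference)."""
    if not client.exists(remote_root):
        raise IOError("no model found on HDFS at %s" % remote_root)
    for parent, _dirnames, filenames in client.walk(remote_root):
        local_parent = to_local(remote_root, parent, local_root)
        if not os.path.isdir(local_parent):
            os.makedirs(local_parent)
        for name in filenames:
            remote_path = posixpath.join(parent, name)
            client.copy_to_local(remote_path,
                                 to_local(remote_root, remote_path, local_root))


def latest_checkpoint_prefix(checkpoint_text):
    """Return the file name in 'model_checkpoint_path: "..."', or None.

    The recorded path may be absolute, so only its last component is kept.
    """
    match = re.search(r'^model_checkpoint_path:\s*"([^"]+)"',
                      checkpoint_text, re.MULTILINE)
    return posixpath.basename(match.group(1)) if match else None


def checkpoint_complete(filenames, prefix):
    """True if the .index, .meta and at least one .data-* file exist for prefix."""
    names = set(filenames)
    has_data = any(n.startswith(prefix + ".data-") for n in names)
    return has_data and prefix + ".index" in names and prefix + ".meta" in names
```

On worker 0, the upload would be called as upload_dir(HdfsClient(hosts='localhost:50070'), '/tmp/mnist_model', '/user/root/mnist_model') after training. Before inference, each executor would call download_dir with the arguments swapped. It is safer to download into the same local path the job uses as logdir (/tmp/mnist_model here), because the checkpoint index file may record absolute paths.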
5. Train & inference
- Submit the training job to the Spark cluster.
spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 3 \
--executor-memory 7G \
--py-files /data2/tesorflowonSpark/TensorFlowOnSpark/tfspark.zip,/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/jdk64/jdk1.8.0_77/jre/lib/amd64/server/" \
/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model
Once the job is running, the Spark UI shows the progress of the jobs in the current training run.
When it finishes, check the checkpoint directory on HDFS: the model checkpoints have been uploaded there.
hadoop fs -ls /user/root/mnist_model
Found 8 items
-rwxr-xr-x 3 root hdfs 179 2017-03-21 18:53 /user/root/mnist_model/checkpoint
-rwxr-xr-x 3 root hdfs 117453 2017-03-21 18:53 /user/root/mnist_model/graph.pbtxt
-rwxr-xr-x 3 root hdfs 814164 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-0.data-00000-of-00001
-rwxr-xr-x 3 root hdfs 372 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-0.index
-rwxr-xr-x 3 root hdfs 45557 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-0.meta
-rwxr-xr-x 3 root hdfs 814164 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-338.data-00000-of-00001
-rwxr-xr-x 3 root hdfs 372 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-338.index
-rwxr-xr-x 3 root hdfs 45557 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-338.meta
- Run model inference using the training results:
spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 1G \
--py-files /data2/tesorflowonSpark/TensorFlowOnSpark/tfspark.zip,/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/jdk64/jdk1.8.0_77/jre/lib/amd64/server/" \
/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/test/images \
--labels mnist/csv/test/labels \
--mode inference \
--model mnist_model \
--output predictions
When the job finishes, the model's predictions have been written to the corresponding HDFS directory.
hadoop fs -ls /user/root/predictions
Found 11 items
-rw-r--r-- 3 root hdfs 0 2017-03-21 19:16 /user/root/predictions/_SUCCESS
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00000
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00001
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00002
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00003
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00004
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00005
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00006
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00007
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00008
-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00009
Each of these files contains the test-set labels alongside the model's predictions, for example:
# hadoop fs -cat /user/root/predictions/part-00000
2017-03-21T19:16:40.795694 Label: 7, Prediction: 7
2017-03-21T19:16:40.795729 Label: 2, Prediction: 2
2017-03-21T19:16:40.795741 Label: 1, Prediction: 1
2017-03-21T19:16:40.795750 Label: 0, Prediction: 0
2017-03-21T19:16:40.795759 Label: 4, Prediction: 4
2017-03-21T19:16:40.795769 Label: 1, Prediction: 1
2017-03-21T19:16:40.795778 Label: 4, Prediction: 4
2017-03-21T19:16:40.795787 Label: 9, Prediction: 9
2017-03-21T19:16:40.795796 Label: 5, Prediction: 6
2017-03-21T19:16:40.795805 Label: 9, Prediction: 9
2017-03-21T19:16:40.795814 Label: 0, Prediction: 0
2017-03-21T19:16:40.795822 Label: 6, Prediction: 6
2017-03-21T19:16:40.795831 Label: 9, Prediction: 9
2017-03-21T19:16:40.795840 Label: 0, Prediction: 0
2017-03-21T19:16:40.795848 Label: 1, Prediction: 1
2017-03-21T19:16:40.795857 Label: 5, Prediction: 5
2017-03-21T19:16:40.795866 Label: 9, Prediction: 9
2017-03-21T19:16:40.795875 Label: 7, Prediction: 7
2017-03-21T19:16:40.795883 Label: 3, Prediction: 3
2017-03-21T19:16:40.795892 Label: 4, Prediction: 4
2017-03-21T19:16:40.795901 Label: 9, Prediction: 9
2017-03-21T19:16:40.795909 Label: 6, Prediction: 6
2017-03-21T19:16:40.795918 Label: 6, Prediction: 6
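- The mapping between the Spark cluster and the TensorFlow job tasks is shown below. Spark started 4 executors: one acts as the ps and the other 3 as workers. Which executor becomes the ps and which become workers is decided by YARN and Spark scheduling.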
Cluster spec: {'ps': ['ochadoop02:50060'], 'worker': ['ochadoop04:52150', 'ochadoop03:52733', 'ochadoop04:33289']}
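The prediction files shown earlier can be turned into a single accuracy number by tallying their "Label: X, Prediction: Y" lines. Below is a minimal sketch. It assumes the part files have first been copied to the local machine, for example with hadoop fs -getmerge /user/root/predictions predictions.txt. The regular expression is an assumption based only on the sample lines above.

```python
import re

# Matches lines such as "2017-03-21T19:16:40.795694 Label: 7, Prediction: 7"
LINE_RE = re.compile(r"Label:\s*(\d+),\s*Prediction:\s*(\d+)")


def tally(lines):
    """Return (correct, total) over all lines that look like prediction records."""
    correct = total = 0
    for line in lines:
        match = LINE_RE.search(line)
        if match is None:
            continue  # skip blank or unrelated lines
        total += 1
        if match.group(1) == match.group(2):
            correct += 1
    return correct, total


def accuracy(lines):
    """Fraction of prediction records whose label equals the prediction."""
    correct, total = tally(lines)
    return float(correct) / total if total else 0.0


def accuracy_of_files(paths):
    """Combine the counts of several part-0000N files already copied locally."""
    correct = total = 0
    for path in paths:
        with open(path) as f:
            c, t = tally(f)
        correct += c
        total += t
    return float(correct) / total if total else 0.0
```

For instance, accuracy_of_files(["predictions.txt"]) returns the fraction of records whose label equals the prediction. Counting across all part files, rather than a single one, avoids judging the model on one partition.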