參考資料
https://ubuntu.com/blog/data-science-workflows-on-kubernetes-with-kubeflow-pipelines-part-2?
https://www.tensorflow.org/tutorials/keras/classification
https://github.com/manceps/fashion-mnist-kfp-lab
正文
如標(biāo)題所示动猬,本文闡述在kubeflow的pipeline組件中如何創(chuàng)建pvc,并在pipeline的流水線中傳遞文件腔寡。
眾所周知奈揍,pipeline制定了從數(shù)據(jù)到模型的整個(gè)流程,那么紊册,按照一個(gè)嚴(yán)謹(jǐn)?shù)拇a農(nóng)思維比肄,我們應(yīng)該這么準(zhǔn)備:
1、創(chuàng)建存儲(chǔ)的地方
2囊陡、準(zhǔn)備數(shù)據(jù)
3芳绩、訓(xùn)練模型并輸出
4、測(cè)試模型
而且這四步應(yīng)該發(fā)生在不同的容器中关斜,即指定不同的鏡像示括。
好了,閑話不說(shuō)痢畜,開(kāi)始干垛膝!
用的例子是最基礎(chǔ)的例子鳍侣,fashion mnist
直接上最終的代碼:
import kfp
import kfp.dsl as dsl
import kfp.components as comp
def train(data_path, model_file):
? ? # func_to_container_op requires packages to be imported inside of the function.
? ? import pickle
? ? import tensorflow as tf
? ? from tensorflow.python import keras
? ? # Download the dataset and split into training and test data.
? ? fashion_mnist = keras.datasets.fashion_mnist
? ? (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
? ? # Normalize the data so that the values all fall between 0 and 1.
? ? train_images = train_images / 255.0
? ? test_images = test_images / 255.0
? ? # Define the model using Keras.
? ? model = keras.Sequential([
? ? keras.layers.Flatten(input_shape=(28, 28)),
? ? keras.layers.Dense(128, activation='relu'),
? ? keras.layers.Dense(10)
? ? ])
? ? model.compile(optimizer='adam',
? ? ? ? ? ? ? ? ? loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
? ? ? ? ? ? ? ? ? metrics=['accuracy'])
? ? # Run a training job with specified number of epochs
? ? model.fit(train_images, train_labels, epochs=10)
? ? # Evaluate the model and print the results
? ? test_loss, test_acc = model.evaluate(test_images,? test_labels, verbose=2)
? ? print('Test accuracy:', test_acc)
? ? # Save the model to the designated
? ? model.save(f'{data_path}/{model_file}')
? ? # Save the test_data as a pickle file to be used by the predict component.
? ? with open(f'{data_path}/test_data', 'wb') as f:
? ? ? ? pickle.dump((test_images,test_labels), f)
def predict(data_path, model_file, image_number):
? ? # func_to_container_op requires packages to be imported inside of the function.
? ? import pickle
? ? import tensorflow as tf
? ? from tensorflow import keras
? ? import numpy as np
? ? # Load the saved Keras model
? ? model = keras.models.load_model(f'{data_path}/{model_file}')
? ? # Load and unpack the test_data
? ? with open(f'{data_path}/test_data','rb') as f:
? ? ? ? test_data = pickle.load(f)
? ? # Separate the test_images from the test_labels.
? ? test_images, test_labels = test_data
? ? # Define the class names.
? ? class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
? ? ? ? ? ? ? ? ? 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
? ? # Define a Softmax layer to define outputs as probabilities
? ? probability_model = tf.keras.Sequential([model,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? tf.keras.layers.Softmax()])
? ? # See https://github.com/kubeflow/pipelines/issues/2320 for explanation on this line.
? ? image_number = int(image_number)
? ? # Grab an image from the test dataset.
? ? img = test_images[image_number]
? ? # Add the image to a batch where it is the only member.
? ? img = (np.expand_dims(img,0))
? ? # Predict the label of the image.
? ? predictions = probability_model.predict(img)
? ? # Take the prediction with the highest probability
? ? prediction = np.argmax(predictions[0])
? ? # Retrieve the true label of the image from the test labels.
? ? true_label = test_labels[image_number]
? ? class_prediction = class_names[prediction]
? ? confidence = 100*np.max(predictions)
? ? actual = class_names[true_label]
? ? with open(f'{data_path}/result.txt', 'w') as result:
? ? ? ? result.write(" Prediction: {} | Confidence: {:2.0f}% | Actual: {}".format(class_prediction,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? confidence,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? actual))
? ? print('Prediction has be saved successfully!')
# Define the pipeline
@dsl.pipeline(
? name='Fashion MNIST Pipeline',
? description='A toy pipeline that performs fashion mnist model training and prediction.')
# Define parameters to be fed into pipeline
def mnist_container_pipeline(
? ? data_path='/mnt',
? ? model_file='mnist_model_0929.h5',
? ? image_number=0):
? ? train_op = comp.func_to_container_op(train, base_image='0.0.0.0/library/tensorflow:2.3.0-gpu')
? ? predict_op = comp.func_to_container_op(predict, base_image='0.0.0.0/library/tensorflow:2.3.0-gpu')
? ? # Define volume to share data between components.
? ? vop = dsl.VolumeOp(
? ? name="create_volume",
? ? resource_name="data-volume",
? ? size="1Gi",
? ? modes=dsl.VOLUME_MODE_RWM)
? ? # Create MNIST training component.
? ? mnist_training_container = train_op(data_path, model_file) \
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .add_pvolumes({data_path: vop.volume})
? ? # Create MNIST prediction component.
? ? mnist_predict_container = predict_op(data_path, model_file, image_number) \
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .add_pvolumes({data_path: mnist_training_container.pvolume})
? ? # Print the result of the prediction
? ? mnist_result_container = dsl.ContainerOp(
? ? ? ? name="print_prediction",
? ? ? ? image='0.0.0.0/library/bash:4.4.23',
? ? ? ? pvolumes={data_path: mnist_predict_container.pvolume},
? ? ? ? arguments=['cat', f'{data_path}/result.txt']
? ? )
if __name__ == '__main__':
? ? import kfp.compiler as compiler
? ? compiler.Compiler().compile(mnist_container_pipeline, __file__ + '.tar.gz')
代碼解釋:
前面定義了train、predict兩個(gè)函數(shù)吼拥,這個(gè)就不說(shuō)了倚聚,是我們的主要執(zhí)行代碼
后面就是一個(gè)mnist_container_pipeline,這個(gè)流程制定了整個(gè)工作流程1凿可、創(chuàng)建pvc? 2惑折、訓(xùn)練? 3、 測(cè)試? 4枯跑、打印輸出
其中第1步創(chuàng)建的pvc會(huì)被后面3步用到惨驶,第2步訓(xùn)練產(chǎn)生的模型文件和用于第3步測(cè)試的數(shù)據(jù)都會(huì)被保存在pvc中,第3步從pvc中讀取數(shù)據(jù)并進(jìn)行測(cè)試敛助,測(cè)試結(jié)果寫(xiě)在txt中粗卜,依舊保存在pvc中,第四步從pvc中讀取txt并進(jìn)行輸出纳击。
完畢续扔!
編譯命令:dsl-compile --py? pipeline.py --output pipeline.tar.gz? ? 然后上傳執(zhí)行即可
持續(xù)學(xué)習(xí)中。焕数。纱昧。
希望能給大家?guī)?lái)幫助。
分享知識(shí)堡赔,分享進(jìn)步
歡迎大家分享交流识脆。