Preparation
Step 1: First, download the project:
# download the project
git clone https://github.com/allwefantasy/spark-deep-learning.git .
# switch to the release branch
git checkout release
Step 2: Set up the PySpark environment:
Make sure Python 2.7 is installed; I strongly recommend using virtualenv to manage your Python environments. Then install pyspark via pip:
pip install pyspark
The package is fairly large (around 180 MB), so be patient. You can also use the Aliyun mirror:
pip install pyspark -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
Download Spark 2.2.0, extract it to a directory of your choice, and set SPARK_HOME.
In fact, if you submit your program with spark-submit, you do not need to install pyspark separately at all; the main point of installing it via pip here is to give your IDE code completion.
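If you launch scripts straight from the IDE instead of through spark-submit, one way to point the pip-installed pyspark at the Spark distribution you just extracted is to set SPARK_HOME in code before the SparkSession is created. A minimal sketch, assuming a hypothetical extraction path:
import os

# hypothetical path -- point it at wherever you actually extracted Spark 2.2.0
os.environ.setdefault("SPARK_HOME", "/opt/spark-2.2.0-bin-hadoop2.7")

from pyspark.sql import SparkSession
session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()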
Next, install the project's dependencies:
pip install -r requirements.txt
Finally, build the project:
build/sbt assembly
This produces a jar:
target/scala-2.11/spark-deep-learning-assembly-0.1.0-spark2.1.jar
You also need a Kafka instance. That may feel like a bit of a hassle, but it only has to be configured once.
Package the Python sources for code completion
To get code completion in the IDE, we also need to package the project's Python code.
From the project's root directory, run:
cd ./python && python setup.py bdist_wheel && cd dist && pip uninstall sparkdl && pip install ./sparkdl-0.2.2-py2-none-any.whl && cd ..
I have combined the packaging and the installation into a single command here.
Now you get code completion and auto-complete in your IDE.
Developing an SK-Learn based application
First, assume we have data like this:
# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from sklearn import svm
from sklearn.model_selection import GridSearchCV
from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"])
Next we want to turn preds into numbers (the class labels) and text into vectors, so they can be fed to the algorithm. We can do that like this:
features = TFTextTransformer(
    inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)

indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipeline = Pipeline(stages=[features, indexer])
ds = pipeline.fit(documentDF).transform(documentDF)
By default, TFTextTransformer produces a 2-D array of shape (64, 100); that shape is really meant for deep learning. Here I specify shape=(-1,), which flattens the 2-D array into a single vector of length 64 * 100.
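As a quick sanity check of the flattening, you can look at one transformed row (a small sketch, assuming the ds produced by the pipeline above):
row = ds.select("features", "labels").first()
# with shape=(-1,) the features column holds one flat vector of length 64 * 100 = 6400
print(len(row["features"]))
print(row["labels"])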
Now we write a function that implements the actual sk-learn logic:
def sk_map_fun(args={}, ctx=None, _read_data=None):
    params = args['params']['fitParam']
    data = [item for item in _read_data()]
    parameters = {'kernel': ('linear', 'rbf')}
    svr = svm.SVC()
    clf = GridSearchCV(svr, parameters)
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    model = clf.fit(X, y)
    print(model.best_estimator_)
    return ""
The signature must be def sk_map_fun(args={}, ctx=None, _read_data=None); only the function name can be chosen freely. _read_data is the object you use to pull data, and a typical usage looks like this:
for data in _read_data(max_records=params["batch_size"]):
    batch_data = feed_dict(data)
    sess.run(train_step, feed_dict={input_x: batch_data})
Because SVM needs the full dataset, I simply pull all of the data in one go; and since there are fewer rows than the default of 64, I did not specify max_records:
data = [item for item in _read_data()]
X = [x["features"] for x in data[0]]
y = [int(x["labels"]) for x in data[0]]
Now we wire sk_map_fun into the Estimator:
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()
Here, the mapFnParam parameter passes the sklearn function to TextEstimator, and we also configure the Kafka-related parameters. The only thing to pay attention to is fitParam: its length here is 2, which means two processes will be started to run sk_map_fun, each one receiving its corresponding parameter dict. The first line of sk_map_fun:
params = args['params']['fitParam']
At this point params is either {"epochs": 5, "batch_size": 64} or {"epochs": 5, "batch_size": 1}.
This lets you read epochs, batch_size, and so on from params and pass them on to the corresponding SK-Learn model.
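For example, here is a hedged sketch of how those values could be consumed inside sk_map_fun; the max_iter entry is hypothetical and only exists if you add it to fitParam yourself:
def sk_map_fun(args={}, ctx=None, _read_data=None):
    params = args['params']['fitParam']   # e.g. {"epochs": 5, "batch_size": 64}
    data = [item for item in _read_data()]
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    # max_iter is a hypothetical entry -- it is only present if you add it to fitParam yourself
    clf = svm.SVC(max_iter=params.get("max_iter", -1))
    clf.fit(X, y)
    return ""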
If you only want to run in local mode, you can modify the kafkaParam argument like this:
import tempfile
mock_kafka_file = tempfile.mkdtemp()
kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
            "mock_kafka_file": mock_kafka_file,
            "group_id": "sdl_1", "test_mode": True},
Specify a temporary directory via mock_kafka_file and set test_mode to True; then you no longer depend on Kafka.
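Putting those pieces together, the local-mode estimator call looks the same as before; only kafkaParam changes (this is just the earlier snippets combined):
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "mock_kafka_file": mock_kafka_file,
                                      "group_id": "sdl_1", "test_mode": True},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()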
Now, here is the complete program:
# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from sklearn import svm
from sklearn.model_selection import GridSearchCV
from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"])

# transform the text column into a features column holding a flattened vector
features = TFTextTransformer(
    inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)

indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipeline = Pipeline(stages=[features, indexer])
ds = pipeline.fit(documentDF).transform(documentDF)


def sk_map_fun(args={}, ctx=None, _read_data=None):
    data = [item for item in _read_data()]
    parameters = {'kernel': ('linear', 'rbf')}
    svr = svm.SVC()
    clf = GridSearchCV(svr, parameters)
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    model = clf.fit(X, y)
    print(model.best_estimator_)
    return ""


# create an estimator for training; mapFnParam carries the sk-learn code
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()
Then run it with the following command:
./bin/spark-submit \
--py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--jars spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--master "local[*]" Sk2.py
Remember to adjust the code to fit your setup first.
Developing a TensorFlow based application
All you need to change is the map_fun function. For example:
def map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    EMBEDDING_SIZE = args["embedding_size"]
    params = args['params']['fitParam']
    SEQUENCE_LENGTH = 64

    def feed_dict(batch):
        # convert a batch of row dicts into a list of sentence matrices
        features = []
        for i in batch:
            features.append(i['sentence_matrix'])
        return features

    encoder_variables_dict = {
        "encoder_w1": tf.Variable(
            tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"),
        "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"),
        "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"),
        "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2")
    }

    def encoder(x, name="encoder"):
        with tf.name_scope(name):
            encoder_w1 = encoder_variables_dict["encoder_w1"]
            encoder_b1 = encoder_variables_dict["encoder_b1"]
            layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1)

            encoder_w2 = encoder_variables_dict["encoder_w2"]
            encoder_b2 = encoder_variables_dict["encoder_b2"]
            layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2)
            return layer_2

    def decoder(x, name="decoder"):
        with tf.name_scope(name):
            decoder_w1 = tf.Variable(tf.random_normal([128, 256]))
            decoder_b1 = tf.Variable(tf.random_normal([256]))
            layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1)

            decoder_w2 = tf.Variable(
                tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE]))
            decoder_b2 = tf.Variable(
                tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE]))
            layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2)
            return layer_2

    tf.reset_default_graph()
    sess = tf.Session()

    input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x")
    flattened = tf.reshape(input_x,
                           [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE])

    encoder_op = encoder(flattened)
    tf.add_to_collection('encoder_op', encoder_op)

    y_pred = decoder(encoder_op)
    y_true = flattened

    with tf.name_scope("xent"):
        # cosine similarity between the reconstruction and the input
        cosine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1),
                        tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)),
                                    tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1))))
        xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), cosine))
        tf.summary.scalar("xent", xent)

    with tf.name_scope("train"):
        # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent)
        train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent)

    summ = tf.summary.merge_all()
    sess.run(tf.global_variables_initializer())

    for i in range(params["epochs"]):
        print("epoch {}".format(i))
        for data in _read_data(max_records=params["batch_size"]):
            batch_data = feed_dict(data)
            sess.run(train_step, feed_dict={input_x: batch_data})

    sess.close()
This is the same example I have used before: an auto-encoder.
Then hook it up through TextEstimator:
estimator = TextEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "mock_kafka_file": mock_kafka_file,
                                      "group_id": "sdl_1", "test_mode": True},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=map_fun)
estimator.fit(df).collect()
The rest is much the same as before.
For TensorFlow there is also a cluster mode; see: 為Spark Deep Learning 集成TFoS (integrating TFoS with Spark Deep Learning).