Spark Streaming =>很火眉抬,在流處理中得到了廣泛的應(yīng)用描孟。TensorFlow=>很火,由Google大神開源锻梳,目前已經(jīng)在深度學(xué)習(xí)領(lǐng)域展現(xiàn)了超高的流行潛質(zhì)抖棘。那么如何在Spark Streaming中調(diào)用TensorFlow茂腥?筆者此文嘗試使用了簡單粗暴的方式在Spark Streaming中調(diào)用TensorFlow.
1. 需求和目標(biāo)
筆者已經(jīng)實現(xiàn)了一個基于Spark Streaming的流處理平臺狸涌,能夠?qū)afka輸入的流數(shù)據(jù)進行運算,并且可以通過Sql的方式對數(shù)據(jù)進行過濾和邏輯判斷础芍。
筆者玩了一段時間的TensorFlow杈抢,一直在琢磨在機器學(xué)習(xí)的Model Serving階段,能否由Kafka喂數(shù)據(jù)仑性,將TF融入到已有的流處理中惶楼,流入的每條數(shù)據(jù)都可以進行predict,處理結(jié)果還能通過Spark DataFrame進行SQL配置過濾诊杆。
2. 第一次嘗試歼捐,失敗。
因為在已有的Spark Streaming代碼框架中晨汹,有一個自定義的函數(shù)接口豹储,能夠依次處理RDD的每一個record,所以剛開始考慮在此函數(shù)接口中通過調(diào)用Python腳本實現(xiàn)調(diào)用TensorFlow淘这。
步驟如下:
- 在Spark executor 執(zhí)行的機器上安裝TensorFlow環(huán)境
- 在每個Record的執(zhí)行邏輯中import sys.process._ 剥扣,并調(diào)用外部TF的Python腳本。偽代碼如下:
//加入依賴庫
import sys.process._
// 執(zhí)行TF hello world腳本
val result = "sudo python /data/tf/hello.py".!!
// 把執(zhí)行腳本的結(jié)果輸出到Map铝穷,最終在另外的地方輸出到Kafka中
labelMap += (isAILabel -> result.toString)
hello.py腳本非常簡單钠怯,就是簡單的調(diào)用TensorFlow的hello world.
import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print sess.run(hello)
- 看起來很簡單很粗暴,最簡單的方法而已曙聂。運行的結(jié)果能正常調(diào)用起來晦炊,但是因為sys.process._是fork出進程去執(zhí)行腳本命令,在流處理過程中宁脊,如果每條數(shù)據(jù)去fork新的進程断国,會導(dǎo)致執(zhí)行效率非常底下。因此榆苞,在實際測試中稳衬,一臺機器每個macro batch 700 - 800 這樣極少的數(shù)據(jù)下,執(zhí)行一個批次竟然都需要消耗5分鐘以上坐漏。
因此宋彼,這種方法雖然看似簡單,但無法使用仙畦。
3. 第二種簡單粗暴方式 RDD pipe.
對每一條數(shù)據(jù)執(zhí)行TF腳本不可取,那么換個思路音婶,針對整個RDD進行處理慨畸,嘗試使用了RDD pipe的方法進行處理。在測試中性能比上一種方法有了極大的提升衣式,基本上在幾秒內(nèi)處理完成寸士。
方法如下:
- 在Spark executor 執(zhí)行的機器上安裝TensorFlow環(huán)境
- 在RDD處理中檐什,使用rdd的pipe方法,執(zhí)行python TF的腳本弱卡,然后生成新的pipedRDD乃正。
val piped = labelRDD.pipe("sudo python /data/tf/hello.py", Map("SEPARATOR" -> ","))
- 因為在筆者環(huán)境中,原始的rdd是json類型的婶博,需要使用zip方法瓮具,將pipedRDD合并到原始的labelRDD中。這里用了自定義function的zipPartitions的方法凡人。
val aiRDD = labelRDD.zipPartitions(piped){
(rdd1Iter,rdd2Iter) => {
var result = List[String]()
while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
val firstStr = rdd1Iter.next()
result::=(firstStr.substring(0, firstStr.length - 1) + "," + rdd2Iter.next() + "}")
}
result.iterator
}
}
- hello.py 稍加改造名党,解析RDD的json類型,然后如果某一字段值為f1,則輸出Hello world. 這里輸出的結(jié)果為json串挠轴,以方便和RDD中的json合并传睹。
import sys
import json
import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
while True:
line = sys.stdin.readline()
if not line:
break
data = json.loads(line)
if (data['f1'] == 'f1'):
print '"aiout":"' + sess.run(hello) + '"'
else:
print '"aiout":"No"'
- 這樣簡單的TF hello world程序就可以在原有的Spark Streaming中被launch 起來。
NOTE: 本文只是簡單的在Spark streaming中拉起Python調(diào)用TensorFlow岸晦,后續(xù)還需要從API層面看如何能夠更好的集成欧啤。