Spark Streaming中簡單粗暴執(zhí)行TensorFlow

Spark Streaming =>很火眉抬,在流處理中得到了廣泛的應(yīng)用描孟。TensorFlow=>很火,由Google大神開源锻梳,目前已經(jīng)在深度學(xué)習(xí)領(lǐng)域展現(xiàn)了超高的流行潛質(zhì)抖棘。那么如何在Spark Streaming中調(diào)用TensorFlow茂腥?筆者此文嘗試使用了簡單粗暴的方式在Spark Streaming中調(diào)用TensorFlow.

1. 需求和目標(biāo)

筆者已經(jīng)實現(xiàn)了一個基于Spark Streaming的流處理平臺狸涌,能夠?qū)afka輸入的流數(shù)據(jù)進行運算,并且可以通過Sql的方式對數(shù)據(jù)進行過濾和邏輯判斷础芍。
筆者玩了一段時間的TensorFlow杈抢,一直在琢磨在機器學(xué)習(xí)的Model Serving階段,能否由Kafka喂數(shù)據(jù)仑性,將TF融入到已有的流處理中惶楼,流入的每條數(shù)據(jù)都可以進行predict,處理結(jié)果還能通過Spark DataFrame進行SQL配置過濾诊杆。

2. 第一次嘗試歼捐,失敗。

因為在已有的Spark Streaming代碼框架中晨汹,有一個自定義的函數(shù)接口豹储,能夠依次處理RDD的每一個record,所以剛開始考慮在此函數(shù)接口中通過調(diào)用Python腳本實現(xiàn)調(diào)用TensorFlow淘这。
步驟如下:

  • 在Spark executor 執(zhí)行的機器上安裝TensorFlow環(huán)境
  • 在每個Record的執(zhí)行邏輯中import sys.process._ 剥扣,并調(diào)用外部TF的Python腳本。偽代碼如下:
//加入依賴庫
import sys.process._
    // 執(zhí)行TF hello world腳本
    val result = "sudo python /data/tf/hello.py".!!
    // 把執(zhí)行腳本的結(jié)果輸出到Map铝穷,最終在另外的地方輸出到Kafka中
    labelMap += (isAILabel -> result.toString)

hello.py腳本非常簡單钠怯,就是簡單的調(diào)用TensorFlow的hello world.

import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print sess.run(hello)
  • 看起來很簡單很粗暴,最簡單的方法而已曙聂。運行的結(jié)果能正常調(diào)用起來晦炊,但是因為sys.process._是fork出進程去執(zhí)行腳本命令,在流處理過程中宁脊,如果每條數(shù)據(jù)去fork新的進程断国,會導(dǎo)致執(zhí)行效率非常底下。因此榆苞,在實際測試中稳衬,一臺機器每個macro batch 700 - 800 這樣極少的數(shù)據(jù)下,執(zhí)行一個批次竟然都需要消耗5分鐘以上坐漏。
    因此宋彼,這種方法雖然看似簡單,但無法使用仙畦。
3. 第二種簡單粗暴方式 RDD pipe.

對每一條數(shù)據(jù)執(zhí)行TF腳本不可取,那么換個思路音婶,針對整個RDD進行處理慨畸,嘗試使用了RDD pipe的方法進行處理。在測試中性能比上一種方法有了極大的提升衣式,基本上在幾秒內(nèi)處理完成寸士。
方法如下:

  • 在Spark executor 執(zhí)行的機器上安裝TensorFlow環(huán)境
  • 在RDD處理中檐什,使用rdd的pipe方法,執(zhí)行python TF的腳本弱卡,然后生成新的pipedRDD乃正。
val piped = labelRDD.pipe("sudo python /data/tf/hello.py", Map("SEPARATOR" -> ","))
  • 因為在筆者環(huán)境中,原始的rdd是json類型的婶博,需要使用zip方法瓮具,將pipedRDD合并到原始的labelRDD中。這里用了自定義function的zipPartitions的方法凡人。
val aiRDD = labelRDD.zipPartitions(piped){
      (rdd1Iter,rdd2Iter) => {
           var result = List[String]()
           while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
               val firstStr = rdd1Iter.next()
               result::=(firstStr.substring(0, firstStr.length - 1) + "," + rdd2Iter.next() + "}")
                }
                result.iterator
              }
            }
  • hello.py 稍加改造名党,解析RDD的json類型,然后如果某一字段值為f1,則輸出Hello world. 這里輸出的結(jié)果為json串挠轴,以方便和RDD中的json合并传睹。
import sys
import json
import tensorflow as tf

hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()

while True:
  line = sys.stdin.readline()
  if not line:
     break
  data = json.loads(line)
  if (data['f1'] == 'f1'):
      print '"aiout":"' + sess.run(hello) + '"'
  else:
      print '"aiout":"No"'
  • 這樣簡單的TF hello world程序就可以在原有的Spark Streaming中被launch 起來。

NOTE: 本文只是簡單的在Spark streaming中拉起Python調(diào)用TensorFlow岸晦,后續(xù)還需要從API層面看如何能夠更好的集成欧啤。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市启上,隨后出現(xiàn)的幾起案子邢隧,更是在濱河造成了極大的恐慌,老刑警劉巖碧绞,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件府框,死亡現(xiàn)場離奇詭異,居然都是意外死亡讥邻,警方通過查閱死者的電腦和手機迫靖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來兴使,“玉大人系宜,你說我怎么就攤上這事》⑵牵” “怎么了盹牧?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長励幼。 經(jīng)常有香客問我汰寓,道長,這世上最難降的妖魔是什么苹粟? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任有滑,我火速辦了婚禮,結(jié)果婚禮上嵌削,老公的妹妹穿的比我還像新娘毛好。我一直安慰自己望艺,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布肌访。 她就那樣靜靜地躺著找默,像睡著了一般。 火紅的嫁衣襯著肌膚如雪吼驶。 梳的紋絲不亂的頭發(fā)上惩激,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天,我揣著相機與錄音旨剥,去河邊找鬼咧欣。 笑死,一個胖子當(dāng)著我的面吹牛轨帜,可吹牛的內(nèi)容都是我干的魄咕。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼蚌父,長吁一口氣:“原來是場噩夢啊……” “哼哮兰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起苟弛,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤喝滞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后膏秫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體右遭,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年缤削,在試婚紗的時候發(fā)現(xiàn)自己被綠了窘哈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡亭敢,死狀恐怖滚婉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情帅刀,我是刑警寧澤让腹,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站扣溺,受9級特大地震影響骇窍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜锥余,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一像鸡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦只估、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至荠医,卻和暖如春吁脱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背彬向。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工兼贡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人娃胆。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓遍希,卻偏偏與公主長得像,于是被迫代替她去往敵國和親里烦。 傳聞我的和親對象是個殘疾皇子凿蒜,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容