TensorFlowOnSpark version: 1.1.0
In TFSparkNode.py, the run method contains the following code:
# for ps nodes only, wait indefinitely in foreground thread for a "control" event (None == "stop")
if job_name == 'ps':
    queue = TFSparkNode.mgr.get_queue('control')
    done = False
    while not done:
        msg = queue.get(block=True)  # blocks the executor
        logging.info("Got msg: {0}".format(msg))
        if msg is None:
            logging.info("Terminating PS")
            TFSparkNode.mgr.set('state', 'stopped')
            done = True
        queue.task_done()
By blocking the Spark executor that hosts the ps node, the data destined for that executor during subsequent training and prediction is forwarded to other idle executors for computation. As a result, each executor takes on exactly one role: either a TensorFlow worker or a TensorFlow ps server.
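The blocked ps executor is only released when a None sentinel is put onto its 'control' queue (in TensorFlowOnSpark this is expected to happen at cluster shutdown). Below is a minimal, standalone sketch of the same producer/consumer handshake using only the Python standard library; the queue and thread here are stand-ins for TFSparkNode.mgr's control queue and the ps executor, not the actual TensorFlowOnSpark API.

# Sketch of the sentinel pattern with standard threading/queue modules,
# assuming the real 'control' queue behaves like a joinable queue
# (supports get / task_done / join).
import logging
import queue
import threading

logging.basicConfig(level=logging.INFO)

def ps_loop(control):
    # Mirrors the ps branch above: block until a None sentinel ("stop") arrives, then exit.
    done = False
    while not done:
        msg = control.get(block=True)  # blocks this thread, like the ps executor
        logging.info("Got msg: {0}".format(msg))
        if msg is None:
            logging.info("Terminating PS")
            done = True
        control.task_done()

control = queue.Queue()
ps = threading.Thread(target=ps_loop, args=(control,))
ps.start()

# Producer side (hypothetical stand-in for a cluster shutdown step):
# putting None onto the control queue releases the blocked ps loop.
control.put(None)
control.join()  # wait until task_done() has been called for the sentinel
ps.join()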