導(dǎo)讀
繼 Spark 之后,UC Berkeley AMP 實(shí)驗(yàn)室又推出一重磅高性能AI計(jì)算引擎——Ray甘凭,號(hào)稱(chēng)支持每秒數(shù)百萬(wàn)次任務(wù)調(diào)度稀拐。那么它是怎么做到的呢?在試用之后对蒲,簡(jiǎn)單總結(jié)一下:
-
極簡(jiǎn) Python API 接口:在函數(shù)或者類(lèi)定義時(shí)加上
ray.remote
的裝飾器并做一些微小改變钩蚊,就能將單機(jī)代碼變?yōu)榉植际酱a。這意味著不僅可以遠(yuǎn)程執(zhí)行純函數(shù)蹈矮,還可以遠(yuǎn)程注冊(cè)一個(gè)類(lèi)(Actor模型)砰逻,在其中維護(hù)大量context(成員變量),并遠(yuǎn)程調(diào)用其成員方法來(lái)改變這些上下文泛鸟。 - 高效數(shù)據(jù)存儲(chǔ)和傳輸:每個(gè)節(jié)點(diǎn)上通過(guò)共享內(nèi)存(多進(jìn)程訪問(wèn)無(wú)需拷貝)維護(hù)了一塊局部的對(duì)象存儲(chǔ)蝠咆,然后利用專(zhuān)門(mén)優(yōu)化過(guò)的 Apache Arrow格式來(lái)進(jìn)行不同節(jié)點(diǎn)間的數(shù)據(jù)交換。
- 動(dòng)態(tài)圖計(jì)算模型:這一點(diǎn)得益于前兩點(diǎn)北滥,將遠(yuǎn)程調(diào)用返回的 future 句柄傳給其他的遠(yuǎn)程函數(shù)或者角色方法刚操,即通過(guò)遠(yuǎn)程函數(shù)的嵌套調(diào)用構(gòu)建復(fù)雜的計(jì)算拓?fù)洌⒒趯?duì)象存儲(chǔ)的發(fā)布訂閱模式來(lái)進(jìn)行動(dòng)態(tài)觸發(fā)執(zhí)行。
- 全局狀態(tài)維護(hù):將全局的控制狀態(tài)(而非數(shù)據(jù))利用 Redis 分片來(lái)維護(hù)等浊,使得其他組件可以方便的進(jìn)行平滑擴(kuò)展和錯(cuò)誤恢復(fù)流部。當(dāng)然,每個(gè) redis 分片通過(guò) chain-replica 來(lái)避免單點(diǎn)鉴逞。
- 兩層調(diào)度架構(gòu):分本地調(diào)度器和全局調(diào)度器;任務(wù)請(qǐng)求首先被提交到本地調(diào)度器司训,本地調(diào)度器會(huì)盡量在本地執(zhí)行任務(wù)构捡,以減少網(wǎng)絡(luò)開(kāi)銷(xiāo)。在資源約束壳猜、數(shù)據(jù)依賴(lài)或者負(fù)載狀況不符合期望時(shí)勾徽,會(huì)轉(zhuǎn)給全局調(diào)度器來(lái)進(jìn)行全局調(diào)度。
當(dāng)然统扳,還有一些需要優(yōu)化的地方喘帚,比如 Job 級(jí)別的封裝(以進(jìn)行多租戶(hù)資源配給),待優(yōu)化的垃圾回收算法(針對(duì)對(duì)象存儲(chǔ)咒钟,現(xiàn)在只是粗暴的 LRU)啥辨,多語(yǔ)言支持(最近支持了Java,但不知道好不好用)等等盯腌。但是瑕不掩瑜溉知,其架構(gòu)設(shè)計(jì)和實(shí)現(xiàn)思路還是有很多可以借鑒的地方。
作者:青藤木鳥(niǎo) https://www.qtmuniao.com, 轉(zhuǎn)載請(qǐng)注明出處
動(dòng)機(jī)和需求
(開(kāi)發(fā) Ray 的動(dòng)機(jī)始于強(qiáng)化學(xué)習(xí)(RL),但是由于其計(jì)算模型強(qiáng)大表達(dá)能力级乍,使用絕不限于 RL舌劳。這一小節(jié)是以描述 RL 系統(tǒng)需求為契機(jī),引出 Ray 的初始設(shè)計(jì)方向玫荣。但是由于不大熟悉強(qiáng)化學(xué)習(xí)甚淡,一些名詞可能表達(dá)翻譯不準(zhǔn)確。如果只對(duì)其架構(gòu)感興趣捅厂,完全可以跳過(guò)這一節(jié))
圖1:一個(gè) RL 系統(tǒng)的例子
我們從考慮 RL 系統(tǒng)的基本組件開(kāi)始贯卦,逐漸完善 Ray 的需求。如圖1所示焙贷,在一個(gè) RL 系統(tǒng)的的設(shè)定中撵割,智能體(agent)會(huì)反復(fù)與環(huán)境(environment)進(jìn)行交互。智能體的目標(biāo)是學(xué)習(xí)出一種最大化獎(jiǎng)勵(lì)(reward)的策略辙芍。策略(policy)本質(zhì)上是從環(huán)境中狀態(tài)到行為抉擇(action)的一種映射啡彬。至于環(huán)境,智能體故硅,狀態(tài)庶灿,行為和獎(jiǎng)勵(lì)值的詳細(xì)定義,則是由具體的應(yīng)用所決定的吃衅。
為了學(xué)習(xí)策略往踢,智能體通常要進(jìn)行兩步操作:1)策略評(píng)估(policy evaluation)和 2)策略?xún)?yōu)化(policy improvement)。為了評(píng)估一個(gè)策略徘层,智能體和環(huán)境持續(xù)進(jìn)行交互(一般是仿真的環(huán)境)以產(chǎn)生軌跡(trajectories)峻呕。軌跡是在當(dāng)前環(huán)境和給定策略下產(chǎn)生的一個(gè)二元組(狀態(tài),獎(jiǎng)勵(lì)值)序列惑灵。然后,智能體根據(jù)這些軌跡來(lái)反饋優(yōu)化該策略眼耀,即英支,向最大化獎(jiǎng)勵(lì)值的梯度方向更新策略。圖2展示了智能體用來(lái)學(xué)習(xí)策略一個(gè)例子的偽碼哮伟。該偽碼通過(guò)調(diào)用 rollout(environment, policy)
來(lái)評(píng)估策略干花,進(jìn)而產(chǎn)生仿真軌跡。train_policy()
接著會(huì)用這些軌跡作為輸入楞黄,調(diào)用 policy.update(trajectories)
來(lái)優(yōu)化當(dāng)前策略池凄。會(huì)重復(fù)迭代這個(gè)過(guò)程直到策略收斂。
// evaluate policy by interacting with env. (e.g., simulator)
rollout(policy, environment):
trajectory = []
state = environment.initial_state()
while (not environment.has_terminated()):
action = policy.compute(state) // Serving
state, reward = environment.step(action) // Simulation
trajectory.append(state, reward)
return trajectory
// improve policy iteratively until it converges
train_policy(environment):
policy = initial_policy()
while (policy has not converged):
trajectories = []
for i from 1 to k:
// evaluate policy by generating k rollouts
trajectories.append(rollout(policy, environment))
// improve policy
policy = policy.update(trajectories) // Training
return policy
圖2:一段用于學(xué)習(xí)策略的典型的偽代碼
由此看來(lái)鬼廓,針對(duì) RL 應(yīng)用的計(jì)算框架需要高效的支持模型訓(xùn)練(training)肿仑,在線預(yù)測(cè)(serving)和平臺(tái)仿真(simulation)(如圖1所示)。接下來(lái),我們簡(jiǎn)要說(shuō)明一下這些工作負(fù)載(workloads)尤慰。
模型訓(xùn)練一般會(huì)涉及到在分布式的環(huán)境中跑隨機(jī)梯度下降模型(stochastic gradient descent馏锡,SGD)來(lái)更新策略。而分布式 SGD 通常依賴(lài)于 allreduce 聚合步驟或參數(shù)服務(wù)器(parameter server).
在線預(yù)測(cè) 使用已經(jīng)訓(xùn)練好的策略并基于當(dāng)前環(huán)境來(lái)給出動(dòng)作決策伟端。預(yù)測(cè)系統(tǒng)通常要求降低預(yù)測(cè)延遲杯道,提高決策頻次。為了支持?jǐn)U展责蝠,最好能夠?qū)⒇?fù)載均攤到多節(jié)點(diǎn)上來(lái)協(xié)同進(jìn)行預(yù)測(cè)党巾。
最后,大多數(shù)現(xiàn)存的 RL 應(yīng)用使用仿真(simulations) 來(lái)對(duì)策略進(jìn)行評(píng)估——因?yàn)楝F(xiàn)有的 RL 算法不足以單獨(dú)依賴(lài)從與物理世界的交互中高效的進(jìn)行取樣霜医。這些仿真器在復(fù)雜度上跨度極大齿拂。也許只需要幾毫秒(如模擬國(guó)際象棋游戲中的移動(dòng)),也許會(huì)需要幾分鐘(如為了一個(gè)自動(dòng)駕駛的車(chē)輛模擬真實(shí)的環(huán)境)支子。
與模型訓(xùn)練和在線預(yù)測(cè)可以在不同系統(tǒng)中進(jìn)行處理的監(jiān)督學(xué)習(xí)相比创肥, RL 中所有三種工作負(fù)載都被緊耦合在了單個(gè)應(yīng)用中,并且對(duì)不同負(fù)載間的延遲要求很苛刻≈蹬螅現(xiàn)有的系統(tǒng)中還沒(méi)有能同時(shí)支持三種工作負(fù)載的叹侄。理論上,可以將多個(gè)專(zhuān)用系統(tǒng)組合到一塊來(lái)提供所有能力昨登,但實(shí)際上趾代,子系統(tǒng)間的結(jié)果傳輸?shù)难舆t在 RL 下是不可忍受的。因此丰辣,RL 的研究人員和從業(yè)者不得不針對(duì)每個(gè)需求單獨(dú)構(gòu)建多套一次性的專(zhuān)用系統(tǒng)撒强。
這些現(xiàn)狀要求為 RL 開(kāi)發(fā)全新的分布式框架,可以有效地支持訓(xùn)練笙什,預(yù)測(cè)和仿真飘哨。尤其是,這樣的框架應(yīng)具有以下能力:
支持細(xì)粒度琐凭,異構(gòu)的計(jì)算芽隆。RL 計(jì)算的運(yùn)行持續(xù)時(shí)間往往從數(shù)毫秒(做一個(gè)簡(jiǎn)單的動(dòng)作)到數(shù)小時(shí)(訓(xùn)練一個(gè)復(fù)雜的策略)。此外统屈,模型訓(xùn)練通常需要各種異構(gòu)的硬件支持(如CPU胚吁,GPU或者TPU)。
提供靈活的計(jì)算模型愁憔。RL 應(yīng)用同時(shí)具有有狀態(tài)和無(wú)狀態(tài)類(lèi)型的計(jì)算腕扶。無(wú)狀態(tài)的計(jì)算可以在系統(tǒng)中的任何節(jié)點(diǎn)進(jìn)行執(zhí)行,從而可以方便的進(jìn)行負(fù)載均衡和按需的數(shù)據(jù)傳輸吨掌。因此半抱,無(wú)狀態(tài)的計(jì)算非常適合細(xì)粒度的仿真和數(shù)據(jù)處理脓恕,例如從視頻或圖片中提取特征。相比之下代虾,有狀態(tài)的計(jì)算適合用來(lái)實(shí)現(xiàn)參數(shù)服務(wù)器进肯、在支持 GPU 運(yùn)算的數(shù)據(jù)上進(jìn)行重復(fù)迭代或者運(yùn)行不暴露內(nèi)部狀態(tài)參數(shù)的第三方仿真器。
動(dòng)態(tài)的執(zhí)行能力棉磨。RL 應(yīng)用中的很多模塊要求動(dòng)態(tài)的進(jìn)行執(zhí)行江掩,因?yàn)樗麄冇?jì)算完成的順序并不總是預(yù)先確定(例如:仿真的完成順序),并且乘瓤,一個(gè)計(jì)算的運(yùn)行結(jié)果可以決定是否執(zhí)行數(shù)個(gè)將來(lái)的計(jì)算(如环形,某個(gè)仿真的運(yùn)行結(jié)果將決定我們是否運(yùn)行更多的仿真)。
除此之外衙傀,我們提出了兩個(gè)額外的要求抬吟。首先,為了高效的利用大型集群统抬,框架必須支持每秒鐘數(shù)百萬(wàn)次的任務(wù)調(diào)度火本。其次,框架不是為了支持從頭開(kāi)始實(shí)現(xiàn)深度神經(jīng)網(wǎng)絡(luò)或者復(fù)雜的仿真器聪建,而是必須和現(xiàn)有的仿真器(OpenAI gym等)和深度學(xué)習(xí)框架(如TensorFlow钙畔,MXNet,Caffe金麸, PyTorch)無(wú)縫集成擎析。
語(yǔ)言和計(jì)算模型
Ray 實(shí)現(xiàn)了動(dòng)態(tài)任務(wù)圖計(jì)算模型,即挥下,Ray 將應(yīng)用建模為一個(gè)在運(yùn)行過(guò)程中動(dòng)態(tài)生成依賴(lài)的任務(wù)圖揍魂。在此模型之上,Ray 提供了角色模型(Actor)和并行任務(wù)模型(task-parallel)的編程范式棚瘟。Ray 對(duì)混合計(jì)算范式的支持使其有別于與像 CIEL 一樣只提供并行任務(wù)抽象和像 Orleans 或 Akka 一樣只提供角色模型抽象的系統(tǒng)现斋。
編程模型
任務(wù)模型(Tasks)。一個(gè)任務(wù)表示一個(gè)在無(wú)狀態(tài)工作進(jìn)程執(zhí)行的遠(yuǎn)程函數(shù)(remote function)偎蘸。當(dāng)一個(gè)遠(yuǎn)程函數(shù)被調(diào)用的時(shí)候庄蹋,表示任務(wù)結(jié)果的 future 會(huì)立即被返回(也就是說(shuō)所有的遠(yuǎn)程函數(shù)調(diào)用都是異步的,調(diào)用后會(huì)立即返回一個(gè)任務(wù)句柄)禀苦÷希可以將 Futures傳給 ray.get()
以阻塞的方式獲取結(jié)果遂鹊,也可以將 Futures 作為參數(shù)傳給其他遠(yuǎn)程函數(shù)振乏,以非阻塞、事件觸發(fā)的方式進(jìn)行執(zhí)行(后者是構(gòu)造動(dòng)態(tài)拓?fù)鋱D的精髓)秉扑。Futures 的這兩個(gè)特性讓用戶(hù)在構(gòu)造并行任務(wù)的同時(shí)指定其依賴(lài)關(guān)系慧邮。下表是 Ray 的所有 API(相當(dāng)簡(jiǎn)潔而強(qiáng)大调限,但是實(shí)現(xiàn)起來(lái)會(huì)有很多坑,畢竟所有裝飾有 ray.remote
的函數(shù)或者類(lèi)及其上下文都要序列化后傳給遠(yuǎn)端節(jié)點(diǎn)误澳,序列化用的和 PySpark 一樣的 cloudpickle)耻矮。
Name | Description |
---|---|
futures = f.remote(args) | Execute function f remotely. f.remote() can take objects or futures as inputs and returns one or more futures. This is non-blocking. |
objects = ray.get(futures) | Return the values associated with one or more futures. This is blocking. |
ready futures = ray.wait(futures, k, timeout) | Return the futures whose corresponding tasks have completed as soon as either k have completed or the timeout expires. |
actor = Class.remote(args) futures = actor.method.remote(args) |
Instantiate class Class as a remote actor, and return a handle to it. Call a method on the remote actor and return one or more futures. Both are non-blocking. |
表1 Ray API
遠(yuǎn)程函數(shù)作用于不可變的物體上,并且應(yīng)該是無(wú)狀態(tài)的并且沒(méi)有副作用的:這些函數(shù)的輸出僅取決于他們的輸入(純函數(shù))忆谓。這意味著冪等性(idempotence)裆装,獲取結(jié)果出錯(cuò)時(shí)只需要重新執(zhí)行該函數(shù)即可,從而簡(jiǎn)化容錯(cuò)設(shè)計(jì)倡缠。
角色模型(Actors)哨免。一個(gè)角色對(duì)象代表一個(gè)有狀態(tài)的計(jì)算過(guò)程。每個(gè)角色對(duì)象暴露了一組可以被遠(yuǎn)程調(diào)用昙沦,并且按調(diào)用順序依次執(zhí)行的成員方法(即在同一個(gè)角色對(duì)象內(nèi)是串行執(zhí)行的琢唾,以保證角色狀態(tài)正確的進(jìn)行更新)。一個(gè)角色方法的執(zhí)行過(guò)程和普通任務(wù)一樣盾饮,也會(huì)在遠(yuǎn)端(每個(gè)角色對(duì)象會(huì)對(duì)應(yīng)一個(gè)遠(yuǎn)端進(jìn)程)執(zhí)行并且立即返回一個(gè) future采桃;但不同的是,角色方法會(huì)運(yùn)行在一個(gè)有狀態(tài)(stateful)的工作進(jìn)程上丘损。一個(gè)角色對(duì)象的句柄(handle)可以傳遞給其他角色對(duì)象或者遠(yuǎn)程任務(wù)普办,從而使他們能夠在該角色對(duì)象上調(diào)用這些成員函數(shù)。
Tasks | Actors |
---|---|
細(xì)粒度的負(fù)載均衡 | 粗粒度的負(fù)載均衡 |
支持對(duì)象的局部性(對(duì)象存儲(chǔ)cache) | 比較差的局部性支持 |
微小更新開(kāi)銷(xiāo)很高 | 微小更新開(kāi)銷(xiāo)不大 |
高效的錯(cuò)誤處理 | 檢查點(diǎn)(checkpoint)恢復(fù)帶來(lái)較高開(kāi)銷(xiāo) |
表2 任務(wù)模型 vs. 角色模型的對(duì)比
表2 比較了任務(wù)模型和角色模型在不同維度上的優(yōu)劣号俐。任務(wù)模型利用集群節(jié)點(diǎn)的負(fù)載信息和依賴(lài)數(shù)據(jù)的位置信息來(lái)實(shí)現(xiàn)細(xì)粒度的負(fù)載均衡泌豆,即每個(gè)任務(wù)可以被調(diào)度到存儲(chǔ)了其所需參數(shù)對(duì)象的空閑節(jié)點(diǎn)上;并且不需要過(guò)多的額外開(kāi)銷(xiāo)吏饿,因?yàn)椴恍枰O(shè)置檢查點(diǎn)和進(jìn)行中間狀態(tài)的恢復(fù)踪危。與之相比,角色模型提供了極高效的細(xì)粒度的更新支持猪落,因?yàn)檫@些更新作用在內(nèi)部狀態(tài)(即角色成員變量所維護(hù)的上下文信息)而非外部對(duì)象(比如遠(yuǎn)程對(duì)象贞远,需要先同步到本地)。后者通常來(lái)說(shuō)需要進(jìn)行序列化和反序列化(還需要進(jìn)行網(wǎng)絡(luò)傳輸笨忌,因此往往很費(fèi)時(shí)間)蓝仲。例如,角色模型可以用來(lái)實(shí)現(xiàn)參數(shù)服務(wù)器(parameter servers)和基于GPU 的迭代式計(jì)算(如訓(xùn)練)官疲。此外袱结,角色模型可以用來(lái)包裹第三方仿真器(simulators)或者其他難以序列化的對(duì)象(比如某些模型)。
為了滿(mǎn)足異構(gòu)性和可擴(kuò)展性途凫,我們從三個(gè)方面增強(qiáng)了 API 的設(shè)計(jì)垢夹。首先,為了處理長(zhǎng)短不一的并發(fā)任務(wù)维费,我們引入了 ray.wait()
果元,它可以等待前 k 個(gè)結(jié)果滿(mǎn)足了就返回促王;而不是像 ray.get()
一樣,必須等待所有結(jié)果都滿(mǎn)足后才返回而晒。其次蝇狼,為了處理對(duì)不同資源緯度( resource-heterogeneous)需求的任務(wù),我們讓用戶(hù)可以指定所需資源用量(例如裝飾器:ray.remote(gpu_nums=1)
)倡怎,從而讓調(diào)度系統(tǒng)可以高效的管理資源(即提供一種交互手段迅耘,讓調(diào)度系統(tǒng)在調(diào)度任務(wù)時(shí)相對(duì)不那么盲目)。最后监署,為了提靈活性豹障,我們?cè)试S構(gòu)造嵌套遠(yuǎn)程函數(shù)(nested remote functions),意味著在一個(gè)遠(yuǎn)程函數(shù)內(nèi)可以調(diào)用另一個(gè)遠(yuǎn)程函數(shù)焦匈。這對(duì)于獲得高擴(kuò)展性是至關(guān)重要的血公,因?yàn)樗试S多個(gè)進(jìn)程以分布式的方式相互調(diào)用(這一點(diǎn)是很強(qiáng)大的,通過(guò)合理設(shè)計(jì)函數(shù)缓熟,可以使得可以并行部分都變成遠(yuǎn)程函數(shù)累魔,從而提高并行性)。
計(jì)算模型
Ray 采用的動(dòng)態(tài)圖計(jì)算模型够滑,在該模型中垦写,當(dāng)輸入可用(即任務(wù)依賴(lài)的所有輸入對(duì)象都被同步到了任務(wù)所在節(jié)點(diǎn)上)時(shí),遠(yuǎn)程函數(shù)和角色方法會(huì)自動(dòng)被觸發(fā)執(zhí)行彰触。在這一小節(jié)梯投,我們會(huì)詳細(xì)描述如何從一個(gè)用戶(hù)程序(圖3)來(lái)構(gòu)建計(jì)算圖(圖4)。該程序使用了表1 的API 實(shí)現(xiàn)了圖2 的偽碼况毅。
@ray.remote
def create_policy():
# Initialize the policy randomly. return policy
@ray.remote(num_gpus=1)
class Simulator(object):
def __init__(self):
# Initialize the environment. self.env = Environment()
def rollout(self, policy, num_steps):
observations = []
observation = self.env.current_state()
for _ in range(num_steps):
action = policy(observation)
observation = self.env.step(action)
observations.append(observation)
return observations
@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
# Update the policy.
return policy
@ray.remote
def train_policy():
# Create a policy.
policy_id = create_policy.remote()
# Create 10 actors.
simulators = [Simulator.remote() for _ in range(10)] # Do 100 steps of training.
for _ in range(100):
# Perform one rollout on each actor.
rollout_ids = [s.rollout.remote(policy_id)
for s in simulators]
# Update the policy with the rollouts.
policy_id =
update_policy.remote(policy_id, *rollout_ids)
return ray.get(policy_id)
圖3:在 Ray 中實(shí)現(xiàn)圖2邏輯的代碼分蓖,注意裝飾器 @ray.remote 會(huì)將被注解的方法或類(lèi)聲明為遠(yuǎn)程函數(shù)或者角色對(duì)象。調(diào)用遠(yuǎn)程函數(shù)或者角色方法后會(huì)立即返回一個(gè) future 句柄尔许,該句柄可以被傳遞給隨后的遠(yuǎn)程函數(shù)或者角色方法么鹤,以此來(lái)表達(dá)數(shù)據(jù)間的依賴(lài)關(guān)系。每個(gè)角色對(duì)象包含一個(gè)環(huán)境對(duì)象 self.env 味廊,這個(gè)環(huán)境狀態(tài)為所有角色方法所共享蒸甜。
在不考慮角色對(duì)象的情況下,在一個(gè)計(jì)算圖中有兩種類(lèi)型的點(diǎn):數(shù)據(jù)對(duì)象(data objects)和遠(yuǎn)程函數(shù)調(diào)用(或者說(shuō)任務(wù))余佛。同樣柠新,也有兩種類(lèi)型的邊:數(shù)據(jù)邊(data edges)和控制邊(control edges)。數(shù)據(jù)邊表達(dá)了數(shù)據(jù)對(duì)象任務(wù)間的依賴(lài)關(guān)系辉巡。更確切來(lái)說(shuō)恨憎,如果數(shù)據(jù)對(duì)象 D 是任務(wù) T 的輸出,我們就會(huì)增加一條從 T 到 D 的邊红氯。類(lèi)似的框咙,如果 D 是 任務(wù) T 的輸入,我們就會(huì)增加一條 D 到 T 的邊痢甘±觯控制邊表達(dá)了由于遠(yuǎn)程函數(shù)嵌套調(diào)用所造成的計(jì)算依賴(lài)關(guān)系郎笆,即匆背,如果任務(wù) T1 調(diào)用任務(wù) T2, 我們就會(huì)增加一條 T1 到 T2 的控制邊域仇。
在計(jì)算圖中放椰,角色方法調(diào)用也被表示成了節(jié)點(diǎn)作烟。除了一個(gè)關(guān)鍵不同點(diǎn)外,他們和任務(wù)調(diào)用間的依賴(lài)關(guān)系基本一樣砾医。為了表達(dá)同一個(gè)角色對(duì)象上的連續(xù)方法調(diào)用所形成的狀態(tài)依賴(lài)關(guān)系拿撩,我們向計(jì)算圖添加第三種類(lèi)型的邊:在同一個(gè)角色對(duì)象上,如果角色方法 Mj 緊接著 Mi 被調(diào)用如蚜,我們就會(huì)添加一條 Mi 到 Mj 的狀態(tài)邊(即 Mi 調(diào)用后會(huì)改變角色對(duì)象中的某些狀態(tài)压恒,或者說(shuō)成員變量;然后這些變化后的成員變量會(huì)作為 Mj 調(diào)用的隱式輸入错邦;由此,Mi 到 Mj 間形成了某種隱式依賴(lài)關(guān)系)撬呢。這樣一來(lái)伦吠,作用在同一角色對(duì)象上的所有方法調(diào)用會(huì)形成一條由狀態(tài)邊串起來(lái)的調(diào)用鏈(chain,見(jiàn)圖4)魂拦。這條調(diào)用鏈表達(dá)了同一角色對(duì)象上方法被調(diào)用的前后相繼的依賴(lài)關(guān)系毛仪。
圖4:該圖與圖3 train_policy.remote()
調(diào)用相對(duì)應(yīng)。遠(yuǎn)程函數(shù)調(diào)用和角色方法調(diào)用對(duì)應(yīng)圖中的任務(wù)(tasks)芯勘。該圖中顯示了兩個(gè)角色對(duì)象A10和A20潭千,每個(gè)角色對(duì)象的方法調(diào)用(被標(biāo)記為 A1i 和 A2i 的兩個(gè)任務(wù))之間都有狀態(tài)邊(stateful edge)連接,表示這些調(diào)用間共享可變的角色狀態(tài)借尿。從 train_policy
到被其調(diào)用的任務(wù)間有控制邊連接刨晴。為了并行地訓(xùn)練多種策略,我們可以調(diào)用 train_policy.remote()
多次路翻。
狀態(tài)邊讓我們將角色對(duì)象嵌入到無(wú)狀態(tài)的任務(wù)圖中狈癞,因?yàn)樗麄儽磉_(dá)出了共享狀態(tài)、前后相繼的兩個(gè)角色方法調(diào)用之間的隱式數(shù)據(jù)依賴(lài)關(guān)系茂契。狀態(tài)邊的添加還可以讓我們維護(hù)譜系圖(lineage)蝶桶,如其他數(shù)據(jù)流系統(tǒng)一樣,我們也會(huì)跟蹤數(shù)據(jù)的譜系關(guān)系以在必要的時(shí)候進(jìn)行數(shù)據(jù)的重建掉冶。通過(guò)顯式的將狀態(tài)邊引入數(shù)據(jù)譜系圖中真竖,我們可以方便的對(duì)數(shù)據(jù)進(jìn)行重建脐雪,不管這些數(shù)據(jù)是遠(yuǎn)程函數(shù)產(chǎn)生的還是角色方法產(chǎn)生的(小節(jié)4.2.3中會(huì)詳細(xì)講)。
架構(gòu)
Ray 的架構(gòu)組成包括兩部分:
- 實(shí)現(xiàn) API 的應(yīng)用層恢共,現(xiàn)在包括 Python 和 Java分別實(shí)現(xiàn)的版本战秋。
- 提供高擴(kuò)展性和容錯(cuò)的系統(tǒng)層,用 C++ 寫(xiě)的讨韭,以CPython的形式嵌入包中脂信。
圖5:Ray 的架構(gòu)包括兩部分:系統(tǒng)層和應(yīng)用層。前者實(shí)現(xiàn)了API和計(jì)算模型透硝,后者實(shí)現(xiàn)了任務(wù)調(diào)度和數(shù)據(jù)管理狰闪,以滿(mǎn)足性能要求和容錯(cuò)需求
應(yīng)用層
應(yīng)用層包括三種類(lèi)型的進(jìn)程:
- 驅(qū)動(dòng)進(jìn)程(Driver): 用來(lái)執(zhí)行用戶(hù)程序。
-
工作進(jìn)程(Worker):用來(lái)執(zhí)行 Driver 或者其他 Worker 指派的任務(wù)(remote functions濒生,就是用戶(hù)代碼中裝飾了
@ray.remote
的那些函數(shù))的無(wú)狀態(tài)進(jìn)程埋泵。工作進(jìn)程在節(jié)點(diǎn)啟動(dòng)時(shí)被自動(dòng)啟動(dòng),一般來(lái)說(shuō)會(huì)在每個(gè)物理機(jī)上啟動(dòng)與 CPU 同樣數(shù)量的 Worker(這里還有些問(wèn)題:如果節(jié)點(diǎn)是容器的話(huà)罪治,獲取的仍然是其所在物理機(jī)的 CPU 數(shù))秋泄。當(dāng)一個(gè)遠(yuǎn)程函數(shù)被聲明時(shí),會(huì)被注冊(cè)到全局规阀,并推送到所有 Worker恒序。每個(gè) Worker 順序的執(zhí)行任務(wù),并且不維護(hù)本地狀態(tài)谁撼。 - 角色進(jìn)程(Actor):用來(lái)執(zhí)行角色方法的有狀態(tài)進(jìn)程歧胁。與 Worker 被自動(dòng)的啟動(dòng)不同,每個(gè) Actor 會(huì)根據(jù)需求(即被調(diào)用時(shí))被工作進(jìn)程或者驅(qū)動(dòng)進(jìn)程顯示啟動(dòng)厉碟。和 Worker 一樣喊巍,Actor 也會(huì)順序的執(zhí)行任務(wù),不同的是箍鼓,下一個(gè)任務(wù)的執(zhí)行依賴(lài)于前一個(gè)任務(wù)生成或改變的狀態(tài)(即 Actor 的成員變量)崭参。
系統(tǒng)層
系統(tǒng)層包括三個(gè)主要組件:全局控制存儲(chǔ)(GCS,global control store)款咖,分布式調(diào)度器(distributed scheduler)和分布式對(duì)象存儲(chǔ)(distributed object store)何暮。所有組件都可以進(jìn)行水平擴(kuò)展并且支持容錯(cuò)。
全局控制存儲(chǔ)(GCS)
全局狀態(tài)存儲(chǔ)維護(hù)著系統(tǒng)全局的控制狀態(tài)信息铐殃,是我們系統(tǒng)獨(dú)創(chuàng)的一個(gè)部件海洼。其核心是一個(gè)可以進(jìn)行發(fā)布訂閱的鍵值對(duì)存儲(chǔ)。我們通過(guò)分片(sharding)來(lái)應(yīng)對(duì)擴(kuò)展富腊,每片存儲(chǔ)通過(guò)鏈?zhǔn)礁北?/a>(將所有數(shù)據(jù)副本組織成鏈表坏逢,來(lái)保證強(qiáng)一致性,見(jiàn)04年的一篇論文)來(lái)提供容錯(cuò)。提出和設(shè)計(jì)這樣一個(gè)GCS的動(dòng)機(jī)在于使系統(tǒng)能夠每秒進(jìn)行數(shù)百萬(wàn)次的任務(wù)創(chuàng)建和調(diào)度是整,并且延遲較低肖揣,容錯(cuò)方便。
對(duì)于節(jié)點(diǎn)故障的容錯(cuò)需要一個(gè)能夠記錄譜系信息(lineage information)的方案「∪耄現(xiàn)有的基于譜系的解決方法側(cè)重粗粒度(比如 Spark 的 rdd)的并行龙优,因此可以只利用單個(gè)節(jié)點(diǎn)(如Master or Driver)存儲(chǔ)譜系信息,而不影響性能舵盈。然而,這種設(shè)計(jì)并不適合像仿真(simulation)一樣的細(xì)粒度球化、動(dòng)態(tài)的作業(yè)類(lèi)型(workload)秽晚。因此我們將譜系信息的存儲(chǔ)與系統(tǒng)其它模塊解耦,使之可以獨(dú)立地動(dòng)態(tài)擴(kuò)容筒愚。
保持低延遲意味著要盡可能降低任務(wù)調(diào)度的開(kāi)銷(xiāo)赴蝇。具體來(lái)說(shuō),一個(gè)調(diào)度過(guò)程包括選擇節(jié)點(diǎn)巢掺,分派任務(wù)句伶,拉取遠(yuǎn)端依賴(lài)對(duì)象等等。很多現(xiàn)有的信息流系統(tǒng)陆淀,將其所有對(duì)象的位置考余、大小等信息集中存儲(chǔ)在調(diào)度器上,使得上述調(diào)度過(guò)程耦合在一塊轧苫。當(dāng)調(diào)度器不是瓶頸的時(shí)候楚堤,這是一個(gè)很簡(jiǎn)單自然的設(shè)計(jì)。然而含懊,考慮到 Ray 要處理的數(shù)據(jù)量級(jí)和數(shù)據(jù)粒度身冬,需要將中心調(diào)度器從關(guān)鍵路徑中移出(否則如果所有調(diào)度都要全局調(diào)度器經(jīng)手處理,它肯定會(huì)成為瓶頸)岔乔。對(duì)于像 allreduce 這樣的(傳輸頻繁酥筝,對(duì)延遲敏感)分布式訓(xùn)練很重要的原語(yǔ)來(lái)說(shuō),每個(gè)對(duì)象傳輸時(shí)都經(jīng)手調(diào)度器的開(kāi)銷(xiāo)是不可容忍的雏门。 因此嘿歌,我們將對(duì)象的元數(shù)據(jù)存儲(chǔ)在 GCS 中而不是中央調(diào)度器里,從而將任務(wù)分派與任務(wù)調(diào)度完全解耦茁影。
總的來(lái)說(shuō)搅幅,GCS 極大地簡(jiǎn)化了 Ray 的整體設(shè)計(jì),因?yàn)樗鼘⑺袪顟B(tài)攬下呼胚,從而使得系統(tǒng)中其他部分都變成無(wú)狀態(tài)茄唐。這不僅使得對(duì)容錯(cuò)支持簡(jiǎn)化了很多(即,每個(gè)故障節(jié)點(diǎn)恢復(fù)時(shí)只需要從 GCS 中讀取譜系信息就行),也使得分布式的對(duì)象存儲(chǔ)和調(diào)度器可以進(jìn)行獨(dú)立的擴(kuò)展(因?yàn)樗薪M件可以通過(guò) GCS 來(lái)獲取必要的信息)沪编。還有一個(gè)額外的好處呼盆,就是可以更方便的開(kāi)發(fā)調(diào)試、監(jiān)控和可視化工具蚁廓。
自下而上的分布式調(diào)度系統(tǒng)(Bottom-up Distributed Scheduler)
如前面提到的访圃,Ray 需要支持每秒數(shù)百萬(wàn)次任務(wù)調(diào)度,這些任務(wù)可能只持續(xù)短短數(shù)毫秒相嵌。大部分已知的調(diào)度策略都不滿(mǎn)足這些需求腿时。常見(jiàn)的集群計(jì)算框架,如 Spark饭宾, CIEL批糟, Dryad 都實(shí)現(xiàn)了一個(gè)中心的調(diào)度器。這些調(diào)度器具有很好的局部性(局部性原理)的特點(diǎn)看铆,但是往往會(huì)有數(shù)十毫秒的延遲徽鼎。像 work stealing,Sparrow 和 Canary 一樣的的分布式調(diào)度器的確能做到高并發(fā)弹惦,但是往往不考慮數(shù)據(jù)的局部性特點(diǎn)否淤,或者假設(shè)任務(wù)(tasks)屬于不同的作業(yè)(job),或者假設(shè)計(jì)算拓?fù)涫翘崆爸赖摹?/p>
為了滿(mǎn)足上述需求棠隐,我們?cè)O(shè)計(jì)了一個(gè)兩層調(diào)度架構(gòu)石抡,包括一個(gè)全局調(diào)度器(global scheduler)和每個(gè)節(jié)點(diǎn)上的本地調(diào)度器(local scheduler)。為了避免全局調(diào)度器過(guò)載助泽,每個(gè)節(jié)點(diǎn)(node)上創(chuàng)建的任務(wù)會(huì)被先提交到本地調(diào)度器汁雷。本地調(diào)度器總是先嘗試將任務(wù)在本地執(zhí)行,除非其所在機(jī)器過(guò)載(比如任務(wù)隊(duì)列超過(guò)了預(yù)定義的閾值)或者不能滿(mǎn)足任務(wù)任務(wù)的資源需求(比如报咳,缺少 GPU)侠讯。如果本地調(diào)度器發(fā)現(xiàn)不能在本地執(zhí)行某個(gè)任務(wù),會(huì)將其轉(zhuǎn)發(fā)給全局調(diào)度器暑刃。由于調(diào)度系統(tǒng)都傾向于首先在本地調(diào)度任務(wù)(即在調(diào)度結(jié)構(gòu)層級(jí)中的葉子節(jié)點(diǎn))厢漩,我們將其稱(chēng)為自下而上的調(diào)度系統(tǒng)(可以看出,本地調(diào)度器只是根據(jù)本節(jié)點(diǎn)的局部負(fù)載信息進(jìn)行調(diào)度岩臣,而全局調(diào)度器會(huì)根據(jù)全局負(fù)載來(lái)分派任務(wù)溜嗜;當(dāng)然前提是資源約束首先得被滿(mǎn)足)。
圖6 這是調(diào)度系統(tǒng)示意圖架谎,任務(wù)自下而上被提交:任務(wù)首先被驅(qū)動(dòng)進(jìn)程(Drivers)或者工作進(jìn)程(Workers)提交到本地調(diào)度器炸宵,然后在需要的時(shí)候會(huì)由本地調(diào)度器轉(zhuǎn)給全局調(diào)度器進(jìn)行處理。圖中箭頭的粗細(xì)程度代表其請(qǐng)求的繁忙程度谷扣。
全局調(diào)度器根據(jù)每個(gè)節(jié)點(diǎn)的負(fù)載狀況和資源請(qǐng)求約束來(lái)決定調(diào)度策略土全。細(xì)化一下就是捎琐,全局調(diào)度器首先確定所有滿(mǎn)足任務(wù)資源要求的節(jié)點(diǎn),然后在其中選擇具有最小預(yù)估排隊(duì)時(shí)間(estimated waiting time)的一個(gè)裹匙,將任務(wù)調(diào)度過(guò)去瑞凑。在給定的節(jié)點(diǎn)上,預(yù)估排隊(duì)時(shí)間是下述兩項(xiàng)時(shí)間的和:1)任務(wù)在節(jié)點(diǎn)上的排隊(duì)時(shí)間 (任務(wù)隊(duì)列長(zhǎng)度乘上平均執(zhí)行時(shí)間)概页; 2)任務(wù)依賴(lài)的遠(yuǎn)程對(duì)象的預(yù)估傳輸時(shí)間(所有遠(yuǎn)程輸入的大小除以平均帶寬)籽御。全局調(diào)度器通過(guò)心跳獲取到每個(gè)節(jié)點(diǎn)的任務(wù)排隊(duì)情況和可用資源信息,從 GCS 中得到任務(wù)所有輸入的位置和大小惰匙。然后技掏,全局調(diào)度器通過(guò)移動(dòng)指數(shù)平均(exponential averaging)的方法來(lái)計(jì)算任務(wù)平均執(zhí)行時(shí)間和平均傳輸帶寬。如果全局調(diào)度器成為了系統(tǒng)瓶頸项鬼,我們可以實(shí)例化更多的副本來(lái)分?jǐn)偭髁垦剖幔鼈兺ㄟ^(guò) GCS來(lái)共享全局狀態(tài)信息。如此一來(lái)秃臣,我們的調(diào)度架構(gòu)具有極高可擴(kuò)展性涧衙。
任務(wù)生命周期
(注:這部分是從代碼中的設(shè)計(jì)文檔翻譯而來(lái)哪工,注意這只是截止到2019.04.21 的設(shè)計(jì))
在實(shí)現(xiàn)的時(shí)候奥此,每個(gè)任務(wù)具有以下幾種狀態(tài)。任意時(shí)刻雁比,任務(wù)都會(huì)處在這幾種狀態(tài)之一稚虎。
- 可放置(Placeable):任務(wù)已經(jīng)準(zhǔn)備好被調(diào)度到(本地或者遠(yuǎn)程)節(jié)點(diǎn)上,具體如何調(diào)度偎捎,前一段已經(jīng)說(shuō)明蠢终。注意該狀態(tài)不表示放置位置已經(jīng)最終確定,還可能被再一次被從某處調(diào)度出去茴她。
- 等待角色創(chuàng)建(WaitActorCreation):一個(gè)角色方法(task)等待其所在角色實(shí)例化完畢寻拂。一旦角色被創(chuàng)建,該任務(wù)會(huì)被轉(zhuǎn)給運(yùn)行該角色的遠(yuǎn)端機(jī)器進(jìn)行處理丈牢。
- 等待中(Waiting):等待該任務(wù)參數(shù)需求被滿(mǎn)足祭钉,即,等待所有遠(yuǎn)端參數(shù)對(duì)象傳送到本地對(duì)象存儲(chǔ)中己沛。
- 準(zhǔn)備好(Ready):任務(wù)準(zhǔn)備好了被運(yùn)行慌核,也就說(shuō)所有所需參數(shù)已經(jīng)在本地對(duì)象存儲(chǔ)中就位了。
- 運(yùn)行中(Running):任務(wù)已經(jīng)被分派申尼,并且正在本地工作進(jìn)程(worker)或者角色進(jìn)程(actor)中運(yùn)行垮卓。
- 被阻塞(Blocked):當(dāng)前任務(wù)由于其依賴(lài)的數(shù)據(jù)不可用而被阻塞住。如师幕,嵌套調(diào)用時(shí)粟按,該任務(wù)啟動(dòng)了另外的遠(yuǎn)程任務(wù)并且等待其完成,以取得結(jié)果。
- 不可行(infeasible):任務(wù)的資源要求在任何一臺(tái)機(jī)器上都得不到滿(mǎn)足钾怔。
---------------------------------
| |
| forward | forward
|---------------- |
node with ------| | arguments |
resources forward| | resource | local | actor/worker
joins | v available | --------> | available
---------------------- Placeable ----------> Waiting Ready ---------> Running
| | | ^ ^ <-------- ^ | ^
| |--------- | | | local arg | | |
| | | | | evicted | worker | | worker
| | actor | | | | blocked | | unblocked
| resources | created | | actor | --------------- | |
| infeasible | | | created | actor | |
| | | | (remote) | created v |
| | v | | (local) Blocked
| | WaitForActorCreation----------
| v
----Infeasible
基于內(nèi)存的分布式對(duì)象存儲(chǔ)
為了降低任務(wù)的延遲碱呼,我們實(shí)現(xiàn)了一個(gè)基于內(nèi)存的分布式存儲(chǔ)系統(tǒng)以存儲(chǔ)每個(gè)任務(wù)(無(wú)狀態(tài)的計(jì)算過(guò)程)的輸入和輸出。在每個(gè)節(jié)點(diǎn)上宗侦,我們以共享內(nèi)存(shared memory)的方式實(shí)現(xiàn)了對(duì)象存儲(chǔ)愚臀。這使得同一節(jié)點(diǎn)上的不同任務(wù)以零拷貝的代價(jià)進(jìn)行數(shù)據(jù)共享。至于數(shù)據(jù)格式矾利,我們選擇了 Apache Arrow姑裂。
如果一個(gè)任務(wù)的輸入(即函數(shù)的參數(shù)對(duì)象)不在本地,在該任務(wù)執(zhí)行之前男旗,輸入會(huì)被拷貝到本地的對(duì)象存儲(chǔ)中舶斧。同時(shí),任務(wù)執(zhí)行完畢后察皇,會(huì)將輸出也寫(xiě)到本地得對(duì)象存儲(chǔ)中茴厉。對(duì)象拷貝消除了熱數(shù)據(jù)所造成的潛在的瓶頸,并且通過(guò)將任務(wù)的數(shù)據(jù)讀寫(xiě)都限制在本地內(nèi)存中以縮短執(zhí)行時(shí)間什荣。這些做法增加了計(jì)算密集型工作任務(wù)的吞吐量矾缓,而很多 AI 應(yīng)用都是計(jì)算密集型的。為了降低延遲稻爬,我們將用到的對(duì)象全部放在內(nèi)存中嗜闻,只有在內(nèi)存不夠的時(shí)候才通過(guò) LRU 算法將一些對(duì)象擠出內(nèi)存(從API 可以看出,每個(gè)節(jié)點(diǎn)的內(nèi)存上限可以在啟動(dòng)節(jié)點(diǎn)時(shí)通過(guò)參數(shù)指定桅锄。此外用 LRU 作為垃圾回收算法還是有點(diǎn)粗暴琉雳,如果不同類(lèi)型的任務(wù)負(fù)載跑在同一個(gè) ray 集群上,可能導(dǎo)致資源的互相爭(zhēng)搶?zhuān)瑥亩写罅康馁Y源換出然后重建友瘤,從而嚴(yán)重影響效率)翠肘。
和現(xiàn)有的計(jì)算框架的集群(如Spark, Dryad)一樣辫秧,對(duì)象存儲(chǔ)只接受不可變數(shù)據(jù)(immutable data)束倍。這種設(shè)計(jì)避免了對(duì)復(fù)雜的一致性協(xié)議的需求(因?yàn)閷?duì)象數(shù)據(jù)從來(lái)不需要進(jìn)行更新),并且簡(jiǎn)化了數(shù)據(jù)的容錯(cuò)支持茶没。當(dāng)有節(jié)點(diǎn)出現(xiàn)故障時(shí)肌幽,Ray 通過(guò)重新執(zhí)行對(duì)象譜系圖來(lái)恢復(fù)任意所需對(duì)象(也就是說(shuō)不用整個(gè)恢復(fù)該宕機(jī)節(jié)點(diǎn)所有狀態(tài),只需要按需恢復(fù)后面計(jì)算所需數(shù)據(jù)抓半,用不到的數(shù)據(jù)丟了就丟了吧)喂急。在工作開(kāi)始之前,存放在 GCS 的譜系信息追蹤了所有無(wú)狀態(tài)的任務(wù)和有狀態(tài)的角色笛求;我們利用前者對(duì)丟失對(duì)象進(jìn)行重建(結(jié)合上一段廊移,如果一個(gè)任務(wù)有大量的迭代糕簿,并且都是遠(yuǎn)程執(zhí)行狡孔,會(huì)造成大量的中間結(jié)果對(duì)象懂诗,將內(nèi)存擠爆,從而使得較少使用但是稍后可能使用的全局變量擠出內(nèi)存苗膝,所以 LRU 有點(diǎn)粗暴殃恒,聽(tīng)說(shuō)現(xiàn)在在醞釀基于引用計(jì)數(shù)的GC)。
為了簡(jiǎn)化實(shí)現(xiàn)辱揭,我們的對(duì)象存儲(chǔ)不支持分布式的對(duì)象离唐。也就是說(shuō),每個(gè)對(duì)象必須能夠在單節(jié)點(diǎn)內(nèi)存下问窃,并且只存在于單節(jié)點(diǎn)中亥鬓。對(duì)于大矩陣、樹(shù)狀結(jié)構(gòu)等大對(duì)象域庇,可以在應(yīng)用層來(lái)拆分處理嵌戈,比如說(shuō)實(shí)現(xiàn)為一個(gè)集合。
實(shí)現(xiàn)
Ray 是一個(gè)由加州大學(xué)伯克利分校開(kāi)發(fā)的一個(gè)活躍的開(kāi)源項(xiàng)目听皿。Ray 深度整合了 Python熟呛,你可以通過(guò) pip install ray
來(lái)安裝 ray。整個(gè)代碼實(shí)現(xiàn)包括大約 40K 行写穴,其中有 72% C++ 實(shí)現(xiàn)的系統(tǒng)層和 28% 的 Python 實(shí)現(xiàn)的應(yīng)用層(截止目前惰拱,又增加了對(duì) Java 的支持)。GCS 的每個(gè)分片使用了一個(gè) Redis 的 key-val 存儲(chǔ),并且只設(shè)計(jì)單個(gè)鍵值對(duì)操作墙牌。GCS 的表通過(guò)按任務(wù)ID沉填、數(shù)據(jù)對(duì)象集合進(jìn)行切分來(lái)進(jìn)行平滑擴(kuò)展。每一片利用鏈?zhǔn)饺哂嗖呗?/a>(chained-replcated)來(lái)容錯(cuò)秧耗。我們將本地調(diào)度器和全局調(diào)度器都實(shí)現(xiàn)為了單線程、事件驅(qū)動(dòng)的進(jìn)程。本地調(diào)度器緩存了本地對(duì)象元信息篷朵,被阻塞的任務(wù)隊(duì)列和等待調(diào)度的任務(wù)隊(duì)列。為了在不同節(jié)點(diǎn)的對(duì)象存儲(chǔ)之間無(wú)感知的傳輸超大對(duì)象婆排,我們將大對(duì)象切片声旺,利用多條 TCP 連接來(lái)并行傳。
將所有碎片捏一塊
圖 7 通過(guò)一個(gè)簡(jiǎn)單的 a
加 b
(a段只,b
可以是標(biāo)量腮猖,向量或者矩陣)然后返回 c
的例子展示了 Ray 端到端的工作流。遠(yuǎn)程函數(shù) add()
在初始化 ( ray.init
) 的時(shí)候赞枕,會(huì)自動(dòng)地被注冊(cè)到 GCS 中澈缺,進(jìn)而分發(fā)到集群中的每個(gè)工作進(jìn)程坪创。(圖7a 的第零步)
圖7a 展示了當(dāng)一個(gè)驅(qū)動(dòng)進(jìn)程(driver)調(diào)用 add.remote(a, b)
,并且 a, b
分別存在節(jié)點(diǎn) N1 和 N2 上時(shí) 姐赡,Ray 的每一步操作莱预。驅(qū)動(dòng)進(jìn)程將任務(wù) add(a, b)
提交到本地調(diào)度器(步驟1),然后該任務(wù)請(qǐng)求被轉(zhuǎn)到全局調(diào)度器(步驟2)(如前所述项滑,如果本地任務(wù)排隊(duì)隊(duì)列沒(méi)有超過(guò)設(shè)定閾值依沮,該任務(wù)也可以在本地進(jìn)行執(zhí)行)。接著枪狂,全局調(diào)度器開(kāi)始在 GCS 中查找 add(a, b)
請(qǐng)求中參數(shù) a, b
的位置(步驟3)悉抵,從而決定將該任務(wù)調(diào)度到節(jié)點(diǎn) N2 上(因?yàn)?N2 上有其中一個(gè)參數(shù) b
)(步驟4)。N2 節(jié)點(diǎn)上的本地調(diào)度器收到請(qǐng)求后(發(fā)現(xiàn)滿(mǎn)足本地調(diào)度策略的條件摘完,如滿(mǎn)足資源約束姥饰,排隊(duì)隊(duì)列也沒(méi)超過(guò)閾值,就會(huì)在本地開(kāi)始執(zhí)行該任務(wù))孝治,會(huì)檢查本地對(duì)象存儲(chǔ)中是否存在任務(wù) add(a, b)
的所有輸入?yún)?shù)(步驟5)列粪。由于本地對(duì)象存儲(chǔ)中沒(méi)有對(duì)象 a
,工作進(jìn)程會(huì)在 GCS 中查找 a
的位置(步驟6)谈飒。 這時(shí)候發(fā)現(xiàn) a
存儲(chǔ)在 N1 中岂座,于是將其同步到本地的對(duì)象存儲(chǔ)中(步驟7)。由于任務(wù) add()
所有的輸入?yún)?shù)對(duì)象都存在了本地存儲(chǔ)中杭措,本地調(diào)度器將在本地工作進(jìn)程中執(zhí)行 add()
(步驟8)费什,并通過(guò)共享存儲(chǔ)訪問(wèn)輸入?yún)?shù)(步驟9)。
圖 7b 展現(xiàn)了在 N1 上執(zhí)行 ray.get()
和在 N2 上執(zhí)行 add()
后所觸發(fā)的逐步的操作手素。一旦 ray.get(id)
被調(diào)用鸳址,N1 上的用戶(hù)驅(qū)動(dòng)進(jìn)程會(huì)在本地對(duì)象存儲(chǔ)中查看該 id (即由遠(yuǎn)程調(diào)用 add()
返回的 future 值,所有 object id 是全局唯一的泉懦,GCS 可以保證這一點(diǎn))對(duì)應(yīng)的對(duì)象 c
是否存在(步驟1)稿黍。由于本地對(duì)象存儲(chǔ)中沒(méi)有 c
, 驅(qū)動(dòng)進(jìn)程會(huì)去 GCS 中查找 c
的位置。在此時(shí)崩哩,發(fā)現(xiàn) GCS 中并沒(méi)有 c 的存在巡球,因?yàn)?c 根本還沒(méi)有被創(chuàng)建出來(lái)。 于是邓嘹,N1 的對(duì)象存儲(chǔ)向 GCS 中的對(duì)象表(Object Table)注冊(cè)了一個(gè)回調(diào)函數(shù)酣栈,以監(jiān)聽(tīng) c
對(duì)象被創(chuàng)建事件(步驟2)。與此同時(shí)汹押,在節(jié)點(diǎn) N2 上矿筝,add() 任務(wù)執(zhí)行完畢,將結(jié)果 c
存到其本地對(duì)象存儲(chǔ)中(步驟3)鲸阻,同時(shí)也將 c
的位置信息添加到 GCS 的對(duì)象存儲(chǔ)表中(步驟4)跋涣。GCS 監(jiān)測(cè)到 c
的創(chuàng)建缨睡,會(huì)去觸發(fā)之前 N1 的對(duì)象存儲(chǔ)注冊(cè)的回調(diào)函數(shù)(步驟5)。接下來(lái)陈辱,N1 的對(duì)象存儲(chǔ)將 c
從 N2 中同步過(guò)去(步驟6)奖年,從而結(jié)束該任務(wù)。
盡管這個(gè)例子中涉及了大量的 RPC調(diào)用沛贪,但對(duì)于大部分情況來(lái)說(shuō)陋守,RPC 的數(shù)量會(huì)小的多,因?yàn)榇蟛糠秩蝿?wù)會(huì)在本地被調(diào)度執(zhí)行利赋,而且 GCS 回復(fù)的對(duì)象信息會(huì)被本地調(diào)度器和全局調(diào)度器緩存(但是另一方面水评,執(zhí)行了大量遠(yuǎn)程任務(wù)之后,本地對(duì)象存儲(chǔ)很容易被撐爆)媚送。
名詞對(duì)照
workloads:工作負(fù)載中燥,即描述任務(wù)需要做的工作。
GCS: Global Control Store塘偎,全局控制信息存儲(chǔ)疗涉。
Object Table:存在于 GCS 中的對(duì)象表,記錄了所有對(duì)象的位置等信息(objectId -> location)吟秩。
Object Store:本地對(duì)象存儲(chǔ)咱扣,在實(shí)現(xiàn)中叫 Plasma,即存儲(chǔ)任務(wù)所需對(duì)象的實(shí)例涵防。
Lineage:血統(tǒng)信息闹伪,譜系信息;即計(jì)算時(shí)的數(shù)據(jù)變換前后的相繼關(guān)系圖壮池。
Node:節(jié)點(diǎn)偏瓤;Ray 集群中的每個(gè)物理節(jié)點(diǎn)。
Driver火窒、Worker:驅(qū)動(dòng)進(jìn)程和工作進(jìn)程硼补,物理表現(xiàn)形式都是 Node 上的進(jìn)程驮肉。但前者是用戶(hù)側(cè)使用 ray.init
時(shí)候生成的熏矿,隨著 ray.shutdown
會(huì)進(jìn)行銷(xiāo)毀。后者是 ray 在啟動(dòng)的時(shí)在每個(gè)節(jié)點(diǎn)啟動(dòng)的無(wú)狀態(tài)的駐留工作進(jìn)程离钝,一般和物理機(jī) CPU 數(shù)量相同票编。
Actor:角色對(duì)象,語(yǔ)言層面卵渴,就是一個(gè)類(lèi)慧域;物理層面,表現(xiàn)為某個(gè)節(jié)點(diǎn)上的一個(gè)角色進(jìn)程浪读,維護(hù)了該角色對(duì)象內(nèi)的所有上下文(角色成員變量)昔榴。
Actor method:角色方法辛藻,語(yǔ)言層面,就是類(lèi)的成員方法互订;其所有輸入包括顯式的函數(shù)參數(shù)和隱式的成員變量吱肌。
Remote function:遠(yuǎn)程函數(shù),即通過(guò) @ray.remote 注冊(cè)到系統(tǒng)的函數(shù)仰禽。在其被調(diào)度時(shí)氮墨,稱(chēng)為一個(gè)任務(wù)(Task)。
Job吐葵,Task:文中用到了不少 Job 和 Task 的概念规揪,但是這兩個(gè)概念在 CS 中其實(shí)定義比較模糊,不如進(jìn)程和線程一般明確温峭。Task 在本論文是對(duì)一個(gè)遠(yuǎn)程函數(shù)(remote action)或者一個(gè) actor 的遠(yuǎn)程方法(remote method)的封裝猛铅。而 Job 在當(dāng)前的實(shí)現(xiàn)中并不存在,只是一個(gè)邏輯上的概念凤藏,其含義為運(yùn)行一次用戶(hù)側(cè)代碼所所涉及到的所有生成的 Task 以及產(chǎn)生的狀態(tài)的集合奕坟。
Scheduler:paper 中統(tǒng)一用的 scheduler,但是有的是指部分(local scheduler 和 global scheduler)清笨,這時(shí)我翻譯為調(diào)度器月杉,有時(shí)候是指 Ray 中所有調(diào)度器構(gòu)成的整體,這時(shí)我翻譯為調(diào)度系統(tǒng)抠艾。
exponential averaging:我翻譯成了移動(dòng)指數(shù)平均苛萎,雖然他沒(méi)有寫(xiě)移動(dòng)。對(duì)于剛過(guò)去的前 n 項(xiàng)检号,以隨著時(shí)間漸進(jìn)指數(shù)增長(zhǎng)的權(quán)重做加權(quán)平均腌歉。計(jì)算時(shí)候可以通過(guò)滑動(dòng)窗口的概念方便的遞推計(jì)算。
Future:這個(gè)不大好翻譯齐苛,大概意思就是對(duì)于異步調(diào)用中的返回值句柄翘盖。相信信息可以參見(jiàn)維基百科 Future 和 promise。
引用
[1] 官方文檔:https://ray.readthedocs.io/en/latest/
[2] 系統(tǒng)論文:https://www.usenix.org/system/files/osdi18-moritz.pdf
[3] 系統(tǒng)源碼:https://github.com/ray-project/ray
歡迎關(guān)注公眾號(hào):分布式點(diǎn)滴凹蜂,獲取更多分布式系統(tǒng)文章