Paramter Server
? Author:lyp@ Date:2017/01/26
在深度神經(jīng)網(wǎng)絡(luò)計算框架中祷蝌,參數(shù)服務(wù)器是一個非常重要的基礎(chǔ)概念,而其不同的實現(xiàn)對計算效果和計算能力都有直接的影響买窟。
請你自學(xué)參數(shù)服務(wù)器的概念宠默,并給出一個綜述,介紹什么是參數(shù)服務(wù)器,它對機器學(xué)習(xí)的作用是什么觉壶,一般實現(xiàn)有哪些方案,各自又有哪些優(yōu)缺點件缸。
背景
1. 問題的提出铜靶?
? 在大規(guī)模數(shù)據(jù)上跑機器學(xué)習(xí)任務(wù)是過去十多年內(nèi)系統(tǒng)架構(gòu)師面臨的主要挑戰(zhàn)之一,許多模型和抽象先后用于這一任務(wù)停团。
? 現(xiàn)在的大數(shù)據(jù)機器學(xué)習(xí)系統(tǒng)旷坦,通常數(shù)據(jù)在1TB到1PB之間,參數(shù)范圍在10^9 和10^12左右佑稠。而往往這些模型的參數(shù)需要被所有的worker節(jié)點頻繁的訪問秒梅,這就會帶來很多問題和挑戰(zhàn):
- 訪問這些巨量的參數(shù),需要大量的網(wǎng)絡(luò)帶寬支持舌胶;
- 很多機器學(xué)習(xí)算法都是連續(xù)型的捆蜀,只有上一次迭代完成(各個worker都完成)之后,才能進行下一次迭代幔嫂,這就導(dǎo)致了如果機器之間性能差距大(木桶理論)辆它,就會造成性能的極大損失;
- 在分布式中履恩,容錯能力是非常重要的锰茉。很多情況下,算法都是部署到云環(huán)境中的(這種環(huán)境下切心,機器是不可靠的飒筑,并且job也是有可能被搶占的);
2. 業(yè)內(nèi)如何解決绽昏?
? 如何解決這些問題呢协屡?對于機器學(xué)習(xí)分布式優(yōu)化,有很多大公司在做了全谤,包括:Amazon肤晓,Baidu,F(xiàn)acebook认然,Google补憾,Microsoft 和 Yahoo。也有一些開源的項目卷员,比如:YahooLDA 和 Petuum 和Graphlab余蟹。
? 從最開始的MPI,到Hadoop子刮,Spark 以及Paramter Server威酒。都曾廣泛應(yīng)用于機器學(xué)習(xí)處理任務(wù)窑睁。總結(jié)一下:
- ==MPI Gradient Aggregation==:批任務(wù)求解器的速度不高,無法支持大規(guī)模數(shù)據(jù)集。
- ==MapReduce==:解決了MPI無法支撐大數(shù)據(jù)的問題举户。但無法改進批處理求解器的訓(xùn)練性能,并且還引入了新的問題箫津,包括迭代式計算的低效,節(jié)點之間通信低效宰啦。
- ==GraphLab==:基于圖的抽象苏遥。用圖來做抽象可以解決許多機器學(xué)習(xí)問題,但仍然有許多問題無法很好高效求解赡模,比如深度學(xué)習(xí)中的多層結(jié)構(gòu)田炭。
- ==Parameter Server==:跟基于圖的方法主要區(qū)別在于把模型參數(shù)存儲和更新上升為主要組件,并且采用了異步機制提升處理能力漓柑。
Paramter Server發(fā)展歷程
? 參數(shù)服務(wù)器也經(jīng)歷了多次發(fā)展教硫。
-
第一代參數(shù)服務(wù)器
? 參數(shù)服務(wù)器的概念最早來自Alex Smola于2010年提出的并行LDA的框架[4]。它通過采用一個分布式的Memcached作為存放參數(shù)的存儲辆布,這樣就提供了有效的機制用于在分布式系統(tǒng)不同的Worker節(jié)點之間同步模型參數(shù)瞬矩,而每個Worker只需要保存它計算時所依賴的一小部分參數(shù)即可。當(dāng)然锋玲,這里存放參數(shù)的存儲跟做OLTP應(yīng)用中的Key-Value抽象有所不同景用,因為以Key-Value為單元進行頻繁的參數(shù)數(shù)據(jù)交互會導(dǎo)致過高的通信開銷,因此參數(shù)服務(wù)器通常采用數(shù)學(xué)封裝來進行參數(shù)同步惭蹂,比如向量伞插,張量,矩陣的行列等剿干。
-
第二代參數(shù)服務(wù)器
? 對第二代分布式架構(gòu)做改進的初步嘗試是Petuun,他使用一個有限制的延遲模型穆刻,同時在工作線程模型上添加更多限制置尔。
-
第三代參數(shù)服務(wù)器
? 來自Alex Smola的高徒——李沐設(shè)計的參數(shù)服務(wù)器。ps-lite應(yīng)當(dāng)屬于第三代參數(shù)服務(wù)器氢伟,提供了更加通用的設(shè)計榜轿。==以下也主要介紹他在論文中提供的ps架構(gòu)==。
Paramter Server架構(gòu)設(shè)計
1. Paramter Server 整體架構(gòu)
PS架構(gòu)主要包括兩大部分朵锣。那就是一個參數(shù)服務(wù)器組server group 和多個工作組谬盐。在parameter server中,每個 server 實際上都只負責(zé)分到的部分參數(shù)(servers共同維持一個全局的共享參數(shù))诚些,而每個 work 也只分到部分數(shù)據(jù)和處理任務(wù)飞傀;
一些概念解釋:
- server 節(jié)點可以跟其他 server 節(jié)點通信皇型,每個server負責(zé)自己分到的參數(shù),server group 共同維持所有參數(shù)的更新砸烦。
- server manager node 負責(zé)維護一些元數(shù)據(jù)的一致性弃鸦,比如各個節(jié)點的狀態(tài),參數(shù)的分配情況等幢痘;
- worker 節(jié)點之間沒有通信唬格,只跟自己對應(yīng)的server進行通信。
- 每個worker group有一個task scheduler颜说,負責(zé)向worker分配任務(wù)购岗,并且監(jiān)控worker的運行情況。當(dāng)有新的worker加入或者退出门粪,task scheduler 負責(zé)重新分配任務(wù)喊积。
- training data 被split多個部分,一個worker在本地將一部分訓(xùn)練數(shù)據(jù)存儲在本地統(tǒng)計數(shù)據(jù)中庄拇。
2. Paramter Server通信設(shè)計
-
==(key,value)==
? parameter server 中注服,參數(shù)都是可以被表示成(key, value)的集合,比如一個最小化損失函數(shù)的問題措近,key就是feature ID溶弟,而value就是它的權(quán)值。對于稀疏參數(shù)瞭郑,不存在的key辜御,就可以認為是0.
-
==(key,value)Vectors==
? 如果每一個參數(shù)都設(shè)一個key,那么會使得通信變得非常頻繁低效屈张,為了抹平這個問題擒权,賦予每個key所對應(yīng)的value向量概念或者矩陣概念。做這樣的操作的前提是假設(shè)參數(shù)是有順序的阁谆。
? 這樣做有兩點好處碳抄,降低網(wǎng)絡(luò)通信和使得向量層面的操作變得可行,從而很多線性庫的優(yōu)化特性可以利用的上场绿,比如BLAS剖效、LAPACK、ATLAS等焰盗。缺點是在對于稀疏模型來說璧尸,總會在向量或者矩陣里會有參數(shù)為0,這在單個參數(shù)狀態(tài)下是不用存的熬拒,所以爷光,造成了數(shù)據(jù)的冗余。
-
==range push and pull==
? workers 跟 servers 之間通過 push 跟 pull 來通信澎粟。worker 通過 push 將計算好的梯度發(fā)送到server蛀序,然后通過 pull 從server更新參數(shù)欢瞪。為了提高計算性能和帶寬效率,parameter server 允許用戶使用 Range Push 跟 Range Pull 操作哼拔;
? 假設(shè) R 是需要push或pull的 key 的range引有,那么可以進行如下操作。就是發(fā)送和接送特定Range中的w倦逐。
w.push(R, dest) w.pull(R, dest)
- ==Asynchronous Tasks and Dependency & Flexible Consistency==
? 體會一下Asynchronous Task 跟 Synchronous Task 的區(qū)別譬正。
? 如果 iter12 需要在 iter11 computation,push 跟 pull 都完成后才能開始檬姥,那么就是Synchronous曾我,反之就是Asynchronous.如iter 11 在 iter10計算完成后就開始執(zhí)行。
![image](http://upload-images.jianshu.io/upload_images/2472711-92152ae529e7940b.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
? 參數(shù)服務(wù)器和工作節(jié)點之間的通信都屬于遠程調(diào)用健民,那么抒巢,遠程調(diào)用是比較耗時的行為,如果每次都保持同步的話秉犹,那么訓(xùn)練相對于單節(jié)點來說是減慢了許多的蛉谜,因為遠程調(diào)用的耗時。因而崇堵,PS框架讓遠程調(diào)用成為一部調(diào)用型诚,比如參數(shù)的push和pull發(fā)出之后,立即使用當(dāng)前值開始進行下一步的梯度計算鸳劳,如上圖狰贯,迭代11發(fā)出push和pull的請求后,立馬開始進行梯度計算赏廓,而此時涵紊,使用的還是迭代10的值。
? **Asynchronous Task**:能夠提高系統(tǒng)的效率(因為節(jié)省了很多等待的過程)幔摸,但是摸柄,它的缺點就是容易降低算法的收斂速率;
? 所以既忆,系統(tǒng)性能跟算法收斂速率之間是存在一個trade-off的驱负,你需要同時考慮:
```xml
算法對于參數(shù)非一致性的敏感度;
訓(xùn)練數(shù)據(jù)特征之間的關(guān)聯(lián)度尿贫;
硬盤的存儲容量电媳;
? 考慮到用戶使用的時候會有不同的情況踏揣,parameter server 為用戶提供了多種任務(wù)依賴方式:
Sequential: 這里其實是 synchronous task庆亡,任務(wù)之間是有順序的,只有上一個任務(wù)完成捞稿,才能開始下一個任務(wù)又谋;
Eventual: 跟 sequential 相反拼缝,所有任務(wù)之間沒有順序,各自獨立完成自己的任務(wù)彰亥,
-
Bounded Delay: 這是sequential 跟 eventual 之間的trade-off咧七,可以設(shè)置一個 τ 作為最大的延時時間。也就是說任斋,只有 >τ 之前的任務(wù)都被完成了继阻,才能開始一個新的任務(wù);極端的情況:
- τ=0废酷,情況就是 Sequential瘟檩;
- τ=∞,情況就是 Eventual澈蟆;
-
==User-defined Filters==
? 作為上述特點的補充墨辛,PS還有這樣一個小feature,即過濾趴俘,在工作節(jié)點這一端對梯度進行過濾睹簇,如果梯度并不是那么影響重大,就不用占用網(wǎng)絡(luò)去更新寥闪。
Paramter Server架構(gòu)實現(xiàn)
1. Vector Clock
? 為參數(shù)服務(wù)器中的每個參數(shù)添加一個時間戳太惠,來跟蹤參數(shù)的更新和防止重復(fù)發(fā)送數(shù)據(jù)〕裙福基于此垛叨,通信中的梯度更新數(shù)據(jù)中也應(yīng)該有時間戳,防止重復(fù)更新柜某。
? 如果每個參數(shù)都有一個時間戳嗽元,那么參數(shù)眾多,時間戳也眾多喂击。好在剂癌,parameter server 在push跟pull的時候,都是rang-based翰绊,這就帶來了一個好處:這個range里面的參數(shù)共享的是同一個時間戳佩谷,這顯然可以大大降低了空間復(fù)雜度。
2. Messages
? Message是節(jié)點間交互的主要格式监嗜。一條 message 包括:時間戳谐檀,len(range)對k-v.
? $[vc(R),(k1,v1),...,(kp,vp)]kj∈Randj∈{1,...p}$
? 這是parameter server 中最基本的通信格式,不僅僅是共享的參數(shù)才有裁奇,task 的message也是這樣的格式桐猬,只要把這里的(key, value) 改成 (task ID, 參數(shù)/返回值)。
? Messages may carry a subset of all available keys within range R. The missing keys are assigned the same timestamp without changing their values.
? 由于機器學(xué)習(xí)問題通常都需要很高的網(wǎng)絡(luò)帶寬刽肠,因此信息的壓縮是必須的溃肪。
- key的壓縮: 因為訓(xùn)練數(shù)據(jù)通常在分配之后都不會發(fā)生改變免胃,因此worker沒有必要每次都發(fā)送相同的key,只需要接收方在第一次接收的時候緩存起來就行了惫撰。第二次羔沙,worker不再需要同時發(fā)送key和value,只需要發(fā)送value 和 key list的hash就行厨钻。這樣瞬間減少了一般的通信量扼雏。
- value的壓縮: 假設(shè)參數(shù)時稀疏的,那么就會有大量的0存在夯膀。因此呢蛤,為了進一步壓縮,我們只需要發(fā)送非0值棍郎。parameter server使用 Snappy 快速壓縮庫來壓縮數(shù)據(jù)其障、高效去除0值。
- key 的壓縮和 value 的壓縮可以同時進行涂佃。
3. Replication and Consistency
? parameter server 在數(shù)據(jù)一致性上励翼,使用的是傳統(tǒng)的一致性哈希算法,參數(shù)key與server node id被插入到一個hash ring中辜荠。具體實現(xiàn)可以參考另一篇blog一致性hash算法詳解汽抚。動態(tài)增加和移除節(jié)點的同時還能保證系統(tǒng)存儲與key分配的性能效率.
? 兩種方式保證slave跟master之間的數(shù)據(jù)一致性:
-
默認的復(fù)制方式: Chain replication (強一致性, 可靠):
a. 更新:只能發(fā)生在數(shù)據(jù)頭節(jié)點,然后更新逐步后移,直到更新到達尾節(jié)點伯病,并由尾節(jié)點向客戶確認更新成功造烁;
b. 查詢:為保證強一致性,客戶查詢只能在尾節(jié)點進行午笛;
- Replication after Aggregation
兩個worker 節(jié)點分別向server傳送x和y惭蟋。server 首先通過一定方式(如:f(x+y) )進行aggregate,然后再進行復(fù)制操作药磺;
當(dāng)有n個worker的時候告组,復(fù)制只需要k/n的帶寬。通常來說癌佩,k(復(fù)制次數(shù))是一個很小的常數(shù)木缝,而n的值大概是幾百到幾千;
4. Server Management
由于key的range特性围辙,當(dāng)參數(shù)服務(wù)器集群中增加一個節(jié)點時我碟,步驟如下:
- server manager節(jié)點給新節(jié)點分配一個key range,這可能會導(dǎo)致其他節(jié)點上的key range切分
- 新節(jié)點從其他節(jié)點上將屬于它的key range數(shù)據(jù)取過來姚建,然后也將slave信息取過來
- server manager廣播節(jié)點變動矫俺,其他節(jié)點得知消息后將不屬于自己key range的數(shù)據(jù)刪掉
? 在第二步,從其他節(jié)點上取數(shù)據(jù)的時候,其他節(jié)點上的操作也分為兩步恳守,第一是拷貝數(shù)據(jù),這可能也會導(dǎo)致key range的切分贩虾。第二是不再接受和這些數(shù)據(jù)有關(guān)的消息催烘,而是進行轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)到新節(jié)點缎罢。
? 在第三步伊群,收到廣播信息后,節(jié)點會刪除對應(yīng)區(qū)間的數(shù)據(jù)策精,然后舰始,掃描所有的和R有關(guān)發(fā)送出去的還沒收到回復(fù)的消息,當(dāng)這些消息回復(fù)時咽袜,轉(zhuǎn)發(fā)到新節(jié)點丸卷。
? 節(jié)點的離開與節(jié)點的加入類似。
5. Worker Management
添加工作節(jié)點比添加服務(wù)器節(jié)點要簡單一些询刹,步驟如下:
- task scheduler給新節(jié)點分配一些數(shù)據(jù)
- 節(jié)點從網(wǎng)絡(luò)文件系統(tǒng)中載入數(shù)據(jù)谜嫉,然后從服務(wù)器端拉取參數(shù)
- task scheduler廣播變化,其他節(jié)點free掉一些訓(xùn)練數(shù)據(jù)
? 當(dāng)一個節(jié)點離開的時候凹联,task scheduler可能會尋找一個替代沐兰,但恢復(fù)節(jié)點是十分耗時的工作,同時蔽挠,損失一些數(shù)據(jù)對最后的結(jié)果可能影響并不是很大住闯。所以,系統(tǒng)會讓用戶進行選擇澳淑,是恢復(fù)節(jié)點還是不做處理比原。這種機制甚至可以允許用戶刪掉跑的最慢的節(jié)點來提升速度。
PS-lite 實現(xiàn)
PS-Lite是PS架構(gòu)的一個輕量級的實現(xiàn)杠巡。它提供了push春寿,pull,wait等APIs忽孽。整個項目代碼量不多绑改。
A light and efficient implementation of the parameter server framework. It provides clean yet powerful APIs. For example, a worker node can communicate with the server nodes by
Push(keys, values)
: push a list of (key, value) pairs to the server nodesPull(keys)
: pull the values from servers for a list of keysWait
: wait untill a push or pull finished.A simple example:
std::vector<uint64_t> key = {1, 3, 5}; std::vector<float> val = {1, 1, 1}; std::vector<float> recv_val; ps::KVWorker<float> w; w.Wait(w.Push(key, val)); w.Wait(w.Pull(key, &recv_val));
總體概覽
整個項目的類圖如下:
- Postoffice是全局管理類,單例模式創(chuàng)建兄一。管理當(dāng)前節(jié)點角色厘线、其他節(jié)點的連接、心跳信息出革、配置信息造壮。
- Van是負責(zé)通信的類,是Postoffice的成員。Van中
std::unordered_map<int, void*> senders_
保存了node_id到連接的映射耳璧。Van只是定義了接口成箫,具體實現(xiàn)是依賴ZMQ實現(xiàn)的ZMQVan
。 - Customer用來通信旨枯,跟蹤request和response蹬昌。每一個連接對應(yīng)一個Customer實例,連接對方的id和Customer實例的id相同攀隔。
- SimpleApp是一個基類皂贩;提供了發(fā)送接收int型的head和string型的body消息,以及注冊消息處理函數(shù)昆汹。它有2個派生類明刷。
- KVServer是SimpleApp的派生類,用來保存key-values數(shù)據(jù)满粗。
- KVWorker是SimpleApp的派生類辈末,用來想Server Push/Pull key-value數(shù)據(jù)。
- KVPairs封裝了Key-Value結(jié)構(gòu)映皆,還包含了一個長度選項本冲。
- SArray是Shared array,像智能指針一樣共享數(shù)據(jù)劫扒,接口類似vector檬洞。
- Node封裝了節(jié)點的信息,例如角色沟饥、ip添怔、端口、是否是恢復(fù)節(jié)點贤旷。
- Control封裝了控制信息广料,例如命令類型、目的節(jié)點幼驶、barrier_group的id艾杏、簽名。
- Meta封裝了元數(shù)據(jù)盅藻,發(fā)送者购桑、接受者、時間戳氏淑、請求還是相應(yīng)等勃蜘。
- Message是要發(fā)送的信息,除了元數(shù)據(jù)外假残,還包括發(fā)送的數(shù)據(jù)缭贡。
節(jié)點角色ID
一共三種類型,從上圖可以看出Scheduler節(jié)點只有一個,多個Worker和多個Server可以組成一個Group阳惹,因此有WorkerGroup和ServerGroup谍失;還有Worker節(jié)點和Server節(jié)點。每個節(jié)點以及每一個Group都有唯一確定的ID莹汤。
Scheduler快鱼、ServerGroup、WorkerGroup節(jié)點ID確定如下:
/** \brief node ID for the scheduler */
static const int kScheduler = 1;
/**
* \brief the server node group ID
*
* group id can be combined:
* - kServerGroup + kScheduler means all server nodes and the scheuduler
* - kServerGroup + kWorkerGroup means all server and worker nodes
*/
static const int kServerGroup = 2;
/** \brief the worker node group ID */
static const int kWorkerGroup = 4;
上述定義在base.h中体啰。
1、2嗽仪、4的二進制表示分別為:001荒勇、010、001闻坚。這樣可以做Group之間的合并沽翔,例如要和ServerGroup和WorkerGroup發(fā)信息,只需要destination node id設(shè)為2+4=6窿凤。
1-7用來表示節(jié)點的組合仅偎。單個節(jié)點的ID從8開始。單個Server和單個Worker節(jié)點從自己的rank(0雳殊、1橘沥、2……)轉(zhuǎn)換到其ID:
/**
* \brief convert from a worker rank into a node id
* \param rank the worker rank
*/
static inline int WorkerRankToID(int rank) {
return rank * 2 + 9;
}
/**
* \brief convert from a server rank into a node id
* \param rank the server rank
*/
static inline int ServerRankToID(int rank) {
return rank * 2 + 8;
}
ID到其rank轉(zhuǎn)換:
static inline int IDtoRank(int id) {
return std::max((id - 8) / 2, 0);
}
Postoffice
中td::unordered_map<int, std::vector<int>> node_ids_
==保存了Node/NodeGroup與連接節(jié)點集合的對應(yīng)關(guān)系==。
消息封裝
首先使用了自定義的
SArray
夯秃,Smart Array座咆。共享數(shù)據(jù),減少數(shù)據(jù)拷貝仓洼,且提供了類似vector的接口介陶。==備注:沒有仔細看實現(xiàn),在這里先把他理解成一個數(shù)組色建。==元數(shù)據(jù)
Meta
使用了Protobuf哺呜,進行了數(shù)據(jù)壓縮.具體使用可以參考blog14,鏈接見參考文獻箕戳。具體定義見代碼某残。消息分層比較清晰。
Node
包含節(jié)點的角色陵吸、id驾锰、ip、端口信息走越;Control
包含了命令信息椭豫、簽名等;Meta
是元數(shù)據(jù),包含時間戳赏酥、發(fā)送者喳整、接受者、控制信息等裸扶;Message
才是發(fā)送的信息框都,包含元數(shù)據(jù)和發(fā)送的數(shù)據(jù)。他們之間的構(gòu)造見上圖呵晨。-
參數(shù)有key-value組成魏保,對應(yīng)于
KVPairs
。定義如下struct KVPairs { // /** \brief empty constructor */ // KVPairs() {} /** \brief the list of keys */ SArray<Key> keys; /** \brief the according values */ SArray<Val> vals; /** \brief the according value lengths (could be empty) */ SArray<int> lens; };
通信機制
Scheduler節(jié)點管理所有節(jié)點的地址摸屠。每個節(jié)點要知道Scheduler節(jié)點的IP谓罗、port;啟動時綁定一個本地端口季二,并向Scheduler節(jié)點報告檩咱。==Scheduler節(jié)點在每個幾點啟動后,給節(jié)點分配ID胯舷,把節(jié)點信息通知出去(例如Worker節(jié)點要知道Server節(jié)點IP和端口刻蚯,Server節(jié)點要知道Worker節(jié)點的IP和端口)==。節(jié)點在建立連接后桑嘶,才會正式啟動炊汹。
測試鏈接的過程:
-
test.connection.cc:執(zhí)行ps:start函數(shù)
int main(int argc, char *argv[]) { ps::Start(0); // do nothing ps::Finalize(0, true); return 0; }
-
Start,獲取一個postoffice的單利對象,并調(diào)用start方法逃顶。
inline void Start(int customer_id, const char* argv0 = nullptr) { Postoffice::Get()->Start(customer_id, argv0, true); }
-
postoffice的start函數(shù)兵扬,是主要內(nèi)容。分三步口蝠,1.初始化node器钟,2.start van 3.如果設(shè)置barrier,start barrier妙蔗。
void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) { .... //1. init node info. 多少個worker傲霸。 for (int i = 0; i < num_workers_; ++i) { int id = WorkerRankToID(i); //i*2+9 獲得workid for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup, kWorkerGroup + kScheduler, kWorkerGroup + kServerGroup + kScheduler}) { node_ids_[g].push_back(id); } } for (int i = 0; i < num_servers_; ++i) { int id = ServerRankToID(i); for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup, kServerGroup + kScheduler, kWorkerGroup + kServerGroup + kScheduler}) { node_ids_[g].push_back(id); } } for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup, kScheduler + kWorkerGroup, kScheduler + kServerGroup}) { node_ids_[g].push_back(kScheduler); } init_stage_++; } ..... //2. start van //已經(jīng)初始化完node,這邊啟動通信眉反。 van_->Start(customer_id); ...... //3. do a barrier here if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler); }
-
此刻昙啄,我們查看van_->Start(customer_id);方法。int init_stage = 0;這個是van初始化最開始的值寸五。
void Van::Start(int customer_id) { // get scheduler info start_mu_.lock(); if (init_stage == 0) { scheduler_.hostname = std::string(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI"))); scheduler_.port = atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT"))); scheduler_.role = Node::SCHEDULER; scheduler_.id = kScheduler; is_scheduler_ = Postoffice::Get()->is_scheduler(); //1. get my node info 獲取節(jié)點信息梳凛,主要是ip port role 等信息 if (is_scheduler_) { my_node_ = scheduler_; } else { auto role = is_scheduler_ ? Node::SCHEDULER : (Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER); const char *nhost = Environment::Get()->find("DMLC_NODE_HOST"); std::string ip; if (nhost) ip = std::string(nhost); if (ip.empty()) { const char *itf = Environment::Get()->find("DMLC_INTERFACE"); std::string interface; if (itf) interface = std::string(itf); if (interface.size()) { GetIP(interface, &ip); } else { GetAvailableInterfaceAndIP(&interface, &ip); } CHECK(!interface.empty()) << "failed to get the interface"; } int port = GetAvailablePort(); const char *pstr = Environment::Get()->find("PORT"); if (pstr) port = atoi(pstr); CHECK(!ip.empty()) << "failed to get ip"; CHECK(port) << "failed to get a port"; my_node_.hostname = ip; my_node_.role = role; my_node_.port = port; // cannot determine my id now, the scheduler will assign it later // set it explicitly to make re-register within a same process possible my_node_.id = Node::kEmpty; my_node_.customer_id = customer_id; } //2. bind. 綁定端口 my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40); PS_VLOG(1) << "Bind to " << my_node_.DebugString(); CHECK_NE(my_node_.port, -1) << "bind failed"; //3. connect to the scheduler 建立連接。具體實現(xiàn)在 zmq中梳杏。 Connect(scheduler_); //4. for debug use if (Environment::Get()->find("PS_DROP_MSG")) { drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG")); } //5. start receiver 開一個新的線程韧拒。來接受信息淹接。 receiver_thread_ = std::unique_ptr<std::thread>( new std::thread(&Van::Receiving, this)); init_stage++; } start_mu_.unlock(); if (!is_scheduler_) { // let the scheduler know myself 如果不是scheduler,發(fā)送message叛溢。告知自己塑悼。 Message msg; Node customer_specific_node = my_node_; customer_specific_node.customer_id = customer_id; msg.meta.recver = kScheduler; msg.meta.control.cmd = Control::ADD_NODE; msg.meta.control.node.push_back(customer_specific_node); msg.meta.timestamp = timestamp_++; Send(msg); } // wait until ready while (!ready_.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } start_mu_.lock(); if (init_stage == 1) { // resender if (Environment::Get()->find("PS_RESEND") && atoi(Environment::Get()->find("PS_RESEND")) != 0) { int timeout = 1000; if (Environment::Get()->find("PS_RESEND_TIMEOUT")) { timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT")); } resender_ = new Resender(timeout, 10, this); } if (!is_scheduler_) { // start heartbeat thread 開啟一個新的縣城 進行心跳檢測。 heartbeat_thread_ = std::unique_ptr<std::thread>( new std::thread(&Van::Heartbeat, this)); } init_stage++; } start_mu_.unlock(); }
-
-
-
至此楷掉,通信連接建立完成厢蒜。
同步策略
異步工作時,Worker計算參數(shù)可能要依賴前面Pull
是否完成烹植。如果需要等待某一步操作斑鸦,可以調(diào)用SimpleApp::Wait
操作。具體實現(xiàn)是調(diào)用了Customer::WaitRequest()
草雕,它會跟蹤request和response數(shù)量是否相同巷屿,直到相同才會返回;tracker_
類型為std::vector<std::pair<int, int>>
促绵,記錄了request和response數(shù)量攒庵,這個數(shù)據(jù)結(jié)構(gòu)一直增長嘴纺,會造成內(nèi)存一直增長败晴。
消息處理流程
每個節(jié)點都監(jiān)聽了本地一個端口;該連接的節(jié)點在啟動時已經(jīng)連接栽渴。 上述通信機制的時候已經(jīng)描述過尖坤。
回顧一下通信機制中VAN start()方法的內(nèi)容:
- 獲取scheduler信息;
- 獲取node 信息闲擦;綁定端口慢味。
- 連接scheduler節(jié)點。Connect(scheduler__)
- ==開啟一個線程墅冷,來接受信息==纯路。
receiver_thread_ = std::unique_ptr< std::thread>( new std::thread(&Van::Receiving, this));
- 如果不是scheduler節(jié)點,發(fā)送messge寞忿,告訴scheduler節(jié)點驰唬。
- 開一個線程,來發(fā)送心跳腔彰。
而針對消息處理流程叫编,主要的邏輯集中在上述標黃那一步開始的。
對于==Server節(jié)點==:
-
Van::Receiving()
函數(shù)是單獨一個線程來接收數(shù)據(jù)霹抛。數(shù)據(jù)接收后搓逾,根據(jù)不同命令執(zhí)行不同動作,例如Control::ADD_NODE
就是添加節(jié)點杯拐。如果需要下一步處理霞篡,會將消息傳遞給Customer::Accept
函數(shù)世蔗。void Van::Receiving() { Meta nodes; Meta recovery_nodes; // store recovery nodes recovery_nodes.control.cmd = Control::ADD_NODE; while (true) { Message msg; int recv_bytes = RecvMsg(&msg); // For debug, drop received message if (ready_.load() && drop_rate_ > 0) { unsigned seed = time(NULL) + my_node_.id; if (rand_r(&seed) % 100 < drop_rate_) { LOG(WARNING) << "Drop message " << msg.DebugString(); continue; } } CHECK_NE(recv_bytes, -1); recv_bytes_ += recv_bytes; if (Postoffice::Get()->verbose() >= 2) { PS_VLOG(2) << msg.DebugString(); } // duplicated message if (resender_ && resender_->AddIncomming(msg)) continue; if (!msg.meta.control.empty()) { // control msg auto& ctrl = msg.meta.control; if (ctrl.cmd == Control::TERMINATE) { ProcessTerminateCommand(); break; } else if (ctrl.cmd == Control::ADD_NODE) { ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes); } else if (ctrl.cmd == Control::BARRIER) { ProcessBarrierCommand(&msg); } else if (ctrl.cmd == Control::HEARTBEAT) { ProcessHearbeat(&msg); } else { LOG(WARNING) << "Drop unknown typed message " << msg.DebugString(); } } else { ProcessDataMsg(&msg); } } }
?
-
Customer::Accept()
函數(shù)將消息添加到一個隊列recv_queue_
;Customer::Receiving()
是一個線程在運行寇损,從隊列取消息處理凸郑;處理過程中會使用函數(shù)對象recv_handle_
處理消息,這個函數(shù)對象是SimpleApp::Process
函數(shù)矛市。void Customer::Receiving() { while (true) { Message recv; recv_queue_.WaitAndPop(&recv); if (!recv.meta.control.empty() && recv.meta.control.cmd == Control::TERMINATE) { break; } //該線程處理消息 recv_handle_(recv); if (!recv.meta.request) { std::lock_guard<std::mutex> lk(tracker_mu_); tracker_[recv.meta.timestamp].second++; tracker_cond_.notify_all(); } } }
SimpleApp::Process
根據(jù)是消息類型(請求or響應(yīng)芙沥,調(diào)用用戶注冊的函數(shù)來處理消息,request_handle_
浊吏、response_handle_
分別處理請求和響應(yīng)而昨。
對于Worker節(jié)點,上面第3點略有不同找田。因為Worker都是通過Push
歌憨、Pull
來通信,而且參數(shù)都是key-value對墩衙。Pull·參數(shù)時务嫡,通過KVWorker::Process
調(diào)用回調(diào)函數(shù)來處理消息。
調(diào)試及啟動流程
PS Lite通過環(huán)境變量和外界交互漆改。
啟動流程:
1心铃、首先啟動Scheduler節(jié)點。這是要固定好Server和Worker數(shù)量挫剑。
2去扣、啟動Worker或Server節(jié)點。啟動時連接Scheduler節(jié)點樊破,綁定本地端口愉棱,并向Scheduler節(jié)點注冊自己信息。
3哲戚、Scheduler等待所有Worker節(jié)點都注冊后奔滑,給其分配id,并把節(jié)點信息傳送出去顺少。此時Scheduler節(jié)點已經(jīng)準備好朋其。
4、Worker或Server接收到Scheduler傳送的信息后祈纯,建立對應(yīng)節(jié)點的連接令宿。此時Worker或Server已經(jīng)準備好。
調(diào)試時腕窥,通過環(huán)境變量來控制調(diào)試日志粒没。
PS_VERBOSE=1,會打印連接日志簇爆。
PS_VERBOSE=2癞松,會打印所有數(shù)據(jù)通信日志爽撒。
源碼test中連接事例
#include "ps/ps.h"
using namespace ps;
void StartServer() {
if (!IsServer()) return;
auto server = new KVServer<float>(0);
//設(shè)置kv默認處理handle, 可以自定義
server->set_request_handle(KVServerDefaultHandle<float>());
RegisterExitCallback([server](){ delete server; });
}
void RunWorker() {
if (!IsWorker()) return;
KVWorker<float> kv(0, 0);
// init
int num = 10000;
std::vector<Key> keys(num);
std::vector<float> vals(num);
int rank = MyRank();
srand(rank + 7);
for (int i = 0; i < num; ++i) {
keys[i] = kMaxKey / num * i + rank;
vals[i] = (rand() % 1000);
}
// push
int repeat = 50;
std::vector<int> ts;
for (int i = 0; i < repeat; ++i) {
ts.push_back(kv.Push(keys, vals));
// to avoid too frequency push, which leads huge memory usage
if (i > 10) kv.Wait(ts[ts.size()-10]);
}
for (int t : ts) kv.Wait(t);
// pull
std::vector<float> rets;
kv.Wait(kv.Pull(keys, &rets));
float res = 0;
for (int i = 0; i < num; ++i) {
res += fabs(rets[i] - vals[i] * repeat);
}
CHECK_LT(res / repeat, 1e-5);
LL << "error: " << res / repeat;
}
int main(int argc, char *argv[]) {
// setup server nodes
StartServer();
// start system
Start(0);
// run worker nodes
RunWorker();
// stop system
Finalize(0, true);
return 0;
}
其他
該部分內(nèi)容暫未完成响蓉,進行學(xué)習(xí)硕勿。
PaddlePaddle
//@TODO
Tensorflow
//@TODO
Adam
//@TODO
Adam框架仍然基于Multi-Spert架構(gòu),這個架構(gòu)的大體含義就是將集群分為如下幾個部分:
- 數(shù)據(jù)服務(wù)類枫甲。存儲數(shù)據(jù)源武,數(shù)據(jù)備份。向計算節(jié)點提供數(shù)據(jù)想幻。
- 訓(xùn)練模型類粱栖。訓(xùn)練模型,然后更新參數(shù)脏毯。
- 參數(shù)服務(wù)器闹究。維護一個共享的模型,計算節(jié)點計算完成后食店,可以向參數(shù)服務(wù)器發(fā)送請求更新參數(shù)渣淤。
參考文獻
參考文獻
- Scaling Distributed Machine Learning with the Parameter Server
- Parameter Server for Distributed Machine Learning
- PS-Lite Documents
參考Blog
- MPI 在大規(guī)模機器學(xué)習(xí)領(lǐng)域的前景如何
- 參數(shù)服務(wù)器——分布式機器學(xué)習(xí)的新殺器
- Allreduce (or MPI) vs. Parameter server approaches
- 橫向?qū)Ρ热蠓植际綑C器學(xué)習(xí)平臺:Spark、PMLS吉嫩、TensorFlow
- 機器學(xué)習(xí)入門:線性回歸及梯度下降
- 詳解并行邏輯回歸
- 一致性HASH算法詳解
- 【深度學(xué)習(xí)&分布式】Parameter Server 詳解
- parameter_server架構(gòu)
- 【分布式計算】MapReduce的替代者-Parameter Server
- Adam:大規(guī)模分布式機器學(xué)習(xí)框架
- ParameterServer入門和理解
- PS-Lite源碼分析
- Google Protocol Buffer 的使用和原理
- 幾種機器學(xué)習(xí)框架的對比和選擇
- tensorflow架構(gòu)
- 如何評價百度開源的深度學(xué)習(xí)框架 PaddlePaddle?