Paramter Server

Paramter Server

? Author:lyp@ Date:2017/01/26

在深度神經(jīng)網(wǎng)絡(luò)計算框架中祷蝌,參數(shù)服務(wù)器是一個非常重要的基礎(chǔ)概念,而其不同的實現(xiàn)對計算效果和計算能力都有直接的影響买窟。

請你自學(xué)參數(shù)服務(wù)器的概念宠默,并給出一個綜述,介紹什么是參數(shù)服務(wù)器,它對機器學(xué)習(xí)的作用是什么觉壶,一般實現(xiàn)有哪些方案,各自又有哪些優(yōu)缺點件缸。

背景

1. 問題的提出铜靶?

? 在大規(guī)模數(shù)據(jù)上跑機器學(xué)習(xí)任務(wù)是過去十多年內(nèi)系統(tǒng)架構(gòu)師面臨的主要挑戰(zhàn)之一,許多模型和抽象先后用于這一任務(wù)停团。

? 現(xiàn)在的大數(shù)據(jù)機器學(xué)習(xí)系統(tǒng)旷坦,通常數(shù)據(jù)在1TB到1PB之間,參數(shù)范圍在10^9 和10^12左右佑稠。而往往這些模型的參數(shù)需要被所有的worker節(jié)點頻繁的訪問秒梅,這就會帶來很多問題和挑戰(zhàn):

  1. 訪問這些巨量的參數(shù),需要大量的網(wǎng)絡(luò)帶寬支持舌胶;
  2. 很多機器學(xué)習(xí)算法都是連續(xù)型的捆蜀,只有上一次迭代完成(各個worker都完成)之后,才能進行下一次迭代幔嫂,這就導(dǎo)致了如果機器之間性能差距大(木桶理論)辆它,就會造成性能的極大損失;
  3. 在分布式中履恩,容錯能力是非常重要的锰茉。很多情況下,算法都是部署到云環(huán)境中的(這種環(huán)境下切心,機器是不可靠的飒筑,并且job也是有可能被搶占的);

2. 業(yè)內(nèi)如何解決绽昏?

? 如何解決這些問題呢协屡?對于機器學(xué)習(xí)分布式優(yōu)化,有很多大公司在做了全谤,包括:Amazon肤晓,Baidu,F(xiàn)acebook认然,Google补憾,Microsoft 和 Yahoo。也有一些開源的項目卷员,比如:YahooLDA 和 Petuum 和Graphlab余蟹。

? 從最開始的MPI,到Hadoop子刮,Spark 以及Paramter Server威酒。都曾廣泛應(yīng)用于機器學(xué)習(xí)處理任務(wù)窑睁。總結(jié)一下:

  • ==MPI Gradient Aggregation==:批任務(wù)求解器的速度不高,無法支持大規(guī)模數(shù)據(jù)集。
  • ==MapReduce==:解決了MPI無法支撐大數(shù)據(jù)的問題举户。但無法改進批處理求解器的訓(xùn)練性能,并且還引入了新的問題箫津,包括迭代式計算的低效,節(jié)點之間通信低效宰啦。
  • ==GraphLab==:基于圖的抽象苏遥。用圖來做抽象可以解決許多機器學(xué)習(xí)問題,但仍然有許多問題無法很好高效求解赡模,比如深度學(xué)習(xí)中的多層結(jié)構(gòu)田炭。
  • ==Parameter Server==:跟基于圖的方法主要區(qū)別在于把模型參數(shù)存儲和更新上升為主要組件,并且采用了異步機制提升處理能力漓柑。

Paramter Server發(fā)展歷程

? 參數(shù)服務(wù)器也經(jīng)歷了多次發(fā)展教硫。

  • 第一代參數(shù)服務(wù)器

    ? 參數(shù)服務(wù)器的概念最早來自Alex Smola于2010年提出的并行LDA的框架[4]。它通過采用一個分布式的Memcached作為存放參數(shù)的存儲辆布,這樣就提供了有效的機制用于在分布式系統(tǒng)不同的Worker節(jié)點之間同步模型參數(shù)瞬矩,而每個Worker只需要保存它計算時所依賴的一小部分參數(shù)即可。當(dāng)然锋玲,這里存放參數(shù)的存儲跟做OLTP應(yīng)用中的Key-Value抽象有所不同景用,因為以Key-Value為單元進行頻繁的參數(shù)數(shù)據(jù)交互會導(dǎo)致過高的通信開銷,因此參數(shù)服務(wù)器通常采用數(shù)學(xué)封裝來進行參數(shù)同步惭蹂,比如向量伞插,張量,矩陣的行列等剿干。

  • 第二代參數(shù)服務(wù)器

    ? 對第二代分布式架構(gòu)做改進的初步嘗試是Petuun,他使用一個有限制的延遲模型穆刻,同時在工作線程模型上添加更多限制置尔。

  • 第三代參數(shù)服務(wù)器

    ? 來自Alex Smola的高徒——李沐設(shè)計的參數(shù)服務(wù)器。ps-lite應(yīng)當(dāng)屬于第三代參數(shù)服務(wù)器氢伟,提供了更加通用的設(shè)計榜轿。==以下也主要介紹他在論文中提供的ps架構(gòu)==。

Paramter Server架構(gòu)設(shè)計

1. Paramter Server 整體架構(gòu)

PS架構(gòu)主要包括兩大部分朵锣。那就是一個參數(shù)服務(wù)器組server group 和多個工作組谬盐。在parameter server中,每個 server 實際上都只負責(zé)分到的部分參數(shù)(servers共同維持一個全局的共享參數(shù))诚些,而每個 work 也只分到部分數(shù)據(jù)和處理任務(wù)飞傀;

image

一些概念解釋:

  • server 節(jié)點可以跟其他 server 節(jié)點通信皇型,每個server負責(zé)自己分到的參數(shù),server group 共同維持所有參數(shù)的更新砸烦。
  • server manager node 負責(zé)維護一些元數(shù)據(jù)的一致性弃鸦,比如各個節(jié)點的狀態(tài),參數(shù)的分配情況等幢痘;
  • worker 節(jié)點之間沒有通信唬格,只跟自己對應(yīng)的server進行通信。
  • 每個worker group有一個task scheduler颜说,負責(zé)向worker分配任務(wù)购岗,并且監(jiān)控worker的運行情況。當(dāng)有新的worker加入或者退出门粪,task scheduler 負責(zé)重新分配任務(wù)喊积。
  • training data 被split多個部分,一個worker在本地將一部分訓(xùn)練數(shù)據(jù)存儲在本地統(tǒng)計數(shù)據(jù)中庄拇。

2. Paramter Server通信設(shè)計

  • ==(key,value)==

    ? parameter server 中注服,參數(shù)都是可以被表示成(key, value)的集合,比如一個最小化損失函數(shù)的問題措近,key就是feature ID溶弟,而value就是它的權(quán)值。對于稀疏參數(shù)瞭郑,不存在的key辜御,就可以認為是0.


    image
  • ==(key,value)Vectors==

    ? 如果每一個參數(shù)都設(shè)一個key,那么會使得通信變得非常頻繁低效屈张,為了抹平這個問題擒权,賦予每個key所對應(yīng)的value向量概念或者矩陣概念。做這樣的操作的前提是假設(shè)參數(shù)是有順序的阁谆。

    ? 這樣做有兩點好處碳抄,降低網(wǎng)絡(luò)通信和使得向量層面的操作變得可行,從而很多線性庫的優(yōu)化特性可以利用的上场绿,比如BLAS剖效、LAPACK、ATLAS等焰盗。缺點是在對于稀疏模型來說璧尸,總會在向量或者矩陣里會有參數(shù)為0,這在單個參數(shù)狀態(tài)下是不用存的熬拒,所以爷光,造成了數(shù)據(jù)的冗余。

  • ==range push and pull==

    ? workers 跟 servers 之間通過 pushpull 來通信澎粟。worker 通過 push 將計算好的梯度發(fā)送到server蛀序,然后通過 pull 從server更新參數(shù)欢瞪。為了提高計算性能和帶寬效率,parameter server 允許用戶使用 Range PushRange Pull 操作哼拔;

    ? 假設(shè) R 是需要push或pull的 key 的range引有,那么可以進行如下操作。就是發(fā)送和接送特定Range中的w倦逐。

    w.push(R, dest)
    w.pull(R, dest)
    
image

- ==Asynchronous Tasks and Dependency & Flexible Consistency==

? 體會一下Asynchronous Task 跟 Synchronous Task 的區(qū)別譬正。

? 如果 iter12 需要在 iter11 computation,push 跟 pull 都完成后才能開始檬姥,那么就是Synchronous曾我,反之就是Asynchronous.如iter 11 在 iter10計算完成后就開始執(zhí)行。

![image](http://upload-images.jianshu.io/upload_images/2472711-92152ae529e7940b.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


? 參數(shù)服務(wù)器和工作節(jié)點之間的通信都屬于遠程調(diào)用健民,那么抒巢,遠程調(diào)用是比較耗時的行為,如果每次都保持同步的話秉犹,那么訓(xùn)練相對于單節(jié)點來說是減慢了許多的蛉谜,因為遠程調(diào)用的耗時。因而崇堵,PS框架讓遠程調(diào)用成為一部調(diào)用型诚,比如參數(shù)的push和pull發(fā)出之后,立即使用當(dāng)前值開始進行下一步的梯度計算鸳劳,如上圖狰贯,迭代11發(fā)出push和pull的請求后,立馬開始進行梯度計算赏廓,而此時涵紊,使用的還是迭代10的值。

? **Asynchronous Task**:能夠提高系統(tǒng)的效率(因為節(jié)省了很多等待的過程)幔摸,但是摸柄,它的缺點就是容易降低算法的收斂速率;

? 所以既忆,系統(tǒng)性能跟算法收斂速率之間是存在一個trade-off的驱负,你需要同時考慮:

```xml
算法對于參數(shù)非一致性的敏感度;
訓(xùn)練數(shù)據(jù)特征之間的關(guān)聯(lián)度尿贫;
硬盤的存儲容量电媳;

? 考慮到用戶使用的時候會有不同的情況踏揣,parameter server 為用戶提供了多種任務(wù)依賴方式:


image
  • Sequential: 這里其實是 synchronous task庆亡,任務(wù)之間是有順序的,只有上一個任務(wù)完成捞稿,才能開始下一個任務(wù)又谋;

  • Eventual: 跟 sequential 相反拼缝,所有任務(wù)之間沒有順序,各自獨立完成自己的任務(wù)彰亥,

  • Bounded Delay: 這是sequential 跟 eventual 之間的trade-off咧七,可以設(shè)置一個 τ 作為最大的延時時間。也就是說任斋,只有 >τ 之前的任務(wù)都被完成了继阻,才能開始一個新的任務(wù);極端的情況:

    • τ=0废酷,情況就是 Sequential瘟檩;
    • τ=∞,情況就是 Eventual澈蟆;
  • ==User-defined Filters==

    ? 作為上述特點的補充墨辛,PS還有這樣一個小feature,即過濾趴俘,在工作節(jié)點這一端對梯度進行過濾睹簇,如果梯度并不是那么影響重大,就不用占用網(wǎng)絡(luò)去更新寥闪。

Paramter Server架構(gòu)實現(xiàn)

1. Vector Clock

? 為參數(shù)服務(wù)器中的每個參數(shù)添加一個時間戳太惠,來跟蹤參數(shù)的更新和防止重復(fù)發(fā)送數(shù)據(jù)〕裙福基于此垛叨,通信中的梯度更新數(shù)據(jù)中也應(yīng)該有時間戳,防止重復(fù)更新柜某。

? 如果每個參數(shù)都有一個時間戳嗽元,那么參數(shù)眾多,時間戳也眾多喂击。好在剂癌,parameter server 在push跟pull的時候,都是rang-based翰绊,這就帶來了一個好處:這個range里面的參數(shù)共享的是同一個時間戳佩谷,這顯然可以大大降低了空間復(fù)雜度。

2. Messages

? Message是節(jié)點間交互的主要格式监嗜。一條 message 包括:時間戳谐檀,len(range)對k-v.

? $[vc(R),(k1,v1),...,(kp,vp)]kj∈Randj∈{1,...p}$

? 這是parameter server 中最基本的通信格式,不僅僅是共享的參數(shù)才有裁奇,task 的message也是這樣的格式桐猬,只要把這里的(key, value) 改成 (task ID, 參數(shù)/返回值)。

? Messages may carry a subset of all available keys within range R. The missing keys are assigned the same timestamp without changing their values.

? 由于機器學(xué)習(xí)問題通常都需要很高的網(wǎng)絡(luò)帶寬刽肠,因此信息的壓縮是必須的溃肪。

  • key的壓縮: 因為訓(xùn)練數(shù)據(jù)通常在分配之后都不會發(fā)生改變免胃,因此worker沒有必要每次都發(fā)送相同的key,只需要接收方在第一次接收的時候緩存起來就行了惫撰。第二次羔沙,worker不再需要同時發(fā)送key和value,只需要發(fā)送value 和 key list的hash就行厨钻。這樣瞬間減少了一般的通信量扼雏。
  • value的壓縮: 假設(shè)參數(shù)時稀疏的,那么就會有大量的0存在夯膀。因此呢蛤,為了進一步壓縮,我們只需要發(fā)送非0值棍郎。parameter server使用 Snappy 快速壓縮庫來壓縮數(shù)據(jù)其障、高效去除0值。
  • key 的壓縮和 value 的壓縮可以同時進行涂佃。

3. Replication and Consistency

? parameter server 在數(shù)據(jù)一致性上励翼,使用的是傳統(tǒng)的一致性哈希算法,參數(shù)key與server node id被插入到一個hash ring中辜荠。具體實現(xiàn)可以參考另一篇blog一致性hash算法詳解汽抚。動態(tài)增加和移除節(jié)點的同時還能保證系統(tǒng)存儲與key分配的性能效率.

image

? 兩種方式保證slave跟master之間的數(shù)據(jù)一致性:

  1. 默認的復(fù)制方式: Chain replication (強一致性, 可靠):

    image

a. 更新:只能發(fā)生在數(shù)據(jù)頭節(jié)點,然后更新逐步后移,直到更新到達尾節(jié)點伯病,并由尾節(jié)點向客戶確認更新成功造烁;
b. 查詢:為保證強一致性,客戶查詢只能在尾節(jié)點進行午笛;

  1. Replication after Aggregation
    image

兩個worker 節(jié)點分別向server傳送x和y惭蟋。server 首先通過一定方式(如:f(x+y) )進行aggregate,然后再進行復(fù)制操作药磺;

當(dāng)有n個worker的時候告组,復(fù)制只需要k/n的帶寬。通常來說癌佩,k(復(fù)制次數(shù))是一個很小的常數(shù)木缝,而n的值大概是幾百到幾千;

4. Server Management

由于key的range特性围辙,當(dāng)參數(shù)服務(wù)器集群中增加一個節(jié)點時我碟,步驟如下:

  • server manager節(jié)點給新節(jié)點分配一個key range,這可能會導(dǎo)致其他節(jié)點上的key range切分
  • 新節(jié)點從其他節(jié)點上將屬于它的key range數(shù)據(jù)取過來姚建,然后也將slave信息取過來
  • server manager廣播節(jié)點變動矫俺,其他節(jié)點得知消息后將不屬于自己key range的數(shù)據(jù)刪掉

? 在第二步,從其他節(jié)點上取數(shù)據(jù)的時候,其他節(jié)點上的操作也分為兩步恳守,第一是拷貝數(shù)據(jù),這可能也會導(dǎo)致key range的切分贩虾。第二是不再接受和這些數(shù)據(jù)有關(guān)的消息催烘,而是進行轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)到新節(jié)點缎罢。

? 在第三步伊群,收到廣播信息后,節(jié)點會刪除對應(yīng)區(qū)間的數(shù)據(jù)策精,然后舰始,掃描所有的和R有關(guān)發(fā)送出去的還沒收到回復(fù)的消息,當(dāng)這些消息回復(fù)時咽袜,轉(zhuǎn)發(fā)到新節(jié)點丸卷。

? 節(jié)點的離開與節(jié)點的加入類似。

5. Worker Management

添加工作節(jié)點比添加服務(wù)器節(jié)點要簡單一些询刹,步驟如下:

  • task scheduler給新節(jié)點分配一些數(shù)據(jù)
  • 節(jié)點從網(wǎng)絡(luò)文件系統(tǒng)中載入數(shù)據(jù)谜嫉,然后從服務(wù)器端拉取參數(shù)
  • task scheduler廣播變化,其他節(jié)點free掉一些訓(xùn)練數(shù)據(jù)

? 當(dāng)一個節(jié)點離開的時候凹联,task scheduler可能會尋找一個替代沐兰,但恢復(fù)節(jié)點是十分耗時的工作,同時蔽挠,損失一些數(shù)據(jù)對最后的結(jié)果可能影響并不是很大住闯。所以,系統(tǒng)會讓用戶進行選擇澳淑,是恢復(fù)節(jié)點還是不做處理比原。這種機制甚至可以允許用戶刪掉跑的最慢的節(jié)點來提升速度。

PS-lite 實現(xiàn)

PS-Lite是PS架構(gòu)的一個輕量級的實現(xiàn)杠巡。它提供了push春寿,pull,wait等APIs忽孽。整個項目代碼量不多绑改。

A light and efficient implementation of the parameter server framework. It provides clean yet powerful APIs. For example, a worker node can communicate with the server nodes by

  • Push(keys, values): push a list of (key, value) pairs to the server nodes
  • Pull(keys): pull the values from servers for a list of keys
  • Wait: wait untill a push or pull finished.

A simple example:

  std::vector<uint64_t> key = {1, 3, 5};
  std::vector<float> val = {1, 1, 1};
  std::vector<float> recv_val;
  ps::KVWorker<float> w;
  w.Wait(w.Push(key, val));
  w.Wait(w.Pull(key, &recv_val));

總體概覽

整個項目的類圖如下:

image
  • Postoffice是全局管理類,單例模式創(chuàng)建兄一。管理當(dāng)前節(jié)點角色厘线、其他節(jié)點的連接、心跳信息出革、配置信息造壮。
  • Van是負責(zé)通信的類,是Postoffice的成員。Van中std::unordered_map<int, void*> senders_保存了node_id到連接的映射耳璧。Van只是定義了接口成箫,具體實現(xiàn)是依賴ZMQ實現(xiàn)的ZMQVan
  • Customer用來通信旨枯,跟蹤request和response蹬昌。每一個連接對應(yīng)一個Customer實例,連接對方的id和Customer實例的id相同攀隔。
  • SimpleApp是一個基類皂贩;提供了發(fā)送接收int型的head和string型的body消息,以及注冊消息處理函數(shù)昆汹。它有2個派生類明刷。
  • KVServer是SimpleApp的派生類,用來保存key-values數(shù)據(jù)满粗。
  • KVWorker是SimpleApp的派生類辈末,用來想Server Push/Pull key-value數(shù)據(jù)。
  • KVPairs封裝了Key-Value結(jié)構(gòu)映皆,還包含了一個長度選項本冲。
  • SArray是Shared array,像智能指針一樣共享數(shù)據(jù)劫扒,接口類似vector檬洞。
  • Node封裝了節(jié)點的信息,例如角色沟饥、ip添怔、端口、是否是恢復(fù)節(jié)點贤旷。
  • Control封裝了控制信息广料,例如命令類型、目的節(jié)點幼驶、barrier_group的id艾杏、簽名。
  • Meta封裝了元數(shù)據(jù)盅藻,發(fā)送者购桑、接受者、時間戳氏淑、請求還是相應(yīng)等勃蜘。
  • Message是要發(fā)送的信息,除了元數(shù)據(jù)外假残,還包括發(fā)送的數(shù)據(jù)缭贡。

節(jié)點角色ID

image

一共三種類型,從上圖可以看出Scheduler節(jié)點只有一個,多個Worker和多個Server可以組成一個Group阳惹,因此有WorkerGroup和ServerGroup谍失;還有Worker節(jié)點和Server節(jié)點。每個節(jié)點以及每一個Group都有唯一確定的ID莹汤。
Scheduler快鱼、ServerGroup、WorkerGroup節(jié)點ID確定如下:

/** \brief node ID for the scheduler */
static const int kScheduler = 1;
/**
 * \brief the server node group ID
 *
 * group id can be combined:
 * - kServerGroup + kScheduler means all server nodes and the scheuduler
 * - kServerGroup + kWorkerGroup means all server and worker nodes
 */
static const int kServerGroup = 2;
/** \brief the worker node group ID */
static const int kWorkerGroup = 4;

上述定義在base.h中体啰。

1、2嗽仪、4的二進制表示分別為:001荒勇、010、001闻坚。這樣可以做Group之間的合并沽翔,例如要和ServerGroup和WorkerGroup發(fā)信息,只需要destination node id設(shè)為2+4=6窿凤。
1-7用來表示節(jié)點的組合仅偎。單個節(jié)點的ID從8開始。單個Server和單個Worker節(jié)點從自己的rank(0雳殊、1橘沥、2……)轉(zhuǎn)換到其ID:

/**
   * \brief convert from a worker rank into a node id
   * \param rank the worker rank
   */
  static inline int WorkerRankToID(int rank) {
    return rank * 2 + 9;
  }
  /**
   * \brief convert from a server rank into a node id
   * \param rank the server rank
   */
  static inline int ServerRankToID(int rank) {
    return rank * 2 + 8;
  }

ID到其rank轉(zhuǎn)換:

  static inline int IDtoRank(int id) {
    return std::max((id - 8) / 2, 0);
  }

Postofficetd::unordered_map<int, std::vector<int>> node_ids_==保存了Node/NodeGroup與連接節(jié)點集合的對應(yīng)關(guān)系==。

消息封裝

zz
  • 首先使用了自定義的SArray夯秃,Smart Array座咆。共享數(shù)據(jù),減少數(shù)據(jù)拷貝仓洼,且提供了類似vector的接口介陶。==備注:沒有仔細看實現(xiàn),在這里先把他理解成一個數(shù)組色建。==

  • 元數(shù)據(jù)Meta使用了Protobuf哺呜,進行了數(shù)據(jù)壓縮.具體使用可以參考blog14,鏈接見參考文獻箕戳。具體定義見代碼某残。

  • 消息分層比較清晰。Node包含節(jié)點的角色陵吸、id驾锰、ip、端口信息走越;Control包含了命令信息椭豫、簽名等;Meta是元數(shù)據(jù),包含時間戳赏酥、發(fā)送者喳整、接受者、控制信息等裸扶;Message才是發(fā)送的信息框都,包含元數(shù)據(jù)和發(fā)送的數(shù)據(jù)。他們之間的構(gòu)造見上圖呵晨。

  • 參數(shù)有key-value組成魏保,對應(yīng)于KVPairs。定義如下

    struct KVPairs {
      // /** \brief empty constructor */
      // KVPairs() {}
      /** \brief the list of keys */
      SArray<Key> keys;
      /** \brief the according values */
      SArray<Val> vals;
      /** \brief the according value lengths (could be empty) */
      SArray<int> lens;
    };
    

通信機制

Scheduler節(jié)點管理所有節(jié)點的地址摸屠。每個節(jié)點要知道Scheduler節(jié)點的IP谓罗、port;啟動時綁定一個本地端口季二,并向Scheduler節(jié)點報告檩咱。==Scheduler節(jié)點在每個幾點啟動后,給節(jié)點分配ID胯舷,把節(jié)點信息通知出去(例如Worker節(jié)點要知道Server節(jié)點IP和端口刻蚯,Server節(jié)點要知道Worker節(jié)點的IP和端口)==。節(jié)點在建立連接后桑嘶,才會正式啟動炊汹。

測試鏈接的過程:

  • test.connection.cc:執(zhí)行ps:start函數(shù)

    int main(int argc, char *argv[]) {
      ps::Start(0);
      // do nothing
      ps::Finalize(0, true);
      return 0;
    }
    
    • Start,獲取一個postoffice的單利對象,并調(diào)用start方法逃顶。

      inline void Start(int customer_id, const char* argv0 = nullptr) {
        Postoffice::Get()->Start(customer_id, argv0, true);
      }
      
      • postoffice的start函數(shù)兵扬,是主要內(nèi)容。分三步口蝠,1.初始化node器钟,2.start van 3.如果設(shè)置barrier,start barrier妙蔗。

        void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
          ....
            //1. init node info. 多少個worker傲霸。
            for (int i = 0; i < num_workers_; ++i) {
              int id = WorkerRankToID(i); //i*2+9 獲得workid
              for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                            kWorkerGroup + kScheduler,
                            kWorkerGroup + kServerGroup + kScheduler}) {
                node_ids_[g].push_back(id);
              }
            }
            for (int i = 0; i < num_servers_; ++i) {
              int id = ServerRankToID(i);
              for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                            kServerGroup + kScheduler,
                            kWorkerGroup + kServerGroup + kScheduler}) {
                node_ids_[g].push_back(id);
              }
            }
            for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                          kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
              node_ids_[g].push_back(kScheduler);
            }
            init_stage_++;
          }
           .....
          //2. start van
          //已經(jīng)初始化完node,這邊啟動通信眉反。
          van_->Start(customer_id);
           ......
          //3. do a barrier here
          if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
        }
        
        • 此刻昙啄,我們查看van_->Start(customer_id);方法。int init_stage = 0;這個是van初始化最開始的值寸五。

          void Van::Start(int customer_id) {
            // get scheduler info
            start_mu_.lock();
            if (init_stage == 0) {
              scheduler_.hostname = std::string(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
              scheduler_.port = atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
              scheduler_.role = Node::SCHEDULER;
              scheduler_.id = kScheduler;
              is_scheduler_ = Postoffice::Get()->is_scheduler();
          
              //1. get my node info 獲取節(jié)點信息梳凛,主要是ip port role 等信息
              if (is_scheduler_) {
                my_node_ = scheduler_;
              } else {
                auto role = is_scheduler_ ? Node::SCHEDULER :
                            (Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER);
                const char *nhost = Environment::Get()->find("DMLC_NODE_HOST");
                std::string ip;
                if (nhost) ip = std::string(nhost);
                if (ip.empty()) {
                  const char *itf = Environment::Get()->find("DMLC_INTERFACE");
                  std::string interface;
                  if (itf) interface = std::string(itf);
                  if (interface.size()) {
                    GetIP(interface, &ip);
                  } else {
                    GetAvailableInterfaceAndIP(&interface, &ip);
                  }
                  CHECK(!interface.empty()) << "failed to get the interface";
                }
                int port = GetAvailablePort();
                const char *pstr = Environment::Get()->find("PORT");
                if (pstr) port = atoi(pstr);
                CHECK(!ip.empty()) << "failed to get ip";
                CHECK(port) << "failed to get a port";
                my_node_.hostname = ip;
                my_node_.role = role;
                my_node_.port = port;
                // cannot determine my id now, the scheduler will assign it later
                // set it explicitly to make re-register within a same process possible
                my_node_.id = Node::kEmpty;
                my_node_.customer_id = customer_id;
              }
          
              //2. bind. 綁定端口
              my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
              PS_VLOG(1) << "Bind to " << my_node_.DebugString();
              CHECK_NE(my_node_.port, -1) << "bind failed";
          
              //3. connect to the scheduler 建立連接。具體實現(xiàn)在 zmq中梳杏。
              Connect(scheduler_);
          
              //4. for debug use
              if (Environment::Get()->find("PS_DROP_MSG")) {
                drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
              }
              //5. start receiver 開一個新的線程韧拒。來接受信息淹接。
              receiver_thread_ = std::unique_ptr<std::thread>(
                      new std::thread(&Van::Receiving, this));
              init_stage++;
            }
            start_mu_.unlock();
          
            if (!is_scheduler_) {
              // let the scheduler know myself 如果不是scheduler,發(fā)送message叛溢。告知自己塑悼。
              Message msg;
              Node customer_specific_node = my_node_;
              customer_specific_node.customer_id = customer_id;
              msg.meta.recver = kScheduler;
              msg.meta.control.cmd = Control::ADD_NODE;
              msg.meta.control.node.push_back(customer_specific_node);
              msg.meta.timestamp = timestamp_++;
              Send(msg);
            }
            // wait until ready
            while (!ready_.load()) {
              std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
          
            start_mu_.lock();
            if (init_stage == 1) {
              // resender
              if (Environment::Get()->find("PS_RESEND") && atoi(Environment::Get()->find("PS_RESEND")) != 0) {
                int timeout = 1000;
                if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
                  timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
                }
                resender_ = new Resender(timeout, 10, this);
              }
          
              if (!is_scheduler_) {
                // start heartbeat thread 開啟一個新的縣城 進行心跳檢測。
                heartbeat_thread_ = std::unique_ptr<std::thread>(
                        new std::thread(&Van::Heartbeat, this));
              }
              init_stage++;
            }
            start_mu_.unlock();
          }
          
          

至此楷掉,通信連接建立完成厢蒜。

同步策略

異步工作時,Worker計算參數(shù)可能要依賴前面Pull是否完成烹植。如果需要等待某一步操作斑鸦,可以調(diào)用SimpleApp::Wait操作。具體實現(xiàn)是調(diào)用了Customer::WaitRequest()草雕,它會跟蹤request和response數(shù)量是否相同巷屿,直到相同才會返回;tracker_類型為std::vector<std::pair<int, int>>促绵,記錄了request和response數(shù)量攒庵,這個數(shù)據(jù)結(jié)構(gòu)一直增長嘴纺,會造成內(nèi)存一直增長败晴。

消息處理流程

每個節(jié)點都監(jiān)聽了本地一個端口;該連接的節(jié)點在啟動時已經(jīng)連接栽渴。 上述通信機制的時候已經(jīng)描述過尖坤。

回顧一下通信機制中VAN start()方法的內(nèi)容:

  • 獲取scheduler信息;
  • 獲取node 信息闲擦;綁定端口慢味。
  • 連接scheduler節(jié)點。Connect(scheduler__)
  • ==開啟一個線程墅冷,來接受信息==纯路。receiver_thread_ = std::unique_ptr< std::thread>( new std::thread(&Van::Receiving, this));
  • 如果不是scheduler節(jié)點,發(fā)送messge寞忿,告訴scheduler節(jié)點驰唬。
  • 開一個線程,來發(fā)送心跳腔彰。

而針對消息處理流程叫编,主要的邏輯集中在上述標黃那一步開始的。

對于==Server節(jié)點==:

  1. Van::Receiving()函數(shù)是單獨一個線程來接收數(shù)據(jù)霹抛。數(shù)據(jù)接收后搓逾,根據(jù)不同命令執(zhí)行不同動作,例如Control::ADD_NODE就是添加節(jié)點杯拐。如果需要下一步處理霞篡,會將消息傳遞給Customer::Accept函數(shù)世蔗。

    void Van::Receiving() {
      Meta nodes;
      Meta recovery_nodes;  // store recovery nodes
      recovery_nodes.control.cmd = Control::ADD_NODE;
    
      while (true) {
        Message msg;
        int recv_bytes = RecvMsg(&msg);
        // For debug, drop received message
        if (ready_.load() && drop_rate_ > 0) {
          unsigned seed = time(NULL) + my_node_.id;
          if (rand_r(&seed) % 100 < drop_rate_) {
            LOG(WARNING) << "Drop message " << msg.DebugString();
            continue;
          }
        }
    
        CHECK_NE(recv_bytes, -1);
        recv_bytes_ += recv_bytes;
        if (Postoffice::Get()->verbose() >= 2) {
          PS_VLOG(2) << msg.DebugString();
        }
        // duplicated message
        if (resender_ && resender_->AddIncomming(msg)) continue;
    
        if (!msg.meta.control.empty()) {
          // control msg
          auto& ctrl = msg.meta.control;
          if (ctrl.cmd == Control::TERMINATE) {
            ProcessTerminateCommand();
            break;
          } else if (ctrl.cmd == Control::ADD_NODE) {
            ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
          } else if (ctrl.cmd == Control::BARRIER) {
            ProcessBarrierCommand(&msg);
          } else if (ctrl.cmd == Control::HEARTBEAT) {
            ProcessHearbeat(&msg);
          } else {
            LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
          }
        } else {
          ProcessDataMsg(&msg);
        }
      }
    }
    

    ?

  2. Customer::Accept()函數(shù)將消息添加到一個隊列recv_queue_Customer::Receiving()是一個線程在運行寇损,從隊列取消息處理凸郑;處理過程中會使用函數(shù)對象recv_handle_處理消息,這個函數(shù)對象是SimpleApp::Process函數(shù)矛市。

    void Customer::Receiving() {
      while (true) {
        Message recv;
        recv_queue_.WaitAndPop(&recv);
        if (!recv.meta.control.empty() &&
            recv.meta.control.cmd == Control::TERMINATE) {
          break;
        }
        //該線程處理消息
        recv_handle_(recv);
        if (!recv.meta.request) {
          std::lock_guard<std::mutex> lk(tracker_mu_);
          tracker_[recv.meta.timestamp].second++;
          tracker_cond_.notify_all();
        }
      }
    }
    
  3. SimpleApp::Process根據(jù)是消息類型(請求or響應(yīng)芙沥,調(diào)用用戶注冊的函數(shù)來處理消息,request_handle_浊吏、response_handle_分別處理請求和響應(yīng)而昨。

對于Worker節(jié)點,上面第3點略有不同找田。因為Worker都是通過Push歌憨、Pull來通信,而且參數(shù)都是key-value對墩衙。Pull·參數(shù)時务嫡,通過KVWorker::Process調(diào)用回調(diào)函數(shù)來處理消息。

調(diào)試及啟動流程

PS Lite通過環(huán)境變量和外界交互漆改。

啟動流程:
1心铃、首先啟動Scheduler節(jié)點。這是要固定好Server和Worker數(shù)量挫剑。
2去扣、啟動Worker或Server節(jié)點。啟動時連接Scheduler節(jié)點樊破,綁定本地端口愉棱,并向Scheduler節(jié)點注冊自己信息。
3哲戚、Scheduler等待所有Worker節(jié)點都注冊后奔滑,給其分配id,并把節(jié)點信息傳送出去顺少。此時Scheduler節(jié)點已經(jīng)準備好朋其。
4、Worker或Server接收到Scheduler傳送的信息后祈纯,建立對應(yīng)節(jié)點的連接令宿。此時Worker或Server已經(jīng)準備好。

調(diào)試時腕窥,通過環(huán)境變量來控制調(diào)試日志粒没。
PS_VERBOSE=1,會打印連接日志簇爆。
PS_VERBOSE=2癞松,會打印所有數(shù)據(jù)通信日志爽撒。

源碼test中連接事例

#include "ps/ps.h"
using namespace ps;

void StartServer() {
  if (!IsServer()) return;
  auto server = new KVServer<float>(0);
  //設(shè)置kv默認處理handle, 可以自定義
  server->set_request_handle(KVServerDefaultHandle<float>());
  RegisterExitCallback([server](){ delete server; });
}

void RunWorker() {
  if (!IsWorker()) return;
  KVWorker<float> kv(0, 0);

  // init
  int num = 10000;
  std::vector<Key> keys(num);
  std::vector<float> vals(num);

  int rank = MyRank();
  srand(rank + 7);
  for (int i = 0; i < num; ++i) {
    keys[i] = kMaxKey / num * i + rank;
    vals[i] = (rand() % 1000);
  }

  // push
  int repeat = 50;
  std::vector<int> ts;
  for (int i = 0; i < repeat; ++i) {
    ts.push_back(kv.Push(keys, vals));

    // to avoid too frequency push, which leads huge memory usage
    if (i > 10) kv.Wait(ts[ts.size()-10]);
  }
  for (int t : ts) kv.Wait(t);

  // pull
  std::vector<float> rets;
  kv.Wait(kv.Pull(keys, &rets));

  float res = 0;
  for (int i = 0; i < num; ++i) {
    res += fabs(rets[i] - vals[i] * repeat);
  }
  CHECK_LT(res / repeat, 1e-5);
  LL << "error: " << res / repeat;
}

int main(int argc, char *argv[]) {
  // setup server nodes
  StartServer();
  // start system
  Start(0);
  // run worker nodes
  RunWorker();
  // stop system
  Finalize(0, true);
  return 0;
}

其他

該部分內(nèi)容暫未完成响蓉,進行學(xué)習(xí)硕勿。

PaddlePaddle

//@TODO

Tensorflow

//@TODO

Adam

//@TODO

Adam框架仍然基于Multi-Spert架構(gòu),這個架構(gòu)的大體含義就是將集群分為如下幾個部分:

  1. 數(shù)據(jù)服務(wù)類枫甲。存儲數(shù)據(jù)源武,數(shù)據(jù)備份。向計算節(jié)點提供數(shù)據(jù)想幻。
  2. 訓(xùn)練模型類粱栖。訓(xùn)練模型,然后更新參數(shù)脏毯。
  3. 參數(shù)服務(wù)器闹究。維護一個共享的模型,計算節(jié)點計算完成后食店,可以向參數(shù)服務(wù)器發(fā)送請求更新參數(shù)渣淤。

參考文獻

參考文獻

  1. Scaling Distributed Machine Learning with the Parameter Server
  2. Parameter Server for Distributed Machine Learning
  3. PS-Lite Documents

參考Blog

  1. MPI 在大規(guī)模機器學(xué)習(xí)領(lǐng)域的前景如何
  2. 參數(shù)服務(wù)器——分布式機器學(xué)習(xí)的新殺器
  3. Allreduce (or MPI) vs. Parameter server approaches
  4. 橫向?qū)Ρ热蠓植际綑C器學(xué)習(xí)平臺:Spark、PMLS吉嫩、TensorFlow
  5. 機器學(xué)習(xí)入門:線性回歸及梯度下降
  6. 詳解并行邏輯回歸
  7. 一致性HASH算法詳解
  8. 【深度學(xué)習(xí)&分布式】Parameter Server 詳解
  9. parameter_server架構(gòu)
  10. 【分布式計算】MapReduce的替代者-Parameter Server
  11. Adam:大規(guī)模分布式機器學(xué)習(xí)框架
  12. ParameterServer入門和理解
  13. PS-Lite源碼分析
  14. Google Protocol Buffer 的使用和原理
  15. 幾種機器學(xué)習(xí)框架的對比和選擇
  16. tensorflow架構(gòu)
  17. 如何評價百度開源的深度學(xué)習(xí)框架 PaddlePaddle?

參考項目

  1. https://github.com/dmlc/ps-lite
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末涧衙,一起剝皮案震驚了整個濱河市如暖,隨后出現(xiàn)的幾起案子者填,更是在濱河造成了極大的恐慌训堆,老刑警劉巖露戒,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件椒功,死亡現(xiàn)場離奇詭異,居然都是意外死亡智什,警方通過查閱死者的電腦和手機动漾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來荠锭,“玉大人旱眯,你說我怎么就攤上這事≈ぞ牛” “怎么了删豺?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長愧怜。 經(jīng)常有香客問我呀页,道長,這世上最難降的妖魔是什么拥坛? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任蓬蝶,我火速辦了婚禮尘分,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丸氛。我一直安慰自己培愁,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布缓窜。 她就那樣靜靜地躺著定续,像睡著了一般。 火紅的嫁衣襯著肌膚如雪禾锤。 梳的紋絲不亂的頭發(fā)上香罐,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機與錄音时肿,去河邊找鬼庇茫。 笑死,一個胖子當(dāng)著我的面吹牛螃成,可吹牛的內(nèi)容都是我干的旦签。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼寸宏,長吁一口氣:“原來是場噩夢啊……” “哼宁炫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起氮凝,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤羔巢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后罩阵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體竿秆,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年稿壁,在試婚紗的時候發(fā)現(xiàn)自己被綠了幽钢。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡傅是,死狀恐怖匪燕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情喧笔,我是刑警寧澤帽驯,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站书闸,受9級特大地震影響尼变,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜梗劫,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一享甸、第九天 我趴在偏房一處隱蔽的房頂上張望截碴。 院中可真熱鬧,春花似錦蛉威、人聲如沸日丹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哲虾。三九已至,卻和暖如春择示,著一層夾襖步出監(jiān)牢的瞬間束凑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工栅盲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留汪诉,地道東北人。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓谈秫,卻偏偏與公主長得像扒寄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子拟烫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理该编,服務(wù)發(fā)現(xiàn),斷路器硕淑,智...
    卡卡羅2017閱讀 134,628評論 18 139
  • mxnet分布式2 ps-lite論文閱讀 https://www.usenix.org/system/files...
    迷途的Go閱讀 1,559評論 0 2
  • 論文原文:Scaling Distributed Machine Learning with the Parame...
    是neinei啊閱讀 1,691評論 0 3
  • YarnYarn產(chǎn)生背景:Yarn直接來自于MR1.0MR1.0 問題:采用的是master slave結(jié)構(gòu)课竣,ma...
    時待吾閱讀 5,579評論 2 23
  • 家長會的感觸很深,李老師和家長探討的不僅是學(xué)習(xí)上的內(nèi)容置媳,還有一種教育的理念于樟,李老師真誠的和我們談了孩子們現(xiàn)在面臨的...
    Baiqin閱讀 240評論 0 0