Contents
Graph embedding is a method for generating unsupervised node features from a graph; the resulting features can be used in a variety of machine learning tasks. Modern graphs, especially in industrial applications, often contain billions of nodes and trillions of edges, which exceeds the capacity of existing embedding systems. Facebook has open-sourced an embedding system, PyTorch-BigGraph (PBG), which makes several modifications to traditional multi-relation embedding systems so that it can scale to graphs with billions of nodes and trillions of edges.
This series is a translation of the official PyTorch-BigGraph documentation, intended to help readers get started quickly with GNNs and their use. The series has fifteen parts; if you spot any errors, please get in touch.
(1) Facebook Open-Sources a Graph Neural Network: PyTorch-BigGraph
(2) Facebook: BigGraph Documentation in Chinese - Data Model (PyTorch)
(3) Facebook: BigGraph Documentation in Chinese - From Entity Embeddings to Edge Scores (PyTorch)
(4) Facebook: BigGraph Documentation in Chinese - I/O Format (PyTorch)
(5) Facebook: BigGraph Documentation in Chinese - Batch Preparation
Distributed Mode
Source: https://torchbiggraph.readthedocs.io/en/latest/distributed_training.html
PBG can perform training across multiple machines which communicate over a network, in order to reduce training time on large graphs. Distributed training is able to concurrently utilize larger computing resources, as well as to keep the entire model stored in memory across all machines, avoiding the need to swap it to disk. On each machine, the training is further parallelized across multiple subprocesses.
Setup
In order to perform distributed training, the configuration file must first be updated to contain the specification of the desired distributed setup. If training should be carried out on N machines, then the num_machines key in the config must be set to that value. In addition, distributed_init_method must describe a way for the trainers to discover each other and set up their communication. All valid values for the init_method argument of torch.distributed.init_process_group() are accepted here. Usually this will be a path to a shared network filesystem or the network address of one of the machines. See the PyTorch docs for more information and a complete reference.
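As a sketch, the distributed keys can be added to an ordinary PBG config module. All paths, entity names, partition counts, and addresses below are illustrative placeholders, not values from the original text:

```python
# Minimal, hypothetical config.py sketch for distributed training.
# PBG reads the config by calling get_torchbiggraph_config().
def get_torchbiggraph_config():
    return dict(
        # I/O paths (placeholders)
        entity_path="data/example",
        edge_paths=["data/example/edges"],
        checkpoint_path="model/example",
        # Graph structure: one entity type split into 8 partitions
        entities={"all": {"num_partitions": 8}},
        relations=[
            {"name": "follows", "lhs": "all", "rhs": "all", "operator": "none"},
        ],
        dimension=128,
        # Distributed setup: 4 machines (half the partition count, per the
        # tip below), discovering each other via one machine's address;
        # a shared-filesystem URL such as "file:///shared/init" also works.
        num_machines=4,
        distributed_init_method="tcp://10.0.0.1:23456",
    )
```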
To launch distributed training, call torchbiggraph_train --rank rank config.py on each machine, with rank replaced by a different integer between 0 and N-1 (inclusive) on each machine. Each machine must have PBG installed and have a copy of the config file.
In some uncommon circumstances, one may want to store the embeddings on different machines than the ones that are performing training. In that case, one would set num_partition_servers to a positive value and manually launch some instances of torchbiggraph_partitionserver as well. See below for more information on this.
Tip:
A good default setting is to set num_machines to half the number of partitions (see below for why) and leave num_partition_servers unset.
Once all these commands are started, no more manual intervention is needed.
Warning:
Unpartitioned entity types should not be used with distributed training. While the embeddings of partitioned entity types are only in use on one machine at a time and are swapped between machines as needed, the embeddings of unpartitioned entity types are communicated asynchronously through a poorly-optimized parameter server which was designed for sharing relation parameters, which are small. It cannot support synchronizing large amounts of parameters, e.g. an unpartitioned entity type with more than 1000 entities. In that case, the quality of the unpartitioned embeddings will likely be very poor.
Communication protocols
Distributed training requires the machines to coordinate and communicate in various ways for different purposes. These tasks are:
1) synchronizing which trainer is operating on which bucket, and assigning buckets so that there are no conflicts;
2) passing the embeddings of an entity partition from one trainer to the next when needed (as this data is only accessed by one trainer at a time);
3) sharing the parameters that all trainers need to access simultaneously, by collecting and redistributing updates to them.
Each of these is implemented by a separate "protocol", and each trainer takes part in some or all of them by launching subprocesses that act as clients or servers for the different protocols. These protocols are explained below to provide insight into the system.
Synchronizing bucket access
PBG parallelizes training across multiple machines by having them all operate simultaneously on disjoint buckets (i.e., buckets that don't have any partition in common). Therefore, each partition is in use by at most one machine at a time, and each machine uses at most two partitions (the only exception is for buckets "on the diagonal", which have the same left- and right-hand side partition). This means that the number of buckets one can simultaneously train on is about half the total number of partitions.
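The "about half" figure can be made concrete with a small illustrative helper (not PBG code): buckets are (lhs, rhs) partition pairs, two buckets can run concurrently only if they share no partition, and a greedy pairing over P partitions yields roughly P // 2 simultaneously trainable buckets.

```python
from itertools import product

def max_concurrent_buckets(num_partitions: int) -> int:
    """Greedily count off-diagonal buckets whose partitions are all distinct.

    Each accepted bucket claims both of its partitions, so no later bucket
    may reuse them -- mirroring the lock server's no-conflict rule.
    """
    in_use, count = set(), 0
    for lhs, rhs in product(range(num_partitions), repeat=2):
        if lhs != rhs and lhs not in in_use and rhs not in in_use:
            in_use |= {lhs, rhs}
            count += 1
    return count
```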
The way the machines agree on which one gets to operate on what bucket is through a "lock server". The server is implicitly started by the trainer of rank 0. All other machines connect to it as clients, ask for a new bucket to operate on (when they need one), get a bucket assigned from the server (or none, if all buckets have already been trained on or are "locked" because their partitions are in use by another trainer), train on it, then report it as done and repeat. The lock server tries to optimize I/O by preferring, when a trainer asks for a bucket, to assign one that has as many partitions in common with the previous bucket that the trainer trained on, so that these partitions can be kept in memory rather than having to be unloaded and reloaded.
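The lock server's policy can be sketched as a toy in-memory class (this is an illustration of the idea, not PBG's actual implementation): hand out only buckets whose partitions are free, and prefer the candidate with the most partitions in common with the bucket the trainer just finished.

```python
class LockServer:
    """Toy model of PBG's bucket lock server (illustrative only)."""

    def __init__(self, buckets):
        self.pending = set(buckets)   # buckets not yet trained on
        self.locked = set()           # partitions currently held by a trainer

    def acquire(self, previous=None):
        # Buckets whose partitions are not locked by another trainer.
        free = [b for b in self.pending if not ({b[0], b[1]} & self.locked)]
        if not free:
            return None               # all done, or everything is locked
        # Prefer overlap with the trainer's previous bucket, so those
        # partitions can stay in memory instead of being reloaded.
        overlap = lambda b: len({b[0], b[1]} & set(previous or ()))
        bucket = max(free, key=overlap)
        self.pending.discard(bucket)
        self.locked |= {bucket[0], bucket[1]}
        return bucket

    def release(self, bucket):
        self.locked -= {bucket[0], bucket[1]}
```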
Exchanging partition embeddings
When a trainer starts operating on a bucket it needs access to the embeddings of all entities (of all types) that belong to either the left- or the right-hand side partition of the bucket. The "locking" mechanism of the lock server ensures that at most one trainer is operating on a partition at any given time. This doesn't hold for unpartitioned entity types, which are shared among all trainers; see below. Thus each trainer has exclusive hold of the partitions it's training on.
Once a trainer starts working on a new bucket it needs to acquire the embeddings of its partitions, and once it's done it needs to release them and make them available, in their updated version, to the next trainer that needs them. In order to do this, there's a system of so-called "partition servers" that store the embeddings, provide them upon request to the trainers who need them, receive back the updated embeddings and store them.
This service is optional, and is disabled when num_partition_servers is set to zero. In that case the trainers "send" each other the embeddings simply by writing them to the checkpoint directory (which should reside on a shared disk) and then fetching them back from there.
When this system is enabled, it can operate in two modes. The simplest mode is triggered when num_partition_servers is -1 (the default): in that case all trainers spawn a local process that acts as a partition server. If, on the other hand, num_partition_servers is a positive value then the trainers will not spawn any process, but will instead connect to the partition servers that the user must have provisioned manually by launching the torchbiggraph_partitionserver command on the appropriate number of machines.
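The three possible values of num_partition_servers described above can be summarized in a small sketch (the mode names are illustrative labels, not PBG identifiers):

```python
def partition_server_mode(num_partition_servers: int) -> str:
    """Map the config value to the behavior described in the text."""
    if num_partition_servers == -1:
        return "spawn-local"        # each trainer spawns a local server
    if num_partition_servers == 0:
        return "disabled"           # exchange via the checkpoint directory
    return "connect-external"       # connect to manually launched servers
```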
Updating shared parameters
Some parameters of the model need to be used by all trainers at the same time (this includes the operator weights, the global embeddings of each entity type, and the embeddings of the unpartitioned entities). These are parameters that don't depend on what bucket the trainer is operating on, and therefore are always present on all trainers (as opposed to the entity embeddings, which are loaded and unloaded as needed). These parameters are synchronized using a series of "parameter servers". Each trainer starts a local parameter server (in a separate subprocess) and connects to all other parameter servers. Each parameter that is shared between trainers is then stored in a parameter server (possibly sharded across several of them, if too large). Each trainer also has a loop (also in a separate subprocess) which, at regular intervals, goes over each shared parameter, computes the difference between its current local value and the value it had when it was last synced with the server where the parameter is hosted, and sends that delta to that server. The server, in turn, accumulates all the deltas it receives from all trainers, updates the value of the parameter and sends this new value back to the trainers. The parameter server throttles to 100 updates/s or 1 GB/s, in order to prevent it from starving the other communication.
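The delta-based sync loop can be modeled in a few lines of pure Python (a toy illustration of the accumulation scheme, ignoring sharding, subprocesses, networking, and throttling):

```python
class ParameterServer:
    """Toy server holding the authoritative value of one shared parameter."""

    def __init__(self, value):
        self.value = list(value)

    def push_delta(self, delta):
        # Accumulate a trainer's delta and return the new authoritative value.
        self.value = [v + d for v, d in zip(self.value, delta)]
        return list(self.value)

class TrainerParam:
    """A trainer's local replica of the shared parameter."""

    def __init__(self, server):
        self.server = server
        self.local = list(server.value)
        self.last_synced = list(server.value)

    def sync(self):
        # Send (current - last_synced) and adopt the server's merged value,
        # so updates from all trainers are folded together.
        delta = [c - s for c, s in zip(self.local, self.last_synced)]
        self.local = self.server.push_delta(delta)
        self.last_synced = list(self.local)
```

Because each trainer sends only its delta since the last sync, concurrent updates from different trainers add up instead of overwriting each other.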
Todo:
Mention distributed_tree_init_order.