以太坊節(jié)點(diǎn)間通信是指本地節(jié)點(diǎn)和peer節(jié)點(diǎn)之間的按照p2p線上協(xié)議標(biāo)準(zhǔn)實(shí)現(xiàn)的數(shù)據(jù)收發(fā)過程亲配。其中peer節(jié)點(diǎn)從發(fā)現(xiàn)協(xié)議維護(hù)的活躍節(jié)點(diǎn)列表中獲取。
p2p消息傳輸協(xié)議詳細(xì)介紹請(qǐng)看:https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol
ethereum線上傳輸協(xié)議詳細(xì)介紹請(qǐng)看:https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
要說明p2p和eth線上協(xié)議,涉及到軟件架構(gòu)中的3層:
- service層: 提供PeerManager和ChainService
- protocol層: 實(shí)現(xiàn)p2p-protocol 和eth-protocol命令報(bào)文的收發(fā)
- network層: 接收tcp連接的原始數(shù)據(jù)并解析,分發(fā)給相應(yīng)的協(xié)議
下面主要分析p2p-protocol和PeerManager抒寂,來說明這幾層之間的交互流程洪灯。
1. 概述
本地節(jié)點(diǎn)需要和指定數(shù)量的peer維持tcp連接, 如果連接的節(jié)點(diǎn)數(shù)不足, 從節(jié)點(diǎn)發(fā)現(xiàn)協(xié)議維護(hù)的活躍節(jié)點(diǎn)列表中動(dòng)態(tài)獲取節(jié)點(diǎn)信息。
本地節(jié)點(diǎn)監(jiān)聽peer的tcp連接, 當(dāng)有peer連接時(shí)白群,為每個(gè)peer都維護(hù)一個(gè)Peer數(shù)據(jù)結(jié)構(gòu)尚胞,在這個(gè)Peer結(jié)構(gòu)中實(shí)現(xiàn)了protocol層和network層:network層負(fù)責(zé)消息的收發(fā), 并分發(fā)給protocol層的p2pProtocol和ethProtocol。
2. network層
網(wǎng)絡(luò)層的詳細(xì)功能將下圖:
- PeerManager服務(wù)啟動(dòng)時(shí)帜慢,在p2p端口上啟動(dòng)tcp連接的監(jiān)聽笼裳;
- PeerManager服務(wù)啟動(dòng)時(shí),如果配置有bootstrap, 會(huì)主動(dòng)進(jìn)行連接粱玲;
- 無論主動(dòng)還是被動(dòng)連接到一個(gè)peer, 都會(huì)建立一個(gè)Peer結(jié)構(gòu), 配置protocol層的協(xié)議(
p2pprotocol,ethprotocol
)躬柬; - 在Peer結(jié)構(gòu)初始化時(shí),使能network層的數(shù)據(jù)收發(fā)功能抽减;
- 本node和peer建立tcp連接后, 首先需要進(jìn)行加密認(rèn)證:
5.1 在初始化Peer結(jié)構(gòu)的多路復(fù)用的會(huì)話分發(fā)器時(shí), 作為連接的發(fā)起者(發(fā)起者知道對(duì)端的公鑰), 會(huì)發(fā)送一個(gè)hello認(rèn)證消息允青;
5.2 等發(fā)送協(xié)程起來后,hello認(rèn)證消息就會(huì)被發(fā)送出去;
5.3 對(duì)端的Peer結(jié)構(gòu)也是首次收到消息,即該hello認(rèn)證消息, 在對(duì)端的分發(fā)器層需要專門處理該hello認(rèn)證消息卵沉;
5.4 對(duì)端驗(yàn)證該消息的簽名, 回復(fù)一個(gè)認(rèn)證確認(rèn)消息颠锉;
5.5 然后本端和對(duì)端都發(fā)一個(gè)hello認(rèn)證消息,觸發(fā)定時(shí)發(fā)送ping包,進(jìn)行心跳狈ㄉ活; - 消息的發(fā)送和接收都是通過緩存隊(duì)列異步執(zhí)行的木柬,方便控制發(fā)送速率和數(shù)據(jù)塊大小皆串。
- 首次和peer通信時(shí),peer發(fā)過來的hello報(bào)文后, 該函數(shù)檢查是否允許連接該peer是否超過配置最大的連接數(shù), 超過了則不允許連接眉枕;是否是重復(fù)連接恶复。
2.1 消息發(fā)送
- 每個(gè)Peer都有一個(gè)send_packet接口,用戶發(fā)送已經(jīng)組裝好的消息報(bào)文速挑;
- 發(fā)送的消息目前只支持p2pProtocol和ethProtocol協(xié)議層的格式谤牡;
- 消息送給消息復(fù)用分發(fā)器mux,組裝成Frame格式
3.1 組裝成Frame是為了RLP序列化編碼姥宝,幀數(shù)據(jù)提供了報(bào)文的大小翅萤,報(bào)文的協(xié)議;
3.2 Frame有2種形式:?jiǎn)螏鄮?br> 3.3 單幀的格式如下:header || header-mac || frame || mac
3.4 多幀的格式如下:
header || header-mac || frame-0 || [ header || header-mac || frame-n || ... || ] header || header-mac || frame-last || mac
消息也可以帶有優(yōu)先級(jí),因此最蕾, 消息在組裝Frame后,根據(jù)類型分別進(jìn)入3中不同類型的隊(duì)列:normal隊(duì)列胚泌,chunked隊(duì)列,priority隊(duì)列肃弟;
從上面3個(gè)隊(duì)列中根據(jù)pws(protocol-window-size)大小和隊(duì)列中frame數(shù)據(jù)組裝成數(shù)據(jù)流玷室,方法如下:
5.1 normal隊(duì)列和priority隊(duì)列中存在Frame:分別獲取pws/2字節(jié)的報(bào)文數(shù)據(jù)(以frame為單位)
5.2 chunked隊(duì)列和priority隊(duì)列中存在Frame:分別獲取pws/2字節(jié)的報(bào)文數(shù)據(jù)(以frame為單位)
5.3 normal隊(duì)列和chunked隊(duì)列中存在Frame:分別獲取pws/2字節(jié)的報(bào)文數(shù)據(jù)(以frame為單位)
5.4 其他情況,從有數(shù)據(jù)的隊(duì)列中獲取pws字節(jié)的frame笤受;
5.5 所有獲取的數(shù)據(jù)字節(jié)數(shù)大于pws才可以發(fā)送穷缤;對(duì)上面取出來的frame數(shù)據(jù)進(jìn)行加密,在放入發(fā)送隊(duì)列:message_queue箩兽;
線程_run_egress_message監(jiān)聽該隊(duì)列津肛, 發(fā)送最終的數(shù)據(jù)給peer;
2.2 消息接收
收到原始的tcp數(shù)據(jù)流后汗贫, 需要解析為消息身坐, 并根據(jù)protocolID和cmdID分發(fā)給p2pProtocol和ethProtocol協(xié)議。
3. 協(xié)議層
protocol層處于service層和network層之間芳绩,實(shí)現(xiàn)了一系列的命令掀亥。
3.1 命令消息
在協(xié)議層實(shí)現(xiàn)p2p線上協(xié)議定義的命令撞反。
協(xié)議層的實(shí)例比如ethProtocol協(xié)議實(shí)例妥色,對(duì)于每個(gè)命令消息,在初始化時(shí)都會(huì)生成下面3個(gè)方法(X是命令名字):
protocol.create_X(*args, **kargs)
protocol._receive_X(data)
protocol.send_X(*args, **kargs)
其中send_X
是 protocol.send_packet(protocol.create_X(*args, **kargs))
的簡(jiǎn)寫遏片。
上圖是ethProtocol協(xié)議實(shí)例舉例嘹害,比如發(fā)送和接收block對(duì)應(yīng)的接口分別是send_newblock()
撮竿,_receive_newblock()
處理, 最后調(diào)用注冊(cè)的回調(diào)函數(shù): receive_newblock_callbacks()
。
3.2 接收命令報(bào)文
接收從網(wǎng)絡(luò)層的報(bào)文的入口是 protocol.receive_packet
笔呀。
在 protocol.receive_packet
中:
- 從peer的netwwork層接收?qǐng)?bào)文幢踏;
- 根據(jù)命令的結(jié)構(gòu),對(duì)數(shù)據(jù)進(jìn)行反序列化并保存為dict;
- 根據(jù)cmd_id得到cmd_name许师,執(zhí)行cmd_name對(duì)應(yīng)的receiveX(...)函數(shù),即command.receive()房蝉;command.receive()中默認(rèn)執(zhí)行的是注冊(cè)的回調(diào)函數(shù):
protocol.receive_X_callbacks
; - 依次執(zhí)行注冊(cè)的callbacks微渠;
3.3 發(fā)送命令報(bào)文
直接調(diào)用send_X函數(shù)搭幻。
4. P2PProtocol協(xié)議層
P2PProtocol有4個(gè)命令:
-
hello: cmd_id = 0 握手報(bào)文
發(fā)送: send_hello(...)
接收: 在hello命令的receive中
- 先注冊(cè)ethProtocol, 實(shí)現(xiàn)位置:
proto.peer.receive_hello(proto, **data)
- 啟動(dòng)定時(shí)發(fā)送ping的任務(wù):
BaseProtocol.command.receive(self, proto, data)
- 先注冊(cè)ethProtocol, 實(shí)現(xiàn)位置:
disconnect: cmd_id = 1 關(guān)閉連接報(bào)文
ping: cmd_id = 2 保活的心跳報(bào)文逞盆,定時(shí)發(fā)送ping給peer檀蹋;
pong: cmd_id = 3 保活的心跳報(bào)文,收到ping后云芦,回一個(gè)pong俯逾;
p2pprotocol還包括一個(gè)連接監(jiān)視器:
self.monitor = ConnectionMonitor(self)
在ConnectionMonitor中, 設(shè)置了pong消息和hello消息的回調(diào)函數(shù):
self.proto.receive_pong_callbacks.append(self.track_response)
self.proto.receive_hello_callbacks.append(lambda p, **kargs: self.start())
receive_pong_callbacks
: 計(jì)算ping和pong的時(shí)間間隔
receive_hello_callbacks
: 啟動(dòng)定時(shí)發(fā)送ping 的保活消息任務(wù): 計(jì)算ping消息是否超時(shí)
5. 服務(wù)層: PeerManager
Peermanager負(fù)責(zé)和peer的連接,并維護(hù)peer結(jié)構(gòu).
5.1 peer連接
peer連接分2種:我去連peer, 等待peer來連我舅逸。
等待peer來連我(不知道peer的公鑰)
在初始化時(shí), 已經(jīng)在p2p監(jiān)聽端口上啟動(dòng)了一個(gè)tcp服務(wù), 監(jiān)聽tcp客戶端的連接桌肴。
如果新的連接進(jìn)來, 啟動(dòng)一個(gè)協(xié)程執(zhí)行已注冊(cè)的處理函數(shù): _on_new_connection
, 流程如下:
peer = self._start_peer(connection, address)
- 創(chuàng)建一個(gè)Peer結(jié)構(gòu)(繼承自Greenlet)
peer = Peer(self, connection, remote_pubkey=remote_pubkey)
,詳細(xì)過程見下面小節(jié). - 設(shè)置peer掛掉時(shí)的回調(diào)函數(shù)
peer.link(on_peer_exit)
- 保存peer
self.peers.append(peer)
- 調(diào)度peer運(yùn)行
peer.start()
. 最終通過_run啟動(dòng)3個(gè)協(xié)程, 處理network層的原始數(shù)據(jù)分發(fā),詳見下面小節(jié).
主動(dòng)連接peer(知道peer的公鑰)
有2種情況需要主動(dòng)連接peer:
- 在初始化時(shí),如果配置了bootstrap堡赔,主動(dòng)連接bootstrap识脆;
- 監(jiān)視協(xié)程_discovery_loop線程定時(shí)檢查,檢測(cè)到連接的peer數(shù)量少于指定數(shù)量時(shí)善已, 從節(jié)點(diǎn)發(fā)現(xiàn)協(xié)議kademlia維護(hù)的活躍節(jié)點(diǎn)列表中選取節(jié)點(diǎn)進(jìn)行連接灼捂;
peer = self._start_peer(connection, address, remote_pubkey)
5.2 peer廣播
接口:broadcast(protocol, command_name, ...)
這是一個(gè)通用函數(shù), 可以廣播指定協(xié)議的指定命令. 可以指定廣播的peer數(shù)量, 和指定排除那些peer.
對(duì)于符合要求的peer, 根據(jù)command_name 找到對(duì)應(yīng)的send_<command_name>函數(shù),, 發(fā)送要求的數(shù)據(jù).
6. Peer
peer實(shí)現(xiàn)network層和protocol層, 主要功能實(shí)際上在上文中已經(jīng)詳細(xì)介紹:發(fā)送消息, 接收數(shù)據(jù)并解析, 分發(fā)給相應(yīng)的Protocol協(xié)議。
6.1 peer的初始化
- 創(chuàng)建一個(gè)加密的多路復(fù)用的command分發(fā)器
self.mux = MultiplexedSession(privkey, hello_packet, remote_pubkey=remote_pubkey)
,在MultiplexedSession
初始化中:
1.1hello_packet
報(bào)文是p2p協(xié)議的cmdID=0的協(xié)議報(bào)文
1.2 新建一個(gè)rlpx_session,用于加密會(huì)話數(shù)據(jù)
1.3 如果已知peer的公鑰,和peer進(jìn)行加密握手:_send_init_msg()
auth_msg = self.rlpx_session.create_auth_message(self._remote_pubkey)#創(chuàng)建認(rèn)證消息 auth_msg_ct = self.rlpx_session.encrypt_auth_message(auth_msg) #加密認(rèn)證消息 self.message_queue.put(auth_msg_ct) #發(fā)送加密后的消息
- 注冊(cè)p2p協(xié)議, 啟動(dòng)和peer連接的服務(wù)
self.connect_service(self.peermanager)
2.1 實(shí)例化一個(gè)線上協(xié)議: P2PProtocol
2.2 在復(fù)用器中增加該協(xié)議:self.mux.add_protocol(protocol.protocol_id)
2.3 啟動(dòng)該協(xié)議(P2PProtocol):protocol.start()
换团。執(zhí)行的是baseProtocol的start(), 注冊(cè)自定義的收到命令消息后的回調(diào)函數(shù)
6.2 Peer在network層收發(fā)數(shù)據(jù)
在收到一個(gè)新的peer的連接到時(shí)候, 實(shí)例化一個(gè)Peer結(jié)構(gòu), 然后會(huì)觸發(fā)peer.start()
, 創(chuàng)建3個(gè)協(xié)程分別處理收發(fā)報(bào)文:
_run_ingress_message
- 等待讀事件:
self.safe_to_read.wait()
- 每次讀取最多4k數(shù)據(jù):
imsg = self.connection.recv(4096)
- 將消息加到分發(fā)器中:
self.mux.add_message(imsg)
3.1 對(duì)消息進(jìn)行解碼, 因?yàn)槭橇鲾?shù)據(jù), 需要解析消息,并對(duì)消息進(jìn)行解密
3.2add_message
有兩個(gè)定義
- 第一次收到消息時(shí)處理函數(shù)是
_add_message_during_handshake
- 如果是連接的發(fā)起者, 收到的第一個(gè)消息是認(rèn)證確認(rèn)消息,如果是連接的接收者 , 收到的第一個(gè)消息是hello認(rèn)證消息
- 解密消息,并驗(yàn)證簽名(對(duì)端使用本節(jié)點(diǎn)的公鑰進(jìn)行簽名)
- 如果是連接的發(fā)起者, 再發(fā)送一個(gè)hello認(rèn)證消息給對(duì)端, 如果是連接的接收者, 驗(yàn)證成功后, 回復(fù)一個(gè)認(rèn)證確認(rèn)消息, 然后需要再發(fā)送一個(gè)hello認(rèn)證消息給對(duì)端
- 以后的消息處理入口是:
_add_message_post_handshake
_run_decoded_packets
- 從packet_queue獲取報(bào)文數(shù)據(jù)
- 解析消息,并對(duì)每個(gè)消息解析protocol, cmd_id
- 發(fā)給protocol層的receive_packet(pkt)
_run_egress_message
- 從message_queue中獲取報(bào)文,并發(fā)送
- 剛啟動(dòng)該監(jiān)聽任務(wù)時(shí), 如果是本連接的發(fā)起者, 那么隊(duì)列中已經(jīng)有一個(gè)hello的認(rèn)證消息, 此時(shí)會(huì)馬上將該消息發(fā)送出去.