前置知識(shí):
前置知識(shí)1:Actor的創(chuàng)建
Props代表的是一個(gè)不可變的配置類(lèi)涡贱,創(chuàng)建Actor時(shí),必須要用到畜隶;它由兩部分構(gòu)成
- 創(chuàng)建的Actor類(lèi):akkaRpcActorType,它是Class類(lèi)型
- 調(diào)用上述Class的構(gòu)造方法使用的參數(shù):剩余的參數(shù)
比如,如下代碼用來(lái)創(chuàng)建持有通信實(shí)體TaskExecutor引用的Actor
Props.create(AkkaRpcActor.class,
TaskExecutor實(shí)例,
actorTerminationFuture,
getVersion(),
configuration.getMaximumFramesize())
看看AkkaRpcActor的構(gòu)造方法船万,剩余參數(shù)與構(gòu)造方法入?yún)⒁灰粚?duì)應(yīng)上了
AkkaRpcActor(
final T rpcEndpoint,
final CompletableFuture<Boolean> terminationFuture,
final int version,
final long maximumFramesize)
每個(gè)Actor在創(chuàng)建時(shí)都會(huì)返回一個(gè)ActorRef刻撒,用來(lái)與Actor通信。
前置知識(shí)2:flink使用的Actor
- flink中所有的通信實(shí)體耿导,都需要繼承RpcEndpoint抽象類(lèi)声怔;flink中常見(jiàn)的組件,如JobMaster舱呻、Dispatcher醋火、ResourceManager、TaskExecutor等都直接或間接繼承自RpcEndpoint箱吕。在flink使用的Actor有如下幾種:
- SupervisorActor:用來(lái)創(chuàng)建AkkaRpcActor胎撇、FencedAkkaRpcActor,它們用來(lái)接收對(duì)應(yīng)的ActorRef發(fā)送的消息
- AkkaRpcActor:RpcEndpoint使用的actor
- FencedAkkaRpcActor: FencedRpcEndpoint使用的actor殖氏,相較于RpcEndpoint增加了防護(hù)令牌晚树,F(xiàn)encedAkkaRpcActor繼承自 AkkaRpcActor
前置知識(shí)3:動(dòng)態(tài)代理
RPC過(guò)程
-
那么RpcEndpoint與Actor的關(guān)系是怎樣的?
akka-rpc-整體圖
如上圖雅采,每個(gè)Actor都持有RpcEndpoint的引用爵憎,當(dāng)Actor接收到具體RPC消息后,會(huì)調(diào)用底層的RpcEndpoint實(shí)現(xiàn)類(lèi)來(lái)干活婚瓜。整個(gè)RPC流程如下:
- 構(gòu)建代理$Proxy宝鼓,它是發(fā)送消息的入口,這個(gè)過(guò)程等會(huì)再描述
- 用戶調(diào)用$Proxy的方法干活巴刻,實(shí)際會(huì)調(diào)用AkkaInvocationHandler的invoke方法愚铡,handler就能獲取到對(duì)應(yīng)的方法簽名與傳參,將<方法簽名胡陪,傳參>封裝為消息沥寥,通過(guò)ActorRef發(fā)送給AkkaActor
- 利用反射獲取底層的RpcEndpoint實(shí)現(xiàn)類(lèi)的Class對(duì)象,通過(guò)<方法簽名柠座,傳參Class對(duì)象>就可以獲取具體的實(shí)現(xiàn)邑雅,然后執(zhí)行即可
- 如果需要返回,則返回結(jié)果給ActorRef
- 那Actor會(huì)處理什么信息妈经?每個(gè)Actor都會(huì)重寫(xiě)AbstractActor的
createReceive
方法淮野,當(dāng)該actor接收到信息時(shí),會(huì)根據(jù)消息的類(lèi)型調(diào)用相應(yīng)的方法來(lái)進(jìn)行處理吹泡,比如SupervisorActor的實(shí)現(xiàn):
@Override
public Receive createReceive() {
return receiveBuilder()
// 偏函數(shù)骤星,當(dāng)消息的類(lèi)型是StartAkkaRpcActor,則使用createStartAkkaRpcActorMessage方法來(lái)處理
.match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage)
// 否則爆哑,對(duì)于其它的任意類(lèi)型洞难,都使用handleUnknownMessage方法來(lái)處理
.matchAny(this::handleUnknownMessage)
.build();
}
問(wèn)題1:對(duì)于本地的RPC,$Proxy怎么構(gòu)建泪漂?
最典型的場(chǎng)景莫過(guò)于JobMaster的SlotPoolImpl向ResourceManager請(qǐng)求資源了廊营。整個(gè)過(guò)程如下
- 在JobMaster創(chuàng)建時(shí),會(huì)傳入ResourceManager的actor地址targetAddress
- 當(dāng)JobMaster請(qǐng)求資源時(shí)萝勤,AkkaRpcService會(huì)通過(guò)targetAddress創(chuàng)建連接ResourceManager的ActorRef
- 然后通過(guò)該ActorRef就可以構(gòu)建FencedAkkaInvocationHandler露筒,然后構(gòu)建代理,由于代理也實(shí)現(xiàn)了ResourceManagerGateway敌卓,該代理就被強(qiáng)轉(zhuǎn)為大家看到的ResourceManagerGateway慎式。當(dāng)在看代碼的時(shí)候,發(fā)現(xiàn)以GateWay結(jié)尾(通常都實(shí)現(xiàn)了FencedRpcGateway或RpcGateway接口)的基本都是上圖中的$Proxy趟径,它們是發(fā)送消息給Actor的入口
- SlotPoolImpl就可以通過(guò)ResourceManagerGateway與ResourceManager通信了
問(wèn)題2:對(duì)于遠(yuǎn)程的RPC瘪吏,$Proxy怎么構(gòu)建?
最典型的場(chǎng)景莫過(guò)于TaskExecutor(即文檔中說(shuō)的TaskManager蜗巧,用來(lái)提供slot與執(zhí)行任務(wù)的掌眠,它也是RpcEndpoint)啟動(dòng)時(shí)調(diào)用自己的onStart方法向ResourceManager注冊(cè)。TaskExecutor持有了ResourceManager的地址幕屹,過(guò)程和上面一致
問(wèn)題2:消息怎么返回蓝丙?返回給誰(shuí)?
ActorRef就是客戶端望拖,也是返回給它
// rpc渺尘,就像在本地調(diào)用一樣
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
// rpcEndpoint就是目標(biāo)通信實(shí)體的ActorRef
protected CompletableFuture<?> ask(Object message, Time timeout) {
return FutureUtils.toJava(
Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
}