一. Spark rpc概述
首先說明RPC,引用百度百科:
RPC(Remote Procedure Call)—遠程過程調(diào)用位迂,它是一種通過網(wǎng)絡從遠程計算機程序上請求服務,而不需要了解底層網(wǎng)絡技術的協(xié)議阔加。RPC協(xié)議假定某些傳輸協(xié)議的存在脑漫,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)袋坑。
Spark RPC可以說 是 Spark 分布式集群的基礎仗处,若是將 Spark 類比為一個人的話,Spark RPC 就是這個人的血液部分枣宫。
有一位大神將 Spark RPC 中的 RPC 部分剝離出來婆誓,弄成一個新的可運行的 RPC 項目,地址在這Spark RPC也颤。
雖然名字不一樣洋幻,但這個項目的類和內(nèi)容基本和 Spark Core 中 RPC 部分的代碼和結構基本是一樣的,可以通過這個來學習 Spark RPC翅娶。
PS:所用 spark 版本:spark 2.1.0
二. Spark RPC 文留,從簡單的例子開始
接下來我們來演示如何下載并運行最簡單的 Hello World 中的例子。
首先故觅,我使用的編譯器是 IDEA 厂庇,通過 idea 將 github 上的代碼 clone 下來。
可以看到項目目錄下有兩個模塊输吏,
- kraps-rpc
- kraps-rpc-example
我們要做的即是運行 kraps-rpc-example 中的代碼权旷。
啟動 PRC 的話首先需要啟動 Server 端,開啟監(jiān)聽服務贯溅,這里在 HelloworldServer.scala 中都已經(jīng)幫我們寫好拄氯,不過在 main 方法中需要修改一下內(nèi)容,就是將 host 改為本機地址它浅。
def main(args: Array[String]): Unit = {
// val host = args(0)
val host = "localhost"
val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()
}
然后我們只需要右鍵該文件然后執(zhí)行即可译柏。
然后到 HelloworldClient 文件中,這里面提供了同步和異步兩個方法可以運行姐霍。代碼同樣都已經(jīng)寫好鄙麦,通過修改注釋即可使用不同的方法運行典唇。同樣是右鍵點擊該文件執(zhí)行。
def main(args: Array[String]): Unit = {
//異步方法
//asyncCall()
//同步方法
syncCall()
}
異步方法中胯府, ask 會返回一個 Future 介衔。在 Future 運行結果出來前,我們可以去做其他事情骂因。scala 中的 Future 和 Java 的 Future 有些不同炎咖,不過這可以先不去管,先當作 Java 里面的 Future 即可寒波。
def asyncCall() = {
val rpcConf = new RpcConf()
val config = RpcEnvClientConfig(rpcConf, "hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
future.onComplete {
case scala.util.Success(value) => println(s"Got the result = $value")
case scala.util.Failure(e) => println(s"Got error: $e")
}
Await.result(future, Duration.apply("3s"))
//在future結果運行出來前乘盼,會先打印這條語句。
println("print me at first!")
Thread.sleep(7)
}
而同步方法是直接將結果返回俄烁,并且會阻塞绸栅,直到結果返回。
def syncCall() = {
val rpcConf = new RpcConf()
val config = RpcEnvClientConfig(rpcConf, "hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
val result = endPointRef.askWithRetry[String](SayBye("neo"))
println(result)
}
很簡單是吧猴娩,接下來我們先來了解一些 Spark RPC 運行過程中至關重要的兩個編程模型阴幌,以及在這其中使用到的一些主要的類勺阐。
三. Spark RPC 中各類說明
Spark RPC 是使用了 Actor 模型和 Reactor 模型的混合模式卷中,我們結合兩種模型分別說明 Spark RPC 中各個類的作用:
首先我們先來看 Spark RPC 的類圖。
是不是感覺很亂渊抽?沒事蟆豫,我們來逐步剖析各個類。
Spark RPC 主要用到了 Actor 模型 和 Reactor 模型懒闷,我們從這兩個模型的角度來拆解十减。
Actor 模型
其實之前也有寫過一篇介紹 Actor 模型的文章,感興趣的同學可以點擊這里查看 Actor模型淺析 一致性和隔離性愤估。
其實 Actor 主要就是這副圖的內(nèi)容:
RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem
我們逐個來看:
RpcEnv --RPC Environment
RPC Environment 是 RpcEndpoint 的運行環(huán)境帮辟。它管理 RpcEndpoint 的整個生命周期:
- 通過名字或 URI 注冊 RpcEndpoint。
- 對到底的消息進行路由玩焰,決定分發(fā)給哪個 RpcEndpoint由驹。
- 停止 RpcEndpoint。
RPC Environment在 akka 已經(jīng)被移除的2.0后面版本中昔园,RPC Environment 的實現(xiàn)類是 NettyRpcEnv蔓榄。通常是由 NettyRpcEnvFactory.create 創(chuàng)建。
RpcEndpoint
RpcEndpoint 能通過 callbacks 接收消息默刚。通常需要我們自己寫一個類繼承 RpcEndpoint 甥郑。編寫自己的接收信息和返回信息規(guī)則。
RpcEndpoint 的生命周期被 RPC Environment 管理荤西。其生命周期包括澜搅,onStart, receive 和 onStop伍俘。
它是作為服務端,比如上面例子中的 HelloworldServer 就是一個 RpcEndpoint 勉躺。
RpcEndpointRef
RpcEndpointRef 是 RpcEndpoint 在 RPC Environment 中的一個引用养篓。
它包含一個地址(即 Spark URL)和名字。RpcEndpointRef 作為客戶端向服務端發(fā)送請求并接收返回信息赂蕴,通沉可以選擇使用同步或異步的方式進行發(fā)送。
Reactor 模型
我們可以從一張圖來看 Reactor 的架構概说。
使用Reactor模型碧注,由底層netty創(chuàng)建的EventLoop做I/O多路復用,這里使用Multiple Reactors這種形式糖赔,如上圖所示萍丐,從netty的角度而言,Main Reactor 和 Sub Reactor 對應 BossGroup 和 WorkerGroup 的概念放典,前者負責監(jiān)聽 TCP 連接逝变、建立和斷開,后者負責真正的 I/O 讀寫奋构。
而圖中的 ThreadPool 就是的 Dispatcher 中的線程池壳影,它來解耦開來耗時的業(yè)務邏輯和 I/O 操作,這樣就可以更 scalabe弥臼,只需要少數(shù)的線程就可以處理成千上萬的連接宴咧,這種思想是標準的分治策略,offload 非 I/O 操作到另外的線程池径缅。
Dispatcher
Dispatcher 的主要作用是保存注冊的RpcEndpoint掺栅、分發(fā)相應的Message到RpcEndPoint中進行處理。Dispatcher 即是上圖中 ThreadPool的角色纳猪。它同時也維系一個 threadpool氧卧,用來處理每次接受到的 InboxMessage 。而這里處理 InboxMessage 是通過 inbox 實現(xiàn)的氏堤。
Inbox
Inbox 其實屬于 Actor 模型沙绝,是 Actor 中的信箱,不過它和 Dispatcher 聯(lián)系緊密所以放這邊丽猬。
InboxMessage 有多個實現(xiàn)它的類宿饱,比如 OneWayMessage,RpcMessage脚祟,等等谬以。Dispatcher會將接收到的 InboxMessage 分發(fā)到對應 RpcEndpoint 的 Inbox 中,然后 Inbox 便會處理這個 InboxMessage 由桌。
OK为黎,這次就先介紹到這里邮丰,下次我們從代碼的角度來看 Spark RPC 的運行機制
如果覺得對你有幫助,不妨關注一波吧~~