前言
SOFA 包含了 RPC 框架集侯,底層通信框架是 bolt ,基于 Netty 4盟迟,今天將通過 SOFA—RPC 源碼中的例子裆装,看看他是如何發(fā)布一個服務的踱承。
示例代碼
下面的代碼在 com.alipay.sofa.rpc.quickstart.QuickStartServer
類下倡缠。
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 設置一個協(xié)議,默認bolt
.setPort(9696) // 設置一個端口茎活,默認12200
.setDaemon(false); // 非守護線程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定實現(xiàn)
.setServer(serverConfig); // 指定服務端
providerConfig.export(); // 發(fā)布服務
首先毡琉,創(chuàng)建一個 ServerConfig ,包含了端口妙色,協(xié)議等基礎信息,當然慧耍,這些都是手動設定的身辨,在該類加載的時候,會自動加載很多配置文件中的服務器默認配置芍碧。比如 RpcConfigs 類煌珊,RpcRuntimeContext 上下文等。
然后呢泌豆,創(chuàng)建一個 ProviderConfig定庵,也是個 config,不過多繼承了一個 AbstractInterfaceConfig 抽象類踪危,該類是接口級別的配置蔬浙,而 ServerConfig 是 服務器級別的配置。雖然都繼承了 AbstractIdConfig实撒。
ProviderConfig 包含了接口名稱熄诡,接口指定實現(xiàn)類逼裆,還有服務器的配置。
最后俱病,ProviderConfig 調用 export 發(fā)布服務。
展示給我的 API 很簡單袱结,但內部是如何實現(xiàn)的呢亮隙?
在看源碼之前,我們思考一下:如果我們自己來實現(xiàn)垢夹,怎么弄溢吻?
RPC 框架簡單一點來說,就是使用動態(tài)代理和 Socket果元。
SOFA 使用 Netty 來做網絡通信框架煤裙,我們之前也寫過一個簡單的 Netty RPC,主要是通過 handler 的 channelRead 方法來實現(xiàn)噪漾。
SOFA 是這么操作的嗎硼砰?
一起來看看。
# 源碼分析
上面的示例代碼其實就是 3 個步驟欣硼,創(chuàng)建 ServerConfig题翰,創(chuàng)建 ProviderConfig,調用 export 方法。
先看第一步豹障,還是有點意思的冯事。
雖然是空構造方法,但 ServerConfig 的屬性都是自動初始化的血公,而他的父類 AbstractIdConfig 更有意思了昵仅,父類有 1 個地方值得注意:
static {
RpcRuntimeContext.now();
}
熟悉類加載的同學都知道,這是為了主動加載 RpcRuntimeContext 累魔,看名字是 RPC 運行時上下文摔笤,所謂上下文,大約就是我們人類聊天中的 "老地方" 的意思垦写。
這個上下文會在靜態(tài)塊中加載 Module(基于擴展點實現(xiàn))吕世,注冊 JVM 關閉鉤子(類似 Tomcat)。還有很多配置信息梯投。
然后呢命辖?創(chuàng)建 ProviderConfig 對象。這個類比上面的那個類多繼承了一個 AbstractInterfaceConfig分蓖,接口級別的配置尔艇。比如有些方法我不想發(fā)布啊,比如權重啊么鹤,比如超時啊漓帚,比如具體的實現(xiàn)類啊等等,當然還需要一個 ServerConfig 的屬性(注冊到 Server 中啊喂)午磁。
最后就是發(fā)布了尝抖。export 方法。
ProviderCofing 擁有一個 export 方法迅皇,但并不是直接就在這里發(fā)布的昧辽,因為他是一個 config,不適合在config 里面做這些事情登颓,違背單一職責搅荞。
SOFA 使用了一個 Bootstrap 類來進行操作。和大部分服務器類似框咙,這里就是啟動服務器的地方咕痛。因為這個類會多線程使用,比如并發(fā)的發(fā)布服務喇嘱。而不是一個一個慢慢的發(fā)布服務茉贡。所以他不是單例的,而是和 Config 一起使用的者铜,并緩存在 map 中腔丧。
ProviderBootstrap 目前有 3 個實現(xiàn):Rest放椰,Bolt,Dubbo愉粤。Bolt 是他的默認實現(xiàn)砾医。
export 方法默認有個實現(xiàn)(Dubbo 的話就要重寫了)。主要邏輯是執(zhí)行 doExport 方法衣厘,其中包括延遲加載邏輯如蚜。
而 doExport 方法中,就是 SOFA 發(fā)布服務的邏輯所在了影暴。
樓主將方法的異常處理邏輯去除错邦,整體如下:
private void doExport() {
if (exported) {
return;
}
String key = providerConfig.buildKey();
String appName = providerConfig.getAppName();
// 檢查參數
checkParameters();
// 注意同一interface,同一uniqleId坤检,不同server情況
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 計數器
if (cnt == null) { // 沒有發(fā)布過
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
// 超過最大數量,直接拋出異常
}
// 構造請求調用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注冊中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 將處理器注冊到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
Server server = serverConfig.buildIfAbsent();
// 注冊序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
}
// 注冊到注冊中心
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
// 記錄一些緩存數據
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
主要邏輯如下:
- 根據 providerConfig 創(chuàng)建一個 key 和 AppName期吓。
- 檢驗同一個服務多次發(fā)布的次數早歇。
- 創(chuàng)建一個 ProviderProxyInvoker, 其中包含了過濾器鏈讨勤,而過濾器鏈的最后一鏈就是對接口實現(xiàn)類的調用箭跳。
- 初始化注冊中心,創(chuàng)建 Server(會有多個Server潭千,因為可能配置了多個協(xié)議)谱姓。
- 將 config 和 invoker 注冊到 Server 中。內部是將其放進了一個 Map 中刨晴。
- 啟動 Server屉来。啟動 Server 其實就是啟動 Netty 服務,并創(chuàng)建一個 RpcHandler狈癞,也就是 Netty 的 Handler茄靠,這個 RpcHandler 內部含有一個數據結構,包含接口級別的 invoker蝶桶。所以慨绳,當請求進入的時候,RpcHandler 的 channelRead 方法會被調用真竖,然后間接的調用 invoker 方法脐雪。
- 成功啟動后,注冊到注冊中心恢共。將數據緩存到 RpcRuntimeContext 的一個 Set 中战秋。
一起來詳細看看。
Invoker 怎么構造的讨韭?很簡單获询,最主要的就是過濾器涨岁。關于過濾器,我們之前已經寫過一篇文章了吉嚣。不再贅述梢薪。
關鍵看看 Server 是如何構造的。
關鍵代碼 serverConfig.buildIfAbsent()
尝哆,類似 HashMap 的 putIfAbsent秉撇。如果不存在就創(chuàng)建。
Server 接口目前有 2 個實現(xiàn)秋泄,bolt 和 rest琐馆。當然,Server 也是基于擴展的恒序,所以瘦麸,不用怕,可以隨便增加實現(xiàn)歧胁。
關鍵代碼在 ServerFactory 的 getServer 中滋饲,其中會獲取擴展點的 Server,然后喊巍,執(zhí)行 Server 的 init 方法屠缭,我們看看默認 bolt 的 init 方法。
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
// 啟動線程池
bizThreadPool = initThreadPool(serverConfig);
boltServerProcessor = new BoltServerProcessor(this);
}
保存了 serverConfig 的引用崭参,啟動了一個業(yè)務線程池呵曹,創(chuàng)建了一個 BoltServerProcessor 對象。
第一:這個線程池會在 Bolt 的 RpcHandler 中被使用何暮,也就是說奄喂,復雜業(yè)務都是在這個線程池執(zhí)行,不會影響 Netty 的 IO 線程海洼。
第二:BoltServerProcessor 非常重要砍聊,他的構造方法包括了當前的 BoltServer,所以他倆是互相依賴的贰军。關鍵點來了:
BoltServerProcessor 實現(xiàn)了 UserProcessor 接口玻蝌,而 Bolt 的 RpcHandler 持有一個 Map<String, UserProcessor<?>>
,所以词疼,當 RpcHandler 被執(zhí)行 channelRead 方法的時候俯树,一定會根據接口名稱找到對應的 UserProcessor,并執(zhí)行他的 handlerRequest 方法贰盗。
那么许饿,RpcHandler 是什么時候創(chuàng)建并放置到 RpcHandler 中的呢?
具體是這樣的:在 server.start()
執(zhí)行的時候舵盈,該方法會初始化 Netty 的 Server陋率,在 SOFA 中球化,叫 RpcServer,將 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中瓦糟。然后筒愚,當 RpcServer 啟動的時候,也就是 start 方法菩浙,會執(zhí)行一個 init 方法巢掺,該方法內部就是設置 Netty 各種屬性的地方,包括 Hander劲蜻,其中有 2 行代碼對我們很重要:
final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
pipeline.addLast("handler", rpcHandler);
創(chuàng)建了一個 RpcHandler陆淀,并添加到 pipeline 中,這個 Handler 的構造參數就是包含所有 BoltServerProcessor 的 Map先嬉。
所以轧苫,總的流程就是:
每個接口都會創(chuàng)建一個 providerConfig 對象,這個對象會創(chuàng)建對應的 invoker 對象(包含過濾器鏈)疫蔓,這兩個對象都會放到 BoltServer 的 invokerMap 中含懊,而 BoltServer 還包含其他對象,比如 BoltServerProcessor(繼承 UserProcessor)鳄袍, RpcServer(依賴 RpcHandler)绢要。當初始化 BoltServerProcessor 的時候吏恭,會傳入 this(BoltServer)拗小,當初始化 RpcServer 的時候,會傳入 BoltServerProcessor 到 RpcServer 的 Map 中樱哼。在 RpcHandler 初始化的時候哀九,又會將 RpcServer 的 Map 傳進自己的內部。完成最終的依賴搅幅。
當請求進入阅束,RpcHandler 調用對應的 UserProcessor 的 handlerRequest 方法,而該方法中茄唐,會調用對應的 invoker息裸,invoker 調用過濾器鏈,知道調用真正的實現(xiàn)類沪编。
而大概的 UML 圖就是下面這樣的:
紅色部分是 RPC 的核心呼盆,包含 Solt 的 Server,實現(xiàn) UserProcessor 接口的 BoltServerProcessor蚁廓,業(yè)務線程池访圃,存儲所有接口實現(xiàn)的 Map。
綠色部分是 Bolt 的接口和類相嵌,只要實現(xiàn)了 UserProcessor 接口腿时,就能將具體實現(xiàn)替換况脆,也既是處理具體數據的邏輯。
最后批糟,看看關鍵類 BoltServerProcessor 格了,他是融合 RPC 和 Bolt 的膠水類。
該類會注冊一個序列化器替代 Bolt 默認的跃赚。handleRequest 方法是這個類的核心方法笆搓。有很多邏輯,主要看這里:
// 查找服務
Invoker invoker = boltServer.findInvoker(serviceName);
// 真正調用
response = doInvoke(serviceName, invoker, request);
/**
* 找到服務端Invoker
*
* @param serviceName 服務名
* @return Invoker對象
*/
public Invoker findInvoker(String serviceName) {
return invokerMap.get(serviceName);
}
根據服務名稱纬傲,從 Map 中找到服務满败,然后調用 invoker 的 invoker 方法。
再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的調用鏈叹括,使用 IDEA 的 Hierarchy 功能算墨,查看該方法,最后停留在 ProcessTast 中汁雷,一個 Runnable.
根據經驗净嘀,這個類肯定是被放到線程池了。什么時候放的呢侠讯?看看他的構造方法的 Hierarchy挖藏。
從圖中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最終會調用 ProcessTask 的 構造方法厢漩。
那么 BoltServer 的用戶線程池什么時候使用呢膜眠?還是使用 IDEA 的 Hierarchy 功能。
其實也是在這個過程中溜嗜,當用戶沒有設置線程池宵膨,則使用系統(tǒng)線程池。
總結
好了炸宵,關于 SOFA 的服務發(fā)布和服務的接收過程辟躏,就介紹完了,可以說土全,整個框架還是非常輕量級的捎琐。基本操作就是:內部通過在 Netty的 Handler 中保存一個存儲服務實現(xiàn)的 Map 完成遠程調用裹匙。
其實和我們之前用 Netty 寫的小 demo 類似瑞凑。