前言
經(jīng)過前面這么長的篇幅,rpc通信的所有的準備工作dubbo都已經(jīng)在啟動的時候準備就緒算色。
服務暴露方:
- 將自己注冊到注冊中心
- 開啟了netty服務端就用接受請求
服務消費方:
- 將自己注冊到注冊中心
- 獲取到了某個服務的服務列表
- 持有了某個服務的某臺主機的invoker對象,并針對其url建立了netty服務端,并發(fā)起了連接倚舀。
rpc通信流程
- 服務消費方 向服務暴露方 發(fā)起tcp請求
- 服務降級
- 集群容錯
- 負載均衡
- 過濾器
- netty發(fā)請求
- 服務提供方接到請求后,調用指定的服務方法,并響應數(shù)據(jù)
- nettyHandler響應channelRead
- 調用本地方法
- 回寫數(shù)據(jù)
- 服務消費方接到服務提供的響應數(shù)據(jù)后進行業(yè)務處理
- 接受響應結果(Response對象不回寫數(shù)據(jù))
- 喚醒阻塞線程
- 最終得到結果
測試代碼:rpc調用另外一個服務的getUsername方法,并傳一個string參數(shù)
public static void main(String[] args) throws IOException {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(AnnoBean.class);
OrderServiceImpl orderService = (OrderServiceImpl) applicationContext.getBean("orderServiceImpl");
orderService.test();
System.in.read();
}
OrderService
@Service
public class OrderServiceImpl implements OrderService {
@Reference(mock = "true")
private UserService userService;
@Override
public void test() {
System.out.println(userService.getUsername("liuben"));
}
}
服務暴露方UserService
@Service
public class UserServiceImpl implements UserService {
@Override
public String getUsername(String username) {
return "i am "+username;
}
}
1. 服務消費方發(fā)起tcp請求
@Reference修飾的屬性會被注入一個代理對象
代理對象是用jdk動態(tài)代理生成的,肯定會調到InvocationHandler里,去執(zhí)行bean的對應方法
創(chuàng)建的ReferenceBeanInvocationHandler
這個bean是dubbo創(chuàng)建的invoker對象,先是走到invoker對象的包裝代理實例AbstractProxyInvoker中,由javassist代理invoker,調用他的invokeMethod方法,并把參數(shù)列表類型,遠程接口,以及對應方法 作為參數(shù)傳進去
服務降級
由于invoker對象先是經(jīng)過集群容錯ClusterInvoker的包裝,會先調到MockClusterInvoker的invoker方法:根據(jù)的你的配置決定是否降級
在這里之前會把參數(shù)列表類型,遠程接口,以及對應方法包裝成一個invocation對象,可以理解成是一次調用信息的對象
集群容錯 & 負載均衡
MockClusterInvoker對象里包裝了其他集群容錯Cluster實例,接下來會調用集群容錯Cluster實例的invoker方法.
默認的集群容錯策略是重試,對應FailoverClusterInvoker實例
會先調到父類的AbstractClusterInvoker的invoker方法,獲取初始的服務列表和負載均衡策略,作為參數(shù)傳入子類
第一次嘗試調用,利用負載均衡算法,選擇出具體的invoker對象,調他的invoker
過濾器鏈
這里獲取到的invoker對象仍然是被過濾器鏈包裝著的,
通過一個鏈表進行傳遞,鏈表中持有下一個過濾器的引用,調完自己的invoker之后,傳給下一個過濾器調
可以看到是層層包裝
最終調到dubboInvoker
netty發(fā)請求
dubboInvoker就會涉及到真正的rpc通信了
先是調到父類的invoke方法,往invocation里塞了一個attachment列表,是否遠程接口名
再進入dubboInvoker的doInvoke方法
先是從之前已經(jīng)創(chuàng)建好的netty連接客戶端拿一個連接出來,判斷你是異步,還是單工
isOneway 單工
單工就是直接返回默認的結果,不需要獲取返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
//@Reference(chefalse,methods = {@Method(= "asynctoDo",async = tru異步調用
}
isAsync 異步
異步的話,其實就是先返回結果,用一個Feature阻塞住,不停的去拿結果,拿到結果再響應邏輯
else if (isAsync) {
ResponseFuture futurecurrentClient.request(intimeout);
Context.getContext().seture(nFutureAdapter<Object>(futur;
return new RpcResult();
}
Feature對象.get
雙工(主要看這個)
雙工的話是發(fā)了請求,等響應結果了再返回對應的結果
這里也是用的Future.get實現(xiàn)的,因為netty nio通信是異步的,這里用Future阻塞住,不停的去獲取結果(一個變量),服務端響應了結果,然后去改這個變量的值,有了值才會返回,jdk里的Feature類也是這種做法。相當于 由異步轉同步的效果。
這里的調用鏈路比較長,是初始化netty客戶端創(chuàng)建的,可以看上篇.上圖
HeaderExchangeClient.request
HeaderExchangeChannel.request,會把Invocation對象包裝成一個Request對象
NettyClient.send(req)
先走父類AbstractPeer.send
再走父類AbstractClient.send(message,sent)
這里會判斷你連接是否還有效,不行的話給你重新連接一下,獲取一個新的channel對象
如果連接是有效的,就直接拿netty的channel對象
,鉤到子類NettyClient.getChannel()
NettyChannel.getOrAddChannel(c, getUrl(), this)這行代碼,會根據(jù)netty的channel對象與url與nettyClient對象包裝成dubbo的NettyChannel對象,并建立緩存:netty的channel對象 ——> dubbo的NettyChannel對象
最后返回的就是一個包裝了netty的channel對象與url與nettyClient對象的NettyChannel對象,調他的send方法
這個方法調的就是netty的channel對象的writeAndFlush發(fā)請求了。
還有個sent判斷默認是false,是用來判斷是否需要等待發(fā)送的結果的,默認不等待
為true的話就會用netty返回的Feature的await阻塞住當前線程,等待發(fā)送結果
最后看一下發(fā)送的消息體是啥樣的
netty handler鏈的調用
根據(jù)netty的api,發(fā)消息之前會先調到pipeline注冊的處理鏈中實現(xiàn)outBoundHandler接口的寫方法
之前我們在啟動netty服務端的時候,往netty的pipeline中注冊了一個nettyClinetHanler,那么就會調到這個類實現(xiàn)ChannelDuplexHandler接口的write方法
nettyClinetHanler.write()
發(fā)送的時候不發(fā)生異常的話就會調到nettyClient的send方法(注冊pipeline的時候new nettyClinetHanler傳進來的)
之后就會走這個鏈路
nettyClient.send()啥也沒干
MultiMessageHandler.send啥也沒干
HeartbeatHandler.send 這里面會有發(fā)心跳的邏輯,設置了寫的時間
AllChannelHandler.send 啥也沒干
DecodeHandler.send 啥也沒干
HeaderExchangeHandler.send 感覺也是沒干啥
最后進到DubboPtotocol里的那個匿名的RequestHandler對象里面,也是調的父類的方法,啥也沒干
走到這里,服務消費者也已經(jīng)把調服務的請求發(fā)出去了洛姑。
最終返回了一個Feature對象
調他的get方法阻塞住,等待對端響應結果。
2.服務提供方響應請求
服務提供方啟動netty服務端創(chuàng)建的handler鏈
也是一個requestHandler,和服務消費方的代碼是公用的
getExchangerHandler也是HeaderExchanger對象
HeaderExchanger這次調的是bind方法
最里層包成這樣了 DecodeHandler( HeaderExchangeHandler (requestHanle) )
又傳到了Transporters.bind里面,又給套一個ChannelHandlerDispatcher
現(xiàn)在是ChannelHandlerDispatcher(DecodeHandler( HeaderExchangeHandler (requestHanle) ))
再獲取NettyTransporter實例皮服,調把前面的傳到nettyServer里去
NettyServer(ChannelHandlerDispatcher(DecodeHandler( HeaderExchangeHandler (requestHanler) )))
進NettyServer構造方法,看看里面做了什么
乍一看,又開始包了,和服務消費方包的代碼一模一樣
最終是這樣
HeaderExchangeServer(NettyServer(MultiMessageHandler( HeartbeatHandler(AllChannelHandler(ChannelHandlerDispatcher(DecodeHandler( HeaderExchangeHandler (requestHanler) ))))))
可以看到和服務消費方nettyClient的鏈差不多楞艾, nettyClient換成了NettyServer而已
NettyServer的構造方法回去開啟netty服務端
先調父類構造
調回子類的doOpen
把NettyServer對象包到了NettyServerHandler中,注冊到netty的pipeline的handler鏈中
最后調 bootstrap.bind(getBindAddress())
接受請求
接受請求的話,肯定是先從netty的pipeline的handler鏈開始的
注冊到netty的pipeline的是NettyServerHandler,這個類肯定實現(xiàn)了netty的hanlder之類的接口,看起來和服務消費者的nettyClientHandler一樣的繼承關系,可以響應channel的讀寫操作
從netty調過來的NettyServerHandler.channelRead
又是一個緩存,和nettyClientHandler那邊如出一轍,調的方法也一樣
NettyServer.receive
MultiMessageHandler.receive
針對多消息體,返回多次結果
HeartbeatHandler.received
設置這次心跳讀的時間龄广,也不是專門響應心跳硫眯,而是響應業(yè)務請求,走下一個Handler
AllChannelHandler.received
這里是服務隔離思想,用的是線程池隔離,從線程池拿一個線程出來響應請求
run方法
DecodeHandler.received
這里會對消息體進行解碼,把消息體弄成一個Request對象
HeaderExchangeHandler.received
走handleRequest會調用接口的具體方法
handleRequest()會調到DubboProtocol的那個RequestHandler匿名對象里
把message(request對象)弄成Invocation,根據(jù)Invocation對象獲取invoker對象
然后調他的invoker方法
同樣的,服務提供方調本地服務,也是要經(jīng)過過濾器鏈的
這個鏈更長了
最后調到代理對象
由代理對象去調本地接口實例
最后層層返回到HeaderExchangeHandler.received方法,由于是雙工通信需要返回給對端結果,所以會把結果以rpc通信的方式返回出去
注意這里的對象就是Response對象了
里頭有個result
最后由用netty的channel對象,回寫數(shù)據(jù),這里走的就是和服務消費方發(fā)請求一樣的邏輯了择同。
3.服務消費方接受響應結果
當服務提供方回寫 響應之后之后,服務消費方就會收到響應結果的請求,和服務提供方接受請求是一樣的邏輯
不同的是,這次接受的是個Response對象,不用回寫數(shù)據(jù)給服務提供方
這個時候需要去喚醒之前等待結果的Feature里的線程了
先根據(jù)通信的id從緩存中移除并推出來對應的DefaultFuture對象
去喚醒DefaultFuture對象
對DefaultFuture中的response對象進行賦值,并喚醒
所以DefaultFuture的get方法會獲取到返回值,并最終返回出去
最終就會成功獲取到本地rpc調用的返回值,并打印
這就是一次完整的tcp調用的全部源碼的流程了,end......
圖解 :