Hadoop RPC
Hadoop作為分布式存儲(chǔ)系統(tǒng),為了實(shí)現(xiàn)各節(jié)點(diǎn)間的通信和交互,所以需要實(shí)現(xiàn)一套這樣的機(jī)制.RPC(Remote Procedure CallProtocol, 遠(yuǎn)程過(guò)程調(diào)用協(xié)議)允許本地程序像調(diào)用本地方法一樣調(diào)用調(diào)用遠(yuǎn)程機(jī)器上應(yīng)用程序提供的服務(wù).所以Hadoop實(shí)現(xiàn)了自己的RPC協(xié)議.
Hadoop RPC協(xié)議基于IPC(Inter-Process Communcations,進(jìn)程間通信)實(shí)現(xiàn)了一套高效的輕量級(jí)RPC,這套框架使用了JAVANIO,動(dòng)態(tài)代理,Protobuf
RPC架構(gòu)圖:
客戶端代理程序?qū)⒄?qǐng)求程序的RPC調(diào)用序列化,并調(diào)用Client.call()方法將請(qǐng)求發(fā)送給遠(yuǎn)程服務(wù)器,這些實(shí)現(xiàn)對(duì)客戶端調(diào)用程序是完全透明的亭畜。
Hadoop2.x默認(rèn)使用protobuf作為序列化工具盐股,當(dāng)然Hadoop框架也支持其他的序列化框架侄非。
Hadoop定義了RpcEngine接口抽象使用不同的序列化框架的RPC引擎睡扬,RPCEngine接口包括兩個(gè)重要的方法谬哀。
getProxy():采用java動(dòng)態(tài)代理機(jī)制,客戶端在代理對(duì)象上的調(diào)用會(huì)由RpcInvocationHandler對(duì)象處理绽族。RpcInvocationHandler會(huì)將請(qǐng)求序列化姨涡,并調(diào)用Client.call()方法將請(qǐng)求發(fā)送到遠(yuǎn)程服務(wù)器。當(dāng)服務(wù)器返回響應(yīng)信息后吧慢,RpcInvocationHandler會(huì)將響應(yīng)信息反序列化并返回給調(diào)用程序涛漂。
getServer():用于產(chǎn)生一個(gè)RPC Server對(duì)象,服務(wù)器會(huì)啟動(dòng)這個(gè)Server對(duì)象監(jiān)聽(tīng)從客戶端發(fā)來(lái)的請(qǐng)求.成功從網(wǎng)絡(luò)接收數(shù)據(jù)后,Server對(duì)象會(huì)調(diào)用RpcInvoker對(duì)象處理這個(gè)請(qǐng)求.
RpcEngine目前有兩個(gè)子類:
WritableRpcEngine : Hadoop自帶的Writable作為序列化工具的RPC引擎.
ProtobufRpcEngine :使用protobuf作為序列化工具的RPC引擎.
Java動(dòng)態(tài)代理:
//Handler
public class MyInvocationHandler implements InvocationHandler{
//被代理的對(duì)象
private Object target = null;
//通過(guò)構(gòu)造方法傳遞一個(gè)對(duì)象
public MyInvocationHandler(Object _obj){
this.target = _obj
}
//代理方法
public?invoke(Object proxy, Method method, Object[] args) throws Throwable{
//Aop before切面
//執(zhí)行被代理方法
Object obj = method.invoke(this.target,args);
//?Aop afte切面
}?
}
//動(dòng)態(tài)代理類
public class DynamicProxy<T>{
public?static <T>?T newProxyInstance(ClassLoader loader,Class<?>[] interfaces,
InvocationHandler?h){
return (T)Proxy.newProxyIntstance(loader,interfaces,h);
}
}
//具體業(yè)務(wù)的動(dòng)態(tài)代理類
public class TargetDynamicProxy extends DynamicProxy {
public?static <T>?T newProxyInstance(TargetInterface target){
ClassLoader loader = target.getClass().getLoader();
Class<?>[] interfaces = target.getCalss.getInterfaces();
InvocationHandler?Handler?= new MyInvocationHandler(target);
return newProxyIntstance(loader,interfaces, Handler);
}
}
//要代理的接口
public?interface TargetInterface{
?public void doSomething(String str);
}
//接口實(shí)現(xiàn)類
public?class MyTarget implements TargetInterface{
???public void doSomething(){
?????System.out.println(“do something!”);
}
}
//測(cè)試動(dòng)態(tài)代理
public class Test{
public static void main(String[] args){
//自定義一個(gè)目標(biāo)對(duì)象
TargetInterface target = new MyTarget();
//定義目標(biāo)對(duì)象的代理對(duì)象
TargetInterface?proxy = TargetDynamicProxy.?newProxyInstance(target);
//代理目標(biāo)對(duì)象執(zhí)行方法
proxy.doSomething();
}
}
Rename RPC調(diào)用流程圖:
Client發(fā)送請(qǐng)求和接收響應(yīng)流程:
Client.call()方法將RPC請(qǐng)求封裝成一個(gè)CALL對(duì)象,CALL對(duì)象中保存了RPC調(diào)用的完成標(biāo)志检诗、返回值信息以及異常信息匈仗;隨后Client.call()方法會(huì)創(chuàng)建一個(gè)Connection對(duì)象,Connection對(duì)象用戶管理Client與Server的soket鏈接岁诉。
用ConnectionId做為key锚沸,將新建的Connection對(duì)象放入Client.Connections字段中保存(對(duì)于Connection對(duì)象,由于涉及了與Server的Socket連接涕癣,會(huì)比較消耗資源哗蜈,所以Client類使用HashTable對(duì)象connections保存哪些沒(méi)有過(guò)期的Connection,如果可以復(fù)用則復(fù)用這些Connection對(duì)象);以callId為key坠韩,將構(gòu)造的call對(duì)象放入Connection.calls字段中保存距潘。
Client.call()方法調(diào)用Connection.setupIOstreams()方法建立與Server的Socket連接。setupIOstreams()方法還會(huì)啟動(dòng)Connection線程只搁,Connection線程會(huì)監(jiān)聽(tīng)Socket并讀取Server發(fā)回的響應(yīng)信息音比。
Client.call()方法調(diào)用Connection.sendRpcRequest()發(fā)送RPC請(qǐng)求到Server。
Client.call()方法調(diào)用Call.wait()在Call對(duì)象上等待氢惋,等待Server發(fā)回響應(yīng)信息洞翩。
Connection線程收到Server發(fā)回的響應(yīng)信息,根據(jù)響應(yīng)消息中攜帶的信息找到對(duì)應(yīng)的Call對(duì)象焰望,然后設(shè)置Call對(duì)象的返回字段骚亿,并調(diào)用call.notify()喚醒調(diào)用Client.call()方法的線程讀取call()對(duì)應(yīng)的返回值。
Server為了提高性能熊赖,Server類采用了很多技術(shù)來(lái)提高并發(fā)能力来屠,包括線程池、JavaNIO提供的Reactor震鹉,其中Reactor模式貫穿整個(gè)Server的設(shè)計(jì)俱笛。
Reactor模式是一種廣泛應(yīng)用在服務(wù)器端的設(shè)計(jì)模式,也是一種事件驅(qū)動(dòng)的設(shè)計(jì)模式传趾。Reactor模式的處理流程是:應(yīng)用程序向中間人注冊(cè)IO事件迎膜,當(dāng)中間人監(jiān)聽(tīng)到這個(gè)IO事件的發(fā)生后,會(huì)通知并喚醒應(yīng)用程序處理這個(gè)事件墨缘。這里的中間人其實(shí)是一個(gè)不斷等待和循環(huán)的線程星虹,它接受所有請(qǐng)用程序的注冊(cè)零抬,并檢查應(yīng)用程序注冊(cè)的io事件是否就緒,如果就緒了則通知應(yīng)用程序進(jìn)行處理宽涌。
Server類的設(shè)計(jì)是典型的多線程Reactor的網(wǎng)絡(luò)服務(wù)器的結(jié)構(gòu)平夜。Server定義了如下幾個(gè)內(nèi)部類,我們可以對(duì)Reactor結(jié)構(gòu)中的模塊來(lái)理解卸亮。
Listener:類似于Reactor 模型中的mainReactor忽妒。Listener對(duì)象中存在一個(gè)Selector對(duì)象acceptSelector,負(fù)責(zé)監(jiān)聽(tīng)來(lái)自客戶端的Socket連接請(qǐng)求兼贸。當(dāng)acceptSelector監(jiān)聽(tīng)到連接請(qǐng)求后段直,Listener對(duì)象會(huì)初始化這個(gè)連接,之后采用輪詢的方式從readers線程池中選擇一個(gè)reader線程處理RPC請(qǐng)求的讀操作溶诞。
Reader:用于讀取RPC請(qǐng)求鸯檬。Reader線程類中存在一個(gè)Selector對(duì)象readSelector,這個(gè)對(duì)象用戶監(jiān)聽(tīng)網(wǎng)絡(luò)中是否有可以讀取的RPC請(qǐng)求螺垢。當(dāng)readerSelector監(jiān)聽(tīng)到可讀的RPC請(qǐng)求后喧务,會(huì)喚醒Reader線程讀取這個(gè)請(qǐng)求,并將請(qǐng)求封裝到一個(gè)Call對(duì)象中枉圃,然后將這個(gè)Call對(duì)象放入共享隊(duì)列CallQueue中功茴。
Handler:用戶處理RPC請(qǐng)求并發(fā)回響應(yīng)。Handler對(duì)象從CallQueue中不停地取出RPC請(qǐng)求孽亲,然后執(zhí)行RPC請(qǐng)求對(duì)應(yīng)的本地函數(shù)坎穿,最后封裝響應(yīng)并將響應(yīng)發(fā)回客戶端。為了能夠并發(fā)的處理RPC請(qǐng)求返劲,Server中會(huì)存在多個(gè)Handler對(duì)象玲昧。
Responder:用戶向客戶端發(fā)送RPC響應(yīng)。Handler中不是已經(jīng)發(fā)送RPC響應(yīng)了嗎篮绿?為什么還要再實(shí)現(xiàn)Responder類酌呆?這是出于對(duì)響應(yīng)很大時(shí)或網(wǎng)絡(luò)條件不好時(shí),Handler線程很難將完整的響應(yīng)發(fā)回客戶端搔耕,這就會(huì)造成Handler線程阻塞,從而影響RPC請(qǐng)求的處理效率痰娱。出現(xiàn)這種情況時(shí)弃榨,Responder內(nèi)部的respondSelector上注冊(cè)寫(xiě)響應(yīng)事件,這里的respondSelector會(huì)監(jiān)聽(tīng)網(wǎng)絡(luò)環(huán)境具備寫(xiě)響應(yīng)條件時(shí)梨睁,會(huì)通知Responder將剩余響應(yīng)發(fā)回客戶端鲸睛。
服務(wù)端處理客戶請(qǐng)求流程: