實(shí)現(xiàn)一個(gè)簡(jiǎn)單的RPC系統(tǒng)
provider:
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
public class Provider {
public static void main(String[] args) throws Exception {
//新建一個(gè)server socket 端口號(hào)為1234经备,普通的RPC調(diào)用可以通過socket來實(shí)現(xiàn)
ServerSocket server=new ServerSocket(1234);
//監(jiān)聽socket連接并響應(yīng)
while(true)
{
Socket socket=server.accept();
ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
//獲得服務(wù)端要調(diào)用的類名
String classname=input.readUTF();
//獲得服務(wù)端要調(diào)用的方法名稱
String methodName=input.readUTF();
//獲得服務(wù)端要調(diào)用方法的參數(shù)類型
Class<?>[] parameterTypes=(Class<?>[]) input.readObject();
//獲得服務(wù)端要調(diào)用方法的每一個(gè)參數(shù)的值
Object[] arguments=(Object[]) input.readObject();
//創(chuàng)建類
Class serviceClass=Class.forName(classname);
//創(chuàng)建對(duì)象
Object object = serviceClass.newInstance();
//獲得該類的對(duì)應(yīng)的方法
Method method=serviceClass.getMethod(methodName, parameterTypes);
//該對(duì)象調(diào)用指定方法
Object result=method.invoke(object, arguments);
ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
socket.close();
}
}
}
consumer:
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
public class Consumer {
public void doBusiness() throws Exception{
//設(shè)置調(diào)用類的路徑和要調(diào)用的方法
String classname="com.dfire.demo.rpc.RPCServiceImpl";
String method="sayHello";
Class[] argumentsType={String.class};
//獲取本機(jī)計(jì)算機(jī)名稱
InetAddress inetAddress = InetAddress.getLocalHost();
String hostName = inetAddress.getHostName().toString();
Object[] arguments={hostName};
//與10.1.134.145主機(jī)建立socket連接進(jìn)行通訊
Socket socket = new Socket("127.0.0.1",1234);
ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
//輸入數(shù)據(jù)
output.writeUTF(classname);
output.writeUTF(method);
output.writeObject(argumentsType);
output.writeObject(arguments);
//得到返回?cái)?shù)據(jù)
ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
Object result=input.readObject();
System.out.println(Thread.currentThread().getName()+" "+result);
socket.close();
}
public static void main(String[] args) {
try {
new Consumer().doBusiness();
}catch (Exception e){
e.printStackTrace();
}
}
}
輸入命令查看:
lsof -i tcp:1234
上圖顯示線程16419在監(jiān)聽TCP的通訊拭抬,做為RPC的server端
下面我們看下我們的dubbo的通訊監(jiān)聽情況:
先看服務(wù)提供者:
lsof -i tcp:20881
下面為服務(wù)消費(fèi)者:
lsof -i tcp:20880
最后邊的許多連接表示10.1.134.145連接了多個(gè)機(jī)器的服務(wù)【服務(wù)提供者端口為20880】
上面通過兩個(gè)簡(jiǎn)單的類Provider和Consumer實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的RPC系統(tǒng),大家應(yīng)該就能清楚認(rèn)識(shí)到RPC的整個(gè)架構(gòu)和通訊是比較簡(jiǎn)單的侵蒙,當(dāng)然上面的這個(gè)RPC系統(tǒng)有很多缺點(diǎn):
- BIO造虎,不能支持高并發(fā)。
- 不支持負(fù)載均衡
- 不支持容錯(cuò)機(jī)制
- 不支持SPI擴(kuò)展
- 不支持各種序列化
- 不支持自動(dòng)注冊(cè)和發(fā)現(xiàn)
當(dāng)然還有其他很多缺點(diǎn)蘑志,業(yè)界有很多RPC的框架累奈,例如:dubbo贬派,我們下面來講一下講急但。
下面是從dubbo官網(wǎng)copy下面的一段話,對(duì)dubbo的簡(jiǎn)單描述:
dubbo的拓?fù)鋱D
我們上面自己實(shí)現(xiàn)的RPC系統(tǒng)就缺少Registry和Monitor這兩個(gè)模塊搞乏,當(dāng)然其他模塊也很弱
整體設(shè)計(jì)
- config 配置層:對(duì)外配置接口波桩,以 ServiceConfig, ReferenceConfig 為中心,可以直接初始化配置類请敦,也可以通過 spring 解析配置生成配置類
- proxy 服務(wù)代理層:服務(wù)接口透明代理镐躲,生成服務(wù)的客戶端 Stub 和服務(wù)器端 Skeleton, 以 ServiceProxy 為中心,擴(kuò)展接口為 ProxyFactory
- registry 注冊(cè)中心層:封裝服務(wù)地址的注冊(cè)與發(fā)現(xiàn)侍筛,以服務(wù) URL 為中心萤皂,擴(kuò)展接口為 RegistryFactory, Registry, RegistryService
- cluster 路由層:封裝多個(gè)提供者的路由及負(fù)載均衡,并橋接注冊(cè)中心匣椰,以 Invoker 為中心裆熙,擴(kuò)展接口 為 Cluster, Directory, Router, LoadBalance
- monitor 監(jiān)控層:RPC 調(diào)用次數(shù)和調(diào)用時(shí)間監(jiān)控,以 Statistics 為中心禽笑,擴(kuò)展接口為 MonitorFactory, Monitor, MonitorServiceprotocol 遠(yuǎn)程調(diào)用層:封裝 RPC 調(diào)用入录,以 Invocation, Result 為中心,擴(kuò)展接口為 Protocol, Invoker, Exporter
- exchange 信息交換層:封裝請(qǐng)求響應(yīng)模式佳镜,同步轉(zhuǎn)異步僚稿,以 Request, Response 為中心,擴(kuò)展接口為Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- transport 網(wǎng)絡(luò)傳輸層:抽象 mina 和 netty 為統(tǒng)一接口蟀伸,以 Message 為中心蚀同,擴(kuò)展接口為 Channel, Transporter, Client, Server, Codecserialize 數(shù)據(jù)序列化層:可復(fù)用的一些工具缅刽,擴(kuò)展接口為 Serialization, ObjectInput, ObjectOutput, ThreadPool
調(diào)用鏈:
服務(wù)提供者暴露一個(gè)服務(wù)的過程:
暴露服務(wù)時(shí)序圖
服務(wù)消費(fèi)者消費(fèi)一個(gè)服務(wù)的過程
引用服務(wù)時(shí)序
服務(wù)提供 Invoker 和服務(wù)消費(fèi) Invoker
核心領(lǐng)域模型(Microkernel + Plugin 模式)
Protocol 是服務(wù)域,它是 Invoker 暴露和引用的主功能入口蠢络,它負(fù)責(zé) Invoker 的生命周期管理拷恨。
functionalities是實(shí)體域,它是 Dubbo 的核心模型谢肾,其它模型都向它靠擾腕侄,或轉(zhuǎn)換成它,它代表一個(gè)可
執(zhí)行體芦疏,可向它發(fā)起 invoke 調(diào)用冕杠,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn)酸茴,也可
能一個(gè)集群實(shí)現(xiàn)分预。Invocation 是會(huì)話域,它持有調(diào)用過程中的變量薪捍,比如方法名笼痹,參數(shù)等。
dubbo:// 數(shù)據(jù)傳輸
Dubbo 缺省協(xié)議采用單一長(zhǎng)連接和 NIO 異步通訊酪穿,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用凳干,以及服務(wù)消費(fèi)者機(jī)器數(shù)遠(yuǎn)大于服務(wù)提供者機(jī)器數(shù)的情況。反之被济,Dubbo 缺省協(xié)議不適合傳送大數(shù)據(jù)量的服務(wù)救赐,比如傳文件,傳視頻等只磷,除非請(qǐng)求量很低经磅。
+ Transporter: mina, netty, grizzy
+ Serialization: dubbo, hessian2, java, json
+ Dispatcher: all, direct, message, execution, connection
+ ThreadPool: fixed, cached, limited
Dispatcher
對(duì)于Dubbo集群中的Provider角色,有IO線程池和業(yè)務(wù)處理線程池(默認(rèn)200)兩個(gè)線程池钮追,所以當(dāng)業(yè)務(wù)的并發(fā)比較高预厌,或者某些業(yè)務(wù)處理變慢,業(yè)務(wù)線程池就很容易被“打滿”元媚,拋出“RejectedExecutionException: Thread pool is EXHAUSTED! ”異常轧叽。
- all 所有消息都派發(fā)到線程池,包括請(qǐng)求惠毁,響應(yīng)犹芹,連接事件,斷開事件鞠绰,心跳等腰埂。
- direct 所有消息都不派發(fā)到線程池,全部在 IO 線程上直接執(zhí)行蜈膨。
- message 只有請(qǐng)求響應(yīng)消息派發(fā)到線程池屿笼,其它連接斷開事件牺荠,心跳等消息,直接在IO線程上執(zhí)行驴一。
- execution 只請(qǐng)求消息派發(fā)到線程池休雌,不含響應(yīng),響應(yīng)和其它連接斷開事件肝断,心跳等消息杈曲,直接在 IO 線程上執(zhí)行。
- connection 在 IO 線程上胸懈,將連接斷開事件放入隊(duì)列担扑,有序逐個(gè)執(zhí)行,其它消息派發(fā)到線程池趣钱。
<dubbo:provider version="1.0" delay="-5000" timeout="5000" dispatcher="all" threads="400" loadbalance="leastactive" actives="400" />
ThreadPool
fixed 固定大小線程池涌献,啟動(dòng)時(shí)建立線程,不關(guān)閉首有,一直持有燕垃。(缺省)。
cached 緩存線程池井联,空閑一分鐘自動(dòng)刪除卜壕,需要時(shí)重建。
limited 可伸縮線程池低矮,但池中的線程數(shù)只會(huì)增長(zhǎng)不會(huì)收縮印叁。只增長(zhǎng)不收縮的目的是為了避免收縮時(shí)突然來了大流量引起的性能問題。
eager 優(yōu)先創(chuàng)建Worker線程池军掂。在任務(wù)數(shù)量大于corePoolSize但是小于maximumPoolSize時(shí),優(yōu)先創(chuàng)建Worker來處理任務(wù)。當(dāng)任務(wù)數(shù)量大于maximumPoolSize時(shí)昨悼,將任務(wù)放入阻塞隊(duì)列中蝗锥。阻塞隊(duì)列充滿時(shí)拋出RejectedExecutionException。(相比于cached:cached在任務(wù)數(shù)量超
過maximumPoolSize時(shí)直接拋出異常而不是將任務(wù)放入阻塞隊(duì)列)率触。
zookeeper 注冊(cè)中心
Zookeeper 是 Apacahe Hadoop 的子項(xiàng)目终议,是一個(gè)樹型的目錄服務(wù),支持變更推送葱蝗,適合作為 Dubbo 服務(wù)的注冊(cè)中心穴张,工業(yè)強(qiáng)度較高,可用于生產(chǎn)環(huán)境两曼,并推薦使用皂甘。
流程說明:
- 服務(wù)提供者啟動(dòng)時(shí): 向 /dubbo/com.foo.BarService/providers 目錄下寫入自己的 URL 地址。
- 服務(wù)消費(fèi)者啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService/providers 目錄下的提供者 URL 地址悼凑。并向 /dubbo/com.foo.BarService/consumers 目錄下寫入自己的 URL 地址偿枕。
- 監(jiān)控中心啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService 目錄下的所有提供者和消費(fèi)者 URL 地址璧瞬。
Fault Tolerance 容錯(cuò)
- Failover – FailoverClusterInvoker
失敗自動(dòng)切換,嘗試其他服務(wù)器渐夸。 (默認(rèn)的方案) - Failfast – FailfastClusterInvoker
失敗立即拋出異常,嗤锉。通常用于非冪等性的寫操作,比如新增記錄墓塌。 - Failsafe – FailsafeClusterInvoker
失敗忽略異常瘟忱。通常用于寫入審計(jì)日志等操作。 - Failback – FailbackClusterInvoker
失敗自動(dòng)恢復(fù)苫幢,記錄日志并定時(shí)重試酷誓。 通常用于消息通知操作。 - Forking – ForkingClusterInvoker
并行調(diào)用多個(gè)服務(wù)态坦,一個(gè)成功立即返回盐数。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源伞梯∶登猓可通過 forks=“2” 來設(shè)置最大并行數(shù)。 - Broadcast – BroadcastClusterInvoker
廣播調(diào)用所有提供者谜诫,任意一個(gè)報(bào)錯(cuò)則報(bào)錯(cuò)
Load Balancing 負(fù)載均衡
random (隨機(jī)漾峡,按權(quán)重設(shè)置隨機(jī)概率) 默認(rèn)的策略
round-robin (輪循,按公約后的權(quán)重設(shè)置輪循比率)
存在慢的提供者累積請(qǐng)求問題喻旷,比如:第二臺(tái)機(jī)器很慢生逸,但沒掛,
當(dāng)請(qǐng)求調(diào)到第二臺(tái)時(shí)就卡在那且预,久而久之槽袄,所有請(qǐng)求都卡在調(diào)到第二臺(tái)上。least-active (最少活躍調(diào)用數(shù)锋谐,相同活躍數(shù)的隨機(jī)遍尺,活躍數(shù)指調(diào)用前后計(jì)數(shù)差。) 使慢的提供者收到更少請(qǐng)求涮拗,因?yàn)樵铰奶峁┱叩恼{(diào)用前后計(jì)數(shù)差會(huì)越大乾戏,不支持權(quán)重。 【線上用得比較多三热,注意dubbo低版本有bug】
consistent-hash(一致性Hash鼓择,相同參數(shù)的請(qǐng)求總是發(fā)到同一提供者)
當(dāng)某一臺(tái)提供者掛時(shí)就漾,原本發(fā)往該提供者的請(qǐng)求,基于虛擬節(jié)點(diǎn)从藤,平攤到其它提供者锁蠕,不會(huì)引起劇烈變動(dòng),會(huì)導(dǎo)致壓力分?jǐn)偛痪?/p>支持?jǐn)U展
需要實(shí)現(xiàn)AbstractLoadBalance接口
SPI擴(kuò)展實(shí)現(xiàn)
一懊蒸、調(diào)用攔截?cái)U(kuò)展
擴(kuò)展說明:服務(wù)提供方和服務(wù)消費(fèi)方調(diào)用過程攔截荣倾,Dubbo 本身的大多功能均基于此擴(kuò)展點(diǎn)實(shí)現(xiàn)骑丸,每次遠(yuǎn)程方法執(zhí)行,該攔截都會(huì)被執(zhí)行通危,請(qǐng)注意對(duì)性能的影響铸豁。dubbo層的限流和熔斷可以用這個(gè)filter擴(kuò)展來實(shí)現(xiàn)。
約定:
- 用戶自定義 filter 默認(rèn)在內(nèi)置 filter 之后菊碟。
- 特殊值 default,表示缺省擴(kuò)展點(diǎn)插入的位置逆害。比如:filter=“xxx,default,yyy”,表示 xxx 在缺省 filter 之前相艇,yyy 在缺省 filter 之后纯陨。
- 特殊符號(hào) -,表示剔除翼抠。比如:filter=“-foo1”,剔除添加缺省擴(kuò)展點(diǎn) foo1臭墨。比如:filter=“-default”膘盖,剔除添加所有缺省擴(kuò)展點(diǎn)尤误。
- provider 和 service 同時(shí)配置的 filter 時(shí),累加所有 filter损晤,而不是覆蓋。比如:<dubbo:provider filter="xxx,yyy"/> 和 <dubbo:service filter="aaa,bbb" />喘落,則 xxx,yyy,aaa,bbb 均會(huì)生效。如果要覆蓋瘦棋,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />
接口com.alibaba.dubbo.rpc.Filter
擴(kuò)展配置
<dubbo:reference filter= “xxx,yyy” />
<dubbo:consumer filter= “xxx,yyy” />
<dubbo:service filter= “xxx,yyy” />
<dubbo:provider filter= “xxx,yyy” />
已知擴(kuò)展
com.alibaba.dubbo.rpc.filter.EchoFilter
com.alibaba.dubbo.rpc.filter.GenericFilter
com.alibaba.dubbo.rpc.filter.GenericImplFilter
com.alibaba.dubbo.rpc.filter.TokenFilter
com.alibaba.dubbo.rpc.filter.AccessLogFilter
com.alibaba.dubbo.rpc.filter.CountFilter
com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
com.alibaba.dubbo.rpc.filter.ContextFilter
com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
com.alibaba.dubbo.rpc.filter.ExceptionFilter
com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
com.alibaba.dubbo.rpc.filter.DeprecatedFilter
總結(jié)得比較簡(jiǎn)單赌朋,摘自我的培訓(xùn)課件,適合根據(jù)這個(gè)文章來做分享沛慢,歡迎大家一起交流团甲。