什么是RPC
rpc是遠(yuǎn)程過(guò)程調(diào)用榨婆,在本地代碼中使用模擬調(diào)用本地方法的形式調(diào)用遠(yuǎn)程的服務(wù)過(guò)程窿锉。
RPC的優(yōu)點(diǎn)
- 對(duì)于服務(wù)器端開發(fā)人員而言,容易設(shè)計(jì)轧简、開發(fā)驰坊。
- 對(duì)于消費(fèi)者而言,調(diào)用非常簡(jiǎn)單哮独。
- 便于做集中的監(jiān)控拳芙。
- 基于socket的二進(jìn)制RPC協(xié)議,建立連接延遲低借嗽、網(wǎng)絡(luò)傳輸效率高态鳖。
- 支持有狀態(tài)的長(zhǎng)連接,可進(jìn)行雙向通信恶导,實(shí)時(shí)性好浆竭。
- 在各個(gè)企業(yè)的使用較為成熟,許多企業(yè)都有自己的RPC實(shí)踐惨寿,并已廣泛應(yīng)用在生產(chǎn)環(huán)節(jié)邦泄。
RPC 的缺點(diǎn)
- 緊耦合
- API一旦發(fā)布,就難以再做改動(dòng)裂垦。
- 客戶端必須使用特定的框架顺囊,而且還需引入API包。
- 沒(méi)有統(tǒng)一的設(shè)計(jì)風(fēng)格
- 增加了客戶端開發(fā)人員的學(xué)習(xí)成本
- 難以實(shí)現(xiàn)通用的客戶端庫(kù),每個(gè)RPC框架都有各自的協(xié)議蕉拢。
- 通常以動(dòng)詞的形式設(shè)計(jì)API特碳,一個(gè)功能就增加一個(gè)API,設(shè)計(jì)的時(shí)候很少考慮領(lǐng)域模型晕换。
- 掩蓋了網(wǎng)絡(luò)的復(fù)雜性
- 開發(fā)人員很容易混淆遠(yuǎn)程調(diào)用與本地調(diào)用午乓。
實(shí)際上網(wǎng)絡(luò)調(diào)用與本地調(diào)用是完全不同的,RPC的調(diào)用方式闸准,讓使用者很難意識(shí)到是在進(jìn)行網(wǎng)絡(luò)調(diào)用益愈,忽略了針對(duì)網(wǎng)絡(luò)復(fù)雜性的處理。 - 會(huì)損害用戶(客戶端)可感知的性能
- 開發(fā)人員很容易混淆遠(yuǎn)程調(diào)用與本地調(diào)用午乓。
盡管RPC有許多的優(yōu)點(diǎn)和缺點(diǎn)夷家,但是在互聯(lián)網(wǎng)企業(yè)中應(yīng)用非常廣泛蒸其,具有很高的學(xué)習(xí)價(jià)值。并且對(duì)網(wǎng)絡(luò)库快、IO摸袁、線程等知識(shí)領(lǐng)域都有涉及,是較好的學(xué)習(xí)路徑缺谴。因此我們通過(guò)自己實(shí)現(xiàn)一個(gè)RPC框架來(lái)學(xué)習(xí)和強(qiáng)化相應(yīng)的知識(shí)但惶,并且可以更了解RPC底層的設(shè)計(jì)思路耳鸯,在工作中實(shí)現(xiàn)應(yīng)用代碼的時(shí)候更加心中有數(shù)。
典型RPC的要點(diǎn)
- 服務(wù)發(fā)布
- 序列化協(xié)議
- 服務(wù)響應(yīng)
- 線程模型
- 透明調(diào)用
- 動(dòng)態(tài)代理
- 服務(wù)發(fā)現(xiàn)
- 負(fù)載均衡
第一步,單一直連
實(shí)現(xiàn)目標(biāo)
RPC最核心的部分是服務(wù)發(fā)布和服務(wù)調(diào)用膀曾,服務(wù)動(dòng)態(tài)發(fā)現(xiàn)和負(fù)載均衡都是更為高級(jí)的特性县爬。在第一步,我們先實(shí)現(xiàn)一個(gè)僅支持單一直連和的RPC框架添谊。
- 直連财喳,所謂直連的意思,就是客戶端必須預(yù)先配置好服務(wù)端的信息(包括服務(wù)地址和服務(wù)協(xié)議)斩狱。所以在這一步耳高,不需要服務(wù)注冊(cè)和服務(wù)發(fā)現(xiàn),也就不需要服務(wù)注冊(cè)表所踊。
- 單一泌枪,單一是指客戶端預(yù)先配置的provider只有一個(gè),如果客戶端預(yù)先配置的provider list含有兩個(gè)及以上元素秕岛,盡管不使用服務(wù)發(fā)現(xiàn)碌燕,仍然會(huì)存在負(fù)載均衡的問(wèn)題,所以這一步我們只支持配置一個(gè)provider继薛。
發(fā)布服務(wù)
發(fā)布服務(wù)是指服務(wù)端開始提供服務(wù)并能響應(yīng)請(qǐng)求修壕,和服務(wù)注冊(cè)不是一回事情。
在這一步遏考,我們要編寫服務(wù)端代碼慈鸠。服務(wù)端代碼有以下要素:
- 網(wǎng)絡(luò)IO框架,網(wǎng)絡(luò)框架我們采用netty灌具,并使用nio模式青团,netty具備良好的線程模型和豐富的框架功能】ч梗基于netty可以事半功倍壶冒。
- 序列化協(xié)議,序列化協(xié)議使用protestuff截歉,這是參考了google protebuf的純java序列化框架,兼容protobuf協(xié)議烟零。
實(shí)現(xiàn)細(xì)節(jié)
1. 消息傳輸
客戶端的請(qǐng)求稱為RequestMessage瘪松,服務(wù)端響應(yīng)稱為ResponseMessage,請(qǐng)求和響應(yīng)在網(wǎng)絡(luò)中傳遞锨阿,數(shù)據(jù)包會(huì)被拆分或合并宵睦,也就是我們常說(shuō)的拆包和粘包。所以數(shù)據(jù)要能夠根據(jù)一定規(guī)則切分成一幀一幀墅诡,也就是正確還原為消息壳嚎。
常用的消息幀處理方式有:
- 定長(zhǎng)消息,每個(gè)消息體的長(zhǎng)度固定并事先知曉。消息收發(fā)方都按約定的長(zhǎng)度接收烟馅、發(fā)送消息说庭。
- 長(zhǎng)度前置變長(zhǎng)消息,每個(gè)消息的固定頭幾位表示消息的長(zhǎng)度(size)郑趁,后面接著消息內(nèi)容刊驴。通常用4位或8位前綴表示內(nèi)容長(zhǎng)度,前綴解析位整形數(shù)字寡润,該數(shù)字就是消息內(nèi)容大小捆憎。如通常的RPC協(xié)議,消息隊(duì)列梭纹,都是使用這種方式躲惰,這種方式的使用非常廣泛。
- 字符分割变抽,固定的一個(gè)或多個(gè)字符代表一個(gè)消息的結(jié)束础拨。如http協(xié)議使用連續(xù)的兩個(gè)換行符表示消息結(jié)束。
我們采用長(zhǎng)度前置的變長(zhǎng)消息來(lái)處理信息幀瞬沦,對(duì)應(yīng)netty的實(shí)現(xiàn)就是LengthFieldBasedFrameDecoder太伊。
2.io模式&響應(yīng)方式
當(dāng)然是使用nio了」渥辏基于netty的nio編程也非常高效僚焦。
在這個(gè)階段,我們不區(qū)分業(yè)務(wù)線程和IO線程曙痘,直接用netty IO線程調(diào)用provider的業(yè)務(wù)代碼芳悲。再以后的完善過(guò)程加上業(yè)務(wù)線程的區(qū)分。
類定義
類圖如下:
代碼實(shí)現(xiàn)
ServiceExporter
ServiceExporter將服務(wù)發(fā)布到網(wǎng)絡(luò)環(huán)境边坤,負(fù)責(zé)響應(yīng)&解析Request名扛、編碼&發(fā)送Response〖胙鳎總而言之就是負(fù)責(zé)網(wǎng)絡(luò)邊界處理肮韧,不負(fù)責(zé)Request的執(zhí)行。
具體查找Provider旺订、執(zhí)行Request弄企、調(diào)用業(yè)務(wù)代碼、產(chǎn)生Response的功能被委托給ProviderManager区拳。
下面用Netty NIO實(shí)現(xiàn)一個(gè)ServiceExporter拘领。
package io.destinyshine.storks.provider;
/**
*
* 基于Netty NIO實(shí)現(xiàn)的服務(wù)發(fā)布。
* <p>
* 將服務(wù)發(fā)布到網(wǎng)絡(luò)環(huán)境樱调,可以供消費(fèi)端調(diào)用约素,處理消費(fèi)端請(qǐng)求并返回響應(yīng)消息届良。
* </p>
*
* @author liujianyu
* @date 2017/08/10
*/
import ...
@Slf4j
public class NioSocketChannelServiceExporter implements ServiceExporter {
private final int servicePort;
private Channel channel;
private InetSocketAddress localAddress;
private ProviderManager providerManager;
/**
* 使用一個(gè)本地服務(wù)端口構(gòu)造exporter,將通過(guò)指定端口提供服務(wù)圣猎。
* <p>
* 如果localPort為0士葫,代表使用隨機(jī)端口,在服務(wù)啟動(dòng)后通過(guò){@link #getServicePort()}獲取實(shí)際端口样漆。
* </p>
* @param localPort 服務(wù)端口为障,為0可使用隨機(jī)端口。
* @see #getServicePort()
*/
public NioSocketChannelServiceExporter(int localPort) {
this.servicePort = localPort;
}
@Override
public void export() throws InterruptedException {
Objects.requireNonNull(providerManager, "please set providerManager non null before export services.");
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.localAddress(new InetSocketAddress(servicePort));
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(Protocol.newFrameDecoder())
.addLast(new ProtostuffDecoder<>(RequestMessage.class, RequestMessage::new))
.addLast(new ProtostuffEncoder<>(ResponseMessage.class))
.addLast(new RequestMessageHandler(providerManager));
}
});
ChannelFuture channelFuture = bootstrap.bind().sync();
this.channel = channelFuture.channel();
this.localAddress = (InetSocketAddress)this.channel.localAddress();
exportCompleted(providerManager);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
protected void exportCompleted(ProviderManager providerManager) {
//export完成后置處理放祟,未來(lái)可用于注冊(cè)服務(wù)到注冊(cè)中心等鳍怨。
}
private ServiceInstance toServiceInstance(ProviderDescriptor desc) {
StorksApplication application = providerManager.getApplication();
ServiceInstance serviceInstance = ServiceInstance.builder()
.appName(application.getAppName())
.protocol("storks")
.host(application.getLocalHost())
.port(getServicePort())
.serviceInterface(desc.getServiceInterface().getName())
.serviceVersion(desc.getServiceVersion())
.build();
return serviceInstance;
}
public int getServicePort() {
return this.localAddress.getPort();
}
@Override
public void setProviderManager(ProviderManager providerManager) {
this.providerManager = providerManager;
}
}
ProviderManager
ServiceExporter只是負(fù)責(zé)搭建網(wǎng)絡(luò)邊界的溝通橋梁,而具體的provider業(yè)務(wù)代碼執(zhí)行由ProviderManager負(fù)責(zé)跪妥。
ServiceExporter負(fù)責(zé)將網(wǎng)絡(luò)請(qǐng)求還原為request鞋喇,以及將response發(fā)送到網(wǎng)絡(luò);而providerManager負(fù)責(zé)查找本地Provider和執(zhí)行request眉撵。
package io.destinyshine.storks.provider;
import ...
/**
* 管理所有的Provider侦香,Provider以{@link ProviderDescriptor}類來(lái)表示,所有的Provider需要注冊(cè)到ProviderManager中纽疟。
* <p>
* ServiceExporter只是負(fù)責(zé)搭建網(wǎng)絡(luò)邊界的溝通橋梁罐韩,而具體的provider業(yè)務(wù)代碼執(zhí)行由ProviderManager負(fù)責(zé)。
* ServiceExporter負(fù)責(zé)將網(wǎng)絡(luò)請(qǐng)求還原為request污朽,以及將response發(fā)送到網(wǎng)絡(luò)散吵;而providerManager負(fù)責(zé)查找本地Provider和執(zhí)行request。
* <p/>
* <p>
* 根據(jù)以上設(shè)計(jì)蟆肆,如果本地ProviderTable沒(méi)有指定的Provider矾睦,則這個(gè)NoProviderDefinedException異常由ProviderManager拋出,而不是ServiceExporter炎功。
*
* </p>
* @author liujianyu
*/
@Slf4j
public class DefaultProviderManager implements ProviderManager {
private Map<String, ProviderDescriptor> localProviderTable = new ConcurrentHashMap<>();
private StorksApplication application;
private boolean working = false;
private List<ServiceProviderListener> serviceProviderListeners = new ArrayList<>();
@Override
public <T> ProviderDescriptor<T> findProvider(String serviceInterface, String version) {
String key = serviceInterface + ":" + version;
ProviderDescriptor<T> providerDescriptor = localProviderTable.get(key);
if (providerDescriptor == null) {
throw new NoProviderDefinedException(
String.format(
"no provider of interface %s defined.",
serviceInterface
)
);
}
return providerDescriptor;
}
@Override
public ResponseMessage execute(RequestMessage request) {
ProviderDescriptor<?> desc = this.findProvider(request.getServiceInterface(), request.getServiceVersion());
Object provider = desc.getProvider();
Class<?>[] parameterClasses = new Class<?>[request.getParameterTypes().length];
for (int i = 0; i < request.getParameterTypes().length; i++) {
try {
parameterClasses[i] = Class.forName(request.getParameterTypes()[i]);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
try {
Method method = provider.getClass().getMethod(request.getMethodName(), parameterClasses);
Object returnValue = method.invoke(provider, request.getParameters());
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setReturnValue(returnValue);
responseMessage.setTraceId(request.getTraceId());
return responseMessage;
} catch (NoSuchMethodException e) {
logger.error(e.getMessage(), e);
} catch (InvocationTargetException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
return null;
}
@Override
public void addProvider(ProviderDescriptor desc) {
String serviceKey = Protocol.serviceKey(desc);
localProviderTable.put(serviceKey, desc);
}
@Override
public void removeProvider(ProviderDescriptor desc) {
String serviceKey = Protocol.serviceKey(desc);
localProviderTable.remove(serviceKey);
}
@Override
public StorksApplication getApplication() {
return application;
}
public void setApplication(StorksApplication application) {
this.application = application;
}
@Override
public Map<String, ProviderDescriptor> getProviders() {
return localProviderTable;
}
}
啟動(dòng)服務(wù)端
在這里我們沒(méi)有和spring集成枚冗,更沒(méi)有編寫spring boot starter。我們先用最原始的方式通過(guò)執(zhí)行main方法和原始API來(lái)啟動(dòng)服務(wù)蛇损。
代碼實(shí)現(xiàn):
public static void main(String[] args) throws Exception {
logger.info("--server main--");
StorksApplication app = new StorksApplication("testProvider");
ProviderDescriptor desc = new ProviderDescriptor(HelloService.class, "1.0.0", new HelloServiceImpl());
NioSocketChannelServiceExporter exporter = new NioSocketChannelServiceExporter(0);
DefaultProviderManager providerManager = new DefaultProviderManager();
providerManager.setApplication(app);
//add provider
providerManager.addProvider(desc);
exporter.setProviderManager(providerManager);
exporter.export();
logger.info("exporter started.");
}
后續(xù)
到目前只實(shí)現(xiàn)了簡(jiǎn)單的服務(wù)發(fā)布赁温,還沒(méi)有客戶端調(diào)用的實(shí)現(xiàn),將在后面實(shí)現(xiàn)淤齐。