微服務(wù),已經(jīng)是每個互聯(lián)網(wǎng)開發(fā)者必須掌握的一項技術(shù)关贵。而 RPC 框架遇骑,是構(gòu)成微服務(wù)最重要的組成部分之一。趁最近有時間揖曾。又看了看 dubbo 的源碼落萎。dubbo 為了做到靈活和解耦,使用了大量的設(shè)計模式和 SPI機制炭剪,要看懂 dubbo 的代碼也不太容易练链。
按照《徒手擼框架》系列文章的套路,我還是會極簡的實現(xiàn)一個 RPC 框架奴拦。幫助大家理解 RPC 框架的原理媒鼓。
廣義的來講一個完整的 RPC 包含了很多組件,包括服務(wù)發(fā)現(xiàn)错妖,服務(wù)治理隶糕,遠程調(diào)用,調(diào)用鏈分析站玄,網(wǎng)關(guān)等等枚驻。我將會慢慢的實現(xiàn)這些功能,這篇文章主要先講解的是 RPC 的基石株旷,遠程調(diào)用的實現(xiàn)再登。
相信尔邓,讀完這篇文章你也一定可以自己實現(xiàn)一個可以提供 RPC 調(diào)用的框架。
1. RPC 的調(diào)用過程
通過一圖我們來了解一下 RPC 的調(diào)用過程锉矢,從宏觀上來看看到底一次 RPC 調(diào)用經(jīng)過些什么過程梯嗽。
當一次調(diào)用開始:
client 會調(diào)用本地動態(tài)代理 proxy
這個代理會將調(diào)用通過協(xié)議轉(zhuǎn)序列化字節(jié)流
通過 netty 網(wǎng)絡(luò)框架,將字節(jié)流發(fā)送到服務(wù)端
服務(wù)端在受到這個字節(jié)流后沽损,會根據(jù)協(xié)議灯节,反序列化為原始的調(diào)用,利用反射原理調(diào)用服務(wù)方提供的方法
如果請求有返回值绵估,又需要把結(jié)果根據(jù)協(xié)議序列化后炎疆,再通過 netty 返回給調(diào)用方
2. 框架概覽和技術(shù)選型
看一看框架的組件:
clinet就是調(diào)用方。servive是服務(wù)的提供者国裳。protocol包定義了通信協(xié)議形入。common包含了通用的一些邏輯組件。
技術(shù)選型項目使用?maven?作為包管理工具缝左,json?作為序列化協(xié)議亿遂,使用spring boot管理對象的生命周期,netty作為?nio?的網(wǎng)路組件渺杉。所以要閱讀這篇文章蛇数,你需要對spring boot和netty有基本的了解。
下面就看看每個組件的具體實現(xiàn):
3. protocol
其實作為 RPC 的協(xié)議是越,需要考慮只有一個問題–就是怎么把一次方法的調(diào)用苞慢,變成能夠被網(wǎng)絡(luò)傳輸?shù)淖止?jié)流。
首先我們需要定義方法的調(diào)用和返回兩個實體:
請求:
@Data
public class RpcRequest {
? ? // 調(diào)用編號
? ? private String requestId;
? ? // 類名
? ? private String className;
? ? // 方法名
? ? private String methodName;
? ? // 請求參數(shù)的數(shù)據(jù)類型
? ? private Class<?>[] parameterTypes;
? ? // 請求的參數(shù)
? ? private Object[] parameters;
}
結(jié)果:
@Data
public class RpcResponse {
? ? // 調(diào)用編號
? ? private String requestId;
? ? // 拋出的異常
? ? private Throwable throwable;
? ? // 返回結(jié)果
? ? private Object result;
}
確定了英妓,需要序列化的對象,就要確定序列化的協(xié)議绍赛,實現(xiàn)兩個方法蔓纠,序列化和反序列化兩個方法。
public interface Serialization {
? ? <T> byte[] serialize(T obj);
? ? <T> T deSerialize(byte[] data,Class<T> clz);
}
可選用的序列化的協(xié)議很多比如:
jdk 的序列化方法吗蚌。(不推薦腿倚,不利于之后的跨語言調(diào)用)
json 可讀性強,但是序列化速度慢蚯妇,體積大敷燎。
protobuf,kyro箩言,Hessian 等都是優(yōu)秀的序列化框架硬贯,也可按需選擇。
為了簡單和便于調(diào)試陨收,我們就選擇 json 作為序列化協(xié)議饭豹,使用jackson作為 json 解析框架鸵赖。
/**
* @author Zhengxin
*/
public class JsonSerialization implements Serialization {
? ? private ObjectMapper objectMapper;
? ? public JsonSerialization(){
? ? ? ? this.objectMapper = new ObjectMapper();
? ? }
? ? @Override
? ? public <T> byte[] serialize(T obj) {
? ? ? ? try {
? ? ? ? ? ? return objectMapper.writeValueAsBytes(obj);
? ? ? ? } catch (JsonProcessingException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return null;
? ? }
? ? @Override
? ? public <T> T deSerialize(byte[] data, Class<T> clz) {
? ? ? ? try {
? ? ? ? ? ? return objectMapper.readValue(data,clz);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return null;
? ? }
}
因為 netty 支持自定義 coder 。所以只需要實現(xiàn)?ByteToMessageDecoder?和?MessageToByteEncoder?兩個接口拄衰。就解決了序列化的問題:
public class RpcDecoder extends ByteToMessageDecoder {
? ? private Class<?> clz;
? ? private Serialization serialization;
? ? public RpcDecoder(Class<?> clz,Serialization serialization){
? ? ? ? this.clz = clz;
? ? ? ? this.serialization = serialization;
? ? }
? ? @Override
? ? protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
? ? ? ? if(in.readableBytes() < 4){
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? in.markReaderIndex();
? ? ? ? int dataLength = in.readInt();
? ? ? ? if (in.readableBytes() < dataLength) {
? ? ? ? ? ? in.resetReaderIndex();
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? byte[] data = new byte[dataLength];
? ? ? ? in.readBytes(data);
? ? ? ? Object obj = serialization.deSerialize(data, clz);
? ? ? ? out.add(obj);
? ? }
}
public class RpcEncoder extends MessageToByteEncoder {
? ? private Class<?> clz;
? ? private Serialization serialization;
? ? public RpcEncoder(Class<?> clz, Serialization serialization){
? ? ? ? this.clz = clz;
? ? ? ? this.serialization = serialization;
? ? }
? ? @Override
? ? protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
? ? ? ? if(clz != null){
? ? ? ? ? ? byte[] bytes = serialization.serialize(msg);
? ? ? ? ? ? out.writeInt(bytes.length);
? ? ? ? ? ? out.writeBytes(bytes);
? ? ? ? }
? ? }
}
至此它褪,protocol 就實現(xiàn)了,我們就可以把方法的調(diào)用和結(jié)果的返回翘悉,轉(zhuǎn)換為一串可以在網(wǎng)絡(luò)中傳輸?shù)?byte[] 數(shù)組了茫打。
4. server
server 是負責處理客戶端請求的組件。在互聯(lián)網(wǎng)高并發(fā)的環(huán)境下妖混,使用 Nio 非阻塞的方式可以相對輕松的應(yīng)付高并發(fā)的場景老赤。netty 是一個優(yōu)秀的 Nio 處理框架。Server 的關(guān)鍵代碼如下:
netty 是基于 Recotr 模型的源葫。所以需要初始化兩組線程 boss 和 worker 诗越。boss 負責分發(fā)請求,worker 負責執(zhí)行相應(yīng)的 handler:
@Bean
? public ServerBootstrap serverBootstrap() throws InterruptedException {
? ? ? ServerBootstrap serverBootstrap = new ServerBootstrap();
? ? ? serverBootstrap.group(bossGroup(), workerGroup())
? ? ? ? ? ? ? .channel(NioServerSocketChannel.class)
? ? ? ? ? ? ? .handler(new LoggingHandler(LogLevel.DEBUG))
? ? ? ? ? ? ? .childHandler(serverInitializer);
? ? ? Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
? ? ? Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
? ? ? for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
? ? ? ? ? serverBootstrap.option(option, tcpChannelOptions.get(option));
? ? ? }
? ? ? return serverBootstrap;
? }
netty 的操作是基于 pipeline 的息堂。所以我們需要把在 protocol 實現(xiàn)的幾個 coder 注冊到 netty 的 pipeline 中嚷狞。
ChannelPipeline pipeline = ch.pipeline();
// 處理 tcp 請求中粘包的 coder,具體作用可以自行 google
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,4));
// protocol 中實現(xiàn)的 序列化和反序列化 coder
pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));
pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));
// 具體處理請求的 handler 下文具體解釋
pipeline.addLast(serverHandler);
實現(xiàn)具體的 ServerHandler 用于處理真正的調(diào)用荣堰。
ServerHandler?繼承?SimpleChannelInboundHandler<RpcRequest>床未。簡單來說這個?InboundHandler?會在數(shù)據(jù)被接受時或者對于的 Channel 的狀態(tài)發(fā)生變化的時候被調(diào)用。當這個 handler 讀取數(shù)據(jù)的時候方法?channelRead0()?會被用振坚,所以我們就重寫這個方法就夠了薇搁。
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
? ? RpcResponse rpcResponse = new RpcResponse();
? ? rpcResponse.setRequestId(msg.getRequestId());
? ? try{
? ? ? ? // 收到請求后開始處理請求
? ? ? ? Object handler = handler(msg);
? ? ? ? rpcResponse.setResult(handler);
? ? }catch (Throwable throwable){
? ? ? ? // 如果拋出異常也將異常存入 response 中
? ? ? ? rpcResponse.setThrowable(throwable);
? ? ? ? throwable.printStackTrace();
? ? }
? ? // 操作完以后寫入 netty 的上下文中。netty 自己處理返回值渡八。
? ? ctx.writeAndFlush(rpcResponse);
}
handler(msg) 實際上使用的是 cglib 的 Fastclass 實現(xiàn)的啃洋,其實根本原理,還是反射屎鳍。學好 java 中的反射真的可以為所欲為宏娄。
private Object handler(RpcRequest request) throws Throwable {
? ? Class<?> clz = Class.forName(request.getClassName());
? ? Object serviceBean = applicationContext.getBean(clz);
? ? Class<?> serviceClass = serviceBean.getClass();
? ? String methodName = request.getMethodName();
? ? Class<?>[] parameterTypes = request.getParameterTypes();
? ? Object[] parameters = request.getParameters();
? ? // 根本思路還是獲取類名和方法名,利用反射實現(xiàn)調(diào)用
? ? FastClass fastClass = FastClass.create(serviceClass);
? ? FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);
? ? // 實際調(diào)用發(fā)生的地方
? ? return fastMethod.invoke(serviceBean,parameters);
}
總體上來看逮壁,server 的實現(xiàn)不是很困難孵坚。核心的知識點是 netty 的 channel 的使用和 cglib 的反射機制。
5. client
future
其實窥淆,對于我來說卖宠,client 的實現(xiàn)難度,遠遠大于 server 的實現(xiàn)忧饭。netty 是一個異步框架扛伍,所有的返回都是基于 Future 和 Callback 的機制。
所以在閱讀以下文字前強烈推薦词裤,我之前寫的一篇文章?Future 研究蜒秤。利用經(jīng)典的 wite 和 notify 機制汁咏,實現(xiàn)異步的獲取請求的結(jié)果。
/**
* @author zhengxin
*/
public class DefaultFuture {
private RpcResponse rpcResponse;
private volatile boolean isSucceed = false;
private final Object object = new Object();
public RpcResponse getResponse(int timeout){
synchronized (object){
while (!isSucceed){
try {
? ? ? ? ? ? ? ? ? ? //wait
object.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return rpcResponse;
}
}
public void setResponse(RpcResponse response){
if(isSucceed){
return;
}
synchronized (object) {
this.rpcResponse = response;
this.isSucceed = true;
? ? ? ? ? ? //notiy
object.notify();
}
}
}
復(fù)用資源
為了能夠提升 client 的吞吐量作媚,可提供的思路有以下幾種:
使用對象池:建立多個 client 以后保存在對象池中攘滩。但是代碼的復(fù)雜度和維護 client 的成本會很高。
盡可能的復(fù)用 netty 中的 channel纸泡。
之前你可能注意到漂问,為什么要在 RpcRequest 和 RpcResponse 中增加一個 ID。因為 netty 中的 channel 是會被多個線程使用的女揭。當一個結(jié)果異步的返回后蚤假,你并不知道是哪個線程返回的。這個時候就可以考慮利用一個 Map吧兔,建立一個 ID 和 Future 映射磷仰。這樣請求的線程只要使用對應(yīng)的 ID 就能獲取,相應(yīng)的返回結(jié)果境蔼。
/**
* @author Zhengxin
*/
public class ClientHandler extends ChannelDuplexHandler {
? ? // 使用 map 維護 id 和 Future 的映射關(guān)系灶平,在多線程環(huán)境下需要使用線程安全的容器
? ? private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();
? ? @Override
? ? public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
? ? ? ? if(msg instanceof RpcRequest){
? ? ? ? ? ? RpcRequest request = (RpcRequest) msg;
? ? ? ? ? ? // 寫數(shù)據(jù)的時候,增加映射
? ? ? ? ? ? futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
? ? ? ? }
? ? ? ? super.write(ctx, msg, promise);
? ? }
? ? @Override
? ? public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
? ? ? ? if(msg instanceof RpcResponse){
? ? ? ? ? ? RpcResponse response = (RpcResponse) msg;
? ? ? ? ? ? // 獲取數(shù)據(jù)的時候 將結(jié)果放入 future 中
? ? ? ? ? ? DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
? ? ? ? ? ? defaultFuture.setResponse(response);
? ? ? ? }
? ? ? ? super.channelRead(ctx, msg);
? ? }
? ? public RpcResponse getRpcResponse(String requestId){
? ? ? ? try {
? ? ? ? ? ? // 從 future 中獲取真正的結(jié)果箍土。
? ? ? ? ? ? DefaultFuture defaultFuture = futureMap.get(requestId);
? ? ? ? ? ? return defaultFuture.getResponse(10);
? ? ? ? }finally {
? ? ? ? ? ? // 完成后從 map 中移除逢享。
? ? ? ? ? ? futureMap.remove(requestId);
? ? ? ? }
? ? }
}
這里沒有繼承 server 中的?InboundHandler?而使用了?ChannelDuplexHandler。顧名思義就是在寫入和讀取數(shù)據(jù)的時候吴藻,都會觸發(fā)相應(yīng)的方法瞒爬。寫入的時候在 Map 中保存 ID 和 Future。讀到數(shù)據(jù)的時候從 Map 中取出 Future 并將結(jié)果放入 Future 中沟堡。獲取結(jié)果的時候需要對應(yīng)的 ID侧但。
使用?Transporters?對請求進行封裝。
public class Transporters {
? ? public static RpcResponse send(RpcRequest request){
? ? ? ? NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);
? ? ? ? nettyClient.connect(nettyClient.getInetSocketAddress());
? ? ? ? RpcResponse send = nettyClient.send(request);
? ? ? ? return send;
? ? }
}
動態(tài)代理的實現(xiàn)
動態(tài)代理技術(shù)最廣為人知的應(yīng)用航罗,應(yīng)該就是 Spring Aop禀横,面向切面的編程實現(xiàn)。動態(tài)的在原有方法Before 或者 After 添加代碼伤哺。而 RPC 框架中動態(tài)代理的作用就是徹底替換原有方法,直接調(diào)用遠程方法者祖。
代理工廠類:
public class ProxyFactory {
? ? @SuppressWarnings("unchecked")
? ? public static <T> T create(Class<T> interfaceClass){
? ? ? ? return (T) Proxy.newProxyInstance(
? ? ? ? ? ? ? ? interfaceClass.getClassLoader(),
? ? ? ? ? ? ? ? new Class<?>[]{interfaceClass},
? ? ? ? ? ? ? ? new RpcInvoker<T>(interfaceClass)
? ? ? ? );
? ? }
}
當 proxyFactory 生成的類被調(diào)用的時候立莉,就會執(zhí)行 RpcInvoker 方法。
public class RpcInvoker<T> implements InvocationHandler {
? ? private Class<T> clz;
? ? public RpcInvoker(Class<T> clz){
? ? ? ? this.clz = clz;
? ? }
? ? @Override
? ? public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
? ? ? ? RpcRequest request = new RpcRequest();
? ? ? ? String requestId = UUID.randomUUID().toString();
? ? ? ? String className = method.getDeclaringClass().getName();
? ? ? ? String methodName = method.getName();
? ? ? ? Class<?>[] parameterTypes = method.getParameterTypes();
? ? ? ? request.setRequestId(requestId);
? ? ? ? request.setClassName(className);
? ? ? ? request.setMethodName(methodName);
? ? ? ? request.setParameterTypes(parameterTypes);
? ? ? ? request.setParameters(args);
? ? ? ? return Transporters.send(request).getResult();
? ? }
}
看到這個 invoke 方法七问,主要三個作用蜓耻,
生成 RequestId。
拼裝 RpcRequest械巡。
調(diào)用 Transports 發(fā)送請求刹淌,獲取結(jié)果饶氏。
至此終于,整個調(diào)用鏈完整了有勾。我們終于完成了一次 RPC 調(diào)用疹启。
與 Spring 集成
為了使我們的 client 能夠易于使用我們需要考慮,定義一個自定義注解?@RpcInterface?當我們的項目接入 Spring 以后蔼卡,Spring 掃描到這個注解之后喊崖,自動的通過我們的 ProxyFactory 創(chuàng)建代理對象,并存放在 spring 的 applicationContext 中雇逞。這樣我們就可以通過?@Autowired?注解直接注入使用了荤懂。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcInterface {
}
@Configuration
@Slf4j
public class RpcConfig implements ApplicationContextAware,InitializingBean {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() throws Exception {
Reflections reflections = new Reflections("com.xilidou");
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
? ? ? ? // 獲取 @RpcInterfac 標注的接口
Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);
for (Class<?> aClass : typesAnnotatedWith) {
? ? ? ? ? ? // 創(chuàng)建代理對象,并注冊到 spring 上下文塘砸。
beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));
}
log.info("afterPropertiesSet is {}",typesAnnotatedWith);
}
}
終于我們最簡單的 RPC 框架就開發(fā)完了节仿。下面可以測試一下。
6. Demo
api
@RpcInterface
public interface IHelloService {
? ? String sayHi(String name);
}
server
IHelloSerivce 的實現(xiàn):
@Service
@Slf4j
public class TestServiceImpl implements IHelloService {
? ? @Override
? ? public String sayHi(String name) {
? ? ? ? log.info(name);
? ? ? ? return "Hello " + name;
? ? }
}
啟動服務(wù):
@SpringBootApplication
public class Application {
? ? public static void main(String[] args) throws InterruptedException {
? ? ? ? ConfigurableApplicationContext context = SpringApplication.run(Application.class);
? ? ? ? TcpService tcpService = context.getBean(TcpService.class);
? ? ? ? tcpService.start();
? ? }
}
`
client
@SpringBootApplication()
public class ClientApplication {
? ? public static void main(String[] args) {
? ? ? ? ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
? ? IHelloService helloService = context.getBean(IHelloService.class);
? ? ? ? System.out.println(helloService.sayHi("doudou"));
? ? }
}
運行以后輸出的結(jié)果:
Hello doudou
總結(jié)
終于我們實現(xiàn)了一個最簡版的 RPC 遠程調(diào)用的模塊掉蔬。
想了解更多可加 扣扣?7 9 8 8 9 1 7 1 0