手撕RPC框架

手撕RPC

使用Netty+Zookeeper+Spring實(shí)現(xiàn)簡(jiǎn)易的RPC框架。閱讀本文需要有一些Netty使用基礎(chǔ)温兼。

服務(wù)信息在網(wǎng)絡(luò)傳輸钢坦,需要講服務(wù)類進(jìn)行序列化,服務(wù)端使用Spring作為容器庸毫。服務(wù)端發(fā)布服務(wù),將接口的全路徑當(dāng)做節(jié)點(diǎn)名稱衫樊,服務(wù)的ip+端口作為節(jié)點(diǎn)值飒赃,存儲(chǔ)到Zookeeper中】瞥蓿客戶端調(diào)用的時(shí)候载佳,去Zookeeper查詢,獲得提供該接口的服務(wù)器ip和端口臀栈,通過(guò)Netty進(jìn)行調(diào)用蔫慧。

工程引用的jar包

io.netty

netty-all

4.1.32.Final

org.springframework

spring-context

5.1.3.RELEASE

org.apache.zookeeper

zookeeper

3.4.10

1.請(qǐng)求和響應(yīng)

服務(wù)端和客戶端進(jìn)行通信,完成遠(yuǎn)程服務(wù)調(diào)用权薯,需要統(tǒng)一封裝請(qǐng)求和響應(yīng)姑躲。

請(qǐng)求統(tǒng)一封裝類睡扬,服務(wù)端接收請(qǐng)求之后,通過(guò)反射獲得客戶端傳送的對(duì)象黍析。

package com.test.rpc.pojo;

import java.io.Serializable;

public class Request implements Serializable {

private String serviceName;//調(diào)用的服務(wù)名字

private String methodName;//調(diào)用的方法名

private Class[] parameterTypes;//方法參數(shù)的類型

private Object[] args;//方法的參數(shù)

//getter/setter

}

響應(yīng)統(tǒng)一封裝類卖怜,服務(wù)端返回給客戶端的響應(yīng)

package com.test.rpc.pojo;

import java.io.Serializable;

public class Response implements Serializable {

private Object result;//響應(yīng)結(jié)果

private Exception exception;//響應(yīng)的異常

//getter/setter

}

2.通用工具類

因?yàn)閷?duì)象在網(wǎng)絡(luò)上傳輸需要轉(zhuǎn)換成字節(jié),所以需要序列化阐枣。

序列化工具類马靠,可以替換成第三方,如json蔼两。

package com.test.rpc.util;

import java.io.*;

/**

*?序列化工具甩鳄,可以替換成第三方

*/

public class SerializeUtil {

//序列化

public static byte[] serialize(Object o) throws IOException {

ByteArrayOutputStream baos = null;

ObjectOutputStream oos = null;

try {

//?序列化

baos = new ByteArrayOutputStream();

oos = new ObjectOutputStream(baos);

oos.writeObject(o);

byte[] bytes = baos.toByteArray();

return bytes;

} catch (Exception e) {

e.printStackTrace();

} finally {

oos.close();

baos.close();

}

return null;

}

//?反序列化

public static Object unserialize(byte[] bytes)throws IOException {

ByteArrayInputStream bais = null;

ObjectInputStream ois = null;

try {

bais = new ByteArrayInputStream(bytes);

ois = new ObjectInputStream(bais);

return ois.readObject();

} catch (Exception e) {

e.printStackTrace();

} finally {

bais.close();

ois.close();

}

return null;

}

}

Zookeeper工具類,連接zookeeper的工具

package com.test.rpc.util;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class ZkUtil {

private ZooKeeper zk;

public ZkUtil(String address, int sessionTimeout) {

try {

zk = new ZooKeeper(address, sessionTimeout, null);

} catch (IOException e) {

e.printStackTrace();

}

}

public Stat exist(String path, boolean needWatch) {

try {

return this.zk.exists(path, needWatch);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

public boolean createNode(String path, String data) {

try {

this.exist(path, true);

zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

public boolean updateNode(String path, String data) {

try {

this.exist(path, true);

zk.setData(path, data.getBytes(), -1);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

public boolean deleteNode(String path) {

try {

this.exist(path, true);

zk.delete(path, -1);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

public String getNodeData(String path) {

try {

byte[] data = zk.getData(path, false, null);

return new String(data);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

return null;

}

public void close() {

try {

if (zk != null) {

zk.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

3.Netty編碼解碼

package com.test.rpc.netty;

import com.test.rpc.util.SerializeUtil;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

/**

*?消息編碼器额划,實(shí)現(xiàn)服務(wù)消息序列化

*/

public class ServiceEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf out) throws Exception {

byte[] bytes = SerializeUtil.serialize(o);//序列化

int dataLength = bytes.length;?//讀取消息的長(zhǎng)度

out.writeInt(dataLength);?//先將消息長(zhǎng)度寫入娩贷,也就是消息頭,解決粘包和半包的問(wèn)題

out.writeBytes(bytes);

}

}

package com.test.rpc.netty;

import com.test.rpc.util.SerializeUtil;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**

*?通信解碼,實(shí)現(xiàn)消息的反序列化

*/

public class ServiceDecoder extends ReplayingDecoder {

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List out) throws Exception {

int length = in.readInt();//消息頭锁孟,消息的總長(zhǎng)度

byte[] content = new byte[length];

in.readBytes(content);

Object o = SerializeUtil.unserialize(content);?//byte數(shù)據(jù)轉(zhuǎn)化為我們需要的對(duì)象。

out.add(o);

}

}

4.服務(wù)端代碼

為了發(fā)布服務(wù)方便茁瘦,定義一個(gè)注解品抽,讓Spring掃描此注解,統(tǒng)一發(fā)布服務(wù)

package com.test.rpc.server;

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

@Target({ElementType.TYPE})

@Retention(RetentionPolicy.RUNTIME)

@Component

public @interface RpcService {

Class value();//發(fā)布的服務(wù)類型

}

注冊(cè)服務(wù)就是把服務(wù)名稱和提供者地址存到Zookeeper中肾扰。

package com.test.rpc.server;

import com.test.rpc.util.ZkUtil;

import org.apache.zookeeper.data.Stat;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Map;

public class ServerContext {

private static String ip = "127.0.0.1";//服務(wù)端地址

private static int port = 8000;//服務(wù)端口

private static final ApplicationContext applicationContext;//spring容器

public static Object getBean(Class clazz) {//獲得容器中對(duì)象

return applicationContext.getBean(clazz);

}

static {

//創(chuàng)建Spring容器业稼,如果在web環(huán)境的話可以用Listener

applicationContext = new ClassPathXmlApplicationContext("spring-server.xml");

ZkUtil zookeeper = applicationContext.getBean(ZkUtil.class);

//通過(guò)注解獲得bean

Map serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);

if (serviceBeanMap != null) {

for (Object bean : serviceBeanMap.values()) {

RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);

String serviceName = "/" + rpcService.value().getName();//注冊(cè)的節(jié)點(diǎn)名,是接口全路徑

Stat stat= zookeeper.exist(serviceName,false);

if(stat!=null){//如果節(jié)點(diǎn)存在就刪除

zookeeper.deleteNode(serviceName);

}

zookeeper.createNode(serviceName, ip + ":" + port);//注冊(cè)服務(wù)

}

}

}

}

package com.test.rpc.server;

import com.test.rpc.pojo.Request;

import com.test.rpc.pojo.Response;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.lang.reflect.Method;

/**

*?處理客戶端發(fā)送的請(qǐng)求

*/

public class ServerRequestHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) throws Exception {

Response response = new Response();

try {

Object result = handleRequest(request);

response.setResult(result);

} catch (Exception e) {

e.printStackTrace();

response.setException(e);

}

//寫入響應(yīng)并關(guān)閉

channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

}

private Object handleRequest(Request request) throws Exception {

String serviceName = request.getServiceName();//被的調(diào)用的服務(wù)

String methodName = request.getMethodName();//被調(diào)用的方法

Class[] parameterTypes = request.getParameterTypes();//方法參數(shù)的類型

Object[] args = request.getArgs();//方法的參數(shù)

//實(shí)例化被調(diào)用的服務(wù)對(duì)象

Class clazz = Class.forName(serviceName);

Object target = ServerContext.getBean(clazz);//從容器獲得對(duì)象

//獲得方法對(duì)象

Method method = clazz.getMethod(methodName, parameterTypes);

Object result = method.invoke(target, args);//執(zhí)行方法

return result;//返回結(jié)果

}

}

服務(wù)啟動(dòng)類

package com.test.rpc.server;

import com.test.rpc.netty.ServiceDecoder;

import com.test.rpc.netty.ServiceEncoder;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ServerMain {

public static void main(String[] args) {

int port=8000;

new ServerContext();

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new ServiceEncoder());//編碼

ch.pipeline().addLast(new ServiceDecoder());//解碼

ch.pipeline().addLast(new ServerRequestHandler());//請(qǐng)求處理

}

})

.option(ChannelOption.SO_BACKLOG, 128)

.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture f = b.bind(port).sync();

f.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}

}

5. 待發(fā)布的服務(wù)

package com.test.rpc.service;

public interface HelloService {

String hello(String message);

}

package com.test.rpc.service;

import com.test.rpc.server.RpcService;

@RpcService(HelloService.class)//自定義的注解

public class HelloServiceImpl implements HelloService {

@Override

public String hello(String message) {

return "server response:" + message;

}

}

spring-server.xml配置文件


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

6. 客戶端代碼

package com.test.rpc.client;

import com.test.rpc.netty.ServiceDecoder;

import com.test.rpc.netty.ServiceEncoder;

import com.test.rpc.pojo.Request;

import com.test.rpc.pojo.Response;

import com.test.rpc.util.ZkUtil;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import org.springframework.context.ApplicationContext;

public class ClientSender extends SimpleChannelInboundHandler {

private ApplicationContext applicationContext;

public ClientSender(ApplicationContext applicationContext) {

this.applicationContext = applicationContext;

}

private Response response;

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {

this.response = response;//處理響應(yīng)

}

//發(fā)送請(qǐng)求

public Response send(Request request) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(Channel ch) {

ch.pipeline().addLast(new ServiceEncoder());

ch.pipeline().addLast(new ServiceDecoder());

ch.pipeline().addLast(ClientSender.this);

}

});

//去注冊(cè)中心查看服務(wù)發(fā)布者

ZkUtil zk = applicationContext.getBean(ZkUtil.class);

String address = zk.getNodeData("/"+request.getServiceName());

String[] serverAndPort = address.split(":");//查到的服務(wù)提供者地址

//?連接?RPC?服務(wù)器

ChannelFuture future = bootstrap.connect(serverAndPort[0], Integer.parseInt(serverAndPort[1])).sync();

//?寫入?RPC?請(qǐng)求數(shù)據(jù)并關(guān)閉連接

Channel channel = future.channel();

channel.writeAndFlush(request).sync();

channel.closeFuture().sync();

return response;

} finally {

group.shutdownGracefully();

}

}

}

請(qǐng)求動(dòng)態(tài)代理類音比,調(diào)用服務(wù)的時(shí)候需要?jiǎng)?chuàng)建動(dòng)態(tài)代理,把服務(wù)對(duì)象封裝成Request

package com.test.rpc.client;

import com.test.rpc.pojo.Request;

import com.test.rpc.pojo.Response;

import org.springframework.beans.BeansException;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

public class RequestProxy implements ApplicationContextAware {

private ApplicationContext applicationContext;

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

//動(dòng)態(tài)代理

public T createProxy(final Class interfaceClass) throws Exception {

return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

Request request = new Request();

request.setServiceName(method.getDeclaringClass().getName());

request.setMethodName(method.getName());

request.setParameterTypes(method.getParameterTypes());

request.setArgs(args);

ClientSender sender = new ClientSender(applicationContext);

Response response = sender.send(request);

if (response.getException() != null) {

throw response.getException();

}

return response.getResult();

}

});

}

}

spring-client.xml


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

package com.test.rpc.client;

import com.test.rpc.service.HelloService;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ClientMain {

public static void main(String[] args) throws Exception {

ApplicationContext applicationContext=new ClassPathXmlApplicationContext("spring-client.xml");

RequestProxy proxy=applicationContext.getBean(RequestProxy.class);

HelloService helloService=proxy.createProxy(HelloService.class);

System.out.println(helloService.hello("netty"));

}

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末盆昙,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子焊虏,更是在濱河造成了極大的恐慌淡喜,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诵闭,死亡現(xiàn)場(chǎng)離奇詭異炼团,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)疏尿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門瘟芝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人褥琐,你說(shuō)我怎么就攤上這事锌俱。” “怎么了敌呈?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵贸宏,是天一觀的道長(zhǎng)造寝。 經(jīng)常有香客問(wèn)我,道長(zhǎng)锚赤,這世上最難降的妖魔是什么匹舞? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮线脚,結(jié)果婚禮上赐稽,老公的妹妹穿的比我還像新娘。我一直安慰自己浑侥,他們只是感情好姊舵,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著寓落,像睡著了一般括丁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伶选,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天史飞,我揣著相機(jī)與錄音,去河邊找鬼仰税。 笑死构资,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的陨簇。 我是一名探鬼主播吐绵,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼河绽!你這毒婦竟也來(lái)了己单?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤耙饰,失蹤者是張志新(化名)和其女友劉穎纹笼,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體榔幸,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡允乐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了削咆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片牍疏。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖拨齐,靈堂內(nèi)的尸體忽然破棺而出鳞陨,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布厦滤,位于F島的核電站援岩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏掏导。R本人自食惡果不足惜享怀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望趟咆。 院中可真熱鬧添瓷,春花似錦、人聲如沸值纱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)虐唠。三九已至搀愧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間疆偿,已是汗流浹背咱筛。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留杆故,地道東北人眷蚓。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像反番,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子叉钥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容