手撕RPC
使用Netty+Zookeeper+Spring實現簡易的RPC框架。閱讀本文需要有一些Netty使用基礎。
服務信息在網絡傳輸，需要將服務類進行序列化，服務端使用Spring作為容器。服務端發布服務，將接口的全路徑當做節點名稱，服務的ip+端口作為節點值，存儲到Zookeeper中。客戶端調用的時候，去Zookeeper查詢，獲得提供該接口的服務器ip和端口，通過Netty進行調用。
工程引用的jar包
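<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.32.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.1.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>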
1.請求和響應
服務端和客戶端進行通信，完成遠程服務調用，需要統一封裝請求和響應。
請求統一封裝類，服務端接收請求之後，通過反射獲得客戶端傳送的對象。
package com.test.rpc.pojo;
import java.io.Serializable;
/**
 * Request envelope describing one remote call: which service, which method,
 * and the arguments to invoke it with.
 *
 * <p>The class is {@link Serializable} so that it can be written to the wire;
 * the argument values placed in {@code args} presumably need to be
 * serializable as well.
 */
public class Request implements Serializable {
    /** Pinned explicitly so separately compiled client and server builds agree on the stream format. */
    private static final long serialVersionUID = 1L;

    /** Name of the service being called. */
    private String serviceName;
    /** Name of the method being called. */
    private String methodName;
    /** Declared parameter types of the method. */
    private Class<?>[] parameterTypes;
    /** Argument values passed to the method. */
    private Object[] args;
    // getters/setters omitted
}
響應統一封裝類，服務端返回給客戶端的響應。
package com.test.rpc.pojo;
import java.io.Serializable;
/**
 * Response envelope returned for one remote call. It carries either the
 * result of the call or the exception that was raised while handling it.
 */
public class Response implements Serializable {
    /** Pinned explicitly so separately compiled client and server builds agree on the stream format. */
    private static final long serialVersionUID = 1L;

    /** Result of the call. */
    private Object result;
    /** Exception raised while handling the call, if any. */
    private Exception exception;
    // getters/setters omitted
}
2.通用工具類
因為對象在網絡上傳輸需要轉換成字節，所以需要序列化。
序列化工具類，可以替換成第三方，如json。
package com.test.rpc.util;
import java.io.*;
/**
 * Serialization helper built on JDK object streams. It can be replaced by a
 * third-party codec (for example JSON).
 *
 * <p>NOTE(security): JDK deserialization of bytes received from an untrusted
 * peer can execute attacker-chosen code. Restrict the accepted classes or use
 * a data-only format before exposing this beyond a trusted network.
 */
public class SerializeUtil {

    /**
     * Serializes an object graph to a byte array.
     *
     * @param o the object to serialize; {@code null} is allowed
     * @return the serialized form, never {@code null}
     * @throws IOException if the object (or anything it references) cannot be
     *         serialized
     */
    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the object stream
        // before the buffer is read, and never dereferences a null stream.
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(o);
        }
        return buffer.toByteArray();
    }

    /**
     * Restores an object previously produced by {@link #serialize(Object)}.
     *
     * @param bytes the serialized form
     * @return the deserialized object (may be {@code null} if {@code null} was serialized)
     * @throws IOException if {@code bytes} is {@code null}, is not a valid
     *         serialization stream, or refers to a class that cannot be loaded
     */
    public static Object unserialize(byte[] bytes) throws IOException {
        if (bytes == null) {
            throw new IOException("cannot deserialize a null byte array");
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (ClassNotFoundException e) {
            // The method only declares IOException, so wrap while keeping the cause.
            throw new IOException("class of serialized object not found: " + e.getMessage(), e);
        }
    }
}
Zookeeper工具類,連接zookeeper的工具
package com.test.rpc.util;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class ZkUtil {
private ZooKeeper zk;
public ZkUtil(String address, int sessionTimeout) {
try {
zk = new ZooKeeper(address, sessionTimeout, null);
} catch (IOException e) {
e.printStackTrace();
}
}
public Stat exist(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean createNode(String path, String data) {
try {
this.exist(path, true);
zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean updateNode(String path, String data) {
try {
this.exist(path, true);
zk.setData(path, data.getBytes(), -1);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean deleteNode(String path) {
try {
this.exist(path, true);
zk.delete(path, -1);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public String getNodeData(String path) {
try {
byte[] data = zk.getData(path, false, null);
return new String(data);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
public void close() {
try {
if (zk != null) {
zk.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.Netty編碼解碼
package com.test.rpc.netty;
import com.test.rpc.util.SerializeUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
*?消息編碼器额划,實(shí)現(xiàn)服務(wù)消息序列化
*/
public class ServiceEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf out) throws Exception {
byte[] bytes = SerializeUtil.serialize(o);//序列化
int dataLength = bytes.length;?//讀取消息的長(zhǎng)度
out.writeInt(dataLength);?//先將消息長(zhǎng)度寫入娩贷,也就是消息頭,解決粘包和半包的問(wèn)題
out.writeBytes(bytes);
}
}
package com.test.rpc.netty;
import com.test.rpc.util.SerializeUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
*?通信解碼,實(shí)現(xiàn)消息的反序列化
*/
public class ServiceDecoder extends ReplayingDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List out) throws Exception {
int length = in.readInt();//消息頭锁孟,消息的總長(zhǎng)度
byte[] content = new byte[length];
in.readBytes(content);
Object o = SerializeUtil.unserialize(content);?//將byte數(shù)據(jù)轉(zhuǎn)化為我們需要的對(duì)象。
out.add(o);
}
}
4.服務端代碼
為了發布服務方便，定義一個注解，讓Spring掃描此注解，統一發布服務。
package com.test.rpc.server;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a class as an RPC service implementation. The annotation is
 * meta-annotated with {@code @Component}, so annotated classes can be picked
 * up by Spring component scanning.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    /** The service type under which the annotated class is published. */
    Class<?> value();
}
注冊服務就是把服務名稱和提供者地址存到Zookeeper中。
package com.test.rpc.server;
import com.test.rpc.util.ZkUtil;
import org.apache.zookeeper.data.Stat;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.Map;
/**
 * Holds the server-side Spring container and, when the class is initialized,
 * registers every {@link RpcService}-annotated bean in ZooKeeper as
 * {@code /<service type name> -> ip:port}.
 */
public class ServerContext {
    /** Address advertised to clients. */
    private static final String ip = "127.0.0.1";
    /** Port advertised to clients; it has to match the port the server binds. */
    private static final int port = 8000;
    /** The Spring container. */
    private static final ApplicationContext applicationContext;

    /**
     * Looks up a bean by type in the server container.
     *
     * @param clazz the required type
     * @return the matching bean
     */
    public static Object getBean(Class<?> clazz) {
        return applicationContext.getBean(clazz);
    }

    static {
        // Create the Spring container; in a web environment a listener could do this instead.
        applicationContext = new ClassPathXmlApplicationContext("spring-server.xml");
        ZkUtil zookeeper = applicationContext.getBean(ZkUtil.class);
        // Collect the beans carrying the annotation.
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
        if (serviceBeanMap != null) {
            for (String beanName : serviceBeanMap.keySet()) {
                // Resolve through the container rather than bean.getClass(): a proxied
                // bean's runtime class does not carry the (non-inherited) annotation.
                RpcService rpcService = applicationContext.findAnnotationOnBean(beanName, RpcService.class);
                if (rpcService == null) {
                    continue;
                }
                // Node name is the fully qualified name of the published type.
                String serviceName = "/" + rpcService.value().getName();
                Stat stat = zookeeper.exist(serviceName, false);
                if (stat != null) { // replace a stale registration
                    zookeeper.deleteNode(serviceName);
                }
                if (!zookeeper.createNode(serviceName, ip + ":" + port)) {
                    System.err.println("failed to register RPC service " + serviceName);
                }
            }
        }
    }
}
package com.test.rpc.server;
import com.test.rpc.pojo.Request;
import com.test.rpc.pojo.Response;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.lang.reflect.Method;
/**
 * Handles requests sent by clients: invokes the requested service method by
 * reflection and writes back a {@link Response}, closing the connection once
 * the response has been written.
 */
public class ServerRequestHandler extends SimpleChannelInboundHandler<Request> {
    /**
     * Executes one request and replies with its result or its failure.
     *
     * @param channelHandlerContext context used to write the response
     * @param request               the decoded request
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) throws Exception {
        Response response = new Response();
        try {
            response.setResult(handleRequest(request));
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Reflection wraps whatever the service method threw; hand the client
            // the service's own exception when it fits the Response field.
            Throwable cause = e.getCause();
            e.printStackTrace();
            response.setException(cause instanceof Exception ? (Exception) cause : e);
        } catch (Exception e) {
            e.printStackTrace();
            response.setException(e);
        }
        // Write the response, then close the connection.
        channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Closes the connection on pipeline errors (for example an undecodable
     * frame) so the peer is not left waiting on an open channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Resolves the target bean and method named in the request and invokes it.
     *
     * @param request the request to execute
     * @return the value returned by the service method
     * @throws Exception if the class or method cannot be resolved, or the invocation fails
     */
    private Object handleRequest(Request request) throws Exception {
        String serviceName = request.getServiceName();          // service being called
        String methodName = request.getMethodName();            // method being called
        Class<?>[] parameterTypes = request.getParameterTypes(); // declared parameter types
        Object[] args = request.getArgs();                       // argument values
        // NOTE(security): serviceName is supplied by the remote peer and is loaded
        // by name here; consider restricting it to the set of published services.
        Class<?> clazz = Class.forName(serviceName);
        Object target = ServerContext.getBean(clazz); // implementation from the container
        Method method = clazz.getMethod(methodName, parameterTypes);
        return method.invoke(target, args);
    }
}
服務啟動類
package com.test.rpc.server;
import com.test.rpc.netty.ServiceDecoder;
import com.test.rpc.netty.ServiceEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Server entry point: initializes {@link ServerContext} and starts a Netty
 * server on port 8000 with the encoder, decoder and request handler installed
 * on every accepted connection.
 */
public class ServerMain {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int port = 8000;
        // Instantiating the class forces its static initializer to run.
        new ServerContext();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServiceEncoder());       // outbound: encode
                            ch.pipeline().addLast(new ServiceDecoder());       // inbound: decode
                            ch.pipeline().addLast(new ServerRequestHandler()); // inbound: handle request
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
5. 待發布的服務
package com.test.rpc.service;
/**
 * Contract of the sample service used to demonstrate a remote call.
 */
public interface HelloService {
/**
 * Handles one hello call.
 *
 * @param message text supplied by the caller
 * @return the reply text
 */
String hello(String message);
}
package com.test.rpc.service;
import com.test.rpc.server.RpcService;
/**
 * Sample {@link HelloService} implementation, published under the
 * {@code HelloService} type through the custom {@link RpcService} annotation.
 */
@RpcService(HelloService.class) // custom annotation
public class HelloServiceImpl implements HelloService {
    /** Text placed in front of every echoed message. */
    private static final String REPLY_PREFIX = "server response:";

    /**
     * Echoes the message back behind a fixed prefix.
     *
     * @param message text supplied by the caller
     * @return the prefix followed by {@code message}
     */
    @Override
    public String hello(String message) {
        StringBuilder reply = new StringBuilder(REPLY_PREFIX);
        reply.append(message);
        return reply.toString();
    }
}
spring-server.xml配置文件
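<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- NOTE: the bean definitions of this file were lost when the page was extracted.
         The entries below are a reconstruction from what the Java code requires;
         verify the ZooKeeper address and session timeout. -->
    <context:component-scan base-package="com.test.rpc.service"/>
    <bean class="com.test.rpc.util.ZkUtil">
        <constructor-arg index="0" value="127.0.0.1:2181"/>
        <constructor-arg index="1" value="5000"/>
    </bean>
</beans>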
6. 客戶端代碼
package com.test.rpc.client;
import com.test.rpc.netty.ServiceDecoder;
import com.test.rpc.netty.ServiceEncoder;
import com.test.rpc.pojo.Request;
import com.test.rpc.pojo.Response;
import com.test.rpc.util.ZkUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.springframework.context.ApplicationContext;
/**
 * Sends one {@link Request} to the provider registered in ZooKeeper and waits
 * for the {@link Response}. An instance is installed as the last handler of
 * the connection it opens, so it should be used for a single call.
 */
public class ClientSender extends SimpleChannelInboundHandler<Response> {
    private ApplicationContext applicationContext;

    /**
     * @param applicationContext container used to look up the {@link ZkUtil} bean
     */
    public ClientSender(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    // Both fields are written from the channel's handler callbacks and read by
    // the thread blocked in send(); volatile keeps the hand-off explicit.
    private volatile Response response;
    private volatile Throwable failure;

    /** Stores the response received from the server. */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
    }

    /**
     * Records a pipeline error and closes the channel so that {@link #send}
     * stops waiting instead of blocking on a connection that stays open.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.failure = cause;
        ctx.close();
    }

    /**
     * Sends the request and blocks until the connection is closed.
     *
     * @param request the call to perform
     * @return the server's response, never {@code null}
     * @throws IllegalStateException if no usable provider address is registered
     *         for the service, or the connection closed without a response
     * @throws Exception if connecting or writing fails
     */
    public Response send(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            ch.pipeline().addLast(new ServiceEncoder());
                            ch.pipeline().addLast(new ServiceDecoder());
                            ch.pipeline().addLast(ClientSender.this);
                        }
                    });
            // Ask the registry which server provides the service.
            ZkUtil zk = applicationContext.getBean(ZkUtil.class);
            String address = zk.getNodeData("/" + request.getServiceName());
            if (address == null) {
                throw new IllegalStateException("no provider registered for service " + request.getServiceName());
            }
            String[] serverAndPort = address.split(":"); // expected form: host:port
            if (serverAndPort.length < 2) {
                throw new IllegalStateException("malformed provider address '" + address + "' for service "
                        + request.getServiceName());
            }
            // Connect to the RPC server.
            ChannelFuture future = bootstrap.connect(serverAndPort[0], Integer.parseInt(serverAndPort[1])).sync();
            // Write the request, then wait until the connection is closed.
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();
            if (response == null) {
                throw new IllegalStateException("connection to " + address + " closed without a response", failure);
            }
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}
請求動態代理類，調用服務的時候需要創建動態代理，把服務對象封裝成Request。
package com.test.rpc.client;
import com.test.rpc.pojo.Request;
import com.test.rpc.pojo.Response;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * Creates client-side proxies for service interfaces. Each call on a proxy is
 * packed into a {@link Request}, sent with a {@link ClientSender}, and the
 * {@link Response} is turned back into a return value or a thrown exception.
 */
public class RequestProxy implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    /** Receives the container that is later handed to each {@link ClientSender}. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Builds a JDK dynamic proxy whose method calls are executed remotely.
     *
     * @param interfaceClass the service interface to proxy
     * @param <T>            the service interface type
     * @return a proxy implementing {@code interfaceClass}
     */
    @SuppressWarnings("unchecked") // the proxy is created for exactly interfaceClass
    public <T> T createProxy(final Class<T> interfaceClass) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // equals/hashCode/toString describe the proxy itself; answering them
                // locally avoids a remote lookup for "java.lang.Object".
                if (method.getDeclaringClass() == Object.class) {
                    return invokeObjectMethod(proxy, method, args, interfaceClass);
                }
                Request request = new Request();
                // Use the proxied interface name so methods inherited from a
                // super-interface are still addressed to this service.
                request.setServiceName(interfaceClass.getName());
                request.setMethodName(method.getName());
                request.setParameterTypes(method.getParameterTypes());
                request.setArgs(args);
                ClientSender sender = new ClientSender(applicationContext);
                Response response = sender.send(request);
                if (response == null) {
                    throw new IllegalStateException("no response received for " + interfaceClass.getName()
                            + "." + method.getName());
                }
                if (response.getException() != null) {
                    throw response.getException();
                }
                return response.getResult();
            }
        });
    }

    /** Handles the {@code java.lang.Object} methods that a proxy routes to its handler. */
    private static Object invokeObjectMethod(Object proxy, Method method, Object[] args, Class<?> interfaceClass) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        return interfaceClass.getName() + "$RpcProxy@" + Integer.toHexString(System.identityHashCode(proxy));
    }
}
spring-client.xml
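<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- NOTE: the bean definitions of this file were lost when the page was extracted.
         The entries below are a reconstruction from what the Java code requires;
         verify the ZooKeeper address and session timeout. -->
    <bean class="com.test.rpc.util.ZkUtil">
        <constructor-arg index="0" value="127.0.0.1:2181"/>
        <constructor-arg index="1" value="5000"/>
    </bean>
    <bean class="com.test.rpc.client.RequestProxy"/>
</beans>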
package com.test.rpc.client;
import com.test.rpc.service.HelloService;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Client entry point: loads the client Spring configuration, creates a proxy
 * for {@link HelloService}, performs one call and prints the reply.
 */
public class ClientMain {
    /**
     * Runs the demo call.
     *
     * @param args unused
     * @throws Exception if the proxy cannot be created or the call fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the container (and with it the beans'
        // lifecycle) even when the call fails.
        try (ClassPathXmlApplicationContext applicationContext =
                     new ClassPathXmlApplicationContext("spring-client.xml")) {
            RequestProxy proxy = applicationContext.getBean(RequestProxy.class);
            HelloService helloService = proxy.createProxy(HelloService.class);
            System.out.println(helloService.hello("netty"));
        }
    }
}