目標(biāo):客戶端遠(yuǎn)程調(diào)用服務(wù)端的一項(xiàng)服務(wù)偎行,具體來說川背,客戶端給服務(wù)端指定具體的類,方法睦优,和參數(shù)信息渗常,服務(wù)端用這些信息完成服務(wù),并將調(diào)用結(jié)果或異常返回給客戶端汗盘。
這個(gè)例子中皱碘,客戶端想要調(diào)用的服務(wù)是 HelloService
public interface HelloService {
String sayHello(String name);
}
具體實(shí)現(xiàn)為 HelloServiceImpl
/**
* 給參數(shù)中的名字返回打招呼
*/
public class HelloServiceImpl implements HelloService{
@Override
public String sayHello(String name) {
return "您好," + name;
}
}
首先隐孽,客戶端與服務(wù)端的這些通訊信息需要載體癌椿,我們把他們封裝成兩個(gè)類,客戶端給服務(wù)端發(fā)送的遠(yuǎn)程調(diào)用Request菱阵,和服務(wù)端返回的Response踢俄。從設(shè)計(jì)考慮,讓他們都繼承與Message這個(gè)類
Message
@Data
public abstract class Message implements Serializable {
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
static {
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
客戶端的request類晴及,包含了客戶端想要調(diào)用的服務(wù)的一切信息(全類名都办、方法名、參數(shù)...)
RpcRequestMessage
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
// 接口的全限定名
private String interfaceName;
// 調(diào)用的方法名
private String methodName;
// 方法的返回值類
private Class<?> returnType;
// 方法的參數(shù)類型數(shù)組
private Class[] parameterTypes;
// 方法的參數(shù)值數(shù)組
private Object[] parameterValue;
// 一個(gè)可以設(shè)置sequenceID的構(gòu)造器
public RpcRequestMessage(int sequenceID, String interfaceName, String methodName,
Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceID);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return Message.RPC_MESSAGE_TYPE_REQUEST;
}
}
服務(wù)端Request類虑稼,攜帶服務(wù)的返回值或異常值 RpcResponseMessage
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
// 返回值
private Object returnValue;
// 異常值
private Exception exceptionValue;
@Override
public int getMessageType() {
return Message.RPC_MESSAGE_TYPE_RESPONSE;
}
}
好了琳钉,我們現(xiàn)在有了信息的載體,如何在客戶端和服務(wù)端之間進(jìn)行網(wǎng)絡(luò)通信呢蛛倦,這里使用了Netty框架
首先編寫服務(wù)側(cè):RpcServer
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class)
.group(boss, worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder()) // 處理粘包半包
.addLast(LOGGING_HANDLER) // 日志
.addLast(MESSAGE_CODEC) // 自定義協(xié)議 消息編解碼
.addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error",e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
由于本篇旨在實(shí)現(xiàn)rpc歌懒,netty的基本內(nèi)容就不再贅述,代碼中的一些自定義的handler(處理粘包半包問題的溯壶、編解碼協(xié)議的)就不再一一實(shí)現(xiàn)了及皂。服務(wù)側(cè)我們只關(guān)注一個(gè)關(guān)鍵的handler -- RpcRequestMessageHandler
這個(gè)handler專門負(fù)責(zé)處理從客戶端來的 RpcRequestMessage
。
處理思路為
- 先在封裝的request信息中拿到所需服務(wù)的全類名且改,然后根據(jù)全類名拿到服務(wù)的實(shí)現(xiàn)類
- 拿到所需服務(wù)的方法验烧、參數(shù)類型、參數(shù)值又跛、返回類型
- 使用反射調(diào)用方法碍拆,拿到返回值或異常,封裝成response類,返回客戶端
由于沒有整合Spring倔监,需要手寫一個(gè)從接口類拿到實(shí)現(xiàn)類的工具直砂,具體為:
首先,在配置文件(application.properties)中浩习,將服務(wù)的接口類和實(shí)現(xiàn)類綁定
# rpc bean
com.yldog.rpc.HelloService=com.yldog.rpc.HelloServiceImpl
然后一個(gè)工廠
public class ServicesFactory {
static Properties properties;
static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
static {
try {
InputStream in = MyConfig.class.getResourceAsStream("/application.properties");
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
// 拿到接口類Class對(duì)象
Class<?> interfaceClass = Class.forName(name);
// 拿到接口的實(shí)現(xiàn)類Class對(duì)象
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 根據(jù)接口類獲得實(shí)現(xiàn)類
public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}
最后就能編寫服務(wù)端的handler了
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
// 構(gòu)造一個(gè)響應(yīng)實(shí)例
RpcResponseMessage resp = new RpcResponseMessage();
// 設(shè)置響應(yīng)消息的序號(hào) -- 應(yīng)與請(qǐng)求消息序號(hào)一致
resp.setSequenceId(msg.getSequenceId());
try {
// 通過request message拿到接口類静暂,在通過接口類拿到實(shí)現(xiàn)類
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(msg.getInterfaceName()));
// 拿到方法
Method method = service.getClass().getDeclaredMethod(msg.getMethodName(), msg.getParameterTypes());
// 在剛才拿到的實(shí)現(xiàn)類上調(diào)用方法
Object result = method.invoke(service, msg.getParameterValue());
resp.setReturnValue(result);
} catch (Exception e) {
String message = e.getCause().getMessage();
resp.setExceptionValue(new Exception("遠(yuǎn)程調(diào)用異常:" + message));
} finally {
ctx.writeAndFlush(resp);
}
}
}
接著編寫客戶端
首先要實(shí)現(xiàn)客戶端給服務(wù)端發(fā)遠(yuǎn)程調(diào)用請(qǐng)求。
第一步是先要拿到與服務(wù)端聯(lián)絡(luò)的 SocketChannel
谱秽,再使用channel來進(jìn)行遠(yuǎn)程調(diào)用
@Slf4j
public class RpcClientManager {
private static volatile Channel channel = null;
private static final Object LOCK = new Object();
// 使用DCL保證channel單例
public static Channel getChannel() {
if (channel != null) {
return channel;
}
synchronized (LOCK) {
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}
// 初始化channel洽蛀,只能初始化一次
private static void initChannel() {
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_HANDLER = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_HANDLER)
.addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
// 初始化channel后不能在這里阻塞住,所以采用異步
// channel.closeFuture().sync();
channel.closeFuture().addListener(future -> { worker.shutdownGracefully(); });
} catch (InterruptedException e) {
log.debug("Client Error", e);
}
}
}
注意:
-
getChannel()
就可以拿到與服務(wù)端通信的Channel疟赊,由于使用了DLC單例模式郊供,保證了channel的唯一性 - 在處理連接關(guān)閉時(shí),不能使用
channel.closeFuture().sync();
否則代碼阻塞在了initChannel()
中近哟,要使用異步的處理方式
這樣驮审,客戶端就能向服務(wù)端發(fā)送request消息了,具體為:
public static void main(String[] args) {
getChannel().writeAndFlush(new RpcRequestMessage(
1,
"com.yldog.rpc.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"張三"}
));
}
但是這種讓用戶一個(gè)個(gè)填寫參數(shù)的方式吉执,非常的不友好疯淫,理想的方式應(yīng)該是用戶像在本地調(diào)用方法一般,如
service.sayHello("張三")
戳玫,這里的思路是使用代理模式將構(gòu)建請(qǐng)求消息這一步封裝起來熙掺,具體為編寫一個(gè)構(gòu)造代理類的方法:
// 創(chuàng)建代理類
public static <T> T getProxyService(Class<T> serviceClazz) {
ClassLoader classLoader = serviceClazz.getClassLoader();
Class<?>[] interfaces = {serviceClazz};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
// 1. 將方法調(diào)用 轉(zhuǎn)為 消息對(duì)象
int sequenceID = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceID,
serviceClazz.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2. 發(fā)送消息
getChannel().writeAndFlush(msg);
// 暫時(shí)不考慮如何接收返回值
});
return (T) o;
}
這樣,用戶的使用就變成了:
public static void main(String[] args) {
// getChannel().writeAndFlush(new RpcRequestMessage(
// 1,
// "com.yldog.rpc.HelloService",
// "sayHello",
// String.class,
// new Class[]{String.class},
// new Object[]{"張三"}
// ));
// 以上這種遠(yuǎn)程調(diào)用的方法非常不友好咕宿,考慮使用代理封裝
HelloService proxyService = getProxyService(HelloService.class);
proxyService.sayHello("張三");
proxyService.sayHello("李四");
proxyService.sayHello("王五");
}
最后币绩,我們要考慮的就是客戶端如何接收服務(wù)端的返回消息,由于在netty中府阀,調(diào)用遠(yuǎn)程服務(wù)的線程與收到返回結(jié)果的線程并不是一個(gè)線程缆镣,接收返回消息的線程一般在 nioEventLoop
中,所以想要在調(diào)用服務(wù)的線程中獲取服務(wù)的結(jié)果肌似,就涉及到了線程之間異步交換信息费就,具體實(shí)現(xiàn)思路為
- 發(fā)起調(diào)用的線程在發(fā)送了Request請(qǐng)求信息后诉瓦,準(zhǔn)備一個(gè)空的Promise川队,然后等待nio線程將服務(wù)端返回的結(jié)果放入Promise中
- nio線程得到返回的結(jié)果后,將結(jié)果放入Promise中睬澡,并通知調(diào)用的線程固额,實(shí)現(xiàn)異步
首先編寫處理服務(wù)器響應(yīng)信息的handler
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
/**
* 一個(gè)用來傳遞線程之間異步結(jié)果的容器集合
* 某個(gè)線程進(jìn)行遠(yuǎn)程調(diào)用后,開啟一個(gè)Promise容器煞聪,Nio線程收到遠(yuǎn)程結(jié)果后斗躏,將結(jié)果放入那個(gè)線程的Promise中
*/
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
// 拿到屬于這次消息接收的空的promise
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
// 往promise放結(jié)果
if (promise != null) {
if ((msg.getExceptionValue() == null)) { // 若異常值為空,則正常調(diào)用
promise.setSuccess(msg.getReturnValue());
} else { // 異常值不為空昔脯,則調(diào)用異常
promise.setFailure(msg.getExceptionValue());
}
}
}
}
然后完整的客戶端代碼為
@Slf4j
public class RpcClientManager {
private static volatile Channel channel = null;
private static final Object LOCK = new Object();
public static void main(String[] args) {
// getChannel().writeAndFlush(new RpcRequestMessage(
// 1,
// "com.yldog.rpc.HelloService",
// "sayHello",
// String.class,
// new Class[]{String.class},
// new Object[]{"張三"}
// ));
// 以上這種遠(yuǎn)程調(diào)用的方法非常不友好啄糙,考慮使用代理封裝
HelloService proxyService = getProxyService(HelloService.class);
proxyService.sayHello("張三");
proxyService.sayHello("李四");
proxyService.sayHello("王五");
}
// 創(chuàng)建代理類
public static <T> T getProxyService(Class<T> serviceClazz) {
ClassLoader classLoader = serviceClazz.getClassLoader();
Class<?>[] interfaces = {serviceClazz};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
// 1. 將方法調(diào)用轉(zhuǎn)為 消息對(duì)象
int sequenceID = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceID,
serviceClazz.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2. 發(fā)送消息
getChannel().writeAndFlush(msg);
// 3. 準(zhǔn)備一個(gè)這次接收消息專用的Promise對(duì)象 指定promise異步接收結(jié)果的線程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 放入promise集合中笛臣,Nio線程取用時(shí),用sequenceId作為key
RpcResponseMessageHandler.PROMISES.put(sequenceID, promise);
// 4. 等待 Nio線程將返回的結(jié)果放入 promise 中
promise.await();
// 5. 拿到結(jié)果后隧饼,判斷調(diào)用是否正常
if (promise.isSuccess()) {
// 調(diào)用成功
return promise.getNow();
} else {
// 調(diào)用異常
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}
// 使用DCL保證channel單例
public static Channel getChannel() {...}
// 初始化channel沈堡,只能初始化一次
private static void initChannel() {...}
}