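Overview
Java: Writing an RPC Framework from Scratch (01), Socket-Based Implementation
Java: Writing an RPC Framework from Scratch (02), Client and Server with Netty 4
With the client and server written, how does the client actually call the server?
Let's take a look.
Interface definition
Calculation method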
package com.github.houbb.rpc.common.service;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
/**
* <p> Calculator service interface </p>
*
* <pre> Created: 2018/8/24 4:47 PM </pre>
* <pre> Project: fake </pre>
*
* @author houbinbin
* @since 0.0.1
*/
public interface Calculator {
/**
* Computes the sum
* @param request the request parameters
* @return the result
*/
CalculateResponse sum(final CalculateRequest request);
}
POJO
The corresponding parameter objects:
- CalculateRequest
package com.github.houbb.rpc.common.model;
import java.io.Serializable;
/**
* <p> Request parameters </p>
*
* <pre> Created: 2018/8/24 5:05 PM </pre>
* <pre> Project: fake </pre>
*
* @author houbinbin
* @since 0.0.3
*/
public class CalculateRequest implements Serializable {
private static final long serialVersionUID = 6420751004355300996L;
/**
* First operand
*/
private int one;
/**
* Second operand
*/
private int two;
public CalculateRequest() {
}
public CalculateRequest(int one, int two) {
this.one = one;
this.two = two;
}
//getter setter toString
}
- CalculateResponse
package com.github.houbb.rpc.common.model;
import java.io.Serializable;
/**
* <p> Response result </p>
*
* <pre> Created: 2018/8/24 5:05 PM </pre>
* <pre> Project: fake </pre>
*
* @author houbinbin
* @since 0.0.3
*/
public class CalculateResponse implements Serializable {
private static final long serialVersionUID = -1972014736222511341L;
/**
* Whether the call succeeded
*/
private boolean success;
/**
* Sum of the two operands
*/
private int sum;
public CalculateResponse() {
}
public CalculateResponse(boolean success, int sum) {
this.success = success;
this.sum = sum;
}
//getter setter toString
}
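Both POJOs elide their accessors behind the `//getter setter toString` comment. The sketch below writes them out for the response object, assuming conventional JavaBean naming (`isSuccess` and `getSum` are the names the server-side encoder calls later) and a `toString` format copied from the `CalculateResponse{success=true, sum=3}` text in the client log. The class name `CalculateResponseSketch` is hypothetical, and the original source may differ in detail.

```java
import java.io.Serializable;

/** Hypothetical stand-in for CalculateResponse with the elided methods written out. */
final class CalculateResponseSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private boolean success;
    private int sum;

    CalculateResponseSketch() {
    }

    CalculateResponseSketch(boolean success, int sum) {
        this.success = success;
        this.sum = sum;
    }

    public boolean isSuccess() { return success; }

    public void setSuccess(boolean success) { this.success = success; }

    public int getSum() { return sum; }

    public void setSum(int sum) { this.sum = sum; }

    // Format assumed from the client log shown in the test section.
    @Override
    public String toString() {
        return "CalculateResponse{success=" + success + ", sum=" + sum + "}";
    }

    public static void main(String[] args) {
        System.out.println(new CalculateResponseSketch(true, 3));
    }
}
```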
Client
Core part
RpcClient needs the corresponding handlers added. The adjusted bootstrap looks like this:
Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new CalculateRequestEncoder())
.addLast(new CalculateResponseDecoder())
.addLast(new RpcClientHandler());
}
})
.connect(RpcConstant.ADDRESS, port)
.syncUninterruptibly();
Netty's handler pipeline is elegantly designed and lets us extend the code very flexibly.
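To make the handler order concrete, here is a toy model of how a pipeline is traversed. It is plain Java, not Netty's API: the `Handler` class and both methods are hypothetical names for illustration only. It assumes what Netty documents for its pipeline: inbound events visit inbound handlers from first to last, and a write issued through a handler's context visits the outbound handlers added before it, from last to first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of pipeline traversal order; this is not Netty's API. */
final class PipelineOrderSketch {

    /** A handler name plus the directions it takes part in. */
    static final class Handler {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Handler(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    /** Same order as the addLast(...) calls in the client bootstrap. */
    static List<Handler> clientPipeline() {
        return Arrays.asList(
                new Handler("LoggingHandler", true, true),
                new Handler("CalculateRequestEncoder", false, true),
                new Handler("CalculateResponseDecoder", true, false),
                new Handler("RpcClientHandler", true, false));
    }

    /** Data that was read visits the inbound handlers from first to last. */
    static List<String> inboundOrder(List<Handler> pipeline) {
        List<String> visited = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.inbound) {
                visited.add(h.name);
            }
        }
        return visited;
    }

    /** A write issued by the handler at index from visits earlier outbound handlers, last to first. */
    static List<String> outboundOrder(List<Handler> pipeline, int from) {
        List<String> visited = new ArrayList<>();
        for (int i = from - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) {
                visited.add(pipeline.get(i).name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Handler> pipeline = clientPipeline();
        System.out.println("read : " + inboundOrder(pipeline));
        System.out.println("write: " + outboundOrder(pipeline, 3));
    }
}
```

Under this model, a write from RpcClientHandler reaches CalculateRequestEncoder before LoggingHandler, which is consistent with the client log reporting `WRITE: 8B` (already encoded bytes) rather than the request object.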
Next, let's look at the corresponding implementations.
RpcClientHandler
package com.github.houbb.rpc.client.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* <p> Client handler class </p>
*
* <pre> Created: 2019/10/16 11:30 PM </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClientHandler extends SimpleChannelInboundHandler<CalculateResponse> {
// The logger is named after RpcClient, so log lines from this handler are reported under RpcClient.
private static final Log log = LogFactory.getLog(RpcClient.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send a hard-coded request as soon as the connection is established.
CalculateRequest request = new CalculateRequest(1, 2);
ctx.writeAndFlush(request);
log.info("[Client] request is :{}", request);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CalculateResponse response) throws Exception {
// The decoder earlier in the pipeline has already produced a CalculateResponse.
log.info("[Client] response is :{}", response);
}
}
This part is fairly simple. In channelActive we issue the call directly. To keep things simple, the request object is hard-coded here.
In channelRead0 we listen for the server's response and log it.
CalculateRequestEncoder
The request parameter is an object, which Netty cannot put on the wire directly, so we convert it into primitive values:
package com.github.houbb.rpc.client.encoder;
import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
int one = msg.getOne();
int two = msg.getTwo();
out.writeInt(one);
out.writeInt(two);
}
}
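The wire format produced by this encoder can be reproduced with the JDK alone. The sketch below assumes that `ByteBuf.writeInt` writes four bytes in big-endian order (Netty's default), which is also the default byte order of `java.nio.ByteBuffer`. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** JDK-only illustration of the request wire format; not part of the original project. */
final class RequestWireFormatSketch {

    /** Writes two ints back to back, big-endian, like two consecutive writeInt calls. */
    static byte[] encode(int one, int two) {
        ByteBuffer buffer = ByteBuffer.allocate(2 * Integer.BYTES); // big-endian by default
        buffer.putInt(one);
        buffer.putInt(two);
        return buffer.array();
    }

    /** Formats bytes as space-separated two-digit hex values. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints "00 00 00 01 00 00 00 02"
        System.out.println(toHex(encode(1, 2)));
    }
}
```

The eight bytes printed for `encode(1, 2)` match the `WRITE: 8B` hex dump in the client log in the test section.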
CalculateResponseDecoder
The same reasoning applies to the server's response.
We need to wrap the primitive values back into the object we want.
package com.github.houbb.rpc.client.decoder;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* Decodes the response
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateResponseDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
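// A response is 1 boolean byte + 4 int bytes; wait until the whole message has arrived.
if (in.readableBytes() < 5) {
return;
}
boolean success = in.readBoolean();
int sum = in.readInt();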
CalculateResponse response = new CalculateResponse(success, sum);
out.add(response);
}
}
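Because TCP delivers a byte stream, a decoder can be invoked before a complete response has arrived. The JDK-only sketch below shows the idea of checking the available length before reading. It assumes the 5-byte layout used in this article (one boolean byte followed by one big-endian int) and that a nonzero byte means true, as `ByteBuf.readBoolean` does. `ResponseDecodeSketch` and `Decoded` are hypothetical names.

```java
import java.nio.ByteBuffer;

/** JDK-only illustration of length-checked decoding; not part of the original project. */
final class ResponseDecodeSketch {

    /** One boolean byte plus one 4-byte int. */
    static final int FRAME_LENGTH = 1 + Integer.BYTES;

    /** Minimal holder for the decoded values. */
    static final class Decoded {
        final boolean success;
        final int sum;

        Decoded(boolean success, int sum) {
            this.success = success;
            this.sum = sum;
        }
    }

    /** Returns null and consumes nothing when fewer than FRAME_LENGTH bytes are available. */
    static Decoded decode(ByteBuffer in) {
        if (in.remaining() < FRAME_LENGTH) {
            return null; // incomplete message: wait for more bytes
        }
        boolean success = in.get() != 0; // nonzero byte means true
        int sum = in.getInt();
        return new Decoded(success, sum);
    }

    public static void main(String[] args) {
        ByteBuffer partial = ByteBuffer.wrap(new byte[] {1, 0, 0});
        System.out.println("partial decoded: " + (decode(partial) != null));

        ByteBuffer full = ByteBuffer.wrap(new byte[] {1, 0, 0, 0, 3});
        Decoded decoded = decode(full);
        System.out.println("success=" + decoded.success + ", sum=" + decoded.sum);
    }
}
```

In a Netty `ByteToMessageDecoder`, the equivalent of returning null is to return from `decode` without adding anything to `out`, so that Netty calls the decoder again once more bytes have accumulated.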
Server
Setting the handlers
The handlers in RpcServer need a small adjustment. Everything else stays the same.
ServerBootstrap serverBootstrap = new ServerBootstrap();
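// ServerBootstrap.group takes the parent (boss) group first, then the child (worker) group
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// log events on the server channel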
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new CalculateRequestDecoder())
.addLast(new CalculateResponseEncoder())
.addLast(new RpcServerHandler());
}
})
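// This option affects connections that have not yet been taken off the queue by accept
.option(ChannelOption.SO_BACKLOG, 128)
// With this option, if the client has been silent for a while, the server sends a keep-alive probe (an ACK packet) to check whether the client is still alive.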
.childOption(ChannelOption.SO_KEEPALIVE, true);
RpcServerHandler
This started out as an empty implementation. Let's fill it in.
package com.github.houbb.rpc.server.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author binbin.hou
* @since 0.0.1
*/
public class RpcServerHandler extends SimpleChannelInboundHandler<CalculateRequest> {
private static final Log log = LogFactory.getLog(RpcServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String id = ctx.channel().id().asLongText();
// id is concatenated here, so the {} placeholder is printed literally;
// log.info("[Server] channel {} connected", id) would fill it in.
log.info("[Server] channel {} connected " + id);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CalculateRequest request) throws Exception {
final String id = ctx.channel().id().asLongText();
log.info("[Server] receive channel {} request: {} from ", id, request);
Calculator calculator = new CalculatorService();
CalculateResponse response = calculator.sum(request);
// write the response back to the client
ctx.writeAndFlush(response);
log.info("[Server] channel {} response {}", id, response);
}
}
Once the client's request has been read, we obtain the CalculateRequest, call the sum method to get the corresponding CalculateResponse, and send the result back to the client.
CalculateRequestDecoder
This mirrors the client one to one: we first convert the primitive values delivered by Netty into a CalculateRequest object.
package com.github.houbb.rpc.server.decoder;
import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* Decodes the request
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateRequestDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
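// A request is two 4-byte ints; wait until the whole message has arrived.
if (in.readableBytes() < 8) {
return;
}
int one = in.readInt();
int two = in.readInt();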
CalculateRequest request = new CalculateRequest(one, two);
out.add(request);
}
}
CalculateResponseEncoder
As on the client, we need to convert the response into primitive values for network transmission.
package com.github.houbb.rpc.server.encoder;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
boolean success = msg.isSuccess();
int result = msg.getSum();
out.writeBoolean(success);
out.writeInt(result);
}
}
CalculatorService
The implementation class on the server side.
public class CalculatorService implements Calculator {
@Override
public CalculateResponse sum(CalculateRequest request) {
int sum = request.getOne()+request.getTwo();
return new CalculateResponse(true, sum);
}
}
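With all the pieces in place, the whole exchange can be traced without any networking. The sketch below is a JDK-only approximation under the assumptions used throughout this article: an 8-byte request (two big-endian ints) and a 5-byte response (one boolean byte plus one big-endian int). `RoundTripSketch` and its methods are hypothetical names, not part of the project.

```java
import java.nio.ByteBuffer;

/** JDK-only trace of one request/response exchange; not part of the original project. */
final class RoundTripSketch {

    /** Client side: two operands become 8 bytes. */
    static byte[] encodeRequest(int one, int two) {
        return ByteBuffer.allocate(8).putInt(one).putInt(two).array();
    }

    /** Server side: decode the request, add the operands, encode the response. */
    static byte[] handle(byte[] requestBytes) {
        ByteBuffer in = ByteBuffer.wrap(requestBytes);
        int one = in.getInt();
        int two = in.getInt();
        int sum = one + two;
        // success flag (1 = true) followed by the sum
        return ByteBuffer.allocate(5).put((byte) 1).putInt(sum).array();
    }

    /** Client side: read the success flag, then the sum. */
    static int decodeSum(byte[] responseBytes) {
        ByteBuffer in = ByteBuffer.wrap(responseBytes);
        boolean success = in.get() != 0;
        if (!success) {
            throw new IllegalStateException("call failed");
        }
        return in.getInt();
    }

    public static void main(String[] args) {
        byte[] request = encodeRequest(1, 2);
        byte[] response = handle(request);
        System.out.println(request.length + " bytes out, " + response.length + " bytes back");
        System.out.println("sum = " + decodeSum(response));
    }
}
```

The request and response sizes line up with the `WRITE: 8B` and `READ: 5B` entries in the client log in the test section.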
Testing
Server
Start the server:
new RpcServer().start();
Server startup log:
[DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC service: starting the server
Oct 05, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler channelRegistered
INFO: [id: 0xd399474f] REGISTERED
Oct 05, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler bind
INFO: [id: 0xd399474f] BIND: 0.0.0.0/0.0.0.0:9527
Oct 05, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler channelActive
INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC server started, listening on port [9527]
Client
Start the client:
new RpcClient().start();
The log is as follows:
[DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC service: starting the client
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRegistered
INFO: [id: 0x4d75c580] REGISTERED
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler connect
INFO: [id: 0x4d75c580] CONNECT: /127.0.0.1:9527
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelActive
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] ACTIVE
[INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC client started, listening on port: 9527
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler write
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] WRITE: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 00 00 00 02                         |........        |
+--------+-------------------------------------------------+----------------+
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler flush
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] FLUSH
[INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelActive] - [Client] request is :CalculateRequest{one=1, two=2}
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRead
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 03                                  |.....           |
+--------+-------------------------------------------------+----------------+
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelReadComplete
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}
As you can see, the corresponding request parameters and response result are printed.
The server, of course, has new log entries at this point as well:
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRead
INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L:/127.0.0.1:9527 - R:/127.0.0.1:54030]
Oct 05, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelReadComplete
INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927
[INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from
[INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}
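Summary
To make it easier for everyone to learn, the source code above has been open-sourced.
I hope this article helps. If you enjoyed it, likes, bookmarks, and shares are welcome.
I'm Lao Ma, and I look forward to seeing you next time.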