【手擼RPC框架】SpringBoot+Netty4實現(xiàn)RPC框架
線程模型
簡單了解React線程模型押赊,參考文章【五分鐘快速理解 Reactor 模型】
線程模型1:傳統(tǒng)阻塞 I/O 服務模型
模型特點:
- 采用阻塞
IO
模式獲取輸入的數(shù)據(jù) - 每個鏈接都需要獨立的線程完成數(shù)據(jù)的輸入疼约,業(yè)務處理、數(shù)據(jù)返回蹦锋。
問題分析:
- 當并發(fā)數(shù)很大炕置,就會創(chuàng)建大量的線程荣挨,占用很大系統(tǒng)資源
- 連接創(chuàng)建后男韧,如果當前線程暫時沒有數(shù)據(jù)可讀,該線程會阻塞在
read
操作默垄,造成線程資源浪費此虑。
線程模型2:Reactor 模式
針對傳統(tǒng)阻塞I/O服務模型的2個缺點,解決方案如下:
- 基于
I/O
復用模型:多個連接共用一個阻塞對象厕倍,應用程序只需要在一個阻塞對象等待寡壮,無需阻塞等待所有連接。當某個連接有新的數(shù)據(jù)可以處理時讹弯,操作系統(tǒng)通知應用程序况既,線程從阻塞狀態(tài)返回,開始進行業(yè)務處理组民。Reactor
對應的叫法: 1. 反應器模式 2. 分發(fā)者模式(Dispatcher
) 3. 通知者模式(notifier
) - 基于線程池復用線程資源:不必再為每個連接創(chuàng)建線程棒仍,將連接完成后的業(yè)務處理任務分配給線程進行處理,一個線程可以處理多個連接的業(yè)務臭胜。
單 Reactor 單線程
模型分析
- 優(yōu)點:模型簡單莫其,沒有多線程、進程通信耸三、競爭的問題乱陡,全部都在一個線程中完成
- 缺點:性能問題,只有一個線程仪壮,無法完全發(fā)揮多核 CPU 的性能憨颠。Handler 在處理某個連接上的業(yè)務時,整個進程無法處理其他連接事件积锅,很容易導致性能瓶頸
- 缺點:可靠性問題爽彤,線程意外終止,或者進入死循環(huán)缚陷,會導致整個系統(tǒng)通信模塊不可用适篙,不能接收和處理外部消息,造成節(jié)點故障
- 使用場景:客戶端的數(shù)量有限箫爷,業(yè)務處理非橙陆冢快速,比如 Redis在業(yè)務處理的時間復雜度 O(1) 的情況
單 Reactor 多線程
模型分析
-
優(yōu)點:可以充分的利用多核
cpu
的處理能力 -
缺點:多線程數(shù)據(jù)共享和訪問比較復雜虎锚,
reactor
處理所有的事件的監(jiān)聽和響應硫痰,在單線程運行, 在高并發(fā)場景容易出現(xiàn)性能瓶頸.
主從 Reactor 多線程
模型分析
- 優(yōu)點:父線程與子線程的數(shù)據(jù)交互簡單職責明確翁都,父線程只需要接收新連接碍论,子線程完成后續(xù)的業(yè)務處理谅猾。
- 優(yōu)點:父線程與子線程的數(shù)據(jù)交互簡單柄慰,Reactor 主線程只需要把新連接傳給子線程鳍悠,子線程無需返回數(shù)據(jù)
- 缺點:編程復雜度較高
- 結合實例:這種模型在許多項目中廣泛使用,包括 Nginx 主從 Reactor 多進程模型坐搔,Memcached 主從多線程藏研,Netty 主從多線程模型的支持
先實現(xiàn)簡單的Netty通信
服務端示例
public static void main(String[] args) {
//創(chuàng)建連接線程組,線程數(shù)為1概行。只負責處理連接請求
NioEventLoopGroup boss = new NioEventLoopGroup(1);
//創(chuàng)建工作線程組蠢挡,線程數(shù)默認為cpu核數(shù)*2。處理與客戶端的業(yè)務處理
NioEventLoopGroup worker = new NioEventLoopGroup();
//創(chuàng)建Server端的啟動對象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置線程組
serverBootstrap.group(boss, worker)
//使用 NioServerSocketChannel 作為服務器的通道實現(xiàn)
.channel(NioServerSocketChannel.class)
//給worker線程組初始化處理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
//添加字符串的編解碼器
.addLast(new StringDecoder())
.addLast(new StringEncoder())
//添加對象的編解碼器凳忙,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器业踏,防止內存溢出
.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
//添加自定義的業(yè)務處理器
.addLast(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端連接啦。涧卵。勤家。客戶端地址:{}", ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
log.info("服務端接收到的數(shù)據(jù):{}", o.toString());
//價值1個億的AI代碼
String str = o.toString();
str = str.replace("嗎", "");
str = str.replace("柳恐?", "伐脖!");
str = str.replace("? ", "乐设! ");
channelHandlerContext.writeAndFlush(str);
}
});
}
});
//啟動并且監(jiān)聽
ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly();
//監(jiān)聽關閉通道
channelFuture.channel().closeFuture();
}
客戶端示例
public static void main(String[] args) {
//設置客戶端工作線程
NioEventLoopGroup worker = new NioEventLoopGroup();
//創(chuàng)建客戶端啟動對象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
//通道連接者
.channel(NioSocketChannel.class)
//給worker線程組初始化處理器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
//添加字符串的編解碼器
.addLast(new StringDecoder())
.addLast(new StringEncoder())
//添加對象的編解碼器讼庇,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器,防止內存溢出
.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
//添加自定義的業(yè)務處理器
.addLast(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("哈哈哈");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
log.info("客戶端接收到的數(shù)據(jù):{}", o.toString());
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly();
//客戶端需要輸入信息近尚,創(chuàng)建一個掃描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通過channel發(fā)送到服務器端
channel.writeAndFlush(msg + "\r\n");
}
channelFuture.channel().closeFuture();
}
快啟動試試看把蠕啄,不過需要注意的是,得先啟動服務端哦~
SpringBoot + Netty4實現(xiàn)rpc框架
好了肿男,接下來就讓我們進入正題介汹,讓我們利用我們所學的知識去實現(xiàn)自己一個簡單的rpc框架吧
簡單說下RPC(Remote Procedure Call)遠程過程調用,簡單的理解是一個節(jié)點請求另一個節(jié)點提供的服務舶沛。讓兩個服務之間調用就像調用本地方法一樣嘹承。
RPC時序圖:
RPC流程:
- 【客戶端】發(fā)起調用
- 【客戶端】數(shù)據(jù)編碼
- 【客戶端】發(fā)送編碼后的數(shù)據(jù)到服務端
- 【服務端】接收客戶端發(fā)送的數(shù)據(jù)
- 【服務端】對數(shù)據(jù)進行解碼
- 【服務端】處理消息業(yè)務并返回結果值
- 【服務端】對結果值編碼
- 【服務端】將編碼后的結果值回傳給客戶端
- 【客戶端】接收結果值
- 【客戶端】解碼結果值
- 【客戶端】處理返回數(shù)據(jù)業(yè)務
引入依賴
<dependencies>
<!-- SpringBoot依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring容器上下文 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<!-- Spring配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Netty4 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.58.Final</version>
</dependency>
<!-- 工具 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.8</version>
</dependency>
</dependencies>
編寫服務端
自定義消息協(xié)議:
/**
* @author zc
* @date 2021/3/1 17:43
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcMessage implements Serializable {
private static final long serialVersionUID = 430507739718447406L;
/**
* interface接口名
*/
private String name;
/**
* 方法名
*/
private String methodName;
/**
* 參數(shù)類型
*/
private Class<?>[] parTypes;
/**
* 參數(shù)
*/
private Object[] pars;
/**
* 結果值
*/
private Object result;
}
自定義Rpc注解:
/**
* @author zc
* @date 2021/3/2 15:36
*/
@Target(value = {ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcServer {
}
定義ServerHandle
業(yè)務處理器:
/**
* Netty Server端Handle處理類,消息體RpcMessage
* 實現(xiàn)ApplicationContextAware接口:該接口可以加載獲取到所有的 spring bean如庭。
* 實現(xiàn)了這個接口的bean叹卷,當spring容器初始化的時候,會自動的將ApplicationContext注入進來
*
* @author ZC
* @date 2021/3/1 22:15
*/
@Slf4j
@ChannelHandler.Sharable
public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware {
private Map<String, Object> serviceMap;
/**
* 在類被Spring容器加載時會自動執(zhí)行setApplicationAware
*
* @param applicationContext Spring上下文
* @throws BeansException 異常信息
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//從Spring容器中獲取到所有擁有@RpcServer注解的Beans集合坪它,Map<Name(對象類型骤竹,對象全路徑名),實例對象>
Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class);
log.info("被@RpcServer注解加載的Bean: {}", beansWithAnnotation);
if (beansWithAnnotation.size() > 0) {
Map<String, Object> map = new ConcurrentHashMap<>(16);
for (Object o : beansWithAnnotation.values()) {
//獲取該實例對象實現(xiàn)的接口Class
Class<?> anInterface = o.getClass().getInterfaces()[0];
//獲取該接口類名,作為Key往毡,實例對象作為Value
map.put(anInterface.getName(), o);
}
//使用變量接住map
serviceMap = map;
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端連接了: {}", ctx.channel().remoteAddress());
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("異常信息");
cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
log.info("客戶端發(fā)送的消息:{}", rpcMessage);
//從Map中獲取實例對象
Object service = serviceMap.get(rpcMessage.getName());
//獲取調用方法
Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes());
method.setAccessible(true);
//反射調用實例對象方法蒙揣,獲取返回值
Object result = method.invoke(service, rpcMessage.getPars());
rpcMessage.setResult(JSONUtil.toJsonStr(result));
log.info("回給客戶端的消息:{}", rpcMessage);
//Netty服務端將數(shù)據(jù)寫會Channel并發(fā)送給客戶端,同時添加一個監(jiān)聽器开瞭,當所有數(shù)據(jù)包發(fā)送完成后懒震,關閉通道
channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE);
}
}
定義NettyServer
端:
/**
* Netty服務端
*
* @author zc
* @date 2021/2/24 13:23
**/
@Slf4j
public class NettyServer {
/**
* server端處理器
*/
private final ServerHandle serverHandle;
/**
* 服務端通道
*/
private Channel channel;
/**
* 構造器
*
* @param serverHandle server處理器
*/
public NettyServer(ServerHandle serverHandle) {
this.serverHandle = serverHandle;
}
/**
* 啟動
*
* @param port 啟動端口
*/
public void start(int port) {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
.addLast(serverHandle);
}
});
final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
log.info("服務端啟動-端口: {}", port);
channel = channelFuture.channel();
channel.closeFuture().syncUninterruptibly();
} catch (Exception e) {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
/**
* 關閉當前通道
*/
public void stop() {
channel.close();
}
}
自定義rpc配置屬性類:
/**
* @author zc
* @date 2021/3/4 23:38
*/
@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyRpcProperties {
private int serverPort;
}`
創(chuàng)建Server端啟動配置類:
/**
* NettyServer服務端配置類
*
* @author zc
* @date 2021/3/1 18:24
*/
@Slf4j
@Configuration
@EnableConfigurationProperties(NettyRpcProperties.class)
public class ServerBeanConfig {
private final NettyRpcProperties nettyRpcProperties;
@Autowired
public ServerBeanConfig(NettyRpcProperties nettyRpcProperties) {
this.nettyRpcProperties = nettyRpcProperties;
}
/**
* 配置ServerHandle
*
* @return ServerHandle處理類
*/
@Bean
public ServerHandle serverHandle() {
return new ServerHandle();
}
/**
* 配置NettyServer
*
* @param handle ServerHandle處理類
* @return NettyServer
*/
@Bean
public NettyServer nettyServer(ServerHandle handle) {
NettyServer nettyServer = new NettyServer(handle);
// nettyServer.start(nettyRpcProperties.getServerPort());
return nettyServer;
}
/**
* 解決SpringBoot端口無法監(jiān)聽問題
*/
@Component
static class NettyServerStart implements ApplicationRunner {
private final NettyServer nettyServer;
private final NettyRpcProperties properties;
@Autowired
NettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) {
this.nettyServer = nettyServer;
this.properties = properties;
}
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("===============ApplicationRunner");
if (nettyServer != null) {
nettyServer.start(properties.getServerPort());
}
}
}
}
注入Spring容器
此時有兩種方式讓該配置自動注入Spring容器生效:
-
自動注入
在resource目錄下創(chuàng)建META-INF目錄罩息,創(chuàng)建spring.factories文件
在該文件里寫上
org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包路徑:xxx.xxx.xxx}.${配置類:ServerBeanConfig}
配置好之后,在SpringBoot啟動時會自動加載該配置類个扰。
-
通過注解注入
/** * 自定義SpringBoot啟動注解 * 注入ServerBeanConfig配置類 * * @author ZC * @date 2021/3/1 23:48 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ServerBeanConfig.class}) public @interface EnableNettyServer { }
編寫客戶端
創(chuàng)建客戶端處理器`ClientHandle
/**
* @author zc
* @date 2021/3/2 15:19
*/
@Slf4j
@ChannelHandler.Sharable
public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> {
/**
* 定義消息Map瓷炮,將連接通道Channel作為key,消息返回值作為value
*/
private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap;
public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) {
this.rpcMessageConcurrentMap = rpcMessageConcurrentMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
log.info("客戶端收到服務端消息:{}", rpcMessage);
rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage);
}
}
創(chuàng)建客戶端啟動類NettyClient
/**
* @author ZC
* @date 2021/3/1 23:30
*/
@Slf4j
public class NettyClient {
private Channel channel;
/**
* 存放請求編號與響應對象的映射關系
*/
private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>();
public RpcMessage send(int port, final RpcMessage rpcMessage) {
//客戶端需要一個事件循環(huán)組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
.addLast(new ClientHandle(rpcMessageConcurrentMap));
}
});
final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
log.info("連接服務端成功: " + channelFuture.channel().remoteAddress());
channel = channelFuture.channel();
channel.writeAndFlush(rpcMessage);
log.info("發(fā)送數(shù)據(jù)成功:{}", rpcMessage);
channel.closeFuture().syncUninterruptibly();
return rpcMessageConcurrentMap.get(channel);
} catch (Exception e) {
log.error("client exception", e);
return null;
} finally {
group.shutdownGracefully();
//移除請求編號和響應對象直接的映射關系
rpcMessageConcurrentMap.remove(channel);
}
}
public void stop() {
channel.close();
}
}
定義Netty客戶端Bean后置處理器
/**
* Netty客戶端Bean后置處理器
* 實現(xiàn)Spring后置處理器接口:BeanPostProcessor
* 在Bean對象在實例化和依賴注入完畢后递宅,在顯示調用初始化方法的前后添加我們自己的邏輯娘香。注意是Bean實例化完畢后及依賴注入完成后觸發(fā)的
*
* @author ZC
* @date 2021/3/2 23:00
*/
@Slf4j
public class NettyClientBeanPostProcessor implements BeanPostProcessor {
private final NettyClient nettyClient;
public NettyClientBeanPostProcessor(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
/**
* 實例化、依賴注入完畢办龄,在調用顯示的初始化之前完成一些定制的初始化任務
* 注意:方法返回值不能為null
* 如果返回null那么在后續(xù)初始化方法將報空指針異澈嬲溃或者通過getBean()方法獲取不到Bean實例對象
* 因為后置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中
*/
@Override
public Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException {
//獲取實例Class
Class<?> beanClass = bean.getClass();
do {
//獲取該類所有字段
Field[] fields = beanClass.getDeclaredFields();
for (Field field : fields) {
//判斷該字段是否擁有@RpcServer
if (field.getAnnotation(RpcServer.class) != null) {
field.setAccessible(true);
try {
//通過JDK動態(tài)代理獲取該類的代理對象
Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient));
//將代理類注入該字段
field.set(bean, o);
log.info("創(chuàng)建代理類 ===>>> {}", beanName);
} catch (IllegalAccessException e) {
log.error(e.getMessage());
}
}
}
} while ((beanClass = beanClass.getSuperclass()) != null);
return bean;
}
/**
* 實例化、依賴注入俐填、初始化完畢時執(zhí)行
* 注意:方法返回值不能為null
* 如果返回null那么在后續(xù)初始化方法將報空指針異尘饕Γ或者通過getBean()方法獲取不到Bean實例對象
* 因為后置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 可以根據(jù)beanName不同執(zhí)行不同的處理操作
return bean;
}
/**
* JDK動態(tài)代理處理器
*/
static class ClientInvocationHandle implements InvocationHandler {
private final NettyClient nettyClient;
public ClientInvocationHandle(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
/**
* 代理方法調用
*
* @param proxy 代理類
* @param method 方法
* @param args 參數(shù)
* @return 返回值
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
//組裝Netty參數(shù)
RpcMessage rpcMessage = RpcMessage.builder()
.name(method.getDeclaringClass().getName())
.methodName(method.getName())
.parTypes(method.getParameterTypes())
.pars(args)
.build();
//調用Netty,發(fā)送數(shù)據(jù)
RpcMessage send = nettyClient.send(1111, rpcMessage);
log.info("接收到服務端數(shù)據(jù):{}, 返回結果值 ====》》》》{}", send, send.getResult());
return send.getResult();
}
}
}
定義客戶端配置類
/**
* @author zc
* @date 2021/3/1 18:24
*/
@Configuration
public class ClientBeanConfig {
@Bean
public NettyClient nettyClient() {
return new NettyClient();
}
@Bean
public NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) {
return new NettyClientBeanPostProcessor(nettyClient);
}
}
最后和服務端一樣玷禽,注入Spring容器
/**
* @author ZC
* @date 2021/3/1 23:48
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@ImportAutoConfiguration({ClientBeanConfig.class})
public @interface EnableNettyClient {
}
至此我們的SpringBoot + Netty4的就已經(jīng)實現(xiàn)了最最簡單的rpc框架模式了赫段;然后我們就可以引用我們自己的rpc依賴了。
最后再執(zhí)行一下maven命令
mvn install
netty-rpc-examples例子
接口服務
pom里啥也沒有矢赁。糯笙。。
定義一個接口
/**
* @author zc
* @date 2021/3/1 17:55
*/
public interface Test1Api {
void test();
void test(int id, String name);
String testStr(int id);
Object testObj();
}
rpc-server服務端
正常的SpringBoot工程
引入pom
<!-- 自定義rpc依賴 -->
<dependency>
<groupId>cn.happyloves.rpc</groupId>
<artifactId>netty-rpc</artifactId>
<version>0.0.1</version>
</dependency>
<!-- 接口依賴 -->
<dependency>
<groupId>cn.happyloves.netty.rpc.examples.api</groupId>
<artifactId>rpc-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
配置屬性
# 應用名稱
spring.application.name=rpc-server
# 應用服務 WEB 訪問端口
server.port=8080
netty.server-port=1111
創(chuàng)建一個實體類
/**
* @author ZC
* @date 2021/3/2 23:59
*/
@Data
public class Account implements Serializable {
private static final long serialVersionUID = 667178018106218163L;
private Integer id;
private String name;
private String username;
private String password;
}
創(chuàng)建Server實現(xiàn)Test1Api接口
/**
* @author ZC
* @date 2021/3/2 23:59
*/
@Slf4j
@Service
@RpcServer
public class TestServiceImpl implements Test1Api {
@Override
public void test() {
log.info("111111111");
}
@Override
public void test(int id, String name) {
log.info("222222222,{},{}", id, name);
}
@Override
public String testStr(int id) {
log.info("33333333333333333,{}", id);
return "33333333333333333 " + id;
}
@Override
public Object testObj() {
log.info("444444444444444444");
Account account = new Account();
account.setName("張三");
return account;
}
}
最后在SpringBoot啟動類上加上@EnableNettyServer
/**
* @author ZC
* @date 2021/3/2 23:55
*/
@EnableNettyServer
@SpringBootApplication
public class RpcServerApplication {
public static void main(String[] args) {
SpringApplication.run(RpcServerApplication.class, args);
}
}
rpc-server客戶端
引入pom依賴
<dependency>
<groupId>cn.happyloves.rpc</groupId>
<artifactId>netty-rpc</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>cn.happyloves.netty.rpc.examples.api</groupId>
<artifactId>rpc-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
創(chuàng)建Controller
/**
* @author ZC
* @date 2021/3/3 0:04
*/
@RestController
public class ClientController {
@RpcServer
private Test1Api testServiceImpl;
@GetMapping("/test1")
public void test() {
testServiceImpl.test();
}
@GetMapping("/test2")
public void test(int id, String name) {
testServiceImpl.test(id, name);
}
@GetMapping("/test3")
public String testStr(int id) {
return testServiceImpl.testStr(id);
}
@GetMapping("/test4")
public Object testObj() {
return testServiceImpl.testObj();
}
}
最后在啟動類上加上注解@EnableNettyClient
@EnableNettyClient
@SpringBootApplication
public class RpcClientApplication {
public static void main(String[] args) {
SpringApplication.run(RpcClientApplication.class, args);
}
}
先運行服務端撩银,在運行客戶端给涕,然后在調用客戶端接口就可以看到服務端能夠接收到客戶端發(fā)來的消息,然后服務端處理并返回额获,客戶端接收并返回够庙。。抄邀。
至此耘眨,一個小demo就完成了。
當然啦剔难,后續(xù)還有很多需求需要處理的偶宫,比方說當前demo中客戶端每次通信都需要創(chuàng)建一個實例去連接、服務的注冊吵冒、客戶端和服務端是同一個應用等等桦锄,這個后面再慢慢完善吧
趙小胖個人博客:https://zc.happyloves.cn:4443/wordpress/