java 從零開始手寫 RPC (03) 如何實現(xiàn)客戶端調(diào)用服務(wù)端屈梁?

說明

java 從零開始手寫 RPC (01) 基于 socket 實現(xiàn)

java 從零開始手寫 RPC (02)-netty4 實現(xiàn)客戶端和服務(wù)端

寫完了客戶端和服務(wù)端启盛,那么如何實現(xiàn)客戶端和服務(wù)端的調(diào)用呢姊氓?

下面就讓我們一起來看一下。

在這里插入圖片描述

接口定義

計算方法

package com.github.houbb.rpc.common.service;

import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;

/**
 * <p> 計算服務(wù)接口 </p>
 *
 * <pre> Created: 2018/8/24 下午4:47  </pre>
 * <pre> Project: fake  </pre>
 *
 * @author houbinbin
 * @since 0.0.1
 */
public interface Calculator {

    /**
     * 計算加法
     * @param request 請求入?yún)?     * @return 返回結(jié)果
     */
    CalculateResponse sum(final CalculateRequest request);

}

pojo

對應(yīng)的參數(shù)對象:

  • CalculateRequest
package com.github.houbb.rpc.common.model;

import java.io.Serializable;

/**
 * <p> 請求入?yún)?</p>
 *
 * <pre> Created: 2018/8/24 下午5:05  </pre>
 * <pre> Project: fake  </pre>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class CalculateRequest implements Serializable {

    private static final long serialVersionUID = 6420751004355300996L;

    /**
     * 參數(shù)一
     */
    private int one;

    /**
     * 參數(shù)二
     */
    private int two;

    public CalculateRequest() {
    }

    public CalculateRequest(int one, int two) {
        this.one = one;
        this.two = two;
    }

    //getter setter toString

}
  • CalculateResponse
package com.github.houbb.rpc.common.model;

import java.io.Serializable;

/**
 * <p> 請求入?yún)?</p>
 *
 * <pre> Created: 2018/8/24 下午5:05  </pre>
 * <pre> Project: fake  </pre>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class CalculateResponse implements Serializable {

    private static final long serialVersionUID = -1972014736222511341L;

    /**
     * 是否成功
     */
   private boolean success;

    /**
     * 二者的和
     */
   private int sum;

    public CalculateResponse() {
    }

    public CalculateResponse(boolean success, int sum) {
        this.success = success;
        this.sum = sum;
    }

    //getter setter toString
}

客戶端

核心部分

RpcClient 需要添加對應(yīng)的 Handler蟀伸,調(diào)整如下:

Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new ChannelInitializer<Channel>(){
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        .addLast(new CalculateRequestEncoder())
                        .addLast(new CalculateResponseDecoder())
                        .addLast(new RpcClientHandler());
            }
        })
        .connect(RpcConstant.ADDRESS, port)
        .syncUninterruptibly();

netty 中的 handler 泳道設(shè)計的非常優(yōu)雅蚀同,讓我們的代碼可以非常靈活地進行拓展。

接下來我們看一下對應(yīng)的實現(xiàn)啊掏。

RpcClientHandler

package com.github.houbb.rpc.client.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * <p> 客戶端處理類 </p>
 *
 * <pre> Created: 2019/10/16 11:30 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.2
 */
public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        CalculateRequest request = new CalculateRequest(1, 2);

        ctx.writeAndFlush(request);
        log.info("[Client] request is :{}", request);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        CalculateResponse response = (CalculateResponse)msg;
        log.info("[Client] response is :{}", response);
    }

}

這里比較簡單蠢络,channelActive 中我們直接發(fā)起調(diào)用,入?yún)⒌膶ο鬄榱撕唵纬倜郏颂幑潭▽懰馈?/p>

channelRead0 中監(jiān)聽服務(wù)端的相應(yīng)結(jié)果刹孔,并做日志輸出。

CalculateRequestEncoder

請求參數(shù)是一個對象娜睛,netty 是無法直接傳輸?shù)乃柘迹覀儗⑵滢D(zhuǎn)換為基本對象:

package com.github.houbb.rpc.client.encoder;

import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
        int one = msg.getOne();
        int two = msg.getTwo();

        out.writeInt(one);
        out.writeInt(two);
    }

}

CalculateResponseDecoder

針對服務(wù)端的響應(yīng),也是同理微姊。

我們需要把基本的類型酸茴,封裝轉(zhuǎn)換為我們需要的對象。

package com.github.houbb.rpc.client.decoder;

import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 響應(yīng)參數(shù)解碼
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateResponseDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        boolean success = in.readBoolean();
        int sum = in.readInt();

        CalculateResponse response = new CalculateResponse(success, sum);
        out.add(response);
    }

}

服務(wù)端

設(shè)置處理類

RpcServer 中的處理類要稍微調(diào)整一下兢交,其他的保持不變薪捍。

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
        .channel(NioServerSocketChannel.class)
        // 打印日志
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline()
                        .addLast(new CalculateRequestDecoder())
                        .addLast(new CalculateResponseEncoder())
                        .addLast(new RpcServerHandler());
            }
        })
        // 這個參數(shù)影響的是還沒有被accept 取出的連接
        .option(ChannelOption.SO_BACKLOG, 128)
        // 這個參數(shù)只是過一段時間內(nèi)客戶端沒有響應(yīng),服務(wù)端會發(fā)送一個 ack 包,以判斷客戶端是否還活著酪穿。
        .childOption(ChannelOption.SO_KEEPALIVE, true);

RpcServerHandler

一開始這里是空實現(xiàn)凳干,我們來添加一下對應(yīng)的實現(xiàn)。

package com.github.houbb.rpc.server.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class RpcServerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();

        CalculateRequest request = (CalculateRequest)msg;
        log.info("[Server] receive channel {} request: {} from ", id, request);

        Calculator calculator = new CalculatorService();
        CalculateResponse response = calculator.sum(request);

        // 回寫到 client 端
        ctx.writeAndFlush(response);
        log.info("[Server] channel {} response {}", id, response);
    }

}

讀取到客戶端的訪問之后被济,我們獲取到計算的入?yún)?CalculateRequest救赐,然后調(diào)用 sum 方法,獲取到對應(yīng)的 CalculateResponse只磷,將結(jié)果通知客戶端经磅。

CalculateRequestDecoder

這里和客戶端是一一對應(yīng)的,我們首先把 netty 傳遞的基本類型轉(zhuǎn)換為 CalculateRequest 對象钮追。

package com.github.houbb.rpc.server.decoder;

import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 請求參數(shù)解碼
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateRequestDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int one = in.readInt();
        int two = in.readInt();

        CalculateRequest request = new CalculateRequest(one, two);
        out.add(request);
    }

}

CalculateResponseEncoder

這里和客戶端類似预厌,我們需要把 response 轉(zhuǎn)換為基本類型進行網(wǎng)絡(luò)傳輸。

package com.github.houbb.rpc.server.encoder;

import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
        boolean success = msg.isSuccess();
        int result = msg.getSum();
        out.writeBoolean(success);
        out.writeInt(result);
    }

}

CalculatorService

服務(wù)端對應(yīng)的實現(xiàn)類元媚。

public class CalculatorService implements Calculator {

    @Override
    public CalculateResponse sum(CalculateRequest request) {
        int sum = request.getOne()+request.getTwo();

        return new CalculateResponse(true, sum);
    }

}

測試

服務(wù)端

啟動服務(wù)端:

new RpcServer().start();

服務(wù)端啟動日志:

[DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務(wù)開始啟動服務(wù)端
十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xd399474f] REGISTERED
十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xd399474f] BIND: 0.0.0.0/0.0.0.0:9527
十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務(wù)端啟動完成轧叽,監(jiān)聽【9527】端口

客戶端

啟動客戶端:

new RpcClient().start();

日志如下:

[DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服務(wù)開始啟動客戶端
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x4d75c580] REGISTERED
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x4d75c580] CONNECT: /127.0.0.1:9527
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] ACTIVE
[INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服務(wù)啟動客戶端完成,監(jiān)聽端口:9527
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler write
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] WRITE: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 00 00 00 02                         |........        |
+--------+-------------------------------------------------+----------------+
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] FLUSH
[INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelActive] - [Client] request is :CalculateRequest{one=1, two=2}
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 03                                  |.....           |
+--------+-------------------------------------------------+----------------+
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}

可以看到刊棕,輸出了對應(yīng)的請求參數(shù)和響應(yīng)結(jié)果炭晒。

當(dāng)然,此時服務(wù)端也有對應(yīng)的新增日志:

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L:/127.0.0.1:9527 - R:/127.0.0.1:54030]
十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927
[INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from 
[INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}

小結(jié)

為了便于大家學(xué)習(xí)甥角,以上源碼已經(jīng)開源:

https://github.com/houbb/rpc

希望本文對你有所幫助网严,如果喜歡,歡迎點贊收藏轉(zhuǎn)發(fā)一波蜈膨。

我是老馬屿笼,期待與你的下次相遇。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末翁巍,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子休雌,更是在濱河造成了極大的恐慌灶壶,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件杈曲,死亡現(xiàn)場離奇詭異驰凛,居然都是意外死亡,警方通過查閱死者的電腦和手機担扑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門恰响,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人涌献,你說我怎么就攤上這事胚宦。” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵枢劝,是天一觀的道長井联。 經(jīng)常有香客問我,道長您旁,這世上最難降的妖魔是什么烙常? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮鹤盒,結(jié)果婚禮上蚕脏,老公的妹妹穿的比我還像新娘。我一直安慰自己侦锯,他們只是感情好驼鞭,可當(dāng)我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著率触,像睡著了一般终议。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上葱蝗,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天穴张,我揣著相機與錄音,去河邊找鬼两曼。 笑死皂甘,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的悼凑。 我是一名探鬼主播偿枕,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼户辫!你這毒婦竟也來了渐夸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤渔欢,失蹤者是張志新(化名)和其女友劉穎墓塌,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體奥额,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡苫幢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了垫挨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片韩肝。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖九榔,靈堂內(nèi)的尸體忽然破棺而出哀峻,到底是詐尸還是另有隱情涡相,我是刑警寧澤,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布谜诫,位于F島的核電站漾峡,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏喻旷。R本人自食惡果不足惜生逸,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望且预。 院中可真熱鬧槽袄,春花似錦、人聲如沸锋谐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涮拗。三九已至乾戏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間三热,已是汗流浹背鼓择。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留就漾,地道東北人呐能。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像抑堡,于是被迫代替她去往敵國和親摆出。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容