入門:一個(gè)基于Netty的RPC實(shí)現(xiàn)

目標(biāo):客戶端遠(yuǎn)程調(diào)用服務(wù)端的一項(xiàng)服務(wù)偎行,具體來說川背,客戶端給服務(wù)端指定具體的類,方法睦优,和參數(shù)信息渗常,服務(wù)端用這些信息完成服務(wù),并將調(diào)用結(jié)果或異常返回給客戶端汗盘。

這個(gè)例子中皱碘,客戶端想要調(diào)用的服務(wù)是 HelloService

public interface HelloService {
    String sayHello(String name);
}

具體實(shí)現(xiàn)為 HelloServiceImpl

/**
 * 給參數(shù)中的名字返回打招呼
 */
public class HelloServiceImpl implements HelloService{
    @Override
    public String sayHello(String name) {
        return "您好," + name;
    }
}

首先隐孽,客戶端與服務(wù)端的這些通訊信息需要載體癌椿,我們把他們封裝成兩個(gè)類,客戶端給服務(wù)端發(fā)送的遠(yuǎn)程調(diào)用Request菱阵,和服務(wù)端返回的Response踢俄。從設(shè)計(jì)考慮,讓他們都繼承與Message這個(gè)類

Message

@Data
public abstract class Message implements Serializable {

    private int sequenceId;
    private int messageType;

    public abstract int getMessageType();

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();

    public static Class<?> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    static {
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}

客戶端的request類晴及,包含了客戶端想要調(diào)用的服務(wù)的一切信息(全類名都办、方法名、參數(shù)...)

RpcRequestMessage

@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    // 接口的全限定名
    private String interfaceName;

    // 調(diào)用的方法名
    private String methodName;

    // 方法的返回值類
    private Class<?> returnType;

    // 方法的參數(shù)類型數(shù)組
    private Class[] parameterTypes;

    // 方法的參數(shù)值數(shù)組
    private Object[] parameterValue;

    // 一個(gè)可以設(shè)置sequenceID的構(gòu)造器
    public RpcRequestMessage(int sequenceID, String interfaceName, String methodName,
                             Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {

        super.setSequenceId(sequenceID);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return Message.RPC_MESSAGE_TYPE_REQUEST;
    }
}

服務(wù)端Request類虑稼,攜帶服務(wù)的返回值或異常值 RpcResponseMessage

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {

    // 返回值
    private Object returnValue;

    // 異常值
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return Message.RPC_MESSAGE_TYPE_RESPONSE;
    }
}

好了琳钉,我們現(xiàn)在有了信息的載體,如何在客戶端和服務(wù)端之間進(jìn)行網(wǎng)絡(luò)通信呢蛛倦,這里使用了Netty框架

首先編寫服務(wù)側(cè):RpcServer

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .group(boss, worker)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProtocolFrameDecoder())   // 處理粘包半包
                                    .addLast(LOGGING_HANDLER)   // 日志
                                    .addLast(MESSAGE_CODEC) // 自定義協(xié)議 消息編解碼
                                    .addLast(RPC_HANDLER);
                        }
                    });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server error",e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

由于本篇旨在實(shí)現(xiàn)rpc歌懒,netty的基本內(nèi)容就不再贅述,代碼中的一些自定義的handler(處理粘包半包問題的溯壶、編解碼協(xié)議的)就不再一一實(shí)現(xiàn)了及皂。服務(wù)側(cè)我們只關(guān)注一個(gè)關(guān)鍵的handler -- RpcRequestMessageHandler 這個(gè)handler專門負(fù)責(zé)處理從客戶端來的 RpcRequestMessage

處理思路為

  • 先在封裝的request信息中拿到所需服務(wù)的全類名且改,然后根據(jù)全類名拿到服務(wù)的實(shí)現(xiàn)類
  • 拿到所需服務(wù)的方法验烧、參數(shù)類型、參數(shù)值又跛、返回類型
  • 使用反射調(diào)用方法碍拆,拿到返回值或異常,封裝成response類,返回客戶端

由于沒有整合Spring倔监,需要手寫一個(gè)從接口類拿到實(shí)現(xiàn)類的工具直砂,具體為:

首先,在配置文件(application.properties)中浩习,將服務(wù)的接口類和實(shí)現(xiàn)類綁定

# rpc bean
com.yldog.rpc.HelloService=com.yldog.rpc.HelloServiceImpl

然后一個(gè)工廠

public class ServicesFactory {
    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try {
            InputStream in = MyConfig.class.getResourceAsStream("/application.properties");
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                if (name.endsWith("Service")) {
                    // 拿到接口類Class對(duì)象
                    Class<?> interfaceClass = Class.forName(name);
                    // 拿到接口的實(shí)現(xiàn)類Class對(duì)象
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 根據(jù)接口類獲得實(shí)現(xiàn)類
    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}

最后就能編寫服務(wù)端的handler了

@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
        // 構(gòu)造一個(gè)響應(yīng)實(shí)例
        RpcResponseMessage resp = new RpcResponseMessage();
        // 設(shè)置響應(yīng)消息的序號(hào) -- 應(yīng)與請(qǐng)求消息序號(hào)一致
        resp.setSequenceId(msg.getSequenceId());
        try {
            // 通過request message拿到接口類静暂,在通過接口類拿到實(shí)現(xiàn)類
            HelloService service = (HelloService) ServicesFactory.getService(Class.forName(msg.getInterfaceName()));
            // 拿到方法
            Method method = service.getClass().getDeclaredMethod(msg.getMethodName(), msg.getParameterTypes());
            // 在剛才拿到的實(shí)現(xiàn)類上調(diào)用方法
            Object result = method.invoke(service, msg.getParameterValue());
            resp.setReturnValue(result);
        } catch (Exception e) {
            String message = e.getCause().getMessage();
            resp.setExceptionValue(new Exception("遠(yuǎn)程調(diào)用異常:" + message));
        } finally {
            ctx.writeAndFlush(resp);
        }

    }
}

接著編寫客戶端

首先要實(shí)現(xiàn)客戶端給服務(wù)端發(fā)遠(yuǎn)程調(diào)用請(qǐng)求。

第一步是先要拿到與服務(wù)端聯(lián)絡(luò)的 SocketChannel谱秽,再使用channel來進(jìn)行遠(yuǎn)程調(diào)用

@Slf4j
public class RpcClientManager {

    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();
  
    // 使用DCL保證channel單例
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) {
            if (channel != null) {
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // 初始化channel洽蛀,只能初始化一次
    private static void initChannel() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_HANDLER = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class)
                .group(worker)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder())
                                .addLast(LOGGING_HANDLER)
                                .addLast(MESSAGE_HANDLER)
                                .addLast(RPC_HANDLER);
                    }
                });

        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();

            // 初始化channel后不能在這里阻塞住,所以采用異步
            // channel.closeFuture().sync();
            channel.closeFuture().addListener(future -> { worker.shutdownGracefully(); });
        } catch (InterruptedException e) {
            log.debug("Client Error", e);
        }
    }
}

注意:

  • getChannel() 就可以拿到與服務(wù)端通信的Channel疟赊,由于使用了DLC單例模式郊供,保證了channel的唯一性
  • 在處理連接關(guān)閉時(shí),不能使用 channel.closeFuture().sync();否則代碼阻塞在了 initChannel()中近哟,要使用異步的處理方式

這樣驮审,客戶端就能向服務(wù)端發(fā)送request消息了,具體為:

public static void main(String[] args) {
        getChannel().writeAndFlush(new RpcRequestMessage(
                1,
                "com.yldog.rpc.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"張三"}
        ));
}

但是這種讓用戶一個(gè)個(gè)填寫參數(shù)的方式吉执,非常的不友好疯淫,理想的方式應(yīng)該是用戶像在本地調(diào)用方法一般,如

service.sayHello("張三")戳玫,這里的思路是使用代理模式將構(gòu)建請(qǐng)求消息這一步封裝起來熙掺,具體為編寫一個(gè)構(gòu)造代理類的方法:

    // 創(chuàng)建代理類
    public static <T> T getProxyService(Class<T> serviceClazz) {
        ClassLoader classLoader = serviceClazz.getClassLoader();
        Class<?>[] interfaces = {serviceClazz};
        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            // 1. 將方法調(diào)用 轉(zhuǎn)為 消息對(duì)象
            int sequenceID = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceID,
                    serviceClazz.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 發(fā)送消息
            getChannel().writeAndFlush(msg);
          
            // 暫時(shí)不考慮如何接收返回值

        });
        return (T) o;
    }

這樣,用戶的使用就變成了:

        public static void main(String[] args) {
//        getChannel().writeAndFlush(new RpcRequestMessage(
//                1,
//                "com.yldog.rpc.HelloService",
//                "sayHello",
//                String.class,
//                new Class[]{String.class},
//                new Object[]{"張三"}
//        ));
        // 以上這種遠(yuǎn)程調(diào)用的方法非常不友好咕宿,考慮使用代理封裝
        HelloService proxyService = getProxyService(HelloService.class);
        proxyService.sayHello("張三");
        proxyService.sayHello("李四");
        proxyService.sayHello("王五");
    }

最后币绩,我們要考慮的就是客戶端如何接收服務(wù)端的返回消息,由于在netty中府阀,調(diào)用遠(yuǎn)程服務(wù)的線程與收到返回結(jié)果的線程并不是一個(gè)線程缆镣,接收返回消息的線程一般在 nioEventLoop 中,所以想要在調(diào)用服務(wù)的線程中獲取服務(wù)的結(jié)果肌似,就涉及到了線程之間異步交換信息费就,具體實(shí)現(xiàn)思路為

  • 發(fā)起調(diào)用的線程在發(fā)送了Request請(qǐng)求信息后诉瓦,準(zhǔn)備一個(gè)空的Promise川队,然后等待nio線程將服務(wù)端返回的結(jié)果放入Promise中
  • nio線程得到返回的結(jié)果后,將結(jié)果放入Promise中睬澡,并通知調(diào)用的線程固额,實(shí)現(xiàn)異步

首先編寫處理服務(wù)器響應(yīng)信息的handler

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    /**
     * 一個(gè)用來傳遞線程之間異步結(jié)果的容器集合
     * 某個(gè)線程進(jìn)行遠(yuǎn)程調(diào)用后,開啟一個(gè)Promise容器煞聪,Nio線程收到遠(yuǎn)程結(jié)果后斗躏,將結(jié)果放入那個(gè)線程的Promise中
     */
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);

        // 拿到屬于這次消息接收的空的promise
        Promise<Object> promise = PROMISES.remove(msg.getSequenceId());

        // 往promise放結(jié)果
        if (promise != null) {
            if ((msg.getExceptionValue() == null)) { // 若異常值為空,則正常調(diào)用
                promise.setSuccess(msg.getReturnValue());
            } else { // 異常值不為空昔脯,則調(diào)用異常
                promise.setFailure(msg.getExceptionValue());
            }
        }

    }
}

然后完整的客戶端代碼為

@Slf4j
public class RpcClientManager {

    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();

    public static void main(String[] args) {
//        getChannel().writeAndFlush(new RpcRequestMessage(
//                1,
//                "com.yldog.rpc.HelloService",
//                "sayHello",
//                String.class,
//                new Class[]{String.class},
//                new Object[]{"張三"}
//        ));
        // 以上這種遠(yuǎn)程調(diào)用的方法非常不友好啄糙,考慮使用代理封裝
        HelloService proxyService = getProxyService(HelloService.class);
        proxyService.sayHello("張三");
        proxyService.sayHello("李四");
        proxyService.sayHello("王五");
    }

    // 創(chuàng)建代理類
    public static <T> T getProxyService(Class<T> serviceClazz) {
        ClassLoader classLoader = serviceClazz.getClassLoader();
        Class<?>[] interfaces = {serviceClazz};
        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            // 1. 將方法調(diào)用轉(zhuǎn)為 消息對(duì)象
            int sequenceID = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceID,
                    serviceClazz.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 發(fā)送消息
            getChannel().writeAndFlush(msg);

            // 3. 準(zhǔn)備一個(gè)這次接收消息專用的Promise對(duì)象                指定promise異步接收結(jié)果的線程
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            // 放入promise集合中笛臣,Nio線程取用時(shí),用sequenceId作為key
            RpcResponseMessageHandler.PROMISES.put(sequenceID, promise);

            // 4. 等待 Nio線程將返回的結(jié)果放入 promise 中
            promise.await();

            // 5. 拿到結(jié)果后隧饼,判斷調(diào)用是否正常
            if (promise.isSuccess()) {
                // 調(diào)用成功
                return promise.getNow();
            } else {
                // 調(diào)用異常
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    // 使用DCL保證channel單例
    public static Channel getChannel() {...}

    // 初始化channel沈堡,只能初始化一次
    private static void initChannel() {...}
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市燕雁,隨后出現(xiàn)的幾起案子诞丽,更是在濱河造成了極大的恐慌,老刑警劉巖拐格,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件僧免,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡捏浊,警方通過查閱死者的電腦和手機(jī)懂衩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來金踪,“玉大人勃痴,你說我怎么就攤上這事∪瓤担” “怎么了沛申?”我有些...
    開封第一講書人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)姐军。 經(jīng)常有香客問我铁材,道長(zhǎng),這世上最難降的妖魔是什么奕锌? 我笑而不...
    開封第一講書人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任著觉,我火速辦了婚禮,結(jié)果婚禮上惊暴,老公的妹妹穿的比我還像新娘饼丘。我一直安慰自己,他們只是感情好辽话,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開白布肄鸽。 她就那樣靜靜地躺著,像睡著了一般油啤。 火紅的嫁衣襯著肌膚如雪典徘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,578評(píng)論 1 305
  • 那天益咬,我揣著相機(jī)與錄音逮诲,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛梅鹦,可吹牛的內(nèi)容都是我干的裆甩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼齐唆,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼淑掌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蝶念,我...
    開封第一講書人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤抛腕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后媒殉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體担敌,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年廷蓉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了全封。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡桃犬,死狀恐怖刹悴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情攒暇,我是刑警寧澤土匀,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站形用,受9級(jí)特大地震影響就轧,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜田度,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一妒御、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧镇饺,春花似錦乎莉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至揭保,卻和暖如春肥橙,著一層夾襖步出監(jiān)牢的瞬間魄宏,已是汗流浹背秸侣。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人味榛。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓椭坚,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親搏色。 傳聞我的和親對(duì)象是個(gè)殘疾皇子善茎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容