動(dòng)手寫RPC起步

前一篇文章簡(jiǎn)單介紹了下RPC的基本原理,同時(shí)附上了一個(gè)小的demo蒲稳,但是這個(gè)小的demo并不能在生產(chǎn)上使用,因?yàn)樯a(chǎn)上的RPC還需要考慮很多因素伍派,比如接入注冊(cè)中心江耀、高性能的網(wǎng)絡(luò)通信、高性能的序列化和反序列化诉植、自動(dòng)路由祥国、容錯(cuò)處理等等。要實(shí)現(xiàn)生產(chǎn)上使用的先不談晾腔,我們先來實(shí)現(xiàn)一個(gè)稍微復(fù)雜的RPC舌稀,借助這個(gè)RPC例子來更深刻的理解RPC原理,為后續(xù)Dubbo源碼的分析做準(zhǔn)備建车。

一扩借、簡(jiǎn)單RPC架構(gòu)設(shè)計(jì)

回顧下RPC原理圖:



如果要自己設(shè)計(jì)實(shí)現(xiàn)稍微簡(jiǎn)單的一個(gè)rpc框架,應(yīng)該需要考慮注冊(cè)中心缤至、網(wǎng)絡(luò)通信潮罪、序列化等內(nèi)容,因?yàn)榭梢栽O(shè)計(jì)出如下鍵略的RPC架構(gòu)圖:


二领斥、項(xiàng)目起步說明

1嫉到、本RCP項(xiàng)目涉及的功能點(diǎn)

動(dòng)態(tài)代理、反射月洛、序列化何恶、反序列化、網(wǎng)絡(luò)通信嚼黔、編解碼细层、服務(wù)發(fā)現(xiàn)和注冊(cè)惜辑、心跳與鏈路檢測(cè)等

2、本小節(jié)內(nèi)容說明

設(shè)計(jì)的技術(shù)點(diǎn):動(dòng)態(tài)代理技術(shù)疫赎、反射(有關(guān)動(dòng)態(tài)代理可以看我的另一篇博文:代理模式
實(shí)現(xiàn)的功能:簡(jiǎn)易的基于socket的rpc

3盛撑、本小節(jié)項(xiàng)目總體框架圖

三、編碼實(shí)現(xiàn)

1捧搞、api模塊

創(chuàng)建用于測(cè)試的接口

public interface HelloService {
    String sayHello(String name);
}
2抵卫、common模塊

request請(qǐng)求實(shí)體類

public class Request implements Serializable {
    private static final long serialVersionUID = 7929047349488932740L;
    /**
     * 請(qǐng)求表示id
     */
    private String requestId;
    /**
     * 請(qǐng)求服務(wù)類型
     */
    private String className;
    /**
     * 請(qǐng)求方法名稱
     */
    private String methodName;
    /**
     * 請(qǐng)求方法參數(shù)類型數(shù)組
     */
    private Class<?>[] parameterTypes;
    /**
     * 請(qǐng)求參數(shù)列表
     */
    private Object[] args;
  ......省略getter/setter
}

response響應(yīng)實(shí)體類:

public class Response {
    private static final long serialVersionUID = -1023480952777229650L;

    private String requestId;
    /**
     * 響應(yīng)狀態(tài)嗎
     */
    private int code;
    /**
     * 響應(yīng)消息說明
     */
    private String msg;
    /**
     * 相應(yīng)數(shù)據(jù)
     */
    private Object data;
......省略getter/setter
3、provider模塊

服務(wù)的暴露(包好服務(wù)的注冊(cè)和服務(wù)的發(fā)布)胎撇,服務(wù)端基本流程是

服務(wù)注冊(cè)->服務(wù)發(fā)布->服務(wù)啟動(dòng)監(jiān)聽請(qǐng)求(socket)->處理請(qǐng)求

//rpc代理服務(wù)介粘,用于暴露服務(wù)
public class RpcProxyServer {
    /**
     * 創(chuàng)建一個(gè)線程池
     */
    ExecutorService executorService = Executors.newCachedThreadPool();
    /**
     * 端口號(hào)
     */
    private int port;

    /**
     * 1、服務(wù)注冊(cè)
     * @param serviceInterface
     * @param impClass
     * @return
     */
    public RpcProxyServer register(Class serviceInterface, Class impClass) {
        //注冊(cè)服務(wù)(接口名:實(shí)現(xiàn)類名)
        ProcessorHandler.register(serviceInterface, impClass);
        return this;
    }

    public RpcProxyServer(int port) {
        this.port = port;
    }

    /**
     *2晚树、 啟動(dòng)發(fā)布(啟動(dòng))
     */
    public void start() {
        System.out.println("服務(wù)啟動(dòng)====");
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);

            while (true) {//3姻采、通過循環(huán)不斷接受請(qǐng)求
                Socket socket = serverSocket.accept();//監(jiān)聽客戶端的請(qǐng)求
                //4、每一個(gè)socket交給一個(gè)processorhandler處理题涨,這里的target就是真正的業(yè)務(wù)類
                executorService.execute(new ProcessorHandler(socket));//處理客戶端的請(qǐng)求
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

具體處理請(qǐng)求的handler

//服務(wù)端接受請(qǐng)求處理線程
public class ProcessorHandler implements Runnable {

    private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
    /**
     * socket
     */
    private Socket socket;

    public static void register(Class serviceInterface, Class impClass) {
        //注冊(cè)服務(wù)(接口名:實(shí)現(xiàn)類名)
        serviceRegistry.put(serviceInterface.getName(), impClass);
    }
    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        //用于定義輸入流和輸出流
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;

        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            //從socket中讀取請(qǐng)求流對(duì)象
            Request rpcRequest = (Request) objectInputStream.readObject();
            //調(diào)用正真的處理方法
            Object result = invoke(rpcRequest);
            Response response = new Response();
            response.setRequestId(rpcRequest.getRequestId());
            response.setData(result);
            response.setMsg(ResponseCodeEnum.SUCCESS.getMsg());
            response.setCode(ResponseCodeEnum.SUCCESS.getCode());
            //將結(jié)果通過socket輸出
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeOpenSource(objectInputStream, objectOutputStream);
        }
    }

    private void closeOpenSource(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
        if (objectInputStream != null) {
            try {
                objectInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (objectOutputStream != null) {
            try {
                objectOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 利用反射技術(shù)執(zhí)行正真的方法(這里只是簡(jiǎn)單的實(shí)現(xiàn)偎谁,沒有容錯(cuò)處理)
     *
     * @param rpcRequest
     * @return
     */
    private Object invoke(Request rpcRequest) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        //獲取目標(biāo)對(duì)象并執(zhí)行目標(biāo)方法(也就是獲取注冊(cè)后的接口實(shí)現(xiàn)類對(duì)象)
        Class<?> targetClass = serviceRegistry.get(rpcRequest.getClassName());
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Method method = targetClass.getMethod(rpcRequest.getMethodName(), parameterTypes);
        Object[] args = rpcRequest.getArgs();
        return method.invoke(targetClass.newInstance(), args);
    }
}

用到的枚舉

public enum ResponseCodeEnum {
    SUCCESS(0, "success"),
    FAIL(1, "fail");
....省略

rpc接口實(shí)現(xiàn)類:

public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello " + name;
    }
}

啟動(dòng)服務(wù)main方法:

public class Demo1Main {
    public static void main(String[] args) {
        //創(chuàng)建代理服務(wù)
        RpcProxyServer rpcProxyServer = new RpcProxyServer(8888);
        //注冊(cè)服務(wù)
        rpcProxyServer.register(HelloService.class, HelloServiceImpl.class);
        //啟動(dòng)服務(wù)
        rpcProxyServer.start();
    }
}
4、consumer模塊

消費(fèi)端模塊主要是通過jdk動(dòng)態(tài)代理的方式實(shí)現(xiàn)rpc接口代理請(qǐng)求遠(yuǎn)程纲堵,基本流程

client->創(chuàng)建代理對(duì)象->通過代理對(duì)象請(qǐng)求遠(yuǎn)程服務(wù)->接受返回的信息

public class ClientProxy<T> {
    /**
     * 服務(wù)端代理接口
     */
    private Class<T> serverInstance;

    /**
     * 服務(wù)端地址
     */
    private InetSocketAddress address;

    public ClientProxy(Class<T> serverInstance, String ip, Integer port) {
        this.address = new InetSocketAddress(ip, port);
        this.serverInstance = serverInstance;
    }

    /**
     * 獲取客戶端代理對(duì)象
     *
     * @return
     */
    public T getClientInstance() {
        return (T) Proxy.newProxyInstance(serverInstance.getClassLoader(), new Class<?>[]{serverInstance}, new RemoteInvocationHandler(address));
    }
}

具體遠(yuǎn)程調(diào)用invoke方法(jdk動(dòng)態(tài)代理InvocationHandler)

public class RemoteInvocationHandler implements InvocationHandler {

    /**
     * 服務(wù)端地址
     */
    private InetSocketAddress address;

    public RemoteInvocationHandler(InetSocketAddress address) {
        this.address=address;
    }


    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request rpcRequest = new Request();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);

        //通過網(wǎng)絡(luò)發(fā)送正式請(qǐng)求
        RpcNetTransport netTransport = new RpcNetTransport(address.getPort(), address.getHostName());
        Object result = (Object) netTransport.send(rpcRequest);
        return result;//返回收到的結(jié)果
    }
}

具體的rpc網(wǎng)絡(luò)請(qǐng)求(socket)

//網(wǎng)絡(luò)傳送
public class RpcNetTransport {

    private int port;
    private String host;

    public RpcNetTransport(int port, String host) {
        this.port = port;
        this.host = host;
    }

    /**
     * 發(fā)送請(qǐng)求
     *
     * @param request
     */
    public Object send(Request request) throws IOException, ClassNotFoundException {
        Socket socket = null;
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            // 1.創(chuàng)建Socket客戶端巡雨,根據(jù)指定地址連接遠(yuǎn)程服務(wù)提供者
            socket = new Socket(host, port);
            //2、將遠(yuǎn)程服務(wù)調(diào)用所需的接口類席函、方法名铐望、參數(shù)列表等編碼后發(fā)送給服務(wù)提供者
            outputStream=new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(request);
            //3、同步阻塞等待服務(wù)器返回應(yīng)答茂附,獲取應(yīng)答后返回
            inputStream = new ObjectInputStream(socket.getInputStream());
            return inputStream.readObject();
        } finally {
            if (socket != null) {
                socket.close();
            }
            if (outputStream != null) {
                outputStream.close();
            }
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }

}

消費(fèi)端消費(fèi)服務(wù)main方法:

public class Demo1Main {
    public static void main(String[] args) {
        ClientProxy clientProxy = new ClientProxy(HelloService.class, "127.0.0.1", 8888);
        HelloService helloService = (HelloService) clientProxy.getClientInstance();
        String result = helloService.sayHello("嘿嘿嘿");
        System.out.println(result);
    }
}

三正蛙、總結(jié)與思考

總結(jié):本節(jié)實(shí)現(xiàn)了一個(gè)非常簡(jiǎn)單的rpc原型項(xiàng)目,包含了服務(wù)注冊(cè)营曼、采用BIO的網(wǎng)絡(luò)通信模型傳送數(shù)據(jù)乒验、采用jdk原生代理模式進(jìn)行服務(wù)代理、采用jdk原生的序列化方式進(jìn)行序列化和反序列化等蒂阱。后續(xù)將會(huì)針對(duì)該原型項(xiàng)目不斷的改進(jìn)锻全,不斷的引入新的“武器”,來豐富整個(gè)rpc項(xiàng)目录煤。
后期預(yù)熱:引入注冊(cè)中心(解決服務(wù)治理問題)鳄厌、引入多種高效的序列化機(jī)制、引入NIO的網(wǎng)絡(luò)通信模型妈踊、引入軟負(fù)載均衡機(jī)制了嚎、引入spi擴(kuò)展機(jī)制、接入spring等等,敬請(qǐng)期待歪泳。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末萝勤,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子夹囚,更是在濱河造成了極大的恐慌纵刘,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件荸哟,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡瞬捕,警方通過查閱死者的電腦和手機(jī)鞍历,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肪虎,“玉大人劣砍,你說我怎么就攤上這事∩染龋” “怎么了刑枝?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長迅腔。 經(jīng)常有香客問我装畅,道長,這世上最難降的妖魔是什么沧烈? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任掠兄,我火速辦了婚禮,結(jié)果婚禮上锌雀,老公的妹妹穿的比我還像新娘蚂夕。我一直安慰自己,他們只是感情好腋逆,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布婿牍。 她就那樣靜靜地躺著,像睡著了一般惩歉。 火紅的嫁衣襯著肌膚如雪等脂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天柬泽,我揣著相機(jī)與錄音慎菲,去河邊找鬼。 笑死锨并,一個(gè)胖子當(dāng)著我的面吹牛露该,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播第煮,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼解幼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼抑党!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起撵摆,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤底靠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后特铝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體暑中,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年鲫剿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鳄逾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡灵莲,死狀恐怖雕凹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情政冻,我是刑警寧澤枚抵,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站明场,受9級(jí)特大地震影響汽摹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜榕堰,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一竖慧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧逆屡,春花似錦圾旨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至莺治,卻和暖如春廓鞠,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谣旁。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國打工床佳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人榄审。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓砌们,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子浪感,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 本文將從大的框架層面來聊聊RPC原理和實(shí)現(xiàn)昔头,既然叫跨語言RPC,也將以thrift為例講講跨語言RPC如何實(shí)現(xiàn)影兽。在...
    彥幀閱讀 14,378評(píng)論 0 19
  • 前言 在微服務(wù)當(dāng)?shù)赖慕裉旖腋植际较到y(tǒng)越來越重要,實(shí)現(xiàn)服務(wù)化首先就要考慮服務(wù)之間的通信問題峻堰。這里面涉及序列化讹开、反序列...
    habit_learning閱讀 2,357評(píng)論 1 26
  • 網(wǎng)絡(luò)通信模塊是分布式系統(tǒng)中最底層的模塊,他直接支撐了上層分布式環(huán)境下復(fù)雜的進(jìn)程間通信邏輯,是所有分布式系統(tǒng)的基礎(chǔ)。...
    SmallBird_閱讀 2,192評(píng)論 0 1
  • 國家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說閱讀 10,869評(píng)論 6 13
  • 我想有時(shí)候我一定是瘋了捐名,是被s逼瘋了
    砸扁回憶閱讀 133評(píng)論 0 0