Java實(shí)現(xiàn)簡(jiǎn)單的RPC框架

1 RPC簡(jiǎn)介

RPC,全稱為Remote Procedure Call春宣,即遠(yuǎn)程過程調(diào)用酵颁,它是一個(gè)計(jì)算機(jī)通信協(xié)議。它允許像調(diào)用本地服務(wù)一樣調(diào)用遠(yuǎn)程服務(wù)信认。它可以有不同的實(shí)現(xiàn)方式材义。如RMI(遠(yuǎn)程方法調(diào)用)、Hessian嫁赏、Http invoker等其掂。另外,RPC是與語言無關(guān)的潦蝇。

RPC示意圖.png

如上圖所示款熬,假設(shè)Computer1在調(diào)用sayHi()方法,對(duì)于Computer1而言調(diào)用sayHi()方法就像調(diào)用本地方法一樣攘乒,調(diào)用 –>返回贤牛。但從后續(xù)調(diào)用可以看出Computer1調(diào)用的是Computer2中的sayHi()方法,RPC屏蔽了底層的實(shí)現(xiàn)細(xì)節(jié)则酝,讓調(diào)用者無需關(guān)注網(wǎng)絡(luò)通信殉簸,數(shù)據(jù)傳輸?shù)燃?xì)節(jié)。

2 RPC框架的實(shí)現(xiàn)

上面介紹了RPC的核心原理:RPC能夠讓本地應(yīng)用簡(jiǎn)單沽讹、高效地調(diào)用服務(wù)器中的過程(服務(wù))般卑。它主要應(yīng)用在分布式系統(tǒng)。如Hadoop中的IPC組件爽雄。但怎樣實(shí)現(xiàn)一個(gè)RPC框架呢蝠检?
從下面幾個(gè)方面思考,僅供參考:

  1. 通信模型:假設(shè)通信的為A機(jī)器與B機(jī)器挚瘟,A與B之間有通信模型叹谁,在Java中一般基于BIO或NIO饲梭;。
  2. 過程(服務(wù))定位:使用給定的通信方式焰檩,與確定IP與端口及方法名稱確定具體的過程或方法憔涉;
  3. 遠(yuǎn)程代理對(duì)象:本地調(diào)用的方法(服務(wù))其實(shí)是遠(yuǎn)程方法的本地代理,因此可能需要一個(gè)遠(yuǎn)程代理對(duì)象锅尘,對(duì)于Java而言监氢,遠(yuǎn)程代理對(duì)象可以使用Java的動(dòng)態(tài)對(duì)象實(shí)現(xiàn),封裝了調(diào)用遠(yuǎn)程方法調(diào)用藤违;
  4. 序列化浪腐,將對(duì)象名稱、方法名稱顿乒、參數(shù)等對(duì)象信息進(jìn)行網(wǎng)絡(luò)傳輸需要轉(zhuǎn)換成二進(jìn)制傳輸议街,這里可能需要不同的序列化技術(shù)方案。如:protobuf璧榄,Arvo等特漩。

3 Java實(shí)現(xiàn)RPC框架

3.1 實(shí)現(xiàn)技術(shù)方案

下面使用比較原始的方案實(shí)現(xiàn)RPC框架,采用Socket通信骨杂、動(dòng)態(tài)代理與反射與Java原生的序列化涂身。

3.2 RPC框架架構(gòu)

RPC架構(gòu)分為三部分:

  • 服務(wù)提供者,運(yùn)行在服務(wù)器端搓蚪,提供服務(wù)接口定義與服務(wù)實(shí)現(xiàn)類蛤售。
  • 服務(wù)中心,運(yùn)行在服務(wù)器端妒潭,負(fù)責(zé)將本地服務(wù)發(fā)布成遠(yuǎn)程服務(wù)悴能,管理遠(yuǎn)程服務(wù),提供給服務(wù)消費(fèi)者使用雳灾。
  • 服務(wù)消費(fèi)者漠酿,運(yùn)行在客戶端,通過遠(yuǎn)程代理對(duì)象調(diào)用遠(yuǎn)程服務(wù)谎亩。

3.3 具體實(shí)現(xiàn)

服務(wù)提供者接口定義與實(shí)現(xiàn)炒嘲,代碼如下:

package com.boer.tdf.act.service;

/**
 * HelloService.
 */
public interface HelloService {
    String sayHi(String name);
}

HelloServices接口實(shí)現(xiàn)類:

package com.boer.tdf.act.serviceimpl;

import com.boer.tdf.act.service.HelloService;

/**
 * HelloServiceImpl.
 */
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHi(String name) {
        return "Hi, " + name;
    }
}

服務(wù)中心代碼實(shí)現(xiàn),代碼如下:

package com.boer.tdf.act.service;

import java.io.IOException;

/**
 * Server.
 */
public interface Server {

    public void stop();

    public void start() throws IOException;

    public void register(Class serviceInterface, Class impl);

    public boolean isRunning();

    public int getPort();
}

服務(wù)中心實(shí)現(xiàn)類:

package com.boer.tdf.act.serviceimpl;

import com.boer.tdf.act.service.Server;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 服務(wù)中心實(shí)現(xiàn)類.
 */
public class ServiceCenter implements Server {

    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
    private static boolean isRunning = false;
    private static int port;

    public ServiceCenter(int port) {
        this.port = port;
    }

    @Override
    public void stop() {
        isRunning = false;
        executor.shutdown();
    }

    @Override
    public void start() throws IOException {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(port));
        System.out.println("start server");
        try {
            while (true) {
                // 1.監(jiān)聽客戶端的TCP連接匈庭,接到TCP連接后將其封裝成task夫凸,由線程池執(zhí)行
                executor.execute(new ServiceTask(server.accept()));
            }
        } finally {
            server.close();
        }

    }

    @Override
    public void register(Class serviceInterface, Class impl) {
        serviceRegistry.put(serviceInterface.getName(), impl);

    }

    @Override
    public boolean isRunning() {
        return isRunning;
    }

    @Override
    public int getPort() {
        return port;
    }

    private static class ServiceTask implements Runnable {
        Socket clent = null;

        public ServiceTask(Socket client) {
            this.clent = client;
        }

        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            try {
                // 2.將客戶端發(fā)送的碼流反序列化成對(duì)象,反射調(diào)用服務(wù)實(shí)現(xiàn)者嚎花,獲取執(zhí)行結(jié)果
                input = new ObjectInputStream(clent.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class serviceClass = serviceRegistry.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);

                // 3.將執(zhí)行結(jié)果反序列化寸痢,通過socket發(fā)送給客戶端
                output = new ObjectOutputStream(clent.getOutputStream());
                output.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (output != null) {
                    try {
                        output.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (input != null) {
                    try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (clent != null) {
                    try {
                        clent.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }


}

客戶端的遠(yuǎn)程代理對(duì)象:

package com.boer.tdf.act.serviceimpl;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 客戶端的遠(yuǎn)程代理對(duì)象.
 */
public class RPCClient<T> {

    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
        // 1.將本地的接口調(diào)用轉(zhuǎn)換成JDK的動(dòng)態(tài)代理呀洲,在動(dòng)態(tài)代理中實(shí)現(xiàn)接口的遠(yuǎn)程調(diào)用
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
                        try {
                            // 2.創(chuàng)建Socket客戶端紊选,根據(jù)指定地址連接遠(yuǎn)程服務(wù)提供者
                            socket = new Socket();
                            socket.connect(addr);

                            // 3.將遠(yuǎn)程服務(wù)調(diào)用所需的接口類啼止、方法名、參數(shù)列表等編碼后發(fā)送給服務(wù)提供者
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);

                            // 4.同步阻塞等待服務(wù)器返回應(yīng)答兵罢,獲取應(yīng)答后返回
                            input = new ObjectInputStream(socket.getInputStream());
                            return input.readObject();
                        } finally {
                            if (socket != null) socket.close();
                            if (output != null) output.close();
                            if (input != null) input.close();
                        }
                    }
                });
    }

}

最后為測(cè)試類:

package com.boer.tdf.act.serviceimpl;

import com.boer.tdf.act.service.HelloService;
import com.boer.tdf.act.service.Server;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * 測(cè)試類.
 *
 * 運(yùn)行結(jié)果:
 * start server
 * Hi, test
 */
public class RPCTest {
    public static void main(String[] args) throws IOException {
        new Thread(new Runnable() {
            public void run() {
                try {
                    Server serviceServer = new ServiceCenter(8088);
                    serviceServer.register(HelloService.class, HelloServiceImpl.class);
                    serviceServer.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));
        System.out.println(service.sayHi("test"));
    }
}

總結(jié)

RPC本質(zhì)為消息處理模型,RPC屏蔽了底層不同主機(jī)間的通信細(xì)節(jié),讓進(jìn)程調(diào)用遠(yuǎn)程的服務(wù)就像是本地的服務(wù)一樣度气。

可以改進(jìn)的地方

這里實(shí)現(xiàn)的簡(jiǎn)單RPC框架是使用Java語言開發(fā)墨榄,與Java語言高度耦合,并且通信方式采用的Socket是基于BIO實(shí)現(xiàn)的此蜈,IO效率不高即横,還有Java原生的序列化機(jī)制占內(nèi)存太多,運(yùn)行效率也不高裆赵《簦可以考慮從下面幾種方法改進(jìn)。

  • 可以采用基于JSON數(shù)據(jù)傳輸?shù)腞PC框架战授;
  • 可以使用NIO或直接使用Netty替代BIO實(shí)現(xiàn)页藻;
  • 使用開源的序列化機(jī)制,如Hadoop Avro與Google protobuf等植兰;
  • 服務(wù)注冊(cè)可以使用Zookeeper進(jìn)行管理份帐,能夠讓應(yīng)用更加穩(wěn)定。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末楣导,一起剝皮案震驚了整個(gè)濱河市废境,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌爷辙,老刑警劉巖彬坏,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異膝晾,居然都是意外死亡栓始,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門血当,熙熙樓的掌柜王于貴愁眉苦臉地迎上來幻赚,“玉大人,你說我怎么就攤上這事臊旭÷淠眨” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵离熏,是天一觀的道長(zhǎng)佳谦。 經(jīng)常有香客問我,道長(zhǎng)滋戳,這世上最難降的妖魔是什么钻蔑? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任啥刻,我火速辦了婚禮,結(jié)果婚禮上咪笑,老公的妹妹穿的比我還像新娘可帽。我一直安慰自己,他們只是感情好窗怒,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布映跟。 她就那樣靜靜地躺著,像睡著了一般扬虚。 火紅的嫁衣襯著肌膚如雪努隙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天辜昵,我揣著相機(jī)與錄音剃法,去河邊找鬼。 笑死路鹰,一個(gè)胖子當(dāng)著我的面吹牛贷洲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播晋柱,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼优构,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了雁竞?” 一聲冷哼從身側(cè)響起钦椭,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎碑诉,沒想到半個(gè)月后彪腔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡进栽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年德挣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片快毛。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡格嗅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出唠帝,到底是詐尸還是另有隱情屯掖,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布襟衰,位于F島的核電站贴铜,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜绍坝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一赶熟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧陷嘴,春花似錦、人聲如沸间坐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽竹宋。三九已至劳澄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蜈七,已是汗流浹背秒拔。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留飒硅,地道東北人砂缩。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像三娩,于是被迫代替她去往敵國(guó)和親庵芭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理雀监,服務(wù)發(fā)現(xiàn)双吆,斷路器,智...
    卡卡羅2017閱讀 134,713評(píng)論 18 139
  • 今天分布式應(yīng)用会前、云計(jì)算好乐、微服務(wù)大行其道,作為其技術(shù)基石之一的 RPC 你了解多少瓦宜?一篇 RPC 的技術(shù)總結(jié)文章蔚万,數(shù)...
    零一間閱讀 1,902評(píng)論 1 46
  • 轉(zhuǎn)自http://mp.weixin.qq.com/s?__biz=MzAxMTEyOTQ5OQ==&mid=26...
    文刂德光軍閱讀 1,123評(píng)論 0 11
  • 作為初學(xué)者在課堂上你真的聽懂瑜伽老師講的口令了嗎笛坦?該如何做呢?以下六種常見的口令解說我們一起來看看苔巨。 1 雙腳打開...
    yogi嵐閱讀 1,326評(píng)論 0 1
  • 我過了23年的單身生活版扩,沒有戀愛,不是不想侄泽,而是不能礁芦。在23年的學(xué)習(xí)生活,工作生活中,沒有男生追過我∈量郏現(xiàn)在我好像有...
    黃桃真美味閱讀 183評(píng)論 0 0