手寫一個RPC框架戳表,看看100個線程同時調(diào)用情況如何

本人微信公眾號(jwfy)歡迎關(guān)注

簡單的介紹RPC是什么桶至,RPC整個調(diào)用流程是什么,包含了什么組件镣屹。然后實際編寫一個RPC實例价涝,模擬100個線程調(diào)用以驗證RPC的可用性色瘩,穩(wěn)定性等。最后總結(jié)自己編寫的RPC框架存在哪些問題泞遗,可以去完善的,一個優(yōu)秀的RPC框架應該必備的功能點汹买。

什么是RPC

RPC(Remote Procedure Call)聊倔,遠程過程調(diào)用耙蔑,可通過網(wǎng)絡調(diào)用其他機器的服務請求。RPC是一種規(guī)范甸陌,和TCP钱豁、UDP都沒有關(guān)系,RCP可以采用TCP協(xié)議完成數(shù)據(jù)傳輸卵酪,甚至可以使用HTTP應用協(xié)議。RCP是C端模式溃卡,包含了服務端(服務提供方)蜒简、客戶端(服務使用方),采用特定的網(wǎng)絡傳輸協(xié)議最铁,把數(shù)據(jù)按照特定的協(xié)議包裝后進行傳輸操作等操作垮兑。先來了解下一個具體的RPC調(diào)用請求的執(zhí)行過程

image

本圖來自網(wǎng)絡

  • 1系枪、服務調(diào)用方(Client)調(diào)用本地調(diào)用的方式調(diào)用本地代理對象
  • 2、代理對象將類名稱雾棺、方法衬浑、參數(shù)等請求數(shù)據(jù)按照請求協(xié)議組裝成Request
  • 3工秩、通過Request數(shù)據(jù)從服務治理獲取有效的服務端信息
  • 4、將Request數(shù)據(jù)按照序列化協(xié)議序列化后浪听,使用網(wǎng)絡傳輸協(xié)議通過網(wǎng)絡發(fā)送到服務端中
  • 5眉菱、服務端接收到序列化后到數(shù)據(jù),利用序列號協(xié)議反序列化操作生成Request數(shù)據(jù)
  • 6克伊、通過Request數(shù)據(jù)找到具體的服務提供方华坦,并調(diào)用執(zhí)行特定的方法季春,計算出執(zhí)行結(jié)果
  • 7、執(zhí)行結(jié)果包裝成Response耘拇,按照原路返回至客戶端
  • 8宇攻、客戶端解析Response,得到對應的執(zhí)行結(jié)果嘉涌,又或者是具體的錯誤信息

這就是一個完整的RPC調(diào)用過程,對使用方而言就只暴露了本地代理對象扔役,剩下的數(shù)據(jù)解析警医、運輸?shù)榷急话b了预皇,從服務提供方的角度看還有服務暴露,如下圖DUBBO的架構(gòu)圖序仙。

image

RPC 實踐

學習寫RPC之前必須先了解動態(tài)代理反射這兩個知識點鲁豪,如不了解先自行了解呈昔,本學習筆記不涉及到此內(nèi)容的介紹。

文件夾目錄

image

Request對象

// lombok 
@Data
public class MethodParameter {

    String className;
    String methodName;
    Object[] arguments;
    Class<?>[] parameterTypes;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public static MethodParameter convert(InputStream inputStream) {

        try {
            ObjectInputStream input = new ObjectInputStream(inputStream);
            String className = input.readUTF();
            String methodName = input.readUTF();
            Class<?>[] parameterTypes = (Class<?>[])input.readObject();
            Object[] arguments = (Object[])input.readObject();

            MethodParameter methodParameter = new MethodParameter();
            methodParameter.setClassName(className);
            methodParameter.setMethodName(methodName);
            methodParameter.setArguments(arguments);
            methodParameter.setParameterTypes(parameterTypes);

            return methodParameter;
        } catch (Exception e) {
            throw new RuntimeException("解析請求錯誤:" + e.getMessage());
        }
    }

}

可以很清楚的看到convert方法就是從一個輸入流中讀取出類名稱、方法名等數(shù)據(jù)郭宝,組成一個MethodParameter對象粘室,也就是上面所說的Request

服務端 - 服務暴露

public class RpcExploreService {

    private Map<String, Object> objectMap = new HashMap<>();

    public void explore(String className, Object object) {
        objectMap.put(className, object);
    }

    public Object invoke(MethodParameter methodParameter) {
        Object object = objectMap.get(methodParameter.getClassName());
        if (object == null) {
            throw new RuntimeException("無對應執(zhí)行類:" + methodParameter.getClassName());
        }
        Method method = null;
        try {
            method = object.getClass().getMethod(methodParameter.getMethodName(), methodParameter.getParameterTypes());
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("無對應執(zhí)行方法:" + methodParameter.getClassName() + ", 方法:" + methodParameter.getMethodName());
        }

        try {
            Object result = method.invoke(object, methodParameter.getArguments());

            System.out.println(methodParameter);

            return result;
        } catch (Exception e) {
            throw new RuntimeException("invoke方法執(zhí)行失敗:" + e.getMessage());
        }
    }

}

服務暴露存儲了一個Map<String, Object> objectMap對象衔统,所有可對外提供服務的都必須添加到該容器中,以便于收到網(wǎng)絡數(shù)據(jù)后能找到對應的服務舱殿,然后采用反射invoke調(diào)用险掀,返回得到的結(jié)果。

服務端 - 網(wǎng)絡數(shù)據(jù)處理

public class IOService implements Runnable{

    private int port;
    private ServerSocket serverSocket;
    private RpcExploreService rpcExploreService;
    private volatile boolean flag;

    public IOService(RpcExploreService rpcExploreService, int port) throws IOException {
        this.rpcExploreService = rpcExploreService;
        this.port = port;
        this.serverSocket = new ServerSocket(port);
        this.flag = true;
        System.out.println("服務端啟動了");

        // 優(yōu)雅關(guān)閉
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                flag = false;
                System.out.println("服務端關(guān)閉了");
            }
        });
    }

    @Override
    public void run() {
        while (flag) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
            } catch (IOException e) {
            }
            if (socket == null) {
                continue;
            }
            new Thread(new ServerSocketRunnable(socket)).start();
        }
    }

    class ServerSocketRunnable implements Runnable {
        private Socket socket;
        public ServerSocketRunnable(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
                MethodParameter methodParameter = MethodParameter.convert(inputStream);
                Object result = rpcExploreService.invoke(methodParameter);
                ObjectOutputStream output = new ObjectOutputStream(outputStream);
                output.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

簡單的BIO模型侠鳄,開啟了一個ServerSocket后伟恶,接收到數(shù)據(jù)后就把套接字丟給一個新的線程處理十电,ServerSocketRunnable接受一個socket后叹螟,解析出MethodParameter這個請求對象,然后調(diào)用服務暴露的invoke方法畏线,再寫回到socket傳輸給客戶端

客戶端 - 服務訂閱

public class RpcUsedService {

    private Map<String, Object> proxyObjectMap = new HashMap<>();
    private Map<String, Class> classMap = new HashMap<>();
    private IOClient ioClient;

    public void setIoClient(IOClient ioClient) {
        this.ioClient = ioClient;
    }

    public void register(Class clazz) {
        String className = clazz.getName();
        classMap.put(className, clazz);
        if (!clazz.isInterface()) {
            throw new RuntimeException("暫時只支持接口類型的");
        }

        try {
            RpcInvocationHandler handler = new RpcInvocationHandler();
            handler.setClazz(clazz);
            Object proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, handler);
            proxyObjectMap.put(className, proxyInstance);
            // 然后需要包裝起來
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public <T> T get(Class<T> clazz) {
        String className = clazz.getName();
        return (T) proxyObjectMap.get(className);
    }

    class RpcInvocationHandler implements InvocationHandler {

        private Class clazz;
        public void setClazz(Class clazz) {
            this.clazz = clazz;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 實際上proxy沒啥用處寝殴,不需要真正的反invoke射
            MethodParameter methodParameter = new MethodParameter();

            methodParameter.setClassName(clazz.getName());
            methodParameter.setMethodName(method.getName());
            methodParameter.setArguments(args);
            methodParameter.setParameterTypes(method.getParameterTypes());

            return ioClient.invoke(methodParameter);
        }
    }
}

服務使用方需要使用register進行服務的注冊蚣常,會生成對應的本地代理對象痊银,后續(xù)只需要通過本地代理對象。

客戶端 - 網(wǎng)絡處理

public class IOClient {

    private String ip;
    private int port;
    public IOClient(String ip, int port) throws IOException {
        this.ip = ip;
        this.port = port;
    }

    public Object invoke(MethodParameter methodParameter) {
        Socket socket = null;
        try {
            socket = new Socket(ip, port);
            OutputStream outputStream = socket.getOutputStream();
            ObjectOutputStream ouput = new ObjectOutputStream(outputStream);

            ouput.writeUTF(methodParameter.getClassName());
            ouput.writeUTF(methodParameter.getMethodName());
            ouput.writeObject(methodParameter.getParameterTypes());
            ouput.writeObject(methodParameter.getArguments());

            InputStream inputStream = socket.getInputStream();
            ObjectInputStream input = new ObjectInputStream(inputStream);
            return input.readObject();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return null;
    }
}

代理對象被調(diào)用后生成一個MethodParameter對象,通過此IOClient把數(shù)據(jù)傳輸?shù)椒斩酥孪。⑶曳祷貙臄?shù)據(jù)抖单。

實踐

服務端

public class Service {

    public static void main(String[] args) {
        RpcExploreService rpcExploreService = new RpcExploreService();
        // 傳入的字符串是接口的全名稱
        rpcExploreService.explore("new2019.rpc.rpc_v1.expore.Helloworld", new HelloWorldImpl());

        try {
            Runnable ioService = new IOService(rpcExploreService, 10001);
            new Thread(ioService).start();
            // 開啟了端口為10001的服務監(jiān)聽
        } catch (IOException e) {
        }
    }
}

客戶端

public class Client {

    public static void main(String[] args) {
        RpcUsedService rpcUsedService = new RpcUsedService();
        rpcUsedService.register(Helloworld.class);

        try {
            IOClient ioClient = new IOClient("127.0.0.1", 10001);
            // 網(wǎng)絡套接字鏈接 同上是10001端口
            rpcUsedService.setIoClient(ioClient);

            Helloworld helloworld = rpcUsedService.get(Helloworld.class);
            // 生成的本地代理對象 proxy

            for(int i=0; i< 100; i++) {
                // 開啟了100個縣城
                new Thread(() -> {
                    long start = System.currentTimeMillis();
                    int a = new Random().nextInt(100);
                    int b = new Random().nextInt(100);
                    int c = helloworld.add(a, b);
                    // .add 操作就是屏蔽了所有的細節(jié)矛绘,提供給客戶端使用的方法
                    System.out.println("a: " + a + ", b:" + b + ", c=" + c + ", 耗時:" + (System.currentTimeMillis() - start));
                }).start();
            }

        } catch (IOException e) {
        }
    }
}

測試服務

// Helloworld 接口
public interface Helloworld {
    String hi();
    int add(int a, int b);
}

// Helloworld 接口 實現(xiàn)類
public class HelloWorldImpl implements Helloworld {

    @Override
    public String hi() {
        return "ok";
    }

    @Override
    public int add(int a, int b) {
        long start = System.currentTimeMillis();
        try {
            Thread.sleep(new Random().nextInt(10000));
            // 故意添加了耗時操作蔑歌,以便于模擬真實的調(diào)用操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int c = a + b;
        System.out.println(Thread.currentThread().getName() + " 耗時:" + (System.currentTimeMillis() - start));
        return c;
    }
}

運行效果

image
image

總結(jié) & 思考

這只是一個非常簡單的RPC實踐次屠,包含了服務暴露雳刺、服務注冊(Proxy生成)裸违、BIO模型進行網(wǎng)絡傳輸,java默認的序列化方法枪汪,對RPC有一個初步的認識和了解怔昨,知道RPC必須包含的模塊

不過還是有很多需要優(yōu)化的點以改進赖捌。

  • IO模型:使用的是BIO模型矮烹,可以改進換成NIO模型,引入netty
  • 池化:不要隨意新建線程卤唉,所有的線程都應有線程池統(tǒng)一管理
  • 服務發(fā)現(xiàn):本地模擬的小demo仁期,并沒有服務發(fā)現(xiàn)蟀拷,可以采用zk管理
  • 序列化:java本身自帶的序列化效率很低,可以換成Hessian(DUBBO默認采用其作為序列化工具)问芬、Protobuf(Protobuf是由Google提出的一種支持多語言的跨平臺的序列化框架)等

還有例如服務統(tǒng)計此衅、優(yōu)雅下線、負載均衡等也都是一個成熟的RPC框架必須要考慮到的點骑歹。

本人微信公眾號(搜索jwfy)歡迎關(guān)注

微信公眾號
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末道媚,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子最域,更是在濱河造成了極大的恐慌镀脂,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沙兰,死亡現(xiàn)場離奇詭異翘魄,居然都是意外死亡,警方通過查閱死者的電腦和手機训措,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來怀大,“玉大人,你說我怎么就攤上這事潜慎”涂担” “怎么了蒜焊?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鳖悠。 經(jīng)常有香客問我优妙,道長,這世上最難降的妖魔是什么卡辰? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任九妈,我火速辦了婚禮,結(jié)果婚禮上允蚣,老公的妹妹穿的比我還像新娘嚷兔。我一直安慰自己,他們只是感情好冒晰,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布壶运。 她就那樣靜靜地躺著,像睡著了一般埠况。 火紅的嫁衣襯著肌膚如雪棵癣。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天喜命,我揣著相機與錄音河劝,去河邊找鬼赎瞎。 笑死,一個胖子當著我的面吹牛煎娇,可吹牛的內(nèi)容都是我干的缓呛。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼因妙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了铣耘?” 一聲冷哼從身側(cè)響起以故,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤怒详,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后吊骤,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體静尼,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡鼠渺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了奕扣。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汰蓉。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖尔邓,靈堂內(nèi)的尸體忽然破棺而出窘行,到底是詐尸還是另有隱情端考,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布扶供,位于F島的核電站裂明,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏扳碍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一碱蒙、第九天 我趴在偏房一處隱蔽的房頂上張望夯巷。 院中可真熱鬧鞭莽,春花似錦坊秸、人聲如沸褒搔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽琳状。三九已至盒齿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間翎承,已是汗流浹背符匾。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工啊胶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人焰坪。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓琳彩,卻偏偏與公主長得像部凑,于是被迫代替她去往敵國和親碧浊。 傳聞我的和親對象是個殘疾皇子箱锐,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容