rpc系列1-10 minute Tutorial

一個簡單的rpc demo

最近在網(wǎng)上看到阿里巴巴2015年的中間件性能挑戰(zhàn)賽的一個題目圣蝎,實現(xiàn)一個簡單的RPC框架刹悴,于是乎有一種沖動實現(xiàn)一個簡單的rpc,要求基本按照競賽題目的要求蛹稍,具體如下:

1.要成為框架:對于框架的使用者,隱藏RPC實現(xiàn)扒吁。

2.網(wǎng)絡模塊可以自己編寫,如果要使用IO框架室囊,要求使用netty-4.0.23.Final。

3.能夠傳輸基本類型魁索、自定義業(yè)務類型融撞、異常類型(要在客戶端拋出)。

4.支持異步調用粗蔚,提供future尝偎、callback的能力。

5.要處理超時場景鹏控,服務端處理時間較長時致扯,客戶端在指定時間內跳出本次調用。

6.提供RPC上下文当辐,客戶端可以透傳數(shù)據(jù)給服務端抖僵。

7.提供Hook,讓開發(fā)人員進行RPC層面的AOP缘揪。

最終預期的框架結構:


rpc-demo結構圖
  • ConsumerService耍群、ProviderService是提供給client和server端使用的api接口。
  • InterceptorChain:提供了RPC層面的AOP功能找筝。日志蹈垢、白名單過濾、權限認證等等袖裕。
  • RpcContext:提供上線文曹抬,雙端可以透明傳輸數(shù)據(jù)。
  • Connector急鳄、Acceptor:網(wǎng)絡模塊谤民,第一步自己用JavaSocket實現(xiàn)。

要求有了攒岛,下面第一步先整一個能跑起來的赖临!

第一步先跑起來

先把我們預期能實現(xiàn)的功能擺出來:

基本調用鏈路暢通,能夠傳輸基本類型灾锯、自定義業(yè)務類型兢榨、異常類型(要在客戶端拋出)。

測試用的業(yè)務接口UserService:

/**
 * 測試用業(yè)務接口
 * 
 * @author wqx
 *
 */
public interface UserService {
    
    /**
     * 基本鏈路測試
     * 
     * @return
     */
    public String test();
    
    /**
     * 自定義業(yè)務類型測試
     * 
     * @param userId
     * @return
     */
    public User queryUserById(int userId);  
    
    /**
     * 異常測試
     * 
     * @throws IOException
     */
    public Object exceptionTest() throws RpcException;
}

業(yè)務實現(xiàn)UserServiceImpl類:


/**
 * 測試業(yè)務接口實現(xiàn)類
 * 
 * @author wqx
 *
 */
public class UserServiceImpl implements UserService {

    public String test() {
        return "hello client, this is rpc server.";
    }

    public User queryUserById(int userId) {
        User parent = new User(100,"小明爸爸");
        User child = new User(101,"小明同學");
        parent.addChild(child);
        return parent;
    }
    
    public Object exceptionTest() throws RpcException {
        throw new RpcException("exception occur in server!3炒稀凌那!");
    }
}

測試用的自定義業(yè)務類型User

/**
 * 測試用的自定義業(yè)務類型
 * 
 * @author wqx
 *
 */
public class User implements java.io.Serializable{
    
    private static final long serialVersionUID = 493399440916323966L;

    private Integer id;
    
    private String name;
    
    private List<User> childs;

    
    public void addChild(User child){
        if(childs == null){
            childs = new ArrayList<User>();
        }
        childs.add(child);
    }
    //。吟逝。帽蝶。getter setter 
}

需求明確。块攒。励稳。鍵盤飛起。囱井。驹尼。

基本步驟很簡單,通過Proxy對客戶端方法調用進行攔截庞呕,在代理對象的回調方法中新翎,發(fā)起遠程調用,網(wǎng)絡模塊先采用簡單的Java提供的SocketAPI吧住练,對象的序列化和反序列化也是用JDK自帶的功能實現(xiàn)地啰。一切從簡從速!=补洹亏吝!

基本流程如下圖:


rpc調用流程
  1. invokeMethod:客戶端調用目標對象的方法,被代理對象攔截盏混。
  2. wrap Request:封裝調用參數(shù)(方法名顺呕,方法參數(shù)等),實現(xiàn)RpcRequest對象括饶,并序列化株茶。
  3. network transport:網(wǎng)絡傳輸,將序列化的參數(shù)對象傳輸?shù)侥繕朔斩恕?/li>
  4. unwrap request:對接收到的請求參數(shù)進行反序列化過程图焰。
  5. invokeMethod:通過反射機制method.invoke(obj,args)調用目標方法启盛。
  6. 將結果包裝在RpcResponse對象中,進行序列化技羔,為返回做準備僵闯。
  7. 和步驟3一樣,服務端通過網(wǎng)絡將結果返回給客戶端藤滥。
  8. unwrap Response:反序列化鳖粟,得到RpcResponse對象,從中獲取結果retVal拙绊,并返回客戶端向图。

過程很簡單泳秀,下面開始實現(xiàn)第一個組件RpcBuilder,主要功能用來生成client端和server端的代理對象榄攀。

/**
 * rpc服務類
 * 
 * @author wqx
 *
 */
public final class RpcBuilder {

    //構建客戶端的代理對象
    public static Object buildRpcClient(final Class<?> interfaces,final String host,final int port){
        if(interfaces == null){
            throw new IllegalArgumentException("interfaces can not be null");
        }

        return Proxy.newProxyInstance(RpcBuilder.class.getClassLoader(), new Class<?>[]{interfaces},
                new InvocationHandler(){

            //攔截目標方法->序列化method對象->發(fā)起socket連接
            public Object invoke(Object proxy, Method method,
                    Object[] args) throws Throwable {

                //創(chuàng)建連接,獲取輸入輸出流
                Socket socket = new Socket(host,port);
                Object retVal = null;
                try{

                    ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                    ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                    try{
                        //構造請求參數(shù)對象
                        RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args);
                        //發(fā)送
                        out.writeObject(request);

                        //接受server端的返回信息---阻塞
                        Object response = in.readObject();

                        if(response instanceof RpcResponse){
                            RpcResponse rpcResp  = (RpcResponse)response;
                            if(!rpcResp.isError()){
                                retVal = rpcResp.getResponseBody();
                            }else{
                                return new Throwable(rpcResp.getErrorMsg());
                            }
                        }
                    }finally{
                        out.close();
                        in.close();
                    }
                }finally{
                    socket.close();
                }
                return retVal;
            }
        });
    }
    
    private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;
    private static ExecutorService handlerPool = Executors.newFixedThreadPool(nThreads);

    public static void buildRpcServer(final Object service, final int port) throws IOException{
        if (service == null)  
            throw new IllegalArgumentException("service can not be null.");

        ServerSocket server = new ServerSocket(port);
        System.out.println("server started!!!");
        while(true){
            Socket socket = server.accept();//監(jiān)聽請求--阻塞

            //交由線程池異步處理
            handlerPool.submit(new Handler(service,socket));
        }
    }
    static class Handler implements Runnable{

        private Object service;

        private Socket socket;

        public Handler(Object service,Socket socket){
            this.service = service;
            this.socket = socket;
        }
        public void run() {
            try{
                ObjectInputStream in = null;
                ObjectOutputStream out = null;
                RpcResponse response = new RpcResponse();
                try {
                    in = new ObjectInputStream(socket.getInputStream());
                    out = new ObjectOutputStream(socket.getOutputStream());

                    Object req = in.readObject();
                    if(req instanceof RpcRequest){
                        RpcRequest rpcRequest = (RpcRequest)req;
                        Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                        Object retVal = method.invoke(service, rpcRequest.getArgs());
                        response.setResponseBody(retVal);
                        out.writeObject(response);
                    }else{
                        throw new IllegalArgumentException("bad request!");
                    }
                } catch (Exception e) {
                    response.setErrorMsg(e.getMessage());
                    response.setResponseBody(e);
                    out.writeObject(response);
                }finally{
                    in.close();
                    out.close();
                }
            }catch(Exception e){}
        }
    }
}

其中每次發(fā)送請求包含的信息(方法名嗜傅、參數(shù)名),將封裝在RpcRequest中檩赢,實現(xiàn)如下:

/**
 * 封裝請求參數(shù)
 * 
 * @author wqx
 *
 */
public class RpcRequest implements Serializable
{
    /**
     * 
     */
    private static final long serialVersionUID = -7102839100899303105L;

    //方法名
    private String methodName;
    
    //參數(shù)類型
    private Class<?>[] parameterTypes;
    
    //參數(shù)列表
    private Object[] args;
    
    public RpcRequest(String methodName,Class<?>[] parameterTypes,Object[] args)
    {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
    //getter and sette

服務端返回的執(zhí)行結果封裝在RpcResponse中吕嘀,如下所示:

/**
 * 響應對象
 * 
 * @author wqx
 *
 */
public class RpcResponse implements Serializable{
    
    static private final long serialVersionUID = -4364536436151723421L;
    
    //響應實體
    private Object responseBody;
    
    //錯誤信息
    private String errorMsg;
    
    public boolean isError(){
        return errorMsg == null ? false:true;
    }
    //getter and setter
}

自定義的業(yè)務異常RpcException:

/**
 * 自定義異常
 * 
 * @author wqx
 *
 */
public class RpcException extends RuntimeException {

    private static final long serialVersionUID = -2157872157006208360L;
    
    public RpcException(String msg){
        super(msg);
    }
}

client端測試代碼:

// client端
public class ClientTest {

    private static String host = "127.0.0.1";
    private static int port = 8888;
    
    public static void main(String[] args) {
        
        UserService userService = (UserService) RpcBuilder.buildRpcClient(UserService.class, host, port);
        
        Object msg = null;
        try{
        msg = userService.test();//測試基本鏈路是否暢通
//      msg = userService.exceptionTest();//異常測試
//      msg = userService.queryUserById(0);//傳輸自定義業(yè)務類型
//          if(msg instanceof User){
//              System.out.println("parent:" + ((User)msg).getName());
//              System.out.println("child:" + ((User)msg).getChilds().get(0).getName());
//          }
            System.out.println("msg:" + msg);
        }catch(Exception e){
            System.out.println("errorMsg:" + e);
        }
    }
}

server端的測試代碼:

public class ServerTest {

    private static int port = 8888;
    
    public static void main(String[] args) throws IOException {
        UserService userService = new UserServiceImpl();
         //暴露服務
        RpcBuilder.buildRpcServer(userService,port);
    }
}

測試test方法:預期輸出:

msg : hello client, this is rpc server.

測試exceptionTest方法:

輸出:
errorMsg:edu.ouc.rpc.RpcException: exception occur in server!U曷鳌偶房!

測試queryUserById方法:

輸出:
parent:小明爸爸
child:小明同學

done!>蝴悉!
上述源碼托管在github上

實現(xiàn)簡單但問題多多

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市伙菜,隨后出現(xiàn)的幾起案子轩缤,更是在濱河造成了極大的恐慌,老刑警劉巖贩绕,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件火的,死亡現(xiàn)場離奇詭異,居然都是意外死亡淑倾,警方通過查閱死者的電腦和手機馏鹤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來娇哆,“玉大人湃累,你說我怎么就攤上這事勃救。” “怎么了脱茉?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵剪芥,是天一觀的道長。 經(jīng)常有香客問我琴许,道長税肪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任榜田,我火速辦了婚禮益兄,結果婚禮上,老公的妹妹穿的比我還像新娘箭券。我一直安慰自己净捅,他們只是感情好,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布辩块。 她就那樣靜靜地躺著蛔六,像睡著了一般。 火紅的嫁衣襯著肌膚如雪废亭。 梳的紋絲不亂的頭發(fā)上国章,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機與錄音豆村,去河邊找鬼液兽。 笑死,一個胖子當著我的面吹牛掌动,可吹牛的內容都是我干的四啰。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼粗恢,長吁一口氣:“原來是場噩夢啊……” “哼柑晒!你這毒婦竟也來了?” 一聲冷哼從身側響起眷射,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤敦迄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后凭迹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體罚屋,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年嗅绸,在試婚紗的時候發(fā)現(xiàn)自己被綠了脾猛。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡鱼鸠,死狀恐怖猛拴,靈堂內的尸體忽然破棺而出羹铅,到底是詐尸還是另有隱情,我是刑警寧澤愉昆,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布职员,位于F島的核電站,受9級特大地震影響跛溉,放射性物質發(fā)生泄漏焊切。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一芳室、第九天 我趴在偏房一處隱蔽的房頂上張望专肪。 院中可真熱鬧,春花似錦堪侯、人聲如沸嚎尤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽芽死。三九已至,卻和暖如春次洼,著一層夾襖步出監(jiān)牢的瞬間关贵,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工滓玖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人质蕉。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓势篡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親模暗。 傳聞我的和親對象是個殘疾皇子禁悠,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內容

  • 轉自:http://blog.csdn.net/kesonyk/article/details/50924489 ...
    晴天哥_王志閱讀 24,784評論 2 38
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn)兑宇,斷路器碍侦,智...
    卡卡羅2017閱讀 134,626評論 18 139
  • 今天對剛來的新人進行好一頓批評,對前幾天剛培訓的知識竟然全部忘光了隶糕,一個知識點都答不上來瓷产,我心里當時就火冒三丈,本...
    飛城閱讀 165評論 0 0
  • 1.class和id的使用場景 class定義到頁面上某一類的元素枚驻,可以在多個標簽內使用濒旦。id定義到頁面上唯一的元...
    我七閱讀 420評論 0 0
  • 時間已過0點,翻來覆去睡不著再登。打開日期計算器尔邓,才發(fā)現(xiàn)自己來到北京已經(jīng)整整147天晾剖。今天是在北京失業(yè)的第9天。我還以...
    心sheng閱讀 245評論 0 0