自己動(dòng)手實(shí)現(xiàn)RPC框架(1)-簡(jiǎn)單發(fā)布服務(wù)

什么是RPC

rpc是遠(yuǎn)程過(guò)程調(diào)用榨婆,在本地代碼中使用模擬調(diào)用本地方法的形式調(diào)用遠(yuǎn)程的服務(wù)過(guò)程窿锉。

RPC的優(yōu)點(diǎn)

  • 對(duì)于服務(wù)器端開發(fā)人員而言,容易設(shè)計(jì)轧简、開發(fā)驰坊。
  • 對(duì)于消費(fèi)者而言,調(diào)用非常簡(jiǎn)單哮独。
  • 便于做集中的監(jiān)控拳芙。
  • 基于socket的二進(jìn)制RPC協(xié)議,建立連接延遲低借嗽、網(wǎng)絡(luò)傳輸效率高态鳖。
  • 支持有狀態(tài)的長(zhǎng)連接,可進(jìn)行雙向通信恶导,實(shí)時(shí)性好浆竭。
  • 在各個(gè)企業(yè)的使用較為成熟,許多企業(yè)都有自己的RPC實(shí)踐惨寿,并已廣泛應(yīng)用在生產(chǎn)環(huán)節(jié)邦泄。

RPC 的缺點(diǎn)

  • 緊耦合
    • API一旦發(fā)布,就難以再做改動(dòng)裂垦。
    • 客戶端必須使用特定的框架顺囊,而且還需引入API包。
  • 沒(méi)有統(tǒng)一的設(shè)計(jì)風(fēng)格
    • 增加了客戶端開發(fā)人員的學(xué)習(xí)成本
    • 難以實(shí)現(xiàn)通用的客戶端庫(kù),每個(gè)RPC框架都有各自的協(xié)議蕉拢。
    • 通常以動(dòng)詞的形式設(shè)計(jì)API特碳,一個(gè)功能就增加一個(gè)API,設(shè)計(jì)的時(shí)候很少考慮領(lǐng)域模型晕换。
  • 掩蓋了網(wǎng)絡(luò)的復(fù)雜性
    • 開發(fā)人員很容易混淆遠(yuǎn)程調(diào)用與本地調(diào)用午乓。
      實(shí)際上網(wǎng)絡(luò)調(diào)用與本地調(diào)用是完全不同的,RPC的調(diào)用方式闸准,讓使用者很難意識(shí)到是在進(jìn)行網(wǎng)絡(luò)調(diào)用益愈,忽略了針對(duì)網(wǎng)絡(luò)復(fù)雜性的處理。
    • 會(huì)損害用戶(客戶端)可感知的性能

盡管RPC有許多的優(yōu)點(diǎn)和缺點(diǎn)夷家,但是在互聯(lián)網(wǎng)企業(yè)中應(yīng)用非常廣泛蒸其,具有很高的學(xué)習(xí)價(jià)值。并且對(duì)網(wǎng)絡(luò)库快、IO摸袁、線程等知識(shí)領(lǐng)域都有涉及,是較好的學(xué)習(xí)路徑缺谴。因此我們通過(guò)自己實(shí)現(xiàn)一個(gè)RPC框架來(lái)學(xué)習(xí)和強(qiáng)化相應(yīng)的知識(shí)但惶,并且可以更了解RPC底層的設(shè)計(jì)思路耳鸯,在工作中實(shí)現(xiàn)應(yīng)用代碼的時(shí)候更加心中有數(shù)。

典型RPC的要點(diǎn)

  • 服務(wù)發(fā)布
    • 序列化協(xié)議
    • 服務(wù)響應(yīng)
      • 線程模型
  • 透明調(diào)用
    • 動(dòng)態(tài)代理
    • 服務(wù)發(fā)現(xiàn)
      • 負(fù)載均衡

第一步,單一直連

實(shí)現(xiàn)目標(biāo)

RPC最核心的部分是服務(wù)發(fā)布和服務(wù)調(diào)用膀曾,服務(wù)動(dòng)態(tài)發(fā)現(xiàn)和負(fù)載均衡都是更為高級(jí)的特性县爬。在第一步,我們先實(shí)現(xiàn)一個(gè)僅支持單一直連和的RPC框架添谊。

  • 直連财喳,所謂直連的意思,就是客戶端必須預(yù)先配置好服務(wù)端的信息(包括服務(wù)地址和服務(wù)協(xié)議)斩狱。所以在這一步耳高,不需要服務(wù)注冊(cè)和服務(wù)發(fā)現(xiàn),也就不需要服務(wù)注冊(cè)表所踊。
  • 單一泌枪,單一是指客戶端預(yù)先配置的provider只有一個(gè),如果客戶端預(yù)先配置的provider list含有兩個(gè)及以上元素秕岛,盡管不使用服務(wù)發(fā)現(xiàn)碌燕,仍然會(huì)存在負(fù)載均衡的問(wèn)題,所以這一步我們只支持配置一個(gè)provider继薛。

發(fā)布服務(wù)

發(fā)布服務(wù)是指服務(wù)端開始提供服務(wù)并能響應(yīng)請(qǐng)求修壕,和服務(wù)注冊(cè)不是一回事情。

在這一步遏考,我們要編寫服務(wù)端代碼慈鸠。服務(wù)端代碼有以下要素:

  • 網(wǎng)絡(luò)IO框架,網(wǎng)絡(luò)框架我們采用netty灌具,并使用nio模式青团,netty具備良好的線程模型和豐富的框架功能】ч梗基于netty可以事半功倍壶冒。
  • 序列化協(xié)議,序列化協(xié)議使用protestuff截歉,這是參考了google protebuf的純java序列化框架,兼容protobuf協(xié)議烟零。

實(shí)現(xiàn)細(xì)節(jié)

1. 消息傳輸

客戶端的請(qǐng)求稱為RequestMessage瘪松,服務(wù)端響應(yīng)稱為ResponseMessage,請(qǐng)求和響應(yīng)在網(wǎng)絡(luò)中傳遞锨阿,數(shù)據(jù)包會(huì)被拆分或合并宵睦,也就是我們常說(shuō)的拆包和粘包。所以數(shù)據(jù)要能夠根據(jù)一定規(guī)則切分成一幀一幀墅诡,也就是正確還原為消息壳嚎。

常用的消息幀處理方式有:

  • 定長(zhǎng)消息,每個(gè)消息體的長(zhǎng)度固定并事先知曉。消息收發(fā)方都按約定的長(zhǎng)度接收烟馅、發(fā)送消息说庭。
  • 長(zhǎng)度前置變長(zhǎng)消息,每個(gè)消息的固定頭幾位表示消息的長(zhǎng)度(size)郑趁,后面接著消息內(nèi)容刊驴。通常用4位或8位前綴表示內(nèi)容長(zhǎng)度,前綴解析位整形數(shù)字寡润,該數(shù)字就是消息內(nèi)容大小捆憎。如通常的RPC協(xié)議,消息隊(duì)列梭纹,都是使用這種方式躲惰,這種方式的使用非常廣泛。
  • 字符分割变抽,固定的一個(gè)或多個(gè)字符代表一個(gè)消息的結(jié)束础拨。如http協(xié)議使用連續(xù)的兩個(gè)換行符表示消息結(jié)束。

我們采用長(zhǎng)度前置的變長(zhǎng)消息來(lái)處理信息幀瞬沦,對(duì)應(yīng)netty的實(shí)現(xiàn)就是LengthFieldBasedFrameDecoder太伊。

2.io模式&響應(yīng)方式

當(dāng)然是使用nio了」渥辏基于netty的nio編程也非常高效僚焦。

在這個(gè)階段,我們不區(qū)分業(yè)務(wù)線程和IO線程曙痘,直接用netty IO線程調(diào)用provider的業(yè)務(wù)代碼芳悲。再以后的完善過(guò)程加上業(yè)務(wù)線程的區(qū)分。

類定義

類圖如下:

基礎(chǔ)類圖

代碼實(shí)現(xiàn)

ServiceExporter

ServiceExporter將服務(wù)發(fā)布到網(wǎng)絡(luò)環(huán)境边坤,負(fù)責(zé)響應(yīng)&解析Request名扛、編碼&發(fā)送Response〖胙鳎總而言之就是負(fù)責(zé)網(wǎng)絡(luò)邊界處理肮韧,不負(fù)責(zé)Request的執(zhí)行。

具體查找Provider旺订、執(zhí)行Request弄企、調(diào)用業(yè)務(wù)代碼、產(chǎn)生Response的功能被委托給ProviderManager区拳。

下面用Netty NIO實(shí)現(xiàn)一個(gè)ServiceExporter拘领。

package io.destinyshine.storks.provider; 
/**
 *
 * 基于Netty NIO實(shí)現(xiàn)的服務(wù)發(fā)布。
 * <p>
 *     將服務(wù)發(fā)布到網(wǎng)絡(luò)環(huán)境樱调,可以供消費(fèi)端調(diào)用约素,處理消費(fèi)端請(qǐng)求并返回響應(yīng)消息届良。
 * </p>
 *
 * @author liujianyu
 * @date 2017/08/10
 */

import ...

@Slf4j
public class NioSocketChannelServiceExporter implements ServiceExporter {

    private final int servicePort;

    private Channel channel;
    private InetSocketAddress localAddress;
    private ProviderManager providerManager;
    
    /**
     * 使用一個(gè)本地服務(wù)端口構(gòu)造exporter,將通過(guò)指定端口提供服務(wù)圣猎。
     * <p>
     *     如果localPort為0士葫,代表使用隨機(jī)端口,在服務(wù)啟動(dòng)后通過(guò){@link #getServicePort()}獲取實(shí)際端口样漆。
     * </p>
     * @param localPort 服務(wù)端口为障,為0可使用隨機(jī)端口。
     * @see #getServicePort() 
     */
    public NioSocketChannelServiceExporter(int localPort) {
        this.servicePort = localPort;
    }

    @Override
    public void export() throws InterruptedException {
        Objects.requireNonNull(providerManager, "please set providerManager non null before export services.");

        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap.group(group);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.localAddress(new InetSocketAddress(servicePort));
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(Protocol.newFrameDecoder())
                        .addLast(new ProtostuffDecoder<>(RequestMessage.class, RequestMessage::new))
                        .addLast(new ProtostuffEncoder<>(ResponseMessage.class))
                        .addLast(new RequestMessageHandler(providerManager));
                }
            });
            ChannelFuture channelFuture = bootstrap.bind().sync();
            this.channel = channelFuture.channel();
            this.localAddress = (InetSocketAddress)this.channel.localAddress();
            exportCompleted(providerManager);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }

    protected void exportCompleted(ProviderManager providerManager) {
        //export完成后置處理放祟,未來(lái)可用于注冊(cè)服務(wù)到注冊(cè)中心等鳍怨。
    }

    private ServiceInstance toServiceInstance(ProviderDescriptor desc) {
        StorksApplication application = providerManager.getApplication();
        ServiceInstance serviceInstance = ServiceInstance.builder()
            .appName(application.getAppName())
            .protocol("storks")
            .host(application.getLocalHost())
            .port(getServicePort())
            .serviceInterface(desc.getServiceInterface().getName())
            .serviceVersion(desc.getServiceVersion())
            .build();
        return serviceInstance;
    }

    public int getServicePort() {
        return this.localAddress.getPort();
    }

    @Override
    public void setProviderManager(ProviderManager providerManager) {
        this.providerManager = providerManager;
    }
}

ProviderManager

ServiceExporter只是負(fù)責(zé)搭建網(wǎng)絡(luò)邊界的溝通橋梁,而具體的provider業(yè)務(wù)代碼執(zhí)行由ProviderManager負(fù)責(zé)跪妥。

ServiceExporter負(fù)責(zé)將網(wǎng)絡(luò)請(qǐng)求還原為request鞋喇,以及將response發(fā)送到網(wǎng)絡(luò);而providerManager負(fù)責(zé)查找本地Provider和執(zhí)行request眉撵。

package io.destinyshine.storks.provider;

import ...

/**
 * 管理所有的Provider侦香,Provider以{@link ProviderDescriptor}類來(lái)表示,所有的Provider需要注冊(cè)到ProviderManager中纽疟。
 * <p>
 *     ServiceExporter只是負(fù)責(zé)搭建網(wǎng)絡(luò)邊界的溝通橋梁罐韩,而具體的provider業(yè)務(wù)代碼執(zhí)行由ProviderManager負(fù)責(zé)。
 *     ServiceExporter負(fù)責(zé)將網(wǎng)絡(luò)請(qǐng)求還原為request污朽,以及將response發(fā)送到網(wǎng)絡(luò)散吵;而providerManager負(fù)責(zé)查找本地Provider和執(zhí)行request。
 * <p/>
 * <p>
 *     根據(jù)以上設(shè)計(jì)蟆肆,如果本地ProviderTable沒(méi)有指定的Provider矾睦,則這個(gè)NoProviderDefinedException異常由ProviderManager拋出,而不是ServiceExporter炎功。
 *
 * </p>
 * @author liujianyu
 */
 @Slf4j
public class DefaultProviderManager implements ProviderManager {

    private Map<String, ProviderDescriptor> localProviderTable = new ConcurrentHashMap<>();

    private StorksApplication application;

    private boolean working = false;

    private List<ServiceProviderListener> serviceProviderListeners = new ArrayList<>();

    @Override
    public <T> ProviderDescriptor<T> findProvider(String serviceInterface, String version) {
        String key = serviceInterface + ":" + version;
        ProviderDescriptor<T> providerDescriptor = localProviderTable.get(key);
        if (providerDescriptor == null) {
            throw new NoProviderDefinedException(
                String.format(
                    "no provider of interface %s defined.",
                    serviceInterface
                )
            );
        }
        return providerDescriptor;
    }

    @Override
    public ResponseMessage execute(RequestMessage request) {
        ProviderDescriptor<?> desc = this.findProvider(request.getServiceInterface(), request.getServiceVersion());
        Object provider = desc.getProvider();
        Class<?>[] parameterClasses = new Class<?>[request.getParameterTypes().length];
        for (int i = 0; i < request.getParameterTypes().length; i++) {
            try {
                parameterClasses[i] = Class.forName(request.getParameterTypes()[i]);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
        try {
            Method method = provider.getClass().getMethod(request.getMethodName(), parameterClasses);
            Object returnValue = method.invoke(provider, request.getParameters());
            ResponseMessage responseMessage = new ResponseMessage();
            responseMessage.setReturnValue(returnValue);
            responseMessage.setTraceId(request.getTraceId());
            return responseMessage;
        } catch (NoSuchMethodException e) {
            logger.error(e.getMessage(), e);
        } catch (InvocationTargetException e) {
            logger.error(e.getMessage(), e);
        } catch (IllegalAccessException e) {
            logger.error(e.getMessage(), e);
        }

        return null;
    }

    @Override
    public void addProvider(ProviderDescriptor desc) {
        String serviceKey = Protocol.serviceKey(desc);
        localProviderTable.put(serviceKey, desc);
    }

    @Override
    public void removeProvider(ProviderDescriptor desc) {
        String serviceKey = Protocol.serviceKey(desc);
        localProviderTable.remove(serviceKey);
    }

    @Override
    public StorksApplication getApplication() {
        return application;
    }

    public void setApplication(StorksApplication application) {
        this.application = application;
    }

    @Override
    public Map<String, ProviderDescriptor> getProviders() {
        return localProviderTable;
    }
}

啟動(dòng)服務(wù)端

在這里我們沒(méi)有和spring集成枚冗,更沒(méi)有編寫spring boot starter。我們先用最原始的方式通過(guò)執(zhí)行main方法和原始API來(lái)啟動(dòng)服務(wù)蛇损。

代碼實(shí)現(xiàn):

public static void main(String[] args) throws Exception {

    logger.info("--server main--");

    StorksApplication app = new StorksApplication("testProvider");

    ProviderDescriptor desc = new ProviderDescriptor(HelloService.class, "1.0.0", new HelloServiceImpl());

    NioSocketChannelServiceExporter exporter = new NioSocketChannelServiceExporter(0);

    DefaultProviderManager providerManager = new DefaultProviderManager();
    providerManager.setApplication(app);
    //add provider
    providerManager.addProvider(desc);

    exporter.setProviderManager(providerManager);
    exporter.export();

    logger.info("exporter started.");

}

后續(xù)

到目前只實(shí)現(xiàn)了簡(jiǎn)單的服務(wù)發(fā)布赁温,還沒(méi)有客戶端調(diào)用的實(shí)現(xiàn),將在后面實(shí)現(xiàn)淤齐。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末束世,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子床玻,更是在濱河造成了極大的恐慌,老刑警劉巖沉帮,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锈死,死亡現(xiàn)場(chǎng)離奇詭異贫堰,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)待牵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門其屏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人缨该,你說(shuō)我怎么就攤上這事偎行。” “怎么了贰拿?”我有些...
    開封第一講書人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵蛤袒,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我膨更,道長(zhǎng)妙真,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任荚守,我火速辦了婚禮珍德,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘矗漾。我一直安慰自己锈候,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開白布敞贡。 她就那樣靜靜地躺著泵琳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嫡锌。 梳的紋絲不亂的頭發(fā)上虑稼,一...
    開封第一講書人閱讀 49,792評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音势木,去河邊找鬼蛛倦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛啦桌,可吹牛的內(nèi)容都是我干的溯壶。 我是一名探鬼主播,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼甫男,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼且改!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起板驳,我...
    開封第一講書人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤又跛,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后若治,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體慨蓝,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡感混,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了礼烈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弧满。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖此熬,靈堂內(nèi)的尸體忽然破棺而出庭呜,到底是詐尸還是另有隱情,我是刑警寧澤犀忱,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布募谎,位于F島的核電站,受9級(jí)特大地震影響峡碉,放射性物質(zhì)發(fā)生泄漏近哟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一鲫寄、第九天 我趴在偏房一處隱蔽的房頂上張望吉执。 院中可真熱鬧,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)咕宿。三九已至,卻和暖如春蜡秽,著一層夾襖步出監(jiān)牢的瞬間府阀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工芽突, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留试浙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓寞蚌,卻偏偏與公主長(zhǎng)得像田巴,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子挟秤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理壹哺,服務(wù)發(fā)現(xiàn),斷路器艘刚,智...
    卡卡羅2017閱讀 134,629評(píng)論 18 139
  • 轉(zhuǎn)自:http://blog.csdn.net/kesonyk/article/details/50924489 ...
    晴天哥_王志閱讀 24,787評(píng)論 2 38