一起學(xué)RPC(五)

之前的文章介紹了一個(gè)RPC請(qǐng)求有哪些經(jīng)歷富纸。這篇文章將介紹一直遺漏的provider發(fā)布服務(wù)的相關(guān)細(xì)節(jié)葵袭。

以前都是以JNettyTcpAcceptor作為程序的入口來(lái)一步一步抽絲剝繭探索細(xì)節(jié)◆岢澹現(xiàn)在就得尋根問(wèn)底從DefaultServer來(lái)繼續(xù)探索簸州。

程序包中有個(gè)example缠借,給了最基礎(chǔ)的用法拆火,一看就知道的那種跳夭。

public static void main(String[] args) {
        JServer server = new DefaultServer().withAcceptor(new JNettyTcpAcceptor(18090));
        MonitorServer monitor = new MonitorServer();
        try {
            monitor.start();

            ServiceWrapper provider = server.serviceRegistry()
                    .provider(new GenericServiceTestImpl())
                    .flowController(new FlowController<JRequest>() {

                        private AtomicLong count = new AtomicLong();

                        @Override
                        public ControlResult flowControl(JRequest request) {
                            if (count.getAndIncrement() > 100) {
                                return new ControlResult(false, "fuck out!!!");
                            }
                            return ControlResult.ALLOWED;
                        }
                    })
                    .register();

//            server.withGlobalFlowController(); // 全局限流器
            server.connectToRegistryServer("127.0.0.1:20001");
            server.publish(provider);
            server.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

很明顯JNettyTcpAcceptor僅僅是一個(gè)小弟,正真的老大其實(shí)是JServer.他才是有至高無(wú)上的權(quán)利们镜。這也是一個(gè)接口類(lèi)型币叹,這么做目的也是很明顯,就是為了方便拓展--其他的實(shí)現(xiàn)只需要實(shí)現(xiàn)這個(gè)接口就完事了模狭。從demo中可以看到幾個(gè)非常明顯的操作如connectToRegistryServer颈抚、publishstart以及serviceRegistry嚼鹉。接下來(lái)就對(duì)這些動(dòng)作逐一解讀贩汉。

首先通過(guò)serviceRegistry()獲取“本地”注冊(cè)工具。什么意思呢锚赤?也就是說(shuō)通過(guò)這個(gè)方法獲取的玩意兒實(shí)際上是用來(lái)在本機(jī)上將所要發(fā)布的服務(wù)給“注冊(cè)”上匹舞。所謂的注冊(cè)其實(shí)簡(jiǎn)單理解為用一個(gè)map給保存起來(lái)。而這個(gè)注冊(cè)工具也提供幾個(gè)動(dòng)作线脚。

這幾個(gè)動(dòng)作的命名也是非常語(yǔ)義化--provider()方法就是將一個(gè)服務(wù)的實(shí)現(xiàn)(就是普通的Javabean)給hold状突:

    public ServiceRegistry provider(Object serviceProvider, ProviderInterceptor... interceptors) {
            this.serviceProvider = serviceProvider;
            this.interceptors = interceptors;
            return this;
        }

這里將其保存為成員變量,放到后面會(huì)有用到的浑侥,現(xiàn)在先別慌姊舵,繼續(xù)往下面看≡⒙洌看完了啥都會(huì)知道的蠢莺。

接下來(lái)就是flowController()流控器,也是非常直白的命名零如。這個(gè)玩意具體是做什么的呢,其實(shí)也很簡(jiǎn)單锄弱。在一些場(chǎng)景下考蕾,會(huì)出現(xiàn)下游業(yè)務(wù)大量請(qǐng)求上游的接口,翻譯成人話就是consumer會(huì)發(fā)起很多請(qǐng)求到provider会宪,這時(shí)候provider會(huì)覺(jué)得難受肖卧,壓力很大,可能被搞崩潰掸鹅。因?yàn)橐坏┍罎⒘巳慷纪甑叭剩虼藀rovider就想個(gè)法子:你們consumer可以來(lái)請(qǐng)求拦赠,隨便你們?cè)趺锤悖坏┌盐腋銦┝宋揖筒淮罾砟懔丝选_@樣就不會(huì)出現(xiàn)崩潰的情形了荷鼠。當(dāng)然這個(gè)解釋是很簡(jiǎn)陋的,實(shí)際上要比這個(gè)復(fù)雜的多榔幸。流量控制是一個(gè)比較大的話題允乐,絕非一句兩句能說(shuō)的清道的明的。現(xiàn)有的成熟的限流中間件也有很多削咆,如近期阿里開(kāi)源的Sentinel等牍疏。這個(gè)demo中給的限流策略也很簡(jiǎn)單:如果你調(diào)用這服務(wù)超過(guò)100次我就把你踢掉,具體做法是拋出異常拨齐。

接下來(lái)就是register()方法了鳞陨。顧名思義就是注冊(cè)。這個(gè)注冊(cè)就是本地的注冊(cè)瞻惋,也就是之前說(shuō)的將這個(gè)服務(wù)在本地用一個(gè)map保存下來(lái)厦滤。這個(gè)邏輯稍微有點(diǎn)復(fù)雜。先不急熟史,繼續(xù)往后看馁害。

通過(guò)register()會(huì)返回一個(gè)ServiceWrapper實(shí)例,Jupiter根據(jù)這個(gè)對(duì)象將注冊(cè)信息上傳給注冊(cè)中心蹂匹。這里的注冊(cè)才是正真意義上的注冊(cè)碘菜,其實(shí)邏輯也非常簡(jiǎn)單,無(wú)非就是將這個(gè)實(shí)例相關(guān)的信息如類(lèi)名限寞,版本信息和組名等一些元數(shù)據(jù)放到注冊(cè)中心上去忍啸,這里的注冊(cè)中心簡(jiǎn)單理解為zk就好了。

connectToRegistryServer()方法用來(lái)連接到注冊(cè)中心履植,注冊(cè)中心本質(zhì)上就是一個(gè)server计雌,這里的provider實(shí)際上是一個(gè)客戶端。然后通過(guò)publish()方法將ServiceWrapper中的相關(guān)信息通過(guò)tcp網(wǎng)絡(luò)傳送給注冊(cè)中心玫霎,注冊(cè)中心將其保存下來(lái)凿滤。最后start()用來(lái)啟動(dòng)provider的server,用來(lái)接受consumer的請(qǐng)求庶近。

下面來(lái)看看這個(gè)register()是怎么實(shí)現(xiàn)的翁脆。

    public ServiceWrapper register() {
            checkNotNull(serviceProvider, "serviceProvider");

            Class<?> providerClass = serviceProvider.getClass();

            ServiceProviderImpl implAnnotation = null;
            ServiceProvider ifAnnotation = null;
            for (Class<?> cls = providerClass; cls != Object.class; cls = cls.getSuperclass()) {
                if (implAnnotation == null) {
                    implAnnotation = cls.getAnnotation(ServiceProviderImpl.class);
                }

                Class<?>[] interfaces = cls.getInterfaces();
                if (interfaces != null) {
                    for (Class<?> i : interfaces) {
                        ifAnnotation = i.getAnnotation(ServiceProvider.class);
                        if (ifAnnotation == null) {
                            continue;
                        }

                        checkArgument(
                                interfaceClass == null,
                                i.getName() + " has a @ServiceProvider annotation, can't set [interfaceClass] again"
                        );

                        interfaceClass = i;
                        break;
                    }
                }

                if (implAnnotation != null && ifAnnotation != null) {
                    break;
                }
            }

            if (ifAnnotation != null) {
                checkArgument(
                        group == null,
                        interfaceClass.getName() + " has a @ServiceProvider annotation, can't set [group] again"
                );
                checkArgument(
                        providerName == null,
                        interfaceClass.getName() + " has a @ServiceProvider annotation, can't set [providerName] again"
                );

                group = ifAnnotation.group();
                String name = ifAnnotation.name();
                providerName = Strings.isNotBlank(name) ? name : interfaceClass.getName();
            }

            if (implAnnotation != null) {
                checkArgument(
                        version == null,
                        providerClass.getName() + " has a @ServiceProviderImpl annotation, can't set [version] again"
                );

                version = implAnnotation.version();
            }

            checkNotNull(interfaceClass, "interfaceClass");
            checkArgument(Strings.isNotBlank(group), "group");
            checkArgument(Strings.isNotBlank(providerName), "providerName");
            checkArgument(Strings.isNotBlank(version), "version");

            // method's extensions
            //
            // key:     method name
            // value:   pair.first:  方法參數(shù)類(lèi)型(用于根據(jù)JLS規(guī)則實(shí)現(xiàn)方法調(diào)用的靜態(tài)分派)
            //          pair.second: 方法顯式聲明拋出的異常類(lèi)型
            Map<String, List<Pair<Class<?>[], Class<?>[]>>> extensions = Maps.newHashMap();
            for (Method method : interfaceClass.getMethods()) {
                String methodName = method.getName();
                List<Pair<Class<?>[], Class<?>[]>> list = extensions.get(methodName);
                if (list == null) {
                    list = Lists.newArrayList();
                    extensions.put(methodName, list);
                }
                list.add(Pair.of(method.getParameterTypes(), method.getExceptionTypes()));
            }

            return registerService(
                    group,
                    providerName,
                    version,
                    serviceProvider,
                    interceptors,
                    extensions,
                    weight,
                    executor,
                    flowController
            );
        }

首先第一個(gè)for循環(huán)目的是通過(guò)給定的Javabean找到其接口,如果沒(méi)有就去找父類(lèi)鼻种,直到最終為Object為止反番。在暴露一個(gè)服務(wù)的時(shí)候可以在其實(shí)現(xiàn)類(lèi)及接口上加注解,因此這里需要做一下篩選,只會(huì)去找有注解的接口以及實(shí)現(xiàn)類(lèi)罢缸。通過(guò)注解去找接口是一種方式篙贸,其實(shí)也能手動(dòng)指定接口,原則是不能既手動(dòng)指定接口又給接口指定注解枫疆。注解中有提供相關(guān)信息如group爵川、name等。最后將這些參數(shù)全部丟給registerService處理养铸。

    ServiceWrapper registerService(
            String group,
            String providerName,
            String version,
            Object serviceProvider,
            ProviderInterceptor[] interceptors,
            Map<String, List<Pair<Class<?>[], Class<?>[]>>> extensions,
            int weight,
            Executor executor,
            FlowController<JRequest> flowController) {

        ProviderInterceptor[] allInterceptors = null;
        List<ProviderInterceptor> tempList = Lists.newArrayList();
        if (globalInterceptors != null) {
            Collections.addAll(tempList, globalInterceptors);
        }
        if (interceptors != null) {
            Collections.addAll(tempList, interceptors);
        }
        if (!tempList.isEmpty()) {
            allInterceptors = tempList.toArray(new ProviderInterceptor[tempList.size()]);
        }

        ServiceWrapper wrapper =
                new ServiceWrapper(group, providerName, version, serviceProvider, allInterceptors, extensions);

        wrapper.setWeight(weight);
        wrapper.setExecutor(executor);
        wrapper.setFlowController(flowController);

        providerContainer.registerService(wrapper.getMetadata().directoryString(), wrapper);

        return wrapper;
    }

這里有幾個(gè)參數(shù)沒(méi)有提及到雁芙,如interceptors,weight钞螟,executor等兔甘。這些實(shí)際上和核心邏輯關(guān)系不大,只是可選項(xiàng)而已鳞滨。不必太多關(guān)注洞焙。這里的邏輯也十分簡(jiǎn)單,主旨就是將這些參數(shù)封裝到一個(gè)ServiceWrapper對(duì)象中拯啦,最后將這個(gè)對(duì)象保存在本地(放一個(gè)map中)澡匪。這里的key的生成邏輯是這樣的:組-服務(wù)名-版本號(hào)。這種結(jié)構(gòu)就像目錄一樣褒链,一級(jí)又一級(jí)唁情。

public String directoryString() {
        if (directoryCache != null) {
            return directoryCache;
        }

        StringBuilder buf = StringBuilderHelper.get();
        buf.append(getGroup())
                .append('-')
                .append(getServiceProviderName())
                .append('-')
                .append(getVersion());

        directoryCache = buf.toString();

        return directoryCache;
    }

本地注冊(cè)邏輯也非常簡(jiǎn)單:

    private final ConcurrentMap<String, ServiceWrapper> serviceProviders = Maps.newConcurrentMap();
    public void registerService(String uniqueKey, ServiceWrapper serviceWrapper) {
            serviceProviders.put(uniqueKey, serviceWrapper);

            logger.info("ServiceProvider [{}, {}] is registered.", uniqueKey, serviceWrapper);
        }

整個(gè)“本地注冊(cè)”的邏輯實(shí)際上也講完了(除了那些不影響邏輯的可選參數(shù))。接下來(lái)就是連接到注冊(cè)中心甫匹。

    public void connectToRegistryServer(String connectString) {
        registryService.connectToRegistryServer(connectString);
    }

實(shí)際上也是委派給registryService完成的甸鸟,這個(gè)老大很懶,很多事情不自己做兵迅。而這個(gè)registryService是這樣創(chuàng)建的:


    public DefaultServer() {
        this(RegistryService.RegistryType.DEFAULT);
    }
    public DefaultServer(RegistryService.RegistryType registryType) {
        registryType = registryType == null ? RegistryService.RegistryType.DEFAULT : registryType;
        registryService = JServiceLoader.load(RegistryService.class).find(registryType.getValue());
    }

這個(gè)操作比較高級(jí)抢韭,通過(guò)SPI去加載具體實(shí)現(xiàn)。具體怎么整的先不去看了恍箭,反正一時(shí)半會(huì)也看不懂刻恭。Jupiter中實(shí)現(xiàn)有兩種:DefaultRegistryServiceZookeeperRegistryService,默認(rèn)的是DefaultRegistryService扯夭。實(shí)際上默認(rèn)的是作者自己寫(xiě)的一個(gè)實(shí)現(xiàn)鳍贾,屬于玩票級(jí)別的。雖然是業(yè)余的交洗,但是也是值得學(xué)習(xí)的贾漏。這個(gè)DefaultRegistryService的核心實(shí)際上就是內(nèi)部維護(hù)一個(gè)tcp客戶端,然后去連接注冊(cè)中心藕筋。注冊(cè)中心也有兩個(gè)實(shí)現(xiàn):自己實(shí)現(xiàn)的和基于zk的。這里的細(xì)節(jié)以后會(huì)說(shuō)到。

連接完事了之后隐圾,接下來(lái)的動(dòng)作就是發(fā)布publish:

    public void publish(ServiceWrapper serviceWrapper) {
        ServiceMetadata metadata = serviceWrapper.getMetadata();

        RegisterMeta meta = new RegisterMeta();
        meta.setPort(acceptor.boundPort());
        meta.setGroup(metadata.getGroup());
        meta.setServiceProviderName(metadata.getServiceProviderName());
        meta.setVersion(metadata.getVersion());
        meta.setWeight(serviceWrapper.getWeight());
        meta.setConnCount(JConstants.SUGGESTED_CONNECTION_COUNT);

        registryService.register(meta);
    }

這個(gè)發(fā)布的實(shí)際操作是將這些信息傳給注冊(cè)中心伍掀,準(zhǔn)確來(lái)講應(yīng)該叫做“遠(yuǎn)程注冊(cè)”。其中的邏輯無(wú)非就是將服務(wù)中的信息封裝到RegisterMeta對(duì)象中暇藏,然后將這個(gè)對(duì)象丟給注冊(cè)中心蜜笤。后續(xù)的處理就留個(gè)懸念,后續(xù)的文章繼續(xù)講盐碱。

至此把兔,一個(gè)provider的服務(wù)發(fā)布流程就差不多整理完了。這么回頭以看瓮顽,其實(shí)代碼組織邏輯也不是那么復(fù)雜县好。接下來(lái)的內(nèi)容該是RegistryService的核心實(shí)現(xiàn)了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末暖混,一起剝皮案震驚了整個(gè)濱河市缕贡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌拣播,老刑警劉巖晾咪,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異贮配,居然都是意外死亡谍倦,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)泪勒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)昼蛀,“玉大人,你說(shuō)我怎么就攤上這事酣藻〔芮ⅲ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵辽剧,是天一觀的道長(zhǎng)送淆。 經(jīng)常有香客問(wèn)我,道長(zhǎng)怕轿,這世上最難降的妖魔是什么偷崩? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮撞羽,結(jié)果婚禮上阐斜,老公的妹妹穿的比我還像新娘。我一直安慰自己诀紊,他們只是感情好谒出,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著,像睡著了一般笤喳。 火紅的嫁衣襯著肌膚如雪为居。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,208評(píng)論 1 299
  • 那天杀狡,我揣著相機(jī)與錄音蒙畴,去河邊找鬼。 笑死呜象,一個(gè)胖子當(dāng)著我的面吹牛膳凝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播恭陡,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蹬音,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了子姜?” 一聲冷哼從身側(cè)響起祟绊,我...
    開(kāi)封第一講書(shū)人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎哥捕,沒(méi)想到半個(gè)月后牧抽,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡遥赚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年扬舒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凫佛。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡讲坎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出愧薛,到底是詐尸還是另有隱情晨炕,我是刑警寧澤,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布毫炉,位于F島的核電站瓮栗,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏瞄勾。R本人自食惡果不足惜费奸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望进陡。 院中可真熱鬧愿阐,春花似錦、人聲如沸趾疚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至戈二,卻和暖如春舒裤,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背觉吭。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留仆邓,地道東北人鲜滩。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像节值,于是被迫代替她去往敵國(guó)和親徙硅。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理搞疗,服務(wù)發(fā)現(xiàn)嗓蘑,斷路器,智...
    卡卡羅2017閱讀 134,652評(píng)論 18 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,082評(píng)論 25 707
  • 遷就這 糾結(jié)著 兩年的 于心不忍的 一切的一切 就在今天晚上 什么都有了答案 只能說(shuō) 快樂(lè) 今晚 我領(lǐng)略風(fēng)采 領(lǐng)略...
    右手辣條丶左手啤酒閱讀 142評(píng)論 0 0
  • 版權(quán)所有,轉(zhuǎn)載需注明出處 1幢炸、連載《只說(shuō)一次分手》||初識(shí):不如便宜我吧 芒果有時(shí)候很無(wú)聊泄隔,她會(huì)在夜里12...
    教書(shū)匠張家三少爺閱讀 340評(píng)論 0 0
  • 2017.04.12 兒子: 你昨天的數(shù)學(xué)測(cè)試得了有史以來(lái)的最低分,媽媽心里翻江倒海的宛徊,不是很好受佛嬉,我想我一個(gè)局外...
    JaneHuang閱讀 298評(píng)論 0 0