一起學(xué)RPC(七)

上一篇文章中說到要繼續(xù)討論關(guān)于“Registry”注冊(cè)器的實(shí)現(xiàn)罗珍。然而我反悔了。注冊(cè)器的實(shí)現(xiàn)涉及到了客戶端程序玩讳,而客戶端是屬于consumer的部分榕茧。因此我決定將這個(gè)部分稍微放一放。這篇文章開始介紹“注冊(cè)中心”的實(shí)現(xiàn)诽偷。

    public static void main(String[] args) {
        RegistryServer registryServer = RegistryServer.Default.createRegistryServer(20001, 1);
        MonitorServer monitor = new MonitorServer(19998);
        try {
            monitor.setRegistryMonitor(registryServer);
            monitor.start();
            registryServer.startRegistryServer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

啟動(dòng)一個(gè)注冊(cè)中心server很簡(jiǎn)單坤学。注冊(cè)中心中有一個(gè)監(jiān)控“MonitorServer”,這個(gè)不僅僅在注冊(cè)中心中有报慕,在provider中也有深浮。

public interface RegistryServer extends RegistryMonitor {
    void startRegistryServer();
}

public interface RegistryMonitor {

    /**
     * Returns the address list of publisher.
     */
    List<String> listPublisherHosts();

    /**
     * Returns the address list of subscriber.
     */
    List<String> listSubscriberAddresses();

    /**
     * Returns to the service of all the specified service provider's address.
     */
    List<String> listAddressesByService(String group, String serviceProviderName, String version);

    /**
     * Finds the address(host, port) of the corresponding node and returns all
     * the service names it provides.
     */
    List<String> listServicesByAddress(String host, int port);
}

注冊(cè)中心抽象接口中只提供了一個(gè)操作--啟動(dòng)注冊(cè)中心。其超級(jí)繼承自Monitor眠冈,而這個(gè)Monitor提供的操作也很簡(jiǎn)單飞苇。默認(rèn)的實(shí)現(xiàn)是由DefaultRegistryServer來完成的菌瘫。這個(gè)實(shí)例的創(chuàng)建并不是顯示的,而是使用反射方式去實(shí)例化布卡。在某些情況下不想使用默認(rèn)的注冊(cè)中心而是使用別的如zk雨让,如果jupiter-registry-default依賴包沒有顯示的添加進(jìn)來,編譯會(huì)直接報(bào)錯(cuò)的忿等。這種思路在很多框架中都有體現(xiàn)栖忠。

注冊(cè)中心的作用除了保存provider的一些信息外,還有一個(gè)重要的職責(zé)--監(jiān)聽注冊(cè)信息的變化贸街。當(dāng)一個(gè)provider斷線了庵寞,那么注冊(cè)中心就得及時(shí)將這個(gè)provide發(fā)布的信息給清理掉(下線)并且告訴consumer:你訂閱的服務(wù)不可用了。如果重連后薛匪,provider會(huì)重新發(fā)布服務(wù)信息捐川,注冊(cè)中心又得將這些變化告訴consumer:你訂閱的服務(wù)又能使用了。對(duì)于consumer來說逸尖,當(dāng)consumer斷線了注冊(cè)中心將consumer訂閱的服務(wù)給清理掉综看,連接重建后又會(huì)自動(dòng)訂閱蚓庭。

和provider一樣蒸痹,注冊(cè)中心也是一個(gè)server头遭。因此需要關(guān)心的核心邏輯其實(shí)就是handler的實(shí)現(xiàn)。

boot.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(
                        new IdleStateChecker(timer, JConstants.READER_IDLE_TIME_SECONDS, 0, 0),
                        idleStateTrigger,
                        new MessageDecoder(),
                        encoder,
                        ackEncoder,
                        handler);
            }
        });

這里還多了一個(gè)ackEncoder編碼器逞频。

protected void encode(ChannelHandlerContext ctx, Acknowledge ack, ByteBuf out) throws Exception {
        out.writeShort(JProtocolHeader.MAGIC)
                .writeByte(JProtocolHeader.ACK)
                .writeByte(0)
                .writeLong(ack.sequence())
                .writeInt(0);
    }

從具體的實(shí)現(xiàn)邏輯中可以看出纯衍,這個(gè)編碼器實(shí)際上僅僅只做了一件事:構(gòu)造一個(gè)ack的消息包。

解碼器的邏輯和之前provider的類似苗胀,只不過接收的消息格式類型不同襟诸。

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            switch (state()) {
                case MAGIC:
                    checkMagic(in.readShort());             // MAGIC
                    checkpoint(State.SIGN);
                case SIGN:
                    header.sign(in.readByte());             // 消息標(biāo)志位
                    checkpoint(State.STATUS);
                case STATUS:
                    in.readByte();                          // no-op
                    checkpoint(State.ID);
                case ID:
                    header.id(in.readLong());               // 消息id
                    checkpoint(State.BODY_SIZE);
                case BODY_SIZE:
                    header.bodySize(in.readInt());          // 消息體長(zhǎng)度
                    checkpoint(State.BODY);
                case BODY:
                    byte s_code = header.serializerCode();

                    switch (header.messageCode()) {
                        case JProtocolHeader.HEARTBEAT:
                            break;
                        case JProtocolHeader.PUBLISH_SERVICE:
                        case JProtocolHeader.PUBLISH_CANCEL_SERVICE:
                        case JProtocolHeader.SUBSCRIBE_SERVICE:
                        case JProtocolHeader.OFFLINE_NOTICE: {
                            byte[] bytes = new byte[header.bodySize()];
                            in.readBytes(bytes);

                            Serializer serializer = SerializerFactory.getSerializer(s_code);
                            Message msg = serializer.readObject(bytes, Message.class);
                            msg.messageCode(header.messageCode());
                            out.add(msg);

                            break;
                        }
                        case JProtocolHeader.ACK:
                            out.add(new Acknowledge(header.id()));

                            break;
                        default:
                            throw IoSignals.ILLEGAL_SIGN;
                    }
                    checkpoint(State.MAGIC);
            }
        }

實(shí)際上有三種消息類型:心跳、發(fā)布/訂閱和ack消息基协。對(duì)應(yīng)的處理邏輯也不相同歌亲,心跳消息不處理,發(fā)布/訂閱消息單獨(dú)去構(gòu)造消息實(shí)體澜驮,ack消息也如此陷揪,只不過內(nèi)容非常簡(jiǎn)單。

將消息解碼成特定對(duì)象之后就得對(duì)這些對(duì)象進(jìn)行處理杂穷。也就是MessageHandler要干的活兒悍缠。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel ch = ctx.channel();

            if (msg instanceof Message) {
                Message obj = (Message) msg;

                switch (obj.messageCode()) {
                    case JProtocolHeader.PUBLISH_SERVICE:
                    case JProtocolHeader.PUBLISH_CANCEL_SERVICE:
                        RegisterMeta meta = (RegisterMeta) obj.data();
                        if (Strings.isNullOrEmpty(meta.getHost())) {
                            SocketAddress address = ch.remoteAddress();
                            if (address instanceof InetSocketAddress) {
                                meta.setHost(((InetSocketAddress) address).getAddress().getHostAddress());
                            } else {
                                logger.warn("Could not get remote host: {}, info: {}", ch, meta);

                                return;
                            }
                        }

                        if (obj.messageCode() == JProtocolHeader.PUBLISH_SERVICE) {
                            handlePublish(meta, ch);
                        } else if (obj.messageCode() == JProtocolHeader.PUBLISH_CANCEL_SERVICE) {
                            handlePublishCancel(meta, ch);
                        }
                        ch.writeAndFlush(new Acknowledge(obj.sequence())) // 回復(fù)ACK
                                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

                        break;
                    case JProtocolHeader.SUBSCRIBE_SERVICE:
                        handleSubscribe((RegisterMeta.ServiceMeta) obj.data(), ch);
                        ch.writeAndFlush(new Acknowledge(obj.sequence())) // 回復(fù)ACK
                                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

                        break;
                    case JProtocolHeader.OFFLINE_NOTICE:
                        handleOfflineNotice((RegisterMeta.Address) obj.data());

                        break;
                }
            } else if (msg instanceof Acknowledge) {
                handleAcknowledge((Acknowledge) msg, ch);
            } else {
                logger.warn("Unexpected message type received: {}, channel: {}.", msg.getClass(), ch);

                ReferenceCountUtil.release(msg);
            }
        }

這里處理的也只會(huì)有兩種類型--ack消息包和Message消息包,也就是之前解碼器的產(chǎn)物耐量。

對(duì)于發(fā)布/取消發(fā)布服務(wù)操作來說飞蚓,消息的發(fā)送者一定是provider,而消息的內(nèi)容之前也說到廊蜒,一定是RegisterMeta類型的趴拧。不信可以看看代碼:

    public void publish(ServiceWrapper serviceWrapper) {
        ServiceMetadata metadata = serviceWrapper.getMetadata();

        RegisterMeta meta = new RegisterMeta();
        meta.setPort(acceptor.boundPort());
        meta.setGroup(metadata.getGroup());
        meta.setServiceProviderName(metadata.getServiceProviderName());
        meta.setVersion(metadata.getVersion());
        meta.setWeight(serviceWrapper.getWeight());
        meta.setConnCount(JConstants.SUGGESTED_CONNECTION_COUNT);

        registryService.register(meta);
    }

因此這里使用強(qiáng)制類型轉(zhuǎn)換是沒有問題的溅漾。可以看到著榴,provider發(fā)布注冊(cè)消息的時(shí)候并沒有將address給set進(jìn)去添履,因此這里是通過channel來獲取provider的host。沒有一點(diǎn)毛病脑又。我猜是因?yàn)閜rovider獲取自己的host比較麻煩暮胧,所以干脆直接讓注冊(cè)中心來處理這件事情。

接下來就是區(qū)分是發(fā)布還是取消發(fā)布挂谍。沒有直接在case中判斷可能是因?yàn)槿绻褂媚欠N方式上面的代碼得重寫一遍叔壤,也很累人瞎饲。干脆直接用if來判斷得了口叙。處理完了之后還得發(fā)送一個(gè)ack的消息到provider/consumer。這種邏輯也是能夠理解的嗅战。作為注冊(cè)中心肯定是有責(zé)任告訴provider/consumer:你們的請(qǐng)求都完成了妄田。不然provider/consumer怎么能保證自己發(fā)布/訂閱的消息是不是成功的被注冊(cè)中心給處理了呢?作為注冊(cè)中心驮捍,還是得讓人信任的疟呐。

看看發(fā)布服務(wù)的具體邏輯。

private void handlePublish(RegisterMeta meta, Channel channel) {

        logger.info("Publish {} on channel{}.", meta, channel);

        attachPublishEventOnChannel(meta, channel);

        final RegisterMeta.ServiceMeta serviceMeta = meta.getServiceMeta();
        ConfigWithVersion<ConcurrentMap<RegisterMeta.Address, RegisterMeta>> config =
                registerInfoContext.getRegisterMeta(serviceMeta);

        synchronized (registerInfoContext.publishLock(config)) {
            // putIfAbsent和config.newVersion()需要是原子操作, 所以這里加鎖
            if (config.getConfig().putIfAbsent(meta.getAddress(), meta) == null) {
                registerInfoContext.getServiceMeta(meta.getAddress()).add(serviceMeta);

                final Message msg = new Message(serializerType.value());
                msg.messageCode(JProtocolHeader.PUBLISH_SERVICE);
                msg.version(config.newVersion()); // 版本號(hào)+1
                msg.data(Pair.of(serviceMeta, meta));

                subscriberChannels.writeAndFlush(msg, new ChannelMatcher() {

                    @Override
                    public boolean matches(Channel channel) {
                        boolean doSend = isChannelSubscribeOnServiceMeta(serviceMeta, channel);
                        if (doSend) {
                            MessageNonAck msgNonAck = new MessageNonAck(serviceMeta, msg, channel);
                            // 收到ack后會(huì)移除當(dāng)前key(參見handleAcknowledge), 否則超時(shí)超時(shí)重發(fā)
                            messagesNonAck.put(msgNonAck.id, msgNonAck);
                        }
                        return doSend;
                    }
                });
            }
        }
    }

這句代碼attachPublishEventOnChannel(meta, channel);的方法名直譯過來意思就是在channel上綁定發(fā)布服務(wù)的信息东且。具體實(shí)現(xiàn)如下:

private static final AttributeKey<ConcurrentSet<RegisterMeta>> S_PUBLISH_KEY =
            AttributeKey.valueOf("server.published");

private static boolean attachPublishEventOnChannel(RegisterMeta meta, Channel channel) {
        Attribute<ConcurrentSet<RegisterMeta>> attr = channel.attr(S_PUBLISH_KEY);
        ConcurrentSet<RegisterMeta> registerMetaSet = attr.get();
        if (registerMetaSet == null) {
            ConcurrentSet<RegisterMeta> newRegisterMetaSet = new ConcurrentSet<>();
            registerMetaSet = attr.setIfAbsent(newRegisterMetaSet);
            if (registerMetaSet == null) {
                registerMetaSet = newRegisterMetaSet;
            }
        }

        return registerMetaSet.add(meta);
    }

這樣就很容易理解了启具,意思就是一個(gè)channel上是能夠保存信息的,怎么保存呢珊泳?通過S_PUBLISH_KEY鲁冯,就將這個(gè)玩意作為一個(gè)記號(hào),刻在channel上色查,這個(gè)記號(hào)是能夠指定類型的薯演,這里指定的是ConcurrentSet類型,當(dāng)然也能指定別的如String秧了、Integer啥的跨扮。然后就是一段騷操作--將這個(gè)類型給實(shí)例化出來,并且將meta消息給添加進(jìn)去验毡。第一次在這個(gè)channel去獲取S_PUBLISH_KEY標(biāo)記的內(nèi)容衡创,肯定是不存在的,那么就得將其中的內(nèi)容給實(shí)例化出來晶通,單線程環(huán)境什么都好說璃氢,簡(jiǎn)單粗暴去賦值即可,但是在并發(fā)情況下鬼知道啥時(shí)候某個(gè)線程就悄咪咪的把這個(gè)標(biāo)記好的set給實(shí)例化了录择,而這時(shí)候你卻不知道拔莱,再去實(shí)例化賦值一次碗降,這就產(chǎn)生了并發(fā)問題。這里的操作是使用setIfAbsent塘秦,如果這個(gè)標(biāo)記中已經(jīng)有了讼渊,那就直接返回這個(gè)存在的值,沒有就將其set進(jìn)去尊剔,并返回null爪幻。

“打標(biāo)記”這種概念很抽象,很納悶须误,為什么一個(gè)channel能夠被打上標(biāo)記挨稿。其實(shí)也容易理解,無非就是netty將這個(gè)channel進(jìn)行單獨(dú)的“處理”京痢。從本質(zhì)上來說在服務(wù)端的內(nèi)存中分配了一塊區(qū)域奶甘,與channel相關(guān)聯(lián)。這個(gè)channel也比較抽象祭椰,不容易理解臭家,其實(shí)就是一個(gè)“連接”,再具體點(diǎn)就是一個(gè)客戶端方淤。具體的實(shí)現(xiàn)細(xì)節(jié)得去查閱netty的源碼钉赁。

接下來使用一個(gè)config變量來保存address和RegisterMeta的映射.這個(gè)config是通過serviceMeta來獲取的,也就是通過服務(wù)名來獲取映射--指定的服務(wù)有哪些節(jié)點(diǎn)注冊(cè)携茂。這里同時(shí)也得維護(hù)節(jié)點(diǎn)-服務(wù)的映射關(guān)系你踩。最后構(gòu)造消息報(bào)文,這個(gè)報(bào)文發(fā)給所有的訂閱者讳苦。

僅僅發(fā)送出消息還不算完事

private final ConcurrentMap<String, MessageNonAck> messagesNonAck = Maps.newConcurrentMap();

這里還維護(hù)一個(gè)map變量带膜,保存著“未回應(yīng)”的消息。這么做的目的也是為了保證可靠医吊。也就是說钱慢,當(dāng)provider發(fā)布一條注冊(cè)信息給注冊(cè)中心了,注冊(cè)中心得回應(yīng)provider:我收到了你的請(qǐng)求了卿堂。同時(shí)也得處理這個(gè)注冊(cè)消息--告訴訂閱者束莫,但是不能一直等著訂閱者回復(fù)確認(rèn)收到的消息。于是在本地保存一個(gè)“未回應(yīng)”消息草描,當(dāng)收到訂閱者的響應(yīng)報(bào)文時(shí)再將這些“未回應(yīng)”報(bào)文給移除掉览绿。當(dāng)然也得考慮永遠(yuǎn)收不到或者很長(zhǎng)時(shí)間都收不到的情況,內(nèi)部有一個(gè)守護(hù)線程穗慕,定時(shí)去清理超時(shí)的消息饿敲。不然的話這個(gè)map肯定會(huì)爆炸,oom肯定是不允許發(fā)生的逛绵。

至于取消發(fā)布的邏輯基本上和發(fā)布一樣怀各,只不過是反著來的倔韭,一個(gè)是添加,一個(gè)是移除而已瓢对。不啰嗦了寿酌。

服務(wù)訂閱和服務(wù)下線留到下篇接著講。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末硕蛹,一起剝皮案震驚了整個(gè)濱河市醇疼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌法焰,老刑警劉巖秧荆,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異埃仪,居然都是意外死亡乙濒,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門贵试,熙熙樓的掌柜王于貴愁眉苦臉地迎上來琉兜,“玉大人,你說我怎么就攤上這事毙玻。” “怎么了廊散?”我有些...
    開封第一講書人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵桑滩,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我允睹,道長(zhǎng)运准,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任缭受,我火速辦了婚禮胁澳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘米者。我一直安慰自己韭畸,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開白布蔓搞。 她就那樣靜靜地躺著胰丁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喂分。 梳的紋絲不亂的頭發(fā)上锦庸,一...
    開封第一講書人閱讀 52,441評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音蒲祈,去河邊找鬼甘萧。 笑死萝嘁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扬卷。 我是一名探鬼主播酿愧,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼邀泉!你這毒婦竟也來了嬉挡?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤汇恤,失蹤者是張志新(化名)和其女友劉穎庞钢,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體因谎,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡基括,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了财岔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片风皿。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖匠璧,靈堂內(nèi)的尸體忽然破棺而出桐款,到底是詐尸還是另有隱情,我是刑警寧澤夷恍,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布魔眨,位于F島的核電站,受9級(jí)特大地震影響酿雪,放射性物質(zhì)發(fā)生泄漏遏暴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一指黎、第九天 我趴在偏房一處隱蔽的房頂上張望朋凉。 院中可真熱鬧,春花似錦醋安、人聲如沸杂彭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽盖灸。三九已至,卻和暖如春磺芭,著一層夾襖步出監(jiān)牢的瞬間赁炎,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留徙垫,地道東北人讥裤。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像姻报,于是被迫代替她去往敵國(guó)和親己英。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理吴旋,服務(wù)發(fā)現(xiàn)损肛,斷路器,智...
    卡卡羅2017閱讀 134,704評(píng)論 18 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,727評(píng)論 13 425
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html荣瑟,并沒有及時(shí)更新治拿。 術(shù)語對(duì)...
    joyenlee閱讀 7,668評(píng)論 0 3
  • 生活確實(shí)很殘酷,一個(gè)人的成長(zhǎng)環(huán)境嚴(yán)重影響一個(gè)人的性格笆焰,不要輕易討論一個(gè)人的脾氣秉性劫谅,你不知道他背后曾經(jīng)經(jīng)歷了...
    落葉Bonnie閱讀 232評(píng)論 0 0
  • 大山里的日子單調(diào)得過分,似乎每天除了上下班嚷掠,就只會(huì)三五一群的說長(zhǎng)道短了捏检。 我們這些二十來歲的人,除了吃就不知道還有...
    羽毛_閱讀 225評(píng)論 2 2