自己動(dòng)手實(shí)現(xiàn)RPC框架(2)-服務(wù)調(diào)用

遠(yuǎn)程服務(wù)調(diào)用

在上一步,我們實(shí)現(xiàn)了一個(gè)服務(wù)發(fā)布程序唠亚,這一步我們要進(jìn)行客戶端調(diào)用链方。

實(shí)現(xiàn)細(xì)節(jié)

  • 序列化,客戶端和服務(wù)端使用相同的序列化協(xié)議灶搜,也就是protostuff侄柔。
  • 網(wǎng)絡(luò),也使用netty發(fā)起客戶端請(qǐng)求占调,客戶端連接保持長(zhǎng)鏈接暂题。
  • 動(dòng)態(tài)代理,使用jdk原生的動(dòng)態(tài)代理究珊,也就是必須基于接口生成代理類薪者。rpc調(diào)用本身需要依賴服務(wù)端API,因此沒有問題剿涮。至于不需要依賴API的泛化調(diào)用言津,不是這一步考慮的問題。

設(shè)計(jì)

類圖概況

整體的類圖如下:


客戶端類圖

時(shí)序圖

客戶端時(shí)序圖

重要類描述

  • ConsumerFactory取试,負(fù)責(zé)根據(jù)提供的ConsumerDescriptor生成代理類悬槽,內(nèi)部使用的JDK動(dòng)態(tài)代理。
  • RemoteProcedureInvoker瞬浓,consumerFactory生產(chǎn)的代理類只是一個(gè)門面初婆,內(nèi)部實(shí)際調(diào)用委托給此類。這是最核心的一個(gè)骨架類猿棉。
    • ServiceInstanceSelector磅叛,服務(wù)實(shí)例選擇器,根據(jù)制定的ConsumerDescriptor獲取一個(gè)遠(yuǎn)程的服務(wù)實(shí)例(即ServiceInstance)萨赁,ServiceInstance表示具體的服務(wù)實(shí)例弊琴,如集群中的某一個(gè)機(jī)器上的某一個(gè)服務(wù)。
      • 目前先實(shí)現(xiàn)一個(gè)直連的實(shí)現(xiàn)杖爽,DirectServiceInstanceSelector敲董,直接根據(jù)Consumer配置的信息返回實(shí)例,不查詢注冊(cè)中心之類的組件慰安。未來可以有其它實(shí)現(xiàn)腋寨。
    • ConnectionSupplier,實(shí)際是connection的工廠類泻帮,根據(jù)ServiceInstance生成ConsumingConnection精置,實(shí)現(xiàn)的時(shí)候需要考慮ServiceInstance指向的服務(wù)的協(xié)議,目前不考慮多協(xié)議锣杂。
      • 默認(rèn)提供基于Netty NIO的長(zhǎng)鏈接實(shí)現(xiàn)脂倦,序列化使用Protostuff。

代碼實(shí)現(xiàn)

RemoteProcedureInvoker

RemoteServiceAccessor

package io.destinyshine.storks.consumer;

import ...

/**
 * a base class for RemoteProcedureInvoker
 *
 * @author liujianyu.ljy
 * @date 2017/09/07
 */
@Slf4j
public abstract class RemoteServiceAccessor {

    protected ServiceInstanceSelector serviceInstanceSelector;
    protected ConnectionSupplier connectionSupplier;

    protected ConsumingConnection obtainConnection(ConsumerDescriptor<?> desc) throws Exception {
        ServiceKey serviceKey = ServiceKey.of(desc);

        Optional<ServiceInstance> instOpt = serviceInstanceSelector.select(desc);

        if (instOpt.isPresent()) {
            ServiceInstance inst = instOpt.get();
            return (connectionSupplier.getConnection(inst));
        } else {
            throw new ServiceNotFoundException("cannot found service of " + serviceKey + " in registry.");
        }
    }

    public void setServiceInstanceSelector(ServiceInstanceSelector serviceInstanceSelector) {
        this.serviceInstanceSelector = serviceInstanceSelector;
    }

    public void setConnectionSupplier(ConnectionSupplier connectionSupplier) {
        this.connectionSupplier = connectionSupplier;
    }
}

DefaultRemoteProcedureInvoker

package io.destinyshine.storks.consumer;

import ...

/**
 * @author destinyliu
 */
@Slf4j
public class DefaultRemoteProcedureInvoker extends RemoteServiceAccessor implements RemoteProcedureInvoker {

    @Override
    public ResponseMessage invoke(ConsumerDescriptor desc, RequestMessage requestMessage) throws Exception {
        ConsumingConnection connection = obtainConnection(desc);
        Future<ResponseMessage> responsePromise = connection.sendRequest(requestMessage);
        return responsePromise.get();
    }

}

ServiceInstanceSelector

DirectServiceInstanceSelector

package io.destinyshine.storks.consumer.connect;

import ...

/**
 * 只支持直連的服務(wù)選擇器
 *
 * @author liujianyu
 * @date 2017/09/03
 */
public class DirectServiceInstanceSelector implements ServiceInstanceSelector {

    @Override
    public Optional<ServiceInstance> select(ConsumerDescriptor desc) {
        if (desc.isDirect()) {
            ServiceInstance inst = ServiceInstance.builder()
                .host(desc.getRemoteHost())
                .port(desc.getRemotePort())
                .serviceInterface(desc.getServiceInterface().getName())
                .serviceVersion(desc.getServiceVersion())
                .build();
            return Optional.of(inst);
        }
        return Optional.empty();
    }

}

ConnectionSupplier

AbstractConnectionSupplier

package io.destinyshine.storks.consumer.connect;

import ...

/**
 * @author destinyliu
 */
@Slf4j
public abstract class AbstractConnectionSupplier implements ConnectionSupplier {

    private Map<ServiceInstance, ConsumingConnection> connectionCache = new ConcurrentHashMap<>();

    @Override
    public synchronized ConsumingConnection getConnection(ServiceInstance instance) throws Exception {
        ConsumingConnection con = connectionCache.computeIfAbsent(instance,
            instance1 -> {
                try {
                    return createConnectionInternal(instance1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            });

        return con;
    }

    /**
     * create connection
     *
     * @param instance
     * @return
     * @throws Exception
     */
    protected abstract ConsumingConnection createConnectionInternal(ServiceInstance instance) throws Exception;

    public void shutdown() {
        logger.warn("shutting down connectionManager...");
        connectionCache.forEach((serviceKey, con) -> {
            logger.warn("closing all connections of serviceKey={}", serviceKey);
            try {
                con.close();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            logger.warn(
                "closed connection of serviceKey={}, {}",
                serviceKey,
                con
            );
            logger.warn("closed all connections of serviceKey={}", serviceKey);
        });
        logger.warn("shutdown connectionManager finished.");
    }
}

SocketChannelConsumingConnectionSupplier

package io.destinyshine.storks.consumer.connect;

import ...

/**
 * NIO consuming connection supplier.
 *
 * @author liujianyu
 * @date 2017/09/03
 */
@Slf4j
public class SocketChannelConsumingConnectionSupplier extends AbstractConnectionSupplier implements ConnectionSupplier {

    @Override
    public ConsumingConnection createConnectionInternal(ServiceInstance instance) throws Exception {
        logger.info("will create connection of instance {}", instance);
        String remoteHost = instance.getHost();
        int remotePort = instance.getPort();
        SocketChannelConsumingConnection con = new SocketChannelConsumingConnection(remoteHost, remotePort);
        con.connect();
        return con;
    }
}

ConsumingConnection

在ConsumingConnection的實(shí)現(xiàn)過程中元莫,我們使用基于Netty 的NIO方式實(shí)現(xiàn)赖阻。由于NIO的response返回是異步的,并且發(fā)送Request后不會(huì)阻塞線程知道遠(yuǎn)程服務(wù)端返回詳細(xì)踱蠢;所以需要維護(hù)一個(gè)ConcurrentLinkedQueue<Promise<ResponseMessage>>隊(duì)列火欧,當(dāng)發(fā)送一個(gè)請(qǐng)求的時(shí)候,增加一個(gè)Promise到隊(duì)列中茎截,并返回這個(gè)Promise苇侵。收到響應(yīng)的時(shí)候取隊(duì)頭的Promise設(shè)置結(jié)果,在Promise上等待的線程可收到結(jié)果企锌∮芘ǎ可參考下面的代碼。

Netty中的Promise是Future的子接口撕攒,類似于java8自帶的CompletableFuture陡鹃,能夠增加監(jiān)聽器監(jiān)聽其是否完成,也能夠手動(dòng)設(shè)置結(jié)果抖坪∑季ǎ可參考java8的CompletableFuture.

實(shí)現(xiàn)代碼。

package io.destinyshine.storks.consumer.support;

import ...

/**
 * @author liujianyu
 */
public class SocketChannelConsumingConnection implements AutoCloseable, ConsumingConnection {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private ConcurrentLinkedQueue<Promise<ResponseMessage>> responsePromises = new ConcurrentLinkedQueue<>();

    private final String remoteHost;
    private final int remotePort;

    private Channel channel;

    private long connectedTime;

    public SocketChannelConsumingConnection(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void connect() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.remoteAddress(new InetSocketAddress(remoteHost, remotePort));
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    super.channelActive(ctx);
                }

                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(new ProtostuffEncoder<>(RequestMessage.class))
                        .addLast(Protocol.newFrameDecoder())
                        .addLast(new ProtostuffDecoder<>(ResponseMessage.class, ResponseMessage::new))
                        .addLast(new SimpleChannelInboundHandler<ResponseMessage>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg)
                                throws Exception {
                                Promise<ResponseMessage> promise;
                                if ((promise = responsePromises.poll()) != null) {
                                    promise.setSuccess(msg);
                                } else {
                                    logger.error("remote server closed!");
                                }
                            }
                        });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect().sync();
            this.channel = channelFuture.channel();
            this.connectedTime = System.currentTimeMillis();
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
                    logger.debug(future.toString() + "client connected");
                } else {
                    logger.debug(future.toString() + "server attemp failed", future.cause());
                }

            });
        } finally {

        }
    }

    @Override
    public Promise<ResponseMessage> sendRequest(RequestMessage requestMessage) {
        Promise<ResponseMessage> promise = this.channel.eventLoop().newPromise();
        this.responsePromises.add(promise);
        this.channel.writeAndFlush(requestMessage);
        return promise;
    }

    @Override
    public String toString() {
        return "SocketChannelConsumingConnection{" +
            "remoteHost='" + remoteHost + '\'' +
            ", remotePort=" + remotePort +
            '}';
    }

    @Override
    public void close() {
        channel.close().awaitUninterruptibly();
    }
}

運(yùn)行客戶端

同樣使用原始的方式執(zhí)行客戶端代碼擦俐。

在這個(gè)實(shí)例中脊阴,用多個(gè)線程發(fā)起并行RPC調(diào)用,試驗(yàn)是否能夠正確響應(yīng)蚯瞧,多個(gè)線程之間是否會(huì)錯(cuò)亂蹬叭。經(jīng)過試驗(yàn)多線程正常調(diào)用和響應(yīng)。

package io.destinyshine.storks.sample.service;

import ...

/**
 * @author liujianyu
 */
public class DirectClientMain {

    private static final Logger logger = LoggerFactory.getLogger(DirectClientMain.class);

    public static void main(String[] args) throws Exception {

        DefaultRemoteProcedureInvoker invoker = new DefaultRemoteProcedureInvoker();
        invoker.setConnectionSupplier(new SocketChannelConsumingConnectionSupplier());
        invoker.setServiceInstanceSelector(new DirectServiceInstanceSelector());

        ConsumerDescriptor<HelloService> desc = ConsumerBuilder
            .ofServiceInterface(HelloService.class)
            .remoteServer("127.0.0.1")
            .remotePort(39874)
            .serviceVersion("1.0.0")
            .direct(true)
            .build();

        ConsumerFactory consumerFactory = new DefaultConsumerFactory(invoker);

        HelloService helloServiceConsumer = consumerFactory.getConsumer(desc);

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 100; i++) {
            int finalI = i;
            executorService.submit(() -> {
                String input = null;
                String result = null;
                try {
                    input = "tom,direct," + Thread.currentThread().getName() + "," + finalI;
                    result = helloServiceConsumer.hello(input);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                logger.info("input={}, get result: {}", input, result);
            });
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                invoker.close();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }));

    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末状知,一起剝皮案震驚了整個(gè)濱河市秽五,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌饥悴,老刑警劉巖坦喘,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異西设,居然都是意外死亡瓣铣,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門贷揽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來棠笑,“玉大人,你說我怎么就攤上這事禽绪”途龋” “怎么了洪规?”我有些...
    開封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)循捺。 經(jīng)常有香客問我斩例,道長(zhǎng),這世上最難降的妖魔是什么从橘? 我笑而不...
    開封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任念赶,我火速辦了婚禮,結(jié)果婚禮上恰力,老公的妹妹穿的比我還像新娘叉谜。我一直安慰自己,他們只是感情好踩萎,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開白布停局。 她就那樣靜靜地躺著,像睡著了一般驻民。 火紅的嫁衣襯著肌膚如雪翻具。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天回还,我揣著相機(jī)與錄音裆泳,去河邊找鬼。 笑死柠硕,一個(gè)胖子當(dāng)著我的面吹牛工禾,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蝗柔,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼闻葵,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了癣丧?” 一聲冷哼從身側(cè)響起槽畔,我...
    開封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胁编,沒想到半個(gè)月后厢钧,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嬉橙,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年早直,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片市框。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡霞扬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情喻圃,我是刑警寧澤萤彩,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站级及,受9級(jí)特大地震影響乒疏,放射性物質(zhì)發(fā)生泄漏额衙。R本人自食惡果不足惜饮焦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望窍侧。 院中可真熱鬧县踢,春花似錦、人聲如沸伟件。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽斧账。三九已至谴返,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間咧织,已是汗流浹背嗓袱。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留习绢,地道東北人渠抹。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像闪萄,于是被迫代替她去往敵國(guó)和親梧却。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理败去,服務(wù)發(fā)現(xiàn)放航,斷路器,智...
    卡卡羅2017閱讀 134,654評(píng)論 18 139
  • 從三月份找實(shí)習(xí)到現(xiàn)在圆裕,面了一些公司广鳍,掛了不少,但最終還是拿到小米葫辐、百度搜锰、阿里、京東耿战、新浪蛋叼、CVTE、樂視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,243評(píng)論 11 349
  • 什么是RPC rpc是遠(yuǎn)程過程調(diào)用,在本地代碼中使用模擬調(diào)用本地方法的形式調(diào)用遠(yuǎn)程的服務(wù)過程狈涮。 RPC的優(yōu)點(diǎn) 對(duì)于...
    景樗子劉閱讀 3,690評(píng)論 1 4
  • 晚上喝了一杯咖啡狐胎,本以為可以熬夜看看書的,今天的任務(wù)沒有完成歌馍,不能拖到明天握巢,可是,還是洗漱上床了松却。雖然沒有完成今天...
    檸檬安然閱讀 129評(píng)論 0 0
  • 月度檢視模版: 【60天月度檢視】Derrick #基本情況#(寫孩子的) 姓名:Derrick 年齡:11歲 小...
    不停跑閱讀 192評(píng)論 0 0