pigeon源碼分析-處理請求結(jié)果

RPC調(diào)用處理請求結(jié)果可以分為兩部分:

  • 獲取 response
  • 關(guān)聯(lián) request 和 response

這么分看起來似乎很奇怪至朗,不是直接等待處理完獲取結(jié)果就行了嗎拱撵?

我們說 RPC 調(diào)用都是在模擬這個動作: Result result = service.call(args); 但是遠程調(diào)用畢竟不是本地調(diào)用(其實稍后可以看到還是有相似之處的)篷店,將請求寫到網(wǎng)絡(luò)之后昔园,就無法命令遠端做任何事了扁瓢,這次請求就已經(jīng)告一段落了。

pigeon client 只知道:

  • 向網(wǎng)絡(luò)寫數(shù)據(jù)堕战;
    • 就是寫 request
  • 處理網(wǎng)絡(luò)寫入的數(shù)據(jù)坤溃;
    • 處理成 response

于是不難理解為何有此一問:網(wǎng)絡(luò)另一端寫過來的數(shù)據(jù),我怎么知道是哪個請求的返回值呢嘱丢?

獲取 response

其實從網(wǎng)絡(luò)讀取數(shù)據(jù)薪介,轉(zhuǎn)化成 Object。 pigeon 基于 netty屿讽,獲取 response 就是處理網(wǎng)絡(luò)寫入。

具體實現(xiàn)在:

 // com.dianping.pigeon.remoting.netty.invoker.NettyClientHandler#messageReceived
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        CodecEvent codecEvent = (CodecEvent) e.getMessage();

        if (codecEvent.isValid() && codecEvent.getInvocation() != null) {
            client.processResponse((InvocationResponse) codecEvent.getInvocation());
        }
    }

從 netty 封裝 MessageEvent 轉(zhuǎn)換成 CodecEvent吠裆,再剝開一層就是 InvocationResponse伐谈,當(dāng)然這還是一個比較泛化的返回值,com.dianping.pigeon.remoting.invoker.process.ResponseProcessor 將會進一步處理试疙;

繼續(xù)往下看:

// com.dianping.pigeon.remoting.invoker.process.threadpool.ResponseThreadPoolProcessor#doProcessResponse
public void doProcessResponse(final InvocationResponse response, final Client client) {
        Runnable task = new Runnable() {
            public void run() {
                ServiceInvocationRepository.getInstance().receiveResponse(response);
            }
        };
        try {
            responseProcessThreadPool.execute(task);
        } catch (RejectedExecutionException e) {
            String error = String.format("process response failed:%s, processor stats:%s", response,
                    getProcessorStatistics());
            throw new RejectedException(error, e);
        }
    }

這里封裝成了一個 task诵棵,交給線程池處理。

再下一層:

// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#receiveResponse
public class ServiceInvocationRepository {
    // 略
    private static Map<Long, RemoteInvocationBean> invocations = new ConcurrentHashMap<Long, RemoteInvocationBean>();


    public void receiveResponse(InvocationResponse response) {
        RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
        if (invocationBean != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("received response:" + response);
            }
            InvocationRequest request = invocationBean.request;
            try {
                Callback callback = invocationBean.callback;
                if (callback != null) {
                    Client client = callback.getClient();
                    if (client != null) {
                        ServiceStatisticsHolder.flowOut(request, client.getAddress());
                    }
                    callback.callback(response);
                    callback.run();
                }
            } finally {
                invocations.remove(response.getSequence());
            }
        }
    }
// 略
}

這里可以看到祝旷, RemoteInvocationBean invocationBean = invocations.get(response.getSequence()); invocations 維護一個 HashMap履澳,key 是一個 long 型的 sequenceId,通過這種方式定位到 invocationBean怀跛,而 invocationBean 看實現(xiàn)可知持有 request 引用距贷,以及一個處理返回值的 callback。

Callback 在不同調(diào)用模式(sync / future/ oneway/ callback) 下有不同實現(xiàn)類吻谋,比如 sync 模式下:

// public class CallbackFuture implements Callback, CallFuture {   
@Override
    public void callback(InvocationResponse response) {
        this.response = response;
    }

sync 和 future 調(diào)用忠蝗,都是將 response 對象實例設(shè)置給相應(yīng)的引用

何時得到真正的 returnValue?

看動態(tài)代理的邏輯漓拾,com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#invoke

com.dianping.pigeon.remoting.invoker.process.filter.InvocationInvokeFilter 挨個執(zhí)行完之后,提取返回值

// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult
    public Object extractResult(InvocationResponse response, Class<?> returnType) throws Throwable {
        Object responseReturn = response.getReturn();
        // ...
}

拿到的是頂層父類實例 Object,具體的類型匹配需要客戶端和服務(wù)端自行匹配

關(guān)聯(lián) request 和 response

從上面的分析可以看出绿渣,關(guān)鍵點就在于 sequence拳喻,每次調(diào)用應(yīng)該有個唯一的 id 進行匹配

這個 sequence 是唯一的嗎?

如果不唯一低千,就可能導(dǎo)致拿到錯誤的處理結(jié)果配阵。

sequence 的生成位置

//com.dianping.pigeon.remoting.invoker.process.filter.ContextPrepareInvokeFilter#initRequest

private static AtomicLong requestSequenceMaker = new AtomicLong();

request.setSequence(requestSequenceMaker.incrementAndGet() * -1);

可以看到 這個 sequence 是全局唯一的,準確說是同一個 JVM 中是唯一的,而且是 long 類型闸餐,足夠大饱亮;

Q:sequence 發(fā)生回繞怎么辦?

A:long 類型舍沙,即使發(fā)生回繞近上,也需要足夠長的時間,一般來說不會堆積有那么多的請求拂铡,導(dǎo)致兩個相同的 sequenceId 實際對應(yīng)不同請求壹无;

分布式環(huán)境下,sequenceId 在多個機器上可能重復(fù)感帅,會出錯嗎斗锭?

A:sequence 存儲的 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#invocations也是同一個 JVM 唯一的,所以只需要擔(dān)心會不會有這樣的場景:

client A 調(diào)用 server A失球,client B 也調(diào)用 server A岖是,但是 server A 把 client B的請求返回值處理之后發(fā)送到了 client A?

看看服務(wù)端的處理实苞,寫返回值:

//com.dianping.pigeon.remoting.provider.process.filter.WriteResponseProcessFilter#invoke
    public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)
            throws Throwable {
        try {
            ProviderChannel channel = invocationContext.getChannel();
            InvocationRequest request = invocationContext.getRequest();
            InvocationResponse response = handler.handle(invocationContext);
            if (request.getCallType() == Constants.CALLTYPE_REPLY) {
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
                channel.write(invocationContext, response);
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
            }
        // ...

channel 就是對socket的封裝豺撑,可以看成是 client / server 對對方的抽象。

那么只需要保證拿到正確的 channel 就對了:

//com.dianping.pigeon.remoting.netty.provider.NettyServerHandler#messageReceived
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
        CodecEvent codecEvent = (CodecEvent) (message.getMessage());

        if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
            return;
        }

        InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();

        ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
        //...
    }

也就是說黔牵,再 client 端 writeRequest() 之后聪轿,server 端讀取網(wǎng)絡(luò)數(shù)據(jù)的時候就從 context 中獲取到 client 所在的 channel 了,簡單來說猾浦,”從哪里來陆错,到哪里去“。

總的來說金赦,不會發(fā)生以上所述的 sequenceId 錯亂的問題音瓷。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市夹抗,隨后出現(xiàn)的幾起案子外莲,更是在濱河造成了極大的恐慌,老刑警劉巖兔朦,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件偷线,死亡現(xiàn)場離奇詭異,居然都是意外死亡沽甥,警方通過查閱死者的電腦和手機声邦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來摆舟,“玉大人亥曹,你說我怎么就攤上這事邓了。” “怎么了媳瞪?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵骗炉,是天一觀的道長。 經(jīng)常有香客問我蛇受,道長句葵,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任兢仰,我火速辦了婚禮乍丈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘把将。我一直安慰自己轻专,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布察蹲。 她就那樣靜靜地躺著请垛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪洽议。 梳的紋絲不亂的頭發(fā)上宗收,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天,我揣著相機與錄音绞铃,去河邊找鬼镜雨。 笑死嫂侍,一個胖子當(dāng)著我的面吹牛儿捧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播挑宠,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼菲盾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了各淀?” 一聲冷哼從身側(cè)響起懒鉴,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎碎浇,沒想到半個月后临谱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡奴璃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年悉默,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片苟穆。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡抄课,死狀恐怖唱星,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情跟磨,我是刑警寧澤间聊,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站抵拘,受9級特大地震影響哎榴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜仑濒,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一叹话、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧墩瞳,春花似錦驼壶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至泪电,卻和暖如春般妙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背相速。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工碟渺, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人突诬。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓苫拍,卻偏偏與公主長得像,于是被迫代替她去往敵國和親旺隙。 傳聞我的和親對象是個殘疾皇子绒极,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,728評論 2 351

推薦閱讀更多精彩內(nèi)容

  • pigeon源碼分析-同步調(diào)用和異步調(diào)用 Pigeon是美團點評內(nèi)部廣泛使用的一個分布式服務(wù)通信框架(RPC),本...
    WhiteBase閱讀 1,470評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理蔬捷,服務(wù)發(fā)現(xiàn)垄提,斷路器,智...
    卡卡羅2017閱讀 134,639評論 18 139
  • 國家電網(wǎng)公司企業(yè)標準(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 10,934評論 6 13
  • 1.感謝最強大腦周拐,發(fā)現(xiàn)了自己觀察視角的不同铡俐,會發(fā)現(xiàn)最強大腦觀眾席抓拍到的觀眾都沒有化妝,是因為受眾不同妥粟。 2.發(fā)現(xiàn)...
    張洪瑜閱讀 172評論 0 0
  • 這還是高一時候的事审丘。好吧,說我記仇也好罕容,懷舊也罷备恤。就這樣在心里生了根稿饰,后來發(fā)了芽。 是的露泊,我喜歡上一個人喉镰,故事太多...
    Grexogr閱讀 176評論 0 1