pigeon源碼分析-同步調(diào)用和異步調(diào)用

pigeon源碼分析-同步調(diào)用和異步調(diào)用

Pigeon是美團點評內(nèi)部廣泛使用的一個分布式服務(wù)通信框架(RPC)述雾,本文所有分析基于 pigeon 開源版本:RPC framework of DIANPING路幸,具體版本為:2.9.12-SNAPSHOT

概要:先從調(diào)用方視角 RPC 框架基本原理,然后介紹動態(tài)代理的生成,從這里切入之后再詳細介紹同步、異步調(diào)用;

RPC就是要在分布式場景下完成這樣一個動作: Result result = service.call(args) 肃拜,在這個= 等號背后 RPC 框架做了很多事。

RPC 調(diào)用在客戶端視角下的分解

簡單來說可以分成這幾步:

  • Client client = findClient(); // 這一步主要是服務(wù)治理相關(guān)雌团,本文不關(guān)注燃领;
  • client.write(request);
  • Object resp = waitResponse();
  • Result result = convert2Result(resp);

pigeon 在客戶端是如何生成代理的?

遠程調(diào)用離不開動態(tài)代理锦援,其實 pigeon service 在客戶端視角就是生成了一個被調(diào)用接口的實現(xiàn)類猛蔽,通過網(wǎng)絡(luò)請求來返回數(shù)據(jù),這樣就好像實現(xiàn)類在本地一樣雨涛;

生成代理的邏輯:
在 bean 初始化的時候枢舶,獲取 Proxy 實例,代碼如下:

// com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean#init
    public void init() throws Exception {
            // 進行初始化配置
        this.obj = ServiceFactory.getService(invokerConfig);
        // 略
    }

使用 JDK 動態(tài)代理替久,真正執(zhí)行的邏輯都在 java.lang.reflect.InvocationHandler 中凉泄,

可以看到 invoke() 時執(zhí)行的是 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult,執(zhí)行的邏輯是一個 FilterChain蚯根,成員見 com.dianping.pigeon.remoting.invoker.process.InvokerProcessHandlerFactory#init后众,每個 InvocationInvokeFilter 的作用基本見名知意,完成一些統(tǒng)計颅拦、上下文準(zhǔn)備蒂誉、監(jiān)控等功能;

終于到了 RemoteCallInvokeFilter距帅,這是執(zhí)行遠程調(diào)用最關(guān)鍵的一步右锨,主要代碼如下:

//com.dianping.pigeon.remoting.invoker.process.filter.RemoteCallInvokeFilter#invoke

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
            // 略
        try {
            switch (callMethod) {
                case SYNC:
                    // do call
                     break;
                case CALLBACK:
                    // do call
                    break;
                case FUTURE:
                    //do call
                    break;
                case ONEWAY:
                    //do call
                    break;
                default:
                    throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!");

            }

            ((DefaultInvokerContext)invocationContext).setResponse(response);
            afterInvoke(invocationContext);
        } catch (Throwable t) {
            afterThrowing(invocationContext, t);
            throw t;
        }

        return response;
    }

可以看到,pigeon 支持四種調(diào)用類型:sync, callback, future, oneway

這里主要分析 sync 和 future碌秸,分別對應(yīng)一般意義上的同步绍移、異步調(diào)用模式,其他兩種是在這兩種上做了特殊處理讥电;

SYNC-同步調(diào)用

同步調(diào)用蹂窖,主要邏輯:

case SYNC:
    CallbackFuture future = new CallbackFuture();
    response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
    invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
    if (response == null) {
        response = future.getResponse(request.getTimeout());
    }
    break;

先初始化了一個 callbackFuture,之后 sendRequest()恩敌,最后獲取 response

進一步查看 sendRequest() 實現(xiàn):

com.dianping.pigeon.remoting.invoker.util.InvokerUtils#sendRequest(com.dianping.pigeon.remoting.invoker.Client, com.dianping.pigeon.remoting.common.domain.InvocationRequest, com.dianping.pigeon.remoting.invoker.concurrent.Callback)
 
 --> com.dianping.pigeon.remoting.invoker.AbstractClient#write
 實際的 doWrite() 有兩種實現(xiàn)瞬测,HTTP / netty,這里看 NettyClient
 
 --> com.dianping.pigeon.remoting.netty.invoker.NettyClient#doWrite
 
 // 調(diào)用 netty client 寫請求
     @Override
    public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
        NettyChannel channel = null;

        try {

            channel = channelPool.selectChannel();

            ChannelFuture future = channel.write0(request);

            afterWrite(request, channel);

            if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
                    || request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
                future.addListener(new MessageWriteListener(request, channel));
            }

        } catch (Exception e) {
            throw new NetworkException("[doRequest] remote call failed:" + request, e);
        }
        return null;
    }

仔細看這里永遠返回 null,然后再看 sync 調(diào)用執(zhí)行的下一步月趟,就是 response = future.getResponse(request.getTimeout()); , 判斷 sendRequest() 返回結(jié)果為 null灯蝴,就從 future 獲取結(jié)果,這兩步就聯(lián)系起來了孝宗;

而 getResponse(timeout) 所做的事情只有一件:while 循環(huán)等待到 isDone == true || timeout绽乔,這里不詳細展示,看代碼即可碳褒;

拓展一下,這里只做等待看疗,那么返回值從哪里來呢沙峻?答案是等待 socket 的另一端也就是 server 回寫;

回過頭來看两芳,remoteCallInvocationFilter 所做的就是寫請求摔寨,然后等待接受返回值;

FUTURE-異步調(diào)用

Future 調(diào)用是 pigeon 提供的標(biāo)準(zhǔn)異步調(diào)用模式怖辆。使用方式如下:

//調(diào)用ServiceA的method1
serviceA.method1("aaa");
//獲取ServiceA的method1調(diào)用future狀態(tài)
Future future1OfServiceA = InvokerHelper.getFuture();
//調(diào)用ServiceA的method2
serviceA.method2("bbb");
//獲取ServiceA的method2調(diào)用future狀態(tài)
Future future2OfServiceA = InvokerHelper.getFuture();

//獲取ServiceA的method2調(diào)用結(jié)果
Object result2OfServiceA = future2OfServiceA.get();
//獲取ServiceA的method1調(diào)用結(jié)果
Object result1OfServiceA = future1OfServiceA.get();

簡單來說就是:調(diào)用 —> 獲取 Future —> 獲取結(jié)果是复。

因為每次調(diào)用不用阻塞等待結(jié)果,而是拿到 Future 實例之后由開發(fā)者在合適的時機獲取返回值竖螃,所以合理使用能提高并發(fā)性淑廊;

來看下實現(xiàn):

case FUTURE:
    ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout());
    InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl);
    FutureFactory.setFuture(futureImpl);
    response = InvokerUtils.createFutureResponse(futureImpl);
    invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
break;

可以看到只有 sendRequest(),沒有等待返回特咆,開發(fā)文檔中有介紹季惩,F(xiàn)uture 調(diào)用之后需要開發(fā)者自行通過 future.get(timeout); 獲取返回值;

FutureFactory.setFuture(futureImpl); 這一步做了什么腻格?

public class FutureFactory {
    private static ThreadLocal<Future<?>> threadFuture = new ThreadLocal<Future<?>>();

    public static void setFuture(Future<?> future) {
        threadFuture.set(future);
    }

其實就是將 Future 實例設(shè)置到 ThreadLocal 中画拾,所以每次調(diào)用結(jié)束之后立即通過 Future future1OfServiceA = InvokerHelper.getFuture(); 獲取Future實例,否則如果緊接著進行了下一次future調(diào)用菜职,就會因為ThreadLocal 中的 future 被覆蓋而無法正確獲取返回值青抛;

思考 :返回值的 Future 是什么作用?

答案:其實 pigeon 中的 Future 實現(xiàn)了 JDK 的標(biāo)準(zhǔn)語義酬核,即:A Future represents the result of an asynchronous computation . 代表了一次異步計算的結(jié)果蜜另。簡單來說,就是一個暫時代表結(jié)果的占位符愁茁。

需要結(jié)果的時候就去看看有沒有 response 了蚕钦,有就返回,沒有就阻塞到超時時間到鹅很;

pigeon 同步調(diào)用與異步調(diào)用處理有哪些不同

同步嘶居、異步,都是先發(fā)送完請求,然后等待結(jié)果邮屁。

SYNC 模式下整袁,立即等待直到拿到結(jié)果或者超時,然后將結(jié)果(超時情況下則是異常)返回給調(diào)用方佑吝;

FUTURE 模式下坐昙,立即返回個占位符 Future,這里的實現(xiàn)類是:com.dianping.pigeon.remoting.invoker.concurrent.ServiceFutureImpl#ServiceFutureImpl芋忿,將真正獲取返回值的時機交給調(diào)用方控制炸客;

這里也因為 Future 模式實現(xiàn)時使用了 ThreadLocal 暫時存儲占位符,所以多次 Future 調(diào)用戈钢,一定要順序獲取結(jié)果痹仙;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市殉了,隨后出現(xiàn)的幾起案子开仰,更是在濱河造成了極大的恐慌,老刑警劉巖薪铜,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件众弓,死亡現(xiàn)場離奇詭異,居然都是意外死亡隔箍,警方通過查閱死者的電腦和手機谓娃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜒滩,“玉大人傻粘,你說我怎么就攤上這事“锏簦” “怎么了弦悉?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蟆炊。 經(jīng)常有香客問我稽莉,道長,這世上最難降的妖魔是什么涩搓? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任污秆,我火速辦了婚禮,結(jié)果婚禮上昧甘,老公的妹妹穿的比我還像新娘良拼。我一直安慰自己,他們只是感情好充边,可當(dāng)我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布庸推。 她就那樣靜靜地躺著常侦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贬媒。 梳的紋絲不亂的頭發(fā)上聋亡,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天,我揣著相機與錄音际乘,去河邊找鬼坡倔。 笑死,一個胖子當(dāng)著我的面吹牛脖含,可吹牛的內(nèi)容都是我干的罪塔。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼养葵,長吁一口氣:“原來是場噩夢啊……” “哼垢袱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起港柜,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎咳榜,沒想到半個月后夏醉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡涌韩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年畔柔,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片臣樱。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡靶擦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出雇毫,到底是詐尸還是另有隱情玄捕,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布棚放,位于F島的核電站枚粘,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏飘蚯。R本人自食惡果不足惜馍迄,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望局骤。 院中可真熱鬧攀圈,春花似錦、人聲如沸峦甩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至撕捍,卻和暖如春拿穴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背忧风。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工默色, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人狮腿。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓腿宰,卻偏偏與公主長得像,于是被迫代替她去往敵國和親缘厢。 傳聞我的和親對象是個殘疾皇子吃度,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)贴硫,斷路器椿每,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 前言 很多朋友對異步編程都處于“聽說很強大”的認(rèn)知狀態(tài)。鮮有在生產(chǎn)項目中使用它英遭。而使用它的同學(xué)间护,則大多數(shù)都停留在知...
    星星在線閱讀 2,853評論 2 39
  • “ 消息隊列已經(jīng)逐漸成為企業(yè)IT系統(tǒng)內(nèi)部通信的核心手段。它具有低耦合挖诸、可靠投遞汁尺、廣播、流量控制多律、最終一致性等一系列...
    落羽成霜丶閱讀 3,974評論 1 41
  • 接著上節(jié) condition_varible 痴突,本節(jié)主要介紹future的內(nèi)容,練習(xí)代碼地址狼荞。本文參考http:/...
    jorion閱讀 14,760評論 1 5
  • 小七 13歲 愛吃甜食 體型卻削瘦 身高比同齡人矮5公分辽装。因為中學(xué)有早自習(xí),每天要五點起床而小七不會扎...
    小婊七閱讀 285評論 2 0