dubbo協(xié)議層 jsonrpc協(xié)議遷移到http協(xié)議

周末看到社區(qū)的協(xié)議遷移開始被提交了pr澜公,還沒merge,打算拜讀一下

jsonrpc.png
httpremoteInvoker.png

看到HttpRemoteInvocation被更改了柠衅,這里是要把json-rpc協(xié)議轉(zhuǎn)換為http協(xié)議

為什么要轉(zhuǎn)換呢?先來看看兩種協(xié)議的不同

struct.png

HTTP 請求本身也可以看做是 RPC 的一種具體形式泊柬。HTTP 請求也一樣是可以從本地發(fā)一個信號到服務(wù)器根穷,服務(wù)器上執(zhí)行某個函數(shù),然后返回一些信息給客戶端弃理。

經(jīng)常用的一些數(shù)據(jù)通過HTTP協(xié)議來傳輸,thrift屎蜓,grpc案铺,xml-rpc,json-rpc都是通過HTTP傳輸?shù)陌鹁浮R簿驼fjson-rpc是依賴http協(xié)議傳輸?shù)目睾海孕陆ㄒ粋€公共的代理協(xié)議,凡是用http協(xié)議傳輸?shù)臄?shù)據(jù)格式都設(shè)置這個協(xié)議返吻,是比較好的方案姑子。

protocol.png

根據(jù)dubbo的類機構(gòu)繼承圖,是沿用SPI機制實現(xiàn)的AbstractProxyProtocol
這里比較重要的是protocolBindingRefer方法测僵。,構(gòu)建一個DubboInvoker對象,根據(jù)dubbo的動態(tài)代理不斷找到調(diào)用方的目標對象街佑,添加到invoke當中,但這里沒有指定要傳輸?shù)母袷胶纯浚栽赿ubbo序列化的時候還是會走默認的hessian協(xié)議沐旨,需要對協(xié)議中傳入的格式進行校驗

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dubbo.rpc.protocol;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;

/**
 * AbstractProxyProtocol
 */
public abstract class AbstractProxyProtocol extends AbstractProtocol {

    private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();

    private ProxyFactory proxyFactory;

    public AbstractProxyProtocol() {
    }

    public AbstractProxyProtocol(Class<?>... exceptions) {
        for (Class<?> exception : exceptions) {
            addRpcException(exception);
        }
    }

    public void addRpcException(Class<?> exception) {
        this.rpcExceptions.add(exception);
    }

    public ProxyFactory getProxyFactory() {
        return proxyFactory;
    }

    public void setProxyFactory(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
        final String uri = serviceKey(invoker.getUrl());
        Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
        if (exporter != null) {
            // When modifying the configuration through override, you need to re-expose the newly modified service.
            if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
                return exporter;
            }
        }
        final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
        exporter = new AbstractExporter<T>(invoker) {
            @Override
            public void unexport() {
                super.unexport();
                exporterMap.remove(uri);
                if (runnable != null) {
                    try {
                        runnable.run();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            }
        };
        exporterMap.put(uri, exporter);
        return exporter;
    }

    @Override
    protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
        final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
        Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    Result result = target.invoke(invocation);
                    // FIXME result is an AsyncRpcResult instance.
                    Throwable e = result.getException();
                    if (e != null) {
                        for (Class<?> rpcException : rpcExceptions) {
                            if (rpcException.isAssignableFrom(e.getClass())) {
                                throw getRpcException(type, url, invocation, e);
                            }
                        }
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                        e.setCode(getErrorCode(e.getCause()));
                    }
                    throw e;
                } catch (Throwable e) {
                    throw getRpcException(type, url, invocation, e);
                }
            }
        };
        invokers.add(invoker);
        return invoker;
    }

    protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
        RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: "
                + invocation.getMethodName() + ", cause: " + e.getMessage(), e);
        re.setCode(getErrorCode(e));
        return re;
    }

    protected String getAddr(URL url) {
        String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
        if (url.getParameter(ANYHOST_KEY, false)) {
            bindIp = ANYHOST_VALUE;
        }
        return NetUtils.getIpByHost(bindIp) + ":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
    }

    protected int getErrorCode(Throwable e) {
        return RpcException.UNKNOWN_EXCEPTION;
    }

    protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

    protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

}

基本思路:將參數(shù)中的service(傳輸格式傳入)設(shè)置到dubbo的代理,拿到代理之后添加到invoke的責任鏈當中榨婆,返回一個dubbo的代理對象給調(diào)用方

*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.rpc.protocol.httpinvoker;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.http.HttpBinder;
import org.apache.dubbo.remoting.http.HttpHandler;
import org.apache.dubbo.remoting.http.HttpServer;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.dubbo.rpc.service.GenericService;
import org.apache.dubbo.rpc.support.ProtocolUtils;

import org.aopalliance.intercept.MethodInvocation;
import org.springframework.remoting.RemoteAccessException;
import org.springframework.remoting.httpinvoker.HttpComponentsHttpInvokerRequestExecutor;
import org.springframework.remoting.httpinvoker.HttpInvokerProxyFactoryBean;
import org.springframework.remoting.httpinvoker.HttpInvokerServiceExporter;
import org.springframework.remoting.httpinvoker.SimpleHttpInvokerRequestExecutor;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.remoting.Constants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;

/**
 * HttpInvokerProtocol
 */
public class HttpInvokerProtocol extends AbstractProxyProtocol {

    public static final int DEFAULT_PORT = 80;

    private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();

    private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>();

    private HttpBinder httpBinder;

    public HttpInvokerProtocol() {
        super(RemoteAccessException.class);
    }

    public void setHttpBinder(HttpBinder httpBinder) {
        this.httpBinder = httpBinder;
    }

    @Override
    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
        String addr = getAddr(url);
        HttpServer server = serverMap.get(addr);
        if (server == null) {
            server = httpBinder.bind(url, new InternalHandler());
            serverMap.put(addr, server);
        }
        final String path = url.getAbsolutePath();
        skeletonMap.put(path, createExporter(impl, type));

        final String genericPath = path + "/" + GENERIC_KEY;

        skeletonMap.put(genericPath, createExporter(impl, GenericService.class));
        return new Runnable() {
            @Override
            public void run() {
                skeletonMap.remove(path);
                skeletonMap.remove(genericPath);
            }
        };
    }

    private <T> HttpInvokerServiceExporter createExporter(T impl, Class<?> type) {
        final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
        httpServiceExporter.setServiceInterface(type);
        httpServiceExporter.setService(impl);
        try {
            httpServiceExporter.afterPropertiesSet();
        } catch (Exception e) {
            throw new RpcException(e.getMessage(), e);
        }
        return httpServiceExporter;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
        final String generic = url.getParameter(GENERIC_KEY);
        final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);

        final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
        httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
            @Override
            public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
                RemoteInvocation invocation;
                /*
                  package was renamed to 'org.apache.dubbo' in v2.7.0, so only provider versions after v2.7.0 can
                  recognize org.apache.xxx.HttpRemoteInvocation'.
                 */
                if (Version.isRelease270OrHigher(url.getParameter(RELEASE_KEY))) {
                    invocation = new HttpRemoteInvocation(methodInvocation);
                } else {
                    /*
                      The customized 'com.alibaba.dubbo.rpc.protocol.http.HttpRemoteInvocation' was firstly introduced
                      in v2.6.3. The main purpose is to support transformation of attachments in HttpInvokerProtocol, see
                      https://github.com/apache/dubbo/pull/1827. To guarantee interoperability with lower
                      versions, we need to check if the provider is v2.6.3 or higher before sending customized
                      HttpRemoteInvocation.
                     */
                    if (Version.isRelease263OrHigher(url.getParameter(DUBBO_VERSION_KEY))) {
                        invocation = new com.alibaba.dubbo.rpc.protocol.http.HttpRemoteInvocation(methodInvocation);
                    } else {
                        invocation = new RemoteInvocation(methodInvocation);
                    }
                }
                if (isGeneric) {
                    invocation.addAttribute(GENERIC_KEY, generic);
                }
                return invocation;
            }
        });

        String key = url.toIdentityString();
        if (isGeneric) {
            key = key + "/" + GENERIC_KEY;
        }

        httpProxyFactoryBean.setServiceUrl(key);
        httpProxyFactoryBean.setServiceInterface(serviceType);
        String client = url.getParameter(Constants.CLIENT_KEY);
        if (StringUtils.isEmpty(client) || "simple".equals(client)) {
            SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
                @Override
                protected void prepareConnection(HttpURLConnection con,
                                                 int contentLength) throws IOException {
                    super.prepareConnection(con, contentLength);
                    con.setReadTimeout(url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT));
                    con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
                }
            };
            httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
        } else if ("commons".equals(client)) {
            HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
            httpInvokerRequestExecutor.setReadTimeout(url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT));
            httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
            httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
        } else {
            throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
        }
        httpProxyFactoryBean.afterPropertiesSet();
        return (T) httpProxyFactoryBean.getObject();
    }

    @Override
    protected int getErrorCode(Throwable e) {
        if (e instanceof RemoteAccessException) {
            e = e.getCause();
        }
        if (e != null) {
            Class<?> cls = e.getClass();
            if (SocketTimeoutException.class.equals(cls)) {
                return RpcException.TIMEOUT_EXCEPTION;
            } else if (IOException.class.isAssignableFrom(cls)) {
                return RpcException.NETWORK_EXCEPTION;
            } else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
                return RpcException.SERIALIZATION_EXCEPTION;
            }
        }
        return super.getErrorCode(e);
    }

    private class InternalHandler implements HttpHandler {

        @Override
        public void handle(HttpServletRequest request, HttpServletResponse response)
                throws IOException, ServletException {
            String uri = request.getRequestURI();
            HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
            if (!"POST".equalsIgnoreCase(request.getMethod())) {
                response.setStatus(500);
            } else {
                RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
                try {
                    skeleton.handleRequest(request, response);
                } catch (Throwable e) {
                    throw new ServletException(e);
                }
            }
        }

    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末磁携,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子良风,更是在濱河造成了極大的恐慌谊迄,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烟央,死亡現(xiàn)場離奇詭異统诺,居然都是意外死亡,警方通過查閱死者的電腦和手機疑俭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門粮呢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事啄寡∫魄樱” “怎么了?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵这难,是天一觀的道長。 經(jīng)常有香客問我葡秒,道長姻乓,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任眯牧,我火速辦了婚禮蹋岩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘学少。我一直安慰自己剪个,他們只是感情好,可當我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布版确。 她就那樣靜靜地躺著扣囊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪绒疗。 梳的紋絲不亂的頭發(fā)上侵歇,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天,我揣著相機與錄音吓蘑,去河邊找鬼惕虑。 笑死,一個胖子當著我的面吹牛磨镶,可吹牛的內(nèi)容都是我干的溃蔫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼琳猫,長吁一口氣:“原來是場噩夢啊……” “哼伟叛!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起脐嫂,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤痪伦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后雹锣,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體网沾,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年蕊爵,在試婚紗的時候發(fā)現(xiàn)自己被綠了辉哥。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖醋旦,靈堂內(nèi)的尸體忽然破棺而出恒水,到底是詐尸還是另有隱情,我是刑警寧澤饲齐,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布钉凌,位于F島的核電站,受9級特大地震影響捂人,放射性物質(zhì)發(fā)生泄漏御雕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一滥搭、第九天 我趴在偏房一處隱蔽的房頂上張望酸纲。 院中可真熱鬧,春花似錦瑟匆、人聲如沸闽坡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽疾嗅。三九已至,卻和暖如春冕象,著一層夾襖步出監(jiān)牢的瞬間宪迟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工交惯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留次泽,地道東北人。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓席爽,卻偏偏與公主長得像意荤,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子只锻,可洞房花燭夜當晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 在分布式服務(wù)框架中玖像,一個最基礎(chǔ)的問題就是遠程服務(wù)是怎么通訊的,在Java領(lǐng)域中有很多可實現(xiàn)遠程通訊的技術(shù)齐饮,例如:R...
    java菜閱讀 988評論 0 2
  • 簡單介紹RPC協(xié)議及常見框架捐寥,對比傳統(tǒng)restful api和RPC方式的優(yōu)缺點。常見RPC框架祖驱,gRPC及序列化...
    王勝廣閱讀 146,325評論 9 158
  • 本文將從大的框架層面來聊聊RPC原理和實現(xiàn)握恳,既然叫跨語言RPC,也將以thrift為例講講跨語言RPC如何實現(xiàn)捺僻。在...
    彥幀閱讀 14,536評論 0 19
  • Redux核心概念 設(shè)計思想 (1) Web 應(yīng)用是一個狀態(tài)機乡洼,視圖與狀態(tài)是一一對應(yīng)的(2) 所有的狀態(tài)都保存在一...
    Allan要做活神仙閱讀 318評論 0 0
  • 面食是關(guān)中人的最愛崇裁,地處秦嶺之南的旬陽人也愛吃面,并且在這面里束昵,融入了旬陽元素拔稳,這就有了旬陽特有的酸菜兩摻面。 兩...
    亮子說閱讀 885評論 0 2