SOFABolt 源碼分析16 - 上下文機(jī)制的設(shè)計

SOFABolt 中存在四種上下文 context

  • InvokeContext:調(diào)用上下文,用于端內(nèi)隱式傳參千贯,并可以通過自定義序列化器將 InvokeContext 內(nèi)存儲的參數(shù)自定義的序列化傳遞給對端(注意:InvokeContext 本身是不會傳遞給對端的)
  • RemotingContext:Remoting 層的上下文髓堪,程序內(nèi)部使用
  • BizContext:業(yè)務(wù)上下文送朱,提供給用戶程序使用,封裝了 RemotingContext干旁,防止直接將 RemotingContext 暴露給用戶
  • AsyncContext:存儲存根信息驶沼,用于 AsyncUserProcessor 異步返回響應(yīng)

一、InvokeContext

1.1 使用姿勢

SOFABolt 源碼分析10 - 精細(xì)的線程模型的設(shè)計 的 “2.4 設(shè)置 UserProcessor 自定義線程池選擇器”

1.2 源碼分析

public class InvokeContext {
    // ~~~ invoke context keys of client side
    public final static String                CLIENT_LOCAL_IP        = "bolt.client.local.ip";
    public final static String                CLIENT_LOCAL_PORT      = "bolt.client.local.port";
    public final static String                CLIENT_REMOTE_IP       = "bolt.client.remote.ip";
    public final static String                CLIENT_REMOTE_PORT     = "bolt.client.remote.port";
    /** time consumed during connection creating, this is a timespan */
    public final static String                CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";

    // ~~~ invoke context keys of server side
    public final static String                SERVER_LOCAL_IP        = "bolt.server.local.ip";
    public final static String                SERVER_LOCAL_PORT      = "bolt.server.local.port";
    public final static String                SERVER_REMOTE_IP       = "bolt.server.remote.ip";
    public final static String                SERVER_REMOTE_PORT     = "bolt.server.remote.port";

    // ~~~ invoke context keys of client and server side
    public final static String                BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
    /** 時間段:請求達(dá)到解碼器 ~ 請求即將被處理(與處理完成) */
    public final static String                BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
    public final static String                BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
    public final static String                BOLT_CRC_SWITCH        = "bolt.invoke.crc.switch";

    public final static int                   INITIAL_SIZE           = 8;

    /** context */
    private ConcurrentHashMap<String, Object> context;

    public InvokeContext() {
        this.context = new ConcurrentHashMap<String, Object>(INITIAL_SIZE);
    }
}

注意:

  • InvokeContext 內(nèi)部實際上就是一個 map疤孕,而不是像 RpcInvokeContext 一樣商乎,內(nèi)部是一個 ThreadLocal(RpcInvokeContext 是 SOFARPC 的上下文)
  • 由于 InvokeContext 內(nèi)部只是一個 map,所以 InvokeContext 本身不能進(jìn)行“隱式傳遞”祭阀,InvokeContext 本身需要作為接口的參數(shù)進(jìn)行傳遞才行鹉戚,所以四種調(diào)用模式的三種調(diào)用鏈路方式都提供了帶有 InvokeContext 參數(shù)的方法,eg. invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis)
  • InvokeContext 不會傳遞給對端专控,但是其中的內(nèi)容可以通過自定義序列化器的方式傳遞給對端抹凳,使用姿勢見 SOFABolt 源碼分析10 - 精細(xì)的線程模型的設(shè)計

設(shè)置建連消耗時間(僅客戶端,服務(wù)端不建連)

    public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
        final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
        this.connectionManager.check(conn);
        return this.invokeSync(conn, request, invokeContext, timeoutMillis);
    }

    protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
        // 記錄開始時間
        long start = System.currentTimeMillis();
        Connection conn;
        try {
            // 建連
            conn = this.connectionManager.getAndCreateIfAbsent(url);
        } finally {
            if (null != invokeContext) {
                // 記錄建連時間(客戶端伦腐,服務(wù)端不建連)
                invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
            }
        }
        return conn;
    }

設(shè)置客戶端自定義序列化器 + crc 開關(guān) + 四要素(ip/port)+ requestID

    public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 創(chuàng)建請求:會根據(jù) invokeContext 配置 - 設(shè)置客戶端自定義序列化器 + crc 開關(guān)
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        // 預(yù)處理 InvokeContext:設(shè)置四要素(IP/PORT) + 請求ID
        preProcessInvokeContext(invokeContext, requestCommand, conn);
        // 發(fā)起請求
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
        // 將 invokeContext 設(shè)置到 responseCommand赢底,可以讓用戶使用
        responseCommand.setInvokeContext(invokeContext);

        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
            RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }

    protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
        RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);

        if (null != invokeContext) {
            // 設(shè)置客戶端自定義序列化器
            Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
            if (null != clientCustomSerializer) {
                    command.setSerializer((Byte) clientCustomSerializer);
            }

            // 是否開啟 crc 開關(guān)
            // enable crc by default, user can disable by set invoke context `false` for key `InvokeContext.BOLT_CRC_SWITCH`
            Boolean crcSwitch = invokeContext.get(InvokeContext.BOLT_CRC_SWITCH, ProtocolSwitch.CRC_SWITCH_DEFAULT_VALUE);
            if (null != crcSwitch && crcSwitch) {
                command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
            }
        } else {
            // enable crc by default, if there is no invoke context.
            command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
        }
        command.setTimeout(timeoutMillis);
        command.setRequestClass(request.getClass().getName());
        // 設(shè)置 invokeContext 到 RpcRequestCommand 中,后續(xù)在自定義序列化器的序列化 header 和 body 的過程中,
        // 可以自定義的從 invokeContext 中序列化信息到對端
        command.setInvokeContext(invokeContext);
        command.serialize();
        logDebugInfo(command);
        return command;
    }

============================= RpcClientRemoting =============================
    protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
        if (null != invokeContext) {
            // 設(shè)置四要素
            invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
            // 設(shè)置 requestID
            invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
        }
    }

============================= RpcServerRemoting =============================
    protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
        if (null != invokeContext) {
            // 設(shè)置四要素
            invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
            // 設(shè)置 requestID
            invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
        }
    }

關(guān)于序列化的東西幸冻,在《序列化設(shè)計》部分分析粹庞。

統(tǒng)計“消息到達(dá)解碼器 ~ 消息即將被業(yè)務(wù)邏輯處理器處理” 之間的時間

============================= RpcRequestProcessor =============================
    public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
        long currentTimestamp = System.currentTimeMillis();
        // 預(yù)處理 RemotingContext
        preProcessRemotingContext(ctx, cmd, currentTimestamp);
        ...
        dispatchToUserProcessor(ctx, cmd);
    }

    private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd,
                                           long currentTimestamp) {
        ...
        ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime());
    }

BizContext 獲取 InvokeContext

public class DefaultBizContext implements BizContext {
    private RemotingContext remotingCtx;
    public InvokeContext getInvokeContext() {
        return this.remotingCtx.getInvokeContext();
    }
}

二、RemotingContext

/**
 * Wrap the ChannelHandlerContext.
 */
public class RemotingContext {
    // netty ChannelHandlerContext
    private ChannelHandlerContext                       channelContext;
    // 是否是服務(wù)端
    private boolean                                     serverSide     = false;
    /** whether need handle request timeout, if true, request will be discarded. The default value is true */
    private boolean                                     timeoutDiscard = true;
    /** request arrive time stamp */
    private long                                        arriveTimestamp;
    /** request timeout setting by invoke side */
    private int                                         timeout;
    /** rpc command type:REQUEST / RESPONSE / REQUEST_ONEWAY */
    private int                                         rpcCommandType;
    // 用戶業(yè)務(wù)邏輯處理器
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors;
    // 調(diào)用上下文洽损,主要會統(tǒng)計“消息到達(dá)解碼器 ~ 消息即將被業(yè)務(wù)邏輯處理器處理” 之間的時間
    private InvokeContext                               invokeContext;

    public ChannelFuture writeAndFlush(RemotingCommand msg) {
        return this.channelContext.writeAndFlush(msg);
    }

    // whether this request already timeout: oneway 沒有請求超時的概念
    public boolean isRequestTimeout() {
        if (this.timeout > 0 && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
            && (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
            return true;
        }
        return false;
    }

    public UserProcessor<?> getUserProcessor(String className) {
        return StringUtils.isBlank(className) ? null : this.userProcessors.get(className);
    }

    public Connection getConnection() {
        return ConnectionUtil.getConnectionFromChannel(channelContext.channel());
    }
}

================== ConnectionUtil ==================
public class ConnectionUtil {
    public static Connection getConnectionFromChannel(Channel channel) {
         // 從 channel 的附屬屬性中獲取 Connection
        Attribute<Connection> connAttr = channel.attr(Connection.CONNECTION);
        Connection connection = connAttr.get();
        return connection;
    }
}

RemotingContext 作用:

  • 包含 userProcessors 映射:用于在處理消息流程中選擇業(yè)務(wù)邏輯處理
  • 包裝 ChannelHandlerContext:用于在處理消息結(jié)束后或者異常后向?qū)Χ税l(fā)送消息
  • 包裝 InvokeContext:用于存放添加服務(wù)端鏈路調(diào)用上下文

注意:BizContext 會包含 RemotingContext庞溜,但是不會提供 public 的 getRemotingContext 方法,但是會提供 getInvokeContext 方法碑定。

使用鏈路

================== RpcHandler ==================
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ...
        protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
    }

================== RpcRequestProcessor ==================
    public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
        // 從 RemotingContext 獲取 UserProcessor
        UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
        // set timeout check state from user's processor
        ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());

        // use the final executor dispatch process task
        executor.execute(new ProcessTask(ctx, cmd));
    }

    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        // 從 RemotingContext 獲取 UserProcessor
        UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
        if (processor instanceof AsyncUserProcessor) {
            // processor.preHandleRequest:使用 BizContext 包裝 RemotingContext流码,避免 RemotingContext 直接暴露給用戶(因為 RemotingContext 包含 ChannelHandlerContext,可直接發(fā)送消息給對端)
            // 創(chuàng)建 RpcAsyncContext 存根:包裝 RemotingContext延刘,內(nèi)部使用其做一步發(fā)送操作
            processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
        } else {
            // processor.preHandleRequest:使用 BizContext 包裝 RemotingContext
            Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
            // 使用 ctx.writeAndFlush(serializedResponse) 發(fā)送響應(yīng)
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
        }
    }

三漫试、BizContext

使用姿勢見 SOFABolt 源碼分析15 - 雙工通信機(jī)制的設(shè)計 中的“1.1、基于 addr 鏈路模式”

public class DefaultBizContext implements BizContext {
    // 包裹 RemotingContext
    private RemotingContext remotingCtx;

    // protect 方式碘赖,只有其子類可以訪問
    protected RemotingContext getRemotingCtx() {
        return this.remotingCtx;
    }

    @Override
    public String getRemoteAddress() {
        if (null != this.remotingCtx) {
            ChannelHandlerContext channelCtx = this.remotingCtx.getChannelContext();
            Channel channel = channelCtx.channel();
            if (null != channel) {
                return RemotingUtil.parseRemoteAddress(channel);
            }
        }
        return "UNKNOWN_ADDRESS";
    }
    
    ...
  
    // 這里也存儲了 Connection驾荣,可以用于服務(wù)端向客戶端直接發(fā)起調(diào)用
    @Override
    public Connection getConnection() {
        if (null != this.remotingCtx) {
            return this.remotingCtx.getConnection();
        }
        return null;
    }

    @Override
    public boolean isRequestTimeout() {
        return this.remotingCtx.isRequestTimeout();
    }
    
    ...
 
    @Override
    public InvokeContext getInvokeContext() {
        return this.remotingCtx.getInvokeContext();
    }
}

BizContext 是直接給用戶程序使用的,而 RemotingContext 是程序內(nèi)部使用的

    public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
        return new DefaultBizContext(remotingCtx);
    }

四普泡、AsyncContext

public class RpcAsyncContext implements AsyncContext {
    /** remoting context */
    private RemotingContext     ctx;
    // rpc request command: 
    // 1. 會根據(jù)請求中的 type 是否是 oneway 來決定是否向?qū)Χ税l(fā)送數(shù)據(jù)
    // 2. 會將 RpcRequestCommand 中的 requestID 設(shè)置給響應(yīng)
    private RpcRequestCommand   cmd;

    private RpcRequestProcessor processor;

    /** is response sent already */
    private AtomicBoolean       isResponseSentAlready = new AtomicBoolean();

    // 創(chuàng)造響應(yīng)秘车,發(fā)送消息(發(fā)送還是使用 RemotingContext)
    @Override
    public void sendResponse(Object responseObject) {
        if (isResponseSentAlready.compareAndSet(false, true)) {
            processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
        } else {
            throw new IllegalStateException("Should not send rpc response repeatedly!");
        }
    }
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
    UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
    if (processor instanceof AsyncUserProcessor) {
        // yi'bu'chu'li
        processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市劫哼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌割笙,老刑警劉巖权烧,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異伤溉,居然都是意外死亡般码,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進(jìn)店門乱顾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來板祝,“玉大人,你說我怎么就攤上這事走净∪保” “怎么了?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵伏伯,是天一觀的道長橘洞。 經(jīng)常有香客問我,道長说搅,這世上最難降的妖魔是什么炸枣? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上适肠,老公的妹妹穿的比我還像新娘霍衫。我一直安慰自己,他們只是感情好侯养,可當(dāng)我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布敦跌。 她就那樣靜靜地躺著,像睡著了一般沸毁。 火紅的嫁衣襯著肌膚如雪峰髓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天息尺,我揣著相機(jī)與錄音携兵,去河邊找鬼。 笑死搂誉,一個胖子當(dāng)著我的面吹牛徐紧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播炭懊,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼并级,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了侮腹?” 一聲冷哼從身側(cè)響起嘲碧,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎父阻,沒想到半個月后愈涩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡加矛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年履婉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斟览。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡毁腿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出苛茂,到底是詐尸還是另有隱情已烤,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布妓羊,位于F島的核電站草戈,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏侍瑟。R本人自食惡果不足惜唐片,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一丙猬、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧费韭,春花似錦茧球、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至督暂,卻和暖如春揪垄,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背逻翁。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工饥努, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人八回。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓酷愧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親缠诅。 傳聞我的和親對象是個殘疾皇子溶浴,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容