SOFABolt 中存在四種上下文 context
- InvokeContext:調(diào)用上下文,用于端內(nèi)隱式傳參千贯,并可以通過自定義序列化器將 InvokeContext 內(nèi)存儲的參數(shù)自定義的序列化傳遞給對端(注意:InvokeContext 本身是不會傳遞給對端的)
- RemotingContext:Remoting 層的上下文髓堪,程序內(nèi)部使用
- BizContext:業(yè)務(wù)上下文送朱,提供給用戶程序使用,封裝了 RemotingContext干旁,防止直接將 RemotingContext 暴露給用戶
- AsyncContext:存儲存根信息驶沼,用于 AsyncUserProcessor 異步返回響應(yīng)
一、InvokeContext
1.1 使用姿勢
見 SOFABolt 源碼分析10 - 精細(xì)的線程模型的設(shè)計 的 “2.4 設(shè)置 UserProcessor 自定義線程池選擇器”
1.2 源碼分析
public class InvokeContext {
// ~~~ invoke context keys of client side
public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip";
public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port";
public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip";
public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port";
/** time consumed during connection creating, this is a timespan */
public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";
// ~~~ invoke context keys of server side
public final static String SERVER_LOCAL_IP = "bolt.server.local.ip";
public final static String SERVER_LOCAL_PORT = "bolt.server.local.port";
public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip";
public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port";
// ~~~ invoke context keys of client and server side
public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
/** 時間段:請求達(dá)到解碼器 ~ 請求即將被處理(與處理完成) */
public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch";
public final static int INITIAL_SIZE = 8;
/** context */
private ConcurrentHashMap<String, Object> context;
public InvokeContext() {
this.context = new ConcurrentHashMap<String, Object>(INITIAL_SIZE);
}
}
注意:
- InvokeContext 內(nèi)部實際上就是一個 map疤孕,而不是像 RpcInvokeContext 一樣商乎,內(nèi)部是一個 ThreadLocal(RpcInvokeContext 是 SOFARPC 的上下文)
- 由于 InvokeContext 內(nèi)部只是一個 map,所以 InvokeContext 本身不能進(jìn)行“隱式傳遞”祭阀,InvokeContext 本身需要作為接口的參數(shù)進(jìn)行傳遞才行鹉戚,所以四種調(diào)用模式的三種調(diào)用鏈路方式都提供了帶有 InvokeContext 參數(shù)的方法,eg. invokeSync(Connection conn, Object request,
InvokeContext invokeContext
, int timeoutMillis)- InvokeContext 不會傳遞給對端专控,但是其中的內(nèi)容可以通過自定義序列化器的方式傳遞給對端抹凳,使用姿勢見 SOFABolt 源碼分析10 - 精細(xì)的線程模型的設(shè)計
設(shè)置建連消耗時間(僅客戶端,服務(wù)端不建連)
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
this.connectionManager.check(conn);
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
// 記錄開始時間
long start = System.currentTimeMillis();
Connection conn;
try {
// 建連
conn = this.connectionManager.getAndCreateIfAbsent(url);
} finally {
if (null != invokeContext) {
// 記錄建連時間(客戶端伦腐,服務(wù)端不建連)
invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
}
}
return conn;
}
設(shè)置客戶端自定義序列化器 + crc 開關(guān) + 四要素(ip/port)+ requestID
public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 創(chuàng)建請求:會根據(jù) invokeContext 配置 - 設(shè)置客戶端自定義序列化器 + crc 開關(guān)
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
// 預(yù)處理 InvokeContext:設(shè)置四要素(IP/PORT) + 請求ID
preProcessInvokeContext(invokeContext, requestCommand, conn);
// 發(fā)起請求
ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
// 將 invokeContext 設(shè)置到 responseCommand赢底,可以讓用戶使用
responseCommand.setInvokeContext(invokeContext);
Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
RemotingUtil.parseRemoteAddress(conn.getChannel()));
return responseObject;
}
protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);
if (null != invokeContext) {
// 設(shè)置客戶端自定義序列化器
Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
if (null != clientCustomSerializer) {
command.setSerializer((Byte) clientCustomSerializer);
}
// 是否開啟 crc 開關(guān)
// enable crc by default, user can disable by set invoke context `false` for key `InvokeContext.BOLT_CRC_SWITCH`
Boolean crcSwitch = invokeContext.get(InvokeContext.BOLT_CRC_SWITCH, ProtocolSwitch.CRC_SWITCH_DEFAULT_VALUE);
if (null != crcSwitch && crcSwitch) {
command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
}
} else {
// enable crc by default, if there is no invoke context.
command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
}
command.setTimeout(timeoutMillis);
command.setRequestClass(request.getClass().getName());
// 設(shè)置 invokeContext 到 RpcRequestCommand 中,后續(xù)在自定義序列化器的序列化 header 和 body 的過程中,
// 可以自定義的從 invokeContext 中序列化信息到對端
command.setInvokeContext(invokeContext);
command.serialize();
logDebugInfo(command);
return command;
}
============================= RpcClientRemoting =============================
protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
if (null != invokeContext) {
// 設(shè)置四要素
invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
// 設(shè)置 requestID
invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
}
}
============================= RpcServerRemoting =============================
protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
if (null != invokeContext) {
// 設(shè)置四要素
invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
// 設(shè)置 requestID
invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
}
}
關(guān)于序列化的東西幸冻,在《序列化設(shè)計》部分分析粹庞。
統(tǒng)計“消息到達(dá)解碼器 ~ 消息即將被業(yè)務(wù)邏輯處理器處理” 之間的時間
============================= RpcRequestProcessor =============================
public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
long currentTimestamp = System.currentTimeMillis();
// 預(yù)處理 RemotingContext
preProcessRemotingContext(ctx, cmd, currentTimestamp);
...
dispatchToUserProcessor(ctx, cmd);
}
private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd,
long currentTimestamp) {
...
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime());
}
BizContext 獲取 InvokeContext
public class DefaultBizContext implements BizContext {
private RemotingContext remotingCtx;
public InvokeContext getInvokeContext() {
return this.remotingCtx.getInvokeContext();
}
}
二、RemotingContext
/**
* Wrap the ChannelHandlerContext.
*/
public class RemotingContext {
// netty ChannelHandlerContext
private ChannelHandlerContext channelContext;
// 是否是服務(wù)端
private boolean serverSide = false;
/** whether need handle request timeout, if true, request will be discarded. The default value is true */
private boolean timeoutDiscard = true;
/** request arrive time stamp */
private long arriveTimestamp;
/** request timeout setting by invoke side */
private int timeout;
/** rpc command type:REQUEST / RESPONSE / REQUEST_ONEWAY */
private int rpcCommandType;
// 用戶業(yè)務(wù)邏輯處理器
private ConcurrentHashMap<String, UserProcessor<?>> userProcessors;
// 調(diào)用上下文洽损,主要會統(tǒng)計“消息到達(dá)解碼器 ~ 消息即將被業(yè)務(wù)邏輯處理器處理” 之間的時間
private InvokeContext invokeContext;
public ChannelFuture writeAndFlush(RemotingCommand msg) {
return this.channelContext.writeAndFlush(msg);
}
// whether this request already timeout: oneway 沒有請求超時的概念
public boolean isRequestTimeout() {
if (this.timeout > 0 && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
&& (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
return true;
}
return false;
}
public UserProcessor<?> getUserProcessor(String className) {
return StringUtils.isBlank(className) ? null : this.userProcessors.get(className);
}
public Connection getConnection() {
return ConnectionUtil.getConnectionFromChannel(channelContext.channel());
}
}
================== ConnectionUtil ==================
public class ConnectionUtil {
public static Connection getConnectionFromChannel(Channel channel) {
// 從 channel 的附屬屬性中獲取 Connection
Attribute<Connection> connAttr = channel.attr(Connection.CONNECTION);
Connection connection = connAttr.get();
return connection;
}
}
RemotingContext 作用:
- 包含 userProcessors 映射:用于在處理消息流程中選擇業(yè)務(wù)邏輯處理
- 包裝 ChannelHandlerContext:用于在處理消息結(jié)束后或者異常后向?qū)Χ税l(fā)送消息
- 包裝 InvokeContext:用于存放添加服務(wù)端鏈路調(diào)用上下文
注意:BizContext 會包含 RemotingContext庞溜,但是不會提供 public 的 getRemotingContext 方法,但是會提供 getInvokeContext 方法碑定。
使用鏈路
================== RpcHandler ==================
public void channelRead(ChannelHandlerContext ctx, Object msg) {
...
protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
}
================== RpcRequestProcessor ==================
public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
// 從 RemotingContext 獲取 UserProcessor
UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
// set timeout check state from user's processor
ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
// use the final executor dispatch process task
executor.execute(new ProcessTask(ctx, cmd));
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
// 從 RemotingContext 獲取 UserProcessor
UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
if (processor instanceof AsyncUserProcessor) {
// processor.preHandleRequest:使用 BizContext 包裝 RemotingContext流码,避免 RemotingContext 直接暴露給用戶(因為 RemotingContext 包含 ChannelHandlerContext,可直接發(fā)送消息給對端)
// 創(chuàng)建 RpcAsyncContext 存根:包裝 RemotingContext延刘,內(nèi)部使用其做一步發(fā)送操作
processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
} else {
// processor.preHandleRequest:使用 BizContext 包裝 RemotingContext
Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
// 使用 ctx.writeAndFlush(serializedResponse) 發(fā)送響應(yīng)
sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
}
}
三漫试、BizContext
使用姿勢見 SOFABolt 源碼分析15 - 雙工通信機(jī)制的設(shè)計 中的“1.1、基于 addr 鏈路模式”
public class DefaultBizContext implements BizContext {
// 包裹 RemotingContext
private RemotingContext remotingCtx;
// protect 方式碘赖,只有其子類可以訪問
protected RemotingContext getRemotingCtx() {
return this.remotingCtx;
}
@Override
public String getRemoteAddress() {
if (null != this.remotingCtx) {
ChannelHandlerContext channelCtx = this.remotingCtx.getChannelContext();
Channel channel = channelCtx.channel();
if (null != channel) {
return RemotingUtil.parseRemoteAddress(channel);
}
}
return "UNKNOWN_ADDRESS";
}
...
// 這里也存儲了 Connection驾荣,可以用于服務(wù)端向客戶端直接發(fā)起調(diào)用
@Override
public Connection getConnection() {
if (null != this.remotingCtx) {
return this.remotingCtx.getConnection();
}
return null;
}
@Override
public boolean isRequestTimeout() {
return this.remotingCtx.isRequestTimeout();
}
...
@Override
public InvokeContext getInvokeContext() {
return this.remotingCtx.getInvokeContext();
}
}
BizContext 是直接給用戶程序使用的,而 RemotingContext 是程序內(nèi)部使用的
public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
return new DefaultBizContext(remotingCtx);
}
四普泡、AsyncContext
public class RpcAsyncContext implements AsyncContext {
/** remoting context */
private RemotingContext ctx;
// rpc request command:
// 1. 會根據(jù)請求中的 type 是否是 oneway 來決定是否向?qū)Χ税l(fā)送數(shù)據(jù)
// 2. 會將 RpcRequestCommand 中的 requestID 設(shè)置給響應(yīng)
private RpcRequestCommand cmd;
private RpcRequestProcessor processor;
/** is response sent already */
private AtomicBoolean isResponseSentAlready = new AtomicBoolean();
// 創(chuàng)造響應(yīng)秘车,發(fā)送消息(發(fā)送還是使用 RemotingContext)
@Override
public void sendResponse(Object responseObject) {
if (isResponseSentAlready.compareAndSet(false, true)) {
processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
} else {
throw new IllegalStateException("Should not send rpc response repeatedly!");
}
}
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
if (processor instanceof AsyncUserProcessor) {
// yi'bu'chu'li
processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
}
}