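In RocketMQ's module layout, the networking code lives in the rocketmq-remoting module of the source tree. It covers encoding and decoding, plus the Netty-based handling of sent and received messages. Its class diagram is shown below:
At the top sits the RemotingService interface, which declares three methods: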
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
RemotingClient and RemotingServer both extend the RemotingService interface and add methods of their own. NettyRemotingClient and NettyRemotingServer implement RemotingClient and RemotingServer respectively, and both extend the NettyRemotingAbstract class. This abstract class holds much of the shared processing logic as well as several important data structures, which are covered later.
Other classes used during communication include NettyEvent, NettyEncoder, NettyDecoder, and RemotingCommand.
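1. Protocol design and encoding/decoding
1.1 Protocol design
The RocketMQ protocol is laid out as follows:
As shown above, the total frame length is 4 + 4 + header length + body length.
The header length is stored in bytes 2, 3, and 4 of the second 4-byte field; the first byte of that field carries the serialization type.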
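To make the layout of that second 4-byte field concrete, here is a minimal sketch of packing and unpacking it. The names HeaderWordSketch, pack, headerLength, and serializeCode are made up for illustration and are not RocketMQ's API; the only things taken from the text are the header length occupying bytes 2 to 4 and the serialization type sharing the same field.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only; class and method names are hypothetical.
final class HeaderWordSketch {

    // Pack a one-byte serialization code and a 24-bit header length into 4 bytes.
    static byte[] pack(int headerLength, byte serializeCode) {
        if (headerLength < 0 || headerLength > 0xFFFFFF) {
            throw new IllegalArgumentException("header length must fit in 24 bits");
        }
        byte[] word = new byte[4];
        word[0] = serializeCode;                        // byte 1: serialization type
        word[1] = (byte) ((headerLength >> 16) & 0xFF); // bytes 2..4: header length
        word[2] = (byte) ((headerLength >> 8) & 0xFF);
        word[3] = (byte) (headerLength & 0xFF);
        return word;
    }

    // Low 24 bits of the word are the header length.
    static int headerLength(int word) {
        return word & 0xFFFFFF;
    }

    // High 8 bits of the word are the serialization code.
    static int serializeCode(int word) {
        return (word >> 24) & 0xFF;
    }

    public static void main(String[] args) {
        // ByteBuffer reads big-endian by default, matching the byte order used in pack().
        int word = ByteBuffer.wrap(pack(300, (byte) 1)).getInt();
        System.out.println(headerLength(word) + " " + serializeCode(word)); // prints "300 1"
    }
}
```

Because only three bytes are available for the length, a header can be at most 0xFFFFFF (16,777,215) bytes long under this layout.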
1.2 Message encoding
A good entry point is to debug the remoting unit tests that ship with RocketMQ.
Take synchronous communication as an example:
@Test
public void testInvokeSync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RequestHeader requestHeader = new RequestHeader();
requestHeader.setCount(1);
requestHeader.setMessageTitle("Welcome");
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 100000 * 3);
assertTrue(response != null);
System.out.println(response);
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(response.getExtFields()).hasSize(2);
}
In the example above, requestHeader implements the CommandCustomHeader interface, so it carries our message header information. It is then passed as an argument to create a RemotingCommand message.
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
RemotingCommand is the message type RocketMQ uses on the wire. Its fields are defined as follows; extFields can hold user-defined key-value pairs:
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
Before the tests run, the remotingServer and the remotingClient are started.
@BeforeClass
public static void setup() throws InterruptedException {
remotingServer = createRemotingServer();
remotingClient = createRemotingClient();
}
Both the server and the client are built on Netty. Look first at client startup in createRemotingClient: before starting, the client sets up its thread pools and the factories that create their threads, and then calls start:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
Encoding is done by the NettyEncoder added to the pipeline. Its definition is:
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
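As shown, the encoder writes the header and then the body. The body is already a byte[] and needs no further attention; the method to examine is remotingCommand.encodeHeader(). The no-argument overload delegates to encodeHeader(int bodyLength), passing the body length (0 when there is no body).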
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
// Key step: encode the message header
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length: write the total length
result.putInt(length);
// header length: write the header length together with the serialization type
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data: write the header bytes
result.put(headerData);
result.flip();
return result;
}
The method above converts everything except the body into a ByteBuffer. In particular:
1. The first four bytes hold the length of the whole message, not counting those four bytes themselves; that is, length covers parts 2, 3, and 4 of the protocol diagram above.
2. headerData = this.headerEncode() converts the message header into a byte[]:
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
// JSON serialization is the default. Here `this` is the RemotingCommand
return RemotingSerializable.encode(this);
}
}
/**
 * makeCustomHeaderToNet copies the key-value fields defined on customHeader into extFields,
 * for example the values set in the test case via requestHeader.setCount(1) and
 * requestHeader.setMessageTitle("Welcome").
 */
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
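The reflection step can be tried in isolation. The sketch below is a simplified stand-in and not the RocketMQ source: DemoHeader, toExtFields, and demo are hypothetical names, and it only inspects fields declared directly on the class. It applies the same three filters as the method above: skip static fields, skip names starting with "this", and skip null values.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; all names here are hypothetical.
final class HeaderToMapSketch {

    // Stand-in for the RequestHeader used in the unit test.
    static final class DemoHeader {
        static final String IGNORED = "static fields are skipped";
        Integer count;
        String messageTitle;
        String unset; // stays null, so it is not copied
    }

    // Copy every non-static, non-null field of the header into a String map.
    static Map<String, String> toExtFields(Object header) {
        Map<String, String> extFields = new HashMap<>();
        for (Field field : header.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.getName().startsWith("this")) {
                continue;
            }
            try {
                field.setAccessible(true);
                Object value = field.get(header);
                if (value != null) {
                    extFields.put(field.getName(), value.toString());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + field.getName(), e);
            }
        }
        return extFields;
    }

    // Build the same header as the test case and convert it.
    static Map<String, String> demo() {
        DemoHeader header = new DemoHeader();
        header.count = 1;
        header.messageTitle = "Welcome";
        return toExtFields(header);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // two entries: count and messageTitle
    }
}
```

Every value is stored through toString(), so the receiving side has to parse the strings back into typed fields; that is why extFields is a map of String to String.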
Focus on RemotingSerializable.encode(this) above, where this is the RemotingCommand. Stepping through with a breakpoint shows what it contains:
RemotingCommand [code=0, language=JAVA, version=0, opaque=0, flag(B)=0, remark=null, extFields={count=1, messageTitle=Welcome}, serializeTypeCurrentRPC=JSON]
As shown, it holds the RemotingCommand's state, including the count and messageTitle values we defined ourselves.
The debugger prints it this way because RemotingCommand overrides toString(). (Without the override, this would be displayed as an instance identifier such as org.apache.rocketmq.remoting.RemotingCommand@3b764bce.)
public String toString() {
return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+ Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ serializeTypeCurrentRPC + "]";
}
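To summarize RocketMQ encoding: the user is responsible for converting the message body into a byte[] for transmission, while RocketMQ serializes the header and converts it into a byte[]. The likely rationale is that many RemotingCommand settings are defaults yet still mandatory, so the framework handles header serialization itself.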
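The whole frame can be assembled in a few lines. The following is a minimal sketch under the layout described in section 1.1, not the RocketMQ implementation; FrameEncodeSketch and encodeFrame are hypothetical names, and the header bytes are passed in already serialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; class and method names are hypothetical.
final class FrameEncodeSketch {

    // Build: [4-byte length][1-byte serialize code + 3-byte header length][header][body].
    static ByteBuffer encodeFrame(byte[] headerData, byte[] body, byte serializeCode) {
        int bodyLength = (body == null) ? 0 : body.length;
        // The length field counts everything that follows it, but not itself.
        int length = 4 + headerData.length + bodyLength;
        ByteBuffer frame = ByteBuffer.allocate(4 + length);
        frame.putInt(length);
        frame.put(serializeCode);
        frame.put((byte) ((headerData.length >> 16) & 0xFF));
        frame.put((byte) ((headerData.length >> 8) & 0xFF));
        frame.put((byte) (headerData.length & 0xFF));
        frame.put(headerData);
        if (body != null) {
            frame.put(body);
        }
        frame.flip(); // switch from writing to reading
        return frame;
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":0}".getBytes(StandardCharsets.UTF_8); // 10 bytes
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);          // 5 bytes
        ByteBuffer frame = encodeFrame(header, body, (byte) 0);
        int lengthField = frame.getInt(0); // absolute read, does not move the position
        System.out.println(lengthField + " " + frame.remaining()); // prints "19 23"
    }
}
```

The two printed numbers differ by exactly 4: the length field (19) excludes its own four bytes, while the buffer (23) contains them.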
1.3 Message decoding
Decoding is defined in NettyDecoder:
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
The actual decoding takes place in RemotingCommand.decode(byteBuffer):
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
// Frame length, excluding the 4-byte length field itself
int length = byteBuffer.limit();
// Read the 4-byte field that carries the serialization type and the header length
int oriHeaderLen = byteBuffer.getInt();
// Mask with 0xFFFFFF to extract the low 24 bits of the int, i.e. the header length
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
// Decode the message header
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
// JSON is the default: deserialize the JSON bytes into a RemotingCommand
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
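The arithmetic in decode can be checked with plain JDK classes. This is a minimal sketch, assuming the leading 4-byte length field has already been stripped before decode is called (which is what the "excluding the length itself" comment implies). FrameDecodeSketch and Decoded are hypothetical names, and the header is kept as raw bytes rather than deserialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; class and method names are hypothetical.
final class FrameDecodeSketch {

    static final class Decoded {
        final int serializeCode;
        final byte[] header;
        final byte[] body; // null when the frame has no body

        Decoded(int serializeCode, byte[] header, byte[] body) {
            this.serializeCode = serializeCode;
            this.header = header;
            this.body = body;
        }
    }

    // Expects a buffer positioned at 0 whose content starts at the second 4-byte field.
    static Decoded decode(ByteBuffer buffer) {
        int length = buffer.limit();
        int word = buffer.getInt();
        int headerLength = word & 0xFFFFFF;      // low 24 bits
        int serializeCode = (word >> 24) & 0xFF; // high 8 bits
        byte[] header = new byte[headerLength];
        buffer.get(header);
        // Whatever remains after the 4-byte field and the header is the body.
        int bodyLength = length - 4 - headerLength;
        byte[] body = null;
        if (bodyLength > 0) {
            body = new byte[bodyLength];
            buffer.get(body);
        }
        return new Decoded(serializeCode, header, body);
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":0}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + header.length + body.length);
        buffer.putInt(header.length); // serialize code 0 in the top byte
        buffer.put(header).put(body);
        buffer.flip();
        Decoded d = decode(buffer);
        System.out.println(new String(d.header, StandardCharsets.UTF_8)
                + " " + new String(d.body, StandardCharsets.UTF_8));
    }
}
```

Note that the body length is never transmitted explicitly; it is derived from the frame length minus the fixed 4 bytes and the header length.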
2. RocketMQ communication flow
2.1 Synchronous send
2.1.1 Client flow
@Test
public void testInvokeSync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RequestHeader requestHeader = new RequestHeader();
requestHeader.setCount(1);
requestHeader.setMessageTitle("Welcome");
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 100000 * 3);
assertTrue(response != null);
System.out.println(response);
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(response.getExtFields()).hasSize(2);
}
The actual send and receive happen in the line below. To make debugging easier, the timeout is set to 10 s: if no response arrives within 10 s, the call fails with an error.
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 10000 * 1);
invokeSync is defined as follows:
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
// Pre-invocation work
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
// The real call happens here
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
// After the response returns, run the post-response hook if one is registered
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
Next, the definition of invokeSyncImpl:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// opaque is incremented automatically each time a RemotingCommand is created; think of it as the RemotingCommand's id
final int opaque = request.getOpaque();
try {
// responseFuture is used to receive the result asynchronously
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// Block for up to timeoutMillis waiting for the response in responseFuture; the result is null if none arrived in time
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
So the result is read from the ResponseFuture. Where does the ResponseFuture get it from? Its fields are:
private final int opaque;
private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final SemaphoreReleaseOnlyOnce once;
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
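ResponseFuture holds several concurrency utilities, such as a CountDownLatch and a Semaphore (wrapped in SemaphoreReleaseOnlyOnce).
Let us step back and look at how a received message is handled.
On the client, the handler class is NettyClientHandler. Its processMessageReceived call dispatches on the command type, and responses end up in processResponseCommand: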
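The CountDownLatch is what lets one thread wait while another delivers the result. A minimal sketch of that handoff, using String in place of RemotingCommand; LatchFutureSketch is a hypothetical class, with method names borrowed from the listings above purely for readability.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not the RocketMQ ResponseFuture.
final class LatchFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    // Called by the receiving thread: store the result, then wake the waiter.
    void putResponse(String value) {
        this.response = value;
        this.latch.countDown();
    }

    // Called by the sending thread: block until a result arrives or the timeout expires.
    String waitResponse(long timeoutMillis) {
        try {
            this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return this.response; // still null if nothing arrived in time
    }

    public static void main(String[] args) {
        LatchFutureSketch answered = new LatchFutureSketch();
        new Thread(() -> answered.putResponse("pong")).start();
        System.out.println(answered.waitResponse(5000));

        LatchFutureSketch unanswered = new LatchFutureSketch();
        System.out.println(unanswered.waitResponse(50)); // nobody answers, so this is null
    }
}
```

The waiter returns as soon as countDown() is called, so the timeout is an upper bound on the wait rather than a fixed delay.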
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
// Read the opaque (request id) from the received command
final int opaque = cmd.getOpaque();
// Look up the ResponseFuture registered under that id in responseTable
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// Store the result in the responseFuture
responseFuture.setResponseCommand(cmd);
// Handled, so remove it from the table
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
// putResponse also stores cmd, and additionally counts down the latch so the thread blocked in waitResponse wakes up
responseFuture.putResponse(cmd);
// Releases the semaphore permit; only relevant for asynchronous calls
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
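In the method above, the client stores the received data in the responseFuture. As the earlier analysis showed, after sending the request the calling thread blocks on that responseFuture until the response arrives or the timeout expires, and then reads the result from it.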
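The pairing of requests and responses through opaque can be sketched without Netty. In the example below, CorrelationSketch, invokeSync, onResponse, and the transport callback are hypothetical names; a CompletableFuture stands in for ResponseFuture and a BiConsumer stands in for the channel write. It is a simplification for illustration, not how RocketMQ is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Illustrative sketch only; all names are hypothetical.
final class CorrelationSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger(0);
    private final ConcurrentMap<Integer, CompletableFuture<String>> responseTable =
            new ConcurrentHashMap<>();

    // Sender side: register a pending entry, hand the request to the transport, block for the reply.
    String invokeSync(String body, long timeoutMillis, BiConsumer<Integer, String> transport) {
        int opaque = nextOpaque.getAndIncrement();
        CompletableFuture<String> pending = new CompletableFuture<>();
        responseTable.put(opaque, pending);
        try {
            transport.accept(opaque, body);
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null; // no usable reply in time
        } finally {
            responseTable.remove(opaque); // never leave stale entries behind
        }
    }

    // Receiver side: match a reply to its request by opaque; false if nobody is waiting for it.
    boolean onResponse(int opaque, String body) {
        CompletableFuture<String> pending = responseTable.remove(opaque);
        if (pending == null) {
            return false;
        }
        pending.complete(body);
        return true;
    }

    int pendingCount() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        CorrelationSketch client = new CorrelationSketch();
        // The fake transport answers from another thread, echoing the same opaque back.
        String reply = client.invokeSync("ping", 5000, (opaque, body) ->
                new Thread(() -> client.onResponse(opaque, "echo:" + body)).start());
        System.out.println(reply);
        System.out.println(client.onResponse(12345, "stray")); // false: no matching request
    }
}
```

The unmatched case corresponds to the "receive response, but not matched any request" warning in processResponseCommand: a reply that arrives after the caller has timed out finds no entry left in the table.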
2.1.2 Server flow
In the unit test, the server is created as follows:
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
// registerProcessor binds the processor below to request code 0: a request whose cmd code is 0 is handled by this processor
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hixxxxx " + ctx.channel().remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
remotingServer.start();
return remotingServer;
}
registerProcessor is defined as:
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
// Thread pool that runs the processor
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
// Store requestCode together with its (processor, executor) pair in processorTable
this.processorTable.put(requestCode, pair);
}
As on the client, the server processes the data it receives. The path into the handler is the same, so only the processing method is shown:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// Look up the processor registered for cmd.getCode()
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// Fall back to the default processor if none is registered
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// Request id of the message
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
// Run the processor and obtain the response
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
if (!cmd.isOnewayRPC()) {
if (response != null) {
// Copy the request id onto the response
response.setOpaque(opaque);
// Mark the command as a response
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
// The processor returned no response, so there is nothing to write back (one-way requests never reach this branch)
}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
// Wrap the runnable together with the channel and the command
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// pair.getObject2() is the thread pool; it executes requestTask, i.e. the runnable defined above
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
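Stripped of hooks, thread pools, and flow control, the dispatch logic reduces to a table lookup with a fallback. The sketch below illustrates only that part: DispatchSketch and its methods are hypothetical, processors are modelled as plain Function<String, String>, and the executor half of the pair is left out on purpose.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative sketch only; all names are hypothetical.
final class DispatchSketch {
    private final Map<Integer, Function<String, String>> processorTable = new ConcurrentHashMap<>();
    private volatile Function<String, String> defaultProcessor; // may stay null

    void registerProcessor(int requestCode, Function<String, String> processor) {
        processorTable.put(requestCode, processor);
    }

    void registerDefaultProcessor(Function<String, String> processor) {
        this.defaultProcessor = processor;
    }

    String dispatch(int requestCode, String request) {
        Function<String, String> matched = processorTable.get(requestCode);
        // Use the default processor when no specific one is registered.
        Function<String, String> processor = (matched != null) ? matched : defaultProcessor;
        if (processor == null) {
            return "request type " + requestCode + " not supported";
        }
        try {
            return processor.apply(request);
        } catch (RuntimeException e) {
            // A failing processor still produces an answer for the caller.
            return "SYSTEM_ERROR: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        DispatchSketch server = new DispatchSketch();
        server.registerProcessor(0, request -> "Hi " + request);
        System.out.println(server.dispatch(0, "client")); // prints "Hi client"
        System.out.println(server.dispatch(7, "client")); // prints "request type 7 not supported"
    }
}
```

Keeping the lookup separate from execution is what allows each request code to be bound to its own thread pool in the real registerProcessor.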
2.2 One-way (oneway)
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// Mark the request as one-way
request.markOnewayRPC();
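// semaphoreOneway limits how many one-way requests can be in flight at the same time.
// Its default permit count is 65535: each send acquires one permit first (permits - 1) and releases it when the write completes (permits + 1)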
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
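The permit handling can be isolated in a small example. This is a minimal sketch of the idea, assuming a release-at-most-once wrapper like the SemaphoreReleaseOnlyOnce seen above; OnewayLimiterSketch, ReleaseOnce, and tryBegin are hypothetical names and the actual network write is left out.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only; all names are hypothetical.
final class OnewayLimiterSketch {

    // Guarantees that a permit goes back to the semaphore at most once.
    static final class ReleaseOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        ReleaseOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void release() {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    private final Semaphore permits;

    OnewayLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    // Returns a handle to release when the send completes, or null if no permit was free in time.
    ReleaseOnce tryBegin(long timeoutMillis) {
        try {
            boolean acquired = permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            return acquired ? new ReleaseOnce(permits) : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        OnewayLimiterSketch limiter = new OnewayLimiterSketch(1);
        ReleaseOnce first = limiter.tryBegin(10);
        ReleaseOnce second = limiter.tryBegin(10); // the only permit is taken
        System.out.println(second == null);        // prints "true"
        first.release();
        first.release();                           // second call is a no-op
        System.out.println(limiter.available());   // prints "1"
    }
}
```

The once-only guard matters because release can be reached from both the completion listener and the exception path; without it a double release would silently raise the limit.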
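2.3 Asynchronous invocation
The asynchronous flow is largely the same as the synchronous one. The difference is on the response path, where the registered callback is executed: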
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
// Read the opaque (request id) from the received command
final int opaque = cmd.getOpaque();
// Look up the ResponseFuture registered under that id in responseTable
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// Store the result in the responseFuture
responseFuture.setResponseCommand(cmd);
// Handled, so remove it from the table
responseTable.remove(opaque);
//*********** The asynchronous callback runs here and delivers the result ***********
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
// putResponse also stores cmd, and additionally counts down the latch so the thread blocked in waitResponse wakes up
responseFuture.putResponse(cmd);
// Releases the semaphore permit; only relevant for asynchronous calls
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
public void executeInvokeCallback() {
if (invokeCallback != null) {
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
invokeCallback.operationComplete(this);
}
}
}
The operationComplete invoked above is the callback we defined in the unit test class:
remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
latch.countDown();
System.out.println("latch.countDown() executed");
assertTrue(responseFuture != null);
assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
}
});
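Two details of the callback path are easy to miss: the callback runs at most once, and it falls back to the current thread when no executor is available or the executor rejects the task. A minimal sketch of both, with hypothetical names (CallbackOnceSketch, executeOn) and a plain Runnable in place of InvokeCallback:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; all names are hypothetical.
final class CallbackOnceSketch {
    private final AtomicBoolean executed = new AtomicBoolean(false);
    private final Runnable callback;

    CallbackOnceSketch(Runnable callback) {
        this.callback = callback;
    }

    // compareAndSet lets exactly one caller through, however many threads try.
    void executeCallback() {
        if (callback != null && executed.compareAndSet(false, true)) {
            callback.run();
        }
    }

    // Prefer the executor; run in the calling thread if it is missing or rejects the task.
    void executeOn(ExecutorService executor) {
        boolean runInThisThread = (executor == null);
        if (executor != null) {
            try {
                executor.execute(() -> executeCallback());
            } catch (RejectedExecutionException e) {
                runInThisThread = true;
            }
        }
        if (runInThisThread) {
            executeCallback();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        CallbackOnceSketch future = new CallbackOnceSketch(calls::incrementAndGet);
        future.executeOn(null); // no executor: runs here
        future.executeOn(null); // already executed: ignored
        System.out.println(calls.get()); // prints "1"

        ExecutorService closed = Executors.newSingleThreadExecutor();
        closed.shutdown(); // a shut-down pool rejects new tasks
        AtomicInteger fallbackCalls = new AtomicInteger();
        new CallbackOnceSketch(fallbackCalls::incrementAndGet).executeOn(closed);
        System.out.println(fallbackCalls.get()); // prints "1"
    }
}
```

In the unit test this is why latch.countDown() inside operationComplete can be relied on to fire once per request.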