rocketmq網(wǎng)絡(luò)部分的整體的架構(gòu)
remoting 模塊是 mq 的基礎(chǔ)通信模塊,理解通信層的原理對理解模塊間的交互很有幫助仰禀。RocketMQ Remoting 模塊底層基于 Netty 網(wǎng)絡(luò)庫驅(qū)動丰涉,因此需要先了解一些基本的Netty原理辫呻。
Netty 使用 Reactor 模式铜邮,將監(jiān)聽線程萨蚕、IO 線程颗搂、業(yè)務(wù)邏輯線程隔離開來担猛。對每個連接,都對應(yīng)一個 ChannelPipeline丢氢。ChannelPipeline 的默認(rèn)實現(xiàn) DefaultChannelPipeline 中用一個雙向鏈表儲存著若干 ChannelHandlerContext傅联,每個ChannelHandlerContext 又對應(yīng)著一個 ChannelHandler。鏈表的頭部是一個 ChannelOutboundHandler:
class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
...
}
尾部是一個 ChannelInboundHandler:
class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
...
}
這里的 Inbound 是指這個 Handler 處理 外界觸發(fā) 的事件疚察,典型的就是對端發(fā)送了數(shù)據(jù)過來蒸走; Outbound 是指事件是 自己觸發(fā) 的,比如向?qū)Χ税l(fā)送數(shù)據(jù)貌嫡。同時比驻,一個 inbound 的事件將在 ChannelPipeline 的 ChannelHandlerContext 鏈表中從頭到尾傳播;而一個 outbound 的事件將會在 ChannelPipeline 的 ChannelHandlerContext 鏈表中從尾向頭傳播岛抄。這樣别惦,就能將數(shù)據(jù)解碼、數(shù)據(jù)處理夫椭、數(shù)據(jù)編碼等操作分散到不同的 ChannelHandler 中去了掸掸。
另外,RocketMQ 的協(xié)議格式如下,開頭4字節(jié)表示整個消息長度扰付,隨后4字節(jié)表示頭部數(shù)據(jù)的長度堤撵,最后就是消息體的長度:
<4 byte length> <4 byte header length> <N byte header data> <N byte body data>
最后,我們再來看一下 RocketMQ remoting 部分的 UML 圖羽莺,了解一下其大概由哪些部分組成:
上圖這些類中实昨,最重要的是 NettyRemotingClient 和 NettyRemotingServer,它們的一些公共方法就被封裝在 NettyRemotingAbstract 中盐固。
RemotingServer
有了上面的基本認(rèn)識荒给,就可以開始著手分析 RemotingServer 的源碼了。
啟動
public void start() {
...
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
});
...
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
...
}
可以看到闰挡,在 NettyRemotingServer 的 start() 方法中锐墙,啟動了 netty,使用成員變量 eventLoopGroupBoss
接受連接长酗,使用 eventLoopGroupSelector
處理 IO溪北,并且使用 defaultEventExecutorGroup
來處理 ChannelHandler 中的業(yè)務(wù)邏輯。nettyServerConfig
用來封裝對 Netty 的配置信息夺脾,包括 SendBufSize之拨、RcvBufSize 等。最重要的是咧叭,添加了 NettyEncoder
蚀乔、NettyDecoder
、IdleStateHandler
菲茬、NettyConnetManageHandler
吉挣、NettyServerHandler
幾個ChannelHandler。
隨后婉弹,如果 channelEventListener
不為 null睬魂, 則啟動一個專門的線程監(jiān)聽 Channel 的各種事件。
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
這個類主要是循環(huán)的從一個 LinkedBlockingQueue 中讀取事件镀赌,而后調(diào)用 channelEventListener 的不同方法處理事件:
class NettyEventExecuter extends ServiceThread {
//使用一個 LinkedBlockingQueue 來存儲待處理的 NettyEvent
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
//添加待處理事件氯哮,如果隊列大小沒用超過限制,則將事件入隊
public void putNettyEvent(final NettyEvent event) {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
@Override
public void run() {
PLOG.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
//循環(huán)讀取事件商佛,并處理
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
PLOG.warn(this.getServiceName() + " service has exception. ", e);
}
}
PLOG.info(this.getServiceName() + " service end");
}
...
}
隨后喉钢,則啟動一個定時器,每隔一段時間查看 responseTable 是否有超時未回應(yīng)的請求良姆,并完成一些清理工作肠虽,responseTable 的作用將在后文說明發(fā)送請求過程時說明:
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
至此,NettyRemotingServer 的啟動過程就結(jié)束了歇盼。
ChannelHandler
在啟動時舔痕,向 ChannelPipeline 中添加了以下ChannelHandler,我們分別來解釋其作用豹缀。
NettyEncoder
對發(fā)送請求按照上文提到的格式進行編碼伯复,沒用什么特殊的:
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
NettyDecoder
與 NettyEncoder 相反,這是一個 Inbound ChannelHandler邢笙,對接收到的數(shù)據(jù)進行解碼啸如,注意由于 RocketMQ 的協(xié)議的頭部是定長的,所以它繼承了 LengthFieldBasedFrameDecoder:
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
IdleStateHandler
這個 Handler 是用來進行 keepalive 的氮惯,當(dāng)一段時間沒有發(fā)送或接收到數(shù)據(jù)時叮雳,則觸發(fā) IdleStateEvent。
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
NettyConnetManageHandler
負(fù)責(zé)處理各種連接事件妇汗,尤其是 IdleState帘不,將其交給 channelEventListener
處理。
IdleStateEvent evnet = (IdleStateEvent) evt;
if ( evnet.state().equals( IdleState.ALL_IDLE ) )
{
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr( ctx.channel() );
log.warn( "NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress );
RemotingUtil.closeChannel( ctx.channel() );
if ( NettyRemotingServer.this.channelEventListener != null )
{
NettyRemotingServer.this
.putNettyEvent( new NettyEvent( NettyEventType.IDLE, remoteAddress.toString(), ctx.channel() ) );
}
}
NettyServerHandler
調(diào)用 NettyRemotingAbstract 的 processMessageReceived 方法處理請求杨箭。
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0( ChannelHandlerContext ctx, RemotingCommand msg ) throws Exception
{
processMessageReceived( ctx, msg );
}
}
public void processMessageReceived( ChannelHandlerContext ctx, RemotingCommand msg ) throws Exception
{
final RemotingCommand cmd = msg;
if ( cmd != null )
{
switch ( cmd.getType() )
{
case REQUEST_COMMAND:
processRequestCommand( ctx, cmd );
break;
case RESPONSE_COMMAND:
processResponseCommand( ctx, cmd );
break;
default:
break;
}
}
}
在這里寞焙,請求可以分為兩類,一類是處理別的服務(wù)發(fā)來的請求互婿;另外一類是處理自己發(fā)給別的服務(wù)的請求的處理結(jié)果捣郊。所有的請求其實都是異步的,只是將請求相關(guān)的 ResponseFuture記在一個 ConcurrentHashMap 中慈参,map 的 key 為與請求相關(guān)的一個整數(shù)呛牲。
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
另外需要注意的時,對不同類型的請求(由 RemotingCommand 的 code 字段標(biāo)識)驮配,會提前注冊對應(yīng)的 NettyRequestProcessor
以及 ExecutorService
娘扩,對請求的處理將放在注冊好的線程池中進行:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
...
}
對于 ResponseRequest 的處理則較為簡單,只是將其從 responseTable 中刪掉壮锻,然后再調(diào)用 ResponseFuture 的 putResponse 方法設(shè)置返回結(jié)果琐旁,或是調(diào)用 responseFuture 中預(yù)設(shè)的回掉方法。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseFuture.release();
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
} else {
PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
PLOG.warn(cmd.toString());
}
}
向其他服務(wù)發(fā)起請求
請求有三種躯保,分別是異步請求旋膳、同步請求以及單向請求,分別調(diào)用了 NettyRemotingAbstract 的對應(yīng)方法途事。從上文的分析我們可以看到验懊,異步請求其實是使用 opaque
字段標(biāo)識了一次請求,然后生成一個占位符 ResponseFuture 并存儲起來尸变。接收方在處理完請求后义图,發(fā)送一個相同 opaque
值的回應(yīng)請求,從而通過 opaque
找到對應(yīng)的 ResponseFuture召烂,返回結(jié)果或是運行預(yù)設(shè)的回調(diào)函數(shù)碱工。同步請求其實也是一個異步請求,只不過通過 CountdownLatch 使調(diào)用者發(fā)生阻塞。單向請求最簡單怕篷,只發(fā)送历筝,不關(guān)注請求結(jié)果。
下面以 invokeAsync 為例分析整個過程:
//調(diào)用 NettyRemotingAbstract 的 invokeAsyncImpl 方法
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
public void invokeAsyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback )
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException
{
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire( timeoutMillis, TimeUnit.MILLISECONDS );
if ( acquired )
{
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce( this.semaphoreAsync );
final ResponseFuture responseFuture = new ResponseFuture( opaque, timeoutMillis, invokeCallback, once );
this.responseTable.put( opaque, responseFuture );
try {
channel.writeAndFlush( request ).addListener( new ChannelFutureListener()
{
@Override
public void operationComplete( ChannelFuture f ) throws Exception {
if ( f.isSuccess() )
{
responseFuture.setSendRequestOK( true );
return;
} else {
responseFuture.setSendRequestOK( false );
}
responseFuture.putResponse( null );
responseTable.remove( opaque );
try {
executeInvokeCallback( responseFuture );
} catch ( Throwable e ) {
PLOG.warn( "excute callback in writeAndFlush addListener, and callback throw", e );
} finally {
responseFuture.release();
}
PLOG.warn( "send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr( channel ) );
}
} );
} catch ( Exception e ) {
responseFuture.release();
PLOG.warn( "send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr( channel ) + "> Exception", e );
throw new RemotingSendRequestException( RemotingHelper.parseChannelRemoteAddr( channel ), e );
}
} else {
String info =
String.format( "invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", /* */
timeoutMillis, /* */
this.semaphoreAsync.getQueueLength(), /* */
this.semaphoreAsync.availablePermits() /* */
);
PLOG.warn( info );
throw new RemotingTooMuchRequestException( info );
}
}
在請求時如果失敗成功則直接返回廊谓,如果失敗則從 responseTable 刪除本次請求梳猪,并調(diào)用 responseFuture.putResponse( null )
,然后執(zhí)行失敗回調(diào) executeInvokeCallback( responseFuture )
蒸痹。而后春弥,就是等待對方發(fā)來的 Response Request 了,上文已經(jīng)有過分析叠荠,這里不再贅述匿沛。
下面,看看同步消息榛鼎。在發(fā)送請求后逃呼,調(diào)用了 ResponseFuture 的 waitResponse 方法。這個方法調(diào)用了 CountDownLatch 的 await 方法借帘。請求處理成功或失敗后則會調(diào)用 ResponseFuture 的 putResponse 方法蜘渣,設(shè)置處理結(jié)果并打開 CountDownLatch,從而實現(xiàn)了同步調(diào)用肺然。
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
PLOG.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
NettyRemotingClient 的思路與 NettyRemotingServer 類似蔫缸,這里不再進行分析。
以上际起。