在上篇文章中姑荷,我們提到了talent-aio的四類Task:
DecodeRunnable
、HandlerRunnable
颤霎、SendRunnable
猪钮、CloseRunnable
,并且分析了這些task的基類AbstractQueueRunnable
述雾。在這篇文章中街州,我們就來分析一下這幾個task是如何相互協(xié)作來處理輸入、輸出數(shù)據(jù)以及客戶端連接的玻孟。
talent-aio 基礎(chǔ)類介紹
在開始介紹前唆缴,我們需要先介紹幾個talent-aio的基礎(chǔ)類和接口,在處理輸入輸出的整個流程中黍翎,都需要用到它們面徽。兩個基礎(chǔ)的類為:
ChannelContext
GroupContext
其中ChannelContext類包含:
- AsynchronousSocketChannel對象、與該連接有關(guān)的
-
DecodeRunnable
、HandlerRunnable
趟紊、SendRunnable
氮双、CloseRunnable
- GroupContext
- ReadCompletionHandler
ChannelContext類代表一個socket連接,封裝了:連接對應(yīng)的AsynchronousSocketChannel對象霎匈;解碼戴差、處理接收到的數(shù)據(jù)和發(fā)送數(shù)據(jù)、關(guān)閉連接的幾個Runnable铛嘱;該連接所屬的GroupContext引用暖释;該連接的ReadCompletionHandler等。
在創(chuàng)建ChannelContext的過程中墨吓,首先記錄accept返回的asynchronousSocketChannel:
public void setAsynchronousSocketChannel(AsynchronousSocketChannel
asynchronousSocketChannel)
{
this.asynchronousSocketChannel = asynchronousSocketChannel;
if (asynchronousSocketChannel != null)
{
try
{
clientNode = getClientNode(asynchronousSocketChannel);
} catch (IOException e)
{
log.error(e.toString(), e);
}
} else
{
clientNode = null;
}
}
而后設(shè)置GroupContext球匕,并創(chuàng)建幾個Runnable,最終將自己加入GroupContext管理的連接集合connections和對端集合clientNodes中:
public void setGroupContext(GroupContext<Ext, P, R> groupContext)
{
this.groupContext = groupContext;
if (groupContext != null)
{
decodeRunnable = new DecodeRunnable<>(this,
groupContext.getDecodeExecutor());
closeRunnable = new CloseRunnable<>(this, null,
null, groupContext.getCloseExecutor());
handlerRunnableNormPrior = new HandlerRunnable<>(this,
groupContext.getHandlerExecutorNormPrior());
sendRunnableNormPrior = new SendRunnable<>(this,
groupContext.getSendExecutorNormPrior());
groupContext.getConnections().add(this);
groupContext.getClientNodes().put(this);
}
}
而GroupContext管理著當(dāng)前實(shí)例的所有連接connections肛真、連接對應(yīng)的用戶users以及用戶間的分組關(guān)系groups:
protected ClientNodes<Ext, P, R> clientNodes = new ClientNodes<>();
protected Connections<Ext, P, R> connections = new Connections<>();
protected Groups<Ext, P, R> groups = new Groups<>();
protected Users<Ext, P, R> users = new Users<>();
除此之外谐丢,GroupContext還管理著一些線程池:
/**
* 解碼線程池
*/
private SynThreadPoolExecutor<SynRunnableIntf> decodeExecutor;
/**
* 關(guān)閉連接的線程池
*/
private SynThreadPoolExecutor<SynRunnableIntf> closeExecutor;
/**
* 業(yè)務(wù)處理線程池
*/
private SynThreadPoolExecutor<SynRunnableIntf> handlerExecutorNormPrior;
/**
* 消息發(fā)送線程池
*/
private SynThreadPoolExecutor<SynRunnableIntf> sendExecutorNormPrior;
在處理輸入輸出的過程中我們會用到上面這兩個類。
talent-aio的主要的業(yè)務(wù)邏輯都在AioHandler接口的實(shí)現(xiàn)類中蚓让,AioHandler接口為:
public interface AioHandler<Ext, P extends Packet, R>
{
/**
* 處理消息包
*
* @param packet the packet
* @return the r
* @author: tanyaowu
* @創(chuàng)建時間: 2016年11月15日 上午11:38:52
*/
R handler(P packet,
ChannelContext<Ext, P, R> channelContext) throws Exception;
/**
* 編碼
*
* @param packet the packet
* @return the byte buffer
* @author: tanyaowu
* @創(chuàng)建時間: 2016年11月15日 上午11:38:52
*/
ByteBuffer encode(P packet,
ChannelContext<Ext, P, R> channelContext);
/**
* 根據(jù)ByteBuffer解碼成業(yè)務(wù)需要的Packet對象.
*
* @param buffer the buffer
* @return the t
* @throws AioDecodeException the aio decode exception
*/
P decode(ByteBuffer buffer, ChannelContext<Ext, P, R> channelContext)
throws AioDecodeException;
}
三個方法分別用來數(shù)據(jù)解包乾忱、數(shù)據(jù)封包以及處理數(shù)據(jù)包。
處理流程
讀取數(shù)據(jù)
首先历极,是處理讀取數(shù)據(jù):
public void completed(Integer result, ByteBuffer byteBuffer)
{
GroupContext<Ext, P, R> groupContext
= channelContext.getGroupContext();
if (result > 0)
{
byteBuffer.limit(byteBuffer.position());
byteBuffer.position(0);
DecodeRunnable<Ext, P, R> decodeRunnable
= channelContext.getDecodeRunnable();
decodeRunnable.addMsg(byteBuffer);
groupContext.getDecodeExecutor().execute(decodeRunnable);
} else if (result == 0)
{
log.error("讀到的數(shù)據(jù)長度為0");
} else if (result < 0)
{
Aio.close(channelContext, null, "讀數(shù)據(jù)時返回" + result);
}
if (AioUtils.checkBeforeIO(channelContext))
{
AsynchronousSocketChannel asynchronousSocketChannel
= channelContext.getAsynchronousSocketChannel();
ByteBuffer newByteBuffer =
ByteBuffer.allocate(
channelContext.getGroupContext().getReadBufferSize()
);
asynchronousSocketChannel.read(newByteBuffer,
newByteBuffer, this);
}
}
在讀取數(shù)據(jù)后窄瘟,將byteBuffer提交到decodeRunnable的數(shù)據(jù)隊(duì)列中,而后繼續(xù)調(diào)用read方法讀取對端發(fā)送來的數(shù)據(jù)趟卸。
數(shù)據(jù)組包
在decodeRunnable中蹄葱,調(diào)用AioHandler的decode方法來獲取數(shù)據(jù)包,然后提交給HandlerRunnable處理锄列。在處理數(shù)據(jù)時主要考慮了半包和粘包的情況:
@Override
public void runTask()
{
ConcurrentLinkedQueue<ByteBuffer> queue
= getMsgQueue();
ByteBuffer byteBuffer = null;
label_1: while ((size = queue.size()) > 0)
{
byteBuffer = queue.poll();
if (byteBuffer != null)
{
if (lastByteBuffer != null)
{
byteBuffer.position(0);
byteBuffer =
ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
lastByteBuffer = null;
}
} else {
break label_1;
}
try
{
byteBuffer.position(0);
label_2: while (true)
{
int initPosition = byteBuffer.position();
P packet =
channelContext.
getGroupContext().
getAioHandler().
decode(byteBuffer, channelContext);
if (packet == null)// 數(shù)據(jù)不夠图云,組不了包
{
if (log.isDebugEnabled())
{
log.debug("{},數(shù)據(jù)不夠,組不了包",
channelContext.toString());
}
byteBuffer.position(initPosition);
lastByteBuffer = byteBuffer;
continue label_1;
} else //組包成功
{
channelContext.getStat().
setLatestTimeOfReceivedPacket(
SystemTimer.currentTimeMillis()
);
int afterDecodePosition = byteBuffer.position();
int len = afterDecodePosition - initPosition;
AioListener<Ext, P, R> aioListener
= channelContext.getGroupContext().getAioListener();
if (aioListener != null)
{
aioListener.
onAfterDecoded(channelContext, packet, len);
}
submit(packet, len);
channelContext.getGroupContext().
getGroupStat().
getReceivedPacket().
incrementAndGet();
channelContext.getGroupContext().
getGroupStat().
getReceivedBytes().
addAndGet(len);
if (byteBuffer.hasRemaining())//組包后邻邮,還剩有數(shù)據(jù)
{
if (log.isDebugEnabled())
{
log.debug("{}組包后竣况,還剩有數(shù)據(jù):{}",
channelContext, byteBuffer.limit() - byteBuffer.position());
}
continue label_2;
} else//組包后,數(shù)據(jù)剛好用完
{
lastByteBuffer = null;
log.debug("{},組包后筒严,數(shù)據(jù)剛好用完",
channelContext);
continue label_1;
}
}
}
} catch (AioDecodeException e)
{
log.error(channelContext.toString(), e);
Aio.close(channelContext, e, "解碼異常:"
+ e.getMessage());
break label_1;
} finally
{
}
}
}
在這個循環(huán)中丹泉,首先從隊(duì)列中獲取數(shù)據(jù),獲取到數(shù)據(jù)后鸭蛙,看看是否存在粘包多出來的數(shù)據(jù)lastByteBuffer
, 如果存在則將兩部分?jǐn)?shù)據(jù)合并摹恨。而后調(diào)用AioHandler的decode方法處理數(shù)據(jù),如果由于半包導(dǎo)致解包失敗娶视,則繼續(xù)從隊(duì)列中獲取數(shù)據(jù)晒哄,組合起來嘗試解包;如果解包成功,則將多余的數(shù)據(jù)放在lastByteBuffer
揩晴,并且更新各種統(tǒng)計(jì)信息勋陪。最后,將組好的數(shù)據(jù)包通過submit方法傳遞給HandlerRunnable處理硫兰。
數(shù)據(jù)處理
HandlerRunnable的邏輯相對比較簡單诅愚,從數(shù)據(jù)隊(duì)列中獲取組好的包,并調(diào)用doPacket處理數(shù)據(jù)包:
@Override
public void runTask()
{
ConcurrentLinkedQueue<P> queue = getMsgQueue();
P packet = null;
while ((packet = queue.poll()) != null)
{
doPacket(packet);
}
}
在doPacket中劫映,調(diào)用AioHandler的hanlder解除處理數(shù)據(jù)包:
groupContext.getAioHandler().handler(packet, channelContext);
發(fā)送數(shù)據(jù)
發(fā)送數(shù)據(jù)使用Aio類的靜態(tài)方法send將packet添加到SendRunnable的數(shù)據(jù)隊(duì)列中违孝,而后將sendRunnable提交到線程池中運(yùn)行:
public static <Ext, P extends Packet, R> void send(
ChannelContext<Ext, P, R> channelContext,
P packet
)
{
if (channelContext == null)
{
log.error("channelContext == null");
return;
}
SendRunnable<Ext, P, R> sendRunnable
= AioUtils.selectSendRunnable(channelContext, packet);
sendRunnable.addMsg(packet);
SynThreadPoolExecutor<SynRunnableIntf> synThreadPoolExecutor
= AioUtils.selectSendExecutor(channelContext, packet);
synThreadPoolExecutor.execute(sendRunnable);
}
在線程池中,會嘗試一次發(fā)送所有等待發(fā)送的packet泳赋,不過對單次發(fā)送的packet設(shè)置了一個上限雌桑,而后對每個packet編碼,匯總到一個ByteBuffer中:
for ( int i = 0; i < queueSize; i++ )
{
if ( (packet = queue.poll() ) != null )
{
ByteBuffer byteBuffer = aioHandler.encode( packet, channelContext );
allBytebufferCapacity += byteBuffer.limit();
packetCount++;
byteBuffers[i] = byteBuffer;
if ( aioListener != null )
{
try
{
aioListener.onBeforeSent( channelContext, packet );
} catch ( Exception e )
{
log.error( e.toString(), e );
}
}
} else{
break;
}
}
ByteBuffer allByteBuffer = ByteBuffer.allocate( allBytebufferCapacity );
for ( ByteBuffer byteBuffer : byteBuffers )
{
if ( byteBuffer != null )
{
byteBuffer.flip();
allByteBuffer.put( byteBuffer );
}
}
最終在sendRunnable的sendByteBuffer中完成數(shù)據(jù)發(fā)送祖今,注意發(fā)送前需要獲取一個信號量校坑,保證同一時間對一個連接只有一個線程在調(diào)用發(fā)送動作,并在writeCompleteHandler中記錄當(dāng)前連接發(fā)送數(shù)據(jù)千诬,并更新當(dāng)前連接活動時間耍目,為keepalive做準(zhǔn)備:
public void sendByteBuffer( ByteBuffer byteBuffer, Integer packetCount ) {
if ( byteBuffer == null )
{
log.error( "byteBuffer is null" );
return;
}
if ( !AioUtils.checkBeforeIO( channelContext ) )
{
return;
}
byteBuffer.flip();
AsynchronousSocketChannel asynchronousSocketChannel
= channelContext.getAsynchronousSocketChannel();
WriteCompletionHandler<Ext, P, R> writeCompletionHandler
= channelContext.getWriteCompletionHandler();
try
{
writeCompletionHandler.getWriteSemaphore().acquire();
} catch ( InterruptedException e )
{
log.error( e.toString(), e );
}
asynchronousSocketChannel.write( byteBuffer, packetCount, writeCompletionHandler );
}
注意發(fā)送時如果隊(duì)列中有待發(fā)送數(shù)據(jù)將直接在當(dāng)前進(jìn)程中繼續(xù)運(yùn)行runTask,而不是重新提交Runnable到線程池中徐绑,以盡快發(fā)送數(shù)據(jù):
if (queue.size() > 0) {
runTask();
}
關(guān)閉連接
在關(guān)閉連接時邪驮,首先將解碼,處理傲茄,發(fā)送runnable全部停止毅访,然后清空其數(shù)據(jù)隊(duì)列,最后檢查該連接是否已經(jīng) 調(diào)用過關(guān)閉盘榨,如果沒有喻粹,就將CloseRunnable提交到線程池中執(zhí)行:
public static < Ext, P extends Packet, R > void close( ChannelContext<Ext, P, R> channelContext, Throwable t, String remark, boolean isRemove )
{
channelContext.getDecodeRunnable().clearMsgQueue();
channelContext.getHandlerRunnableNormPrior().clearMsgQueue();
channelContext.getSendRunnableNormPrior().clearMsgQueue();
channelContext.getDecodeRunnable().setCanceled( true );
channelContext.getHandlerRunnableNormPrior().setCanceled( true );
channelContext.getSendRunnableNormPrior().setCanceled( true );
CloseRunnable<Ext, P, R> closeRunnable = channelContext.getCloseRunnable();
if ( closeRunnable.isWaitingExecute() )
{
log.error( "{},已經(jīng)在等待關(guān)閉\r\n本次關(guān)閉備注:{}\r\n第一次的備注:{}\r\n本次關(guān)閉異常:{}\r\n第一次時異常:{}", channelContext, remark, closeRunnable.getRemark(), t, closeRunnable.getT() );
return;
}
synchronized (closeRunnable) {
if ( closeRunnable.isWaitingExecute() ) /* double check */
{
return;
}
closeRunnable.setRemove( isRemove );
closeRunnable.setRemark( remark );
closeRunnable.setT( t );
closeRunnable.getExecutor().execute( closeRunnable );
closeRunnable.setWaitingExecute( true );
}
}
在CloseRunnable的runTask中,主要執(zhí)行關(guān)閉連接草巡,清理連接數(shù)據(jù)的邏輯:
//關(guān)閉連接
try
{
AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
if ( asynchronousSocketChannel != null )
{
asynchronousSocketChannel.close();
}
} catch ( Throwable e )
{
log.error( e.toString() );
}
//清理連接數(shù)據(jù)
//刪除集合中的維護(hù)信息 start
/*刪除集合中的維護(hù)信息 start */
try
{
groupContext.getConnections().remove( channelContext );
} catch ( Throwable e )
{
log.error( e.toString(), e );
}
try
{
groupContext.getClientNodes().remove( channelContext );
} catch ( Throwable e )
{
log.error( e.toString(), e );
}
try
{
groupContext.getUsers().unbind( channelContext );
} catch ( Throwable e )
{
log.error( e.toString(), e );
}
try
{
groupContext.getGroups().unbind( channelContext );
} catch ( Throwable e )
{
log.error( e.toString(), e );
}
channelContext.setClosed( true );
channelContext.getGroupContext().getGroupStat().getClosed().incrementAndGet();
/*刪除集合中的維護(hù)信息 end */
除此以外磷斧,如果連接是因?yàn)槌鲥e被關(guān)閉的,還會根據(jù)ReconnConf的配置進(jìn)行斷線重連操作捷犹,這個我們將在后文中講解。
本篇總結(jié)
本篇著重講解了talent-aio如何處理半包冕末、粘包的情況萍歉,對輸入進(jìn)行解包,并處理輸入數(shù)據(jù)包档桃,并返回?cái)?shù)據(jù)的枪孩。
在本系列的下一篇中,會講解talent-aio的斷線重連、keepalive等功能是如何實(shí)現(xiàn)的蔑舞,并且接受其是如何定制線程以及executor的拒担。最后,還將給出一個簡單的使用talent-io的IM server的示例攻询。預(yù)知后事如何从撼,請聽下回分解~