talent-aio源碼閱讀小記(二)

上篇文章中姑荷,我們提到了talent-aio的四類Task:DecodeRunnableHandlerRunnable颤霎、SendRunnable猪钮、CloseRunnable,并且分析了這些task的基類AbstractQueueRunnable述雾。在這篇文章中街州,我們就來分析一下這幾個task是如何相互協(xié)作來處理輸入、輸出數(shù)據(jù)以及客戶端連接的玻孟。

talent-aio 基礎(chǔ)類介紹


在開始介紹前唆缴,我們需要先介紹幾個talent-aio的基礎(chǔ)類和接口,在處理輸入輸出的整個流程中黍翎,都需要用到它們面徽。兩個基礎(chǔ)的類為:

ChannelContext
GroupContext 

其中ChannelContext類包含:

  1. AsynchronousSocketChannel對象、與該連接有關(guān)的
  2. DecodeRunnableHandlerRunnable趟紊、SendRunnable氮双、CloseRunnable
  3. GroupContext
  4. ReadCompletionHandler
    ChannelContext類代表一個socket連接,封裝了:連接對應(yīng)的AsynchronousSocketChannel對象霎匈;解碼戴差、處理接收到的數(shù)據(jù)和發(fā)送數(shù)據(jù)、關(guān)閉連接的幾個Runnable铛嘱;該連接所屬的GroupContext引用暖释;該連接的ReadCompletionHandler等。

在創(chuàng)建ChannelContext的過程中墨吓,首先記錄accept返回的asynchronousSocketChannel:

    public void setAsynchronousSocketChannel(AsynchronousSocketChannel
                         asynchronousSocketChannel)
    {
        this.asynchronousSocketChannel = asynchronousSocketChannel;

        if (asynchronousSocketChannel != null)
        {
            try
            {
                clientNode = getClientNode(asynchronousSocketChannel);
            } catch (IOException e)
            {
                log.error(e.toString(), e);
            }
        } else
        {
            clientNode = null;
        }
    }

而后設(shè)置GroupContext球匕,并創(chuàng)建幾個Runnable,最終將自己加入GroupContext管理的連接集合connections和對端集合clientNodes中:

    public void setGroupContext(GroupContext<Ext, P, R> groupContext)
    {
        this.groupContext = groupContext;

        if (groupContext != null)
        {
            decodeRunnable = new DecodeRunnable<>(this, 
groupContext.getDecodeExecutor());
            closeRunnable = new CloseRunnable<>(this, null, 
null, groupContext.getCloseExecutor());

            handlerRunnableNormPrior = new HandlerRunnable<>(this, 
groupContext.getHandlerExecutorNormPrior());

            sendRunnableNormPrior = new SendRunnable<>(this, 
groupContext.getSendExecutorNormPrior());

            groupContext.getConnections().add(this);
            groupContext.getClientNodes().put(this);
        }
    }

而GroupContext管理著當(dāng)前實(shí)例的所有連接connections肛真、連接對應(yīng)的用戶users以及用戶間的分組關(guān)系groups:


    protected ClientNodes<Ext, P, R> clientNodes = new ClientNodes<>();
    protected Connections<Ext, P, R> connections = new Connections<>();
    protected Groups<Ext, P, R> groups = new Groups<>();
    protected Users<Ext, P, R> users = new Users<>();

除此之外谐丢,GroupContext還管理著一些線程池:

/**
 * 解碼線程池
*/
private SynThreadPoolExecutor<SynRunnableIntf> decodeExecutor;

/**
 * 關(guān)閉連接的線程池
 */
private SynThreadPoolExecutor<SynRunnableIntf> closeExecutor;

/**
 * 業(yè)務(wù)處理線程池
 */
private SynThreadPoolExecutor<SynRunnableIntf> handlerExecutorNormPrior;

/**
 * 消息發(fā)送線程池
 */
private SynThreadPoolExecutor<SynRunnableIntf> sendExecutorNormPrior;

在處理輸入輸出的過程中我們會用到上面這兩個類。

talent-aio的主要的業(yè)務(wù)邏輯都在AioHandler接口的實(shí)現(xiàn)類中蚓让,AioHandler接口為:

public interface AioHandler<Ext, P extends Packet, R>
{

    /**
     * 處理消息包
     *
     * @param packet the packet
     * @return the r
     * @author: tanyaowu
     * @創(chuàng)建時間: 2016年11月15日 上午11:38:52
     */
    R handler(P packet,
 ChannelContext<Ext, P, R> channelContext) throws Exception;

    /**
     * 編碼
     *
     * @param packet the packet
     * @return the byte buffer
     * @author: tanyaowu
     * @創(chuàng)建時間: 2016年11月15日 上午11:38:52
     */
    ByteBuffer encode(P packet, 
ChannelContext<Ext, P, R> channelContext);

    /**
     * 根據(jù)ByteBuffer解碼成業(yè)務(wù)需要的Packet對象.
     *
     * @param buffer the buffer
     * @return the t
     * @throws AioDecodeException the aio decode exception
     */
    P decode(ByteBuffer buffer, ChannelContext<Ext, P, R> channelContext)
 throws AioDecodeException;

}

三個方法分別用來數(shù)據(jù)解包乾忱、數(shù)據(jù)封包以及處理數(shù)據(jù)包。

處理流程


讀取數(shù)據(jù)

首先历极,是處理讀取數(shù)據(jù):

    public void completed(Integer result, ByteBuffer byteBuffer)
    {
        GroupContext<Ext, P, R> groupContext 
  = channelContext.getGroupContext();
        if (result > 0)
        {
            byteBuffer.limit(byteBuffer.position());
            byteBuffer.position(0);
            DecodeRunnable<Ext, P, R> decodeRunnable
 = channelContext.getDecodeRunnable();
            decodeRunnable.addMsg(byteBuffer);

            groupContext.getDecodeExecutor().execute(decodeRunnable);

        } else if (result == 0)
        {
            log.error("讀到的數(shù)據(jù)長度為0");
        } else if (result < 0)
        {
            Aio.close(channelContext, null, "讀數(shù)據(jù)時返回" + result);
        }

        if (AioUtils.checkBeforeIO(channelContext))
        {
            AsynchronousSocketChannel asynchronousSocketChannel 
= channelContext.getAsynchronousSocketChannel();
            ByteBuffer newByteBuffer = 
      ByteBuffer.allocate(
channelContext.getGroupContext().getReadBufferSize()
);
            asynchronousSocketChannel.read(newByteBuffer,
 newByteBuffer, this);
        }

    }

在讀取數(shù)據(jù)后窄瘟,將byteBuffer提交到decodeRunnable的數(shù)據(jù)隊(duì)列中,而后繼續(xù)調(diào)用read方法讀取對端發(fā)送來的數(shù)據(jù)趟卸。

數(shù)據(jù)組包

在decodeRunnable中蹄葱,調(diào)用AioHandler的decode方法來獲取數(shù)據(jù)包,然后提交給HandlerRunnable處理锄列。在處理數(shù)據(jù)時主要考慮了半包和粘包的情況:

@Override
public void runTask()
{
    ConcurrentLinkedQueue<ByteBuffer> queue
 = getMsgQueue();
        
    ByteBuffer byteBuffer = null;
    label_1: while ((size = queue.size()) > 0)
    {
        byteBuffer = queue.poll();
        if (byteBuffer != null)
        {
            if (lastByteBuffer != null)
            {
                byteBuffer.position(0);
                byteBuffer = 
            ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
                lastByteBuffer = null;
            }
        } else {
            break label_1;
        }

        try
        {
            byteBuffer.position(0);
            label_2: while (true)
            {
                int initPosition = byteBuffer.position();
                P packet = 
        channelContext.
        getGroupContext().
        getAioHandler().
        decode(byteBuffer, channelContext);

                if (packet == null)// 數(shù)據(jù)不夠图云,組不了包
                {
                    if (log.isDebugEnabled())
                    {
                        log.debug("{},數(shù)據(jù)不夠,組不了包", 
                             channelContext.toString());
                    }
                    byteBuffer.position(initPosition);
                    lastByteBuffer = byteBuffer;
                    continue label_1;
                } else //組包成功
                {       
                    channelContext.getStat().
            setLatestTimeOfReceivedPacket(
              SystemTimer.currentTimeMillis()
            );  
                    int afterDecodePosition = byteBuffer.position();
                    int len = afterDecodePosition - initPosition;
                    AioListener<Ext, P, R> aioListener
       = channelContext.getGroupContext().getAioListener();
                    if (aioListener != null)
                    {
                          aioListener.
onAfterDecoded(channelContext, packet, len);
                    }
                    submit(packet, len);
                    channelContext.getGroupContext().
                                  getGroupStat().
                                  getReceivedPacket().
                                  incrementAndGet();

                                  channelContext.getGroupContext().
                                  getGroupStat().
                                  getReceivedBytes().
                                  addAndGet(len);
                        
                  if (byteBuffer.hasRemaining())//組包后邻邮,還剩有數(shù)據(jù)
                  {
                        if (log.isDebugEnabled())
                        {
                            log.debug("{}組包后竣况,還剩有數(shù)據(jù):{}", 
          channelContext, byteBuffer.limit() - byteBuffer.position());
                        }
                            continue label_2;
                        } else//組包后,數(shù)據(jù)剛好用完
                        {
                            lastByteBuffer = null;
                            log.debug("{},組包后筒严,數(shù)據(jù)剛好用完", 
                                                channelContext);
                            continue label_1;
                        }
                    }
                }
            } catch (AioDecodeException e)
            {
                log.error(channelContext.toString(), e);
                Aio.close(channelContext, e, "解碼異常:" 
                                + e.getMessage());
                break label_1;
            } finally
            {

            }
        }
    }

在這個循環(huán)中丹泉,首先從隊(duì)列中獲取數(shù)據(jù),獲取到數(shù)據(jù)后鸭蛙,看看是否存在粘包多出來的數(shù)據(jù)lastByteBuffer, 如果存在則將兩部分?jǐn)?shù)據(jù)合并摹恨。而后調(diào)用AioHandler的decode方法處理數(shù)據(jù),如果由于半包導(dǎo)致解包失敗娶视,則繼續(xù)從隊(duì)列中獲取數(shù)據(jù)晒哄,組合起來嘗試解包;如果解包成功,則將多余的數(shù)據(jù)放在lastByteBuffer揩晴,并且更新各種統(tǒng)計(jì)信息勋陪。最后,將組好的數(shù)據(jù)包通過submit方法傳遞給HandlerRunnable處理硫兰。

數(shù)據(jù)處理

HandlerRunnable的邏輯相對比較簡單诅愚,從數(shù)據(jù)隊(duì)列中獲取組好的包,并調(diào)用doPacket處理數(shù)據(jù)包:

    @Override
    public void runTask()
    {
        ConcurrentLinkedQueue<P> queue = getMsgQueue();
        P packet = null;
        while ((packet = queue.poll()) != null)
        {
            doPacket(packet);
        }
    }

在doPacket中劫映,調(diào)用AioHandler的hanlder解除處理數(shù)據(jù)包:

groupContext.getAioHandler().handler(packet, channelContext);

發(fā)送數(shù)據(jù)

發(fā)送數(shù)據(jù)使用Aio類的靜態(tài)方法send將packet添加到SendRunnable的數(shù)據(jù)隊(duì)列中违孝,而后將sendRunnable提交到線程池中運(yùn)行:

public static <Ext, P extends Packet, R> void send(
ChannelContext<Ext, P, R> channelContext,
 P packet
)
{
    if (channelContext == null)
    {
        log.error("channelContext == null");
        return;
    }
    SendRunnable<Ext, P, R> sendRunnable
   = AioUtils.selectSendRunnable(channelContext, packet);
    sendRunnable.addMsg(packet);
    SynThreadPoolExecutor<SynRunnableIntf> synThreadPoolExecutor
   = AioUtils.selectSendExecutor(channelContext, packet);
    synThreadPoolExecutor.execute(sendRunnable);
}

在線程池中,會嘗試一次發(fā)送所有等待發(fā)送的packet泳赋,不過對單次發(fā)送的packet設(shè)置了一個上限雌桑,而后對每個packet編碼,匯總到一個ByteBuffer中:

for ( int i = 0; i < queueSize; i++ )
{
    if ( (packet = queue.poll() ) != null )
    {
        ByteBuffer byteBuffer = aioHandler.encode( packet, channelContext );
        allBytebufferCapacity += byteBuffer.limit();
        packetCount++;
        byteBuffers[i] = byteBuffer;

        if ( aioListener != null )
        {
            try
            {
                aioListener.onBeforeSent( channelContext, packet );
            } catch ( Exception e )
            {
                log.error( e.toString(), e );
            }
        }
    } else{
        break;
    }
}
ByteBuffer allByteBuffer = ByteBuffer.allocate( allBytebufferCapacity );
for ( ByteBuffer byteBuffer : byteBuffers )
{
    if ( byteBuffer != null )
    {
        byteBuffer.flip();
        allByteBuffer.put( byteBuffer );
    }
}

最終在sendRunnable的sendByteBuffer中完成數(shù)據(jù)發(fā)送祖今,注意發(fā)送前需要獲取一個信號量校坑,保證同一時間對一個連接只有一個線程在調(diào)用發(fā)送動作,并在writeCompleteHandler中記錄當(dāng)前連接發(fā)送數(shù)據(jù)千诬,并更新當(dāng)前連接活動時間耍目,為keepalive做準(zhǔn)備:

public void sendByteBuffer( ByteBuffer byteBuffer, Integer packetCount ) {
    if ( byteBuffer == null )
    {
        log.error( "byteBuffer is null" );
        return;
    }

    if ( !AioUtils.checkBeforeIO( channelContext ) )
    {
        return;
    }

    byteBuffer.flip();
    AsynchronousSocketChannel  asynchronousSocketChannel    
                                              = channelContext.getAsynchronousSocketChannel();
    WriteCompletionHandler<Ext, P, R>  writeCompletionHandler       
                                             = channelContext.getWriteCompletionHandler();
    try
    {
        writeCompletionHandler.getWriteSemaphore().acquire();
    } catch ( InterruptedException e )
    {
        log.error( e.toString(), e );
    }
    asynchronousSocketChannel.write( byteBuffer, packetCount, writeCompletionHandler );
}

注意發(fā)送時如果隊(duì)列中有待發(fā)送數(shù)據(jù)將直接在當(dāng)前進(jìn)程中繼續(xù)運(yùn)行runTask,而不是重新提交Runnable到線程池中徐绑,以盡快發(fā)送數(shù)據(jù):

if (queue.size() > 0) {
    runTask();
}

關(guān)閉連接

在關(guān)閉連接時邪驮,首先將解碼,處理傲茄,發(fā)送runnable全部停止毅访,然后清空其數(shù)據(jù)隊(duì)列,最后檢查該連接是否已經(jīng) 調(diào)用過關(guān)閉盘榨,如果沒有喻粹,就將CloseRunnable提交到線程池中執(zhí)行:

public static < Ext, P extends Packet, R > void close( ChannelContext<Ext, P, R> channelContext, Throwable t, String remark, boolean isRemove )
{
    channelContext.getDecodeRunnable().clearMsgQueue();
    channelContext.getHandlerRunnableNormPrior().clearMsgQueue();
    channelContext.getSendRunnableNormPrior().clearMsgQueue();

    channelContext.getDecodeRunnable().setCanceled( true );
    channelContext.getHandlerRunnableNormPrior().setCanceled( true );
    channelContext.getSendRunnableNormPrior().setCanceled( true );

    CloseRunnable<Ext, P, R> closeRunnable = channelContext.getCloseRunnable();
    if ( closeRunnable.isWaitingExecute() )
    {
        log.error( "{},已經(jīng)在等待關(guān)閉\r\n本次關(guān)閉備注:{}\r\n第一次的備注:{}\r\n本次關(guān)閉異常:{}\r\n第一次時異常:{}", channelContext, remark, closeRunnable.getRemark(), t, closeRunnable.getT() );
        return;
    }
    synchronized (closeRunnable) {
        if ( closeRunnable.isWaitingExecute() ) /* double check */
        {
            return;
        }
        closeRunnable.setRemove( isRemove );
        closeRunnable.setRemark( remark );
        closeRunnable.setT( t );
        closeRunnable.getExecutor().execute( closeRunnable );
        closeRunnable.setWaitingExecute( true );
    }
}

在CloseRunnable的runTask中,主要執(zhí)行關(guān)閉連接草巡,清理連接數(shù)據(jù)的邏輯:

//關(guān)閉連接
try
{
    AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
    if ( asynchronousSocketChannel != null )
    {
        asynchronousSocketChannel.close();
    }
} catch ( Throwable e )
{
    log.error( e.toString() );
}

//清理連接數(shù)據(jù)
//刪除集合中的維護(hù)信息 start
/*刪除集合中的維護(hù)信息 start */
try
{
    groupContext.getConnections().remove( channelContext );
} catch ( Throwable e )
{
    log.error( e.toString(), e );
}

try
{
    groupContext.getClientNodes().remove( channelContext );
} catch ( Throwable e )
{
    log.error( e.toString(), e );
}

try
{
    groupContext.getUsers().unbind( channelContext );
} catch ( Throwable e )
{
    log.error( e.toString(), e );
}

try
{
    groupContext.getGroups().unbind( channelContext );
} catch ( Throwable e )
{
    log.error( e.toString(), e );
}

channelContext.setClosed( true );
channelContext.getGroupContext().getGroupStat().getClosed().incrementAndGet();
/*刪除集合中的維護(hù)信息 end */

除此以外磷斧,如果連接是因?yàn)槌鲥e被關(guān)閉的,還會根據(jù)ReconnConf的配置進(jìn)行斷線重連操作捷犹,這個我們將在后文中講解。

本篇總結(jié)


本篇著重講解了talent-aio如何處理半包冕末、粘包的情況萍歉,對輸入進(jìn)行解包,并處理輸入數(shù)據(jù)包档桃,并返回?cái)?shù)據(jù)的枪孩。
在本系列的下一篇中,會講解talent-aio的斷線重連、keepalive等功能是如何實(shí)現(xiàn)的蔑舞,并且接受其是如何定制線程以及executor的拒担。最后,還將給出一個簡單的使用talent-io的IM server的示例攻询。預(yù)知后事如何从撼,請聽下回分解~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市钧栖,隨后出現(xiàn)的幾起案子低零,更是在濱河造成了極大的恐慌,老刑警劉巖拯杠,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掏婶,死亡現(xiàn)場離奇詭異,居然都是意外死亡潭陪,警方通過查閱死者的電腦和手機(jī)雄妥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來依溯,“玉大人老厌,你說我怎么就攤上這事∈姆校” “怎么了梅桩?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長拜隧。 經(jīng)常有香客問我宿百,道長,這世上最難降的妖魔是什么洪添? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任垦页,我火速辦了婚禮,結(jié)果婚禮上干奢,老公的妹妹穿的比我還像新娘痊焊。我一直安慰自己,他們只是感情好忿峻,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布薄啥。 她就那樣靜靜地躺著,像睡著了一般逛尚。 火紅的嫁衣襯著肌膚如雪垄惧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天绰寞,我揣著相機(jī)與錄音到逊,去河邊找鬼铣口。 笑死,一個胖子當(dāng)著我的面吹牛觉壶,可吹牛的內(nèi)容都是我干的脑题。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼铜靶,長吁一口氣:“原來是場噩夢啊……” “哼叔遂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起旷坦,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤掏熬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后秒梅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體旗芬,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年捆蜀,在試婚紗的時候發(fā)現(xiàn)自己被綠了疮丛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡辆它,死狀恐怖誊薄,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情锰茉,我是刑警寧澤呢蔫,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站飒筑,受9級特大地震影響片吊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜协屡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一俏脊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧肤晓,春花似錦爷贫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至盈匾,卻和暖如春卷胯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背威酒。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工窑睁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人葵孤。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓担钮,卻偏偏與公主長得像,于是被迫代替她去往敵國和親尤仍。 傳聞我的和親對象是個殘疾皇子箫津,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容