Muduo_Day6(TcpConnection完善,發(fā)送數(shù)據(jù)以及shutdown)

TcpConnection發(fā)送數(shù)據(jù)

之前我們的Channel僅僅用到了ReadCallback,而并沒有啟用WriteCallback,在本節(jié)中會設(shè)置為在需要時關(guān)注可寫事件,在TcpConnection中添加如下:

channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));

相應(yīng)的添加接收緩沖區(qū)與發(fā)送緩沖區(qū):

Buffer inputBuffer_;            // 應(yīng)用層接收緩沖區(qū)
Buffer outputBuffer_;           // 應(yīng)用層發(fā)送緩沖區(qū)

在TcpServer::newConnection()中注冊寫完成回調(diào)函數(shù):writeCompleteCallback_,即當(dāng)所有數(shù)據(jù)都已經(jīng)拷貝到內(nèi)核緩沖區(qū)時回調(diào)該函數(shù),即發(fā)送緩沖區(qū)被清空時.

conn->setWriteCompleteCallback(writeCompleteCallback_);

此函數(shù)的作用是如果我們向一個連接發(fā)送send()大流量的數(shù)據(jù)士袄,發(fā)送頻率不能太快她肯,因為如果對等方接收不及時,則內(nèi)核發(fā)送緩沖區(qū)會堆積數(shù)據(jù)氨鹏,根據(jù)前面的分析,我們會將數(shù)據(jù)添加到outputBuffer_压状,導(dǎo)致outputBuffer_ 增長太快仆抵,對此可以關(guān)注WriteCompleteCallback_ 跟继,當(dāng)它被調(diào)用時表示outputBuffer_ 已經(jīng)被清空,此時再次send()镣丑,可以有效的調(diào)整send()函數(shù)的發(fā)送頻率.相應(yīng)的還有一個highWaterMarkCallback_,可以當(dāng)作是”高水位標(biāo)“ 回調(diào)函數(shù)舔糖,即如果對等方接收不及時,outputBuffer_ 會一直增大莺匠,當(dāng)增長到highWaterMark_ (具體數(shù)值)時金吗,回調(diào)highWaterMarkCallback_ 函數(shù),很可能在函數(shù)內(nèi)主動shutdown.
除此之外,并在TcpConnection的接口中增加了send()函數(shù),這兩個函數(shù)都是可以跨線程調(diào)用的,因為其內(nèi)部實現(xiàn)都增加了兩個*InLoop函數(shù)(sendInloop函數(shù)),對應(yīng)前新的接口函數(shù),并使用Buffer作為輸出緩沖區(qū).
若在當(dāng)前IO線程調(diào)用send()函數(shù),它會把message復(fù)制一份,傳給IO線程中的sendInLoop()來發(fā)送,這是通過當(dāng)前線程處理doPendingFunctors 時被調(diào)用的sendInLoop函數(shù).

void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool error = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  // 通道沒有關(guān)注可寫事件并且應(yīng)用層發(fā)送緩沖區(qū)沒有數(shù)據(jù)趣竣,直接write
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      // 寫完了摇庙,回調(diào)writeCompleteCallback_
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE) // FIXME: any others?
        {
          error = true;
        }
      }
    }
  }

  assert(remaining <= len);
  // 沒有錯誤,并且還有未寫完的數(shù)據(jù)(說明內(nèi)核發(fā)送緩沖區(qū)滿遥缕,要將未寫完的數(shù)據(jù)添加到output buffer中)
  if (!error && remaining > 0)
  {
    LOG_TRACE << "I am going to write more data";
    size_t oldLen = outputBuffer_.readableBytes();
    // 如果超過highWaterMark_(高水位標(biāo))卫袒,回調(diào)highWaterMarkCallback_
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();        // 關(guān)注POLLOUT事件
    }
  }

寫的過程是先嘗試直接往內(nèi)核緩沖區(qū)內(nèi)write,如果內(nèi)核緩沖區(qū)滿了則將未寫完的數(shù)據(jù)添加到outputBuffer_中,并開始關(guān)注可寫事件,即POLLOUT事件,當(dāng)內(nèi)核緩沖區(qū)不滿時,調(diào)用handleWrite()函數(shù),在TcpConnection中設(shè)置了可寫回調(diào)函數(shù),以發(fā)送剩余的數(shù)據(jù).

// 內(nèi)核發(fā)送緩沖區(qū)有空間了,回調(diào)該函數(shù)
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)    // 發(fā)送緩沖區(qū)已清空
      {
        channel_->disableWriting();     // 停止關(guān)注POLLOUT事件单匣,以免出現(xiàn)busy loop
        if (writeCompleteCallback_)     // 回調(diào)writeCompleteCallback_
        {
          // 應(yīng)用層發(fā)送緩沖區(qū)被清空夕凝,就回調(diào)用writeCompleteCallback_
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)   // 發(fā)送緩沖區(qū)已清空并且連接狀態(tài)是kDisconnecting, 要關(guān)閉連接
        {
          shutdownInLoop();     // 關(guān)閉連接
        }
      }
      else
      {
        LOG_TRACE << "I am going to write more data";
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

從outputBuffer中取出剩余的數(shù)據(jù)寫入到內(nèi)核緩沖區(qū),當(dāng)然有可能這一次還是不能完全的寫入,但只要應(yīng)用層的發(fā)送緩沖區(qū)還有數(shù)據(jù)就會一直關(guān)注POLLOUT事件,當(dāng)內(nèi)核緩沖區(qū)又有空間時,就再次回調(diào)此函數(shù),繼續(xù)寫入發(fā)送.一旦數(shù)據(jù)發(fā)送完畢,立刻停止關(guān)注可寫事件,以免造成busy loop.

TcpConnection中的shutdown()函數(shù)

TcpConnection中提供了shutdown()函數(shù),沒有提供close函數(shù),就是為了保證數(shù)據(jù)收發(fā)的完整性.即當(dāng)連接正在處于關(guān)閉狀態(tài),網(wǎng)絡(luò)庫會轉(zhuǎn)而調(diào)用shutdownInloop()函數(shù),繼續(xù)執(zhí)行關(guān)閉過程,該函數(shù)也是線程安全的.

void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}
void Socket::shutdownWrite()
{
    sockets::shutdownWrite(sockfd_);
}

void sockets::shutdownWrite(int sockfd)
{
    int ret = ::shutdown(sockfd, SHUT_WR);
    // 檢查錯誤
}

若在關(guān)閉連接的過程中,應(yīng)用層緩沖區(qū)還有數(shù)據(jù)沒有發(fā)完,即還在關(guān)注POLLOUT事件,那么shutdownInLoop()中會先判斷isWriting() 為true,所以并不會直接執(zhí)行shutdownWrite()函數(shù).而當(dāng)數(shù)據(jù)已經(jīng)完全發(fā)完時,在
handleWrite() 函數(shù)中,會執(zhí)行channel_->disableWriting();停止關(guān)注了POLLOUT事件,而且會判斷是否為kDisconnecting狀態(tài),隨即調(diào)用shutdownInLoop()函數(shù)關(guān)閉連接.

shutdown時序圖.png

當(dāng)我們這邊也已經(jīng)發(fā)送完數(shù)據(jù)了,于是我們調(diào)用shutdownInloop中的shutdownWrite,發(fā)送TCP FIN分節(jié),對方會讀到0 字節(jié),然后對方通常會關(guān)閉連接(無論shutdownWrite() 還是close())封孙,可讀事件發(fā)生調(diào)用handleRead()迹冤,這樣muduo 會讀到0 字節(jié),調(diào)用handleClose()虎忌,進(jìn)而調(diào)用connectionCallback_(定義了默認(rèn)的muduo::net::defaultConnectionCallback), 這樣客戶代碼就知道對方斷開連接了(判斷是否connected()),最后調(diào)用closeCallback_ (TcpServer::removeConnection(),newConnection函數(shù)中對conn注冊了)
測試:

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));

    message1_.resize(100);
    message2_.resize(200);
    std::fill(message1_.begin(), message1_.end(), 'A');
    std::fill(message2_.begin(), message2_.end(), 'B');
  }
   void start()
  {
      server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
      conn->send(message1_);
      conn->send(message2_);
      conn->shutdown();
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
  void onMessage(const TcpConnectionPtr& conn,Buffer* buf, Timestamp receiveTime)
  {
    muduo::string msg1 = buf->retrieveAllAsString();
    muduo::string msg(msg1); 
    printf("onMessage(): received %zd bytes from connection [%s] at %s\n",msg.size(), conn->name().c_str(),receiveTime.toFormattedString().c_str());
    conn->send(msg);
  }

  EventLoop* loop_;
  TcpServer server_;

  muduo::string message1_;
  muduo::string message2_;
};
int main()
{
  printf("main(): pid = %d\n", getpid());

  InetAddress listenAddr(8888);
  EventLoop loop;

  TestServer server(&loop, listenAddr);
  server.start();
  loop.loop();
}

運行結(jié)果
main(): pid = 28316
20191013 06:25:26.252774Z 28316 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191013 06:25:26.252905Z 28316 TRACE EventLoop EventLoop created 0x7FFDBEA4B3C0 in thread 28316 - EventLoop.cc:62
20191013 06:25:26.252926Z 28316 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253053Z 28316 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253075Z 28316 TRACE loop EventLoop 0x7FFDBEA4B3C0 start looping - EventLoop.cc:94
20191013 06:25:28.237390Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.237708Z 28316 TRACE printActiveChannels {6: IN } - EventLoop.cc:257
20191013 06:25:28.237782Z 28316 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706 - TcpServer.cc:93
20191013 06:25:28.237824Z 28316 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:62
20191013 06:25:28.237843Z 28316 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191013 06:25:28.237860Z 28316 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191013 06:25:28.237875Z 28316 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:231
20191013 06:25:28.237888Z 28316 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706
20191013 06:25:28.238018Z 28316 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:236
20191013 06:25:28.238036Z 28316 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20191013 06:25:28.238095Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.238133Z 28316 TRACE printActiveChannels {8: IN HUP } - EventLoop.cc:257
20191013 06:25:28.238147Z 28316 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191013 06:25:28.238183Z 28316 TRACE handleClose fd = 8 state = 3 - TcpConnection.cc:297
20191013 06:25:28.238194Z 28316 TRACE updateChannel fd = 8 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#1] is down
20191013 06:25:28.238215Z 28316 TRACE handleClose [7] usecount=3 - TcpConnection.cc:305
20191013 06:25:28.238229Z 28316 INFO TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:153
20191013 06:25:28.238238Z 28316 TRACE removeConnectionInLoop [8] usecount=6 - TcpServer.cc:157
20191013 06:25:28.238253Z 28316 TRACE removeConnectionInLoop [9] usecount=5 - TcpServer.cc:159
20191013 06:25:28.238272Z 28316 TRACE removeConnectionInLoop [10] usecount=6 - TcpServer.cc:170
20191013 06:25:28.238284Z 28316 TRACE handleClose [11] usecount=3 - TcpConnection.cc:308
20191013 06:25:28.238295Z 28316 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191013 06:25:28.238308Z 28316 TRACE removeChannel fd = 8 - EPollPoller.cc:147
20191013 06:25:28.238328Z 28316 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:69
分析:程序使用nc命令進(jìn)行測試,當(dāng)程序建立之后,會回調(diào)TestServer::onConnection()函數(shù),然后會依次會調(diào)用send函數(shù)發(fā)送message1與message2,然后shutdown()會調(diào)用shutdownInLoop函數(shù),會一直等到outputBuffer_ 數(shù)據(jù)全部寫到內(nèi)核發(fā)送緩沖區(qū)才會真正關(guān)閉寫端泡徙,客戶端讀到數(shù)據(jù)后最后read 返回0,客戶端close導(dǎo)致服務(wù)端最終removeConnection.由最后可以看到,TcpConnection的析構(gòu)會在handleEvent處理完了之后才會執(zhí)行.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末膜蠢,一起剝皮案震驚了整個濱河市堪藐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌挑围,老刑警劉巖礁竞,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異杉辙,居然都是意外死亡模捂,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進(jìn)店門蜘矢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狂男,“玉大人,你說我怎么就攤上這事品腹♂常” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵舞吭,是天一觀的道長泡垃。 經(jīng)常有香客問我析珊,道長,這世上最難降的妖魔是什么蔑穴? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任忠寻,我火速辦了婚禮,結(jié)果婚禮上存和,老公的妹妹穿的比我還像新娘锡溯。我一直安慰自己,他們只是感情好哑姚,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著芜茵,像睡著了一般叙量。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上九串,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天绞佩,我揣著相機(jī)與錄音,去河邊找鬼猪钮。 笑死品山,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烤低。 我是一名探鬼主播肘交,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼扑馁!你這毒婦竟也來了涯呻?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤腻要,失蹤者是張志新(化名)和其女友劉穎复罐,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體雄家,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡效诅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了趟济。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乱投。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖咙好,靈堂內(nèi)的尸體忽然破棺而出篡腌,到底是詐尸還是另有隱情,我是刑警寧澤勾效,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布嘹悼,位于F島的核電站叛甫,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏杨伙。R本人自食惡果不足惜其监,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望限匣。 院中可真熱鬧抖苦,春花似錦、人聲如沸米死。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽峦筒。三九已至究西,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間物喷,已是汗流浹背卤材。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留峦失,地道東北人扇丛。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像尉辑,于是被迫代替她去往敵國和親帆精。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345