TcpConnection發(fā)送數(shù)據(jù)
之前我們的Channel僅僅用到了ReadCallback,而并沒有啟用WriteCallback,在本節(jié)中會設(shè)置為在需要時關(guān)注可寫事件,在TcpConnection中添加如下:
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
相應(yīng)的添加接收緩沖區(qū)與發(fā)送緩沖區(qū):
Buffer inputBuffer_; // 應(yīng)用層接收緩沖區(qū)
Buffer outputBuffer_; // 應(yīng)用層發(fā)送緩沖區(qū)
在TcpServer::newConnection()中注冊寫完成回調(diào)函數(shù):writeCompleteCallback_,即當(dāng)所有數(shù)據(jù)都已經(jīng)拷貝到內(nèi)核緩沖區(qū)時回調(diào)該函數(shù),即發(fā)送緩沖區(qū)被清空時.
conn->setWriteCompleteCallback(writeCompleteCallback_);
此函數(shù)的作用是如果我們向一個連接發(fā)送send()大流量的數(shù)據(jù)士袄,發(fā)送頻率不能太快她肯,因為如果對等方接收不及時,則內(nèi)核發(fā)送緩沖區(qū)會堆積數(shù)據(jù)氨鹏,根據(jù)前面的分析,我們會將數(shù)據(jù)添加到outputBuffer_压状,導(dǎo)致outputBuffer_ 增長太快仆抵,對此可以關(guān)注WriteCompleteCallback_ 跟继,當(dāng)它被調(diào)用時表示outputBuffer_ 已經(jīng)被清空,此時再次send()镣丑,可以有效的調(diào)整send()函數(shù)的發(fā)送頻率.相應(yīng)的還有一個highWaterMarkCallback_,可以當(dāng)作是”高水位標(biāo)“ 回調(diào)函數(shù)舔糖,即如果對等方接收不及時,outputBuffer_ 會一直增大莺匠,當(dāng)增長到highWaterMark_ (具體數(shù)值)時金吗,回調(diào)highWaterMarkCallback_ 函數(shù),很可能在函數(shù)內(nèi)主動shutdown.
除此之外,并在TcpConnection的接口中增加了send()函數(shù),這兩個函數(shù)都是可以跨線程調(diào)用的,因為其內(nèi)部實現(xiàn)都增加了兩個*InLoop函數(shù)(sendInloop函數(shù)),對應(yīng)前新的接口函數(shù),并使用Buffer作為輸出緩沖區(qū).
若在當(dāng)前IO線程調(diào)用send()函數(shù),它會把message復(fù)制一份,傳給IO線程中的sendInLoop()來發(fā)送,這是通過當(dāng)前線程處理doPendingFunctors 時被調(diào)用的sendInLoop函數(shù).
void TcpConnection::sendInLoop(const StringPiece& message)
{
sendInLoop(message.data(), message.size());
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool error = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
// 通道沒有關(guān)注可寫事件并且應(yīng)用層發(fā)送緩沖區(qū)沒有數(shù)據(jù)趣竣,直接write
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
// 寫完了摇庙,回調(diào)writeCompleteCallback_
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE) // FIXME: any others?
{
error = true;
}
}
}
}
assert(remaining <= len);
// 沒有錯誤,并且還有未寫完的數(shù)據(jù)(說明內(nèi)核發(fā)送緩沖區(qū)滿遥缕,要將未寫完的數(shù)據(jù)添加到output buffer中)
if (!error && remaining > 0)
{
LOG_TRACE << "I am going to write more data";
size_t oldLen = outputBuffer_.readableBytes();
// 如果超過highWaterMark_(高水位標(biāo))卫袒,回調(diào)highWaterMarkCallback_
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting(); // 關(guān)注POLLOUT事件
}
}
寫的過程是先嘗試直接往內(nèi)核緩沖區(qū)內(nèi)write,如果內(nèi)核緩沖區(qū)滿了則將未寫完的數(shù)據(jù)添加到outputBuffer_中,并開始關(guān)注可寫事件,即POLLOUT事件,當(dāng)內(nèi)核緩沖區(qū)不滿時,調(diào)用handleWrite()函數(shù),在TcpConnection中設(shè)置了可寫回調(diào)函數(shù),以發(fā)送剩余的數(shù)據(jù).
// 內(nèi)核發(fā)送緩沖區(qū)有空間了,回調(diào)該函數(shù)
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) // 發(fā)送緩沖區(qū)已清空
{
channel_->disableWriting(); // 停止關(guān)注POLLOUT事件单匣,以免出現(xiàn)busy loop
if (writeCompleteCallback_) // 回調(diào)writeCompleteCallback_
{
// 應(yīng)用層發(fā)送緩沖區(qū)被清空夕凝,就回調(diào)用writeCompleteCallback_
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting) // 發(fā)送緩沖區(qū)已清空并且連接狀態(tài)是kDisconnecting, 要關(guān)閉連接
{
shutdownInLoop(); // 關(guān)閉連接
}
}
else
{
LOG_TRACE << "I am going to write more data";
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
從outputBuffer中取出剩余的數(shù)據(jù)寫入到內(nèi)核緩沖區(qū),當(dāng)然有可能這一次還是不能完全的寫入,但只要應(yīng)用層的發(fā)送緩沖區(qū)還有數(shù)據(jù)就會一直關(guān)注POLLOUT事件,當(dāng)內(nèi)核緩沖區(qū)又有空間時,就再次回調(diào)此函數(shù),繼續(xù)寫入發(fā)送.一旦數(shù)據(jù)發(fā)送完畢,立刻停止關(guān)注可寫事件,以免造成busy loop.
TcpConnection中的shutdown()函數(shù)
TcpConnection中提供了shutdown()函數(shù),沒有提供close函數(shù),就是為了保證數(shù)據(jù)收發(fā)的完整性.即當(dāng)連接正在處于關(guān)閉狀態(tài),網(wǎng)絡(luò)庫會轉(zhuǎn)而調(diào)用shutdownInloop()函數(shù),繼續(xù)執(zhí)行關(guān)閉過程,該函數(shù)也是線程安全的.
void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)
{
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}
void TcpConnection::shutdownInLoop()
{
loop_->assertInLoopThread();
if (!channel_->isWriting())
{
// we are not writing
socket_->shutdownWrite();
}
}
void Socket::shutdownWrite()
{
sockets::shutdownWrite(sockfd_);
}
void sockets::shutdownWrite(int sockfd)
{
int ret = ::shutdown(sockfd, SHUT_WR);
// 檢查錯誤
}
若在關(guān)閉連接的過程中,應(yīng)用層緩沖區(qū)還有數(shù)據(jù)沒有發(fā)完,即還在關(guān)注POLLOUT事件,那么shutdownInLoop()中會先判斷isWriting() 為true,所以并不會直接執(zhí)行shutdownWrite()函數(shù).而當(dāng)數(shù)據(jù)已經(jīng)完全發(fā)完時,在
handleWrite() 函數(shù)中,會執(zhí)行channel_->disableWriting();停止關(guān)注了POLLOUT事件,而且會判斷是否為kDisconnecting狀態(tài),隨即調(diào)用shutdownInLoop()函數(shù)關(guān)閉連接.
當(dāng)我們這邊也已經(jīng)發(fā)送完數(shù)據(jù)了,于是我們調(diào)用shutdownInloop中的shutdownWrite,發(fā)送TCP FIN分節(jié),對方會讀到0 字節(jié),然后對方通常會關(guān)閉連接(無論shutdownWrite() 還是close())封孙,可讀事件發(fā)生調(diào)用handleRead()迹冤,這樣muduo 會讀到0 字節(jié),調(diào)用handleClose()虎忌,進(jìn)而調(diào)用connectionCallback_(定義了默認(rèn)的muduo::net::defaultConnectionCallback), 這樣客戶代碼就知道對方斷開連接了(判斷是否connected()),最后調(diào)用closeCallback_ (TcpServer::removeConnection(),newConnection函數(shù)中對conn注冊了)
測試:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop* loop,
const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "TestServer")
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
message1_.resize(100);
message2_.resize(200);
std::fill(message1_.begin(), message1_.end(), 'A');
std::fill(message2_.begin(), message2_.end(), 'B');
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
conn->send(message1_);
conn->send(message2_);
conn->shutdown();
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr& conn,Buffer* buf, Timestamp receiveTime)
{
muduo::string msg1 = buf->retrieveAllAsString();
muduo::string msg(msg1);
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",msg.size(), conn->name().c_str(),receiveTime.toFormattedString().c_str());
conn->send(msg);
}
EventLoop* loop_;
TcpServer server_;
muduo::string message1_;
muduo::string message2_;
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr);
server.start();
loop.loop();
}
運行結(jié)果
main(): pid = 28316
20191013 06:25:26.252774Z 28316 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191013 06:25:26.252905Z 28316 TRACE EventLoop EventLoop created 0x7FFDBEA4B3C0 in thread 28316 - EventLoop.cc:62
20191013 06:25:26.252926Z 28316 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253053Z 28316 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253075Z 28316 TRACE loop EventLoop 0x7FFDBEA4B3C0 start looping - EventLoop.cc:94
20191013 06:25:28.237390Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.237708Z 28316 TRACE printActiveChannels {6: IN } - EventLoop.cc:257
20191013 06:25:28.237782Z 28316 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706 - TcpServer.cc:93
20191013 06:25:28.237824Z 28316 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:62
20191013 06:25:28.237843Z 28316 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191013 06:25:28.237860Z 28316 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191013 06:25:28.237875Z 28316 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:231
20191013 06:25:28.237888Z 28316 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706
20191013 06:25:28.238018Z 28316 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:236
20191013 06:25:28.238036Z 28316 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20191013 06:25:28.238095Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.238133Z 28316 TRACE printActiveChannels {8: IN HUP } - EventLoop.cc:257
20191013 06:25:28.238147Z 28316 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191013 06:25:28.238183Z 28316 TRACE handleClose fd = 8 state = 3 - TcpConnection.cc:297
20191013 06:25:28.238194Z 28316 TRACE updateChannel fd = 8 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#1] is down
20191013 06:25:28.238215Z 28316 TRACE handleClose [7] usecount=3 - TcpConnection.cc:305
20191013 06:25:28.238229Z 28316 INFO TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:153
20191013 06:25:28.238238Z 28316 TRACE removeConnectionInLoop [8] usecount=6 - TcpServer.cc:157
20191013 06:25:28.238253Z 28316 TRACE removeConnectionInLoop [9] usecount=5 - TcpServer.cc:159
20191013 06:25:28.238272Z 28316 TRACE removeConnectionInLoop [10] usecount=6 - TcpServer.cc:170
20191013 06:25:28.238284Z 28316 TRACE handleClose [11] usecount=3 - TcpConnection.cc:308
20191013 06:25:28.238295Z 28316 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191013 06:25:28.238308Z 28316 TRACE removeChannel fd = 8 - EPollPoller.cc:147
20191013 06:25:28.238328Z 28316 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:69
分析:程序使用nc命令進(jìn)行測試,當(dāng)程序建立之后,會回調(diào)TestServer::onConnection()函數(shù),然后會依次會調(diào)用send函數(shù)發(fā)送message1與message2,然后shutdown()會調(diào)用shutdownInLoop函數(shù),會一直等到outputBuffer_ 數(shù)據(jù)全部寫到內(nèi)核發(fā)送緩沖區(qū)才會真正關(guān)閉寫端泡徙,客戶端讀到數(shù)據(jù)后最后read 返回0,客戶端close導(dǎo)致服務(wù)端最終removeConnection.由最后可以看到,TcpConnection的析構(gòu)會在handleEvent處理完了之后才會執(zhí)行.