前 言
上一篇文章介紹了連接的創(chuàng)建,引出了TcpConnection
類齐唆。其作用就是處理socket上的IO事件遣铝,執(zhí)行各種回調(diào)。本文介紹TcpConnection
對(duì)斷開連接忘闻、讀取數(shù)據(jù)钝计、發(fā)送數(shù)據(jù)的處理。
斷開連接
連接的關(guān)閉分為主動(dòng)斷開和被動(dòng)斷開齐佳,兩者的處理方式基本一致葵蒂。muduo
采用的連接關(guān)閉方式:被動(dòng)斷開,其核心函數(shù)為TcpConnection::handleClose()
重虑。書中提到践付,如果需要主動(dòng)斷開厦滤,添加一個(gè)接口調(diào)用handleClose()
即可问词。
對(duì)于遠(yuǎn)端連接斷開的感知:在可讀事件處理函數(shù)handleRead()
中,當(dāng)read返回值為0時(shí)垛孔,即遠(yuǎn)端斷開了連接,調(diào)用TcpConnection::handleClose()
命爬。此時(shí)處理如下:
1. 取消所有關(guān)注的IO事件
2. 調(diào)用用戶注冊(cè)回調(diào)ConnectionCallback
3. 調(diào)用closeCallback_()
曹傀,此回調(diào)綁定到TcpServer::removeConnection()
在removeConnection()
中處理如下:
4. 將對(duì)應(yīng)的TcpConnection
對(duì)象從TcpServer
中移除
5. 調(diào)用TcpConnection::connectDestroyed()
,并通過std::bind()
將TcpConnection
對(duì)象的生命周期延長(zhǎng)到執(zhí)行完成connectDestroyed()
6. 將連接對(duì)應(yīng)的Channel
從EventLoop
中移除
7. TcpConnection
析構(gòu)饲宛,成員socket_
引用計(jì)數(shù)為0皆愉,其析構(gòu)時(shí)會(huì)調(diào)用close()
,關(guān)閉連接的fd
讀取數(shù)據(jù)
新連接建立時(shí)艇抠,通過TcpConnection::connectEstablished()
注冊(cè)可讀事件幕庐,當(dāng)觸發(fā)可讀事件時(shí)調(diào)用回調(diào),即TcpConnection::handleRead()
家淤,其主要內(nèi)容如下:
void TcpConnection::handleRead(Timestamp receiveTime) {
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
handleError();
}
}
這里主要進(jìn)行了兩個(gè)處理:
1. 讀取數(shù)據(jù)到inputBuffer_
中异剥,其使用Buffer::readFd()
來(lái)實(shí)現(xiàn),具體如下絮重。
2. 調(diào)用用戶回調(diào)messageCallback_
冤寿,此函數(shù)是在建立新連接時(shí),將TcpServer
的成員函數(shù)MessageCallback
設(shè)置為回調(diào)青伤,其由用戶提供督怜。
Buffer::readFd()
實(shí)現(xiàn)(非源碼)如下:
ssize_t Buffer::readFd(int fd, int & savedErrno) {
// 申請(qǐng)棧上空間
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
// 兩塊iovec分別指向內(nèi)部buffer的可寫空間和棧上空間
vec[0].iov_base = begin() + m_writerIndex;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// 判斷內(nèi)部buffer的可寫空間是否足夠
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0) {
savedErrno = errno;
} else if (static_cast<size_t>(n) <= writable) {
// 內(nèi)部空間足夠,直接寫入狠角,移動(dòng)可寫索引
m_writerIndex += n;
} else {
// 內(nèi)部空間不足号杠,先寫入棧上空間,再將棧上數(shù)據(jù)append到內(nèi)部空間
m_writerIndex = m_buffer.size();
append(extrabuf, n - writable);
}
return n;
}
書中提到擎厢,此實(shí)現(xiàn)一是使用了scatter/gather IO
(分離/聚散IO)究流,配合內(nèi)部棧空間使用动遭;二是muduo采用level trigger(LT)
模式芬探,只需要調(diào)用一次read(2)
且不會(huì)丟失數(shù)據(jù)。從而兼顧了內(nèi)存使用量和效率厘惦。
發(fā)送數(shù)據(jù)
數(shù)據(jù)的發(fā)送通過TcpConnection::send()
實(shí)現(xiàn)偷仿,代碼如下:
void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(message);
} else {
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, message.as_string()));
}
}
}
在確保是連接狀態(tài)的情況下,如果在當(dāng)前IO線程觸發(fā)就調(diào)用TcpConnection::sendInLoop()
宵蕉,反之則使用runInLoop
將該任務(wù)拋給IO線程執(zhí)行酝静。有關(guān)runInLoop
的內(nèi)容在上一篇已經(jīng)介紹過,這里不再贅述羡玛。
在TcpConnection::sendInLoop()
中别智,處理如下:
1. 若outputBuffer_
為空,直接發(fā)送數(shù)據(jù)
2. 若發(fā)送數(shù)據(jù)沒有寫完稼稿,統(tǒng)計(jì)剩余的字節(jié)數(shù)薄榛,將剩余數(shù)據(jù)寫入outputBuffer_
3. 注冊(cè)可寫事件
當(dāng)socket可寫時(shí)讳窟,調(diào)用TcpConnection::handleWrite()
,繼續(xù)發(fā)送outputBuffer_
中的數(shù)據(jù)敞恋,一旦發(fā)送完成丽啡,立刻將可寫事件移除。
此流程需要注意的是可寫事件觀察的范圍硬猫,可以看出只有在outputBuffer_
中有數(shù)據(jù)時(shí)补箍,才會(huì)注冊(cè)觀察可寫事件,因?yàn)楫?dāng)outputBuffer_
中沒數(shù)據(jù)時(shí)啸蜜,此時(shí)socket一直是處于可寫狀態(tài)的坑雅, 這將會(huì)導(dǎo)致一直觸發(fā)TcpConnection::handleWrite()
,而我們并沒有數(shù)據(jù)需要發(fā)送盔性。所以此觸發(fā)沒有意義霞丧,不需要去關(guān)注呢岗。
此外冕香,數(shù)據(jù)發(fā)送的流程中:
當(dāng)outputBuffer_
中的舊數(shù)據(jù)字節(jié)和剩余數(shù)據(jù)字節(jié)之和大于highWaterMark_
時(shí),會(huì)將highWaterMarkCallback_
放入待執(zhí)行隊(duì)列中
當(dāng)數(shù)據(jù)發(fā)送完畢時(shí)后豫,會(huì)調(diào)用writeCompleteCallback_
兩者配合使用悉尾,可以起到限流的作用。
更多內(nèi)容挫酿,詳見github NetLib
參考:
《Linux多線程服務(wù)端編程》陳碩 著
muduo源碼