runInLoop相關(guān)
在之前得文章中提到了EventLoop::runInLoop()
赌渣,該函數(shù)用于在EventLoop
的IO線程執(zhí)行某個(gè)用戶的任務(wù)回調(diào)肾胯,源碼如下:
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread()) { //判斷是否在當(dāng)前IO線程
cb(); //同步調(diào)用
} else {
queueInLoop(cb); //加入隊(duì)列
}
}
若用戶在其他線程調(diào)用runInLoop()
岸霹,則執(zhí)行queueInLoop()
,其實(shí)現(xiàn)如下:
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
}
該函數(shù)將cb
添加到隊(duì)列pendingFunctors_
中大咱,并喚醒IO線程赌厅。
1. 隊(duì)列中的回調(diào)是如何觸發(fā)的
在EventLoop::loop()
的事件循環(huán)中咆霜,通過doPendingFunctors()
來執(zhí)行隊(duì)列中的任務(wù)回調(diào)邓馒,實(shí)現(xiàn)如下:
void EventLoop::loop()
{
//...
while (!quit_) {
//...
doPendingFunctors();
}
//...
}
這里可能會(huì)有疑問,IO線程會(huì)阻塞在事件循環(huán)EventLoop::loop()
的poll
調(diào)用中蛾坯,因此通過loop()
來觸發(fā)用戶回調(diào)光酣,如果EventLoop
中一直沒有事件觸發(fā),那poll
會(huì)一直阻塞脉课,從而導(dǎo)致用戶回調(diào)一直無法執(zhí)行救军。
所以為了及時(shí)觸發(fā)用戶回調(diào)财异,我們需要去喚醒IO線程。
2. 喚醒的實(shí)現(xiàn)
書中提到唱遭,傳統(tǒng)的做法是使用pipe(2)
戳寸,讓IO線程監(jiān)視此管道的可讀事件。在需要喚醒時(shí)胆萧,往管道寫入一個(gè)字節(jié)來觸發(fā)喚醒庆揩。
muduo中使用了eventfd(2)
來實(shí)現(xiàn)IO線程的喚醒俐东。其不必管理緩沖區(qū)跌穗,可以更高效地喚醒。
在EventLoop
構(gòu)造時(shí)虏辫,創(chuàng)建eventfd并注冊可讀事件蚌吸,將事件分發(fā)至EventLoop::handleRead()
。
和傳統(tǒng)做法一樣砌庄,wakeup()
的實(shí)現(xiàn)就是對(duì)eventfd進(jìn)行寫操作羹唠,從而觸發(fā)可讀事件達(dá)到喚醒IO線程的目的。
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
//...
}
3. doPendingFunctors的實(shí)現(xiàn)
上文提到在EventLoop::loop()
事件循環(huán)最后娄昆,觸發(fā)隊(duì)列中的任務(wù)回調(diào)佩微,其實(shí)現(xiàn)如下:
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
這段代碼有兩處需要注意的:
鎖的范圍
doPendingFunctors()
沒有直接在臨界區(qū)內(nèi)依次調(diào)用任務(wù)回調(diào),而且sawp()
到局部變量中(減小了臨界區(qū)的長度)萌焰。否則哺眯,鎖會(huì)一直等到所有回調(diào)函數(shù)處理完才釋放,阻塞其他線程調(diào)用queueInLoop()
扒俯。callingPendingFunctors_
代碼中使用callingPendingFunctors_
來標(biāo)記是否在執(zhí)行doPendingFunctors()
過程中奶卓,目的是為了queueInLoop()
來判斷喚醒的時(shí)機(jī)。
4. 喚醒的時(shí)機(jī)
在queueInLoop()
中撼玄,是否需要weakup()
進(jìn)行了這樣的判斷:
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
即夺姑,當(dāng)調(diào)用queueInLoop()
的線程不是IO線程;以及在IO線程調(diào)用queueInLoop()
掌猛,但此時(shí)正在執(zhí)行doPendingFunctors()
過程中盏浙,才需要喚醒。
因?yàn)槔蟛纾?dāng)queueInLoop()
在IO線程中調(diào)用废膘,doPendingFunctors()
就會(huì)在EventLoop::loop()
事件循環(huán)的最后被調(diào)用,所以此時(shí)無須喚醒兔院。
TcpServer
創(chuàng)建TcpServer
對(duì)象殖卑,在其構(gòu)造時(shí)通過Acceptor
來獲得新連接的fd
。在Acceptor
的構(gòu)造函數(shù)中調(diào)用了[1]socket()
和[2]bind()
坊萝。
啟動(dòng)函數(shù)TcpServer::start()
通過Acceptor::listen()
(以runInLoop的方式)調(diào)用[3]listen()
孵稽,并注冊了可讀事件许起。
當(dāng)新連接請求時(shí),觸發(fā)可讀事件菩鲜,在其回調(diào)函數(shù)Acceptor::handleRead()
中調(diào)用了[4]accept()
并回調(diào)了TcpServer::newConnection()
园细,其實(shí)現(xiàn)如下:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(
new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
函數(shù)中創(chuàng)建了TcpConnection
對(duì)象,把它加入ConnectionMap
中管理并設(shè)置了相關(guān)回調(diào)接校。
注:以上的 [1] [2] [3] [4] 完成了一次完整的連接猛频。
TcpConnection
TcpConnection
是muduo中唯一默認(rèn)使用智能指針管理的類,也是唯一繼承enable_shared_from_this
的類蛛勉。有關(guān)enable_shared_from_this
可以參考share_ptr相關(guān)鹿寻。
TcpConnection
作用就是使用Channel
來獲得socket上的IO事件,執(zhí)行各種回調(diào)诽凌。相關(guān)事件的處理毡熏,后面的文章再具體介紹。