The EventLoopThread class
A program can have more than one IO thread, and an IO thread is not necessarily the main thread: an EventLoop can be created and run in any thread, and any thread that creates and runs an EventLoop is called an IO thread.
The EventLoopThread class encapsulates an IO thread. It creates a thread whose thread function constructs an EventLoop object, assigns its address to the loop_ member, and then notifies the condition variable to wake up startLoop(). Two threads are involved: the thread that calls EventLoopThread::startLoop(), and the newly created thread that executes EventLoopThread::threadFunc() (bound via boost::bind in the constructor); the latter is the IO thread.
Constructor:
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
  : loop_(NULL),
    exiting_(false),
    thread_(boost::bind(&EventLoopThread::threadFunc, this)), // bind the thread function
    mutex_(),
    cond_(mutex_),
    callback_(cb)
{
}
The startLoop function:
EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  thread_.start(); // start the new thread
  // Two threads now run concurrently: the one that called
  // EventLoopThread::startLoop(), and the one executing
  // EventLoopThread::threadFunc() (the IO thread).
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait();
    }
  }
  return loop_;
}
The threadFunc function:
void EventLoopThread::threadFunc()
{
  EventLoop loop;
  if (callback_)
  {
    callback_(&loop);
  }
  {
    MutexLockGuard lock(mutex_);
    // loop_ points to an object on this thread's stack; once threadFunc
    // returns, the pointer dangles because the stack object is destroyed.
    // But threadFunc returning means the thread has exited, and at that
    // point the EventLoopThread object has no further use anyway, so in
    // practice this is not a problem.
    loop_ = &loop;
    cond_.notify();
  }
  loop.loop();
}
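The startLoop()/threadFunc() handshake above can be reproduced in standard C++. This is a minimal sketch, not muduo code: `Loop` and `LoopThread` are made-up stand-ins for EventLoop and EventLoopThread, and a `quit_` flag stands in for the blocking loop.loop() call so the stack object stays alive:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Stand-in for muduo's EventLoop (hypothetical; only identity matters here).
struct Loop {
    int id = 42;
};

class LoopThread {
public:
    // Mirrors EventLoopThread::startLoop(): start the thread, then block
    // until threadFunc() has created the loop and notified the condition.
    Loop* startLoop() {
        thread_ = std::thread([this] { threadFunc(); });
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return loop_ != nullptr; });
        return loop_;
    }

    // Stand-in for EventLoop::quit(): lets threadFunc() return.
    void quit() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quit_ = true;
        }
        cond_.notify_all();
    }

    ~LoopThread() {
        quit();
        if (thread_.joinable()) thread_.join();
    }

private:
    // Mirrors EventLoopThread::threadFunc(): the Loop lives on this thread's
    // stack; its address is published under the mutex, then the condition
    // variable is notified so startLoop() can return.
    void threadFunc() {
        Loop loop;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            loop_ = &loop;
        }
        cond_.notify_all();
        // Stand-in for loop.loop(): keep the stack object alive until quit().
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return quit_; });
    }

    Loop* loop_ = nullptr;
    bool quit_ = false;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::thread thread_;
};
```

The key point is identical to muduo's: the caller cannot use the loop pointer until the new thread has actually constructed the object, hence the condition-variable wait in startLoop().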
Test program:
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
#include <unistd.h> // getpid(), sleep()
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
}
int main()
{
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
  EventLoopThread loopThread;
  EventLoop* loop = loopThread.startLoop();
  // Asynchronous call: runInThread is handed to the IO thread that owns
  // loop, and that thread executes it.
  loop->runInLoop(runInThread);
  sleep(1);
  // runAfter also calls runInLoop internally, so this too is asynchronous:
  // it adds a 2-second timer in the IO thread.
  loop->runAfter(2, runInThread);
  sleep(3);
  loop->quit();
  // ~EventLoopThread() also calls loop_->quit().
  printf("exit main().\n");
}
Output:
main(): pid = 18547, tid = 18547
runInThread(): pid = 18547, tid = 18548
runInThread(): pid = 18547, tid = 18548
exit main().
Analysis: the main thread creates an EventLoopThread object, whose startLoop() returns the address of an EventLoop object; inside that function, thread_.start() creates a new thread, and that thread is the IO thread. Since the main thread is not the IO thread, loop->runInLoop(runInThread) is a cross-thread call: runInThread is appended to the task queue via queueInLoop(), and the IO thread is woken up with wakeup(). The IO thread then takes runInThread out of the queue in doPendingFunctors() and executes it; the output confirms that the IO thread's tid differs from the main thread's. Similarly, for loop->runAfter(2, runInThread): when timerfd_ becomes readable, the IO thread first calls handleRead() and then runs the callback runInThread().
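The cross-thread runInLoop()/queueInLoop()/doPendingFunctors() path can be sketched without muduo. This is an illustrative sketch, not muduo's implementation: `MiniLoop` is a made-up name, and a condition variable takes the place of the eventfd-based wakeup()/wakeupFd_ mechanism:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Simplified stand-in for EventLoop's runInLoop/queueInLoop/doPendingFunctors
// (hypothetical; a condition variable replaces muduo's wakeupFd_ wakeup).
class MiniLoop {
public:
    void loop() {
        tid_.store(std::this_thread::get_id());
        while (true) {
            std::vector<std::function<void()>> tasks;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return quit_ || !pending_.empty(); });
                if (quit_ && pending_.empty()) break;
                tasks.swap(pending_);  // like doPendingFunctors(): swap, run unlocked
            }
            for (auto& task : tasks) task();
        }
    }
    void runInLoop(std::function<void()> cb) {
        if (std::this_thread::get_id() == tid_.load()) {
            cb();                       // already in the loop thread: run directly
        } else {
            queueInLoop(std::move(cb)); // cross-thread: queue and wake the loop
        }
    }
    void queueInLoop(std::function<void()> cb) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(cb));
        }
        cond_.notify_one();             // plays the role of wakeup()
    }
    void quit() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quit_ = true;
        }
        cond_.notify_one();
    }
private:
    std::atomic<std::thread::id> tid_{std::thread::id()};
    std::mutex mutex_;
    std::condition_variable cond_;
    std::vector<std::function<void()>> pending_;
    bool quit_ = false;
};
```

As in muduo, the functor queue is swapped out under the lock and executed outside it, so queued tasks can themselves call queueInLoop() or quit() without deadlocking.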
The EventLoopThreadPool class (thread pool)
The key step in implementing a multithreaded TcpServer with the one-loop-per-thread model is to pick a loop from the event-loop pool for each new TcpConnection: the multithreaded TcpServer's own EventLoop is used only to accept new connections, while each new connection does its IO on one of the other EventLoops. By contrast, a single-threaded TcpServer shares its EventLoop with its TcpConnections.
The job of the IO thread pool is to start a number of IO threads and keep each of them running an event loop.
EventLoopThreadPool implementation:
class EventLoopThreadPool : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
  EventLoopThreadPool(EventLoop* baseLoop);
  ~EventLoopThreadPool();
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  void start(const ThreadInitCallback& cb = ThreadInitCallback());
  // TcpServer calls this each time it creates a TcpConnection, to obtain an
  // EventLoop for it. When the loop list loops_ is empty (single-threaded
  // server), it returns baseLoop_, i.e. the loop TcpServer itself uses;
  // otherwise it picks an EventLoop by round-robin scheduling.
  EventLoop* getNextLoop();
 private:
  EventLoop* baseLoop_; // same EventLoop the Acceptor belongs to
  bool started_;
  int numThreads_;      // number of threads, excluding the mainReactor
  int next_;            // index of the EventLoop chosen for the next new connection
  boost::ptr_vector<EventLoopThread> threads_; // list of IO threads
  std::vector<EventLoop*> loops_;              // list of EventLoops
};
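The scheduling policy of getNextLoop() boils down to a few lines. A minimal sketch under the same assumptions (`Loop` and `LoopPool` are made-up stand-in types; baseLoop_ is returned when the pool is empty, round-robin otherwise):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in loop type; only identity matters for the scheduling sketch.
struct Loop { int id; };

// Mirrors EventLoopThreadPool::getNextLoop(): round-robin over loops_,
// falling back to baseLoop_ when the pool is empty (single-threaded server).
class LoopPool {
public:
    explicit LoopPool(Loop* baseLoop) : baseLoop_(baseLoop) {}
    void add(Loop* loop) { loops_.push_back(loop); }
    Loop* getNextLoop() {
        Loop* loop = baseLoop_;          // default: the server's own loop
        if (!loops_.empty()) {
            loop = loops_[next_];
            next_ = (next_ + 1) % loops_.size();  // wrap around
        }
        return loop;
    }
private:
    Loop* baseLoop_;
    std::size_t next_ = 0;
    std::vector<Loop*> loops_;
};
```

Round-robin keeps the connection count roughly balanced across sub-reactors without any shared-state bookkeeping beyond a single index.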
Implementation of the start function:
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();
  started_ = true;
  for (int i = 0; i < numThreads_; ++i)
  {
    EventLoopThread* t = new EventLoopThread(cb);
    threads_.push_back(t);
    // Start the EventLoopThread; cb is called before the loop starts looping.
    loops_.push_back(t->startLoop());
  }
  if (numThreads_ == 0 && cb)
  {
    // There is only one EventLoop; call cb before it enters the event loop.
    cb(baseLoop_);
  }
}
Here baseLoop_ is the same object as the private member EventLoop* loop_ of both TcpServer and Acceptor: the mainReactor in the main thread handles the listening socket, while events on connected sockets are handled by the subReactors in the thread pool, chosen by round-robin. The only change needed when TcpServer creates a new connection is that, instead of handing TcpConnection the loop_ that TcpServer uses itself, it now obtains an ioLoop from the EventLoopThreadPool each time:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  // Pick an EventLoop by round-robin.
  EventLoop* ioLoop = threadPool_->getNextLoop();
  .....
  TcpConnectionPtr conn(...
  ...
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}
Test program:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
#include <unistd.h> // getpid()
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop* loop,
const InetAddress& listenAddr, int numThreads)
: loop_(loop),
server_(loop, listenAddr, "TestServer"),
numThreads_(numThreads)
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(numThreads);
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr& conn,
const char* data,
ssize_t len)
{
printf("onMessage(): received %zd bytes from connection [%s]\n",
len, conn->name().c_str());
}
EventLoop* loop_;
TcpServer server_;
int numThreads_;
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr, 4);
server.start();
loop.loop();
}
Open two terminals and run "nc 127.0.0.1 8888" in each; type "aaaa" in one and "bbbb" in the other. Output:
main(): pid = 1492
20191012 13:28:49.581281Z 1492 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191012 13:28:49.581472Z 1492 TRACE EventLoop EventLoop created 0x7FFED67C0870 in thread 1492 - EventLoop.cc:62
20191012 13:28:49.581490Z 1492 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191012 13:28:49.581799Z 1493 TRACE updateChannel fd = 9 events = 3 - EPollPoller.cc:104
20191012 13:28:49.581844Z 1493 TRACE EventLoop EventLoop created 0x7FAC2121FA40 in thread 1493 - EventLoop.cc:62
20191012 13:28:49.581868Z 1493 TRACE updateChannel fd = 10 events = 3 - EPollPoller.cc:104
20191012 13:28:49.581902Z 1493 TRACE loop EventLoop 0x7FAC2121FA40 start looping - EventLoop.cc:94
20191012 13:28:49.582118Z 1494 TRACE updateChannel fd = 12 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582167Z 1494 TRACE EventLoop EventLoop created 0x7FAC20A1EA40 in thread 1494 - EventLoop.cc:62
20191012 13:28:49.582185Z 1494 TRACE updateChannel fd = 13 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582207Z 1494 TRACE loop EventLoop 0x7FAC20A1EA40 start looping - EventLoop.cc:94
20191012 13:28:49.582407Z 1495 TRACE updateChannel fd = 15 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582561Z 1495 TRACE EventLoop EventLoop created 0x7FAC1BFFEA40 in thread 1495 - EventLoop.cc:62
20191012 13:28:49.582613Z 1495 TRACE updateChannel fd = 16 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582651Z 1495 TRACE loop EventLoop 0x7FAC1BFFEA40 start looping - EventLoop.cc:94
20191012 13:28:49.582814Z 1496 TRACE updateChannel fd = 18 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582854Z 1496 TRACE EventLoop EventLoop created 0x7FAC1B7FDA40 in thread 1496 - EventLoop.cc:62
20191012 13:28:49.582871Z 1496 TRACE updateChannel fd = 19 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582891Z 1496 TRACE loop EventLoop 0x7FAC1B7FDA40 start looping - EventLoop.cc:94
20191012 13:28:49.582936Z 1492 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20191012 13:28:49.582971Z 1492 TRACE loop EventLoop 0x7FFED67C0870 start looping - EventLoop.cc:94
20191012 13:29:03.904403Z 1492 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:03.904700Z 1492 TRACE printActiveChannels {6: IN } - EventLoop.cc:257
20191012 13:29:03.904778Z 1492 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:35708 - TcpServer.cc:93
20191012 13:29:03.904815Z 1492 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x18BB020 fd=20 - TcpConnection.cc:62
20191012 13:29:03.904833Z 1492 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191012 13:29:03.904858Z 1492 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191012 13:29:03.904894Z 1492 TRACE newConnection [5] usecount=3 - TcpServer.cc:122
20191012 13:29:03.904913Z 1493 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:03.904971Z 1493 TRACE printActiveChannels {10: IN } - EventLoop.cc:257
20191012 13:29:03.905012Z 1493 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78
20191012 13:29:03.905024Z 1493 TRACE updateChannel fd = 20 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:35708
20191012 13:29:03.905071Z 1493 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83
20191012 13:29:16.368826Z 1492 TRACE printActiveChannels {6: IN } - EventLoop.cc:257
20191012 13:29:16.368843Z 1492 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#2] from 127.0.0.1:35712 - TcpServer.cc:93
20191012 13:29:16.368851Z 1492 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#2] at 0x18BB310 fd=21 - TcpConnection.cc:62
20191012 13:29:16.368857Z 1492 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191012 13:29:16.368866Z 1492 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191012 13:29:16.368877Z 1492 TRACE newConnection [5] usecount=3 - TcpServer.cc:122
20191012 13:29:16.368881Z 1494 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:16.368896Z 1494 TRACE printActiveChannels {13: IN } - EventLoop.cc:257
20191012 13:29:16.368907Z 1494 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78
20191012 13:29:16.368911Z 1494 TRACE updateChannel fd = 21 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#2] from 127.0.0.1:35712
20191012 13:29:16.368923Z 1494 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83
20191012 13:29:18.455550Z 1494 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:18.455578Z 1494 TRACE printActiveChannels {21: IN } - EventLoop.cc:257
20191012 13:29:18.455582Z 1494 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 5 bytes from connection [TestServer:0.0.0.0:8888#2]
20191012 13:29:18.455603Z 1494 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191012 13:29:21.601139Z 1493 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:21.601201Z 1493 TRACE printActiveChannels {20: IN } - EventLoop.cc:257
20191012 13:29:21.601215Z 1493 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 5 bytes from connection [TestServer:0.0.0.0:8888#1]
20191012 13:29:21.601272Z 1493 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191012 13:29:23.948582Z 1494 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:23.948648Z 1494 TRACE printActiveChannels {21: IN } - EventLoop.cc:257
20191012 13:29:23.948662Z 1494 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191012 13:29:23.948692Z 1494 TRACE handleClose fd = 21 state = 2 - TcpConnection.cc:144
20191012 13:29:23.948706Z 1494 TRACE updateChannel fd = 21 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#2] is down
20191012 13:29:23.948737Z 1494 TRACE handleClose [7] usecount=3 - TcpConnection.cc:152
20191012 13:29:23.948768Z 1494 TRACE handleClose [11] usecount=4 - TcpConnection.cc:155
20191012 13:29:23.948777Z 1494 TRACE handleEvent [12] usecount=3 - Channel.cc:69
20191012 13:29:23.948801Z 1492 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:23.948822Z 1492 TRACE printActiveChannels {5: IN } - EventLoop.cc:257
20191012 13:29:23.948837Z 1492 INFO TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#2 - TcpServer.cc:153
20191012 13:29:23.948845Z 1492 TRACE removeConnectionInLoop [8] usecount=2 - TcpServer.cc:157
20191012 13:29:23.948867Z 1492 TRACE removeConnectionInLoop [9] usecount=1 - TcpServer.cc:159
20191012 13:29:23.948890Z 1492 TRACE removeConnectionInLoop [10] usecount=2 - TcpServer.cc:170
20191012 13:29:23.948908Z 1494 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:23.948922Z 1494 TRACE printActiveChannels {13: IN } - EventLoop.cc:257
20191012 13:29:23.948935Z 1494 TRACE removeChannel fd = 21 - EPollPoller.cc:147
20191012 13:29:23.948948Z 1494 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#2] at 0x18BB310 fd=21 - TcpConnection.cc:69
20191012 13:29:24.584651Z 1493 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:24.584702Z 1493 TRACE printActiveChannels {20: IN } - EventLoop.cc:257
20191012 13:29:24.584712Z 1493 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191012 13:29:24.584735Z 1493 TRACE handleClose fd = 20 state = 2 - TcpConnection.cc:144
20191012 13:29:24.584749Z 1493 TRACE updateChannel fd = 20 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#1] is down
20191012 13:29:24.584773Z 1493 TRACE handleClose [7] usecount=3 - TcpConnection.cc:152
20191012 13:29:24.584795Z 1493 TRACE handleClose [11] usecount=4 - TcpConnection.cc:155
20191012 13:29:24.584801Z 1493 TRACE handleEvent [12] usecount=3 - Channel.cc:69
20191012 13:29:24.584803Z 1492 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:24.584840Z 1492 TRACE printActiveChannels {5: IN } - EventLoop.cc:257
20191012 13:29:24.584860Z 1492 INFO TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:153
20191012 13:29:24.584866Z 1492 TRACE removeConnectionInLoop [8] usecount=2 - TcpServer.cc:157
20191012 13:29:24.584877Z 1492 TRACE removeConnectionInLoop [9] usecount=1 - TcpServer.cc:159
20191012 13:29:24.584897Z 1492 TRACE removeConnectionInLoop [10] usecount=2 - TcpServer.cc:170
20191012 13:29:24.584906Z 1493 TRACE poll 1 events happended - EPollPoller.cc:65
20191012 13:29:24.584944Z 1493 TRACE printActiveChannels {10: IN } - EventLoop.cc:257
20191012 13:29:24.584959Z 1493 TRACE removeChannel fd = 20 - EPollPoller.cc:147
20191012 13:29:24.584971Z 1493 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#1] at 0x18BB020 fd=20 - TcpConnection.cc:69
Analysis: with 4 pool threads there are five IO threads in total: the main thread plus the 4 subReactor threads created in the pool. server.start() starts those 4 threads, calling TcpServer::start() -> EventLoopThreadPool::start() -> EventLoopThread::startLoop() in turn, and also starts the mainReactor listening:
void TcpServer::start()
{
  loop_->runInLoop(boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
File-descriptor analysis:
A process starts with file descriptors 0, 1, and 2 already open.
Each Reactor's EventLoop constructs, by default, an EPollPoller, which owns EPollPoller::epollfd_.
In addition there are two channels (EventLoop::timerQueue_'s timerfd_ and EventLoop::wakeupFd_) whose readable events are watched by poll() for the entire lifetime of the event loop. So each Reactor owns these 3 fds.
The mainReactor additionally has Acceptor::acceptSocket_.sockfd_ (the listening fd) and Acceptor::idleFd_ (an idle fd opened on /dev/null). So in the run above, the mainReactor has: epollfd_ = 3, timerfd_ = 4, wakeupFd_ = 5, sockfd_ = 6, idleFd_ = 7; the remaining fds, three per thread, belong to the 4 IO threads (8 through 19).
TRACE updateChannel fd = 20 events = 3 - EPollPoller.cc:104
That is why the first accepted connection's fd starts at 20.
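The fd arithmetic above can be written out explicitly (a sketch with a hypothetical helper; it assumes the kernel hands out descriptors sequentially and nothing else is open):

```cpp
#include <cassert>

// Computes the fd number of the first accepted connection, following the
// accounting in the text (hypothetical helper, for illustration only).
int firstConnectionFd(int numSubReactors) {
    int fd = 3;                // after stdin/stdout/stderr (0, 1, 2)
    fd += 3;                   // mainReactor: epollfd_=3, timerfd_=4, wakeupFd_=5
    fd += 1;                   // listening socket sockfd_: 6
    fd += 1;                   // Acceptor::idleFd_ (/dev/null): 7
    fd += numSubReactors * 3;  // each subReactor: epollfd_, timerfd_, wakeupFd_
    return fd;                 // next free descriptor = first connection fd
}
```

With 4 subReactors this yields 3 + 3 + 1 + 1 + 12 = 20, matching the "fd = 20" seen in the log.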
Process:
When nc connects, sockfd_ becomes readable; the mainReactor accepts the connection through acceptor_, and Acceptor::handleRead calls back TcpServer::newConnection. There, an EventLoop* ioLoop is obtained by round-robin from threadPool_->getNextLoop(), a new TcpConnection object conn is created and bound to that ioLoop, and the ConnectionCallback and MessageCallback are set on conn. The call ioLoop->runInLoop(...) then wakes up the chosen IO thread: that thread's wakeupFd_ (10 for the first one) becomes readable, handleEvent() processes it, doPendingFunctors() runs next and executes TcpConnection::connectEstablished(), and from then on events on the connection are dispatched through Channel::handleEvent() and so on.