muduo源碼分析系列 線程池的實現(xiàn)
分析線程池之前帮哈,先介紹線程
畢竟線程池里保存著每個線程
先分析Thread類
class Thread : noncopyable
{
public:
typedef std::function<void ()> ThreadFunc;
explicit Thread(ThreadFunc, const string& name = string());
// FIXME: make it movable in C++11
~Thread();
void start();
int join(); // return pthread_join()
bool started() const { return started_; }
// pthread_t pthreadId() const { return pthreadId_; }
pid_t tid() const { return tid_; }
const string& name() const { return name_; }
static int numCreated() { return numCreated_.get(); }
private:
void setDefaultName();
bool started_;
bool joined_;
pthread_t pthreadId_;
pid_t tid_;
ThreadFunc func_;
string name_;
CountDownLatch latch_;
static AtomicInt32 numCreated_;
};
仔細(xì)觀察其實就是把C11的thread相關(guān)的方法進(jìn)行了進(jìn)一步的封裝
但是有個地方 CountDownLatch是什么呢
舉個例子:在考試的時候 收卷老師必須要等到所有考生的卷子都收拾好了肌似,才能離開教室。這就是latch的含義碗旅,意思是某個線程 必須要等待其他線程完成 才能執(zhí)行。直接看CountDownLatch的源碼
class CountDownLatch :
{
public:
explicit CountDownLatch(int count);
void wait();
void countDown();
int getCount() const;
private:
mutable MutexLock mutex_;
Condition condition_ GUARDED_BY(mutex_);
int count_ GUARDED_BY(mutex_);
};
看成員: 一個mutex_,一個條件變量condtion,一個公共count_
看方法
CountDownLatch::CountDownLatch(int count)
: mutex_(),
condition_(mutex_),
count_(count)
{
}
void CountDownLatch::wait()
{
MutexLockGuard lock(mutex_); //上鎖
while (count_ > 0) //等待count變成0
{
condition_.wait();
}
}
void CountDownLatch::countDown()
{
MutexLockGuard lock(mutex_); //上鎖
--count_; //減少count值
if (count_ == 0)
{
condition_.notifyAll(); //count值為0了 喚醒等待的condtion_變量
}
}
int CountDownLatch::getCount() const
{
MutexLockGuard lock(mutex_);
return count_;
}
看了實現(xiàn)其實很簡單:本質(zhì)就是維護(hù)一個共享變量count_,這個count_理解成上面那個例子的學(xué)生感耙,每次學(xué)生離開教室 那么就調(diào)用一次countDown方法荠医,該方法將count-1吁脱,如果最后一個學(xué)生離開了那么count為0,則調(diào)用condtion_的notify方法彬向,喚醒在wait里阻塞的的線程兼贡。
知道了latch的功能,就可以開始看Thread相關(guān)的方法了娃胆。
構(gòu)造函數(shù)
Thread::Thread(ThreadFunc func, const string& n)
: started_(false),
joined_(false),
pthreadId_(0),
tid_(0),
func_(std::move(func)),
name_(n),
latch_(1) //latch為1 說明只需要等待一個線程countDown即可
{
setDefaultName();
}
來看最關(guān)鍵的start方法
void Thread::start()
{
assert(!started_);
started_ = true;
// FIXME: move(func_)
detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
{
started_ = false;
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
else
{
latch_.wait();
assert(tid_ > 0);
}
}
這里構(gòu)造了一個 ThreadData類遍希,然后調(diào)用系統(tǒng)api,創(chuàng)建出一個新的線程缕棵,且這個線程執(zhí)行的函數(shù)是ThreadData里的detail::startThread()方法(執(zhí)行用戶的func)孵班。
如果創(chuàng)建失敗,就delete掉招驴,成功就等待latch里的計數(shù)器變?yōu)?(如果latch是大于0的話)篙程,否則就一直阻塞。
來看看這個類的聲明
ThreadData(ThreadFunc func,
const string& name,
pid_t* tid,
CountDownLatch* latch)
: func_(std::move(func)),
name_(name),
tid_(tid),
latch_(latch)
{ }
void runInThread()
{
*tid_ = muduo::CurrentThread::tid();
tid_ = NULL;
latch_->countDown(); //將latch_計數(shù)器-1 并且如果等于0的時候 喚醒 latch_.wait線程
latch_ = NULL;
muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();
::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
try
{
func_(); //執(zhí)行func
muduo::CurrentThread::t_threadName = "finished";
}
catch (const Exception& ex)
{
muduo::CurrentThread::t_threadName = "crashed";
fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
muduo::CurrentThread::t_threadName = "crashed";
fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
muduo::CurrentThread::t_threadName = "crashed";
fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
throw; // rethrow
}
}
上面一堆函數(shù)别厘,本質(zhì)上最重要的還是最終執(zhí)行了用戶傳遞的func
void* startThread(void* obj)
{
ThreadData* data = static_cast<ThreadData*>(obj);
data->runInThread();
delete data;
return NULL;
}
threadDetail類里的一個方法虱饿,通過萬能指針void*進(jìn)行強(qiáng)制轉(zhuǎn)化成目標(biāo)ThreadData類,然后執(zhí)行runInThread,執(zhí)行完delete掉這個data氮发。
看完這些設(shè)計其實可以明白渴肉,為什么要這么設(shè)計呢?個人認(rèn)為還是考慮到線程子資源復(fù)用的問題爽冕。把線程里執(zhí)行的函數(shù)封裝成data類仇祭,這樣每次執(zhí)行完畢只需要將data刪除掉,而不需要去重復(fù)分配和刪除掉線程颈畸。
并且 也能說明了乌奇。為什么runInThread這里tid需要為空了,因為執(zhí)行完這個data之后就不需要這個data對象的數(shù)據(jù)了眯娱,latch_置為null也是一樣的礁苗。
感嘆陳碩大佬的代碼功底
講完Thread可以開始講ThreadPool了
先上代碼
class ThreadPool : noncopyable
{
public:
typedef std::function<void ()> Task;
explicit ThreadPool(const string& nameArg = string("ThreadPool"));
~ThreadPool();
// Must be called before start().
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb; }
void start(int numThreads);
void stop();
const string& name() const
{ return name_; }
size_t queueSize() const;
void run(Task f);
private:
bool isFull() const REQUIRES(mutex_);
void runInThread();
Task take();
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
string name_;
Task threadInitCallback_;
std::vector<std::unique_ptr<muduo::Thread>> threads_;
std::deque<Task> queue_ GUARDED_BY(mutex_);
size_t maxQueueSize_;
bool running_;
};
以上是類的聲明
接下來是對每個成員變量的解釋:
mutable MutexLock mutex_
這是作者把mutex互斥鎖進(jìn)行了一個封裝
class CAPABILITY("mutex") MutexLock : noncopyable
{
public:
MutexLock()
: holder_(0)
{
MCHECK(pthread_mutex_init(&mutex_, NULL)); //初始化調(diào)用系統(tǒng)函數(shù)init
}
~MutexLock()
{
assert(holder_ == 0);
MCHECK(pthread_mutex_destroy(&mutex_)); //銷毀調(diào)用系統(tǒng)函數(shù) destroy
}
// must be called when locked, i.e. for assertion
bool isLockedByThisThread() const
{
return holder_ == CurrentThread::tid(); //判斷這個鎖是否是當(dāng)前線程鎖住的
}
void assertLocked() const ASSERT_CAPABILITY(this)
{
assert(isLockedByThisThread());
}
// internal usage
void lock() ACQUIRE()
{
MCHECK(pthread_mutex_lock(&mutex_)); //加鎖
assignHolder();
}
void unlock() RELEASE()
{
unassignHolder();
MCHECK(pthread_mutex_unlock(&mutex_)); //解鎖
}
pthread_mutex_t* getPthreadMutex() /* non-const */
{
return &mutex_; /
}
private:
friend class Condition;
class UnassignGuard : noncopyable
{
public:
explicit UnassignGuard(MutexLock& owner)
: owner_(owner)
{
owner_.unassignHolder();
}
~UnassignGuard()
{
owner_.assignHolder();
}
private:
MutexLock& owner_;
};
void unassignHolder()
{
holder_ = 0;
}
void assignHolder()
{
holder_ = CurrentThread::tid();
}
pthread_mutex_t mutex_;
pid_t holder_;
};
直接看這個Mutex類的成員對象:
pthread_mutex_t mutex_;
pid_t holder_;
也就是說 這個Mutex類對pthread_mutex_t 和pid進(jìn)行了封裝,每個mutex對象都有一個鎖和 掌握該鎖的線程pid徙缴。
再看接下來的ThreadPool成員
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
兩個條件變量
string name_;
名稱
Task threadInitCallback_;
Task是 typedef std::function<void ()> Task;
本質(zhì)是function封裝的函數(shù)
線程池初始化執(zhí)行的函數(shù)
std::vector<std::unique_ptr<muduo::Thread>> threads_;
線程vector
std::deque<Task> queue_ GUARDED_BY(mutex_);
任務(wù)隊列
size_t maxQueueSize_;
隊列最大任務(wù)數(shù)量
bool running_;
是否在執(zhí)行
開始分析這個線程池的各個函數(shù)
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}
成員初始化
void ThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true; //設(shè)置為運行狀態(tài)
threads_.reserve(numThreads); //為線程數(shù)量分配vector大小
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_+id)); //new一個thread對象 加入到 vector中
threads_[i]->start(); //執(zhí)行thread
}
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_(); //僅當(dāng)傳參是0且初始化函數(shù)是非空 執(zhí)行初始化函數(shù)
}
}
start 方法 具體thread類后續(xù)分析
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_); // 當(dāng)前線程池加鎖 防止別的線程使用
running_ = false; //設(shè)置為結(jié)束
notEmpty_.notifyAll(); //喚醒條件
notFull_.notifyAll();//喚醒條件
}
for (auto& thr : threads_)
{
thr->join(); //將所有thread結(jié)束掉
}
}
stop方法
上面的
notEmpty_.notifyAll(); //喚醒所有等待 當(dāng)前條件變量的線程
notFull_.notifyAll();//喚醒所有等待當(dāng)前條件變量的線程
實際上就是 調(diào)用系統(tǒng)api
void notifyAll()
{
MCHECK(pthread_cond_broadcast(&pcond_)); //pcond就是condtion中的成員你變量
}
通俗的說就是:在線程池中试伙,多個線程可能會同時等待同一個條件變量 ,此時在等待的時候 會有多個線程被掛起于样,所以調(diào)用notifyAll把所有阻塞的線程喚醒疏叨,這樣才能進(jìn)行后續(xù)的join操作。
但是百宇,在當(dāng)前線程池線程中考廉,實際上這兩個條件變量更多的表示一個當(dāng)前的狀態(tài)。
notEmpty_在wait的情況 :說明當(dāng)前的線程池并不處于非空的情況 ==》 當(dāng)前線程池是空的(queue是空的)
notEmpty在notify的情況:當(dāng)前線程池里的queue是非空携御,說明有task任務(wù)需要執(zhí)行
同理notFull也是一樣
如果還不理解可以仔細(xì)google一下條件變量的用法
void ThreadPool::run(Task task)
{
if (threads_.empty())
{
task(); //如果當(dāng)前線程池子沒有線程昌粤,直接使用線程池所在的線程執(zhí)行任務(wù)
}
else
{
MutexLockGuard lock(mutex_);
while (isFull() && running_) //如果當(dāng)前的線程池子里所有線程都被占用了
{
notFull_.wait(); //說明當(dāng)前的線程池處于滿任務(wù)狀態(tài) 阻塞
}
if (!running_) return;
assert(!isFull());
queue_.push_back(std::move(task)); //任務(wù)隊列入隊
notEmpty_.notify(); //喚醒所有使用notEmpty條件變量的線程
}
}
run方法
實際上就是在運行start方法之后,暴露給用戶使用的接口啄刹。
start方法:設(shè)置當(dāng)前線程池的線程數(shù)量涮坐。run方法,用戶通過封裝task傳遞給run方法誓军,run方法里將task存入queue中袱讹,等待runInThread對task進(jìn)行Take()
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)
{
notEmpty_.wait();
}
Task task;
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task;
}
take方法,實際上就是從queue隊列中取出任務(wù)昵时,并且返回任務(wù)
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_(); //初始化函數(shù)
}
while (running_)
{
Task task(take());
if (task)
{
task();
}
}
}
catch (const Exception &ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception &ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}
runInThread方法捷雕,執(zhí)行take方法,將task真正執(zhí)行