muduo源碼 ---ThreadPool介紹

muduo源碼分析系列 線程池的實現(xiàn)

分析線程池之前帮哈,先介紹線程
畢竟線程池里保存著每個線程
先分析Thread類

class Thread : noncopyable
{
 public:
  typedef std::function<void ()> ThreadFunc;

  explicit Thread(ThreadFunc, const string& name = string());
  // FIXME: make it movable in C++11
  ~Thread();

  void start();
  int join(); // return pthread_join()

  bool started() const { return started_; }
  // pthread_t pthreadId() const { return pthreadId_; }
  pid_t tid() const { return tid_; }
  const string& name() const { return name_; }

  static int numCreated() { return numCreated_.get(); }

 private:
  void setDefaultName();

  bool       started_;
  bool       joined_;
  pthread_t  pthreadId_;
  pid_t      tid_;
  ThreadFunc func_;
  string     name_;
  CountDownLatch latch_;

  static AtomicInt32 numCreated_;
};

仔細(xì)觀察其實就是把C11的thread相關(guān)的方法進(jìn)行了進(jìn)一步的封裝
但是有個地方 CountDownLatch是什么呢
舉個例子:在考試的時候 收卷老師必須要等到所有考生的卷子都收拾好了肌似,才能離開教室。這就是latch的含義碗旅,意思是某個線程 必須要等待其他線程完成 才能執(zhí)行。直接看CountDownLatch的源碼

class CountDownLatch :    
{
 public:

  explicit CountDownLatch(int count);

  void wait();

  void countDown();

  int getCount() const;

 private:
  mutable MutexLock mutex_;
  Condition condition_ GUARDED_BY(mutex_);
  int count_ GUARDED_BY(mutex_);
};

看成員: 一個mutex_,一個條件變量condtion,一個公共count_

看方法

CountDownLatch::CountDownLatch(int count)
  : mutex_(),
    condition_(mutex_),
    count_(count)
{
}

void CountDownLatch::wait()
{
  MutexLockGuard lock(mutex_); //上鎖
  while (count_ > 0) //等待count變成0
  {
    condition_.wait();
  }
}

void CountDownLatch::countDown()
{
  MutexLockGuard lock(mutex_); //上鎖
  --count_; //減少count值
  if (count_ == 0)
  {
    condition_.notifyAll(); //count值為0了 喚醒等待的condtion_變量
  }
}

int CountDownLatch::getCount() const
{
  MutexLockGuard lock(mutex_);
  return count_;
}

看了實現(xiàn)其實很簡單:本質(zhì)就是維護(hù)一個共享變量count_,這個count_理解成上面那個例子的學(xué)生感耙,每次學(xué)生離開教室 那么就調(diào)用一次countDown方法荠医,該方法將count-1吁脱,如果最后一個學(xué)生離開了那么count為0,則調(diào)用condtion_的notify方法彬向,喚醒在wait里阻塞的的線程兼贡。

知道了latch的功能,就可以開始看Thread相關(guān)的方法了娃胆。

構(gòu)造函數(shù)

Thread::Thread(ThreadFunc func, const string& n)
  : started_(false),
    joined_(false),
    pthreadId_(0),
    tid_(0),
    func_(std::move(func)),
    name_(n),
    latch_(1) //latch為1 說明只需要等待一個線程countDown即可
{
  setDefaultName();
}

來看最關(guān)鍵的start方法

void Thread::start()
{
  assert(!started_);
  started_ = true;
  // FIXME: move(func_)
  detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
  if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
  {
    started_ = false;
    delete data; // or no delete?
    LOG_SYSFATAL << "Failed in pthread_create";
  }
  else
  {
    latch_.wait();
    assert(tid_ > 0);
  }
}

這里構(gòu)造了一個 ThreadData類遍希,然后調(diào)用系統(tǒng)api,創(chuàng)建出一個新的線程缕棵,且這個線程執(zhí)行的函數(shù)是ThreadData里的detail::startThread()方法(執(zhí)行用戶的func)孵班。
如果創(chuàng)建失敗,就delete掉招驴,成功就等待latch里的計數(shù)器變?yōu)?(如果latch是大于0的話)篙程,否則就一直阻塞。

來看看這個類的聲明

ThreadData(ThreadFunc func,
             const string& name,
             pid_t* tid,
             CountDownLatch* latch)
    : func_(std::move(func)),
      name_(name),
      tid_(tid),
      latch_(latch)
  { }

  void runInThread()
  {
    *tid_ = muduo::CurrentThread::tid();
    tid_ = NULL;
    latch_->countDown(); //將latch_計數(shù)器-1 并且如果等于0的時候 喚醒 latch_.wait線程
    latch_ = NULL;

    muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();
    ::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);
    try
    {
      func_(); //執(zhí)行func
      muduo::CurrentThread::t_threadName = "finished";
    }
    catch (const Exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
      abort();
    }
    catch (const std::exception& ex)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());
      fprintf(stderr, "reason: %s\n", ex.what());
      abort();
    }
    catch (...)
    {
      muduo::CurrentThread::t_threadName = "crashed";
      fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());
      throw; // rethrow
    }
  }

上面一堆函數(shù)别厘,本質(zhì)上最重要的還是最終執(zhí)行了用戶傳遞的func

void* startThread(void* obj)
{
  ThreadData* data = static_cast<ThreadData*>(obj);
  data->runInThread();
  delete data;
  return NULL;
}

threadDetail類里的一個方法虱饿,通過萬能指針void*進(jìn)行強(qiáng)制轉(zhuǎn)化成目標(biāo)ThreadData類,然后執(zhí)行runInThread,執(zhí)行完delete掉這個data氮发。

看完這些設(shè)計其實可以明白渴肉,為什么要這么設(shè)計呢?個人認(rèn)為還是考慮到線程子資源復(fù)用的問題爽冕。把線程里執(zhí)行的函數(shù)封裝成data類仇祭,這樣每次執(zhí)行完畢只需要將data刪除掉,而不需要去重復(fù)分配和刪除掉線程颈畸。


image.png

并且 也能說明了乌奇。為什么runInThread這里tid需要為空了,因為執(zhí)行完這個data之后就不需要這個data對象的數(shù)據(jù)了眯娱,latch_置為null也是一樣的礁苗。

感嘆陳碩大佬的代碼功底

講完Thread可以開始講ThreadPool了

先上代碼

class ThreadPool : noncopyable
{
 public:
  typedef std::function<void ()> Task;

  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  ~ThreadPool();

  // Must be called before start().
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
  void setThreadInitCallback(const Task& cb)
  { threadInitCallback_ = cb; }

  void start(int numThreads);
  void stop();

  const string& name() const
  { return name_; }

  size_t queueSize() const;

  void run(Task f);

 private:
  bool isFull() const REQUIRES(mutex_);
  void runInThread();
  Task take();

  mutable MutexLock mutex_;
  Condition notEmpty_ GUARDED_BY(mutex_);
  Condition notFull_ GUARDED_BY(mutex_);
  string name_;
  Task threadInitCallback_;
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_);
  size_t maxQueueSize_;
  bool running_;
};

以上是類的聲明

接下來是對每個成員變量的解釋:
mutable MutexLock mutex_
這是作者把mutex互斥鎖進(jìn)行了一個封裝

class CAPABILITY("mutex") MutexLock : noncopyable
{
 public:
  MutexLock()
    : holder_(0)
  {
    MCHECK(pthread_mutex_init(&mutex_, NULL)); //初始化調(diào)用系統(tǒng)函數(shù)init
  }

  ~MutexLock()
  {
    assert(holder_ == 0);
    MCHECK(pthread_mutex_destroy(&mutex_)); //銷毀調(diào)用系統(tǒng)函數(shù) destroy
  }

  // must be called when locked, i.e. for assertion
  bool isLockedByThisThread() const
  {
    return holder_ == CurrentThread::tid(); //判斷這個鎖是否是當(dāng)前線程鎖住的
  }

  void assertLocked() const ASSERT_CAPABILITY(this)
  {
    assert(isLockedByThisThread());
  }

  // internal usage

  void lock() ACQUIRE()
  {
    MCHECK(pthread_mutex_lock(&mutex_)); //加鎖
    assignHolder();
  }

  void unlock() RELEASE()
  {
    unassignHolder();
    MCHECK(pthread_mutex_unlock(&mutex_)); //解鎖
  }

  pthread_mutex_t* getPthreadMutex() /* non-const */
  {
    return &mutex_;  /
  }

 private:
  friend class Condition;

  class UnassignGuard : noncopyable
  {
   public:
    explicit UnassignGuard(MutexLock& owner)
      : owner_(owner)
    {
      owner_.unassignHolder();
    }

    ~UnassignGuard()
    {
      owner_.assignHolder();
    }

   private:
    MutexLock& owner_;
  };

  void unassignHolder()
  {
    holder_ = 0;
  }

  void assignHolder()
  {
    holder_ = CurrentThread::tid();
  }

  pthread_mutex_t mutex_;
  pid_t holder_;
};

直接看這個Mutex類的成員對象:
pthread_mutex_t mutex_;
pid_t holder_;

也就是說 這個Mutex類對pthread_mutex_t 和pid進(jìn)行了封裝,每個mutex對象都有一個鎖和 掌握該鎖的線程pid徙缴。

再看接下來的ThreadPool成員
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
兩個條件變量
string name_;
名稱
Task threadInitCallback_;
Task是 typedef std::function<void ()> Task;
本質(zhì)是function封裝的函數(shù)

線程池初始化執(zhí)行的函數(shù)
std::vector<std::unique_ptr<muduo::Thread>> threads_;
線程vector
std::deque<Task> queue_ GUARDED_BY(mutex_);
任務(wù)隊列
size_t maxQueueSize_;
隊列最大任務(wù)數(shù)量
bool running_;
是否在執(zhí)行

開始分析這個線程池的各個函數(shù)

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
    notEmpty_(mutex_),
    notFull_(mutex_),
    name_(nameArg),
    maxQueueSize_(0),
    running_(false)
{
}

成員初始化

void ThreadPool::start(int numThreads)
{
  assert(threads_.empty());
  running_ = true; //設(shè)置為運行狀態(tài)
  threads_.reserve(numThreads); //為線程數(shù)量分配vector大小
  for (int i = 0; i < numThreads; ++i) 
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1);
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));  //new一個thread對象 加入到 vector中
    threads_[i]->start(); //執(zhí)行thread
  }
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();  //僅當(dāng)傳參是0且初始化函數(shù)是非空 執(zhí)行初始化函數(shù)
  }
}

start 方法 具體thread類后續(xù)分析

void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_); // 當(dāng)前線程池加鎖 防止別的線程使用
  running_ = false;  //設(shè)置為結(jié)束
  notEmpty_.notifyAll(); //喚醒條件
  notFull_.notifyAll();//喚醒條件
  }
  for (auto& thr : threads_)
  {
    thr->join();  //將所有thread結(jié)束掉
  }
}

stop方法
上面的
notEmpty_.notifyAll(); //喚醒所有等待 當(dāng)前條件變量的線程
notFull_.notifyAll();//喚醒所有等待當(dāng)前條件變量的線程

實際上就是 調(diào)用系統(tǒng)api

  void notifyAll()
  {
    MCHECK(pthread_cond_broadcast(&pcond_));  //pcond就是condtion中的成員你變量
  }

通俗的說就是:在線程池中试伙,多個線程可能會同時等待同一個條件變量 ,此時在等待的時候 會有多個線程被掛起于样,所以調(diào)用notifyAll把所有阻塞的線程喚醒疏叨,這樣才能進(jìn)行后續(xù)的join操作。
但是百宇,在當(dāng)前線程池線程中考廉,實際上這兩個條件變量更多的表示一個當(dāng)前的狀態(tài)。
notEmpty_在wait的情況 :說明當(dāng)前的線程池并不處于非空的情況 ==》 當(dāng)前線程池是空的(queue是空的)
notEmpty在notify的情況:當(dāng)前線程池里的queue是非空携御,說明有task任務(wù)需要執(zhí)行

同理notFull也是一樣

如果還不理解可以仔細(xì)google一下條件變量的用法

void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
    task(); //如果當(dāng)前線程池子沒有線程昌粤,直接使用線程池所在的線程執(zhí)行任務(wù)
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull() && running_) //如果當(dāng)前的線程池子里所有線程都被占用了
    {
      notFull_.wait();  //說明當(dāng)前的線程池處于滿任務(wù)狀態(tài) 阻塞
    }
    if (!running_) return;
    assert(!isFull());

    queue_.push_back(std::move(task)); //任務(wù)隊列入隊
    notEmpty_.notify(); //喚醒所有使用notEmpty條件變量的線程
  }
}

run方法
實際上就是在運行start方法之后,暴露給用戶使用的接口啄刹。
start方法:設(shè)置當(dāng)前線程池的線程數(shù)量涮坐。run方法,用戶通過封裝task傳遞給run方法誓军,run方法里將task存入queue中袱讹,等待runInThread對task進(jìn)行Take()

ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
      notFull_.notify(); 
    }
  }
  return task;
}

take方法,實際上就是從queue隊列中取出任務(wù)昵时,并且返回任務(wù)

void ThreadPool::runInThread()
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_(); //初始化函數(shù)
    }
    while (running_)
    {
      Task task(take());
      if (task)
      {
        task();
      }
    }
  }
  catch (const Exception &ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception &ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}

runInThread方法捷雕,執(zhí)行take方法,將task真正執(zhí)行

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末壹甥,一起剝皮案震驚了整個濱河市救巷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌句柠,老刑警劉巖浦译,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件棒假,死亡現(xiàn)場離奇詭異,居然都是意外死亡精盅,警方通過查閱死者的電腦和手機(jī)帽哑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叹俏,“玉大人妻枕,你說我怎么就攤上這事∷希” “怎么了佳头?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長晴氨。 經(jīng)常有香客問我,道長碉输,這世上最難降的妖魔是什么籽前? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮敷钾,結(jié)果婚禮上枝哄,老公的妹妹穿的比我還像新娘。我一直安慰自己阻荒,他們只是感情好挠锥,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著侨赡,像睡著了一般蓖租。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上羊壹,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天蓖宦,我揣著相機(jī)與錄音,去河邊找鬼油猫。 笑死稠茂,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的情妖。 我是一名探鬼主播睬关,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼毡证!你這毒婦竟也來了电爹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤情竹,失蹤者是張志新(化名)和其女友劉穎藐不,沒想到半個月后匀哄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡雏蛮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年涎嚼,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挑秉。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡法梯,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出犀概,到底是詐尸還是另有隱情立哑,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布姻灶,位于F島的核電站铛绰,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏产喉。R本人自食惡果不足惜捂掰,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望曾沈。 院中可真熱鬧这嚣,春花似錦、人聲如沸塞俱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽障涯。三九已至罐旗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間像樊,已是汗流浹背尤莺。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留生棍,地道東北人颤霎。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像涂滴,于是被迫代替她去往敵國和親友酱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容