WebRTC源碼分析-線程基礎(chǔ)之MessageQueueManager

前言

正如其名,MessageQueueManager類(后續(xù)簡寫為MQM)提供了MessageQueue(簡寫為MQ)的管理功能姿锭。在之前的文章中已經(jīng)分析過塔鳍,MQ在構(gòu)建時(shí)會(huì)調(diào)用MQ.DoInit()方法,該方法將MQ添加到MQM的內(nèi)部std::Vector<MessageQueue*>成員中呻此。
MQM類的聲明和定義分別在rtc_base/message_queue.h以及rtc_base/message_queue.cc中轮纫,其定義如下所示

// MessageQueueManager does cleanup of of message queues
class MessageQueueManager {
 public:
  static void Add(MessageQueue* message_queue);
  static void Remove(MessageQueue* message_queue);
  static void Clear(MessageHandler* handler);

  // TODO(nisse): Delete alias, as soon as downstream code is updated.
  static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }

  // For testing purposes, for use with a simulated clock.
  // Ensures that all message queues have processed delayed messages
  // up until the current point in time.
  static void ProcessAllMessageQueuesForTesting();

 private:
  static MessageQueueManager* Instance();
  MessageQueueManager();
  ~MessageQueueManager();

  void AddInternal(MessageQueue* message_queue);
  void RemoveInternal(MessageQueue* message_queue);
  void ClearInternal(MessageHandler* handler);
  void ProcessAllMessageQueuesInternal();

  // This list contains all live MessageQueues.
  std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);

  // Methods that don't modify the list of message queues may be called in a
  // re-entrant fashion. "processing_" keeps track of the depth of re-entrant
  // calls.
  CriticalSection crit_;
  size_t processing_ RTC_GUARDED_BY(crit_);
};

MessageQueueManager的構(gòu)造

MessageQueueManager的構(gòu)造方式與ThreadManager一樣,都是單例模式焚鲜,都是非安全的掌唾。之前分析過ThreadManager為什么能夠安全的構(gòu)造,MessageQueueManager原理一樣忿磅,并且MessageQueueManager對象的創(chuàng)建先于第一個(gè)MessageQueue對象糯彬。

MessageQueueManager* MessageQueueManager::Instance() {
  static MessageQueueManager* const instance = new MessageQueueManager;
  return instance;
}
MessageQueueManager::MessageQueueManager() : processing_(0) {}
MessageQueueManager::~MessageQueueManager() {}

MessageQueue的添加與移除

MessageQueueManager提供了Add與Remove的靜態(tài)函數(shù)來往單例的管理類中添加和刪除MQ,具體如下源碼所示:

void MessageQueueManager::Add(MessageQueue* message_queue) {
  return Instance()->AddInternal(message_queue);
}
void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
  CritScope cs(&crit_);
  // Prevent changes while the list of message queues is processed.
  RTC_DCHECK_EQ(processing_, 0);
  message_queues_.push_back(message_queue);
}

void MessageQueueManager::Remove(MessageQueue* message_queue) {
  return Instance()->RemoveInternal(message_queue);
}
void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
  {
    CritScope cs(&crit_);
    // Prevent changes while the list of message queues is processed.
    RTC_DCHECK_EQ(processing_, 0);
    std::vector<MessageQueue*>::iterator iter;
    iter = std::find(message_queues_.begin(), message_queues_.end(),
                     message_queue);
    if (iter != message_queues_.end()) {
      message_queues_.erase(iter);
    }
  }
}

添加和刪除方法對外都是以靜態(tài)方法提供葱她,通過調(diào)用MQM的單實(shí)例的對應(yīng)的私有方法來實(shí)現(xiàn)往向量Vector中添加和刪除MQ撩扒,需要說明的注意點(diǎn)有以下幾個(gè):
1)成員crit_是臨界區(qū)類CriticalSection的對象,該成員保證多線程環(huán)境下MQM.message_queues_以及MQM.processing_訪問安全吨些,正如上面兩個(gè)函數(shù)所示搓谆,函數(shù)開頭創(chuàng)建CritScope cs(&crit_); 在cs的構(gòu)造函數(shù)中調(diào)用crit_->Enter()表示進(jìn)入臨界區(qū)炒辉,相當(dāng)于上鎖。利用函數(shù)結(jié)束后cs對象的析構(gòu)中調(diào)用crit_->Leave()表示離開臨界區(qū)挽拔,相當(dāng)于解鎖辆脸。
2)存儲(chǔ)MQ的向量聲明為:std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);
其中RTC_GUARDED_BY宏在clang編譯器下展開為attribute(guarded_by(crit_))但校,指示編譯器在編譯過程中檢查代碼中所有訪問message_queues_的各個(gè)路徑上是否都先獲取了鎖crit_螃诅,如果沒有就會(huì)在編譯過程中產(chǎn)生錯(cuò)誤或者警告。而對于其他編譯器状囱,該宏不起任何作用术裸,意味著不會(huì)在編譯期進(jìn)行檢查。詳見 Thread Safety Analysis亭枷。
3)成員processing_ 聲明為: size_t processing_ RTC_GUARDED_BY(crit_); Add與Remove函數(shù)中執(zhí)行了RTC_DCHECK_EQ(processing_, 0)斷言袭艺,必須確保processing_ 為0。當(dāng)processing_不為0時(shí)叨粘,要么在執(zhí)行MQM的Clear()方法猾编,要么在執(zhí)行ProcessAllMessageQueues(),這些操作此時(shí)升敲,是不允許往MQM添加MQ或者刪除MQ這種會(huì)改變Vector列表的操作答倡,因?yàn)榍懊娴膬蓚€(gè)函數(shù)一般都會(huì)要遍歷Vector。思考一點(diǎn)驴党,不是已經(jīng)上鎖保證線程安全了嘛瘪撇,為啥還要保證processing_為0呢?繼續(xù)往下看吧~~

清理

與Add和Remove方式一摸一樣港庄,Clear函數(shù)也是以靜態(tài)方法的形式對外提供倔既。Clear函數(shù)的作用是從MQM所管理的所有MQ中刪除與入?yún)essageHandler* handler匹配消息。具體而言就是遍歷MQM中的MQ鹏氧,然后調(diào)用MQ本身的Clear()方法渤涌,這個(gè)方法比較冗長,此處就不展開敘述把还,將會(huì)在介紹MQ的文章中詳細(xì)描述实蓬。

void MessageQueueManager::Clear(MessageHandler* handler) {
  return Instance()->ClearInternal(handler);
}
void MessageQueueManager::ClearInternal(MessageHandler* handler) {
  // Deleted objects may cause re-entrant calls to ClearInternal. This is
  // allowed as the list of message queues does not change while queues are
  // cleared.
  MarkProcessingCritScope cs(&crit_, &processing_);
  for (MessageQueue* queue : message_queues_) {
    queue->Clear(handler);
  }
}

另外很重要的一點(diǎn),該方法并沒有使用前文所述的CritScope cs(&crit_)來實(shí)現(xiàn)線程安全笨篷,而是使用了一個(gè)新的類MarkProcessingCritScope cs(&crit_, &processing_)瞳秽,有什么神奇的地方?且看源碼:

class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
 public:
  MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
      RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
      : cs_(cs), processing_(processing) {
    cs_->Enter();
    *processing_ += 1;
  }

  ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
    *processing_ -= 1;
    cs_->Leave();
  }

 private:
  const CriticalSection* const cs_;
  size_t* processing_;

  RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
};

首先要知道CriticalSection是可重入的率翅,也即一個(gè)線程上調(diào)用cs_->Enter()上鎖之后练俐,在釋放鎖之前,同一個(gè)線程可以反復(fù)調(diào)用cs_->Enter()而不會(huì)阻塞冕臭,因此被稱為“可重入鎖”腺晾。同一個(gè)線程上鎖一次燕锥,processing_就增1,記錄上鎖次數(shù)悯蝉,只要processing_不為0归形,表示我正在Clear操作或者后文的Process*方法,這兩個(gè)方法不會(huì)改變MQM中Vector列表鼻由,因此暇榴,可以在解鎖之前,重入進(jìn)行反復(fù)操作蕉世,但是不允許Add和Remove操作蔼紧,因?yàn)槠鋾?huì)改變Vector,這就是為什么Add和Remove函數(shù)中既加鎖了狠轻,還要斷言processing_必須為0奸例,否則代碼就是寫得有Bug了。

處理所有MQ中的消息

這個(gè)方法目前還沒有完全的理解向楼,首先說下我自己的分析查吊。

  • 方法如其名,目標(biāo)在于使得MQM中管理的MQ中的消息得到處理湖蜕。
  • 當(dāng)某個(gè)線程調(diào)用該方法時(shí)逻卖,會(huì)遍歷所有的MQ,然后向MQ中投遞一個(gè)消息ID為MQID_DISPOSE的延遲消息重荠,其消息數(shù)據(jù)為ScopedIncrement對象箭阶,ScopedIncrement的構(gòu)造中將queues_not_done原子性自增1,表示該消息隊(duì)列中有消息沒有被處理戈鲁,而該延遲消息時(shí)間為0仇参,那么該延遲消息將進(jìn)入MQ的延遲消息隊(duì)列的隊(duì)首(因?yàn)镸Q的延遲消息隊(duì)列是以延遲時(shí)間排序的優(yōu)先級隊(duì)列)。記住婆殿,所有的MQ中都會(huì)投遞一個(gè)這樣的消息诈乒。
  • 方法后續(xù)就是獲取調(diào)用該方法的線程所關(guān)聯(lián)的Thread對象,并通過Thread對象的ProcessMessages方法不斷的從當(dāng)前線程的MQ中取出消息進(jìn)行處理婆芦,直到queues_not_done為0怕磨,此時(shí),之前投遞到消息循環(huán)中的MQID_DISPOSE類別的消息得到處理消约,因?yàn)樵撓⒃谙⒀h(huán)中被取出后肠鲫,消息數(shù)據(jù)ScopedIncrement對象會(huì)被直接delete伏社,從而ScopedIncrement析構(gòu)完成queues_not_done原子性自減1钩乍。此時(shí),表征著該消息循環(huán)中的所有即時(shí)消息都得到了處理谬盐。
  • 最大的疑惑就是,所有的MQ中都放入了一個(gè)MQID_DISPOSE類別的延遲消息渣锦,調(diào)用該方法的線程會(huì)如上所述的方式阻塞地將所有即時(shí)消息處理掉硝岗,而沒有主動(dòng)調(diào)用該方法的線程是如何保證該方法的目標(biāo)得以實(shí)現(xiàn)的,即袋毙,使得所有MQ的即時(shí)消息都能同步的立馬處理型檀?畢竟,對于其他的MQ此處也僅僅是投遞了一個(gè)MQID_DISPOSE類別的延遲消息而已听盖。
static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }

void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
  return Instance()->ProcessAllMessageQueuesInternal();
}

void MessageQueueManager::ProcessAllMessageQueuesInternal() {
  // This works by posting a delayed message at the current time and waiting
  // for it to be dispatched on all queues, which will ensure that all messages
  // that came before it were also dispatched.
  volatile int queues_not_done = 0;

  // This class is used so that whether the posted message is processed, or the
  // message queue is simply cleared, queues_not_done gets decremented.
  class ScopedIncrement : public MessageData {
   public:
    ScopedIncrement(volatile int* value) : value_(value) {
      AtomicOps::Increment(value_);
    }
    ~ScopedIncrement() override { AtomicOps::Decrement(value_); }

   private:
    volatile int* value_;
  };

  {
    MarkProcessingCritScope cs(&crit_, &processing_);
    for (MessageQueue* queue : message_queues_) {
      if (!queue->IsProcessingMessagesForTesting()) {
        // If the queue is not processing messages, it can
        // be ignored. If we tried to post a message to it, it would be dropped
        // or ignored.
        continue;
      }
      queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
                         new ScopedIncrement(&queues_not_done));
    }
  }
  rtc::Thread* current = rtc::Thread::Current();
  // Note: One of the message queues may have been on this thread, which is
  // why we can't synchronously wait for queues_not_done to go to 0; we need
  // to process messages as well.
  while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
    if (current) {
      current->ProcessMessages(0);
    }
  }
}

總結(jié)

  • MessageQueueManager是管理MessageQueue的類胀溺,與ThreadManager管理Thread的方式是不同的。MQM內(nèi)部的向量成員std::Vector<MessageQueue*> message_queues_用于存儲(chǔ)所有的MQ媳溺,時(shí)機(jī)是在MQ的構(gòu)造函數(shù)中調(diào)用MQM的Add靜態(tài)方法將自身指針放入message_queues_中月幌。
  • MQM對外提供了所有方法都是靜態(tài)方法碍讯,這些靜態(tài)方法均是調(diào)用MQM單實(shí)例的私有的同名函數(shù)來實(shí)現(xiàn)添加悬蔽,刪除MQ,清除所有MQ中含有某個(gè)MessageHandler的所有消息捉兴,處理所有MQ中的消息蝎困。
  • 為了線程安全,MQM中的所有操作都由CriticalSection crit_來提供上鎖的操作倍啥,該對象對外暴露的是Windows上臨界區(qū)概念的API禾乘,其提供了Enter(),Leave()等進(jìn)出臨界區(qū)的方法虽缕。CriticalSection跨平臺(tái)是如何實(shí)現(xiàn)的始藕?請看WebRTC源碼分析-線程安全之CriticalSection。 實(shí)踐上氮趋,在需要上鎖的地方創(chuàng)建一個(gè)CritScope cs(&crit_)局部對象伍派,這個(gè)cs局部對象的構(gòu)造函數(shù)中調(diào)用crit_->Enter()進(jìn)行上鎖,利用局部對象在{}之后的析構(gòu)中調(diào)用crit_->Leave()來解鎖剩胁。
  • 為了給MQM的Clear和Process* 方法提供可重入訪問方法诉植,又需要與Add和Remove方法互斥,因此昵观,在Clear和Procees* 方法中使用MarkProcessingCritScope來提供上鎖計(jì)數(shù)processing_晾腔,Add和Remove中斷言processing_來確保操作安全。
  • 關(guān)于Procees* 如何使得所有線程的消息得到處理的疑惑啊犬,可能是局限于MQM類的分析無法全局看到實(shí)現(xiàn)機(jī)制灼擂,當(dāng)前只知道調(diào)用該方法的線程是如何達(dá)到處理所有即時(shí)消息的。而相關(guān)的方法本文沒有展開來說觉至,會(huì)在介紹Thread和MQ的文章中詳細(xì)闡述剔应。如果誰理解了該函數(shù)如何使得其他線程處理所有即時(shí)消息的機(jī)制,希望留言。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末领斥,一起剝皮案震驚了整個(gè)濱河市嫉到,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌月洛,老刑警劉巖何恶,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異嚼黔,居然都是意外死亡细层,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門唬涧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來疫赎,“玉大人,你說我怎么就攤上這事碎节∨醺悖” “怎么了?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵狮荔,是天一觀的道長胎撇。 經(jīng)常有香客問我,道長殖氏,這世上最難降的妖魔是什么晚树? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮雅采,結(jié)果婚禮上爵憎,老公的妹妹穿的比我還像新娘。我一直安慰自己婚瓜,他們只是感情好宝鼓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著闰渔,像睡著了一般席函。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上冈涧,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天茂附,我揣著相機(jī)與錄音,去河邊找鬼督弓。 笑死营曼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的愚隧。 我是一名探鬼主播蒂阱,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了录煤?” 一聲冷哼從身側(cè)響起鳄厌,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎妈踊,沒想到半個(gè)月后了嚎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡廊营,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年歪泳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片露筒。...
    茶點(diǎn)故事閱讀 39,696評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡呐伞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出慎式,到底是詐尸還是另有隱情伶氢,我是刑警寧澤,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布瞬捕,位于F島的核電站鞍历,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏肪虎。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一惧蛹、第九天 我趴在偏房一處隱蔽的房頂上張望扇救。 院中可真熱鬧,春花似錦香嗓、人聲如沸迅腔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽沧烈。三九已至,卻和暖如春像云,著一層夾襖步出監(jiān)牢的瞬間锌雀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工迅诬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留腋逆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓侈贷,卻偏偏與公主長得像惩歉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評論 2 353