前言
正如其名,MessageQueueManager類(后續(xù)簡寫為MQM)提供了MessageQueue(簡寫為MQ)的管理功能姿锭。在之前的文章中已經(jīng)分析過塔鳍,MQ在構(gòu)建時(shí)會(huì)調(diào)用MQ.DoInit()方法,該方法將MQ添加到MQM的內(nèi)部std::Vector<MessageQueue*>成員中呻此。
MQM類的聲明和定義分別在rtc_base/message_queue.h以及rtc_base/message_queue.cc中轮纫,其定義如下所示
// MessageQueueManager does cleanup of of message queues
class MessageQueueManager {
public:
static void Add(MessageQueue* message_queue);
static void Remove(MessageQueue* message_queue);
static void Clear(MessageHandler* handler);
// TODO(nisse): Delete alias, as soon as downstream code is updated.
static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }
// For testing purposes, for use with a simulated clock.
// Ensures that all message queues have processed delayed messages
// up until the current point in time.
static void ProcessAllMessageQueuesForTesting();
private:
static MessageQueueManager* Instance();
MessageQueueManager();
~MessageQueueManager();
void AddInternal(MessageQueue* message_queue);
void RemoveInternal(MessageQueue* message_queue);
void ClearInternal(MessageHandler* handler);
void ProcessAllMessageQueuesInternal();
// This list contains all live MessageQueues.
std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);
// Methods that don't modify the list of message queues may be called in a
// re-entrant fashion. "processing_" keeps track of the depth of re-entrant
// calls.
CriticalSection crit_;
size_t processing_ RTC_GUARDED_BY(crit_);
};
MessageQueueManager的構(gòu)造
MessageQueueManager的構(gòu)造方式與ThreadManager一樣,都是單例模式焚鲜,都是非安全的掌唾。之前分析過ThreadManager為什么能夠安全的構(gòu)造,MessageQueueManager原理一樣忿磅,并且MessageQueueManager對象的創(chuàng)建先于第一個(gè)MessageQueue對象糯彬。
MessageQueueManager* MessageQueueManager::Instance() {
static MessageQueueManager* const instance = new MessageQueueManager;
return instance;
}
MessageQueueManager::MessageQueueManager() : processing_(0) {}
MessageQueueManager::~MessageQueueManager() {}
MessageQueue的添加與移除
MessageQueueManager提供了Add與Remove的靜態(tài)函數(shù)來往單例的管理類中添加和刪除MQ,具體如下源碼所示:
void MessageQueueManager::Add(MessageQueue* message_queue) {
return Instance()->AddInternal(message_queue);
}
void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
CritScope cs(&crit_);
// Prevent changes while the list of message queues is processed.
RTC_DCHECK_EQ(processing_, 0);
message_queues_.push_back(message_queue);
}
void MessageQueueManager::Remove(MessageQueue* message_queue) {
return Instance()->RemoveInternal(message_queue);
}
void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
{
CritScope cs(&crit_);
// Prevent changes while the list of message queues is processed.
RTC_DCHECK_EQ(processing_, 0);
std::vector<MessageQueue*>::iterator iter;
iter = std::find(message_queues_.begin(), message_queues_.end(),
message_queue);
if (iter != message_queues_.end()) {
message_queues_.erase(iter);
}
}
}
添加和刪除方法對外都是以靜態(tài)方法提供葱她,通過調(diào)用MQM的單實(shí)例的對應(yīng)的私有方法來實(shí)現(xiàn)往向量Vector中添加和刪除MQ撩扒,需要說明的注意點(diǎn)有以下幾個(gè):
1)成員crit_是臨界區(qū)類CriticalSection的對象,該成員保證多線程環(huán)境下MQM.message_queues_以及MQM.processing_訪問安全吨些,正如上面兩個(gè)函數(shù)所示搓谆,函數(shù)開頭創(chuàng)建CritScope cs(&crit_); 在cs的構(gòu)造函數(shù)中調(diào)用crit_->Enter()表示進(jìn)入臨界區(qū)炒辉,相當(dāng)于上鎖。利用函數(shù)結(jié)束后cs對象的析構(gòu)中調(diào)用crit_->Leave()表示離開臨界區(qū)挽拔,相當(dāng)于解鎖辆脸。
2)存儲(chǔ)MQ的向量聲明為:std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);
其中RTC_GUARDED_BY宏在clang編譯器下展開為attribute(guarded_by(crit_))但校,指示編譯器在編譯過程中檢查代碼中所有訪問message_queues_的各個(gè)路徑上是否都先獲取了鎖crit_螃诅,如果沒有就會(huì)在編譯過程中產(chǎn)生錯(cuò)誤或者警告。而對于其他編譯器状囱,該宏不起任何作用术裸,意味著不會(huì)在編譯期進(jìn)行檢查。詳見 Thread Safety Analysis亭枷。
3)成員processing_ 聲明為: size_t processing_ RTC_GUARDED_BY(crit_); Add與Remove函數(shù)中執(zhí)行了RTC_DCHECK_EQ(processing_, 0)斷言袭艺,必須確保processing_ 為0。當(dāng)processing_不為0時(shí)叨粘,要么在執(zhí)行MQM的Clear()方法猾编,要么在執(zhí)行ProcessAllMessageQueues(),這些操作此時(shí)升敲,是不允許往MQM添加MQ或者刪除MQ這種會(huì)改變Vector列表的操作答倡,因?yàn)榍懊娴膬蓚€(gè)函數(shù)一般都會(huì)要遍歷Vector。思考一點(diǎn)驴党,不是已經(jīng)上鎖保證線程安全了嘛瘪撇,為啥還要保證processing_為0呢?繼續(xù)往下看吧~~
清理
與Add和Remove方式一摸一樣港庄,Clear函數(shù)也是以靜態(tài)方法的形式對外提供倔既。Clear函數(shù)的作用是從MQM所管理的所有MQ中刪除與入?yún)essageHandler* handler匹配消息。具體而言就是遍歷MQM中的MQ鹏氧,然后調(diào)用MQ本身的Clear()方法渤涌,這個(gè)方法比較冗長,此處就不展開敘述把还,將會(huì)在介紹MQ的文章中詳細(xì)描述实蓬。
void MessageQueueManager::Clear(MessageHandler* handler) {
return Instance()->ClearInternal(handler);
}
void MessageQueueManager::ClearInternal(MessageHandler* handler) {
// Deleted objects may cause re-entrant calls to ClearInternal. This is
// allowed as the list of message queues does not change while queues are
// cleared.
MarkProcessingCritScope cs(&crit_, &processing_);
for (MessageQueue* queue : message_queues_) {
queue->Clear(handler);
}
}
另外很重要的一點(diǎn),該方法并沒有使用前文所述的CritScope cs(&crit_)來實(shí)現(xiàn)線程安全笨篷,而是使用了一個(gè)新的類MarkProcessingCritScope cs(&crit_, &processing_)瞳秽,有什么神奇的地方?且看源碼:
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
public:
MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
: cs_(cs), processing_(processing) {
cs_->Enter();
*processing_ += 1;
}
~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
*processing_ -= 1;
cs_->Leave();
}
private:
const CriticalSection* const cs_;
size_t* processing_;
RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
};
首先要知道CriticalSection是可重入的率翅,也即一個(gè)線程上調(diào)用cs_->Enter()上鎖之后练俐,在釋放鎖之前,同一個(gè)線程可以反復(fù)調(diào)用cs_->Enter()而不會(huì)阻塞冕臭,因此被稱為“可重入鎖”腺晾。同一個(gè)線程上鎖一次燕锥,processing_就增1,記錄上鎖次數(shù)悯蝉,只要processing_不為0归形,表示我正在Clear操作或者后文的Process*方法,這兩個(gè)方法不會(huì)改變MQM中Vector列表鼻由,因此暇榴,可以在解鎖之前,重入進(jìn)行反復(fù)操作蕉世,但是不允許Add和Remove操作蔼紧,因?yàn)槠鋾?huì)改變Vector,這就是為什么Add和Remove函數(shù)中既加鎖了狠轻,還要斷言processing_必須為0奸例,否則代碼就是寫得有Bug了。
處理所有MQ中的消息
這個(gè)方法目前還沒有完全的理解向楼,首先說下我自己的分析查吊。
- 方法如其名,目標(biāo)在于使得MQM中管理的MQ中的消息得到處理湖蜕。
- 當(dāng)某個(gè)線程調(diào)用該方法時(shí)逻卖,會(huì)遍歷所有的MQ,然后向MQ中投遞一個(gè)消息ID為MQID_DISPOSE的延遲消息重荠,其消息數(shù)據(jù)為ScopedIncrement對象箭阶,ScopedIncrement的構(gòu)造中將queues_not_done原子性自增1,表示該消息隊(duì)列中有消息沒有被處理戈鲁,而該延遲消息時(shí)間為0仇参,那么該延遲消息將進(jìn)入MQ的延遲消息隊(duì)列的隊(duì)首(因?yàn)镸Q的延遲消息隊(duì)列是以延遲時(shí)間排序的優(yōu)先級隊(duì)列)。記住婆殿,所有的MQ中都會(huì)投遞一個(gè)這樣的消息诈乒。
- 方法后續(xù)就是獲取調(diào)用該方法的線程所關(guān)聯(lián)的Thread對象,并通過Thread對象的ProcessMessages方法不斷的從當(dāng)前線程的MQ中取出消息進(jìn)行處理婆芦,直到queues_not_done為0怕磨,此時(shí),之前投遞到消息循環(huán)中的MQID_DISPOSE類別的消息得到處理消约,因?yàn)樵撓⒃谙⒀h(huán)中被取出后肠鲫,消息數(shù)據(jù)ScopedIncrement對象會(huì)被直接delete伏社,從而ScopedIncrement析構(gòu)完成queues_not_done原子性自減1钩乍。此時(shí),表征著該消息循環(huán)中的所有即時(shí)消息都得到了處理谬盐。
- 最大的疑惑就是,所有的MQ中都放入了一個(gè)MQID_DISPOSE類別的延遲消息渣锦,調(diào)用該方法的線程會(huì)如上所述的方式阻塞地將所有即時(shí)消息處理掉硝岗,而沒有主動(dòng)調(diào)用該方法的線程是如何保證該方法的目標(biāo)得以實(shí)現(xiàn)的,即袋毙,使得所有MQ的即時(shí)消息都能同步的立馬處理型檀?畢竟,對于其他的MQ此處也僅僅是投遞了一個(gè)MQID_DISPOSE類別的延遲消息而已听盖。
static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }
void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
return Instance()->ProcessAllMessageQueuesInternal();
}
void MessageQueueManager::ProcessAllMessageQueuesInternal() {
// This works by posting a delayed message at the current time and waiting
// for it to be dispatched on all queues, which will ensure that all messages
// that came before it were also dispatched.
volatile int queues_not_done = 0;
// This class is used so that whether the posted message is processed, or the
// message queue is simply cleared, queues_not_done gets decremented.
class ScopedIncrement : public MessageData {
public:
ScopedIncrement(volatile int* value) : value_(value) {
AtomicOps::Increment(value_);
}
~ScopedIncrement() override { AtomicOps::Decrement(value_); }
private:
volatile int* value_;
};
{
MarkProcessingCritScope cs(&crit_, &processing_);
for (MessageQueue* queue : message_queues_) {
if (!queue->IsProcessingMessagesForTesting()) {
// If the queue is not processing messages, it can
// be ignored. If we tried to post a message to it, it would be dropped
// or ignored.
continue;
}
queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
new ScopedIncrement(&queues_not_done));
}
}
rtc::Thread* current = rtc::Thread::Current();
// Note: One of the message queues may have been on this thread, which is
// why we can't synchronously wait for queues_not_done to go to 0; we need
// to process messages as well.
while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
if (current) {
current->ProcessMessages(0);
}
}
}
總結(jié)
- MessageQueueManager是管理MessageQueue的類胀溺,與ThreadManager管理Thread的方式是不同的。MQM內(nèi)部的向量成員std::Vector<MessageQueue*> message_queues_用于存儲(chǔ)所有的MQ媳溺,時(shí)機(jī)是在MQ的構(gòu)造函數(shù)中調(diào)用MQM的Add靜態(tài)方法將自身指針放入message_queues_中月幌。
- MQM對外提供了所有方法都是靜態(tài)方法碍讯,這些靜態(tài)方法均是調(diào)用MQM單實(shí)例的私有的同名函數(shù)來實(shí)現(xiàn)添加悬蔽,刪除MQ,清除所有MQ中含有某個(gè)MessageHandler的所有消息捉兴,處理所有MQ中的消息蝎困。
- 為了線程安全,MQM中的所有操作都由CriticalSection crit_來提供上鎖的操作倍啥,該對象對外暴露的是Windows上臨界區(qū)概念的API禾乘,其提供了Enter(),Leave()等進(jìn)出臨界區(qū)的方法虽缕。CriticalSection跨平臺(tái)是如何實(shí)現(xiàn)的始藕?請看WebRTC源碼分析-線程安全之CriticalSection。 實(shí)踐上氮趋,在需要上鎖的地方創(chuàng)建一個(gè)CritScope cs(&crit_)局部對象伍派,這個(gè)cs局部對象的構(gòu)造函數(shù)中調(diào)用crit_->Enter()進(jìn)行上鎖,利用局部對象在{}之后的析構(gòu)中調(diào)用crit_->Leave()來解鎖剩胁。
- 為了給MQM的Clear和Process* 方法提供可重入訪問方法诉植,又需要與Add和Remove方法互斥,因此昵观,在Clear和Procees* 方法中使用MarkProcessingCritScope來提供上鎖計(jì)數(shù)processing_晾腔,Add和Remove中斷言processing_來確保操作安全。
- 關(guān)于Procees* 如何使得所有線程的消息得到處理的疑惑啊犬,可能是局限于MQM類的分析無法全局看到實(shí)現(xiàn)機(jī)制灼擂,當(dāng)前只知道調(diào)用該方法的線程是如何達(dá)到處理所有即時(shí)消息的。而相關(guān)的方法本文沒有展開來說觉至,會(huì)在介紹Thread和MQ的文章中詳細(xì)闡述剔应。如果誰理解了該函數(shù)如何使得其他線程處理所有即時(shí)消息的機(jī)制,希望留言。