A Brief Analysis of brpc's bthread Source Code (Part 1)

These posts analyze what bthread is, how to use it, and how it is implemented. I'll start with the reference material from the official documentation.
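The difference between coroutines and bthread: a coroutine library is N:1, while bthread is an M:N thread library. As analyzed in earlier posts, one thread may run a group of coroutines, and those coroutines are never moved to another thread. bthread is different. Through work stealing, other threads can steal bthreads and run them, so one blocked bthread does not hold up other bthreads. A blocked coroutine, by contrast, holds up the other coroutines in the same thread, because it never gets a chance to yield.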

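If a bthread blocks in a bthread API, it gives the current pthread worker to other bthreads. If a bthread blocks in a pthread API or a system call, the bthreads waiting to run on the current pthread worker are stolen and run by other idle pthread workers. Personally, I think coroutines could support something similar, if they were no longer bound to a specific thread.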

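Before analyzing bthread, I'll first introduce WorkStealingQueue, and then briefly look at how mutex handles bthreads specially. WorkStealingQueue is a kind of circular queue. It is similar to DPDK's lock-free ring queue, which uses memory barriers and CAS instructions to implement a multi-producer, multi-consumer queue. In brpc's WorkStealingQueue, only the owning worker calls push/pop at one end, while other workers steal from the other end. The implementation is short, but it uses several C++11 synchronization primitives that I found confusing. The upshot is that it stays correct and performs well. Finally, I'll come back to the differences between bthread and coroutines. Threads are not covered again here.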
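The sketch below makes the owner/thief split concrete. It is a simplified, mutex-protected stand-in, not brpc's lock-free WorkStealingQueue. brpc's version relies on atomics and memory fences, which this sketch deliberately avoids. The class name `WorkStealingQueueSketch` is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical, lock-based sketch of a work-stealing queue (NOT brpc's
// lock-free implementation). The owning worker pushes/pops at the bottom
// (newest first); other workers steal from the top (oldest first).
template <typename T>
class WorkStealingQueueSketch {
public:
    explicit WorkStealingQueueSketch(std::size_t capacity) : capacity_(capacity) {}

    // Called only by the owner. Returns false when the queue is full.
    bool push(const T& item) {
        std::lock_guard<std::mutex> lock(mu_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }

    // Called only by the owner: take the most recently pushed item.
    std::optional<T> pop() {
        std::lock_guard<std::mutex> lock(mu_);
        if (items_.empty()) return std::nullopt;
        T item = items_.back();
        items_.pop_back();
        return item;
    }

    // May be called by any other worker: take the oldest item.
    std::optional<T> steal() {
        std::lock_guard<std::mutex> lock(mu_);
        if (items_.empty()) return std::nullopt;
        T item = items_.front();
        items_.pop_front();
        return item;
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mu_;
};
```

The owner takes newest-first, which tends to keep recently touched data in cache. Thieves take the oldest task from the opposite end, so they rarely compete with the owner for the same item.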

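The bthread source is in bthread.h/cpp. Like threads and coroutines, bthread provides locks, condition variables, thread-local storage, and so on. My earlier analysis did not cover coroutine-level locks, condition variables, or local storage, but that source is not hard to read on your own. Getting started is always the hardest part. Beginning with the examples would be easier, but here I start from the underlying code.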

Here is how keys are implemented:

typedef unsigned int pthread_key_t;  // for comparison: pthread's key type (glibc)
// Key of thread-local data, created by bthread_key_create.
typedef struct {
    uint32_t index;    // index in KeyTable
    uint32_t version;  // ABA avoidance
} bthread_key_t;

int bthread_key_create(bthread_key_t* key, void (*dtor)(void*)) {
    if (dtor == NULL) {
        return bthread_key_create2(key, NULL, NULL);
    } else {
        return bthread_key_create2(key, bthread::arg_as_dtor, (const void*)dtor);
    }
}

int bthread_key_create2(bthread_key_t* key,
                        void (*dtor)(void*, const void*),
                        const void* dtor_args) {
    uint32_t index = 0;
    {
        BAIDU_SCOPED_LOCK(bthread::s_key_mutex);  // lock
        if (bthread::nfreekey > 0) {
            index = bthread::s_free_keys[--bthread::nfreekey];
        } else if (bthread::nkey < bthread::KEYS_MAX) {
            index = bthread::nkey++;
        } else {
            return EAGAIN;  // what pthread_key_create returns in this case.
        }
    }
    bthread::s_key_info[index].dtor = dtor;
    bthread::s_key_info[index].dtor_args = dtor_args;
    key->index = index;
    key->version = bthread::s_key_info[index].version;
    if (key->version == 0) {
        ++bthread::s_key_info[index].version;
        ++key->version;
    }
    return 0;
}

int bthread_key_delete(bthread_key_t key) {
    if (key.index < bthread::KEYS_MAX &&
        key.version == bthread::s_key_info[key.index].version) {
        BAIDU_SCOPED_LOCK(bthread::s_key_mutex);
        if (key.version == bthread::s_key_info[key.index].version) {
            if (++bthread::s_key_info[key.index].version == 0) {
                ++bthread::s_key_info[key.index].version;  // skip 0: version becomes 1
            }
            bthread::s_key_info[key.index].dtor = NULL;
            bthread::s_key_info[key.index].dtor_args = NULL;
            bthread::s_free_keys[bthread::nfreekey++] = key.index;
            return 0;
        }
    }
    // ...
    return EINVAL;
}

// destructors/version of TLS.
struct KeyInfo {
    uint32_t version;
    void (*dtor)(void*, const void*);
    const void* dtor_args;
};
static KeyInfo s_key_info[KEYS_MAX] = {};

// For allocating keys.
static pthread_mutex_t s_key_mutex = PTHREAD_MUTEX_INITIALIZER;
static size_t nfreekey = 0;
static size_t nkey = 0;
static uint32_t s_free_keys[KEYS_MAX];  // KEYS_MAX == 992

static void arg_as_dtor(void* data, const void* arg) {
    typedef void (*KeyDtor)(void*);
    return ((KeyDtor)arg)(data);
}

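When a key is created, an index is taken from the global free list s_free_keys. Initially the list is empty (nfreekey is 0), so a fresh index is allocated by incrementing nkey. On deletion, the index is pushed back into s_free_keys, and the slot's version is incremented (skipping 0), so any key that still holds the old version is rejected. When free indices are available, they are taken from the end of the list.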
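Here is a minimal model of this allocation scheme. It assumes a fixed number of slots and uses a `std::mutex` in place of `s_key_mutex`. `KeyRegistrySketch` and `KeySketch` are hypothetical names. The point is the LIFO free list and the version bump that rejects stale keys, which is the ABA avoidance mentioned in `bthread_key_t`.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical model of bthread_key_create2/bthread_key_delete.
struct KeySketch {
    uint32_t index;
    uint32_t version;
};

class KeyRegistrySketch {
public:
    explicit KeyRegistrySketch(uint32_t max_keys) : versions_(max_keys, 0) {}

    // Returns false when all slots are used (bthread returns EAGAIN here).
    bool create(KeySketch* key) {
        std::lock_guard<std::mutex> lock(mu_);
        uint32_t index;
        if (!free_.empty()) {
            index = free_.back();  // reuse the most recently freed index
            free_.pop_back();
        } else if (nkey_ < versions_.size()) {
            index = nkey_++;       // a never-used slot
        } else {
            return false;
        }
        if (versions_[index] == 0) {
            versions_[index] = 1;  // version 0 is never handed out
        }
        key->index = index;
        key->version = versions_[index];
        return true;
    }

    // Returns false for unknown or already-deleted keys (bthread returns EINVAL).
    bool remove(KeySketch key) {
        std::lock_guard<std::mutex> lock(mu_);
        if (key.index >= versions_.size() || versions_[key.index] != key.version) {
            return false;
        }
        if (++versions_[key.index] == 0) {
            ++versions_[key.index];  // skip 0 on wrap-around
        }
        free_.push_back(key.index);
        return true;
    }

    // A key is valid only while its version matches the slot's version.
    bool valid(KeySketch key) {
        std::lock_guard<std::mutex> lock(mu_);
        return key.index < versions_.size() && versions_[key.index] == key.version;
    }

private:
    std::mutex mu_;
    std::vector<uint32_t> versions_;
    std::vector<uint32_t> free_;
    uint32_t nkey_ = 0;
};
```

After a delete, the same index can be handed out again, but under a new version. Code that kept the old key can therefore no longer read or delete the new owner's slot.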

Setting and getting key data (TaskGroup is introduced later):

// NOTE: Can't borrow_keytable in bthread_setspecific, otherwise following
// memory leak may occur:
//  -> bthread_getspecific fails to borrow_keytable and returns NULL.
//  -> bthread_setspecific succeeds to borrow_keytable and overwrites old data
//     at the position with newly created data, the old data is leaked.
int bthread_setspecific(bthread_key_t key, void* data) {
    bthread::KeyTable* kt = bthread::tls_bls.keytable;
    if (NULL == kt) {
        kt = new (std::nothrow) bthread::KeyTable;
        if (NULL == kt) {
            return ENOMEM;
        }
        bthread::tls_bls.keytable = kt;
        bthread::TaskGroup* const g = bthread::tls_task_group;
        if (g) {
            g->current_task()->local_storage.keytable = kt;
        }
        if (!bthread::tls_ever_created_keytable) {
            bthread::tls_ever_created_keytable = true;
            CHECK_EQ(0, butil::thread_atexit(bthread::cleanup_pthread, kt));  // cleanup run at pthread exit
        }
    }
    return kt->set_data(key, data);
}

void* bthread_getspecific(bthread_key_t key) {
    bthread::KeyTable* kt = bthread::tls_bls.keytable;
    if (kt) {
        return kt->get_data(key);
    }
    bthread::TaskGroup* const g = bthread::tls_task_group;
    if (g) {
        bthread::TaskMeta* const task = g->current_task();
        kt = bthread::borrow_keytable(task->attr.keytable_pool);
        if (kt) {
            g->current_task()->local_storage.keytable = kt;
            bthread::tls_bls.keytable = kt;
            return kt->get_data(key);
        }
    }
    return NULL;
}

static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
    if (pool != NULL && pool->free_keytables) {
        BAIDU_SCOPED_LOCK(pool->mutex);
        KeyTable* p = (KeyTable*)pool->free_keytables;
        if (p) {
            pool->free_keytables = p->next;
            return p;
        }
    }
    return NULL;
}

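As the comment says, bthread_setspecific can't call borrow_keytable, because that could leak memory. Suppose bthread_getspecific failed to borrow a KeyTable and returned NULL, and a later bthread_setspecific did borrow one. The setspecific call would then overwrite the old data in that slot with new data, and the old data would be leaked.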

I couldn't find where the framework source itself uses these APIs, but they are used in test/example. Here are some caveats about thread-local variables:

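After a bthread calls a blocking bthread function, the pthread it runs on is likely to change. That changes what pthread_getspecific returns, the values of gcc __thread and C++11 thread_local variables, pthread_self(), and so on. The behavior of the following code is unpredictable: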

thread_local SomeObject obj;
SomeObject* p = &obj;
p->bar();
bthread_usleep(1000);
p->bar();

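After bthread_usleep, the bthread is probably on a different pthread, but p still points to the thread_local variable of the previous pthread, so accessing p gives unpredictable results. This pattern usually appears when users pass business data through thread-level variables. To prevent it, remember: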

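  • Don't use thread-level variables to pass business data. It is a bad design pattern, and functions that depend on thread-level data are hard to unit-test. To check whether you are abusing them, ask whether the business logic would still work without thread-level variables. Thread-level variables should only be an optimization, and code that uses them should not directly or indirectly call any bthread function that may block. For example, tcmalloc uses thread-level variables and does not conflict with bthread at all.
  • If you must use thread-level variables (in business code), use bthread_key_create and bthread_getspecific.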

I'll use part of the session_data_and_thread_local example to illustrate:

class DataFactory {
public:
    virtual ~DataFactory() {}

    // Implement this method to create a piece of data.
    // Notice that this method is const.
    // Returns the data, NULL on error.
    virtual void* CreateData() const = 0;

    // Implement this method to destroy a piece of data that was created
    // by Create().
    // Notice that this method is const.
    virtual void DestroyData(void*) const = 0;
};

This is the abstract interface for creating data blocks. Here is the example's implementation of it:

class MyThreadLocalDataFactory : public brpc::DataFactory {
public:
    void* CreateData() const {
        return new MyThreadLocalData;
    }

    void DestroyData(void* d) const {
        MyThreadLocalData::deleter(d);
    }
};

struct MyThreadLocalData {
    MyThreadLocalData() : y(0) {
        // ...
    }
    ~MyThreadLocalData() {
        // ...
    }
    static void deleter(void* d) {
        // ...
    }

    int y;
};

    struct ThreadLocalOptions {
        bthread_key_t tls_key;
        const DataFactory* thread_local_data_factory;

        ThreadLocalOptions()
            : tls_key(INVALID_BTHREAD_KEY)
            , thread_local_data_factory(NULL) {}
    };

        _tl_options.thread_local_data_factory = _options.thread_local_data_factory;
        if (bthread_key_create2(&_tl_options.tls_key, DestroyServerTLS,
                                _options.thread_local_data_factory) != 0) {
            LOG(ERROR) << "Fail to create thread-local key";
            return -1;
        }

When the server receives a client request, it attaches its ThreadLocalOptions to the bthread handling the request:

    if (server->thread_local_options().thread_local_data_factory) {
        bthread_assign_data((void*)&server->thread_local_options());
    }

The test code:

        MyThreadLocalData* tls =
            static_cast<MyThreadLocalData*>(brpc::thread_local_data());
        if (tls == NULL) {
            cntl->SetFailed("Require ServerOptions.thread_local_data_factory "
                            "to be set with a correctly implemented instance");
            LOG(ERROR) << cntl->ErrorText();
            return;
        }

void* thread_local_data() {
    const Server::ThreadLocalOptions* tl_options =
        static_cast<const Server::ThreadLocalOptions*>(bthread_get_assigned_data());
    if (tl_options == NULL) { // not in server threads.
        return NULL;
    }
    if (BAIDU_UNLIKELY(tl_options->thread_local_data_factory == NULL)) {
        // ...
        return NULL;
    }
    void* data = bthread_getspecific(tl_options->tls_key);
    if (data == NULL) {
        data = tl_options->thread_local_data_factory->CreateData();
        if (data != NULL) {
            CHECK_EQ(0, bthread_setspecific(tl_options->tls_key, data));
        }
    }
    return data;
}

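thread_local_data() looks up the current bthread's data through the bthread_key, and creates it with the factory on first use. The KeyTable holding the data belongs to the bthread, not to the pthread. So when the bthread moves to another pthread, the data stays valid and the problem described above does not occur.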

tls_bls is saved and reset when switching bthreads:

    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // Switch tls_bls
        // ...
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

Here is the declaration of KeyTable:

class BAIDU_CACHELINE_ALIGNMENT KeyTable {
public:
    KeyTable() : next(NULL) {
        memset(_subs, 0, sizeof(_subs));
        nkeytable.fetch_add(1, butil::memory_order_relaxed);
    }

    ~KeyTable() {
        nkeytable.fetch_sub(1, butil::memory_order_relaxed);
        for (int ntry = 0; ntry < PTHREAD_DESTRUCTOR_ITERATIONS; ++ntry) {
            for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
                if (_subs[i]) {
                    _subs[i]->clear(i * KEY_2NDLEVEL_SIZE);
                }
            }
            bool all_cleared = true;
            for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
                if (_subs[i] != NULL && !_subs[i]->cleared()) {
                    all_cleared = false;
                    break;
                }
            }
            if (all_cleared) {
                for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
                    delete _subs[i];
                }
                return;
            }
        }
        // ...
    }

    inline void* get_data(bthread_key_t key) const {
        const uint32_t subidx = key.index / KEY_2NDLEVEL_SIZE;
        if (subidx < KEY_1STLEVEL_SIZE) {
            const SubKeyTable* sub_kt = _subs[subidx];
            if (sub_kt) {
                return sub_kt->get_data(
                    key.index - subidx * KEY_2NDLEVEL_SIZE, key.version);
            }
        }
        return NULL;
    }

    inline int set_data(bthread_key_t key, void* data) {
        const uint32_t subidx = key.index / KEY_2NDLEVEL_SIZE;  // KEY_2NDLEVEL_SIZE == 32
        if (subidx < KEY_1STLEVEL_SIZE &&
            key.version == s_key_info[key.index].version) {
            SubKeyTable* sub_kt = _subs[subidx];
            if (sub_kt == NULL) {
                sub_kt = new (std::nothrow) SubKeyTable;
                if (NULL == sub_kt) {
                    return ENOMEM;
                }
                _subs[subidx] = sub_kt;
            }
            sub_kt->set_data(key.index - subidx * KEY_2NDLEVEL_SIZE,
                             key.version, data);
            return 0;
        }
        // ...
        return EINVAL;
    }

public:
    KeyTable* next;
private:
    SubKeyTable* _subs[KEY_1STLEVEL_SIZE];  // KEY_1STLEVEL_SIZE == 31
};

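A bthread_key_t's index selects a SubKeyTable (subidx = key.index / KEY_2NDLEVEL_SIZE). SubKeyTables are allocated lazily on the first set. set_data checks the key's version against s_key_info, then stores the data together with the version at offset key.index - subidx * KEY_2NDLEVEL_SIZE in that SubKeyTable. get_data returns the data only if the stored version matches the key's version.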
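Here is the index arithmetic as a small sketch. The constants are inferred from the comments in the excerpt (31, 32, 992) and are assumptions about the brpc version being read. `locate` is a hypothetical helper that performs the same computation as `get_data`/`set_data`.

```cpp
#include <cassert>
#include <cstdint>

// Assumed constants: 31 first-level pointers, each SubKeyTable holds 32 slots.
constexpr uint32_t KEY_2NDLEVEL_SIZE = 32;
constexpr uint32_t KEY_1STLEVEL_SIZE = 31;
constexpr uint32_t KEYS_MAX = KEY_1STLEVEL_SIZE * KEY_2NDLEVEL_SIZE;

struct SlotPosition {
    uint32_t subidx;  // which SubKeyTable in KeyTable::_subs
    uint32_t offset;  // which Data entry inside that SubKeyTable
};

// Hypothetical helper mirroring KeyTable::get_data/set_data.
constexpr SlotPosition locate(uint32_t key_index) {
    return SlotPosition{key_index / KEY_2NDLEVEL_SIZE,
                        key_index - (key_index / KEY_2NDLEVEL_SIZE) * KEY_2NDLEVEL_SIZE};
}
```

The two-level layout keeps an empty KeyTable at 31 pointers. A 32-entry SubKeyTable is allocated only when a key in its range is first set.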

class BAIDU_CACHELINE_ALIGNMENT SubKeyTable {
public:
    SubKeyTable() {
        memset(_data, 0, sizeof(_data));
        nsubkeytable.fetch_add(1, butil::memory_order_relaxed);
    }

    // NOTE: Call clear first.
    ~SubKeyTable() {
        nsubkeytable.fetch_sub(1, butil::memory_order_relaxed);
    }

    void clear(uint32_t offset) {
        for (uint32_t i = 0; i < KEY_2NDLEVEL_SIZE; ++i) {
            void* p = _data[i].ptr;
            if (p) {
                // Set the position to NULL before calling dtor which may set
                // the position again.
                _data[i].ptr = NULL;

                KeyInfo info = bthread::s_key_info[offset + i];
                if (info.dtor && _data[i].version == info.version) {
                    info.dtor(p, info.dtor_args);
                }
            }
        }
    }

    bool cleared() const {
        // We need to iterate again to check if every slot is empty. An
        // alternative is remember if set_data() was called during clear.
        for (uint32_t i = 0; i < KEY_2NDLEVEL_SIZE; ++i) {
            if (_data[i].ptr) {
                return false;
            }
        }
        return true;
    }

    inline void* get_data(uint32_t index, uint32_t version) const {
        if (_data[index].version == version) {
            return _data[index].ptr;
        }
        return NULL;
    }
    inline void set_data(uint32_t index, uint32_t version, void* data) {
        _data[index].version = version;
        _data[index].ptr = data;
    }

private:
    struct Data {
        uint32_t version;
        void* ptr;
    };
    Data _data[KEY_2NDLEVEL_SIZE];
};

bthread's bthread_mutex_t was analyzed in an earlier post, so I won't repeat it here.

Next is the condition variable bthread_cond_t, which reuses the Butex covered earlier:

struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
    ~Butex() {}

    butil::atomic<int> value;
    ButexWaiterList waiters;
    internal::FastPthreadMutex waiter_lock;
};

typedef struct {
    int64_t duration_ns;
    size_t sampling_range;
} bthread_contention_site_t;

typedef struct {
    unsigned* butex;
    bthread_contention_site_t csite;
} bthread_mutex_t;

// ...

typedef struct {
    bthread_mutex_t* m;
    int* seq;
} bthread_cond_t;

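Initialization and destruction, followed by CondInternal, the internal layout that bthread_cond_t is reinterpreted as: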

int bthread_cond_init(bthread_cond_t* __restrict c,
                      const bthread_condattr_t*) {
    c->m = NULL;
    c->seq = bthread::butex_create_checked<int>();
    *c->seq = 0;
    return 0;
}

int bthread_cond_destroy(bthread_cond_t* c) {
    bthread::butex_destroy(c->seq);
    c->seq = NULL;
    return 0;
}

struct CondInternal {
    butil::atomic<bthread_mutex_t*> m;
    butil::atomic<int>* seq;
};

int bthread_cond_wait(bthread_cond_t* __restrict c,
                      bthread_mutex_t* __restrict m) {
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
        if (!ic->m.compare_exchange_strong(
                expected_m, m, butil::memory_order_relaxed)) {
            return EINVAL;
        }
    }
    bthread_mutex_unlock(m);
    int rc1 = 0;
    if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
        errno != EWOULDBLOCK && errno != EINTR/*note*/) {
        // EINTR should not be returned by cond_*wait according to docs on
        // pthread, however spurious wake-up is OK, just as we do here
        // so that users can check flags in the loop often companioning
        // with the cond_wait ASAP. For example:
        //   mutex.lock();
        //   while (!stop && other-predicates) {
        //     cond_wait(&mutex);
        //   }
        //   mutex.unlock();
        // After interruption, above code should wake up from the cond_wait
        // soon and check the `stop' flag and other predicates.
        rc1 = errno;
    }
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
}

int bthread_cond_signal(bthread_cond_t* c) {
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
    // ic is probably dereferenced after fetch_add, save required fields before
    // this point
    butil::atomic<int>* const saved_seq = ic->seq;
    saved_seq->fetch_add(1, butil::memory_order_release);
    // don't touch ic any more
    bthread::butex_wake(saved_seq);
    return 0;
}

After butex_wait returns, the mutex has to be locked again with bthread_mutex_lock_contended. Here is a bthread_cond_t example from the bthread tests, used the same way as in pthread:

    int Signal() {
        int ret = 0;
        bthread_mutex_lock(&_mutex);
        _count --;
        bthread_cond_signal(&_cond);
        bthread_mutex_unlock(&_mutex);
        return ret;
    }

    int Wait() {
        int ret = 0;
        bthread_mutex_lock(&_mutex);  // lock
        while (_count > 0) {
            ret = bthread_cond_wait(&_cond, &_mutex);  // wait
        }
        bthread_mutex_unlock(&_mutex);  // unlock
        return ret;
    }
private:
    int _count;
    bthread_cond_t _cond;
    bthread_mutex_t _mutex;

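When bthread_cond_wait is called, the bthread_cond_t is bound to a bthread_mutex_t, and waiting later with a different mutex returns EINVAL. It first reads the seq value, then unlocks m, and then calls butex_wait. Here is roughly what ButexBthreadWaiter looks like (see also the earlier post on brpc's mutex source):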

struct ButexBthreadWaiter : public ButexWaiter {
    TaskMeta* task_meta;
    TimerThread::TaskId sleep_id;
    WaiterState waiter_state;
    int expected_value;
    Butex* initial_butex;
    TaskControl* control;
};

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after unmatched butex,
        // this fence makes sure that we see changes before changing butex.
        butil::atomic_thread_fence(butil::memory_order_acquire);
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return butex_wait_from_pthread(g, b, expected_value, abstime);
    }
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
    bbw.container.store(NULL, butil::memory_order_relaxed);
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
    bbw.waiter_state = WAITER_STATE_READY;
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();

    if (abstime != NULL) {
        // Schedule timer before queueing. If the timer is triggered before
        // queueing, cancel queueing. This is a kind of optimistic locking.
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
            // Already timed out.
            errno = ETIMEDOUT;
            return -1;
        }
        bbw.sleep_id = get_global_timer_thread()->schedule(
            erase_from_butex_and_wakeup, &bbw, *abstime);
        if (!bbw.sleep_id) {  // TimerThread stopped.
            errno = ESTOP;
            return -1;
        }
    }
    // ...
    // release fence matches with acquire fence in interrupt_and_consume_waiters
    // in task_group.cpp to guarantee visibility of `interrupted'.
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
    g->set_remained(wait_for_butex, &bbw);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);

    // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
    // Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                     NULL, butil::memory_order_acquire) == NULL,
                 30/*nops before sched_yield*/);
    // ...
    bool is_interrupted = false;  // return value; more code follows...

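To make a bthread wait, the context of the currently running bthread is saved, and the bthread is then put on the butex's wait list. set_remained registers wait_for_butex, which runs after the switch to the next bthread. What set_remained does in task_group is analyzed in detail later.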

static void wait_for_butex(void* arg) {
    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
    Butex* const b = bw->initial_butex;
    // ...
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                   !bw->task_meta->interrupted) {
            b->waiters.Append(bw);
            bw->container.store(b, butil::memory_order_relaxed);
            return;
        }
    }
    // ...
    unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->tid);
    // ...
}

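If the butex value no longer matches, or the bthread has been interrupted, the waiter is not queued. Instead, its timer is cancelled and the bthread that called wait is made ready to run again right away.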

static void erase_from_butex_and_wakeup(void* arg) {
    erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}

inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    // `bw' is guaranteed to be valid inside this function because waiter
    // will wait until this function being cancelled or finished.
    // NOTE: This function must be no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // b can be NULL when the waiter is scheduled but queued.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {  // bthread
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control)->ready_to_run_general(bw->tid);
        } else {  // pthread
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}

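butex_wait may switch the current bthread out, which happens in sched (analyzed later). If a deadline is given, butex_wait first checks whether it has already passed, and if so returns ETIMEDOUT immediately. Otherwise it schedules a timer. When the timer fires, erase_from_butex_and_wakeup removes the waiter from the butex (see erase_from_butex) and schedules it with ready_to_run_general.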

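If butex_wait returns EWOULDBLOCK immediately (seq has already changed), m is simply locked again. Overall, the wait implementation follows roughly the same approach as pthread_cond_wait.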
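With the bthread scheduling stripped away, the wait logic can be modelled with standard primitives. The sketch below is hypothetical. `ButexSketch` uses a `std::mutex` and `std::condition_variable` instead of a real butex, and `CondSketch` follows the same steps as `bthread_cond_wait`: read seq, unlock m, wait only if seq is unchanged, relock m. The value check and the increment-and-notify both happen under the butex's own lock. So a signal that arrives between reading seq and waiting is not lost: the wait just returns EWOULDBLOCK.

```cpp
#include <cassert>
#include <cerrno>
#include <condition_variable>
#include <mutex>

// Hypothetical pthread-only model of a butex: an int plus its waiters.
class ButexSketch {
public:
    int load() {
        std::lock_guard<std::mutex> lock(mu_);
        return value_;
    }

    // 0 after being woken; -1 with errno = EWOULDBLOCK if value != expected.
    int wait(int expected) {
        std::unique_lock<std::mutex> lock(mu_);
        if (value_ != expected) {
            errno = EWOULDBLOCK;
            return -1;
        }
        cv_.wait(lock, [&] { return value_ != expected; });
        return 0;
    }

    // Like fetch_add(1) followed by butex_wake: wake at most one waiter.
    // Notifying under the lock ensures every blocked waiter saw the old value.
    void add_and_wake_one() {
        std::lock_guard<std::mutex> lock(mu_);
        ++value_;
        cv_.notify_one();
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int value_ = 0;
};

// Hypothetical condition variable built the same way as bthread_cond_t.
class CondSketch {
public:
    // Precondition: the caller holds m, as with bthread_cond_wait.
    void wait(std::mutex& m) {
        const int expected_seq = seq_.load();  // read seq before unlocking
        m.unlock();
        seq_.wait(expected_seq);  // -1/EWOULDBLOCK just means "already signaled"
        m.lock();
    }

    void signal() { seq_.add_and_wake_one(); }

private:
    ButexSketch seq_;
};
```

As with bthread_cond_wait, callers should re-check their predicate in a loop, because returning from wait only means that seq changed.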

int butex_wake(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);
    }
    if (front->tid == 0) {  // pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }
    return 1;
}

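On wakeup, the waiter at the head of the list is removed from the wait list. A pthread waiter (tid 0) is woken with wakeup_pthread. For a bthread (front->tid), its timer is cancelled if there is one, and then it is scheduled. On a bthread worker this goes through TaskGroup::exchange, which switches to the woken bthread. Otherwise it goes through ready_to_run_remote on a TaskGroup chosen by choose_one_group.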

For reference, here is the pseudocode of pthread's condition variable wait/signal from man pthread:

[pthread_cond_wait(mutex, cond)]
value = cond->value;                 
pthread_mutex_unlock(mutex);        
pthread_mutex_lock(cond->mutex);      
if (value == cond->value) {
    me->next_cond = cond->waiter;
    cond->waiter = me;
    pthread_mutex_unlock(cond->mutex);  
    unable_to_run(me);               
} else
    pthread_mutex_unlock(cond->mutex); 
pthread_mutex_lock(mutex);           

[pthread_cond_signal(cond)]
pthread_mutex_lock(cond->mutex);      
cond->value++;                        
if (cond->waiter) { 
    sleeper = cond->waiter;           
    cond->waiter = sleeper->next_cond;  
    able_to_run(sleeper);          
}
pthread_mutex_unlock(cond->mutex);  

The remaining interfaces. bthread_cond_timedwait is the same as bthread_cond_wait except for the extra deadline:

int bthread_cond_broadcast(bthread_cond_t* c) {
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
    bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed);
    butil::atomic<int>* const saved_seq = ic->seq;
    if (!m) {
        return 0;
    }
    void* const saved_butex = m->butex;
    // Wakeup one thread and requeue the rest on the mutex.
    ic->seq->fetch_add(1, butil::memory_order_release);
    bthread::butex_requeue(saved_seq, saved_butex);
    return 0;
}

int bthread_cond_timedwait(bthread_cond_t* __restrict c,
                           bthread_mutex_t* __restrict m,
                           const struct timespec* __restrict abstime) {
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
        if (!ic->m.compare_exchange_strong(
                expected_m, m, butil::memory_order_relaxed)) {
            return EINVAL;
        }
    }
    bthread_mutex_unlock(m);
    int rc1 = 0;
    if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 &&
        errno != EWOULDBLOCK && errno != EINTR/*note*/) {
        // note: see comments in bthread_cond_wait on EINTR.
        rc1 = errno;
    }
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
}

int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);   // the condition variable's seq butex
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);  // the outer mutex's butex

    ButexWaiter* front = NULL;
    {
        std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        butil::double_lock(lck1, lck2);  // lock in address order to avoid deadlock
        if (b->waiters.empty()) {
            return 0;  // no waiters
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();  // the head waiter is the one to wake
        front->container.store(NULL, butil::memory_order_relaxed);

        while (!b->waiters.empty()) {  // move the rest to the mutex's wait list for later wakeup
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }

    if (front->tid == 0) {  // which is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());  // cancel the timer
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, front->tid);  // schedule the woken bthread
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
    }
    return 1;
}

bthread_cond_broadcast may differ somewhat from pthread's pthread_cond_broadcast (at least in early versions). The goal is to avoid the thundering-herd problem and the performance cost that comes with it.

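I'll also quote an answer on Zhihu by gejun, the main author of bthread: "If by 'thundering herd' you mean whether, when broadcast is called on a condition that N threads are waiting on, the total number of thread wakeups is O(N) rather than O(N^2), then current broadcast implementations should not have a thundering herd. A common broadcast implementation wakes only the first thread waiting on the condition and moves the other threads to the mutex used by the condition (a futex requeue operation on Linux). When the first thread wakes up, does its work, and unlocks that mutex, it wakes the next thread, and so on. Other platforms work similarly."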

In bthread_cond_broadcast, as the comment "Wakeup one thread and requeue the rest on the mutex." says, only one waiter is woken and the rest are requeued onto the mutex with butex_requeue. The relevant analysis is in the comments in butex_requeue above.
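The bookkeeping in butex_requeue can be modelled with plain queues. In this hypothetical sketch, `WaitListSketch`, `requeue` and `unlock_and_wake` are invented names, and waiters are just integer ids. The locking of both lists (`double_lock` in address order) is left out.

```cpp
#include <cassert>
#include <deque>
#include <optional>

// Hypothetical model: a wait list is a FIFO of waiter ids.
struct WaitListSketch {
    std::deque<int> waiters;
};

// Like butex_requeue: wake the head of the condition's list and move every
// other waiter, in order, onto the mutex's list. Returns the woken id.
std::optional<int> requeue(WaitListSketch& cond, WaitListSketch& mutex) {
    if (cond.waiters.empty()) return std::nullopt;
    int front = cond.waiters.front();
    cond.waiters.pop_front();
    while (!cond.waiters.empty()) {
        mutex.waiters.push_back(cond.waiters.front());
        cond.waiters.pop_front();
    }
    return front;
}

// Models a mutex unlock that hands the lock to the next waiter, if any.
std::optional<int> unlock_and_wake(WaitListSketch& mutex) {
    if (mutex.waiters.empty()) return std::nullopt;
    int next = mutex.waiters.front();
    mutex.waiters.pop_front();
    return next;
}
```

Each unlock wakes only the next waiter, so N waiters cost N wakeups in total, and they don't all wake at once to fight over the mutex.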

Some other interfaces: yielding the CPU, sleeping (without actually suspending the pthread), and adding worker threads:

int bthread_yield(void) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        bthread::TaskGroup::yield(&g);  // the bthread yields to other bthreads; the pthread is not switched out
        return 0;
    }
    // pthread_yield is not available on MAC
    return sched_yield();  // the pthread itself yields the CPU
}

void TaskGroup::yield(TaskGroup** pg) {
    TaskGroup* g = *pg;
    ReadyToRunArgs args = { g->current_tid(), false };
    g->set_remained(ready_to_run_in_worker, &args);
    sched(pg);  // analyzed later
}

int bthread_usleep(uint64_t microseconds) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        return bthread::TaskGroup::usleep(&g, microseconds);  // the bthread yields to other bthreads; the pthread is not switched out
    }
    return ::usleep(microseconds);  // the pthread itself sleeps
}

// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
    if (0 == timeout_us) {
        yield(pg);
        return 0;
    }
    TaskGroup* g = *pg;
    // We have to schedule timer after we switched to next bthread otherwise
    // the timer may wake up(jump to) current still-running context.
    SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
    g->set_remained(_add_sleep_event, &e);
    sched(pg);
    g = *pg;
    e.meta->current_sleep = 0;
    if (e.meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        e.meta->interrupted = false;
        // ... (lines omitted in this excerpt)
        errno = (e.meta->stop ? ESTOP : EINTR);
        return -1;
    }
    return 0;
}
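To make the difference from ::usleep concrete, here is a toy model with simulated time (ToySleepScheduler and its methods are hypothetical). A sleeping task is only recorded in a timer heap keyed by its deadline, so the worker stays free to run other tasks, and a timeout of 0 degenerates into a yield, as in TaskGroup::usleep. In brpc the sleep event is handed to a timer thread through _add_sleep_event; here that part is reduced to an advance_to() call.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Toy model (hypothetical, not brpc code) of the idea behind bthread_usleep:
// the sleeping task does not block its pthread. It is parked in a timer heap
// and moved back to the run queue when its deadline passes. Time is simulated.
class ToySleepScheduler {
public:
    void usleep(int tid, uint64_t now_us, uint64_t timeout_us) {
        if (timeout_us == 0) {          // as in TaskGroup::usleep: 0 means yield
            ready_.push_back(tid);
            return;
        }
        timers_.push({now_us + timeout_us, tid});
    }

    // Plays the timer thread: move every expired sleeper to the run queue.
    void advance_to(uint64_t now_us) {
        while (!timers_.empty() && timers_.top().first <= now_us) {
            ready_.push_back(timers_.top().second);
            timers_.pop();
        }
    }

    const std::deque<int>& ready() const { return ready_; }

private:
    using Entry = std::pair<uint64_t, int>;  // (deadline in us, task id)
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> timers_;
    std::deque<int> ready_;
};
```

Tasks become ready in deadline order, not in the order they went to sleep.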

int bthread_setconcurrency(int num) {
    // more code...
    if (num > bthread::FLAGS_bthread_concurrency) {
        // Create more workers if needed.
        bthread::FLAGS_bthread_concurrency +=
            c->add_workers(num - bthread::FLAGS_bthread_concurrency); // add worker threads; analyzed later
        return 0;
    }
    return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
}
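The final return statement means the number of workers can only grow: asking for fewer workers than currently exist fails with EPERM, and asking for the same number is a no-op. A minimal sketch of that rule, with current standing in for FLAGS_bthread_concurrency and add_workers for c->add_workers (both simplifications, and the checks hidden behind "// more code..." are left out):

```cpp
#include <cassert>
#include <cerrno>

// Growth-only rule from the bthread_setconcurrency excerpt (hypothetical
// simplification: `current` replaces the gflag, `add_workers` replaces
// TaskControl::add_workers and returns how many workers it actually added).
int set_concurrency(int num, int& current, int (*add_workers)(int)) {
    if (num > current) {
        current += add_workers(num - current); // may add fewer than requested
        return 0;
    }
    return num == current ? 0 : EPERM;         // shrinking is rejected
}
```

Writing current += add_workers(...) rather than current = num follows the excerpt: judging from it, add_workers reports how many workers were really created, which may be fewer than requested.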

int bthread_join(bthread_t tid, void** thread_return) {
    return bthread::TaskGroup::join(tid, thread_return);
}

int TaskGroup::join(bthread_t tid, void** return_value) {
    if (__builtin_expect(!tid, 0)) {  // tid of bthread is never 0.
        return EINVAL;
    }
    TaskMeta* m = address_meta(tid);
    if (__builtin_expect(!m, 0)) {
        // The bthread is not created yet, this join is definitely wrong.
        return EINVAL;
    }
    TaskGroup* g = tls_task_group;
    if (g != NULL && g->current_tid() == tid) {
        // joining self causes indefinite waiting.
        return EINVAL;
    }
    const uint32_t expected_version = get_version(tid);
    while (*m->version_butex == expected_version) { // wait until the version changes
        if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
    if (return_value) {
        *return_value = NULL;
    }
    return 0;
}
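The core of TaskGroup::join is a wait loop on a version number: the joiner sleeps until the version no longer equals the one encoded in its tid. Below is a minimal sketch of the same protocol with std::mutex and std::condition_variable standing in for butex_wait and butex_wake_except. That substitution and the names ToyMeta, finish, join and join_demo are assumptions for illustration only. The predicate form of wait also covers a task that finishes before the joiner starts waiting, which the real loop handles through EWOULDBLOCK.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Stand-in for TaskMeta's version butex (hypothetical; brpc uses a butex,
// not a mutex plus condition variable).
struct ToyMeta {
    std::mutex mu;
    std::condition_variable cv;
    uint32_t version = 1;            // plays the role of *m->version_butex
};

// Exit path: bump the version, skipping 0, then wake every joiner.
void finish(ToyMeta& m) {
    {
        std::lock_guard<std::mutex> lk(m.mu);
        if (++m.version == 0) ++m.version;
    }
    m.cv.notify_all();
}

// Join: block while the version still equals the one encoded in the tid.
void join(ToyMeta& m, uint32_t expected_version) {
    std::unique_lock<std::mutex> lk(m.mu);
    m.cv.wait(lk, [&] { return m.version != expected_version; });
}

// Demo: a joiner thread returns once finish() has bumped the version.
uint32_t join_demo() {
    ToyMeta m;
    const uint32_t v = m.version;
    std::thread joiner([&] { join(m, v); });
    finish(m);
    joiner.join();
    return m.version;
}
```

Joining with a stale version returns immediately, which matches the real loop exiting as soon as *m->version_butex differs from the expected value.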

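Join works this way because, when a bthread is created, its id is built with make_tid(*m->version_butex, slot), so the tid carries the version current at creation time. When the bthread finishes, version_butex is incremented and all joiners are woken up. Part of the code: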

// Increase the version and wake up all joiners, if resulting version
// is 0, change it to 1 to make bthread_t never be 0. Any access
// or join to the bthread after changing version will be rejected.
// The spinlock is for visibility of TaskGroup::get_attr.
{
    BAIDU_SCOPED_LOCK(m->version_lock);
    if (0 == ++*m->version_butex) {
        ++*m->version_butex;
    }
}
butex_wake_except(m->version_butex, 0);
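The version check works because the version is part of the id itself. As far as I can tell, brpc's make_tid puts the version in the high 32 bits and the slot in the low 32 bits. A minimal sketch of that encoding, simplified to plain integers (toy_bthread_t, get_slot and next_version are hypothetical names; in brpc the slot is a resource id and the version lives in the butex):

```cpp
#include <cassert>
#include <cstdint>

// Simplified id encoding (hypothetical types): high 32 bits = version,
// low 32 bits = slot of the task's metadata.
using toy_bthread_t = uint64_t;

toy_bthread_t make_tid(uint32_t version, uint32_t slot) {
    return (static_cast<toy_bthread_t>(version) << 32) | slot;
}

uint32_t get_version(toy_bthread_t tid) {
    return static_cast<uint32_t>(tid >> 32);
}

uint32_t get_slot(toy_bthread_t tid) {
    return static_cast<uint32_t>(tid & 0xFFFFFFFFu);
}

// Version bump on exit, skipping 0 so that a tid is never 0.
uint32_t next_version(uint32_t v) {
    ++v;
    return v == 0 ? 1 : v;
}
```

Once the slot's version has been bumped, an old tid still points at the same slot but carries a different version, so joins and accesses through it can be detected as stale.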

butex_wake_except, which wakes the joiners, is implemented as follows:

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
                    bw->container.store(NULL, butil::memory_order_relaxed);
                } else {
                    excluded_waiter = bw;
                }
            } else {
                bw->container.store(NULL, butil::memory_order_relaxed);
                pthread_waiters.Append(bw);
            }
        }

        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter); // put the excluded waiter back so it keeps waiting
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList(); // unlink from the local pthread_waiters list (bw was moved here from b->waiters above) so the loop advances
        wakeup_pthread(bw);   // wake up the pthread waiter
        ++nwakeup;
    }

    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
                bthread_waiters.head()->value());

    TaskGroup* g = get_task_group(front->control);
    const int saved_nwakeup = nwakeup;
    do {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread()); // cancel its timeout timer, if any
        g->ready_to_run_general(w->tid, true); // make it runnable without signaling workers yet
        ++nwakeup;
    } while (!bthread_waiters.empty());
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general(); // signal once for all the queued bthreads
    }
    return nwakeup;
}
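The bookkeeping of butex_wake_except can be modeled without real threads. In the sketch below (hypothetical types; tid 0 marks a pthread waiter, matching the bw->tid test in the excerpt), waiters are split three ways: pthread waiters are woken in queue order, bthread waiters are made runnable starting from the tail as the "pop reversely" comment says, and the excluded bthread stays on the butex. Passing 0 as the excluded id, as the bthread exit path does, excludes nobody because a bthread's tid is never 0.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Simplified model of butex_wake_except (hypothetical types, not brpc's):
// tid == 0 marks a pthread waiter, any other value a bthread waiter.
struct ToyWaiter {
    uint64_t tid;
    int id;
};

struct WakeResult {
    std::vector<int> woken_pthreads;   // in queue order
    std::vector<int> ready_bthreads;   // in the order made runnable
    std::deque<ToyWaiter> remaining;   // waiters left on the butex
};

WakeResult wake_except(const std::deque<ToyWaiter>& waiters, uint64_t excluded) {
    WakeResult r;
    std::vector<ToyWaiter> bthreads;
    for (const ToyWaiter& w : waiters) {
        if (w.tid == 0) {
            r.woken_pthreads.push_back(w.id);   // pthread waiter: wake now
        } else if (w.tid != excluded) {
            bthreads.push_back(w);              // bthread waiter: collect
        } else {
            r.remaining.push_back(w);           // excluded: keeps waiting
        }
    }
    // Like the excerpt, take bthread waiters from the tail ("pop reversely").
    for (auto it = bthreads.rbegin(); it != bthreads.rend(); ++it) {
        r.ready_bthreads.push_back(it->id);
    }
    return r;
}
```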

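The two functions that create bthreads, bthread_start_urgent and bthread_start_background, will be analyzed later. Other bthread-related details will be explained in comments on the relevant code in later posts; none of it is hard to follow.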

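The above is only an analysis of some basic code. Several important class declarations, such as TaskMeta, TaskControl and TaskGroup, have not been analyzed yet, and neither has the mechanism for switching bthreads in and out. These, along with some further thoughts, will be covered in follow-up posts.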

I'm heading out today to look around; it has been a long time since I traveled this far.

References:
bthread.md
threading_overview.md
butex_wait fails to wake up after timeout
Linux thread-specific data explained
Understanding pthread_cond_wait and pthread_cond_signal in depth
bthread-local

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.