不講語言特性,只從工程角度出發(fā)锭环,個(gè)人覺得C++標(biāo)準(zhǔn)委員會(huì)在C++11中對(duì)多線程庫的引入是有史以來做得最人道的一件事鸡挠;今天我將就C++11多線程中的atomic原子操作展開討論呻袭;比較互斥鎖礁芦,自旋鎖(spinlock),無鎖編程的異同得院,并進(jìn)行性能測(cè)試傻铣;最后會(huì)討論一下內(nèi)存序的問題;為了流暢閱讀你最好先熟悉一下C++11 Atomic的基本操作英文文檔祥绞,這里還有一份我覺得做得很用心的關(guān)于C++11并發(fā)編程的中文教程非洲,你也可以從其中找到對(duì)應(yīng)的知識(shí)點(diǎn);
原子操作
我們寫的代碼最終都會(huì)被翻譯為CPU指令蜕径,一條最簡(jiǎn)單加減法語句都有可能會(huì)被翻譯成幾條指令執(zhí)行两踏;為了避免語句在CPU這一層級(jí)上的指令交叉帶來的行為不可知,在多線程程序設(shè)計(jì)時(shí)我們必須通過一些方式來進(jìn)行規(guī)范丧荐;這里面最常見的做法就是引入互斥鎖缆瓣,其大概的模型就是籃球模式:幾個(gè)人一起搶球,誰搶到了誰玩虹统,玩完了再把球丟出來重新?lián)尮耄坏コ怄i是操作系統(tǒng)這一層級(jí)的隧甚,最終映射到CPU上也是一堆指令,是指令就必然會(huì)帶來額外的開銷渡冻;
既然CPU指令是多線程不可再分的最小單元戚扳,那我們?nèi)绻修k法將代碼語句和指令對(duì)應(yīng)起來,不就不需要引入互斥鎖從而提高性能了嗎? 而這個(gè)對(duì)應(yīng)關(guān)系就是所謂的原子操作族吻;在C++11的atomic中有兩種做法:
- 模擬, 比如說對(duì)于一個(gè)atomic<T>類型帽借,我們可以給他附帶一個(gè)mutex,操作時(shí)lock/unlock一下超歌,這種在多線程下進(jìn)行訪問砍艾,必然會(huì)導(dǎo)致線程阻塞;
- 有相應(yīng)的CPU層級(jí)的對(duì)應(yīng)巍举,這就是一個(gè)標(biāo)準(zhǔn)的lock-free類型脆荷;
可以通過is_lock_free函數(shù),判斷一個(gè)atomic是否是lock-free類型
自旋鎖
使用原子操作模擬互斥鎖的行為就是自旋鎖懊悯,互斥鎖狀態(tài)是由操作系統(tǒng)控制的蜓谋,自旋鎖的狀態(tài)是程序員自己控制的;要搞清楚自旋鎖我們首先要搞清楚自旋鎖模型炭分,常用的自旋鎖模型有:
- TAS, <u>Test-and-set</u>桃焕,有且只有atomic_flag類型與之對(duì)應(yīng)
- CAS, <u>Compare-and-swap</u>,對(duì)應(yīng)atomic的compare_exchange_strong 和 compare_exchange_weak捧毛,這兩個(gè)版本的區(qū)別是:Weak版本如果數(shù)據(jù)符合條件被修改观堂,其也可能返回false,就好像不符合修改狀態(tài)一致岖妄;而Strong版本不會(huì)有這個(gè)問題型将,但在某些平臺(tái)上Strong版本比Weak版本慢 [<u>注:在x86平臺(tái)我沒發(fā)現(xiàn)他們之間有任何性能差距</u>]寂祥;絕大多數(shù)情況下荐虐,我們應(yīng)該優(yōu)先選擇使用Strong版本;
我針對(duì)這兩種模型分別實(shí)現(xiàn)了兩個(gè)版本的自旋鎖丸凭,最終代碼可以在性能測(cè)試章節(jié)中找到福扬,這里我們要注意以下問題:
LOCK時(shí)自旋鎖是自己輪詢狀態(tài),如果不引入中斷機(jī)制惜犀,會(huì)有大量計(jì)算資源浪費(fèi)到輪詢本身上铛碑;常用的做法是使用yield切換到其他線程執(zhí)行,或直接使用sleep暫停當(dāng)前線程.
無鎖編程
如果看了CAS實(shí)現(xiàn)的自旋鎖代碼會(huì)發(fā)現(xiàn)其有些別扭:每次都需要去重置exp的狀態(tài)為false虽界;CAS雖然也能實(shí)現(xiàn)自旋鎖汽烦,但通常被我們用來進(jìn)行無鎖編程;
什么是無鎖編程呢莉御,讓我們以一個(gè)例子開始:
template<typename _Ty> struct LockFreeStackT { struct Node { _Ty val; Node* next; }; LockFreeStackT() : head_(nullptr) {} void push(const _Ty& val) { Node* node = new Node{ val, head_.load() }; while (!head_.compare_exchange_strong(node->next, node)) { } } void pop() { Node* node = head_.load(); while (node && !head_.compare_exchange_strong(node, node->next)) { } if (node) delete node; } std::atomic<Node*> head_; };
整個(gè)邏輯很簡(jiǎn)單撇吞,pop只是push的逆過程俗冻,這里我們僅僅只分析一下push:每次push時(shí)去獲得棧頂元素,生成一個(gè)指向棧頂?shù)男略仉咕保缓笫褂肅AS操作去更新棧頂迄薄,這里可能有兩種情況:
- 如果新元素的next和棧頂一樣,證明在你之前沒人操作它煮岁,使用新元素替換棧頂退出即可讥蔽;
- 如果不一樣,證明在你之前已經(jīng)有人操作它画机,棧頂已發(fā)生改變冶伞,該函數(shù)會(huì)自動(dòng)更新新元素的next值為改變后的棧頂;然后繼續(xù)循環(huán)檢測(cè)直到狀態(tài)1成立退出步氏;
不難看出碰缔,其實(shí)所謂無鎖編程只是將多條指令合并成了一條指令形成一個(gè)邏輯完備的最小單元,通過兼容CPU指令執(zhí)行邏輯形成的一種多線程編程模型戳护;結(jié)束了嗎金抡,再等等,使用上面的代碼腌且,有很大的幾率出delete不存在的內(nèi)存梗肝,或內(nèi)存被多次delete的錯(cuò)誤,讓我們進(jìn)入下一節(jié).
ABA問題
維基百科: ABA problem铺董,如果有兩個(gè)線程[1&2]操作上面的堆棧巫击,初始狀態(tài)有2個(gè)元素: top->A->B,線程1執(zhí)行pop操作精续,在CAS前進(jìn)行線程切換:
注意pop函數(shù)中的head_.compare_exchange_strong(node, node->next)語句坝锰,如果對(duì)C/C++不夠熟悉,很容易發(fā)生誤解重付;我們不考慮函數(shù)包裝等復(fù)雜情況顷级,只考慮最簡(jiǎn)單的情況下在調(diào)用CAS原子操作前至少還有參數(shù)壓棧操作,也就是說node->next不是調(diào)用時(shí)確定的确垫,而是在參數(shù)壓棧時(shí)確定的弓颈;前面說的CAS操作前進(jìn)行線程切換,切換時(shí){node, node->next}對(duì)應(yīng)的是{A, B}.
然后線程2執(zhí)行pop操作删掀,將A翔冀,B都刪掉,然后創(chuàng)建了一個(gè)新元素push進(jìn)去披泪,因?yàn)椴僮飨到y(tǒng)的內(nèi)存分配機(jī)制會(huì)重復(fù)使用之前釋放的內(nèi)存纤子,恰好push進(jìn)去的內(nèi)存地址和A一樣,我們記為A',這時(shí)候切換到線程1控硼,CAS操作檢查到A沒變化成功將B設(shè)為棧頂跌捆,但B是一個(gè)已經(jīng)被釋放的內(nèi)存塊...
解決ABA問題的辦法無非就是通過打標(biāo)簽標(biāo)識(shí)A和A'為不同的指針,這下總結(jié)束了吧象颖,事實(shí)上還沒有佩厚,再次進(jìn)入下一節(jié).
內(nèi)存回收
還是先看上面的代碼,在Pop時(shí)先獲得了頭指針Node* node = head_.load();说订,如果這時(shí)候發(fā)生線程切換并且這個(gè)節(jié)點(diǎn)被另一個(gè)線程刪除抄瓦,當(dāng)線程切回時(shí)候node無效造成node->next訪問異常...,對(duì)于這個(gè)問題陶冷,現(xiàn)在流行的處理方式主要有:
- Lock-Free Reference Counting: 引用計(jì)數(shù)方式钙姊,嚴(yán)格的說應(yīng)該是全局計(jì)數(shù)方式;每次POP時(shí)首先增加計(jì)數(shù)埂伦,然后處理完后做檢測(cè)煞额,計(jì)數(shù)如果大于1證明其他線程也在操作况增,把節(jié)點(diǎn)存起來悯许;只要檢測(cè)到計(jì)數(shù)等于1权悟,證明目前只有自己操作流纹,可以刪除當(dāng)前節(jié)點(diǎn)和之前緩存的節(jié)點(diǎn),刪除之前節(jié)點(diǎn)時(shí)必須進(jìn)行二次判斷來解決交換后报辱,其他線程Acquire的問題歹篓;具體可以參見Paper: Lock-Free Reference Counting, Efficient and Reliable Lock-Free Memory Reclamation Based on Reference Counting
- Hazard pointer: 使用POP的線程儡湾,都有一個(gè)自己可以寫的thread_local變量媳否,每次得到的舊指針都存入其中栅螟,當(dāng)交換完成后將該變量置為0;對(duì)于其他線程來說每次刪除前對(duì)整個(gè)Hazard pointer表遍歷一遍篱竭,如果發(fā)現(xiàn)其他線程也在訪問力图,就把要?jiǎng)h除的節(jié)點(diǎn)放入將刪列表中,否則可以直接刪除掺逼;定時(shí)對(duì)將刪列表中的節(jié)點(diǎn)按照之前的規(guī)則和整個(gè)Hazard pointer表做比較吃媒,只要Hazard pointer表不持有這個(gè)節(jié)點(diǎn)就可刪除;可參考維基百科: Hazard pointer
- Epoch Based Reclamation
- Quiescent State Based Reclamation
我實(shí)現(xiàn)了前兩種方式坪圾,后兩種方式如果以后有時(shí)間再補(bǔ)上晓折;
性能測(cè)試
本來想挖個(gè)坑惑朦,在Github上重新建一個(gè)項(xiàng)目的兽泄;但工作太忙,給自己挖的坑也太多了漾月,挖了估計(jì)也僅僅只是挖了病梢,沒精力去維護(hù);好在代碼是完整的,邏輯也相對(duì)比較簡(jiǎn)單蜓陌,將以下代碼拷到一個(gè)CPP文件中編譯一下就能運(yùn)行觅彰;測(cè)試上在VS2015中基本全覆蓋的單元測(cè)試跑過,macOS上也經(jīng)過了大概8個(gè)小時(shí)的壓力測(cè)試钮热,應(yīng)該沒太大問題填抬,如果有問題請(qǐng)與我聯(lián)系,先貼代碼:
#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1 // VS2015 SP2 BUG #include <iostream> #include <thread> #include <mutex> #include <atomic> #include <chrono> #include <cassert> template<typename _Ty> struct NodeT { std::unique_ptr<_Ty> data; NodeT* next; NodeT(const _Ty& _val, NodeT* _next) : data(new _Ty(_val)) , next(_next) { } }; template<size_t _SleepWhenAcquireFailedInMicroSeconds = size_t(-1)> class SpinLockByTasT { std::atomic_flag locked_flag_ = ATOMIC_FLAG_INIT; public: void lock() { while (locked_flag_.test_and_set()) { if (_SleepWhenAcquireFailedInMicroSeconds == size_t(-1)) { std::this_thread::yield(); } else if (_SleepWhenAcquireFailedInMicroSeconds != 0) { std::this_thread::sleep_for(std::chrono::microseconds(_SleepWhenAcquireFailedInMicroSeconds)); } } } void unlock() { locked_flag_.clear(); } }; template<size_t _SleepWhenAcquireFailedInMicroSeconds = size_t(-1)> class SpinLockByCasT { std::atomic<bool> locked_flag_ = ATOMIC_VAR_INIT(false); public: void lock() { bool exp = false; while (!locked_flag_.compare_exchange_strong(exp, true)) { exp = false; if (_SleepWhenAcquireFailedInMicroSeconds == size_t(-1)) { std::this_thread::yield(); } else if (_SleepWhenAcquireFailedInMicroSeconds != 0) { std::this_thread::sleep_for(std::chrono::microseconds(_SleepWhenAcquireFailedInMicroSeconds)); } } } void unlock() { locked_flag_.store(false); } }; template<typename _Ty, typename _Lock> class LockedStackT { typedef NodeT<_Ty> Node; public: ~LockedStackT() { std::lock_guard<_Lock> lock(lock_); while (head_) { Node* node = head_; head_ = node->next; delete node; } } void Push(const _Ty& val) { Node* node(new Node(val, nullptr)); // 不需要鎖構(gòu)造函數(shù)隧期,這個(gè)可能是一個(gè)耗時(shí)操作 std::lock_guard<_Lock> lock(lock_); node->next = head_; head_ = node; } std::unique_ptr<_Ty> Pop() { std::unique_ptr<_Ty> ret; Node* node; { // 同上飒责,只需要鎖鏈表本身,其他操作可以放到鏈表外執(zhí)行 std::lock_guard<_Lock> lock(lock_); node = head_; if (node) head_ = node->next; } if (node) { ret.swap(node->data); delete node; } return std::move(ret); } private: Node* head_ = nullptr; _Lock lock_; }; template<typename _Node> class MemoryReclamationByReferenceCountingT { std::atomic<size_t> counter_ = ATOMIC_VAR_INIT(0); std::atomic<_Node*> will_be_deleted_list_ = ATOMIC_VAR_INIT(nullptr); void InsertToList(_Node* first, _Node* last) { last->next = will_be_deleted_list_; while (!will_be_deleted_list_.compare_exchange_strong(last->next, first)); } void InsertToList(_Node* head) { _Node* last = head; while (_Node* next = last->next) last = next; InsertToList(head, last); } public: ~MemoryReclamationByReferenceCountingT() { // 如果線程正常退出仆潮,Reference Counting算法能刪除所有數(shù)據(jù) assert(will_be_deleted_list_.load() == nullptr); assert(counter_.load() == 0); _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr); while (to_delete_list) { _Node* node = to_delete_list; to_delete_list = node->next; delete node; } } inline void Addref() { ++counter_; } inline bool Store(_Node* node) { return true; } bool Release(_Node* node) { if (!node) return false; if (counter_ == 1) { _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr); if (!--counter_) { while (to_delete_list) { _Node* node = to_delete_list; to_delete_list = node->next; delete node; } } else if (to_delete_list) { InsertToList(to_delete_list); } delete node; } else { if (node) InsertToList(node, node); --counter_; } return true; } }; template<class _Node, size_t _MaxPopThreadCount = 16> class MemoryReclamationByHazardPointerT { struct HazardPointer { std::atomic<std::thread::id> id; std::atomic<_Node*> ptr = ATOMIC_VAR_INIT(nullptr); }; HazardPointer hps_[_MaxPopThreadCount]; std::atomic<_Node*> will_be_deleted_list_ = ATOMIC_VAR_INIT(nullptr); void _ReleaseImpl(_Node* node) { // 檢查HazardPointers中是否有線程正在訪問當(dāng)前指針 size_t i = 0; while (i < _MaxPopThreadCount) { if (hps_[i].ptr.load() == node) break; ++i; } if (i == _MaxPopThreadCount) { // 無任何線程正在訪問當(dāng)前指針宏蛉,直接刪除 delete node; } else { // 有線程正在訪問,加入緩存列表 node->next = will_be_deleted_list_.load(); while (!will_be_deleted_list_.compare_exchange_strong(node->next, node)); } } public: ~MemoryReclamationByHazardPointerT() { // 自己不能刪除自己性置,正常退出HazardPointer始終會(huì)持有一個(gè)節(jié)點(diǎn)拾并,只能在此做清理 size_t count = 0; _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr); while (to_delete_list) { _Node* node = to_delete_list; to_delete_list = node->next; delete node; ++count; } assert(count < 2); } inline void Addref() {} bool Store(_Node* node) { struct HazardPointerOwner { HazardPointer* hp; HazardPointerOwner(HazardPointer* hps) : hp(nullptr) { for (size_t i = 0; i < _MaxPopThreadCount; ++i) { std::thread::id id; if (hps[i].id.compare_exchange_strong(id, std::this_thread::get_id())) { hp = &hps[i]; break; } } } ~HazardPointerOwner() { if (hp) { hp->ptr.store(nullptr); hp->id.store(std::thread::id()); } } }; thread_local HazardPointerOwner owner(hps_); if (!node || !owner.hp) return false; owner.hp->ptr.store(node); return true; } bool Release(_Node* node) { if (!node) return false; _ReleaseImpl(node); // 對(duì)當(dāng)前傳入指針進(jìn)行釋放操作 // 循環(huán)檢測(cè)will_be_deleted_list_, 可以另開一個(gè)線程定時(shí)檢測(cè)效率會(huì)更高 _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr); while (to_delete_list) { _Node* node = to_delete_list; to_delete_list = node->next; _ReleaseImpl(node); } return true; } }; template<typename _Ty, typename _MemoryReclamation> class LockFreeStackT { typedef NodeT<_Ty> Node; struct TaggedPointer { Node* ptr; size_t tag; TaggedPointer() {} TaggedPointer(Node* _ptr, size_t _tag) : ptr(_ptr) , tag(_tag) { } }; public: ~LockFreeStackT() { TaggedPointer o(nullptr, 0); head_.exchange(o); Node* head = o.ptr; while (head) { Node* node = head; head = node->next; delete node; } } void Push(const _Ty& val) { TaggedPointer o = head_.load(); TaggedPointer n(new Node(val, o.ptr), o.tag + 1); while (!head_.compare_exchange_strong(o, n)) { n.ptr->next = o.ptr; n.tag = o.tag + 1; } } std::unique_ptr<_Ty> Pop() { memory_reclamation_.Addref(); TaggedPointer o = head_.load(), n; while (true) { if (!o.ptr) break; memory_reclamation_.Store(o.ptr); // HazardPointer算法儲(chǔ)存(相當(dāng)于上鎖)后,需要對(duì)有效值進(jìn)行二次確認(rèn)鹏浅,否則還是有先刪除的問題 // 這樣做并沒效率問題嗅义,不等的情況CAS操作也會(huì)進(jìn)行循環(huán),因此可以作為針對(duì)任何內(nèi)存回收算法的固定寫法 const TaggedPointer t = head_.load(); if (memcmp(&t, &o, sizeof(TaggedPointer))) { o = t; continue; } n.ptr = o.ptr->next; n.tag = o.tag + 1; if (head_.compare_exchange_strong(o, n)) break; } memory_reclamation_.Store(nullptr); std::unique_ptr<_Ty> ret; if (o.ptr) { ret.swap(o.ptr->data); memory_reclamation_.Release(o.ptr); } return std::move(ret); } private: std::atomic<TaggedPointer> head_ = ATOMIC_VAR_INIT(TaggedPointer(nullptr, 0)); _MemoryReclamation memory_reclamation_; }; template<typename _Ty, int _ThreadCount = 16, int _LoopCount = 100000> struct LockFreePerformanceTestT { template<class _ProcessUnit> static double Run(_ProcessUnit puf) { std::thread ths[_ThreadCount]; auto st = std::chrono::high_resolution_clock::now(); for (int i = 0; i < _ThreadCount; ++i) ths[i] = std::thread([&puf]() { for (int i = 0; i < _LoopCount; ++i) { puf(); } }); for (int i = 0; i < _ThreadCount; ++i) ths[i].join(); const double period_in_ms = static_cast<double>((std::chrono::high_resolution_clock::now() - st).count()) / std::chrono::high_resolution_clock::period::den * 1000; return period_in_ms; } static void Run() { _Ty s; std::cout << Run([&s]() {s.Push(0); }) << "\t\t"; std::cout << Run([&s]() { s.Pop(); }) << std::endl; } }; int main() { std::cout << "LockedStack with std::mutex" << "\t\t\t\t\t"; LockFreePerformanceTestT<LockedStackT<uint32_t, std::mutex>>::Run(); std::cout << "LockedStack with SpinLockByTas yield" << "\t\t\t\t"; LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByTasT<>>>::Run(); std::cout << "LockedStack with SpinLockByCas yield" << "\t\t\t\t"; LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByCasT<>>>::Run(); std::cout << "LockedStack with SpinLockByTas usleep(5)" << "\t\t\t"; LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByTasT<5>>>::Run(); std::cout << "LockedStack with SpinLockByCas usleep(5)" << "\t\t\t"; LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByCasT<5>>>::Run(); std::cout << "LockFreeStack with MemoryReclamationByReferenceCounting" << "\t\t"; LockFreePerformanceTestT<LockFreeStackT<uint32_t, MemoryReclamationByReferenceCountingT<NodeT<uint32_t>>>>::Run(); std::cout << "LockFreeStack with MemoryReclamationByHazardPointer" << "\t\t"; LockFreePerformanceTestT<LockFreeStackT<uint32_t, MemoryReclamationByHazardPointerT<NodeT<uint32_t>>>>::Run(); return 0; }
測(cè)試結(jié)果如下:
注:在Windows x64下隐砸,atomic<size=16>不是lockfree芥喇,如果有需要要可以用InterlockedCompareExchange128自己實(shí)現(xiàn)一下;
內(nèi)存模型
C++11原子操作的很多函數(shù)都有個(gè)std::memory_order參數(shù)凰萨,這個(gè)參數(shù)就是這里所說的內(nèi)存模型继控,其并不是類似POD的內(nèi)存布局,而是一種數(shù)據(jù)同步模型胖眷,準(zhǔn)確說法應(yīng)該是儲(chǔ)存一致性模型武通,其作用是對(duì)同一時(shí)間的讀寫操作進(jìn)行排序;C++11中一個(gè)定義了6種類型珊搀,我們可以將其分為4類冶忱,下面我從我的角度以普通程序員能理解的語言描述一下,具體的可以參見 C++11 Memory Order:
- memory_order_relaxed: 很多文檔都說這種模型是完全亂序的境析,但我理解同一線程內(nèi)囚枪,基本上應(yīng)該還是按照代碼順序執(zhí)行的;
- memory_order_release & memory_order_acquire: 兩個(gè)線程A&B劳淆,A線程Release后链沼,B線程Acquire能保證一定讀到的是最新被修改過的值;這種模型更強(qiáng)大的地方在于它能保證發(fā)生在A-Release前的所有寫操作沛鸵,在B-Acquire后都能讀到最新值括勺;
- memory_order_release & memory_order_consume: 上一個(gè)模型的同步是針對(duì)所有對(duì)象的缆八,這種模型只針對(duì)依賴于該操作涉及的對(duì)象:比如這個(gè)操作發(fā)生在變量a上,而s = a + b; 那s依賴于a疾捍,但b不依賴于a; 當(dāng)然這里也有循環(huán)依賴的問題奈辰,例如:t = s + 1,因?yàn)閟依賴于a乱豆,那t其實(shí)也是依賴于a的奖恰;
- memory_order_seq_cst: 順序一致性模型,這是C++11原子操作的默認(rèn)模型宛裕;大概行為為對(duì)每一個(gè)變量都進(jìn)行2中所說的Release-Acquire操作房官,當(dāng)然這也是一個(gè)最慢的同步模型;
說到內(nèi)存模型续滋,就不得不提一下經(jīng)常被大家誤用的 volatile 關(guān)鍵字翰守,這個(gè)關(guān)鍵字僅僅保證:數(shù)據(jù)只在內(nèi)存中讀寫,直接操作它既不能保證操作是atomic的疲酌,也不能保證Memory Order蜡峰;其實(shí)在我理解中,這個(gè)應(yīng)該是嵌入式朗恳,內(nèi)核或驅(qū)動(dòng)程序員專用關(guān)鍵字:)湿颅,當(dāng)然如果在競(jìng)爭(zhēng)不敏感的環(huán)境中用來做flag用一下也沒太大問題.
最后要說一下x86體系中Release-Acquire是自動(dòng)獲取的,最終形成一個(gè)memory_order_seq_cst模型粥诫;因此絕大多數(shù)情況下memory_order_relaxed其實(shí)并沒有什么用.
結(jié)語
無鎖編程真的很難油航,如果要完全寫對(duì)那就成了變態(tài)難,出錯(cuò)了平時(shí)常見的調(diào)試手段根本沒用怀浆,幾乎全靠腦補(bǔ)谊囚;并且這種輪詢方式相對(duì)于鎖的中斷掛起方式來講,只有在超高并發(fā)的前提下才能達(dá)到一個(gè)理想的效果执赡,低并發(fā)下空載會(huì)對(duì)系統(tǒng)資源造成極大的浪費(fèi)镰踏,因此原則上我不推崇這個(gè)玩意;從測(cè)試結(jié)果來看沙合,所有平臺(tái)上自旋鎖性能都非常接近無鎖實(shí)現(xiàn)奠伪,并且其使用方式和互斥鎖幾乎沒差別,因此在沒啃透之前首懈,使用鎖的方式才是明智的選擇.
參考資料
C++ 并發(fā)編程指南
Yet another implementation of a lock-free circular array queue
Common Pitfalls in Writing Lock-Free Algorithms
Writing Lock-Free Code: A Corrected Queue
維基百科: ABA problem
Lock-Free Reference Counting
Efficient and Reliable Lock-Free Memory Reclamation Based on Reference Counting
維基百科: Hazard pointer
C++11 Memory Order
Nine ways to break your systems code using volatile
原始鏈接 版權(quán)聲明:自由轉(zhuǎn)載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 4.0