C++11原子操作與無鎖編程

不講語言特性,只從工程角度出發(fā)锭环,個(gè)人覺得C++標(biāo)準(zhǔn)委員會(huì)在C++11中對(duì)多線程庫的引入是有史以來做得最人道的一件事鸡挠;今天我將就C++11多線程中的atomic原子操作展開討論呻袭;比較互斥鎖礁芦,自旋鎖(spinlock),無鎖編程的異同得院,并進(jìn)行性能測(cè)試傻铣;最后會(huì)討論一下內(nèi)存序的問題;為了流暢閱讀你最好先熟悉一下C++11 Atomic的基本操作英文文檔祥绞,這里還有一份我覺得做得很用心的關(guān)于C++11并發(fā)編程的中文教程非洲,你也可以從其中找到對(duì)應(yīng)的知識(shí)點(diǎn);

原子操作

我們寫的代碼最終都會(huì)被翻譯為CPU指令蜕径,一條最簡(jiǎn)單加減法語句都有可能會(huì)被翻譯成幾條指令執(zhí)行两踏;為了避免語句在CPU這一層級(jí)上的指令交叉帶來的行為不可知,在多線程程序設(shè)計(jì)時(shí)我們必須通過一些方式來進(jìn)行規(guī)范丧荐;這里面最常見的做法就是引入互斥鎖缆瓣,其大概的模型就是籃球模式:幾個(gè)人一起搶球,誰搶到了誰玩虹统,玩完了再把球丟出來重新?lián)尮耄坏コ怄i是操作系統(tǒng)這一層級(jí)的隧甚,最終映射到CPU上也是一堆指令,是指令就必然會(huì)帶來額外的開銷渡冻;

既然CPU指令是多線程不可再分的最小單元戚扳,那我們?nèi)绻修k法將代碼語句和指令對(duì)應(yīng)起來,不就不需要引入互斥鎖從而提高性能了嗎? 而這個(gè)對(duì)應(yīng)關(guān)系就是所謂的原子操作族吻;在C++11的atomic中有兩種做法:

  • 模擬, 比如說對(duì)于一個(gè)atomic<T>類型帽借,我們可以給他附帶一個(gè)mutex,操作時(shí)lock/unlock一下超歌,這種在多線程下進(jìn)行訪問砍艾,必然會(huì)導(dǎo)致線程阻塞;
  • 有相應(yīng)的CPU層級(jí)的對(duì)應(yīng)巍举,這就是一個(gè)標(biāo)準(zhǔn)的lock-free類型脆荷;

可以通過is_lock_free函數(shù),判斷一個(gè)atomic是否是lock-free類型

自旋鎖

使用原子操作模擬互斥鎖的行為就是自旋鎖懊悯,互斥鎖狀態(tài)是由操作系統(tǒng)控制的蜓谋,自旋鎖的狀態(tài)是程序員自己控制的;要搞清楚自旋鎖我們首先要搞清楚自旋鎖模型炭分,常用的自旋鎖模型有:

  1. TAS, <u>Test-and-set</u>桃焕,有且只有atomic_flag類型與之對(duì)應(yīng)
  2. CAS, <u>Compare-and-swap</u>,對(duì)應(yīng)atomic的compare_exchange_strong 和 compare_exchange_weak捧毛,這兩個(gè)版本的區(qū)別是:Weak版本如果數(shù)據(jù)符合條件被修改观堂,其也可能返回false,就好像不符合修改狀態(tài)一致岖妄;而Strong版本不會(huì)有這個(gè)問題型将,但在某些平臺(tái)上Strong版本比Weak版本慢 [<u>注:在x86平臺(tái)我沒發(fā)現(xiàn)他們之間有任何性能差距</u>]寂祥;絕大多數(shù)情況下荐虐,我們應(yīng)該優(yōu)先選擇使用Strong版本;

我針對(duì)這兩種模型分別實(shí)現(xiàn)了兩個(gè)版本的自旋鎖丸凭,最終代碼可以在性能測(cè)試章節(jié)中找到福扬,這里我們要注意以下問題:

LOCK時(shí)自旋鎖是自己輪詢狀態(tài),如果不引入中斷機(jī)制惜犀,會(huì)有大量計(jì)算資源浪費(fèi)到輪詢本身上铛碑;常用的做法是使用yield切換到其他線程執(zhí)行,或直接使用sleep暫停當(dāng)前線程.

無鎖編程

如果看了CAS實(shí)現(xiàn)的自旋鎖代碼會(huì)發(fā)現(xiàn)其有些別扭:每次都需要去重置exp的狀態(tài)為false虽界;CAS雖然也能實(shí)現(xiàn)自旋鎖汽烦,但通常被我們用來進(jìn)行無鎖編程;
什么是無鎖編程呢莉御,讓我們以一個(gè)例子開始:

template<typename _Ty>
struct LockFreeStackT
{
  struct Node
  {
    _Ty val;
    Node* next;
  };
  LockFreeStackT() : head_(nullptr) {}
  void push(const _Ty& val)
  {
    Node* node = new Node{ val, head_.load() };
    while (!head_.compare_exchange_strong(node->next, node)) {
    }
  }
  void pop()
  {
    Node* node = head_.load();
    while (node && !head_.compare_exchange_strong(node, node->next)) {
    }
    if (node) delete node;
  }
  std::atomic<Node*> head_;
};

整個(gè)邏輯很簡(jiǎn)單撇吞,pop只是push的逆過程俗冻,這里我們僅僅只分析一下push:每次push時(shí)去獲得棧頂元素,生成一個(gè)指向棧頂?shù)男略仉咕保缓笫褂肅AS操作去更新棧頂迄薄,這里可能有兩種情況:

  1. 如果新元素的next和棧頂一樣,證明在你之前沒人操作它煮岁,使用新元素替換棧頂退出即可讥蔽;
  2. 如果不一樣,證明在你之前已經(jīng)有人操作它画机,棧頂已發(fā)生改變冶伞,該函數(shù)會(huì)自動(dòng)更新新元素的next值為改變后的棧頂;然后繼續(xù)循環(huán)檢測(cè)直到狀態(tài)1成立退出步氏;

不難看出碰缔,其實(shí)所謂無鎖編程只是將多條指令合并成了一條指令形成一個(gè)邏輯完備的最小單元,通過兼容CPU指令執(zhí)行邏輯形成的一種多線程編程模型戳护;結(jié)束了嗎金抡,再等等,使用上面的代碼腌且,有很大的幾率出delete不存在的內(nèi)存梗肝,或內(nèi)存被多次delete的錯(cuò)誤,讓我們進(jìn)入下一節(jié).

ABA問題

維基百科: ABA problem铺董,如果有兩個(gè)線程[1&2]操作上面的堆棧巫击,初始狀態(tài)有2個(gè)元素: top->A->B,線程1執(zhí)行pop操作精续,在CAS前進(jìn)行線程切換:

注意pop函數(shù)中的head_.compare_exchange_strong(node, node->next)語句坝锰,如果對(duì)C/C++不夠熟悉,很容易發(fā)生誤解重付;我們不考慮函數(shù)包裝等復(fù)雜情況顷级,只考慮最簡(jiǎn)單的情況下在調(diào)用CAS原子操作前至少還有參數(shù)壓棧操作,也就是說node->next不是調(diào)用時(shí)確定的确垫,而是在參數(shù)壓棧時(shí)確定的弓颈;前面說的CAS操作前進(jìn)行線程切換,切換時(shí){node, node->next}對(duì)應(yīng)的是{A, B}.

然后線程2執(zhí)行pop操作删掀,將A翔冀,B都刪掉,然后創(chuàng)建了一個(gè)新元素push進(jìn)去披泪,因?yàn)椴僮飨到y(tǒng)的內(nèi)存分配機(jī)制會(huì)重復(fù)使用之前釋放的內(nèi)存纤子,恰好push進(jìn)去的內(nèi)存地址和A一樣,我們記為A',這時(shí)候切換到線程1控硼,CAS操作檢查到A沒變化成功將B設(shè)為棧頂跌捆,但B是一個(gè)已經(jīng)被釋放的內(nèi)存塊...

解決ABA問題的辦法無非就是通過打標(biāo)簽標(biāo)識(shí)A和A'為不同的指針,這下總結(jié)束了吧象颖,事實(shí)上還沒有佩厚,再次進(jìn)入下一節(jié).

內(nèi)存回收

還是先看上面的代碼,在Pop時(shí)先獲得了頭指針Node* node = head_.load();说订,如果這時(shí)候發(fā)生線程切換并且這個(gè)節(jié)點(diǎn)被另一個(gè)線程刪除抄瓦,當(dāng)線程切回時(shí)候node無效造成node->next訪問異常...,對(duì)于這個(gè)問題陶冷,現(xiàn)在流行的處理方式主要有:

  1. Lock-Free Reference Counting: 引用計(jì)數(shù)方式钙姊,嚴(yán)格的說應(yīng)該是全局計(jì)數(shù)方式;每次POP時(shí)首先增加計(jì)數(shù)埂伦,然后處理完后做檢測(cè)煞额,計(jì)數(shù)如果大于1證明其他線程也在操作况增,把節(jié)點(diǎn)存起來悯许;只要檢測(cè)到計(jì)數(shù)等于1权悟,證明目前只有自己操作流纹,可以刪除當(dāng)前節(jié)點(diǎn)和之前緩存的節(jié)點(diǎn),刪除之前節(jié)點(diǎn)時(shí)必須進(jìn)行二次判斷來解決交換后报辱,其他線程Acquire的問題歹篓;具體可以參見Paper: Lock-Free Reference Counting, Efficient and Reliable Lock-Free Memory Reclamation Based on Reference Counting
  2. Hazard pointer: 使用POP的線程儡湾,都有一個(gè)自己可以寫的thread_local變量媳否,每次得到的舊指針都存入其中栅螟,當(dāng)交換完成后將該變量置為0;對(duì)于其他線程來說每次刪除前對(duì)整個(gè)Hazard pointer表遍歷一遍篱竭,如果發(fā)現(xiàn)其他線程也在訪問力图,就把要?jiǎng)h除的節(jié)點(diǎn)放入將刪列表中,否則可以直接刪除掺逼;定時(shí)對(duì)將刪列表中的節(jié)點(diǎn)按照之前的規(guī)則和整個(gè)Hazard pointer表做比較吃媒,只要Hazard pointer表不持有這個(gè)節(jié)點(diǎn)就可刪除;可參考維基百科: Hazard pointer
  3. Epoch Based Reclamation
  4. Quiescent State Based Reclamation

我實(shí)現(xiàn)了前兩種方式坪圾,后兩種方式如果以后有時(shí)間再補(bǔ)上晓折;

性能測(cè)試

本來想挖個(gè)坑惑朦,在Github上重新建一個(gè)項(xiàng)目的兽泄;但工作太忙,給自己挖的坑也太多了漾月,挖了估計(jì)也僅僅只是挖了病梢,沒精力去維護(hù);好在代碼是完整的,邏輯也相對(duì)比較簡(jiǎn)單蜓陌,將以下代碼拷到一個(gè)CPP文件中編譯一下就能運(yùn)行觅彰;測(cè)試上在VS2015中基本全覆蓋的單元測(cè)試跑過,macOS上也經(jīng)過了大概8個(gè)小時(shí)的壓力測(cè)試钮热,應(yīng)該沒太大問題填抬,如果有問題請(qǐng)與我聯(lián)系,先貼代碼:

#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1    // VS2015 SP2 BUG
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <cassert>
template<typename _Ty>
struct NodeT
{
  std::unique_ptr<_Ty> data;
  NodeT* next;
  NodeT(const _Ty& _val, NodeT* _next)
    : data(new _Ty(_val))
    , next(_next)
  {
  }
};
template<size_t _SleepWhenAcquireFailedInMicroSeconds = size_t(-1)>
class SpinLockByTasT
{
  std::atomic_flag locked_flag_ = ATOMIC_FLAG_INIT;
public:
  void lock()
  {
    while (locked_flag_.test_and_set()) {
      if (_SleepWhenAcquireFailedInMicroSeconds == size_t(-1)) {
        std::this_thread::yield();
      } else if (_SleepWhenAcquireFailedInMicroSeconds != 0) {
        std::this_thread::sleep_for(std::chrono::microseconds(_SleepWhenAcquireFailedInMicroSeconds));
      }
    }
  }
  void unlock()
  {
    locked_flag_.clear();
  }
};
template<size_t _SleepWhenAcquireFailedInMicroSeconds = size_t(-1)>
class SpinLockByCasT
{
  std::atomic<bool> locked_flag_ = ATOMIC_VAR_INIT(false);
public:
  void lock()
  {
    bool exp = false;
    while (!locked_flag_.compare_exchange_strong(exp, true)) {
      exp = false;
      if (_SleepWhenAcquireFailedInMicroSeconds == size_t(-1)) {
        std::this_thread::yield();
      } else if (_SleepWhenAcquireFailedInMicroSeconds != 0) {
        std::this_thread::sleep_for(std::chrono::microseconds(_SleepWhenAcquireFailedInMicroSeconds));
      }
    }
  }
  void unlock()
  {
    locked_flag_.store(false);
  }
};
template<typename _Ty, typename _Lock>
class LockedStackT
{
  typedef NodeT<_Ty> Node;
public:
  ~LockedStackT()
  {
    std::lock_guard<_Lock> lock(lock_);
    while (head_) {
      Node* node = head_;
      head_ = node->next;
      delete node;
    }
  }
  void Push(const _Ty& val)
  {
    Node* node(new Node(val, nullptr)); // 不需要鎖構(gòu)造函數(shù)隧期,這個(gè)可能是一個(gè)耗時(shí)操作
    std::lock_guard<_Lock> lock(lock_);
    node->next = head_;
    head_ = node;
  }
  std::unique_ptr<_Ty> Pop()
  {
    std::unique_ptr<_Ty> ret;
    Node* node;
    {
      // 同上飒责,只需要鎖鏈表本身,其他操作可以放到鏈表外執(zhí)行
      std::lock_guard<_Lock> lock(lock_);
      node = head_;
      if (node) head_ = node->next;
    }
    if (node) {
      ret.swap(node->data);
      delete node;
    }
    return std::move(ret);
  }
 
private:
  Node* head_ = nullptr;
  _Lock lock_;
};
template<typename _Node>
class MemoryReclamationByReferenceCountingT
{
  std::atomic<size_t> counter_ = ATOMIC_VAR_INIT(0);
  std::atomic<_Node*> will_be_deleted_list_ = ATOMIC_VAR_INIT(nullptr);
  void InsertToList(_Node* first, _Node* last)
  {
    last->next = will_be_deleted_list_;
    while (!will_be_deleted_list_.compare_exchange_strong(last->next, first));
  }
  void InsertToList(_Node* head)
  {
    _Node* last = head;
    while (_Node* next = last->next) last = next;
    InsertToList(head, last);
  }
public:
  ~MemoryReclamationByReferenceCountingT()
  {
    // 如果線程正常退出仆潮,Reference Counting算法能刪除所有數(shù)據(jù)
    assert(will_be_deleted_list_.load() == nullptr);
    assert(counter_.load() == 0);
    _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr);
    while (to_delete_list) {
      _Node* node = to_delete_list;
      to_delete_list = node->next;
      delete node;
    }
  }
  inline void Addref()
  {
    ++counter_;
  }
  inline bool Store(_Node* node) { return true; }
  bool Release(_Node* node)
  {
    if (!node) return false;
    if (counter_ == 1)
    {
      _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr);
      if (!--counter_) {
        while (to_delete_list) {
          _Node* node = to_delete_list;
          to_delete_list = node->next;
          delete node;
        }
      } else if (to_delete_list) {
        InsertToList(to_delete_list);
      }
      delete node;
    } else {
      if (node) InsertToList(node, node);
      --counter_;
    }
    return true;
  }
};
template<class _Node, size_t _MaxPopThreadCount = 16>
class MemoryReclamationByHazardPointerT
{
  struct HazardPointer
  {
    std::atomic<std::thread::id> id;
    std::atomic<_Node*> ptr = ATOMIC_VAR_INIT(nullptr);
  };
  HazardPointer hps_[_MaxPopThreadCount];
  std::atomic<_Node*> will_be_deleted_list_ = ATOMIC_VAR_INIT(nullptr);
  void _ReleaseImpl(_Node* node)
  {
    // 檢查HazardPointers中是否有線程正在訪問當(dāng)前指針
    size_t i = 0;
    while (i < _MaxPopThreadCount) {
      if (hps_[i].ptr.load() == node)
        break;
      ++i;
    }
    if (i == _MaxPopThreadCount) { // 無任何線程正在訪問當(dāng)前指針宏蛉,直接刪除
      delete node;
    } else {  // 有線程正在訪問,加入緩存列表
      node->next = will_be_deleted_list_.load();
      while (!will_be_deleted_list_.compare_exchange_strong(node->next, node));
    }
  }
public:
  ~MemoryReclamationByHazardPointerT()
  {
    // 自己不能刪除自己性置,正常退出HazardPointer始終會(huì)持有一個(gè)節(jié)點(diǎn)拾并,只能在此做清理
    size_t count = 0;
    _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr);
    while (to_delete_list) {
      _Node* node = to_delete_list;
      to_delete_list = node->next;
      delete node;
      ++count;
    }
    assert(count < 2);
  }
  inline void Addref() {}
  bool Store(_Node* node)
  {
    struct HazardPointerOwner
    {
      HazardPointer* hp;
      HazardPointerOwner(HazardPointer* hps)
        : hp(nullptr)
      {
        for (size_t i = 0; i < _MaxPopThreadCount; ++i) {
          std::thread::id id;
          if (hps[i].id.compare_exchange_strong(id, std::this_thread::get_id())) {
            hp = &hps[i];
            break;
          }
        }
      }
      ~HazardPointerOwner()
      {
        if (hp) {
          hp->ptr.store(nullptr);
          hp->id.store(std::thread::id());
        }
      }
    };
    thread_local HazardPointerOwner owner(hps_);
    if (!node || !owner.hp) return false;
    owner.hp->ptr.store(node);
    return true;
  }
  bool Release(_Node* node)
  {
    if (!node) return false;
    _ReleaseImpl(node); // 對(duì)當(dāng)前傳入指針進(jìn)行釋放操作
                        // 循環(huán)檢測(cè)will_be_deleted_list_, 可以另開一個(gè)線程定時(shí)檢測(cè)效率會(huì)更高
    _Node* to_delete_list = will_be_deleted_list_.exchange(nullptr);
    while (to_delete_list) {
      _Node* node = to_delete_list;
      to_delete_list = node->next;
      _ReleaseImpl(node);
    }
    return true;
  }
};
template<typename _Ty, typename _MemoryReclamation>
class LockFreeStackT
{
  typedef NodeT<_Ty> Node;
  struct TaggedPointer
  {
    Node* ptr;
    size_t tag;
    TaggedPointer() {}
    TaggedPointer(Node* _ptr, size_t _tag)
      : ptr(_ptr)
      , tag(_tag)
    {
    }
  };
public:
  ~LockFreeStackT()
  {
    TaggedPointer o(nullptr, 0);
    head_.exchange(o);

    Node* head = o.ptr;
    while (head) {
      Node* node = head;
      head = node->next;
      delete node;
    }
  }
  void Push(const _Ty& val)
  {
    TaggedPointer o = head_.load();
    TaggedPointer n(new Node(val, o.ptr), o.tag + 1);
    while (!head_.compare_exchange_strong(o, n)) {
      n.ptr->next = o.ptr;
      n.tag = o.tag + 1;
    }
  }
  std::unique_ptr<_Ty> Pop()
  {
    memory_reclamation_.Addref();
    TaggedPointer o = head_.load(), n;
    while (true) {
      if (!o.ptr) break;
      memory_reclamation_.Store(o.ptr);
      // HazardPointer算法儲(chǔ)存(相當(dāng)于上鎖)后,需要對(duì)有效值進(jìn)行二次確認(rèn)鹏浅,否則還是有先刪除的問題
      // 這樣做并沒效率問題嗅义,不等的情況CAS操作也會(huì)進(jìn)行循環(huán),因此可以作為針對(duì)任何內(nèi)存回收算法的固定寫法
      const TaggedPointer t = head_.load();
      if (memcmp(&t, &o, sizeof(TaggedPointer))) {
        o = t;
        continue;
      }
      n.ptr = o.ptr->next;
      n.tag = o.tag + 1;
      if (head_.compare_exchange_strong(o, n)) break;
    }
    memory_reclamation_.Store(nullptr);
    std::unique_ptr<_Ty> ret;
    if (o.ptr) {
      ret.swap(o.ptr->data);
      memory_reclamation_.Release(o.ptr);
    }
    return std::move(ret);
  }
private:
  std::atomic<TaggedPointer> head_ = ATOMIC_VAR_INIT(TaggedPointer(nullptr, 0));
  _MemoryReclamation memory_reclamation_;
};
template<typename _Ty, int _ThreadCount = 16, int _LoopCount = 100000>
struct LockFreePerformanceTestT
{
  template<class _ProcessUnit>
  static double Run(_ProcessUnit puf)
  {
    std::thread ths[_ThreadCount];
    auto st = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < _ThreadCount; ++i) ths[i] = std::thread([&puf]() {
      for (int i = 0; i < _LoopCount; ++i) {
        puf();
      }
    });
    for (int i = 0; i < _ThreadCount; ++i) ths[i].join();
    const double period_in_ms = static_cast<double>((std::chrono::high_resolution_clock::now() - st).count())
      / std::chrono::high_resolution_clock::period::den * 1000;
    return period_in_ms;
  }
  static void Run()
  {
    _Ty s;
    std::cout << Run([&s]() {s.Push(0); }) << "\t\t";
    std::cout << Run([&s]() { s.Pop(); }) << std::endl;
  }
};
int main()
{
  std::cout << "LockedStack with std::mutex" << "\t\t\t\t\t";
  LockFreePerformanceTestT<LockedStackT<uint32_t, std::mutex>>::Run();
  std::cout << "LockedStack with SpinLockByTas yield" << "\t\t\t\t";
  LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByTasT<>>>::Run();
  std::cout << "LockedStack with SpinLockByCas yield" << "\t\t\t\t";
  LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByCasT<>>>::Run();
  std::cout << "LockedStack with SpinLockByTas usleep(5)" << "\t\t\t";
  LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByTasT<5>>>::Run();
  std::cout << "LockedStack with SpinLockByCas usleep(5)" << "\t\t\t";
  LockFreePerformanceTestT<LockedStackT<uint32_t, SpinLockByCasT<5>>>::Run();
  std::cout << "LockFreeStack with MemoryReclamationByReferenceCounting" << "\t\t";
  LockFreePerformanceTestT<LockFreeStackT<uint32_t, MemoryReclamationByReferenceCountingT<NodeT<uint32_t>>>>::Run();
  std::cout << "LockFreeStack with MemoryReclamationByHazardPointer" << "\t\t";
  LockFreePerformanceTestT<LockFreeStackT<uint32_t, MemoryReclamationByHazardPointerT<NodeT<uint32_t>>>>::Run();
  return 0;
}

測(cè)試結(jié)果如下:


注:在Windows x64下隐砸,atomic<size=16>不是lockfree芥喇,如果有需要要可以用InterlockedCompareExchange128自己實(shí)現(xiàn)一下;

內(nèi)存模型

C++11原子操作的很多函數(shù)都有個(gè)std::memory_order參數(shù)凰萨,這個(gè)參數(shù)就是這里所說的內(nèi)存模型继控,其并不是類似POD的內(nèi)存布局,而是一種數(shù)據(jù)同步模型胖眷,準(zhǔn)確說法應(yīng)該是儲(chǔ)存一致性模型武通,其作用是對(duì)同一時(shí)間的讀寫操作進(jìn)行排序;C++11中一個(gè)定義了6種類型珊搀,我們可以將其分為4類冶忱,下面我從我的角度以普通程序員能理解的語言描述一下,具體的可以參見 C++11 Memory Order

  1. memory_order_relaxed: 很多文檔都說這種模型是完全亂序的境析,但我理解同一線程內(nèi)囚枪,基本上應(yīng)該還是按照代碼順序執(zhí)行的;
  2. memory_order_release & memory_order_acquire: 兩個(gè)線程A&B劳淆,A線程Release后链沼,B線程Acquire能保證一定讀到的是最新被修改過的值;這種模型更強(qiáng)大的地方在于它能保證發(fā)生在A-Release前的所有寫操作沛鸵,在B-Acquire后都能讀到最新值括勺;
  3. memory_order_release & memory_order_consume: 上一個(gè)模型的同步是針對(duì)所有對(duì)象的缆八,這種模型只針對(duì)依賴于該操作涉及的對(duì)象:比如這個(gè)操作發(fā)生在變量a上,而s = a + b; 那s依賴于a疾捍,但b不依賴于a; 當(dāng)然這里也有循環(huán)依賴的問題奈辰,例如:t = s + 1,因?yàn)閟依賴于a乱豆,那t其實(shí)也是依賴于a的奖恰;
  4. memory_order_seq_cst: 順序一致性模型,這是C++11原子操作的默認(rèn)模型宛裕;大概行為為對(duì)每一個(gè)變量都進(jìn)行2中所說的Release-Acquire操作房官,當(dāng)然這也是一個(gè)最慢的同步模型;

說到內(nèi)存模型续滋,就不得不提一下經(jīng)常被大家誤用的 volatile 關(guān)鍵字翰守,這個(gè)關(guān)鍵字僅僅保證:數(shù)據(jù)只在內(nèi)存中讀寫,直接操作它既不能保證操作是atomic的疲酌,也不能保證Memory Order蜡峰;其實(shí)在我理解中,這個(gè)應(yīng)該是嵌入式朗恳,內(nèi)核或驅(qū)動(dòng)程序員專用關(guān)鍵字:)湿颅,當(dāng)然如果在競(jìng)爭(zhēng)不敏感的環(huán)境中用來做flag用一下也沒太大問題.

最后要說一下x86體系中Release-Acquire是自動(dòng)獲取的,最終形成一個(gè)memory_order_seq_cst模型粥诫;因此絕大多數(shù)情況下memory_order_relaxed其實(shí)并沒有什么用.

結(jié)語

無鎖編程真的很難油航,如果要完全寫對(duì)那就成了變態(tài)難,出錯(cuò)了平時(shí)常見的調(diào)試手段根本沒用怀浆,幾乎全靠腦補(bǔ)谊囚;并且這種輪詢方式相對(duì)于鎖的中斷掛起方式來講,只有在超高并發(fā)的前提下才能達(dá)到一個(gè)理想的效果执赡,低并發(fā)下空載會(huì)對(duì)系統(tǒng)資源造成極大的浪費(fèi)镰踏,因此原則上我不推崇這個(gè)玩意;從測(cè)試結(jié)果來看沙合,所有平臺(tái)上自旋鎖性能都非常接近無鎖實(shí)現(xiàn)奠伪,并且其使用方式和互斥鎖幾乎沒差別,因此在沒啃透之前首懈,使用鎖的方式才是明智的選擇.

參考資料

C++ 并發(fā)編程指南
Yet another implementation of a lock-free circular array queue
Common Pitfalls in Writing Lock-Free Algorithms
Writing Lock-Free Code: A Corrected Queue
維基百科: ABA problem
Lock-Free Reference Counting
Efficient and Reliable Lock-Free Memory Reclamation Based on Reference Counting
維基百科: Hazard pointer
C++11 Memory Order
Nine ways to break your systems code using volatile

原始鏈接 版權(quán)聲明:自由轉(zhuǎn)載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 4.0

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末绊率,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子究履,更是在濱河造成了極大的恐慌滤否,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挎袜,死亡現(xiàn)場(chǎng)離奇詭異顽聂,居然都是意外死亡肥惭,警方通過查閱死者的電腦和手機(jī)盯仪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門紊搪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人全景,你說我怎么就攤上這事耀石。” “怎么了爸黄?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵滞伟,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我炕贵,道長(zhǎng)梆奈,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任称开,我火速辦了婚禮亩钟,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鳖轰。我一直安慰自己清酥,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布蕴侣。 她就那樣靜靜地躺著焰轻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪昆雀。 梳的紋絲不亂的頭發(fā)上辱志,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音狞膘,去河邊找鬼荸频。 笑死,一個(gè)胖子當(dāng)著我的面吹牛客冈,可吹牛的內(nèi)容都是我干的旭从。 我是一名探鬼主播,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼场仲,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼和悦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起渠缕,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤鸽素,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后亦鳞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體馍忽,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡棒坏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了遭笋。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坝冕。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖瓦呼,靈堂內(nèi)的尸體忽然破棺而出喂窟,到底是詐尸還是另有隱情,我是刑警寧澤央串,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布磨澡,位于F島的核電站,受9級(jí)特大地震影響质和,放射性物質(zhì)發(fā)生泄漏稳摄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一饲宿、第九天 我趴在偏房一處隱蔽的房頂上張望厦酬。 院中可真熱鬧,春花似錦褒傅、人聲如沸弃锐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽霹菊。三九已至,卻和暖如春支竹,著一層夾襖步出監(jiān)牢的瞬間旋廷,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來泰國打工礼搁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留饶碘,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓馒吴,卻偏偏與公主長(zhǎng)得像扎运,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子饮戳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容