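Video tutorial: https://www.bilibili.com/video/av94487439
This article introduces methods for synchronizing data across threads. The techniques covered include thread locks, synchronization variables, atomic variables, and message handling, along with three implementations of a synchronized queue.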
std::unique_lock
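- A lock-management class template that behaves much like std::lock_guard but is more flexible. Locking at construction is optional, the lock is released automatically on destruction if the object still owns it, and ownership can be transferred. The lock can also be acquired and released manually during the object's lifetime through a richer interface: lock, try_lock, try_lock_for, try_lock_until, and unlock.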
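The flexibility described above can be sketched as follows. This is only an illustration: `g_mutex`, `g_counter`, and the three helper functions are hypothetical names that do not appear in the original article.

```cpp
#include <mutex>

std::mutex g_mutex;   // hypothetical mutex, for illustration only
int g_counter = 0;    // hypothetical shared data guarded by g_mutex

// Locks the mutex and hands ownership of the lock to the caller.
std::unique_lock<std::mutex> AcquireLock()
{
    std::unique_lock<std::mutex> lck(g_mutex); // locks on construction
    return lck;                                // ownership moves to the caller
}

// Tries to increment the counter without blocking; returns true on success.
bool TryIncrement()
{
    // std::defer_lock builds the wrapper without locking the mutex.
    std::unique_lock<std::mutex> lck(g_mutex, std::defer_lock);
    if (!lck.try_lock())   // non-blocking attempt
    {
        return false;
    }
    ++g_counter;
    lck.unlock();          // manual release before the destructor runs
    return true;
}

// Increments the counter while holding a lock obtained from AcquireLock().
int IncrementWithMovedLock()
{
    std::unique_lock<std::mutex> lck = AcquireLock();
    if (lck.owns_lock())   // true while this wrapper holds the mutex
    {
        ++g_counter;
    }
    return g_counter;      // lck's destructor releases the mutex
}
```

std::lock_guard supports none of this: it always locks in its constructor, cannot be unlocked early, and cannot be moved.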
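Condition variables
- A condition variable can block one or more threads until another thread sends a notification or a timeout expires, at which point the blocked thread wakes up.
Types
- condition_variable: works with std::unique_lock<std::mutex>
- condition_variable_any: works with any lock type that provides lock and unlock semantics
  - More flexible and more general; usable with every lock type
  - Less efficient than condition_variable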
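As a rough illustration of the difference, the sketch below waits on a std::timed_mutex, which std::condition_variable cannot do because it only accepts std::unique_lock<std::mutex>. All names (`g_timedMutex`, `g_cvAny`, `g_ready`, `WaitUntilReady`, `SignalReady`) are hypothetical and chosen for this example only.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

std::timed_mutex g_timedMutex;        // not a std::mutex
std::condition_variable_any g_cvAny;  // accepts any lock with lock()/unlock()
bool g_ready = false;                 // guarded by g_timedMutex

// Blocks until g_ready is true or the timeout expires.
// Returns the value of g_ready at the moment the wait ends.
bool WaitUntilReady(std::chrono::milliseconds timeout)
{
    std::unique_lock<std::timed_mutex> lck(g_timedMutex);
    return g_cvAny.wait_for(lck, timeout, [] { return g_ready; });
}

// Sets the flag under the lock, then wakes every waiting thread.
void SignalReady()
{
    {
        std::lock_guard<std::timed_mutex> lck(g_timedMutex);
        g_ready = true;
    }
    g_cvAny.notify_all();
}
```

When a plain std::mutex is enough, std::condition_variable remains the better default because of the efficiency point above.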
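Member functions
- notify_one: notifies one waiting thread (public)
- notify_all: notifies all waiting threads (public)
notify_one() and notify_all() are both called on the condition variable object to notify the threads waiting on it. notify_one wakes a single thread, which then reacquires the lock; notify_all wakes every waiting thread, and they compete for the lock.
- wait: blocks the current thread until the condition variable is notified (public)
- wait_for: blocks the current thread until the condition variable is notified or the timeout elapses (public)
- wait_until: blocks the current thread until the condition variable is notified or the given time point is reached (public)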
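The difference between the relative timeout of wait_for and the absolute deadline of wait_until can be sketched like this. The globals and function names are hypothetical; both predicate overloads return the final value of the predicate, so `false` means the wait ended by timeout with the condition still unmet.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex g_mtx;               // hypothetical names, for illustration only
std::condition_variable g_cv;
bool g_done = false;            // guarded by g_mtx

// Waits at most `timeout`, measured from the moment of the call.
bool WaitFor(std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lck(g_mtx);
    return g_cv.wait_for(lck, timeout, [] { return g_done; });
}

// Waits until an absolute point in time, however late the call is made.
bool WaitUntilDeadline(std::chrono::steady_clock::time_point deadline)
{
    std::unique_lock<std::mutex> lck(g_mtx);
    return g_cv.wait_until(lck, deadline, [] { return g_done; });
}

// Sets the flag under the lock, then wakes one waiting thread.
void MarkDone()
{
    {
        std::lock_guard<std::mutex> lck(g_mtx);
        g_done = true;
    }
    g_cv.notify_one();
}
```

An absolute deadline is convenient when several waits must share one overall time budget, since the deadline does not have to be recomputed after each wakeup.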
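Execution flow
- The thread that waits on the condition variable first acquires the mutex.
- It then checks a condition in a loop. If the condition is not met, it releases the mutex and blocks until it is notified, after which it reacquires the mutex and checks again. If the condition is met, it continues.
- Another thread acquires the mutex, updates the shared state, and then calls notify_one or notify_all on the condition variable to wake one or all of the waiting threads.
Concise form and the wait mechanism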
std::unique_lock<std::mutex> lck(m_mtRun);
m_cvRun.wait(lck, [this]{ return m_runDown; });
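- wait first evaluates the predicate, for example whether m_runDown is true in the snippet above.
- If the predicate is false, wait releases the mutex, puts the thread into the waiting state, and blocks until it is woken.
- If the predicate is true, the thread leaves the waiting state with the mutex held again and continues execution.
- Note that a waiting thread may be woken while the predicate is still false, i.e. a spurious wakeup. In that case wait releases the mutex again, puts the thread back into the waiting state, and waits for the next wakeup.
Basic example: wait, wait_for, and spurious wakeups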
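The behaviour listed above follows from how the predicate overload is specified: `cv.wait(lck, pred)` is equivalent to `while (!pred()) cv.wait(lck);`. A hand-written version of that loop might look like the sketch below, where `g_mtRun`, `g_cvRun`, and `g_runDown` are hypothetical free-standing stand-ins for the member variables in the snippet.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex g_mtRun;               // stand-in for m_mtRun
std::condition_variable g_cvRun;  // stand-in for m_cvRun
bool g_runDown = false;           // stand-in for m_runDown, guarded by g_mtRun

// Hand-written equivalent of g_cvRun.wait(lck, [] { return g_runDown; });
void WaitForRunDown()
{
    std::unique_lock<std::mutex> lck(g_mtRun);
    while (!g_runDown)        // predicate is always checked with the mutex held
    {
        // Atomically releases the mutex and blocks. The mutex is
        // reacquired before wait() returns, whatever caused the wakeup.
        g_cvRun.wait(lck);
    }
    // Here the mutex is held and g_runDown is true.
}

// Sets the flag under the lock, then wakes the waiting threads.
void SetRunDown()
{
    {
        std::lock_guard<std::mutex> lck(g_mtRun);
        g_runDown = true;
    }
    g_cvRun.notify_all();
}
```

Because the predicate is rechecked after every wakeup, a notification that arrives while the condition is still false simply sends the thread back to sleep.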
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
#include <condition_variable>
#include <windows.h>
bool completed;
std::mutex mtRun;
std::condition_variable cvRun;
void Wait()
{
std::unique_lock<std::mutex> lck(mtRun);
std::cout <<"Thread_"<<std::this_thread::get_id() << " is waiting..." << std::endl;
cvRun.wait(lck, []() {
return completed;
});
std::cout << "Thread_" << std::this_thread::get_id() << " is completed" << std::endl;
}
void Wait_For()
{
std::unique_lock<std::mutex> lck(mtRun);
std::cout << "Thread_" << std::this_thread::get_id() << " is waiting..." << std::endl;
if (!cvRun.wait_for(lck, std::chrono::seconds(4), []() {
//if (!cvRun.wait_for(lck, std::chrono::seconds(2), []() {
return completed;
}))
{
std::cout << "Thread_" << std::this_thread::get_id() << " time out!" << std::endl;
}
else
{
std::cout << "Thread_" << std::this_thread::get_id() << " is completed" << std::endl;
}
}
void Completed()
{
{
std::cout << "Thread_" << std::this_thread::get_id() << " set completed" << std::endl;
std::unique_lock<std::mutex> lck(mtRun);
completed = true;
}
cvRun.notify_all();
}
void FakeCompleted()
{
{
std::cout << "Thread_" << std::this_thread::get_id() << " not set completed" << std::endl;
std::unique_lock<std::mutex> lck(mtRun);
completed = false;
}
cvRun.notify_all();
}
int main()
{
//Wait
completed = false;
std::thread thWait(Wait);
thWait.detach();
Sleep(3000);
std::thread thCompleted(Completed);
thCompleted.join();
Sleep(3000);
//Waitfor
//completed = false;
//std::thread thWait(Wait_For);
//thWait.detach();
//Sleep(3000);
//std::thread thCompleted(Completed);
//thCompleted.join();
//Sleep(3000);
//Fake
/*completed = false;
std::thread thWait(Wait_For);
thWait.detach();
Sleep(3000);
std::thread thCompleted(FakeCompleted);
thCompleted.join();
Sleep(3000);*/
return 0;
}
Atomic variables
- An atomic variable needs no mutex to protect it, which makes the code simpler.
- C++11 provides the atomic type template std::atomic<T>. T can be any trivially copyable type, and aliases for the built-in types are predefined:
typedef atomic<bool> atomic_bool;
typedef atomic<char> atomic_char;
typedef atomic<signed char> atomic_schar;
typedef atomic<unsigned char> atomic_uchar;
typedef atomic<short> atomic_short;
typedef atomic<unsigned short> atomic_ushort;
typedef atomic<int> atomic_int;
typedef atomic<unsigned int> atomic_uint;
typedef atomic<long> atomic_long;
typedef atomic<unsigned long> atomic_ulong;
typedef atomic<long long> atomic_llong;
typedef atomic<unsigned long long> atomic_ullong;
typedef atomic<char16_t> atomic_char16_t;
typedef atomic<char32_t> atomic_char32_t;
typedef atomic<wchar_t> atomic_wchar_t;
typedef atomic<int8_t> atomic_int8_t;
typedef atomic<uint8_t> atomic_uint8_t;
typedef atomic<int16_t> atomic_int16_t;
typedef atomic<uint16_t> atomic_uint16_t;
typedef atomic<int32_t> atomic_int32_t;
typedef atomic<uint32_t> atomic_uint32_t;
typedef atomic<int64_t> atomic_int64_t;
typedef atomic<uint64_t> atomic_uint64_t;
typedef atomic<int_least8_t> atomic_int_least8_t;
typedef atomic<uint_least8_t> atomic_uint_least8_t;
typedef atomic<int_least16_t> atomic_int_least16_t;
typedef atomic<uint_least16_t> atomic_uint_least16_t;
typedef atomic<int_least32_t> atomic_int_least32_t;
typedef atomic<uint_least32_t> atomic_uint_least32_t;
typedef atomic<int_least64_t> atomic_int_least64_t;
typedef atomic<uint_least64_t> atomic_uint_least64_t;
typedef atomic<int_fast8_t> atomic_int_fast8_t;
typedef atomic<uint_fast8_t> atomic_uint_fast8_t;
typedef atomic<int_fast16_t> atomic_int_fast16_t;
typedef atomic<uint_fast16_t> atomic_uint_fast16_t;
typedef atomic<int_fast32_t> atomic_int_fast32_t;
typedef atomic<uint_fast32_t> atomic_uint_fast32_t;
typedef atomic<int_fast64_t> atomic_int_fast64_t;
typedef atomic<uint_fast64_t> atomic_uint_fast64_t;
typedef atomic<intptr_t> atomic_intptr_t;
typedef atomic<uintptr_t> atomic_uintptr_t;
typedef atomic<size_t> atomic_size_t;
typedef atomic<ptrdiff_t> atomic_ptrdiff_t;
typedef atomic<intmax_t> atomic_intmax_t;
typedef atomic<uintmax_t> atomic_uintmax_t;
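// The fixed-width integer types used above are themselves typedefs.
// Their exact definitions are implementation-specific, for example: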
typedef signed char int8_t;
typedef short int16_t;
typedef int int32_t;
typedef long long int64_t;
typedef unsigned char uint8_t;
typedef unsigned short uint16_t;
typedef unsigned int uint32_t;
typedef unsigned long long uint64_t;
typedef signed char int_least8_t;
typedef short int_least16_t;
typedef int int_least32_t;
typedef long long int_least64_t;
typedef unsigned char uint_least8_t;
typedef unsigned short uint_least16_t;
typedef unsigned int uint_least32_t;
typedef unsigned long long uint_least64_t;
typedef signed char int_fast8_t;
typedef int int_fast16_t;
typedef int int_fast32_t;
typedef long long int_fast64_t;
typedef unsigned char uint_fast8_t;
typedef unsigned int uint_fast16_t;
typedef unsigned int uint_fast32_t;
typedef unsigned long long uint_fast64_t;
typedef long long intmax_t;
typedef unsigned long long uintmax_t;
- The following two declarations are equivalent:
std::atomic_int m_standbyIdIndex;
std::atomic<int> m_standbyIdIndex;
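To illustrate the claim that no mutex is needed, here is a small sketch in which several threads increment one shared counter. `CountWithAtomics` is a hypothetical helper written for this example, not something from the original article.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Starts `threads` workers that each increment a shared atomic counter
// `perThread` times without any mutex, then returns the final value.
int CountWithAtomics(int threads, int perThread)
{
    std::atomic<int> counter{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
    {
        workers.emplace_back([&counter, perThread] {
            for (int i = 0; i < perThread; ++i)
            {
                counter.fetch_add(1);  // atomic read-modify-write, same effect as ++counter
            }
        });
    }
    for (auto& w : workers)
    {
        w.join();
    }
    return counter.load();
}
```

With a plain `int` counter the same loop would be a data race and increments could be lost; with std::atomic<int> the result is always threads * perThread.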
call_once&once_flag
- When several threads may call the same function concurrently, std::call_once guarantees that the function runs only once across all of them.
- std::call_once takes a std::once_flag as its first argument.
std::once_flag m_flag;
std::call_once(m_flag, [this](){StopExecute(); });
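A self-contained sketch of the same idea, assuming a hypothetical helper `RunOnceFromManyThreads` in place of the member function `StopExecute()` used above: every thread calls std::call_once with the same flag, and the callable runs exactly once.

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

// Lets `threads` threads race to call std::call_once on one flag and
// returns how many times the callable actually ran.
int RunOnceFromManyThreads(int threads)
{
    std::once_flag flag;          // shared by all callers
    std::atomic<int> calls{0};    // counts executions of the callable
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
    {
        workers.emplace_back([&flag, &calls] {
            std::call_once(flag, [&calls] { ++calls; });
        });
    }
    for (auto& w : workers)
    {
        w.join();
    }
    return calls.load();
}
```

The once_flag has to outlive and be shared by all callers, which is why the snippet above stores it as a member (`m_flag`) rather than as a local variable of the calling function.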
Synchronized queues
Basic synchronized queue
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
#include <condition_variable>
#include <windows.h>
class SyncQueue
{
public:
SyncQueue()
{
}
void Push(const int& x)
{
{
std::unique_lock<std::mutex> lck(m_mutex);
m_queue.push_back(x);
}
m_notEmpty.notify_all();
}
void Pop(int& x)
{
std::unique_lock<std::mutex> lck(m_mutex);
m_notEmpty.wait(lck, [this]() {
return !m_queue.empty();
});
x = m_queue.front();
m_queue.pop_front();
}
bool Empty()
{
std::lock_guard<std::mutex> lck(m_mutex);
return m_queue.empty();
}
size_t Size()
{
std::lock_guard<std::mutex> lck(m_mutex);
return m_queue.size();
}
private:
std::list<int> m_queue;
std::mutex m_mutex;
std::condition_variable m_notEmpty;
};
SyncQueue queue;
void GetData()
{
int x = 0;
while (true) // Pop() blocks until data arrives; the loop ends when 0 is popped
{
queue.Pop(x);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
if (x == 0)
{
break;
}
}
std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
}
void SetData()
{
for (int i = 10; i >= 0; i--)
{
Sleep(1000);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
queue.Push(i);
}
Sleep(500);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
}
int main()
{
std::thread thGet(GetData);
thGet.detach();
std::thread thSet(SetData);
thSet.join();
return 0;
}
//output
Thread_27072---- Push 1
Thread_26712---- Pop 1
Thread_27072---- Push 2
Thread_26712---- Pop 2
Thread_27072---- Push 3
Thread_26712---- Pop 3
Thread_27072---- Push 4
Thread_26712---- Pop 4
Thread_27072---- Push 5
Thread_26712---- Pop 5
Thread_27072---- Push 6
Thread_26712---- Pop 6
Thread_27072---- Push 7
Thread_26712---- Pop 7
Thread_27072---- Push 8
Thread_26712---- Pop 8
Thread_27072---- Push 9
Thread_26712---- Pop 9
Thread_27072---- Push 10
Thread_26712---- Pop 10
Thread_26712---- Pop End!
Thread_27072---- Push End!
Synchronized queue with external control
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
#include <condition_variable>
#include <windows.h>
#include <atomic>
class SyncQueue
{
public:
SyncQueue()
{
}
void Push(const int& x)
{
{
std::unique_lock<std::mutex> lck(m_mutex);
m_queue.push_back(x);
}
m_notEmpty.notify_all();
}
void Pop(int& x)
{
std::unique_lock<std::mutex> lck(m_mutex);
m_notEmpty.wait(lck, [this]() {
return !m_queue.empty();
});
x = m_queue.front();
m_queue.pop_front();
}
bool Empty()
{
std::lock_guard<std::mutex> lck(m_mutex);
return m_queue.empty();
}
size_t Size()
{
std::lock_guard<std::mutex> lck(m_mutex);
return m_queue.size();
}
private:
std::list<int> m_queue;
std::mutex m_mutex;
std::condition_variable m_notEmpty;
};
SyncQueue queue;
std::atomic_bool getStop{false}; // brace-init: copy-init of an atomic is ill-formed before C++17
void GetData()
{
int x = 0;
while (true) // Pop() blocks until data arrives; the loop ends when getStop is set
{
if (getStop)
{
break;
}
queue.Pop(x);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
}
std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
}
void SetData()
{
for (int i = 10; i >= 0; i--)
{
Sleep(1000);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
queue.Push(i);
if (i == 5)
{
getStop = true;
}
}
std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
}
int main()
{
std::thread thGet(GetData);
thGet.detach();
std::thread thSet(SetData);
thSet.join();
return 0;
}
//output
Thread_29616---- Push 10
Thread_30076---- Pop 10
Thread_29616---- Push 9
Thread_30076---- Pop 9
Thread_29616---- Push 8
Thread_30076---- Pop 8
Thread_29616---- Push 7
Thread_30076---- Pop 7
Thread_29616---- Push 6
Thread_30076---- Pop 6
Thread_29616---- Push 5
Thread_30076---- Pop 5
Thread_30076---- Pop End!
Thread_29616---- Push 4
Thread_29616---- Push 3
Thread_29616---- Push 2
Thread_29616---- Push 1
Thread_29616---- Push 0
Thread_29616---- Push End!
Synchronized queue with timeout
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
#include <condition_variable>
#include <windows.h>
#include <atomic>
class SyncQueue
{
public:
SyncQueue()
{
}
void Push(const int& x)
{
{
std::unique_lock<std::mutex> lck(m_mutex);
m_queue.push_back(x);
}
m_notEmpty.notify_all();
}
bool Pop(int& x)
{
std::unique_lock<std::mutex> lck(m_mutex);
if (m_notEmpty.wait_for(lck, std::chrono::seconds(1), [this]() {
return !m_queue.empty();
}))
{
x = m_queue.front();
m_queue.pop_front();
return true;
}
else
{
return false;
}
}
bool Empty()
{
std::lock_guard<std::mutex> lck(m_mutex);
return m_queue.empty();
}
size_t Size()
{
std::lock_guard<std::mutex> lck(m_mutex);
return m_queue.size();
}
private:
std::list<int> m_queue;
std::mutex m_mutex;
std::condition_variable m_notEmpty;
};
SyncQueue queue;
std::atomic_bool getStop{false}; // brace-init: copy-init of an atomic is ill-formed before C++17
void GetData()
{
int x = 0;
while (true) // Pop() returns within one second; the loop ends when getStop is set
{
if (getStop)
{
break;
}
if (queue.Pop(x))
{
std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
}
else
{
std::cout << "Thread_" << std::this_thread::get_id() << "---- Get Data Time out" << std::endl;
}
}
std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
}
void SetData()
{
for (int i = 10; i >= 0; i--)
{
Sleep(100);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
queue.Push(i);
if (i <= 5)
{
Sleep(2000);
}
}
getStop = true;
Sleep(500);
std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
}
int main()
{
std::thread thGet(GetData);
thGet.detach();
std::thread thSet(SetData);
thSet.join();
return 0;
}
//output
Thread_18908---- Push 10
Thread_2204---- Pop 10
Thread_18908---- Push 9
Thread_2204---- Pop 9
Thread_18908---- Push 8
Thread_2204---- Pop 8
Thread_18908---- Push 7
Thread_2204---- Pop 7
Thread_18908---- Push 6
Thread_2204---- Pop 6
Thread_18908---- Push 5
Thread_2204---- Pop 5
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 4
Thread_2204---- Pop 4
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 3
Thread_2204---- Pop 3
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 2
Thread_2204---- Pop 2
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 1
Thread_2204---- Pop 1
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_18908---- Push 0
Thread_2204---- Pop 0
Thread_2204---- Get Data Time out
Thread_2204---- Get Data Time out
Thread_2204---- Pop End!
Thread_18908---- Push End!