以前工作多偏重于業(yè)務(wù)邏輯刹衫,較少關(guān)注到底層的邏輯實(shí)現(xiàn),自己寫內(nèi)部工具中時(shí)也比較隨意搞挣,遇到用多線程的地方都是臨時(shí)起一個(gè)線程處理耗時(shí)的復(fù)雜任務(wù)带迟,任務(wù)結(jié)束后,自動(dòng)被主線程回收柿究。線程的頻繁創(chuàng)建銷毀其實(shí)是存在很大開(kāi)銷的邮旷。下面的這種設(shè)計(jì)使用方式可以更高效的利用multi-thread
MessageLoopThread
循環(huán)隊(duì)列任務(wù)處理機(jī)制簡(jiǎn)單說(shuō)明:
- 一個(gè)持續(xù)運(yùn)行的Thread(while (1) { do tasks})
- 一個(gè) task queue (如果用的不是線程安全的標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu),則需要另外加鎖來(lái)保證)
- task queue中存放的是 類似于 C++ Functor的對(duì)象蝇摸,或者簡(jiǎn)單理解就是task function的一段代碼
- 其他線程會(huì)將耗時(shí)任務(wù)封裝成task婶肩,加入隊(duì)列中
- Thread 運(yùn)行時(shí),按queue中順序依次執(zhí)行其中的task function
上面有一個(gè)很重要解決的問(wèn)題是貌夕,需要把不同的邏輯處理函數(shù) 封裝成統(tǒng)一的task function律歼,答案就是Android里面使用了chromium的base::Bind來(lái)實(shí)現(xiàn)統(tǒng)一的封裝
可以參考關(guān)于std::bind 來(lái)理解這個(gè)base::Bind作用,實(shí)際作用有些像python的functools 里面的輔助函數(shù)啡专,也有些類似python里的 wrapper裝飾器险毁,簡(jiǎn)單理解就是把一個(gè)函數(shù)固化入?yún)⒑螅梢粋€(gè)新的函數(shù)们童。
-
下面舉例說(shuō)明:
static bt_status_t btif_gatts_add_service(int server_if,
const btgatt_db_element_t* service,
size_t service_count) {
CHECK_BTGATT_INIT();
return do_in_jni_thread(FROM_HERE,
Bind(&add_service_impl, server_if,
std::vector(service, service + service_count)));
}
static void add_service_impl(int server_if,
vector<btgatt_db_element_t> service) {
if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
LOG_ERROR("%s: Attept to register restricted service", __func__);
HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
server_if, service.data(), service.size());
return;
}
BTA_GATTS_AddService(
server_if, service,
jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}
bt_status_t do_in_jni_thread(const base::Location& from_here,
base::OnceClosure task) {
if (!jni_thread.DoInThread(from_here, std::move(task))) {
LOG(ERROR) << __func__ << ": Post task to task runner failed!";
return BT_STATUS_FAIL;
}
return BT_STATUS_SUCCESS;
}
static MessageLoopThread jni_thread("bt_jni_thread");
bool MessageLoopThread::DoInThread(const base::Location& from_here,
base::OnceClosure task) {
return DoInThreadDelayed(from_here, std::move(task), base::TimeDelta());
}
bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
base::OnceClosure task,
const base::TimeDelta& delay) {
std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
if (is_main_ && init_flags::gd_rust_is_enabled()) {
if (rust_thread_ == nullptr) {
LOG(ERROR) << __func__ << ": rust thread is null for thread " << *this
<< ", from " << from_here.ToString();
return false;
}
shim::rust::main_message_loop_thread_do_delayed(
**rust_thread_,
std::make_unique<shim::rust::OnceClosure>(std::move(task)),
delay.InMilliseconds());
return true;
}
上面這段代碼btif_gatts_add_service 就是將函數(shù)add_service_impl通過(guò)Bind入?yún)⒐袒筠D(zhuǎn)換成統(tǒng)一的OnceClosure task玉控,放到bt_jni_thread中
這里需要注意的一點(diǎn)shim::rust::main_message_loop_thread_do_delayed
最終調(diào)用的是Rust實(shí)現(xiàn),這里似乎是Android在新版本里最終底層多線程的處理都改用Rust實(shí)現(xiàn)了汞扎,目前尚不熟悉Rust語(yǔ)言和C++的調(diào)用Rust處理柠傍,對(duì)應(yīng)rust文件message_loop_thread.rs
,據(jù)說(shuō)是Rust相比C++在多線程處理上使用起來(lái)更方便高效齐板;DoInThreadDelayed函數(shù)也有相關(guān)mock實(shí)現(xiàn)吵瞻,可以從mock實(shí)現(xiàn)來(lái)理解上面所說(shuō)的細(xì)節(jié)葛菇,這里就不列舉出來(lái)了
-
下面這段代碼的設(shè)計(jì)也很巧妙,可以在實(shí)際應(yīng)用中借鑒:
static void add_service_impl(int server_if,
vector<btgatt_db_element_t> service) {
if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
LOG_ERROR("%s: Attept to register restricted service", __func__);
HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
server_if, service.data(), service.size());
return;
}
BTA_GATTS_AddService(
server_if, service,
jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}
extern void BTA_GATTS_AddService(tGATT_IF server_if,
std::vector<btgatt_db_element_t> service,
BTA_GATTS_AddServiceCb cb) {
do_in_main_thread(FROM_HERE,
base::Bind(&bta_gatts_add_service_impl, server_if,
std::move(service), std::move(cb)));
}
template <typename R, typename... Args>
base::Callback<R(Args...)> jni_thread_wrapper(const base::Location& from_here,
base::Callback<R(Args...)> cb) {
return base::Bind(
[](const base::Location& from_here, base::Callback<R(Args...)> cb,
Args... args) {
do_in_jni_thread(from_here,
base::Bind(cb, std::forward<Args>(args)...));
},
from_here, std::move(cb));
}
static void on_service_added_cb(tGATT_STATUS status, int server_if,
vector<btgatt_db_element_t> service) {
HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, status, server_if,
service.data(), service.size());
}
有兩點(diǎn):
- 用了兩個(gè)線程來(lái)完成一個(gè)task橡羞,準(zhǔn)確的說(shuō)是一個(gè)線程完成主體任務(wù)處理眯停,另一個(gè)線程完成任務(wù)結(jié)果callback處理,這樣主體任務(wù)線程的處理不會(huì)因?yàn)閱我蝗蝿?wù)callback而阻塞 (這個(gè)比較經(jīng)典的場(chǎng)景就是在UI設(shè)計(jì)里卿泽,以前最開(kāi)始使用Qt做tool時(shí)有遇到:后臺(tái)處理莺债,前臺(tái)刷新邏輯做線程分離,后臺(tái)處理不會(huì)導(dǎo)致UI界面卡住又厉,UI界面重繪不會(huì)導(dǎo)致后臺(tái)任務(wù)阻塞九府,甚至復(fù)雜處理邏輯可以使用更多的線程來(lái)保證)
- jni_thread_wrapper中使用了Bind和C++ 匿名函數(shù)對(duì)callback函數(shù)做統(tǒng)一封裝,實(shí)際處理時(shí)調(diào)用
cb.Run(GATT_ERROR, server_if, std::move(service));
void bta_gatts_add_service_impl(tGATT_IF server_if,
std::vector<btgatt_db_element_t> service,
BTA_GATTS_AddServiceCb cb) {
uint8_t rcb_idx =
bta_gatts_find_app_rcb_idx_by_app_if(&bta_gatts_cb, server_if);
LOG(INFO) << __func__ << ": rcb_idx=" << +rcb_idx;
if (rcb_idx == BTA_GATTS_INVALID_APP) {
cb.Run(GATT_ERROR, server_if, std::move(service));
return;
}
Notes:
- 當(dāng)然上面任務(wù)的處理是屬于異步的覆致,對(duì)于時(shí)效性要求特別高的場(chǎng)景侄旬,可能不適合使用;如果有很多產(chǎn)生任務(wù)的線程煌妈,那可能需要合理規(guī)劃任務(wù)處理線程的個(gè)數(shù)以及實(shí)際分配協(xié)同等
- 下面rust中多線程處理代碼儡羔,rust在多線程處理上比C++更高效,這個(gè)需要更多研究璧诵。
pub fn main_message_loop_thread_do_delayed(
thread: &mut MessageLoopThread,
closure: cxx::UniquePtr<ffi::OnceClosure>,
delay_ms: i64,
) {
assert!(init_flags::gd_rust_is_enabled());
if delay_ms == 0 {
if thread.tx.send(closure).is_err() {
log::error!("could not post task - shutting down?");
}
} else {
thread.rt.spawn(async move {
// NOTE: tokio's sleep can't wake up the system...
// but hey, neither could the message loop from libchrome.
//
// ...and this way we don't use timerfds arbitrarily.
//
// #yolo
tokio::time::sleep(Duration::from_millis(delay_ms.try_into().unwrap_or(0))).await;
closure.Run();
});
}
}