簡介
應(yīng)用中的每個(gè)進(jìn)程都會(huì)有一個(gè)主線程常挚,主線程主要承擔(dān)執(zhí)行UI繪制操作、管理ArkTS引擎實(shí)例的創(chuàng)建和銷毀折欠、分發(fā)和處理事件咪奖、管理Ability生命周期等職責(zé)羊赵,具體可參見線程模型概述昧捷。因此,開發(fā)應(yīng)用時(shí)應(yīng)當(dāng)盡量避免將耗時(shí)的操作放在主線程中執(zhí)行衩茸。ArkTS提供了Worker和TaskPool兩種多線程并發(fā)能力楞慈,多線程并發(fā)允許在同一時(shí)間段內(nèi)同時(shí)執(zhí)行多段代碼,這兩個(gè)并發(fā)的基本能力可參見TaskPool和Worker的對(duì)比聚霜。
在介紹Worker和TaskPool的詳細(xì)使用方法前蝎宇,我們先簡單介紹并發(fā)模型的相關(guān)概念,以便于大家的理解凉唐。
并發(fā)模型概述
并發(fā)的意思是多個(gè)任務(wù)同時(shí)執(zhí)行台囱。并發(fā)模型分為兩大類:基于內(nèi)存共享的并發(fā)模型和基于消息傳遞的并發(fā)模型玄坦。
在基于內(nèi)存共享的并發(fā)模型中豺总,并發(fā)線程通過讀寫內(nèi)存中的共享對(duì)象來進(jìn)行交互喻喳”砺祝基于共享內(nèi)存的并發(fā)編程需要滿足三條性質(zhì):
原子性:指一個(gè)操作是不可中斷的蹦哼,要么全部執(zhí)行成功要么全部執(zhí)行失敗纲熏。
有序性:指程序執(zhí)行的順序必須符合預(yù)期,不能出現(xiàn)亂序的情況鱼填。
可見性:指當(dāng)一個(gè)線程修改了共享變量后苹丸,其他線程能夠立即得知這個(gè)修改。
現(xiàn)代程序語言一般通過鎖塑陵、內(nèi)存屏障阻桅、原子指令來滿足這三條性質(zhì)√苏拢基于內(nèi)存共享的并發(fā)模型與底層硬件接近,在能正確撰寫并發(fā)代碼的情況下蚓土,可以最大發(fā)揮底層硬件性能宏侍,實(shí)現(xiàn)性能優(yōu)秀的多線程程序。但是這種并發(fā)模型難以掌握蜀漆,即使資深的程序員也非常容易犯錯(cuò)谅河。典型的基于內(nèi)存共享并發(fā)模型的程序語言有C++ 、Swift和Java等确丢。
在基于消息傳遞的并發(fā)模型中绷耍,并發(fā)線程的內(nèi)存相互隔離,需要通過通信通道相互發(fā)送消息來進(jìn)行交互鲜侥。典型的基于消息傳遞的并發(fā)模型一般有兩種:CSP和Actor赘阀。
CSP(Communicating Sequential Processes宋欺,通信順序進(jìn)程)中的計(jì)算單元并不能直接互相發(fā)送信息斑司。需要通過通道(Channel)作為媒介進(jìn)行消息傳遞:發(fā)送方需要將消息發(fā)送到Channel僵缺,而接收方需要從Channel讀取消息。與CSP不同,在Actor模型中,每個(gè)Actor可以看做一個(gè)獨(dú)立的計(jì)算單元龟糕,并且相互之間內(nèi)存隔離校摩,每個(gè)Actor中存在信箱(Mail Box)坤塞,Actor之間可以直接進(jìn)行消息傳遞宛瞄,如下圖所示:
圖1 Actor消息傳遞示意圖
CSP與Actor之間的主要區(qū)別:
Actor需要明確指定消息接收方挣轨,而CSP中處理單元不用關(guān)心這些摩幔,只需要把消息發(fā)送給Channel,而接收方只需要從Channel讀取消息。
由于在默認(rèn)情況下Channel是沒有緩存的剪况,因此對(duì)Channel的發(fā)送(Send)動(dòng)作是同步阻塞的镐作,直到另外一個(gè)持有該Channel引用的執(zhí)行塊取出消息,而Actor模型中信箱本質(zhì)是隊(duì)列,因此消息的發(fā)送和接收可以是異步的。
典型的基于消息傳遞的并發(fā)模型的程序語言有:Dart、JS和ArkTS烁设。當(dāng)前系統(tǒng)中Worker和TaskPool都是基于Actor并發(fā)模型實(shí)現(xiàn)的并發(fā)能力。
Worker
基本概念和運(yùn)作原理
當(dāng)前系統(tǒng)中的Worker是一個(gè)獨(dú)立的線程铜幽,基本概念可參見TaskPool和Worker的對(duì)比串稀。Worker擁有獨(dú)立的運(yùn)行環(huán)境,每個(gè)Worker線程和主線程一樣擁有自己的內(nèi)存空間母截、消息隊(duì)列(MessageQueue)清寇、事件輪詢機(jī)制(EventLoop)负饲、調(diào)用棧(CallStack)等迟杂。線程之間通過消息(Massage)進(jìn)行交互傍睹,如下圖所示:
圖2 線程交互示意圖
在多核的情況下(下圖中的CPU 1和CPU 2同時(shí)工作)隔盛,多個(gè)Worker線程(下圖中的worker thread1和worker thread2)可以同時(shí)執(zhí)行,因此Worker線程做到了真正的并發(fā)拾稳,如下圖所示:
圖3 Worker線程并發(fā)示意圖
使用場(chǎng)景和開發(fā)示例
對(duì)于Worker吮炕,有以下適用場(chǎng)景:
運(yùn)行時(shí)間超過3分鐘的任務(wù),需要使用Worker访得。
有關(guān)聯(lián)的一系列同步任務(wù)龙亲,例如數(shù)據(jù)庫增、刪悍抑、改鳄炉、查等,要保證同一個(gè)句柄搜骡,需要使用Worker拂盯。
以視頻解壓的場(chǎng)景為例,點(diǎn)擊右上角下載按鈕浆兰,該示例會(huì)執(zhí)行網(wǎng)絡(luò)下載并監(jiān)聽磕仅,下載完成后自動(dòng)執(zhí)行解壓操作珊豹。當(dāng)視頻過大時(shí)簸呈,可能會(huì)出現(xiàn)解壓時(shí)長超過3分鐘耗時(shí)的情況,因此我們選用該場(chǎng)景來說明如何使用Worker店茶。
場(chǎng)景預(yù)覽圖如下所示:
圖4 場(chǎng)景預(yù)覽圖
使用步驟如下:
- 宿主線程創(chuàng)建一個(gè)Worker線程蜕便。通過
new worker.ThreadWorker()
創(chuàng)建Worker實(shí)例,示例代碼如下:
// 引入worker模塊
import worker, { MessageEvents } from '@ohos.worker';
import type common from '@ohos.app.ability.common';
let workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/pages/workers/worker.ts', {
name: 'FriendsMoments Worker'
});
- 宿主線程給Worker線程發(fā)送任務(wù)消息贩幻。宿主線程通過postMessage方法來發(fā)送消息給Worker線程轿腺,啟動(dòng)下載解壓任務(wù)两嘴,示例代碼如下:
// 請(qǐng)求網(wǎng)絡(luò)數(shù)據(jù)
let context: common.UIAbilityContext = getContext(this) as common.UIAbilityContext;
// 參數(shù)中mediaData和isImageData是根據(jù)開發(fā)者自己的業(yè)務(wù)需求添加的,其中mediaData為數(shù)據(jù)路徑族壳、isImageData為判斷圖片或視頻的標(biāo)識(shí)
workerInstance.postMessage({ context, mediaData: this.mediaData, isImageData: this.isImageData });
- Worker線程監(jiān)聽宿主線程發(fā)送的消息憔辫。Worker線程在onmessage中接收到宿主線程的postMessage請(qǐng)求,執(zhí)行下載解壓任務(wù)仿荆,示例代碼如下:
// 引入worker模塊
import worker, { MessageEvents } from '@ohos.worker';
let workerPort = worker.workerPort;
// 接收宿主線程的postMessage請(qǐng)求
workerPort.onmessage = (e: MessageEvents): void => {
// 下載視頻文件
let context: common.UIAbilityContext = e.data.context;
let filesDir: string = context.filesDir;
let time: number = new Date().getTime();
let inFilePath: string = `${filesDir}/${time.toString()}.zip`;
let mediaDataUrl: string = e.data.mediaData;
let urlPart: string = mediaDataUrl.split('.')[1];
let length: number = urlPart.split('/').length;
let fileName: string = urlPart.split('/')[length-1];
let options: zlib.Options = {
level: zlib.CompressLevel.COMPRESS_LEVEL_DEFAULT_COMPRESSION
};
request.downloadFile(context, {
url: mediaDataUrl,
filePath: inFilePath
}).then((downloadTask) => {
downloadTask.on('progress', (receivedSize: number, totalSize: number) => {
Logger.info(`receivedSize:${receivedSize},totalSize:${totalSize}`);
});
downloadTask.on('complete', () => {
// 下載完成之后執(zhí)行解壓操作
zlib.decompressFile(inFilePath, filesDir, options, (errData: BusinessError) => {
if (errData !== null) {
...
// 異常處理
}
let videoPath: string = `${filesDir}/${fileName}/${fileName}.mp4`;
workerPort.postMessage({ 'isComplete': true, 'filePath': videoPath });
})
});
downloadTask.on('fail', () => {
...
// 異常處理
});
}).catch((err) => {
...
// 異常處理
});
};
宿主線程監(jiān)聽Worker線程發(fā)送的信息贰您。宿主線程通過onmessage接收到Worker線程發(fā)送的消息,并執(zhí)行下載的結(jié)果通知拢操。
釋放Worker資源锦亦。在業(yè)務(wù)完成或者頁面銷毀時(shí),調(diào)用workerPort.close()接口主動(dòng)釋放Worker資源令境,示例代碼如下所示:
workerInstance.onmessage = (e: MessageEvents): void => {
if (e.data) {
this.downComplete = e.data['isComplete'];
this.filePath = e.data['filePath'];
workerInstance.terminate();
setTimeout(() => {
this.downloadStatus = false;
}, LOADING_DURATION_OPEN);
}
};
TaskPool
基本概念和運(yùn)作原理
相比使用Worker實(shí)現(xiàn)多線程并發(fā)杠园,TaskPool更加易于使用,創(chuàng)建開銷也少于Worker舔庶,并且Worker線程有個(gè)數(shù)限制抛蚁,需要開發(fā)者自己掌握,TaskPool的基本概念可參見TaskPool和Worker的對(duì)比惕橙。TaskPool作用是為應(yīng)用程序提供一個(gè)多線程的運(yùn)行環(huán)境篮绿。TaskPool在Worker之上實(shí)現(xiàn)了調(diào)度器和Worker線程池,TaskPool根據(jù)任務(wù)的優(yōu)先級(jí)吕漂,將其放入不同的優(yōu)先級(jí)隊(duì)列亲配,調(diào)度器會(huì)依據(jù)自己實(shí)現(xiàn)的調(diào)度算法(優(yōu)先級(jí),防饑餓)惶凝,從優(yōu)先級(jí)隊(duì)列中取出任務(wù)吼虎,放入TaskPool中的Worker線程池,執(zhí)行相關(guān)任務(wù)苍鲜,流程圖如下所示:
圖5 TaskPool流程示意圖
TaskPool有如下的特點(diǎn):
- 輕量化的并行機(jī)制思灰。
- 降低整體資源的消耗。
- 提高系統(tǒng)的整體性能混滔。
- 無需關(guān)心線程實(shí)例的生命周期洒疚。
- 可以使用TaskPool API創(chuàng)建后臺(tái)任務(wù)(Task),并對(duì)所創(chuàng)建的任務(wù)進(jìn)行如任務(wù)執(zhí)行坯屿、任務(wù)取消的操作油湖。
- 根據(jù)任務(wù)負(fù)載動(dòng)態(tài)調(diào)節(jié)TaskPool工作線程的數(shù)量,以使任務(wù)按照預(yù)期時(shí)間完成任務(wù)领跛。
- 可以設(shè)置任務(wù)的優(yōu)先級(jí)乏德。
- 可以設(shè)置任務(wù)組(TaskGroup)將任務(wù)關(guān)聯(lián)起來。
使用場(chǎng)景和開發(fā)示例
TaskPool的適用場(chǎng)景主要分為如下三類:
- 需要設(shè)置優(yōu)先級(jí)的任務(wù)。
- 需要頻繁取消的任務(wù)喊括。
- 大量或者調(diào)度點(diǎn)較分散的任務(wù)胧瓜。
因?yàn)榕笥讶?chǎng)景存在不同好友同時(shí)上傳視頻圖片,在頻繁滑動(dòng)時(shí)將多次觸發(fā)下載任務(wù)郑什,所以下面將以使用朋友圈加載網(wǎng)絡(luò)數(shù)據(jù)并且進(jìn)行解析和數(shù)據(jù)處理的場(chǎng)景為例府喳,來演示如何使用TaskPool進(jìn)行大量或調(diào)度點(diǎn)較分散的任務(wù)開發(fā)和處理。場(chǎng)景的預(yù)覽圖如下所示:
圖6 朋友圈場(chǎng)景預(yù)覽圖
使用步驟如下:
- 首先import引入TaskPool模塊蘑拯,TaskPool的API介紹可參見@ohos.taskpool(啟動(dòng)TaskPool)劫拢。
import taskpool from '@ohos.taskpool';
- new一個(gè)task對(duì)象,其中傳入被調(diào)用的方法和參數(shù)强胰。
...
// 創(chuàng)建task任務(wù)項(xiàng)舱沧,參數(shù)1.任務(wù)執(zhí)行需要傳入函數(shù) 參數(shù)2.任務(wù)執(zhí)行傳入函數(shù)的參數(shù) (本示例中此參數(shù)為被調(diào)用的網(wǎng)絡(luò)地址字符串)
let task: taskpool.Task = new taskpool.Task(getWebData, jsonUrl);
...
// 獲取網(wǎng)絡(luò)數(shù)據(jù)
@Concurrent
async function getWebData(url: string): Promise<Array<FriendMoment>> {
try {
let webData: http.HttpResponse = await http.createHttp().request(
url,
{ header: {
'Content-Type': 'application/json'
},
connectTimeout: 60000, readTimeout: 60000
})
if (typeof (webData.result) === 'string') {
// 解析json字符串
let jsonObj: Array<FriendMoment> = await JSON.parse(webData.result).FriendMoment;
let friendMomentBuckets: Array<FriendMoment> = new Array<FriendMoment>();
// 下方源碼省略,主要為數(shù)據(jù)解析和耗時(shí)操作處理
...
return friendMomentBuckets;
} else {
// 異常處理
...
}
} catch (err) {
// 異常處理
...
}
}
- 之后使用taskpool.execute執(zhí)行TaskPool任務(wù)偶洋,將待執(zhí)行的函數(shù)放入TaskPool內(nèi)部任務(wù)隊(duì)列等待執(zhí)行熟吏。execute需要兩個(gè)參數(shù):創(chuàng)建的任務(wù)對(duì)象、等待執(zhí)行的任務(wù)組的優(yōu)先級(jí)玄窝,默認(rèn)值是Priority.MEDIUM牵寺。在TaskPool中執(zhí)行完數(shù)據(jù)下載、解析和處理后恩脂,再返回給主線程中帽氓。
let friendMomentArray: Array<FriendMoment> = await taskpool.execute(task, taskpool.Priority.MEDIUM) as Array<FriendMoment>;
- 將新獲取的momentData通過AppStorage.setOrCreate傳入頁面組件中。
// 獲取頁面組件中的momentData對(duì)象俩块,其中是組件所需的username黎休、image、video等數(shù)據(jù)
let momentData = AppStorage.get<FriendMomentsData>('momentData');
// 循環(huán)遍歷對(duì)象并依次傳入momentData
for (let i = 0; i < friendMomentArray.length; i++) {
momentData.pushData(friendMomentArray[i]);
}
// 將更新的momentData返回給頁面組件
AppStorage.setOrCreate('momentData', momentData);
其他場(chǎng)景示例和方案思考
在日常開發(fā)過程中玉凯,我們還會(huì)碰到一些其他并發(fā)場(chǎng)景問題势腮,下面我們介紹了常用并發(fā)場(chǎng)景的示例方案推薦。
Worker線程調(diào)用主線程類型的方法
我們?cè)谥骶€程中創(chuàng)建了一個(gè)對(duì)象漫仆,假如類型為MyMath捎拯,我們需要把這個(gè)對(duì)象傳遞到Worker線程中,然后在Worker線程中執(zhí)行該類型中的一些耗時(shí)操作方法盲厌,比如Math中的compute方法署照,類結(jié)構(gòu)示例代碼如下:
class MyMath {
a: number = 0;
b: number = 1;
constructor(a: number, b: number) {
this.a = a;
this.b = b;
}
compute(): number {
return this.a + this.b;
}
}
主線程代碼:
private math: MyMath = new MyMath(2, 3); // 初始化a和b的值為2和3
private workerInstance: worker.ThreadWorker;
this.workerInstance = new worker.ThreadWorker("entry/ets/worker/MyWorker.ts");
this.workerInstance.postMessage(this.math); // 發(fā)送到Worker線程中,期望執(zhí)行MyMath中的compute方法吗浩,預(yù)期值是2+3=5
MyMath對(duì)象在進(jìn)行線程傳遞后建芙,會(huì)丟失類中的方法屬性,導(dǎo)致我們只是在Worker線程中可以獲取到MyMath的數(shù)據(jù)拓萌,但是無法在子系統(tǒng)中直接調(diào)用MyMath的compute方法岁钓,示意代碼如下:
const workerPort = worker.workerPort;
workerPort.onmessage = (e: MessageEvents): void => {
let a = e.data.a;
let b = e.data.b;
}
這種情況下我們可以怎么去實(shí)現(xiàn)在Worker線程中調(diào)用主線程中類的方法呢?
首先微王,我們嘗試使用強(qiáng)制轉(zhuǎn)換的方式把Worker線程接收到數(shù)據(jù)強(qiáng)制轉(zhuǎn)換成MyMath類型屡限,示例代碼如下:
const workerPort = worker.workerPort;
workerPort.onmessage = (e: MessageEvents): void => {
let math = e.data as MyMath; // 方法一:強(qiáng)制轉(zhuǎn)換
console.log('math compute:' + math.compute()); // 執(zhí)行失敗,不會(huì)打印此日志
}
強(qiáng)制轉(zhuǎn)換后執(zhí)行方法失敗炕倘,不會(huì)打印此日志钧大。因?yàn)樾蛄谢瘋鬏斊胀▽?duì)象時(shí),僅支持傳遞屬性罩旋,不支持傳遞其原型及方法啊央。接下來我們嘗試第二種方法,根據(jù)數(shù)據(jù)重新初始化一個(gè)MyMath對(duì)象涨醋,然后執(zhí)行compute方法瓜饥,示例代碼如下:
const workerPort = worker.workerPort;
workerPort.onmessage = (e: MessageEvents): void => {
// 重新構(gòu)造原類型的對(duì)象
let math = new MyMath(0, 0);
math.a = e.data.a;
math.b = e.data.b;
console.log('math compute:' + math.compute()); // 成功打印出結(jié)果:5
}
第二種方法成功在Worker線程中調(diào)用了MyMath的compute方法。但是這種方式還有弊端浴骂,比如每次使用到這個(gè)類進(jìn)行傳遞乓土,我們就得重新進(jìn)行構(gòu)造初始化,而且構(gòu)造的代碼會(huì)分散到工程的各處溯警,很難進(jìn)行維護(hù)趣苏,于是我們有了第三種改進(jìn)方案。
第三種方法梯轻,我們需要構(gòu)造一個(gè)接口類食磕,包含了我們需要線程間調(diào)用的基礎(chǔ)方法,這個(gè)接口類主要是管理和約束MyMath類的功能規(guī)格喳挑,保證MyMath類和它的代理類MyMathProxy類在主線程和子線程的功能一致性彬伦,示例代碼如下:
interface MyMathInterface {
compute():number;
}
然后,我們把MyMath類繼承這個(gè)方法伊诵,并且額外構(gòu)造一個(gè)代理類媚朦,繼承MyMath類,示例代碼如下:
class MyMath implements MyMathInterface {
a: number = 0;
b: number = 1;
constructor(a: number, b: number) {
console.log('MyMath constructor a:' + a + ' b:' + b)
this.a = a;
this.b = b;
}
compute(): number {
return this.a + this.b;
}
}
class MyMathProxy implements MyMathInterface {
private myMath: MyMath;
constructor(math: MyMath) {
this.myMath = new MyMath(math.a, math.b);
}
// 代理MyMath類的compute方法
compute(): number {
return this.myMath.compute();
}
}
我們?cè)谥骶€程構(gòu)造并且傳遞MyMath對(duì)象后日戈,在Worker線程中轉(zhuǎn)換成MyMathProxy询张,即可調(diào)用到MyMath的compute方法了,并且無需在多處進(jìn)行初始化構(gòu)造浙炼,只要把構(gòu)造邏輯放到MyMathProxy或者M(jìn)yMath的構(gòu)造函數(shù)中份氧,Worker線程中的示例代碼如下:
const workerPort = worker.workerPort;
workerPort.onmessage = (e: MessageEvents): void => {
// 方法三:使用代理類構(gòu)造對(duì)象
let proxy = new MyMathProxy(e.data)
console.log('math compute:' + proxy.compute()); // 成功打印出結(jié)果:5
}
大家可以根據(jù)實(shí)際場(chǎng)景選擇第二種或者第三種方案。
寫在最后
如果你覺得這篇內(nèi)容對(duì)你還蠻有幫助弯屈,我想邀請(qǐng)你幫我三個(gè)小忙:
- 點(diǎn)贊蜗帜,轉(zhuǎn)發(fā),有你們的 『點(diǎn)贊和評(píng)論』资厉,才是我創(chuàng)造的動(dòng)力厅缺。
- 關(guān)注小編,同時(shí)可以期待后續(xù)文章ing??,不定期分享原創(chuàng)知識(shí)湘捎。
- 想要獲取更多完整鴻蒙最新學(xué)習(xí)知識(shí)點(diǎn)诀豁,請(qǐng)移步前往小編:
https://gitee.com/MNxiaona/733GH/blob/master/jianshu