概述
在應(yīng)用開(kāi)發(fā)中,為了避免主線程阻塞,提高應(yīng)用性能韩脑,需要將一些耗時(shí)操作放在子線程中執(zhí)行。此時(shí)粹污,子線程就需要訪問(wèn)主線程中的數(shù)據(jù)段多。ArkTS采用了基于消息通信的Actor并發(fā)模型,具有內(nèi)存隔離的特性壮吩,所以跨線程傳輸數(shù)據(jù)時(shí)需要將數(shù)據(jù)序列化衩匣,但是AkrTS支持通過(guò)可共享對(duì)象SharedArrayBuffer實(shí)現(xiàn)直接的共享內(nèi)存。
在開(kāi)發(fā)應(yīng)用時(shí)粥航,如果遇到數(shù)據(jù)量較大琅捏,并且需要多個(gè)線程同時(shí)操作的情況,推薦使用SharedArrayBuffer共享內(nèi)存递雀,可以減少數(shù)據(jù)在線程間傳遞時(shí)需要復(fù)制和序列化的額外開(kāi)銷(xiāo)柄延。比如,音視頻解碼播放、多個(gè)線程同時(shí)讀取寫(xiě)入文件等場(chǎng)景搜吧。由于內(nèi)存是共享的市俊,所以在多個(gè)線程同時(shí)操作同一塊內(nèi)存時(shí),可能會(huì)引起數(shù)據(jù)的紊亂滤奈,這時(shí)就需要使用鎖來(lái)確保數(shù)據(jù)操作的有序性摆昧。本文將基于此具體展開(kāi)說(shuō)明。關(guān)于多線程的使用和原理蜒程,可參考OpenHarmony多線程能力場(chǎng)景化示例實(shí)踐绅你,本文將不再詳細(xì)講述。
工作原理
可共享對(duì)象SharedArrayBuffer昭躺,是擁有固定長(zhǎng)度的原始二進(jìn)制數(shù)據(jù)緩沖區(qū)忌锯,可以存儲(chǔ)任何類(lèi)型的數(shù)據(jù),包括數(shù)字领炫、字符串等偶垮。它支持在多線程之間傳遞,傳遞之后的SharedArrayBuffer對(duì)象和原始的SharedArrayBuffer對(duì)象可以指向同一塊內(nèi)存帝洪,進(jìn)而達(dá)到共享內(nèi)存的目的似舵。SharedArrayBuffer對(duì)象存儲(chǔ)的數(shù)據(jù)在子線程中被修改時(shí),需要通過(guò)原子操作保證其同步性葱峡,即下個(gè)操作開(kāi)始之前務(wù)必需要保證上個(gè)操作已經(jīng)結(jié)束啄枕。下面將通過(guò)示例說(shuō)明原子操作保證同步性的必要性。
非原子操作
......
// 非原子操作族沃,進(jìn)行10000次++
@Concurrent
function normalProcess(int32Array: Int32Array) {
for (let i = 0; i < 10000; i++) {
int32Array[0]++;
}
}
// 原子操作频祝,進(jìn)行10000次++
@Concurrent
function atomicsProcess(int32Array: Int32Array) {
for (let i = 0; i < 10000; i++) {
Atomics.add(int32Array, 0, 1);
}
}
......
@State result: string = "計(jì)算結(jié)果:";
private taskNum: number = 2;
private scroller: Scroller = new Scroller();
......
Button("非原子操作")
.width("80%")
.fontSize(30)
.fontWeight(FontWeight.Bold)
.margin({ top: 30 })
.onClick(async () => {
this.sharedArrayBufferUsage(false);
})
Scroll(this.scroller) {
Column() {
Text(this.result)
.width("80%")
.fontSize(30)
.fontWeight(FontWeight.Bold)
.fontColor(Color.Blue)
}
}.height("60%")
.margin({ top: 30 })
......
// 根據(jù)傳入的值isAtomics判斷是否使用原子操作
sharedArrayBufferUsage(isAtomics: boolean) {
// 創(chuàng)建長(zhǎng)度為4的SharedArrayBuffer對(duì)象
let sab: SharedArrayBuffer = new SharedArrayBuffer(4);
// 由于SharedArrayBuffer是原始二進(jìn)制數(shù)據(jù)緩沖區(qū),無(wú)法直接使用脆淹,所以這里轉(zhuǎn)換為Int32Array類(lèi)型進(jìn)行后續(xù)操作
let int32Array: Int32Array = new Int32Array(sab);
int32Array[0] = 0;
let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
// 創(chuàng)建Task對(duì)象常空,并放入TaskGroup中執(zhí)行
for (let i = 0; i < this.taskNum; i++) {
if (isAtomics) {
taskGroup.addTask(new taskpool.Task(atomicsProcess, int32Array));
} else {
taskGroup.addTask(new taskpool.Task(normalProcess, int32Array));
}
}
taskpool.execute(taskGroup).then(() => {
// 將結(jié)果打印在Text上
this.result = this.result + "\n" + int32Array;
// 如果Scroll不在最低端,則滑動(dòng)到最低端
if (!this.scroller.isAtEnd()) {
this.scroller.scrollEdge(Edge.Bottom);
}
}).catch((e: BusinessError) => {
logger.error(e.message);
})
}
在這段代碼中盖溺,創(chuàng)建了2個(gè)task漓糙,對(duì)SharedArrayBuffer分別進(jìn)行了10000次自增操作,預(yù)期的結(jié)果應(yīng)該是20000烘嘱。點(diǎn)擊按鈕查看計(jì)算結(jié)果昆禽,就會(huì)發(fā)現(xiàn)最后的結(jié)果并不一定是20000,并且每次點(diǎn)擊后蝇庭,計(jì)算的結(jié)果都可能是不同的醉鳖。
這是因?yàn)镾haredArrayBuffer是共享內(nèi)存的,多個(gè)線程同時(shí)進(jìn)行自增時(shí)哮内,是操作的同一塊內(nèi)存盗棵,而自增操作并不是原子操作,需要經(jīng)過(guò)以下三個(gè)步驟:
- 第一步,從內(nèi)存中取值
- 第二步纹因,對(duì)取出的值+1
- 第三步喷屋,將結(jié)果寫(xiě)入內(nèi)存
當(dāng)多個(gè)線程同時(shí)操作時(shí),就會(huì)發(fā)生這樣一種情況:A線程在第一步取值1000瞭恰,第二步+1操作后是1001屯曹,在執(zhí)行第三步之前,B線程也去取值了惊畏,這時(shí)由于A線程還沒(méi)有將結(jié)果寫(xiě)入內(nèi)存恶耽,所以B線程取到的值依然是1000,然后A執(zhí)行第三步將1001寫(xiě)入了內(nèi)存陕截,而B(niǎo)會(huì)對(duì)1000進(jìn)行+1操作并將結(jié)果1001寫(xiě)入同一塊內(nèi)存驳棱。這樣就會(huì)導(dǎo)致明明進(jìn)行了兩次+1的操作批什,但是結(jié)果并沒(méi)有變成預(yù)期的1002农曲,而是1001。所以在這個(gè)示例中會(huì)出現(xiàn)結(jié)果不符合預(yù)期的情況驻债。
原子操作
下面修改一下代碼乳规,將自增操作改為使用Atomics.add()方法的原子操作。
......
Button("原子操作")
.width("80%")
.fontSize(30)
.fontWeight(FontWeight.Bold)
.margin({ top: 30 })
.onClick(async () => {
this.sharedArrayBufferUsage(true);
})
......
點(diǎn)擊按鈕查看計(jì)算結(jié)果合呐,就會(huì)發(fā)現(xiàn)不論計(jì)算多少次暮的,結(jié)果一直都是20000。這是因?yàn)樘适担硬僮魇遣豢芍袛嗟囊粋€(gè)或者一系列操作冻辩,可以保證在A線程執(zhí)行完取值、計(jì)算拆祈、寫(xiě)入內(nèi)存這三個(gè)步驟之前恨闪,不會(huì)被B線程中斷,也就不會(huì)發(fā)生非原子操作示例中B線程取到舊值的情況放坏,而是每次都能拿到A線程寫(xiě)入內(nèi)存的新值咙咽。所以,在使用SharedArrayBuffer共享內(nèi)存時(shí)淤年,一定要注意使用原子操作保證同步性钧敞,否則就可能會(huì)造成數(shù)據(jù)的紊亂。
場(chǎng)景示例
在應(yīng)用開(kāi)發(fā)中使用多線程時(shí)麸粮,會(huì)遇到處理復(fù)雜邏輯的情況溉苛,是無(wú)法保證整個(gè)線程都是一個(gè)原子操作的,此時(shí)就可以使用鎖來(lái)解決一段代碼的原子性問(wèn)題弄诲。
鎖的實(shí)現(xiàn)
并發(fā)編程重在解決線程間分工炊昆、同步與互斥的問(wèn)題,而實(shí)現(xiàn)互斥的重要方式是通過(guò)鎖。示例通過(guò)Atomics和SharedArrayBuffer簡(jiǎn)單實(shí)現(xiàn)不可重入鎖類(lèi)NonReentrantLock凤巨。
constructor()通過(guò)傳入可共享對(duì)象SharedArrayBuffer初始化鎖视乐,實(shí)現(xiàn)多線程共享同一塊內(nèi)存,以作為共同操作的標(biāo)志位敢茁,從而控制鎖的狀態(tài)佑淀。
const UNLOCKED = 0;
const LOCKED_SINGLE = 1;
const LOCKED_MULTI = 2;
export class NonReentrantLock {
flag: Int32Array;
constructor(sab: SharedArrayBuffer) { // 傳入一個(gè)4bytes的SharedArrayBuffer
this.flag= new Int32Array(sab); // 其視圖為只有一位的int數(shù)組(1 = 4bytes * 8 / 32bit)
}
lock(): void {...}
tryLock(): boolean {...}
unlock(): void {...}
}
lock()方法用于獲取鎖,如果獲取鎖失敗彰檬,則線程進(jìn)入阻塞狀態(tài)伸刃。
lock(): void {
const flag= this.flag;
let c = UNLOCKED;
// 如果flag數(shù)組的0位置,當(dāng)前值為UNLOCKED逢倍,則改為L(zhǎng)OCKED_SINGLE捧颅;否則,進(jìn)入do-while循環(huán)较雕,阻塞線程
if ((c = Atomics.compareExchange(flag, 0, UNLOCKED, LOCKED_SINGLE)) !== UNLOCKED) {
do {
// 有線程拿不到鎖時(shí)碉哑,修改標(biāo)志位為L(zhǎng)OCKED_MULTI,并使之進(jìn)入睡眠阻塞狀態(tài)
if (c === LOCKED_MULTI || Atomics.compareExchange(flag, 0, LOCKED_SINGLE, LOCKED_MULTI) !== UNLOCKED) {
Atomics.wait(flag, 0, LOCKED_MULTI);
}
// 被喚醒的線程亮蒋,如果還是沒(méi)有拿到鎖扣典,就回到循環(huán)中,重新進(jìn)入阻塞狀態(tài)
} while ((c = Atomics.compareExchange(flag, 0, UNLOCKED, LOCKED_MULTI)) !== UNLOCKED);
}
}
tryLock()方法用于嘗試獲取鎖慎玖,如果獲取鎖成功則返回true贮尖,失敗返回false,但不會(huì)阻塞線程趁怔。
tryLock(): boolean {
const flag= this.flag;
return Atomics.compareExchange(flag, 0, UNLOCKED, LOCKED_SINGLE) === UNLOCKED;
}
unlock()方法用于釋放鎖湿硝。
unlock(): void {
// 局部化flag,保證只有獲取鎖的線程可以釋放鎖
const flag= this.flag;
let v0 = Atomics.sub(flag, 0, 1);
if (v0 !== LOCKED_SINGLE) {
Atomics.store(flag, 0, UNLOCKED);
// 只喚醒在數(shù)組0索引位置等待的其中一個(gè)線程润努,去上方lock()方法while條件中檢測(cè)关斜,嘗試獲取鎖
Atomics.notify(flag, 0, 1);
}
}
鎖的應(yīng)用
示例通過(guò)多線程寫(xiě)入文件的場(chǎng)景,展示多線程不合理操作共享內(nèi)存時(shí)任连,出現(xiàn)的線程不安全問(wèn)題蚤吹,進(jìn)而導(dǎo)致輸出文件亂碼的情況。并通過(guò)使用上文實(shí)現(xiàn)的NonReentrantLock随抠,解決該問(wèn)題裁着。
主線程通過(guò)startWrite(useLock: boolean)方法,開(kāi)啟多線程寫(xiě)入文件拱她,并通過(guò)useLock參數(shù)控制是否使用鎖睹簇。
@Component
export struct LockUsage {
taskNum: number = 10; // 任務(wù)數(shù)擎椰,實(shí)際并行線程數(shù)依設(shè)備而定
baseDir: string = getContext().filesDir + '/TextDir'; // 文件寫(xiě)入的應(yīng)用沙箱路徑
sabInLock: SharedArrayBuffer = new SharedArrayBuffer(4); // 在主線程娇掏,初始化子線程鎖標(biāo)志位,所使用的共享內(nèi)存
sabForLine: SharedArrayBuffer = new SharedArrayBuffer(4); // 在主線程矿酵,初始化子線程偏移位,所使用的共享內(nèi)存
@State result: string = "";
build() {
Row() {
Column() {
// 不使用鎖寫(xiě)入的按鈕
Button($r('app.string.not_use_lock'))
.width("80%").fontSize(30)
.fontWeight(FontWeight.Bold)
.margin({ top: 30 })
.onClick(async () => {
this.startWrite(false);
})
// 使用鎖寫(xiě)入的按鈕
Button($r('app.string.use_lock'))
.width("80%")
.fontSize(30)
.fontWeight(FontWeight.Bold)
.margin({ top: 30 })
.onClick(async () => {
this.startWrite(true);
})
// 運(yùn)行狀態(tài)說(shuō)明
Text(this.result)
.width("80%")
.fontSize(30)
.fontWeight(FontWeight.Bold)
.fontColor(Color.Blue)
.margin({ top: 30 })
}
.width('100%')
}
.height('100%')
}
startWrite(useLock: boolean): void {
// 指明運(yùn)行狀態(tài)為“寫(xiě)入文件開(kāi)始”
this.result = getContext().resourceManager.getStringSync($r('app.string.write_file_start'));
// 初始化寫(xiě)入時(shí)的偏移量
let whichLineToWrite: Int32Array = new Int32Array(this.sabForLine);
Atomics.store(whichLineToWrite, 0, 0);
// 開(kāi)啟多線程依據(jù)偏移量指定位置寫(xiě)入文件
// 通過(guò)主線程的sabInLock:SharedArrayBuffer初始化鎖矗积,保證多線程操作同一處鎖標(biāo)志位
// 通過(guò)主線程的sabForLine:SharedArrayBuffer初始化偏移位全肮,保證多線程操作同一處偏移位置
let taskPoolGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
for (let i: number = 0; i < this.taskNum; i++) {
taskPoolGroup.addTask(new taskpool.Task(createWriteTask, this.baseDir, i, this.sabInLock, this.sabForLine, useLock));
}
taskpool.execute(taskPoolGroup).then(() => {
// 指明運(yùn)行狀態(tài)為“寫(xiě)入文件成功”
this.result = this.result = getContext().resourceManager.getStringSync($r('app.string.write_file_success'));
}).catch(() => {
// 指明運(yùn)行狀態(tài)為“寫(xiě)入文件失敗”
this.result = getContext().resourceManager.getStringSync($r('app.string.write_file_failed'));
})
}
}
子線程根據(jù)偏移量在指定位置寫(xiě)入文件,并通過(guò)偏移量自增棘捣,指定下次的寫(xiě)入位置辜腺。
@Concurrent
async function createWriteTask(baseDir: string, writeText: number, sabInLock: SharedArrayBuffer, sabForLine: SharedArrayBuffer, useLock: boolean): Promise<void> {
class Option { // 寫(xiě)入文件時(shí)的接口方法參數(shù)類(lèi)
offset: number = 0;
length: number = 0;
encoding: string = 'utf-8';
constructor(offset: number, length: number) {
this.offset = offset;
this.length = length;
}
}
// 初始化輸出文件目錄
let filePath: string | undefined = undefined;
filePath = baseDir + useLock ? "/useLock.txt" : "/unusedLock.txt";
if (!fs.accessSync(baseDir)) {
fs.mkdirSync(baseDir);
}
// 利用主線程傳入的SharedArrayBuffer初始化鎖
let nrl: NonReentrantLock | undefined = undefined;
if (useLock) {
nrl = new NonReentrantLock(sabInLock);
}
// 利用主線程傳入的SharedArrayBuffer初始化寫(xiě)入文件時(shí)的偏移量
let whichLineToWrite: Int32Array = new Int32Array(sabForLine);
let str: string = writeText + '\n';
for (let i: number = 0; i < 100; i++) {
// 獲取鎖
if (useLock && nrl !== undefined) {
nrl.lock();
}
// 寫(xiě)入文件
let file: fs.File = fs.openSync(filePath, fs.OpenMode.READ_WRITE | fs.OpenMode.CREATE);
try {
fs.writeSync(file.fd, str, new Option(whichLineToWrite[0], str.length));
} catch (err) {
logger.error(`errorCode : ${err.code},errMessage : ${err.message}`);
}
fs.closeSync(file);
// 修改偏移量,指定下次寫(xiě)入時(shí)的位置
whichLineToWrite[0] += str.length;
// 釋放鎖
if (useLock && nrl !== undefined) {
nrl.unlock();
}
}
}
從應(yīng)用沙箱地址查看寫(xiě)入的文件乍恐,可以看到unusedLock.txt文件评疗,所寫(xiě)行數(shù)不足1000行,且存在亂碼茵烈,如圖1所示百匆。
圖1 不使用鎖寫(xiě)入的文件
而usedLock.txt文件,所寫(xiě)行數(shù)剛好1000行呜投,且不存在亂碼加匈,如圖2所示。
圖2 使用鎖寫(xiě)入的文件
總結(jié)
綜上所述宙彪,雖然使用了基于消息通信的Actor并發(fā)模型矩动,但是ArkTS依舊支持通過(guò)共享內(nèi)存的方式進(jìn)行線程間通信有巧。同時(shí)释漆,在使用SharedArrayBuffer進(jìn)行共享內(nèi)存時(shí),也需要通過(guò)原子操作或者鎖來(lái)解決線程間同步與互斥的問(wèn)題篮迎。合理使用多線程共享內(nèi)存男图,才能在保證線程安全的前提下,提升應(yīng)用的性能甜橱。
寫(xiě)在最后
如果你覺(jué)得這篇內(nèi)容對(duì)你還蠻有幫助逊笆,我想邀請(qǐng)你幫我三個(gè)小忙:
- 點(diǎn)贊,轉(zhuǎn)發(fā)岂傲,有你們的 『點(diǎn)贊和評(píng)論』难裆,才是我創(chuàng)造的動(dòng)力。
- 關(guān)注小編镊掖,同時(shí)可以期待后續(xù)文章ing??乃戈,不定期分享原創(chuàng)知識(shí)。
- 想要獲取更多完整鴻蒙最新學(xué)習(xí)知識(shí)點(diǎn)亩进,請(qǐng)移步前往小編:
https://gitee.com/MNxiaona/733GH/blob/master/jianshu