現(xiàn)代操作系統(tǒng)都是「多任務(wù)」的,也就是操作系統(tǒng)可以「并發(fā)」處理多個任務(wù)匣摘,比如可以在瀏覽頁面的時候同時播放音樂怒医。但是,一般來說我們的 PC 只有一個物理 CPU 脆丁,那么它是如何做到在只有一個 CPU 的情況下世舰,并發(fā)處理多個任務(wù)的呢?我們簡單探究一下槽卫。
前置知識
我們先簡單熟悉一下 CPU 硬件相關(guān)的術(shù)語:
- Sockets(physical CPU): 物理CPU跟压,指我們主板上實際插入的CPU,一般來說 PC 只有一個晒夹,服務(wù)器可能會有多個
- Cores: CPU物理核心裆馒,CPU商品上宣傳的一共幾核指代的就是這個
- Logical Processors: 邏輯處理器姊氓,如果采用超線程(多線程)技術(shù)的話,會比物理核心數(shù)多
總的來說: Logical Processors = Sockets _ Cores _ SMT(HT) Multiple
邏輯處理器數(shù)量也就代表了操作系統(tǒng)認(rèn)為能「并行」執(zhí)行的任務(wù)的最高數(shù)量
并發(fā) VS 并行
我們對「并發(fā)」和「并行」先下個定義喷好,「并發(fā)」指的是系統(tǒng)允許多個任務(wù)同時存在翔横,「并行」則指的是系統(tǒng)支持多個任務(wù)同時執(zhí)行,「并發(fā)」和「并行」的關(guān)鍵區(qū)別在于是否能同時執(zhí)行梗搅。在只有單一邏輯處理器的情況下禾唁,我們的操作系統(tǒng)只能「并發(fā)」執(zhí)行任務(wù),比如早期的單核 CPU 電腦无切。但是我們?nèi)匀豢梢赃吢牳柽厼g覽網(wǎng)頁荡短,這是因為 CPU 速度足夠快,可以在系統(tǒng)的使用過程中快速切換任務(wù)哆键,這樣我們就感覺到多個任務(wù)同時存在掘托。在單一邏輯處理器的情況下,雖然我們可以「并發(fā)」執(zhí)行任務(wù)籍嘹,但實際上我們同時也只能執(zhí)行一個任務(wù)闪盔,對于 IO 密集類型的任務(wù),我們用到 CPU 的時間不多辱士,決定任務(wù)快慢的往往是硬盤以及網(wǎng)絡(luò)等硬件泪掀,「并發(fā)」執(zhí)行也未嘗不可,但是對于計算密集型的任務(wù)颂碘,我們需要占用更多的 CPU 時間异赫,如果「并發(fā)」執(zhí)行,則往往會造成任務(wù)的卡頓(響應(yīng)時間過長)头岔,因此我們需要「并行」的執(zhí)行該任務(wù)塔拳,而邏輯處理器的數(shù)量代表了能「并行」執(zhí)行任務(wù)的最高數(shù)量,這也是為什么現(xiàn)在的處理器大多是多核處理器的原因所在切油。
進(jìn)程 VS 線程
我們使用的一個個程序可以稱為「進(jìn)程」( process )蝙斜,而 process 下可以開辟多個「線程」( thread ),這里引用一下 Microsoft 官方對于進(jìn)程和線程的解釋About Processes and Threads:
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
A thread is the entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread's set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread's process. Threads can also have their own security context, which can be used for impersonating clients.
在操作系統(tǒng)層面澎胡,process 相互獨立孕荠,擁有一塊獨立的虛擬地址空間(內(nèi)存中),而同一 process 下的 thread 共享該虛擬地址空間攻谁,這也是 process 和 thread 最典型稚伍,最根本的區(qū)別
多進(jìn)程 VS 多線程
假如我們現(xiàn)在要開發(fā)一款瀏覽器,瀏覽器的基礎(chǔ)功能包括 HTTP 請求戚宦,GUI 渲染等功能个曙,如果我們采用單線程來開發(fā),那么勢必會遇到一個問題: 當(dāng)需要網(wǎng)絡(luò)請求的時候,我們的瀏覽器就會卡住垦搬,所有的用戶操作如輸入等都沒有響應(yīng)呼寸,等網(wǎng)絡(luò)請求完成,我們才可以進(jìn)行后續(xù)操作猴贰,非常影響用戶體驗对雪,這也是為什么像瀏覽器這樣的程序大多都是多線程的原因,我們需要任務(wù)同時進(jìn)行米绕。但是我們前面講到的多進(jìn)程也可以多任務(wù)同時進(jìn)行瑟捣,那么問題就來了,當(dāng)我們需要實現(xiàn)多任務(wù)的時候栅干,多進(jìn)程和多線程該如何選擇呢迈套?
多進(jìn)程
前面我們提到過,進(jìn)程之間是相互獨立的碱鳞,每個進(jìn)程有獨立的虛址空間桑李,那么當(dāng)一個進(jìn)程因為某些原因崩掉了,其他的進(jìn)程也不會受到影響(主進(jìn)程掛掉除外劫笙,但是主進(jìn)程一般只負(fù)責(zé)調(diào)度芙扎,掛掉的幾率較小)填大,所以當(dāng)我們需要較高的穩(wěn)定性時,可以考慮多進(jìn)程俏橘。但是創(chuàng)建進(jìn)程的開銷是比較大的允华,因此要考慮資源問題。
多線程
多線程可以共享虛址空間寥掐,而且創(chuàng)建一個線程的開銷較小靴寂,這樣我們就可以減少資源的占用。但是正是因為線程之間可以共享虛址空間召耘,當(dāng)一個線程掛掉了百炬,整個進(jìn)程會隨之掛掉,所以多線程的穩(wěn)定性相比多進(jìn)程較差污它。
Node.js 中的多線程與多進(jìn)程
child_process & cluster
Node.js提供了多種方法來創(chuàng)建多進(jìn)程剖踊,例如 child_process 提供的 child_process.spawn()
和 child_process.fork()
,那么什么是 spawn :
Spawn in computing refers to a function that loads and executes a new child process. The current process may wait for the child to terminate or may continue to execute concurrent computing.
所以 child_process.spawn
的作用是創(chuàng)建了一個子進(jìn)程衫贬,然后在子進(jìn)程執(zhí)行一些命令德澈,但是 child_process.spawn()
有一個缺點,就是不能進(jìn)行進(jìn)程間通信(IPC: Inter Process Communication)固惯,那么當(dāng)需要進(jìn)程間通信的時候梆造,就需要使用child_process.fork()
。
涉及到現(xiàn)實中多進(jìn)程的運用葬毫,我們往往不會只起一個子進(jìn)程镇辉,當(dāng)我們需要進(jìn)程間共享一個端口時屡穗,這時候就可以使用Node.js提供的cluster
,cluster
創(chuàng)建子進(jìn)程內(nèi)部也是通過child_process.fork()
實現(xiàn)的,支持IPC
structured clone
當(dāng)我們創(chuàng)建了一個子進(jìn)程的時候忽肛,進(jìn)程間的通信 Node.js 已經(jīng)幫我們封裝好了村砂,使用 worker.send(message)
和 process.on('message', handle)
就可以實現(xiàn)進(jìn)程間的通信,以 cluster
為例:
if (cluster.isPrimary) {
const worker = cluster.fork();
worker.send('hi there');
} else if (cluster.isWorker) {
process.on('message', (msg) => {
process.send(msg);
});
}
但是需要注意一點麻裁,我們發(fā)送的 message 會被 structured clone 一份箍镜,然后傳遞給其他進(jìn)程,因此我們需要注意如果傳遞了一個 Object 過去煎源,Object 中定義的 Function 及其 prototype 等內(nèi)容都不會被clone過去色迂。這里發(fā)散一下,如果我們需要深拷貝一個對象手销,而且該對象滿足Structured clone的相關(guān)算法要求歇僧,那么我們可以考慮使用structuredClone
(caniuse)或者直接創(chuàng)建一個worker來拷貝(當(dāng)然不推薦)
worker_threads
上述我們講到進(jìn)程間的資源是獨立的,當(dāng)我們想共享數(shù)據(jù)的時候锋拖,我們需要structured clone 對應(yīng)的數(shù)據(jù)然后傳遞過去诈悍,這在共享數(shù)據(jù)量較小的時候還可以接受,但是當(dāng)數(shù)據(jù)量較多時兽埃,克隆數(shù)據(jù)是一個比較大的開銷侥钳,這是我們所不能接受的,因此我們需要多線程來共享內(nèi)存(數(shù)據(jù))柄错,Node.js 中也提供了相應(yīng)的方法 worker_threads
舷夺。
多線程在 ko 中的實踐
ko
ko 是基于 webpack@5.x 的打包工具,其倉庫采用了 Monorepo 的方式進(jìn)行包管理售貌。
在這里给猾,ko 提供了 concurrency 模式,該模式下使用多線程執(zhí)行 eslint 颂跨、prettier 或 stylelint 敢伸,這里簡單介紹一下如何實現(xiàn)。
獲取需要 lint 的所有文件
這里使用的是 fast-glob
恒削,主要代碼如下所示 factory/runner.ts:
import fg, { Pattern } from 'fast-glob';
protected async getEntries(
patterns: Pattern[],
ignoreFiles: string[]
): Promise<string[]> {
return fg(patterns, {
dot: true,
ignore: this.getIgnorePatterns(...ignoreFiles),
});
}
private getIgnorePatterns(...ignoreFiles: string[]) {
return ['.gitignore', ...ignoreFiles]
.map(fileName => {
const filePath = join(this.cwd, fileName);
if (existsSync(filePath)) {
return readFileSync(filePath, 'utf-8')
.split('\n')
.filter(str => str && !str.startsWith('#'));
}
return [];
})
.reduce((acc, current) => {
current.forEach(p => {
if (!acc.includes(p)) {
acc.push(p);
}
});
return acc;
}, []);
}
返回的是需要 lint 的所有文件路徑
lint 相關(guān)的 Parser
我們以 eslint 為例eslint/parser.ts:
import { eslint } from 'ko-lint-config';
import LintParserFactory from '../factory/parser';
import { IParserOpts } from '../interfaces';
class ESLintParser extends LintParserFactory {
static readonly EXTENSIONS = ['ts', 'tsx', 'js', 'jsx'];
private eslintInstance: eslint.ESLint;
private opts: IParserOpts;
private config: Record<string, any>;
constructor(opts: IParserOpts) {
super();
this.opts = opts;
this.generateConfig();
this.initInstance();
}
private initInstance() {
const { write } = this.opts;
this.eslintInstance = new eslint.ESLint({
fix: write,
overrideConfig: this.config,
useEslintrc: false,
extensions: ESLintParser.EXTENSIONS,
});
}
public async format(file: string): Promise<string> {
const formatter = await this.eslintInstance.loadFormatter();
let resultText = '';
try {
const result = await this.eslintInstance.lintFiles(file);
if (result[0].errorCount) {
resultText = formatter.format(result) as string;
}
return resultText;
} catch (ex) {
console.log(ex);
process.exit(1);
}
}
public generateConfig() {
if (this.opts.configPath) {
this.config = this.getConfigFromFile(this.opts.configPath);
} else {
const localConfigPath = this.detectLocalRunnerConfig(this.opts.name);
if (localConfigPath) {
this.config = this.getConfigFromFile(localConfigPath);
}
}
}
}
export default ESLintParser;
所有的 parser 實現(xiàn)了 format() 方法池颈,作用是輸入一個文件的路徑,然后進(jìn)行 lint 蔓同,如果有相關(guān)的錯誤則返回錯誤結(jié)果饶辙。
Thread Pool
創(chuàng)建一個線程的是有開銷的,雖然相比創(chuàng)建進(jìn)程而言消耗的較小斑粱,但是我們也并不能無休止創(chuàng)建線程弃揽。線程是需要調(diào)度的,如果我們創(chuàng)建了很多線程,那么系統(tǒng)花在線程調(diào)度的時間往往會更長矿微,導(dǎo)致的結(jié)果是我們開了多個線程痕慢,但是執(zhí)行程序的耗時反而更長了。為了更好的使用線程涌矢,我們引入線程池的概念 WikiPedia:
In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program
還是WikiPedia的示例圖:
簡單來說掖举,線程池創(chuàng)建了一定數(shù)量的線程,每個線程從任務(wù)隊列中獲取任務(wù)并執(zhí)行娜庇,然后繼續(xù)執(zhí)行下一個任務(wù)直到結(jié)束塔次。ko中也實現(xiàn)了相關(guān)的線程池 threads/Pool.ts。
import { join } from 'path';
import { Worker } from 'worker_threads';
import { IThreadOpts, IParserOpts } from '../interfaces';
class ThreadPool {
private readonly workers: Worker[] = [];
private readonly workerPList: Promise<boolean>[] = [];
private readonly opts: IThreadOpts;
private queue: string[];
private stdout: string[] = [];
constructor(opts: IThreadOpts) {
console.log('Using Multithreading...');
this.opts = opts;
this.queue = this.opts.entries;
this.format();
}
format() {
const { concurrentNumber, configPath, write, name } = this.opts;
if (this.workers.length < concurrentNumber) {
this.workerPList.push(
this.createWorker({
configPath,
write,
name,
})
);
this.format();
}
}
createWorker(opts: IParserOpts): Promise<boolean> {
const worker = new Worker(join(__dirname, './Worker.js'), {
workerData: {
opts,
},
});
return new Promise(resolve => {
worker.postMessage(this.queue.shift());
worker.on('message', (result: string) => {
this.stdout.push(result);
if (this.queue.length === 0) {
resolve(true);
} else {
const next = this.queue.shift();
worker.postMessage(next);
}
});
worker.on('error', err => {
console.log(err);
process.exit(1);
});
this.workers.push(worker);
});
}
async exec(): Promise<string[]> {
return Promise.all(this.workerPList).then(() => {
return this.stdout;
});
}
}
export default ThreadPool;
這里的 workers
維護(hù)了多個 worker 名秀,相當(dāng)于線程池的概念励负,而任務(wù)隊列對應(yīng)的則是 queue
,也就是傳入的需要 lint 的所有文件匕得,當(dāng)一個 worker 執(zhí)行完一個文件的 lint 之后继榆,從 queue
中拿一個新的文件繼續(xù)執(zhí)行新的 lint 任務(wù),當(dāng) queue
為空時汁掠,我們結(jié)束任務(wù)并返回最終結(jié)果略吨。
需要注意的一點是關(guān)于 concurrentNumber
也就是我們啟動的線程數(shù)量,這里我們默認(rèn)是 Logical Processors 的數(shù)量考阱。
結(jié)果
那么我們來對比一下多線程和普通情況下的性能翠忠,以執(zhí)行 eslint 為例:
硬件信息:
- CPU: Apple M1
- Memory: 8 GB LPDDR4
普通模式下的log為:
exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write
exec eslint with 704 files cost 31.71s
多線程模式下的log為:
exec cmd: pnpm exec ko eslint '**/*.{ts,tsx,js,jsx}' --write --concurrency
Using Multithreading...
exec eslint with 704 files cost 23.60s
可以看到性能有一定程度的提升,但是并沒有我們想象中的性能提升多倍乞榨,這是為什么呢负间?我們簡單分析一下:
- 線程啟動及其調(diào)度消耗了一定的時間
- 線程內(nèi)部涉及到了IO操作,而不是單純的運算
但是可以肯定的是姜凄,隨著需要 lint 的文件數(shù)量增多,兩個模式下所用的時間差會增大趾访。
線程安全
在 ko 中态秧, 我們針對 lint 進(jìn)行了多線程的操作,性能上有了一定程度的提升扼鞋,但是我們線程間總的來說是相互獨立的申鱼,沒有使用到共享內(nèi)存的情況。那么當(dāng)我們需要共享內(nèi)存時云头,會遇到一個問題捐友,我們啟用了多個線程,線程之間針對共享內(nèi)存可能存在競爭關(guān)系溃槐,也就是可能會同時操作共享內(nèi)存中的數(shù)據(jù)匣砖,這個時候我們就不能保證數(shù)據(jù)的準(zhǔn)確性,專業(yè)術(shù)語描述為不是線程安全的。遇到這種情況猴鲫,我們一般會涉及到一個專業(yè)術(shù)語鎖(Lock)
我們回到 work_threads
对人,看一下官方文檔中是如何共享內(nèi)存的:
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// This posts a copy of `uint8Array`:
port2.postMessage(uint8Array);
// This does not copy data, but renders `uint8Array` unusable:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);
// The memory for the `sharedUint8Array` is accessible from both the
// original and the copy received by `.on('message')`:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);
// This transfers a freshly created message port to the receiver.
// This can be used, for example, to create communication channels between
// multiple `Worker` threads that are children of the same parent thread.
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);
注意一點,如果我們想共享內(nèi)存拂共,我們可以傳遞 ArrayBuffer
或者 SharedArrayBuffer
牺弄,那么這兩種類型的數(shù)據(jù)有什么特殊性呢?
答案是 ArrayBuffer
和 SharedArrayBuffer
支持 Atomics 一起使用,可以實現(xiàn) Lock 相關(guān)的概念