Node.js躬行記(23)——Worker threads
- 2022 年 9 月 26 日
- 筆記
- Node.js躬行記
Node.js 官方提供了 Cluster 和 Child process 創建子進程,通過 Worker threads 模組創建子執行緒。但前者無法共享記憶體,通訊必須使用 JSON 格式,有一定的局限性和性能問題。後者更輕量,並且可以共享記憶體,通過傳輸 ArrayBuffer 實例或共享 SharedArrayBuffer 實例來做到這一點,即數據格式沒有太多要求。但是要注意,數據中不能包含函數。
Worker threads 從 Node V12 開始成為正式標準,其對於執行 CPU 密集型的操作很有用,而對 I/O 密集型工作沒有多大幫助。 Node.js 內置的非同步 I/O 操作要比它效率更高。注意,Worker threads 是基於 Node.js 架構的多工作執行緒,如下圖所示。在每個工作執行緒中,都會包含 V8 和 libuv,即都包含Event Loop。
一、執行緒池
創建、執行、銷毀一個 Worker 的開銷是很大的,所以需要實現一個執行緒池(Worker Pool),在初始化時創建有限數量的 Worker 並載入單一的 worker.js,主執行緒和 Worker 可進行進程間通訊,當所有任務完成後,這些 Worker 將會被統一銷毀。
在 Worker 中通過 parentPort.postMessage() 向主執行緒發送消息,而在主執行緒中可以通過 worker.on(‘message’) 接收發送過來的消息,worker 是一個 Worker 實例,例如 new Worker(filePath)。
下面是一個官方示例,isMainThread 可判斷當前是否是主執行緒,workerData 是傳遞給 Worker 的數據。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); if (isMainThread) { module.exports = function parseJSAsync(script) { return new Promise((resolve, reject) => { const worker = new Worker(__filename, { workerData: script }); worker.on('message', resolve); worker.on('error', reject); worker.on('exit', (code) => { if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`)); }); }); }; } else { const script = workerData; parentPort.postMessage(script); }
下面是一個執行緒池示例,參考自《worker_threads 初體驗》一文,做了微調,具體在此不在贅述,可閱讀原文或注釋。
// 獲取當前設備的 CPU 執行緒數目,作為 numberOfThreads 的默認值。 const { length: cpusLength } = require('os').cpus(); const { Worker } = require('worker_threads'); class WorkerPool { constructor(workerPath, options = {}, numberOfThreads = cpusLength) { if (numberOfThreads < 1) { throw new Error('Number of threads should be greater or equal than 1!'); } this.workerPath = workerPath; this.numberOfThreads = numberOfThreads; // 任務隊列 this._queue = []; // Worker 索引 this._workersById = {}; // Worker 激活狀態索引 this._activeWorkersById = {}; // 創建 Workers for (let i = 0; i < this.numberOfThreads; i++) { const worker = new Worker(workerPath, options); this._workersById[i] = worker; // 將這些 Worker 設置為未激活狀態 this._activeWorkersById[i] = false; } } /** * 檢查空閑的 Worker */ getInactiveWorkerId() { for (let i = 0; i < this.numberOfThreads; i++) { if (!this._activeWorkersById[i]) return i; } return -1; } /** * 調用 Worker 執行,目的是在指定的 Worker 里執行指定的任務 */ runWorker(workerId, taskObj) { const worker = this._workersById[workerId]; // 當任務執行完畢後執行 const doAfterTaskIsFinished = () => { // 去除所有的 Listener,不然一次次添加不同的 Listener 會記憶體溢出(OOM) worker.removeAllListeners('message'); worker.removeAllListeners('error'); // 將這個 Worker 設為未激活狀態 this._activeWorkersById[workerId] = false; if (this._queue.length) { // 任務隊列非空,使用該 Worker 執行任務隊列中第一個任務 this.runWorker(workerId, this._queue.shift()); } }; // 將這個 Worker 設置為激活狀態 this._activeWorkersById[workerId] = true; // 設置兩個回調,用於 Worker 的監聽器 const messageCallback = result => { taskObj.cb(null, result); doAfterTaskIsFinished(); }; const errorCallback = error => { taskObj.cb(error); doAfterTaskIsFinished(); }; // 為 Worker 添加 'message' 和 'error' 兩個 Listener worker.once('message', messageCallback); worker.once('error', errorCallback); // 將數據傳給 Worker 供其獲取和執行 worker.postMessage(taskObj.data); } /** * 運行執行緒 */ run(data) { // Promise 是個好東西 return new Promise((resolve, reject) => { // 調用 getInactiveWorkerId() 獲取一個空閑的 Worker const availableWorkerId = this.getInactiveWorkerId(); const taskObj = { data, cb: (error, result) => { // 雖然 Workers 需要使用 Listener 和 Callback,但這不能阻止我們使用 Promise,對吧? // 不,你不能 util.promisify(taskObj) 。人不能,至少不應該。 if (error) reject(error); return resolve(result); } }; if (availableWorkerId === -1) { // 當前沒有空閑的 Workers 了,把任務丟進隊列里,這樣一旦有 Workers 空閑時就會開始執行。 this._queue.push(taskObj); return null; } // 有一個空閑的 Worker,用它執行任務 this.runWorker(availableWorkerId, taskObj); }) } /** * 銷毀 */ destroy(force = false) { for (let i = 0; i < this.numberOfThreads; i++) { if (this._activeWorkersById[i] && !force) { // 通常情況下,不應該在還有 Worker 在執行的時候就銷毀它,這一定是什麼地方出了問題,所以還是拋個 Error 比較好 // 不過保留一個 force 參數,總有人用得到的 throw new Error(`The worker ${i} is still runing!`); } // 銷毀這個 Worker this._workersById[i].terminate(); } } } module.exports = WorkerPool;
二、實踐
之所以需要多執行緒,是為了解決一個優化需求。就是有一個介面,裡面有很多查詢資料庫(MySQL和MongoDB)的操作,單條語句並不會慢,但累加後整體的響應速度就會變慢,那麼就想通過多執行緒,同時處理一些查詢語句,然後整合結果。
先對執行緒池做最簡單的處理,創建 worker.js,接收 userId。
const { isMainThread, parentPort } = require('worker_threads'); // 不是主執行緒時執行 if (!isMainThread) { parentPort.on('message', async ({userId }) => { console.log('postMessage', userId); parentPort.postMessage(userId); }); }
然後初始化執行緒池,將數組中的 userId 傳遞給 Worker,pool.run({ userId: item })。
const WorkerPool = require('./workerPool'); const { join } = require('path'); async function workerMain(services) { const workerPath = join(__dirname + '/worker.js'); // 初始化一個 Worker Pool const pool = new WorkerPool(workerPath); Promise.all([4,12,13,15].map(async item => { await pool.run({ userId: item }); })).then(json => { // 銷毀執行緒池 pool.destroy(); }); }
輸出順序沒有按照數組的順序,並且每次的輸出順序還都是不同的,由此可知,程式碼是並發運行的。
postMessage 12 postMessage 4 postMessage 15 postMessage 13
那麼接下來就引入資料庫查詢的程式碼,公司項目基於 sequelize.js 封裝了增刪改查的邏輯,通過 services 變數可以調用相關的操作。在主執行緒中,計劃將 services 傳遞到 Worker 中。
async function workerMain(services) { // Worker Threads 不能共享實例以及帶函數的對象 const workerPath = join(__dirname + '/worker.js', { workerData: services }); // 初始化一個 Worker Pool const pool = new WorkerPool(workerPath); // 省略程式碼...... }
然而報錯了,大致是下面這個意思,無法克隆,因為對象中包含函數,就會引發錯誤。
node:internal/worker:349 ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args); could not be cloned.
想以通訊的方式實現資料庫的並發查詢,目前看來不能完成。
其實可以在 worker.js 中單獨引入 services, 不過由於我們在腳本文件中採用了 import 語法,因此在執行時會報錯,SyntaxError: Cannot use import statement outside a module。
const { isMainThread, parentPort, workerData } = require('worker_threads'); const services = require('../services'); // 不是主執行緒時執行 if (!isMainThread) { // 省略程式碼...... }
還有一種解決方案,其成本就比較高,就是單獨再實現一套服務層,也就是說再封裝一層符合Node.js 模組化語法的資料庫操作集合。