写点什么

nodejs 中使用 worker_threads 来创建新的线程

发布于: 2021 年 01 月 21 日
nodejs中使用worker_threads来创建新的线程

简介

之前的文章中提到了,nodejs 中有两种线程,一种是 event loop 用来相应用户的请求和处理各种 callback。另一种就是 worker pool 用来处理各种耗时操作。


nodejs 的官网提到了一个能够使用 nodejs 本地 woker pool 的 lib 叫做 webworker-threads。


可惜的是 webworker-threads 的最后一次更新还是在 2 年前,而在最新的 nodejs 12 中,根本无法使用。


而 webworker-threads 的作者则推荐了一个新的 lib 叫做 web-worker。


web-worker 是构建于 nodejs 的 worker_threads 之上的,本文将会详细讲解 worker_threads 和 web-worker 的使用。


worker_threads

worker_threads 模块的源代码源自 lib/worker_threads.js,它指的是工作线程,可以开启一个新的线程来并行执行 javascript 程序。


worker_threads 主要用来处理 CPU 密集型操作,而不是 IO 操作,因为 nodejs 本身的异步 IO 已经非常强大了。


worker_threads 中主要有 5 个属性,3 个 class 和 3 个主要的方法。接下来我们将会一一讲解。


isMainThread

isMainThread 用来判断代码是否在主线程中运行,我们看一个使用的例子:


const { Worker, isMainThread } = require('worker_threads');
if (isMainThread) { console.log('在主线程中'); new Worker(__filename);} else { console.log('在工作线程中'); console.log(isMainThread); // 打印 'false'。}
复制代码

上面的例子中,我们从 worker_threads 模块中引入了 Worker 和 isMainThread,Worker 就是工作线程的主类,我们将会在后面详细讲解,这里我们使用 Worker 创建了一个工作线程。


MessageChannel

MessageChannel 代表的是一个异步双向通信 channel。MessageChannel 中没有方法,主要通过 MessageChannel 来连接两端的 MessagePort。


    class MessageChannel {        readonly port1: MessagePort;        readonly port2: MessagePort;    }
复制代码

当我们使用 new MessageChannel()的时候,会自动创建两个 MessagePort。


const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();port1.on('message', (message) => console.log('received', message));port2.postMessage({ foo: 'bar' });// Prints: received { foo: 'bar' } from the `port1.on('message')` listener
复制代码

通过 MessageChannel,我们可以进行 MessagePort 间的通信。


parentPort 和 MessagePort

parentPort 是一个 MessagePort 类型,parentPort 主要用于 worker 线程和主线程进行消息交互。


通过 parentPort.postMessage()发送的消息在主线程中将可以通过 worker.on(‘message’)接收。


主线程中通过 worker.postMessage()发送的消息将可以在工作线程中通过 parentPort.on(‘message’)接收。


我们看一下 MessagePort 的定义:


class MessagePort extends EventEmitter {        close(): void;        postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;        ref(): void;        unref(): void;        start(): void;
addListener(event: "close", listener: () => void): this; addListener(event: "message", listener: (value: any) => void): this; addListener(event: string | symbol, listener: (...args: any[]) => void): this;
emit(event: "close"): boolean; emit(event: "message", value: any): boolean; emit(event: string | symbol, ...args: any[]): boolean;
on(event: "close", listener: () => void): this; on(event: "message", listener: (value: any) => void): this; on(event: string | symbol, listener: (...args: any[]) => void): this;
once(event: "close", listener: () => void): this; once(event: "message", listener: (value: any) => void): this; once(event: string | symbol, listener: (...args: any[]) => void): this;
prependListener(event: "close", listener: () => void): this; prependListener(event: "message", listener: (value: any) => void): this; prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
prependOnceListener(event: "close", listener: () => void): this; prependOnceListener(event: "message", listener: (value: any) => void): this; prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
removeListener(event: "close", listener: () => void): this; removeListener(event: "message", listener: (value: any) => void): this; removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
off(event: "close", listener: () => void): this; off(event: "message", listener: (value: any) => void): this; off(event: string | symbol, listener: (...args: any[]) => void): this; }
复制代码

MessagePort 继承自 EventEmitter,它表示的是异步双向通信 channel 的一端。这个 channel 就叫做 MessageChannel,MessagePort 通过 MessageChannel 来进行通信。


我们可以通过 MessagePort 来传输结构体数据,内存区域或者其他的 MessagePorts。


从源代码中,我们可以看到 MessagePort 中有两个事件,close 和 message。


close 事件将会在 channel 的中任何一端断开连接的时候触发,而 message 事件将会在 port.postMessage 时候触发,下面我们看一个例子:


const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();
// Prints:// foobar// closed!port2.on('message', (message) => console.log(message));port2.on('close', () => console.log('closed!'));
port1.postMessage('foobar');port1.close();
复制代码

port.on(‘message’)实际上为 message 事件添加了一个 listener,port 还提供了 addListener 方法来手动添加 listener。


port.on(‘message’)会自动触发 port.start()方法,表示启动一个 port。


当 port 有 listener 存在的时候,这表示 port 存在一个 ref,当存在 ref 的时候,程序是不会结束的。我们可以通过调用 port.unref 方法来取消这个 ref。


接下来我们看一下怎么通过 port 来传输消息:


port.postMessage(value[, transferList])
复制代码

postMessage 可以接受两个参数,第一个参数是 value,这是一个 JavaScript 对象。第二个参数是 transferList。


先看一个传递一个参数的情况:


const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const circularData = {};circularData.foo = circularData;// Prints: { foo: [Circular] }port2.postMessage(circularData);
复制代码

通常来说 postMessage 发送的对象都是 value 的拷贝,但是如果你指定了 transferList,那么在 transferList 中的对象将会被 transfer 到 channel 的接受端,并且不再存在于发送端,就好像把对象传送出去一样。


transferList 是一个 list,list 中的对象可以是 ArrayBuffer, MessagePort 和 FileHandle。


如果 value 中包含 SharedArrayBuffer 对象,那么该对象不能被包含在 transferList 中。


看一个包含两个参数的例子:


const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log(message));
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);// post uint8Array的拷贝:port2.postMessage(uint8Array);
port2.postMessage(uint8Array, [ uint8Array.buffer ]);
//port2.postMessage(uint8Array);
复制代码

上面的例子将输出:


Uint8Array(4) [ 1, 2, 3, 4 ]Uint8Array(4) [ 1, 2, 3, 4 ]
复制代码

第一个 postMessage 是拷贝,第二个 postMessage 是 transfer Uint8Array 底层的 buffer。


如果我们再次调用 port2.postMessage(uint8Array),我们会得到下面的错误:


DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.
复制代码


buffer 是 TypedArray 的底层存储结构,如果 buffer 被 transfer,那么之前的 TypedArray 将会变得不可用。


markAsUntransferable

要想避免这个问题,我们可以调用 markAsUntransferable 将 buffer 标记为不可 transferable. 我们看一个 markAsUntransferable 的例子:


const { MessageChannel, markAsUntransferable } = require('worker_threads');
const pooledBuffer = new ArrayBuffer(8);const typedArray1 = new Uint8Array(pooledBuffer);const typedArray2 = new Float64Array(pooledBuffer);
markAsUntransferable(pooledBuffer);
const { port1 } = new MessageChannel();port1.postMessage(typedArray1, [ typedArray1.buffer ]);
console.log(typedArray1);console.log(typedArray2);
复制代码

SHARE_ENV

SHARE_ENV 是传递给 worker 构造函数的一个 env 变量,通过设置这个变量,我们可以在主线程与工作线程进行共享环境变量的读写。


const { Worker, SHARE_ENV } = require('worker_threads');new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })  .on('exit', () => {    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.  });
复制代码

workerData

除了 postMessage(),还可以通过在主线程中传递 workerData 给 worker 的构造函数,从而将主线程中的数据传递给 worker:


const { Worker, isMainThread, workerData } = require('worker_threads');
if (isMainThread) { const worker = new Worker(__filename, { workerData: 'Hello, world!' });} else { console.log(workerData); // Prints 'Hello, world!'.}
复制代码

worker 类

先看一下 worker 的定义:


    class Worker extends EventEmitter {        readonly stdin: Writable | null;        readonly stdout: Readable;        readonly stderr: Readable;        readonly threadId: number;        readonly resourceLimits?: ResourceLimits;
constructor(filename: string | URL, options?: WorkerOptions);
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void; ref(): void; unref(): void;
terminate(): Promise<number>;
getHeapSnapshot(): Promise<Readable>;
addListener(event: "error", listener: (err: Error) => void): this; addListener(event: "exit", listener: (exitCode: number) => void): this; addListener(event: "message", listener: (value: any) => void): this; addListener(event: "online", listener: () => void): this; addListener(event: string | symbol, listener: (...args: any[]) => void): this;
... }
复制代码

worker 继承自 EventEmitter,并且包含了 4 个重要的事件:error,exit,message 和 online。


worker 表示的是一个独立的 JavaScript 执行线程,我们可以通过传递 filename 或者 URL 来构造 worker。


每一个 worker 都有一对内置的 MessagePort,在 worker 创建的时候就会相互关联。worker 使用这对内置的 MessagePort 来和父线程进行通信。


通过 parentPort.postMessage()发送的消息在主线程中将可以通过 worker.on(‘message’)接收。


主线程中通过 worker.postMessage()发送的消息将可以在工作线程中通过 parentPort.on(‘message’)接收。


当然,你也可以显式的创建 MessageChannel 对象,然后将 MessagePort 作为消息传递给其他线程,我们看一个例子:


const assert = require('assert');const {  Worker, MessageChannel, MessagePort, isMainThread, parentPort} = require('worker_threads');if (isMainThread) {  const worker = new Worker(__filename);  const subChannel = new MessageChannel();  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);  subChannel.port2.on('message', (value) => {    console.log('接收到:', value);  });} else {  parentPort.once('message', (value) => {    assert(value.hereIsYourPort instanceof MessagePort);    value.hereIsYourPort.postMessage('工作线程正在发送此消息');    value.hereIsYourPort.close();  });}
复制代码

上面的例子中,我们借助了 worker 和 parentPort 本身的消息传递功能,传递了一个显式的 MessageChannel 中的 MessagePort。


然后又通过该 MessagePort 来进行消息的分发。


receiveMessageOnPort

除了 port 的 on(‘message’)方法之外,我们还可以使用 receiveMessageOnPort 来手动接收消息:


const { MessageChannel, receiveMessageOnPort } = require('worker_threads');const { port1, port2 } = new MessageChannel();port1.postMessage({ hello: 'world' });
console.log(receiveMessageOnPort(port2));// Prints: { message: { hello: 'world' } }console.log(receiveMessageOnPort(port2));// Prints: undefined
复制代码

moveMessagePortToContext

先了解一下 nodejs 中的 Context 的概念,我们可以从 vm 中创建 context,它是一个隔离的上下文环境,从而保证不同运行环境的安全性,我们看一个 context 的例子:


const vm = require('vm');
const x = 1;
const context = { x: 2 };vm.createContext(context); // 上下文隔离化对象。
const code = 'x += 40; var y = 17;';// `x` and `y` 是上下文中的全局变量。// 最初,x 的值为 2,因为这是 context.x 的值。vm.runInContext(code, context);
console.log(context.x); // 42console.log(context.y); // 17
console.log(x); // 1; y 没有定义。
复制代码

在 worker 中,我们可以将一个 MessagePort move 到其他的 context 中。


worker.moveMessagePortToContext(port, contextifiedSandbox)
复制代码

这个方法接收两个参数,第一个参数就是要 move 的 MessagePort,第二个参数就是 vm.createContext()创建的 context 对象。


worker_threads 的线程池

上面我们提到了使用单个的 worker thread,但是现在程序中一个线程往往是不够的,我们需要创建一个线程池来维护 worker thread 对象。


nodejs 提供了 AsyncResource 类,来作为对异步资源的扩展。


AsyncResource 类是 async_hooks 模块中的。


下面我们看下怎么使用 AsyncResource 类来创建 worker 的线程池。


假设我们有一个 task,使用来执行两个数相加,脚本名字叫做 task_processor.js:


const { parentPort } = require('worker_threads');parentPort.on('message', (task) => {  parentPort.postMessage(task.a + task.b);});
复制代码

下面是 worker pool 的实现:


const { AsyncResource } = require('async_hooks');const { EventEmitter } = require('events');const path = require('path');const { Worker } = require('worker_threads');
const kTaskInfo = Symbol('kTaskInfo');const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource { constructor(callback) { super('WorkerPoolTaskInfo'); this.callback = callback; }
done(err, result) { this.runInAsyncScope(this.callback, null, err, result); this.emitDestroy(); // `TaskInfo`s are used only once. }}
class WorkerPool extends EventEmitter { constructor(numThreads) { super(); this.numThreads = numThreads; this.workers = []; this.freeWorkers = [];
for (let i = 0; i < numThreads; i++) this.addNewWorker(); }
addNewWorker() { const worker = new Worker(path.resolve(__dirname, 'task_processor.js')); worker.on('message', (result) => { // In case of success: Call the callback that was passed to `runTask`, // remove the `TaskInfo` associated with the Worker, and mark it as free // again. worker[kTaskInfo].done(null, result); worker[kTaskInfo] = null; this.freeWorkers.push(worker); this.emit(kWorkerFreedEvent); }); worker.on('error', (err) => { // In case of an uncaught exception: Call the callback that was passed to // `runTask` with the error. if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null); else this.emit('error', err); // Remove the worker from the list and start a new Worker to replace the // current one. this.workers.splice(this.workers.indexOf(worker), 1); this.addNewWorker(); }); this.workers.push(worker); this.freeWorkers.push(worker); this.emit(kWorkerFreedEvent); }
runTask(task, callback) { if (this.freeWorkers.length === 0) { // No free threads, wait until a worker thread becomes free. this.once(kWorkerFreedEvent, () => this.runTask(task, callback)); return; }
const worker = this.freeWorkers.pop(); worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); worker.postMessage(task); }
close() { for (const worker of this.workers) worker.terminate(); }}
module.exports = WorkerPool;
复制代码

我们给 worker 创建了一个新的 kTaskInfo 属性,并且将异步的 callback 封装到 WorkerPoolTaskInfo 中,赋值给 worker.kTaskInfo.


接下来我们就可以使用 workerPool 了:


const WorkerPool = require('./worker_pool.js');const os = require('os');
const pool = new WorkerPool(os.cpus().length);
let finished = 0;for (let i = 0; i < 10; i++) { pool.runTask({ a: 42, b: 100 }, (err, result) => { console.log(i, err, result); if (++finished === 10) pool.close(); });}
复制代码

本文作者:flydean 程序那些事

本文链接:http://www.flydean.com/nodejs-worker-thread/

本文来源:flydean 的博客

欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!


发布于: 2021 年 01 月 21 日阅读数: 22
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
nodejs中使用worker_threads来创建新的线程