>  기사  >  웹 프론트엔드  >  Node.js 멀티스레딩에 대해 자세히 살펴보기(가이드)

Node.js 멀티스레딩에 대해 자세히 살펴보기(가이드)

青灯夜游
青灯夜游앞으로
2020-08-13 10:18:112076검색

Node.js 멀티스레딩에 대해 자세히 살펴보기(가이드)

많은 사람들이 단일 스레드 Node.js가 다중 스레드 백엔드와 어떻게 경쟁할 수 있는지 궁금해합니다. 알려진 단일 스레드 특성을 고려할 때 많은 대기업이 Node를 백엔드로 선택하는 것이 직관에 반하는 것처럼 보일 수 있습니다. 그 이유를 이해하려면 단일 스레딩이 실제로 무엇을 의미하는지 이해해야 합니다. [동영상 튜토리얼 추천: nodejs 동영상 튜토리얼 ]

JavaScript 디자인은 양식 유효성 검사, 무지개색 마우스 궤적 생성 등 비교적 간단한 작업을 온라인에서 수행하는 데 매우 적합합니다. 2009년 Node.js 창립자 Ryan Dahl은 개발자가 언어로 백엔드 코드를 작성할 수 있도록 했습니다.

일반적으로 멀티스레딩을 지원하는 백엔드 언어에는 스레드와 기타 스레드 지향 기능 간의 데이터를 동기화하는 다양한 메커니즘이 있습니다. JavaScript에 이러한 기능에 대한 지원을 추가하려면 전체 언어를 수정해야 하지만 이는 Dahl의 목표가 아니었습니다. 순수한 JavaScript가 멀티스레딩을 지원하도록 하기 위해 그는 해결 방법을 찾아야 했습니다. 비밀을 살펴보겠습니다...

Node.js 작동 방식

Node.js는 두 가지 유형의 스레드를 사용합니다. 하나는 이벤트 루프에서 처리하기 위한 기본 스레드이고 다른 하나는 worker 풀에 있는 여러 보조 스레드입니다.

이벤트 루프는 콜백(함수)을 가져와 등록하여 미래의 특정 시점에 실행할 준비가 된 메커니즘입니다. 연관된 JavaScript 코드와 동일한 스레드에서 실행됩니다. JavaScript 작업이 스레드를 차단하면 이벤트 루프도 차단됩니다.

작업 풀은 별도의 스레드를 생성하고 처리한 다음 작업을 동기적으로 실행하고 결과를 이벤트 루프에 반환하는 실행 모델입니다. 이벤트 루프는 반환된 결과를 사용하여 제공된 콜백을 실행합니다.

간단히 말하면 비동기 I/O 작업, 주로 시스템 디스크 및 네트워크와의 상호 작용을 담당합니다. 주로 fs(I/O 집약적) 또는 crypto(CPU 집약적)와 같은 모듈에서 사용됩니다. 작업 풀은 Node가 JavaScript와 C++ 내부적으로 통신할 때 약간의 지연이 발생하지만 거의 눈에 띄지 않습니다. fs(I/O 密集)或 crypto(CPU 密集)等模块使用。工作池用 libuv 实现,当 Node 需要在 JavaScript 和 C++ 之间进行内部通信时,会导致轻微的延迟,但这几乎不可察觉。

基于这两种机制,我们可以编写如下代码:

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

前面提到的 fs 模块告诉工作池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。然后事件循环获取提供的回调函数,并用文件的内容执行它。

以上是非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉工作池去读取文件,并用结果去调用提供的函数即可。由于工作池有自己的线程,因此事件循环可以在读取文件时继续正常执行。

在不需要同步执行某些复杂操作时,这一切都相安无事:任何运行时间太长的函数都会阻塞线程。如果应用程序中有大量这类功能,就可能会明显降低服务器的吞吐量,甚至完全冻结它。在这种情况下,无法继续将工作委派给工作池。

在需要对数据进行复杂的计算时(如AI、机器学习或大数据)无法真正有效地使用 Node.js,因为操作阻塞了主(且唯一)线程,使服务器无响应。在 Node.js v10.5.0 发布之前就是这种情况,在这一版本增加了对多线程的支持。

简介:worker_threads

worker_threads 模块允许我们创建功能齐全的多线程 Node.js 程序。

thread worker 是在单独的线程中生成的一段代码(通常从文件中取出)。

注意,术语 thread workerworkerthread 经常互换使用,他们都指的是同一件事。

要想使用 thread worker,必须导入 worker_threads 模块。让我们先写一个函数来帮助我们生成这些thread worker,然后再讨论它们的属性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

要创建一个 worker,首先必须创建一个 Worker 类的实例。它的第一个参数提供了包含 worker 的代码的文件的路径;第二个参数提供了一个名为 workerData 的包含一个属性的对象。这是我们希望线程在开始运行时可以访问的数据。

请注意:不管你是用的是 JavaScript, 还是最终要转换为 JavaScript 的语言(例如,TypeScript),路径应该始终引用带有 .js.mjs 扩展名的文件。

我还想指出为什么使用回调方法,而不是返回在触发  message  事件时将解决的 promise。这是因为 worker 可以发送许多  message

이 두 메커니즘을 기반으로 다음 코드를 작성할 수 있습니다.

worker.on('error', (error) => {});

앞서 언급한 fs 모듈은 작업자 풀에 스레드 중 하나를 사용하여 파일 내용을 읽고 이를 알리도록 지시합니다. 사이클이 완료되면 이벤트가 발생합니다. 그런 다음 이벤트 루프는 제공된 콜백 함수를 가져와 파일 내용으로 실행합니다.

위는 어떤 일이 일어날 때까지 동기적으로 기다릴 필요가 없는 비차단 코드의 예입니다. 작업자 풀에 파일을 읽고 결과와 함께 제공된 함수를 호출하라고 지시하면 됩니다. 작업자 풀에는 자체 스레드가 있으므로 파일을 읽는 동안 이벤트 루프가 정상적으로 계속 실행될 수 있습니다. 🎜🎜복잡한 작업을 동기식으로 수행할 필요가 없을 때는 괜찮습니다. 실행하는 데 너무 오래 걸리는 함수는 스레드를 차단합니다. 애플리케이션에 이러한 기능이 많이 있으면 서버 처리량이 크게 느려지거나 서버가 완전히 정지될 수도 있습니다. 이 경우 더 이상 작업자 풀에 작업을 위임할 수 없습니다. 🎜🎜Node.js는 데이터(예: AI, 기계 학습 또는 빅 데이터)에 대한 복잡한 계산을 수행해야 할 때 실제로 효과적으로 사용할 수 없습니다. 작업이 기본(유일한) 스레드를 차단하여 서버가 응답하지 않게 되기 때문입니다. 멀티스레딩 지원이 추가된 Node.js v10.5.0이 출시되기 전의 경우였습니다. 🎜🎜소개: 🎜worker_threads🎜🎜🎜 worker_threads 모듈을 사용하면 완전한 기능을 갖춘 멀티스레드 Node.js 프로그램을 만들 수 있습니다. 🎜🎜스레드 워커는 별도의 스레드에서 생성된 코드 조각(보통 파일에서 가져옴)입니다. 🎜🎜 🎜스레드 워커🎜, 🎜worker🎜 및 🎜thread🎜라는 용어는 종종 같은 의미로 사용되며 모두 동일한 것을 나타냅니다. 🎜🎜스레드 작업자를 사용하려면 worker_threads 모듈을 가져와야 합니다. 먼저 이러한 스레드 작업자를 생성하는 데 도움이 되는 함수를 작성한 다음 해당 속성에 대해 논의해 보겠습니다. 🎜
worker.on('exit', (exitCode) => {});
🎜워커를 만들려면 먼저 Worker 클래스의 인스턴스를 만들어야 합니다. 첫 번째 매개변수는 작업자의 코드가 포함된 파일에 대한 경로를 제공하고, 두 번째 매개변수는 속성이 포함된 workerData라는 개체를 제공합니다. 이는 스레드가 실행을 시작할 때 액세스할 수 있는 데이터입니다. 🎜🎜참고: JavaScript를 사용하는지 또는 최종적으로 JavaScript로 변환할 언어(예: TypeScript)를 사용하는지에 관계없이 경로는 항상 .js 또는 를 사용하여 경로를 참조해야 합니다. mjs 파일 확장자. 🎜🎜message 이벤트가 실행될 때 해결될 Promise를 반환하는 대신 콜백 메서드가 사용되는 이유도 지적하고 싶습니다. 이는 작업자가 단 하나의 이벤트가 아닌 여러 개의 message 이벤트를 보낼 수 있기 때문입니다. 🎜🎜위의 예에서 볼 수 있듯이 스레드 간의 통신은 이벤트 기반입니다. 즉, 지정된 이벤트를 보낸 후 작업자가 호출하는 리스너를 설정한다는 의미입니다. 🎜🎜가장 일반적인 이벤트는 다음과 같습니다. 🎜
worker.on('error', (error) => {});

只要 worker 中有未捕获的异常,就会发出 error 事件。然后终止 worker,错误可以作为提供的回调中的第一个参数。

worker.on('exit', (exitCode) => {});

在 worker 退出时会发出 exit 事件。如果在worker中调用了 process.exit(),那么 exitCode 将被提供给回调。如果 worker 以 worker.terminate() 终止,则代码为1。

worker.on('online', () => {});

只要 worker 停止解析 JavaScript 代码并开始执行,就会发出 online 事件。它不常用,但在特定情况下可以提供信息。

worker.on('message', (data) => {});

只要 worker 将数据发送到父线程,就会发出 message 事件。

现在让我们来看看如何在线程之间共享数据。

在线程之间交换数据

要将数据发送到另一个线程,可以用 port.postMessage() 方法。它的原型如下:

port.postMessage(data[, transferList])

port 对象可以是 parentPort,也可以是 MessagePort 的实例 —— 稍后会详细讲解。

数据参数

第一个参数 —— 这里被称为 data —— 是一个被复制到另一个线程的对象。它可以是复制算法所支持的任何内容。

数据由结构化克隆算法进行复制。引用自 Mozilla:

它通过递归输入对象来进行克隆,同时保持之前访问过的引用的映射,以避免无限遍历循环。

该算法不复制函数、错误、属性描述符或原型链。还需要注意的是,以这种方式复制对象与使用 JSON 不同,因为它可以包含循环引用和类型化数组,而 JSON 不能。

由于能够复制类型化数组,该算法可以在线程之间共享内存。

在线程之间共享内存

人们可能会说像 clusterchild_process 这样的模块在很久以前就开始使用线程了。这话对,也不对。

cluster 模块可以创建多个节点实例,其中一个主进程在它们之间对请求进行路由。集群能够有效地增加服务器的吞吐量;但是我们不能用 cluster 模块生成一个单独的线程。

人们倾向于用 PM2 这样的工具来集中管理他们的程序,而不是在自己的代码中手动执行,如果你有兴趣,可以研究一下如何使用 cluster 模块。

child_process 模块可以生成任何可执行文件,无论它是否是用 JavaScript 写的。它和 worker_threads 非常相似,但缺少后者的几个重要功能。

具体来说 thread workers 更轻量,并且与其父线程共享相同的进程 ID。它们还可以与父线程共享内存,这样可以避免对大的数据负载进行序列化,从而更有效地来回传递数据。

现在让我们看一下如何在线程之间共享内存。为了共享内存,必须将  ArrayBufferSharedArrayBuffer 的实例作为数据参数发送到另一个线程。

这是一个与其父线程共享内存的 worker:

import { parentPort } from 'worker_threads';

parentPort.on('message', () => {
 const numberOfElements = 100;
 const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
 const arr = new Int32Array(sharedBuffer);

 for (let i = 0; i < numberOfElements; i += 1) {
   arr[i] = Math.round(Math.random() * 30);
 }

 parentPort.postMessage({ arr });
});

首先,我们创建一个 SharedArrayBuffer,其内存需要包含100个32位整数。接下来创建一个 Int32Array 实例,它将用缓冲区来保存其结构,然后用一些随机数填充数组并将其发送到父线程。

在父线程中:

import path from &#39;path&#39;;

import { runWorker } from &#39;../run-worker&#39;;

const worker = runWorker(path.join(__dirname, &#39;worker.js&#39;), (err, { arr }) => {
 if (err) {
   return null;
 }

 arr[0] = 5;
});

worker.postMessage({});

arr [0] 的值改为5,实际上会在两个线程中修改它。

当然,通过共享内存,我们冒险在一个线程中修改一个值,同时也在另一个线程中进行了修改。但是我们在这个过程中也得到了一个好处:该值不需要进行序列化就可以另一个线程中使用,这极大地提高了效率。只需记住管理数据正确的引用,以便在完成数据处理后对其进行垃圾回收。

共享一个整数数组固然很好,但我们真正感兴趣的是共享对象 —— 这是存储信息的默认方式。不幸的是,没有 SharedObjectBuffer 或类似的东西,但我们可以自己创建一个类似的结构

transferList参数

transferList 中只能包含 ArrayBufferMessagePort。一旦它们被传送到另一个线程,就不能再次被传送了;因为内存里的内容已经被移动到了另一个线程。

目前,还不能通过 transferList(可以使用 child_process 模块)来传输网络套接字。

创建通信渠道

线程之间的通信是通过 port 进行的,port 是 MessagePort 类的实例,并启用基于事件的通信。

使用 port 在线程之间进行通信的方法有两种。第一个是默认值,这个方法比较容易。在 worker 的代码中,我们从worker_threads 模块导入一个名为 parentPort 的对象,并使用对象的 .postMessage() 方法将消息发送到父线程。

这是一个例子:

import { parentPort } from &#39;worker_threads&#39;;
const data = {
 // ...
};

parentPort.postMessage(data);

parentPort 是 Node.js 在幕后创建的 MessagePort 实例,用于与父线程进行通信。这样就可以用 parentPortworker 对象在线程之间进行通信。

线程间的第二种通信方式是创建一个 MessageChannel 并将其发送给 worker。以下代码是如何创建一个新的 MessagePort 并与我们的 worker 共享它:

import path from &#39;path&#39;;
import { Worker, MessageChannel } from &#39;worker_threads&#39;;

const worker = new Worker(path.join(__dirname, &#39;worker.js&#39;));

const { port1, port2 } = new MessageChannel();

port1.on(&#39;message&#39;, (message) => {
 console.log(&#39;message from worker:&#39;, message);
});

worker.postMessage({ port: port2 }, [port2]);

在创建 port1port2 之后,我们在 port1 上设置事件监听器并将 port2 发送给 worker。我们必须将它包含在 transferList 中,以便将其传输给 worker 。

在 worker 内部:

import { parentPort, MessagePort } from &#39;worker_threads&#39;;

parentPort.on(&#39;message&#39;, (data) => {
 const { port }: { port: MessagePort } = data;

 port.postMessage(&#39;heres your message!&#39;);
});

这样,我们就能使用父线程发送的 port 了。

使用 parentPort 不一定是错误的方法,但最好用 MessageChannel 的实例创建一个新的 MessagePort,然后与生成的 worker 共享它。

请注意,在后面的例子中,为了简便起见,我用了 parentPort

使用 worker 的两种方式

可以通过两种方式使用 worker。第一种是生成一个 worker,然后执行它的代码,并将结果发送到父线程。通过这种方法,每当出现新任务时,都必须重新创建一个工作者。

第二种方法是生成一个 worker 并为 message 事件设置监听器。每次触发 message 时,它都会完成工作并将结果发送回父线程,这会使 worker 保持活动状态以供以后使用。

Node.js 文档推荐第二种方法,因为在创建 thread worker 时需要创建虚拟机并解析和执行代码,这会产生比较大的开销。所以这种方法比不断产生新 worker 的效率更高。

这种方法被称为工作池,因为我们创建了一个工作池并让它们等待,在需要时调度 message 事件来完成工作。

以下是一个产生、执行然后关闭 worker 例子:

import { parentPort } from &#39;worker_threads&#39;;

const collection = [];

for (let i = 0; i < 10; i += 1) {
 collection[i] = i;
}

parentPort.postMessage(collection);

collection 发送到父线程后,它就会退出。

下面是一个 worker 的例子,它可以在给定任务之前等待很长一段时间:

import { parentPort } from &#39;worker_threads&#39;;

parentPort.on(&#39;message&#39;, (data: any) => {
 const result = doSomething(data);

 parentPort.postMessage(result);
});

worker_threads 模块中可用的重要属性

worker_threads 模块中有一些可用的属性:

isMainThread

当不在工作线程内操作时,该属性为 true 。如果你觉得有必要,可以在 worker 文件的开头包含一个简单的 if 语句,以确保它只作为 worker 运行。

import { isMainThread } from &#39;worker_threads&#39;;

if (isMainThread) {
 throw new Error(&#39;Its not a worker&#39;);
}

workerData

产生线程时包含在 worker 的构造函数中的数据。

const worker = new Worker(path, { workerData });

在工作线程中:

import { workerData } from &#39;worker_threads&#39;;

console.log(workerData.property);

parentPort

前面提到的 MessagePort 实例,用于与父线程通信。

threadId

分配给 worker 的唯一标识符。


现在我们知道了技术细节,接下来实现一些东西并在实践中检验学到的知识。

实现 setTimeout

setTimeout 是一个无限循环,顾名思义,用来检测程序运行时间是否超时。它在循环中检查起始时间与给定毫秒数之和是否小于实际日期。

import { parentPort, workerData } from &#39;worker_threads&#39;;

const time = Date.now();

while (true) {
    if (time + workerData.time <= Date.now()) {
        parentPort.postMessage({});
        break;
    }
}

这个特定的实现产生一个线程,然后执行它的代码,最后在完成后退出。

接下来实现使用这个 worker 的代码。首先创建一个状态,用它来跟踪生成的 worker:

const timeoutState: { [key: string]: Worker } = {};

然后时负责创建 worker 并将其保存到状态的函数:

export function setTimeout(callback: (err: any) => any, time: number) {
 const id = uuidv4();

 const worker = runWorker(
   path.join(__dirname, &#39;./timeout-worker.js&#39;),
   (err) => {
     if (!timeoutState[id]) {
       return null;
     }

     timeoutState[id] = null;

     if (err) {
       return callback(err);
     }

     callback(null);
   },
   {
     time,
   },
 );

 timeoutState[id] = worker;

 return id;
}

首先,我们使用 UUID 包为 worker 创建一个唯一的标识符,然后用先前定义的函数 runWorker 来获取 worker。我们还向 worker 传入一个回调函数,一旦 worker 发送了数据就会被触发。最后,把 worker 保存在状态中并返回 id

在回调函数中,我们必须检查该 worker 是否仍然存在于该状态中,因为有可能会 cancelTimeout(),这将会把它删除。如果确实存在,就把它从状态中删除,并调用传给 setTimeout 函数的 callback

cancelTimeout 函数使用 .terminate() 方法强制 worker 退出,并从该状态中删除该这个worker:

export function cancelTimeout(id: string) {
 if (timeoutState[id]) {
   timeoutState[id].terminate();

   timeoutState[id] = undefined;

   return true;
 }

 return false;
}

如果你有兴趣,我也实现了 setInterval,代码在这里,但因为它对线程什么都没做(我们重用setTimeout的代码),所以我决定不在这里进行解释。

我已经创建了一个短小的测试代码,目的是检查这种方法与原生方法的不同之处。你可以在这里找到代码。这些是结果:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

我们可以看到 setTimeout 有一点延迟 - 大约40ms  - 这时 worker 被创建时的消耗。平均 CPU 成本也略高,但没什么难以忍受的(CPU 成本是整个过程持续时间内 CPU 使用率的平均值)。

如果我们可以重用 worker,就能够降低延迟和 CPU 使用率,这就是要实现工作池的原因。

实现工作池

如上所述,工作池是给定数量的被事先创建的 worker,他们保持空闲并监听 message 事件。一旦 message 事件被触发,他们就会开始工作并发回结果。

为了更好地描述我们将要做的事情,下面我们来创建一个由八个 thread worker 组成的工作池:

const pool = new WorkerPool(path.join(__dirname, &#39;./test-worker.js&#39;), 8);

如果你熟悉限制并发操作,那么你在这里看到的逻辑几乎相同,只是一个不同的用例。

如上面的代码片段所示,我们把指向 worker 的路径和要生成的 worker 数量传给了 WorkerPool 的构造函数。

export class WorkerPool<T, N> {
 private queue: QueueItem<T, N>[] = [];
 private workersById: { [key: number]: Worker } = {};
 private activeWorkersById: { [key: number]: boolean } = {};

 public constructor(public workerPath: string, public numberOfThreads: number) {
   this.init();
 }
}

这里还有其他一些属性,如 workersByIdactiveWorkersById,我们可以分别保存现有的 worker 和当前正在运行的 worker 的 ID。还有 queue,我们可以使用以下结构来保存对象:

type QueueCallback<N> = (err: any, result?: N) => void;

interface QueueItem<T, N> {
 callback: QueueCallback<N>;
 getData: () => T;
}

callback 只是默认的节点回调,第一个参数是错误,第二个参数是可能的结果。 getData 是传递给工作池 .run() 方法的函数(如下所述),一旦项目开始处理就会被调用。 getData 函数返回的数据将传给工作线程。

.init() 方法中,我们创建了 worker 并将它们保存在以下状态中:

private init() {
  if (this.numberOfThreads < 1) {
    return null;
  }

  for (let i = 0; i < this.numberOfThreads; i += 1) {
    const worker = new Worker(this.workerPath);

    this.workersById[i] = worker;
    this.activeWorkersById[i] = false;
  }
}

为避免无限循环,我们首先要确保线程数 > 1。然后创建有效的 worker 数,并将它们的索引保存在 workersById 状态。我们在 activeWorkersById 状态中保存了它们当前是否正在运行的信息,默认情况下该状态始终为false。

现在我们必须实现前面提到的 .run() 方法来设置一个 worker 可用的任务。

public run(getData: () => T) {
  return new Promise<N>((resolve, reject) => {
    const availableWorkerId = this.getInactiveWorkerId();

    const queueItem: QueueItem<T, N> = {
      getData,
      callback: (error, result) => {
        if (error) {
          return reject(error);
        }
return resolve(result);
      },
    };

    if (availableWorkerId === -1) {
      this.queue.push(queueItem);

      return null;
    }

    this.runWorker(availableWorkerId, queueItem);
  });
}

在 promise 函数里,我们首先通过调用 .getInactiveWorkerId() 来检查是否存在空闲的 worker 可以来处理数据:

private getInactiveWorkerId(): number {
  for (let i = 0; i < this.numberOfThreads; i += 1) {
    if (!this.activeWorkersById[i]) {
      return i;
    }
  }

  return -1;
}

接下来,我们创建一个 queueItem,在其中保存传递给 .run() 方法的 getData 函数以及回调。在回调中,我们要么 resolve 或者 reject promise,这取决于 worker 是否将错误传递给回调。

如果 availableWorkerId 的值是 -1,意味着当前没有可用的 worker,我们将 queueItem 添加到 queue。如果有可用的 worker,则调用 .runWorker() 方法来执行 worker。

.runWorker() 方法中,我们必须把当前 worker 的 activeWorkersById 设置为使用状态;为 messageerror 事件设置事件监听器(并在之后清理它们);最后将数据发送给 worker。

private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
 const worker = this.workersById[workerId];

 this.activeWorkersById[workerId] = true;

 const messageCallback = (result: N) => {
   queueItem.callback(null, result);

   cleanUp();
 };

 const errorCallback = (error: any) => {
   queueItem.callback(error);

   cleanUp();
 };

 const cleanUp = () => {
   worker.removeAllListeners(&#39;message&#39;);
   worker.removeAllListeners(&#39;error&#39;);

   this.activeWorkersById[workerId] = false;

   if (!this.queue.length) {
     return null;
   }

   this.runWorker(workerId, this.queue.shift());
 };

 worker.once(&#39;message&#39;, messageCallback);
 worker.once(&#39;error&#39;, errorCallback);

 worker.postMessage(await queueItem.getData());
}

首先,通过使用传递的 workerId,我们从 workersById 中获得 worker 引用。然后,在 activeWorkersById 中,将 [workerId] 属性设置为true,这样我们就能知道在 worker 在忙,不要运行其他任务。

接下来,分别创建 messageCallbackerrorCallback 用来在消息和错误事件上调用,然后注册所述函数来监听事件并将数据发送给 worker。

在回调中,我们调用 queueItem 的回调,然后调用 cleanUp 函数。在 cleanUp 函数中,要删除事件侦听器,因为我们会多次重用同一个 worker。如果没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。

activeWorkersById 状态中,我们将 [workerId] 属性设置为 false,并检查队列是否为空。如果不是,就从 queue 中删除第一个项目,并用另一个 queueItem 再次调用 worker。

接着创建一个在收到 message 事件中的数据后进行一些计算的 worker:

import { isMainThread, parentPort } from &#39;worker_threads&#39;;

if (isMainThread) {
 throw new Error(&#39;Its not a worker&#39;);
}

const doCalcs = (data: any) => {
 const collection = [];

 for (let i = 0; i < 1000000; i += 1) {
   collection[i] = Math.round(Math.random() * 100000);
 }

 return collection.sort((a, b) => {
   if (a > b) {
     return 1;
   }

   return -1;
 });
};

parentPort.on(&#39;message&#39;, (data: any) => {
 const result = doCalcs(data);

 parentPort.postMessage(result);
});

worker 创建了一个包含 100 万个随机数的数组,然后对它们进行排序。只要能够多花费一些时间才能完成,做些什么事情并不重要。

以下是工作池简单用法的示例:

const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, &#39;./test-worker.js&#39;), 8);

const items = [...new Array(100)].fill(null);

Promise.all(
 items.map(async (_, i) => {
   await pool.run(() => ({ i }));

   console.log(&#39;finished&#39;, i);
 }),
).then(() => {
 console.log(&#39;finished all&#39;);
});

首先创建一个由八个 worker 组成的工作池。然后创建一个包含 100 个元素的数组,对于每个元素,我们在工作池中运行一个任务。开始运行后将立即执行八个任务,其余任务被放入队列并逐个执行。通过使用工作池,我们不必每次都创建一个 worker,从而大大提高了效率。

结论

worker_threads 提供了一种为程序添加多线程支持的简单的方法。通过将繁重的 CPU 计算委托给其他线程,可以显着提高服务器的吞吐量。通过官方线程支持,我们可以期待更多来自AI、机器学习和大数据等领域的开发人员和工程师使用 Node.js.

英文原文地址:https://blog.logrocket.com/a-complete-guide-to-threads-in-node-js-4fa3898fe74f

相关推荐:nodejs 教程

위 내용은 Node.js 멀티스레딩에 대해 자세히 살펴보기(가이드)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제