>웹 프론트엔드 >JS 튜토리얼 >Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

青灯夜游
青灯夜游앞으로
2022-10-14 20:05:412175검색

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

I. 前言

本文论点主要面向 Node.js 开发语言

>> Show Me Code,目前代码正在 dev 分支,已完成单元测试,尚待测试所有场景。

>> 建议通读 Node.js 官方文档 -【不要阻塞事件循环】

Node.js 即服务端 Javascript,得益于宿主环境的不同,它拥有比在浏览器上更多的能力。比如:完整的文件系统访问权限、网络协议、套接字编程、进程和线程操作、C++ 插件源码级的支持、Buffer 二进制、Crypto 加密套件的天然支持。【相关教程推荐:nodejs视频教程

Node.js 的是一门单线程的语言,它基于 V8 引擎开发,v8 在设计之初是在浏览器端对 JavaScript 语言的解析运行引擎,其最大的特点是单线程,这样的设计避免了一些多线程状态同步问题,使得其更轻量化易上手。

一、名词定义

1. 进程

学术上说,进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。我们这里将进程比喻为工厂的车间,它代表 CPU 所能处理的单个任务。任一时刻,CPU 总是运行一个进程,其他进程处于非运行状态。

进程具有以下特性:

  • 进程是拥有资源的基本单位,资源分配给进程,同一进程的所有线程共享该进程的所有资源;
  • 进程之间可以并发执行;
  • 在创建或撤消进程时,系统都要为之分配和回收资源,与线程相比系统开销较大;
  • 一个进程可以有多个线程,但至少有一个线程;

2. 线程

在早期的操作系统中并没有线程的概念,进程是能拥有资源和独立运行的最小单位,也是程序执行的最小单位。任务调度采用的是时间片轮转的抢占式调度方式,而进程是任务调度的最小单位,每个进程有各自独立的一块内存,使得各个进程之间内存地址相互隔离。

后来,随着计算机的发展,对 CPU 的要求越来越高,进程之间的切换开销较大,已经无法满足越来越复杂的程序的要求了。于是就发明了线程,线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元。这里把线程比喻一个车间的工人,即一个车间可以允许由多个工人协同完成一个任务,即一个进程中可能包含多个线程。

线程具有以下特性:

  • 线程作为调度和分配的基本单位;
  • 多个线程之间也可并发执行;
  • 线程是真正用来执行程序的,执行计算的;
  • 线程不拥有系统资源,但可以访问隶属于进程的资源,一个线程只能属于一个进程;

Node.js 的多进程有助于充分利用 CPU 等资源,Node.js 的多线程提升了单进程上任务的并行处理能力。

在 Node.js 中,每个 worker 线程都有他自己的 V8 实例和事件循环机制 (Event Loop)。但是,和进程不同,workers 之间是可以共享内存的。

二、Node.js 异步机制

1. Node.js 内部线程池、异步机制以及宏任务优先级划分

Node.js 的单线程是指程序的主要执行线程是单线程,这个主线程同时也负责事件循环。而其实语言内部也会创建线程池来处理主线程程序的 网络 IO / 文件 IO / 定时器 等调用产生的异步任务。一个例子就是定时器 Timer 的实现:在 Node.js 中使用定时器时,Node.js 会开启一个定时器线程进行计时,计时结束时,定时器回调函数会被放入位于主线程的宏任务队列。当事件循环系统执行完主线程同步代码和当前阶段的所有微任务时,该回调任务最后再被取出执行。所以 Node.js 的定时器其实是不准确的,只能保证在预计时间时我们的回调任务被放入队列等待执行,而不是直接被执行。

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

Node.js의 이벤트 루프 이벤트 루프 시스템과 결합된 멀티 스레딩 메커니즘을 통해 개발자는 타이머, IO 및 네트워크 요청을 포함하여 하나의 스레드에서 비동기 메커니즘을 사용할 수 있습니다. 그러나 응답성이 뛰어나고 성능이 뛰어난 서버를 달성하기 위해 Node.js의 이벤트 루프는 매크로 작업을 더욱 우선시합니다.

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

Node.js 매크로 작업 간의 우선순위: 타이머 > 보류 중 > 확인 >

  • 타이머 콜백: 시간이 지나면 실행이 빠를수록 정확하므로 우선순위가 가장 높으며 이해하기 쉽습니다.
  • 보류 중인 콜백: 네트워크, IO 및 기타 예외를 처리할 때의 콜백입니다. 일부 유닉스 시스템은 오류가 보고될 때까지 기다리므로 처리해야 합니다.
  • Poll 콜백: IO 데이터 및 네트워크 연결을 처리하는 작업은 서버에서 주로 처리합니다.
  • Check Callback: setImmediate 실행을 위한 콜백입니다. IO 실행 직후에 콜백할 수 있는 것이 특징입니다.
  • 닫기 콜백: 지연된 실행은 리소스에 영향을 주지 않으며 우선순위가 가장 낮습니다.

Node.js 최적화 및 마이크로 작업 구분: process.nextTick >

2. Node.js의 매크로 작업과 마이크로 작업의 실행 타이밍

노드 11 이전에는 Node.js의 이벤트 루프는 한 번에 하나의 매크로 작업을 실행한 다음 모든 마이크로 작업을 실행하는 브라우저와 같지 않았습니다. 특정 개수의 타이머 매크로태스크를 실행한 후 모든 마이크로태스크를 실행하고 특정 개수의 보류 중인 매크로태스크를 실행한 다음 나머지 마이크로태스크도 마찬가지입니다. 노드 11 이후에는 모든 마이크로 태스크를 실행하기 위해 각 매크로 태스크로 변경되었습니다.

Node.js의 매크로 작업에도 우선 순위가 있습니다. Node.js의 이벤트 루프가 다음 우선 순위의 매크로 작업을 실행하기 전에 현재 우선 순위의 매크로 작업을 모두 실행하면 "배고프다"가 발생하게 됩니다. 상태. 특정 단계에 매크로 작업이 너무 많으면 다음 단계는 실행되지 않습니다. 따라서 각 유형의 매크로 작업에는 실행 횟수를 제한하는 메커니즘이 있으며 나머지는 계속 실행되도록 후속 이벤트 루프로 넘겨집니다. .

최종 성능은 다음과 같습니다. 즉, 특정 수의 타이머 매크로 작업을 실행하고, 각 매크로 작업 사이에 모든 마이크로 작업을 실행한 다음, 특정 수의 대기 중인 콜백 매크로 작업을 실행하고, 각 매크로 작업 사이에 모든 마이크로 작업을 실행합니다.

3. Node.js의 다중 프로세스

1. child_process 메소드를 사용하여 수동으로 프로세스 생성

Node.js 프로그램은 child_process 모듈을 통해 하위 프로세스를 생성하는 기능을 제공합니다. 여러 자식 프로세스 생성 방법:

  • spawn 새로운 프로세스를 생성하고, 실행 결과는 이벤트를 통해서만 얻을 수 있어 운영이 번거롭습니다.
const spawn = require('child_process').spawn;
const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.log(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});
  • execFile 새 프로세스를 생성하고 그 뒤에 있는 파일 이름에 따라 실행 파일을 실행하고 옵션을 가져오고 호출 결과를 콜백 형식으로 반환할 수 있습니다. 훨씬 더 편리해졌습니다.
execFile('/path/to/node', ['--version'], function(error, stdout, stderr){
    if(error){
        throw error;
    }
    console.log(stdout);
});
  • exec 쉘 명령을 직접 실행할 수 있는 새로운 프로세스를 생성하여 쉘 명령 실행을 단순화하고 실행 결과를 콜백 형식으로 반환합니다.
exec('ls -al', function(error, stdout, stderr){
    if(error) {
        console.error('error:' + error);
        return;
    }
    console.log('stdout:' + stdout);
    console.log('stderr:' + typeof stderr);
});
  • fork 새로운 프로세스를 생성하고 노드 프로그램을 실행합니다. 프로세스에는 완전한 V8 인스턴스가 생성되고 나면 기본 프로세스에서 하위 프로세스로의 IPC 통신이 자동으로 활성화되어 가장 많은 리소스를 소비합니다.
var child = child_process.fork('./anotherSilentChild.js', {
    silent: true
});

child.stdout.setEncoding('utf8');
child.stdout.on('data', function(data){
    console.log(data);
});

그 중spawn은 모든 메소드의 기본이 되며, exec의 최하위 레이어에서는 execFile을 호출합니다.

2. 클러스터 방식을 사용하여 반자동으로 프로세스 생성

다음은 Cluster 모듈을 사용하여 http 서비스 클러스터를 생성하는 간단한 예입니다. 예제에서는 클러스터를 생성할 때 동일한 Js 실행 파일이 사용됩니다. 파일에 cluster.isPrimary가 사용되어 현재 실행 환경이 기본 프로세스에 있는지 아니면 하위 프로세스에 있는지 확인됩니다. 메인 프로세스인 경우 현재 실행 파일을 사용하여 하위 프로세스를 생성합니다. 인스턴스인 경우 하위 프로세스의 비즈니스 처리 흐름으로 들어갑니다. Cluster 模块创建一个 http 服务集群的简单示例。示例中创建 Cluster 时使用同一个 Js 执行文件,在文件内使用 cluster.isPrimary 判断当前执行环境是在主进程还是子进程,如果是主进程则使用当前执行文件创建子进程实例,如果时子进程则进入子进程的业务处理流程。

/*
  简单示例:使用同一个 JS 执行文件创建子进程集群 Cluster
*/
const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);
  // Fork workers.
  for (let i = 0; i  {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
  console.log(`Worker ${process.pid} started`);
}

Cluster 模块允许设立一个主进程和若干个子进程,使用 child_process.fork() 在内部隐式创建子进程,由主进程监控和协调子进程的运行。

子进程之间采用进程间通信交换消息,Cluster 模块内置一个负载均衡器,采用 Round-robin 算法(轮流执行)协调各个子进程之间的负载。运行时,所有新建立的连接都由主进程完成,然后主进程再把 TCP 连接分配给指定的子进程。

使用集群创建的子进程可以使用同一个端口,Node.js 内部对 http/net

$: npm install -g pm2
Cluster 모듈을 사용하면 기본 프로세스와 여러 하위 프로세스를 설정할 수 있습니다. child_process.fork()를 사용하여 내부적으로 하위 프로세스를 생성하고 기본 프로세스를 생성하세요. 프로세스의 운영을 모니터링하고 조정합니다. 🎜🎜프로세스 간 통신은 하위 프로세스 간에 메시지를 교환하는 데 사용됩니다. 클러스터 모듈에는 로드 밸런서가 내장되어 있으며 라운드 로빈 알고리즘(순서대로 실행)을 사용하여 하위 프로세스 간의 로드를 조정합니다. 실행 시 새로 설정된 모든 연결은 기본 프로세스에 의해 완료되고, 기본 프로세스는 지정된 하위 프로세스에 TCP 연결을 할당합니다. 🎜🎜클러스터를 사용하여 생성된 하위 프로세스는 동일한 포트를 사용할 수 있습니다. Node.js는 http/net 내장 모듈을 특별하게 지원합니다. Node.js 메인 프로세스는 대상 포트를 청취하는 역할을 담당하며, 요청을 받은 후 로드 밸런싱 정책에 따라 요청을 하위 프로세스에 분배합니다. 🎜

3. 使用基于 Cluster 封装的 PM2 工具全自动创建进程

PM2 是常用的 node 进程管理工具,它可以提供 node.js 应用管理能力,如自动重载、性能监控、负载均衡等。

其主要用于 独立应用 的进程化管理,在 Node.js 单机服务部署方面比较适合。可以用于生产环境下启动同个应用的多个实例提高 CPU 利用率、抗风险、热加载等能力。

由于是外部库,需要使用 npm 包管理器安装:

$: npm install -g pm2

pm2 支持直接运行 server.js 启动项目,如下:

$: pm2 start server.js

即可启动 Node.js 应用,成功后会看到打印的信息:

┌──────────┬────┬─────────┬──────┬───────┬────────┬─────────┬────────┬─────┬───────────┬───────┬──────────┐
│ App name │ id │ version │ mode │ pid   │ status │ restart │ uptime │ cpu │ mem       │ user  │ watching │
├──────────┼────┼─────────┼──────┼───────┼────────┼─────────┼────────┼─────┼───────────┼───────┼──────────┤
│ server   │ 0  │ 1.0.0   │ fork │ 24776 │ online │ 9       │ 19m    │ 0%  │ 35.4 MB   │ 23101 │ disabled │
└──────────┴────┴─────────┴──────┴───────┴────────┴─────────┴────────┴─────┴───────────┴───────┴──────────┘

pm2 也支持配置文件启动,通过配置文件 ecosystem.config.js 可以定制 pm2 的各项参数:

module.exports = {
  apps : [{
    name: 'API', // 应用名
    script: 'app.js', // 启动脚本
    args: 'one two', // 命令行参数
    instances: 1, // 启动实例数量
    autorestart: true, // 自动重启
    watch: false, // 文件更改监听器
    max_memory_restart: '1G', // 最大内存使用亮
    env: { // development 默认环境变量
      // pm2 start ecosystem.config.js --watch --env development
      NODE_ENV: 'development'
    },
    env_production: { // production 自定义环境变量
      NODE_ENV: 'production'
    }
  }],

  deploy : {
    production : {
      user : 'node',
      host : '212.83.163.1',
      ref  : 'origin/master',
      repo : 'git@github.com:repo.git',
      path : '/var/www/production',
      'post-deploy' : 'npm install && pm2 reload ecosystem.config.js --env production'
    }
  }
};

pm2 logs 日志功能也十分强大:

$: pm2 logs

II. Node.js 中进程池和线程池的适用场景

一般我们使用计算机执行的任务包含以下几种类型的任务:

  • 计算密集型任务:任务包含大量计算,CPU 占用率高。

    const matrix = {};
    for (let i = 0; i 
  • IO 密集型任务:任务包含频繁的、持续的网络 IO 和磁盘 IO 的调用。

    const {copyFileSync, constants} = require('fs');
    copyFileSync('big-file.zip', 'destination.zip');
  • 混合型任务:既有计算也有 IO。

一、进程池的适用场景

使用进程池的最大意义在于充分利用多核 CPU 资源,同时减少子进程创建和销毁的资源消耗

进程是操作系统分配资源的基本单位,使用多进程架构能够更多的获取 CPU 时间、内存等资源。为了应对 CPU-Sensitive 场景,以及充分发挥 CPU 多核性能,Node 提供了 child_process 模块用于创建子进程。

子进程的创建和销毁需要较大的资源成本,因此池化子进程的创建和销毁过程,利用进程池来管理所有子进程。

除了这一点,Node.js 中子进程也是唯一的执行二进制文件的方式,Node.js 可通过流 (stdin/stdout/stderr) 或 IPC 和子进程通信。

通过 Stream 通信

const {spawn} = require('child_process');
const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.error(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});

通过 IPC 通信

const cp = require('child_process');
const n = cp.fork(`${__dirname}/sub.js`);

n.on('message', (m) => {
  console.log('PARENT got message:', m);
});

n.send({hello: 'world'});

二、线程池的适用场景

使用线程池的最大意义在于多任务并行,为主线程降压,同时减少线程创建和销毁的资源消耗。单个 CPU 密集性的计算任务使用线程执行并不会更快,甚至线程的创建、销毁、上下文切换、线程通信、数据序列化等操作还会额外增加资源消耗。

但是如果一个计算机程序中有很多同一类型的阻塞任务需要执行,那么将他们交给线程池可以成倍的减少任务总的执行时间,因为在同一时刻多个线程在并行进行计算。如果多个任务只使用主线程执行,那么最终消耗的时间是线性叠加的,同时主线程阻塞之后也会影响其它任务的处理。

特别是对 Node.js 这种单主线程的语言来讲,主线程如果消耗了过多的时间来执行这些耗时任务,那么对整个 Node.js 单个进程实例的性能影响将是致命的。这些占用着 CPU 时间的操作将导致其它任务获取的 CPU 时间不足或 CPU 响应不够及时,被影响的任务将进入 “饥饿” 状态。

因此 Node.js 启动后主线程应尽量承担调度的角色,批量重型 CPU 占用任务的执行应交由额外的工作线程处理,主线程最后拿到工作线程的执行结果再返回给任务调用方。另一方面由于 IO 操作 Node.js 内部作了优化和支持,因此 IO 操作应该直接交给主线程,主线程再使用内部线程池处理。

Node.js 的异步能不能解决过多占用 CPU 任务的执行问题?

答案是:不能,过多的异步 CPU 占用任务会阻塞事件循环。

Node.js 的异步在 网络 IO / 磁盘 IO 处理上很有用,宏任务微任务系统 + 内部线程调用能分担主进程的执行压力。但是如果单独将 CPU 占用任务放入宏任务队列或微任务队列,对任务的执行速度提升没有任何帮助,只是一种任务调度方式的优化而已。

我们只是延迟了任务的执行或是将巨大任务分散成多个再分批执行,但是任务最终还是要在主线程被执行。如果这类任务过多,那么任务分片和延迟的效果将完全消失,一个任务可以,那十个一百个呢?量变将会引起质变。

以下是 Node.js 官方博客中的原文:

“如果你需要做更复杂的任务,拆分可能也不是一个好选项。这是因为拆分之后任务仍然在事件循环线程中执行,并且你无法利用机器的多核硬件能力。 请记住,事件循环线程只负责协调客户端的请求,而不是独自执行完所有任务。 对一个复杂的任务,最好把它从事件循环线程转移到工作线程池上。”

  • 场景:间歇性让主进程 瘫痪

每一秒钟,主线程有一半时间被占用

// this task costs 100ms
function doHeavyTask() { ...}

setInterval(() => {
  doHeavyTask(); // 100ms
  doHeavyTask(); // 200ms
  doHeavyTask(); // 300ms
  doHeavyTask(); // 400ms
  doHeavyTask(); // 500ms
}, 1e3);
  • 场景:高频性让主进程 半瘫痪

每 200ms,主线程有一半时间被占用

// this task costs 100ms
function doHeavyTask() { ...}

setInterval(() => {
  doHeavyTask();
}, 1e3);

setInterval(() => {
  doHeavyTask();
}, 1.2e3);

setInterval(() => {
  doHeavyTask();
}, 1.4e3);

setInterval(() => {
  doHeavyTask();
}, 1.6e3);

setInterval(() => {
  doHeavyTask();
}, 1.8e3);

以下是官方博客的原文摘录:

“因此,你应该保证永远不要阻塞事件轮询线程。换句话说,每个 JavaScript 回调应该快速完成。这些当然对于 await,Promise.then 也同样适用。”

III. 进程池

进程池是对进程的创建、执行任务、销毁等流程进行管控的一个应用或是一套程序逻辑。之所以称之为池是因为其内部包含多个进程实例,进程实例随时都在进程池内进行着状态流转,多个创建的实例可以被重复利用,而不是每次执行完一系列任务后就被销毁。因此,进程池的部分存在目的是为了减少进程创建的资源消耗。

此外进程池最重要的一个作用就是负责将任务分发给各个进程执行,各个进程的任务执行优先级取决于进程池上的负载均衡运算,由算法决定应该将当前任务派发给哪个进程,以达到最高的 CPU 和内存利用率。常见的负载均衡算法有:

  • POLLING - 轮询:子进程轮流处理请求
  • WEIGHTS - 权重:子进程根据设置的权重来处理请求
  • RANDOM - 随机:子进程随机处理请求
  • SPECIFY - 指定:子进程根据指定的进程 id 处理请求
  • WEIGHTS_POLLING - 权重轮询:权重轮询策略与轮询策略类似,但是权重轮询策略会根据权重来计算子进程的轮询次数,从而稳定每个子进程的平均处理请求数量。
  • WEIGHTS_RANDOM - 权重随机:权重随机策略与随机策略类似,但是权重随机策略会根据权重来计算子进程的随机次数,从而稳定每个子进程的平均处理请求数量。
  • MINIMUM_CONNECTION - 最小连接数:选择子进程上具有最小连接活动数量的子进程处理请求。
  • WEIGHTS_MINIMUM_CONNECTION - 权重最小连接数:权重最小连接数策略与最小连接数策略类似,不过各个子进程被选中的概率由连接数和权重共同决定。

一、要点

「 对单一任务的控制不重要,对单个进程宏观的资源占用更需关注 」

二、流程设计

进程池架构图参考之前的进程管理工具开发相关 文章,本文只需关注进程池部分。

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

1. 关键流程

  • 进程池创建进程时会初始化进程实例内的 ProcessHost 事务对象,进程实例向事务对象注册多种任务监听器。
  • 用户向进程池发起单个任务调用请求,可传入进程绑定的 ID 和指定的任务名。
  • 判断用户是否传入 ID 参数指定使用某个进程执行任务,如果未指定 ID:
    • 进程池判断当前进程池进程数量是否已超过最大值,如果未超过则创建新进程,用此进程处理当前任务,并将进程放入进程池。
    • 如果进程池进程数量已达最大值,则根据负载均衡算法选择一个进程处理当前任务。
  • 指定 ID 时:
    • 通过用户传入的 ID 参数找到对应进程,将任务分发给此进程执行。
    • 如果未找到 ID 所对应的进程,则向用户抛出异常。
  • 任务由进程池派发给目标进程后,ProcessHost 事务对象会根据该任务的任务名触发子进程内的监听器。
  • 子进程内的监听器函数可执行同步任务和异步任务,异步任务返回 Promise 对象,同步任务返回值。
  • ProcessHost 事务对象的监听器函数执行完毕后,会将任务结果返回给进程池,进程池再将结果通过异步回调函数返回给用户。
  • 用户也可向进程池所有子进程发起个任务调用请求,最终将会通过 Promise 的返回所有子进程的任务执行结果。

2. 名词解释

  • ProcessHost 事务中心:运行在子进程中,用于事件触发以及和主进程通信。开发者在子进程执行文件中向其注册多个具有特定任务名的任务事件,主进程会向某个子进程发送任务请求,并由事务中心调用指定的事件监听器处理请求。
  • LoadBalancer 负载均衡器:用于选择一个进程处理任务,可根据不同的负载均衡算法实现不同的选择策略。
  • LifeCycle: 设计之初用于管控子进程的智能启停,某个进程在长时间未被使用时进入休眠状态,当有新任务到来时再唤醒进程。目前还有些难点需要解决,比如进程的唤醒和休眠不好实现,进程的使用情况不好统计,该功能暂时不可用。

三、进程池使用方式

更多示例见:进程池 mocha 单元测试

1. 创建进程池

main.js

const { ChildProcessPool, LoadBalancer } = require('electron-re');

const processPool = new ChildProcessPool({
  path: path.join(__dirname, 'child_process/child.js'),
  max: 4,
  strategy: LoadBalancer.ALGORITHM.POLLING,
);

child.js

const { ProcessHost } = require('electron-re');

ProcessHost
  .registry('test1', (params) => {
    console.log('test1');
    return 1 + 1;
  })
  .registry('test2', (params) => {
    console.log('test2');
    return new Promise((resolve) => resolve(true));
  });

2. 向一个子进程发送任务请求

processPool.send('test1', { value: "test1"}).then((result) => {
  console.log(result);
});

3. 向所有子进程发送任务请求

processPool.sendToAll('test1', { value: "test1"}).then((results) => {
  console.log(results);
});

四、进程池实际使用场景

1. Electron 网页代理工具中多进程的应用

1)基本代理原理:

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

2)单进程下客户端执行原理:

  • 通过用户预先保存的服务器配置信息,使用 node.js 子进程来启动 ss-local 可执行文件建立和 ss 服务器的连接来代理用户本地电脑的流量,每个子进程占用一个 socket 端口。
  • 其它支持 socks5 代理的 proxy 工具比如:浏览器上的 SwitchOmega 插件会和这个端口的 tcp 服务建立连接,将 tcp 流量加密后通过代理服务器转发给我们需要访问的目标服务器。

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

3)多进程下客户端执行原理:

以上描述的是客户端连接单个节点的工作模式,节点订阅组中的负载均衡模式需要同时启动多个子进程,每个子进程启动 ss-local 执行文件占用一个本地端口并连接到远端一个服务器节点。

每个子进程启动时选择的端口是会变化的,因为某些端口可能已经被系统占用,程序需要先选择未被使用的端口。并且浏览器 proxy 工具也不可能同时连接到我们本地启动的子进程上的多个 ss-local 服务上。因此需要一个占用固定端口的中间节点接收 proxy 工具发出的连接请求,然后按照某种分发规则将 tcp 流量转发到各个子进程的 ss-local 服务的端口上。

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

2. 다중 프로세스 파일 분할 업로드 Electron 클라이언트

이전에 Node.js 측의 업로드 작업 관리, IO 작업 등을 지원하는 클라이언트를 만들었습니다. 여러 프로세스를 사용하여 구현했지만 gitlab의 실험 분기에서 직접 만들었습니다(이스케이프).

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

IV.스레드 풀

CPU 집약적 작업 계산의 시스템 오버헤드를 줄이기 위해 Node.js는 v10.5.0에서 처음 출시된 작업자 스레드 Worker_threads라는 새로운 기능을 도입했습니다. 실험적인 기능으로 나타납니다. 작업자 스레드를 통해 프로세스 내에서 여러 스레드를 생성할 수 있으며, 기본 스레드와 작업자 스레드는 parentPort를 사용하여 통신하고 작업자 스레드는 MessageChannel을 통해 직접 통신할 수 있습니다. 개발자가 스레드를 사용하는 데 중요한 기능으로, v12.11.0 안정 버전의 프로덕션 환경에서 Worker_threads를 정상적으로 사용할 수 있습니다. v10.5.0 作为实验性功能出现。通过 worker_threads 可以在进程内创建多个线程,主线程与 worker 线程使用 parentPort 通信,worker 线程之间可通过 MessageChannel 直接通信。worker_threads 做为开发者使用线程的重要特性,在 v12.11.0 稳定版已经能正常在生产环境使用了。

但是线程的创建需要额外的 CPU 和内存资源,如果要多次使用一个线程的话,应该将其保存起来,当该线程完全不使用时需要及时关闭以减少内存占用。想象我们在需要使用线程时直接创建,使用完后立刻销毁,可能线程自身的创建和销毁成本已经超过了使用线程本身节省下的资源成本。Node.js 内部虽然有使用线程池,但是对于开发者而言是完全透明不可见的,因此封装一个能够维护线程生命周期的线程池工具的重要性就体现了。

为了强化多异步任务的调度,线程池除了提供维护线程的能力,也提供维护任务队列的能力。当发送请求给线程池让其执行一个异步任务时,如果线程池内没有空闲线程,那该任务就会被直接丢弃了,显然这不是想要的效果。

因此可以考虑为线程池添加一个任务队列的调度逻辑:当线程池没有空闲线程时,将该任务放入待执行任务队列 (FIFO),线程池在某个时机取出任务交由某个空闲线程执行,执行完成后触发异步回调函数,将执行结果返回给请求调用方。但是线程池的任务队列内的任务数量应该考虑限制到一个特殊值,防止线程池负载过大影响 Node.js 应用整体运行性能。

一、要点

「 对单一任务的控制重要,对单个线程的资源占用无需关注 」

二、详细设计

Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.

任务流转过程

  • 调用者可通过 StaticPool/StaticExcutor/DynamicPool/DynamicExcutor

    그러나 스레드를 생성하려면 추가 CPU 및 메모리 리소스가 필요합니다. 스레드를 여러 번 사용할 경우 스레드를 전혀 사용하지 않을 때는 메모리 사용량을 줄이기 위해 제때에 닫아야 합니다. . 쓰레드를 사용해야 할 때 직접 생성하고, 사용 후 즉시 소멸시킨다고 상상해 보세요. 아마도 쓰레드 자체를 생성하고 소멸시키는 데 드는 비용이 쓰레드 자체를 사용하여 절약하는 리소스 비용을 초과했을 수도 있습니다. Node.js는 내부적으로 스레드 풀을 사용하지만 완전히 투명하고 개발자에게 보이지 않기 때문에 스레드 수명주기를 유지할 수 있는 스레드 풀 도구를 캡슐화하는 것이 중요합니다.
  • 여러 비동기 작업의 예약을 향상시키기 위해 스레드 풀은 스레드를 유지 관리하는 기능뿐만 아니라 작업 대기열을 유지 관리하는 기능도 제공합니다. 비동기 작업을 수행하기 위해 스레드 풀에 요청을 보낼 때 스레드 풀에 유휴 스레드가 없으면 작업이 직접 삭제됩니다. 이는 분명히 원하는 효과가 아닙니다.

    따라서 스레드 풀에 작업 대기열 예약 논리를 추가하는 것을 고려할 수 있습니다. 스레드 풀에 유휴 스레드가 없으면 해당 작업을 실행될 작업 대기열(FIFO)에 넣으면 스레드 풀이 해당 스레드를 제거합니다. 실행이 완료된 후 비동기 콜백 함수가 트리거되고 실행 결과가 요청 호출자에게 반환됩니다. 그러나 스레드 풀의 작업 큐에 있는 작업 수는 스레드 풀의 과도한 로드가 Node.js 애플리케이션의 전체 성능에 영향을 미치는 것을 방지하기 위해 특별한 값으로 제한되어야 합니다.
  • 1. 요점

    「 단일 작업의 제어가 중요합니다. 스레드의 리소스 사용량에 신경 쓸 필요가 없습니다."
  • 2. 세부 디자인

    Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.
  • 작업 흐름 프로세스

    • 호출자는 StaticPool/StaticExcutor/DynamicPool/을 전달할 수 있습니다. DynamicExcutor 인스턴스를 스레드로 풀은 작업을 전달합니다(주요 용어는 아래에 설명되어 있습니다). 다양한 인스턴스 간의 가장 큰 차이점은 매개변수 동적 기능입니다.

    • 작업은 스레드 풀에 의해 내부적으로 생성됩니다. 작업은 한편으로는 사용자가 전달한 작업 계산 매개변수를 전달하는 주요 전송 매체로 사용됩니다. 작업 상태, 시작 시간, 종료 시간, 작업 ID, 작업 재시도 횟수, 작업이 재시도를 지원하는지 여부, 작업 유형 등과 같은 작업 전송 프로세스 중 상태 변경 사항을 기록합니다.

    • 작업이 생성된 후 먼저 현재 스레드 풀의 스레드 수가 상한에 도달했는지 확인합니다. 상한에 도달하지 않은 경우 새 스레드를 생성하여 스레드 저장 영역에 넣습니다. 그런 다음 이 스레드를 사용하여 현재 작업을 직접 실행합니다.

      🎜🎜스레드 풀의 스레드 수가 제한을 초과하는 경우 작업을 실행하지 않은 유휴 스레드가 있는지 확인하고 유휴 스레드를 가져온 후 이 스레드를 사용하여 현재 작업을 직접 실행합니다. 🎜🎜🎜🎜유휴 스레드가 없으면 현재 대기 중인 작업 큐가 가득 찼는지 여부를 판단합니다. 작업 큐가 가득 찬 경우 오류가 발생하여 호출자가 작업이 성공적으로 실행되지 않았음을 감지할 수 있습니다. 처음으로. 🎜🎜🎜🎜작업 대기열이 가득 차지 않은 경우 작업을 작업 대기열에 넣고 작업 주기 시스템이 해당 작업을 꺼내서 실행할 때까지 기다립니다. 🎜🎜🎜🎜위의 4/5/6 단계의 세 가지 경우에 대해 작업이 실행된 후 작업이 성공적으로 실행되었는지 여부를 판단하여 성공 콜백 함수가 트리거되고 Promise 상태가 채워집니다. 실패할 경우 재시도가 지원되는지 확인합니다. 재시도가 지원되면 작업이 +1 재시도된 다음 작업 대기열의 끝에 다시 배치됩니다. 작업이 재시도를 지원하지 않으면 직접 실패하고 실패한 비동기 콜백 함수가 트리거되며 약속 상태가 거부됩니다. 🎜🎜🎜🎜전체 스레드 풀 수명 주기 동안 특정 주기로 작업 큐의 선두에서 작업을 획득하고 스레드 저장 영역에서 유휴 스레드를 획득한 후 이 스레드를 사용하여 작업을 수행하는 작업 주기 시스템이 있습니다. 작업을 실행합니다. 이 프로세스는 섹션 7 단계 설명도 준수합니다. 🎜
    • 任务循环系统除了取任务执行,如果线程池设置了任务超时时间的话,也会判断正在执行中的任务是否超时,超时后会终止该线程的所有运行中的代码。

    模块说明

    • StaticPool
      • 定义:静态线程池,可使用固定的 execFunction/execString/execFile 执行参数来启动工作线程,执行参数在进程池创建后不能更改。
      • 进程池创建之后除了执行参数不可变外,其它参数比如:任务超时时间、任务重试次数、线程池任务轮询间隔时间、最大任务数、最大线程数、是否懒创建线程等都可以通过 API 随时更改。
    • StaticExcutor
      • 定义:静态线程池的执行器实例,继承所属线程池的固定执行参数 execFunction/execString/execFile 且不可更改。
      • 执行器实例创建之后除了执行参数不可变外,其它参数比如:任务超时时间、任务重试次数、transferList 等都可以通过 API 随时更改。
      • 静态线程池的各个执行器实例的参数设置互不影响,参数默认继承于所属线程池,参数在执行器上更改后具有比所属线程池同名参数更高的优先级。
    • DynamicPool
      • 定义:动态线程池,无需使用 execFunction/execString/execFile 执行参数即可创建线程池。执行参数在调用 exec() 方法时动态传入,因此执行参数可能不固定。
      • 线程池创建之后执行参数默认为 null,其它参数比如:任务超时时间、任务重试次数、transferList 等都可以通过 API 随时更改。
    • DynamicExcutor
      • 定义:动态线程池的执行器实例,继承所属线程池的其它参数,执行参数为 null
      • 执行器实例创建之后,其它参数比如:任务超时时间、任务重试次数、transferList 等都可以通过 API 随时更改。
      • 动态线程池的各个执行器实例的参数设置互不影响,参数默认继承于所属线程池,参数在执行器上更改后具有比所属线程池同名参数更高的优先级。
      • 动态执行器实例在执行任务之前需要先设置执行参数 execFunction/execString/execFile,执行参数可以随时改变。
    • ThreadGenerator
      • 定义:线程创建的工厂方法,会进行参数校验。
    • Thread
      • 定义:线程实例,内部简单封装了 worker_threads API。
    • TaskGenerator
      • 定义:任务创建的工厂方法,会进行参数校验。
    • Task
      • 定义:单个任务,记录了任务执行状态、任务开始结束时间、任务重试次数、任务携带参数等。
    • TaskQueue
      • 定义:任务队列,在数组中存放任务,以先入先出方式 (FIFO) 向线程池提供任务,使用 Map 来存储 taskId 和 task 之间的映射关系。
    • Task Loop
      • 任务循环,每个循环的默认时间间隔为 2S,每次循环中会处理超时任务、将新任务派发给空闲线程等。

    三、线程池使用方式

    更多示例见:线程池 mocha 单元测试

    1. 创建静态线程池

    main.js

    const { StaticThreadPool } = require(`electron-re`);
    const threadPool = new StaticThreadPool({
      execPath: path.join(__dirname, './worker_threads/worker.js'),
      lazyLoad: true, // 懒加载
      maxThreads: 24, // 最大线程数
      maxTasks: 48, // 最大任务数
      taskRetry: 1, // 任务重试次数
      taskLoopTime: 1e3, // 任务轮询时间
    });
    const executor = threadPool.createExecutor();

    worker.js

    const fibonaccis = (n) => {
      if (n  {
      return fibonaccis(value);
    }

    2. 使用静态线程池发送任务请求

    threadPool.exec(15).then((res) => {
      console.log(+res.data === 610)
    });
    
    executor
      .setTaskRetry(2) // 不影响 pool 的全局设置
      .setTaskTimeout(2e3) // 不影响 pool 的全局设置
      .exec(15).then((res) => {
        console.log(+res.data === 610)
      });

    3. 动态线程池和动态执行器

    const { DynamicThreadPool } = require(`electron-re`);
    const threadPool = new DynamicThreadPool({
      maxThreads: 24, // 最大线程数
      maxTasks: 48, // 最大任务数
      taskRetry: 1, // 任务重试次数
    });
    const executor = threadPool.createExecutor({
      execFunction: (value) => { return 'dynamic:' + value; },
    });
    
    threadPool.exec('test', {
      execString: `module.exports = (value) => { return 'dynamic:' + value; };`,
    });
    executor.exec('test');
    executor
      .setExecPath('/path/to/exec-file.js')
      .exec('test');

    四、线程池实际使用场景

    暂未在项目中实际使用,可考虑在前端图片像素处理、音视频转码处理等 CPU 密集性任务中进行实践。

    这里有篇文章写了 web_worker 的一些应用场景,web_worker 和 worker_threads 是类似的,宿主环境不同,一些权限和能力的不同而已。

    V.Ending

    초기 project는 Electron 애플리케이션 개발을 위한 도구 세트로 BrowserService / ChildProcessPool / Simple process Monitoring UI / Inter-Process Communication 등의 기능을 제공하며, 스레드 풀의 사실 처음에는 계획되지 않았으며 스레드 풀 자체는 독립적이며 Electron-re의 다른 모듈 기능에 의존하지 않습니다. 앞으로는 독립적이어야 합니다. BrowserService / ChildProcessPool / 简易进程监控 UI / 进程间通信 等功能,线程池的加入其实是当初没有计划的,而且线程池本身是独立的,不依赖 electron-re 中其它模块功能,之后应该会被独立出去。

    进程池和线程池的实现方案上还需完善。

    比如进程池未支持子进程空闲时自动退出以解除资源占用,当时做了另一版监听 ProcessHost 的任务执行情况来让子进程空闲时休眠,想通过此方式节省资源占用。不过由于没有 node.js API 级别的支持以分辨子进程空闲的情况,并且子进程的休眠 / 唤醒功能比较鸡肋 (有尝试通过向子进程发送 SIGSTOP/SIGCONT 信号实现),最终这个特性被废除了。

    后面可以考虑支持 CPU/Memory 的负载均衡算法,目前已经通过项目中的 ProcessManager 模块来实现资源占用情况采集了。

    线程池方面相对的可用度还是较高,提供了 pool/excutor 两个层级的调用管理,支持链式调用,在一些需要提升数据传输性能的场景支持 transferList

    프로세스 풀과 스레드 풀 구현을 개선해야 합니다.

    예를 들어, 프로세스 풀은 자원 점유를 완화하기 위해 유휴 상태일 때 하위 프로세스의 자동 종료를 지원하지 않습니다. 당시에는 ProcessHost의 작업 실행 상태를 모니터링하여 하위 프로세스가 절전 모드로 전환될 수 있도록 하는 또 다른 버전이 만들어졌습니다. 이런 식으로 자원 점유를 저장하고 싶었습니다. 그러나 자식 프로세스의 유휴 상태를 식별하는 node.js API 수준 지원이 없고 자식 프로세스의 절전/깨우기 기능은 상대적으로 쓸모가 없기 때문에 (SIGSTOP을 전송하여 이를 달성하려는 시도가 있습니다) /SIGCONT는 하위 프로세스에 신호를 보냅니다. 결국 이 기능은 더 이상 사용되지 않습니다.

    추후 CPU/메모리 로드 밸런싱 알고리즘 지원을 고려할 수 있습니다. 현재 리소스 사용량 수집은 프로젝트의 ProcessManager 모듈을 통해 구현되었습니다. 🎜🎜스레드 풀의 상대적 가용성은 여전히 ​​높습니다. 이는 두 가지 수준의 호출 관리 풀/실행자를 제공하고 체인 호출을 지원하며 데이터 전송 성능이 필요한 일부 시나리오에서 를 지원합니다. 개선되었습니다. transferList 메소드는 데이터 복제를 방지합니다. 다른 오픈소스 Node 스레드 풀 솔루션과 비교하여 작업 큐 기능 강화에 중점을 두고 작업 재시도, 작업 타임아웃 등의 기능을 지원합니다. 🎜🎜노드 관련 지식을 더 보려면 🎜nodejs 튜토리얼🎜을 방문하세요! 🎜

    위 내용은 Node를 사용하여 경량 프로세스 풀과 스레드 풀을 구현하는 방법에 대해 이야기해 보겠습니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제