>웹 프론트엔드 >JS 튜토리얼 >Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)

Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)

青灯夜游
青灯夜游앞으로
2021-08-05 10:26:034559검색

이 기사에서는 여러분과 함께 논의할 것입니다. Node.jsworker_threads 모듈에서 제공하는 멀티스레딩 모델인 멀티 코어 방식을 사용하여 Node.js 멀티 프로세스 모델에서 공유 메모리를 구현하는 방법을 소개합니다.

Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)

Node.js 단일 스레드 모델 설계로 인해 노드 프로세스(메인 스레드)는 하나의 CPU 코어만 사용할 수 있습니다. 그러나 오늘날의 시스템은 기본적으로 멀티 코어이므로 심각한 성능 낭비가 발생합니다. 일반적으로 다중 코어를 활용하려는 경우 일반적으로 다음과 같은 방법이 있습니다.

  • Node용 C++ 플러그인을 작성하여 스레드 풀을 확장하고 CPU 시간이 많이 소요되는 작업을 다른 스레드에 위임합니다. JS 코드.

  • worker_threads 모듈에서 제공하는 멀티스레딩 모델을 사용하세요(아직 실험적임).

  • child_process 또는 cluster 모듈에서 제공하는 다중 프로세스 모델을 사용하세요. 각 프로세스는 독립적인 Node.js 프로세스입니다.

사용 편의성, 코드 침입, 안정성 측면에서 볼 때 일반적으로 다중 프로세스 모델이 첫 번째 선택입니다. [추천 학습: "nodejs Tutorial"]

Node.js 클러스터 다중 프로세스 모델의 문제점

클러스터 모듈에서 제공하는 다중 프로세스 모델에서 각 Node 프로세스는 독립적이고 완전합니다. 프로세스 애플리케이션 프로세스는 자체 메모리 공간을 가지며 다른 프로세스에서 액세스할 수 없습니다. 따라서모든 작업자 프로세스는 프로젝트가 시작될 때 일관된 상태와 동작을 가지지만 후속 실행 중에 해당 상태가 일관되게 유지된다는 보장은 없습니다.

예를 들어 프로젝트가 시작되면 프로세스 A와 프로세스 B라는 두 개의 작업자 프로세스가 있습니다. 두 프로세스 모두 변수 a=1을 선언합니다. 그런데 프로젝트가 요청을 받았고, 마스터 프로세스가 처리를 위해 이를 프로세스 A에 할당했습니다. 이 요청으로 a의 값이 2로 변경되었습니다. 이때 프로세스 A의 메모리 공간에서는 a=2이지만 메모리 공간에서는 a=2입니다. 프로세스 B의 메모리 공간. 여전히 1. 이때, a의 값을 읽어 달라는 요청이 있으면 마스터 프로세스가 프로세스 A와 프로세스 B에 요청을 디스패치할 때 읽은 결과가 일치하지 않아 일관성 문제가 발생한다.

클러스터 모듈은 설계 시 솔루션을 제공하지 않았지만 작업자 프로세스가 상태 비저장이어야 했습니다. 즉, 프로그래머는 코드 작성 시 요청을 처리할 때 메모리의 값을 수정하는 것이 허용되어서는 안 됩니다. 모든 작업자 프로세스 일관성. 그러나 실제로는 사용자의 로그인 상태를 기록하는 등 메모리에 기록해야 하는 다양한 상황이 항상 존재합니다. 실제로 이러한 상태 데이터는 데이터베이스, Redis, 메시지 대기열 등 외부에 기록됩니다. 상태 저장 요청이 처리될 때마다 외부 저장 공간을 읽고 씁니다.

이것은 효과적인 접근 방식입니다.

그러나 이를 위해서는 추가 외부 저장 공간이 필요하며 동시에 여러 프로세스의 동시 액세스에서 일관성 문제를 처리하고 데이터의 수명 주기를 유지해야 합니다. 자체적으로(Node 프로세스와 유지보수가 외부이기 때문에 데이터가 동기적으로 생성 및 소멸되지 않음), 동시 접속량이 많은 경우(데이터베이스 등 비메모리 환경에 저장되는 경우) IO 성능 병목 현상이 발생함 . 사실 본질적으로 여러 프로세스에서 공유하고 액세스할 수 있는 공간만 있으면 됩니다. 영구 저장소는 필요하지 않습니다. 이 공간의 수명 주기는 노드 프로세스에 가장 잘 연결되어 있으므로 많은 비용을 절약할 수 있습니다. 사용시 번거로움이 적습니다. 따라서 크로스 프로세스 공유 메모리는 이 시나리오에서 사용하기에 가장 적합한 방법이 되었습니다.

Node.js의 공유 메모리

안타깝게도 Node 자체에서는 공유 메모리 구현을 제공하지 않으므로 npm 저장소에서 타사 라이브러리의 구현을 살펴볼 수 있습니다. 이러한 라이브러리 중 일부는 C++ 플러그인을 통해 Node의 기능을 확장하여 구현되고 일부는 Node에서 제공하는 IPC 메커니즘을 통해 구현됩니다. 불행하게도 이들 라이브러리의 구현은 매우 간단하며 상호 배타적인 액세스, 개체 모니터링 및 기타 기능을 제공하지 않습니다. 작성자는 이 공유 메모리를 주의 깊게 유지해야 합니다. 그렇지 않으면 타이밍 문제가 발생할 수 있습니다.

주위를 둘러보았으나 원하는 것을 찾을 수 없었습니다. . . 잊어버리세요. 제가 직접 작성하겠습니다.

공유 메모리 설계

우선 어떤 종류의 공유 ​​메모리가 필요한지 명확히 해야 합니다. (프로세스 전반에 걸쳐 액세스되는 상태 데이터를 프로젝트에서 사용하기 위해) 필요에 따라 시작했습니다. 동시에 다양성을 고려하여 다음 사항을 먼저 고려할 것입니다.

  • JS 객체를 읽기 및 쓰기 액세스의 기본 단위로 사용합니다.

  • 한 프로세스가 접근하면 다른 프로세스는 차단됩니다.

  • 공유 메모리의 객체를 모니터링할 수 있으며, 객체가 변경되면 모니터링 프로세스에 알릴 수 있습니다.

  • 위의 조건을 충족한다는 전제하에 구현 방법은 최대한 간단합니다.

실제로 운영 체제 수준에서는 공유 메모리가 필요하지 않다는 것을 알 수 있습니다. 그러면 여러 노드 프로세스가 동일한 개체에 액세스할 수 있으면 다음에서 제공하는 메커니즘에 구현할 수 있습니다. 노드 자체. Master 프로세스의 메모리 공간 중 일부를 공유 메모리 공간으로 사용할 수 있습니다. Worker 프로세스는 IPC를 통해 읽기 및 쓰기 요청을 Master 프로세스에 위임하고, Master 프로세스는 읽고 쓴 후 결과를 반환합니다. IPC를 통한 작업자 프로세스.

마스터 프로세스와 작업자 프로세스에서 공유 메모리를 일관되게 사용하기 위해 공유 메모리의 작업을 인터페이스로 추상화하고 이 인터페이스를 각각 마스터 프로세스와 작업자 프로세스에 구현할 수 있습니다. 클래스 다이어그램은 아래 그림과 같습니다. SharedMemory 클래스가 추상 인터페이스로 사용되며 개체는 server.js 항목 파일에 선언됩니다. 마스터 프로세스에서는 Manager 개체로, 작업자 프로세스에서는 Worker 개체로 인스턴스화됩니다. Manager 개체는 공유 메모리를 유지하고 공유 메모리에 대한 읽기 및 쓰기 요청을 처리하는 반면, Worker 개체는 마스터 프로세스에 읽기 및 쓰기 요청을 보냅니다. SharedMemory类作为抽象接口,在server.js入口文件中声明该对象。其在Master进程中实例化为Manager对象,在Worker进程中实例化为Worker对象。Manager对象来维护共享内存,并处理对共享内存的读写请求,而Worker对象则将读写请求发送到Master进程。

Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)

可以使用Manager类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露getsetremove等基本操作,避免该属性直接被修改。

由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。

为了使用简单,我们可以将SharedMemory设计成单例,这样每个进程中就只有一个实例,并可以在importSharedMemory之后直接使用。

代码实现

读写控制与IPC通信

首先实现对外接口SharedMemory类,这里没有使用让ManagerWorker继承SharedMemory的方式,而是让SharedMemory在实例化的时候返回一个ManagerWorker的实例,从而实现自动选择子类。

在Node 16中isPrimary替代了isMaster,这里为了兼容使用了两种写法。

// shared-memory.js
class SharedMemory {
  constructor() {
    if (cluster.isMaster || cluster.isPrimary) {
      return new Manager();
    } else {
      return new Worker();
    }
  }
}

Manager负责管理共享内存空间,我们直接在Manager对象中增加__sharedMemory__属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__之中定义setgetremove等标准操作来提供访问方式。

我们通过cluster.on('online', callback)来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)来监听来自worker进程的IPC通信,并把通信消息交给handle函数处理。

handle函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的setgetremove函数(注意不是__sharedMemory__中的setgetremove)进行处理,并将处理后的结果返还给worker进程。

// manager.js
const cluster = require('cluster');

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      set(key, value) {
        this.memory[key] = value;
      },
      get(key) {
        return this.memory[key];
      },
      remove(key) {
        delete this.memory[key];
      },
      memory: {},
    };

    // Listen the messages from worker processes.
    cluster.on('online', (worker) => {
      worker.on('message', (data) => {
        this.handle(data, worker);
        return false;
      });
    });
  }

  handle(data, target) {
    const args = data.value ? [data.key, data.value] : [data.key];
    this[data.method](...args).then((value) => {
      const msg = {
        id: data.id, // workerId
        uuid: data.uuid, // communicationID
        value,
      };
      target.send(msg);
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      resolve('OK');
    });
  }

  get(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedMemory__.get(key));
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      resolve('OK');
    });
  }
}

Worker自对象创建开始就使用process.on监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__对象的作用一会儿再说。此时Worker对象便创建完成。

之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Workersetgetremove函数,它们又会调用handle函数将消息通过process.send发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__中。当结果返回时,会被之前在process.on中的函数监听到,并从__getCallbacks__

Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)🎜🎜예 Manager 클래스의 속성을 공유 메모리 객체로 사용하여 객체에 접근하는 방법은 일반 JS 객체에 접근하는 방법과 동일하며, 만 노출되도록 캡슐화 계층을 만듭니다. >get , set, remove 및 기타 기본 작업을 통해 속성이 직접 수정되는 것을 방지할 수 있습니다. 🎜🎜모든 Worker 프로세스보다 먼저 Master 프로세스가 생성되므로, 각 Worker 프로세스가 생성된 후 즉시 공유 메모리에 접근할 수 있도록 Master 프로세스에서 공유 메모리 공간을 선언한 후 Worker 프로세스를 생성하면 됩니다. 🎜🎜사용 편의성을 위해 SharedMemory를 싱글톤으로 설계하여 각 프로세스에 인스턴스가 하나만 있고 SharedMemory가져오기할 수 있습니다. code> code> 바로 뒤에 사용하세요. 🎜

🎜코드 구현🎜

🎜🎜읽기 및 쓰기 제어 및 IPC 통신🎜🎜🎜먼저 외부 인터페이스 SharedMemory 구현 code> 클래스, 여기서는 <code>ManagerWorkerSharedMemory를 상속하도록 하는 방법을 사용하지 않고 SharedMemory를 허용합니다. code>는 인스턴스에서 사용됩니다. 변환되면 <code>Manager 또는 Worker의 인스턴스가 반환되어 하위 클래스의 자동 선택을 실현합니다. 🎜
🎜노드 16isPrimary isMaster를 대체합니다. 여기서는 호환성을 위해 두 가지 쓰기 방법이 사용됩니다. 🎜
// worker.js
const cluster = require(&#39;cluster&#39;);
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === &#39;function&#39;) {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;set&#39;, key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle(&#39;get&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle(&#39;remove&#39;, key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}
🎜Manager는 공유 메모리 공간 관리를 담당합니다. __sharedMemory__ 속성을 ​​Manager 객체에 직접 추가합니다. 또한 JS 객체는 JS의 가비지 수집 관리에 포함되므로 메모리 정리 및 데이터 마이그레이션과 같은 작업을 수행할 필요가 없으므로 구현이 매우 간단합니다. 그런 다음 set, getremove와 같은 표준 작업이 __sharedMemory__에 정의되어 액세스 방법을 제공합니다. 🎜🎜cluster.on('online', callback)을 통해 작업자 프로세스의 생성 이벤트를 수신하고 즉시 worker.on('message', callback)을 사용합니다. 생성 코드 이후> 작업자 프로세스에서 IPC 통신을 모니터링하고 처리를 위해 통신 메시지를 handle 함수에 전달합니다. 🎜🎜handle 함수의 역할은 작업자 프로세스가 어떤 작업을 수행하려는지 구별하고 해당 작업의 매개변수를 꺼내어 해당 set)에 맡기는 것입니다. >, get , remove 함수(set, get, remove가 아님) code> in <code>__sharedMemory__) 처리한 결과를 작업자 프로세스에 반환합니다. 🎜
// manager.js
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve(&#39;OK&#39;);
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}
🎜Workerprocess.on을 사용하여 객체가 생성된 이후 마스터 프로세스의 반환 메시지를 모니터링합니다(결국 메시지가 생성될 때까지 기다릴 수 없습니다). 모니터링하기 전에 전송하면 너무 늦습니다) ). __getCallbacks__ 객체의 역할에 대해서는 나중에 다루겠습니다. 이 시점에서 Worker 개체가 생성됩니다. 🎜🎜나중에 프로젝트가 어딘가에서 실행될 때 공유 메모리에 액세스하려면 Workerset, get을 호출하고 remove 함수를 사용하면 handle 함수를 호출하여 process.send를 통해 마스터 프로세스에 메시지를 보냅니다. 결과를 반환할 때 수행할 작업을 가져옵니다. __getCallbacks__에 문서화되어 있습니다. 결과가 반환되면 process.on의 이전 함수에서 모니터링하고 해당 콜백 함수를 __getCallbacks__에서 꺼내어 실행합니다. 🎜

因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

// worker.js
const cluster = require(&#39;cluster&#39;);
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === &#39;function&#39;) {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;set&#39;, key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle(&#39;get&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle(&#39;remove&#39;, key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}

一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

互斥访问

到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

时间 进程A 进程B 共享内存中变量x的值
t0

0
t1 读取x(x=0)
0
t2 x1=x+1(x1=1) 读取x(x=0) 0
t3 将x1的值写回x x2=x+1(x2=1) 1
t4
将x2的值写回x 1

进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

// manager.js
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve(&#39;OK&#39;);
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}

Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

// worker.js
class Worker {
  getLock(key) {
    return new Promise((resolve) => {
      this.handle(&#39;getLock&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      this.handle(&#39;releaseLock&#39;, key, lockId, (value) => {
        resolve(value);
      });
    });
  }
  ...
}

监听对象

有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)

为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

// manager.js
class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      listeners: {},
    };
  }

  handle(data, target) {
    if (data.method === &#39;listen&#39;) {
      this.listen(data.key, (value) => {
        const msg = {
          isNotified: true,
          id: data.id,
          uuid: data.uuid,
          value,
        };
        target.send(msg);
      });
    } else {
      ...
    }
  }

  notifyListener(key) {
    const listeners = this.__sharedMemory__.listeners[key];
    if (listeners?.length > 0) {
      Promise.all(
        listeners.map(
          (callback) =>
            new Promise((resolve) => {
              callback(this.__sharedMemory__.get(key));
              resolve();
            })
        )
      );
    }
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      this.notifyListener(key);
      resolve(&#39;OK&#39;);
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      this.notifyListener(key);
      resolve(&#39;OK&#39;);
    });
  }

  listen(key, callback) {
    if (typeof callback === &#39;function&#39;) {
      this.__sharedMemory__.listeners[key] =
        this.__sharedMemory__.listeners[key] ?? [];
      this.__sharedMemory__.listeners[key].push(callback);
    } else {
      throw new Error(&#39;a listener must have a callback.&#39;);
    }
  }
  ...
}

Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

// worker.js
class Worker {
  constructor() {
    ...
    this.__getListenerCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      if (data.isNotified) {
        const callback = this.__getListenerCallbacks__[data.uuid];
        if (callback && typeof callback === &#39;function&#39;) {
          callback(data.value);
        }
      } else {
        ...
      }
    });
  }

  handle(method, key, value, callback) {
    ...
    if (method === &#39;listen&#39;) {
      this.__getListenerCallbacks__[uuid] = callback;
    } else {
      this.__getCallbacks__[uuid] = callback;
    }
  }

  listen(key, callback) {
    if (typeof callback === &#39;function&#39;) {
      this.handle(&#39;listen&#39;, key, null, callback);
    } else {
      throw new Error(&#39;a listener must have a callback.&#39;);
    }
  }
  ...
}

LRU缓存

有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

// manager.js
const LRU = require(&#39;lru-cache&#39;);

class Manager {
  constructor() {
    ...
    this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 };
    this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);
  }

  getLRU(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedLRUMemory__.get(key));
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.set(key, value);
      resolve(&#39;OK&#39;);
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.del(key);
      resolve(&#39;OK&#39;);
    });
  }
  ...
}

Worker中也增加getLRUsetLRUremoveLRU函数。

// worker.js
class Worker {
  getLRU(key) {
    return new Promise((resolve) => {
      this.handle(&#39;getLRU&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;setLRU&#39;, key, value, () => {
        resolve();
      });
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.handle(&#39;removeLRU&#39;, key, null, () => {
        resolve();
      });
    });
  }
  ...
}

共享内存的使用方式

目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库欢迎pull request和报bug),可以直接通过npm安装:

npm i cluster-shared-memory

下面的示例包含了基本使用方法:

const cluster = require(&#39;cluster&#39;);
// 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象
require(&#39;cluster-shared-memory&#39;);

if (cluster.isMaster) {
  // 在 master 进程中 fork 子进程
  for (let i = 0; i < 2; i++) {
    cluster.fork();
  }
} else {
  const sharedMemoryController = require(&#39;./src/shared-memory&#39;);
  const obj = {
    name: &#39;Tom&#39;,
    age: 10,
  };
  
  // 写对象
  await sharedMemoryController.set(&#39;myObj&#39;, obj);
  
  // 读对象
  const myObj = await sharedMemoryController.get(&#39;myObj&#39;);
  
  // 互斥访问对象,首先获得对象的锁
  const lockId = await sharedMemoryController.getLock(&#39;myObj&#39;);
  const newObj = await sharedMemoryController.get(&#39;myObj&#39;);
  newObj.age = newObj.age + 1;
  await sharedMemoryController.set(&#39;myObj&#39;, newObj);
  // 操作完之后释放锁
  await sharedMemoryController.releaseLock(&#39;requestTimes&#39;, lockId);
  
  // 或者使用 mutex 函数自动获取和释放锁
  await sharedMemoryController.mutex(&#39;myObj&#39;, async () => {
    const newObjM = await sharedMemoryController.get(&#39;myObj&#39;);
    newObjM.age = newObjM.age + 1;
    await sharedMemoryController.set(&#39;myObj&#39;, newObjM);
  });
  
  // 监听对象
  sharedMemoryController.listen(&#39;myObj&#39;, (value) => {
    console.log(`myObj: ${value}`);
  });
  
  //写LRU缓存
  await sharedMemoryController.setLRU(&#39;cacheItem&#39;, {user: &#39;Tom&#39;});
  
  // 读对象
  const cacheItem = await sharedMemoryController.getLRU(&#39;cacheItem&#39;);
}

缺点

这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。

由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。

  • 如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。

原文地址:https://juejin.cn/post/6992091006220894215

作者:FinalZJY

更多编程相关知识,请访问:编程视频!!

위 내용은 Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 掘金--FinalZJY에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제