이 기사에서는 여러분과 함께 논의할 것입니다. Node.jsworker_threads 모듈에서 제공하는 멀티스레딩 모델인 멀티 코어 방식을 사용하여 Node.js 멀티 프로세스 모델에서 공유 메모리를 구현하는 방법을 소개합니다.
Node.js 단일 스레드 모델 설계로 인해 노드 프로세스(메인 스레드)는 하나의 CPU 코어만 사용할 수 있습니다. 그러나 오늘날의 시스템은 기본적으로 멀티 코어이므로 심각한 성능 낭비가 발생합니다. 일반적으로 다중 코어를 활용하려는 경우 일반적으로 다음과 같은 방법이 있습니다.
Node용 C++ 플러그인을 작성하여 스레드 풀을 확장하고 CPU 시간이 많이 소요되는 작업을 다른 스레드에 위임합니다. JS 코드.
worker_threads 모듈에서 제공하는 멀티스레딩 모델을 사용하세요(아직 실험적임).
child_process 또는 cluster 모듈에서 제공하는 다중 프로세스 모델을 사용하세요. 각 프로세스는 독립적인 Node.js 프로세스입니다.
사용 편의성, 코드 침입, 안정성 측면에서 볼 때 일반적으로 다중 프로세스 모델이 첫 번째 선택입니다. [추천 학습: "nodejs Tutorial"]
Node.js 클러스터 다중 프로세스 모델의 문제점
클러스터 모듈에서 제공하는 다중 프로세스 모델에서 각 Node 프로세스는 독립적이고 완전합니다. 프로세스 애플리케이션 프로세스는 자체 메모리 공간을 가지며 다른 프로세스에서 액세스할 수 없습니다. 따라서모든 작업자 프로세스는 프로젝트가 시작될 때 일관된 상태와 동작을 가지지만 후속 실행 중에 해당 상태가 일관되게 유지된다는 보장은 없습니다.
예를 들어 프로젝트가 시작되면 프로세스 A와 프로세스 B라는 두 개의 작업자 프로세스가 있습니다. 두 프로세스 모두 변수 a=1을 선언합니다. 그런데 프로젝트가 요청을 받았고, 마스터 프로세스가 처리를 위해 이를 프로세스 A에 할당했습니다. 이 요청으로 a의 값이 2로 변경되었습니다. 이때 프로세스 A의 메모리 공간에서는 a=2이지만 메모리 공간에서는 a=2입니다. 프로세스 B의 메모리 공간. 여전히 1. 이때, a의 값을 읽어 달라는 요청이 있으면 마스터 프로세스가 프로세스 A와 프로세스 B에 요청을 디스패치할 때 읽은 결과가 일치하지 않아 일관성 문제가 발생한다.
클러스터 모듈은 설계 시 솔루션을 제공하지 않았지만 작업자 프로세스가 상태 비저장이어야 했습니다. 즉, 프로그래머는 코드 작성 시 요청을 처리할 때 메모리의 값을 수정하는 것이 허용되어서는 안 됩니다. 모든 작업자 프로세스 일관성. 그러나 실제로는 사용자의 로그인 상태를 기록하는 등 메모리에 기록해야 하는 다양한 상황이 항상 존재합니다. 실제로 이러한 상태 데이터는 데이터베이스, Redis, 메시지 대기열 등 외부에 기록됩니다. 상태 저장 요청이 처리될 때마다 외부 저장 공간을 읽고 씁니다.
이것은 효과적인 접근 방식입니다.그러나 이를 위해서는 추가 외부 저장 공간이 필요하며 동시에 여러 프로세스의 동시 액세스에서 일관성 문제를 처리하고 데이터의 수명 주기를 유지해야 합니다. 자체적으로(Node 프로세스와 유지보수가 외부이기 때문에 데이터가 동기적으로 생성 및 소멸되지 않음), 동시 접속량이 많은 경우(데이터베이스 등 비메모리 환경에 저장되는 경우) IO 성능 병목 현상이 발생함 . 사실 본질적으로 여러 프로세스에서 공유하고 액세스할 수 있는 공간만 있으면 됩니다. 영구 저장소는 필요하지 않습니다. 이 공간의 수명 주기는 노드 프로세스에 가장 잘 연결되어 있으므로 많은 비용을 절약할 수 있습니다. 사용시 번거로움이 적습니다. 따라서 크로스 프로세스 공유 메모리는 이 시나리오에서 사용하기에 가장 적합한 방법이 되었습니다.
Node.js의 공유 메모리
안타깝게도 Node 자체에서는 공유 메모리 구현을 제공하지 않으므로 npm 저장소에서 타사 라이브러리의 구현을 살펴볼 수 있습니다. 이러한 라이브러리 중 일부는 C++ 플러그인을 통해 Node의 기능을 확장하여 구현되고 일부는 Node에서 제공하는 IPC 메커니즘을 통해 구현됩니다. 불행하게도 이들 라이브러리의 구현은 매우 간단하며 상호 배타적인 액세스, 개체 모니터링 및 기타 기능을 제공하지 않습니다. 작성자는 이 공유 메모리를 주의 깊게 유지해야 합니다. 그렇지 않으면 타이밍 문제가 발생할 수 있습니다. 주위를 둘러보았으나 원하는 것을 찾을 수 없었습니다. . . 잊어버리세요. 제가 직접 작성하겠습니다.공유 메모리 설계
우선 어떤 종류의 공유 메모리가 필요한지 명확히 해야 합니다. (프로세스 전반에 걸쳐 액세스되는 상태 데이터를 프로젝트에서 사용하기 위해) 필요에 따라 시작했습니다. 동시에 다양성을 고려하여 다음 사항을 먼저 고려할 것입니다.JS 객체를 읽기 및 쓰기 액세스의 기본 단위로 사용합니다.
한 프로세스가 접근하면 다른 프로세스는 차단됩니다.
공유 메모리의 객체를 모니터링할 수 있으며, 객체가 변경되면 모니터링 프로세스에 알릴 수 있습니다.
위의 조건을 충족한다는 전제하에 구현 방법은 최대한 간단합니다.
실제로 운영 체제 수준에서는 공유 메모리가 필요하지 않다는 것을 알 수 있습니다. 그러면 여러 노드 프로세스가 동일한 개체에 액세스할 수 있으면 다음에서 제공하는 메커니즘에 구현할 수 있습니다. 노드 자체. Master 프로세스의 메모리 공간 중 일부를 공유 메모리 공간으로 사용할 수 있습니다. Worker 프로세스는 IPC를 통해 읽기 및 쓰기 요청을 Master 프로세스에 위임하고, Master 프로세스는 읽고 쓴 후 결과를 반환합니다. IPC를 통한 작업자 프로세스.
마스터 프로세스와 작업자 프로세스에서 공유 메모리를 일관되게 사용하기 위해 공유 메모리의 작업을 인터페이스로 추상화하고 이 인터페이스를 각각 마스터 프로세스와 작업자 프로세스에 구현할 수 있습니다. 클래스 다이어그램은 아래 그림과 같습니다. SharedMemory
클래스가 추상 인터페이스로 사용되며 개체는 server.js
항목 파일에 선언됩니다. 마스터 프로세스에서는 Manager
개체로, 작업자 프로세스에서는 Worker
개체로 인스턴스화됩니다. Manager
개체는 공유 메모리를 유지하고 공유 메모리에 대한 읽기 및 쓰기 요청을 처리하는 반면, Worker
개체는 마스터 프로세스에 읽기 및 쓰기 요청을 보냅니다. SharedMemory
类作为抽象接口,在server.js
入口文件中声明该对象。其在Master进程中实例化为Manager
对象,在Worker进程中实例化为Worker
对象。Manager
对象来维护共享内存,并处理对共享内存的读写请求,而Worker
对象则将读写请求发送到Master进程。
可以使用Manager
类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露get
、set
、remove
等基本操作,避免该属性直接被修改。
由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。
为了使用简单,我们可以将SharedMemory
设计成单例,这样每个进程中就只有一个实例,并可以在import
了SharedMemory
之后直接使用。
代码实现
读写控制与IPC通信
首先实现对外接口SharedMemory
类,这里没有使用让Manager
和Worker
继承SharedMemory
的方式,而是让SharedMemory
在实例化的时候返回一个Manager
或Worker
的实例,从而实现自动选择子类。
在Node 16中
isPrimary
替代了isMaster
,这里为了兼容使用了两种写法。
// shared-memory.js class SharedMemory { constructor() { if (cluster.isMaster || cluster.isPrimary) { return new Manager(); } else { return new Worker(); } } }
Manager
负责管理共享内存空间,我们直接在Manager
对象中增加__sharedMemory__
属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__
之中定义set
、get
、remove
等标准操作来提供访问方式。
我们通过cluster.on('online', callback)
来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)
来监听来自worker进程的IPC通信,并把通信消息交给handle
函数处理。
handle
函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的set
、get
、remove
函数(注意不是__sharedMemory__
中的set
、get
、remove
)进行处理,并将处理后的结果返还给worker进程。
// manager.js const cluster = require('cluster'); class Manager { constructor() { this.__sharedMemory__ = { set(key, value) { this.memory[key] = value; }, get(key) { return this.memory[key]; }, remove(key) { delete this.memory[key]; }, memory: {}, }; // Listen the messages from worker processes. cluster.on('online', (worker) => { worker.on('message', (data) => { this.handle(data, worker); return false; }); }); } handle(data, target) { const args = data.value ? [data.key, data.value] : [data.key]; this[data.method](...args).then((value) => { const msg = { id: data.id, // workerId uuid: data.uuid, // communicationID value, }; target.send(msg); }); } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); resolve('OK'); }); } get(key) { return new Promise((resolve) => { resolve(this.__sharedMemory__.get(key)); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); resolve('OK'); }); } }
Worker
自对象创建开始就使用process.on
监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__
对象的作用一会儿再说。此时Worker
对象便创建完成。
之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Worker
的set
、get
、remove
函数,它们又会调用handle
函数将消息通过process.send
发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__
中。当结果返回时,会被之前在process.on
中的函数监听到,并从__getCallbacks__

Manager
클래스의 속성을 공유 메모리 객체로 사용하여 객체에 접근하는 방법은 일반 JS 객체에 접근하는 방법과 동일하며, 만 노출되도록 캡슐화 계층을 만듭니다. >get
, set
, remove
및 기타 기본 작업을 통해 속성이 직접 수정되는 것을 방지할 수 있습니다. 🎜🎜모든 Worker 프로세스보다 먼저 Master 프로세스가 생성되므로, 각 Worker 프로세스가 생성된 후 즉시 공유 메모리에 접근할 수 있도록 Master 프로세스에서 공유 메모리 공간을 선언한 후 Worker 프로세스를 생성하면 됩니다. 🎜🎜사용 편의성을 위해 SharedMemory
를 싱글톤으로 설계하여 각 프로세스에 인스턴스가 하나만 있고 SharedMemory
를 가져오기
할 수 있습니다. code> code> 바로 뒤에 사용하세요. 🎜🎜코드 구현🎜
🎜🎜읽기 및 쓰기 제어 및 IPC 통신🎜🎜🎜먼저 외부 인터페이스SharedMemory 구현 code> 클래스, 여기서는 <code>Manager
및 Worker
가 SharedMemory
를 상속하도록 하는 방법을 사용하지 않고 SharedMemory를 허용합니다. code>는 인스턴스에서 사용됩니다. 변환되면 <code>Manager
또는 Worker
의 인스턴스가 반환되어 하위 클래스의 자동 선택을 실현합니다. 🎜🎜노드 16isPrimary
는isMaster
를 대체합니다. 여기서는 호환성을 위해 두 가지 쓰기 방법이 사용됩니다. 🎜
// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid'); class Worker { constructor() { this.__getCallbacks__ = {}; process.on('message', (data) => { const callback = this.__getCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } delete this.__getCallbacks__[data.uuid]; }); } set(key, value) { return new Promise((resolve) => { this.handle('set', key, value, () => { resolve(); }); }); } get(key) { return new Promise((resolve) => { this.handle('get', key, null, (value) => { resolve(value); }); }); } remove(key) { return new Promise((resolve) => { this.handle('remove', key, null, () => { resolve(); }); }); } handle(method, key, value, callback) { const uuid = uuid4(); // 每次通信的uuid process.send({ id: cluster.worker.id, method, uuid, key, value, }); this.__getCallbacks__[uuid] = callback; } }🎜
Manager
는 공유 메모리 공간 관리를 담당합니다. __sharedMemory__
속성을 Manager
객체에 직접 추가합니다. 또한 JS 객체는 JS의 가비지 수집 관리에 포함되므로 메모리 정리 및 데이터 마이그레이션과 같은 작업을 수행할 필요가 없으므로 구현이 매우 간단합니다. 그런 다음 set
, get
및 remove
와 같은 표준 작업이 __sharedMemory__
에 정의되어 액세스 방법을 제공합니다. 🎜🎜cluster.on('online', callback)
을 통해 작업자 프로세스의 생성 이벤트를 수신하고 즉시 worker.on('message', callback)
을 사용합니다. 생성 코드 이후> 작업자 프로세스에서 IPC 통신을 모니터링하고 처리를 위해 통신 메시지를 handle
함수에 전달합니다. 🎜🎜handle
함수의 역할은 작업자 프로세스가 어떤 작업을 수행하려는지 구별하고 해당 작업의 매개변수를 꺼내어 해당 set
)에 맡기는 것입니다. >, get
, remove
함수(set
, get
, remove가 아님) code> in <code>__sharedMemory__
) 처리한 결과를 작업자 프로세스에 반환합니다. 🎜// manager.js const { v4: uuid4 } = require('uuid'); class Manager { constructor() { this.__sharedMemory__ = { ... locks: {}, lockRequestQueues: {}, }; } getLock(key) { return new Promise((resolve) => { this.__sharedMemory__.lockRequestQueues[key] = this.__sharedMemory__.lockRequestQueues[key] ?? []; this.__sharedMemory__.lockRequestQueues[key].push(resolve); this.handleLockRequest(key); }); } releaseLock(key, lockId) { return new Promise((resolve) => { if (lockId === this.__sharedMemory__.locks[key]) { delete this.__sharedMemory__.locks[key]; this.handleLockRequest(key); } resolve('OK'); }); } handleLockRequest(key) { return new Promise((resolve) => { if ( !this.__sharedMemory__.locks[key] && this.__sharedMemory__.lockRequestQueues[key]?.length > 0 ) { const callback = this.__sharedMemory__.lockRequestQueues[key].shift(); const lockId = uuid4(); this.__sharedMemory__.locks[key] = lockId; callback(lockId); } resolve(); }); } ... }🎜
Worker
는 process.on
을 사용하여 객체가 생성된 이후 마스터 프로세스의 반환 메시지를 모니터링합니다(결국 메시지가 생성될 때까지 기다릴 수 없습니다). 모니터링하기 전에 전송하면 너무 늦습니다) ). __getCallbacks__
객체의 역할에 대해서는 나중에 다루겠습니다. 이 시점에서 Worker
개체가 생성됩니다. 🎜🎜나중에 프로젝트가 어딘가에서 실행될 때 공유 메모리에 액세스하려면 Worker
의 set
, get
을 호출하고 remove
함수를 사용하면 handle
함수를 호출하여 process.send
를 통해 마스터 프로세스에 메시지를 보냅니다. 결과를 반환할 때 수행할 작업을 가져옵니다. __getCallbacks__
에 문서화되어 있습니다. 결과가 반환되면 process.on
의 이전 함수에서 모니터링하고 해당 콜백 함수를 __getCallbacks__
에서 꺼내어 실행합니다. 🎜因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。
// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid'); class Worker { constructor() { this.__getCallbacks__ = {}; process.on('message', (data) => { const callback = this.__getCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } delete this.__getCallbacks__[data.uuid]; }); } set(key, value) { return new Promise((resolve) => { this.handle('set', key, value, () => { resolve(); }); }); } get(key) { return new Promise((resolve) => { this.handle('get', key, null, (value) => { resolve(value); }); }); } remove(key) { return new Promise((resolve) => { this.handle('remove', key, null, () => { resolve(); }); }); } handle(method, key, value, callback) { const uuid = uuid4(); // 每次通信的uuid process.send({ id: cluster.worker.id, method, uuid, key, value, }); this.__getCallbacks__[uuid] = callback; } }
一次共享内存访问的完整流程是:调用Worker
的set
/get
/remove
函数 -> 调用Worker
的handle
函数,向master进程通信并将回调函数记录在__getCallbacks__
-> master进程监听到来自worker进程的消息 -> 调用Manager
的handle
函数 -> 调用Manager
的set
/get
/remove
函数 -> 调用__sharedMemory__
的set
/get
/remove
函数 -> 操作完成返回Manager
的set
/get
/remove
函数 -> 操作完成返回handle
函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__
中取出回调函数并执行。
互斥访问
到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:
时间 | 进程A | 进程B | 共享内存中变量x的值 |
---|---|---|---|
t0 | 0 | ||
t1 | 读取x(x=0) | 0 | |
t2 | x1=x+1(x1=1) | 读取x(x=0) | 0 |
t3 | 将x1的值写回x | x2=x+1(x2=1) | 1 |
t4 | 将x2的值写回x | 1 |
进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。
在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。
为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。
在Manager
的__sharedMemory__
中加入locks
属性,用来记录哪个对象的锁被拿走了,lockRequestQueues
属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock
函数和releaseLock
函数,用来申请和归还锁,以及handleLockRequest
函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues
队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest
检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks
中对应的记录删掉,然后再调用handleLockRequest
让队首的任务获得锁。
// manager.js const { v4: uuid4 } = require('uuid'); class Manager { constructor() { this.__sharedMemory__ = { ... locks: {}, lockRequestQueues: {}, }; } getLock(key) { return new Promise((resolve) => { this.__sharedMemory__.lockRequestQueues[key] = this.__sharedMemory__.lockRequestQueues[key] ?? []; this.__sharedMemory__.lockRequestQueues[key].push(resolve); this.handleLockRequest(key); }); } releaseLock(key, lockId) { return new Promise((resolve) => { if (lockId === this.__sharedMemory__.locks[key]) { delete this.__sharedMemory__.locks[key]; this.handleLockRequest(key); } resolve('OK'); }); } handleLockRequest(key) { return new Promise((resolve) => { if ( !this.__sharedMemory__.locks[key] && this.__sharedMemory__.lockRequestQueues[key]?.length > 0 ) { const callback = this.__sharedMemory__.lockRequestQueues[key].shift(); const lockId = uuid4(); this.__sharedMemory__.locks[key] = lockId; callback(lockId); } resolve(); }); } ... }
在Worker
中,则是增加getLock
和releaseLock
两个函数,行为与get
、set
类似,都是调用handle
函数。
// worker.js class Worker { getLock(key) { return new Promise((resolve) => { this.handle('getLock', key, null, (value) => { resolve(value); }); }); } releaseLock(key, lockId) { return new Promise((resolve) => { this.handle('releaseLock', key, lockId, (value) => { resolve(value); }); }); } ... }
监听对象
有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set
属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager
的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。
为此,我们先在__sharedMemory__
中加入listeners
属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen
函数,其将监听回调函数记录到__sharedMemory__.listeners
中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在set
和remove
函数返回前调用notifyListener
,将所有记录在__sharedMemory__.listeners
中监听该对象的所有函数取出并调用。
// manager.js class Manager { constructor() { this.__sharedMemory__ = { ... listeners: {}, }; } handle(data, target) { if (data.method === 'listen') { this.listen(data.key, (value) => { const msg = { isNotified: true, id: data.id, uuid: data.uuid, value, }; target.send(msg); }); } else { ... } } notifyListener(key) { const listeners = this.__sharedMemory__.listeners[key]; if (listeners?.length > 0) { Promise.all( listeners.map( (callback) => new Promise((resolve) => { callback(this.__sharedMemory__.get(key)); resolve(); }) ) ); } } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); this.notifyListener(key); resolve('OK'); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); this.notifyListener(key); resolve('OK'); }); } listen(key, callback) { if (typeof callback === 'function') { this.__sharedMemory__.listeners[key] = this.__sharedMemory__.listeners[key] ?? []; this.__sharedMemory__.listeners[key].push(callback); } else { throw new Error('a listener must have a callback.'); } } ... }
在Worker
中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__
属性用来记录监听操作的回调函数,与__getCallbacks__
不同,它里面的函数在收到master的回信之后不会删除。
// worker.js class Worker { constructor() { ... this.__getListenerCallbacks__ = {}; process.on('message', (data) => { if (data.isNotified) { const callback = this.__getListenerCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } } else { ... } }); } handle(method, key, value, callback) { ... if (method === 'listen') { this.__getListenerCallbacks__[uuid] = callback; } else { this.__getCallbacks__[uuid] = callback; } } listen(key, callback) { if (typeof callback === 'function') { this.handle('listen', key, null, callback); } else { throw new Error('a listener must have a callback.'); } } ... }
LRU缓存
有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager
再增加一个共享内存__sharedLRUMemory__
,其为一个lru-cache
实例,并增加getLRU
、setLRU
、removeLRU
函数,与set
、get
、remove
函数类似。
// manager.js const LRU = require('lru-cache'); class Manager { constructor() { ... this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 }; this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions); } getLRU(key) { return new Promise((resolve) => { resolve(this.__sharedLRUMemory__.get(key)); }); } setLRU(key, value) { return new Promise((resolve) => { this.__sharedLRUMemory__.set(key, value); resolve('OK'); }); } removeLRU(key) { return new Promise((resolve) => { this.__sharedLRUMemory__.del(key); resolve('OK'); }); } ... }
Worker
中也增加getLRU
、setLRU
、removeLRU
函数。
// worker.js class Worker { getLRU(key) { return new Promise((resolve) => { this.handle('getLRU', key, null, (value) => { resolve(value); }); }); } setLRU(key, value) { return new Promise((resolve) => { this.handle('setLRU', key, value, () => { resolve(); }); }); } removeLRU(key) { return new Promise((resolve) => { this.handle('removeLRU', key, null, () => { resolve(); }); }); } ... }
共享内存的使用方式
目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库,欢迎pull request和报bug),可以直接通过npm安装:
npm i cluster-shared-memory
下面的示例包含了基本使用方法:
const cluster = require('cluster'); // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象 require('cluster-shared-memory'); if (cluster.isMaster) { // 在 master 进程中 fork 子进程 for (let i = 0; i < 2; i++) { cluster.fork(); } } else { const sharedMemoryController = require('./src/shared-memory'); const obj = { name: 'Tom', age: 10, }; // 写对象 await sharedMemoryController.set('myObj', obj); // 读对象 const myObj = await sharedMemoryController.get('myObj'); // 互斥访问对象,首先获得对象的锁 const lockId = await sharedMemoryController.getLock('myObj'); const newObj = await sharedMemoryController.get('myObj'); newObj.age = newObj.age + 1; await sharedMemoryController.set('myObj', newObj); // 操作完之后释放锁 await sharedMemoryController.releaseLock('requestTimes', lockId); // 或者使用 mutex 函数自动获取和释放锁 await sharedMemoryController.mutex('myObj', async () => { const newObjM = await sharedMemoryController.get('myObj'); newObjM.age = newObjM.age + 1; await sharedMemoryController.set('myObj', newObjM); }); // 监听对象 sharedMemoryController.listen('myObj', (value) => { console.log(`myObj: ${value}`); }); //写LRU缓存 await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'}); // 读对象 const cacheItem = await sharedMemoryController.getLRU('cacheItem'); }
缺点
这种实现目前尚有几个缺点:
不能使用PM2的自动创建worker进程的功能。
由于PM2会使用自己的
cluster
模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。
传输的对象必须可序列化,且不能太大。
如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。
原文地址:https://juejin.cn/post/6992091006220894215
作者:FinalZJY
更多编程相关知识,请访问:编程视频!!
위 내용은 Node.js 다중 프로세스 모델에서 공유 메모리를 구현하는 방법에 대한 간략한 설명(자세한 코드 설명)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

Vercel是什么?本篇文章带大家了解一下Vercel,并介绍一下在Vercel中部署 Node 服务的方法,希望对大家有所帮助!

gm是基于node.js的图片处理插件,它封装了图片处理工具GraphicsMagick(GM)和ImageMagick(IM),可使用spawn的方式调用。gm插件不是node默认安装的,需执行“npm install gm -S”进行安装才可使用。

本篇文章带大家详解package.json和package-lock.json文件,希望对大家有所帮助!

本篇文章给大家分享一个Nodejs web框架:Fastify,简单介绍一下Fastify支持的特性、Fastify支持的插件以及Fastify的使用方法,希望对大家有所帮助!

如何用pkg打包nodejs可执行文件?下面本篇文章给大家介绍一下使用pkg将Node.js项目打包为可执行文件的方法,希望对大家有所帮助!

node怎么爬取数据?下面本篇文章给大家分享一个node爬虫实例,聊聊利用node抓取小说章节的方法,希望对大家有所帮助!

本篇文章给大家分享一个Node实战,介绍一下使用Node.js和adb怎么开发一个手机备份小工具,希望对大家有所帮助!

先介绍node.js的安装,再介绍使用node.js构建一个简单的web服务器,最后通过一个简单的示例,演示网页与服务器之间的数据交互的实现。


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

MinGW - Windows용 미니멀리스트 GNU
이 프로젝트는 osdn.net/projects/mingw로 마이그레이션되는 중입니다. 계속해서 그곳에서 우리를 팔로우할 수 있습니다. MinGW: GCC(GNU Compiler Collection)의 기본 Windows 포트로, 기본 Windows 애플리케이션을 구축하기 위한 무료 배포 가능 가져오기 라이브러리 및 헤더 파일로 C99 기능을 지원하는 MSVC 런타임에 대한 확장이 포함되어 있습니다. 모든 MinGW 소프트웨어는 64비트 Windows 플랫폼에서 실행될 수 있습니다.

안전한 시험 브라우저
안전한 시험 브라우저는 온라인 시험을 안전하게 치르기 위한 보안 브라우저 환경입니다. 이 소프트웨어는 모든 컴퓨터를 안전한 워크스테이션으로 바꿔줍니다. 이는 모든 유틸리티에 대한 액세스를 제어하고 학생들이 승인되지 않은 리소스를 사용하는 것을 방지합니다.

Eclipse용 SAP NetWeaver 서버 어댑터
Eclipse를 SAP NetWeaver 애플리케이션 서버와 통합합니다.

SublimeText3 영어 버전
권장 사항: Win 버전, 코드 프롬프트 지원!

mPDF
mPDF는 UTF-8로 인코딩된 HTML에서 PDF 파일을 생성할 수 있는 PHP 라이브러리입니다. 원저자인 Ian Back은 자신의 웹 사이트에서 "즉시" PDF 파일을 출력하고 다양한 언어를 처리하기 위해 mPDF를 작성했습니다. HTML2FPDF와 같은 원본 스크립트보다 유니코드 글꼴을 사용할 때 속도가 느리고 더 큰 파일을 생성하지만 CSS 스타일 등을 지원하고 많은 개선 사항이 있습니다. RTL(아랍어, 히브리어), CJK(중국어, 일본어, 한국어)를 포함한 거의 모든 언어를 지원합니다. 중첩된 블록 수준 요소(예: P, DIV)를 지원합니다.

뜨거운 주제



