Heim  >  Artikel  >  Web-Frontend  >  Eine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung)

Eine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung)

青灯夜游
青灯夜游nach vorne
2021-08-05 10:26:034281Durchsuche

In diesem Artikel besprechen wir mit Ihnen Node.jsAnhand der Multi-Core-Methode – dem vom Modul worker_threads bereitgestellten Multithreading-Modell – stellen wir die Methode zur Implementierung von Shared Memory im Node.js-Multiprozessmodell vor.

Eine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung)

Node.js Aufgrund seines Single-Thread-Modelldesigns kann ein Node-Prozess (der Haupt-Thread) nur einen CPU-Kern nutzen, was zu erheblichen Leistungsverlusten führt. Wenn Sie mehrere Kerne nutzen möchten, stehen Ihnen im Allgemeinen die folgenden Methoden zur Verfügung:

  • Schreiben Sie ein C++-Plugin für Node, um den Thread-Pool zu erweitern und CPU-zeitaufwändige Aufgaben an andere Threads im zu delegieren JS-Code.

  • Verwenden Sie das Multithreading-Modell, das vom Modul worker_threads bereitgestellt wird (noch experimentell).

  • Verwenden Sie das vom Modul child_process oder cluster bereitgestellte Multiprozessmodell. Jeder Prozess ist ein unabhängiger Node.js-Prozess.

Aus Sicht der Benutzerfreundlichkeit, des Codeeinbruchs und der Stabilität ist das Multiprozessmodell normalerweise die erste Wahl. [Empfohlenes Lernen: „nodejs Tutorial“]

Probleme mit dem Node.js-Cluster-Multiprozessmodell

In dem vom Clustermodul bereitgestellten Multiprozessmodell ist jeder Node-Prozess unabhängig und vollständig Prozess Der Anwendungsprozess verfügt über einen eigenen Speicherbereich und kann von anderen Prozessen nicht aufgerufen werden. DaherObwohl alle Worker-Prozesse beim Start des Projekts einen konsistenten Status und ein einheitliches Verhalten aufweisen, gibt es keine Garantie dafür, dass ihr Status bei nachfolgenden Ausführungen konsistent bleibt.

Wenn das Projekt beispielsweise gestartet wird, gibt es zwei Worker-Prozesse, Prozess A und Prozess B. Beide Prozesse deklarieren die Variable a=1. Aber dann erhielt das Projekt eine Anfrage und der Master-Prozess wies sie Prozess A zur Verarbeitung zu. Diese Anfrage änderte den Wert von a auf 2. Zu diesem Zeitpunkt war a = 2 im Speicherbereich von Prozess A, aber a = 2 in der Speicherplatz von Prozess B. Noch 1. Wenn zu diesem Zeitpunkt eine Anforderung zum Lesen des Werts von a vorliegt, sind die Ergebnisse, die gelesen werden, wenn der Master-Prozess die Anforderung an Prozess A und Prozess B sendet, inkonsistent, was zu einem Konsistenzproblem führt.

Das Cluster-Modul bot beim Entwurf keine Lösung, erforderte jedoch, dass der Worker-Prozess zustandslos ist. Das heißt, Programmierer sollten nicht berechtigt sein, den Wert im Speicher zu ändern, wenn sie beim Schreiben von Code Anforderungen verarbeiten, um alle Worker zu schützen Prozesskonsistenz. In der Praxis gibt es jedoch immer wieder Situationen, in denen das Schreiben in den Speicher erforderlich ist, z. B. das Aufzeichnen des Anmeldestatus des Benutzers. In der Praxis werden diese Statusdaten normalerweise extern aufgezeichnet, z. B. in Datenbanken, Redis und Nachrichtenwarteschlangen. Dateisystem usw. , jedes Mal, wenn eine zustandsbehaftete Anfrage verarbeitet wird, wird der externe Speicherplatz gelesen und geschrieben.

Dies ist ein effektiver Ansatz.

Er erfordert jedoch die Einführung eines zusätzlichen externen Speicherplatzes und muss gleichzeitig das Konsistenzproblem bei gleichzeitigem Zugriff durch mehrere Prozesse lösen und den Lebenszyklus der Daten aufrechterhalten selbst (da der Knotenprozess und die Wartung extern sind. Die Daten werden nicht synchron erstellt und zerstört) und es gibt einen E/A-Leistungsengpass bei hohem gleichzeitigem Zugriff (wenn sie in einer Nicht-Speicherumgebung wie einer Datenbank gespeichert sind). . Tatsächlich benötigen wir im Wesentlichen nur einen Raum, auf den mehrere Prozesse zugreifen können. Wir benötigen keinen dauerhaften Speicher. Der Lebenszyklus dieses Raums ist am besten stark an den Knotenprozess gebunden, sodass wir viel sparen können Weniger Zeitaufwand bei der Nutzung. Daher ist prozessübergreifender gemeinsam genutzter Speicher die am besten geeignete Methode für den Einsatz in diesem Szenario.

Gemeinsamer Speicher von Node.js

Leider bietet Node selbst keine Implementierung von Shared Memory, sodass wir uns die Implementierung von Bibliotheken von Drittanbietern im npm-Repository ansehen können. Einige dieser Bibliotheken werden durch die Erweiterung der Node-Funktionen durch C++-Plug-Ins implementiert, andere werden durch den von Node bereitgestellten IPC-Mechanismus implementiert. Leider sind ihre Implementierungen sehr einfach und bieten keinen sich gegenseitig ausschließenden Zugriff, Objektüberwachung und andere Funktionen macht die Verwendung Der Autor muss diesen gemeinsamen Speicher selbst sorgfältig pflegen, da es sonst zu Zeitproblemen kommt.

Ich habe mich umgesehen und konnte nicht finden, was ich wollte. . . Vergiss es, ich schreibe selbst eins.

Design des Shared Memory

Zunächst müssen wir klären, welche Art von Shared Memory ich basierend auf meinen eigenen Bedürfnissen benötige (um es im Projekt zum Speichern von Statusdaten zu verwenden, auf die prozessübergreifend zugegriffen wird). und gleichzeitig In Anbetracht der Vielseitigkeit werden wir zunächst die folgenden Punkte berücksichtigen:

  • Verwenden Sie JS-Objekte als Grundeinheit für Lese- und Schreibzugriff.

  • Kann den Zugriff zwischen Prozessen gegenseitig ausschließen. Wenn ein Prozess zugreift, werden andere Prozesse blockiert.

  • kann Objekte im gemeinsam genutzten Speicher überwachen und der Überwachungsprozess kann benachrichtigt werden, wenn sich das Objekt ändert.

  • Unter der Voraussetzung, dass die oben genannten Bedingungen erfüllt sind, ist die Implementierungsmethode so einfach wie möglich.

Es kann festgestellt werden, dass wir auf Betriebssystemebene tatsächlich keinen gemeinsamen Speicher benötigen. Wir müssen nur in der Lage sein, dass mehrere Knotenprozesse auf dasselbe Objekt zugreifen. Dann können wir es auf dem von bereitgestellten Mechanismus implementieren Knoten selbst. Sie können einen Teil des Speicherbereichs des Master-Prozesses als gemeinsam genutzten Speicherbereich verwenden. Der Worker-Prozess delegiert Lese- und Schreibanforderungen über IPC an den Master-Prozess, und der Master-Prozess liest und schreibt und gibt die Ergebnisse dann an den Master-Prozess zurück Arbeitsprozess durch IPC.

Um die Verwendung des gemeinsam genutzten Speichers im Master-Prozess und im Worker-Prozess konsistent zu machen, können wir die Operation des gemeinsam genutzten Speichers in eine Schnittstelle abstrahieren und diese Schnittstelle jeweils im Master-Prozess und im Worker-Prozess implementieren. Das Klassendiagramm ist in der folgenden Abbildung dargestellt. Als abstrakte Schnittstelle wird eine Klasse SharedMemory verwendet und das Objekt wird in der Eintragsdatei server.js deklariert. Es wird als Manager-Objekt im Master-Prozess und als Worker-Objekt im Worker-Prozess instanziiert. Das Manager-Objekt verwaltet den gemeinsam genutzten Speicher und verarbeitet Lese- und Schreibanforderungen an den gemeinsam genutzten Speicher, während das Worker-Objekt Lese- und Schreibanforderungen an den Master-Prozess sendet. SharedMemory类作为抽象接口,在server.js入口文件中声明该对象。其在Master进程中实例化为Manager对象,在Worker进程中实例化为Worker对象。Manager对象来维护共享内存,并处理对共享内存的读写请求,而Worker对象则将读写请求发送到Master进程。

Eine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung)

可以使用Manager类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露getsetremove等基本操作,避免该属性直接被修改。

由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。

为了使用简单,我们可以将SharedMemory设计成单例,这样每个进程中就只有一个实例,并可以在importSharedMemory之后直接使用。

代码实现

读写控制与IPC通信

首先实现对外接口SharedMemory类,这里没有使用让ManagerWorker继承SharedMemory的方式,而是让SharedMemory在实例化的时候返回一个ManagerWorker的实例,从而实现自动选择子类。

在Node 16中isPrimary替代了isMaster,这里为了兼容使用了两种写法。

// shared-memory.js
class SharedMemory {
  constructor() {
    if (cluster.isMaster || cluster.isPrimary) {
      return new Manager();
    } else {
      return new Worker();
    }
  }
}

Manager负责管理共享内存空间,我们直接在Manager对象中增加__sharedMemory__属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__之中定义setgetremove等标准操作来提供访问方式。

我们通过cluster.on('online', callback)来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)来监听来自worker进程的IPC通信,并把通信消息交给handle函数处理。

handle函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的setgetremove函数(注意不是__sharedMemory__中的setgetremove)进行处理,并将处理后的结果返还给worker进程。

// manager.js
const cluster = require('cluster');

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      set(key, value) {
        this.memory[key] = value;
      },
      get(key) {
        return this.memory[key];
      },
      remove(key) {
        delete this.memory[key];
      },
      memory: {},
    };

    // Listen the messages from worker processes.
    cluster.on('online', (worker) => {
      worker.on('message', (data) => {
        this.handle(data, worker);
        return false;
      });
    });
  }

  handle(data, target) {
    const args = data.value ? [data.key, data.value] : [data.key];
    this[data.method](...args).then((value) => {
      const msg = {
        id: data.id, // workerId
        uuid: data.uuid, // communicationID
        value,
      };
      target.send(msg);
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      resolve('OK');
    });
  }

  get(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedMemory__.get(key));
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      resolve('OK');
    });
  }
}

Worker自对象创建开始就使用process.on监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__对象的作用一会儿再说。此时Worker对象便创建完成。

之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Workersetgetremove函数,它们又会调用handle函数将消息通过process.send发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__中。当结果返回时,会被之前在process.on中的函数监听到,并从__getCallbacks__

Eine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung)🎜🎜Ja Verwenden Sie ein Attribut in der Klasse Manager als Shared-Memory-Objekt. Der Zugriff auf das Objekt ist derselbe wie beim Zugriff auf normale JS-Objekte und erstellt dann eine Kapselungsebene, um nur get , set, remove und andere grundlegende Operationen, um zu verhindern, dass das Attribut direkt geändert wird. 🎜🎜Da der Master-Prozess vor allen Worker-Prozessen erstellt wird, können Sie den Worker-Prozess erstellen, nachdem Sie den gemeinsam genutzten Speicherbereich im Master-Prozess deklariert haben, um sicherzustellen, dass jeder Worker-Prozess unmittelbar nach seiner Erstellung auf den gemeinsam genutzten Speicher zugreifen kann. 🎜🎜Zur Vereinfachung der Verwendung können wir SharedMemory als Singleton entwerfen, sodass es in jedem Prozess nur eine Instanz gibt und SharedMemory import sein kann code> Verwenden Sie es direkt nach code>. 🎜<h2>🎜Code-Implementierung🎜</h2>🎜🎜<span style="max-width:90%">Lese- und Schreibsteuerung und IPC-Kommunikation</span>🎜🎜🎜Implementieren Sie zunächst die externe Schnittstelle <code>SharedMemory code>-Klasse verwenden wir hier nicht die Methode, <code>Manager und Worker SharedMemory erben zu lassen, sondern lassen SharedMemory code> in der Instanz verwendet werden. Bei der Transformation wird eine Instanz von <code>Manager oder Worker zurückgegeben, um die automatische Auswahl von Unterklassen zu realisieren. 🎜
🎜In Knoten 16isPrimary ersetzt isMaster Aus Kompatibilitätsgründen werden hier zwei Schreibmethoden verwendet. 🎜
// worker.js
const cluster = require(&#39;cluster&#39;);
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === &#39;function&#39;) {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;set&#39;, key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle(&#39;get&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle(&#39;remove&#39;, key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}
🎜Manager ist für die Verwaltung des gemeinsam genutzten Speicherbereichs verantwortlich. Wir fügen das Attribut __sharedMemory__ direkt zum Objekt Manager hinzu Es ist auch so, dass JS-Objekte in die Garbage-Collection-Verwaltung von JS einbezogen werden, sodass wir keine Vorgänge wie Speicherbereinigung und Datenmigration durchführen müssen, was die Implementierung sehr einfach macht. Anschließend werden Standardoperationen wie set, get und remove in __sharedMemory__ definiert, um Zugriffsmethoden bereitzustellen. 🎜🎜Wir hören das Erstellungsereignis des Arbeitsprozesses über cluster.on('online', callback) ab und verwenden sofort worker.on('message', callback)nach der Erstellung Code>, um die IPC-Kommunikation vom Arbeitsprozess zu überwachen und die Kommunikationsnachricht zur Verarbeitung an die Funktion handle zu übergeben. 🎜🎜Die Aufgabe der Funktion handle besteht darin, zu unterscheiden, welche Art von Operation der Arbeitsprozess ausführen möchte, und die Parameter der Operation herauszunehmen und sie dem entsprechenden setanzuvertrauen >, get , remove Funktion (beachten Sie, dass es nicht set, get, remove ist code> in <code>__sharedMemory__) Verarbeiten Sie die verarbeiteten Ergebnisse und geben Sie sie an den Arbeitsprozess zurück. 🎜
// manager.js
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve(&#39;OK&#39;);
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}
🎜Worker verwendet process.on, um die Rückgabenachricht vom Master-Prozess seit der Erstellung des Objekts zu überwachen (schließlich können Sie es kaum erwarten, bis die Nachricht eintrifft gesendet, bevor es überwacht wird, dann ist es zu spät) ). Was die Rolle des __getCallbacks__-Objekts betrifft, werden wir später darüber sprechen. An diesem Punkt wird das Worker-Objekt erstellt. 🎜🎜Wenn das Projekt später irgendwo ausgeführt wird und Sie auf den gemeinsamen Speicher zugreifen möchten, rufen Sie Workers set, get usw. auf Mit der Funktion remove rufen sie die Funktion handle auf, um die Nachricht über process.send an den Masterprozess zu senden Holen Sie sich die Operation, die bei der Rückgabe des Ergebnisses ausgeführt werden soll. Dokumentiert in __getCallbacks__. Wenn das Ergebnis zurückgegeben wird, wird es von der vorherigen Funktion in process.on überwacht und die entsprechende Rückruffunktion wird aus __getCallbacks__ entnommen und ausgeführt. 🎜

因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

// worker.js
const cluster = require(&#39;cluster&#39;);
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === &#39;function&#39;) {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;set&#39;, key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle(&#39;get&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle(&#39;remove&#39;, key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}

一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

互斥访问

到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

时间 进程A 进程B 共享内存中变量x的值
t0

0
t1 读取x(x=0)
0
t2 x1=x+1(x1=1) 读取x(x=0) 0
t3 将x1的值写回x x2=x+1(x2=1) 1
t4
将x2的值写回x 1

进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

// manager.js
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve(&#39;OK&#39;);
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}

Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

// worker.js
class Worker {
  getLock(key) {
    return new Promise((resolve) => {
      this.handle(&#39;getLock&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      this.handle(&#39;releaseLock&#39;, key, lockId, (value) => {
        resolve(value);
      });
    });
  }
  ...
}

监听对象

有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

Eine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung)

为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

// manager.js
class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      listeners: {},
    };
  }

  handle(data, target) {
    if (data.method === &#39;listen&#39;) {
      this.listen(data.key, (value) => {
        const msg = {
          isNotified: true,
          id: data.id,
          uuid: data.uuid,
          value,
        };
        target.send(msg);
      });
    } else {
      ...
    }
  }

  notifyListener(key) {
    const listeners = this.__sharedMemory__.listeners[key];
    if (listeners?.length > 0) {
      Promise.all(
        listeners.map(
          (callback) =>
            new Promise((resolve) => {
              callback(this.__sharedMemory__.get(key));
              resolve();
            })
        )
      );
    }
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      this.notifyListener(key);
      resolve(&#39;OK&#39;);
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      this.notifyListener(key);
      resolve(&#39;OK&#39;);
    });
  }

  listen(key, callback) {
    if (typeof callback === &#39;function&#39;) {
      this.__sharedMemory__.listeners[key] =
        this.__sharedMemory__.listeners[key] ?? [];
      this.__sharedMemory__.listeners[key].push(callback);
    } else {
      throw new Error(&#39;a listener must have a callback.&#39;);
    }
  }
  ...
}

Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

// worker.js
class Worker {
  constructor() {
    ...
    this.__getListenerCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      if (data.isNotified) {
        const callback = this.__getListenerCallbacks__[data.uuid];
        if (callback && typeof callback === &#39;function&#39;) {
          callback(data.value);
        }
      } else {
        ...
      }
    });
  }

  handle(method, key, value, callback) {
    ...
    if (method === &#39;listen&#39;) {
      this.__getListenerCallbacks__[uuid] = callback;
    } else {
      this.__getCallbacks__[uuid] = callback;
    }
  }

  listen(key, callback) {
    if (typeof callback === &#39;function&#39;) {
      this.handle(&#39;listen&#39;, key, null, callback);
    } else {
      throw new Error(&#39;a listener must have a callback.&#39;);
    }
  }
  ...
}

LRU缓存

有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

// manager.js
const LRU = require(&#39;lru-cache&#39;);

class Manager {
  constructor() {
    ...
    this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 };
    this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);
  }

  getLRU(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedLRUMemory__.get(key));
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.set(key, value);
      resolve(&#39;OK&#39;);
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.del(key);
      resolve(&#39;OK&#39;);
    });
  }
  ...
}

Worker中也增加getLRUsetLRUremoveLRU函数。

// worker.js
class Worker {
  getLRU(key) {
    return new Promise((resolve) => {
      this.handle(&#39;getLRU&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;setLRU&#39;, key, value, () => {
        resolve();
      });
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.handle(&#39;removeLRU&#39;, key, null, () => {
        resolve();
      });
    });
  }
  ...
}

共享内存的使用方式

目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库欢迎pull request和报bug),可以直接通过npm安装:

npm i cluster-shared-memory

下面的示例包含了基本使用方法:

const cluster = require(&#39;cluster&#39;);
// 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象
require(&#39;cluster-shared-memory&#39;);

if (cluster.isMaster) {
  // 在 master 进程中 fork 子进程
  for (let i = 0; i < 2; i++) {
    cluster.fork();
  }
} else {
  const sharedMemoryController = require(&#39;./src/shared-memory&#39;);
  const obj = {
    name: &#39;Tom&#39;,
    age: 10,
  };
  
  // 写对象
  await sharedMemoryController.set(&#39;myObj&#39;, obj);
  
  // 读对象
  const myObj = await sharedMemoryController.get(&#39;myObj&#39;);
  
  // 互斥访问对象,首先获得对象的锁
  const lockId = await sharedMemoryController.getLock(&#39;myObj&#39;);
  const newObj = await sharedMemoryController.get(&#39;myObj&#39;);
  newObj.age = newObj.age + 1;
  await sharedMemoryController.set(&#39;myObj&#39;, newObj);
  // 操作完之后释放锁
  await sharedMemoryController.releaseLock(&#39;requestTimes&#39;, lockId);
  
  // 或者使用 mutex 函数自动获取和释放锁
  await sharedMemoryController.mutex(&#39;myObj&#39;, async () => {
    const newObjM = await sharedMemoryController.get(&#39;myObj&#39;);
    newObjM.age = newObjM.age + 1;
    await sharedMemoryController.set(&#39;myObj&#39;, newObjM);
  });
  
  // 监听对象
  sharedMemoryController.listen(&#39;myObj&#39;, (value) => {
    console.log(`myObj: ${value}`);
  });
  
  //写LRU缓存
  await sharedMemoryController.setLRU(&#39;cacheItem&#39;, {user: &#39;Tom&#39;});
  
  // 读对象
  const cacheItem = await sharedMemoryController.getLRU(&#39;cacheItem&#39;);
}

缺点

这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。

由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。

  • 如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。

原文地址:https://juejin.cn/post/6992091006220894215

作者:FinalZJY

更多编程相关知识,请访问:编程视频!!

Das obige ist der detaillierte Inhalt vonEine kurze Diskussion darüber, wie Shared Memory im Node.js-Mehrprozessmodell implementiert wird (detaillierte Codeerklärung). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:掘金--FinalZJY. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen