Maison  >  Article  >  interface Web  >  Une brève discussion sur la façon d'implémenter la mémoire partagée dans le modèle multi-processus Node.js (explication détaillée du code)

Une brève discussion sur la façon d'implémenter la mémoire partagée dans le modèle multi-processus Node.js (explication détaillée du code)

青灯夜游
青灯夜游avant
2021-08-05 10:26:034390parcourir

Cet article discutera avec vous de Node.jsEn utilisant la méthode multi-core - le modèle multi-threading fourni par le module worker_threads, nous présenterons la méthode d'implémentation de la mémoire partagée dans le modèle multi-processus Node.js.

Une brève discussion sur la façon d'implémenter la mémoire partagée dans le modèle multi-processus Node.js (explication détaillée du code)

Node.js En raison de sa conception de modèle à thread unique, un processus Node (le thread principal) ne peut utiliser qu'un seul cœur de processeur. Cependant, les machines d'aujourd'hui sont fondamentalement multicœurs, ce qui entraîne un sérieux gaspillage de performances. De manière générale, si vous souhaitez profiter de plusieurs cœurs, vous disposez généralement des méthodes suivantes :

  • Écrivez un plug-in C++ pour Node afin d'étendre le pool de threads et déléguez les tâches gourmandes en temps CPU à d'autres threads du Code JS.

  • Utilisez le modèle multi-threading fourni par le module worker_threads (encore expérimental).

  • Utilisez le modèle multi-processus fourni par le module child_process ou cluster Chaque processus est un processus Node.js indépendant.

Du point de vue de la facilité d'utilisation, de l'intrusion du code et de la stabilité, le modèle multi-processus est généralement le premier choix. [Apprentissage recommandé : "Tutoriel Nodejs"]

Problèmes avec le modèle multi-processus du cluster Node.js

Dans le modèle multi-processus fourni par le module cluster, chaque processus Node est un processus indépendant et complet processus Le processus de candidature possède son propre espace mémoire et n'est pas accessible aux autres processus. Par conséquentBien que tous les processus Worker aient un statut et un comportement cohérents au démarrage du projet, il n'y a aucune garantie que leur statut restera cohérent lors des exécutions ultérieures.

Par exemple, lorsque le projet est démarré, il existe deux processus Worker, le processus A et le processus B. Les deux processus déclarent la variable a=1. Mais ensuite, le projet a reçu une requête et le processus maître l'a assignée au processus A pour le traitement. Cette requête a changé la valeur de a en 2. A ce moment, a=2 dans l'espace mémoire du processus A, mais a=2 dans. l'espace mémoire du processus B. Toujours 1. A ce moment, s'il y a une demande de lecture de la valeur de a, les résultats lus lorsque le processus maître envoie la demande au processus A et au processus B sont incohérents, ce qui provoque un problème de cohérence.

Le module cluster n'a pas fourni de solution lors de la conception, mais exigeait que le processus Worker soit sans état, c'est-à-dire que les programmeurs ne devaient pas être autorisés à modifier les valeurs en mémoire lors du traitement des requêtes lors de l'écriture de code, afin de protéger cohérence du processus de tous les travailleurs. Cependant, dans la pratique, il existe toujours diverses situations qui nécessitent l'écriture en mémoire, comme l'enregistrement de l'état de connexion de l'utilisateur. Dans la pratique de nombreuses entreprises, ces données d'état sont généralement enregistrées en externe, comme les bases de données, les redis et les files d'attente de messages. système de fichiers, etc. , chaque fois qu'une requête avec état est traitée, l'espace de stockage externe sera lu et écrit.

C'est une approche efficace,

Cependant, cela nécessite l'introduction d'un espace de stockage externe supplémentaire, et en même temps, il doit gérer le problème de cohérence sous accès simultané par plusieurs processus, et maintenir le cycle de vie des données en lui-même (car le processus et la maintenance du nœud sont externes, les données ne sont pas créées et détruites de manière synchrone), et il existe un goulot d'étranglement des performances d'E/S en cas d'accès simultané élevé (s'il est stocké dans un environnement sans mémoire tel qu'une base de données) . En fait, nous avons simplement besoin d'un espace qui peut être partagé et accessible par plusieurs processus. Nous n'avons pas besoin de stockage persistant. Il est préférable que le cycle de vie de cet espace soit fortement lié au processus Node, afin que nous puissions économiser beaucoup. de temps lors de son utilisation. Par conséquent, la mémoire partagée entre processus est devenue la méthode la plus appropriée à utiliser dans ce scénario.

Mémoire partagée de Node.js

Malheureusement, Node lui-même ne fournit pas d'implémentation de mémoire partagée, nous pouvons donc examiner l'implémentation de bibliothèques tierces dans le référentiel npm. Certaines de ces bibliothèques sont implémentées en étendant les fonctions de Node via des plug-ins C++, et d'autres sont implémentées via le mécanisme IPC fourni par Node. Malheureusement, leurs implémentations sont très simples et ne fournissent pas d'accès, de surveillance d'objets et d'autres fonctions mutuellement exclusives, ce qui n'est pas le cas. fait en utilisant L'auteur doit soigneusement maintenir cette mémoire partagée, sinon cela entraînera des problèmes de timing.

J’ai regardé autour de moi et je n’ai pas trouvé ce que je voulais. . . Oubliez ça, j'en écrirai un moi-même.

Conception de la mémoire partagée

Tout d'abord, nous devons clarifier quel type de mémoire partagée est nécessaire. J'ai commencé en fonction de mes propres besoins (afin de l'utiliser dans le projet pour stocker les données d'état accessibles à travers les processus), et en même temps Compte tenu de la polyvalence, nous considérerons d'abord les points suivants :

  • Utiliser les objets JS comme unité de base pour l'accès en lecture et en écriture.

  • Capable d'accéder mutuellement exclusif entre les processus. Lorsqu'un processus accède, les autres processus sont bloqués.

  • peut surveiller les objets dans la mémoire partagée et le processus de surveillance peut être averti lorsque l'objet change.

  • Sous réserve de remplir les conditions ci-dessus, la méthode de mise en œuvre est aussi simple que possible.

On constate qu'en fait nous n'avons pas besoin de mémoire partagée au niveau du système d'exploitation, nous avons seulement besoin de pouvoir faire en sorte que plusieurs processus Node accèdent au même objet. Ensuite, nous pouvons l'implémenter sur le mécanisme fourni par. Noeud lui-même. Vous pouvez utiliser une section de l'espace mémoire du processus Master comme espace mémoire partagé. Le processus Worker délègue les requêtes de lecture et d'écriture au processus Master via IPC, et le processus Master lit et écrit, puis renvoie les résultats au processus Master. Processus de travail via IPC.

Afin de rendre cohérente l'utilisation de la mémoire partagée dans le processus Master et le processus Worker, nous pouvons résumer le fonctionnement de la mémoire partagée dans une interface, et implémenter cette interface dans le processus Master et le processus Worker respectivement. Le diagramme de classes est présenté dans la figure ci-dessous. Une classe SharedMemory est utilisée comme interface abstraite et l'objet est déclaré dans le fichier d'entrée server.js. Il est instancié en tant qu'objet Manager dans le processus Master et en tant qu'objet Worker dans le processus Worker. L'objet Manager maintient la mémoire partagée et gère les requêtes de lecture et d'écriture vers la mémoire partagée, tandis que l'objet Worker envoie les requêtes de lecture et d'écriture au processus maître. SharedMemory类作为抽象接口,在server.js入口文件中声明该对象。其在Master进程中实例化为Manager对象,在Worker进程中实例化为Worker对象。Manager对象来维护共享内存,并处理对共享内存的读写请求,而Worker对象则将读写请求发送到Master进程。

Une brève discussion sur la façon dimplémenter la mémoire partagée dans le modèle multi-processus Node.js (explication détaillée du code)

可以使用Manager类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露getsetremove等基本操作,避免该属性直接被修改。

由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。

为了使用简单,我们可以将SharedMemory设计成单例,这样每个进程中就只有一个实例,并可以在importSharedMemory之后直接使用。

代码实现

读写控制与IPC通信

首先实现对外接口SharedMemory类,这里没有使用让ManagerWorker继承SharedMemory的方式,而是让SharedMemory在实例化的时候返回一个ManagerWorker的实例,从而实现自动选择子类。

在Node 16中isPrimary替代了isMaster,这里为了兼容使用了两种写法。

// shared-memory.js
class SharedMemory {
  constructor() {
    if (cluster.isMaster || cluster.isPrimary) {
      return new Manager();
    } else {
      return new Worker();
    }
  }
}

Manager负责管理共享内存空间,我们直接在Manager对象中增加__sharedMemory__属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__之中定义setgetremove等标准操作来提供访问方式。

我们通过cluster.on('online', callback)来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)来监听来自worker进程的IPC通信,并把通信消息交给handle函数处理。

handle函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的setgetremove函数(注意不是__sharedMemory__中的setgetremove)进行处理,并将处理后的结果返还给worker进程。

// manager.js
const cluster = require('cluster');

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      set(key, value) {
        this.memory[key] = value;
      },
      get(key) {
        return this.memory[key];
      },
      remove(key) {
        delete this.memory[key];
      },
      memory: {},
    };

    // Listen the messages from worker processes.
    cluster.on('online', (worker) => {
      worker.on('message', (data) => {
        this.handle(data, worker);
        return false;
      });
    });
  }

  handle(data, target) {
    const args = data.value ? [data.key, data.value] : [data.key];
    this[data.method](...args).then((value) => {
      const msg = {
        id: data.id, // workerId
        uuid: data.uuid, // communicationID
        value,
      };
      target.send(msg);
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      resolve('OK');
    });
  }

  get(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedMemory__.get(key));
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      resolve('OK');
    });
  }
}

Worker自对象创建开始就使用process.on监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__对象的作用一会儿再说。此时Worker对象便创建完成。

之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Workersetgetremove函数,它们又会调用handle函数将消息通过process.send发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__中。当结果返回时,会被之前在process.on中的函数监听到,并从__getCallbacks__

Une brève discussion sur la façon dimplémenter la mémoire partagée dans le modèle multi-processus Node.js (explication détaillée du code)🎜🎜Oui Utilisez un attribut dans la classe Manager comme objet de mémoire partagée. La manière d'accéder à l'objet est la même que la manière d'accéder aux objets JS ordinaires, puis créez une couche d'encapsulation pour exposer uniquement get , <code>set, remove et d'autres opérations de base pour empêcher la modification directe de l'attribut. 🎜🎜Étant donné que le processus Master sera créé avant tous les processus Worker, vous pouvez créer le processus Worker après avoir déclaré l'espace mémoire partagé dans le processus Master pour garantir que chaque processus Worker peut accéder à la mémoire partagée immédiatement après sa création. 🎜🎜Pour simplifier l'utilisation, nous pouvons concevoir SharedMemory comme un singleton, de sorte qu'il n'y ait qu'une seule instance dans chaque processus, et SharedMemory puisse être importé code> Utilisez-le directement après le code>. 🎜<h2>🎜Implémentation du code🎜</h2>🎜🎜<span style="max-width:90%">Contrôle de lecture et d'écriture et communication IPC</span>🎜🎜🎜Implémentez d'abord l'interface externe <code>SharedMemory code>, nous n'utilisons pas ici la méthode permettant à <code>Manager et Worker d'hériter de SharedMemory, mais de laisser SharedMemory code> être utilisé dans l'instance Une fois transformé, une instance de Manager ou Worker est renvoyée pour réaliser la sélection automatique des sous-classes. 🎜
🎜Dans le nœud 16isPrimary remplace isMaster Deux méthodes d'écriture sont utilisées ici pour des raisons de compatibilité. 🎜
// worker.js
const cluster = require(&#39;cluster&#39;);
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === &#39;function&#39;) {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;set&#39;, key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle(&#39;get&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle(&#39;remove&#39;, key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}
🎜Manager est responsable de la gestion de l'espace mémoire partagé. Nous ajoutons directement l'attribut __sharedMemory__ à l'objet Manager, car. ce sont également les objets JS qui seront inclus dans la gestion du garbage collection de JS, nous n'avons donc pas besoin d'effectuer des opérations telles que le nettoyage de la mémoire et la migration des données, ce qui rend la mise en œuvre très simple. Ensuite, les opérations standard telles que set, get et remove sont définies dans __sharedMemory__ pour fournir des méthodes d'accès. 🎜🎜Nous écoutons l'événement de création du processus de travail via cluster.on('online', callback) et utilisons worker.on('message', callback)immédiatement après la création du code> pour surveiller la communication IPC du processus de travail et transmettre le message de communication à la fonction handle pour traitement. 🎜🎜La responsabilité de la fonction handle est de distinguer le type d'opération que le processus de travail souhaite effectuer, de retirer les paramètres de l'opération et de les confier au setcorrespondant >, fonction get , remove (notez qu'elle n'est pas set, get, remove code> dans <code>__sharedMemory__) Traiter et renvoyer les résultats traités au processus de travail. 🎜
// manager.js
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve(&#39;OK&#39;);
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}
🎜Worker utilise process.on pour surveiller le message de retour du processus maître depuis la création de l'objet (après tout, vous ne pouvez pas attendre que le message soit envoyé avant de le surveiller, alors il sera trop tard) ). Quant au rôle de l'objet __getCallbacks__, nous en reparlerons plus tard. À ce stade, l'objet Worker est créé. 🎜🎜Lorsque le projet s'exécutera plus tard, si vous souhaitez accéder à la mémoire partagée, vous appellerez set de Worker, get et remove, ils appelleront la fonction handle pour envoyer le message au processus maître via process.send. récupère l'opération à effectuer lors du renvoi du résultat. Documenté dans __getCallbacks__. Lorsque le résultat revient, il sera surveillé par la fonction précédente dans process.on, et la fonction de rappel correspondante sera extraite de __getCallbacks__ et exécutée. 🎜

因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

// worker.js
const cluster = require(&#39;cluster&#39;);
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === &#39;function&#39;) {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;set&#39;, key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle(&#39;get&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle(&#39;remove&#39;, key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}

一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

互斥访问

到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

时间 进程A 进程B 共享内存中变量x的值
t0

0
t1 读取x(x=0)
0
t2 x1=x+1(x1=1) 读取x(x=0) 0
t3 将x1的值写回x x2=x+1(x2=1) 1
t4
将x2的值写回x 1

进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

// manager.js
const { v4: uuid4 } = require(&#39;uuid&#39;);

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve(&#39;OK&#39;);
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}

Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

// worker.js
class Worker {
  getLock(key) {
    return new Promise((resolve) => {
      this.handle(&#39;getLock&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      this.handle(&#39;releaseLock&#39;, key, lockId, (value) => {
        resolve(value);
      });
    });
  }
  ...
}

监听对象

有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

Une brève discussion sur la façon dimplémenter la mémoire partagée dans le modèle multi-processus Node.js (explication détaillée du code)

为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

// manager.js
class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      listeners: {},
    };
  }

  handle(data, target) {
    if (data.method === &#39;listen&#39;) {
      this.listen(data.key, (value) => {
        const msg = {
          isNotified: true,
          id: data.id,
          uuid: data.uuid,
          value,
        };
        target.send(msg);
      });
    } else {
      ...
    }
  }

  notifyListener(key) {
    const listeners = this.__sharedMemory__.listeners[key];
    if (listeners?.length > 0) {
      Promise.all(
        listeners.map(
          (callback) =>
            new Promise((resolve) => {
              callback(this.__sharedMemory__.get(key));
              resolve();
            })
        )
      );
    }
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      this.notifyListener(key);
      resolve(&#39;OK&#39;);
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      this.notifyListener(key);
      resolve(&#39;OK&#39;);
    });
  }

  listen(key, callback) {
    if (typeof callback === &#39;function&#39;) {
      this.__sharedMemory__.listeners[key] =
        this.__sharedMemory__.listeners[key] ?? [];
      this.__sharedMemory__.listeners[key].push(callback);
    } else {
      throw new Error(&#39;a listener must have a callback.&#39;);
    }
  }
  ...
}

Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

// worker.js
class Worker {
  constructor() {
    ...
    this.__getListenerCallbacks__ = {};

    process.on(&#39;message&#39;, (data) => {
      if (data.isNotified) {
        const callback = this.__getListenerCallbacks__[data.uuid];
        if (callback && typeof callback === &#39;function&#39;) {
          callback(data.value);
        }
      } else {
        ...
      }
    });
  }

  handle(method, key, value, callback) {
    ...
    if (method === &#39;listen&#39;) {
      this.__getListenerCallbacks__[uuid] = callback;
    } else {
      this.__getCallbacks__[uuid] = callback;
    }
  }

  listen(key, callback) {
    if (typeof callback === &#39;function&#39;) {
      this.handle(&#39;listen&#39;, key, null, callback);
    } else {
      throw new Error(&#39;a listener must have a callback.&#39;);
    }
  }
  ...
}

LRU缓存

有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

// manager.js
const LRU = require(&#39;lru-cache&#39;);

class Manager {
  constructor() {
    ...
    this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 };
    this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);
  }

  getLRU(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedLRUMemory__.get(key));
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.set(key, value);
      resolve(&#39;OK&#39;);
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.del(key);
      resolve(&#39;OK&#39;);
    });
  }
  ...
}

Worker中也增加getLRUsetLRUremoveLRU函数。

// worker.js
class Worker {
  getLRU(key) {
    return new Promise((resolve) => {
      this.handle(&#39;getLRU&#39;, key, null, (value) => {
        resolve(value);
      });
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.handle(&#39;setLRU&#39;, key, value, () => {
        resolve();
      });
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.handle(&#39;removeLRU&#39;, key, null, () => {
        resolve();
      });
    });
  }
  ...
}

共享内存的使用方式

目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库欢迎pull request和报bug),可以直接通过npm安装:

npm i cluster-shared-memory

下面的示例包含了基本使用方法:

const cluster = require(&#39;cluster&#39;);
// 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象
require(&#39;cluster-shared-memory&#39;);

if (cluster.isMaster) {
  // 在 master 进程中 fork 子进程
  for (let i = 0; i < 2; i++) {
    cluster.fork();
  }
} else {
  const sharedMemoryController = require(&#39;./src/shared-memory&#39;);
  const obj = {
    name: &#39;Tom&#39;,
    age: 10,
  };
  
  // 写对象
  await sharedMemoryController.set(&#39;myObj&#39;, obj);
  
  // 读对象
  const myObj = await sharedMemoryController.get(&#39;myObj&#39;);
  
  // 互斥访问对象,首先获得对象的锁
  const lockId = await sharedMemoryController.getLock(&#39;myObj&#39;);
  const newObj = await sharedMemoryController.get(&#39;myObj&#39;);
  newObj.age = newObj.age + 1;
  await sharedMemoryController.set(&#39;myObj&#39;, newObj);
  // 操作完之后释放锁
  await sharedMemoryController.releaseLock(&#39;requestTimes&#39;, lockId);
  
  // 或者使用 mutex 函数自动获取和释放锁
  await sharedMemoryController.mutex(&#39;myObj&#39;, async () => {
    const newObjM = await sharedMemoryController.get(&#39;myObj&#39;);
    newObjM.age = newObjM.age + 1;
    await sharedMemoryController.set(&#39;myObj&#39;, newObjM);
  });
  
  // 监听对象
  sharedMemoryController.listen(&#39;myObj&#39;, (value) => {
    console.log(`myObj: ${value}`);
  });
  
  //写LRU缓存
  await sharedMemoryController.setLRU(&#39;cacheItem&#39;, {user: &#39;Tom&#39;});
  
  // 读对象
  const cacheItem = await sharedMemoryController.getLRU(&#39;cacheItem&#39;);
}

缺点

这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。

由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。

  • 如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。

原文地址:https://juejin.cn/post/6992091006220894215

作者:FinalZJY

更多编程相关知识,请访问:编程视频!!

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer