Home  >  Article  >  Web Front-end  >  A brief discussion on how to implement shared memory in the Node.js multi-process model (detailed code explanation)

A brief discussion on how to implement shared memory in the Node.js multi-process model (detailed code explanation)

青灯夜游
青灯夜游forward
2021-08-05 10:26:034281browse

This article will discuss with youNode.jsUsing the multi-core method-the multi-threading model provided by the worker_threads module, we will introduce the method of realizing shared memory in the Node.js multi-process model.

A brief discussion on how to implement shared memory in the Node.js multi-process model (detailed code explanation)

Node.js Due to its single-threaded model design, a Node process (the main thread) can only utilize one CPU core. However, today’s machines basically Multi-core, this causes a serious waste of performance. Generally speaking, if you want to take advantage of multiple cores, you generally have the following methods:

  • Write a C plug-in for Node to expand the thread pool, and delegate CPU time-consuming tasks to it in the JS code Other thread processing.

  • Use the multi-threading model provided by the worker_threads module (still experimental).

  • Use the multi-process model provided by the child_process or cluster module. Each process is an independent Node.js process.

From the perspective of ease of use, code intrusion, and stability, the multi-process model is usually the first choice. [Recommended learning: "nodejs Tutorial"]

Problems with the Node.js cluster multi-process model

in In the multi-process model provided by the cluster module, each Node process is an independent and complete application process, with its own memory space that cannot be accessed by other processes. Therefore Although all Worker processes have consistent status and behavior when the project is started, there is no guarantee that their status will remain consistent during subsequent runs.

For example, when the project is started, there are two Worker processes, process A and process B. Both processes declare the variable a=1. But then the project received a request, and the Master process assigned it to process A for processing. This request changed the value of a to 2. At this time, a=2 in the memory space of process A, but a=2 in the memory space of process B. Still 1. At this time, if there is a request to read the value of a, the results read when the Master process dispatches the request to process A and process B are inconsistent, which causes a consistency problem.

The cluster module did not provide a solution when designing, but required that the Worker process be stateless, that is, programmers should not be allowed to modify the values ​​​​in memory when processing requests when writing code. To ensure the consistency of all Worker processes. However, in practice, there are always various situations that require writing to memory, such as recording the user's login status. In the practice of many enterprises, usually record these status data externally, such as database, redis, Message queues, file systems, etc. will read and write external storage space each time a stateful request is processed.

This is an effective approach, However, this requires the introduction of an additional external storage space, and at the same time, it must handle the consistency issue under concurrent access by multiple processes and maintain the life cycle of the data by itself (because The Node process and the data maintained externally are not created and destroyed synchronously), and there is an IO performance bottleneck in the case of high concurrent access (if it is stored in a non-memory environment such as a database). In fact, essentially, we just need a space that can be shared and accessed by multiple processes. We do not need persistent storage. The life cycle of this space is best strongly bound to the Node process, so that we can save a lot of time when using it. Less hassle. Therefore, cross-process shared memory has become the most suitable method for use in this scenario.

Shared memory of Node.js

Unfortunately, Node itself does not provide an implementation of shared memory, so we can take a look at the npm repository Implementation of third-party libraries. Some of these libraries are implemented through C plug-ins that extend Node's functions, and some are implemented through the IPC mechanism provided by Node. Unfortunately, their implementations are very simple and do not provide mutually exclusive access, object monitoring and other functions, which makes using The author must carefully maintain this shared memory, otherwise it will cause timing problems.

I looked around and couldn’t find what I wanted. . . Forget it, I'll write one myself.

The design of shared memory

First of all, we must clarify what kind of shared memory is needed. I started based on my own needs (in order to use it in the project It stores state data accessed across processes), while taking into account versatility, so the following points will be first considered:

  • Using JS objects as the basic unit for read and write access.

  • #Can provide mutually exclusive access between processes. When one process accesses, other processes are blocked.

  • Can monitor objects in shared memory, and the monitoring process can be notified when the object changes.

  • #On the premise of meeting the above conditions, the implementation method should be as simple as possible.

It can be found that in fact we do not need shared memory at the operating system level. We only need to be able to have multiple Node processes access the same object. Then we can use Node Implemented on the mechanism provided by itself. You can use a memory space of the Master process as a shared memory space. The Worker process delegates read and write requests to the Master process through IPC, and the Master process reads and writes, and then returns the results to the Worker process through IPC.

In order to make the use of shared memory consistent in the Master process and the Worker process, we can abstract the operation of the shared memory into an interface, and implement this interface in the Master process and the Worker process respectively. . The class diagram is as shown below, using a SharedMemory class as the abstract interface, and declaring the object in the server.js entry file. It is instantiated as a Manager object in the Master process and as a Worker object in the Worker process. The Manager object maintains shared memory and handles read and write requests to shared memory, while the Worker object sends read and write requests to the Master process.

A brief discussion on how to implement shared memory in the Node.js multi-process model (detailed code explanation)

You can use an attribute in the Manager class as a shared memory object. The way to access the object is the same as the way to access ordinary JS objects, and then Create a layer of encapsulation to expose only basic operations such as get, set, remove, etc. to prevent the property from being modified directly.

Since the Master process will be created prior to all Worker processes, you can create the Worker process after declaring the shared memory space in the Master process to ensure that each Worker process can access the shared memory immediately after it is created. .

For simplicity of use, we can design SharedMemory as a singleton, so that there is only one instance in each process, and it can be importSharedMemory Use it directly after .

Code implementation

Read and write control and IPC communication

First implement the external interfaceSharedMemory class, here we do not use the method of letting Manager and Worker inherit SharedMemory, but let SharedMemory be instantiated When you return an instance of Manager or Worker, you can automatically select a subclass.

In Node 16, isPrimary replaces isMaster. Two writing methods are used here for compatibility.

// shared-memory.js
class SharedMemory {
  constructor() {
    if (cluster.isMaster || cluster.isPrimary) {
      return new Manager();
    } else {
      return new Worker();
    }
  }
}

Manager is responsible for managing the shared memory space. We directly add the __sharedMemory__ attribute to the Manager object because it is also JS Objects will be included in the garbage collection management of JS, so we do not need to perform operations such as memory cleaning and data migration, making the implementation very simple. Then define standard operations such as set, get, remove in __sharedMemory__ to provide access methods.

We listen to the creation event of the worker process through cluster.on('online', callback), and immediately use worker.on('message', callback ) to monitor the IPC communication from the worker process and hand the communication message to the handle function for processing. The

handle function is responsible for distinguishing what kind of operation the worker process wants to perform, and taking out the parameters of the operation and entrusting them to the corresponding set, get, remove function (note that it is not set, get, remove in __sharedMemory__) for processing, and Return the processed results to the worker process.

// manager.js
const cluster = require('cluster');

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      set(key, value) {
        this.memory[key] = value;
      },
      get(key) {
        return this.memory[key];
      },
      remove(key) {
        delete this.memory[key];
      },
      memory: {},
    };

    // Listen the messages from worker processes.
    cluster.on('online', (worker) => {
      worker.on('message', (data) => {
        this.handle(data, worker);
        return false;
      });
    });
  }

  handle(data, target) {
    const args = data.value ? [data.key, data.value] : [data.key];
    this[data.method](...args).then((value) => {
      const msg = {
        id: data.id, // workerId
        uuid: data.uuid, // communicationID
        value,
      };
      target.send(msg);
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      resolve('OK');
    });
  }

  get(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedMemory__.get(key));
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      resolve('OK');
    });
  }
}

Worker Use process.on since the object was created to monitor the return message from the Master process (after all, you can’t wait for the message to be sent before listening, then That's too late). As for the role of the __getCallbacks__ object, we will talk about it later. At this point the Worker object is created.

Later, when the project runs somewhere, if you want to access the shared memory, Worker's set, get,# will be called. ##remove function, they will call the handle function to send the message to the master process through process.send. At the same time, the operations to be performed when the return result is obtained are recorded in __getCallbacks__ in. When the result returns, it will be monitored by the previous function in process.on, and the corresponding callback function will be taken out from __getCallbacks__ and executed.

因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

// worker.js
const cluster = require('cluster');
const { v4: uuid4 } = require('uuid');

class Worker {
  constructor() {
    this.__getCallbacks__ = {};

    process.on('message', (data) => {
      const callback = this.__getCallbacks__[data.uuid];
      if (callback && typeof callback === 'function') {
        callback(data.value);
      }
      delete this.__getCallbacks__[data.uuid];
    });
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.handle('set', key, value, () => {
        resolve();
      });
    });
  }

  get(key) {
    return new Promise((resolve) => {
      this.handle('get', key, null, (value) => {
        resolve(value);
      });
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.handle('remove', key, null, () => {
        resolve();
      });
    });
  }

  handle(method, key, value, callback) {
    const uuid = uuid4(); // 每次通信的uuid
    process.send({
      id: cluster.worker.id,
      method,
      uuid,
      key,
      value,
    });
    this.__getCallbacks__[uuid] = callback;
  }
}

一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

互斥访问

到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

时间 进程A 进程B 共享内存中变量x的值
t0

0
t1 读取x(x=0)
0
t2 x1=x+1(x1=1) 读取x(x=0) 0
t3 将x1的值写回x x2=x+1(x2=1) 1
t4
将x2的值写回x 1

进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

// manager.js
const { v4: uuid4 } = require('uuid');

class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      locks: {},
      lockRequestQueues: {},
    };
  }

  getLock(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.lockRequestQueues[key] =
        this.__sharedMemory__.lockRequestQueues[key] ?? [];
      this.__sharedMemory__.lockRequestQueues[key].push(resolve);
      this.handleLockRequest(key);
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      if (lockId === this.__sharedMemory__.locks[key]) {
        delete this.__sharedMemory__.locks[key];
        this.handleLockRequest(key);
      }
      resolve('OK');
    });
  }

  handleLockRequest(key) {
    return new Promise((resolve) => {
      if (
        !this.__sharedMemory__.locks[key] &&
        this.__sharedMemory__.lockRequestQueues[key]?.length > 0
      ) {
        const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
        const lockId = uuid4();
        this.__sharedMemory__.locks[key] = lockId;
        callback(lockId);
      }
      resolve();
    });
  }
  ...
}

Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

// worker.js
class Worker {
  getLock(key) {
    return new Promise((resolve) => {
      this.handle('getLock', key, null, (value) => {
        resolve(value);
      });
    });
  }

  releaseLock(key, lockId) {
    return new Promise((resolve) => {
      this.handle('releaseLock', key, lockId, (value) => {
        resolve(value);
      });
    });
  }
  ...
}

监听对象

有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

A brief discussion on how to implement shared memory in the Node.js multi-process model (detailed code explanation)

为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

// manager.js
class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...
      listeners: {},
    };
  }

  handle(data, target) {
    if (data.method === 'listen') {
      this.listen(data.key, (value) => {
        const msg = {
          isNotified: true,
          id: data.id,
          uuid: data.uuid,
          value,
        };
        target.send(msg);
      });
    } else {
      ...
    }
  }

  notifyListener(key) {
    const listeners = this.__sharedMemory__.listeners[key];
    if (listeners?.length > 0) {
      Promise.all(
        listeners.map(
          (callback) =>
            new Promise((resolve) => {
              callback(this.__sharedMemory__.get(key));
              resolve();
            })
        )
      );
    }
  }

  set(key, value) {
    return new Promise((resolve) => {
      this.__sharedMemory__.set(key, value);
      this.notifyListener(key);
      resolve('OK');
    });
  }

  remove(key) {
    return new Promise((resolve) => {
      this.__sharedMemory__.remove(key);
      this.notifyListener(key);
      resolve('OK');
    });
  }

  listen(key, callback) {
    if (typeof callback === 'function') {
      this.__sharedMemory__.listeners[key] =
        this.__sharedMemory__.listeners[key] ?? [];
      this.__sharedMemory__.listeners[key].push(callback);
    } else {
      throw new Error('a listener must have a callback.');
    }
  }
  ...
}

Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

// worker.js
class Worker {
  constructor() {
    ...
    this.__getListenerCallbacks__ = {};

    process.on('message', (data) => {
      if (data.isNotified) {
        const callback = this.__getListenerCallbacks__[data.uuid];
        if (callback && typeof callback === 'function') {
          callback(data.value);
        }
      } else {
        ...
      }
    });
  }

  handle(method, key, value, callback) {
    ...
    if (method === 'listen') {
      this.__getListenerCallbacks__[uuid] = callback;
    } else {
      this.__getCallbacks__[uuid] = callback;
    }
  }

  listen(key, callback) {
    if (typeof callback === 'function') {
      this.handle('listen', key, null, callback);
    } else {
      throw new Error('a listener must have a callback.');
    }
  }
  ...
}

LRU缓存

有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

// manager.js
const LRU = require('lru-cache');

class Manager {
  constructor() {
    ...
    this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 };
    this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);
  }

  getLRU(key) {
    return new Promise((resolve) => {
      resolve(this.__sharedLRUMemory__.get(key));
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.set(key, value);
      resolve('OK');
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.__sharedLRUMemory__.del(key);
      resolve('OK');
    });
  }
  ...
}

Worker中也增加getLRUsetLRUremoveLRU函数。

// worker.js
class Worker {
  getLRU(key) {
    return new Promise((resolve) => {
      this.handle('getLRU', key, null, (value) => {
        resolve(value);
      });
    });
  }

  setLRU(key, value) {
    return new Promise((resolve) => {
      this.handle('setLRU', key, value, () => {
        resolve();
      });
    });
  }

  removeLRU(key) {
    return new Promise((resolve) => {
      this.handle('removeLRU', key, null, () => {
        resolve();
      });
    });
  }
  ...
}

共享内存的使用方式

目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库欢迎pull request和报bug),可以直接通过npm安装:

npm i cluster-shared-memory

下面的示例包含了基本使用方法:

const cluster = require('cluster');
// 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象
require('cluster-shared-memory');

if (cluster.isMaster) {
  // 在 master 进程中 fork 子进程
  for (let i = 0; i < 2; i++) {
    cluster.fork();
  }
} else {
  const sharedMemoryController = require(&#39;./src/shared-memory&#39;);
  const obj = {
    name: &#39;Tom&#39;,
    age: 10,
  };
  
  // 写对象
  await sharedMemoryController.set(&#39;myObj&#39;, obj);
  
  // 读对象
  const myObj = await sharedMemoryController.get(&#39;myObj&#39;);
  
  // 互斥访问对象,首先获得对象的锁
  const lockId = await sharedMemoryController.getLock(&#39;myObj&#39;);
  const newObj = await sharedMemoryController.get(&#39;myObj&#39;);
  newObj.age = newObj.age + 1;
  await sharedMemoryController.set(&#39;myObj&#39;, newObj);
  // 操作完之后释放锁
  await sharedMemoryController.releaseLock(&#39;requestTimes&#39;, lockId);
  
  // 或者使用 mutex 函数自动获取和释放锁
  await sharedMemoryController.mutex(&#39;myObj&#39;, async () => {
    const newObjM = await sharedMemoryController.get(&#39;myObj&#39;);
    newObjM.age = newObjM.age + 1;
    await sharedMemoryController.set(&#39;myObj&#39;, newObjM);
  });
  
  // 监听对象
  sharedMemoryController.listen(&#39;myObj&#39;, (value) => {
    console.log(`myObj: ${value}`);
  });
  
  //写LRU缓存
  await sharedMemoryController.setLRU(&#39;cacheItem&#39;, {user: &#39;Tom&#39;});
  
  // 读对象
  const cacheItem = await sharedMemoryController.getLRU(&#39;cacheItem&#39;);
}

缺点

这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。

由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。

  • 如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。

原文地址:https://juejin.cn/post/6992091006220894215

作者:FinalZJY

更多编程相关知识,请访问:编程视频!!

The above is the detailed content of A brief discussion on how to implement shared memory in the Node.js multi-process model (detailed code explanation). For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:掘金--FinalZJY. If there is any infringement, please contact admin@php.cn delete