Home >Web Front-end >JS Tutorial >Detailed explanation of node implementation of Cluster shared memory

Detailed explanation of node implementation of Cluster shared memory

小云云
小云云Original
2018-01-20 10:53:161750browse

This article mainly introduces node to use process communication to realize Cluster shared memory. The editor thinks it is quite good. Now I will share it with you and give it as a reference. Let’s follow the editor to take a look, I hope it can help everyone.

The standard API of Node.js does not provide process shared memory. However, through the send method of the IPC interface and monitoring of message events, a collaboration mechanism between multiple processes can be realized through communication. Operate shared memory.

##Basic usage of IPC:


// worker进程 发送消息
process.send(‘读取共享内存');
 
// master进程 接收消息 -> 处理 -> 发送回信
cluster.on('online', function (worker) {
   // 有worker进程建立,即开始监听message事件
   worker.on(‘message', function(data) {
     // 处理来自worker的请求
     // 回传结果
     worker.send(‘result')
   });
});

In Node.js, through send and on(' message', callback) has several characteristics. First of all, master and workers can communicate with each other, but workers cannot communicate directly with each other, but workers can communicate indirectly through master forwarding. In addition, the data passed through the send method will be processed by JSON.stringify before being passed. After being received, it will be parsed using JSON.parse. Therefore, the Buffer object will become an array after being passed, but the function cannot be passed directly. On the other hand, all data types except buffer and function can be transferred directly (which is already very powerful, and buffer and function can also be transferred using alternative methods).

Based on the above characteristics, we can design a solution to share memory through IPC:

1. As a user of shared memory, the worker process does not directly operate the shared memory, but through send The method notifies the master process to perform a write (set) or read (get) operation.

2. The master process initializes an Object object as shared memory, and reads and writes the key value of the Object according to the message sent by the worker.

3. Since cross-process communication is used, the set and get initiated by the worker are asynchronous operations. The master performs actual read and write operations according to the request, and then returns the results to the worker (that is, sends the result data to the worker ).

##Data format

In order to realize asynchronous reading and writing functions between processes, the format of communication data needs to be standardized.

The first is the worker's request data:


requestMessage = {
  isSharedMemoryMessage: true, // 表示这是一次共享内存的操作通信
  method: ‘set', // or ‘get' 操作的方法
  id: cluster.worker.id, // 发起操作的进程(在一些特殊场景下,用于保证master可以回信)
  uuid: uuid, // 此次操作的(用于注册/调用回调函数)
  key: key, // 要操作的键
  value: value // 键对应的值(写入)
}

After receiving the data, the master will perform the corresponding operations according to the method, and then send the result according to requestMessage.id The data is sent to the corresponding worker. The data format is as follows:


responseMessage = {
  isSharedMemoryMessage: true, // 标记这是一次共享内存通信
  uuid: requestMessage.uuid, // 此次操作的唯一标示
  value: value // 返回值。get操作为key对应的值,set操作为成功或失败
}

The significance of standardizing the data format is that after receiving the request, the master can send the processing result to the corresponding worker. , and after receiving the returned result, the worker can call the callback corresponding to this communication to achieve collaboration.

After standardizing the data format, the next thing to do is to design two sets of codes, respectively for the master process and the worker process, to monitor communication and process communication data to realize the function of shared memory.

##User class

Instances of the User class work in the worker process and are responsible for sending requests to operate shared memory and listening for replies from the master.


var User = function() {
  var self = this;
  self.__uuid__ = 0;
 
  // 缓存回调函数
  self.__getCallbacks__ = {};
 
  // 接收每次操作请求的回信
  process.on('message', function(data) {
    
    if (!data.isSharedMemoryMessage) return;
    // 通过uuid找到相应的回调函数
    var cb = self.__getCallbacks__[data.uuid];
    if (cb && typeof cb == 'function') {
      cb(data.value)
    }
    // 卸载回调函数
    self.__getCallbacks__[data.uuid] = undefined;
  });
};
 
// 处理操作
User.prototype.handle = function(method, key, value, callback) {
 
  var self = this;
  var uuid = self.__uuid__++;
 
  process.send({
    isSharedMemoryMessage: true,
    method: method,
    id: cluster.worker.id,
    uuid: uuid,
    key: key,
    value: value
  });
 
  // 注册回调函数
  self.__getCallbacks__[uuid] = callback;
 
};
 
User.prototype.set = function(key, value, callback) {
  this.handle('set', key, value, callback);
};
 
User.prototype.get = function(key, callback) {
  this.handle('get', key, null, callback);
};

##Manager class

Instances of the Manager class work in the master process and are used to initialize an Object as shared memory , and according to the request of the User instance, add key-value pairs in the shared memory, or read the key-value, and then send the result back.


var Manager = function() {
 
  var self = this;
  
  // 初始化共享内存
  self.__sharedMemory__ = {};
    
  // 监听并处理来自worker的请求
  cluster.on('online', function(worker) {
    worker.on('message', function(data) {
      // isSharedMemoryMessage是操作共享内存的通信标记
      if (!data.isSharedMemoryMessage) return;
      self.handle(data);
    });
  });
};
 
Manager.prototype.handle = function(data) {
  var self = this;
  var value = this[data.method](data);
 
  var msg = {
    // 标记这是一次共享内存通信
    isSharedMemoryMessage: true,       
    // 此次操作的唯一标示
    uuid: data.uuid,
    // 返回值
    value: value
  };
 
  cluster.workers[data.id].send(msg);
};
 
// set操作返回ok表示成功
Manager.prototype.set = function(data) {
  this.__sharedMemory__[data.key] = data.value;
  return 'OK';
};
 
// get操作返回key对应的值
Manager.prototype.get = function(data) {
  return this.__sharedMemory__[data.key];
};

##Usage method


##

if (cluster.isMaster) {
 
  // 初始化Manager的实例
  var sharedMemoryManager = new Manager();
 
  // fork第一个worker
  cluster.fork();
 
  // 1秒后fork第二个worker
  setTimeout(function() {
    cluster.fork();
  }, 1000);
   
} else {
 
  // 初始化User类的实例
  var sharedMemoryUser = new User();
 
  if (cluster.worker.id == 1) {
    // 第一个worker向共享内存写入一组数据,用a标记
    sharedMemoryUser.set('a', [0, 1, 2, 3]);
  }
 
  if (cluster.worker.id == 2) {
    // 第二个worker从共享内存读取a的值
    sharedMemoryUser.get('a', function(data) {
      console.log(data); // => [0, 1, 2, 3]
    });
  }
  
}

The above is achieved through IPC communication Multi-process shared memory function. It should be noted that this method caches data directly in the memory of the master process. You must pay attention to the memory usage. Here you can consider adding some simple elimination strategies to optimize memory usage. In addition, if the data read and written in a single time is relatively large, the time consuming of IPC communication will also increase accordingly.

Full code: https://github.com/x6doooo/sharedmemory


Related recommendations:


About laravel5.2 and redis_cluster Introduction to configuration

MySQL-Cluster cluster construction (based on manual compilation of installation packages) Detailed explanation

Detailed introduction to shared memory communication

The above is the detailed content of Detailed explanation of node implementation of Cluster shared memory. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn