首頁  >  文章  >  web前端  >  node實作Cluster共享記憶體詳解

node實作Cluster共享記憶體詳解

小云云
小云云原創
2018-01-20 10:53:161654瀏覽

本文主要介紹了node 利用進程通訊實現Cluster共享內存,小編覺得還挺不錯的,現在分享給大家,也給大家做個參考。一起跟著小編過來看看吧,希望能幫助大家。

Node.js的標準API並沒有提供進程共享內存,然而透過IPC介面的send方法和對message事件的監聽,就可以實現一個多進程之間的協同機制,透過通訊來操作共享記憶體。

##IPC的基本用法:


#
// worker进程 发送消息
process.send(‘读取共享内存');
 
// master进程 接收消息 -> 处理 -> 发送回信
cluster.on('online', function (worker) {
   // 有worker进程建立,即开始监听message事件
   worker.on(‘message', function(data) {
     // 处理来自worker的请求
     // 回传结果
     worker.send(‘result')
   });
});

在Node.js中,透過send和on(' message', callback)實現的IPC通訊有幾個特點。首先,master和worker之間可以互相通信,而各個worker之間不能直接通信,但是worker之間可以透過master轉發實現間接通信。另外,透過send方法傳遞的數據,會先被JSON.stringify處理後再傳遞,接收後會再用JSON.parse解析。所以Buffer物件傳遞後會變成數組,而function則無法直接傳遞。反過來說,就是可以直接傳遞除了buffer和function之外的所有資料型別(已經很強大了,而且buffer和function也可以用變通的方法實作傳遞)。

基於以上特點,我們可以設計一個透過IPC來共享記憶體的方案:

1、worker進程作為共享記憶體的使用者,並不會直接操作共享內存,而是透過send方法通知master程序進行寫入(set)或讀取(get)操作。

2、master進程初始化一個Object物件作為共享內存,並根據worker發送的message,對Object的鍵值進行讀寫。

3、由於要使用跨進程通信,所以worker發起的set和get都是異步操作,master根據請求進行實際讀寫操作,然後將結果返回給worker(即把結果數據send給worker )。

##資料格式

為了實現進程間非同步的讀寫功能,需要對通訊資料的格式做一點規範。

首先是worker的請求資料:


requestMessage = {
  isSharedMemoryMessage: true, // 表示这是一次共享内存的操作通信
  method: ‘set', // or ‘get' 操作的方法
  id: cluster.worker.id, // 发起操作的进程(在一些特殊场景下,用于保证master可以回信)
  uuid: uuid, // 此次操作的(用于注册/调用回调函数)
  key: key, // 要操作的键
  value: value // 键对应的值(写入)
}

master在接到資料後,會根據method執行對應操作,然後根據requestMessage.id將結果資料發給對應的worker,資料格式如下:


responseMessage = {
  isSharedMemoryMessage: true, // 标记这是一次共享内存通信
  uuid: requestMessage.uuid, // 此次操作的唯一标示
  value: value // 返回值。get操作为key对应的值,set操作为成功或失败
}

規範資料格式的意義在於,master在接收到請求後,能夠將處理結果傳送給對應的worker ,而worker在接到回傳的結果後,能夠呼叫此次通訊對應的callback,從而實現協同。

規範資料格式後,接下來要做的就是設計兩套程式碼,分別用於master進程和worker進程,監聽通訊並處理通訊數據,實現共享記憶體的功能。

##User類別

User類別的實例在worker進程中運作,負責傳送操作共享記憶體的請求,並監聽master的回信。


var User = function() {
  var self = this;
  self.__uuid__ = 0;
 
  // 缓存回调函数
  self.__getCallbacks__ = {};
 
  // 接收每次操作请求的回信
  process.on('message', function(data) {
    
    if (!data.isSharedMemoryMessage) return;
    // 通过uuid找到相应的回调函数
    var cb = self.__getCallbacks__[data.uuid];
    if (cb && typeof cb == 'function') {
      cb(data.value)
    }
    // 卸载回调函数
    self.__getCallbacks__[data.uuid] = undefined;
  });
};
 
// 处理操作
User.prototype.handle = function(method, key, value, callback) {
 
  var self = this;
  var uuid = self.__uuid__++;
 
  process.send({
    isSharedMemoryMessage: true,
    method: method,
    id: cluster.worker.id,
    uuid: uuid,
    key: key,
    value: value
  });
 
  // 注册回调函数
  self.__getCallbacks__[uuid] = callback;
 
};
 
User.prototype.set = function(key, value, callback) {
  this.handle('set', key, value, callback);
};
 
User.prototype.get = function(key, callback) {
  this.handle('get', key, null, callback);
};

##Manager類別

#Manager類別的實例在master進程中工作,用於初始化一個Object作為共享內存,並根據User實例的請求,在共享記憶體中增加鍵值對,或讀取鍵值,然後將結果傳回。


var Manager = function() {
 
  var self = this;
  
  // 初始化共享内存
  self.__sharedMemory__ = {};
    
  // 监听并处理来自worker的请求
  cluster.on('online', function(worker) {
    worker.on('message', function(data) {
      // isSharedMemoryMessage是操作共享内存的通信标记
      if (!data.isSharedMemoryMessage) return;
      self.handle(data);
    });
  });
};
 
Manager.prototype.handle = function(data) {
  var self = this;
  var value = this[data.method](data);
 
  var msg = {
    // 标记这是一次共享内存通信
    isSharedMemoryMessage: true,       
    // 此次操作的唯一标示
    uuid: data.uuid,
    // 返回值
    value: value
  };
 
  cluster.workers[data.id].send(msg);
};
 
// set操作返回ok表示成功
Manager.prototype.set = function(data) {
  this.__sharedMemory__[data.key] = data.value;
  return 'OK';
};
 
// get操作返回key对应的值
Manager.prototype.get = function(data) {
  return this.__sharedMemory__[data.key];
};

##使用方法


#
if (cluster.isMaster) {
 
  // 初始化Manager的实例
  var sharedMemoryManager = new Manager();
 
  // fork第一个worker
  cluster.fork();
 
  // 1秒后fork第二个worker
  setTimeout(function() {
    cluster.fork();
  }, 1000);
   
} else {
 
  // 初始化User类的实例
  var sharedMemoryUser = new User();
 
  if (cluster.worker.id == 1) {
    // 第一个worker向共享内存写入一组数据,用a标记
    sharedMemoryUser.set('a', [0, 1, 2, 3]);
  }
 
  if (cluster.worker.id == 2) {
    // 第二个worker从共享内存读取a的值
    sharedMemoryUser.get('a', function(data) {
      console.log(data); // => [0, 1, 2, 3]
    });
  }
  
}

以上就是一個透過IPC通訊實現的多進程共享記憶體功能,需要注意的是,這種方法是直接在master進程的記憶體裡快取數據,必須注意記憶體的使用情況,這裡可以考慮加入一些簡單的淘汰策略,優化記憶體的使用。另外,如果單次讀寫的資料比較大,IPC通訊的耗時也會隨之增加。

完整程式碼:https://github.com/x6doooo/sharedmemory

相關推薦:

##有關laravel5.2和redis_cluster配置的介紹

MySQL之-Cluster集群建置(基於手動編譯安裝包)詳解

關於共享記憶體通訊的詳細介紹

以上是node實作Cluster共享記憶體詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn