本篇文章跟大家探討一下Node.js利用多個核心的方法--worker_threads模組提供的多執行緒模型,介紹一下Node.js多進程模型中實作共享記憶體的方法。
Node.js 由於其單執行緒模型的設計,導致一個Node進程(的主執行緒)只能利用一個CPU核心,然而現在的機器基本上都是多核心的,這造成了嚴重的性能浪費。通常來說,想要利用到多個核心一般有以下的方法:
編寫Node的C 外掛擴充執行緒池,並在JS程式碼中將CPU耗時任務委託給其它線程處理。
使用worker_threads模組提供的多執行緒模型(尚在實驗階段)。
使用child_process 或 #cluster模組提供的多進程模型,每個進程都是獨立的Node.js進程。
從易用、程式碼入侵性、穩定性的角度來說,多進程模型通常是首要的選擇。 【推薦學習:《nodejs 教學》】
Node.js cluster 多進程模型存在的問題
##在cluster模組提供的多進程模型中,每個Node進程都是一個獨立且完整的應用程式,有自己的記憶體空間,其它進程無法存取。因此雖然在專案啟動時,所有Worker進程具有一致的狀態和行為,但在之後的運行中無法保證其狀態維持一致。
例如,專案啟動時有兩個Worker進程,進程A和進程B,兩個進程都宣告了變數a=1。但之後項目接收到一個請求,Master進程將其分派給進程A來處理,這個請求將a的值變更為了2,那麼此時進程A的內存空間中a=2,但是進程B的內存空間中a依舊是1。此時如果有個請求讀取a的值,Master進程將這個請求分派給進程A和進程B時讀取到的結果是不一致的,這就出現了一致性問題。 cluster模組在設計時並沒有給出解決方案,而是要求Worker進程是無狀態的,即程式設計師在寫程式碼時不應該允許在處理請求時修改記憶體中的值,以此來保障所有Worker進程的一致性。然而在實務上總會有各種各樣的情況需要寫內存,例如記錄用戶的登入狀態等,在許多企業的實踐中,通常會把這些狀態資料記錄在外部,例如資料庫、redis、訊息佇列、檔案系統等,每次處理有狀態請求時會讀寫外部儲存空間。
這不失為一種有效的做法,然而這需要額外引入一個外部儲存空間,同時還要自行處理多進程並發存取下的一致性問題,自行維護資料的生命週期(因為Node進程和維護在外部的資料並不是同步建立和銷毀的),以及在高並發存取情況下的IO效能瓶頸(如果是儲存在資料庫等非記憶體環境中)。其實本質上來說,我們只是需要一個可供多個進程共享存取的空間罷了,並不需要持久化存儲,這段空間的生命週期最好與Node進程強綁定,這樣在使用時能省去不少麻煩。因此跨進程的共享記憶體就成了最適合在這種場景使用的方式。
Node.js 的共享記憶體
很遺憾Node本身並未提供共享記憶體的實現,因此我們可以看看npm倉庫中第三方庫的實作。這些函式庫有些是透過C 插件擴充Node的函數實現的,有些是透過Node提供的IPC機制實現的,但很遺憾它們的實作都很簡單,並未提供互斥存取、物件監聽等功能,這使得使用者必須自己小心維護這段共享內存,否則就會導致時序問題。 轉了一圈下來沒找到我想要的。 。 。那就算了,我自己寫一個。#以JS物件為基本單位進行讀寫存取。
能夠進程間互斥訪問,當一個進程訪問時,其它進程被阻塞。
能夠監聽共享記憶體中的對象,當對象改變的時候監聽的進程能被通知到。
在滿足上述條件的前提下,實作方式盡可能簡單。
可以發現,其實我們並不需要作業系統層面的共享內存,只需要能夠多個Node進程能存取同一個物件就行了,那麼就可以在Node本身所提供的機制上實現。 可以使用Master進程的一段記憶體空間作為共享記憶體空間,Worker進程透過IPC將讀寫請求委託給Master進程,由Master進程進行讀寫,然後再透過IPC將結果回傳給Worker進程。
為了讓共享記憶體的使用方式在Master進程和Worker進程中一致,我們可以將對共享記憶體的操作抽離成一個接口,在Master進程和Worker進程中各自實現這個接口。類別圖如下圖所示,用一個SharedMemory
類別作為抽象接口,在server.js
入口檔案中聲明該物件。其在Master進程中實例化為Manager
對象,在Worker進程中實例化為Worker
物件。 Manager
物件來維護共享內存,並處理對共享記憶體的讀寫請求,而Worker
物件則將讀寫請求傳送到Master進程。
可以使用Manager
類別中的一個屬性作為共享記憶體對象,存取該對象的方式與存取普通JS對象的方式一致,然後再做一層封裝,只暴露get
、set
、remove
等基本操作,避免該屬性直接被修改。
由於Master進程會優先於所有Worker進程創建,因此,可以在Master進程中聲明共享內存空間之後再創建Worker進程,以此來保證每個Worker進程創建後都可以立即訪問共享內存。
為了使用簡單,我們可以將SharedMemory
設計成單例,這樣每個行程中就只有一個實例,並且可以在import
了SharedMemory
之後直接使用。
讀寫控制與IPC通訊
先實作對外介面SharedMemory
類,這裡沒有使用讓Manager
和Worker
繼承SharedMemory
的方式,而是讓SharedMemory
在實例化的時候回傳一個Manager
或Worker
的實例,從而實現自動選擇子類別。
在Node 16中
isPrimary
取代了isMaster
,這裡為了相容使用了兩個寫法。
// shared-memory.js class SharedMemory { constructor() { if (cluster.isMaster || cluster.isPrimary) { return new Manager(); } else { return new Worker(); } } }
Manager
負責管理共享記憶體空間,我們直接在Manager
物件中增加__sharedMemory__
屬性,由於本身也是JS對象,會被納入JS的垃圾回收管理中,因此我們不需要進行記憶體清理、資料遷移等操作,使得實作上非常簡潔。之後在__sharedMemory__
之中定義set
、get
、remove
等標準操作來提供存取方式。
我們透過cluster.on('online', callback)
來監聽worker進程的創建事件,並在創建後立即用worker.on('message', callback )
來監聽來自worker進程的IPC通信,並把通信訊息交給handle
函數處理。
handle
函數的職責是區分worker進程是想進行哪一個操作,並取出操作的參數委託給對應的set
、get
、remove
函數(注意不是__sharedMemory__
中的set
、get
、remove
)進行處理,並將處理後的結果回饋給worker進程。
// manager.js const cluster = require('cluster'); class Manager { constructor() { this.__sharedMemory__ = { set(key, value) { this.memory[key] = value; }, get(key) { return this.memory[key]; }, remove(key) { delete this.memory[key]; }, memory: {}, }; // Listen the messages from worker processes. cluster.on('online', (worker) => { worker.on('message', (data) => { this.handle(data, worker); return false; }); }); } handle(data, target) { const args = data.value ? [data.key, data.value] : [data.key]; this[data.method](...args).then((value) => { const msg = { id: data.id, // workerId uuid: data.uuid, // communicationID value, }; target.send(msg); }); } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); resolve('OK'); }); } get(key) { return new Promise((resolve) => { resolve(this.__sharedMemory__.get(key)); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); resolve('OK'); }); } }
Worker
自物件建立開始就使用process.on
監聽來自Master進程的回傳訊息(畢竟不能等訊息發送出去以後再監聽吧,那就來不及了)。至於__getCallbacks__
物件的作用一會兒再說。此時Worker
物件便建立完成。
之後項目運行到某個地方的時候,如果要存取共享內存,就會調用Worker
的set
、get
、remove
函數,它們又會呼叫handle
函數將訊息透過process.send
傳送到master進程,同時,將得到傳回結果時要進行的動作記錄在在 __getCallbacks__
中。當結果回傳時,會被先前在process.on
中的函數監聽到,並從__getCallbacks__
#中取出對應的回呼函數,並執行。
因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。
// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid'); class Worker { constructor() { this.__getCallbacks__ = {}; process.on('message', (data) => { const callback = this.__getCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } delete this.__getCallbacks__[data.uuid]; }); } set(key, value) { return new Promise((resolve) => { this.handle('set', key, value, () => { resolve(); }); }); } get(key) { return new Promise((resolve) => { this.handle('get', key, null, (value) => { resolve(value); }); }); } remove(key) { return new Promise((resolve) => { this.handle('remove', key, null, () => { resolve(); }); }); } handle(method, key, value, callback) { const uuid = uuid4(); // 每次通信的uuid process.send({ id: cluster.worker.id, method, uuid, key, value, }); this.__getCallbacks__[uuid] = callback; } }
一次共享内存访问的完整流程是:调用Worker
的set
/get
/remove
函数 -> 调用Worker
的handle
函数,向master进程通信并将回调函数记录在__getCallbacks__
-> master进程监听到来自worker进程的消息 -> 调用Manager
的handle
函数 -> 调用Manager
的set
/get
/remove
函数 -> 调用__sharedMemory__
的set
/get
/remove
函数 -> 操作完成返回Manager
的set
/get
/remove
函数 -> 操作完成返回handle
函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__
中取出回调函数并执行。
互斥访问
到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:
时间 | 进程A | 进程B | 共享内存中变量x的值 |
---|---|---|---|
t0 | 0 | ||
t1 | 读取x(x=0) | 0 | |
t2 | x1=x+1(x1=1) | 读取x(x=0) | 0 |
t3 | 将x1的值写回x | x2=x+1(x2=1) | 1 |
t4 | 将x2的值写回x | 1 |
进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。
在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。
为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。
在Manager
的__sharedMemory__
中加入locks
属性,用来记录哪个对象的锁被拿走了,lockRequestQueues
属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock
函数和releaseLock
函数,用来申请和归还锁,以及handleLockRequest
函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues
队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest
检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks
中对应的记录删掉,然后再调用handleLockRequest
让队首的任务获得锁。
// manager.js const { v4: uuid4 } = require('uuid'); class Manager { constructor() { this.__sharedMemory__ = { ... locks: {}, lockRequestQueues: {}, }; } getLock(key) { return new Promise((resolve) => { this.__sharedMemory__.lockRequestQueues[key] = this.__sharedMemory__.lockRequestQueues[key] ?? []; this.__sharedMemory__.lockRequestQueues[key].push(resolve); this.handleLockRequest(key); }); } releaseLock(key, lockId) { return new Promise((resolve) => { if (lockId === this.__sharedMemory__.locks[key]) { delete this.__sharedMemory__.locks[key]; this.handleLockRequest(key); } resolve('OK'); }); } handleLockRequest(key) { return new Promise((resolve) => { if ( !this.__sharedMemory__.locks[key] && this.__sharedMemory__.lockRequestQueues[key]?.length > 0 ) { const callback = this.__sharedMemory__.lockRequestQueues[key].shift(); const lockId = uuid4(); this.__sharedMemory__.locks[key] = lockId; callback(lockId); } resolve(); }); } ... }
在Worker
中,则是增加getLock
和releaseLock
两个函数,行为与get
、set
类似,都是调用handle
函数。
// worker.js class Worker { getLock(key) { return new Promise((resolve) => { this.handle('getLock', key, null, (value) => { resolve(value); }); }); } releaseLock(key, lockId) { return new Promise((resolve) => { this.handle('releaseLock', key, lockId, (value) => { resolve(value); }); }); } ... }
监听对象
有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set
属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager
的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。
为此,我们先在__sharedMemory__
中加入listeners
属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen
函数,其将监听回调函数记录到__sharedMemory__.listeners
中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在set
和remove
函数返回前调用notifyListener
,将所有记录在__sharedMemory__.listeners
中监听该对象的所有函数取出并调用。
// manager.js class Manager { constructor() { this.__sharedMemory__ = { ... listeners: {}, }; } handle(data, target) { if (data.method === 'listen') { this.listen(data.key, (value) => { const msg = { isNotified: true, id: data.id, uuid: data.uuid, value, }; target.send(msg); }); } else { ... } } notifyListener(key) { const listeners = this.__sharedMemory__.listeners[key]; if (listeners?.length > 0) { Promise.all( listeners.map( (callback) => new Promise((resolve) => { callback(this.__sharedMemory__.get(key)); resolve(); }) ) ); } } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); this.notifyListener(key); resolve('OK'); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); this.notifyListener(key); resolve('OK'); }); } listen(key, callback) { if (typeof callback === 'function') { this.__sharedMemory__.listeners[key] = this.__sharedMemory__.listeners[key] ?? []; this.__sharedMemory__.listeners[key].push(callback); } else { throw new Error('a listener must have a callback.'); } } ... }
在Worker
中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__
属性用来记录监听操作的回调函数,与__getCallbacks__
不同,它里面的函数在收到master的回信之后不会删除。
// worker.js class Worker { constructor() { ... this.__getListenerCallbacks__ = {}; process.on('message', (data) => { if (data.isNotified) { const callback = this.__getListenerCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } } else { ... } }); } handle(method, key, value, callback) { ... if (method === 'listen') { this.__getListenerCallbacks__[uuid] = callback; } else { this.__getCallbacks__[uuid] = callback; } } listen(key, callback) { if (typeof callback === 'function') { this.handle('listen', key, null, callback); } else { throw new Error('a listener must have a callback.'); } } ... }
LRU缓存
有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager
再增加一个共享内存__sharedLRUMemory__
,其为一个lru-cache
实例,并增加getLRU
、setLRU
、removeLRU
函数,与set
、get
、remove
函数类似。
// manager.js const LRU = require('lru-cache'); class Manager { constructor() { ... this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 }; this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions); } getLRU(key) { return new Promise((resolve) => { resolve(this.__sharedLRUMemory__.get(key)); }); } setLRU(key, value) { return new Promise((resolve) => { this.__sharedLRUMemory__.set(key, value); resolve('OK'); }); } removeLRU(key) { return new Promise((resolve) => { this.__sharedLRUMemory__.del(key); resolve('OK'); }); } ... }
Worker
中也增加getLRU
、setLRU
、removeLRU
函数。
// worker.js class Worker { getLRU(key) { return new Promise((resolve) => { this.handle('getLRU', key, null, (value) => { resolve(value); }); }); } setLRU(key, value) { return new Promise((resolve) => { this.handle('setLRU', key, value, () => { resolve(); }); }); } removeLRU(key) { return new Promise((resolve) => { this.handle('removeLRU', key, null, () => { resolve(); }); }); } ... }
目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库,欢迎pull request和报bug),可以直接通过npm安装:
npm i cluster-shared-memory
下面的示例包含了基本使用方法:
const cluster = require('cluster'); // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象 require('cluster-shared-memory'); if (cluster.isMaster) { // 在 master 进程中 fork 子进程 for (let i = 0; i < 2; i++) { cluster.fork(); } } else { const sharedMemoryController = require('./src/shared-memory'); const obj = { name: 'Tom', age: 10, }; // 写对象 await sharedMemoryController.set('myObj', obj); // 读对象 const myObj = await sharedMemoryController.get('myObj'); // 互斥访问对象,首先获得对象的锁 const lockId = await sharedMemoryController.getLock('myObj'); const newObj = await sharedMemoryController.get('myObj'); newObj.age = newObj.age + 1; await sharedMemoryController.set('myObj', newObj); // 操作完之后释放锁 await sharedMemoryController.releaseLock('requestTimes', lockId); // 或者使用 mutex 函数自动获取和释放锁 await sharedMemoryController.mutex('myObj', async () => { const newObjM = await sharedMemoryController.get('myObj'); newObjM.age = newObjM.age + 1; await sharedMemoryController.set('myObj', newObjM); }); // 监听对象 sharedMemoryController.listen('myObj', (value) => { console.log(`myObj: ${value}`); }); //写LRU缓存 await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'}); // 读对象 const cacheItem = await sharedMemoryController.getLRU('cacheItem'); }
这种实现目前尚有几个缺点:
不能使用PM2的自动创建worker进程的功能。
由于PM2会使用自己的
cluster
模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。
传输的对象必须可序列化,且不能太大。
如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。
原文地址:https://juejin.cn/post/6992091006220894215
作者:FinalZJY
更多编程相关知识,请访问:编程视频!!
以上是淺談Node.js多進程模型中如何實作共享記憶體(程式碼詳解)的詳細內容。更多資訊請關注PHP中文網其他相關文章!