本篇文章跟大家介紹一下Node.js中的多進程,了解Cluster 模組、Egg.js 多進程模型,希望對大家有幫助!
眾所周知,JS是單線程執行的,所有的非同步都是靠事件循環完成的,如果一個Web 服務僅有一個線程,那麼如何充分利用機器或容器的閒置資源呢?同時,當程式碼 Crash 之後沒有進行擷取異常,那麼執行緒就會退出,那麼基於 Node.js 的 Web 服務是如何保證整個應用程式的健全性的呢?
Node.js 提供了Cluster 模組解決上述問題,透過這個模組,開發者可以透過建立子程序的模式來建立一個集群,充分利用機器或容器的資源,同時該模組允許多個子程序監聽同一個連接埠。 【推薦學習:《nodejs 教學》】
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
首先從const cluster = require('cluster')
說起,這行程式碼匯入了Node 的Cluster 模組,而在Node 內部,Master 進程與Worker 進程引入的檔案卻不一樣,詳情請見如下程式碼:
'use strict'; const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master'; module.exports = require(`internal/cluster/${childOrPrimary}`);
不同的檔案意味著兩種進程在執行中的表現也不一樣,例如:
// internal/cluster/master.js cluster.isWorker = false; cluster.isMaster = true; // internal/cluster/child.js cluster.isWorker = true; cluster.isMaster = false;
這也是為什麼Cluster 模組到處的變數能區分不同類型進程的原因,接下來讓我們分別從主、子進程兩個方向去了解具體的過程
在上述程式碼裡,Master進程並沒有做太多事情,只是根據CPU 數量去fork 子進程,那麼我們深入到源代碼里大致來看一下,相關描述均在代碼的註釋內
// lib/internal/cluster/master.js // 初始化cluster const cluster = new EventEmitter(); // 创建监听地址与server对应的map const handles = new SafeMap(); // 初始化 cluster.isWorker = false; cluster.isMaster = true; cluster.workers = {}; cluster.settings = {}; cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR; // Master distributes connections. // 自增的子进程id let ids = 0; // 向cluster添加fork方法 cluster.fork = function(env) { // 初始化cluster.settings cluster.setupMaster(); // 为当前fork的子进程生成当前cluster内的唯一id const id = ++ids; // 创建子进程 const workerProcess = createWorkerProcess(id, env); // 创建对应的worker实例 const worker = new Worker({ id: id, process: workerProcess }); // 省略一些worker的事件监听.... // 监听内部消息事件,并交由onmessage处理 worker.process.on('internalMessage', internal(worker, onmessage)); // cluster发出fork事件 process.nextTick(emitForkNT, worker); // 将worker实例放在cluster.workers中维护 cluster.workers[worker.id] = worker; // 返回worker return worker; }; // 创建子进程函数 function createWorkerProcess(id, env) { // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象 const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` }; // 执行参数 const execArgv = [...cluster.settings.execArgv]; // 省略debug模式相关逻辑... // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成 return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid }); } // 内部消息事件处理函数 function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server else if (message.act === 'queryServer') queryServer(worker, message); else if (message.act === 'listening') listening(worker, message); else if (message.act === 'exitedAfterDisconnect') exitedAfterDisconnect(worker, message); else if (message.act === 'close') close(worker, message); } // 获取server function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; // 创建当前子进程监听地址信息的key const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; // 在handles map中查询是否有已经创建好的该监听地址的server let handle = handles.get(key); // 没有对应的server则进行创建 if (handle === undefined) { let address = message.address; // Find shortest path for unix sockets because of the ~100 byte limit if (message.port < 0 && typeof address === 'string' && process.platform !== 'win32') { address = path.relative(process.cwd(), address); if (message.address.length < address.length) address = message.address; } // 主、子进程处理连接的方式,默认为轮询 let constructor = RoundRobinHandle; // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } // 将监听地址信息传入构造函数创建监听实例 handle = new constructor(key, address, message); // 缓存监听实例 handles.set(key, handle); } // 向server添加自定义信息,用于server发出listening事件后透传到worker if (!handle.data) handle.data = message.data; // 添加server发出listening事件后的回调函数通知子进程 handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); if (errno) handles.delete(key); // Gives other workers a chance to retry. send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); }); }
// lib/internal/cluster/round_robin_handle.js // 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息 function RoundRobinHandle(key, address, { port, fd, flags }) { // 初始化handle this.key = key; this.all = new SafeMap(); this.free = new SafeMap(); this.handles = []; this.handle = null; this.server = net.createServer(assert.fail); // 监听文件描述符,不讨论 if (fd >= 0) this.server.listen({ fd }); // 监听ip:port else if (port >= 0) { this.server.listen({ port, host: address, // Currently, net module only supports `ipv6Only` option in `flags`. ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), }); // 监听UNIX socket,不讨论 } else this.server.listen(address); // UNIX socket path. // 注册server发出listening事件的回调函数 this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } // 添加worker,server发出listening事件后调用master.js中传入的回调函数 RoundRobinHandle.prototype.add = function(worker, send) { assert(this.all.has(worker.id) === false); this.all.set(worker.id, worker); const done = () => { if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); // TODO(bnoordhuis) Check err. send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); // In case there are connections pending. }; if (this.server === null) return done(); // Still busy binding. this.server.once('listening', done); this.server.once('error', (err) => { send(err.errno, null); }); }; // 删除worker,轮询时不再分配给该worker RoundRobinHandle.prototype.remove = function(worker) { const existed = this.all.delete(worker.id); if (!existed) return false; this.free.delete(worker.id); if (this.all.size !== 0) return false; for (const handle of this.handles) { handle.close(); } this.handles = []; this.handle.close(); this.handle = null; return true; }; // 轮询调度函数 RoundRobinHandle.prototype.distribute = function(err, handle) { ArrayPrototypePush(this.handles, handle); const [ workerEntry ] = this.free; // this.free is a SafeMap if (ArrayIsArray(workerEntry)) { const { 0: workerId, 1: worker } = workerEntry; this.free.delete(workerId); this.handoff(worker); } }; // 将handle交给worker RoundRobinHandle.prototype.handoff = function(worker) { if (!this.all.has(worker.id)) { return; // Worker is closing (or has closed) the server. } const handle = ArrayPrototypeShift(this.handles); if (handle === undefined) { this.free.set(worker.id, worker); // Add to ready queue again. return; } // 向该worker发出newconn事件 const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // Worker is shutting down. Send to another. this.handoff(worker); }); };
在每個子進程中,我們都建立了一個HTTP Server,然後執行listen
函數監聽8000 端口,而HTTP Server 實例是由Net Server 原型鏈繼承得到的, listen
函數即為Net Server 原型上的listen
函數,具體如下:
// lib/_http_server.js function Server(options, requestListener) { .... } ObjectSetPrototypeOf(Server.prototype, net.Server.prototype); ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js Server.prototype.listen = function(...args) { // 由于篇幅原因,省略一些参数nomolize和其他监听的处理 // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口 if (typeof options.port === 'number' || typeof options.port === 'string') { validatePort(options.port, 'options.port'); backlog = options.backlog || backlogFromArgs; // start TCP server listening on host:port if (options.host) { lookupAndListen(this, options.port | 0, options.host, backlog, options.exclusive, flags); } else { // Undefined host, listens on unspecified address // Default addressType 4 will be used to search for master server listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); } return this; } // 省略... }; // 集群监听函数 function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { exclusive = !!exclusive; if (cluster === undefined) cluster = require('cluster'); // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回 if (cluster.isMaster || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); return; } // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数 function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port); return server.emit('error', ex); } // Reuse master's server handle server._handle = handle; // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); } } // 启用监听handle function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd); // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过 // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already'); } else { debug('setupListenHandle: create a handle'); let rval = null; // 篇幅原因,创建监听句柄的代码... this._handle = rval; } // 在this上设置的faux handle上设置onconnection函数用于监听连接进入 this._handle.onconnection = onconnection; }
同時,在開始解析的時候我們說過,在引入Cluster 模組的時候,會根據目前進程的env中是否包含NODE_UNIQUE_ID去判斷是否為子進程,若為子進程,則執行child.js
檔案
Tips:IPC 通訊中發送的message.cmd的值如果以NODE為前綴,它將回應一個內部事件internalMessage
// lib/internal/cluster/child.js // 初始化 const cluster = new EventEmitter(); // 存储生成的 faux handle const handles = new SafeMap(); // 存储监听地址与监听地址index的对应关系 const indexes = new SafeMap(); cluster.isWorker = true; cluster.isMaster = false; cluster.worker = null; cluster.Worker = Worker; // 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量 // 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数 cluster._setupWorker = function() { // 初始化worker实例 const worker = new Worker({ id: +process.env.NODE_UNIQUE_ID | 0, process: process, state: 'online' }); cluster.worker = worker; // 处理断开连接事件 process.once('disconnect', () => { worker.emit('disconnect'); if (!worker.exitedAfterDisconnect) { // Unexpected disconnect, master exited, or some such nastiness, so // worker exits immediately. process.exit(0); } }); // IPC 内部通信事件监听 process.on('internalMessage', internal(worker, onmessage)); send({ act: 'online' }); function onmessage(message, handle) { // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') ReflectApply(_disconnect, worker, [true]); } }; // 添加获取server函数,会在net server监听端口时被执行 // `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function(obj, options, cb) { let address = options.address; // Resolve unix socket paths to absolute paths if (options.port < 0 && typeof address === 'string' && process.platform !== 'win32') address = path.resolve(address); // 生成地址信息的的key const indexesKey = ArrayPrototypeJoin( [ address, options.port, options.addressType, options.fd, ], ':'); // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server let index = indexes.get(indexesKey); if (index === undefined) index = 0; else index++; // 设置 indexesKey 与 index的对应关系 indexes.set(indexesKey, index); // 传递地址信息及index const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // Set custom data on handle (i.e. tls tickets key) if (obj._getServerData) message.data = obj._getServerData(); // 向主进程发送queryServer消息 send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数 // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内 if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = (address && address.port) || options.port; send(message); }); }; // 创建 faux handle,并保存其对应关系 // Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); let key = message.key; function listen(backlog) { // TODO(bnoordhuis) Send a message to the master that tells it to // update the backlog size. The actual backlog should probably be // the largest requested size by any worker. return 0; } function close() { // lib/net.js treats server._handle.close() as effectively synchronous. // That means there is a time window between the call to close() and // the ack by the master process in which we can still receive handles. // onconnection() below handles that by sending those handles back to // the master. if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } function getsockname(out) { if (key) ObjectAssign(out, message.sockname); return 0; } // 创建Faux handle // Faux handle. Mimics a TCPWrap with just enough fidelity to get away // with it. Fools net.Server into thinking that it's backed by a real // handle. Use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles.has(key) === false); // 保存faux handle handles.set(key, handle); // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数 cb(0, handle); } // 处理请求 // Round-robin connection. function onconnection(message, handle) { // 获取faux handle的key const key = message.key; // 获取faux hadle const server = handles.get(key); const accepted = server !== undefined; send({ ack: message.seq, accepted }); // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求 if (accepted) server.onconnection(0, handle); }
至此,所有的內容都連結起來了。
在先前的程式碼分析中我們可以知道,Cluster 叢集會在Master 行程中建立Net Server,在Worker進程運行建立HTTP Server 的時候,會將監聽位址的資訊傳入cluster._getServer
函數建立一個faux handle
並設定到子程序的Net Server 上,在Worker 程序初始化的時候會註冊IPC 通訊回呼函數,在回呼函數內,呼叫在子程序中Net Server 模組初始化後的{faux handle}.onconnection
函數,並將傳過來的連接的handle 傳入完成請求響應。
我們可以在Master 行程中監聽Worker 行程的error
、disconntect
、exit
事件,在這些事件中去做對應的處理,例如清理退出的程序並重新fork
,或者使用已經封裝好的npm 包,例如cfork
在Egg.js 的多進程模型中,多了另外一個進程類型,即Agent 進程,該進程主要用來處理多重行程不好處理的一些事情還有減少長連結的數量,具體關係如下:
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|
在egg-cluster
套件內,使用了cfork
套件去保證Worker 進程掛掉後自動重啟
在我們的一個Egg 應用程式內,日誌系統並沒有使用Egg 原生的日誌,使用了一個內部基於
log4js
套件的日誌庫,在使用的時候,將需要用到的Logger 擴展至Application 物件上,這樣的話每個Worker 進程在初始化的時候都會創建新的Logger,也就是會存在多進程寫日誌的問題,但並沒有出現多進程寫入日誌的錯誤問題
在追蹤原始碼的過程中發現,log4js
雖然提供了Cluster 模式,但在上層封裝中並沒有開啟log4js
的Cluster 模式,所以每個Logger的appender 都使用flag a
打開一個寫入流,到這裡並沒有得到答案
後來在CNode 中找到了答案,在unix 下使用 flag a
開啟的可寫入流對應的libuv 檔案池實作是UV_FS_O_APPEND
,也就是O_APPEND
,而O_APPEND
本身在man 手冊中就定義為原子操作,核心保證了對這個可寫流的並發寫是安全的不需要在應用層額外加鎖(除了在NFS 類的文件系統上並發寫會造成文件資訊丟失或損壞),NFS 類的網絡掛載的檔案系統主要是靠模擬掉底層的api 來實現的類別本地操作,顯然無法在競爭條件下完美還原這類的原子操作api,所以如果你的日誌要寫到類似oss 雲盤掛載本地的這種就不能這麼乾,多進程寫的話必須在應用層自己手動加鎖
寶劍鋒從磨礪出,梅花香自苦寒來,衝鴨~
更多程式相關知識,請造訪:程式入門! !
以上是一文了解Node.js中的多進程模型的詳細內容。更多資訊請關注PHP中文網其他相關文章!