Rumah >hujung hadapan web >tutorial js >Ketahui tentang model berbilang proses dalam Node.js dalam satu artikel
Artikel ini akan memperkenalkan anda kepada pelbagai proses dalam Node.js, dan mempelajari tentang modul Kluster dan model berbilang proses Egg.js saya harap ia akan membantu anda.
Seperti yang kita semua tahu, JS dilaksanakan dalam satu utas dan semua tak segerak diselesaikan oleh gelung acara Jika perkhidmatan Web hanya mempunyai satu utas, bagaimana ia boleh sepenuhnya Bagaimana pula dengan menggunakan sumber terbiar mesin atau bekas? Pada masa yang sama, apabila kod ranap dan tiada pengecualian ditangkap, urutan akan keluar Jadi bagaimanakah perkhidmatan web berdasarkan Node.js memastikan keteguhan keseluruhan aplikasi?
Node.js menyediakan modul Kluster untuk menyelesaikan masalah di atas Melalui modul ini, pembangun boleh mencipta kluster dengan mencipta proses anak . , menggunakan sepenuhnya sumber mesin atau bekas, dan modul ini membenarkan berbilang proses kecil untuk mendengar port yang sama. [Pembelajaran yang disyorkan: "tutorial nodejs"]
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
Mari mulakan dengan const cluster = require('cluster')
Baris kod ini mengimport modul Kluster Node Walau bagaimanapun, dalam Node, fail yang diperkenalkan oleh proses Master dan proses Worker adalah berbeza kod berikut:
'use strict'; const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master'; module.exports = require(`internal/cluster/${childOrPrimary}`);
Fail yang berbeza bermakna kedua-dua proses berkelakuan berbeza semasa pelaksanaan, contohnya:
// internal/cluster/master.js cluster.isWorker = false; cluster.isMaster = true; // internal/cluster/child.js cluster.isWorker = true; cluster.isMaster = false;
Inilah sebabnya pembolehubah di mana-mana dalam modul Kluster boleh membezakan yang berbeza jenis proses Seterusnya, marilah kita memahami proses khusus dari proses utama dan sub-proses masing-masing
Dalam kod di atas, proses Master tidak melakukan banyak Perkara itu hanya untuk memotong proses kanak-kanak mengikut bilangan CPU, kemudian mari kita pergi jauh ke dalam kod sumber dan lihat secara kasar Penerangan yang berkaitan ada dalam ulasan kod
// lib/internal/cluster/master.js // 初始化cluster const cluster = new EventEmitter(); // 创建监听地址与server对应的map const handles = new SafeMap(); // 初始化 cluster.isWorker = false; cluster.isMaster = true; cluster.workers = {}; cluster.settings = {}; cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR; // Master distributes connections. // 自增的子进程id let ids = 0; // 向cluster添加fork方法 cluster.fork = function(env) { // 初始化cluster.settings cluster.setupMaster(); // 为当前fork的子进程生成当前cluster内的唯一id const id = ++ids; // 创建子进程 const workerProcess = createWorkerProcess(id, env); // 创建对应的worker实例 const worker = new Worker({ id: id, process: workerProcess }); // 省略一些worker的事件监听.... // 监听内部消息事件,并交由onmessage处理 worker.process.on('internalMessage', internal(worker, onmessage)); // cluster发出fork事件 process.nextTick(emitForkNT, worker); // 将worker实例放在cluster.workers中维护 cluster.workers[worker.id] = worker; // 返回worker return worker; }; // 创建子进程函数 function createWorkerProcess(id, env) { // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象 const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` }; // 执行参数 const execArgv = [...cluster.settings.execArgv]; // 省略debug模式相关逻辑... // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成 return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid }); } // 内部消息事件处理函数 function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server else if (message.act === 'queryServer') queryServer(worker, message); else if (message.act === 'listening') listening(worker, message); else if (message.act === 'exitedAfterDisconnect') exitedAfterDisconnect(worker, message); else if (message.act === 'close') close(worker, message); } // 获取server function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; // 创建当前子进程监听地址信息的key const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; // 在handles map中查询是否有已经创建好的该监听地址的server let handle = handles.get(key); // 没有对应的server则进行创建 if (handle === undefined) { let address = message.address; // Find shortest path for unix sockets because of the ~100 byte limit if (message.port < 0 && typeof address === 'string' && process.platform !== 'win32') { address = path.relative(process.cwd(), address); if (message.address.length < address.length) address = message.address; } // 主、子进程处理连接的方式,默认为轮询 let constructor = RoundRobinHandle; // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } // 将监听地址信息传入构造函数创建监听实例 handle = new constructor(key, address, message); // 缓存监听实例 handles.set(key, handle); } // 向server添加自定义信息,用于server发出listening事件后透传到worker if (!handle.data) handle.data = message.data; // 添加server发出listening事件后的回调函数通知子进程 handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); if (errno) handles.delete(key); // Gives other workers a chance to retry. send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); }); }
// lib/internal/cluster/round_robin_handle.js // 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息 function RoundRobinHandle(key, address, { port, fd, flags }) { // 初始化handle this.key = key; this.all = new SafeMap(); this.free = new SafeMap(); this.handles = []; this.handle = null; this.server = net.createServer(assert.fail); // 监听文件描述符,不讨论 if (fd >= 0) this.server.listen({ fd }); // 监听ip:port else if (port >= 0) { this.server.listen({ port, host: address, // Currently, net module only supports `ipv6Only` option in `flags`. ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), }); // 监听UNIX socket,不讨论 } else this.server.listen(address); // UNIX socket path. // 注册server发出listening事件的回调函数 this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } // 添加worker,server发出listening事件后调用master.js中传入的回调函数 RoundRobinHandle.prototype.add = function(worker, send) { assert(this.all.has(worker.id) === false); this.all.set(worker.id, worker); const done = () => { if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); // TODO(bnoordhuis) Check err. send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); // In case there are connections pending. }; if (this.server === null) return done(); // Still busy binding. this.server.once('listening', done); this.server.once('error', (err) => { send(err.errno, null); }); }; // 删除worker,轮询时不再分配给该worker RoundRobinHandle.prototype.remove = function(worker) { const existed = this.all.delete(worker.id); if (!existed) return false; this.free.delete(worker.id); if (this.all.size !== 0) return false; for (const handle of this.handles) { handle.close(); } this.handles = []; this.handle.close(); this.handle = null; return true; }; // 轮询调度函数 RoundRobinHandle.prototype.distribute = function(err, handle) { ArrayPrototypePush(this.handles, handle); const [ workerEntry ] = this.free; // this.free is a SafeMap if (ArrayIsArray(workerEntry)) { const { 0: workerId, 1: worker } = workerEntry; this.free.delete(workerId); this.handoff(worker); } }; // 将handle交给worker RoundRobinHandle.prototype.handoff = function(worker) { if (!this.all.has(worker.id)) { return; // Worker is closing (or has closed) the server. } const handle = ArrayPrototypeShift(this.handles); if (handle === undefined) { this.free.set(worker.id, worker); // Add to ready queue again. return; } // 向该worker发出newconn事件 const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // Worker is shutting down. Send to another. this.handoff(worker); }); };
Dalam setiap proses anak, kami mencipta Pelayan HTTP, dan kemudian melaksanakan fungsi listen
untuk mendengar port 8000. Contoh Pelayan HTTP diwarisi daripada rangkaian prototaip Pelayan Bersih. Fungsi listen
ialah fungsi listen
pada prototaip Pelayan Bersih, seperti berikut:
// lib/_http_server.js function Server(options, requestListener) { .... } ObjectSetPrototypeOf(Server.prototype, net.Server.prototype); ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js Server.prototype.listen = function(...args) { // 由于篇幅原因,省略一些参数nomolize和其他监听的处理 // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口 if (typeof options.port === 'number' || typeof options.port === 'string') { validatePort(options.port, 'options.port'); backlog = options.backlog || backlogFromArgs; // start TCP server listening on host:port if (options.host) { lookupAndListen(this, options.port | 0, options.host, backlog, options.exclusive, flags); } else { // Undefined host, listens on unspecified address // Default addressType 4 will be used to search for master server listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); } return this; } // 省略... }; // 集群监听函数 function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { exclusive = !!exclusive; if (cluster === undefined) cluster = require('cluster'); // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回 if (cluster.isMaster || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); return; } // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数 function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port); return server.emit('error', ex); } // Reuse master's server handle server._handle = handle; // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); } } // 启用监听handle function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd); // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过 // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already'); } else { debug('setupListenHandle: create a handle'); let rval = null; // 篇幅原因,创建监听句柄的代码... this._handle = rval; } // 在this上设置的faux handle上设置onconnection函数用于监听连接进入 this._handle.onconnection = onconnection; }
Pada masa yang sama, apabila kami mula menghuraikan, kami berkata bahawa apabila modul Kluster diperkenalkan, ia akan berdasarkan env proses semasa sama ada ia mengandungi NODE_UNIQUE_ID untuk menentukan sama ada ia proses anak Jika ia adalah proses anak, laksanakan fail child.js
>
Petua: Jika nilai message.cmd yang dihantar dalam komunikasi IPC diawali dengan NODE, ia Akan bertindak balas kepada internalMesej acara dalaman
// lib/internal/cluster/child.js // 初始化 const cluster = new EventEmitter(); // 存储生成的 faux handle const handles = new SafeMap(); // 存储监听地址与监听地址index的对应关系 const indexes = new SafeMap(); cluster.isWorker = true; cluster.isMaster = false; cluster.worker = null; cluster.Worker = Worker; // 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量 // 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数 cluster._setupWorker = function() { // 初始化worker实例 const worker = new Worker({ id: +process.env.NODE_UNIQUE_ID | 0, process: process, state: 'online' }); cluster.worker = worker; // 处理断开连接事件 process.once('disconnect', () => { worker.emit('disconnect'); if (!worker.exitedAfterDisconnect) { // Unexpected disconnect, master exited, or some such nastiness, so // worker exits immediately. process.exit(0); } }); // IPC 内部通信事件监听 process.on('internalMessage', internal(worker, onmessage)); send({ act: 'online' }); function onmessage(message, handle) { // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') ReflectApply(_disconnect, worker, [true]); } }; // 添加获取server函数,会在net server监听端口时被执行 // `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function(obj, options, cb) { let address = options.address; // Resolve unix socket paths to absolute paths if (options.port < 0 && typeof address === 'string' && process.platform !== 'win32') address = path.resolve(address); // 生成地址信息的的key const indexesKey = ArrayPrototypeJoin( [ address, options.port, options.addressType, options.fd, ], ':'); // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server let index = indexes.get(indexesKey); if (index === undefined) index = 0; else index++; // 设置 indexesKey 与 index的对应关系 indexes.set(indexesKey, index); // 传递地址信息及index const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // Set custom data on handle (i.e. tls tickets key) if (obj._getServerData) message.data = obj._getServerData(); // 向主进程发送queryServer消息 send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数 // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内 if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = (address && address.port) || options.port; send(message); }); }; // 创建 faux handle,并保存其对应关系 // Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); let key = message.key; function listen(backlog) { // TODO(bnoordhuis) Send a message to the master that tells it to // update the backlog size. The actual backlog should probably be // the largest requested size by any worker. return 0; } function close() { // lib/net.js treats server._handle.close() as effectively synchronous. // That means there is a time window between the call to close() and // the ack by the master process in which we can still receive handles. // onconnection() below handles that by sending those handles back to // the master. if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } function getsockname(out) { if (key) ObjectAssign(out, message.sockname); return 0; } // 创建Faux handle // Faux handle. Mimics a TCPWrap with just enough fidelity to get away // with it. Fools net.Server into thinking that it's backed by a real // handle. Use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles.has(key) === false); // 保存faux handle handles.set(key, handle); // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数 cb(0, handle); } // 处理请求 // Round-robin connection. function onconnection(message, handle) { // 获取faux handle的key const key = message.key; // 获取faux hadle const server = handles.get(key); const accepted = server !== undefined; send({ ack: message.seq, accepted }); // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求 if (accepted) server.onconnection(0, handle); }Pada ketika ini, semua kandungan bersambung.
untuk mencipta cluster._getServer
dan menetapkannya kepada Pelayan Bersih proses anak. Apabila proses Worker dimulakan, fungsi panggil balik komunikasi IPC akan didaftarkan Dalam panggilan balik Dalam fungsi, panggil fungsi faux handle
selepas modul Pelayan Bersih dimulakan dalam proses anak, dan lulus pemegang sambungan yang diluluskan untuk melengkapkan. meminta respons. {faux handle}.onconnection
, error
, disconntect
proses Pekerja dalam proses Induk . Dalam ini Lakukan pemprosesan yang sepadan dalam acara tersebut, seperti membersihkan proses yang telah keluar dan memulakan semula exit
, atau menggunakan pakej npm terkapsul, seperti fork
cfork
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|Dalam pakej
, pakej egg-cluster
digunakan untuk memastikan proses Pekerja dimulakan semula secara automatik selepas ia ditutupcfork
Dalam salah satu aplikasi Egg kami, sistem pembalakan tidak menggunakan log asli Egg, tetapi menggunakan pustaka log dalaman berdasarkanpakej. Apabila menggunakannya, sambungan Logger yang perlu digunakan pada objek Aplikasi Dalam kes ini, setiap proses Worker akan mencipta Logger baharu semasa pemulaan, yang bermaksud bahawa akan terdapat masalah berbilang proses menulis log. tidak akan ada masalah ralat berbilang proses menulis log
log4js
Dalam proses menjejaki kod sumber, kami mendapati bahawa walaupun log4js
menyediakan mod Kluster, mod Kluster log4js
tidak didayakan dalam pakej atas, jadi setiap pelengkap Logger dibuka menggunakan flag a
Strim tulis, saya tidak mendapat jawapan di sini
Kemudian saya menemui jawapannya dalam CNode Pelaksanaan kumpulan fail libuv yang sepadan dengan strim boleh tulis dibuka menggunakan flag a
di bawah unix ialah UV_FS_O_APPEND
, iaitu O_APPEND
, dan O_APPEND
itu sendiri ditakrifkan sebagai operasi atom dalam manual manusia Kernel memastikan penulisan serentak aliran boleh tulis ini selamat tanpa memerlukan kunci tambahan pada lapisan aplikasi (kecuali. untuk NFS Penulisan serentak pada sistem fail seperti NFS akan menyebabkan maklumat fail hilang atau rosak). dipulihkan dengan sempurna dalam keadaan kompetitif. API operasi atom, jadi jika log anda ditulis pada sesuatu seperti cakera awan oss yang dipasang secara setempat, anda tidak boleh melakukan ini Jika anda menulis dalam berbilang proses, anda mesti menguncinya secara manual pada lapisan aplikasi
Pedang pedang muncul dari mengasah, dan aroma bunga plum muncul dari sejuk pahit. lawati:
Pengenalan kepada PengaturcaraanAtas ialah kandungan terperinci Ketahui tentang model berbilang proses dalam Node.js dalam satu artikel. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!