ホームページ >ウェブフロントエンド >jsチュートリアル >Node.js のマルチプロセス モデルについて 1 つの記事で学習します

Node.js のマルチプロセス モデルについて 1 つの記事で学習します

青灯夜游
青灯夜游転載
2021-10-19 09:52:451774ブラウズ

この記事では、Node.js のマルチプロセスについて紹介し、Cluster モジュールと Egg.js のマルチプロセス モデルについて理解します。

Node.js のマルチプロセス モデルについて 1 つの記事で学習します

ご存知のとおり、JS は単一のスレッドで実行され、すべての非同期はイベント ループによって完了します。Web サービスにスレッドが 1 つしかない場合、マシンやコンテナのアイドル状態のリソースを活用する場合はどうすればよいでしょうか?同時に、コードがクラッシュし、例外がキャッチされなかった場合、スレッドは終了します。では、Node.js に基づく Web サービスは、アプリケーション全体の堅牢性をどのように確保しているのでしょうか?

Cluster モジュール

Node.js は、上記の問題を解決するための Cluster モジュールを提供しており、開発者はこのモジュールを通じて子プロセスを作成してクラスターを作成できます。 . 、マシンまたはコンテナのリソースを最大限に活用し、このモジュールにより、複数の子プロセスが同じポートをリッスンできるようになります。 [推奨学習: 「nodejs チュートリアル 」]

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on(&#39;exit&#39;, function(worker, code, signal) {
    console.log(&#39;worker &#39; + worker.process.pid + &#39; died&#39;);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

コード分析による子プロセスの作成プロセス

まず、const cluster = require('cluster') から始めましょう。このコード行は Node の Cluster モジュールをインポートしますが、Node の内部には、マスター プロセスとワーカー プロセスは同じではありません。同じです。詳細については、次のコードを参照してください:

&#39;use strict&#39;;

const childOrPrimary = &#39;NODE_UNIQUE_ID&#39; in process.env ? &#39;child&#39; : &#39;master&#39;;
module.exports = require(`internal/cluster/${childOrPrimary}`);

ファイルが異なるということは、2 つのプロセスが実行中に異なる動作をすることを意味します。たとえば:

// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;

// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;

Thisこれが、クラスター モジュール内のあらゆる場所の変数が区別できる理由です さまざまな種類のプロセスが存在する理由 次に、メイン プロセスとサブプロセスそれぞれの具体的なプロセスを理解しましょう

メイン プロセス

上記のコードでは、マスター プロセスは多くのことを行いません。CPU の数に基づいて子プロセスをフォークするだけです。ソース コードを詳しく調べて、大まかに見てみましょう。関連する説明は次のとおりです。コードのコメント

#
// lib/internal/cluster/master.js

// 初始化cluster
const cluster = new EventEmitter();
// 创建监听地址与server对应的map
const handles = new SafeMap();
// 初始化
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

// 自增的子进程id
let ids = 0;

// 向cluster添加fork方法
cluster.fork = function(env) {
  // 初始化cluster.settings
  cluster.setupMaster();
  // 为当前fork的子进程生成当前cluster内的唯一id
  const id = ++ids;
  // 创建子进程
  const workerProcess = createWorkerProcess(id, env);
  // 创建对应的worker实例
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // 省略一些worker的事件监听....

  // 监听内部消息事件,并交由onmessage处理
  worker.process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  // cluster发出fork事件
  process.nextTick(emitForkNT, worker);
  // 将worker实例放在cluster.workers中维护
  cluster.workers[worker.id] = worker;
  // 返回worker
  return worker;
};

// 创建子进程函数
function createWorkerProcess(id, env) {
  // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // 执行参数
  const execArgv = [...cluster.settings.execArgv];

  // 省略debug模式相关逻辑...

  // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// 内部消息事件处理函数
function onmessage(message, handle) {
  const worker = this;

  if (message.act === &#39;online&#39;)
    online(worker);
  // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server
  else if (message.act === &#39;queryServer&#39;)
    queryServer(worker, message);
  else if (message.act === &#39;listening&#39;)
    listening(worker, message);
  else if (message.act === &#39;exitedAfterDisconnect&#39;)
    exitedAfterDisconnect(worker, message);
  else if (message.act === &#39;close&#39;)
    close(worker, message);
}

// 获取server
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  // 创建当前子进程监听地址信息的key
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // 在handles map中查询是否有已经创建好的该监听地址的server
  let handle = handles.get(key);

  // 没有对应的server则进行创建
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === &#39;string&#39; &&
        process.platform !== &#39;win32&#39;) {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // 主、子进程处理连接的方式,默认为轮询
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it&#39;s connectionless. There is nothing to send to
    // the workers except raw datagrams and that&#39;s pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === &#39;udp4&#39; ||
        message.addressType === &#39;udp6&#39;) {
      constructor = SharedHandle;
    }

    // 将监听地址信息传入构造函数创建监听实例
    handle = new constructor(key, address, message);
    // 缓存监听实例
    handles.set(key, handle);
  }

  // 向server添加自定义信息,用于server发出listening事件后透传到worker
  if (!handle.data)
    handle.data = message.data;

  // 添加server发出listening事件后的回调函数通知子进程
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
// lib/internal/cluster/round_robin_handle.js

// 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // 初始化handle
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // 监听文件描述符,不讨论
  if (fd >= 0)
    this.server.listen({ fd });
  // 监听ip:port
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // 监听UNIX socket,不讨论
  } else
    this.server.listen(address);  // UNIX socket path.

  // 注册server发出listening事件的回调函数
  this.server.once(&#39;listening&#39;, () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// 添加worker,server发出listening事件后调用master.js中传入的回调函数
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once(&#39;listening&#39;, done);
  this.server.once(&#39;error&#39;, (err) => {
    send(err.errno, null);
  });
};

// 删除worker,轮询时不再分配给该worker
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// 轮询调度函数
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// 将handle交给worker
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // 向该worker发出newconn事件
  const message = { act: &#39;newconn&#39;, key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};

子プロセス

各子プロセスで、HTTP サーバーを作成し、listen 関数を実行します。ポート 8000 をリッスンします。HTTP サーバー インスタンスは、ネット サーバー プロトタイプ チェーンから継承されます。listen 関数は、次のように、ネット サーバー プロトタイプの listen 関数です。

// lib/_http_server.js

function Server(options, requestListener) {
  ....
}

ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js

Server.prototype.listen = function(...args) {

  // 由于篇幅原因,省略一些参数nomolize和其他监听的处理
  
  // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口
  if (typeof options.port === &#39;number&#39; || typeof options.port === &#39;string&#39;) {
    validatePort(options.port, &#39;options.port&#39;);
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  
  // 省略...
};

// 集群监听函数
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require(&#39;cluster&#39;);

  // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master&#39;s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port);
      return server.emit(&#39;error&#39;, ex);
    }

    // Reuse master&#39;s server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// 启用监听handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd);

  // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;);
  } else {
    debug(&#39;setupListenHandle: create a handle&#39;);

    let rval = null;
    
    // 篇幅原因,创建监听句柄的代码...
    
    this._handle = rval;
  }
  
  // 在this上设置的faux handle上设置onconnection函数用于监听连接进入
  this._handle.onconnection = onconnection;
}

同時に、解析を開始するときに、Clusterモジュールを導入する際に、現在のプロセスのenvにNODE_UNIQUE_IDが含まれているかどうかで子プロセスであるかどうかを判断すると述べました。は子プロセスです。

child.js ファイル

ヒント: IPC 通信が進行中です。送信された message.cmd の値に NODE というプレフィックスが付いている場合、内部メッセージに応答します。 eventinternalMessage

// lib/internal/cluster/child.js

// 初始化
const cluster = new EventEmitter();
// 存储生成的 faux handle
const handles = new SafeMap();
// 存储监听地址与监听地址index的对应关系
const indexes = new SafeMap();
cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量
// 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数
cluster._setupWorker = function() {
  // 初始化worker实例
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: &#39;online&#39;
  });

  cluster.worker = worker;

  // 处理断开连接事件
  process.once(&#39;disconnect&#39;, () => {
    worker.emit(&#39;disconnect&#39;);

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // IPC 内部通信事件监听
  process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  send({ act: &#39;online&#39; });

  function onmessage(message, handle) {
    // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server
    if (message.act === &#39;newconn&#39;)
      onconnection(message, handle);
    else if (message.act === &#39;disconnect&#39;)
      ReflectApply(_disconnect, worker, [true]);
  }
};

// 添加获取server函数,会在net server监听端口时被执行
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === &#39;string&#39; &&
      process.platform !== &#39;win32&#39;)
    address = path.resolve(address);

  // 生成地址信息的的key
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], &#39;:&#39;);

  // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  // 设置 indexesKey 与 index的对应关系
  indexes.set(indexesKey, index);

  // 传递地址信息及index
  const message = {
    act: &#39;queryServer&#39;,
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // 向主进程发送queryServer消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;)
      obj._setServerData(reply.data);

    // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数
    // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once(&#39;listening&#39;, () => {
    cluster.worker.state = &#39;listening&#39;;
    const address = obj.address();
    message.act = &#39;listening&#39;;
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// 创建 faux handle,并保存其对应关系
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: &#39;close&#39;, key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // 创建Faux handle
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it&#39;s backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // 保存faux handle
  handles.set(key, handle);
  // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数
  cb(0, handle);
}

// 处理请求
// Round-robin connection.
function onconnection(message, handle) {
  // 获取faux handle的key
  const key = message.key;
  // 获取faux hadle
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求
  if (accepted)
    server.onconnection(0, handle);
}
この時点で、すべてのコンテンツが接続されています。

複数の子プロセスが同じポートをリッスンできる理由

前のコード分析では、クラスター クラスターがマスター プロセスにネット サーバーを作成することがわかりました。 、およびワーカー プロセス内のネット サーバー HTTP サーバーを作成するプロセスが実行されると、リスニング アドレス情報が

cluster._getServer 関数に渡されて、疑似ハンドル が作成され、子プロセスのネットサーバーに設定し、ワーカープロセス内で初期化 IPC通信のコールバック関数が登録されます コールバック関数には初期化後の{疑似ハンドル}.onconnection関数が登録されます子プロセスの Net Server モジュールが呼び出され、渡された接続ハンドルが渡されて完了し、要求応答が完了します。

クラスター作業の堅牢性を確保する方法

ワーカー プロセスの

errordisconntect を監視できます。マスター プロセス exit イベントは、終了したプロセスと fork のクリーンアップや、cfork# などのカプセル化された npm パッケージの使用など、これらのイベントで対応する処理を実行します。

##Egg.js マルチプロセス モデル

Egg.js のマルチプロセス モデルには、別のプロセス タイプであるエージェント プロセスがあります。主に、複数のプロセスで処理するのが難しい処理や、長いリンクの数を減らすために使用されます。具体的な関係は次のとおりです:

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|

In the

egg-cluster

パッケージ、cfork# が使用されます ## ワーカー プロセスがハングした後に自動的に再起動するようにするためのパッケージ 問題記録

弊社のいずれかでEgg アプリケーション、ロギング システムは Egg ネイティブを使用しません ログについては、

log4js
パッケージに基づく内部ログ ライブラリが使用されます 使用される場合、必要な Logger は Application オブジェクトに拡張されるため、各 Worker プロセスはロガー、つまり、複数のプロセスがログを書き込む場合には問題が発生しますが、複数のプロセスがログを書き込む場合にはエラーの問題はありません

ソース コードをトレースする過程で、log4js はクラスター モードを提供しているにもかかわらず、log4js のクラスター モードが上位層のパッケージで有効になっていないことがわかりました。 , したがって、各 Logger The appenders all use flag a to open a write stream. ここまで答えが得られませんでした.

後で CNode で答えを見つけましたフラグ a によって開かれた書き込み可能ストリームに対応する libuv ファイル プール実装は、UV_FS_O_APPEND、つまり O_APPEND および O_APPEND# です。 ## 自体は man マニュアルの「操作」でアトムとして定義されており、カーネルはこの書き込み可能なストリームへの同時書き込みが安全であることを保証し、アプリケーション層で追加のロックを必要としません (ただし、NFS のようなファイル システムでの同時書き込みによりファイルが破損する可能性があります) NFS のようなネットワーク マウント ファイル システムは主に、準ローカル操作を実装するための基礎となる API のシミュレーションに依存しています。明らかに、このタイプのアトミック操作 API は競合条件下では完全に復元できません。ローカルにマウントされた oss クラウド ディスクのようなものに書き込まれます。これはできません。複数のプロセスで書き込む場合は、アプリケーション層で手動でロックする必要があります。

結論

剣の刃は研ぐことから生まれ、梅の香りは厳しい寒さから生まれるアヒル~

プログラミング関連の知識については、こちらをご覧ください:

プログラミング入門

! !

以上がNode.js のマルチプロセス モデルについて 1 つの記事で学習しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.cnで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。