ホームページ >ウェブフロントエンド >jsチュートリアル >Nodeのプロセス間通信について語る記事

Nodeのプロセス間通信について語る記事

青灯夜游
青灯夜游転載
2022-09-05 18:55:042229ブラウズ

プロセス間で通信するにはどうすればよいですか?次の記事では、Node プロセス間通信の原理を紹介しますので、皆様のお役に立てれば幸いです。

Nodeのプロセス間通信について語る記事

#前提知識

ファイル記述子

Linux システムではすべてが扱われますファイルとして保存され、プロセスが既存のファイルを開くと、ファイル記述子が返されます。 ファイル記述子は、プロセスによって開かれたファイルを管理するためにオペレーティング システムによって作成されるインデックスであり、開かれたファイルを指すために使用されます。 プロセスが開始されると、オペレーティング システムは PCB 制御ブロックを各プロセスに割り当てます。PCB には、現在のプロセスのすべてのファイル記述子、つまり現在のプロセスによって開かれたすべてのファイルを保存するファイル記述子テーブルが存在します。

? プロセス内のファイル記述子はシステム ファイルにどのように対応しますか? カーネルでは、システムは他の 2 つのテーブルを維持します。

    オープン ファイル テーブル
  • i ノード テーブル
ファイル記述子は配列の添字です。 0 から始まり、上向きに増加します。デフォルトでは、0/1/2 が入力/出力/エラー ストリームのファイル記述子になります。 PCB に保持されているファイル記述テーブルでは、ファイル記述子と対応するオープン ファイル テーブルに基づいて、対応するファイル ポインタを見つけることができます。 オープン ファイル テーブルは、ファイル オフセット (ファイルの読み取りおよび書き込み時に更新される)、ファイルのステータス識別子、i ノード テーブルへのポインタを維持します。 ファイルを実際に操作したい場合は、i ノード テーブルを利用して実際のファイルに関する関連情報を取得する必要があります。

ファイル間の関係

Nodeのプロセス間通信について語る記事

#プロセス A では、ファイル記述子 1/20 はすべて同じオープン ファイル テーブル エントリ 23 を指します。これは、同じオープン関数への複数の呼び出しが原因である可能性があります。ファイルです。
  • プロセス A/B のファイル記述子 2 はすべて同じファイルを指しています。これは、子プロセスを作成するために fork が呼び出されたためである可能性があります。A/B は親子関係のプロセスです。
  • プロセス A のファイル記述子 0 プロセス B とプロセス B のファイル記述子は、異なるオープン ファイル テーブル エントリを指していますが、これらのテーブル エントリは同じファイルを指しています。これは、プロセス A/B がそれぞれオープン コールを開始したためである可能性があります。同じファイルへの
概要

同じプロセスの異なるファイル記述子は同じファイルを指すことができます
  • 異なるプロセス同じファイル記述子を持つことができます
  • 異なるプロセスの同じファイル記述子が異なるファイルを指すことができます
  • 異なるプロセスの異なるファイル記述子が同じファイルを指すことができます
ファイル記述子のリダイレクト

プロセスが読み書きされるたびに、ファイル記述子から始まり、対応するオープン ファイル テーブル エントリが検索され、次に対応する i ノードが検索されます。 table

? ファイル記述子の実装方法? リダイレクト? 対応するファイル ポインタはファイル記述子テーブルにあるため、ファイル ポインタを変更すると、後続の 2 つのテーブルの内容も変更されますか? 例: ファイル記述子 1 はモニターを指し、ファイル記述子 1 は log.txt ファイルを指し、ファイル記述子 1 は log.txt に対応します

シェル ペア ファイル記述子 リダイレクト

> は出力リダイレクト記号、 と

cat hello.txt

を使用すると、結果は On に出力されます。モニターする場合は、> を使用してリダイレクトします。 cat hello.txt 1 > log.txtファイル log.txt を出力モードで開き、ファイル記述子 1

Nodeのプロセス間通信について語る記事

にバインドします。 c 関数はファイル記述子をリダイレクトします

#dup

dup 関数は、oldfd を指す新しいファイル記述子を開くために使用されます。同じファイル、共有ファイル オフセットです。およびファイルのステータス
int main(int argc, char const *argv[])
{
    int fd = open("log.txt");
    int copyFd = dup(fd);
    //将fd阅读文件置于文件末尾,计算偏移量。
    cout 

dup(3) を呼び出すと、新しい最小記述子が開かれます。これは 4 で、この 4 は 3 が指すファイルを指します。 fd 操作は変更されたファイルです。Nodeのプロセス間通信について語る記事

dup2

dup2 関数は、指定された newfd が、oldfd が指すファイルを指します。 dup2 を実行すると、newfd と oldfd は同時に同じファイルを指し、ファイル オフセットとファイル ステータスを共有します
int main(int argc, char const *argv[])
{
    int fd = open("log.txt");
    int copyFd = dup(fd);
    //将fd阅读文件置于文件末尾,计算偏移量。
    cout 

Node中通信原理

Node 中的 IPC 通道具体实现是由 libuv 提供的。根据系统的不同实现方式不同,window 下采用命名管道实现,*nix 下采用 Domain Socket 实现。在应用层只体现为 message 事件和 send 方法。【相关教程推荐:nodejs视频教程

Nodeのプロセス間通信について語る記事

父进程在实际创建子进程之前,会创建 IPC 通道并监听它,等到创建出真实的子进程后,通过环境变量(NODE_CHANNEL_FD)告诉子进程该 IPC 通道的文件描述符。

子进程在启动的过程中,会根据该文件描述符去连接 IPC 通道,从而完成父子进程的连接。

建立连接之后可以自由的通信了,IPC 通道是使用命名管道或者 Domain Socket 创建的,属于双向通信。并且它是在系统内核中完成的进程通信

Nodeのプロセス間通信について語る記事

⚠️ 只有在启动的子进程是 Node 进程时,子进程才会根据环境变量去连接对应的 IPC 通道,对于其他类型的子进程则无法实现进程间通信,除非其他进程也按着该约定去连接这个 IPC 通道。

unix domain socket

是什么

我们知道经典的通信方式是有 Socket,我们平时熟知的 Socket 是基于网络协议的,用于两个不同主机上的两个进程通信,通信需要指定 IP/Host 等。 但如果我们同一台主机上的两个进程想要通信,如果使用 Socket 需要指定 IP/Host,经过网络协议等,会显得过于繁琐。所以 Unix Domain Socket 诞生了。

UDS 的优势:

  • 绑定 socket 文件而不是绑定 IP/Host;不需要经过网络协议,而是数据的拷贝
  • 也支持 SOCK_STREAM(流套接字)和 SOCK_DGRAM(数据包套接字),但由于是在本机通过内核通信,不会丢包也不会出现发送包的次序和接收包的次序不一致的问题

如何实现

流程图

Nodeのプロセス間通信について語る記事

Server 端
int main(int argc, char *argv[])
{
    int server_fd ,ret, client_fd;
    struct sockaddr_un serv, client;
    socklen_t len = sizeof(client);
    char buf[1024] = {0};
    int recvlen;

    // 创建 socket
    server_fd = socket(AF_LOCAL, SOCK_STREAM, 0);

    // 初始化 server 信息
    serv.sun_family = AF_LOCAL;
    strcpy(serv.sun_path, "server.sock");

    // 绑定
    ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv));

    //设置监听,设置能够同时和服务端连接的客户端数量
    ret = listen(server_fd, 36);

    //等待客户端连接
    client_fd = accept(server_fd, (struct sockaddr *)&client, &len);
    printf("=====client bind file:%s\n", client.sun_path);

    while (1) {
        recvlen = recv(client_fd, buf, sizeof(buf), 0);
        if (recvlen == -1) {
            perror("recv error");
            return -1;
        } else if (recvlen == 0) {
            printf("client disconnet...\n");
            close(client_fd);
            break;
        } else {
            printf("recv buf %s\n", buf);
            send(client_fd, buf, recvlen, 0);
        }
    }

    close(client_fd);
    close(server_fd);
    return 0;
}
Client 端
int main(int argc, char *argv[])
{
    int client_fd ,ret;
    struct sockaddr_un serv, client;
    socklen_t len = sizeof(client);
    char buf[1024] = {0};
    int recvlen;

    //创建socket
    client_fd = socket(AF_LOCAL, SOCK_STREAM, 0);

    //给客户端绑定一个套接字文件
    client.sun_family = AF_LOCAL;
    strcpy(client.sun_path, "client.sock");
    ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client));

    //初始化server信息
    serv.sun_family = AF_LOCAL;
    strcpy(serv.sun_path, "server.sock");
    //连接
    connect(client_fd, (struct sockaddr *)&serv, sizeof(serv));

    while (1) {
        fgets(buf, sizeof(buf), stdin);
        send(client_fd, buf, strlen(buf)+1, 0);

        recv(client_fd, buf, sizeof(buf), 0);
        printf("recv buf %s\n", buf);
    }

    close(client_fd);
    return 0;
}

命名管道(Named Pipe)

是什么

命名管道是可以在同一台计算机的不同进程之间,或者跨越一个网络的不同计算机的不同进程之间的可靠的单向或者双向的数据通信。 创建命名管道的进程被称为管道服务端(Pipe Server),连接到这个管道的进程称为管道客户端(Pipe Client)。

命名管道的命名规范:\server\pipe[\path]\name

  • 其中 server 指定一个服务器的名字,本机适用 \. 表示,\192.10.10.1 表示网络上的服务器
  • \pipe 是一个不可变化的字串,用于指定该文件属于 NPFS(Named Pipe File System)
  • [\path]\name 是唯一命名管道名称的标识

怎么实现

流程图

Nodeのプロセス間通信について語る記事

Pipe Server
void ServerTest()
{
    HANDLE  serverNamePipe;
    char    pipeName[MAX_PATH] = {0};
    char    szReadBuf[MAX_BUFFER] = {0};
    char    szWriteBuf[MAX_BUFFER] = {0};
    DWORD   dwNumRead = 0;
    DWORD   dwNumWrite = 0;

    strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest");
    // 创建管道实例
    serverNamePipe = CreateNamedPipeA(pipeName,
        PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH,
        PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL);
    WriteLog("创建管道成功...");
    // 等待客户端连接
    BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL );
    WriteLog( "收到客户端的连接成功...");
    // 接收数据
    memset( szReadBuf, 0, MAX_BUFFER );
    bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL );
    // 业务逻辑处理 (只为测试用返回原来的数据)
    WriteLog( "收到客户数据:[%s]", szReadBuf);
    // 发送数据
    if( !WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL ) )
    {
        WriteLog("向客户写入数据失败:[%#x]", GetLastError());
        return ;
    }
    WriteLog("写入数据成功...");
}
Pipe Client
void ClientTest()
{
    char    pipeName[MAX_PATH] = {0};
    HANDLE  clientNamePipe;
    DWORD   dwRet;
    char    szReadBuf[MAX_BUFFER] = {0};
    char    szWriteBuf[MAX_BUFFER] = {0};
    DWORD   dwNumRead = 0;
    DWORD   dwNumWrite = 0;

    strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest");
    // 检测管道是否可用
    if(!WaitNamedPipeA(pipeName, 10000)){
        WriteLog("管道[%s]无法打开", pipeName);
        return ;
    }
    // 连接管道
    clientNamePipe = CreateFileA(pipeName,
        GENERIC_READ|GENERIC_WRITE,
        0,
        NULL,
        OPEN_EXISTING,
        FILE_ATTRIBUTE_NORMAL,
        NULL);
    WriteLog("管道连接成功...");
    scanf( "%s", szWritebuf );
    // 发送数据
    if( !WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){
        WriteLog("发送数据失败,GetLastError=[%#x]", GetLastError());
        return ;
    }
    printf("发送数据成功:%s\n", szWritebuf );
    // 接收数据
    if( !ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){
        WriteLog("接收数据失败,GetLastError=[%#x]", GetLastError() );
        return ;
    }
    WriteLog( "接收到服务器返回:%s", szReadBuf );
    // 关闭管道
    CloseHandle(clientNamePipe);
}

Node 创建子进程的流程

Unix

Nodeのプロセス間通信について語る記事

对于创建子进程、创建管道、重定向管道均是在 c++ 层实现的

创建子进程

int main(int argc,char *argv[]){
    pid_t pid = fork();
    if (pid <h4 data-id="heading-23"><strong>创建管道</strong></h4><p>使用 socketpair 创建管道,其创建出来的管道是全双工的,返回的文件描述符中的任何一个都可读和可写</p><pre class="brush:php;toolbar:false">int main ()
{
    int fd[2];
    int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd);

    if (fork()){ /* 父进程 */
        int val = 0;
        close(fd[1]);
        while (1){
            sleep(1);
            ++val;
            printf("发送数据: %d\n", val);
            write(fd[0], &val, sizeof(val));
            read(fd[0], &val, sizeof(val));
            printf("接收数据: %d\n", val);
        }
    } else {  /*子进程*/
        int val;
        close(fd[0]);
        while(1){
            read(fd[1], &val, sizeof(val));
            ++val;
            write(fd[1], &val, sizeof(val));
        }
    }
}

当我们使用 socketpair 创建了管道之后,父进程关闭了 fd[1],子进程关闭了 fd[0]。子进程可以通过 fd[1] 读写数据;同理主进程通过 fd[0]读写数据完成通信。

对应代码:https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c#L344

child_process.fork 的详细调用

fork 函数开启一个子进程的流程

Nodeのプロセス間通信について語る記事

  • 初始化参数中的 options.stdio,并且调用 spawn 函数

    function spawn(file, args, options) {
      const child = new ChildProcess();
    
      child.spawn(options);
    }
  • 创建 ChildProcess 实例,创建子进程也是调用 C++ 层 this._handle.spawn 方法

    function ChildProcess() {
    	// C++层定义
    	this._handle = new Process();
    }
  • 通过 child.spawn 调用到 ChildProcess.prototype.spawn 方法中。其中 getValidStdio 方法会根据 options.stdio 创建和 C++ 交互的 Pipe 对象,并获得对应的文件描述符,将文件描述符写入到环境变量 NODE_CHANNEL_FD 中,调用 C++ 层创建子进程,在调用 setupChannel 方法

    ChildProcess.prototype.spawn = function(options) {
      // 预处理进程间通信的数据结构
    	stdio = getValidStdio(stdio, false);
    	const ipc = stdio.ipc;
        const ipcFd = stdio.ipcFd;
    	//将文件描述符写入环境变量中
    	if (ipc !== undefined) {
        ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`);
      }
    	// 创建进程
    	const err = this._handle.spawn(options);
        // 添加send方法和监听IPC中数据
    	if (ipc !== undefined) setupChannel(this, ipc, serialization);
    }
  • 子进程启动时,会根据环境变量中是否存在 NODE_CHANNEL_FD 判断是否调用 _forkChild 方法,创建一个 Pipe 对象, 同时调用 open 方法打开对应的文件描述符,在调用setupChannel

    function _forkChild(fd, serializationMode) {
      const p = new Pipe(PipeConstants.IPC);
      p.open(fd);
      p.unref();
      const control = setupChannel(process, p, serializationMode);
    }

句柄传递

setupChannel 主要是完成了处理接收的消息、发送消息、处理文件描述符传递等

function setipChannel(){
	channel.onread = function(arrayBuffer){
		//...
	}
	target.on('internalMessage', function(message, handle){
		//...
	})
	target.send = function(message, handle, options, callback){
		//...
	}
	target._send = function(message, handle, options, callback){
		//...
	}
	function handleMessage(message, handle, internal){
		//...
	}
}
  • target.send: process.send 方法,这里 target 就是进程对象本身.
  • target._send: 执行具体 send 逻辑的函数, 当参数 handle 不存在时, 表示普通的消息传递;若存在,包装为内部对象,表明是一个 internalMessage 事件触发。调用使用JSON.stringify 序列化对象, 使用channel.writeUtf8String 写入文件描述符中
  • channel.onread: 获取到数据时触发, 跟 channel.writeUtf8String 相对应。通过 JSON.parse 反序列化 message 之后, 调用 handleMessage 进而触发对应事件
  • handleMessage: 用来判断是触发 message 事件还是 internalMessage 事件
  • target.on('internalMessage'): 针对内部对象做特殊处理,在调用 message 事件

1Nodeのプロセス間通信について語る記事

进程间消息传递

  • 父进程通过 child.send 发送消息 和 server/socket 句柄对象

  • 普通消息直接 JSON.stringify 序列化;对于句柄对象来说,需要先包装成为内部对象

    message = {
    	cmd: 'NODE_HANDLE',
    	type: null,
    	msg: message
    };

    通过 handleConversion.[message.type].send 的方法取出句柄对象对应的 C++ 层面的 TCP 对象,在采用JSON.stringify 序列化

    const handleConversion = {
    	'net.Server': {
        simultaneousAccepts: true,
    
        send(message, server, options) {
          return server._handle;
        },
    
        got(message, handle, emit) {
          const server = new net.Server();
          server.listen(handle, () => {
            emit(server);
          });
        }
      }
    //....
    }
  • 最后将序列化后的内部对象和 TCP 对象写入到 IPC 通道中

  • 子进程在接收到消息之后,使用 JSON.parse 反序列化消息,如果为内部对象触发 internalMessage 事件

  • 检查是否带有 TCP 对象,通过 handleConversion.[message.type].got 得到和父进程一样的句柄对象

  • 最后发触发 message 事件传递处理好的消息和句柄对象,子进程通过 process.on 接收

更多node相关知识,请访问:nodejs 教程

以上がNodeのプロセス間通信について語る記事の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.cnで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。