>  기사  >  웹 프론트엔드  >  Nodejs 스트림 데이터 스트림 사용자 Manual_node.js

Nodejs 스트림 데이터 스트림 사용자 Manual_node.js

WBOY
WBOY원래의
2016-05-16 15:05:021668검색

1. 소개

이 글에서는 node.js 스트림을 활용하여 프로그램을 개발하는 기본적인 방법을 소개합니다.

<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."
Doug McIlroy. October 11, 1964</code>

Stream에 대한 최초의 노출은 Unix 초기부터였습니다. 수십 년간의 연습을 통해 Stream 아이디어가 일부 거대한 시스템을 쉽게 개발할 수 있다는 것이 입증되었습니다. 유닉스에서 Stream은 내장 스트림 모듈로 |를 통해 구현되며, 많은 핵심 모듈과 타사 모듈이 사용됩니다. Unix와 마찬가지로 노드 Stream의 주요 작업도 .pipe()입니다. 사용자는 압력 방지 메커니즘을 사용하여 읽기와 쓰기의 균형을 제어할 수 있습니다.

Stream은 추상적인 Stream 인터페이스를 통해 스트림 간의 읽기 및 쓰기 균형을 재사용하고 제어할 수 있는 통합 인터페이스를 개발자에게 제공할 수 있습니다.

2. 스트림을 사용하는 이유

노드의 I/O는 비동기식이므로 디스크와 네트워크에 읽고 쓰려면 데이터를 읽기 위한 콜백 함수가 필요합니다. 다음은 파일 다운로드 서버에 대한 간단한 코드입니다.

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);</code>

이러한 코드는 필요한 기능을 달성할 수 있지만 서비스는 파일 데이터를 보내기 전에 전체 파일 데이터를 메모리에 캐시해야 합니다. "data.txt" 파일이 크고 동시성 양이 많은 경우 메모리가 낭비됩니다. 사용자는 파일 데이터를 수락하기 전에 전체 파일이 메모리에 캐시될 때까지 기다려야 하기 때문에 이로 인해 사용자 경험이 다소 저하됩니다. 하지만 다행스럽게도 두 매개변수(req, res) 모두 Stream이므로 fs.readFile() 대신 fs.createReadStream()을 사용할 수 있습니다.

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);</code>

.pipe() 메서드는 fs.createReadStream()의 'data' 및 'end' 이벤트를 수신하므로 "data.txt" 파일은 전체 파일을 캐시할 필요가 없으며 데이터 블록은 클라이언트 연결이 완료된 후 즉시 전송됩니다. .pipe() 사용의 또 다른 이점은 클라이언트 지연이 매우 클 때 발생하는 읽기-쓰기 불균형 문제를 해결할 수 있다는 것입니다. 파일을 보내기 전에 압축하려면 타사 모듈을 사용할 수 있습니다.

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);</code>

이렇게 하면 gzip 및 deflate를 지원하는 브라우저에서 파일이 압축됩니다. 억압자 모듈은 모든 콘텐츠 인코딩을 처리합니다.

Stream을 사용하면 프로그램 개발이 쉬워집니다.

3. 기본 개념

읽기 가능, 쓰기 가능, 변환, 이중 및 "클래식"의 다섯 가지 기본 스트림이 있습니다.

3-1, 파이프

모든 유형의 스트림 컬렉션은 다음과 같이 .pipe()를 사용하여 입력-출력 쌍을 만들고, 읽을 수 있는 스트림 src를 수신하고 해당 데이터를 쓰기 가능한 스트림 dst에 출력합니다.

<code class="hljs perl">src.pipe(dst)</code>

.pipe(dst) 메서드는 dst 스트림을 반환하므로 다음과 같이 여러 .pipe()를 연속해서 사용할 수 있습니다.

<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>

다음 코드와 동일한 기능:

<code class="hljs perl">a.pipe( b );
b.pipe( c );
c.pipe( d );</code>

3-2, 읽기 가능한 스트림

Readable 스트림의 .pipe() 메서드를 호출하면 Readable 스트림의 데이터를 Writable, Transform 또는 Duplex 스트림에 쓸 수 있습니다.

<code class="hljs perl">readableStream.pipe( dst )</code>

1>읽을 수 있는 스트림 만들기

여기서 읽을 수 있는 스트림을 만듭니다!

<code class="hljs perl">var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
</code>

rs.push( null )은 데이터가 전송되었음을 데이터 수신자에게 알립니다.

모든 데이터 콘텐츠를 읽기 가능한 스트림으로 푸시하기 전에 rs.pipe(process.stdout);를 호출하지 않았지만 우리가 푸시한 모든 데이터 콘텐츠가 여전히 완전히 출력되었습니다. 이는 읽기 가능한 스트림이 모두 푸시되었기 때문입니다. 데이터는 수신자가 데이터를 읽을 때까지 캐시됩니다. 그러나 많은 경우에는 전체 데이터를 캐싱하는 것보다 데이터를 수신할 때만 읽기 가능한 스트림에 데이터를 푸시하는 것이 더 좋습니다. ._read() 함수를 다시 작성해 보겠습니다.

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);</code>
<code class="hljs bash">$ node read1.js
abcdefghijklmnopqrstuvwxyz</code>

위 코드는 데이터 수신자가 데이터를 요청할 때만 읽을 수 있는 스트림에 데이터를 푸시하도록 _read() 메서드를 재정의함으로써 달성됩니다. _read() 메서드는 데이터 요청에서 요청한 데이터 크기를 나타내는 크기 매개변수를 수신할 수도 있지만 읽기 가능한 스트림은 필요에 따라 이 매개변수를 무시할 수 있습니다.

util.inherits()를 사용하여 읽기 가능한 스트림을 상속할 수도 있습니다. _read() 메서드는 데이터 수신자가 데이터를 요청할 때만 호출된다는 점을 설명하기 위해 다음과 같이 데이터를 읽기 가능한 스트림으로 푸시할 때 지연을 만듭니다.

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);</code>

다음 명령을 사용하여 프로그램을 실행하면 _read() 메서드가 5번만 호출되는 것을 확인할 수 있습니다.

<code class="hljs bash">$ node read2.js | head -c5
abcde
_read() called 5 times</code>

타이머를 사용하는 이유는 시스템이 프로그램에 파이프를 닫으라는 신호를 보내는 데 시간이 필요하기 때문입니다. Process.stdout.on('error', fn)은 헤더 명령이 파이프를 닫으므로 SIGPIPE 신호를 보내는 시스템을 처리하는 데 사용됩니다. 이로 인해 process.stdout이 EPIPE 이벤트를 트리거하기 때문입니다. 어떤 형식으로든 데이터를 푸시할 수 있는 읽기 가능한 스트림을 생성하려면 스트림 생성 시 objectMode 매개변수를 true로 설정하기만 하면 됩니다(예: Readable({ objectMode: true })).

2>읽기 가능한 스트림 데이터 읽기

대부분의 경우 읽기 가능한 스트림에서 다른 형태의 스트림으로 데이터를 리디렉션하기 위해 단순히 파이프 방법을 사용하지만, 어떤 경우에는 읽기 가능한 스트림에서 직접 데이터를 읽는 것이 더 유용할 수 있습니다. 다음과 같습니다:

<code class="hljs php">process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<buffer 0a="" 61="" 62="" 63="">
<buffer 0a="" 64="" 65="" 66="">
<buffer 0a="" 67="" 68="" 69="">
null</buffer></buffer></buffer></code>

当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});</code>

如下运行程序发现,输出结果并不完全!

<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<buffer 61="" 62="" 63="">
<buffer 0a="" 64="" 65="">
<buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>

这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});</code>

这次运行结果如下:

<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<buffer 0a="" 64="" 65="">
<buffer 0a="" 68="" 69=""></buffer></buffer></code>

我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:

<code class="hljs javascript">var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'</code>

当然,有很多模块可以实现这个功能,如:split 。

3-3、writable streams

writable streams只可以作为.pipe()函数的目的参数。如下代码:

<code class="hljs perl">src.pipe( writableStream );</code>

1>创建 writable stream

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

<code class="hljs php">var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

<code class="hljs css">Writable({ objectMode: true })</code>

2>写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

<code class="hljs vala">process.stdout.write('beep boop\n');</code>

调用.end()方法通知writable stream 数据已经写入完成。

<code class="hljs javascript">var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js 
$ cat message.txt
beep boop</code>

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。

3-4、classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

1>classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

<code class="hljs javascript">var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ</code>

如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:

<code class="hljs php">process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:

<code class="hljs php">var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

或者也可以使用concat-stream来缓存整个流的内容:

<code class="hljs oxygene">var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }</code>

当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。

2>classic writable streams

Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

4、transform

transform是一个对读入数据过滤然输出的流。

5、duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>

以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.