Heim >Web-Frontend >js-Tutorial >Nodejs Stream Data Stream Benutzerhandbuch_node.js
1. Einleitung
In diesem Artikel wird die grundlegende Methode zum Entwickeln von Programmen mithilfe von Node.js-Streams vorgestellt.
<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also." Doug McIlroy. October 11, 1964</code>
Der erste Kontakt mit Stream stammt aus den Anfängen von Unix. Jahrzehntelange Praxis hat gezeigt, dass Stream-Ideen problemlos einige riesige Systeme entwickeln können. Unter Unix wird Stream über |; im Knoten implementiert. Als integriertes Stream-Modul werden viele Kernmodule und Module von Drittanbietern verwendet. Wie bei Unix ist auch die Hauptoperation von Node Stream .pipe(). Benutzer können den Anti-Druck-Mechanismus verwenden, um das Gleichgewicht zwischen Lesen und Schreiben zu steuern.
Stream kann Entwicklern eine einheitliche Schnittstelle bieten, die wiederverwendet werden kann und das Lese- und Schreibgleichgewicht zwischen Streams über die abstrakte Stream-Schnittstelle steuern kann.
2. Warum Stream verwenden?
E/A im Knoten ist asynchron, daher sind zum Lesen und Schreiben auf Festplatte und Netzwerk Rückruffunktionen erforderlich. Das Folgende ist ein einfacher Code für einen Datei-Download-Server:
<code class="hljs javascript">var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(8000);</code>
Diese Codes können die erforderlichen Funktionen erfüllen, aber der Dienst muss die gesamten Dateidaten im Speicher zwischenspeichern, bevor sie die Dateidaten senden. Wenn die Datei „data.txt“ groß ist und die Menge an Parallelität groß ist Speicher wird verschwendet. Da der Benutzer warten muss, bis die gesamte Datei im Speicher zwischengespeichert ist, bevor er die Dateidaten akzeptiert, führt dies zu einer sehr schlechten Benutzererfahrung. Aber glücklicherweise sind beide Parameter (req, res) Stream, sodass wir fs.createReadStream() anstelle von fs.readFile():
verwenden können<code class="hljs javascript">var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000);</code>
.pipe() hört auf die Ereignisse „data“ und „end“ von fs.createReadStream(), sodass die Datei „data.txt“ nicht die gesamte Datei zwischenspeichern muss, sondern ein Datenblock dies kann wird sofort nach Abschluss der Client-Verbindung an den Client gesendet. Ein weiterer Vorteil der Verwendung von .pipe() besteht darin, dass das Problem des Lese-Schreib-Ungleichgewichts gelöst werden kann, das bei sehr großen Client-Verzögerungen auftritt. Wenn Sie die Datei vor dem Senden komprimieren möchten, können Sie ein Modul eines Drittanbieters verwenden:
<code class="hljs javascript">var http = require('http'); var fs = require('fs'); var oppressor = require('oppressor'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(oppressor(req)).pipe(res); }); server.listen(8000);</code>
Auf diese Weise wird die Datei für Browser komprimiert, die gzip und deflate unterstützen. Das Oppressor-Modul übernimmt die gesamte Inhaltskodierung.
Stream macht die Entwicklung von Programmen einfach.
3. Grundkonzepte
Es gibt fünf grundlegende Streams: lesbar, beschreibbar, transformieren, duplex und „klassisch“.
3-1, Pfeife
Alle Arten von Stream-Sammlungen verwenden .pipe(), um ein Eingabe-Ausgabe-Paar zu erstellen, einen lesbaren Stream-Src zu empfangen und seine Daten wie folgt an einen beschreibbaren Stream-DST auszugeben:
<code class="hljs perl">src.pipe(dst)</code>
.pipe(dst)-Methode gibt den dst-Stream zurück, sodass mehrere .pipe() wie folgt nacheinander verwendet werden können:
<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>
Funktioniert genauso wie der folgende Code:
<code class="hljs perl">a.pipe( b ); b.pipe( c ); c.pipe( d );</code>
3-2, lesbare Streams
Durch Aufrufen der .pipe()-Methode von Readable-Streams können die Daten von Readable-Streams in einen Writable-, Transform- oder Duplex-Stream geschrieben werden.
<code class="hljs perl">readableStream.pipe( dst )</code>
1>Lesbaren Stream erstellen
Hier erstellen wir einen lesbaren Stream!
<code class="hljs perl">var Readable = require('stream').Readable; var rs = new Readable; rs.push('beep '); rs.push('boop\n'); rs.push(null); rs.pipe(process.stdout); $ node read0.js beep boop </code>
rs.push( null ) benachrichtigt den Datenempfänger, dass die Daten gesendet wurden.
Beachten Sie, dass wir rs.pipe(process.stdout); nicht aufgerufen haben, bevor wir den gesamten Dateninhalt in den lesbaren Stream geschoben haben, aber der gesamte Dateninhalt, den wir eingeschoben haben, wurde trotzdem vollständig ausgegeben. Dies liegt daran, dass der lesbare Stream alle gepusht hat Die Daten werden zwischengespeichert, bis der Empfänger die Daten liest. In vielen Fällen ist es jedoch besser, die Daten erst dann in den lesbaren Stream zu übertragen, wenn sie empfangen werden, als die gesamten Daten zwischenzuspeichern. Schreiben wir die Funktion ._read() neu:
<code class="hljs javascript">var Readable = require('stream').Readable; var rs = Readable(); var c = 97; rs._read = function () { rs.push(String.fromCharCode(c++)); if (c > 'z'.charCodeAt(0)) rs.push(null); }; rs.pipe(process.stdout);</code> <code class="hljs bash">$ node read1.js abcdefghijklmnopqrstuvwxyz</code>
Der obige Code erreicht durch Überschreiben der _read()-Methode, dass Daten nur dann in den lesbaren Stream verschoben werden, wenn der Datenempfänger Daten anfordert. Die Methode _read() kann auch einen Größenparameter empfangen, der die von der Datenanforderung angeforderte Datengröße angibt, der lesbare Stream kann diesen Parameter jedoch bei Bedarf ignorieren.
Beachten Sie, dass wir mit util.inherits() auch lesbare Streams erben können. Um zu veranschaulichen, dass die Methode _read() nur aufgerufen wird, wenn der Datenempfänger Daten anfordert, nehmen wir eine Verzögerung vor, wenn Daten in den lesbaren Stream verschoben werden, wie folgt:
<code class="hljs javascript">var Readable = require('stream').Readable; var rs = Readable(); var c = 97 - 1; rs._read = function () { if (c >= 'z'.charCodeAt(0)) return rs.push(null); setTimeout(function () { rs.push(String.fromCharCode(++c)); }, 100); }; rs.pipe(process.stdout); process.on('exit', function () { console.error('\n_read() called ' + (c - 97) + ' times'); }); process.stdout.on('error', process.exit);</code>
Führen Sie das Programm mit dem folgenden Befehl aus und wir stellen fest, dass die Methode _read() nur fünfmal aufgerufen wird:
<code class="hljs bash">$ node read2.js | head -c5 abcde _read() called 5 times</code>
Der Grund für die Verwendung eines Timers besteht darin, dass das System Zeit benötigt, um ein Signal zu senden, um das Programm zum Schließen der Pipe zu benachrichtigen. Process.stdout.on('error', fn) wird verwendet, um das System zu verarbeiten, das ein SIGPIPE-Signal sendet, da der Header-Befehl die Pipe schließt, da dies dazu führt, dass Process.stdout das EPIPE-Ereignis auslöst. Wenn Sie einen lesbaren Stream erstellen möchten, der Daten in beliebiger Form übertragen kann, setzen Sie beim Erstellen des Streams einfach den Parameter objectMode auf true, zum Beispiel: Readable({ objectMode: true }).
2>Lesbare Stream-Daten lesen
In den meisten Fällen verwenden wir einfach die Pipe-Methode, um die Daten vom lesbaren Stream in eine andere Stream-Form umzuleiten. In manchen Fällen kann es jedoch sinnvoller sein, Daten direkt aus dem lesbaren Stream zu lesen. Wie folgt:
<code class="hljs php">process.stdin.on('readable', function () { var buf = process.stdin.read(); console.dir(buf); }); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js <buffer 0a="" 61="" 62="" 63=""> <buffer 0a="" 64="" 65="" 66=""> <buffer 0a="" 67="" 68="" 69=""> null</buffer></buffer></buffer></code>
当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:
<code class="hljs javascript">process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); });</code>
如下运行程序发现,输出结果并不完全!
<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js <buffer 61="" 62="" 63=""> <buffer 0a="" 64="" 65=""> <buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>
这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。
<code class="hljs javascript">process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); process.stdin.read(0); });</code>
这次运行结果如下:
<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js <buffer 0a="" 64="" 65=""> <buffer 0a="" 68="" 69=""></buffer></buffer></code>
我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:
<code class="hljs javascript">var offset = 0; process.stdin.on('readable', function () { var buf = process.stdin.read(); if (!buf) return; for (; offset < buf.length; offset++) { if (buf[offset] === 0x0a) { console.dir(buf.slice(0, offset).toString()); buf = buf.slice(offset + 1); offset = 0; process.stdin.unshift(buf); return; } } process.stdin.unshift(buf); }); $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 'hearties' 'heartiest' 'heartily' 'heartiness' 'heartiness\'s' 'heartland' 'heartland\'s' 'heartlands' 'heartless' 'heartlessly'</code>
当然,有很多模块可以实现这个功能,如:split 。
3-3、writable streams
writable streams只可以作为.pipe()函数的目的参数。如下代码:
<code class="hljs perl">src.pipe( writableStream );</code>
1>创建 writable stream
重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。
<code class="hljs php">var Writable = require('stream').Writable; var ws = Writable(); ws._write = function (chunk, enc, next) { console.dir(chunk); next(); }; process.stdin.pipe(ws); $ (echo beep; sleep 1; echo boop) | node write0.js <buffer 0a="" 62="" 65="" 70=""> <buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>
第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream
<code class="hljs css">Writable({ objectMode: true })</code>
2>写数据到 writable stream
调用writable stream的.write(data)方法即可完成数据写入。
<code class="hljs vala">process.stdout.write('beep boop\n');</code>
调用.end()方法通知writable stream 数据已经写入完成。
<code class="hljs javascript">var fs = require('fs'); var ws = fs.createWriteStream('message.txt'); ws.write('beep '); setTimeout(function () { ws.end('boop\n'); }, 1000); $ node writing1.js $ cat message.txt beep boop</code>
如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。
3-4、classic streams
Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。
1>classic readable streams
Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:
<code class="hljs javascript">var Stream = require('stream'); var stream = new Stream; stream.readable = true; var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout); $ node classic0.js ABCDEFGHIJ</code>
如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:
<code class="hljs php">process.stdin.on('data', function (buf) { console.log(buf); }); process.stdin.on('end', function () { console.log('__END__'); }); $ (echo beep; sleep 1; echo boop) | node classic1.js <buffer 0a="" 62="" 65="" 70=""> <buffer 0a="" 62="" 6f="" 70=""> __END__</buffer></buffer></code>
需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:
<code class="hljs php">var through = require('through'); process.stdin.pipe(through(write, end)); function write (buf) { console.log(buf); } function end () { console.log('__END__'); } $ (echo beep; sleep 1; echo boop) | node through.js <buffer 0a="" 62="" 65="" 70=""> <buffer 0a="" 62="" 6f="" 70=""> __END__</buffer></buffer></code>
或者也可以使用concat-stream来缓存整个流的内容:
<code class="hljs oxygene">var concat = require('concat-stream'); process.stdin.pipe(concat(function (body) { console.log(JSON.parse(body)); })); $ echo '{"beep":"boop"}' | node concat.js { beep: 'boop' }</code>
当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。
2>classic writable streams
Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。
4、transform
transform是一个对读入数据过滤然输出的流。
5、duplex
duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:
<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>
以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!