Stream在node.js中是一個抽象的接口,基於EventEmitter,也是一種Buffer的高階封裝,用來處理流資料。流模組便是提供各種API讓我們可以很簡單的使用Stream。
流分為四種類型,如下所示:
Readable,可讀流
Writable,可寫流
Duplex,讀寫流
Transform,擴展的Duplex,可修改寫入的數據,可修改寫入的數據
1、Readable可讀流透過stream.Readable可建立一個可讀流,它有兩種模式:暫停和流動。const fs = require('fs'); const rs = fs.createReadStream('./appbak.js'); var chunkArr = [], chunkLen = 0; rs.on('data',(chunk)=>{ chunkArr.push(chunk); chunkLen+=chunk.length; }); rs.on('end',(chunk)=>{ console.log(Buffer.concat(chunkArr,chunkLen).toString()); });readable事件
const rs = fs.createReadStream('./appbak.js'); var chunkArr = [], chunkLen = 0; rs.on('readable',()=>{ var chunk = null; //这里需要判断是否到了流的末尾 if((chunk = rs.read()) !== null){ chunkArr.push(chunk); chunkLen+=chunk.length; } }); rs.on('end',(chunk)=>{ console.log(Buffer.concat(chunkArr,chunkLen).toString()); });pause和resume方法stream.pause()方法讓流進入暫停模式,並停止'data'事件觸發,stream.resume()方法使流進入流動模式,並恢復'data'事件觸發,也可以用來消費所有數據,如下所示:
const rs = fs.createReadStream('./下载.png'); rs.on('data',(chunk)=>{ console.log(`接收到${chunk.length}字节数据...`); rs.pause(); console.log(`数据接收将暂停1.5秒.`); setTimeout(()=>{ rs.resume(); },1000); }); rs.on('end',(chunk)=>{ console.log(`数据接收完毕`); });pipe(destination[, options])方法pipe()方法綁定一個可寫流到可讀流上,並自動切換到流動模式,將所有資料輸出到可寫流,以及做好了資料流的管理,不會發生資料遺失的問題,使用如下所示:
const rs = fs.createReadStream('./app.js'); rs.pipe(process.stdout);以上介紹了多種可讀流的資料消費的方法,但對於一個可讀流,最好只選擇其中的一種,建議使用pipe()方法。 2、Writable可寫流所有的可寫流都是基於stream.Writable類別創建的,創建之後便可將資料寫入該流。
const ws = fs.createWriteStream('./test.txt'); ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');}); ws.end('done.')背壓機制
function copy(src,dest){ src = path.resolve(src); dest = path.resolve(dest); const rs = fs.createReadStream(src); const ws = fs.createWriteStream(dest); console.log('正在复制中...'); const stime = +new Date(); rs.on('data',(chunk)=>{ if(null === ws.write(chunk)){ rs.pause(); } }); ws.on('drain',()=>{ rs.resume(); }); rs.on('end',()=>{ const etime = +new Date(); console.log(`已完成,用时:${(etime-stime)/1000}秒`); ws.end(); }); function calcProgress(){ } } copy('./CSS权威指南 第3版.pdf','./javascript.pdf');drain事件
const ws = fs.createWriteStream('./alphabet.txt'); const alphabetStr = 'abcdefghijklmnopqrstuvwxyz'; ws.on('finish',()=>{ console.log('done.'); }); for(let letter of alphabetStr.split()){ ws.write(letter); } ws.end();//必须调用end([chunk][, encoding][, callback])方法end()方法被呼叫之後,便不能再呼叫stream.write()方法寫入數據,負責將拋出錯誤。 3、Duplex讀寫流Duplex流同時實現了Readable與Writable類的接口,既是可讀流,也是可寫流。例如'zlib streams'、'crypto streams'、'TCP sockets'等都是Duplex流。 4、Transform流
Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。
5、四种流的实现
stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:
| Use-case | Class | Method(s) to implement |
| ------------- |-------------| -----|
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |
Readable流实现
如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:
const Readable = require('stream').Readable; const util = require('util'); const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split(); /*function AbReadable(){ if(!this instanceof AbReadable){ return new AbReadable(); } Readable.call(this); } util.inherits(AbReadable,Readable); AbReadable.prototype._read = function(){ if(!alphabetArr.length){ this.push(null); }else{ this.push(alphabetArr.shift()); } }; const abReadable = new AbReadable(); abReadable.pipe(process.stdout);*/ /*class AbReadable extends Readable{ constructor(){ super(); } _read(){ if(!alphabetArr.length){ this.push(null); }else{ this.push(alphabetArr.shift()); } } } const abReadable = new AbReadable(); abReadable.pipe(process.stdout);*/ /*const abReadable = new Readable({ read(){ if(!alphabetArr.length){ this.push(null); }else{ this.push(alphabetArr.shift()); } } }); abReadable.pipe(process.stdout);*/ const abReadable = Readable(); abReadable._read = function(){ if (!alphabetArr.length) { this.push(null); } else { this.push(alphabetArr.shift()); } } abReadable.pipe(process.stdout);
以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。
Writable流实现
我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):
/*class MyWritable extends Writable{ constructor(){ super(); } _write(chunk,encoding,callback){ process.stdout.write(chunk); callback(); } } const myWritable = new MyWritable();*/ const myWritable = new Writable({ write(chunk,encoding,callback){ process.stdout.write(chunk); callback(); } }); myWritable.on('finish',()=>{ process.stdout.write('done'); }) myWritable.write('a'); myWritable.write('b'); myWritable.write('c'); myWritable.end();
Duplex流实现
实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:
class MyDuplex extends Duplex{ constructor(){ super(); this.source = []; } _read(){ if (!this.source.length) { this.push(null); } else { this.push(this.source.shift()); } } _write(chunk,encoding,cb){ this.source.push(chunk); cb(); } } const myDuplex = new MyDuplex(); myDuplex.on('finish',()=>{ process.stdout.write('write done.') }); myDuplex.on('end',()=>{ process.stdout.write('read done.') }); myDuplex.write('\na\n'); myDuplex.write('c\n'); myDuplex.end('b\n'); myDuplex.pipe(process.stdout);
上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。
Transform流实现
实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:
class MyTransform extends Transform{ constructor(){ super(); } _transform(chunk, encoding, callback){ chunk = (chunk+'').toUpperCase(); callback(null,chunk); } } const myTransform = new MyTransform(); myTransform.write('hello world!'); myTransform.end(); myTransform.pipe(process.stdout);
上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:
_transform(chunk, encoding, callback){ chunk = (chunk+'').toUpperCase() this.push(chunk) callback(); }
Object Mode流实现
我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:
const rs = Readable(); rs.push('a'); rs.push('b'); rs.push(null); rs.on('data',(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62> const rs1 = Readable({objectMode:!0}); rs1.push('a'); rs1.push('b'); rs1.push(null); rs1.on('data',(chunk)=>{console.log(chunk);});//a与b
下面利用Transform流实现一个简单的CSS压缩工具,如下所示:
function minify(src,dest){ const transform = new Transform({ transform(chunk,encoding,cb){ cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,'')); } }); fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest)); } minify('./reset.css','./reset.min.css');
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持PHP中文网。
更多浅谈Node.js:理解stream相关文章请关注PHP中文网!