在前端工程化中产生了很多工具,例如grunt,gulp,webpack,babel...等等,这些工具都是通过node中的stream实现。 在node中stream也是非常非常非常重要的模块,比如我们常用的console就是基于stream的实例,还有net,http等核心模块都是基于stream来实现的,可见stream是多么的重要。
1.什么是stream?
是一种数据传输手段,从一个地方传输到另一个地方。
在写node的时候会存在读取文件,比如现在我们有一个非常大的文件,50G吧const fs = require('fs'); // test文件50个G fs.readFileSync('./test.text');复制代码
这个时候需要消耗大量的时候去读取这个文件,然而我们可能关心的并不是文件所有内容,还会存在直接读取失败。stream就是为了解决这些问题而产生,我们读一些数据处理一些数据,当读到所关心数据的时候,则可以不再继续读取。
stream翻译成中文‘流’,就像水一样,从水龙头流向水杯。
2. Stream模块
stream继承于EventEmitter,拥有事件触发和事件监听功能。主要分为4种基本流类型:
- Readable (可读流)
- Writable (可写流)
- Duplex (读写流)
- Transform (转换流) 在流中默认可操作的类型string和Buffer,如果需要处理其他类型的js值需要传入参数objectMode: true(默认为false)
在流中存在一个重要的概念,缓存区,就像拿水杯去接水,水杯就是缓存区,当水杯满,则会关闭水龙头,等把水杯里面的水消耗完毕,再打开水龙头去接水。
stream默认缓存区大小为16384(16kb),可以通过highWaterMark参数设置缓存区大小,但设置encoding后,以设置的字符编码为单位衡量。3. Readable
首先创建一个可读流,可接收5个参数:
- highWaterMark 缓存区字节大小,默认16384
- encoding 字符编码,默认为null,就是buffer
- objectMode 是否操作js其他类型 默认false
- read 对内部的_read()方式实现 子类实现,父类调用
- destroy 对内部的_ destroy()方法实现 子类实现,父类调用
可读流中分为2种模式流动模式和暂停模式。
监听data事件,触发流动模式,会源源不断生产数据触发data事件:const { Readable } = require('stream'); let i = 0; const rs = Readable({ encoding: 'utf8', // 这里传入的read方法,会被写入_read() read: (size) => { // size 为highWaterMark大小 // 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流 if (i < 10) { rs.push(`当前读取数据: ${i++}`); } else { rs.push(null); } }, // 源代码,可覆盖 destroy(err, cb) { rs.push(null); cb(err); } }); rs.on('data', (data) => { console.log(data); // 每次push数据则触发data事件 // 当前读取数据: 0 // 当前读取数据: 1 // 当前读取数据: 2 // 当前读取数据: 3 // 当前读取数据: 4 // 当前读取数据: 5 // 当前读取数据: 6 // 当前读取数据: 7 // 当前读取数据: 8 // 当前读取数据: 9 })复制代码
监听readable事件,触发暂停模式,当流有了新数据或到了流结束之前触发readable事件,需要显示调用read([size])读取数据:
const { Readable } = require('stream'); let i = 0; const rs = Readable({ encoding: 'utf8', highWaterMark: 9, // 这里传入的read方法,会被写入_read() read: (size) => { // size 为highWaterMark大小 // 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流 if (i < 10) { // push其实是把数据放入缓存区 rs.push(`当前读取数据: ${i++}`); } else { rs.push(null); } } }); rs.on('readable', () => { const data = rs.read(9); console.log(data); // })复制代码
read([size]) size参数:
- 不传代表读取缓存区所有数据。
- 传入0 填充缓存区, 但返回null
- size < 当前缓存区数据 返回所需数据
- size > 当前缓存区数据 返回null 并改变highWaterMark值
这里的缓存区数据不是指highWaterMark,获取缓存区数据大小rs._readableState.length。
流的模式可以自由切换: 通过rs._readableState.flowing的值获取当前状态
- null 初始状态
- false 暂停模式
- true 流动模式
rs.pause()切换到暂停模式 rs.resume()切换到流动模式
在可读流里面还可以监听其他事件:
rs.on('close', () => { // 流关闭时或文件关闭时触发 }) rs.on('end', () => { // 在流中没有数据可供消费时触发 }) rs.on('error', (err) => { // 发生错误时候 })复制代码
4. Writable
可写流可接受参数:
- highWaterMark 缓存区字节大小,默认16384
- decodeStrings 是否将字符编码传入缓冲区
- objectMode 是否操作js其他类型 默认false
- write 子类实现,供父类调用 实现写入底层数据
- writev 子类实现,供父类调用 一次处理多个chunk写入底层数据
- destroy 可以覆盖父类方法,不能直接调用,销毁流时,父类调用
- final 完成写入所有数据时父类触发
在实现流除了用上面直接传入参数的方式,还可以用继承类
class WS extends stream.Writable { constructor() { super({ highWaterMark: 1 }); } _write(chunk, encoding, cb) { console.log(this._writableState.length); // chunk 为需要写入的数据 // encoding 字符编码 // cb 回调函数, 如果写入成功需要调用cb去执行下一次写入,如果发生错误,可以cb(new Error([错误信息])) if (chunk.length < 4) { fs.writeFileSync('./2.text', chunk, { flag: 'a' }); cb(); } else{ cb(new Error('超出4个字节')); } }}const ws = new WS();let i = 0;function next() { let flag = true; // write() 会返回boolean false -> 缓存区没满 true —> 已满,需要暂停写入数据 while(i < 10 && flag) { flag = ws.write(`${i++}`); console.log('flag', flag); }}next();// 当所有缓存区数据已经成功写入底层数据,缓存区没有数据了,触发drain事件ws.on('drain', () => { console.log('drain'); // 继续写入缓存区数据 next();})复制代码
可写流的end事件,一旦触发end事件,后续不能再写入数据.
ws.write('start'); ws.end('end'); ws.wrtie('test'); // 报错 write after end复制代码
finish事件:
ws.write('start'); ws.end('end'); ws.on('finish', () => { console.log('调用end方法后,并且所有数据已经写入底层') })复制代码
cork()与uncork(),强制所有数据先写入缓存区,直到调用uncork()或end(),这时一并写入底层:
const ws = stream.Writable({ writev(chunks, encoding, cb) { // 这时chunks为一个数组,包含所有的chunk // 现在length为10 console.log(chunk.length); } }); // 写入数据之前,强制写入数据放入缓存区 ws.cork(); // 写入数据 for (let i = 0; i < 10; i++) { ws.write(i.toString()); } // 写入完毕,可以触发写入底层 ws.uncork();复制代码
5. Duplex
读写流,该方法继承了可写流和可读流,但相互之间没有关系,各自独立缓存区,拥有Writable和Readable所有方法和事件,同时实现_read()和_write()方法。
const fs = require('fs'); const stream = require('stream'); const duplex = stream.Duplex({ write(chunk, encoding, cb) { console.log(chunk.toString('utf8')); // 写入 }, read() { this.push('读取'); this.push(null); } }); console.log(duplex.read(6).toString('utf8')); // 读取 duplex.write('写入');复制代码
6. Transform
转换流,这个流在前端工程化中用到最多,从一个地方读取数据,转换数据后输出到一个地方,该流继承于Duplex。
const fs = require('fs'); const stream = require('stream'); const transform = stream.Transform({ transform(chunk, encoding, cb){ // 把数据转换成大写字母,然后push到缓存区 this.push(chunk.toString().toUpperCase()); cb(); } }); transform.write('a'); console.log(transform.read(1).toString()); // A复制代码
7. fs快速创建可读/可写流
可读流和可写流都需要我们去实现父类的方法,那么fs这个模块帮我们做了这件事情,fs里面实现了高效并且可靠的可读/可写流,提供快速创建流,不再去实现父类_write()或_read()。下面我们来看看如何使用:
const fs = require('fs'); /** * 创建可读流 * * 第一个参数文件路径 * * 第二个参数为options * flags?: string; encoding?: string; 字符编码 fd?: number; 文件打开后的标识符 mode?: number; 文件的权限 autoClose?: boolean; 读取完毕后,是否自动关闭文件 start?: number; 从哪个位置开始读取 end?: number; 读到什么时候结束 highWaterMark?: number; 最高水位线 */ const rs = fs.createReadStream('1.text'); rs.on('data', data => { console.log(data); }) /** * 创建可写流 * * 第一个参数文件路径 * * 第二个参数为options * flags?: string; encoding?: string; 字符编码 fd?: number; 文件打开后的标识符 mode?: number; 文件的权限 autoClose?: boolean; 写入完毕后,是否自动关闭文件 start?: number; 从什么位置开始写入 */ const ws = fs.createWriteStream('2.text'); ws.write('123');复制代码
8. pipe
在流中搭建一条管道,从可读流中到可写流。
可读流中有pipe()方法,在可写流中可以监听pipe事件,下面实现了从可读流中通过管道到可写流:
const fs = require('fs'); const stream = require('stream'); const rs = stream.Readable({ read() { this.push(fs.readFileSync('./1.text')); // 文件内容 test this.push(null); } }); const ws = stream.Writable({ write(chunk, encoding, cb) { // chunk为test buffer fs.writeFileSync('./2.text', chunk.toString()); cb(); } }); ws.on('pipe', data => { // 触发pipe事件 console.log(data); }); rs.pipe(ws);复制代码
9. 总结
流分为四种基本类型,两种模式。流中的数据不是直接写入或读取,有缓存区的概念。