2021-10-21 18:02:09
Node.js 中的可读流(Readable Stream)是用于生产数据供程序消费的流,常见的数据生产方式包括读取磁盘文件、网络请求内容等。以下是关于可读流的详细说明和自定义实现方法:
一、可读流的基本概念使用 fs.createReadStream 读取磁盘文件。
使用 process.stdin 读取控制台输入。
要实现自定义可读流,可以继承 stream.Readable 类并重写 _read 方法。以下是一个每 100 毫秒生成一个随机数的可读流示例:
const Readable = require('stream').Readable;class RandomNumberStream extends Readable { constructor(max) { super(); this.max = max; } _read() { const ctx = this; setTimeout(() => { if (ctx.max) { const randomNumber = parseInt(Math.random() * 10000); ctx.push(`${randomNumber}n`); ctx.max -= 1; } else { ctx.push(null); } }, 100); }}module.exports = RandomNumberStream;关键点说明:使用 this.push 将数据推入缓冲区。
推送 null 表示数据生产完毕。
size 参数:指示应读取多少数据,但很多实现会忽略此参数。
push 方法:只能推送字符串或 Buffer 类型的数据。
通过设置 max 参数,可以控制生成随机数的数量:
const RandomNumberStream = require('./RandomNumberStream');const rns = new RandomNumberStream(5);rns.pipe(process.stdout);四、流动模式与暂停模式可读流有两种工作模式:
流动模式:
数据由底层系统读出并尽可能快地提供给应用程序。
可以通过以下方式启动:
添加 data 事件监听器。
调用 resume() 方法。
使用 pipe() 方法将数据转接到可写流。
示例:
rns.on('data', chunk => { console.log(chunk.toString());});暂停模式:
需要显式调用 read() 方法读取数据。
可以通过以下方式切换:
调用 pause() 方法。
移除所有 data 事件监听器并调用 unpipe() 方法。
data 事件:
在流动模式下,当有数据可读时触发。
示例:
rns.on('data', chunk => { console.log(chunk);});end 事件:
当数据读取完毕时触发。
示例:
rns.on('end', () => { console.log('done');});error 事件:
当数据处理过程中出现错误时触发。
示例:
rns.on('error', err => { console.log(err);});read(size) 方法:
在暂停模式下,从内部缓冲区拉取并返回数据。
如果指定 size 参数,则返回指定字节的数据;否则返回所有可用数据。
示例:
rns.on('readable', () => { let chunk; while ((chunk = rns.read()) !== null) { console.log(chunk.toString()); }});readable 事件:
当可读流准备好数据时触发。
示例:
rns.on('readable', () => { console.log('Data is ready to be read');});数据推送:
使用 this.push 推送数据时,只能推送字符串或 Buffer 类型。
推送 null 表示数据生产完毕。
模式切换:
使用 pipe() 方法会自动将流切换到流动模式。
在流动模式下,_read 方法会被自动反复调用。
事件监听顺序:
Node.js 确保在数据生产之前绑定事件监听器,避免数据丢失。
通过以上方法,可以灵活地实现和控制自定义可读流,满足不同的数据处理需求。