博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
可读可写流简明实现指北【多图,附demo源码】
阅读量:6761 次
发布时间:2019-06-26

本文共 5126 字,大约阅读时间需要 17 分钟。

  • 可写流实现
    • 初始化参数
    • write()
      • 流程图
      • 注意事项
    • _write()
      • 流程图
      • 注意事项
    • end()
  • 可读流实现
    • 初始化参数
    • 流动模式和暂停模式
      • 流动模式
      • 暂停模式
        • case1
        • case2
      • 流程图
      • 实现难点
        • howMuchToRead
    • pipe
  • 源码

pre-notify

可读可写流的简明实现,以求加深对可读流可写流的印象与理解。

本文用流程图概括了整个源码的实现,着重讲述了比较重要以及难实现的点,推荐打开尾巴处的仓库地址,对照实际的代码来阅读。

(づ ̄ 3 ̄)づ Let`s go!

可写流实现

初始化参数

哼,这才不是骗字数呢,默认参数不重要吗?你记了那么多遍记住了吗?嗯。。

class WriteStream extends EventEmitter{	constructor(path,options){    	this.path = path;        this.flags = options.flags||'w';        this.encoding = options.encoding||'utf8';        this.highWaterMark = options.highWaterMark||16*1024;        this.mode = options.mode||0o666;        this.autoClose = options.autoClose||true;        this.fd = options.fd||null;                // 之所以需要pos,是因为还可以将flags设置成a 追加嘛        this.pos = options.start||0;                // 用来标识是否正在真正写入文件        this.writing = false;                // 用于缓存正在写入时write进的东东        this.buffers = [];                // 用来标识缓存区的大小        this.leng = 0;                // 用来标识写入文件完毕后是否需要触发drain事件        this.needDrain = false;                // 用来标识是否已经调用过end()方法        this.isEnd = false;                // --- --- ---                this.open();  // 打开文件,缓存fd    }}复制代码

write()

流程图

_write部分没有详细注释,其中有一点需要格外注意的是,如果此次write方法是通过调用end间接调用的,那么在_write写入文件完毕后会关闭文件。

注意事项

  • 写入的必须是 buffer 或则 字符串,数字是不行的
let flag = ws.write(1+'','utf8',()=>{    console.log('ok');});复制代码
  • 只有当攒存的数据大于 hightWaterMark 缓存的数据被清空时,才会触发 drain 事件。
  • needDrainisEnd 都是针对于整个写入对象来说的。

_write()

流程图

注意事项

  • 这里判断是否为 end() 方法调用并不是依靠 isEnd 而是依据调用 _write 方法时的第三个参数 end ,因为 isEnd 的改变是在本轮执行时就改变了,而我们要关闭文件的话必须确保的是在调用完 end 以后。
// write 方法中...else{ // isWriting    this.push({    	chunk        ,end        ,callback    });}...// --- --- ---// clearBuffer 方法中...if(buf){  this._write(buf.chunk,()=>{    buf.callback();    this.clearBuffer();  },buf.end);  ...复制代码

end()

end(chunk,encoding=this.encoding,callback=()=>{}){    this.write(chunk,encoding,callback,true);  }复制代码

第三个参数为内部使用,用来标识是通过end调用的write方法,调用之后不再允许使用write继续写入,并且在end实际写入文件后关闭文件

可读流实现

初始化参数

class ReadStream extends EventEmitter{	constructor(path,options){    	this.path = path;        this.flags = options.flags||'r';        this.highWaterMark = options.highWaterMark||64*1024;        this.encoding = options.encoding||null;        this.mode = options.mode||0o666;        this.autoClose = options.autoClose||true;        this.fd = options.fd||null;                this.pos = options.start||0;        this.end = options.end||null;                // 标识可读流此刻的模式 流动||暂停        this.flowing = false;                // 每一次读取的buffer的大小        this.buffer = Buffer.alloc(this.highWaterMark);                // 用于暂停模式时缓存读取的数据        this.buffers = [];                // 相当于rs._readableState.length        this.length = 0;                // 是否需要发射readable事件        // 只有缓存区被读取干净时才会发射事件        this.emittedReadable = false;                // --- --- ---                this.open();                this.on('newListener',(eventName)=>{  // 切换为流动模式读取        	if(eventName === 'data'){            	this.flowing = true;                this.read();            }            if(eventName === 'readable'){  // 切换为暂停模式读取            	this.flowing = false;                this.read();            }        });    }}复制代码

流动模式和暂停模式

从上面的参数初始化可知,可读流可以通过监听两种不同的事件来获取数据。

流动模式

监听的第一种 data 事件被称之为可读流的 流动模式 读取,监听之后它会框框的不停发射它所读取到的data,每次读取到的data大小取决于 highWaterMark 。另外我们可以在data的回调中通过.pause() 方法暂停文件的读取和data的发射,什么时候想恢复了还可以通过 .resume() 来恢复文件的读取和data的发射。

暂停模式

监听的第二种 readable 事件被称之为可读流的 暂停模式 读取。

不同于流动模式的读取,暂停模式下,首先当我们一旦监听readable事件,它会先去读取 highWaterMark 个字节到缓存中并且会触发一次 readable 事件来通知我们,而我们想要拿到这些缓存中的数据需要通过 read(n)

并且这个模式下,它很智能,只要我们从缓存中拿取了数据且剩下的数据小于 highWaterMark 时,它就会自动续杯,往缓冲区再填充 highWaterMark 这么多字节的数据。

注意: 它每次填充的数据都是刚好 hightWaterMark 这么多,不会多也不会少。

那,readable 事件除了刚开始那一次触发,什么时候会再触发呢?

答案是当缓存区被抽干,嗯。。。完全抽干再续上杯的时候就会再一次触发的 readable 事件。

注意: 续杯并不一定等于会触发readable,只有缓冲区被抽干,并且还续了杯,才会触发readable

case1

让我们看如下这么个栗子

// 假设hightWaterMark为3rs.on('readable',()=>{    let result = rs.read(1);    console.log(result);    result = rs.read(1);    console.log(result);    result = rs.read(1);    console.log(result);})<<

之所以产生这样的结果,就是因为我们在readable回调了刚好读取了 highWaterMark 这么多字节的数据,每一次刚好把缓冲区读完,这意味着它续杯的时候就会再一次触发 readable,这样就形成了递归,不断触发readable。

case2

还有一种情况会不断触发 readable

rs.on('readable',()=>{	let result = rs.read();  // 什么都不填    console.log(result);})<<

实际上这个栗子是上面栗子的简写形式,rs.read() 就相当于 rs.read(rs.highWaterMark)

流程图

上面的流程图中有一点是没有详细注释的,就是当要读取的字节数大于缓冲区中存储的字节数时,Node.js源码中是会将 hightWaterMark 先扩充(扩充的大小是按照2的N次方的方式来扩充的),再去读取数据。嗯。。。读一个比你设置的hightWaterMark还大的,有虾米意义?早知如此,当初就该把highWaterMark设置大点不就好咯?我们这里的实现略过这种情况。

实现难点

howMuchToRead

读不像写,读的时候不仅可以设置 start 还能设置 end

So,当我们设置了 end 时,我们每次读取的大小可能就不再是 highWaterMark 个了,准确来说我们最后一次读取的量应该是 this.end-this.pos+1 这么多个。

注意: 之所以要+1,是因为流的API是全Node中最奇葩的,它的索引位置是包前又包后的!

所以每次读取前,我们需要先计算先读取的字节数

let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.hightWaterMark;复制代码

pipe

pipe实现就很简单咯,就是利用可写流的 flag 和 可读流的流动模式 以及 pauseresume 方法。

pipe(ws){    this.on('data',(data)=>{    	let flag = ws.write(data);        if(!flag)this.pause();    });        ws.on('drain',()=>{    	this.resume();    });       this.on('end',()=>{    	ws.end();    });}复制代码

源码

仓库地址:

转载地址:http://ltbeo.baihongyu.com/

你可能感兴趣的文章
c# ftp第三方库FluentFTP
查看>>
利用grep命令查找字符串分析log文件的一次实践
查看>>
vim 小技巧
查看>>
Unity3D之高级渲染-Shader Forge增强版
查看>>
Android逆向——smali复杂类解析
查看>>
不吹不擂,你想要的Python面试都在这里了【315+道题】
查看>>
电商数据库表设计
查看>>
深入理解virtual/new/override 这些关键字的意义
查看>>
Mysql存储过程包括事务,且传入sql数据运行
查看>>
[zz]开源点评:ZeroMQ简介
查看>>
16个 html5 框架
查看>>
用WinSock实现HTTP的GET - Unicorn - 博客频道 - CSDN.NET
查看>>
CentOS修改系统时间
查看>>
编写一个主函数 main,使用两个值作为实参,并输出它们的和。
查看>>
3D打印机切片与控制软件
查看>>
PHP empty、isset、isnull的区别
查看>>
数字按照不同格式转换成字符串
查看>>
__weak如何实现目标值自己主动设置nil的
查看>>
thttpd增加gzip压缩响应报文体功能,以减少传输数据量
查看>>
Windows下搭建IOS开发环境(一)
查看>>