写点什么

NodeJs 中 Buffer 与 Stream 理解

用户头像
小风以北
关注
发布于: 2021 年 03 月 31 日

1. 定义

1.1 什么是 Stream

Stream 流是一种数据传输手段,是端到端信息交换的一种方式,而且是有顺序的,是逐块读取数据、处理内容,用于顺序读取输入或写入输出。


其实 Stream 在计算机中一个相当古老的概念,它起源于 1960 年代早期的 Unix。


"Stream 是随着时间的推移从源流到目的地的一系列数据" @ddprrt


我们要知道 Stream 可以有多种类型:文件,计算机的内存或键盘或鼠标等输入设备都可以称谓。一旦打开一个流,数据就会从其源头到消耗它的进程成块地流动。如果是一个文件,则每个字符或字节将被读取一次,在比如我们在键盘的上面敲打的每个按键都将通过流传输数据然后进行响应。


Stream 的应用在理论上输入可以是无止境的,而且没有限制所以被广泛应用

1.2 Node.js 中的 Stream

Node.js 中的 Stream 是由 Node 核心 Stream 模块封装提供的功能,是 EventEmitter 类的实例,基于事件的。


在 Node.js 中有 4 种流:


  • 可写:用于写入数据

  • 可读:用于读取数据

  • 双工:用于读取和写入数据

  • 转换:在写入或读取时可以修改数据的位置,例如在压缩功能时候你可以写入和读取解压缩数据。

2. 运用

可能文字太过枯燥,在这里通过一些案例给大家介绍下。


本地添加了一个大约大约 1G 的 test.txt 文件,我们用两中方式来读取它,看下它的内存变化。


$ lltotal 1935264-rw-r--r--  1 fengshi  staff   1.1K May 12 20:10 index.jsdrwxr-xr-x  4 fengshi  staff   128B May  6 15:12 node_modules-rw-r--r--  1 fengshi  staff   352B May  6 15:12 package-lock.json-rw-r--r--  1 fengshi  staff   244B May  6 15:12 package.json-rw-r--r--  1 fengshi  staff   942M May 12 20:10 test.txt
复制代码


先不使用流的方式直接读取到内存返回


const http = require('http');const fs = require('fs');const path = require('path');
const textPath=path.resolve(__dirname,'test.txt')http.createServer((req, res) => { fs.readFile(textPath, (err, data) => { res.end(data); });
}).listen(3000);
复制代码


因为资源比较大视频生成的 gif,可能不是很清晰,注意看清左边 MEM 内存的变化就好了



可以清晰看到当执行 curl 后内存一下子就涨起来满了,几何时增长


我们现在在使用流的方式看下


const http = require('http');const fs = require('fs');const path = require('path');
const textPath=path.resolve(__dirname,'test.txt')http.createServer((req, res) => {fs.createReadStream(textPath).pipe(res);}).listen(3000);
复制代码



可以明显看到 MEM 比较缓慢增加,然后比较固定 10M 左右,右边的数据也一直在输出,如果我们要读的数据在大一些假如 10G 的电影,结果可想而至!!!


知道了这些我们在看下 node 中的 4 种 Stream,理解流的思想、其实和理解生产者消费者问题(也称有限缓冲问题)有异曲同工之处。


  • 可读流 Readable

  • 可写流 Writable:可写入数据的流 fs.createWriteStream() HTTP requests TCP, sockets、child process stdin、process.stdout, process.stderr。

  • 双工流 Duplex:TCP sockets、zlib streams、crypto streams。

  • 转换流 Transform:zlib streams、crypto streams。

2.1 可读流 Readable

Readable 流有两种模式运行:flowing 和 paused


  • 在 flowing 模式下,将自动从底层系统读取数据,并使用事件通过 EventEmitter 接口将其尽快提供给应用程序 。

  • 在 paused 模式下,必须显式调用 read()该方法才可以从流中读取数据块

2.1.1 flowing 模式

最简易的使用就是比较常用的监听 data 事件,和使用 pipe()来获取数据源


const readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log(`Received ${chunk} bytes of data.`);});
fs.createReadStream(textPath).pipe(res);
复制代码


当添加'data'事件或者 pipe()处理就自动切换到 flowing 模式了,但是在 flowing 模式下一定要消费这些数据,不然数据可能会丢失。


(flowing 模式画了一个比较***的图,只是想举例奈何...,)



在 flowing 模式下数据并不会直接指向消费者,会先存在水池中,消费者再通过监听 data 事件从水池中获取数据来生长,但是水池的容量有限,如果消费速度比数据流入到池的速度慢,为了不浪费,和缓解这种压力,在可读流中有 highWatermark 这个值表示触发警戒线,在源码里的默认大小是 options.highWaterMark = 64 * 1024;超过这个警戒线,数据就不会在往水池里输入了,或者当消费者主动调用了 pause()方法也不会输了。

2.1.2 paused 模式

所有 Readable 流都以 paused 模式开始的,监听 readable 的回调函数参数不会传递数据,在暂停模式下需要用户手动调用 read() 方法才能得到数据。


const fs = require('fs');rs = fs.createReadStream(sourcePath);//监听 readable事件的时候,会进入暂停模式rs.on('readable', () => {        const ch = rs.read(1);});
复制代码


可读流 Readable 的一些场景:fs.createReadStream() HTTP responses TCP, sockets


我们可以手动实现一个简易的可读流


const { Readable } = require('stream'); const inStream = new Readable({  read(){}});inStream.push('ABCDEFGHIJKLM');inStream.push('NOPQRSTUVWXYZ');inStream.push(null); // No more datainStream.pipe(process.stdout);
复制代码

2.2 可写流 Writable

Writable 流是对数据流向终端设备的抽象,用来消费处理上游的数据,通过可写流程序可以把数据写入设备。


基本原理和读流 Stream 比较相似,当数据流过来的时候,会直接写入到资源池,当写入速度比较缓慢或者写入暂停时,数据流会进入缓存起来。


实现一个可写流 Writable


const { Writable } = require('stream'); const writeStream = new Writable({  write(){}});writeStream.write('11');writeStream.write('22');writeStream.write('33');writeStream.end();
复制代码


最后需要调用 end()表示已无数据传入


理解了这些在来看下我们常用的 fs.createReadStream(textPath).pipe(res);这个代码的pipe()方法就会清晰很多。


可写流 Writable 的一些场景 fs.createWriteStream() HTTP requests TCP, sockets、process.stdout

2.3 双工流 Duplex

双工流 Duplex,可能一下看到双工这个词会又有点陌生,如果熟悉 Websocket 通信的应该清楚它的一个特性全双工,就是值发送方和接受方都是各自独立的方法,发送和接收都没有任何关系。(后面的章节会通过 Websocket 实现实时股票行情数据展示的项目案例给大家讲解下,希望多多关注)


双工流的读写是完全独立操作 对于 Duplex 流来说直接把 writable 流与 reabable 流两者进行结合来,但是写入的数据与读取的数据没有任何的联系。


实现一个 Duplex 同时实现一个 writable 流和 reabable 流就可以:


const { Duplex } = require('stream');const inoutStream = new Duplex({  write(chunk) {  },  read() {  }});process.stdin.pipe(inoutStream).pipe(process.stdout);
复制代码


双工流 Duplex 的一些场景:TCP sockets、zlib streams、crypto streams

2.4 转换流 Transform

transform 流其实可以当作一个特殊的 Duplex 流,因为它集成了双工流 Duplex 的一些作用,拥有 Readable 和 Writable 的能力。


不同的是转换流 transform 的读取与写入数据端是关联的在中间做了转换处理,最比较典型的应用就是 zlib 模块对于文件的压缩和解压了。


在 Node 的官方中提供了zlib模块,内置了转换流 Transform 使用可读流读取 test.txt 文件,通过 pipe 到 zli 中,内部通过转换流 Transform 处理后,在通过管道输出到可写流,这样通过转换流 Transform 就处理了一个文件的压缩应用。


const fs = require('fs');const zlib = require('zlib');fs.createReadStream('test.txt')  .pipe(zlib.createGzip())  .pipe(fs.createWriteStream('test.gz'));
复制代码


转换流 Transform 的一些场景 zlib streams、crypto streams

3. Buffer

在开发中文件、网络等操作处理的都是基于二进制传输的,但是由于 js 没有处理二进制的操作,所以 node 实现了 Buffer 模块用于以字节序列的形式来表示二进制数据,许多 Node.js 的 API(例如流和文件系统操作)都支持 Buffer,因为与操作系统或其他进程的交互通常总是以二进制数据的形式发生.

3.1 定义理解

官方定义:Buffer 在 Node 中的作用就是处理二进制数据。每个缓冲区对应于 V8 外部分配的一些原始内存,缓冲区的行为有点像整数数组,但是不能调整大小,缓冲区中的整数每个表示一个字节,因此限制为 0 到 255 之间的值(包括 0 和 255)。当输出 Buffer 数据的时候,会得到数据的十六进制值链表示。


理解:简单点解释 Buffer 就是将数据从一个地方移到另一个地方的临时存放点,就好比我们正在看腾讯视频一样,无需下载整个视频就可以开始观看视频。如果你的网速度太慢,则会看到“正在缓冲”,这表示他正在尝试收集数据缓冲,以便你继续观看该视频。


为了论证这个观点给大家写了个案例,大家看了后应该会清晰点。


const fs = require('fs');
require('http').createServer((req, res) => { fs.readFile('./test.txt', (err, file) => { console.log(file, Buffer.isBuffer(file)) res.end(file); });}).listen(3000);
<Buffer 7b 0a 20 20 22 61 61 22 3a 20 22 61 61 22 0a 7d 0a> true
复制代码


可以看到在当对一个文件进行读取处理的时候,返回的就是一个 buffer 数据,存放在内存里,可能有些同学会想说我上面不是说处理的是二进制数据吗,7b 0a 20 20 22 61 61 22 3a 20 22 61 61 22 0a 7d 0a 这些不是二进制数据啊?


的确这些表面看不是 2 进制,这里只是转换成了 16 进制表示,因为系统对内存的识别是以 Byte(字节)为单位的,1byte=8bit 就是 8 个 bit,


可是为什么不直接使用二进制表示?对于这个问题我们要考虑的有以下几点要说明下


  • 数据在物理层,数据链路层是以二进制进行传递的,在应用层是以 16 进制进行传输的

  • 二进制文件比较难阅读,当到一个堆字节的序列时,不知道它是什么意思

  • 二进制文件不方便修改难操作,使用 ASCII 表示就方便很多

  • 二进制文件可能会造成混乱,当计算机以不同的方式读取数据时,就会发生问题


然后我们可以直接在控制台里面执行 Buffer.from('hello world')可以看到输出的 buffer 形式数据。


console.log(Buffer.from('hello world'))
<Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64>
复制代码


其实你知道了存放到 buffer 的就是一堆转换成二进制使用 16 进制表示的数据,当你想在输出其他格式时只不过是在内部进行了再次转换其他格式罢了,另外要注意的一点就是你每次往 Buffer 里面存放的数据,都是往内存里面塞,这个内存不是由 V8 分配的是外部 c++直接调用系统申请的,内存在系统中是很宝贵的,如果你一下子把太多东西放进去就不太好了,对与打数据就需要综合借用其他服务一起使用了。

3.2 交互运用

Buffer 类是在全局作用域中的,因此不需要使用 require('buffer')调用 Buffer 的展示形式主要是以原始内存块和解码后的数据两种方式展示。


  1. 创建比较常用的几种创建缓冲区的方法


  • Buffer.from()

  • Buffer.alloc()

  • Buffer.allocUnsafe()


Buffer.from()Buffer.from 用于从数组,字符串创建缓冲区


Buffer.from('yi qi xue node') 
复制代码


打印:<Buffer 79 69 20 71 69 20 78 75 65 20 6e 6f 64 65>


Buffer.alloc()Buffer.alloc 接受大小(整数)作为参数,并返回指定大小的新初始化缓冲区(创建一定大小的填充缓冲区)


Buffer.alloc(8)
复制代码


打印: <Buffer 00 00 00 00 00 00 00 00>8 字节的缓冲区,每个位都预填充了 0


Buffer.allocUnsafe()Buffer.allocUnsafe 接受整数大小作为参数,并返回未初始化的新缓冲区,这意味着它可能会在内存中包含一些旧的或敏感的数据。因此需要谨慎使用,由于在创建缓冲区时不涉及初始化,所以会比 Buffer.alloc()的速度更快


当创建一个 buffer 成功之后就可以对新创建的 buffer 进行各种处理了 2. 读取可以直接像数组一样进行访问


const buf = Buffer.from('yi qi xue node!')console.log(buf[0]) //121console.log(buf[1]) //105console.log(buf[2]) //32console.log(buf[3]) //131
复制代码


返回的这些数字代码都是 Unicode 代码,用于标识缓冲区位置中的字符 unicode121 代码表示的就是 y,105 表示 i,32 表示空格依次往下


Unicode 为每种语言中的每个字符设定了统一并且唯一的二进制编码,以满足跨语言、跨平台进行文本转换、处理的要求,保证了规则会使乱码问题消失


  1. 获取长度直接使用 length 属性就可以获得


const buf = Buffer.from('yi qi xue node!')console.log(buf.length) //14
复制代码


  1. 遍历迭代可以直接像操作数组一样来操作 buffer 对象的值


const buf = Buffer.from('yi qi xue node')for (const item of buf) {  console.log(item) //72 101 121 33 113 105 32 120 117 32 110 111 100 101}
复制代码


  1. buffer 数据更新写入可以先新建一个新的初始化的 buffer 对象


const buf = Buffer.alloc(14)buf.write('yi qi xue node')
复制代码


写入我们需要传输的数据,打印出<Buffer 79 69 20 71 69 20 78 75 65 20 6e 6f 64 65>如果像查看写入的数据可以调用 buf.toString()方法就可以打印出 yi qi xue node 了。


  1. buffer 复制


const buf1 = Buffer.from('yi qi xue node')let buf2 = Buffer.alloc(14) buf1.copy(buf2)console.log(buf2) //<Buffer 79 69 20 71 69 20 78 75 65 20 6e 6f 64 65>
复制代码


copy(buffer,1,2,3)copy 方法后还包含三个参数选择 1 定义开始位置 2 结束位置和 3 新的缓冲区长度

3.3 面试考点

一般针对 node 中 buffer 相关的问题更多的会从 buffer 本身的一些知识点去考察,例如下面一些常问问题


  1. 生成的 Buffer 对象会占用 V8 分配的内存?


Buffer 属于堆外内存,不由 V8 直接分配的,所以不会


  1. Buffer 在 node 中的一些应用场景?


由于 buffer 是为了解决二进制数据处理的,所以针对像图片 影视 音频等二进制数据的管理存储,以及编码解码 I/O 处理 加密解密等


  1. Buffer.alloc 与 Buffer.allocUnsafe 区别


这个问题在前面交互应用里面有介绍,Buffer.allocUnsafe 接受整数大小作为参数,并返回未初始化的新缓冲区,这意味着它可能会在内存中包含一些旧的或敏感的数据。因此需要谨慎使用,由于在创建缓冲区时不涉及初始化,所以会比 Buffer.alloc()的速度更快,Buffer.alloc() 返回指定大小的新初始化缓冲区


  1. Buffer 转中文乱码问题


原因是因为在中文编码中三个 Buffer 值表示一个中文字,在读取流操操作处理中,Buffer 是被分成一段一段进行传输的,可能会出现一段 Buffer 元素不能够被 3 整除就会出现中文乱码的问题,解决一般在创建可读流的时候就设置编码 setEncoding('utf8'),先对 buffer 进行解码。


  1. Buffer 的内存机制


在上面的文章中我们知道了 Buffer 对象的内存分配不由 V8 进行分配,在 V8 中内存主要分为新生代和老生代两代,新内存中的对象存活时间短,老内存中的对象存活时间长,node 在内存的使用上是向 C++层面申请内存,在 js 中分配内存的策略


node 采用的是 slab 的分配机制,slab 其实就是一块申请好的固定内存区域,它有 3 种状态:


  • full:完全分配

  • partial:部分分配

  • empty:未被分配


采用 slab 的机制进行预先申请和事后分配,使 js 与操作系统之间减少不必要的系统调用


以上是关于 node 中 Buffer 和 Stream 这两个在业务中不常用,但在数据操作中很核心的东西,更多细节深入的点大家结合案例自己动手操作实验下

我是来自《小风以北》公众号的小风,我们下期再见
发布于: 2021 年 03 月 31 日阅读数: 10
用户头像

小风以北

关注

公众号【小风以北】 2020.04.09 加入

一个不只会撸代码的程序员

评论

发布
暂无评论
NodeJs中Buffer与Stream理解