原文链接:Reading and writing Node.js streams

NodeJS 应用程序中处理大型数据集既是一个便利工具,也是一个双刃剑。处理大量数据的能力非常有用,但也可能导致性能瓶颈和内存耗尽。传统上,开发者通过一次性将整个数据集读取到内存中来解决这一挑战。虽然这种方法对于较小的数据集直观有效,但对于大型文件则变得低效且资源密集。

这就是 Node.js 流的用武之地。流提供了一种根本不同的方法,允许你增量地处理数据并优化内存使用。通过以可管理的块处理数据,流使你能够构建可扩展的应用程序,即使面对最艰巨的数据集也能高效应对。正如流行的说法,“流是随时间变化的数组”。在本文中,我们将探讨 Node.js 流的核心概念。你将深入了解如何利用流进行实时数据处理、内存管理和构建可扩展的应用程序。

通过本指南的学习,你将学会如何创建和管理可读流和可写流,以及如何处理背压和错误管理。

什么是 Node.js 流?

Node.js 流为你的应用程序管理数据流提供了强大的抽象。它们擅长处理大数据集,例如视频或实时传输,而不影响性能。

这种方法不同于传统的将整个数据集加载到内存中的方法。流以块的形式处理数据,大大减少了内存使用。Node.js 中的所有流都继承自 EventEmitter 类,允许它们在数据处理的各个阶段发出事件。这些流可以是可读的、可写的或两者兼而有之,为不同的数据处理场景提供灵活性。

事件驱动架构

Node.js 基于事件驱动架构,使其在实时 I/O 方面表现出色。这意味着在输入可用时立即消费输入,在应用程序生成输出时立即发送输出。流与这种方法无缝集成,使持续的数据处理成为可能。

它们通过在关键阶段发出事件来实现这一点。这些事件包括接收到数据(data 事件)和流完成(end 事件)的信号。开发者可以监听这些事件并相应地执行自定义逻辑。事件驱动的特性使流在处理来自外部来源的实时数据时非常高效。

为什么使用流?

流相较于其他数据处理方法提供了三个关键优势:

  • 内存效率流以块的形式增量处理数据,而不是将整个数据集加载到内存中。这在处理大数据集时尤为重要,因为它大大减少了内存使用并防止与内存相关的性能问题。
  • 提高响应时间流允许立即处理数据。当数据块到达时,可以在不等待整个负载接收的情况下处理它。这减少了延迟并提高了应用程序的整体响应能力。
  • 实时处理的可扩展性通过块处理数据,Node.js 使用流可以高效地处理大量数据,资源有限的情况下也能胜任。这种可扩展性使流非常适合需要实时处理高数据量的应用程序

这些优势使流成为构建高性能、可扩展的 Node.js 应用程序的强大工具,特别是在处理大数据集或实时数据处理时。

💡什么时候不使用流: 如果你的应用程序已经在内存中有所有可用的数据,使用流可能会增加不必要的开销并减慢应用程序的速度。在这种情况下,传统方法可能更高效。

流动控制:流中的背压

在 Node.js 流中,背压是管理生产者(数据源)与消费者(数据目的地)之间数据流的关键机制,尤其是在处理大数据集时。它确保生产者不会压垮消费者,从而避免崩溃、数据丢失或性能问题。

想象一下消防水管——背压就像一个阀门,调节水流以防止消费者(即拿水管的人)被冲走。

关键点是:计算机的内存是有限的。没有背压,生产者(如消防水管)可能以远超消费者(处理数据的程序)存储数据的速度发送数据。这会导致两个问题:

  1. 数据丢失:如果数据到达的速度超过消费者处理的速度,可能会丢失一些数据或完全丢弃。
  2. 性能问题:消费者可能难以跟上数据的到达速度,导致处理缓慢、崩溃或内存使用率过高。

背压通过创建受控流来防止这些问题。消费者向生产者发出信号,要求减慢数据流,如果它接近内存容量。这确保数据以可管理的块进行处理,避免内存过载和数据丢失。

背压的工作原理

背压的主要目标是为消费者提供一种机制,以节流生产者,确保数据流不超过消费者的处理能力。这通过受控的缓冲系统实现,流临时保存数据,直到消费者准备好处理更多数据。

在 Node.js 流中,通过一个名为“highWaterMark”的内部属性管理缓冲和流量控制。highWaterMark 指定了内部缓冲区在暂停额外数据读取或写入之前可以缓冲的最大字节数(对于字节流)或对象数(对于对象流)。

缓冲和流量控制

以下是缓冲和流量控制管理背压的简要概述:

  • 数据生产:当生产者(如可读流)生成数据时,它将内部缓冲区填充到 highWaterMark 设置的限制。
  • 缓冲:如果内部缓冲区达到 highWaterMark,流将暂时停止从源读取数据或接受更多数据,直到一些缓冲区被消费。
  • 数据消费:消费者(如可写流)处理来自缓冲区的数据。随着缓冲区水平下降并低于 highWaterMark,流恢复读取或接受数据,补充缓冲区。

这种背压机制确保通过管道流动的数据得到调节,防止消费者不堪重负,并确保平滑、高效和可靠的数据处理。

示例前提条件

确保你满足以下要求以跟随本指南:

  • 安装 Node.js v20。如果你使用 Linux 或 macOS,建议使用 Node 版本管理器 (nvm) 来管理你的 Node.js 安装。
     nvm install 20
     nvm use 20
  • 设置工作区。
     mkdir workshop
     cd workshop
     git clone https://github.com/mcollina/streams-training.git

使用可读流

Readable 是我们用来顺序读取数据源的类。

关键方法和事件

可读流通过几个核心方法和事件操作,允许对数据处理进行精细控制:

  • on(‘data’):每当流中有数据可用时触发此事件。它非常快,因为流尽可能快地推送数据,使其适用于高吞吐量场景。
  • on(‘end’):当没有更多数据可从流中读取时触发。它表示数据传递的完成。
  • on(‘readable’):当有数据可从流中读取或流已到达末尾时触发此事件。它允许在需要时更受控地读取数据。

基本可读流

以下是一个简单的可读流实现示例,动态生成数据:

const { Readable } = require('stream');
 
class MyStream extends Readable {
  #count = 0;
  _read(size) {
    this.push(':-)');
    if (this.#count++ === 5) { this.push(null); }
  }
}
 
const stream = new MyStream();
 
stream.on('data', chunk => {
  console.log(chunk.toString());  
});

在此代码中,MyStream 类扩展了 Readable 并重写了 _read 方法,将字符串 ”:-)” 推送到内部缓冲区。在推送字符串五次后,它通过推送 null 来表示流的结束。on('data') 事件处理程序在接收到每个数据块时将其记录到控制台。

输出

:-)
:-)
:-)
:-)
:-)
:-)

通过暂停和恢复管理背压

可以通过根据应用程序的处理数据的能力来暂停和恢复流来处理背压。以下是实现方法:

const stream = new MyStream();
 
stream.on('data', chunk => {
  console.log(chunk.toString());
  stream.pause();  // 暂停接收数据以模拟处理延迟
  setTimeout(() => {
	stream.resume();  // 一秒后恢复
	}, 1000);
});
在这个设置中,每次接收到数据时,流都会暂停,并设置一个超时在一秒钟后恢复流。这模拟了每个数据块处理需要时间的场景,通过使用 `pause()``resume()` 控制数据流展示了基本的背压管理。
 
## **使用 “readable” 事件进行高级控制**
 
为了对数据流进行更细粒度的控制,可以使用 **readable** 事件。此事件更复杂,但为某些应用程序提供了更好的性能,因为它允许显式控制何时从流中读取数据:
 
```javascript
const stream = new MyStream({
  highWaterMark: 1
});
 
stream.on("readable", () => {
  console.count(">> readable event");
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk.toString());  // 处理数据块
  }
});
stream.on("end", () => console.log('>> end event'));
 
() => {
    stream.resume();  // 一秒后恢复
  }, 1000);
});

在这个设置中,每次接收到数据时,流都会暂停,并设置一个超时在一秒钟后恢复流。这模拟了每个数据块处理需要时间的场景,通过使用 pause()resume() 控制数据流展示了基本的背压管理。

使用 “readable” 事件进行高级控制

为了对数据流进行更细粒度的控制,可以使用 readable 事件。此事件更复杂,但为某些应用程序提供了更好的性能,因为它允许显式控制何时从流中读取数据:

const stream = new MyStream({
  highWaterMark: 1
});
 
stream.on("readable", () => {
  console.count(">> readable event");
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk.toString());  // 处理数据块
  }
});
stream.on("end", () => console.log('>> end event'));

在这里,使用 readable 事件根据需要手动从流中提取数据。可读事件处理程序内的循环持续从流缓冲区读取数据,直到它返回 “null”,表明缓冲区暂时为空或流已结束。将 highWaterMark 设置为 1 保持缓冲区大小较小,更频繁地触发 readable 事件,允许对数据流进行更细粒度的控制。

>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
:-)
>> readable event: 7
>> end event

异步迭代器

异步迭代器是我们推荐的用于建模流数据的标准方式。相比于 Web 和 Node.js 中的所有流原语,异步迭代器更易于理解和使用。在 Node.js 的最新版本中,异步迭代器 已经成为一种更优雅和可读的方法来与流进行交互。基于事件的基础,异步迭代器提供了更高级的抽象,简化了流的消费。

在 Node.js 中,所有可读流都是异步可迭代的。这意味着你可以使用 for await…of 语法循环通过流的数据,当数据可用时处理每一部分数据,以异步代码的效率和简单性处理。

以下是使用异步迭代器与可读流的示例:

import { setTimeout as sleep } from 'timers/promises';
 
async function* generate() {
  yield 'hello';
  await sleep(10);  // 模拟延迟
  yield ' ';
  await sleep(10);  // 模拟另一个延迟
  yield 'world';
}
 
Readable.from(generate()).on('data', chunk => console.log(chunk));

在这个示例中,generate 函数是一个异步生成器,它在每个暂停之间产生三部分数据,使用 await sleep(10) 模拟异步操作的行为,例如 API 调用或数据库查询,其中数据可用之间存在明显延迟。

Readable.from() 方法用于将该生成器转换为可读流。然后,该流在每个数据块可用时发出数据,并使用 on('data') 事件处理程序将其记录到控制台。运行此代码的输出将是:

hello
world

使用异步迭代器与流的好处

使用异步迭代器与流简化了异步数据流的处理方式:

  • 增强可读性:代码结构更清晰和可读,特别是在处理多个异步数据源时。
  • 错误处理:异步迭代器允许使用类似常规异步函数的 try/catch 块来进行简单明了的错误处理。
  • 流量控制:消费者通过等待下一部分数据来控制流量,从而固有地管理背压,允许更高效的内存使用和处理。

异步迭代器提供了一种更现代化且通常更可读的方法来处理可读流,特别是在处理异步数据源或更喜欢基于循环的数据处理方法时。

使用可写流

可写流对于创建文件、上传数据或任何涉及顺序输出数据的任务非常有用。虽然可读流提供数据源,Node.js 中的可写流则作为数据的目的地。

可写流的关键方法和事件

  • .write():此方法用于向流中写入数据块。它通过缓冲数据到定义的限制(highWaterMark),并返回一个布尔值,指示是否可以立即写入更多数据。
  • .end():此方法表示数据写入过程的结束。它表示流完成写操作并可能执行任何必要的清理。

创建可写流

以下是创建一个可写流的示例,它将所有传入数据转换为大写然后写入标准输出:

import { Writable } from 'stream';
import { once } from 'events';
 
class MyStream extends Writable {
  constructor() {
    super({ highWaterMark: 10 /* 10 bytes */ });
  }
  _write(data, encode, cb) {
    process.stdout.write(data.toString().toUpperCase() + '\n', cb);
  }
}
const stream = new MyStream();
 
for (let i = 0; i < 10; i++) {
  const waitDrain = !stream.write('hello');
 
  if (waitDrain) {
    console.log('>> wait drain');
    await once(stream, 'drain');
  }
}
 
stream.end('world');

在这段代码中,MyStream 是一个自定义的可写流,缓冲区容量(highWaterMark)为 10 字节。它重写了 _write 方法,将数据转换为大写后写出。

循环尝试将 hello 写入流十次。如果缓冲区填满(waitDrain 变为 true),它会等待 drain 事件,然后继续,确保不会压垮流的缓冲区。

输出将是:

HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD

如何不通过 HTTP 流式传输文件

在处理流时,特别是在 HTTP 上下文中,必须注意潜在的问题。让我们看看一个如何不通过 HTTP 流式传输文件的示例:

const { createReadStream } = require('fs');
const { createServer } = require('http');
 
const server = createServer((req, res) => {
  createReadStream(__filename).pipe(res);
});
server.listen(3000);

虽然看起来简单,但这段代码有一个隐藏的危险。如果在文件读取过程中发生错误(例如,文件不存在),错误可能不会被正确处理,导致服务器崩溃。

管道流中的错误处理

处理流中的错误,特别是在管道传输时,可能会因为任何部分的流管道中的错误影响整个过程而变得复杂。以下是一个示例:

const fs = require('fs');
const { Transform } = require('stream');
 
const upper = new Transform({
  transform: function (data, enc, cb) {
    this.push(data.toString().toUpperCase());
    cb();
  }
});
 
fs.createReadStream(__filename)
  .pipe(upper)
  .pipe(process.stdout);

在这种情况下,如果任何阶段(读取、转换或写入)发生错误,可能不会被捕获,导致意外行为。使用 stream/promises 模块中的 pipeline() 函数等技术可以更有效地管理这些错误。

使用异步迭代器与管道

将管道与异步迭代器结合提供了一种强大的方法,以最少的缓冲处理流转换:

import fs from 'fs';
import { pipeline } from 'stream/promises';
 
await pipeline(
  fs.createReadStream(import.meta.filename),
  async function* (source) {
    for await (let chunk of source) {
      yield chunk.toString().toUpperCase();
    }
  },
  process.stdout
);

这段代码实现了与前面示例相同的结果,但具有更好的错误处理。pipeline 函数自动管理管道中的错误,简化了代码并使其免受内存过载问题的困扰。

通过结合流的高效性、背压机制的控制和异步迭代器的优雅,你可以构建能够优雅高效处理大量数据的高性能应用程序。