Рубрики
Без рубрики

Node.js Streams: Все, что вам нужно знать

Обновление: эта статья теперь является частью моей книги «Node.js за пределы основы». Прочитайте обновленную версию этого контента и больше о узле в jscomplete.com/node-beyyond-basics.node.js Потоки имеют репутацию для того, чтобы быть трудно работать, и еще сложнее понять. Хорошо у меня есть

Автор оригинала: Samer Buna.

Node.js Потоки имеют репутацию для того, чтобы быть трудно работать, и еще сложнее понять. Ну, у меня есть хорошие новости для вас – это больше не так.

За прошедшие годы разработчики создали множество пакетов там с единственной целью разработки с потоками проще. Но в этой статье я собираюсь сосредоточиться на родном Node.js Stream API Отказ

Что именно потоки?

Потоки – это коллекции данных – просто как массивы или строки. Разница в том, что потоки могут быть не доступны все сразу, и им не нужно вписываться в память. Это делает потоки действительно мощными при работе с большим количеством данных или данные, которые приходят от внешнего источника одного Кусок вовремя.

Однако потоки находятся не только о работе с большими данными. Они также дают нам мощность композиции в нашем коде. Также как мы можем составить мощные команды Linux с помощью других небольших команд Linux, мы можем сделать точно так же в узле с потоками.

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input

grep.pipe(wc)

Многие из встроенных модулей в узле реализуют потоковый интерфейс:

В списке выше есть несколько примеров для нативных объектов Node.js, которые также являются читаемыми и записи для записи. Некоторые из этих объектов являются как читаемые, так и для записи для записи, такие как сокеты TCP, ZLIB и Crypto Streams.

Обратите внимание, что объекты также тесно связаны. В то время как HTTP-ответ – это читаемый поток на клиенте, это запись в записи на сервере. Это связано с тем, что в корпусе HTTP мы в основном читаем от одного объекта ( http. IncomingMessage ) И напишите другому ( http. ServerResponse ).

Также обратите внимание, как STDIO потоки ( stdin , stdout , Стдерр ) иметь типы обратных потоков, когда речь идет о дочерних процессах. Это позволяет очень простой способ трубы и из этих потоков из основного процесса STDIO потоки.

Практический пример потоков

Теория отличная, но часто не на 100% убедительно. Давайте посмотрим пример, демонстрирующий различие потоков, может сделать в коде, когда речь идет о расходе памяти.

Давайте сначала создадим большой файл:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

Посмотрите, что я использовал для создания этого большого файла. Пирентный поток!

ФС Модуль можно использовать для чтения и записи в файлы, используя интерфейс потока. В приведенном выше примере мы пишем к этому Big.file через пирентный поток 1 миллион линий с петлей.

Запуск скрипта выше генерирует файл, который составляет около ~ 400 МБ.

Вот простой узел веб-сервера, предназначенный для исключительного обслуживания Big.file :

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

Когда сервер получает запрос, он будет служить большим файлам, используя асинхронный метод, Fs.readfile Отказ Но эй, это не так, как мы блокируем петлю событий или что-нибудь. Каждая вещь отличная, верно? Верно?

Ну давайте посмотрим, что произойдет, когда мы запустим сервер, подключаем к нему, и отслеживать память при этом.

Когда я запустил сервер, он начался с нормальным количеством памяти, 8,7 МБ:

Затем я подключался к серверу. Обратите внимание, что случилось с потребляемой памятью:

WOW – потребление памяти подскочило до 434,8 МБ.

Мы в основном поставили все Big.file Содержание в памяти, прежде чем мы выписали его на объект ответа. Это очень неэффективно.

Объект отклика HTTP ( res в коде выше) также является питьевым потоком. Это означает, что если у нас есть читаемый поток, который представляет содержимое Big.file , мы можем просто использовать те два друг на друга и достигать в основном тот же результат, не потребляя ~ 400 МБ памяти.

Узел ФС Модуль может дать нам читаемый поток для любого файла, используя CreateReadStream метод. Мы можем трусить это на объект ответа:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

Теперь, когда вы подключаетесь к этому серверу, происходит волшебная вещь (посмотрите на потребление памяти):

Что творится?

Когда клиент просит этого большого файла, мы транслируем его один кусок за раз, что означает, что мы не буфер его вообще в памяти. Использование памяти выросло примерно на 25 МБ, и это.

Вы можете протолкнуть этот пример к его пределам. Регенерация Big.file С пятью миллионами линий вместо всего миллиона, что приведет к файлу более 2 ГБ, и это фактически больше, чем предел буфера по умолчанию в узле.

Если вы попытаетесь обслуживать этот файл, используя Fs.readfile Вы просто не можете, по умолчанию (вы можете изменить пределы). Но с FskcreateSeadstream , нет проблем со всеми потоковыми 2 ГБ данных для запроса, а наилучшие из всех, использование памяти процесса примерно одинаково.

Готов изучать ручьи сейчас?

Потоки 101.

В Node.js есть четыре фундаментальных потока в Node.js: читаемые, записи, дуплексные и преобразователи.

  • Читательский поток – это абстракция для источника, из которого данные могут потребляться. Пример этого FskcreateSeadstream метод.
  • Пирентный поток – это абстракция для пункта назначения, к которому можно записать данные. Пример этого FS.CREATEWRITESTERESTREAM метод.
  • Дуплексные потоки являются читательными, так и для записи. Пример этого является гнездом TCP.
  • Поток преобразования в основном является дуплексной поток, который может использоваться для изменения или преобразования данных, как написано и прочитано. Пример этого zlib.creategzip. Поток для сжимания данных с помощью GZIP. Вы можете подумать о потоке преобразования как функцию, в котором вход – это часть потока записи, и вывод читаемая деталь. Вы также можете услышать трансформные потоки, называемые « через потоки ».

Все потоки – это экземпляры Eventemitter Отказ Они излучают события, которые можно использовать для чтения и записи данных. Тем не менее, мы можем потреблять данные потоков в более простой способ, используя труба метод.

Метод трубы

Вот волшебная линия, которую вам нужно помнить:

readableSrc.pipe(writableDest)

В этой простой строке мы подключаемся к выходу читабельного потока – источник данных, как вход записи записи – пункт назначения. Источник должен быть читаемый поток, и пункт назначения должен быть пиремным. Конечно, они также могут быть дуплексными/преобразованными потоками. На самом деле, если мы находимся в дуплексном потоке, мы можем цепивать вызовы труб так же, как и в Linux:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

труба Метод возвращает поток назначения, который позволил нам сделать цепочку выше. Для потоков А (читабельно), B а также C . (дуплекс) и D (Пишется), мы можем:

a.pipe(b).pipe(c).pipe(d)

# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# Which, in Linux, is equivalent to:
$ a | b | c | d

труба Метод – самый простой способ потребности потоков. Как правило, рекомендуется либо использовать труба Метод или потребляйте потоки с событиями, но избегайте смешивания этих двух. Обычно, когда вы используете труба Метод вам не нужно использовать события, но если вам нужно потреблять потоки более настраиваемыми способами, события будут способом идти.

Потоковые события

Рядом с чтением от читаемого потока источника и письменного письма в пишущийся пункт назначения труба Метод автоматически управляет несколькими вещами по пути. Например, он обрабатывает ошибки, концевые файлы и случаи, когда один поток медленнее или быстрее, чем другой.

Однако потоки также могут быть использованы с событиями напрямую. Вот упрощенный эквивалентный код событий, что труба Способ в основном делает для чтения и записи данных:

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});

Вот список важных событий и функций, которые можно использовать с читаемыми и записи журналов:

События и функции каким-то образом связаны, потому что они обычно используются вместе.

Наиболее важные события на читаемый поток являются:

  • данные событие, которое испускается всякий раз, когда поток передает кусок данных потребителю
  • конец Событие, которое испускается, когда нет больше данных, которые нужно потребляться из потока.

Наиболее важные события на репортаж:

  • Слейте Мероприятие, которое является сигналом, который писсусный поток может получать больше данных.
  • Готово Событие, которое испускается, когда все данные были покрашены в базовую систему.

События и функции могут быть объединены для создания на заказ и оптимизированное использование потоков. Чтобы потреблять читаемый поток, мы можем использовать труба / unpipe Методы или читать / невысокий / резюме методы. Чтобы потреблять писантный поток, мы можем сделать это местом назначения трубка / unpipe или просто напишите ему с Написать Метод и вызов конец Метод, когда мы закончим.

Приостановленные и текущие режимы читаемых потоков

Читаемые потоки имеют два основных режима, которые влияют на то, как мы можем их потреблять:

  • Они могут быть либо в Пауза Режим
  • Или В течет Режим

Эти режимы иногда называют модами тяги и толчка.

Все читаемые потоки начнутся в приостановленном режиме по умолчанию, но их можно легко переключить, чтобы вытекать и обратно в приостановку при необходимости. Иногда переключение происходит автоматически.

Когда читаемый поток находится в режиме пауза, мы можем использовать Читать () Однако способ прочтения от потока по требованию, однако для читаемого потока в режиме текучести данные постоянно текут, и мы должны слушать события, чтобы потреблять его.

В режиме текучести данные могут быть фактически потеряны, если потребители не могут обрабатывать его. Вот почему, когда у нас есть читаемый поток в режиме текучести, нам нужен данные обработчик события. На самом деле, просто добавляя данные Обработчик событий переключает пауз поток в режим текучести и удаления данные Обработчик событий переключает поток обратно в приостановленный режим. Некоторые из этого сделано для обратной совместимости со более старыми интерфейсами потоков узлов.

Чтобы вручную переключаться между этими двумя модами потока, вы можете использовать резюме () и Пауза () методы.

При употреблении читаемых потоков с использованием труба Метод, нам не нужно беспокоиться об этих режимах, как труба Управляет их автоматически.

Реализация потоков

Когда мы говорим о потоках в Node.js, есть две основные разные задачи:

  • Задача Реализация потоки.
  • Задача потребляющий их.

До сих пор мы говорили о том, что поглощают только потребляющие потоки. Давайте реализовать некоторые!

Поток использует, как правило, те, кто требовать транслировать модуль.

Реализация записи пирета

Чтобы реализовать репутарный поток, нам нужно использовать Пирена Конструктор из потокового модуля.

const { Writable } = require('stream');

Мы можем реализовать пирентный поток во многих отношениях. Мы можем, например, продлить Пирена Конструктор, если мы хотим

class myWritableStream extends Writable {
}

Однако я предпочитаю более простой подход конструктора. Мы просто создаем объект из Пирена Конструктор и пропустите ряд вариантов. Единственный необходимый вариант – это Написать Функция, которая обнажает кусок данных для записи.

const { Writable } = require('stream');

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

Этот метод записи занимает три аргумента.

  • Кусок Обычно это буфер, если мы не настраиваем поток потока.
  • кодирование Аргумент необходим в этом случае, но обычно мы можем его игнорировать.
  • Обратный вызов Это функция, которую нам нужно позвонить после того, как мы закончим обработку куска данных. Это то, что сигнализируют, были ли пишет успешно или нет. Чтобы сигнализировать сбой, вызовите обратный вызов с помощью объекта ошибки.

В OutStream мы просто console.log кусок как строка и позвонить в Перезвоните После этого без ошибки, чтобы указать успех. Это очень просто и, вероятно, не так полезно эхо транслировать. Он будет отслеживать все, что он получает.

Чтобы потреблять этот поток, мы можем просто использовать его с Process.Stdin , который является читаемым потоком, поэтому мы можем просто труба Process.Stdin в нашу OutStream Отказ

Когда мы запускаем код выше, все, что мы введем в Process.Stdin будет повторяться, используя OutStream console.log линия.

Это не очень полезный поток для реализации, потому что он на самом деле уже реализован и встроенный. Это очень эквивалентно Process.Stdout Отказ Мы можем просто труба stdin в stdout И мы получим тому же эхо-функцию с этой одной линией:

process.stdin.pipe(process.stdout);

Реализовать читаемый поток

Для реализации читаемого потока нам требуется Читаемый Интерфейс и построить объект из него и реализовать Читать () Метод в параметре конфигурации потока:

const { Readable } = require('stream');

const inStream = new Readable({
  read() {}
});

Существует простой способ реализации читаемых потоков. Мы можем просто напрямую толчок Данные, которые мы хотим потребителям потреблять.

const { Readable } = require('stream'); 

const inStream = new Readable({
  read() {}
});

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);

Когда мы толчок А null Объект, это означает, что мы хотим сигнализировать, что поток не имеет больше данных.

Чтобы потреблять этот простым читаемым потоком, мы можем просто трусить его в пиковый поток Process.Stdout Отказ

Когда мы запустим код выше, мы будем читать все данные из instruam и повторяйте его стандарту. Очень просто, но и не очень эффективно.

Мы в основном выталкивая все данные в потоке до Трубовать это к Process.Stdout Отказ Гораздо лучший способ нажать данные по запросу, по требованию , когда потребитель спрашивает об этом. Мы можем сделать это, реализуя Читать () Метод в объекте конфигурации:

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

Когда метод чтения вызывается на читаемый поток, реализация может нажать частичные данные в очередь. Например, мы можем протолкнуть одну букву одновременно, начиная с кода символов 65 (который представляет собой A), и увеличивая, что на каждом толчке:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;

inStream.pipe(process.stdout);

Пока потребитель читает читаемый поток, читать Метод продолжит огонь, и мы нажимаем больше букв. Нам нужно где-то остановить этот цикл где-то, и именно поэтому a, если отказаться от ноль, когда текущийcharcode превышает 90 (который представляет z).

Этот код эквивалентен тем, чтобы проще, мы начали, но теперь мы толкаем данные по требованию, когда потребитель спрашивает об этом. Вы должны всегда делать это.

Реализация дуплексных/преобразований потоков

С двусторонними потоками мы можем реализовать как читаемые, так и для записи журналов с тем же объектом. Как будто мы наследуем с обоих интерфейсов.

Вот пример двухуровневого потока, который сочетает в себе два записи и читаемые примеры, реализованные выше:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

Объединяя методы, мы можем использовать этот дуплексной поток для чтения букв от A до Z, и мы также можем использовать его для его эхо. Мы выбираем читаемое stdin Поток в этот дуплексной поток для использования функции ECHO, и мы проводят дуплексного потока в пишет stdout поток, чтобы увидеть буквы A через Z.

Важно понимать, что читаемые и пиректируемые стороны дуплексного потока работают полностью независимо друг от друга. Это просто группировка двух функций в объект.

Поток преобразования – более интересный дуплексной поток, потому что его выход вычисляется с его ввода.

Для трансформации потока нам не нужно реализовать читать или Написать Методы, нам нужно только реализовать трансформировать Метод, который объединяет их обоих. У этого есть подпись Написать метод, и мы можем использовать его в толчок данные также.

Вот простой трансформирующий поток, который откроет все, что вы вводите в него после преобразования его в верхний формат региона:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

В этом трансформировании потока, который мы употребляем в точности как предыдущий пример дуплексного потока, мы реализовали только трансформироваться () метод. В этом методе мы преобразуем Кусок в его прописную версию, а затем толчок эта версия как читаемая часть.

Режим потоков объекта

По умолчанию потоки ожидают значения буфера/строки. Есть ObjectMode Флаг, который мы можем установить, чтобы поток принять любой объект JavaScript.

Вот простой пример, чтобы продемонстрировать это. Следующая комбинация потоков преобразований делает для отображения строки значений, разделенных запятыми в объект JavaScript. Итак, "A, B, C, D" становится {A: B, C: D} Отказ

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

Передаем входную строку (например, «A, B, C, D» ) через CommasPlitter который подталкивает массив в качестве читабельных данных ( [«A», «B», «C», «D»] ). Добавление ReadableObjectMode Флаг на этом потоке необходимо, потому что мы толкаем там объект, а не строка.

Затем мы возьмем массив и трубу его в ArrayToobject транслировать. Нам нужен ПисаетсяobjectMode Флаг, чтобы сделать этот поток принять объект. Это также толкает объект (входной массив сопоставляется в объект), и именно поэтому мы также нуждались в ReadableObjectMode Флаг там тоже. Последний ObjectTostring Поток принимает объект, но выталкивает строку, и именно поэтому нам нужно только ПисаетсяobjectMode Флаг там. Читаемая часть – нормальная строка (строгое объект).

Встроенные потоки преобразования узла

Узел имеет несколько очень полезных встроенных потоков преобразования. А именно, Zlib и Crypto Streams.

Вот пример, который использует zlib.creategzip () Поток в сочетании с ФС Читаемые/пиректируемые потоки для создания сценария сжатия файла:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

Вы можете использовать этот скрипт для GZIP любого файла, который вы проходите в качестве аргумента. Мы готовим читаемый поток для этого файла в встроенный поток Zlib, а затем в пиковый поток для нового gzured-файла. Простой.

Прохладная вещь об использовании труб заключается в том, что мы можем на самом деле комбинировать их с событиями, если нам нужно. Например, я хочу, чтобы пользователь видел индикатор прогресса, когда сценарий работает, и сообщение «Готово», когда сценарий выполнен. Так как труба Метод Возвращает поток назначения, мы можем цепотать оформление обработчиков событий:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

Так с труба Метод, мы легко потребляем потоки, но мы все еще можем дополнительно настроить наше взаимодействие с этими потоками, используя события, где это необходимо.

Что значительно о труба Метод, хотя мы можем использовать его в составить Наша программа по частям, гораздо читабельно. Например, вместо прослушивания данные Событие выше, мы можем просто создать поток преобразования для отчета о ходе прогресса и заменить .на () Позвоните с другим .трубка () вызов:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

Это ReportProgress Поток – это простой проходной поток, но он также сообщает о прогрессе в стандартных. Обратите внимание, как я использовал второй аргумент в Обратный вызов () Функция для толкания данных внутри трансформироваться () метод. Это эквивалентно настаивать данные сначала.

Применения комбинированных потоков бесконечны. Например, если нам нужно зашифровать файл до или после того, как мы сочтеем его, все, что нам нужно сделать, это труба другой поток преобразования в том, что нам нужно было. Мы можем использовать узел Крипто Модуль для этого:

const crypto = require('crypto');
// ...

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

Сценарий над сжимами, а затем шифрует пропущенный файл и только те, кто имеет секрет, может использовать выпущенный файл. Мы не можем расстегнуть этот файл с нормальными утилитами Unzip, потому что он зашифрован.

Чтобы на самом деле быть в состоянии расстегнуть что-либо на молнии с скриптом выше, нам нужно использовать противоположные потоки для Crypto и Zlib в обратном порядке, что простое:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

Предполагая, что прошедший файл является сжатой версией, код выше создаст чтение потока от этого, труба их в Crypto Созданный () поток (используя один и тот же секрет), трубу вывода этого в ZLIB Creategunzip () Поток, а затем писать вещи обратно в файл без удлинительной части.

Это все, что у меня есть для этой темы. Спасибо за прочтение! До скорого!

Изучение реагировать или узел? Оформить заказ моих книг:

Оригинал: “https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/”