Рубрики
Без рубрики

Реактивное программирование и наблюдаемые последовательности с RXJS в Node.js

Работа с асинхронной неблокирующей обработкой всегда была нормой в мире JavaScript, и теперь становится очень популярным во многих других контекстах. Преимущества ясны: эффективное использование ресурсов. Но преимущества приходят по себестоимости: нетривиальное увеличение сложности. Со временем, поставщиками и то

Автор оригинала: Enrico Piccinin.

Работа с асинхронной неблокирующей обработкой всегда была нормой в мире JavaScript, и теперь становится очень популярным во многих других контекстах. Преимущества ясны: эффективное использование ресурсов. Но преимущества приходят по себестоимости: нетривиальное увеличение сложности.

Со временем, поставщиками и сообществом открытого исходного кода пытались найти способы уменьшить такую сложность без ущерба для преимуществ.

Асинхронная обработка началась с «обратных вызовов», а затем пришла обещание и будущее, Async и ждут. Недавно еще один ребенок приехал в город – Реактивный эффект С его различными языковыми реализациями – привлечение разработчиков новый мощный инструмент, наблюдаемый.

В этой статье мы хотим показать, как реализованы наблюдателями RXJS (Вариант осуществления JavaScript ActiveXX) может упростить выполнение кода с Node.js, популярным сервером JavaScript безблокирующей средой.

Простое использование корпуса – прочитайте, преобразовывать, запись и журнал

Чтобы сделать наши рассуждения конкретными, давайте начнем с простого использования. Предположим, нам нужно прочитать файлы, содержащиеся в Источник Dir , Превратите их содержание и напишите новые преобразованные файлы в Целевой дир , Удерживая журнал файлов, которые мы создали.

Синхронная реализация

Синхронная реализация этого случая использования довольно прост. В своем роде представление псевдо кода мы могли бы подумать о чем-то вроде:

read the names of the files of Source Dir
   for each file name
      read the file
      transform the content
      write the new file in Target Dir
      log the name of the new file
   end for
console.log('I am done')

Здесь нет ничего особенного для комментариев. Мы можем просто сказать, что мы уверены в последовательности выполнения каждой строки, и мы уверены, что все будет происходить, как описано следующим потоком событий. Каждый круг соответствует завершению операции ввода/вывода.

Что происходит в асинхронной неблокирующей среде, такой как Node.js

Node.js – асинхронная неблокирующая среда выполнения для JavaScript. Неблокировка означает, что Node.js не ждет операций ввода/вывода или сетевых операций, прежде чем перейти к выполнению следующей строки кода.

Обработка одного файла

Почты и записи файлов являются операциями ввода/вывода, где Node.js показывает свою неблокирующую природу. Если программа Node.js запрашивает файл для чтения, он должен предоставить функцию, которая должна выполняться при доступе содержимого файла (то называемая Callback ), а затем немедленно переместитесь на следующую операцию для выполнения.

Давайте рассмотрим случай только Один файл Отказ Чтение, преобразование, писать один Файл и обновление журнала Node.js выглядит что-то подобное:

import * as fs from 'fs'; // Node module to access file system
const fileName = 'one-file.txt';
fs.readFile(fileName, callback(err, data) => {
   const newContent = transform(data);
   const newFileName = newFileName(fileName); // calculate new name
   fs.writeFile(newFileName, newContent, err => {
      if(err) {// handle error};
      fs.appendFile('log.txt', newFileName  + ' written', err = {
         if (err) {// handle error}
      });
   });
})

Синтаксис может выглядеть немного запутано с 2 уровнями отступа, но если мы думаем о том, что происходит с точки зрения событий, мы все еще можем точно предвидеть последовательность:

Рай обещания

Это корпус использования, когда ложки JavaScript сияет. Использование обещания мы можем сделать код снова последовательно, не вмешиваясь в асинхронную природу Node.js.

Предполагая, что мы можем получить доступ к функциям, которые выполняют операции чтения и записи в файле и вернуть обещание, то наш код будет выглядеть так:

const fileName = 'my-file.txt';
readFilePromise(fileName)
.then(data => {
   const newContent = transform(data);
   const newFileName = newFileName(fileName); // build the new name
   return writeFilePromise(newFileName, newContent)
})
.then(newFileName => appendFile('log.txt', newFileName))
.then(newFileName => console.log(newFileName + ' written'))
.catch(err => // handle error)

Есть несколько способов преобразовать функции Node.js в Обещание На основе функций. Это один пример:

function readFilePromise(fileName: string): Promise{
   return new Promise(function(resolve, reject) {
      fs.readFile(fileName, function(err, data: Buffer) {
         if(err !== null) return reject(err);
         resolve(data);
      });
   });
}

Обработка многих файлов

Если мы вернемся к исходному корпусу, где мы должны преобразовывать все файлы, содержащиеся в каталоге, сложность увеличивается, а обещания начинают проявлять некоторые ограничения.

Давайте посмотрим на события, которые необходимо управлять реализацией Node.js.

Каждый круг представляет завершение операции ввода/вывода, либо чтение или запись. Каждая строка представляет собой обработку одного конкретного файла или цепочки обещаний.

Учитывая неблокирующая природа Node.js, нет уверенности на последовательности во времени таких событий. Возможно, мы закончим писать File2. прежде чем мы закончим чтение File3 Отказ

Параллельная обработка каждого файла делает использование обещаний более сложным (в конце этой статьи, обеспечивается реализация на основе обещания). Это сценарий, в котором реактива – RXJS в частности – и наблюдаемый блеск и позволяют создавать элегантные решения.

Что такое наблюдаемые и что вы можете сделать с ними?

Есть много мест, где подробно описаны официальные определения наблюдаемых, начиная с официального сайта Реактивный эффект Отказ

Здесь я просто хочу напомнить вам о паре свойств, которые всегда уделяли мое внимание:

  • Наблюдаемые модели A Поток событий
  • Наблюдаемый это “push” Брат итерабельный, который “тянет”

Как «Push» Brother of Rireable, наблюдаемый предлагает разработчикам многие из прохладных функций, предоставляемых, такие как:

  • Преобразовать «потоки событий» или наблюдаемых, через операторы, такие как карта , Фильтр и пропускать
  • Применить функциональный стиль программирования

Одна дополнительная очень важная вещь, которую наблюдаемые предложения – подписка. С помощью подписки код может применять «побочные эффекты» на события и выполнять определенные действия, когда случаются определенные события, такие как при возникновении ошибок или поток событий.

Как видите, наблюдаемый интерфейс предоставляет разработчикам возможность предоставить три различных функция, которые определяют, что для выполнения соответственно, когда: событие испускается своими данными, происходит ошибка, или поток событий завершается.

Я предполагаю, что все вышеперечисленное может звучать очень теоретически тем, кто еще не играл с наблюдаемым, но, надеюсь, следующая часть обсуждения, которая сосредоточена на нашем случае, сделает эти концепции более бетонными.

Реализация корпуса использования чтения, преобразования, записи и журнала через наблюдаемый

Наш случай для использования начинается с чтения списка файлов, содержащихся в Источник Dir Отказ Итак, давайте начнем оттуда.

Прочитайте все имена файлов, содержащиеся в каталоге

Давайте предположим, что у нас есть доступ к функции, которая получает в качестве входного имени каталога и возвращает наблюдаемый, который издает список имен файлов каталога после чтения структуры дерева каталогов.

readDirObservable(dirName: string) : Observable>

Мы можем подписаться на это наблюдаемое, и когда все имена файлов были прочитаны, начните делать что-то с ними:

Прочитайте список файлов

Давайте предположим, что мы можем получить доступ к функции, которая получает в качестве ввода списка файлов и излучает каждый раз, когда файл был прочитан (он испускает содержимое файла Буфер и его название строка ).

readFilesObservable(fileList: Array) 
   : Observable<{content: Buffer, fileName: string}>

Мы можем подписаться на такие Наблюдаемый и начните делать что-то с содержанием файлов.

Сочетание наблюдаемых – оператор переключения

Теперь у нас есть два наблюдателя, которые излучают список имен файлов, когда каталог был прочитан, и тот, который излучает каждый раз, когда файл прочитан.

Нам нужно объединить их для реализации первого этапа нашего использования нашего использования, которое является: когда ReadDiRobsiblebable излучает, нам нужно Переключатель к ReadFilesoBServable .

Хитрость здесь выполняется Переключатель оператор. Код выглядит как:

readDirObservable(dirName)
.switchMap(fileList => readFilesObservable(fileList))
.subscribe(
      data => console.log(data.fileName + ' read'), // do stuff with the data received
      err => { // manage error },
      () => console.log('All files read')
)

Мы должны упомянуть, что Переключатель Оператор мощнее, чем это. Его полная мощность, хотя и не может быть оценена в этом простом случае, и его полное описание находится за пределами объема этого поста. Если вы заинтересованы, это Отличная статья который описывает подробно Переключатель Отказ

Наблюдаемый генерируют поток наблюдаемых

Теперь у нас есть поток событий, представляющих завершение читать операция. После читать Нам нужно сделать трансформацию содержания, которое, ради простоты, мы предполагаем, что мы предполагаем синхронно, а затем нам нужно сохранить преобразованный контент в новом файле.

Но написание нового файла снова является операцией ввода/вывода или операция безблокировки. Таким образом, каждое событие «File-Read Completion» начинает новый путь разработки, который получает в качестве ввода содержимого и имени исходного файла и выделяет, когда новый файл написан в Целевой дир (Излучаемое событие несет название файла, написанного).

Опять же, мы предполагаем, что мы можем получить доступ к функции, которая выделяет, как только операция записи будет завершена, и выделяемые данные – это имя файла, написанного.

writeFileObservable(fileName: string, content: Buffer) :            Observable

В этом случае у нас есть разные «файл записи», возвращаемые writefileobseribable Функция, которая выделяет независимо. Было бы здорово слияние их в новое наблюдаемое, которое излучает в любое время каждый из этих «запись файла записи».

С реактивнымиx (или RXJS в JavaScript) мы можем достичь этого результата, используя Mergemap Оператор (также известный как FlatMap ). Это как код выглядит:

readDirObservable(dir)
.switchMap(fileList => readFilesObservable(fileList))
.map(data => transform(data.fileName, data.content))
.mergeMap(data => writeFileObservable(data.fileName, data.content))
.subscribe(
      file => console.log(data.fileName + ' written'),
      err => { // manage error },
      () => console.log('All files written')
)

Mergemap Оператор создал новый наблюдаемый, writefileobseribable Как показано на следующей диаграмме:

И что?

Применение того же подхода, если мы просто представляем, что у нас есть новая функция WriteLogBseribable , который записывает линию в журнале, как только файл написан и излучает имя файла, как только журнал обновляется, окончательный код для нашего использования будет выглядеть:

readDirObservable(dir)
.switchMap(fileList => readFilesObservable(fileList))
.map(data => transform(data.fileName, data.content))
.mergeMap(data => writeFileObservable(data.fileName, data.content))
.mergeMap(fileName => writeLogObservable(fileName))
.subscribe(
      file => console.log(fileName + ' logged'),
      err => { // manage error },
      () => console.log('All files have been transformed')
)

У нас нет отступов, представленных обратными вызовами.

Время течет только вдоль вертикальной оси, поэтому мы можем прочитать линию кода по линии и причин о том, что происходит линия после строки.

Мы приняли функциональный стиль.

Другими словами, мы видели преимущества наблюдаемой в действии.

Создавайте наблюдаемые из функций с обратными вызовами

Я надеюсь, что вы теперь думаете, что это выглядит довольно круто. Но даже в этом случае у вас может быть один вопрос. Все функции, которые делают этот код, просто не существуют. Нет ReadFilesoBServable или writefileobseribable В стандартных библиотеках Node.js. Как мы можем их создать?

BindCallback и BindNodeCallback

Пара функций, предоставляемых наблюдаемыми, а именно BindcallbackBindNodeCallback ) Приходите на нашу помощь.

Основная идея за ними состоит в том, чтобы обеспечить механизм преобразования функции F который принимает обратный вызов CB (CBINPUT) в качестве входного параметра в функцию, которая возвращает наблюдаемый Оббарда который излучает cbinput Отказ Другими словами, это преобразует Призыв из CB В Эмиссия cbinput Отказ

Абонент Оббарда может определить функцию, которая будет обработать cbinput (который играет ту же роль, что и CB (CBINPUT) ). Применение Конвенции является то, что функция обратного вызова CB (CBINPUT) Должен быть последний аргумент F Отказ

Вероятно, легче понять механизм, глядя на следующую диаграмму:

Начальная точка, функция f (x, cb) одинаково в двух случаях. Результат (что напечатано на консоли), одинаково в двух случаях.

То, что отличается, как получается результат. В первом случае результат определяется функцией обратного вызова, прошедшей в качестве ввода. Во втором случае он определяется функцией, определенной абонентом.

Еще один способ рассмотрения того, как Bindcallback Работы – это посмотреть на преобразование, которое он выполняет, как показано на диаграмме ниже.

Первый аргумент F становится значением, переданным новой функции FBOUND Отказ Аргументы, используемые в качестве параметров обратного вызова CB. Станьте значениями, излучаемыми новыми наблюдаемыми, возвращенными FBOUND Отказ

BINDODECALLBAK это вариация Bindcallback Исходя из Конвенции, что функция обратного вызова имеет Ошибка . Параметр в качестве первого параметра, наряду с Node.js Conventer Fs.readfile (Err, CB) Отказ

Создавайте наблюдаемые от функций без обратного вызова

BINDODECALLBAK был разработан для работы с функциями, которые ожидают обратный вызов как последний аргумент их ввода, но мы можем сделать его также с другими функциями.

Давайте рассмотрим стандартную функцию Node.js readline. . Это функция, используемая для чтения файлов линии по линии. В следующем примере показано как он работает:

Каждая строка прочитана, нажата в линии множество. Когда файл полностью прочитан, функция ProcessLinesccb называется.

Представьте, что теперь мы определяем новую функцию, _readlines , который обернут логику, определенную выше, как показано следующим фрагментом:

Как только все строки будут прочитаны, они обрабатываются функцией ProcessLinesccb который является последним входным параметром _readlines Отказ _readlines следовательно, функция, которая может быть обработана Bindcallback Отказ Через этот трюк мы можем преобразовать функцию Node.js Fs.Readline в наблюдаемый с использованием обычного Bindcallback Функция следующим образом:

Заключение

Асинхронная неблокирующая обработка – это сложный природой. Наши умы привыкли думать последовательно – это верно, по крайней мере, для тех из нас, кто начал кодировать несколько лет назад. Мы часто обнаруживаем, что это сложно рассуждать о том, что действительно происходит в этих условиях. Callback-Hell находится всего за углом.

Обещания и фьючерсы упростили некоторые из самых частых случаев, таких как «один раз» асинхронные события, «Запрос сейчас – отвечает позже» сценарий, типичный для HTTP-запросов.

Если мы перейдем с «одновременного» события, к обещанию «событий потоков» начнут проявлять некоторые ограничения. В таких случаях мы можем найти ReaciveX и наблюдаемые очень мощным инструментом.

Как поощрили: реализация нашего использования на основе обещания

Это реализация того же применения, основанного на обещаниях:

const promises = new Array();
readDirPromise(dir)
.then(fileList => {
   for (const file of fileList) {promises.push(
         readFilePromise(file)
         .then(file_content => transform(file_content))
         .then(file => writeLogPromise(file))
      );
   }
   return promises;
}
.then(promises => Promise.all(promises))
.then(() => console.log('I am done'))
.catch(err => { // manage error })

Оригинал: “https://www.freecodecamp.org/news/rxjs-and-node-8f4e0acebc7c/”