Рубрики
Без рубрики

Опрос с использованием rxjs

Руководство по решению опроса с использованием реактивного программирования и RXJS. Tagged with Rxjs, JavaScript, Angular, WebDev.

Поскольку наблюдаемые приобретают все большую популярность в JavaScript, мы стремимся выполнять наши повседневные задачи, используя их и оценить, действительно ли они стоят всей ажиотажа. Одна задача, которую вы можете выполнить, – это опросить бэкэнд, чтобы узнать, выполнила ли более длительное задание.

Мы проведем пример такого сценария и реализуем решение с использованием RXJS. На нашем пути мы узнаем несколько основных операторов для RXJS, а также несколько методов, а также как избежать ловушки или двух. В конце я представлю реальный пример, чтобы показать вам, как реализовать то, что мы узнали в конкретном сценарии.

Вы должны принести базовое понимание потоков/ Наблюдаемые а также солидные основы в JavaScript, чтобы насладиться этим постом. Для остальной части этого поста я буду рассматривать поток и наблюдаемый как взаимозаменяемые слова для одного и того же. В то время как мы рассмотрим много основных вещей, они в основном будут специфичными для RXJS и меньше оснований на потоках. Если вы ищете общее введение, рассмотрим название GIST ” Введение в реактивное программирование, которое вам не хватало “.

Код для этого поста был протестирован с использованием RXJS 6.2.0.

Сценарий

Допустим, у нас есть бэкэнд, который раскрывает конечную точку/задачи/[TaskId], которые вы можете запросить, чтобы узнать о статусе конкретной задачи. Это возвращает такой объект, как таковой:

{
  // Whether the task is still running
  processing: boolean;
  // A unique ID for this task
  taskId: string;
}

Как только мы начнем избираться, мы хотим получить текущее состояние этой задачи два раза в секунду и прекратить опрос один раз обработка Анкет

Программное решение

Чтобы начать, мы рассмотрим программное решение для этой проблемы.

    async pollUntilTaskFinished(taskId) {
      const fetchResponse = await fetch(`/tasks/${taskId}`)
      const responseObject = await fetchResponse.json()
      if (responseObject.processing) {
        setTimeout(() => pollUntilTaskFinished(taskId), 500)
      } else {
        pollingFinishedFor(taskId)
      }
    }

Здесь мы просто вызываем новый тайм -аут каждый раз, когда бэкэнд все еще обрабатывает.

Использование RXJS

Теперь мы собираемся достичь того же поведения, используя RXJS.

Прежде всего, нам нужно что -то, чтобы излучать событие каждого x время. RXJS предоставляет две функции для этого:

  • интервал

  • таймер

В то время как интервал Излучает первое событие после данного времени, а затем непрерывно с тем же интервалом, таймер начинается после данного времени, чтобы излучать события каждого x время. Для наших двух обновлений в секунду мы можем начать с использования таймера (0, 500). Это начнет стрелять в события вправо и после этого два раза в секунду.

Давайте сначала посмотрим на это в действии, записав что -то в консоли.

    import { timer } from 'rxjs'

    timer(0, 500)
      .subscribe(() => console.log('polling'))

Вы должны увидеть свой консольный отпечаток «опрос» два раза в секунду.

Позаботьтесь о импорте из правильного пакета (RXJS или RXJS/Operators). К сожалению, документация RXJS может быть недоступна с использованием версии, которую вы используете.

Затем мы хотим превратить эти «тика» в запросы на нашу бэкэнд. Мы собираемся использовать одну и ту же выборку сверху, но на этот раз превратить обещание в наблюдаемое Анкет К счастью, RXJS обеспечивает удобные функции для этого, а именно от Анкет Используя это, теперь мы можем создать наблюдаемый (или поток), представляющий запрос на бэкэнд на каждом тике и продолжать работать с этим.

    import { timer, from } from 'rxjs'
    import { map } from 'rxjs/operators'

    timer(0, 500)
      .pipe(from(fetch(`/tasks/${taskId}`)).pipe(map(response => response.json())))

.pipe это способ указать, что преобразование теперь произойдет в потоке. Извлекая операторов в свой собственный импорт RXJS обеспечивает лучшее проводник, чем перегруженная наблюдаемая реализация когда -либо, см. Это объяснение Для большего контекста.

Результатом этого будет Поток потоков Анкет Каждое излучаемое значение само по себе будет наблюдаемым. Чтобы управлять хаосом, мы можем пробиться concatmap который будет сглаживать все потоки в один, содержащий вложенные значения.

    import { timer, from } from 'rxjs'
    import { map, concatMap } from 'rxjs/operators'

    timer(0, 500)
      .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
        .pipe(map(response => response.json())))
      )

Закончить опрос

Наконец, мы действительно заботимся о том, чтобы получить мероприятие, которое сообщает нам о готовой обработке бэкэнд, что наш опрос завершен. Мы можем достичь этого, фильтруя для событий, в которых бэкэнд больше не обрабатывает, и принимая только первый из них. С помощью возьми (1) Мы указываем, что мы заботимся только об одном (первом) событии, сообщающем нам, что обработка завершена. Это остановит наш опрос после того, как бэкэнд будет выполнен, обрабатывая задачу.

    import { timer, from } from 'rxjs'
    import { map, concatMap, filter, take } from 'rxjs/operators'

    timer(0, 500)
      .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
        .pipe(map(response => response.json())))
      )
      .pipe(filter(backendData => backendData.processing === false))
      .pipe(take(1))

Сделать все это вместе

Теперь пришло время собрать все это вместе и заменить нашу функцию сверху выше, используя новый код на основе RXJS. Последний штрих – использовать подписаться В конце нашего потока работают с единственным событием, которое испускает наш поток.

    import { timer, from } from 'rxjs'
    import { map, concatMap, filter, take } from 'rxjs/operators'

    pollUntilTaskFinished(taskId) {
      timer(0, 500)
        .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
          .pipe(map(response => response.json())))
        )
        .pipe(filter(backendData => backendData.processing === false))
        .pipe(take(1))
        .subscribe(() => pollingFinishedFor(taskId))
    }

Возможно, вы не захотите вызывать функцию, как только вы закончите, но используйте вывод вашего наблюдаемого, чтобы отобразить ваш пользовательский интерфейс. Благодаря использованию Merge, который объединяет два потока вместе, мы можем сопоставить наш опрос на два состояния и использовать выход непосредственно для нашего пользовательского интерфейса.

Чтобы достичь этого, мы объединим наш поток сверху вместе с начальным значением, которое мы превращаемся в поток, используя из Анкет

    import { timer, from, merge, of } from 'rxjs'
    import { map, concatMap, filter, take } from 'rxjs/operators'

    const loadingEmoji = merge(
      of(true),
      timer(0, 500)
        .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
          .pipe(map(response => response.json())))
        )
        .pipe(filter(backendData => backendData.processing === false))
      )
        .pipe(take(2))
        .pipe(map(processing => processing ? '⏳' : '✅'));

После того, как мы сопоставляем ответ из нашего бэкэнда на атрибут обработки, используя карта , в свою очередь, мы можем отобразить результат на смайлики, чтобы отобразить нашим пользователям.

Пример реального мира

Теория всегда хорошая, но реальный мир обычно представляет собой отличную проблему, чем хорошо написанный и содержащий учебник. Позвольте мне представить вам решение проблемы, с которой мы столкнулись при создании наших знаний о опросах с использованием RXJS.

Ситуация: у нас есть угловое приложение, для которого мы используем Ngxs Как государственный менеджер. Подобно Redux, он использует действия для представления событий, изменяющих состояние.

Как выясняется, NGXS обеспечивает поток всех действий, отправленных как наблюдаемый, который мы можем зацепить. Вот наше окончательное решение, чтобы опросить бэкэнд для состояний обработки для каждого Документ *Что *Получается добавлено в состояние и обновите состояние после того, как бэкэнд будет выполнена обработка.

    .actions$
      .pipe(ofActionSuccessful(AddDocument))
      .pipe(filter((action: AddDocument) => action.document.externalProcessingState === environment.documentStates.processing))
      .pipe(map((action: AddDocument) => action.document))
      .pipe(mergeMap((document: Document) => timer(environment.polling.startingOffset, environment.polling.interval)
         // Here we want a new stream per document add.
        .pipe(concatMap(() => from(this.backend.fetch(`/documents/${document.uuid}`))))
        .pipe(concatMap(response => from(response.json())))
        .pipe(filter((polledDocument: Document) => polledDocument.externalProcessingState !== environment.documentStates.processing))
        .pipe(take(1)))
      )
      .subscribe((polledDocument: Document) => {
                    this.store.dispatch(new AddDocument(polledDocument));
      });

Несколько заметок:

  • среда является угловой средой, обеспечивающей конфигурацию для нашего приложения.

  • Бэкэнд это услуга, обеспечивающая связь с нашей бэкэнд. Он вставляет несколько необходимых заголовков и тому подобное.

  • Это использует TypeScript So SopledDocument: документ описывает переменную с именем «OlpledDocument», которая следует за типом «документ».

Хитрое здесь заключается в том, что нам нужно создать новый «поток опросов» на документ, добавляемый в наш штат. Сначала мы попытались вытащить логику на один уровень, но это закончилось тем, что мы только смогли опросить на одном документе на нагрузку на страницу, так как Take (1) заблокировал поток для всех будущих опросов.

Завершая

Сегодня мы построили нашу первую логику опроса, используя RXJS, узнав об этой великой библиотеке на этом пути. Мы также взглянули на пример реального мира и увидели, насколько выразительным он может сделать наш код.

Теперь выходите и примените свои вновь обретенные знания.

Другие великие ресурсы

https://blog.strongbrew.io/rxjs-polling/

https://www.sitepoint.com/angular-rxjs-create-api-service-rest-backend/

https://www.learnrxjs.io/recipes/http-polling.html

Первоначально опубликовано на makeitnew.io 30 августа 2018 года.

Оригинал: “https://dev.to/hoverbaum/polling-using-rxjs-3c3i”