В моей предыдущей статье Как думать реактивно и анимируют движущиеся объекты с помощью RXJS Я описал, как построить MobileObject Класс, который имитирует движение объекта, подлежащего ускорениям, наложенным на него внешним контроллером.
Теперь я хочу показать вам простую распределенную систему, которая позволяет Контроллер Приложение для удаленного контроля перемещения MobileObject. Второе удаленное приложение Монитор , Показывает движение объекта на двумерном плане. В центре системы кладет MobileObjectServer , что это место, где MobileObjects жить.
Целью данной статьи является объяснение того, насколько реактивное мышление может постепенно создавать конструкцию, которая очень естественно отображает требования к аккумулятору. Мы в конечном итоге решаем проблему, подписывающую только один наблюдаемый Отказ
Мы сосредоточимся на серверной части, которая наиболее интригующая с этой точки зрения.
Для реализации мы будем использовать RXJS и Teadncript. Сервер работает на узле. Все компоненты связываются с использованием веб-розеток.
Полная кодовая база, состоящая из серверного контроллера и монитора, можно найти здесь Отказ
Схема распределенной системы
Логическая схема распределенной системы представлена на следующей диаграмме:
В центре кладет MobileObjectServer где экземпляры MobileObjets бег. Каждый MobileObject контролируется его Контроллер , это веб-приложение, через которое мы можем выдавать команды (например, ускорить, тормоз) к MobileObject Отказ Движение всех MobileObjects можно увидеть на одном или нескольких Мониторы Отказ Каждый Монитор снова это веб-приложение.
Следующая диаграмма показывает поток взаимодействия образца между одним Контроллер один Монитор, и MobileObjectServer Отказ
Требования к серверу с точки зрения событий
Мы можем выразить требования к серверу часть нашей распределенной системы с точки зрения событий:
- Event1. – Когда Контроллер connects => Создать Mobileobj т.к.
- Event2 – Когда Контроллер Получает команду => Переслать команду t HE MobilyObj ECT контролируется T Он контролирует лент
- Event3. – Когда Контроллер Отключить => Удалить т HE Mobilyobje CT контролируется T Он контролирует лент
- Event4. – Когда Монитор Connects => Начните отправлять данные динамики всех runni NG Mobileobje CTS к новому подключению Эд Мони топ
- Event5. — Когда MobileObject добавляется => Начните отправлять свои данные динамики всем монитор RS подключен
- Event6. – Когда Монитор Отключить => Перейти посылать потоки данных динамики для A LL Mobileobje CTS до Th в мони топ
Реактивное мышление создаст дизайн, который естественным образом отображает требования, выраженные таким образом.
Элементы, составляющие сервер
Серверный компонент распределенного приложения состоит из двух основных элементов:
- MobileObject класс, который реализует логику динамического движения с использованием RxJs наблюдаемого – это было подробно описано здесь
- MobileObjectServer класс , Что управляет протоколом Web-Socket, получая команды из Контроллер и отправка на Мониторы Вся информация о динамике MobileObject. Эта реализация была вдохновлена Эта статья от Луис Авилс .
Apis mobileobject
Давайте краткий обзор MobileObject Класс – все детали можно найти здесь Пока код можно найти в Этот репозиторий Отказ
MobileObject предлагает две семьи API.
Первый – это набор методов, через которые внешний Контроллер Может выдавать команды, которые влияют на динамику объекта (например, ускоряются, тормозные).
Второй – это потоки чтенных данных, которые связываются с внешними клиентами, Мониторы Соответствующие данные о динамическом поведении объекта (то есть его положение и скорость со временем).
Для того, чтобы переместить экземпляр A MobileObject , а Контроллер должен включить его (с помощью Turnon () методом), примените желаемое ускорение (с методами AccelRatex (ACC: номер) и Ускорение (счет: номер) ), а затем, возможно, тормоз (с методом тормоз () ).
Когда Монитор Подключается к MobileObjectServer , MobileObjectServer подписки к динамики и наблюдаемый из MobileObjects работает на сервере. Затем он начинает отправлять данные, связанные с их движением к подключенным Мониторы Отказ
Для этой статьи это все, что вам нужно знать о MobileObject Отказ
Розетки как наблюдаемые
MobileObjectServer Начинает что-то делать, когда клиент, либо Контроллер или Монитор открывает подключение WebSocket. В течение времени MobileObjectServer Можно получить много запросов открыть связь со многими клиентами.
Это похоже на наблюдаемое из розетки. Вот как получить его, используя Socket.io библиотека:
import { Server } from 'http';
import { Observable } from 'rxjs';
import { Observer } from 'rxjs';
import * as socketIoServer from 'socket.io';
import {SocketObs} from './socket-obs';
export function sockets(httpServer: Server, port) {
httpServer.listen(port, () => {
console.log('Running server on port %s', port);
});
return new Observable(
(subscriber: Observer) => {
socketIoServer(httpServer).on('connect',
socket => {
console.log('client connected');
subscriber.next(new SocketObs(socket));
}
);
}
);
} Через функцию розетки мы создаем наблюдаемое из Socketobs (Мы увидим реализацию этого класса позже). В любое время сервер WebSocket получает соединить Запрос и создает новый розетка наблюдаемый, возвращенный этой функцией, испускает экземпляр Socketobs который обернет розетка только что создал.
Сообщения над розетками как наблюдаемые
Розетки могут быть использованы для отправки сообщений от клиента на сервер и наоборот. С Socket.io Библиотека, мы можем отправлять сообщения, используя Эмит метод.
Socketio.socket.emit (событие: строка, … args: любой []): Socketio.socket.
Параметр событие можно рассматривать как идентификатор типа сообщения, который мы хотим отправить. ... args Параметры могут быть использованы для отправки данных, специфичных для одного сообщения.
Кто бы ни заинтересован в определенном типе сообщения (или события, использовать The Socket.io терминологию) может начать слушать сокет, используя метод на Отказ
Sockeio.emitter.on (событие: строка, FN: Функция): Socketio.emitter.emitter.
Опять же, последовательности сообщений, полученных приемником, выглядят как наблюдаемые. Это то, как мы можем создавать наблюдаемые, которые фактически выделяют в любое время, которое получено сообщение определенного типа.
OnMessagetype Метод – это тот, который делает трюк. Это возвращает наблюдаемый, который излучает в любое время сообщение о типе Мессагетип получен.
import { Observable, Observer } from 'rxjs';
export class SocketObs {
constructor(private socket: SocketIO.Socket) {}
onMessageType(messageType): Observable {
return new Observable((observer: Observer) => {
this.socket.on(messageType, data => observer.next(data));
});
}
} Таким образом, сокеты события или сообщения, как мы их называем здесь, были преобразованы в наблюдаемые. Это будут основы нашего дизайна.
Определить природу клиента
Есть два типа клиентов, которые могут соединиться с Mobileobjectserver. Один из них Контроллер И один это Монитор Отказ MobileObjectServer Сначала нужно определить, какой тип клиента будет иметь дело с определенным сокетом.
То, как мы решили реализовать такую логику, это иметь Контроллер и Монитор Отправьте разные типы сообщений в качестве первого сообщения.
- Контроллер Отправляет сообщение типа bind_controller
- Монитор Отправляет сообщение типа BIND_MONITOR
В зависимости от типа первого сообщения, полученного на розетке, MobileObjectServer умеет определить, сообщает ли он с Контроллер или Монитор Отказ
Как только розетка создана, MobileObjectServer Должен начать прослушивание как типов сообщений, bind_controller и bind_monitor. Первый произойдет победит. Это Гонка Между двумя наблюдаемыми, отображающими два разных типа сообщений.
Такая логика должна повторяться в любое время, когда создается новый сокет, это в любое время наблюдаемое, возвращенное функцией розетки излучает. Поэтому нам нужно объединить все события, которые выигрывают гонку. Нам нужно использовать Mergemap Оператор, который объединяет все события, поднятые вовлеченными наблюдателями, и выравнивают результаты в новый наблюдаемый ( mergemap ранее знал как flatmap ).
Код для получения этого результата является следующим:
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR),
socket.onMessageType(MessageType.BIND_CONTROLLER)
)
)
)
.subscribe();
}Теперь, когда мы знаем, как дифференцировать Контроллеры и Мониторы Мы можем сосредоточиться на том, что делать в этих двух случаях.
События, относящиеся для монитора
А Монитор Показывает движение всех MobileObjects которые работают на MobileObjectServer Отказ Итак, MobileObjectServer должен отправить правильную информацию мониторам в нужное время. Давайте сначала посмотрим, что те времена, то есть какие являются соответствующими событиями, которые MobileObjectServer должен быть в курсе, чтобы выполнить свою работу.
Добавление и удаление мобильных устройств
Первые соответствующие события:
- a MobileObject был добавлен => MobileObject показан на T Он мони топ
- a MobileObject был удален => MobileObject удаляется из T Он мони топ
MobileObjects добавляются или удалены со временем, поэтому такие события могут быть смоделированы с двумя наблюдателями:
- наблюдаемый, который излучает, когда MobileObject добавлен
- наблюдаемый, который излучает, когда MobileObject удален
Однажды А Монитор подключен, MobileObjectServer начинает интересоваться обоим этими наблюдателями, поэтому он должен слияние их:
Подобно тому, что мы видели раньше, нам нужно повторить такую логику в любое время a Монитор добавлен. Поэтому нам нужно Mergemap все наблюдаемые, которые являются результатом слияние из «Мобильный объект добавил» Наблюдаемый с «Мобильный объект удален» Наблюдаемый.
Это код для получения наблюдаемого, который излучает в любое время A MobileObject должен быть добавлен или удален из каждого Монитор:
import {sockets} from './socket-io-observable';
import {SocketObs} from './socket-obs';
class MobileObjectServer {
private mobileObjectAdded = new Subject<{mobObj: MobileObject, mobObjId: string}>();
private mobileObjectRemoved = new Subject();
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
// something will be added here soon to make this logic work
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded;
const mobObjRemoved = this.mobileObjectRemoved;
return merge(mobObjAdded, mobObjRemoved);
}
}
Мы представили несколько вещей с этим кодом, которые стоит комментировать здесь.
Мы создали MobileObjectServer Класс, который будет местом, где мы будем кодировать все нашу логику сервера отныне.
Метод HandlemonitorBobs , что мы собираемся обогатить позже, возвращаются Просто слияние двух наблюдаемых, mobileobjectdded и MobileObjectRemureded , которые являются предметами. Это «внутреннее» слияние показано на рисунке выше.
Субъекты являются наблюдаемыми, и поэтому могут быть объединены, как мы здесь делаем. Но субъекты также наблюдатели, поэтому мы можем излучать события через них. Как мы увидим позже в коде, будет время, когда мы будем использовать эти предметы, чтобы излучать события, которые предлагают их имена.
Последний пункт связан с кодом, который мы добавили в методе StartSocketerVer:
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (sObs: SocketObs) => this.handleMonitorObs(sObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
// something will be added here soon to make this logic work
)
.pipe(
mergeMap(handler => handler(socket))
)Это в основном способ сказать: в любое время получено сообщение Bind_monitor, вернуть функцию
(socketObs: SocketObs) => this.handleMonitorObs(socketObs)
который будет выполнен в рамках Mergemap Оператор работает в результате Гонка функция. Это Mergemap Оператор является внешним Mergemap показано на рисунке выше.
Другой способ прочитать код: следующее: любое событие, соответствующее сообщению типа BIND_MONITOR, преобразуется логикой
mergeMap(() => this.handleMonitorObs(socket))
где розетка является примером типа Socketsobs выделяется Гонка функция.
Вскоре мы добавим что-то подобное для корпуса Bind_Controller, чтобы сделать всю эту логику.
Рукоятка Dynamics MobileObject наблюдается
Давайте рассмотрим один Монитор который соединяется с MobileObjectServer Отказ После подключения добавляется пара мобильных устройств в MobileObjectServer Отказ
Теперь для каждого MobileObject, Мы должны начать рассматривать динамику Наблюдаемые они предлагают как часть своих API. Эти наблюдаемые данные выделяют, по обычным интервалам времени, данные о динамике (положении и скорости) MobileObject Отказ Если MobileObject хранит ссылку на Mobileobject. мы можем получить свою динамику, наблюдаемую через mobileobject.dynamicsobs (См. APIS MobileObject).
Сначала мы должны преобразовать каждое событие, представляющее факт факта что а MobileObject был добавлен в серию событий, излучаемых его динамики Отказ Тогда мы Mergemap Все эти серии в новом наблюдаемом одном, что излучает все динамические события для всех добавленных мобильных устройств.
Затем мы применяем все это джаз для всех Мониторы которые подключаются к Mobileobjectserver. Поэтому мы в конечном итоге с новым наблюдаемым, который излучает данные динамики для всех Мониторы И все MobileObjects (плюс все события, связанные с тем, что MobileObject был удален).
Для каждого временного интервала у нас есть группы из четырех событий, связанных с выбросом данных о динамике наших MobileObjects Отказ Почему? Это имеет смысл, если мы думаем, что у нас есть два Мониторы и два MobileObjects. . Каждый MobileObject должен отправить данные динамики каждому Монитор за каждый раз промежуток времени. Поэтому правильно видеть четыре события за каждый интервал времени.
Как только это понятно, код очень прост:
import {sockets} from './socket-io-observable';
import {SocketObs} from './socket-obs';
class MobileObjectServer {
private mobileObjectAdded = new Subject<{mobObj: MobileObject, mobObjId: string}>();
private mobileObjectRemoved = new Subject();
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
// something will be added here soon to make this logic work
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded
.pipe(
mergeMap(data => data.mobileObject.dynamicsObs)
);
const mobObjRemoved = this.mobileObjectRemoved;
return merge(mobObjAdded, mobObjRemoved);
}
}
Мы только что ввели одно простое изменение. Мы изменили расчетные материалы Способ добавить Mergemap оператор. Это преобразует mobileobjectdded Наблюдаемый так, чтобы новый наблюдаемый излучает данные динамики, которые мы ищем.
Остальные остались нетронутыми.
Пока что резюме
Что мы сделали до сих пор? Мы только что преобразовали наблюдаемые, чтобы получить новые наблюдаемые, которые выделяют все события MobileObjectServer заинтересован в том случае, когда он должен иметь дело с Монитор Отказ Ничего больше.
Вы можете увидеть, как эти преобразования отражены в коде в следующем изображении:
Единственное, что нам нужно сделать сейчас, это добавить желаемую побочные эффекты к соответствующим событиям. В конечном итоге это позволит нам достичь того, что мы хотим, то есть связь с мониторингом правильной информации в нужное время.
Но прежде чем переехать в побочные эффекты Давайте понять, что MobileObjectServer нужно делать при взаимодействии с Контроллер другой клиент в нашей распределенной системе.
События, относящиеся для контроллера
Когда Контроллер Подключается к MobileObjectServer Есть меньше вещей, что сервер должен заботиться о. По крайней мере, существует меньше вложенных соответствующих событий.
Вещи, которые MobileObjectServer нужно заботиться о том:
- А Контроллер подключил, что в нашей простой логике означает, что мы должны создать совершенно новый Mobileobject.
- Контроллер отправил команды для его Mobileobject.
- Контроллер отключился. В нашей реализации это означает, что мы как-то должны удалить MobileObject контролируется Контроллер (У нас есть отношения с 1 до 1 между MobileObject и его Контроллер Несомненно
Мы уже знаем первое событие: это излучается наблюдаемым, возвращенным Socket.onmessageType (bind_controller) .
Команды отправляются Контроллер к MobileObjectServer в виде сообщений. Таким образом, мы можем создать наблюдаемые команды, полученные в течение определенного розетка ( получено от определенного контроллера), поскольку каждый контроллер имеет свой разъем. Мы делаем это, просто используя OnMessagetype метод Socketobs
socket.onMessageType(CONTROLLER_COMMAND)
Socketobs Также предлагает метод, OnDisconnect , который возвращает наблюдаемый, который излучает, когда розетка отключен. Это то, что нам нужно, чтобы иметь дело с третьим событием.
Так как мы имеем дело с более чем одним Контроллер потенциально подключение к MobileObjectServer не должно удивить вас узнать, что нам нужно Mergemap результат слияние . Это тот же тип преобразования, который мы уже сделали несколько раз.
Код также не должен быть неожиданным.
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
.pipe(
map(() => (socketObs: SocketObs) => this.handleControllerObs(socketObs))
),
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded
.pipe(
mergeMap(data => data.mobileObject.dynamicsObs)
);
const mobObjRemoved = this.mobileObjectRemoved;
return merge(mobObjAdded, mobObjRemoved);
}
handleControllerObs(socket: SocketObs) {
const commands = socket.onMessageType(MessageType.CONTROLLER_COMMAND);
const disconnect = socket.onDisconnect();
return merge(commands, disconnect);
}Мы просто добавили HandlecontrollerObs Метод, который занимается Команды получены и Отключить контроллера. Мы применим преобразование Mergemap к этому, как мы уже сделали с расчетные материалы Отказ
Сводка преобразований, применяемых к контроллерам
Следующая диаграмма иллюстрирует все преобразования, которые мы применили, начиная с наблюдаемого, которая испускает, когда Контроллер соединяет.
Окончательный наблюдаемый
Если мы собрали преобразования, которые мы сделали для обоих Мониторы и Контроллеры, То, что мы получаем, является следующим окончательным наблюдаемым.
Просто подписываясь на этот один окончательный наблюдаемый, все дерево событий разворачивается.
Побочные эффекты
Красивое дерево событий, которое мы создали, подписавшись на окончательный наблюдаемый, ничего не делают. Но это делает хорошую работу по сопоставлению События Мы определили при описании требований сервера в начале этой статьи.
В основном это говорит нам ясно, когда мы должны сделать что-то Отказ
Это что-то это то, что мы называем побочный эффект .
Когда контроллер подключается и отключает, мы соответственно создаем или удалите MobileObject Отказ Как побочный эффект Из этих действий мы поднимаем « MobileObject добавлено» и “MobileObject удалено” События с использованием mobileobjectdded. и MobileObjectRemureded Субъекты мы представляем некоторые пункты назад.
Как реализовать побочные эффекты
В RXJS есть разные способы реализации побочные эффекты Отказ
Наблюдатели – это один. Мы можем добавить наблюдателей, пока мы Подписаться используя Нажмите Оператор (ранее знаешь как сделать ).
Другой способ заключается в том, чтобы ввести их в любой функции, переходя к любому оператору RXJS.
Мы в основном собираемся использовать Нажмите Поскольку это позволяет нам размещать побочные эффекты на протяжении всего дерева событий. Но мы также собираемся разместить побочные эффекты непосредственно внутри функций, которые мы переходим к операторам RXJS.
Единственное место, которое мы не ставят побочные эффекты, это подписываться . Причина в том, что, учитывая, как мы его построили, последний наблюдатель излучает множество различных типов событий. Поэтому Подписаться , который работает так же для всех событий, не правильное место для поведения поведения, которое зависит от определенных типов событий.
Надеюсь, на этот пункт кодовый вид говорит о себе.
Последнее, но не в последнюю очередь: завершение наблюдаемых
Есть одно, что нам все еще нужно сделать, чтобы завершить наш дизайн: остановить потоки событий или завершить наблюдаемые, когда либо Контроллер или Монитор отключить.
Когда контроллер отключается
Когда контроллер отключается, мы удаляем MobileObject это контролирует. Как часть удаления, важно убедиться, что MobileObjectServer Перестает отправлять данные динамики, связанные с этим MobileObject к подключенным мониторам. Это означает, что мы должны заполнить следующие наблюдаемые:
mobObjInfo.mobObj.dynamicsObs .pipe( tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)), )
Мы можем легко добиться этого, просто используя доставка Оператор вместе с MobileObjectRemured Наблюдаемый мы уже знаем:
mobObjInfo.mobObj.dynamicsObs
.pipe(
tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
takeUntil(this.mobileObjectRemoved.pipe(
filter(id => id === mobObjInfo.mobObjId)
))
)доставка гарантирует, что наблюдаемый завершен, когда наблюдаемый прошел как параметр для доставка излучает.
MobileObjectRemureded выделяет каждый раз a MobileObject удален. Тем не менее, мы хотим, чтобы перестать отправлять информацию о динамике, когда конкретный MobileObject идентифицируется по его идентификатору, удаляется. Итак, мы добавляем Фильтр логика.
Когда монитор отключается
В этом случае мы также можем использовать доставка Отказ
Мы знаем, когда монитор отключается, потому что розетка , типа Socketobs , связанные с ним излучает через socket.ondisconnect () Наблюдаемый. Так что нам нужно сделать, это прекратить отправку информации о динамике, когда socket.ondisconnect () излучает.
Поэтому окончательная логика для управления завершением наблюдаемой
mobObjInfo.mobObj.dynamicsObs .pipe( tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)), takeUntil(this.stopSendDynamics(socket, mobObjInfo.mobObjId)) )
куда
private stopSendDynamics(socket: SocketObs, mobObjId: string){
return merge(
this.mobileObjectRemoved.pipe(
filter(id => id === mobObjId)
),
socket.onDisconnect()
);
}И именно то, как ядро кода, реализующего нашу логику:
import {sockets} from './socket-io-observable';
import {SocketObs} from './socket-obs';
class MobileObjectServer {
private mobileObjectAdded = new Subject<{mobObj: MobileObject, mobObjId: string}>();
private mobileObjectRemoved = new Subject();
public startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
.pipe(
map(() => (socketObs: SocketObs) => this.handleControllerObs(socketObs))
),
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
private handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded
.pipe(
tap(mobObjInfo => socket.send(MessageType.MOBILE_OBJECT, mobObjInfo.mobObjId)),
mergeMap(mobObjInfo => mobObjInfo.mobObj.dynamicsObs
.pipe(
tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
takeUntil(this.stopSendDynamicsInfo(socket, mobObjInfo.mobObjId))
)
)
);
const mobObjRemoved = this.mobileObjectRemoved
.pipe(
tap(mobObjId => socket.send(MessageType.MOBILE_OBJECT_REMOVED, mobObjId)),
);
return merge(mobObjAdded, mobObjRemoved);
}
private handleControllerObs(socket: SocketObs) {
const {mobObj, mobObjId} = this.newMobileObject();
this.mobileObjectAdded.next({mobObj, mobObjId});
const commands = socket.onMessageType(MessageType.CONTROLLER_COMMAND)
.pipe(
tap(command => this.execute(command, mobObj))
);
const disconnect = socket.onDisconnect()
.pipe(
tap(() => this.mobileObjectRemoved.next(mobObjId)),
);
return merge(commands, disconnect);
}
private stopSendDynamicsInfo(socket: SocketObs, mobObjId: string) {
return merge(this.mobileObjectRemoved.pipe(filter(id => id === mobObjId)), socket.onDisconnect());
}
}
Заключение
Это было довольно долгое путешествие. Мы видели некоторые рассуждения, вызванные реактивным мышлением и некоторыми реализациями этого рассуждения.
Мы начали преобразовывать события WebSockets в наблюдаемые. Затем, применяя дополнительные преобразования, мы завершили создание одного наблюдаемого, что после подписки, разворачиваем все события, которые нас интересуют.
На данный момент добавляя побочные эффекты, которые позволяют нам достичь нашей цели было простым.
Этот умственный процесс дизайна, который постепенно постепенно, является смысл, который я даю «реактивное мышление».
Полная база кода, включающая контроллер сервера и монитор, можно найти здесь Отказ
Оригинал: “https://www.freecodecamp.org/news/reactive-thinking-how-to-design-a-distributed-system-with-rxjs-websockets-and-node-57d772f89260/”