Reactive Push, Composition и UI на примере Reactive Stocks - 1
Реактивное программирование — парадигма программирования, основанная на потоках данных и распространении изменений.
Реактивное приложение - приложение, характеризующееся следующими свойствами:
-
ориентированность на события
-
масштабируемость (способность к увеличению производительности при добавлении ресурсов)
-
отказоустойчивость и отзывчивость (малое время отклика)
-
способность работать в реальном времени (наличие гарантированного время отклика, не зависящее от нагрузки).
Приложения, использующие асинхронную модель, характеризуются также слабой связанностью - отправитель и получатель могут быть реализованы без оглядки на детали, как именно события распространяются в системе, и реализация больше фокусируется на содержимом передачи, т.е. на контракте сообщений.
Дополнительным преимуществом асинхронной модели, т.е. основанной на передаче сообщений, а не синхронных вызовах, является то, что вызывающий поток не блокируется, как при модели синхронных вызовов, а продолжает выполнение. Неблокирующее приложение обладает меньшими задержками и большей пропускной способностью по сравнению с приложением, основанном на блокирующей синхронизации - поскольку мы более эффективно используем ресурсы процессора.
Корнем слова “реактивный” является слово “react”, т.е. реагировать, отзываться, т.е. можно говорить, что реактивные приложения:
-
реагируют на события — событийно-ориентированные;
-
реагируют на загрузку — масштабируемые;
-
реагируют на отказы — отказоустойчивые.
Когда говорят, что запросы являются реактивными, имеют в виду, что они являются асинхронными, т.е. неблокирующими.
Приложение Reactive Stocks демонстрирует 4 аспекта реактивного программирования: реактивный push, реактивные запросы, композицию реактивных запросов, и реактивный интерфейс.
Исходники можно также просмотреть тут.
В нем не показана реактивная pull - модель, при которой клиент периодически запрашивает данные, ожидая их появления, с помощью реактивных запросов; в отличие от push-модели, при pull-модели запрос инициируется клиентом.
В случае реактивного push-запроса сервер “проталкивает” данные к своим потребителям незамедлительно после того, когда данные становятся доступны, вместо того, чтобы заставлять клиента впустую тратить ресурсы, постоянно запрашивая и ожидая данные.
push лучше чем pull, поскольку мы не делаем лишних запросов с клиента - но это при условии, что клиент достаточно быстро обрабатывает push запросы с сервера и при условии, что и сервер, и клиент поддерживают такой тип взаимодействия (например, с помощью WebSocket).
Реактивная композиция - комбинирование асинхронных неблокирующих запросов; например, у нас есть запрос к вызову, реализованному асинхронным Play-контроллером, и этот вызов, асинхронный по своей природе, приводит к двум дополнительным параллельным асинхронным вызовам к двум разным веб сервисам, результат которых нужно объединить и вернуть клиенту.
Существует еще одна модель - 2-сторонняя реактивность - что на самом деле означает двусторонний реактивный push.
Вообще, в последние годы возник ряд технологий, поддерживающих создание реактивных приложений (Microsoft Reactive extensions, Ractive.js и др.).
Среди прочего, одной из основных причин, по которым создание реактивных приложений стало востребованным, можно назвать повышение требований к отзывчивости HTML5 интерфейсов, а также также значительный рост числа запросов к веб-приложениям и сервисам. К тому же, например, у мобильных устройств несколько ограничены возможности для pull-запросов, соответственно push-модель, основанная на WebSocket’ах, является для них более подходящей.
Ну или пример из еще один жизни реактивных интерфейсов - одновременные изменения в issue на github видны одновременно всем пользователям без перезагрузки страницы.
И еще пару слов о том, что, возможно, покажется малосвязанным напрямую с реактивным программированием, но косвенно с ним связано - развертывание, или деплоймент, и мониторинг. Деплоймент, при котором время даунтайма стремится к нулю, и мониторинг, который позволяет не допустить снижения отзывчивости или вообще отказа системы, являются довольно важными вещами для обеспечения “реактивности” приложения.
Например, беcконтейнерный деплоймент (а именно так может работать Play-приложение, т.е. безо всякого контейнера), потому что он неплохо ложится в парадигму непрерывного развертывания continuous delivery, т.е. для развертывания часто достаточно простого копирования, вместо манипуляций со сложной инфраструктурой серверов приложений.
Некоторые дополнительные приемы, призванные снизить время неработоспособности приложения:
- “Canary deployments”
- “Rolling updates”
- “Различные приемы для миграции схемы БД в условиях непрерывного развертывания”
В качестве инструмента мониторинга может быть использована Typesafe Console - которая мониторит события в реактивном приложении, написанном с использованием Play и Akka. Также, мониторинг для приложений, написанных с использованием Scala/Akka/Play, поддерживается в New Relic - с использованием специального агента. Поддерживается он в Takipi и AppDynamics
Reactive Stocks
Итак, приложение Reactive Stocks. На самом деле приложение Reactive Stocks является шаблоном в Typesafe Activator, но при этом не является совсем уже примитивным.
Приложение Reactive Stocks написано на Scala и Java с использованием Play и Akka с целью показать на сравнительно простом примере реактивный подход. В частности, Reactive Composition и Reactive Push.
Идея приложения довольно проста - на каждой открытой странице оно показывает набор графиков котировок, и значения цен для этих котировок “проталкиваются” от сервера в клиент с помощью WebSocket. В данном случае значения фейковые, генерируемые случайным образом. Reactive Stocks написано с использованием Play и Akka, причем бэкенд частично написан на Java, а не только на Scala - с целью показать, насколько оба языка просто сосуществуют в рамках одного приложения. Фронтенд использует CoffeeScript в качестве клиентского языка, и WebSockets в для push-запросов.
В Reactive Stocks используются четыре вида “реактивности”: реактивный push, реактивные запросы, реактивная композиция, и реактивный UI. Ни реактивный pull, ни 2-сторонняя реактивность в Reactive Stocks не используются, хотя в реальном приложении, которое бы показывало котировки и использовало бы реальный источник биржевых котировок (а не генерирующий последовательность случайных значений), скорее всего, использовался бы реактивный pull - поскольку большинство веб сервисов котировок реализовано как REST или SOAP веб-сервисы, и не используют WebSockets.
Реактивный push
Приложение использует WebSocket, для того чтобы “втолкнуть” данные о котировках в клиента, т.е. браузер. Для создания соединения WebSocket
в Play, сперва должен быть определен маршрут (route) в файле conf/routes
, а именно:
GET /ws controllers.Application.ws
Метод ws
в контроллере Application.java
создает объект WebSocket
, принимающий запросы на отслеживание котировок и отсылающий значения котировок обратно; WebSocket
также создаст UserActor
(на каждую сессию с пользователем, фактически страницу, создается свой WebSocket
, а следовательно - UserActor
) и передаст в него ссылку на out-канал WebSocket
для обратной связи.
После того, как UserActor
создан, набор котировок по умолчанию (который определяется параметром default.stocks
в файле конфигурации application.conf
) добавляется в список отслеживаемых для данной сессии котировок.
Каждая котировка (обозначаемая уникальным символом - например, GOOG
или ORCL
) соответствует одному StockActor
у. StockActor
держит последние 50 значений цен котировки. В ответ на сообщение FetchLatest
можно получить всю историю цен. В ответ на сообщение FetchLatest
будет получена новая цена путем вызова метода newPrice
в StockQuote
- источнике цен. Каждый StockActor
отсылает сообщение FetchLatest
самому себе каждые 75 миллисекунд. Как только получено новое значение цены, оно добавляется в историю цен (фактически очередь значений, Queue
) и оно же рассылается всем подписчикам, то есть всем UserActor
ам, которые отслеживают котировки. UserActor
сериализует сообщение о ценах в JSON и “проталкивает” это сообщение в клиента с помощью WebSocket
.
Если описать приложение ReactiveStocks на уровне классов, то получится примерно следующее:
Диаграмма последовательности, показывающая взаимодействие компонент с помощью сообщений, выглядит так:
Если описать словами, как события распространяются в системе и какие участники задействованы, то получится примерно следующее:
index.coffee
В index.coffee
(то есть на клиенте, браузере) вызывается $
(синоним onLoad
в JQuery), в котором создается канал двусторонней связи , основанный на WebSocket, связывающий клиент с вызовом /ws
, т.е. фактически с вызовом статического метода controllers.Application.ws
(см. также файл routes
). controllers.Application
) является основным веб-контроллером, который:
- возвращает индексную страницу (представленную темплейтом
index.scala.html
); - Создает
WebSocket
, т.е. устанавливает двусторонний канал связи на стороне сервера; - Создает
UserActor
для каждого соединения с пользователем, т.е.,WebSocket
а (каждой открытой странице соответствуетWebSocket
), и передаетWebSocket
у ссылку наUserActor
. - Принимает запросы на отслеживание котировки для переданного символа.
- Инициирует отписку от обновлений котировок.
Application и WebSocket
- В методе
controllers.Application.ws
, создается объектWebSocket
. В последнем есть методonReady
, который является коллбеком инициализации и в который передаются 2 объекта - соответственно типовWebSocket.In
иWebSocket.Out
- т.е. входной и выходной каналыWebSocket
а. Также вonReady
первым делом инстанциируетсяUserActor
, в который передается выходной каналWebSocket
а для дальнейшей обратной связи. - На каждый
WebSocket
приходится ровно одинUserActor
. - В ответ на запрос пользователя об отслеживании котировки, присылаемый через входной канал
WebSocket.In
,WebSocket
генерирует сообщениеWatchStock
, содержащее символ котировки к акторуStocksActor
(который существует в единственном числе и является родительским актором по отношению к группеStockActor
ов, каждый из которых соответствует какой-то одной котировке, например, “GOOG”), указываяuserActor
в качестве отправителя, с тем чтобы ответные сообщения направлялись непосредственно вuserActor
. - Получив сообщение
WatchStock
,StocksActor
(который один и родитель) извлекает изcontext
‘а (который имеет типActorContext
) один изStockActor
ов - соответствующий переданному символу котировки, и если такого нету - создает его, а потом переадресовывает ему сообщениеWatchStock
. - У входного канала веб-сокета есть обработчик
onClose
; вызывается он в случае, если соединение с клиентом прервано. В нем отсылается сообщениеUnwatchStock
акторуStocksActor
(который родитель), который форвардит его всем своимStockActor
ам, и последние удаляют отправителя (UserActor
) из списка подписчиков, и если число подписчиков равно нулю - тоStockActor
останавливает себя.
StocksActor
StocksActor
является актором-родителем для акторов StockActor
:
- На сообщение
WatchStock
он извлекает из контекста или создает соответсвующий переданному символу котировкиStockActor
, и форвардит сообщение этому актору; - На сообщение
UnwatchStock
- форвардит сообщение соответствующемуStockActor
у - если сообщениеUnwatchStock
содержит символ котировки; - Если сообщение
UnwatchStock
не содержит символа, то сообщениеUnwatchStock
форвардится всемcontext.children
.
StockActor
Каждый StockActor
содержит множество подписчиков изменений котировок, хранятся они в поле watchers
. В поле stockHistory
содержится история изменений цен, а поле stockQuote
есть тот самый сервисный объект, который собственно, значения цен и возвращает. Конкретно в данном случае, он инициализируется моковой реализацией - FakeStockQuote
, который генерирует случайные значения цены. Поле stockTick
- планировщик, который периодически высылает сообщение FetchLatest
каждые 0.075 секунды. При инициализации StockActor
сразу получает 50 начальных случайных значений цены для истории цен; каждое новое значение добавляется в конец очереди истории цен, а самый старый элемент удаляется, и таким образом размер очереди никогда не превышает 50 элементов.
- Сообщение
FetchLatest
регулярно высылается с помощью планировщикаstockTick
; в ответ на это сообщениеStockActor
опрашиваетstockQuote
и добавляет полученную цену кstockHistory
(удаляя первый элемент, чтобы очередь не превышала 50 элементов), и затем высылает сообщениеStockUpdate
, содержащее символ и новое значение цены всем своим подписчикам (которые являются акторами типаUserActor
). - Поучив сообщение
WatchStock
,StockActor
высылает сообщениеStockHistory
с 50 последними ценами отправителю сообщенияWatchStock
, и добавляет отправителя к множеству подписчиков (watchers
) - собственно, механизм подписки реализован на сообщенияхWatchStock
. - В ответ на сообщение
UnwatchStock
,StockActor
отписывает отправителя (т.е., удаляет его из множестваwatchers
). Если больше не остается ни одного подписчика, тоStockActor
останавливает сначала планировщикstockTick
, а затем и себя.
UserActor
UserActor является подписчиком на сообщения StockUpdate
and StockHistory
от актора StockActor
, и содержит out-канал WebSocket
а, используемого для коммуникации с клиентом; UserActor
может послать 2 вида JSON сообщений, а именно: stockupdate
- который содержит символ котировки и значение цены, и stockhistory
- который содержит опять-таки символ и массив из 50 последних значений котировки.
- Первое, что
UserActor
делает после инициализации - он вычитывает список исходных котировок из конфигурационного параметра"default.stocks"
и высылает сообщениеWatchStock
для каждого из вычитанных символов - чтобы подписаться на котировки из актораStocksActor
. - Если
UserActor
получает сообщениеStockUpdate
от одного изStockActor
ов,UserActor
конвертирует сообщение в другое, JSON-сообщениеstockupdate
, которое понимает клиент, и отсылает его через out-каналWebSocket
а - т.е. проталкивает (push) его в клиентскую часть. - Когда
UserActor
получает сообщениеStockHistory
от одного изStockActor
ов,UserActor
затем конвертирует его в JSON-сообщениеstockhistory
, в котором, помимо символа, хранится массив последних значений цены и записывает его в выходной каналWebSocket
а.