Потоки vs акторы - II
Преимущества и особенности использования акторов
####Отсутствие разделяемой памяти####
Использование многопоточности с передачей сообщений снимает проблемы, связанные с использованием разделяемой памяти. Модифицируемое состояние, которое содержится в акторе, недоступно напрямую другим акторам. Поскольку акторы обрабатывают только одно сообщение за раз, то мы автоматически получаем потокобезопаcность.
####Управление потоками####
Акторы, как и Executor
‘ы, частично снимают с нас ответственность за создание и управление потоками. Работая с акторами, мы знаем только о жизненном цикле актора. Например, мы можем организовать распределение задач по ядрам, назначив по актору на ядро процессора и выделив маршрутизатор, который бы распределял нагрузку среди группы акторов. Тем более что сделать это очень просто - например, пусть у нас есть класс актора SimpleActor
, принимающий на вход некоторый простейший Message
, содержащий строку. Ниже приведен пример создания 10 акторов с роутером RoundRobinRouter
- и затем отсылающий 10 сообщений этому роутеру. Как результат, роутер распределит по кругу сообщения акторам. Как видно, роутер - это тоже актор, который перенаправляет сообщения другим, стоящим за ним акторам, которые называются ‘routees’ - т.е. акторы, для которых осуществляется маршрутизация.
val system = ActorSystem("SimpleSystem")
val simpleRouted = system.actorOf(Props[SimpleActor].withRouter(
RoundRobinRouter(nrOfInstances = 10)
), name = "simpleRoutedActor")
for (n <- 1 until 10) simpleRouted ! Message("Hello, Akka #%d!".format(n))
Если же нам нужно создать пул акторов, которые бы разгребали очередь сообщений и работали наподобие ForkJoin
пула, то мы можем создать диспетчер (dispatcher) с перехватом работы (Work Stealing Dispatcher), который базируется на ForkJoinPool
, и осуществляет балансировку загрузки для тех акторов, для которых он используется:
val workStealingDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
.workStealingDispatcher
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(8)
.buildThreadPool
И вообще, в Akka есть разные виды диспетчеров: событийно-ориентированные, потоко-ориентированные, с балансировкой, а-ля ForkJoin
и др.
####Надежность####
Акторы помогают улучшить надежность системы. Например, мы можем настроить стратегию супервизора таким образом, что упавший актор будет перезапущен, и решение об этом может принимать актор-родитель, а необязательно непосредственно сам актор. Например, пусть у нас есть 3 актора в иерархии, родитель, дочерний актор и дочерний актор дочернего актора. Родитель переопределяет supervisorStrategy
:
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries =
10, withinTimeRange = 1 minute) {
case _: ConfigurationException => Stop
case _: Exception => Restart
}
Дочерний актор и его собственный дочерний актор тоже переопределяет supervisorStrategy
, но делает это по-другому:
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries =
10, withinTimeRange = 1 minute) {
case _: ConfigurationException => Escalate
case _: Exception => Restart
}
В случае, если в дочернем акторе или его дочернем акторе выбрасывается исключение ConfigurationException
, оно будет проброшено в корневой актор-родитель, который остановит оба подчиненных актора.
####Распределенность####
Akka довольно легко конфигурируется для работы в распределенном варианте, при этом в коде мало что меняется, все взаимодействия по-прежнему остаются все той же передачей сообщений. Т.е. прозрачность в этом отношении такова, что мы используем один и тот же API, будь это система акторов внутри одной JVM или кластер из тысячи машин. Все решает конфигурация, и еще нужно добавить один дополнительный jar - akka-remote
. Как минимум, в файле application.conf
мы должны указать нечто вроде:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
Т.е. вкратце, для перехода от нераспределенной конфигурации к распределенной, нужно:
-
изменить
akka.actor.LocalActorRefProvider
наakka.remote.RemoteActorRefProvider
. -
Добавить имя хоста - это имя будет использоваться для дальнейшей идентификации данной системы акторов.
-
Указать номер порта, который будет слушать данная система акторов.
И затем получить уже существующий актор на удаленном хосте или создать оный можно соотвественно с помощью методов actorSelection(path)
или actorOf(Props(...), actorName)
####Управление сложными изменениями состояний####
При моделировании системы с большим количеством переходов из одного состояния в другое, которое возникает при нетривиальном взаимодействии в многопоточной системе, могут помочь конечные автоматы (Finite State Machines), поддержка которых есть в Akka. Вообще, конечные автоматы позволяют реализовать поддержку набора сложных правил и условий. Конечные автоматы позволяют структурировать процесс управления состоянием акторов и запрограмировать сложную логику их взимодействия. Вместо того, чтобы использовать все переменные приложения в качестве расширенного определения его состояния, конечный автомат создает единственную переменную, в которой хранится информация о состоянии приложения. Такой переменной обычно является элемент перечисления некоторого множества действительных состояний.
Мощь подхода, использующего конечные автоматы, обусловлена тем, что он позволяет в явном виде определить действительные состояния для некоторого аспекта приложения и задать соответствующие варианты поведения при переходах приложения из одного состояния в другое.
Например, пусть дан поток нулей и единиц, есть таблица переходов:
| input
| 0 | 1
state --------
s0 | s0 | s1
s1 | s0 | s2
s2 | s0 | s3
Актор, реализующий эту таблицу переходов с помощью Akka FSM, будет выглядеть приблизительно так:
class CounterActor extends Actor with ActorLogging {
override def receive =
s0 // Define initial state of Actor
def s0: PartialFunction[Any, Unit] = {
case ONE => context.become(s1) // move to `s1` state upon receiving a One
case ZERO => () // do nothing upon receipt of a Zero
case TerminateStream => sender ! State0 // reply to whomever sent this messag, i.e. TerminateStream,
// that the current state is 'State0'
case badInput => log.error(makeErrorMsg(badInput))
}
def s1: PartialFunction[Any, Unit] = {
case ONE => context.become(s2)
case ZERO => context.become(s0)
case TerminateStream => sender ! State1; context.become(s0)
case badInput => log.error(makeErrorMsg(badInput))
}
def s2: PartialFunction[Any, Unit] = {
case ONE => context.become(s3)
case ZERO => context.become(s0)
case TerminateStream => sender ! State2; context.become(s0)
case badInput => log.error(makeErrorMsg(badInput))
}
Если все-таки нужно использовать разделяемую память
Память у потоков в пределах одного процесса является общей, так что использование разделяемой памяти для обмена данными между ними не является столь уж противоестственным, к тому же иногда это эффективнее с точки зрения производительности. Но, разумеется, использование разделяемой памяти имеет определенные сложности, о которых говорилось ранее - необходимость синхронизации для устранения конфликтов доступа.
Здесь, конечно, многие проблемы может решить уже рассмотренный подход, основанный на передаче сообщений. К сожалению, Message passing concurrency, хоть и является подходом, который может применятся очень широко, иногда не является вполне соответствущим решаемой задаче. При многопоточном программировании, управляемом данными, в центре всего находятся данные, на изменения в которых должны вызываться обработчики. В этом случае передача сообщений не совсем подходит.
Затем, нужно понимать, что Message passing concurrency является по своей природе асинхронным. Если нам нужна синхронность, то использование асинхронных сообщений лишь добавляет лишнего кода, и выигрыш может быть не совсем очевиден, поскольку решение по своей сложности может приблизиться к многопоточному решению с использованием блокировок (поскольку для эмуляции синхронности, там, где в обчыном случае у нас был бы просто вызов метода, мы должы послать сообщение и организовать обработку ответного сообщения).
Также, при использовании Message passing concurrency могут возникать свои специфические проблемы, а именно:
-
Если мы имеем цепочку сообщений, передаваемых от актора к актору и приводящих к изменению их состояния, но при этом мы должны быть уверены, что вся цепочка дошла до адресатов и необходимые изменения были произведены, т.е. фактически мы говорим о транзакции.
-
Если мы должны обращаться к одной структуре данных из нескольких акторов.
-
Если у нас есть хранилище, которое менятся из нескольких акторов.
Компромиссным подходом, призванным частично решить проблемы многопоточного программирования с использованием разделяемой памяти, являтся т.н. “Программная транзакционная память” (Software Transactional Memory, или STM). Первая реализация данного подхода была осуществлена в Haskell. Сам подход зарекомендовал себя как сравнительно надежный и простой в использовании. Доступ к разделяемой памяти, т.е. чтение или запись, осуществляется в т.н. “атомарных” (atomic) блоках.
В Akka есть поддержка этой парадигмы. Akka STM фактически превращает динамическую память JVM в транзакционный источник данных с семантикой begin/commit/rollback
- очень похоже на обычную транзакционную БД, только без хранилища. Можно сказать, что она поддерживает три первые буквы в слове ACID;
-
Атомарность (atomicity): или все изменения, сделанные во премя выполнения транзакции, применяются, или ни одно из них. Это касается только транзакционных структур данных.
-
Согласованность (consistency): транзакция должна оставить данные в согласованном состоянии. В Akka уровень изоляции соотвествует уровню SERIALIZABLE в СУБД Oracle.
-
Изолированность (isolation): изменения, сделанные в других транзакциях, не должны быть видны в данной транзакции.
Ниже приведен простой пример изменения целочисленного счетчика использованием STM. Здесь мы создаем Ref
, транзакционую ссылочную переменную, и затем мнеяем ее в контексте транзакции, обозначенной секцией atomic
:
import akka.stm._
val ref = Ref(0)
def counter = atomic {
ref alter (_ + 1)
}
counter
// -> 1
counter
// -> 2