Недавно я начал погружаться в CQRS / ES, потому что мне может понадобиться применить его на работе. Это кажется очень многообещающим в нашем случае, поскольку это решило бы много проблем.
Я набросал свое грубое понимание того, как приложение ES / CQRS должно выглядеть в контексте с упрощенным банковским сценарием (снятие денег).
Чтобы подвести итог, если человек А снимает деньги:
- команда выдана
- команда передана для проверки / подтверждения
- событие передается в хранилище событий, если проверка прошла успешно
- агрегатор удаляет событие, чтобы применить изменения к агрегату
Из того, что я понял, журнал событий является источником правды, так как это журнал ФАКТОВ, мы можем затем извлечь из него любую проекцию.
Теперь, что я не понимаю, в этой великой схеме вещей, что происходит в этом случае:
- Правило: баланс не может быть отрицательным
- человек А имеет баланс 100e
- человек A выдает команду на снятие 100e
- проходит валидация и отправляется событие MoneyWithdrewEvent 100e
- тем временем, человек А выпускает другую команду WithdrawCommand со значением 100e
- первый MoneyWithdrewEvent еще не был агрегирован, поэтому проверка проходит успешно, так как проверка проверки по совокупности (которая еще не была обновлена)
- MoneyWithdrewEvent of 100e выдается в другой раз
==> Мы находимся в несогласованном состоянии с балансом -100e, а журнал содержит 2 MoneyWithdrewEvent
Как я понимаю, есть несколько стратегий, чтобы справиться с этой проблемой:
- a) поместите агрегированный идентификатор версии вместе с событием в хранилище событий, чтобы при несоответствии версий при модификации ничего не происходило
- б) использовать некоторые стратегии блокировки, подразумевая, что уровень проверки должен каким-то образом создать
Вопросы, связанные со стратегиями:
- а) В этом случае журнал событий больше не является источником правды, как с этим бороться? Кроме того, мы вернулись к клиенту ОК, тогда как было разрешено снятие средств, лучше ли в этом случае использовать блокировки?
- б) замки == тупики, есть ли у вас какие-либо идеи о лучших практиках?
В целом, правильно ли я понимаю, как справиться с параллелизмом?
Примечание: я понимаю, что один и тот же человек, снимающий два раза деньги за такое короткое время, невозможен, но я взял простой пример, чтобы не потеряться в деталях
источник
Ответы:
Это прекрасный пример приложения, полученного из событий. Давайте начнем.
Каждый раз, когда команда обрабатывается или повторяется (вы поймете, наберитесь терпения), выполняются следующие шаги:
Application layer
.Aggregate
и загружает его из хранилища (в данном случае загрузка выполняютсяnew
-ный собойAggregate
экземпляр, выборка все ранее испускаемых событий этого агрегата и повторно применять их к самой совокупности; Совокупная версия хранятся позднее использование; после применения событий Агрегат находится в своем окончательном состоянии - т. е. текущий баланс счета рассчитывается как число)Aggregate
, какAccount::withdrawMoney(100)
и собирает Поддавшись события, то естьMoneyWithdrewEvent(AccountId, 100)
; если на счете недостаточно денег (баланс <100), возникает исключение, и все отменяется; в противном случае выполняется следующий шаг.Aggregate
хранилище (в этом случае хранилище являетсяEvent Store
); он делает это, добавляя новые события вEvent stream
if и only, если theversion
изAggregate
них все еще тот, который был приAggregate
загрузке. Если версия не совпадает, то команда повторяется - перейдите к шагу 1 . Еслиversion
это то же самое, то события добавляются к,Event stream
и клиенту предоставляетсяSuccess
статус.Эта проверка версии называется оптимистической блокировкой и представляет собой общий механизм блокировки. Еще одним механизмом является пессимистическая блокировка, когда другие записи блокируются (как в не начатой), пока не завершится текущая.
Термин
Event stream
является абстракцией вокруг всех событий, которые были выпущены одним и тем же Агрегатом.Вы должны понимать, что
Event store
это просто еще один вид постоянства, где хранятся все изменения в Агрегате, а не только конечное состояние.Магазин событий всегда является источником правды.
При использовании оптимистической блокировки у вас нет блокировок, просто повторная команда.
В любом случае, замки! = Тупики
источник
Aggregate
где вы не применяете все события, но сохраняете моментальный снимокAggregate
до определенного момента в прошлом и применяете только события, которые произошли после этого момента.Aggregate
, когда снимок должен быть обновлен? Хранилище снимков совпадает с хранилищем событий или это материализованное представление, полученное из шины событий?Близко. Проблема в том, что логика обновления вашего «агрегата» находится в странном месте.
Более обычная реализация состоит в том, что модель данных, которую ваш обработчик команд хранит в памяти, и поток событий в хранилище событий синхронизируются.
Простой пример для описания - это случай, когда обработчик команд выполняет синхронные записи в хранилище событий и обновляет свою локальную копию модели, если соединение с хранилищем событий указывает на успешную запись.
Если обработчику команд необходимо выполнить повторную синхронизацию с хранилищем событий (поскольку его внутренняя модель не соответствует хранилищу событий), он делает это, загружая историю из хранилища и восстанавливая свое собственное внутреннее состояние.
Другими словами, стрелки 2 и 3 (если они присутствуют) обычно связаны с хранилищем событий, а не с хранилищем агрегатов.
Вариации этого являются обычным случаем - вместо добавления к потоку в потоке событий мы обычно помещаем PUT в определенное место в потоке; если эта операция несовместима с состоянием хранилища, запись завершается неудачно, и служба может выбрать подходящий режим сбоя (отказ клиента, повтор, объединение ....). Использование идемпотентной записи решает ряд проблем в распределенном обмене сообщениями, но, конечно, это требует наличия хранилища, которое поддерживает идемпотентную запись.
источник