ES / CQRS обработка параллелизма

20

Недавно я начал погружаться в CQRS / ES, потому что мне может понадобиться применить его на работе. Это кажется очень многообещающим в нашем случае, поскольку это решило бы много проблем.

Я набросал свое грубое понимание того, как приложение ES / CQRS должно выглядеть в контексте с упрощенным банковским сценарием (снятие денег).

ES / CQRS

Чтобы подвести итог, если человек А снимает деньги:

  • команда выдана
  • команда передана для проверки / подтверждения
  • событие передается в хранилище событий, если проверка прошла успешно
  • агрегатор удаляет событие, чтобы применить изменения к агрегату

Из того, что я понял, журнал событий является источником правды, так как это журнал ФАКТОВ, мы можем затем извлечь из него любую проекцию.


Теперь, что я не понимаю, в этой великой схеме вещей, что происходит в этом случае:

  • Правило: баланс не может быть отрицательным
  • человек А имеет баланс 100e
  • человек A выдает команду на снятие 100e
  • проходит валидация и отправляется событие MoneyWithdrewEvent 100e
  • тем временем, человек А выпускает другую команду WithdrawCommand со значением 100e
  • первый MoneyWithdrewEvent еще не был агрегирован, поэтому проверка проходит успешно, так как проверка проверки по совокупности (которая еще не была обновлена)
  • MoneyWithdrewEvent of 100e выдается в другой раз

==> Мы находимся в несогласованном состоянии с балансом -100e, а журнал содержит 2 MoneyWithdrewEvent

Как я понимаю, есть несколько стратегий, чтобы справиться с этой проблемой:

  • a) поместите агрегированный идентификатор версии вместе с событием в хранилище событий, чтобы при несоответствии версий при модификации ничего не происходило
  • б) использовать некоторые стратегии блокировки, подразумевая, что уровень проверки должен каким-то образом создать

Вопросы, связанные со стратегиями:

  • а) В этом случае журнал событий больше не является источником правды, как с этим бороться? Кроме того, мы вернулись к клиенту ОК, тогда как было разрешено снятие средств, лучше ли в этом случае использовать блокировки?
  • б) замки == тупики, есть ли у вас какие-либо идеи о лучших практиках?

В целом, правильно ли я понимаю, как справиться с параллелизмом?

Примечание: я понимаю, что один и тот же человек, снимающий два раза деньги за такое короткое время, невозможен, но я взял простой пример, чтобы не потеряться в деталях

Луи Ф.
источник
Почему бы не обновить агрегат на шаге 4 вместо ожидания до шага 7?
Эрик Эйдт
То есть вы имеете в виду, что в этом случае хранилище событий - это просто журнал, который читается только при запуске приложения для воссоздания агрегатов / других проекций?
Луи Ф.

Ответы:

19

Я набросал свое грубое понимание того, как приложение ES / CQRS должно выглядеть в контексте с упрощенным банковским сценарием (снятие денег).

Это прекрасный пример приложения, полученного из событий. Давайте начнем.

Каждый раз, когда команда обрабатывается или повторяется (вы поймете, наберитесь терпения), выполняются следующие шаги:

  1. команда достигает обработчика команды, то есть службы в Application layer.
  2. то команда обработчик идентифицирует Aggregateи загружает его из хранилища (в данном случае загрузка выполняются new-ный собой Aggregateэкземпляр, выборка все ранее испускаемых событий этого агрегата и повторно применять их к самой совокупности; Совокупная версия хранятся позднее использование; после применения событий Агрегат находится в своем окончательном состоянии - т. е. текущий баланс счета рассчитывается как число)
  3. обработчик команды вызывает соответствующий метод на Aggregate, как Account::withdrawMoney(100)и собирает Поддавшись события, то есть MoneyWithdrewEvent(AccountId, 100); если на счете недостаточно денег (баланс <100), возникает исключение, и все отменяется; в противном случае выполняется следующий шаг.
  4. обработчик команд пытается сохранить в Aggregateхранилище (в этом случае хранилище является Event Store); он делает это, добавляя новые события в Event streamif и only, если the versionиз Aggregateних все еще тот, который был при Aggregateзагрузке. Если версия не совпадает, то команда повторяется - перейдите к шагу 1 . Если versionэто то же самое, то события добавляются к, Event streamи клиенту предоставляется Successстатус.

Эта проверка версии называется оптимистической блокировкой и представляет собой общий механизм блокировки. Еще одним механизмом является пессимистическая блокировка, когда другие записи блокируются (как в не начатой), пока не завершится текущая.

Термин Event streamявляется абстракцией вокруг всех событий, которые были выпущены одним и тем же Агрегатом.

Вы должны понимать, что Event storeэто просто еще один вид постоянства, где хранятся все изменения в Агрегате, а не только конечное состояние.

а) В этом случае журнал событий больше не является источником правды, как с этим бороться? Кроме того, мы вернулись к клиенту ОК, тогда как было разрешено снятие средств, лучше ли в этом случае использовать блокировки?

Магазин событий всегда является источником правды.

б) замки == тупики, есть ли у вас какие-либо идеи о лучших практиках?

При использовании оптимистической блокировки у вас нет блокировок, просто повторная команда.

В любом случае, замки! = Тупики

Константин Гальбену
источник
2
Существует некоторая оптимизация в отношении загрузки места, Aggregateгде вы не применяете все события, но сохраняете моментальный снимок Aggregateдо определенного момента в прошлом и применяете только события, которые произошли после этого момента.
Константин Гальбену
Хорошо, я думаю, что мое замешательство связано с тем фактом, что хранилище событий == шина событий (я имею в виду kafka), поэтому перестройка агрегата может быть дорогостоящей, так как вам может понадобиться перечитать МНОГО событий. В случае наличия снимка Aggregate, когда снимок должен быть обновлен? Хранилище снимков совпадает с хранилищем событий или это материализованное представление, полученное из шины событий?
Луи Ф.
Существует несколько стратегий создания снимка. Одним из них является создание снимка каждые n событий. Вы должны хранить снимок вместе с событиями в том же месте / постоянстве / базе данных в том же коммите. Идея заключается в том, что снимок сильно связан с версией Агрегата.
Константин Гальбену
Хорошо, я думаю, у меня есть более четкое видение того, как справиться с этим. Теперь последний вопрос, какова роль событийного автобуса в конце? Если агрегаты обновляются синхронно?
Луи Ф.
1
Да, вы можете использовать RabbitMQ или любой другой канал, по которому вы хотите отправлять события в считанные модели асинхронно, но только после того, как вы сохраните их в хранилище событий. Действительно, после сохранения они не проверяются: события представляют собой факты, которые произошли; Модель чтения может или не может понравиться, что что-то произошло, но это не может изменить историю.
Константин Гальбену
1

Я набросал свое грубое понимание того, как приложение ES / CQRS должно выглядеть в контексте с упрощенным банковским сценарием (снятие денег).

Близко. Проблема в том, что логика обновления вашего «агрегата» находится в странном месте.

Более обычная реализация состоит в том, что модель данных, которую ваш обработчик команд хранит в памяти, и поток событий в хранилище событий синхронизируются.

Простой пример для описания - это случай, когда обработчик команд выполняет синхронные записи в хранилище событий и обновляет свою локальную копию модели, если соединение с хранилищем событий указывает на успешную запись.

Если обработчику команд необходимо выполнить повторную синхронизацию с хранилищем событий (поскольку его внутренняя модель не соответствует хранилищу событий), он делает это, загружая историю из хранилища и восстанавливая свое собственное внутреннее состояние.

Другими словами, стрелки 2 и 3 (если они присутствуют) обычно связаны с хранилищем событий, а не с хранилищем агрегатов.

поместите агрегированный идентификатор версии вместе с событием в хранилище событий, чтобы при несоответствии версий при модификации ничего не происходило

Вариации этого являются обычным случаем - вместо добавления к потоку в потоке событий мы обычно помещаем PUT в определенное место в потоке; если эта операция несовместима с состоянием хранилища, запись завершается неудачно, и служба может выбрать подходящий режим сбоя (отказ клиента, повтор, объединение ....). Использование идемпотентной записи решает ряд проблем в распределенном обмене сообщениями, но, конечно, это требует наличия хранилища, которое поддерживает идемпотентную запись.

VoiceOfUnreason
источник
Хм, я думаю, что неправильно понял компонент хранилища событий. Я думал, что все должно пройти через это и получить поток. Что, если мой магазин событий - это кафка, и он доступен только для чтения? Я не могу позволить себе на шаге 2 и 3 снова просмотреть все сообщения. Кажется, что в целом мое видение соответствовало этому: medium.com/technology-learning/…
Луи Ф.