From 286dedac76a51f4ddc0f198ba7ff60b1aacd8083 Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Thu, 13 Mar 2025 00:27:44 +0100 Subject: [PATCH] extract projection snapshot logic implement GameStateRepositoryTest add lambda to the GameEventHandler.handle{} to set the version add VersionBuilder add version to the events add creation date to the events rename gameId to aggregateId add EventHandler interface --- .../app/command/GameCommandRunner.kt | 2 +- .../app/command/command/GameCommand.kt | 2 +- .../app/command/command/ICantPlayCommand.kt | 11 +- .../command/IWantToJoinTheGameCommand.kt | 13 +- .../command/command/IWantToPlayCardCommand.kt | 15 +- .../command/command/IamReadyToPlayCommand.kt | 13 +- .../eventDemo/app/event/EventHandler.kt | 13 ++ .../eventDemo/app/event/GameEventHandler.kt | 12 +- .../app/event/event/CardIsPlayedEvent.kt | 11 +- .../eventDemo/app/event/event/GameEvent.kt | 3 +- .../app/event/event/GameStartedEvent.kt | 10 +- .../app/event/event/NewPlayerEvent.kt | 6 +- .../app/event/event/PlayerActionEvent.kt | 2 +- .../app/event/event/PlayerChoseColorEvent.kt | 6 +- .../app/event/event/PlayerHavePassEvent.kt | 6 +- .../app/event/event/PlayerReadyEvent.kt | 6 +- .../app/event/event/PlayerWinEvent.kt | 6 +- .../app/event/projection/GameState.kt | 5 +- .../app/event/projection/GameStateBuilder.kt | 27 +-- .../event/projection/GameStateRepository.kt | 77 ++----- .../app/event/projection/Projection.kt | 8 + .../ProjectionSnapshotRepositoryInMemory.kt | 54 +++++ .../eventListener/ReactionEventListener.kt | 25 ++- .../eventDemo/app/query/ReadTheGameState.kt | 4 +- .../eventDemo/configuration/ConfigureDI.kt | 4 + src/main/kotlin/eventDemo/libs/event/Event.kt | 5 +- .../libs/event/EventStreamInMemory.kt | 4 +- .../eventDemo/libs/event/VersionBuilder.kt | 7 + .../libs/event/VersionBuilderLocal.kt | 11 + .../app/command/GameCommandHandlerTest.kt | 2 + .../event/projection/GameStateBuilderTest.kt | 65 ++++-- .../projection/GameStateRepositoryTest.kt | 126 ++++++++++- ...rojectionSnapshotRepositoryInMemoryTest.kt | 125 +++++++++++ .../eventDemo/app/query/GameStateRouteTest.kt | 26 +-- .../eventDemo/app/query/GameStateTest.kt | 196 +++++++++--------- .../libs/event/VersionBuilderLocalTest.kt | 42 ++++ 36 files changed, 684 insertions(+), 266 deletions(-) create mode 100644 src/main/kotlin/eventDemo/app/event/EventHandler.kt create mode 100644 src/main/kotlin/eventDemo/app/event/projection/Projection.kt create mode 100644 src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt create mode 100644 src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt create mode 100644 src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandRunner.kt b/src/main/kotlin/eventDemo/app/command/GameCommandRunner.kt index 6718df9..602d874 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandRunner.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandRunner.kt @@ -18,7 +18,7 @@ class GameCommandRunner( command: GameCommand, outgoingErrorChannelNotification: SendChannel, ) { - val gameState = gameStateRepository.get(command.payload.gameId) + val gameState = gameStateRepository.getLast(command.payload.aggregateId) val errorNotifier = errorNotifier(command, outgoingErrorChannelNotification) when (command) { diff --git a/src/main/kotlin/eventDemo/app/command/command/GameCommand.kt b/src/main/kotlin/eventDemo/app/command/command/GameCommand.kt index c6d251e..788ecb3 100644 --- a/src/main/kotlin/eventDemo/app/command/command/GameCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/GameCommand.kt @@ -11,7 +11,7 @@ sealed interface GameCommand : Command { @Serializable sealed interface Payload { - val gameId: GameId + val aggregateId: GameId val player: Player } } diff --git a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt index 74a4b25..eab12b9 100644 --- a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt @@ -20,7 +20,7 @@ data class ICantPlayCommand( @Serializable data class Payload( - override val gameId: GameId, + override val aggregateId: GameId, override val player: Player, ) : GameCommand.Payload @@ -37,13 +37,14 @@ data class ICantPlayCommand( if (playableCards.isEmpty()) { val takenCard = state.deck.stack.first() - eventHandler.handle( + eventHandler.handle { PlayerHavePassEvent( - gameId = payload.gameId, + aggregateId = payload.aggregateId, player = payload.player, takenCard = takenCard, - ), - ) + version = it, + ) + } } else { playerErrorNotifier("You can and must play one card, like ${playableCards.first()::class.simpleName}") } diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt index 46bd4c5..85b5c92 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt @@ -20,7 +20,7 @@ data class IWantToJoinTheGameCommand( @Serializable data class Payload( - override val gameId: GameId, + override val aggregateId: GameId, override val player: Player, ) : GameCommand.Payload @@ -30,12 +30,13 @@ data class IWantToJoinTheGameCommand( eventHandler: GameEventHandler, ) { if (!state.isStarted) { - eventHandler.handle( + eventHandler.handle { NewPlayerEvent( - payload.gameId, - payload.player, - ), - ) + aggregateId = payload.aggregateId, + player = payload.player, + version = it, + ) + } } else { playerErrorNotifier("The game is already started") } diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt index 6fd91d7..64bd030 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt @@ -21,7 +21,7 @@ data class IWantToPlayCardCommand( @Serializable data class Payload( - override val gameId: GameId, + override val aggregateId: GameId, override val player: Player, val card: Card, ) : GameCommand.Payload @@ -41,13 +41,14 @@ data class IWantToPlayCardCommand( } if (state.canBePlayThisCard(payload.player, payload.card)) { - eventHandler.handle( + eventHandler.handle { CardIsPlayedEvent( - payload.gameId, - payload.card, - payload.player, - ), - ) + aggregateId = payload.aggregateId, + card = payload.card, + player = payload.player, + version = it, + ) + } } else { playerErrorNotifier("You cannot play this card") } diff --git a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt index d68d300..c48a91b 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt @@ -20,7 +20,7 @@ data class IamReadyToPlayCommand( @Serializable data class Payload( - override val gameId: GameId, + override val aggregateId: GameId, override val player: Player, ) : GameCommand.Payload @@ -39,12 +39,13 @@ data class IamReadyToPlayCommand( } else if (playerIsAlreadyReady) { playerErrorNotifier("You are already ready") } else { - eventHandler.handle( + eventHandler.handle { PlayerReadyEvent( - payload.gameId, - payload.player, - ), - ) + aggregateId = payload.aggregateId, + player = payload.player, + version = it, + ) + } } } } diff --git a/src/main/kotlin/eventDemo/app/event/EventHandler.kt b/src/main/kotlin/eventDemo/app/event/EventHandler.kt new file mode 100644 index 0000000..cfe56a5 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/event/EventHandler.kt @@ -0,0 +1,13 @@ +package eventDemo.app.event + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event + +/** + * A stream to publish and read the played card event. + */ +interface EventHandler, ID : AggregateId> { + fun registerProjectionBuilder(builder: (E) -> Unit) + + fun handle(buildEvent: (version: Int) -> E): E +} diff --git a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt index bc208c4..cd9c8bc 100644 --- a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt +++ b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt @@ -1,6 +1,8 @@ package eventDemo.app.event +import eventDemo.app.entity.GameId import eventDemo.app.event.event.GameEvent +import eventDemo.libs.event.VersionBuilder /** * A stream to publish and read the played card event. @@ -8,20 +10,20 @@ import eventDemo.app.event.event.GameEvent class GameEventHandler( private val eventBus: GameEventBus, private val eventStream: GameEventStream, -) { + private val versionBuilder: VersionBuilder, +) : EventHandler { private val projectionsBuilders: MutableList<(GameEvent) -> Unit> = mutableListOf() - fun registerProjectionBuilder(builder: GameProjectionBuilder) { + override fun registerProjectionBuilder(builder: GameProjectionBuilder) { projectionsBuilders.add(builder) } - fun handle(vararg events: GameEvent) { - events.forEach { event -> + override fun handle(buildEvent: (version: Int) -> GameEvent): GameEvent = + buildEvent(versionBuilder.buildNextVersion()).also { event -> eventStream.publish(event) projectionsBuilders.forEach { it(event) } eventBus.publish(event) } - } } typealias GameProjectionBuilder = (GameEvent) -> Unit diff --git a/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt b/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt index 6b775c5..1c38d6d 100644 --- a/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt @@ -3,15 +3,20 @@ package eventDemo.app.event.event import eventDemo.app.entity.Card import eventDemo.app.entity.GameId import eventDemo.app.entity.Player +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * An [GameEvent] to represent a played card. */ data class CardIsPlayedEvent( - override val gameId: GameId, + override val aggregateId: GameId, val card: Card, override val player: Player, - override val eventId: UUID = UUID.randomUUID(), + override val version: Int, ) : GameEvent, - PlayerActionEvent + PlayerActionEvent { + override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() +} diff --git a/src/main/kotlin/eventDemo/app/event/event/GameEvent.kt b/src/main/kotlin/eventDemo/app/event/event/GameEvent.kt index cacf37a..8c5203f 100644 --- a/src/main/kotlin/eventDemo/app/event/event/GameEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/GameEvent.kt @@ -11,5 +11,6 @@ import java.util.UUID @Serializable sealed interface GameEvent : Event { override val eventId: UUID - override val gameId: GameId + override val aggregateId: GameId + override val version: Int } diff --git a/src/main/kotlin/eventDemo/app/event/event/GameStartedEvent.kt b/src/main/kotlin/eventDemo/app/event/event/GameStartedEvent.kt index 78f3c95..48565e3 100644 --- a/src/main/kotlin/eventDemo/app/event/event/GameStartedEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/GameStartedEvent.kt @@ -4,26 +4,31 @@ import eventDemo.app.entity.Deck import eventDemo.app.entity.GameId import eventDemo.app.entity.Player import eventDemo.app.entity.initHands +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * This [GameEvent] is sent when all players are ready. */ data class GameStartedEvent( - override val gameId: GameId, + override val aggregateId: GameId, val firstPlayer: Player, val deck: Deck, + override val version: Int, ) : GameEvent { override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() companion object { fun new( id: GameId, players: Set, shuffleIsDisabled: Boolean = isDisabled, + version: Int, ): GameStartedEvent = GameStartedEvent( - gameId = id, + aggregateId = id, firstPlayer = if (shuffleIsDisabled) players.first() else players.random(), deck = Deck @@ -31,6 +36,7 @@ data class GameStartedEvent( .let { if (shuffleIsDisabled) it else it.shuffle() } .initHands(players) .placeFirstCardOnDiscard(), + version = version, ) } } diff --git a/src/main/kotlin/eventDemo/app/event/event/NewPlayerEvent.kt b/src/main/kotlin/eventDemo/app/event/event/NewPlayerEvent.kt index db51e3b..0d314f8 100644 --- a/src/main/kotlin/eventDemo/app/event/event/NewPlayerEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/NewPlayerEvent.kt @@ -2,14 +2,18 @@ package eventDemo.app.event.event import eventDemo.app.entity.GameId import eventDemo.app.entity.Player +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * An [GameEvent] to represent a new player joining the game. */ data class NewPlayerEvent( - override val gameId: GameId, + override val aggregateId: GameId, val player: Player, + override val version: Int, ) : GameEvent { override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt index 8c993d6..ffe9e41 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt @@ -2,6 +2,6 @@ package eventDemo.app.event.event import eventDemo.app.entity.Player -sealed interface PlayerActionEvent { +sealed interface PlayerActionEvent : GameEvent { val player: Player } diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt index b192bf3..9adff0f 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt @@ -3,16 +3,20 @@ package eventDemo.app.event.event import eventDemo.app.entity.Card import eventDemo.app.entity.GameId import eventDemo.app.entity.Player +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * This [GameEvent] is sent when a player chose a color. */ data class PlayerChoseColorEvent( - override val gameId: GameId, + override val aggregateId: GameId, override val player: Player, val color: Card.Color, + override val version: Int, ) : GameEvent, PlayerActionEvent { override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt index 1396170..14d30fc 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt @@ -3,16 +3,20 @@ package eventDemo.app.event.event import eventDemo.app.entity.Card import eventDemo.app.entity.GameId import eventDemo.app.entity.Player +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * This [GameEvent] is sent when a player can play. */ data class PlayerHavePassEvent( - override val gameId: GameId, + override val aggregateId: GameId, override val player: Player, val takenCard: Card, + override val version: Int, ) : GameEvent, PlayerActionEvent { override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerReadyEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerReadyEvent.kt index ff5de53..f912bb9 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerReadyEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerReadyEvent.kt @@ -2,14 +2,18 @@ package eventDemo.app.event.event import eventDemo.app.entity.GameId import eventDemo.app.entity.Player +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * This [GameEvent] is sent when a player is ready. */ data class PlayerReadyEvent( - override val gameId: GameId, + override val aggregateId: GameId, val player: Player, + override val version: Int, ) : GameEvent { override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerWinEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerWinEvent.kt index 933f7b3..10aa237 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerWinEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerWinEvent.kt @@ -2,14 +2,18 @@ package eventDemo.app.event.event import eventDemo.app.entity.GameId import eventDemo.app.entity.Player +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.UUID /** * This [GameEvent] is sent when a player is ready. */ data class PlayerWinEvent( - override val gameId: GameId, + override val aggregateId: GameId, val player: Player, + override val version: Int, ) : GameEvent { override val eventId: UUID = UUID.randomUUID() + override val createdAt: Instant = Clock.System.now() } diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameState.kt b/src/main/kotlin/eventDemo/app/event/projection/GameState.kt index b0f2d33..d3de8e1 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameState.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameState.kt @@ -8,7 +8,8 @@ import kotlinx.serialization.Serializable @Serializable data class GameState( - val gameId: GameId, + override val aggregateId: GameId, + override val lastEventVersion: Int = 0, val players: Set = emptySet(), val currentPlayerTurn: Player? = null, val cardOnCurrentStack: LastCard? = null, @@ -18,7 +19,7 @@ data class GameState( val deck: Deck = Deck(players), val isStarted: Boolean = false, val playerWins: Set = emptySet(), -) { +) : Projection { @Serializable data class LastCard( val card: Card, diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt index 8358e72..4fd6fd0 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt @@ -23,27 +23,28 @@ fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState { } fun Collection.buildStateFromEvents(): GameState { - val gameId = this.firstOrNull()?.gameId ?: error("Cannot build GameState from an empty list") + val gameId = this.firstOrNull()?.aggregateId ?: error("Cannot build GameState from an empty list") return fold(GameState(gameId)) { state, event -> state.apply(event) } } -fun GameState.apply(event: GameEvent): GameState = - let { state -> +fun GameState?.apply(event: GameEvent): GameState = + (this ?: GameState(event.aggregateId)).let { state -> val logger = KotlinLogging.logger { } if (event is PlayerActionEvent) { if (state.currentPlayerTurn != event.player) { logger.atError { - message = "Inconsistent player turn. CurrentPlayerTurn: $currentPlayerTurn | Player: ${event.player}" + message = "Inconsistent player turn. CurrentPlayerTurn: $state.currentPlayerTurn | Player: ${event.player}" payload = mapOf( - "CurrentPlayerTurn" to (currentPlayerTurn ?: "No currentPlayerTurn"), + "CurrentPlayerTurn" to (state.currentPlayerTurn ?: "No currentPlayerTurn"), "Player" to event.player, ) } } } + when (event) { is CardIsPlayedEvent -> { val nextDirectionAfterPlay = @@ -60,9 +61,9 @@ fun GameState.apply(event: GameEvent): GameState = val currentPlayerAfterThePlay = if (event.card is Card.AllColorCard) { - currentPlayerTurn + state.currentPlayerTurn } else { - nextPlayer(nextDirectionAfterPlay) + state.nextPlayer(nextDirectionAfterPlay) } state.copy( @@ -98,14 +99,14 @@ fun GameState.apply(event: GameEvent): GameState = logger.error { "taken card is not ot top of the stack: ${event.takenCard}" } } state.copy( - currentPlayerTurn = nextPlayerTurn, + currentPlayerTurn = state.nextPlayerTurn, deck = state.deck.takeOneCardFromStackTo(event.player), ) } is PlayerChoseColorEvent -> { state.copy( - currentPlayerTurn = nextPlayerTurn, + currentPlayerTurn = state.nextPlayerTurn, colorOnCurrentStack = event.color, ) } @@ -121,9 +122,11 @@ fun GameState.apply(event: GameEvent): GameState = } is PlayerWinEvent -> { - copy( - playerWins = playerWins + event.player, + state.copy( + playerWins = state.playerWins + event.player, ) } - } + }.copy( + lastEventVersion = event.version, + ) } diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt index e71140b..8e07cb5 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt @@ -4,73 +4,32 @@ import eventDemo.app.entity.GameId import eventDemo.app.event.GameEventHandler import eventDemo.app.event.GameEventStream import eventDemo.app.event.event.GameEvent -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicInteger class GameStateRepository( private val eventStream: GameEventStream, eventHandler: GameEventHandler, - private val maxSnapshotCacheSize: Int = 20, + maxSnapshotCacheSize: Int = 20, ) { - private val projections: ConcurrentHashMap = ConcurrentHashMap() - private val version: AtomicInteger = AtomicInteger(0) - private val projectionsSnapshot: ConcurrentHashMap = ConcurrentHashMap() - private val sortedSnapshotByVersion: ConcurrentHashMap = ConcurrentHashMap() + private val projectionsSnapshot = + ProjectionSnapshotRepositoryInMemory( + applyToProjection = GameState?::apply, + maxSnapshotCacheSize = maxSnapshotCacheSize, + ) init { eventHandler.registerProjectionBuilder { event -> - val projection = projections[event.gameId] - if (projection == null) { - event - .buildStateFromEventStreamTo(eventStream) - .update() - } else { - projection - .apply(event) - .also { projections[it.gameId] = it } - .also { state -> - val newVersion = version.addAndGet(1) - saveSnapshot(event, state, newVersion) - removeOldSnapshot() - } - } + projectionsSnapshot.applyAndPutToCache(event) } } - private fun removeOldSnapshot() { - if (projectionsSnapshot.size > maxSnapshotCacheSize) { - val numberToRemove = projectionsSnapshot.size - maxSnapshotCacheSize - sortedSnapshotByVersion - .toList() - .sortedBy { it.second } - .take(numberToRemove) - .toMap() - .keys - .forEach { event -> - sortedSnapshotByVersion.remove(event) - projectionsSnapshot.remove(event) - } - } - } - - private fun saveSnapshot( - event: GameEvent, - state: GameState, - newVersion: Int, - ) { - projectionsSnapshot[event] = state - sortedSnapshotByVersion[event] = newVersion - } - /** * Get the last version of the [GameState] from the all eventStream. * * It fetches it from the local cache if possible, otherwise it builds it. */ - fun get(gameId: GameId): GameState = - projections.computeIfAbsent(gameId) { - gameId.buildStateFromEventStream(eventStream) - } + fun getLast(gameId: GameId): GameState = + projectionsSnapshot.getLast(gameId) + ?: gameId.buildStateFromEventStream(eventStream) /** * Get the [GameState] to the specific [event][GameEvent]. @@ -79,17 +38,7 @@ class GameStateRepository( * It fetches it from the local cache if possible, otherwise it builds it. */ fun getUntil(event: GameEvent): GameState = - projectionsSnapshot.computeIfAbsent(event) { - event.buildStateFromEventStreamTo(eventStream) - } - - private fun GameState.update() { - projections[gameId] = this - } - - /** - * Build the state to the specific event - */ - private fun GameEvent.buildStateFromEventStreamTo(eventStream: GameEventStream): GameState = - run { eventStream.readAll(gameId).takeWhile { it != this } + this }.buildStateFromEvents() + projectionsSnapshot.getUntil(event) + ?: (eventStream.readAll(event.aggregateId).takeWhile { it != event } + event) + .buildStateFromEvents() } diff --git a/src/main/kotlin/eventDemo/app/event/projection/Projection.kt b/src/main/kotlin/eventDemo/app/event/projection/Projection.kt new file mode 100644 index 0000000..9c4460e --- /dev/null +++ b/src/main/kotlin/eventDemo/app/event/projection/Projection.kt @@ -0,0 +1,8 @@ +package eventDemo.app.event.projection + +import eventDemo.libs.event.AggregateId + +interface Projection { + val aggregateId: ID + val lastEventVersion: Int +} diff --git a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt new file mode 100644 index 0000000..cbebd20 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -0,0 +1,54 @@ +package eventDemo.app.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import java.util.concurrent.ConcurrentHashMap + +class ProjectionSnapshotRepositoryInMemory, P : Projection, ID : AggregateId>( + private val maxSnapshotCacheSize: Int = 20, + private val applyToProjection: P?.(event: E) -> P, +) { + private val projectionsSnapshot: ConcurrentHashMap = ConcurrentHashMap() + + fun applyAndPutToCache(event: E): P { + // lock here + return projectionsSnapshot + .filterKeys { it.aggregateId == event.aggregateId } + .toList() + .find { (e, _) -> e.version == (event.version - 1) } + ?.second + .applyToProjection(event) + .also { projectionsSnapshot.put(event, it) } + .also { removeOldSnapshot() } + // Unlock here + } + + private fun removeOldSnapshot() { + if (projectionsSnapshot.size > maxSnapshotCacheSize) { + val numberToRemove = projectionsSnapshot.size - maxSnapshotCacheSize + + projectionsSnapshot + .keys + .sortedBy { it.version } + .take(numberToRemove) + .forEach { event -> + projectionsSnapshot.remove(event) + } + } + } + + /** + * Get the last version of the [Projection] from the cache. + */ + fun getLast(aggregateId: ID): P? = + projectionsSnapshot + .filter { it.key.aggregateId == aggregateId } + .maxByOrNull { (event, _) -> event.version } + ?.value + + /** + * Get the [Projection] to the specific [event][Event]. + * It does not contain the [events][Event] it after this one. + */ + fun getUntil(event: E): P? = projectionsSnapshot.get(event) +} diff --git a/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt b/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt index 07ef376..adb19c8 100644 --- a/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt +++ b/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt @@ -36,10 +36,13 @@ class ReactionEventListener( ) { if (state.isReady && !state.isStarted) { val reactionEvent = - GameStartedEvent.new( - state.gameId, - state.players, - ) + eventHandler.handle { + GameStartedEvent.new( + id = state.aggregateId, + players = state.players, + version = it, + ) + } logger.atInfo { message = "Reaction event was Send $reactionEvent on reaction of: $event" payload = @@ -48,7 +51,6 @@ class ReactionEventListener( "reactionEvent" to reactionEvent, ) } - eventHandler.handle(reactionEvent) } else { if (event is PlayerReadyEvent) { logger.info { "All players was not ready ${state.readyPlayers}" } @@ -63,10 +65,14 @@ class ReactionEventListener( val winner = state.playerHasNoCardLeft().firstOrNull() if (winner != null) { val reactionEvent = - PlayerWinEvent( - state.gameId, - winner, - ) + eventHandler.handle { + PlayerWinEvent( + aggregateId = state.aggregateId, + player = winner, + version = it, + ) + } + logger.atInfo { message = "Reaction event was Send $reactionEvent on reaction of: $event" payload = @@ -75,7 +81,6 @@ class ReactionEventListener( "reactionEvent" to reactionEvent, ) } - eventHandler.handle(reactionEvent) } } } diff --git a/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt b/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt index 06d984e..73f2871 100644 --- a/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt +++ b/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt @@ -39,7 +39,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) { // Read the last played card on the game. get { body -> gameStateRepository - .get(body.game.id) + .getLast(body.game.id) .cardOnCurrentStack ?.card ?.let { call.respond(it) } @@ -48,7 +48,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) { // Read the last played card on the game. get { body -> - val state = gameStateRepository.get(body.game.id) + val state = gameStateRepository.getLast(body.game.id) call.respond(state) } } diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt index 88083fa..8198eaa 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt @@ -11,9 +11,12 @@ import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.libs.command.CommandStreamChannelBuilder import eventDemo.libs.event.EventBusInMemory import eventDemo.libs.event.EventStreamInMemory +import eventDemo.libs.event.VersionBuilder +import eventDemo.libs.event.VersionBuilderLocal import io.ktor.server.application.Application import io.ktor.server.application.install import org.koin.core.module.dsl.singleOf +import org.koin.dsl.bind import org.koin.dsl.module import org.koin.ktor.plugin.Koin import org.koin.logger.slf4jLogger @@ -40,6 +43,7 @@ val appKoinModule = CommandStreamChannelBuilder() } + singleOf(::VersionBuilderLocal) bind VersionBuilder::class singleOf(::GameEventHandler) singleOf(::GameCommandRunner) singleOf(::GameCommandHandler) diff --git a/src/main/kotlin/eventDemo/libs/event/Event.kt b/src/main/kotlin/eventDemo/libs/event/Event.kt index 1b9596b..46b5ca2 100644 --- a/src/main/kotlin/eventDemo/libs/event/Event.kt +++ b/src/main/kotlin/eventDemo/libs/event/Event.kt @@ -1,5 +1,6 @@ package eventDemo.libs.event +import kotlinx.datetime.Instant import java.util.UUID /** @@ -16,5 +17,7 @@ interface AggregateId { */ interface Event { val eventId: UUID - val gameId: ID + val aggregateId: ID + val createdAt: Instant + val version: Int } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index 9f9f9bf..09595c4 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -34,11 +34,11 @@ class EventStreamInMemory, ID : AggregateId> : EventStream ): R? = events .filterIsInstance(eventType.java) - .lastOrNull { it.gameId == aggregateId } + .lastOrNull { it.aggregateId == aggregateId } override fun readAll(aggregateId: ID): Set = events - .filter { it.gameId == aggregateId } + .filter { it.aggregateId == aggregateId } .toSet() } diff --git a/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt b/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt new file mode 100644 index 0000000..16b0d04 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt @@ -0,0 +1,7 @@ +package eventDemo.libs.event + +interface VersionBuilder { + fun buildNextVersion(): Int + + fun getLastVersion(): Int +} diff --git a/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt b/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt new file mode 100644 index 0000000..4d8ae1f --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt @@ -0,0 +1,11 @@ +package eventDemo.libs.event + +import java.util.concurrent.atomic.AtomicInteger + +class VersionBuilderLocal : VersionBuilder { + private val version: AtomicInteger = AtomicInteger(0) + + override fun buildNextVersion(): Int = version.addAndGet(1) + + override fun getLastVersion(): Int = version.toInt() +} diff --git a/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt index 5283ede..d7827eb 100644 --- a/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt @@ -11,12 +11,14 @@ import eventDemo.app.notification.WelcomeToTheGameNotification import eventDemo.configuration.appKoinModule import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import org.koin.dsl.koinApplication import kotlin.test.assertIs +@OptIn(DelicateCoroutinesApi::class) class GameCommandHandlerTest : FunSpec({ test("handle a command should execute the command") { diff --git a/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt b/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt index 38dbf35..255172c 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt @@ -8,6 +8,7 @@ import eventDemo.app.event.event.GameStartedEvent import eventDemo.app.event.event.NewPlayerEvent import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.disableShuffleDeck +import eventDemo.libs.event.VersionBuilderLocal import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.equals.shouldBeEqual import kotlin.test.assertIs @@ -17,34 +18,55 @@ class GameStateBuilderTest : FunSpec({ test("apply") { disableShuffleDeck() + val versionBuilder = VersionBuilderLocal() val gameId = GameId() val player1 = Player(name = "Nikola") val player2 = Player(name = "Einstein") GameState(gameId) .run { - val event = NewPlayerEvent(gameId, player1) + val event = + NewPlayerEvent( + aggregateId = gameId, + player = player1, + version = versionBuilder.buildNextVersion(), + ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId state.isReady shouldBeEqual false state.isStarted shouldBeEqual false } }.run { - val event = NewPlayerEvent(gameId, player2) + val event = + NewPlayerEvent( + aggregateId = gameId, + player = player2, + version = versionBuilder.buildNextVersion(), + ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId state.players shouldBeEqual setOf(player1, player2) } }.run { - val event = PlayerReadyEvent(gameId, player1) + val event = + PlayerReadyEvent( + aggregateId = gameId, + player = player1, + version = versionBuilder.buildNextVersion(), + ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId state.readyPlayers shouldBeEqual setOf(player1) } }.run { - val event = PlayerReadyEvent(gameId, player2) + val event = + PlayerReadyEvent( + aggregateId = gameId, + player = player2, + version = versionBuilder.buildNextVersion(), + ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId state.readyPlayers shouldBeEqual setOf(player1, player2) state.isReady shouldBeEqual true state.isStarted shouldBeEqual false @@ -52,12 +74,13 @@ class GameStateBuilderTest : }.run { val event = GameStartedEvent.new( - gameId, - setOf(player1, player2), + id = gameId, + players = setOf(player1, player2), shuffleIsDisabled = true, + version = versionBuilder.buildNextVersion(), ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId state.isStarted shouldBeEqual true assertIs(state.deck.stack.first()).let { it.number shouldBeEqual 6 @@ -66,9 +89,15 @@ class GameStateBuilderTest : } }.run { val playedCard = playableCards(player1)[0] - val event = CardIsPlayedEvent(gameId, playedCard, player1) + val event = + CardIsPlayedEvent( + aggregateId = gameId, + card = playedCard, + player = player1, + version = versionBuilder.buildNextVersion(), + ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId assertNotNull(state.cardOnCurrentStack).card shouldBeEqual playedCard assertIs(playedCard).let { it.number shouldBeEqual 0 @@ -77,9 +106,15 @@ class GameStateBuilderTest : } }.run { val playedCard = playableCards(player2)[0] - val event = CardIsPlayedEvent(gameId, playedCard, player2) + val event = + CardIsPlayedEvent( + aggregateId = gameId, + card = playedCard, + player = player2, + version = versionBuilder.buildNextVersion(), + ) apply(event).also { state -> - state.gameId shouldBeEqual gameId + state.aggregateId shouldBeEqual gameId assertNotNull(state.cardOnCurrentStack).card shouldBeEqual playedCard assertIs(playedCard).let { it.number shouldBeEqual 7 diff --git a/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt index 4a93e64..2ae3a5b 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt @@ -1,16 +1,128 @@ package eventDemo.app.event.projection +import eventDemo.app.entity.GameId +import eventDemo.app.entity.Player +import eventDemo.app.event.GameEventHandler +import eventDemo.app.event.event.NewPlayerEvent +import eventDemo.configuration.appKoinModule import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.equals.shouldBeEqual +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import org.koin.core.context.stopKoin +import org.koin.dsl.koinApplication +import kotlin.test.assertNotNull +@OptIn(DelicateCoroutinesApi::class) class GameStateRepositoryTest : FunSpec({ - xtest("GameStateRepository should build the projection when a new event occurs") { } + val player1 = Player("Tesla") + val player2 = Player(name = "Einstein") - xtest("get should build the last version of the state") { } - xtest("get should be concurrently secure") { } - xtest("get should be concurrently secure") { } + test("GameStateRepository should build the projection when a new event occurs") { + val aggregateId = GameId() + koinApplication { modules(appKoinModule) }.koin.apply { + val repo = get() + val eventHandler = get() + eventHandler + .handle { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } + .also { event -> + assertNotNull(repo.getUntil(event)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } + assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } + } + } + stopKoin() + } - xtest("getUntil should build the state until the event") { } - xtest("call getUntil twice should get the state from the cache") { } - xtest("getUntil should be concurrently secure") { } + test("get should build the last version of the state") { + val aggregateId = GameId() + koinApplication { modules(appKoinModule) }.koin.apply { + val repo = get() + val eventHandler = get() + + eventHandler + .handle { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } + .also { + assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } + } + + eventHandler + .handle { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } + .also { + assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1, player2) + } + } + } + } + + test("getUntil should build the state until the event") { + repeat(10) { + val aggregateId = GameId() + koinApplication { modules(appKoinModule) }.koin.apply { + val repo = get() + val eventHandler = get() + + val event1 = + eventHandler + .handle { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } + .also { event1 -> + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } + } + + eventHandler + .handle { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } + .also { event2 -> + assertNotNull(repo.getUntil(event2)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1, player2) + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } + } + } + } + } + + test("getUntil should be concurrently secure") { + val aggregateId = GameId() + koinApplication { modules(appKoinModule) }.koin.apply { + val repo = get() + val eventHandler = get() + + (1..10) + .map { r -> + GlobalScope + .launch { + repeat(100) { r2 -> + val playerX = Player("player$r$r2") + eventHandler + .handle { + NewPlayerEvent( + aggregateId = aggregateId, + player = playerX, + version = it, + ) + } + } + } + }.joinAll() + + repo.getLast(aggregateId).players shouldHaveSize 1000 + repo.getLast(aggregateId).lastEventVersion shouldBeEqual 1000 + } + } + + xtest("get should be concurrently secure") { } }) diff --git a/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt b/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt new file mode 100644 index 0000000..cea409a --- /dev/null +++ b/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt @@ -0,0 +1,125 @@ +package eventDemo.app.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.equals.shouldBeEqual +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import java.util.UUID +import kotlin.test.assertNotNull + +@OptIn(DelicateCoroutinesApi::class) +class ProjectionSnapshotRepositoryInMemoryTest : + FunSpec({ + + test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { + repeat(10) { + val repo = getRepoTest() + val aggregateId = IdTest() + + val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) + repo.applyAndPutToCache(eventOther) + assertNotNull(repo.getUntil(eventOther)).also { + assertNotNull(it.value) shouldBeEqual "valOther" + } + + val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) + repo.applyAndPutToCache(event1) + assertNotNull(repo.getLast(event1.aggregateId)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + + val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) + repo.applyAndPutToCache(event2) + assertNotNull(repo.getLast(event2.aggregateId)).also { + assertNotNull(it.value) shouldBeEqual "val1val2" + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + assertNotNull(repo.getUntil(event2)).also { + assertNotNull(it.value) shouldBeEqual "val1val2" + } + } + } + + test("ProjectionSnapshotRepositoryInMemory should be thread safe") { + val repo = getRepoTest(2000) + val aggregateId = IdTest() + (1..10) + .map { r -> + GlobalScope.launch { + repeat(10) { + val eventX = EventXTest(num = 1, version = r, aggregateId = aggregateId) + repo.applyAndPutToCache(eventX) + } + } + }.joinAll() + assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 + } + }) + +@JvmInline +private value class IdTest( + override val id: UUID = UUID.randomUUID(), +) : AggregateId + +private data class ProjectionTest( + override val aggregateId: IdTest, + override val lastEventVersion: Int = 0, + var value: String? = null, + var num: Int = 0, +) : Projection + +private sealed interface TestEvents : Event + +private data class Event1Test( + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val value1: String, +) : TestEvents + +private data class Event2Test( + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val value2: String, +) : TestEvents + +private data class EventXTest( + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val num: Int, +) : TestEvents + +private fun getRepoTest(maxSnapshotCacheSize: Int = 2000): ProjectionSnapshotRepositoryInMemory = + ProjectionSnapshotRepositoryInMemory(maxSnapshotCacheSize) { event -> + (this ?: ProjectionTest(event.aggregateId)).let { projection -> + when (event) { + is Event1Test -> { + projection.copy(value = (projection.value ?: "") + event.value1) + } + + is Event2Test -> { + projection.copy(value = (projection.value ?: "") + event.value2) + } + + is EventXTest -> { + projection.copy(num = projection.num + event.num) + } + } + } + } diff --git a/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt index 4d4b8e3..23d7602 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt @@ -50,7 +50,7 @@ class GameStateRouteTest : }.apply { assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) val state = call.body() - id shouldBeEqual state.gameId + id shouldBeEqual state.aggregateId state.players shouldHaveSize 0 state.isStarted shouldBeEqual false } @@ -71,19 +71,20 @@ class GameStateRouteTest : val eventHandler by inject() val stateRepo by inject() runBlocking { - eventHandler.handle( - NewPlayerEvent(gameId, player1), - NewPlayerEvent(gameId, player2), - PlayerReadyEvent(gameId, player1), - PlayerReadyEvent(gameId, player2), + eventHandler.handle { NewPlayerEvent(gameId, player1, it) } + eventHandler.handle { NewPlayerEvent(gameId, player2, it) } + eventHandler.handle { PlayerReadyEvent(gameId, player1, it) } + eventHandler.handle { PlayerReadyEvent(gameId, player2, it) } + eventHandler.handle { GameStartedEvent.new( gameId, setOf(player1, player2), shuffleIsDisabled = true, - ), - ) + it, + ) + } delay(100) - lastPlayedCard = stateRepo.get(gameId).playableCards(player1).first() + lastPlayedCard = stateRepo.getLast(gameId).playableCards(player1).first() assertNotNull(lastPlayedCard) .let { assertIs(lastPlayedCard) } .let { @@ -91,13 +92,14 @@ class GameStateRouteTest : it.color shouldBeEqual Card.Color.Red } delay(100) - eventHandler.handle( + eventHandler.handle { CardIsPlayedEvent( gameId, assertNotNull(lastPlayedCard), player1, - ), - ) + it, + ) + } delay(100) } } diff --git a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt index 281d3a0..3003f57 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt @@ -32,127 +32,131 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout import org.koin.dsl.koinApplication import kotlin.test.assertIs import kotlin.test.assertNotNull import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds @DelicateCoroutinesApi class GameStateTest : FunSpec({ test("Simulation of a game") { - disableShuffleDeck() - val id = GameId() - val player1 = Player(name = "Nikola") - val player2 = Player(name = "Einstein") - val channelCommand1 = Channel(Channel.BUFFERED) - val channelCommand2 = Channel(Channel.BUFFERED) - val channelNotification1 = Channel(Channel.BUFFERED) - val channelNotification2 = Channel(Channel.BUFFERED) + withTimeout(2.seconds) { + disableShuffleDeck() + val id = GameId() + val player1 = Player(name = "Nikola") + val player2 = Player(name = "Einstein") + val channelCommand1 = Channel(Channel.BUFFERED) + val channelCommand2 = Channel(Channel.BUFFERED) + val channelNotification1 = Channel(Channel.BUFFERED) + val channelNotification2 = Channel(Channel.BUFFERED) - var playedCard1: Card? = null - var playedCard2: Card? = null + var playedCard1: Card? = null + var playedCard2: Card? = null - val player1Job = - launch { - channelCommand1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1))) - channelNotification1.receive().let { - assertIs(it).players shouldBeEqual setOf(player1) - } - channelNotification1.receive().let { - assertIs(it).player shouldBeEqual player2 - } - channelCommand1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1))) - channelNotification1.receive().let { - assertIs(it).player shouldBeEqual player2 - } - val player1Hand = + val player1Job = + launch { + channelCommand1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1))) channelNotification1.receive().let { - assertIs(it).hand shouldHaveSize 7 + assertIs(it).players shouldBeEqual setOf(player1) } - playedCard1 = player1Hand.first() - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 + channelNotification1.receive().let { + assertIs(it).player shouldBeEqual player2 } - } - channelCommand1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first()))) + channelCommand1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1))) + channelNotification1.receive().let { + assertIs(it).player shouldBeEqual player2 + } + val player1Hand = + channelNotification1.receive().let { + assertIs(it).hand shouldHaveSize 7 + } + playedCard1 = player1Hand.first() + channelNotification1.receive().let { + assertIs(it).apply { + player shouldBeEqual player1 + } + } + channelCommand1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first()))) - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 + channelNotification1.receive().let { + assertIs(it).apply { + player shouldBeEqual player2 + } + } + + channelNotification1.receive().let { + assertIs(it).apply { + player shouldBeEqual player2 + card shouldBeEqual assertNotNull(playedCard2) + } } } - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - card shouldBeEqual assertNotNull(playedCard2) - } - } - } - - val player2Job = - launch { - delay(100) - channelCommand2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2))) - channelNotification2.receive().let { - assertIs(it).players shouldBeEqual setOf(player1, player2) - } - channelNotification2.receive().let { - assertIs(it).player shouldBeEqual player1 - } - channelCommand2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2))) - val player2Hand = + val player2Job = + launch { + delay(100) + channelCommand2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2))) channelNotification2.receive().let { - assertIs(it).hand shouldHaveSize 7 + assertIs(it).players shouldBeEqual setOf(player1, player2) } - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 + channelNotification2.receive().let { + assertIs(it).player shouldBeEqual player1 } + channelCommand2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2))) + val player2Hand = + channelNotification2.receive().let { + assertIs(it).hand shouldHaveSize 7 + } + channelNotification2.receive().let { + assertIs(it).apply { + player shouldBeEqual player1 + } + } + channelNotification2.receive().let { + assertIs(it).apply { + player shouldBeEqual player1 + card shouldBeEqual assertNotNull(playedCard1) + } + } + playedCard2 = player2Hand.first() + + channelNotification2.receive().let { + assertIs(it).apply { + player shouldBeEqual player2 + } + } + channelCommand2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first()))) } - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - card shouldBeEqual assertNotNull(playedCard1) - } + + koinApplication { modules(appKoinModule) }.koin.apply { + val commandHandler by inject() + val eventStream by inject() + val playerNotificationListener by inject() + ReactionEventListener(get(), get(), get()).init() + playerNotificationListener.startListening(channelNotification1, player1) + playerNotificationListener.startListening(channelNotification2, player2) + + GlobalScope.launch(Dispatchers.IO) { + commandHandler.handle(player1, channelCommand1, channelNotification1) } - playedCard2 = player2Hand.first() - - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - } + GlobalScope.launch(Dispatchers.IO) { + commandHandler.handle(player2, channelCommand2, channelNotification2) } - channelCommand2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first()))) + + joinAll(player1Job, player2Job) + + val state = id.buildStateFromEventStream(eventStream) + + state.aggregateId shouldBeEqual id + assertTrue(state.isStarted) + state.players shouldBeEqual setOf(player1, player2) + state.readyPlayers shouldBeEqual setOf(player1, player2) + state.direction shouldBeEqual GameState.Direction.CLOCKWISE + assertNotNull(state.cardOnCurrentStack) shouldBeEqual GameState.LastCard(assertNotNull(playedCard2), player2) } - - koinApplication { modules(appKoinModule) }.koin.apply { - val commandHandler by inject() - val eventStream by inject() - val playerNotificationListener by inject() - ReactionEventListener(get(), get(), get()).init() - playerNotificationListener.startListening(channelNotification1, player1) - playerNotificationListener.startListening(channelNotification2, player2) - - GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player1, channelCommand1, channelNotification1) - } - GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player2, channelCommand2, channelNotification2) - } - - joinAll(player1Job, player2Job) - - val state = id.buildStateFromEventStream(eventStream) - - state.gameId shouldBeEqual id - assertTrue(state.isStarted) - state.players shouldBeEqual setOf(player1, player2) - state.readyPlayers shouldBeEqual setOf(player1, player2) - state.direction shouldBeEqual GameState.Direction.CLOCKWISE - assertNotNull(state.cardOnCurrentStack) shouldBeEqual GameState.LastCard(assertNotNull(playedCard2), player2) } } }) diff --git a/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt b/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt new file mode 100644 index 0000000..170a882 --- /dev/null +++ b/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt @@ -0,0 +1,42 @@ +package eventDemo.libs.event + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.equals.shouldBeEqual +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch + +@OptIn(DelicateCoroutinesApi::class) +class VersionBuilderLocalTest : + FunSpec({ + + test("buildNextVersion") { + VersionBuilderLocal().run { + buildNextVersion() shouldBeEqual 1 + buildNextVersion() shouldBeEqual 2 + buildNextVersion() shouldBeEqual 3 + } + } + + test("buildNextVersion concurrently") { + val versionBuilder = VersionBuilderLocal() + (1..20) + .map { + GlobalScope.launch { + (1..1000).map { + versionBuilder.buildNextVersion() + } + } + }.joinAll() + versionBuilder.getLastVersion() shouldBeEqual 20 * 1000 + } + + test("getLastVersion") { + VersionBuilderLocal().run { + getLastVersion() shouldBeEqual 0 + getLastVersion() shouldBeEqual 0 + getLastVersion() shouldBeEqual 0 + } + } + })