diff --git a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt index eab12b9..3053465 100644 --- a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt @@ -37,7 +37,7 @@ data class ICantPlayCommand( if (playableCards.isEmpty()) { val takenCard = state.deck.stack.first() - eventHandler.handle { + eventHandler.handle(payload.aggregateId) { PlayerHavePassEvent( aggregateId = payload.aggregateId, player = payload.player, diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt index 85b5c92..3b8f25c 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt @@ -30,7 +30,7 @@ data class IWantToJoinTheGameCommand( eventHandler: GameEventHandler, ) { if (!state.isStarted) { - eventHandler.handle { + eventHandler.handle(payload.aggregateId) { NewPlayerEvent( aggregateId = payload.aggregateId, player = payload.player, diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt index 64bd030..af3aa6c 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt @@ -41,7 +41,7 @@ data class IWantToPlayCardCommand( } if (state.canBePlayThisCard(payload.player, payload.card)) { - eventHandler.handle { + eventHandler.handle(payload.aggregateId) { CardIsPlayedEvent( aggregateId = payload.aggregateId, card = payload.card, diff --git a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt index c48a91b..30844ff 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt @@ -39,7 +39,7 @@ data class IamReadyToPlayCommand( } else if (playerIsAlreadyReady) { playerErrorNotifier("You are already ready") } else { - eventHandler.handle { + eventHandler.handle(payload.aggregateId) { PlayerReadyEvent( aggregateId = payload.aggregateId, player = payload.player, diff --git a/src/main/kotlin/eventDemo/app/event/EventHandler.kt b/src/main/kotlin/eventDemo/app/event/EventHandler.kt index cfe56a5..267689f 100644 --- a/src/main/kotlin/eventDemo/app/event/EventHandler.kt +++ b/src/main/kotlin/eventDemo/app/event/EventHandler.kt @@ -9,5 +9,8 @@ import eventDemo.libs.event.Event interface EventHandler, ID : AggregateId> { fun registerProjectionBuilder(builder: (E) -> Unit) - fun handle(buildEvent: (version: Int) -> E): E + fun handle( + aggregateId: ID, + buildEvent: (version: Int) -> E, + ): E } diff --git a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt index cd9c8bc..b62dbf9 100644 --- a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt +++ b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt @@ -3,6 +3,10 @@ package eventDemo.app.event import eventDemo.app.entity.GameId import eventDemo.app.event.event.GameEvent import eventDemo.libs.event.VersionBuilder +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * A stream to publish and read the played card event. @@ -12,18 +16,26 @@ class GameEventHandler( private val eventStream: GameEventStream, private val versionBuilder: VersionBuilder, ) : EventHandler { - private val projectionsBuilders: MutableList<(GameEvent) -> Unit> = mutableListOf() + private val projectionsBuilders: ConcurrentLinkedQueue<(GameEvent) -> Unit> = ConcurrentLinkedQueue() + private val locks: ConcurrentHashMap = ConcurrentHashMap() override fun registerProjectionBuilder(builder: GameProjectionBuilder) { projectionsBuilders.add(builder) } - override fun handle(buildEvent: (version: Int) -> GameEvent): GameEvent = - buildEvent(versionBuilder.buildNextVersion()).also { event -> - eventStream.publish(event) - projectionsBuilders.forEach { it(event) } - eventBus.publish(event) - } + override fun handle( + aggregateId: GameId, + buildEvent: (version: Int) -> GameEvent, + ): GameEvent = + locks + .computeIfAbsent(aggregateId) { ReentrantLock() } + .withLock { + buildEvent(versionBuilder.buildNextVersion(aggregateId)) + .also { eventStream.publish(it) } + }.also { event -> + projectionsBuilders.forEach { it(event) } + eventBus.publish(event) + } } typealias GameProjectionBuilder = (GameEvent) -> Unit diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt index 4fd6fd0..0826666 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt @@ -1,8 +1,6 @@ package eventDemo.app.event.projection import eventDemo.app.entity.Card -import eventDemo.app.entity.GameId -import eventDemo.app.event.GameEventStream import eventDemo.app.event.event.CardIsPlayedEvent import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameStartedEvent @@ -14,23 +12,8 @@ import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.PlayerWinEvent import io.github.oshai.kotlinlogging.KotlinLogging -fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState { - val events = eventStream.readAll(this) - if (events.isEmpty()) return GameState(this) - return events.buildStateFromEvents().also { - KotlinLogging.logger {}.warn { "state is build from scratch for game: $this " } - } -} - -fun Collection.buildStateFromEvents(): GameState { - val gameId = this.firstOrNull()?.aggregateId ?: error("Cannot build GameState from an empty list") - return fold(GameState(gameId)) { state, event -> - state.apply(event) - } -} - -fun GameState?.apply(event: GameEvent): GameState = - (this ?: GameState(event.aggregateId)).let { state -> +fun GameState.apply(event: GameEvent): GameState = + this.let { state -> val logger = KotlinLogging.logger { } if (event is PlayerActionEvent) { if (state.currentPlayerTurn != event.player) { diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt index 8e07cb5..6580ee7 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt @@ -6,14 +6,16 @@ import eventDemo.app.event.GameEventStream import eventDemo.app.event.event.GameEvent class GameStateRepository( - private val eventStream: GameEventStream, + eventStream: GameEventStream, eventHandler: GameEventHandler, - maxSnapshotCacheSize: Int = 20, + snapshotConfig: SnapshotConfig = SnapshotConfig(), ) { private val projectionsSnapshot = ProjectionSnapshotRepositoryInMemory( - applyToProjection = GameState?::apply, - maxSnapshotCacheSize = maxSnapshotCacheSize, + eventStream = eventStream, + snapshotCacheConfig = snapshotConfig, + applyToProjection = GameState::apply, + initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, ) init { @@ -27,9 +29,7 @@ class GameStateRepository( * * It fetches it from the local cache if possible, otherwise it builds it. */ - fun getLast(gameId: GameId): GameState = - projectionsSnapshot.getLast(gameId) - ?: gameId.buildStateFromEventStream(eventStream) + fun getLast(gameId: GameId): GameState = projectionsSnapshot.getLast(gameId) /** * Get the [GameState] to the specific [event][GameEvent]. @@ -37,8 +37,5 @@ class GameStateRepository( * * It fetches it from the local cache if possible, otherwise it builds it. */ - fun getUntil(event: GameEvent): GameState = - projectionsSnapshot.getUntil(event) - ?: (eventStream.readAll(event.aggregateId).takeWhile { it != event } + event) - .buildStateFromEvents() + fun getUntil(event: GameEvent): GameState = projectionsSnapshot.getUntil(event) } diff --git a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt index cbebd20..c4cd6b8 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -2,53 +2,172 @@ package eventDemo.app.event.projection import eventDemo.libs.event.AggregateId import eventDemo.libs.event.Event +import eventDemo.libs.event.EventStream +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes + +data class SnapshotConfig( + val maxSnapshotCacheSize: Int = 20, + val maxSnapshotCacheTtl: Duration = 10.minutes, +) class ProjectionSnapshotRepositoryInMemory, P : Projection, ID : AggregateId>( - private val maxSnapshotCacheSize: Int = 20, - private val applyToProjection: P?.(event: E) -> P, + private val eventStream: EventStream, + private val initialStateBuilder: (ID) -> P, + private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), + private val applyToProjection: P.(event: E) -> P, ) { - private val projectionsSnapshot: ConcurrentHashMap = ConcurrentHashMap() + private val projectionsSnapshot: ConcurrentHashMap>> = ConcurrentHashMap() - fun applyAndPutToCache(event: E): P { - // lock here - return projectionsSnapshot - .filterKeys { it.aggregateId == event.aggregateId } - .toList() - .find { (e, _) -> e.version == (event.version - 1) } - ?.second - .applyToProjection(event) - .also { projectionsSnapshot.put(event, it) } - .also { removeOldSnapshot() } - // Unlock here + /** + * Create a snapshot for the event + * + * 1. get the last snapshot with a version lower than that of the event + * 2. get the events with a greater version of the snapshot + * 3. apply the event to the snapshot + * 4. apply the new event to the projection + * 5. save it + * 6. remove old one + */ + fun applyAndPutToCache(event: E): P = + getUntil(event) + .also { + save(it) + removeOldSnapshot(it.aggregateId) + } + + /** + * Build the last version of the [Projection] from the cache. + * + * 1. get the last snapshot + * 2. get the missing event to the snapshot + * 3. apply the missing events to the snapshot + */ + fun getLast(aggregateId: ID): P { + val lastSnapshot = getLastSnapshot(aggregateId)?.first + val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot) + return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot) } - private fun removeOldSnapshot() { - if (projectionsSnapshot.size > maxSnapshotCacheSize) { - val numberToRemove = projectionsSnapshot.size - maxSnapshotCacheSize + /** + * Build the [Projection] to the specific [event][Event]. + * + * It does not contain the [events][Event] it after this one. + * + * 1. get the last snapshot before the event + * 2. get the events with a greater version of the snapshot but lower of passed event + * 3. apply the events to the snapshot + */ + fun getUntil(event: E): P { + val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first + if (lastSnapshot?.lastEventVersion == event.version) { + return lastSnapshot + } - projectionsSnapshot - .keys - .sortedBy { it.version } - .take(numberToRemove) - .forEach { event -> - projectionsSnapshot.remove(event) - } + val missingEventOfSnapshot = + eventStream.readGreaterOfVersion( + event.aggregateId, + lastSnapshot?.lastEventVersion ?: 0, + ) + + return if (lastSnapshot?.lastEventVersion == event.version) { + lastSnapshot + } else { + lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot) } } /** - * Get the last version of the [Projection] from the cache. + * Remove the oldest snapshot. + * + * The rules are pass in the controller. */ - fun getLast(aggregateId: ID): P? = - projectionsSnapshot - .filter { it.key.aggregateId == aggregateId } - .maxByOrNull { (event, _) -> event.version } - ?.value + private fun removeOldSnapshot(aggregateId: ID) { + projectionsSnapshot[aggregateId]?.let { queue -> + // never remove the last one + val theLastOne = getLastSnapshot(aggregateId) + + // remove the oldest by time + val now = Clock.System.now() + val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl + val toRemove = queue.filter { deadLine > it.second } + (toRemove - theLastOne).forEach { queue.remove(it) } + + // Remove if size exceeds the limit + if (queue.size > snapshotCacheConfig.maxSnapshotCacheSize) { + val numberToRemove = projectionsSnapshot.size - snapshotCacheConfig.maxSnapshotCacheSize + if (numberToRemove > 0) { + queue + .sortedByDescending { it.first.lastEventVersion } + .take(numberToRemove) + .let { it - theLastOne } + .forEach { queue.remove(it) } + } + } + } + } /** - * Get the [Projection] to the specific [event][Event]. - * It does not contain the [events][Event] it after this one. + * Save the snapshot. */ - fun getUntil(event: E): P? = projectionsSnapshot.get(event) + private fun save(projection: P) { + projectionsSnapshot + .computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() } + .add(Pair(projection, Clock.System.now())) + } + + /** + * Get the last snapshot when the version is lower of then event version + */ + private fun getLastSnapshotBeforeOrEqualEvent(event: E) = + projectionsSnapshot[event.aggregateId] + ?.sortedByDescending { it.first.lastEventVersion } + ?.find { it.first.lastEventVersion <= event.version } + + /** + * Get the last snapshot (with the higher version). + */ + private fun getLastSnapshot(aggregateId: ID) = + projectionsSnapshot[aggregateId] + ?.maxByOrNull { it.first.lastEventVersion } + + /** + * Get the events from the [event stream][EventStream] when the version is higher of the snapshot. + * + * If the snapshot is null, it takes all events from the event [event stream][EventStream] + */ + private fun getEventAfterTheSnapshot( + aggregateId: ID, + snapshot: P?, + ) = eventStream + .readGreaterOfVersion(aggregateId, snapshot?.lastEventVersion ?: 0) + + /** + * Apply events to the projection. + */ + private fun P?.applyEvents( + aggregateId: ID, + eventsToApply: Set, + ): P = + eventsToApply + .fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure) + + /** + * Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event. + */ + private val applyToProjectionSecure: P.(event: E) -> P = { event -> + if (event.version == lastEventVersion + 1) { + applyToProjection(event) + } else if (event.version <= lastEventVersion) { + KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." } + this + } else { + error("The version of the event must follow directly after the version of the projection.") + } + } } diff --git a/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt b/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt index adb19c8..0cfc9f9 100644 --- a/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt +++ b/src/main/kotlin/eventDemo/app/eventListener/ReactionEventListener.kt @@ -36,7 +36,7 @@ class ReactionEventListener( ) { if (state.isReady && !state.isStarted) { val reactionEvent = - eventHandler.handle { + eventHandler.handle(state.aggregateId) { GameStartedEvent.new( id = state.aggregateId, players = state.players, @@ -65,7 +65,7 @@ class ReactionEventListener( val winner = state.playerHasNoCardLeft().firstOrNull() if (winner != null) { val reactionEvent = - eventHandler.handle { + eventHandler.handle(state.aggregateId) { PlayerWinEvent( aggregateId = state.aggregateId, player = winner, diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt index 8198eaa..56552ef 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt @@ -7,6 +7,7 @@ import eventDemo.app.event.GameEventBus import eventDemo.app.event.GameEventHandler import eventDemo.app.event.GameEventStream import eventDemo.app.event.projection.GameStateRepository +import eventDemo.app.event.projection.SnapshotConfig import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.libs.command.CommandStreamChannelBuilder import eventDemo.libs.event.EventBusInMemory @@ -37,7 +38,7 @@ val appKoinModule = GameEventStream(EventStreamInMemory()) } single { - GameStateRepository(get(), get()) + GameStateRepository(get(), get(), snapshotConfig = SnapshotConfig()) } single { CommandStreamChannelBuilder() diff --git a/src/main/kotlin/eventDemo/libs/event/EventStream.kt b/src/main/kotlin/eventDemo/libs/event/EventStream.kt index 897baad..be639b9 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStream.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStream.kt @@ -23,4 +23,9 @@ interface EventStream, ID : AggregateId> { /** Reads all events associated with a given aggregate ID */ fun readAll(aggregateId: ID): Set + + fun readGreaterOfVersion( + aggregateId: ID, + version: Int, + ): Set } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index 09595c4..598b15d 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -15,10 +15,12 @@ class EventStreamInMemory, ID : AggregateId> : EventStream private val events: Queue = ConcurrentLinkedQueue() override fun publish(event: E) { - events.add(event) - logger.atInfo { - message = "Event published: $event" - payload = mapOf("event" to event) + if (events.none { it.eventId == event.eventId }) { + events.add(event) + logger.atInfo { + message = "Event published: $event" + payload = mapOf("event" to event) + } } } @@ -40,6 +42,15 @@ class EventStreamInMemory, ID : AggregateId> : EventStream events .filter { it.aggregateId == aggregateId } .toSet() + + override fun readGreaterOfVersion( + aggregateId: ID, + version: Int, + ): Set = + events + .filter { it.aggregateId == aggregateId } + .filter { it.version > version } + .toSet() } inline fun , ID : AggregateId> EventStream.readLastOf(aggregateId: ID): R? = diff --git a/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt b/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt index 16b0d04..4f3e2df 100644 --- a/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt +++ b/src/main/kotlin/eventDemo/libs/event/VersionBuilder.kt @@ -1,7 +1,7 @@ package eventDemo.libs.event interface VersionBuilder { - fun buildNextVersion(): Int + fun buildNextVersion(aggregateId: AggregateId): Int - fun getLastVersion(): Int + fun getLastVersion(aggregateId: AggregateId): Int } diff --git a/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt b/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt index 4d8ae1f..3b5ca07 100644 --- a/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt +++ b/src/main/kotlin/eventDemo/libs/event/VersionBuilderLocal.kt @@ -1,11 +1,21 @@ package eventDemo.libs.event +import io.github.oshai.kotlinlogging.KotlinLogging +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger class VersionBuilderLocal : VersionBuilder { - private val version: AtomicInteger = AtomicInteger(0) + private val logger = KotlinLogging.logger { } + private val versions: ConcurrentHashMap = ConcurrentHashMap() - override fun buildNextVersion(): Int = version.addAndGet(1) + override fun buildNextVersion(aggregateId: AggregateId): Int = + versionOfAggregate(aggregateId) + .addAndGet(1) + .also { logger.debug { "New version $it" } } - override fun getLastVersion(): Int = version.toInt() + override fun getLastVersion(aggregateId: AggregateId): Int = versionOfAggregate(aggregateId).toInt() + + private fun versionOfAggregate(aggregateId: AggregateId) = + versions + .computeIfAbsent(aggregateId) { AtomicInteger(0) } } diff --git a/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt b/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt index 255172c..3188156 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt @@ -29,7 +29,7 @@ class GameStateBuilderTest : NewPlayerEvent( aggregateId = gameId, player = player1, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId @@ -41,7 +41,7 @@ class GameStateBuilderTest : NewPlayerEvent( aggregateId = gameId, player = player2, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId @@ -52,7 +52,7 @@ class GameStateBuilderTest : PlayerReadyEvent( aggregateId = gameId, player = player1, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId @@ -63,7 +63,7 @@ class GameStateBuilderTest : PlayerReadyEvent( aggregateId = gameId, player = player2, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId @@ -77,7 +77,7 @@ class GameStateBuilderTest : id = gameId, players = setOf(player1, player2), shuffleIsDisabled = true, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId @@ -94,7 +94,7 @@ class GameStateBuilderTest : aggregateId = gameId, card = playedCard, player = player1, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId @@ -111,7 +111,7 @@ class GameStateBuilderTest : aggregateId = gameId, card = playedCard, player = player2, - version = versionBuilder.buildNextVersion(), + version = versionBuilder.buildNextVersion(gameId), ) apply(event).also { state -> state.aggregateId shouldBeEqual gameId diff --git a/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt index 2ae3a5b..cbb965a 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/GameStateRepositoryTest.kt @@ -28,7 +28,7 @@ class GameStateRepositoryTest : val repo = get() val eventHandler = get() eventHandler - .handle { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } + .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .also { event -> assertNotNull(repo.getUntil(event)).also { assertNotNull(it.players) shouldBeEqual setOf(player1) @@ -48,7 +48,7 @@ class GameStateRepositoryTest : val eventHandler = get() eventHandler - .handle { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } + .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .also { assertNotNull(repo.getLast(aggregateId)).also { assertNotNull(it.players) shouldBeEqual setOf(player1) @@ -56,7 +56,7 @@ class GameStateRepositoryTest : } eventHandler - .handle { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } + .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } .also { assertNotNull(repo.getLast(aggregateId)).also { assertNotNull(it.players) shouldBeEqual setOf(player1, player2) @@ -74,7 +74,7 @@ class GameStateRepositoryTest : val event1 = eventHandler - .handle { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } + .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .also { event1 -> assertNotNull(repo.getUntil(event1)).also { assertNotNull(it.players) shouldBeEqual setOf(player1) @@ -82,7 +82,7 @@ class GameStateRepositoryTest : } eventHandler - .handle { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } + .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } .also { event2 -> assertNotNull(repo.getUntil(event2)).also { assertNotNull(it.players) shouldBeEqual setOf(player1, player2) @@ -108,7 +108,7 @@ class GameStateRepositoryTest : repeat(100) { r2 -> val playerX = Player("player$r$r2") eventHandler - .handle { + .handle(aggregateId) { NewPlayerEvent( aggregateId = aggregateId, player = playerX, @@ -119,8 +119,10 @@ class GameStateRepositoryTest : } }.joinAll() - repo.getLast(aggregateId).players shouldHaveSize 1000 - repo.getLast(aggregateId).lastEventVersion shouldBeEqual 1000 + repo.getLast(aggregateId).run { + lastEventVersion shouldBeEqual 1000 + players shouldHaveSize 1000 + } } } diff --git a/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt b/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt index cea409a..a72e5f2 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt @@ -2,6 +2,9 @@ package eventDemo.app.event.projection import eventDemo.libs.event.AggregateId import eventDemo.libs.event.Event +import eventDemo.libs.event.EventStream +import eventDemo.libs.event.EventStreamInMemory +import eventDemo.libs.event.VersionBuilderLocal import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.equals.shouldBeEqual import kotlinx.coroutines.DelicateCoroutinesApi @@ -11,6 +14,8 @@ import kotlinx.coroutines.launch import kotlinx.datetime.Clock import kotlinx.datetime.Instant import java.util.UUID +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock import kotlin.test.assertNotNull @OptIn(DelicateCoroutinesApi::class) @@ -18,53 +23,84 @@ class ProjectionSnapshotRepositoryInMemoryTest : FunSpec({ test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { - repeat(10) { - val repo = getRepoTest() - val aggregateId = IdTest() + val eventStream: EventStream = EventStreamInMemory() + val repo = getSnapshotRepoTest(eventStream) + val aggregateId = IdTest() - val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) - repo.applyAndPutToCache(eventOther) - assertNotNull(repo.getUntil(eventOther)).also { - assertNotNull(it.value) shouldBeEqual "valOther" - } + val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) + eventStream.publish(eventOther) + repo.applyAndPutToCache(eventOther) + assertNotNull(repo.getUntil(eventOther)).also { + assertNotNull(it.value) shouldBeEqual "valOther" + } - val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) - repo.applyAndPutToCache(event1) - assertNotNull(repo.getLast(event1.aggregateId)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } + val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) + eventStream.publish(event1) + repo.applyAndPutToCache(event1) + assertNotNull(repo.getLast(event1.aggregateId)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } - val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) - repo.applyAndPutToCache(event2) - assertNotNull(repo.getLast(event2.aggregateId)).also { - assertNotNull(it.value) shouldBeEqual "val1val2" - } - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - assertNotNull(repo.getUntil(event2)).also { - assertNotNull(it.value) shouldBeEqual "val1val2" - } + val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) + eventStream.publish(event2) + repo.applyAndPutToCache(event2) + assertNotNull(repo.getLast(event2.aggregateId)).also { + assertNotNull(it.value) shouldBeEqual "val1val2" + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + assertNotNull(repo.getUntil(event2)).also { + assertNotNull(it.value) shouldBeEqual "val1val2" } } test("ProjectionSnapshotRepositoryInMemory should be thread safe") { - val repo = getRepoTest(2000) + val eventStream: EventStream = EventStreamInMemory() + val repo = getSnapshotRepoTest(eventStream) val aggregateId = IdTest() - (1..10) - .map { r -> + val versionBuilder = VersionBuilderLocal() + val lock = ReentrantLock() + (0..9) + .map { GlobalScope.launch { - repeat(10) { - val eventX = EventXTest(num = 1, version = r, aggregateId = aggregateId) + (1..10).map { + val eventX = + lock.withLock { + EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) + .also { eventStream.publish(it) } + } repo.applyAndPutToCache(eventX) } } }.joinAll() assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 } + + test("removeOldSnapshot") { + val versionBuilder = VersionBuilderLocal() + val eventStream: EventStream = EventStreamInMemory() + val repo = getSnapshotRepoTest(eventStream, SnapshotConfig(2)) + val aggregateId = IdTest() + + fun buildEndSendEventX() { + EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) + .also { eventStream.publish(it) } + .also { repo.applyAndPutToCache(it) } + } + + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 1 + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 2 + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 3 + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 4 + } }) @JvmInline @@ -105,9 +141,16 @@ private data class EventXTest( val num: Int, ) : TestEvents -private fun getRepoTest(maxSnapshotCacheSize: Int = 2000): ProjectionSnapshotRepositoryInMemory = - ProjectionSnapshotRepositoryInMemory(maxSnapshotCacheSize) { event -> - (this ?: ProjectionTest(event.aggregateId)).let { projection -> +private fun getSnapshotRepoTest( + eventStream: EventStream, + snapshotConfig: SnapshotConfig = SnapshotConfig(2000), +): ProjectionSnapshotRepositoryInMemory = + ProjectionSnapshotRepositoryInMemory( + eventStream = eventStream, + initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, + snapshotCacheConfig = snapshotConfig, + ) { event -> + this.let { projection -> when (event) { is Event1Test -> { projection.copy(value = (projection.value ?: "") + event.value1) @@ -120,6 +163,8 @@ private fun getRepoTest(maxSnapshotCacheSize: Int = 2000): ProjectionSnapshotRep is EventXTest -> { projection.copy(num = projection.num + event.num) } - } + }.copy( + lastEventVersion = event.version, + ) } } diff --git a/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt index 23d7602..ba19bc6 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt @@ -71,11 +71,11 @@ class GameStateRouteTest : val eventHandler by inject() val stateRepo by inject() runBlocking { - eventHandler.handle { NewPlayerEvent(gameId, player1, it) } - eventHandler.handle { NewPlayerEvent(gameId, player2, it) } - eventHandler.handle { PlayerReadyEvent(gameId, player1, it) } - eventHandler.handle { PlayerReadyEvent(gameId, player2, it) } - eventHandler.handle { + eventHandler.handle(gameId) { NewPlayerEvent(gameId, player1, it) } + eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } + eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } + eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } + eventHandler.handle(gameId) { GameStartedEvent.new( gameId, setOf(player1, player2), @@ -92,7 +92,7 @@ class GameStateRouteTest : it.color shouldBeEqual Card.Color.Red } delay(100) - eventHandler.handle { + eventHandler.handle(gameId) { CardIsPlayedEvent( gameId, assertNotNull(lastPlayedCard), diff --git a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt index 3003f57..a9453cf 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt @@ -11,7 +11,8 @@ import eventDemo.app.entity.Player import eventDemo.app.event.GameEventStream import eventDemo.app.event.event.disableShuffleDeck import eventDemo.app.event.projection.GameState -import eventDemo.app.event.projection.buildStateFromEventStream +import eventDemo.app.event.projection.ProjectionSnapshotRepositoryInMemory +import eventDemo.app.event.projection.apply import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.app.eventListener.ReactionEventListener import eventDemo.app.notification.ItsTheTurnOfNotification @@ -148,7 +149,12 @@ class GameStateTest : joinAll(player1Job, player2Job) - val state = id.buildStateFromEventStream(eventStream) + val state = + ProjectionSnapshotRepositoryInMemory( + eventStream = eventStream, + initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, + applyToProjection = GameState::apply, + ).getLast(id) state.aggregateId shouldBeEqual id assertTrue(state.isStarted) diff --git a/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt b/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt index 170a882..2dab4b8 100644 --- a/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/VersionBuilderLocalTest.kt @@ -6,6 +6,12 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import java.util.UUID + +@JvmInline +private value class IdTest( + override val id: UUID = UUID.randomUUID(), +) : AggregateId @OptIn(DelicateCoroutinesApi::class) class VersionBuilderLocalTest : @@ -13,30 +19,34 @@ class VersionBuilderLocalTest : test("buildNextVersion") { VersionBuilderLocal().run { - buildNextVersion() shouldBeEqual 1 - buildNextVersion() shouldBeEqual 2 - buildNextVersion() shouldBeEqual 3 + val id = IdTest() + buildNextVersion(id) shouldBeEqual 1 + buildNextVersion(id) shouldBeEqual 2 + buildNextVersion(IdTest()) shouldBeEqual 1 + buildNextVersion(id) shouldBeEqual 3 } } test("buildNextVersion concurrently") { val versionBuilder = VersionBuilderLocal() + val id = IdTest() (1..20) .map { GlobalScope.launch { (1..1000).map { - versionBuilder.buildNextVersion() + versionBuilder.buildNextVersion(id) } } }.joinAll() - versionBuilder.getLastVersion() shouldBeEqual 20 * 1000 + versionBuilder.getLastVersion(id) shouldBeEqual 20 * 1000 } test("getLastVersion") { VersionBuilderLocal().run { - getLastVersion() shouldBeEqual 0 - getLastVersion() shouldBeEqual 0 - getLastVersion() shouldBeEqual 0 + val id = IdTest() + getLastVersion(id) shouldBeEqual 0 + getLastVersion(id) shouldBeEqual 0 + getLastVersion(id) shouldBeEqual 0 } } })