From cba9971ca168cc1e72ac798e4d1582d4f2144b52 Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Thu, 17 Apr 2025 01:24:25 +0200 Subject: [PATCH] feat: remove snapshot on ProjectionRepository --- .../projection/GameListRepositoryInMemory.kt | 24 +- .../projection/GameListRepositoryInRedis.kt | 18 +- .../projection/GameStateRepositoryInMemory.kt | 43 +--- .../projection/GameStateRepositoryInRedis.kt | 39 +-- .../interfaceLayer/query/ReadTheGameState.kt | 4 +- .../business/command/GameCommandHandler.kt | 14 +- .../business/command/action/ICantPlay.kt | 2 +- .../command/action/IWantToJoinTheGame.kt | 2 +- .../command/action/IWantToPlayCard.kt | 2 +- .../business/command/action/IamReadyToPlay.kt | 2 +- .../gameState/GameStateRepository.kt | 7 +- .../projectionListener/ReactionListener.kt | 4 +- .../injection/ConfigureDIInfrastructure.kt | 5 +- src/main/kotlin/eventDemo/libs/bus/Bus.kt | 15 +- .../kotlin/eventDemo/libs/bus/BusInMemory.kt | 10 +- .../eventDemo/libs/bus/BusInRabbitMQ.kt | 18 +- .../eventDemo/libs/command/CommandHandler.kt | 6 +- .../libs/command/CommandRunnerController.kt | 2 +- .../libs/command/CommandStreamChannel.kt | 4 +- .../eventDemo/libs/event/EventHandler.kt | 2 +- .../eventDemo/libs/event/EventHandlerImpl.kt | 8 +- .../event/projection/ProjectionRepository.kt | 34 +++ .../projection/ProjectionRepositoryAbs.kt | 64 +++++ .../ProjectionRepositoryInMemory.kt | 46 ++++ .../projection/ProjectionRepositoryInRedis.kt | 80 ++++++ .../ProjectionSnapshotRepository.kt | 35 --- .../ProjectionSnapshotRepositoryInMemory.kt | 226 ----------------- .../ProjectionSnapshotRepositoryInRedis.kt | 240 ------------------ .../libs/event/projection/SnapshotConfig.kt | 26 -- src/test/kotlin/eventDemo/Helpers.kt | 19 +- .../interfaceLayer/query/GameListRouteTest.kt | 19 +- .../query/GameSimulationTest.kt | 25 +- .../query/GameStateRouteTest.kt | 80 +++--- .../business/event/GameEventHandlerTest.kt | 3 + .../projection/GameStateRepositoryTest.kt | 63 +---- .../eventDemo/libs/event/EventHandlerTest.kt | 38 +++ .../projection/ProjectionRepositoryTest.kt} | 120 +++------ 37 files changed, 478 insertions(+), 871 deletions(-) create mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepository.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryAbs.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInMemory.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInRedis.kt delete mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt delete mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt delete mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt delete mode 100644 src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt create mode 100644 src/test/kotlin/eventDemo/libs/event/EventHandlerTest.kt rename src/test/kotlin/eventDemo/{business/event/projection/ProjectionSnapshotRepositoryTest.kt => libs/event/projection/ProjectionRepositoryTest.kt} (60%) diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt index a0ebc8f..4491510 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt @@ -2,28 +2,20 @@ package eventDemo.adapter.infrastructureLayer.event.projection import eventDemo.business.entity.GameId import eventDemo.business.event.GameEventBus -import eventDemo.business.event.GameEventStore import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.apply -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory -import eventDemo.libs.event.projection.SnapshotConfig +import eventDemo.libs.event.projection.ProjectionRepositoryInMemory import io.github.oshai.kotlinlogging.withLoggingContext /** * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. */ -class GameListRepositoryInMemory( - eventStore: GameEventStore, - snapshotConfig: SnapshotConfig = SnapshotConfig(), -) : GameListRepository { - private val projectionsSnapshot = - ProjectionSnapshotRepositoryInMemory( - name = GameListRepositoryInMemory::class, - eventStore = eventStore, - snapshotCacheConfig = snapshotConfig, +class GameListRepositoryInMemory : GameListRepository { + private val projectionsRepository = + ProjectionRepositoryInMemory( applyToProjection = GameList::apply, initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, ) @@ -32,11 +24,11 @@ class GameListRepositoryInMemory( projectionBus: GameProjectionBus, eventBus: GameEventBus, ) { - // On new event was received, build snapshot and publish it to the projection bus + // On new event was received, build projection and publish it to the projection bus eventBus.subscribe { event -> withLoggingContext("event" to event.toString()) { - projectionsSnapshot - .applyAndPutToCache(event) + projectionsRepository + .applyAndSave(event) .also { projectionBus.publish(it) } } } @@ -48,5 +40,5 @@ class GameListRepositoryInMemory( * It fetches it from the local cache if possible, otherwise it builds it. */ override fun getList(): List = - projectionsSnapshot.getList() + projectionsRepository.getList() } diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt index 6e4f539..cadbc34 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt @@ -2,14 +2,12 @@ package eventDemo.adapter.infrastructureLayer.event.projection import eventDemo.business.entity.GameId import eventDemo.business.event.GameEventBus -import eventDemo.business.event.GameEventStore import eventDemo.business.event.projection.GameList import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.apply -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis -import eventDemo.libs.event.projection.SnapshotConfig +import eventDemo.libs.event.projection.ProjectionRepositoryInRedis import io.github.oshai.kotlinlogging.withLoggingContext import kotlinx.serialization.json.Json import redis.clients.jedis.UnifiedJedis @@ -18,14 +16,10 @@ import redis.clients.jedis.UnifiedJedis * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. */ class GameListRepositoryInRedis( - eventStore: GameEventStore, jedis: UnifiedJedis, - snapshotConfig: SnapshotConfig = SnapshotConfig(), ) : GameListRepository { - private val projectionsSnapshot = - ProjectionSnapshotRepositoryInRedis( - eventStore = eventStore, - snapshotCacheConfig = snapshotConfig, + private val projectionsRepository = + ProjectionRepositoryInRedis( initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, projectionClass = GameList::class, projectionToJson = { Json.encodeToString(GameList.serializer(), it) }, @@ -40,8 +34,8 @@ class GameListRepositoryInRedis( ) { eventBus.subscribe { event -> withLoggingContext("event" to event.toString()) { - projectionsSnapshot - .applyAndPutToCache(event) + projectionsRepository + .applyAndSave(event) .also { projectionBus.publish(it) } } } @@ -53,5 +47,5 @@ class GameListRepositoryInRedis( * It fetches it from the local cache if possible, otherwise it builds it. */ override fun getList(): List = - projectionsSnapshot.getList() + projectionsRepository.getList() } diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt index b19bd88..612a195 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt @@ -2,28 +2,19 @@ package eventDemo.adapter.infrastructureLayer.event.projection import eventDemo.business.entity.GameId import eventDemo.business.event.GameEventBus -import eventDemo.business.event.GameEventStore -import eventDemo.business.event.event.GameEvent import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.apply -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory -import eventDemo.libs.event.projection.SnapshotConfig +import eventDemo.libs.event.projection.ProjectionRepositoryInMemory import io.github.oshai.kotlinlogging.withLoggingContext /** * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. */ -class GameStateRepositoryInMemory( - eventStore: GameEventStore, - snapshotConfig: SnapshotConfig = SnapshotConfig(), -) : GameStateRepository { - private val projectionsSnapshot = - ProjectionSnapshotRepositoryInMemory( - name = GameStateRepositoryInMemory::class, - eventStore = eventStore, - snapshotCacheConfig = snapshotConfig, +class GameStateRepositoryInMemory : GameStateRepository { + private val projectionsRepository = + ProjectionRepositoryInMemory( applyToProjection = GameState::apply, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, ) @@ -32,33 +23,19 @@ class GameStateRepositoryInMemory( projectionBus: GameProjectionBus, eventBus: GameEventBus, ) { - // On new event was received, build snapshot and publish it to the projection bus + // On new event was received, build projection and publish it to the projection bus eventBus.subscribe { event -> withLoggingContext("event" to event.toString()) { - projectionsSnapshot - .applyAndPutToCache(event) + projectionsRepository + .applyAndSave(event) .also { projectionBus.publish(it) } } } } /** - * Get the last version of the [GameState] from the all eventStream. - * - * It fetches it from the local cache if possible, otherwise it builds it. + * Get the [GameState]. */ - override fun getLast(gameId: GameId): GameState = - projectionsSnapshot.getLast(gameId) - - /** - * Get the [GameState] to the specific [event][GameEvent]. - * It does not contain the [events][GameEvent] it after this one. - * - * It fetches it from the local cache if possible, otherwise it builds it. - */ - override fun getUntil(event: GameEvent): GameState = - projectionsSnapshot.getUntil(event) - - override fun count(gameId: GameId): Int = - projectionsSnapshot.count(gameId) + override fun get(gameId: GameId): GameState = + projectionsRepository.get(gameId) } diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt index 969ea02..4fcf615 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt @@ -2,14 +2,11 @@ package eventDemo.adapter.infrastructureLayer.event.projection import eventDemo.business.entity.GameId import eventDemo.business.event.GameEventBus -import eventDemo.business.event.GameEventStore -import eventDemo.business.event.event.GameEvent import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.apply -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis -import eventDemo.libs.event.projection.SnapshotConfig +import eventDemo.libs.event.projection.ProjectionRepositoryInRedis import io.github.oshai.kotlinlogging.withLoggingContext import kotlinx.serialization.json.Json import redis.clients.jedis.UnifiedJedis @@ -18,14 +15,10 @@ import redis.clients.jedis.UnifiedJedis * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. */ class GameStateRepositoryInRedis( - eventStore: GameEventStore, jedis: UnifiedJedis, - snapshotConfig: SnapshotConfig = SnapshotConfig(), ) : GameStateRepository { - private val projectionsSnapshot = - ProjectionSnapshotRepositoryInRedis( - eventStore = eventStore, - snapshotCacheConfig = snapshotConfig, + private val projectionsRepository = + ProjectionRepositoryInRedis( initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, projectionClass = GameState::class, projectionToJson = { Json.encodeToString(GameState.serializer(), it) }, @@ -38,33 +31,19 @@ class GameStateRepositoryInRedis( projectionBus: GameProjectionBus, eventBus: GameEventBus, ) { - // On new event was received, build snapshot and publish it to the projection bus + // On new event was received, build projection and publish it to the projection bus eventBus.subscribe { event -> withLoggingContext("event" to event.toString()) { - projectionsSnapshot - .applyAndPutToCache(event) + projectionsRepository + .applyAndSave(event) .also { projectionBus.publish(it) } } } } /** - * Get the last version of the [GameState] from the all eventStream. - * - * It fetches it from the local cache if possible, otherwise it builds it. + * Get the [GameState]. */ - override fun getLast(gameId: GameId): GameState = - projectionsSnapshot.getLast(gameId) - - /** - * Get the [GameState] to the specific [event][GameEvent]. - * It does not contain the [events][GameEvent] it after this one. - * - * It fetches it from the local cache if possible, otherwise it builds it. - */ - override fun getUntil(event: GameEvent): GameState = - projectionsSnapshot.getUntil(event) - - override fun count(gameId: GameId): Int = - projectionsSnapshot.count(gameId) + override fun get(gameId: GameId): GameState = + projectionsRepository.get(gameId) } diff --git a/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/ReadTheGameState.kt b/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/ReadTheGameState.kt index 4f0b995..50dc4f6 100644 --- a/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/ReadTheGameState.kt +++ b/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/ReadTheGameState.kt @@ -38,7 +38,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) { // Read the last played card on the game. get { body -> gameStateRepository - .getLast(body.game.id) + .get(body.game.id) .cardOnCurrentStack ?.let { call.respond(it) } ?: call.response.status(HttpStatusCode.BadRequest) @@ -46,7 +46,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) { // Read the last played card on the game. get { body -> - val state = gameStateRepository.getLast(body.game.id) + val state = gameStateRepository.get(body.game.id) call.respond(state) } } diff --git a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt index 56d4734..3b16325 100644 --- a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt @@ -86,12 +86,12 @@ class GameCommandHandler( * If the command fail, send an [error notification][CommandErrorNotification], * if success, send a [success notification][CommandSuccessNotification] */ - suspend fun handle( + fun handle( player: Player, gameId: GameId, command: GameCommand, - sendSuccess: suspend () -> Unit, - sendError: suspend (message: String) -> Unit, + sendSuccess: () -> Unit, + sendError: (message: String) -> Unit, ) { if (command.payload.aggregateId.id != gameId.id) { logger.warn { "Handle command Refuse, the gameId of the command is not the same" } @@ -114,26 +114,26 @@ class GameCommandHandler( } } -private fun SendChannel.sendSuccess(command: GameCommand): suspend () -> Unit = +private fun SendChannel.sendSuccess(command: GameCommand): () -> Unit = { val logger = KotlinLogging.logger { } CommandSuccessNotification(commandId = command.id) .also { notification -> withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) { logger.debug { "Notification SUCCESS sent" } - send(notification) + trySend(notification) } } } -private fun SendChannel.sendError(command: GameCommand): suspend (message: String) -> Unit = +private fun SendChannel.sendError(command: GameCommand): (message: String) -> Unit = { val logger = KotlinLogging.logger { } CommandErrorNotification(message = it, command = command) .also { notification -> withLoggingContext("notification" to notification.toString(), "command" to command.toString()) { logger.warn { "Notification ERROR sent: ${notification.message}" } - send(notification) + trySend(notification) } } } diff --git a/src/main/kotlin/eventDemo/business/command/action/ICantPlay.kt b/src/main/kotlin/eventDemo/business/command/action/ICantPlay.kt index 0d365e8..2084d56 100644 --- a/src/main/kotlin/eventDemo/business/command/action/ICantPlay.kt +++ b/src/main/kotlin/eventDemo/business/command/action/ICantPlay.kt @@ -12,7 +12,7 @@ data class ICantPlay( private val gameStateRepository: GameStateRepository, ) : CommandAction { override fun run(command: ICantPlayCommand): (version: Int) -> PlayerHavePassEvent { - val state = gameStateRepository.getLast(command.payload.aggregateId) + val state = gameStateRepository.get(command.payload.aggregateId) if (state.currentPlayerTurn != command.payload.player) { throw CommandException("Its not your turn!") diff --git a/src/main/kotlin/eventDemo/business/command/action/IWantToJoinTheGame.kt b/src/main/kotlin/eventDemo/business/command/action/IWantToJoinTheGame.kt index 29f5ca4..34b67be 100644 --- a/src/main/kotlin/eventDemo/business/command/action/IWantToJoinTheGame.kt +++ b/src/main/kotlin/eventDemo/business/command/action/IWantToJoinTheGame.kt @@ -12,7 +12,7 @@ data class IWantToJoinTheGame( private val gameStateRepository: GameStateRepository, ) : CommandAction { override fun run(command: IWantToJoinTheGameCommand): (version: Int) -> NewPlayerEvent { - val state = gameStateRepository.getLast(command.payload.aggregateId) + val state = gameStateRepository.get(command.payload.aggregateId) if (!state.isStarted) { return { NewPlayerEvent( diff --git a/src/main/kotlin/eventDemo/business/command/action/IWantToPlayCard.kt b/src/main/kotlin/eventDemo/business/command/action/IWantToPlayCard.kt index 6266aa7..50c3aa1 100644 --- a/src/main/kotlin/eventDemo/business/command/action/IWantToPlayCard.kt +++ b/src/main/kotlin/eventDemo/business/command/action/IWantToPlayCard.kt @@ -12,7 +12,7 @@ data class IWantToPlayCard( private val gameStateRepository: GameStateRepository, ) : CommandAction { override fun run(command: IWantToPlayCardCommand): (version: Int) -> CardIsPlayedEvent { - val state = gameStateRepository.getLast(command.payload.aggregateId) + val state = gameStateRepository.get(command.payload.aggregateId) if (!state.isStarted) { throw CommandException("The game is Not started") diff --git a/src/main/kotlin/eventDemo/business/command/action/IamReadyToPlay.kt b/src/main/kotlin/eventDemo/business/command/action/IamReadyToPlay.kt index 0213419..f148632 100644 --- a/src/main/kotlin/eventDemo/business/command/action/IamReadyToPlay.kt +++ b/src/main/kotlin/eventDemo/business/command/action/IamReadyToPlay.kt @@ -13,7 +13,7 @@ class IamReadyToPlay( ) : CommandAction { @Throws(CommandException::class) override fun run(command: IamReadyToPlayCommand): (version: Int) -> PlayerReadyEvent { - val state = gameStateRepository.getLast(command.payload.aggregateId) + val state = gameStateRepository.get(command.payload.aggregateId) val playerExist: Boolean = state.players.contains(command.payload.player) val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player) diff --git a/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt b/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt index b8cd07a..8f3187f 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt @@ -1,12 +1,7 @@ package eventDemo.business.event.projection import eventDemo.business.entity.GameId -import eventDemo.business.event.event.GameEvent interface GameStateRepository { - fun getLast(gameId: GameId): GameState - - fun getUntil(event: GameEvent): GameState - - fun count(gameId: GameId): Int + fun get(gameId: GameId): GameState } diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt index 5813719..3479463 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt @@ -37,7 +37,7 @@ class ReactionListener( } } - private suspend fun sendStartGameEvent(state: GameState) { + private fun sendStartGameEvent(state: GameState) { if (state.isReady && !state.isStarted) { val reactionEvent = eventHandler.handle(state.aggregateId) { @@ -54,7 +54,7 @@ class ReactionListener( } } - private suspend fun sendWinnerEvent(state: GameState) { + private fun sendWinnerEvent(state: GameState) { val winner = state.playerHasNoCardLeft().firstOrNull() if (winner != null) { val reactionEvent = diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt index 0c14786..f1147d0 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt @@ -13,7 +13,6 @@ import eventDemo.business.event.GameEventStore import eventDemo.business.event.projection.GameListRepository import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.GameStateRepository -import eventDemo.libs.event.projection.SnapshotConfig import org.koin.core.module.Module import org.koin.core.module.dsl.singleOf import org.koin.core.scope.Scope @@ -65,10 +64,10 @@ fun Module.configureDIInfrastructure(config: Configuration) { singleOf(::GameProjectionBusInRabbitMQ) bind GameProjectionBus::class single { - GameStateRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) + GameStateRepositoryInRedis(get()) } bind GameStateRepository::class single { - GameListRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) + GameListRepositoryInRedis(get()) } bind GameListRepository::class } diff --git a/src/main/kotlin/eventDemo/libs/bus/Bus.kt b/src/main/kotlin/eventDemo/libs/bus/Bus.kt index 3f661ed..89381eb 100644 --- a/src/main/kotlin/eventDemo/libs/bus/Bus.kt +++ b/src/main/kotlin/eventDemo/libs/bus/Bus.kt @@ -1,13 +1,22 @@ package eventDemo.libs.bus interface Bus { - suspend fun publish(item: T) + /** + * Publish a new [message][item] to the bus. + */ + fun publish(item: T) /** - * @param priority The higher the priority, the more it will be called first + * Subscribe a [lambda][block] to the bus. + * + * When a message is sent to the bus, the [block] is executed. */ - fun subscribe(block: suspend (T) -> Unit): Subscription + fun subscribe(block: (T) -> Unit): Subscription + /** + * The returns of the [subscribe] method. + * It can be called to [cancel][close] the subscription. + */ interface Subscription : AutoCloseable { override fun close() } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt index 9e002e2..53ec23b 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt @@ -9,21 +9,19 @@ class BusInMemory( val name: KClass<*> = BusInMemory::class, ) : Bus { private val logger = KotlinLogging.logger(name.qualifiedName.toString()) - private val subscribers: MutableList Unit> = mutableListOf() + private val subscribers: MutableList<(E) -> Unit> = mutableListOf() - override suspend fun publish(item: E) { + override fun publish(item: E) { withLoggingContext("busItem" to item.toString()) { logger.info { "Item sent to the bus" } subscribers .forEach { - coroutineScope { - it(item) - } + it(item) } } } - override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { + override fun subscribe(block: (E) -> Unit): Bus.Subscription { subscribers.add(block) return object : Bus.Subscription { override fun close() { diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt index f75f9c2..0efc12f 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt @@ -42,21 +42,19 @@ class BusInRabbitMQ( } } - override suspend fun publish(item: E) { + override fun publish(item: E) { connection .createChannel() - .use { - it.basicPublish( - exchangeName, - routingKey, - AMQP.BasicProperties(), - objectToString(item).toByteArray(), - ) - } + .basicPublish( + exchangeName, + routingKey, + AMQP.BasicProperties(), + objectToString(item).toByteArray(), + ) logger.info { "Item sent to the bus" } } - override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { + override fun subscribe(block: (E) -> Unit): Bus.Subscription { connection .createChannel() .also { channel -> diff --git a/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt b/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt index 61ded43..3ddda75 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt @@ -44,7 +44,7 @@ class CommandHandler, E : Event, ID : AggregateId, C : Command>( * * It restricts to run only once the [command]. */ - suspend fun handle( + fun handle( aggregateId: ID, command: C, callback: CommandCallback, @@ -99,10 +99,10 @@ private class EventCommandMap>( val event: E, val date: Instant, ) { - suspend operator fun invoke(error: CommandException? = null) { + operator fun invoke(error: CommandException? = null) { callback(command, error) } } } -typealias CommandCallback = suspend (command: C, error: CommandException?) -> Unit +typealias CommandCallback = (command: C, error: CommandException?) -> Unit diff --git a/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt b/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt index 697cd39..ab36f1b 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt @@ -14,7 +14,7 @@ class CommandRunnerController( ) { private val executedCommand: ConcurrentHashMap> = ConcurrentHashMap() - suspend fun runOnlyOnce( + fun runOnlyOnce( command: C, action: CommandBlock, ) { diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt index 0ac22fe..f9d18f7 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt @@ -34,7 +34,7 @@ class CommandStreamChannel( } } - private suspend fun runAndLogStatus( + private fun runAndLogStatus( command: C, action: CommandBlock, ) { @@ -47,4 +47,4 @@ class CommandStreamChannel( } } -typealias CommandBlock = suspend (C) -> Unit +typealias CommandBlock = (C) -> Unit diff --git a/src/main/kotlin/eventDemo/libs/event/EventHandler.kt b/src/main/kotlin/eventDemo/libs/event/EventHandler.kt index 0991099..00eb39b 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventHandler.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventHandler.kt @@ -4,7 +4,7 @@ package eventDemo.libs.event * A stream to publish and read the played card event. */ interface EventHandler, ID : AggregateId> { - suspend fun handle( + fun handle( aggregateId: ID, buildEvent: (version: Int) -> E, ): E diff --git a/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt b/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt index 5bb5826..d577530 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventHandlerImpl.kt @@ -19,7 +19,7 @@ class EventHandlerImpl, ID : AggregateId>( /** * Build Event then send it to the event store and bus. */ - override suspend fun handle( + override fun handle( aggregateId: ID, buildEvent: (version: Int) -> E, ): E = @@ -34,13 +34,9 @@ class EventHandlerImpl, ID : AggregateId>( .also { withLoggingContext("event" to it.toString()) { eventStore.publish(it) + eventBus.publish(it) } } - }.also { event -> - withLoggingContext("event" to event.toString()) { - // Publish to the bus - eventBus.publish(event) - } } } } diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepository.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepository.kt new file mode 100644 index 0000000..ed48797 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepository.kt @@ -0,0 +1,34 @@ +package eventDemo.libs.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event + +interface ProjectionRepository, P : Projection, ID : AggregateId> { + /** + * Update projection with the event. + */ + fun apply(event: E): P + + /** + * Update projection with the event, and save it. + */ + fun applyAndSave(event: E): P + + /** + * Save the projection. + */ + fun save(projection: P) + + /** + * Build the list of all [Projections][Projection] + */ + fun getList( + limit: Int = 100, + offset: Int = 0, + ): List

+ + /** + * Build the last version of the [Projection] from the cache. + */ + fun get(aggregateId: ID): P +} diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryAbs.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryAbs.kt new file mode 100644 index 0000000..f8f98bf --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryAbs.kt @@ -0,0 +1,64 @@ +package eventDemo.libs.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import io.github.oshai.kotlinlogging.KotlinLogging +import io.github.oshai.kotlinlogging.withLoggingContext +import io.ktor.util.collections.ConcurrentMap + +/** + * Repository abstraction to declare common process + */ +abstract class ProjectionRepositoryAbs, P : Projection, ID : AggregateId>( + private val applyToProjection: P.(event: E) -> P, +) : ProjectionRepository { + private val logger = KotlinLogging.logger {} + + /** + * Update projection with the event. + * + * 1. get the last projection + * 2. apply the new event to the projection + */ + override fun apply(event: E): P = + get(event.aggregateId).applyToProjectionSecure(event) + + /** + * Update projection with the event, and save it. + * + * 1. get the last projection + * 2. apply the new event to projection + * 3. save it + */ + override fun applyAndSave(event: E): P = + apply(event) + .also { + withLoggingContext("projection" to it.toString(), "event" to event.toString()) { + save(it) + } + } + + /** + * Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event. + */ + protected val applyToProjectionSecure: P.(event: E) -> P = { event -> + withLoggingContext("event" to event.toString(), "projection" to this.toString()) { + if (canBeApply(event)) { + applyToProjection(event) + } else if (event.version <= lastEventVersion) { + "Event is already in the Projection, skip apply.".let { + logger.warn { it } + error(it) + } + } else { + "The version of the event must follow directly after the version of the projection.".let { + logger.error { it } + error(it) + } + } + } + } + + private fun P.canBeApply(event: E): Boolean = + event.version == lastEventVersion + 1 +} diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInMemory.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInMemory.kt new file mode 100644 index 0000000..3098c48 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInMemory.kt @@ -0,0 +1,46 @@ +package eventDemo.libs.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import java.util.concurrent.ConcurrentHashMap + +class ProjectionRepositoryInMemory, P : Projection, ID : AggregateId>( + private val initialStateBuilder: (aggregateId: ID) -> P, + applyToProjection: P.(event: E) -> P, +) : ProjectionRepositoryAbs(applyToProjection), + ProjectionRepository { + private val projections: ConcurrentHashMap = ConcurrentHashMap() + + /** + * Build the list of all [Projections][Projection] + */ + override fun getList( + limit: Int, + offset: Int, + ): List

= + projections + .values + .drop(offset) + .take(limit) + + /** + * Get the [Projection]. + */ + override fun get(aggregateId: ID): P = + projections[aggregateId] + ?: initialStateBuilder(aggregateId) + + /** + * Save the projection. + */ + override fun save(projection: P) { + projections.compute(projection.aggregateId) { id: ID, proj: P? -> + val currentProjection = proj ?: initialStateBuilder(projection.aggregateId) + if (currentProjection.lastEventVersion < projection.lastEventVersion) { + projection + } else { + currentProjection + } + } + } +} diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInRedis.kt new file mode 100644 index 0000000..eba9554 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryInRedis.kt @@ -0,0 +1,80 @@ +package eventDemo.libs.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import io.github.oshai.kotlinlogging.KotlinLogging +import redis.clients.jedis.UnifiedJedis +import redis.clients.jedis.params.ScanParams +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import kotlin.reflect.KClass + +class ProjectionRepositoryInRedis, P : Projection, ID : AggregateId>( + private val jedis: UnifiedJedis, + private val initialStateBuilder: (aggregateId: ID) -> P, + private val projectionClass: KClass

, + private val projectionToJson: (P) -> String, + private val jsonToProjection: (String) -> P, + applyToProjection: P.(event: E) -> P, +) : ProjectionRepositoryAbs(applyToProjection), + ProjectionRepository { + val logger = KotlinLogging.logger { } + private val lock = ReentrantLock() + + /** + * Get the list of all [Projections][Projection] + */ + override fun getList( + limit: Int, + offset: Int, + ): List

= + jedis + .hscan( + projectionClass.redisHKey, + offset.toString(), + ScanParams() + .match("*") + .count(limit), + ).result + .mapNotNull { + jsonToProjection(it.value) + } + + /** + * Get the [Projection]. + */ + override fun get(aggregateId: ID): P = + jedis + .hget( + projectionClass.redisHKey, + aggregateId.id.toString(), + ).let { + if (it == null || it == "nil") { + initialStateBuilder(aggregateId) + } else { + jsonToProjection(it) + } + } + + override fun save(projection: P) { + lock.withLock { + if (get(projection.aggregateId).lastEventVersion < projection.lastEventVersion) { + jedis.hset( + projection.redisHKey, + projection.aggregateId.id.toString(), + projectionToJson(projection), + ) + logger.info { "Projection saved" } + } else { + logger.error { "Projection save SKIP (an early version exists)" } + error("Projection save SKIP (an early version exists)") + } + } + } +} + +private val

> KClass

.redisHKey: String get() = + "projection:$simpleName" + +private val

> P.redisHKey: String get() = + this::class.redisHKey diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt deleted file mode 100644 index 042d090..0000000 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt +++ /dev/null @@ -1,35 +0,0 @@ -package eventDemo.libs.event.projection - -import eventDemo.libs.event.AggregateId -import eventDemo.libs.event.Event - -interface ProjectionSnapshotRepository, P : Projection, ID : AggregateId> { - /** - * Create a snapshot for the event - */ - suspend fun applyAndPutToCache(event: E): P - - fun count(aggregateId: ID): Int - - fun countAll(): Int - - /** - * Build the list of all [Projections][Projection] - */ - fun getList( - limit: Int = 100, - offset: Int = 0, - ): List

- - /** - * Build the last version of the [Projection] from the cache. - */ - fun getLast(aggregateId: ID): P - - /** - * Build the [Projection] to the specific [event][Event]. - * - * It does not contain the [events][Event] it after this one. - */ - fun getUntil(event: E): P -} diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt deleted file mode 100644 index 8531ca9..0000000 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ /dev/null @@ -1,226 +0,0 @@ -package eventDemo.libs.event.projection - -import eventDemo.libs.event.AggregateId -import eventDemo.libs.event.Event -import eventDemo.libs.event.EventStore -import eventDemo.libs.event.EventStream -import io.github.oshai.kotlinlogging.KotlinLogging -import io.github.oshai.kotlinlogging.withLoggingContext -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentLinkedQueue -import kotlin.reflect.KClass - -class ProjectionSnapshotRepositoryInMemory, P : Projection, ID : AggregateId>( - val name: KClass<*> = ProjectionSnapshotRepositoryInMemory::class, - private val eventStore: EventStore, - private val initialStateBuilder: (aggregateId: ID) -> P, - private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), - private val applyToProjection: P.(event: E) -> P, -) : ProjectionSnapshotRepository { - private val projectionsSnapshot: ConcurrentHashMap>> = ConcurrentHashMap() - private val logger = KotlinLogging.logger(name.qualifiedName.toString()) - - /** - * Create a snapshot for the event - * - * 1. get the last snapshot with a version lower than that of the event - * 2. get the events with a greater version of the snapshot - * 3. apply the event to the snapshot - * 4. apply the new event to the projection - * 5. save it - * 6. remove old one - */ - override suspend fun applyAndPutToCache(event: E): P = - getUntil(event) - .also { - withLoggingContext("projection" to it.toString()) { - save(it) - removeOldSnapshot(it.aggregateId) - } - } - - override fun count(aggregateId: ID): Int = - projectionsSnapshot[aggregateId]?.count() ?: 0 - - override fun countAll(): Int = - projectionsSnapshot.mappingCount().toInt() - - /** - * Build the list of all [Projections][Projection] - */ - override fun getList( - limit: Int, - offset: Int, - ): List

= - projectionsSnapshot - .map { (id, b) -> - getLast(id) - }.drop(offset) - .take(limit) - - /** - * Build the last version of the [Projection] from the cache. - * - * 1. get the last snapshot - * 2. get the missing event to the snapshot - * 3. apply the missing events to the snapshot - */ - override fun getLast(aggregateId: ID): P { - val lastSnapshot = getLastSnapshot(aggregateId)?.first - val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot) - return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot) - } - - /** - * Build the [Projection] to the specific [event][Event]. - * - * It does not contain the [events][Event] it after this one. - * - * 1. get the last snapshot before the event - * 2. get the events with a greater version of the snapshot but lower of passed event - * 3. apply the events to the snapshot - */ - override fun getUntil(event: E): P { - val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first - if (lastSnapshot?.lastEventVersion == event.version) { - return lastSnapshot - } - - val missingEventOfSnapshot = - eventStore - .getStream(event.aggregateId) - // take the last snapshot version +1 to event version - .readVersionBetween(lastSnapshot, event) - - return if (lastSnapshot?.lastEventVersion == event.version) { - lastSnapshot - } else { - lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot) - } - } - - /** - * Remove the oldest [snapshot][P] of the [queue][projectionsSnapshot]. - * - * The rules are pass in the controller. - */ - private fun removeOldSnapshot(aggregateId: ID) { - projectionsSnapshot[aggregateId]?.let { queue -> - if (snapshotCacheConfig.enabled) { - queue - .excludeFirstAndLast() - .excludeTheHeadBySize() - .excludeNewerByDate() - .excludeByModulo() - .forEach { queue.remove(it) } - } - } - } - - /** - * Return a new list without the first and last snapshot. - * - * Exclude from deletion the first and the last. - */ - private fun FilteredList

.excludeFirstAndLast(): FilteredList

= - sortedBy { it.first.lastEventVersion } - .drop(1) - .dropLast(1) - - /** - * Return a new list of event filtered by the version modulo. - * - * Exclude from deletion 1 element out of 10 (if modulo 10 in [config][snapshotCacheConfig]). - */ - private fun FilteredList

.excludeByModulo(): FilteredList

= - filter { (it.first.lastEventVersion % snapshotCacheConfig.modulo) != 1 } - - /** - * Return a new list of event filtered by the maximum size. - * - * Exclude from removal all [snapshot][projectionsSnapshot] that in the head of the queue. - */ - private fun FilteredList

.excludeTheHeadBySize(): FilteredList

{ - // filter if size exceeds the limit - return sortedBy { it.first.lastEventVersion } - .dropLast(snapshotCacheConfig.maxSnapshotCacheSize) - } - - /** - * Return a new list of event filtered by the maximum date. - * - * Exclude from removal all [snapshot][projectionsSnapshot] that newer of the date (in [config][SnapshotConfig]). - */ - private fun FilteredList

.excludeNewerByDate(): FilteredList

{ - val now = Clock.System.now() - val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl - return filter { deadLine < it.second } - } - - /** - * Save the snapshot. - */ - private fun save(projection: P) { - projectionsSnapshot - .computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() } - .add(Pair(projection, Clock.System.now())) - .also { logger.info { "Projection saved" } } - } - - /** - * Get the last snapshot when the version is lower of then event version - */ - private fun getLastSnapshotBeforeOrEqualEvent(event: E) = - projectionsSnapshot[event.aggregateId] - ?.sortedByDescending { it.first.lastEventVersion } - ?.find { it.first.lastEventVersion <= event.version } - - /** - * Get the last snapshot (with the higher version). - */ - private fun getLastSnapshot(aggregateId: ID) = - projectionsSnapshot[aggregateId] - ?.maxByOrNull { it.first.lastEventVersion } - - /** - * Get the events from the [event stream][EventStream] when the version is higher of the snapshot. - * - * If the snapshot is null, it takes all events from the event [event stream][EventStream] - */ - private fun getEventAfterTheSnapshot( - aggregateId: ID, - snapshot: P?, - ) = - eventStore - .getStream(aggregateId) - .readGreaterOfVersion(snapshot?.lastEventVersion ?: 0) - - /** - * Apply events to the projection. - */ - private fun P?.applyEvents( - aggregateId: ID, - eventsToApply: Set, - ): P = - eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure) - - /** - * Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event. - */ - private val applyToProjectionSecure: P.(event: E) -> P = { event -> - withLoggingContext("event" to event.toString(), "projection" to this.toString()) { - if (event.version == lastEventVersion + 1) { - applyToProjection(event) - } else if (event.version <= lastEventVersion) { - KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." } - this - } else { - error("The version of the event must follow directly after the version of the projection.") - } - } - } -} - -private typealias FilteredList

= Collection> diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt deleted file mode 100644 index 5141c35..0000000 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt +++ /dev/null @@ -1,240 +0,0 @@ -package eventDemo.libs.event.projection - -import eventDemo.libs.event.AggregateId -import eventDemo.libs.event.Event -import eventDemo.libs.event.EventStore -import eventDemo.libs.toRanges -import io.github.oshai.kotlinlogging.KotlinLogging -import io.github.oshai.kotlinlogging.withLoggingContext -import redis.clients.jedis.UnifiedJedis -import redis.clients.jedis.params.ScanParams -import redis.clients.jedis.params.SortingParams -import kotlin.reflect.KClass - -class ProjectionSnapshotRepositoryInRedis, P : Projection, ID : AggregateId>( - private val eventStore: EventStore, - private val jedis: UnifiedJedis, - private val initialStateBuilder: (aggregateId: ID) -> P, - private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), - private val projectionClass: KClass

, - private val projectionToJson: (P) -> String, - private val jsonToProjection: (String) -> P, - private val applyToProjection: P.(event: E) -> P, -) : ProjectionSnapshotRepository { - val logger = KotlinLogging.logger { } - - /** - * Create a snapshot for the event - * - * 1. get the last snapshot with a version lower than that of the event - * 2. get the events with a greater version of the snapshot - * 3. apply the event to the snapshot - * 4. apply the new event to the projection - * 5. save it - * 6. remove old one - */ - override suspend fun applyAndPutToCache(event: E): P = - getUntil(event) - .also { - withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) { - save(it) - removeOldSnapshot(it.aggregateId, event.version) - } - } - - override fun count(aggregateId: ID): Int = - jedis.zcount(projectionClass.redisKey(aggregateId), Double.MIN_VALUE, Double.MAX_VALUE).toInt() - - override fun countAll(): Int = - jedis.zcount(projectionClass.redisKey, Double.MIN_VALUE, Double.MAX_VALUE).toInt() - - /** - * Get the list of all [Projections][Projection] - */ - override fun getList( - limit: Int, - offset: Int, - ): List

= - jedis - .scan( - offset.toString(), - ScanParams() - .match(projectionClass.redisKeySearchList) - .count(limit), - ).result - .mapNotNull { key -> - getLastByKey(key) - ?.let(jsonToProjection) - } - - /** - * Get the last version of the [Projection] from the cache. - * - * 1. get the last snapshot - * 2. get the missing event to the snapshot - * 3. apply the missing events to the snapshot - */ - override fun getLast(aggregateId: ID): P = - getLastByKey(projectionClass.redisKey(aggregateId)) - ?.let(jsonToProjection) - ?: initialStateBuilder(aggregateId) - - private fun getLastByKey(key: String): String? = - jedis - .sort( - key, - SortingParams() - .desc() - .by("score") - .limit(0, 1), - ).firstOrNull() - - /** - * Build the [Projection] to the specific [event][Event]. - * - * It does not contain the [events][Event] it after this one. - * - * 1. get the last snapshot before the event - * 2. get the events with a greater version of the snapshot but lower of passed event - * 3. apply the events to the snapshot - */ - override fun getUntil(event: E): P { - val lastSnapshot = - jedis - .zrangeByScore( - projectionClass.redisKey(event.aggregateId), - 1.0, - event.version.toDouble(), - 0, - 1, - ).firstOrNull() - ?.let(jsonToProjection) - if (lastSnapshot?.lastEventVersion == event.version) { - return lastSnapshot - } - if (lastSnapshot != null && lastSnapshot.lastEventVersion > event.version) { - logger.error { "Cannot be apply event on more recent snapshot" } - error("Cannot be apply event on more recent snapshot") - } - - val missingEventOfSnapshot = - eventStore - .getStream(event.aggregateId) - // take the last snapshot version +1 to event version - .readVersionBetween(lastSnapshot, event) - - return if (lastSnapshot?.lastEventVersion == event.version) { - lastSnapshot - } else { - lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot) - } - } - - private fun save(projection: P) { - val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection)) - if (added < 1) { - logger.error { "Projection NOT saved (already exists)" } - } else { - logger.info { "Projection saved" } - if (snapshotCacheConfig.maxSnapshotCacheTtl.isFinite()) { - jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds) - } - } - } - - /** - * Apply events to the projection. - */ - private fun P?.applyEvents( - aggregateId: ID, - eventsToApply: Set, - ): P = - eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure) - - /** - * Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event. - */ - private val applyToProjectionSecure: P.(event: E) -> P = { event -> - withLoggingContext("event" to event.toString(), "projection" to this.toString()) { - if (event.version == lastEventVersion + 1) { - applyToProjection(event) - } else if (event.version <= lastEventVersion) { - KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." } - this - } else { - error("The version of the event must follow directly after the version of the projection.") - } - } - } - - fun removeOldSnapshot( - aggregateId: AggregateId, - lastVersion: Int, - ) { - if (snapshotCacheConfig.enabled) { - removeByModulo(aggregateId, lastVersion) - removeTheHeadBySize(aggregateId, lastVersion) - } - } - - private fun removeByModulo( - aggregateId: AggregateId, - lastVersion: Int, - ) { - (lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo)) - .let { if (it < 2) 2 else it } - .let { IntRange(it, lastVersion - 1) } - .filter { (it % snapshotCacheConfig.modulo) != 1 } - .toRanges() - .map { - jedis - .zremrangeByScore( - projectionClass.redisKey(aggregateId), - it.first.toDouble(), - it.last.toDouble(), - ).also { removedCount -> - if (removedCount > 0) { - logger.debug { - "$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]" - } - } - } - } - } - - private fun removeTheHeadBySize( - aggregateId: AggregateId, - lastVersion: Int, - ) { - (lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo)) - .toDouble() - .let { - jedis - .zremrangeByScore( - projectionClass.redisKey(aggregateId), - 2.0, - it, - ).also { removedCount -> - if (removedCount > 0) { - logger.info { - "$removedCount snapshot removed Size(${snapshotCacheConfig.maxSnapshotCacheSize}) (1.0 to $it) [lastVersion=$lastVersion]" - } - } - } - } - } -} - -val

> KClass

.redisKeySearchList: String get() { - return "projection:$simpleName:*" -} - -val

> P.redisKey: String get() { - return "projection:${this::class.simpleName}:${aggregateId.id}" -} - -fun

, A : AggregateId> KClass

.redisKey(aggregateId: A): String = - "projection:$simpleName:${aggregateId.id}" - -val

> KClass

.redisKey: String get() = - "projection:$simpleName" diff --git a/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt b/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt deleted file mode 100644 index 33c9ddf..0000000 --- a/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt +++ /dev/null @@ -1,26 +0,0 @@ -package eventDemo.libs.event.projection - -import kotlin.time.Duration -import kotlin.time.Duration.Companion.minutes - -data class SnapshotConfig( - /** - * Keep snapshot when is on the head of the queue cache - */ - val maxSnapshotCacheSize: Int = 20, - /** - * Keep snapshot when is newer of - * - * snapshot.date > now + maxSnapshotCacheTtl - */ - val maxSnapshotCacheTtl: Duration = 10.minutes, - /** - * Keep snapshot when version is this modulo - * - * snapshot.lastVersion % modulo == 1 - */ - val modulo: Int = 10, - val enabled: Boolean = true, -) - -val DISABLED_CONFIG = SnapshotConfig(Int.MAX_VALUE, Duration.INFINITE, Int.MAX_VALUE, enabled = false) diff --git a/src/test/kotlin/eventDemo/Helpers.kt b/src/test/kotlin/eventDemo/Helpers.kt index 414988f..e05220d 100644 --- a/src/test/kotlin/eventDemo/Helpers.kt +++ b/src/test/kotlin/eventDemo/Helpers.kt @@ -6,11 +6,17 @@ import eventDemo.business.entity.Deck import eventDemo.configuration.business.configureGameListener import eventDemo.configuration.injection.appKoinModule import eventDemo.configuration.ktor.configuration +import io.github.oshai.kotlinlogging.KotlinLogging +import io.kotest.engine.runBlocking import io.ktor.server.config.ApplicationConfig import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.testApplication import io.ktor.utils.io.KtorDsl +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch import org.koin.core.Koin +import org.koin.core.context.startKoin import org.koin.core.module.KoinApplicationDslMarker import org.koin.dsl.koinApplication import org.koin.ktor.ext.getKoin @@ -39,6 +45,7 @@ fun testApplicationWithConfig( configBuilder: Koin.() -> Unit = {}, block: suspend ApplicationTestBuilder.() -> Unit, ) { + val logger = KotlinLogging.logger {} testApplication { val conf = ApplicationConfig("application.conf") environment { @@ -46,11 +53,19 @@ fun testApplicationWithConfig( } application { + logger.info { "Config App" } val koin = getKoin() koin.cleanDataTest() - configBuilder(koin) + runCatching { + logger.info { "Starting A" } + configBuilder(koin) + logger.info { "A finish" } + + } } - block() + logger.info { "Starting B" } + this@testApplication.block() + logger.info { "B finish" } } } diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt index 87a85bc..0f91679 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt @@ -8,6 +8,7 @@ import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.event.PlayerReadyEvent import eventDemo.business.event.projection.GameList import eventDemo.testApplicationWithConfig +import io.github.oshai.kotlinlogging.KotlinLogging import io.kotest.assertions.nondeterministic.eventually import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain @@ -19,18 +20,20 @@ import io.ktor.client.request.get import io.ktor.client.statement.bodyAsText import io.ktor.http.ContentType import io.ktor.http.HttpStatusCode +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlin.test.assertEquals import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds +val logger = KotlinLogging.logger {} class GameListRouteTest : FunSpec({ test("/games with no game started") { testApplicationWithConfig { val player1 = Player(name = "Nikola") - + logger.info { "Starting player1" } httpClient() .get("/games") { withAuth(player1) @@ -48,16 +51,14 @@ class GameListRouteTest : val player1 = Player(name = "Nikola") testApplicationWithConfig( { - runBlocking { - get() - .handle(gameId) { - NewPlayerEvent(gameId, player1, it) - } - } + get() + .handle(gameId) { + NewPlayerEvent(gameId, player1, it) + } }, ) { // Wait until the projection is created - eventually(1.seconds) { + eventually(3.seconds) { httpClient() .get("/games") { withAuth(player1) @@ -81,7 +82,6 @@ class GameListRouteTest : val player2 = Player(name = "Einstein") testApplicationWithConfig({ val eventHandler = get() - runBlocking { eventHandler.handle(gameId) { NewPlayerEvent(gameId, player1, it) } eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } @@ -93,7 +93,6 @@ class GameListRouteTest : it, shuffleIsDisabled = true, ) - } } }) { eventually(1.seconds) { diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt index 5a691d1..88fc3b8 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt @@ -1,7 +1,6 @@ package eventDemo.adapter.interfaceLayer.query import eventDemo.Tag -import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory import eventDemo.business.command.GameCommandHandler import eventDemo.business.command.command.GameCommand import eventDemo.business.command.command.IWantToJoinTheGameCommand @@ -10,9 +9,9 @@ import eventDemo.business.command.command.IamReadyToPlayCommand import eventDemo.business.entity.Card import eventDemo.business.entity.GameId import eventDemo.business.entity.Player -import eventDemo.business.event.GameEventStore import eventDemo.business.event.event.disableShuffleDeck import eventDemo.business.event.projection.GameState +import eventDemo.business.event.projection.GameStateRepository import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.ItsTheTurnOfNotification @@ -61,9 +60,9 @@ class GameSimulationTest : var player1HasJoin = false testKoinApplicationWithConfig { - val commandHandler by inject() - val eventStore by inject() - val playerNotificationListener by inject() + val commandHandler = get() + val playerNotificationListener = get() + val gameStateRepository = get() // Run command handler // In the normal process, these handlers is invoque players connect to the websocket @@ -76,8 +75,8 @@ class GameSimulationTest : } } - // Consume etch notification of players, and put theses in list. - // is used later to control when other players can be executing the next action + // Consume etch notification of players, and put theses in a list. + // Is used later to control when other players can execute the next action val player1Notifications = mutableListOf() val player2Notifications = mutableListOf() run { @@ -94,7 +93,7 @@ class GameSimulationTest : } } - // The player 1 actions + // Player 1 actions val player1Job = launch { playerNotificationListener.startListening(player1, gameId) { @@ -132,11 +131,11 @@ class GameSimulationTest : } } - // The player 2 actions + // Player 2 actions val player2Job = launch { - // wait the player 1 has join the game - until(1.seconds) { player1HasJoin } + // wait player 1 has joined the game + until(3.seconds) { player1HasJoin } playerNotificationListener.startListening(player2, gameId) { channelNotification2.trySendBlocking(it) @@ -176,7 +175,7 @@ class GameSimulationTest : joinAll(player1Job, player2Job) // Build the last state from the event store - val state = GameStateRepositoryInMemory(eventStore = eventStore).getLast(gameId) + val state = gameStateRepository.get(gameId) // Check if the state is correct state.aggregateId shouldBeEqual gameId @@ -192,6 +191,6 @@ class GameSimulationTest : }) private suspend inline fun MutableList.waitNotification(crossinline block: T.() -> Boolean): T = - eventually(1.seconds) { + eventually(3.seconds) { filterIsInstance().first { block(it) } } diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt index 9718347..98c5d90 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt @@ -11,7 +11,9 @@ import eventDemo.business.event.event.disableShuffleDeck import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameStateRepository import eventDemo.testApplicationWithConfig +import io.github.oshai.kotlinlogging.KotlinLogging import io.kotest.assertions.nondeterministic.eventually +import io.kotest.assertions.nondeterministic.until import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual @@ -53,7 +55,6 @@ class GameStateRouteTest : val gameId = GameId() val player1 = Player(name = "Nikola") val player2 = Player(name = "Einstein") - var lastPlayedCard: Card? = null testApplicationWithConfig({ disableShuffleDeck() val eventHandler = get() @@ -64,9 +65,8 @@ class GameStateRouteTest : eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } - lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } - assertNotNull(lastPlayedCard) - .let { assertIs(lastPlayedCard) } + val lastPlayedCard = eventually(3.seconds) { stateRepo.get(gameId).playableCards(player1).first() } + assertIs(lastPlayedCard) .let { it.number shouldBeEqual 0 it.color shouldBeEqual Card.Color.Red @@ -74,32 +74,36 @@ class GameStateRouteTest : eventHandler.handle(gameId) { CardIsPlayedEvent( gameId, - assertNotNull(lastPlayedCard), + lastPlayedCard, player1, it, ) } + until(3.seconds) { + stateRepo + .get(gameId) + .deck.discard + .last() == lastPlayedCard + } } }) { - eventually(1.seconds) { - httpClient() - .get("/games/$gameId/state") { - withAuth(player1) - accept(ContentType.Application.Json) - }.apply { - assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) - call.body().apply { - aggregateId shouldBeEqual gameId - players shouldHaveSize 2 - isStarted shouldBeEqual true - assertIs(lastEvent) - readyPlayers shouldBeEqual setOf(player1, player2) - direction shouldBeEqual GameState.Direction.CLOCKWISE - assertNotNull(lastCardPlayer) shouldBeEqual player1 - assertNotNull(colorOnCurrentStack) shouldBeEqual Card.Color.Red - } + httpClient() + .get("/games/$gameId/state") { + withAuth(player1) + accept(ContentType.Application.Json) + }.apply { + assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) + call.body().apply { + aggregateId shouldBeEqual gameId + players shouldHaveSize 2 + isStarted shouldBeEqual true + assertIs(lastEvent) + readyPlayers shouldBeEqual setOf(player1, player2) + direction shouldBeEqual GameState.Direction.CLOCKWISE + assertNotNull(lastCardPlayer) shouldBeEqual player1 + assertNotNull(colorOnCurrentStack) shouldBeEqual Card.Color.Red } - } + } } } @@ -118,9 +122,8 @@ class GameStateRouteTest : eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } - lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } - assertNotNull(lastPlayedCard) - .let { assertIs(lastPlayedCard) } + lastPlayedCard = eventually(3.seconds) { stateRepo.get(gameId).playableCards(player1).first() } + assertIs(lastPlayedCard) .let { it.number shouldBeEqual 0 it.color shouldBeEqual Card.Color.Red @@ -133,18 +136,23 @@ class GameStateRouteTest : it, ) } + + until(3.seconds) { + stateRepo + .get(gameId) + .deck.discard + .last() == lastPlayedCard + } } }) { - eventually(1.seconds) { - httpClient() - .get("/games/$gameId/card/last") { - withAuth(player1) - accept(ContentType.Application.Json) - }.apply { - assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) - assertEquals(assertNotNull(lastPlayedCard), call.body()) - } - } + httpClient() + .get("/games/$gameId/card/last") { + withAuth(player1) + accept(ContentType.Application.Json) + }.apply { + assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) + assertEquals(assertNotNull(lastPlayedCard), call.body()) + } } } }) diff --git a/src/test/kotlin/eventDemo/business/event/GameEventHandlerTest.kt b/src/test/kotlin/eventDemo/business/event/GameEventHandlerTest.kt index 19e3ac9..911db7e 100644 --- a/src/test/kotlin/eventDemo/business/event/GameEventHandlerTest.kt +++ b/src/test/kotlin/eventDemo/business/event/GameEventHandlerTest.kt @@ -7,6 +7,8 @@ import eventDemo.business.entity.Player import eventDemo.business.event.event.GameEvent import eventDemo.business.event.event.NewPlayerEvent import eventDemo.libs.event.VersionBuilderLocal +import eventDemo.testApplicationWithConfig +import io.kotest.assertions.nondeterministic.until import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual @@ -16,6 +18,7 @@ import io.mockk.spyk import io.mockk.verify import kotlin.test.assertIs import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.seconds class GameEventHandlerTest : FunSpec({ diff --git a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt index 5a5780c..4a533a2 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt @@ -2,7 +2,6 @@ package eventDemo.business.event.projection import ch.qos.logback.classic.Level import com.rabbitmq.client.impl.ForgivingExceptionHandler -import com.zaxxer.hikari.pool.ProxyConnection import eventDemo.Tag import eventDemo.business.command.GameCommandHandler import eventDemo.business.entity.GameId @@ -17,7 +16,6 @@ import io.kotest.common.KotestInternal import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual -import io.kotest.matchers.shouldBe import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll @@ -41,13 +39,10 @@ class GameStateRepositoryTest : val eventHandler = get() eventHandler .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } - .also { event -> + .also { // Wait until the projection is created eventually(1.seconds) { - assertNotNull(repo.getUntil(event)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1) - } - assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(repo.get(aggregateId)).also { assertNotNull(it.players) shouldBeEqual setOf(player1) } } @@ -68,9 +63,9 @@ class GameStateRepositoryTest : var state: GameState? = null projectionBus.subscribe { - repo.getLast(aggregateId).also { - state = it - } + repo + .get(aggregateId) + .also { state = it } } eventHandler @@ -86,7 +81,7 @@ class GameStateRepositoryTest : .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } .also { eventually(1.seconds) { - assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(repo.get(aggregateId)).also { assertNotNull(it.players) shouldBeEqual setOf(player1, player2) } } @@ -95,44 +90,7 @@ class GameStateRepositoryTest : } } - test("getUntil should build the state until the event") { - withLogLevel( - GameCommandHandler::class.java.name to Level.ERROR, - ForgivingExceptionHandler::class.java.name to Level.OFF, - ProxyConnection::class.java.name to Level.OFF, - Logger.ROOT_LOGGER_NAME to Level.INFO, - ) { - repeat(10) { - val aggregateId = GameId() - testKoinApplicationWithConfig { - val repo = get() - val eventHandler = get() - - val event1 = - eventHandler - .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } - .also { event1 -> - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1) - } - } - - eventHandler - .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } - .also { event2 -> - assertNotNull(repo.getUntil(event2)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1, player2) - } - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1) - } - } - } - } - } - } - - test("getUntil should be concurrently secure").config(tags = setOf(Tag.Concurrence)) { + test("get should be concurrently secure").config(tags = setOf(Tag.Concurrence)) { withLogLevel( Logger.ROOT_LOGGER_NAME to Level.ERROR, ForgivingExceptionHandler::class.java.name to Level.OFF, @@ -167,17 +125,12 @@ class GameStateRepositoryTest : includeFirst = false }, ) { - repo.getLast(aggregateId).run { + repo.get(aggregateId).run { lastEventVersion shouldBeEqual 200 players shouldHaveSize 200 } - repo.count(aggregateId) shouldBe 21 } } } } - - xtest("get should be concurrently secure") { - tags(Tag.Concurrence) - } }) diff --git a/src/test/kotlin/eventDemo/libs/event/EventHandlerTest.kt b/src/test/kotlin/eventDemo/libs/event/EventHandlerTest.kt new file mode 100644 index 0000000..fa1c2a3 --- /dev/null +++ b/src/test/kotlin/eventDemo/libs/event/EventHandlerTest.kt @@ -0,0 +1,38 @@ +package eventDemo.libs.event + +import eventDemo.libs.bus.Bus +import eventDemo.libs.bus.BusInMemory +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.equals.shouldBeEqual + +class EventHandlerTest : + FunSpec({ + test("EventHandler::handle should returns the built event") { + val eventBus: Bus = BusInMemory() + val eventStore: EventStore = EventStoreInMemory() + val versionBuilder: VersionBuilder = VersionBuilderLocal() + val aggregateId: IdTest = IdTest() + val handler = + EventHandlerImpl( + eventBus, + eventStore, + versionBuilder, + ) + + // When + val event = + handler.handle(aggregateId) { + EventXTest(aggregateId = aggregateId, version = it, num = 1) + } + + // Then + event.aggregateId shouldBeEqual aggregateId + event.version shouldBeEqual 1 + } + + xtest("EventHandler::handle should publish the event into the store") + + xtest("EventHandler::handle should publish the event into the bus") + + xtest("EventHandler::handle should publish the event into the bus in incremental order") + }) diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt b/src/test/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryTest.kt similarity index 60% rename from src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt rename to src/test/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryTest.kt index 15eae7c..ad921b1 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/projection/ProjectionRepositoryTest.kt @@ -1,4 +1,4 @@ -package eventDemo.business.event.projection +package eventDemo.libs.event.projection import eventDemo.cleanProjections import eventDemo.configuration.serializer.UUIDSerializer @@ -7,12 +7,6 @@ import eventDemo.libs.event.Event import eventDemo.libs.event.EventStore import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.VersionBuilderLocal -import eventDemo.libs.event.projection.DISABLED_CONFIG -import eventDemo.libs.event.projection.Projection -import eventDemo.libs.event.projection.ProjectionSnapshotRepository -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis -import eventDemo.libs.event.projection.SnapshotConfig import io.kotest.assertions.nondeterministic.continually import io.kotest.core.spec.style.FunSpec import io.kotest.datatest.withData @@ -22,6 +16,7 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.datetime.Clock import kotlinx.datetime.Instant import kotlinx.serialization.Serializable @@ -34,14 +29,14 @@ import kotlin.test.assertNotNull import kotlin.time.Duration.Companion.seconds @OptIn(DelicateCoroutinesApi::class) -class ProjectionSnapshotRepositoryTest : +class ProjectionRepositoryTest : FunSpec({ data class TestData( val store: EventStore, - val snapshotRepo: ProjectionSnapshotRepository, + val repository: ProjectionRepository, ) : WithDataTestName { override fun dataTestName(): String = - "${snapshotRepo::class.simpleName} with ${store::class.simpleName}" + "${repository::class.simpleName} with ${store::class.simpleName}" } val eventStores = @@ -50,48 +45,40 @@ class ProjectionSnapshotRepositoryTest : ) val projectionRepo = listOf( - ::getSnapshotRepoInMemoryTest, - ::getSnapshotRepoInRedisTest, + ::getRepoInMemoryTest, + ::getRepoInRedisTest, ) val list = eventStores.flatMap { store -> projectionRepo.map { repo -> - store().let { store -> TestData(store, repo(store, DISABLED_CONFIG)) } + TestData(store(), repo()) } } - context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { + context("when call applyAndSave, the projection should be built and save to the repository") { withData(list) { (eventStore, repo) -> val aggregateId = IdTest() val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) - eventStore.publish(eventOther) - repo.applyAndPutToCache(eventOther) - assertNotNull(repo.getUntil(eventOther)).also { +// eventStore.publish(eventOther) + val p = repo.applyAndSave(eventOther) + println(p) + assertNotNull(repo.get(eventOther.aggregateId)).also { assertNotNull(it.value) shouldBeEqual "valOther" } val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) - eventStore.publish(event1) - repo.applyAndPutToCache(event1) - assertNotNull(repo.getLast(event1.aggregateId)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - assertNotNull(repo.getUntil(event1)).also { +// eventStore.publish(event1) + repo.applyAndSave(event1) + assertNotNull(repo.get(event1.aggregateId)).also { assertNotNull(it.value) shouldBeEqual "val1" } val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) - eventStore.publish(event2) - repo.applyAndPutToCache(event2) - assertNotNull(repo.getLast(event2.aggregateId)).also { - assertNotNull(it.value) shouldBeEqual "val1val2" - } - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - assertNotNull(repo.getUntil(event2)).also { +// eventStore.publish(event2) + repo.applyAndSave(event2) + assertNotNull(repo.get(event2.aggregateId)).also { assertNotNull(it.value) shouldBeEqual "val1val2" } } @@ -104,18 +91,18 @@ class ProjectionSnapshotRepositoryTest : val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = otherAggregateId) eventStore.publish(eventOther) - repo.applyAndPutToCache(eventOther) - assertNotNull(repo.getUntil(eventOther)).also { + repo.applyAndSave(eventOther) + assertNotNull(repo.get(eventOther.aggregateId)).also { assertNotNull(it.value) shouldBeEqual "valOther" } val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) eventStore.publish(event1) - repo.applyAndPutToCache(event1) + repo.applyAndSave(event1) val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) eventStore.publish(event2) - repo.applyAndPutToCache(event2) + repo.applyAndSave(event2) repo.getList().apply { any { it.aggregateId == otherAggregateId } shouldBeEqual true @@ -128,7 +115,7 @@ class ProjectionSnapshotRepositoryTest : } } - context("ProjectionSnapshotRepository should be thread safe") { + context("ProjectionRepository should be thread safe") { continually(1.seconds) { withData(list) { (eventStore, repo) -> val aggregateId = IdTest() @@ -137,43 +124,24 @@ class ProjectionSnapshotRepositoryTest : (0..9) .map { GlobalScope.launch { - (1..10).forEach { - val eventX = - lock.withLock { - EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStore.publish(it) } + repeat(10) { + lock.withLock { + runBlocking { + EventXTest( + num = 1, + version = versionBuilder.buildNextVersion(aggregateId), + aggregateId = aggregateId, + ).also { repo.applyAndSave(it) } } - repo.applyAndPutToCache(eventX) + } } } }.joinAll() - assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 - assertNotNull(repo.count(aggregateId)) shouldBeEqual 100 + assertNotNull(repo.get(aggregateId)).lastEventVersion shouldBeEqual 100 + assertNotNull(repo.get(aggregateId)).num shouldBeEqual 100 } } } - - context("removeOldSnapshot") { - withData(list) { (eventStore, repo) -> - val versionBuilder = VersionBuilderLocal() - val aggregateId = IdTest() - - suspend fun buildEndSendEventX() { - EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStore.publish(it) } - .also { repo.applyAndPutToCache(it) } - } - - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 1 - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 2 - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 3 - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 4 - } - } }) @JvmInline @@ -217,28 +185,18 @@ private data class EventXTest( val num: Int, ) : TestEvents -private fun getSnapshotRepoInMemoryTest( - eventStore: EventStore, - snapshotConfig: SnapshotConfig, -): ProjectionSnapshotRepository = - ProjectionSnapshotRepositoryInMemory( - eventStore = eventStore, +private fun getRepoInMemoryTest(): ProjectionRepository = + ProjectionRepositoryInMemory( initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, - snapshotCacheConfig = snapshotConfig, applyToProjection = apply, ) -private fun getSnapshotRepoInRedisTest( - eventStore: EventStore, - snapshotConfig: SnapshotConfig, -): ProjectionSnapshotRepository { +private fun getRepoInRedisTest(): ProjectionRepository { val jedis = JedisPooled("redis://localhost:6379") jedis.cleanProjections() - return ProjectionSnapshotRepositoryInRedis( - eventStore = eventStore, + return ProjectionRepositoryInRedis( jedis = jedis, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, - snapshotCacheConfig = snapshotConfig, projectionClass = ProjectionTest::class, projectionToJson = { Json.encodeToString(it) }, jsonToProjection = { Json.decodeFromString(it) },