From 442379dc492886cd93456ca040acb211b52eb3cc Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Wed, 26 Mar 2025 03:36:49 +0100 Subject: [PATCH] fix and improve ProjectionSnapshotRepositoryInRedis --- .../projection/GameListRepositoryInMemory.kt | 9 +- .../projection/GameStateRepositoryInMemory.kt | 12 +- .../projection/GameStateRepositoryInRedis.kt | 12 +- .../eventDemo/business/event/EventHandler.kt | 2 +- .../business/event/GameEventHandler.kt | 2 +- .../gameState/GameStateRepository.kt | 2 + .../projectionListener/ReactionListener.kt | 6 +- src/main/kotlin/eventDemo/libs/bus/Bus.kt | 6 +- .../kotlin/eventDemo/libs/bus/BusInMemory.kt | 16 +- .../libs/event/EventStoreInMemory.kt | 5 +- .../eventDemo/libs/event/EventStream.kt | 8 + .../ProjectionSnapshotRepository.kt | 9 +- .../ProjectionSnapshotRepositoryInMemory.kt | 46 ++++-- .../ProjectionSnapshotRepositoryInRedis.kt | 153 +++++++++++------- .../libs/event/projection/SnapshotConfig.kt | 3 + .../interfaceLayer/query/GameListRouteTest.kt | 29 ++-- .../projection/GameStateRepositoryTest.kt | 54 +++++-- .../ProjectionSnapshotRepositoryTest.kt | 60 +++---- 18 files changed, 283 insertions(+), 151 deletions(-) diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt index 38ce181..b1684e8 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInMemory.kt @@ -10,6 +10,7 @@ import eventDemo.business.event.projection.gameList.apply import eventDemo.business.event.projection.gameState.GameState import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.SnapshotConfig +import io.github.oshai.kotlinlogging.withLoggingContext /** * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. @@ -30,9 +31,11 @@ class GameListRepositoryInMemory( init { eventBus.subscribe { event -> - projectionsSnapshot - .applyAndPutToCache(event) - .also { projectionBus.publish(it) } + withLoggingContext("event" to event.toString()) { + projectionsSnapshot + .applyAndPutToCache(event) + .also { projectionBus.publish(it) } + } } } diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt index adde094..1aedcc1 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInMemory.kt @@ -10,6 +10,7 @@ import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.business.event.projection.gameState.apply import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.SnapshotConfig +import io.github.oshai.kotlinlogging.withLoggingContext /** * Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus]. @@ -31,9 +32,11 @@ class GameStateRepositoryInMemory( init { // On new event was received, build snapshot and publish it to the projection bus eventBus.subscribe { event -> - projectionsSnapshot - .applyAndPutToCache(event) - .also { projectionBus.publish(it) } + withLoggingContext("event" to event.toString()) { + projectionsSnapshot + .applyAndPutToCache(event) + .also { projectionBus.publish(it) } + } } } @@ -53,4 +56,7 @@ class GameStateRepositoryInMemory( */ override fun getUntil(event: GameEvent): GameState = projectionsSnapshot.getUntil(event) + + override fun count(gameId: GameId): Int = + projectionsSnapshot.count(gameId) } diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt index e2836a3..ad429a1 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt @@ -10,6 +10,7 @@ import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.business.event.projection.gameState.apply import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis import eventDemo.libs.event.projection.SnapshotConfig +import io.github.oshai.kotlinlogging.withLoggingContext import kotlinx.serialization.json.Json import redis.clients.jedis.UnifiedJedis @@ -38,9 +39,11 @@ class GameStateRepositoryInRedis( init { // On new event was received, build snapshot and publish it to the projection bus eventBus.subscribe { event -> - projectionsSnapshot - .applyAndPutToCache(event) - .also { projectionBus.publish(it) } + withLoggingContext("event" to event.toString()) { + projectionsSnapshot + .applyAndPutToCache(event) + .also { projectionBus.publish(it) } + } } } @@ -60,4 +63,7 @@ class GameStateRepositoryInRedis( */ override fun getUntil(event: GameEvent): GameState = projectionsSnapshot.getUntil(event) + + override fun count(gameId: GameId): Int = + projectionsSnapshot.count(gameId) } diff --git a/src/main/kotlin/eventDemo/business/event/EventHandler.kt b/src/main/kotlin/eventDemo/business/event/EventHandler.kt index 178afd9..f3be01c 100644 --- a/src/main/kotlin/eventDemo/business/event/EventHandler.kt +++ b/src/main/kotlin/eventDemo/business/event/EventHandler.kt @@ -9,7 +9,7 @@ import eventDemo.libs.event.Event interface EventHandler, ID : AggregateId> { fun registerProjectionBuilder(builder: (event: E) -> Unit) - fun handle( + suspend fun handle( aggregateId: ID, buildEvent: (version: Int) -> E, ): E diff --git a/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt b/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt index 0679f11..ddabfa0 100644 --- a/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt +++ b/src/main/kotlin/eventDemo/business/event/GameEventHandler.kt @@ -27,7 +27,7 @@ class GameEventHandler( /** * Build Event then send it to the event store and bus. */ - override fun handle( + override suspend fun handle( aggregateId: GameId, buildEvent: (version: Int) -> GameEvent, ): GameEvent = diff --git a/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt b/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt index a4a522f..1cdf721 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/gameState/GameStateRepository.kt @@ -7,4 +7,6 @@ interface GameStateRepository { fun getLast(gameId: GameId): GameState fun getUntil(event: GameEvent): GameState + + fun count(gameId: GameId): Int } diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt index b814c7f..a1913e4 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt @@ -33,11 +33,11 @@ class ReactionListener( } } } else { - logger.error { "${this::class.java.simpleName} is already init for this bus" } + logger.error { "${this::class.simpleName} is already init for this bus" } } } - private fun sendStartGameEvent(state: GameState) { + private suspend fun sendStartGameEvent(state: GameState) { if (state.isReady && !state.isStarted) { val reactionEvent = eventHandler.handle(state.aggregateId) { @@ -54,7 +54,7 @@ class ReactionListener( } } - private fun sendWinnerEvent(state: GameState) { + private suspend fun sendWinnerEvent(state: GameState) { val winner = state.playerHasNoCardLeft().firstOrNull() if (winner != null) { val reactionEvent = diff --git a/src/main/kotlin/eventDemo/libs/bus/Bus.kt b/src/main/kotlin/eventDemo/libs/bus/Bus.kt index f98d2fd..4c4f030 100644 --- a/src/main/kotlin/eventDemo/libs/bus/Bus.kt +++ b/src/main/kotlin/eventDemo/libs/bus/Bus.kt @@ -1,13 +1,13 @@ package eventDemo.libs.bus -interface Bus { - fun publish(item: E) +interface Bus { + suspend fun publish(item: T) /** * @param priority The higher the priority, the more it will be called first */ fun subscribe( priority: Int = 0, - block: suspend (E) -> Unit, + block: suspend (T) -> Unit, ) } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt index 66f1fa3..c71655f 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt @@ -1,21 +1,21 @@ package eventDemo.libs.bus import io.github.oshai.kotlinlogging.withLoggingContext -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.coroutineScope class BusInMemory : Bus { private val subscribers: MutableList Unit>> = mutableListOf() - override fun publish(item: E) { - subscribers - .sortedByDescending { (priority, _) -> priority } - .forEach { (_, block) -> - runBlocking { - withLoggingContext("busItem" to item.toString()) { + override suspend fun publish(item: E) { + withLoggingContext("busItem" to item.toString()) { + subscribers + .sortedByDescending { (priority, _) -> priority } + .forEach { (_, block) -> + coroutineScope { block(item) } } - } + } } override fun subscribe( diff --git a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt index 2e0ec52..d9d8f93 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStoreInMemory.kt @@ -1,5 +1,6 @@ package eventDemo.libs.event +import io.github.oshai.kotlinlogging.withLoggingContext import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap @@ -10,5 +11,7 @@ class EventStoreInMemory, ID : AggregateId> : EventStore { streams.computeIfAbsent(aggregateId) { EventStreamInMemory() } override fun publish(event: E) = - getStream(event.aggregateId).publish(event) + withLoggingContext("event" to event.toString()) { + getStream(event.aggregateId).publish(event) + } } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStream.kt b/src/main/kotlin/eventDemo/libs/event/EventStream.kt index 2e7991e..e38a342 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStream.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStream.kt @@ -1,5 +1,7 @@ package eventDemo.libs.event +import eventDemo.libs.event.projection.Projection + /** * Interface representing an event stream for publishing and reading domain events */ @@ -17,5 +19,11 @@ interface EventStream> { fun readVersionBetween(version: IntRange): Set + fun

> readVersionBetween( + projection: P?, + event: E, + ): Set = + readVersionBetween(((projection?.lastEventVersion ?: 0) + 1)..event.version) + fun getByVersion(version: Int): E? } diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt index b14d363..b3f9fcf 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt @@ -9,10 +9,17 @@ interface ProjectionSnapshotRepository, P : Projection, ID : A */ fun applyAndPutToCache(event: E): P + fun count(aggregateId: ID): Int + + fun countAll(): Int + /** * Build the list of all [Projections][Projection] */ - fun getList(): List

+ fun getList( + limit: Int = 100, + offset: Int = 0, + ): List

/** * Build the last version of the [Projection] from the cache. diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt index 3ab7850..be85e49 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -18,6 +18,7 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID private val applyToProjection: P.(event: E) -> P, ) : ProjectionSnapshotRepository { private val projectionsSnapshot: ConcurrentHashMap>> = ConcurrentHashMap() + private val logger = KotlinLogging.logger { } /** * Create a snapshot for the event @@ -32,17 +33,30 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID override fun applyAndPutToCache(event: E): P = getUntil(event) .also { - save(it) - removeOldSnapshot(it.aggregateId) + withLoggingContext("projection" to it.toString()) { + save(it) + removeOldSnapshot(it.aggregateId) + } } + override fun count(aggregateId: ID): Int = + projectionsSnapshot[aggregateId]?.count() ?: 0 + + override fun countAll(): Int = + projectionsSnapshot.mappingCount().toInt() + /** * Build the list of all [Projections][Projection] */ - override fun getList(): List

= - projectionsSnapshot.map { (id, b) -> - getLast(id) - } + override fun getList( + limit: Int, + offset: Int, + ): List

= + projectionsSnapshot + .map { (id, b) -> + getLast(id) + }.drop(offset) + .take(limit) /** * Build the last version of the [Projection] from the cache. @@ -75,7 +89,8 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID val missingEventOfSnapshot = eventStore .getStream(event.aggregateId) - .readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version) + // take the last snapshot version +1 to event version + .readVersionBetween(lastSnapshot, event) return if (lastSnapshot?.lastEventVersion == event.version) { lastSnapshot @@ -91,12 +106,14 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID */ private fun removeOldSnapshot(aggregateId: ID) { projectionsSnapshot[aggregateId]?.let { queue -> - queue - .excludeFirstAndLast() - .excludeTheHeadBySize() - .excludeNewerByDate() - .excludeByModulo() - .forEach { queue.remove(it) } + if (snapshotCacheConfig.enabled) { + queue + .excludeFirstAndLast() + .excludeTheHeadBySize() + .excludeNewerByDate() + .excludeByModulo() + .forEach { queue.remove(it) } + } } } @@ -153,6 +170,7 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID projectionsSnapshot .computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() } .add(Pair(projection, Clock.System.now())) + .also { logger.info { "Projection saved" } } } /** @@ -200,7 +218,7 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID if (event.version == lastEventVersion + 1) { applyToProjection(event) } else if (event.version <= lastEventVersion) { - KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." } + KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." } this } else { error("The version of the event must follow directly after the version of the projection.") diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt index 928b82e..caae5a6 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt @@ -7,7 +7,6 @@ import eventDemo.libs.toRanges import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.withLoggingContext import redis.clients.jedis.UnifiedJedis -import redis.clients.jedis.params.ScanParams import redis.clients.jedis.params.SortingParams import kotlin.reflect.KClass @@ -21,6 +20,8 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID private val jsonToProjection: (String) -> P, private val applyToProjection: P.(event: E) -> P, ) : ProjectionSnapshotRepository { + val logger = KotlinLogging.logger { } + /** * Create a snapshot for the event * @@ -34,22 +35,33 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID override fun applyAndPutToCache(event: E): P = getUntil(event) .also { - save(it) - removeOldSnapshot(it.aggregateId, event.version) + withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) { + save(it) + removeOldSnapshot(it.aggregateId, event.version) + } } + override fun count(aggregateId: ID): Int = + jedis.zcount(projectionClass.redisKey(aggregateId), Double.MIN_VALUE, Double.MAX_VALUE).toInt() + + override fun countAll(): Int = + jedis.zcount(projectionClass.redisKey, Double.MIN_VALUE, Double.MAX_VALUE).toInt() + /** * Get the list of all [Projections][Projection] */ - override fun getList(): List

= + override fun getList( + limit: Int, + offset: Int, + ): List

= jedis - .scan( - "0", - ScanParams() - .match(projectionClass.redisKeySearchListLatest) - .count(100), - ).result - .map { jsonToProjection(it) } + .sort( + projectionClass.redisKeySearchList, + SortingParams() + .desc() + .by("score") + .limit(limit, offset), + ).map { jsonToProjection(it) } /** * Get the last version of the [Projection] from the cache. @@ -60,7 +72,13 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID */ override fun getLast(aggregateId: ID): P = jedis - .get(projectionClass.redisKeyLatest(aggregateId)) + .sort( + projectionClass.redisKey(aggregateId), + SortingParams() + .desc() + .by("score") + .limit(0, 1), + ).firstOrNull() ?.let(jsonToProjection) ?: initialStateBuilder(aggregateId) @@ -76,22 +94,27 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID override fun getUntil(event: E): P { val lastSnapshot = jedis - .sort( + .zrangeByScore( projectionClass.redisKey(event.aggregateId), - SortingParams() - .desc() - .by("score") - .limit(0, 1), + 1.0, + event.version.toDouble(), + 0, + 1, ).firstOrNull() ?.let(jsonToProjection) if (lastSnapshot?.lastEventVersion == event.version) { return lastSnapshot } + if (lastSnapshot != null && lastSnapshot.lastEventVersion > event.version) { + logger.error { "Cannot be apply event on more recent snapshot" } + error("Cannot be apply event on more recent snapshot") + } val missingEventOfSnapshot = eventStore .getStream(event.aggregateId) - .readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version) + // take the last snapshot version +1 to event version + .readVersionBetween(lastSnapshot, event) return if (lastSnapshot?.lastEventVersion == event.version) { lastSnapshot @@ -101,9 +124,16 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID } private fun save(projection: P) { - jedis.zadd(projection.redisKeyVersion, projection.lastEventVersion.toDouble(), projectionToJson(projection)) - jedis.expire(projection.redisKeyVersion, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds) - jedis.set(projection.redisKeyLatest, projectionToJson(projection)) + repeat(5) { + val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection)) + if (added < 1) { + logger.error { "Projection NOT saved" } + } else { + logger.info { "Projection saved" } + return + } + } + jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds) } /** @@ -123,7 +153,7 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID if (event.version == lastEventVersion + 1) { applyToProjection(event) } else if (event.version <= lastEventVersion) { - KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." } + KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." } this } else { error("The version of the event must follow directly after the version of the projection.") @@ -131,65 +161,74 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID } } - private fun removeOldSnapshot( + fun removeOldSnapshot( aggregateId: AggregateId, lastVersion: Int, ) { - removeByModulo(aggregateId, lastVersion) - removeTheHeadBySize(aggregateId) + if (snapshotCacheConfig.enabled) { + removeByModulo(aggregateId, lastVersion) + removeTheHeadBySize(aggregateId, lastVersion) + } } private fun removeByModulo( aggregateId: AggregateId, lastVersion: Int, ) { - (lastVersion - snapshotCacheConfig.maxSnapshotCacheSize) - .let { if (it < 0) 0 else it } + (lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo)) + .let { if (it < 2) 2 else it } .let { IntRange(it, lastVersion - 1) } .filter { (it % snapshotCacheConfig.modulo) != 1 } .toRanges() .map { - jedis.zremrangeByScore( - projectionClass.redisKey(aggregateId), - it.min().toDouble(), - it.max().toDouble(), - ) + jedis + .zremrangeByScore( + projectionClass.redisKey(aggregateId), + it.first.toDouble(), + it.last.toDouble(), + ).also { removedCount -> + if (removedCount > 0) { + logger.info { + "$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]" + } + } + } } } - private fun removeTheHeadBySize(aggregateId: AggregateId) { - val size = - jedis.zcount( - projectionClass.redisKey(aggregateId), - Double.MIN_VALUE, - Double.MAX_VALUE, - ) - - LongRange((size - snapshotCacheConfig.maxSnapshotCacheSize), size) + private fun removeTheHeadBySize( + aggregateId: AggregateId, + lastVersion: Int, + ) { + (lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo)) + .toDouble() .let { - jedis.zremrangeByRank( - projectionClass.redisKey(aggregateId), - 1, - it.max(), - ) + jedis + .zremrangeByScore( + projectionClass.redisKey(aggregateId), + 2.0, + it, + ).also { removedCount -> + if (removedCount > 0) { + logger.info { + "$removedCount snapshot removed Size(${snapshotCacheConfig.maxSnapshotCacheSize}) (1.0 to $it) [lastVersion=$lastVersion]" + } + } + } } } } -val

> KClass

.redisKeySearchListLatest: String get() { - return "projection:$simpleName:*:latest" +val

> KClass

.redisKeySearchList: String get() { + return "projection:$simpleName:*" } -val

> P.redisKeyVersion: String get() { - return "projection:${this::class.simpleName}:${aggregateId.id}:$lastEventVersion" +val

> P.redisKey: String get() { + return "projection:${this::class.simpleName}:${aggregateId.id}" } -val

> P.redisKeyLatest: String get() { - return "projection:${this::class.simpleName}:${aggregateId.id}:latest" -} - -fun > KClass

.redisKeyLatest(aggregateId: A): String = - "projection:$simpleName:${aggregateId.id}:latest" - fun

, A : AggregateId> KClass

.redisKey(aggregateId: A): String = "projection:$simpleName:${aggregateId.id}" + +val

> KClass

.redisKey: String get() = + "projection:$simpleName" diff --git a/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt b/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt index 2129da4..33c9ddf 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/SnapshotConfig.kt @@ -20,4 +20,7 @@ data class SnapshotConfig( * snapshot.lastVersion % modulo == 1 */ val modulo: Int = 10, + val enabled: Boolean = true, ) + +val DISABLED_CONFIG = SnapshotConfig(Int.MAX_VALUE, Duration.INFINITE, Int.MAX_VALUE, enabled = false) diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt index d174dec..493f30c 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt @@ -8,6 +8,7 @@ import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.event.PlayerReadyEvent import eventDemo.business.event.projection.gameList.GameList import eventDemo.configuration.configure +import io.kotest.assertions.nondeterministic.eventually import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.collections.shouldHaveSize @@ -24,6 +25,7 @@ import org.koin.core.context.stopKoin import org.koin.ktor.ext.inject import kotlin.test.assertEquals import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds class GameListRouteTest : FunSpec({ @@ -62,19 +64,22 @@ class GameListRouteTest : } } - httpClient() - .get("/games") { - withAuth(player1) - accept(ContentType.Application.Json) - }.apply { - assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) - call.body>().first().let { - it.status shouldBeEqual GameList.Status.OPENING - it.players shouldHaveSize 1 - it.players shouldContain player1 - it.winners shouldHaveSize 0 + // Wait until the projection is created + eventually(3.seconds) { + httpClient() + .get("/games") { + withAuth(player1) + accept(ContentType.Application.Json) + }.apply { + assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) + call.body>().first().let { + it.status shouldBeEqual GameList.Status.OPENING + it.players shouldHaveSize 1 + it.players shouldContain player1 + it.winners shouldHaveSize 0 + } } - } + } } } diff --git a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt index 5d18995..2fe6001 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt @@ -4,12 +4,17 @@ import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.GameEventHandler import eventDemo.business.event.event.NewPlayerEvent +import eventDemo.business.event.projection.gameState.GameState import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.configuration.injection.Configuration import eventDemo.configuration.injection.appKoinModule +import io.kotest.assertions.nondeterministic.eventually +import io.kotest.assertions.nondeterministic.eventuallyConfig import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.comparables.shouldBeGreaterThan import io.kotest.matchers.equals.shouldBeEqual +import io.kotest.matchers.ints.shouldBeLessThan import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll @@ -17,6 +22,7 @@ import kotlinx.coroutines.launch import org.koin.core.context.stopKoin import org.koin.dsl.koinApplication import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.seconds @OptIn(DelicateCoroutinesApi::class) class GameStateRepositoryTest : @@ -32,11 +38,14 @@ class GameStateRepositoryTest : eventHandler .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .also { event -> - assertNotNull(repo.getUntil(event)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1) - } - assertNotNull(repo.getLast(aggregateId)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1) + // Wait until the projection is created + eventually(1.seconds) { + assertNotNull(repo.getUntil(event)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } + assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1) + } } } } @@ -48,20 +57,31 @@ class GameStateRepositoryTest : koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply { val repo = get() val eventHandler = get() + val projectionBus = get() + + var state: GameState? = null + projectionBus.subscribe { + repo.getLast(aggregateId).also { + state = it + } + } eventHandler .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .also { - assertNotNull(repo.getLast(aggregateId)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1) + eventually(1.seconds) { + assertNotNull(state).players.isNotEmpty() shouldBeEqual true + assertNotNull(state).players shouldBeEqual setOf(player1) } } eventHandler .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) } .also { - assertNotNull(repo.getLast(aggregateId)).also { - assertNotNull(it.players) shouldBeEqual setOf(player1, player2) + eventually(1.seconds) { + assertNotNull(repo.getLast(aggregateId)).also { + assertNotNull(it.players) shouldBeEqual setOf(player1, player2) + } } } } @@ -121,9 +141,19 @@ class GameStateRepositoryTest : } }.joinAll() - repo.getLast(aggregateId).run { - lastEventVersion shouldBeEqual 1000 - players shouldHaveSize 1000 + eventually( + eventuallyConfig { + duration = 10.seconds + interval = 1.seconds + includeFirst = false + }, + ) { + repo.getLast(aggregateId).run { + lastEventVersion shouldBeEqual 1000 + players shouldHaveSize 1000 + } + repo.count(aggregateId) shouldBeGreaterThan 20 + repo.count(aggregateId) shouldBeLessThan 30 } } } diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt index 262cbde..c1c800b 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt @@ -6,14 +6,16 @@ import eventDemo.libs.event.Event import eventDemo.libs.event.EventStore import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.VersionBuilderLocal +import eventDemo.libs.event.projection.DISABLED_CONFIG import eventDemo.libs.event.projection.Projection import eventDemo.libs.event.projection.ProjectionSnapshotRepository import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis import eventDemo.libs.event.projection.SnapshotConfig +import io.kotest.assertions.nondeterministic.continually import io.kotest.core.spec.style.FunSpec -import io.kotest.datatest.WithDataTestName import io.kotest.datatest.withData +import io.kotest.engine.names.WithDataTestName import io.kotest.matchers.equals.shouldBeEqual import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope @@ -28,6 +30,7 @@ import java.util.UUID import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.seconds @OptIn(DelicateCoroutinesApi::class) class ProjectionSnapshotRepositoryTest : @@ -42,7 +45,7 @@ class ProjectionSnapshotRepositoryTest : val eventStores = listOf( - EventStoreInMemory(), + { EventStoreInMemory() }, ) val projectionRepo = listOf( @@ -52,17 +55,13 @@ class ProjectionSnapshotRepositoryTest : val list = eventStores.flatMap { store -> - projectionRepo.map { - TestData(store, it(store, SnapshotConfig(2000))) + projectionRepo.map { repo -> + store().let { store -> TestData(store, repo(store, DISABLED_CONFIG)) } } } - val nameFn: (TestData) -> String = { (eventStore, repo) -> - "${repo::class.simpleName} with ${eventStore::class.simpleName}" - } - context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { - withData(nameFn = nameFn, list) { (eventStore, repo) -> + withData(list) { (eventStore, repo) -> val aggregateId = IdTest() val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) @@ -97,25 +96,28 @@ class ProjectionSnapshotRepositoryTest : } } - context("ProjectionSnapshotRepositoryInMemory should be thread safe") { - withData(list) { (eventStore, repo) -> - val aggregateId = IdTest() - val versionBuilder = VersionBuilderLocal() - val lock = ReentrantLock() - (0..9) - .map { - GlobalScope.launch { - (1..10).map { - val eventX = - lock.withLock { - EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStore.publish(it) } - } - repo.applyAndPutToCache(eventX) + context("ProjectionSnapshotRepository should be thread safe") { + continually(1.seconds) { + withData(list) { (eventStore, repo) -> + val aggregateId = IdTest() + val versionBuilder = VersionBuilderLocal() + val lock = ReentrantLock() + (0..9) + .map { + GlobalScope.launch { + (1..10).map { + val eventX = + lock.withLock { + EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) + .also { eventStore.publish(it) } + } + repo.applyAndPutToCache(eventX) + } } - } - }.joinAll() - assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 + }.joinAll() + assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 + assertNotNull(repo.count(aggregateId)) shouldBeEqual 100 + } } } @@ -185,7 +187,7 @@ private data class EventXTest( private fun getSnapshotRepoInMemoryTest( eventStore: EventStore, - snapshotConfig: SnapshotConfig = SnapshotConfig(2000), + snapshotConfig: SnapshotConfig, ): ProjectionSnapshotRepository = ProjectionSnapshotRepositoryInMemory( eventStore = eventStore, @@ -196,7 +198,7 @@ private fun getSnapshotRepoInMemoryTest( private fun getSnapshotRepoInRedisTest( eventStore: EventStore, - snapshotConfig: SnapshotConfig = SnapshotConfig(2000), + snapshotConfig: SnapshotConfig, ): ProjectionSnapshotRepository = ProjectionSnapshotRepositoryInRedis( eventStore = eventStore,