From 70d596acf01cb67aadfe7210a89b6f03f6b49526 Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Sun, 30 Mar 2025 03:09:01 +0200 Subject: [PATCH] create GameListRepositoryInRedis --- .../projection/GameListRepositoryInRedis.kt | 57 +++++++++++++++++++ .../eventDemo/configuration/Configure.kt | 3 +- .../injection/ConfigureDIInfrastructure.kt | 6 +- .../ProjectionSnapshotRepositoryInRedis.kt | 27 +++++---- 4 files changed, 79 insertions(+), 14 deletions(-) create mode 100644 src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt new file mode 100644 index 0000000..a673212 --- /dev/null +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameListRepositoryInRedis.kt @@ -0,0 +1,57 @@ +package eventDemo.adapter.infrastructureLayer.event.projection + +import eventDemo.business.entity.GameId +import eventDemo.business.event.GameEventBus +import eventDemo.business.event.GameEventStore +import eventDemo.business.event.projection.GameProjectionBus +import eventDemo.business.event.projection.gameList.GameList +import eventDemo.business.event.projection.gameList.GameListRepository +import eventDemo.business.event.projection.gameList.apply +import eventDemo.business.event.projection.gameState.GameState +import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis +import eventDemo.libs.event.projection.SnapshotConfig +import io.github.oshai.kotlinlogging.withLoggingContext +import kotlinx.serialization.json.Json +import redis.clients.jedis.UnifiedJedis + +/** + * Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus]. + */ +class GameListRepositoryInRedis( + eventStore: GameEventStore, + jedis: UnifiedJedis, + snapshotConfig: SnapshotConfig = SnapshotConfig(), +) : GameListRepository { + private val projectionsSnapshot = + ProjectionSnapshotRepositoryInRedis( + eventStore = eventStore, + snapshotCacheConfig = snapshotConfig, + initialStateBuilder = { aggregateId: GameId -> GameList(aggregateId) }, + projectionClass = GameList::class, + projectionToJson = { Json.encodeToString(GameList.serializer(), it) }, + jsonToProjection = { Json.decodeFromString(GameList.serializer(), it) }, + applyToProjection = GameList::apply, + jedis = jedis, + ) + + fun subscribeToBus( + projectionBus: GameProjectionBus, + eventBus: GameEventBus, + ) { + eventBus.subscribe { event -> + withLoggingContext("event" to event.toString()) { + projectionsSnapshot + .applyAndPutToCache(event) + .also { projectionBus.publish(it) } + } + } + } + + /** + * Get the last version of the [GameState] from the all eventStream. + * + * It fetches it from the local cache if possible, otherwise it builds it. + */ + override fun getList(): List = + projectionsSnapshot.getList() +} diff --git a/src/main/kotlin/eventDemo/configuration/Configure.kt b/src/main/kotlin/eventDemo/configuration/Configure.kt index 822f704..4fb602c 100644 --- a/src/main/kotlin/eventDemo/configuration/Configure.kt +++ b/src/main/kotlin/eventDemo/configuration/Configure.kt @@ -10,6 +10,7 @@ import eventDemo.configuration.route.declareHttpGameRoute import eventDemo.configuration.route.declareWebSocketsGameRoute import io.ktor.server.application.Application import org.koin.ktor.ext.get +import org.koin.ktor.ext.getKoin fun Application.configure() { configureKoin() @@ -24,5 +25,5 @@ fun Application.configure() { configureHttpRouting() declareHttpGameRoute() - configureGameListener() + getKoin().configureGameListener() } diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt index ec50da2..4e10e36 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt @@ -4,7 +4,7 @@ import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource import eventDemo.adapter.infrastructureLayer.event.GameEventBusInMemory import eventDemo.adapter.infrastructureLayer.event.GameEventStoreInPostgresql -import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInMemory +import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInRedis import eventDemo.adapter.infrastructureLayer.event.projection.GameProjectionBusInMemory import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInRedis import eventDemo.business.event.GameEventBus @@ -21,7 +21,7 @@ import redis.clients.jedis.UnifiedJedis import javax.sql.DataSource fun Module.configureDIInfrastructure(config: Configuration) { - factory { + single { JedisPooled(config.redisUrl) } bind UnifiedJedis::class @@ -47,6 +47,6 @@ fun Module.configureDIInfrastructure(config: Configuration) { } bind GameStateRepository::class single { - GameListRepositoryInMemory(get(), get(), get(), snapshotConfig = SnapshotConfig()) + GameListRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) } bind GameListRepository::class } diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt index caae5a6..afcb086 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt @@ -7,6 +7,7 @@ import eventDemo.libs.toRanges import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.withLoggingContext import redis.clients.jedis.UnifiedJedis +import redis.clients.jedis.params.ScanParams import redis.clients.jedis.params.SortingParams import kotlin.reflect.KClass @@ -55,13 +56,16 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID offset: Int, ): List

= jedis - .sort( - projectionClass.redisKeySearchList, - SortingParams() - .desc() - .by("score") - .limit(limit, offset), - ).map { jsonToProjection(it) } + .scan( + offset.toString(), + ScanParams() + .match(projectionClass.redisKeySearchList) + .count(limit), + ).result + .mapNotNull { key -> + getLastByKey(key) + ?.let(jsonToProjection) + } /** * Get the last version of the [Projection] from the cache. @@ -71,16 +75,19 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID * 3. apply the missing events to the snapshot */ override fun getLast(aggregateId: ID): P = + getLastByKey(projectionClass.redisKey(aggregateId)) + ?.let(jsonToProjection) + ?: initialStateBuilder(aggregateId) + + private fun getLastByKey(key: String): String? = jedis .sort( - projectionClass.redisKey(aggregateId), + key, SortingParams() .desc() .by("score") .limit(0, 1), ).firstOrNull() - ?.let(jsonToProjection) - ?: initialStateBuilder(aggregateId) /** * Build the [Projection] to the specific [event][Event].