From 9670a000f017839fb1d316c93588a55d120a454e Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Tue, 25 Mar 2025 01:05:52 +0100 Subject: [PATCH] create ProjectionSnapshotRepositoryInRedis --- .../injection/ConfigureDIInfrastructure.kt | 26 +- src/main/kotlin/eventDemo/libs/ListToRange.kt | 11 + .../ProjectionSnapshotRepositoryInRedis.kt | 194 +++++++++++++++ ...rojectionSnapshotRepositoryInMemoryTest.kt | 173 ------------- .../ProjectionSnapshotRepositoryTest.kt | 230 ++++++++++++++++++ 5 files changed, 458 insertions(+), 176 deletions(-) create mode 100644 src/main/kotlin/eventDemo/libs/ListToRange.kt create mode 100644 src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt delete mode 100644 src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt create mode 100644 src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt index d2ef60a..46e97fe 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt @@ -11,15 +11,35 @@ import eventDemo.business.event.projection.GameProjectionBus import eventDemo.business.event.projection.gameList.GameListRepository import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.libs.event.projection.SnapshotConfig +import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer import org.koin.core.module.Module import org.koin.core.module.dsl.singleOf import org.koin.dsl.bind import redis.clients.jedis.JedisPooled +import redis.clients.jedis.UnifiedJedis +import redis.clients.jedis.json.JsonObjectMapper fun Module.configureDIInfrastructure(redisUrl: String) { - single { - JedisPooled(redisUrl) - } + factory { + JedisPooled(redisUrl).apply { + setJsonObjectMapper( + object : JsonObjectMapper { + override fun fromJson( + value: String, + valueType: Class, + ): T { + val s: KSerializer = serializer(valueType) as KSerializer + return Json.decodeFromString(s, value) + } + + override fun toJson(value: Any): String = + Json.encodeToString(value) + }, + ) + } + } bind UnifiedJedis::class singleOf(::GameEventBusInMemory) bind GameEventBus::class singleOf(::GameEventStoreInMemory) bind GameEventStore::class diff --git a/src/main/kotlin/eventDemo/libs/ListToRange.kt b/src/main/kotlin/eventDemo/libs/ListToRange.kt new file mode 100644 index 0000000..4db29c3 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/ListToRange.kt @@ -0,0 +1,11 @@ +package eventDemo.libs + +fun List.toRanges(): List = + fold(listOf()) { acc, i -> + val last = acc.lastOrNull() + if (last != null && last.max() + 1 == i) { + (acc - setOf(last)) + setOf(IntRange(last.min(), i)) + } else { + acc + setOf(IntRange(i, i)) + } + } diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt new file mode 100644 index 0000000..ad783dc --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt @@ -0,0 +1,194 @@ +package eventDemo.libs.event.projection + +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import eventDemo.libs.event.EventStore +import eventDemo.libs.toRanges +import io.github.oshai.kotlinlogging.KotlinLogging +import io.github.oshai.kotlinlogging.withLoggingContext +import redis.clients.jedis.UnifiedJedis +import redis.clients.jedis.params.ScanParams +import redis.clients.jedis.params.SortingParams +import kotlin.reflect.KClass + +class ProjectionSnapshotRepositoryInRedis, P : Projection, ID : AggregateId>( + private val eventStore: EventStore, + private val jedis: UnifiedJedis, + private val initialStateBuilder: (aggregateId: ID) -> P, + private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), + private val projectionClass: KClass

, + private val projectionToJson: (P) -> String, + private val jsonToProjection: (String) -> P, + private val applyToProjection: P.(event: E) -> P, +) : ProjectionSnapshotRepository { + /** + * Create a snapshot for the event + * + * 1. get the last snapshot with a version lower than that of the event + * 2. get the events with a greater version of the snapshot + * 3. apply the event to the snapshot + * 4. apply the new event to the projection + * 5. save it + * 6. remove old one + */ + override fun applyAndPutToCache(event: E): P = + getUntil(event) + .also { + save(it) + removeOldSnapshot(it.aggregateId, event.version) + } + + /** + * Get the list of all [Projections][Projection] + */ + override fun getList(): List

= + jedis + .scan( + "0", + ScanParams() + .match(projectionClass.redisKeySearchListLatest) + .count(100), + ).result + .map { jsonToProjection(it) } + + /** + * Get the last version of the [Projection] from the cache. + * + * 1. get the last snapshot + * 2. get the missing event to the snapshot + * 3. apply the missing events to the snapshot + */ + override fun getLast(aggregateId: ID): P = + jedis + .get(projectionClass.redisKeyLatest(aggregateId)) + .let(jsonToProjection) + + /** + * Build the [Projection] to the specific [event][Event]. + * + * It does not contain the [events][Event] it after this one. + * + * 1. get the last snapshot before the event + * 2. get the events with a greater version of the snapshot but lower of passed event + * 3. apply the events to the snapshot + */ + override fun getUntil(event: E): P { + val lastSnapshot = + jedis + .sort( + projectionClass.redisKey(event.aggregateId), + SortingParams() + .desc() + .by("score") + .limit(0, 1), + ).firstOrNull() + ?.let(jsonToProjection) + if (lastSnapshot?.lastEventVersion == event.version) { + return lastSnapshot + } + + val missingEventOfSnapshot = + eventStore + .getStream(event.aggregateId) + .readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version) + + return if (lastSnapshot?.lastEventVersion == event.version) { + lastSnapshot + } else { + lastSnapshot.applyEvents(event.aggregateId, missingEventOfSnapshot) + } + } + + private fun save(projection: P) { + jedis.zadd(projection.redisKeyVersion, projection.lastEventVersion.toDouble(), projectionToJson(projection)) + jedis.expire(projection.redisKeyVersion, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds) + jedis.set(projection.redisKeyLatest, projectionToJson(projection)) + } + + /** + * Apply events to the projection. + */ + private fun P?.applyEvents( + aggregateId: ID, + eventsToApply: Set, + ): P = + eventsToApply.fold(this ?: initialStateBuilder(aggregateId), applyToProjectionSecure) + + /** + * Wrap the [applyToProjection] lambda to avoid duplicate apply of the same event. + */ + private val applyToProjectionSecure: P.(event: E) -> P = { event -> + withLoggingContext("event" to event.toString(), "projection" to this.toString()) { + if (event.version == lastEventVersion + 1) { + applyToProjection(event) + } else if (event.version <= lastEventVersion) { + KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." } + this + } else { + error("The version of the event must follow directly after the version of the projection.") + } + } + } + + private fun removeOldSnapshot( + aggregateId: AggregateId, + lastVersion: Int, + ) { + removeByModulo(aggregateId, lastVersion) + removeTheHeadBySize(aggregateId) + } + + private fun removeByModulo( + aggregateId: AggregateId, + lastVersion: Int, + ) { + (lastVersion - snapshotCacheConfig.maxSnapshotCacheSize) + .let { if (it < 0) 0 else it } + .let { IntRange(it, lastVersion - 1) } + .filter { (it % snapshotCacheConfig.modulo) != 1 } + .toRanges() + .map { + jedis.zremrangeByScore( + projectionClass.redisKey(aggregateId), + it.min().toDouble(), + it.max().toDouble(), + ) + } + } + + private fun removeTheHeadBySize(aggregateId: AggregateId) { + val size = + jedis.zcount( + projectionClass.redisKey(aggregateId), + Double.MIN_VALUE, + Double.MAX_VALUE, + ) + + LongRange((size - snapshotCacheConfig.maxSnapshotCacheSize), size) + .let { + jedis.zremrangeByRank( + projectionClass.redisKey(aggregateId), + 1, + it.max(), + ) + } + } +} + +val

> KClass

.redisKeySearchListLatest: String get() { + return "projection:$simpleName:*:latest" +} + +val

> P.redisKeyVersion: String get() { + return "projection:${this::class.simpleName}:${aggregateId.id}:$lastEventVersion" +} + +val

> P.redisKeyLatest: String get() { + return "projection:${this::class.simpleName}:${aggregateId.id}:latest" +} + +fun > KClass

.redisKeyLatest(aggregateId: A): String = + "projection:$simpleName:${aggregateId.id}:latest" + +fun

, A : AggregateId> KClass

.redisKey(aggregateId: A): String = + "projection:$simpleName:${aggregateId.id}" diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt deleted file mode 100644 index 4a1a335..0000000 --- a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryInMemoryTest.kt +++ /dev/null @@ -1,173 +0,0 @@ -package eventDemo.business.event.projection - -import eventDemo.libs.event.AggregateId -import eventDemo.libs.event.Event -import eventDemo.libs.event.EventStore -import eventDemo.libs.event.EventStoreInMemory -import eventDemo.libs.event.VersionBuilderLocal -import eventDemo.libs.event.projection.Projection -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory -import eventDemo.libs.event.projection.SnapshotConfig -import io.kotest.core.spec.style.FunSpec -import io.kotest.matchers.equals.shouldBeEqual -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant -import java.util.UUID -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock -import kotlin.test.assertNotNull - -@OptIn(DelicateCoroutinesApi::class) -class ProjectionSnapshotRepositoryInMemoryTest : - FunSpec({ - - test("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { - val eventStore: EventStore = EventStoreInMemory() - val repo = getSnapshotRepoTest(eventStore) - val aggregateId = IdTest() - - val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) - eventStore.publish(eventOther) - repo.applyAndPutToCache(eventOther) - assertNotNull(repo.getUntil(eventOther)).also { - assertNotNull(it.value) shouldBeEqual "valOther" - } - - val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) - eventStore.publish(event1) - repo.applyAndPutToCache(event1) - assertNotNull(repo.getLast(event1.aggregateId)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - - val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) - eventStore.publish(event2) - repo.applyAndPutToCache(event2) - assertNotNull(repo.getLast(event2.aggregateId)).also { - assertNotNull(it.value) shouldBeEqual "val1val2" - } - assertNotNull(repo.getUntil(event1)).also { - assertNotNull(it.value) shouldBeEqual "val1" - } - assertNotNull(repo.getUntil(event2)).also { - assertNotNull(it.value) shouldBeEqual "val1val2" - } - } - - test("ProjectionSnapshotRepositoryInMemory should be thread safe") { - val eventStore: EventStore = EventStoreInMemory() - val repo = getSnapshotRepoTest(eventStore) - val aggregateId = IdTest() - val versionBuilder = VersionBuilderLocal() - val lock = ReentrantLock() - (0..9) - .map { - GlobalScope.launch { - (1..10).map { - val eventX = - lock.withLock { - EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStore.publish(it) } - } - repo.applyAndPutToCache(eventX) - } - } - }.joinAll() - assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 - } - - test("removeOldSnapshot") { - val versionBuilder = VersionBuilderLocal() - val eventStore: EventStore = EventStoreInMemory() - val repo = getSnapshotRepoTest(eventStore, SnapshotConfig(2)) - val aggregateId = IdTest() - - fun buildEndSendEventX() { - EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) - .also { eventStore.publish(it) } - .also { repo.applyAndPutToCache(it) } - } - - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 1 - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 2 - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 3 - buildEndSendEventX() - repo.getLast(aggregateId).num shouldBeEqual 4 - } - }) - -@JvmInline -private value class IdTest( - override val id: UUID = UUID.randomUUID(), -) : AggregateId - -private data class ProjectionTest( - override val aggregateId: IdTest, - override val lastEventVersion: Int = 0, - var value: String? = null, - var num: Int = 0, -) : Projection - -private sealed interface TestEvents : Event - -private data class Event1Test( - override val eventId: UUID = UUID.randomUUID(), - override val aggregateId: IdTest, - override val createdAt: Instant = Clock.System.now(), - override val version: Int, - val value1: String, -) : TestEvents - -private data class Event2Test( - override val eventId: UUID = UUID.randomUUID(), - override val aggregateId: IdTest, - override val createdAt: Instant = Clock.System.now(), - override val version: Int, - val value2: String, -) : TestEvents - -private data class EventXTest( - override val eventId: UUID = UUID.randomUUID(), - override val aggregateId: IdTest, - override val createdAt: Instant = Clock.System.now(), - override val version: Int, - val num: Int, -) : TestEvents - -private fun getSnapshotRepoTest( - eventStore: EventStore, - snapshotConfig: SnapshotConfig = SnapshotConfig(2000), -): ProjectionSnapshotRepositoryInMemory = - ProjectionSnapshotRepositoryInMemory( - eventStore = eventStore, - initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, - snapshotCacheConfig = snapshotConfig, - ) { event -> - this.let { projection -> - when (event) { - is Event1Test -> { - projection.copy(value = (projection.value ?: "") + event.value1) - } - - is Event2Test -> { - projection.copy(value = (projection.value ?: "") + event.value2) - } - - is EventXTest -> { - projection.copy(num = projection.num + event.num) - } - }.copy( - lastEventVersion = event.version, - ) - } - } diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt new file mode 100644 index 0000000..262cbde --- /dev/null +++ b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt @@ -0,0 +1,230 @@ +package eventDemo.business.event.projection + +import eventDemo.configuration.serializer.UUIDSerializer +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import eventDemo.libs.event.EventStore +import eventDemo.libs.event.EventStoreInMemory +import eventDemo.libs.event.VersionBuilderLocal +import eventDemo.libs.event.projection.Projection +import eventDemo.libs.event.projection.ProjectionSnapshotRepository +import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory +import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis +import eventDemo.libs.event.projection.SnapshotConfig +import io.kotest.core.spec.style.FunSpec +import io.kotest.datatest.WithDataTestName +import io.kotest.datatest.withData +import io.kotest.matchers.equals.shouldBeEqual +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import redis.clients.jedis.JedisPooled +import java.util.UUID +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import kotlin.test.assertNotNull + +@OptIn(DelicateCoroutinesApi::class) +class ProjectionSnapshotRepositoryTest : + FunSpec({ + data class TestData( + val store: EventStore, + val snapshotRepo: ProjectionSnapshotRepository, + ) : WithDataTestName { + override fun dataTestName(): String = + "${snapshotRepo::class.simpleName} with ${store::class.simpleName}" + } + + val eventStores = + listOf( + EventStoreInMemory(), + ) + val projectionRepo = + listOf( + ::getSnapshotRepoInMemoryTest, + ::getSnapshotRepoInRedisTest, + ) + + val list = + eventStores.flatMap { store -> + projectionRepo.map { + TestData(store, it(store, SnapshotConfig(2000))) + } + } + + val nameFn: (TestData) -> String = { (eventStore, repo) -> + "${repo::class.simpleName} with ${eventStore::class.simpleName}" + } + + context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") { + withData(nameFn = nameFn, list) { (eventStore, repo) -> + val aggregateId = IdTest() + + val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest()) + eventStore.publish(eventOther) + repo.applyAndPutToCache(eventOther) + assertNotNull(repo.getUntil(eventOther)).also { + assertNotNull(it.value) shouldBeEqual "valOther" + } + + val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) + eventStore.publish(event1) + repo.applyAndPutToCache(event1) + assertNotNull(repo.getLast(event1.aggregateId)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + + val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) + eventStore.publish(event2) + repo.applyAndPutToCache(event2) + assertNotNull(repo.getLast(event2.aggregateId)).also { + assertNotNull(it.value) shouldBeEqual "val1val2" + } + assertNotNull(repo.getUntil(event1)).also { + assertNotNull(it.value) shouldBeEqual "val1" + } + assertNotNull(repo.getUntil(event2)).also { + assertNotNull(it.value) shouldBeEqual "val1val2" + } + } + } + + context("ProjectionSnapshotRepositoryInMemory should be thread safe") { + withData(list) { (eventStore, repo) -> + val aggregateId = IdTest() + val versionBuilder = VersionBuilderLocal() + val lock = ReentrantLock() + (0..9) + .map { + GlobalScope.launch { + (1..10).map { + val eventX = + lock.withLock { + EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) + .also { eventStore.publish(it) } + } + repo.applyAndPutToCache(eventX) + } + } + }.joinAll() + assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100 + } + } + + context("removeOldSnapshot") { + withData(list) { (eventStore, repo) -> + val versionBuilder = VersionBuilderLocal() + val aggregateId = IdTest() + + fun buildEndSendEventX() { + EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) + .also { eventStore.publish(it) } + .also { repo.applyAndPutToCache(it) } + } + + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 1 + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 2 + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 3 + buildEndSendEventX() + repo.getLast(aggregateId).num shouldBeEqual 4 + } + } + }) + +@JvmInline +@Serializable +private value class IdTest( + @Serializable(with = UUIDSerializer::class) + override val id: UUID = UUID.randomUUID(), +) : AggregateId + +@Serializable +private data class ProjectionTest( + override val aggregateId: IdTest, + override val lastEventVersion: Int = 0, + var value: String? = null, + var num: Int = 0, +) : Projection + +private sealed interface TestEvents : Event + +private data class Event1Test( + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val value1: String, +) : TestEvents + +private data class Event2Test( + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val value2: String, +) : TestEvents + +private data class EventXTest( + override val eventId: UUID = UUID.randomUUID(), + override val aggregateId: IdTest, + override val createdAt: Instant = Clock.System.now(), + override val version: Int, + val num: Int, +) : TestEvents + +private fun getSnapshotRepoInMemoryTest( + eventStore: EventStore, + snapshotConfig: SnapshotConfig = SnapshotConfig(2000), +): ProjectionSnapshotRepository = + ProjectionSnapshotRepositoryInMemory( + eventStore = eventStore, + initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, + snapshotCacheConfig = snapshotConfig, + applyToProjection = apply, + ) + +private fun getSnapshotRepoInRedisTest( + eventStore: EventStore, + snapshotConfig: SnapshotConfig = SnapshotConfig(2000), +): ProjectionSnapshotRepository = + ProjectionSnapshotRepositoryInRedis( + eventStore = eventStore, + jedis = JedisPooled("redis://localhost:6379"), + initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, + snapshotCacheConfig = snapshotConfig, + projectionClass = ProjectionTest::class, + projectionToJson = { Json.encodeToString(it) }, + jsonToProjection = { Json.decodeFromString(it) }, + applyToProjection = apply, + ) + +private val apply: ProjectionTest.(TestEvents) -> ProjectionTest = { event -> + this.let { projection -> + when (event) { + is Event1Test -> { + projection.copy(value = (projection.value ?: "") + event.value1) + } + + is Event2Test -> { + projection.copy(value = (projection.value ?: "") + event.value2) + } + + is EventXTest -> { + projection.copy(num = projection.num + event.num) + } + }.copy( + lastEventVersion = event.version, + ) + } +}