From 23cc3e35679ec6c910596af29aace40cfe1c2f1c Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Fri, 14 Mar 2025 02:06:28 +0100 Subject: [PATCH] Une modulo to create snapshot fix removeSnapshot --- .../ProjectionSnapshotRepositoryInMemory.kt | 68 ++++++++++++------- .../eventDemo/libs/event/EventStream.kt | 5 ++ .../libs/event/EventStreamInMemory.kt | 9 +++ 3 files changed, 58 insertions(+), 24 deletions(-) diff --git a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt index c4cd6b8..e91e273 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -14,6 +14,10 @@ import kotlin.time.Duration.Companion.minutes data class SnapshotConfig( val maxSnapshotCacheSize: Int = 20, val maxSnapshotCacheTtl: Duration = 10.minutes, + /** + * Only create [snapshots][Projection] every [X][modulo] [events][Event] + */ + val modulo: Int = 10, ) class ProjectionSnapshotRepositoryInMemory, P : Projection, ID : AggregateId>( @@ -34,12 +38,15 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID * 5. save it * 6. remove old one */ - fun applyAndPutToCache(event: E): P = - getUntil(event) - .also { - save(it) - removeOldSnapshot(it.aggregateId) - } + fun applyAndPutToCache(event: E) { + if ((event.version % snapshotCacheConfig.modulo) == 0) { + getUntil(event) + .also { + save(it) + removeOldSnapshot(it.aggregateId) + } + } + } /** * Build the last version of the [Projection] from the cache. @@ -70,9 +77,9 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID } val missingEventOfSnapshot = - eventStream.readGreaterOfVersion( + eventStream.readVersionBetween( event.aggregateId, - lastSnapshot?.lastEventVersion ?: 0, + (lastSnapshot?.lastEventVersion ?: 1)..event.version, ) return if (lastSnapshot?.lastEventVersion == event.version) { @@ -91,27 +98,40 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID projectionsSnapshot[aggregateId]?.let { queue -> // never remove the last one val theLastOne = getLastSnapshot(aggregateId) + removeByDate(queue, theLastOne) + removeBySize(queue, theLastOne) + } + } - // remove the oldest by time - val now = Clock.System.now() - val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl - val toRemove = queue.filter { deadLine > it.second } - (toRemove - theLastOne).forEach { queue.remove(it) } - - // Remove if size exceeds the limit - if (queue.size > snapshotCacheConfig.maxSnapshotCacheSize) { - val numberToRemove = projectionsSnapshot.size - snapshotCacheConfig.maxSnapshotCacheSize - if (numberToRemove > 0) { - queue - .sortedByDescending { it.first.lastEventVersion } - .take(numberToRemove) - .let { it - theLastOne } - .forEach { queue.remove(it) } - } + private fun removeBySize( + queue: ConcurrentLinkedQueue>, + theLastOne: Pair?, + ) { + // Remove if size exceeds the limit + val size = queue.size + if (size > snapshotCacheConfig.maxSnapshotCacheSize) { + val numberToRemove = size - snapshotCacheConfig.maxSnapshotCacheSize + if (numberToRemove > 0) { + queue + .sortedBy { it.first.lastEventVersion } + .take(numberToRemove) + .let { it - theLastOne } + .forEach { queue.remove(it) } } } } + private fun removeByDate( + queue: ConcurrentLinkedQueue>, + theLastOne: Pair?, + ) { + // remove the oldest by time + val now = Clock.System.now() + val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl + val toRemove = queue.filter { deadLine > it.second } + (toRemove - theLastOne).forEach { queue.remove(it) } + } + /** * Save the snapshot. */ diff --git a/src/main/kotlin/eventDemo/libs/event/EventStream.kt b/src/main/kotlin/eventDemo/libs/event/EventStream.kt index be639b9..c9bbcf7 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStream.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStream.kt @@ -28,4 +28,9 @@ interface EventStream, ID : AggregateId> { aggregateId: ID, version: Int, ): Set + + fun readVersionBetween( + aggregateId: ID, + version: IntRange, + ): Set } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index 598b15d..f5837a3 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -51,6 +51,15 @@ class EventStreamInMemory, ID : AggregateId> : EventStream .filter { it.aggregateId == aggregateId } .filter { it.version > version } .toSet() + + override fun readVersionBetween( + aggregateId: ID, + version: IntRange, + ): Set = + events + .filter { it.aggregateId == aggregateId } + .filter { version.contains(it.version) } + .toSet() } inline fun , ID : AggregateId> EventStream.readLastOf(aggregateId: ID): R? =