diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt index d8f3eb0..2f64a1c 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -15,10 +15,20 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes data class SnapshotConfig( + /** + * Keep snapshot when is on the head of the queue cache + */ val maxSnapshotCacheSize: Int = 20, + /** + * Keep snapshot when is newer of + * + * snapshot.date > now + maxSnapshotCacheTtl + */ val maxSnapshotCacheTtl: Duration = 10.minutes, /** - * Only create [snapshots][Projection] every [X][modulo] [events][Event] + * Keep snapshot when version is this modulo + * + * snapshot.lastVersion % modulo == 1 */ val modulo: Int = 10, ) @@ -42,13 +52,11 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID * 6. remove old one */ fun applyAndPutToCache(event: E) { - if ((event.version % snapshotCacheConfig.modulo) == 1) { - getUntil(event) - .also { - save(it) - removeOldSnapshot(it.aggregateId) - } - } + getUntil(event) + .also { + save(it) + removeOldSnapshot(it.aggregateId) + } } /** @@ -100,46 +108,65 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID } /** - * Remove the oldest snapshot. + * Remove the oldest [snapshot][P] of the [queue][projectionsSnapshot]. * * The rules are pass in the controller. */ private fun removeOldSnapshot(aggregateId: ID) { projectionsSnapshot[aggregateId]?.let { queue -> - // never remove the last one - val theLastOne = getLastSnapshot(aggregateId) - removeByDate(queue, theLastOne) - removeBySize(queue, theLastOne) + queue + .excludeFirstAndLast() + .excludeTheHeadBySize() + .excludeNewerByDate() + .excludeByModulo() + .forEach { queue.remove(it) } } } - private fun removeBySize( - queue: ConcurrentLinkedQueue>, - theLastOne: Pair?, - ) { - // Remove if size exceeds the limit - val size = queue.size + /** + * Return a new list without the first and last snapshot. + * + * Exclude from deletion the first and the last. + */ + private fun FilteredList

.excludeFirstAndLast(): FilteredList

= + sortedBy { it.first.lastEventVersion } + .drop(1) + .dropLast(1) + + /** + * Return a new list of event filtered by the version modulo. + * + * Exclude from deletion 1 element out of 10 (if modulo 10 in [config][snapshotCacheConfig]). + */ + private fun FilteredList

.excludeByModulo(): FilteredList

= + filter { (it.first.lastEventVersion % snapshotCacheConfig.modulo) != 1 } + + /** + * Return a new list of event filtered by the maximum size. + * + * Exclude from removal all [snapshot][projectionsSnapshot] that in the head of the queue. + */ + private fun FilteredList

.excludeTheHeadBySize(): FilteredList

{ + // filter if size exceeds the limit if (size > snapshotCacheConfig.maxSnapshotCacheSize) { val numberToRemove = size - snapshotCacheConfig.maxSnapshotCacheSize if (numberToRemove > 0) { - queue - .sortedBy { it.first.lastEventVersion } - .take(numberToRemove) - .let { it - theLastOne } - .forEach { queue.remove(it) } + return sortedBy { it.first.lastEventVersion } + .takeLast(numberToRemove) } } + return this } - private fun removeByDate( - queue: ConcurrentLinkedQueue>, - theLastOne: Pair?, - ) { - // remove the oldest by time + /** + * Return a new list of event filtered by the maximum date. + * + * Exclude from removal all [snapshot][projectionsSnapshot] that newer of the date (in [config][SnapshotConfig]). + */ + private fun FilteredList

.excludeNewerByDate(): FilteredList

{ val now = Clock.System.now() val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl - val toRemove = queue.filter { deadLine > it.second } - (toRemove - theLastOne).forEach { queue.remove(it) } + return filter { deadLine < it.second } } /** @@ -204,3 +231,5 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID } } } + +private typealias FilteredList

= Collection>