A new snapshot is always created after new event
This commit is contained in:
@@ -15,10 +15,20 @@ import kotlin.time.Duration
|
|||||||
import kotlin.time.Duration.Companion.minutes
|
import kotlin.time.Duration.Companion.minutes
|
||||||
|
|
||||||
data class SnapshotConfig(
|
data class SnapshotConfig(
|
||||||
|
/**
|
||||||
|
* Keep snapshot when is on the head of the queue cache
|
||||||
|
*/
|
||||||
val maxSnapshotCacheSize: Int = 20,
|
val maxSnapshotCacheSize: Int = 20,
|
||||||
|
/**
|
||||||
|
* Keep snapshot when is newer of
|
||||||
|
*
|
||||||
|
* snapshot.date > now + maxSnapshotCacheTtl
|
||||||
|
*/
|
||||||
val maxSnapshotCacheTtl: Duration = 10.minutes,
|
val maxSnapshotCacheTtl: Duration = 10.minutes,
|
||||||
/**
|
/**
|
||||||
* Only create [snapshots][Projection] every [X][modulo] [events][Event]
|
* Keep snapshot when version is this modulo
|
||||||
|
*
|
||||||
|
* snapshot.lastVersion % modulo == 1
|
||||||
*/
|
*/
|
||||||
val modulo: Int = 10,
|
val modulo: Int = 10,
|
||||||
)
|
)
|
||||||
@@ -42,13 +52,11 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
* 6. remove old one
|
* 6. remove old one
|
||||||
*/
|
*/
|
||||||
fun applyAndPutToCache(event: E) {
|
fun applyAndPutToCache(event: E) {
|
||||||
if ((event.version % snapshotCacheConfig.modulo) == 1) {
|
getUntil(event)
|
||||||
getUntil(event)
|
.also {
|
||||||
.also {
|
save(it)
|
||||||
save(it)
|
removeOldSnapshot(it.aggregateId)
|
||||||
removeOldSnapshot(it.aggregateId)
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -100,46 +108,65 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the oldest snapshot.
|
* Remove the oldest [snapshot][P] of the [queue][projectionsSnapshot].
|
||||||
*
|
*
|
||||||
* The rules are pass in the controller.
|
* The rules are pass in the controller.
|
||||||
*/
|
*/
|
||||||
private fun removeOldSnapshot(aggregateId: ID) {
|
private fun removeOldSnapshot(aggregateId: ID) {
|
||||||
projectionsSnapshot[aggregateId]?.let { queue ->
|
projectionsSnapshot[aggregateId]?.let { queue ->
|
||||||
// never remove the last one
|
queue
|
||||||
val theLastOne = getLastSnapshot(aggregateId)
|
.excludeFirstAndLast()
|
||||||
removeByDate(queue, theLastOne)
|
.excludeTheHeadBySize()
|
||||||
removeBySize(queue, theLastOne)
|
.excludeNewerByDate()
|
||||||
|
.excludeByModulo()
|
||||||
|
.forEach { queue.remove(it) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun removeBySize(
|
/**
|
||||||
queue: ConcurrentLinkedQueue<Pair<P, Instant>>,
|
* Return a new list without the first and last snapshot.
|
||||||
theLastOne: Pair<P, Instant>?,
|
*
|
||||||
) {
|
* Exclude from deletion the first and the last.
|
||||||
// Remove if size exceeds the limit
|
*/
|
||||||
val size = queue.size
|
private fun FilteredList<P>.excludeFirstAndLast(): FilteredList<P> =
|
||||||
|
sortedBy { it.first.lastEventVersion }
|
||||||
|
.drop(1)
|
||||||
|
.dropLast(1)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a new list of event filtered by the version modulo.
|
||||||
|
*
|
||||||
|
* Exclude from deletion 1 element out of 10 (if modulo 10 in [config][snapshotCacheConfig]).
|
||||||
|
*/
|
||||||
|
private fun FilteredList<P>.excludeByModulo(): FilteredList<P> =
|
||||||
|
filter { (it.first.lastEventVersion % snapshotCacheConfig.modulo) != 1 }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a new list of event filtered by the maximum size.
|
||||||
|
*
|
||||||
|
* Exclude from removal all [snapshot][projectionsSnapshot] that in the head of the queue.
|
||||||
|
*/
|
||||||
|
private fun FilteredList<P>.excludeTheHeadBySize(): FilteredList<P> {
|
||||||
|
// filter if size exceeds the limit
|
||||||
if (size > snapshotCacheConfig.maxSnapshotCacheSize) {
|
if (size > snapshotCacheConfig.maxSnapshotCacheSize) {
|
||||||
val numberToRemove = size - snapshotCacheConfig.maxSnapshotCacheSize
|
val numberToRemove = size - snapshotCacheConfig.maxSnapshotCacheSize
|
||||||
if (numberToRemove > 0) {
|
if (numberToRemove > 0) {
|
||||||
queue
|
return sortedBy { it.first.lastEventVersion }
|
||||||
.sortedBy { it.first.lastEventVersion }
|
.takeLast(numberToRemove)
|
||||||
.take(numberToRemove)
|
|
||||||
.let { it - theLastOne }
|
|
||||||
.forEach { queue.remove(it) }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun removeByDate(
|
/**
|
||||||
queue: ConcurrentLinkedQueue<Pair<P, Instant>>,
|
* Return a new list of event filtered by the maximum date.
|
||||||
theLastOne: Pair<P, Instant>?,
|
*
|
||||||
) {
|
* Exclude from removal all [snapshot][projectionsSnapshot] that newer of the date (in [config][SnapshotConfig]).
|
||||||
// remove the oldest by time
|
*/
|
||||||
|
private fun FilteredList<P>.excludeNewerByDate(): FilteredList<P> {
|
||||||
val now = Clock.System.now()
|
val now = Clock.System.now()
|
||||||
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
|
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
|
||||||
val toRemove = queue.filter { deadLine > it.second }
|
return filter { deadLine < it.second }
|
||||||
(toRemove - theLastOne).forEach { queue.remove(it) }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -204,3 +231,5 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private typealias FilteredList<P> = Collection<Pair<P, Instant>>
|
||||||
|
|||||||
Reference in New Issue
Block a user