Une modulo to create snapshot
fix removeSnapshot
This commit is contained in:
@@ -14,6 +14,10 @@ import kotlin.time.Duration.Companion.minutes
|
|||||||
data class SnapshotConfig(
|
data class SnapshotConfig(
|
||||||
val maxSnapshotCacheSize: Int = 20,
|
val maxSnapshotCacheSize: Int = 20,
|
||||||
val maxSnapshotCacheTtl: Duration = 10.minutes,
|
val maxSnapshotCacheTtl: Duration = 10.minutes,
|
||||||
|
/**
|
||||||
|
* Only create [snapshots][Projection] every [X][modulo] [events][Event]
|
||||||
|
*/
|
||||||
|
val modulo: Int = 10,
|
||||||
)
|
)
|
||||||
|
|
||||||
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
|
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
|
||||||
@@ -34,12 +38,15 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
* 5. save it
|
* 5. save it
|
||||||
* 6. remove old one
|
* 6. remove old one
|
||||||
*/
|
*/
|
||||||
fun applyAndPutToCache(event: E): P =
|
fun applyAndPutToCache(event: E) {
|
||||||
getUntil(event)
|
if ((event.version % snapshotCacheConfig.modulo) == 0) {
|
||||||
.also {
|
getUntil(event)
|
||||||
save(it)
|
.also {
|
||||||
removeOldSnapshot(it.aggregateId)
|
save(it)
|
||||||
}
|
removeOldSnapshot(it.aggregateId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build the last version of the [Projection] from the cache.
|
* Build the last version of the [Projection] from the cache.
|
||||||
@@ -70,9 +77,9 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
}
|
}
|
||||||
|
|
||||||
val missingEventOfSnapshot =
|
val missingEventOfSnapshot =
|
||||||
eventStream.readGreaterOfVersion(
|
eventStream.readVersionBetween(
|
||||||
event.aggregateId,
|
event.aggregateId,
|
||||||
lastSnapshot?.lastEventVersion ?: 0,
|
(lastSnapshot?.lastEventVersion ?: 1)..event.version,
|
||||||
)
|
)
|
||||||
|
|
||||||
return if (lastSnapshot?.lastEventVersion == event.version) {
|
return if (lastSnapshot?.lastEventVersion == event.version) {
|
||||||
@@ -91,27 +98,40 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
|
|||||||
projectionsSnapshot[aggregateId]?.let { queue ->
|
projectionsSnapshot[aggregateId]?.let { queue ->
|
||||||
// never remove the last one
|
// never remove the last one
|
||||||
val theLastOne = getLastSnapshot(aggregateId)
|
val theLastOne = getLastSnapshot(aggregateId)
|
||||||
|
removeByDate(queue, theLastOne)
|
||||||
|
removeBySize(queue, theLastOne)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// remove the oldest by time
|
private fun removeBySize(
|
||||||
val now = Clock.System.now()
|
queue: ConcurrentLinkedQueue<Pair<P, Instant>>,
|
||||||
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
|
theLastOne: Pair<P, Instant>?,
|
||||||
val toRemove = queue.filter { deadLine > it.second }
|
) {
|
||||||
(toRemove - theLastOne).forEach { queue.remove(it) }
|
// Remove if size exceeds the limit
|
||||||
|
val size = queue.size
|
||||||
// Remove if size exceeds the limit
|
if (size > snapshotCacheConfig.maxSnapshotCacheSize) {
|
||||||
if (queue.size > snapshotCacheConfig.maxSnapshotCacheSize) {
|
val numberToRemove = size - snapshotCacheConfig.maxSnapshotCacheSize
|
||||||
val numberToRemove = projectionsSnapshot.size - snapshotCacheConfig.maxSnapshotCacheSize
|
if (numberToRemove > 0) {
|
||||||
if (numberToRemove > 0) {
|
queue
|
||||||
queue
|
.sortedBy { it.first.lastEventVersion }
|
||||||
.sortedByDescending { it.first.lastEventVersion }
|
.take(numberToRemove)
|
||||||
.take(numberToRemove)
|
.let { it - theLastOne }
|
||||||
.let { it - theLastOne }
|
.forEach { queue.remove(it) }
|
||||||
.forEach { queue.remove(it) }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun removeByDate(
|
||||||
|
queue: ConcurrentLinkedQueue<Pair<P, Instant>>,
|
||||||
|
theLastOne: Pair<P, Instant>?,
|
||||||
|
) {
|
||||||
|
// remove the oldest by time
|
||||||
|
val now = Clock.System.now()
|
||||||
|
val deadLine = now - snapshotCacheConfig.maxSnapshotCacheTtl
|
||||||
|
val toRemove = queue.filter { deadLine > it.second }
|
||||||
|
(toRemove - theLastOne).forEach { queue.remove(it) }
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Save the snapshot.
|
* Save the snapshot.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -28,4 +28,9 @@ interface EventStream<E : Event<ID>, ID : AggregateId> {
|
|||||||
aggregateId: ID,
|
aggregateId: ID,
|
||||||
version: Int,
|
version: Int,
|
||||||
): Set<E>
|
): Set<E>
|
||||||
|
|
||||||
|
fun readVersionBetween(
|
||||||
|
aggregateId: ID,
|
||||||
|
version: IntRange,
|
||||||
|
): Set<E>
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,6 +51,15 @@ class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID>
|
|||||||
.filter { it.aggregateId == aggregateId }
|
.filter { it.aggregateId == aggregateId }
|
||||||
.filter { it.version > version }
|
.filter { it.version > version }
|
||||||
.toSet()
|
.toSet()
|
||||||
|
|
||||||
|
override fun readVersionBetween(
|
||||||
|
aggregateId: ID,
|
||||||
|
version: IntRange,
|
||||||
|
): Set<E> =
|
||||||
|
events
|
||||||
|
.filter { it.aggregateId == aggregateId }
|
||||||
|
.filter { version.contains(it.version) }
|
||||||
|
.toSet()
|
||||||
}
|
}
|
||||||
|
|
||||||
inline fun <reified R : E, E : Event<ID>, ID : AggregateId> EventStream<E, ID>.readLastOf(aggregateId: ID): R? =
|
inline fun <reified R : E, E : Event<ID>, ID : AggregateId> EventStream<E, ID>.readLastOf(aggregateId: ID): R? =
|
||||||
|
|||||||
Reference in New Issue
Block a user