create interface ProjectionSnapshotRepository

This commit is contained in:
2025-03-23 22:43:33 +01:00
parent dd7cfb943e
commit 23b304fdbd
2 changed files with 33 additions and 5 deletions

View File

@@ -0,0 +1,28 @@
package eventDemo.libs.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
interface ProjectionSnapshotRepository<E : Event<ID>, P : Projection<ID>, ID : AggregateId> {
/**
* Create a snapshot for the event
*/
fun applyAndPutToCache(event: E): P
/**
* Build the list of all [Projections][Projection]
*/
fun getList(): List<P>
/**
* Build the last version of the [Projection] from the cache.
*/
fun getLast(aggregateId: ID): P
/**
* Build the [Projection] to the specific [event][Event].
*
* It does not contain the [events][Event] it after this one.
*/
fun getUntil(event: E): P
}

View File

@@ -16,7 +16,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
private val initialStateBuilder: (aggregateId: ID) -> P, private val initialStateBuilder: (aggregateId: ID) -> P,
private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(), private val snapshotCacheConfig: SnapshotConfig = SnapshotConfig(),
private val applyToProjection: P.(event: E) -> P, private val applyToProjection: P.(event: E) -> P,
) { ) : ProjectionSnapshotRepository<E, P, ID> {
private val projectionsSnapshot: ConcurrentHashMap<ID, ConcurrentLinkedQueue<Pair<P, Instant>>> = ConcurrentHashMap() private val projectionsSnapshot: ConcurrentHashMap<ID, ConcurrentLinkedQueue<Pair<P, Instant>>> = ConcurrentHashMap()
/** /**
@@ -29,7 +29,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
* 5. save it * 5. save it
* 6. remove old one * 6. remove old one
*/ */
fun applyAndPutToCache(event: E): P = override fun applyAndPutToCache(event: E): P =
getUntil(event) getUntil(event)
.also { .also {
save(it) save(it)
@@ -39,7 +39,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
/** /**
* Build the list of all [Projections][Projection] * Build the list of all [Projections][Projection]
*/ */
fun getList(): List<P> = override fun getList(): List<P> =
projectionsSnapshot.map { (id, b) -> projectionsSnapshot.map { (id, b) ->
getLast(id) getLast(id)
} }
@@ -51,7 +51,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
* 2. get the missing event to the snapshot * 2. get the missing event to the snapshot
* 3. apply the missing events to the snapshot * 3. apply the missing events to the snapshot
*/ */
fun getLast(aggregateId: ID): P { override fun getLast(aggregateId: ID): P {
val lastSnapshot = getLastSnapshot(aggregateId)?.first val lastSnapshot = getLastSnapshot(aggregateId)?.first
val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot) val missingEventOfSnapshot = getEventAfterTheSnapshot(aggregateId, lastSnapshot)
return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot) return lastSnapshot.applyEvents(aggregateId, missingEventOfSnapshot)
@@ -66,7 +66,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
* 2. get the events with a greater version of the snapshot but lower of passed event * 2. get the events with a greater version of the snapshot but lower of passed event
* 3. apply the events to the snapshot * 3. apply the events to the snapshot
*/ */
fun getUntil(event: E): P { override fun getUntil(event: E): P {
val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first val lastSnapshot = getLastSnapshotBeforeOrEqualEvent(event)?.first
if (lastSnapshot?.lastEventVersion == event.version) { if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot return lastSnapshot