fix and improve ProjectionSnapshotRepositoryInRedis

This commit is contained in:
2025-03-26 03:36:49 +01:00
parent 22792a0427
commit 442379dc49
18 changed files with 283 additions and 151 deletions

View File

@@ -10,6 +10,7 @@ import eventDemo.business.event.projection.gameList.apply
import eventDemo.business.event.projection.gameState.GameState
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext
/**
* Manages [projections][GameList], their building and publication in the [bus][GameProjectionBus].
@@ -30,11 +31,13 @@ class GameListRepositoryInMemory(
init {
eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) {
projectionsSnapshot
.applyAndPutToCache(event)
.also { projectionBus.publish(it) }
}
}
}
/**
* Get the last version of the [GameState] from the all eventStream.

View File

@@ -10,6 +10,7 @@ import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.business.event.projection.gameState.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext
/**
* Manages [projections][GameState], their building and publication in the [bus][GameProjectionBus].
@@ -31,11 +32,13 @@ class GameStateRepositoryInMemory(
init {
// On new event was received, build snapshot and publish it to the projection bus
eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) {
projectionsSnapshot
.applyAndPutToCache(event)
.also { projectionBus.publish(it) }
}
}
}
/**
* Get the last version of the [GameState] from the all eventStream.
@@ -53,4 +56,7 @@ class GameStateRepositoryInMemory(
*/
override fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.getUntil(event)
override fun count(gameId: GameId): Int =
projectionsSnapshot.count(gameId)
}

View File

@@ -10,6 +10,7 @@ import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.business.event.projection.gameState.apply
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.serialization.json.Json
import redis.clients.jedis.UnifiedJedis
@@ -38,11 +39,13 @@ class GameStateRepositoryInRedis(
init {
// On new event was received, build snapshot and publish it to the projection bus
eventBus.subscribe { event ->
withLoggingContext("event" to event.toString()) {
projectionsSnapshot
.applyAndPutToCache(event)
.also { projectionBus.publish(it) }
}
}
}
/**
* Get the last version of the [GameState] from the all eventStream.
@@ -60,4 +63,7 @@ class GameStateRepositoryInRedis(
*/
override fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.getUntil(event)
override fun count(gameId: GameId): Int =
projectionsSnapshot.count(gameId)
}

View File

@@ -9,7 +9,7 @@ import eventDemo.libs.event.Event
interface EventHandler<E : Event<ID>, ID : AggregateId> {
fun registerProjectionBuilder(builder: (event: E) -> Unit)
fun handle(
suspend fun handle(
aggregateId: ID,
buildEvent: (version: Int) -> E,
): E

View File

@@ -27,7 +27,7 @@ class GameEventHandler(
/**
* Build Event then send it to the event store and bus.
*/
override fun handle(
override suspend fun handle(
aggregateId: GameId,
buildEvent: (version: Int) -> GameEvent,
): GameEvent =

View File

@@ -7,4 +7,6 @@ interface GameStateRepository {
fun getLast(gameId: GameId): GameState
fun getUntil(event: GameEvent): GameState
fun count(gameId: GameId): Int
}

View File

@@ -33,11 +33,11 @@ class ReactionListener(
}
}
} else {
logger.error { "${this::class.java.simpleName} is already init for this bus" }
logger.error { "${this::class.simpleName} is already init for this bus" }
}
}
private fun sendStartGameEvent(state: GameState) {
private suspend fun sendStartGameEvent(state: GameState) {
if (state.isReady && !state.isStarted) {
val reactionEvent =
eventHandler.handle(state.aggregateId) {
@@ -54,7 +54,7 @@ class ReactionListener(
}
}
private fun sendWinnerEvent(state: GameState) {
private suspend fun sendWinnerEvent(state: GameState) {
val winner = state.playerHasNoCardLeft().firstOrNull()
if (winner != null) {
val reactionEvent =

View File

@@ -1,13 +1,13 @@
package eventDemo.libs.bus
interface Bus<E> {
fun publish(item: E)
interface Bus<T> {
suspend fun publish(item: T)
/**
* @param priority The higher the priority, the more it will be called first
*/
fun subscribe(
priority: Int = 0,
block: suspend (E) -> Unit,
block: suspend (T) -> Unit,
)
}

View File

@@ -1,17 +1,17 @@
package eventDemo.libs.bus
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.coroutineScope
class BusInMemory<E> : Bus<E> {
private val subscribers: MutableList<Pair<Int, suspend (E) -> Unit>> = mutableListOf()
override fun publish(item: E) {
override suspend fun publish(item: E) {
withLoggingContext("busItem" to item.toString()) {
subscribers
.sortedByDescending { (priority, _) -> priority }
.forEach { (_, block) ->
runBlocking {
withLoggingContext("busItem" to item.toString()) {
coroutineScope {
block(item)
}
}

View File

@@ -1,5 +1,6 @@
package eventDemo.libs.event
import io.github.oshai.kotlinlogging.withLoggingContext
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
@@ -10,5 +11,7 @@ class EventStoreInMemory<E : Event<ID>, ID : AggregateId> : EventStore<E, ID> {
streams.computeIfAbsent(aggregateId) { EventStreamInMemory() }
override fun publish(event: E) =
withLoggingContext("event" to event.toString()) {
getStream(event.aggregateId).publish(event)
}
}

View File

@@ -1,5 +1,7 @@
package eventDemo.libs.event
import eventDemo.libs.event.projection.Projection
/**
* Interface representing an event stream for publishing and reading domain events
*/
@@ -17,5 +19,11 @@ interface EventStream<E : Event<*>> {
fun readVersionBetween(version: IntRange): Set<E>
fun <P : Projection<*>> readVersionBetween(
projection: P?,
event: E,
): Set<E> =
readVersionBetween(((projection?.lastEventVersion ?: 0) + 1)..event.version)
fun getByVersion(version: Int): E?
}

View File

@@ -9,10 +9,17 @@ interface ProjectionSnapshotRepository<E : Event<ID>, P : Projection<ID>, ID : A
*/
fun applyAndPutToCache(event: E): P
fun count(aggregateId: ID): Int
fun countAll(): Int
/**
* Build the list of all [Projections][Projection]
*/
fun getList(): List<P>
fun getList(
limit: Int = 100,
offset: Int = 0,
): List<P>
/**
* Build the last version of the [Projection] from the cache.

View File

@@ -18,6 +18,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
private val projectionsSnapshot: ConcurrentHashMap<ID, ConcurrentLinkedQueue<Pair<P, Instant>>> = ConcurrentHashMap()
private val logger = KotlinLogging.logger { }
/**
* Create a snapshot for the event
@@ -32,17 +33,30 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
override fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext("projection" to it.toString()) {
save(it)
removeOldSnapshot(it.aggregateId)
}
}
override fun count(aggregateId: ID): Int =
projectionsSnapshot[aggregateId]?.count() ?: 0
override fun countAll(): Int =
projectionsSnapshot.mappingCount().toInt()
/**
* Build the list of all [Projections][Projection]
*/
override fun getList(): List<P> =
projectionsSnapshot.map { (id, b) ->
override fun getList(
limit: Int,
offset: Int,
): List<P> =
projectionsSnapshot
.map { (id, b) ->
getLast(id)
}
}.drop(offset)
.take(limit)
/**
* Build the last version of the [Projection] from the cache.
@@ -75,7 +89,8 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
.readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version)
// take the last snapshot version +1 to event version
.readVersionBetween(lastSnapshot, event)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
@@ -91,6 +106,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
*/
private fun removeOldSnapshot(aggregateId: ID) {
projectionsSnapshot[aggregateId]?.let { queue ->
if (snapshotCacheConfig.enabled) {
queue
.excludeFirstAndLast()
.excludeTheHeadBySize()
@@ -99,6 +115,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
.forEach { queue.remove(it) }
}
}
}
/**
* Return a new list without the first and last snapshot.
@@ -153,6 +170,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
projectionsSnapshot
.computeIfAbsent(projection.aggregateId) { ConcurrentLinkedQueue() }
.add(Pair(projection, Clock.System.now()))
.also { logger.info { "Projection saved" } }
}
/**
@@ -200,7 +218,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." }
KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")

View File

@@ -7,7 +7,6 @@ import eventDemo.libs.toRanges
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SortingParams
import kotlin.reflect.KClass
@@ -21,6 +20,8 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
private val jsonToProjection: (String) -> P,
private val applyToProjection: P.(event: E) -> P,
) : ProjectionSnapshotRepository<E, P, ID> {
val logger = KotlinLogging.logger { }
/**
* Create a snapshot for the event
*
@@ -34,22 +35,33 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
override fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) {
save(it)
removeOldSnapshot(it.aggregateId, event.version)
}
}
override fun count(aggregateId: ID): Int =
jedis.zcount(projectionClass.redisKey(aggregateId), Double.MIN_VALUE, Double.MAX_VALUE).toInt()
override fun countAll(): Int =
jedis.zcount(projectionClass.redisKey, Double.MIN_VALUE, Double.MAX_VALUE).toInt()
/**
* Get the list of all [Projections][Projection]
*/
override fun getList(): List<P> =
override fun getList(
limit: Int,
offset: Int,
): List<P> =
jedis
.scan(
"0",
ScanParams()
.match(projectionClass.redisKeySearchListLatest)
.count(100),
).result
.map { jsonToProjection(it) }
.sort(
projectionClass.redisKeySearchList,
SortingParams()
.desc()
.by("score")
.limit(limit, offset),
).map { jsonToProjection(it) }
/**
* Get the last version of the [Projection] from the cache.
@@ -60,7 +72,13 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
*/
override fun getLast(aggregateId: ID): P =
jedis
.get(projectionClass.redisKeyLatest(aggregateId))
.sort(
projectionClass.redisKey(aggregateId),
SortingParams()
.desc()
.by("score")
.limit(0, 1),
).firstOrNull()
?.let(jsonToProjection)
?: initialStateBuilder(aggregateId)
@@ -76,22 +94,27 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
override fun getUntil(event: E): P {
val lastSnapshot =
jedis
.sort(
.zrangeByScore(
projectionClass.redisKey(event.aggregateId),
SortingParams()
.desc()
.by("score")
.limit(0, 1),
1.0,
event.version.toDouble(),
0,
1,
).firstOrNull()
?.let(jsonToProjection)
if (lastSnapshot?.lastEventVersion == event.version) {
return lastSnapshot
}
if (lastSnapshot != null && lastSnapshot.lastEventVersion > event.version) {
logger.error { "Cannot be apply event on more recent snapshot" }
error("Cannot be apply event on more recent snapshot")
}
val missingEventOfSnapshot =
eventStore
.getStream(event.aggregateId)
.readVersionBetween((lastSnapshot?.lastEventVersion ?: 1)..event.version)
// take the last snapshot version +1 to event version
.readVersionBetween(lastSnapshot, event)
return if (lastSnapshot?.lastEventVersion == event.version) {
lastSnapshot
@@ -101,9 +124,16 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
}
private fun save(projection: P) {
jedis.zadd(projection.redisKeyVersion, projection.lastEventVersion.toDouble(), projectionToJson(projection))
jedis.expire(projection.redisKeyVersion, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
jedis.set(projection.redisKeyLatest, projectionToJson(projection))
repeat(5) {
val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection))
if (added < 1) {
logger.error { "Projection NOT saved" }
} else {
logger.info { "Projection saved" }
return
}
}
jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
}
/**
@@ -123,7 +153,7 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
if (event.version == lastEventVersion + 1) {
applyToProjection(event)
} else if (event.version <= lastEventVersion) {
KotlinLogging.logger { }.warn { "Event is already is the Projection, skip apply." }
KotlinLogging.logger { }.warn { "Event is already in the Projection, skip apply." }
this
} else {
error("The version of the event must follow directly after the version of the projection.")
@@ -131,65 +161,74 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
}
}
private fun removeOldSnapshot(
fun removeOldSnapshot(
aggregateId: AggregateId,
lastVersion: Int,
) {
if (snapshotCacheConfig.enabled) {
removeByModulo(aggregateId, lastVersion)
removeTheHeadBySize(aggregateId)
removeTheHeadBySize(aggregateId, lastVersion)
}
}
private fun removeByModulo(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - snapshotCacheConfig.maxSnapshotCacheSize)
.let { if (it < 0) 0 else it }
(lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo))
.let { if (it < 2) 2 else it }
.let { IntRange(it, lastVersion - 1) }
.filter { (it % snapshotCacheConfig.modulo) != 1 }
.toRanges()
.map {
jedis.zremrangeByScore(
jedis
.zremrangeByScore(
projectionClass.redisKey(aggregateId),
it.min().toDouble(),
it.max().toDouble(),
)
it.first.toDouble(),
it.last.toDouble(),
).also { removedCount ->
if (removedCount > 0) {
logger.info {
"$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]"
}
}
}
}
}
private fun removeTheHeadBySize(aggregateId: AggregateId) {
val size =
jedis.zcount(
projectionClass.redisKey(aggregateId),
Double.MIN_VALUE,
Double.MAX_VALUE,
)
LongRange((size - snapshotCacheConfig.maxSnapshotCacheSize), size)
private fun removeTheHeadBySize(
aggregateId: AggregateId,
lastVersion: Int,
) {
(lastVersion - (snapshotCacheConfig.maxSnapshotCacheSize * snapshotCacheConfig.modulo))
.toDouble()
.let {
jedis.zremrangeByRank(
jedis
.zremrangeByScore(
projectionClass.redisKey(aggregateId),
1,
it.max(),
)
2.0,
it,
).also { removedCount ->
if (removedCount > 0) {
logger.info {
"$removedCount snapshot removed Size(${snapshotCacheConfig.maxSnapshotCacheSize}) (1.0 to $it) [lastVersion=$lastVersion]"
}
}
}
}
}
}
val <P : Projection<*>> KClass<P>.redisKeySearchListLatest: String get() {
return "projection:$simpleName:*:latest"
val <P : Projection<*>> KClass<P>.redisKeySearchList: String get() {
return "projection:$simpleName:*"
}
val <P : Projection<*>> P.redisKeyVersion: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}:$lastEventVersion"
val <P : Projection<*>> P.redisKey: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}"
}
val <P : Projection<*>> P.redisKeyLatest: String get() {
return "projection:${this::class.simpleName}:${aggregateId.id}:latest"
}
fun <A : AggregateId, P : Projection<*>> KClass<P>.redisKeyLatest(aggregateId: A): String =
"projection:$simpleName:${aggregateId.id}:latest"
fun <P : Projection<*>, A : AggregateId> KClass<P>.redisKey(aggregateId: A): String =
"projection:$simpleName:${aggregateId.id}"
val <P : Projection<*>> KClass<P>.redisKey: String get() =
"projection:$simpleName"

View File

@@ -20,4 +20,7 @@ data class SnapshotConfig(
* snapshot.lastVersion % modulo == 1
*/
val modulo: Int = 10,
val enabled: Boolean = true,
)
val DISABLED_CONFIG = SnapshotConfig(Int.MAX_VALUE, Duration.INFINITE, Int.MAX_VALUE, enabled = false)

View File

@@ -8,6 +8,7 @@ import eventDemo.business.event.event.NewPlayerEvent
import eventDemo.business.event.event.PlayerReadyEvent
import eventDemo.business.event.projection.gameList.GameList
import eventDemo.configuration.configure
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldContain
import io.kotest.matchers.collections.shouldHaveSize
@@ -24,6 +25,7 @@ import org.koin.core.context.stopKoin
import org.koin.ktor.ext.inject
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds
class GameListRouteTest :
FunSpec({
@@ -62,6 +64,8 @@ class GameListRouteTest :
}
}
// Wait until the projection is created
eventually(3.seconds) {
httpClient()
.get("/games") {
withAuth(player1)
@@ -77,6 +81,7 @@ class GameListRouteTest :
}
}
}
}
test("/games return a game with status IS_STARTED") {
testApplication {

View File

@@ -4,12 +4,17 @@ import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventHandler
import eventDemo.business.event.event.NewPlayerEvent
import eventDemo.business.event.projection.gameState.GameState
import eventDemo.business.event.projection.gameState.GameStateRepository
import eventDemo.configuration.injection.Configuration
import eventDemo.configuration.injection.appKoinModule
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.assertions.nondeterministic.eventuallyConfig
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.comparables.shouldBeGreaterThan
import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.ints.shouldBeLessThan
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.joinAll
@@ -17,6 +22,7 @@ import kotlinx.coroutines.launch
import org.koin.core.context.stopKoin
import org.koin.dsl.koinApplication
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds
@OptIn(DelicateCoroutinesApi::class)
class GameStateRepositoryTest :
@@ -32,6 +38,8 @@ class GameStateRepositoryTest :
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) }
.also { event ->
// Wait until the projection is created
eventually(1.seconds) {
assertNotNull(repo.getUntil(event)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
}
@@ -40,6 +48,7 @@ class GameStateRepositoryTest :
}
}
}
}
stopKoin()
}
@@ -48,24 +57,35 @@ class GameStateRepositoryTest :
koinApplication { modules(appKoinModule(Configuration("redis://localhost:6379"))) }.koin.apply {
val repo = get<GameStateRepository>()
val eventHandler = get<GameEventHandler>()
val projectionBus = get<GameProjectionBus>()
var state: GameState? = null
projectionBus.subscribe {
repo.getLast(aggregateId).also {
state = it
}
}
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) }
.also {
assertNotNull(repo.getLast(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1)
eventually(1.seconds) {
assertNotNull(state).players.isNotEmpty() shouldBeEqual true
assertNotNull(state).players shouldBeEqual setOf(player1)
}
}
eventHandler
.handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player2, version = it) }
.also {
eventually(1.seconds) {
assertNotNull(repo.getLast(aggregateId)).also {
assertNotNull(it.players) shouldBeEqual setOf(player1, player2)
}
}
}
}
}
test("getUntil should build the state until the event") {
repeat(10) {
@@ -121,10 +141,20 @@ class GameStateRepositoryTest :
}
}.joinAll()
eventually(
eventuallyConfig {
duration = 10.seconds
interval = 1.seconds
includeFirst = false
},
) {
repo.getLast(aggregateId).run {
lastEventVersion shouldBeEqual 1000
players shouldHaveSize 1000
}
repo.count(aggregateId) shouldBeGreaterThan 20
repo.count(aggregateId) shouldBeLessThan 30
}
}
}

View File

@@ -6,14 +6,16 @@ import eventDemo.libs.event.Event
import eventDemo.libs.event.EventStore
import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilderLocal
import eventDemo.libs.event.projection.DISABLED_CONFIG
import eventDemo.libs.event.projection.Projection
import eventDemo.libs.event.projection.ProjectionSnapshotRepository
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInRedis
import eventDemo.libs.event.projection.SnapshotConfig
import io.kotest.assertions.nondeterministic.continually
import io.kotest.core.spec.style.FunSpec
import io.kotest.datatest.WithDataTestName
import io.kotest.datatest.withData
import io.kotest.engine.names.WithDataTestName
import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
@@ -28,6 +30,7 @@ import java.util.UUID
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds
@OptIn(DelicateCoroutinesApi::class)
class ProjectionSnapshotRepositoryTest :
@@ -42,7 +45,7 @@ class ProjectionSnapshotRepositoryTest :
val eventStores =
listOf(
EventStoreInMemory<TestEvents, IdTest>(),
{ EventStoreInMemory<TestEvents, IdTest>() },
)
val projectionRepo =
listOf(
@@ -52,17 +55,13 @@ class ProjectionSnapshotRepositoryTest :
val list =
eventStores.flatMap { store ->
projectionRepo.map {
TestData(store, it(store, SnapshotConfig(2000)))
projectionRepo.map { repo ->
store().let { store -> TestData(store, repo(store, DISABLED_CONFIG)) }
}
}
val nameFn: (TestData) -> String = { (eventStore, repo) ->
"${repo::class.simpleName} with ${eventStore::class.simpleName}"
}
context("when call applyAndPutToCache, the getUntil method must be use the built projection cache") {
withData(nameFn = nameFn, list) { (eventStore, repo) ->
withData(list) { (eventStore, repo) ->
val aggregateId = IdTest()
val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = IdTest())
@@ -97,7 +96,8 @@ class ProjectionSnapshotRepositoryTest :
}
}
context("ProjectionSnapshotRepositoryInMemory should be thread safe") {
context("ProjectionSnapshotRepository should be thread safe") {
continually(1.seconds) {
withData(list) { (eventStore, repo) ->
val aggregateId = IdTest()
val versionBuilder = VersionBuilderLocal()
@@ -116,6 +116,8 @@ class ProjectionSnapshotRepositoryTest :
}
}.joinAll()
assertNotNull(repo.getLast(aggregateId)).num shouldBeEqual 100
assertNotNull(repo.count(aggregateId)) shouldBeEqual 100
}
}
}
@@ -185,7 +187,7 @@ private data class EventXTest(
private fun getSnapshotRepoInMemoryTest(
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig = SnapshotConfig(2000),
snapshotConfig: SnapshotConfig,
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInMemory(
eventStore = eventStore,
@@ -196,7 +198,7 @@ private fun getSnapshotRepoInMemoryTest(
private fun getSnapshotRepoInRedisTest(
eventStore: EventStore<TestEvents, IdTest>,
snapshotConfig: SnapshotConfig = SnapshotConfig(2000),
snapshotConfig: SnapshotConfig,
): ProjectionSnapshotRepository<TestEvents, ProjectionTest, IdTest> =
ProjectionSnapshotRepositoryInRedis(
eventStore = eventStore,