test: fix GameStateRouteTest

This commit is contained in:
2025-04-14 21:29:36 +02:00
parent 4086a21dcb
commit a7f82e6bce
10 changed files with 35 additions and 66 deletions

View File

@@ -30,7 +30,10 @@ class ReactionListener(
}
}
} else {
logger.error { "${this::class.simpleName} is already init for this bus" }
"${this::class.simpleName} is already init for this bus".let {
logger.error { it }
error(it)
}
}
}

View File

@@ -13,7 +13,7 @@ class BusInMemory<E>(
override suspend fun publish(item: E) {
withLoggingContext("busItem" to item.toString()) {
logger.info { "Item sent to the bus: $item" }
logger.info { "Item sent to the bus" }
subscribers
.forEach {
coroutineScope {

View File

@@ -6,6 +6,7 @@ import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.utils.io.core.toByteArray
import kotlinx.coroutines.runBlocking
@@ -15,6 +16,8 @@ class BusInRabbitMQ<E>(
private val objectToString: (E) -> String,
private val stringToObject: (String) -> E,
) : Bus<E> {
private val logger = KotlinLogging.logger { }
private val connection: Connection = connectionFactory.newConnection()
get() {
return if (field.isOpen) {
@@ -50,6 +53,7 @@ class BusInRabbitMQ<E>(
objectToString(item).toByteArray(),
)
}
logger.info { "Item sent to the bus" }
}
override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription {

View File

@@ -7,7 +7,7 @@ interface ProjectionSnapshotRepository<E : Event<ID>, P : Projection<ID>, ID : A
/**
* Create a snapshot for the event
*/
fun applyAndPutToCache(event: E): P
suspend fun applyAndPutToCache(event: E): P
fun count(aggregateId: ID): Int

View File

@@ -32,7 +32,7 @@ class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID
* 5. save it
* 6. remove old one
*/
override fun applyAndPutToCache(event: E): P =
override suspend fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext("projection" to it.toString()) {

View File

@@ -33,7 +33,7 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
* 5. save it
* 6. remove old one
*/
override fun applyAndPutToCache(event: E): P =
override suspend fun applyAndPutToCache(event: E): P =
getUntil(event)
.also {
withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) {
@@ -131,16 +131,13 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
}
private fun save(projection: P) {
repeat(5) {
val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection))
if (added < 1) {
logger.error { "Projection NOT saved" }
} else {
logger.info { "Projection saved" }
return
}
val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection))
if (added < 1) {
logger.error { "Projection NOT saved (already exists)" }
} else {
logger.info { "Projection saved" }
jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
}
jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds)
}
/**
@@ -195,7 +192,7 @@ class ProjectionSnapshotRepositoryInRedis<E : Event<ID>, P : Projection<ID>, ID
it.last.toDouble(),
).also { removedCount ->
if (removedCount > 0) {
logger.info {
logger.debug {
"$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]"
}
}