diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt index 27fa66d..9e2a6ce 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt @@ -30,7 +30,10 @@ class ReactionListener( } } } else { - logger.error { "${this::class.simpleName} is already init for this bus" } + "${this::class.simpleName} is already init for this bus".let { + logger.error { it } + error(it) + } } } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt index fb4cb68..9e002e2 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt @@ -13,7 +13,7 @@ class BusInMemory( override suspend fun publish(item: E) { withLoggingContext("busItem" to item.toString()) { - logger.info { "Item sent to the bus: $item" } + logger.info { "Item sent to the bus" } subscribers .forEach { coroutineScope { diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt index a5c4f96..f75f9c2 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt @@ -6,6 +6,7 @@ import com.rabbitmq.client.Connection import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.DefaultConsumer import com.rabbitmq.client.Envelope +import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.utils.io.core.toByteArray import kotlinx.coroutines.runBlocking @@ -15,6 +16,8 @@ class BusInRabbitMQ( private val objectToString: (E) -> String, private val stringToObject: (String) -> E, ) : Bus { + private val logger = KotlinLogging.logger { } + private val connection: Connection = connectionFactory.newConnection() get() { return if (field.isOpen) { @@ -50,6 +53,7 @@ class BusInRabbitMQ( objectToString(item).toByteArray(), ) } + logger.info { "Item sent to the bus" } } override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt index b3f9fcf..042d090 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepository.kt @@ -7,7 +7,7 @@ interface ProjectionSnapshotRepository, P : Projection, ID : A /** * Create a snapshot for the event */ - fun applyAndPutToCache(event: E): P + suspend fun applyAndPutToCache(event: E): P fun count(aggregateId: ID): Int diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt index dcbbf87..8531ca9 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInMemory.kt @@ -32,7 +32,7 @@ class ProjectionSnapshotRepositoryInMemory, P : Projection, ID * 5. save it * 6. remove old one */ - override fun applyAndPutToCache(event: E): P = + override suspend fun applyAndPutToCache(event: E): P = getUntil(event) .also { withLoggingContext("projection" to it.toString()) { diff --git a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt index afcb086..572f31f 100644 --- a/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/libs/event/projection/ProjectionSnapshotRepositoryInRedis.kt @@ -33,7 +33,7 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID * 5. save it * 6. remove old one */ - override fun applyAndPutToCache(event: E): P = + override suspend fun applyAndPutToCache(event: E): P = getUntil(event) .also { withLoggingContext(mapOf("projection" to it.toString(), "event" to event.toString())) { @@ -131,16 +131,13 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID } private fun save(projection: P) { - repeat(5) { - val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection)) - if (added < 1) { - logger.error { "Projection NOT saved" } - } else { - logger.info { "Projection saved" } - return - } + val added = jedis.zadd(projection.redisKey, projection.lastEventVersion.toDouble(), projectionToJson(projection)) + if (added < 1) { + logger.error { "Projection NOT saved (already exists)" } + } else { + logger.info { "Projection saved" } + jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds) } - jedis.expire(projection.redisKey, snapshotCacheConfig.maxSnapshotCacheTtl.inWholeSeconds) } /** @@ -195,7 +192,7 @@ class ProjectionSnapshotRepositoryInRedis, P : Projection, ID it.last.toDouble(), ).also { removedCount -> if (removedCount > 0) { - logger.info { + logger.debug { "$removedCount snapshot removed Modulo(${snapshotCacheConfig.modulo}) (${it.first} to ${it.last}) [lastVersion=$lastVersion]" } } diff --git a/src/test/kotlin/eventDemo/Helpers.kt b/src/test/kotlin/eventDemo/Helpers.kt index d285984..414988f 100644 --- a/src/test/kotlin/eventDemo/Helpers.kt +++ b/src/test/kotlin/eventDemo/Helpers.kt @@ -48,7 +48,6 @@ fun testApplicationWithConfig( application { val koin = getKoin() koin.cleanDataTest() - koin.configureGameListener() configBuilder(koin) } block() diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt index f3322c2..be7b648 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameListRouteTest.kt @@ -8,9 +8,7 @@ import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.event.PlayerReadyEvent import eventDemo.business.event.projection.gameList.GameList import eventDemo.testApplicationWithConfig -import io.kotest.assertions.nondeterministic.continually import io.kotest.assertions.nondeterministic.eventually -import io.kotest.assertions.nondeterministic.eventuallyConfig import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.collections.shouldHaveSize @@ -24,7 +22,6 @@ import io.ktor.http.HttpStatusCode import kotlinx.coroutines.runBlocking import kotlin.test.assertEquals import kotlin.test.assertTrue -import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds class GameListRouteTest : @@ -60,27 +57,20 @@ class GameListRouteTest : }, ) { // Wait until the projection is created - eventually( - eventuallyConfig { - duration = 3.seconds - interval = 300.milliseconds - }, - ) { - continually(1.seconds) { - httpClient() - .get("/games") { - withAuth(player1) - accept(ContentType.Application.Json) - }.apply { - assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) - call.body>().first().let { - it.status shouldBeEqual GameList.Status.OPENING - it.players shouldHaveSize 1 - it.players shouldContain player1 - it.winners shouldHaveSize 0 - } + eventually(1.seconds) { + httpClient() + .get("/games") { + withAuth(player1) + accept(ContentType.Application.Json) + }.apply { + assertEquals(HttpStatusCode.OK, status, message = bodyAsText()) + call.body>().first().let { + it.status shouldBeEqual GameList.Status.OPENING + it.players shouldHaveSize 1 + it.players shouldContain player1 + it.winners shouldHaveSize 0 } - } + } } } } diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt index 811a335..083a9ed 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameStateRouteTest.kt @@ -5,7 +5,6 @@ import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.GameEventHandler import eventDemo.business.event.event.CardIsPlayedEvent -import eventDemo.business.event.event.GameStartedEvent import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.event.PlayerReadyEvent import eventDemo.business.event.event.disableShuffleDeck @@ -22,7 +21,6 @@ import io.ktor.client.request.get import io.ktor.client.statement.bodyAsText import io.ktor.http.ContentType import io.ktor.http.HttpStatusCode -import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import kotlin.test.assertEquals import kotlin.test.assertIs @@ -66,23 +64,13 @@ class GameStateRouteTest : eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } - eventHandler.handle(gameId) { - GameStartedEvent.new( - gameId, - setOf(player1, player2), - it, - shuffleIsDisabled = true, - ) - } - delay(100) - lastPlayedCard = stateRepo.getLast(gameId).playableCards(player1).first() + lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } assertNotNull(lastPlayedCard) .let { assertIs(lastPlayedCard) } .let { it.number shouldBeEqual 0 it.color shouldBeEqual Card.Color.Red } - delay(100) eventHandler.handle(gameId) { CardIsPlayedEvent( gameId, @@ -91,7 +79,6 @@ class GameStateRouteTest : it, ) } - delay(100) } }) { eventually(1.seconds) { @@ -131,23 +118,13 @@ class GameStateRouteTest : eventHandler.handle(gameId) { NewPlayerEvent(gameId, player2, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player1, it) } eventHandler.handle(gameId) { PlayerReadyEvent(gameId, player2, it) } - eventHandler.handle(gameId) { - GameStartedEvent.new( - gameId, - setOf(player1, player2), - it, - shuffleIsDisabled = true, - ) - } - delay(100) - lastPlayedCard = stateRepo.getLast(gameId).playableCards(player1).first() + lastPlayedCard = eventually { stateRepo.getLast(gameId).playableCards(player1).first() } assertNotNull(lastPlayedCard) .let { assertIs(lastPlayedCard) } .let { it.number shouldBeEqual 0 it.color shouldBeEqual Card.Color.Red } - delay(100) eventHandler.handle(gameId) { CardIsPlayedEvent( gameId, @@ -156,7 +133,6 @@ class GameStateRouteTest : it, ) } - delay(100) } }) { eventually(1.seconds) { diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt index bbb84f6..15eae7c 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt @@ -158,7 +158,7 @@ class ProjectionSnapshotRepositoryTest : val versionBuilder = VersionBuilderLocal() val aggregateId = IdTest() - fun buildEndSendEventX() { + suspend fun buildEndSendEventX() { EventXTest(num = 1, version = versionBuilder.buildNextVersion(aggregateId), aggregateId = aggregateId) .also { eventStore.publish(it) } .also { repo.applyAndPutToCache(it) }