From 2fb4c778fd0969688a96dc8d2d82342b3056196f Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Sun, 30 Mar 2025 03:08:36 +0200 Subject: [PATCH] move subscribeToBus in configureGameListener function, and use it on test --- .../projection/GameStateRepositoryInRedis.kt | 7 +-- .../projectionListener/ReactionListener.kt | 7 +-- .../business/ConfigureGameListener.kt | 17 ++++--- .../injection/ConfigureDIInfrastructure.kt | 2 +- src/test/kotlin/eventDemo/Helpers.kt | 22 +++++++++ .../query/GameSimulationTest.kt | 4 +- .../command/GameCommandHandlerTest.kt | 4 +- .../projection/GameStateRepositoryTest.kt | 30 ++++++------- .../ProjectionSnapshotRepositoryTest.kt | 45 ++++++++++++++++--- .../eventDemo/libs/event/EventStreamTest.kt | 3 ++ 10 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt index ad429a1..ea2e12d 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/projection/GameStateRepositoryInRedis.kt @@ -19,8 +19,6 @@ import redis.clients.jedis.UnifiedJedis */ class GameStateRepositoryInRedis( eventStore: GameEventStore, - projectionBus: GameProjectionBus, - eventBus: GameEventBus, jedis: UnifiedJedis, snapshotConfig: SnapshotConfig = SnapshotConfig(), ) : GameStateRepository { @@ -36,7 +34,10 @@ class GameStateRepositoryInRedis( jedis = jedis, ) - init { + fun subscribeToBus( + projectionBus: GameProjectionBus, + eventBus: GameEventBus, + ) { // On new event was received, build snapshot and publish it to the projection bus eventBus.subscribe { event -> withLoggingContext("event" to event.toString()) { diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt index a1913e4..eccdfcb 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/ReactionListener.kt @@ -12,9 +12,7 @@ import io.github.oshai.kotlinlogging.withLoggingContext import java.util.concurrent.ConcurrentSkipListSet class ReactionListener( - private val projectionBus: GameProjectionBus, private val eventHandler: GameEventHandler, - private val priority: Int = DEFAULT_PRIORITY, ) { companion object Config { const val DEFAULT_PRIORITY = -1000 @@ -23,7 +21,10 @@ class ReactionListener( private val logger = KotlinLogging.logger { } - fun init() { + fun subscribeToBus( + projectionBus: GameProjectionBus, + priority: Int = DEFAULT_PRIORITY, + ) { if (registeredListeners.add(projectionBus)) { projectionBus.subscribe(priority) { projection: Projection -> if (projection !is GameState) return@subscribe diff --git a/src/main/kotlin/eventDemo/configuration/business/ConfigureGameListener.kt b/src/main/kotlin/eventDemo/configuration/business/ConfigureGameListener.kt index b4bdff0..f418544 100644 --- a/src/main/kotlin/eventDemo/configuration/business/ConfigureGameListener.kt +++ b/src/main/kotlin/eventDemo/configuration/business/ConfigureGameListener.kt @@ -1,10 +1,17 @@ package eventDemo.configuration.business +import eventDemo.adapter.infrastructureLayer.event.projection.GameListRepositoryInRedis +import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInRedis import eventDemo.business.event.projection.projectionListener.ReactionListener -import io.ktor.server.application.Application -import org.koin.ktor.ext.get +import org.koin.core.Koin -fun Application.configureGameListener() { - ReactionListener(get(), get()) - .init() +fun Koin.configureGameListener() { + ReactionListener(get()) + .subscribeToBus(get()) + + get() + .subscribeToBus(get(), get()) + + get() + .subscribeToBus(get(), get()) } diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt index 274087b..ec50da2 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt @@ -43,7 +43,7 @@ fun Module.configureDIInfrastructure(config: Configuration) { singleOf(::GameProjectionBusInMemory) bind GameProjectionBus::class single { - GameStateRepositoryInRedis(get(), get(), get(), get(), snapshotConfig = SnapshotConfig()) + GameStateRepositoryInRedis(get(), get(), snapshotConfig = SnapshotConfig()) } bind GameStateRepository::class single { diff --git a/src/test/kotlin/eventDemo/Helpers.kt b/src/test/kotlin/eventDemo/Helpers.kt index 880360a..83701ad 100644 --- a/src/test/kotlin/eventDemo/Helpers.kt +++ b/src/test/kotlin/eventDemo/Helpers.kt @@ -2,6 +2,7 @@ package eventDemo import eventDemo.business.entity.Card import eventDemo.business.entity.Deck +import eventDemo.configuration.business.configureGameListener import eventDemo.configuration.injection.appKoinModule import eventDemo.configuration.ktor.configuration import io.ktor.server.config.ApplicationConfig @@ -11,6 +12,8 @@ import io.ktor.utils.io.KtorDsl import org.koin.core.Koin import org.koin.core.module.KoinApplicationDslMarker import org.koin.dsl.koinApplication +import redis.clients.jedis.UnifiedJedis +import javax.sql.DataSource fun Deck.allCardCount(): Int = stack.size + discard.size + playersHands.values.flatten().size @@ -31,6 +34,25 @@ suspend fun testApplicationWithConfig(block: suspend ApplicationTestBuilder.(koi } val koin = koinApplication { modules(appKoinModule(conf.configuration())) }.koin + koin.cleanDataTest() + koin.configureGameListener() block(koin) } } + +fun DataSource.cleanEventSource() { + this.connection.prepareStatement( + """ + truncate event_stream; + """.trimIndent(), + ) +} + +fun UnifiedJedis.cleanProjections() { + flushAll() +} + +fun Koin.cleanDataTest() { + get().cleanEventSource() + get().cleanProjections() +} diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt index 2f5db8f..ef60c77 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt @@ -13,7 +13,6 @@ import eventDemo.business.event.event.disableShuffleDeck import eventDemo.business.event.projection.gameState.GameState import eventDemo.business.event.projection.gameState.apply import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener -import eventDemo.business.event.projection.projectionListener.ReactionListener import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.ItsTheTurnOfNotification import eventDemo.business.notification.Notification @@ -22,6 +21,7 @@ import eventDemo.business.notification.PlayerAsPlayACardNotification import eventDemo.business.notification.PlayerWasReadyNotification import eventDemo.business.notification.TheGameWasStartedNotification import eventDemo.business.notification.WelcomeToTheGameNotification +import eventDemo.configuration.business.configureGameListener import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.testKoinApplicationWithConfig import io.kotest.assertions.nondeterministic.until @@ -177,7 +177,7 @@ class GameSimulationTest : val commandHandler by inject() val eventStore by inject() val playerNotificationListener by inject() - ReactionListener(get(), get()).init() + configureGameListener() playerNotificationListener.startListening(player1, gameId) { channelNotification1.trySendBlocking(it) } playerNotificationListener.startListening(player2, gameId) { channelNotification2.trySendBlocking(it) } diff --git a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt index 9acd132..b67a362 100644 --- a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt @@ -5,10 +5,10 @@ import eventDemo.business.command.command.IWantToJoinTheGameCommand import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener -import eventDemo.business.event.projection.projectionListener.ReactionListener import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.Notification import eventDemo.business.notification.WelcomeToTheGameNotification +import eventDemo.configuration.business.configureGameListener import eventDemo.testKoinApplicationWithConfig import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain @@ -28,13 +28,13 @@ class GameCommandHandlerTest : test("handle a command should execute the command") { withTimeout(1.seconds) { testKoinApplicationWithConfig { + configureGameListener() val commandHandler by inject() val notificationListener by inject() val gameId = GameId() val player = Player("Tesla") val channelCommand = Channel(Channel.BUFFERED) val channelNotification = Channel(Channel.BUFFERED) - ReactionListener(get(), get()).init() notificationListener.startListening( player, gameId, diff --git a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt index 41be9a2..90ca6de 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt @@ -6,7 +6,7 @@ import eventDemo.business.event.GameEventHandler import eventDemo.business.event.event.NewPlayerEvent import eventDemo.business.event.projection.gameState.GameState import eventDemo.business.event.projection.gameState.GameStateRepository -import eventDemo.testKoinApplicationWithConfig +import eventDemo.testApplicationWithConfig import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.eventuallyConfig import io.kotest.core.spec.style.FunSpec @@ -18,7 +18,6 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch -import org.koin.core.context.stopKoin import kotlin.test.assertNotNull import kotlin.time.Duration.Companion.seconds @@ -30,9 +29,9 @@ class GameStateRepositoryTest : test("GameStateRepository should build the projection when a new event occurs") { val aggregateId = GameId() - testKoinApplicationWithConfig { - val repo = get() - val eventHandler = get() + testApplicationWithConfig { koin -> + val repo = koin.get() + val eventHandler = koin.get() eventHandler .handle(aggregateId) { NewPlayerEvent(aggregateId = aggregateId, player = player1, version = it) } .also { event -> @@ -47,15 +46,14 @@ class GameStateRepositoryTest : } } } - stopKoin() } test("get should build the last version of the state") { val aggregateId = GameId() - testKoinApplicationWithConfig { - val repo = get() - val eventHandler = get() - val projectionBus = get() + testApplicationWithConfig { koin -> + val repo = koin.get() + val eventHandler = koin.get() + val projectionBus = koin.get() var state: GameState? = null projectionBus.subscribe { @@ -88,9 +86,9 @@ class GameStateRepositoryTest : test("getUntil should build the state until the event") { repeat(10) { val aggregateId = GameId() - testKoinApplicationWithConfig { - val repo = get() - val eventHandler = get() + testApplicationWithConfig { koin -> + val repo = koin.get() + val eventHandler = koin.get() val event1 = eventHandler @@ -117,9 +115,9 @@ class GameStateRepositoryTest : test("getUntil should be concurrently secure") { val aggregateId = GameId() - testKoinApplicationWithConfig { - val repo = get() - val eventHandler = get() + testApplicationWithConfig { koin -> + val repo = koin.get() + val eventHandler = koin.get() (1..10) .map { r -> diff --git a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt index 2a3ed42..bbb84f6 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/ProjectionSnapshotRepositoryTest.kt @@ -1,5 +1,6 @@ package eventDemo.business.event.projection +import eventDemo.cleanProjections import eventDemo.configuration.serializer.UUIDSerializer import eventDemo.libs.event.AggregateId import eventDemo.libs.event.Event @@ -96,6 +97,37 @@ class ProjectionSnapshotRepositoryTest : } } + context("getList method must be return all inserted events") { + withData(list) { (eventStore, repo) -> + val aggregateId = IdTest() + val otherAggregateId = IdTest() + + val eventOther = Event2Test(value2 = "valOther", version = 1, aggregateId = otherAggregateId) + eventStore.publish(eventOther) + repo.applyAndPutToCache(eventOther) + assertNotNull(repo.getUntil(eventOther)).also { + assertNotNull(it.value) shouldBeEqual "valOther" + } + + val event1 = Event1Test(value1 = "val1", version = 1, aggregateId = aggregateId) + eventStore.publish(event1) + repo.applyAndPutToCache(event1) + + val event2 = Event2Test(value2 = "val2", version = 2, aggregateId = aggregateId) + eventStore.publish(event2) + repo.applyAndPutToCache(event2) + + repo.getList().apply { + any { it.aggregateId == otherAggregateId } shouldBeEqual true + any { it.aggregateId == aggregateId } shouldBeEqual true + any { it.value == "val1val2" } shouldBeEqual true + any { it.value == "valOther" } shouldBeEqual true + any { it.lastEventVersion == 2 } shouldBeEqual true + any { it.lastEventVersion == 1 } shouldBeEqual true + } + } + } + context("ProjectionSnapshotRepository should be thread safe") { continually(1.seconds) { withData(list) { (eventStore, repo) -> @@ -199,10 +231,12 @@ private fun getSnapshotRepoInMemoryTest( private fun getSnapshotRepoInRedisTest( eventStore: EventStore, snapshotConfig: SnapshotConfig, -): ProjectionSnapshotRepository = - ProjectionSnapshotRepositoryInRedis( +): ProjectionSnapshotRepository { + val jedis = JedisPooled("redis://localhost:6379") + jedis.cleanProjections() + return ProjectionSnapshotRepositoryInRedis( eventStore = eventStore, - jedis = JedisPooled("redis://localhost:6379"), + jedis = jedis, initialStateBuilder = { aggregateId: IdTest -> ProjectionTest(aggregateId) }, snapshotCacheConfig = snapshotConfig, projectionClass = ProjectionTest::class, @@ -210,16 +244,17 @@ private fun getSnapshotRepoInRedisTest( jsonToProjection = { Json.decodeFromString(it) }, applyToProjection = apply, ) +} private val apply: ProjectionTest.(TestEvents) -> ProjectionTest = { event -> this.let { projection -> when (event) { is Event1Test -> { - projection.copy(value = (projection.value ?: "") + event.value1) + projection.copy(value = (projection.value.orEmpty()) + event.value1) } is Event2Test -> { - projection.copy(value = (projection.value ?: "") + event.value2) + projection.copy(value = (projection.value.orEmpty()) + event.value2) } is EventXTest -> { diff --git a/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt b/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt index 2cbccfb..844c76f 100644 --- a/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt @@ -1,5 +1,6 @@ package eventDemo.libs.event +import eventDemo.cleanEventSource import eventDemo.testKoinApplicationWithConfig import io.kotest.core.spec.style.FunSpec import io.kotest.datatest.withData @@ -12,6 +13,7 @@ import kotlinx.coroutines.launch import kotlinx.serialization.json.Json import org.junit.jupiter.api.assertNull import org.junit.jupiter.api.assertThrows +import javax.sql.DataSource import kotlin.test.assertNotNull @DelicateCoroutinesApi @@ -27,6 +29,7 @@ class EventStreamTest : suspend fun eventStreams(): List> = testKoinApplicationWithConfig { + get().cleanEventSource() listOf( EventStreamInMemory(IdTest()), EventStreamInPostgresql(