diff --git a/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt b/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt index f90cdd2..cb481ec 100644 --- a/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt +++ b/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt @@ -48,6 +48,12 @@ private fun DefaultWebSocketServerSession.runWebSocket( val currentPlayer = call.getPlayer() val outgoingFrameChannel: SendChannel = fromFrameChannel(outgoing) withLoggingContext("currentPlayer" to currentPlayer.toString()) { + val notificationListener = + playerNotificationListener.startListening( + currentPlayer, + gameId, + ) { outgoingFrameChannel.trySendBlocking(it) } + // TODO change GlobalScope GlobalScope.launch { commandHandler.handle( @@ -56,12 +62,8 @@ private fun DefaultWebSocketServerSession.runWebSocket( toObjectChannel(incoming), outgoingFrameChannel, ) + notificationListener.close() } - - playerNotificationListener.startListening( - currentPlayer, - gameId, - ) { outgoingFrameChannel.trySendBlocking(it) } } } diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt index 0beb6e2..86de600 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt @@ -35,8 +35,8 @@ class PlayerNotificationListener( currentPlayer: Player, gameId: GameId, outgoingNotification: (Notification) -> Unit, - ) { - projectionBus.subscribe { currentState -> + ): AutoCloseable { + return projectionBus.subscribe { currentState -> if (currentState !is GameState) return@subscribe if (currentState.aggregateId != gameId) return@subscribe withLoggingContext("projection" to currentState.toString()) { diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt index 1b9de11..d23f0b9 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIInfrastructure.kt @@ -16,10 +16,13 @@ import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.libs.event.projection.SnapshotConfig import org.koin.core.module.Module import org.koin.core.module.dsl.singleOf +import org.koin.core.scope.Scope +import org.koin.core.scope.ScopeCallback import org.koin.dsl.bind import javax.sql.DataSource fun Module.configureDIInfrastructure(config: Configuration) { + // Postgresql config single { HikariConfig() .apply { @@ -30,18 +33,27 @@ fun Module.configureDIInfrastructure(config: Configuration) { minimumIdle = 10 }.let { HikariDataSource(it) + }.also { datasource -> + registerCallback( + object : ScopeCallback { + override fun onScopeClose(scope: Scope) { + datasource.close() + } + }, + ) } } bind DataSource::class - single { + // RabbitMQ config + factory { ConnectionFactory().apply { host = config.rabbitmq.url port = config.rabbitmq.port - virtualHost = virtualHost username = config.rabbitmq.username password = config.rabbitmq.password } } + singleOf(::GameEventBusInRabbinMQ) bind GameEventBus::class singleOf(::GameEventStoreInPostgresql) bind GameEventStore::class singleOf(::GameProjectionBusInMemory) bind GameProjectionBus::class diff --git a/src/main/kotlin/eventDemo/libs/bus/Bus.kt b/src/main/kotlin/eventDemo/libs/bus/Bus.kt index 831e82a..3f661ed 100644 --- a/src/main/kotlin/eventDemo/libs/bus/Bus.kt +++ b/src/main/kotlin/eventDemo/libs/bus/Bus.kt @@ -6,5 +6,9 @@ interface Bus { /** * @param priority The higher the priority, the more it will be called first */ - fun subscribe(block: suspend (T) -> Unit) + fun subscribe(block: suspend (T) -> Unit): Subscription + + interface Subscription : AutoCloseable { + override fun close() + } } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt index 7dd1620..fb4cb68 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInMemory.kt @@ -23,7 +23,12 @@ class BusInMemory( } } - override fun subscribe(block: suspend (E) -> Unit) { + override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { subscribers.add(block) + return object : Bus.Subscription { + override fun close() { + subscribers.remove(block) + } + } } } diff --git a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt index f9ffb2a..a5c4f96 100644 --- a/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt +++ b/src/main/kotlin/eventDemo/libs/bus/BusInRabbitMQ.kt @@ -1,60 +1,90 @@ package eventDemo.libs.bus -import com.rabbitmq.client.CancelCallback +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.BuiltinExchangeType +import com.rabbitmq.client.Connection import com.rabbitmq.client.ConnectionFactory -import com.rabbitmq.client.DeliverCallback -import com.rabbitmq.client.Delivery +import com.rabbitmq.client.DefaultConsumer +import com.rabbitmq.client.Envelope import io.ktor.utils.io.core.toByteArray import kotlinx.coroutines.runBlocking class BusInRabbitMQ( private val connectionFactory: ConnectionFactory, - private val queueName: String, + private val exchangeName: String, private val objectToString: (E) -> String, private val stringToObject: (String) -> E, ) : Bus { + private val connection: Connection = connectionFactory.newConnection() + get() { + return if (field.isOpen) { + field + } else { + connectionFactory.newConnection() + } + } + private val routingKey = "" + init { - connectionFactory - .newConnection() + connection .createChannel() .use { - it.queueDeclare( - queueName, + it.exchangeDeclare( + exchangeName, + BuiltinExchangeType.FANOUT, true, false, - false, emptyMap(), ) } } override suspend fun publish(item: E) { - connectionFactory - .newConnection() + connection .createChannel() .use { it.basicPublish( - "", - queueName, - null, + exchangeName, + routingKey, + AMQP.BasicProperties(), objectToString(item).toByteArray(), ) } } - override fun subscribe(block: suspend (E) -> Unit) { - connectionFactory - .newConnection() + override fun subscribe(block: suspend (E) -> Unit): Bus.Subscription { + connection .createChannel() - .basicConsume( - queueName, - true, - DeliverCallback { _: String, message: Delivery -> - runBlocking { - block(stringToObject(message.body.toString(Charsets.UTF_8))) + .also { channel -> + val queue = + channel + .queueDeclare() + .queue + .also { channel.queueBind(it, exchangeName, routingKey) } + + channel + .basicConsume( + queue, + object : DefaultConsumer(channel) { + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray, + ) { + runBlocking { + block(stringToObject(body.toString(Charsets.UTF_8))) + } + channel.basicAck(envelope.deliveryTag, false) + } + }, + ) + }.let { + return object : Bus.Subscription { + override fun close() { + it.close() } - }, - CancelCallback {}, - ) + } + } } } diff --git a/src/test/kotlin/eventDemo/Helpers.kt b/src/test/kotlin/eventDemo/Helpers.kt index 15a725d..bf12d6c 100644 --- a/src/test/kotlin/eventDemo/Helpers.kt +++ b/src/test/kotlin/eventDemo/Helpers.kt @@ -1,5 +1,6 @@ package eventDemo +import com.zaxxer.hikari.HikariDataSource import eventDemo.business.entity.Card import eventDemo.business.entity.Deck import eventDemo.configuration.business.configureGameListener @@ -26,10 +27,12 @@ fun Deck.allCards(): Set = suspend fun testKoinApplicationWithConfig(block: suspend Koin.() -> T): T = koinApplication { modules(appKoinModule(ApplicationConfig("application.conf").configuration())) } .koin - .apply { + .run { cleanDataTest() configureGameListener() - }.block() + block() + .apply { get().close() } + } @KtorDsl fun testApplicationWithConfig( @@ -53,14 +56,14 @@ fun testApplicationWithConfig( } fun DataSource.cleanEventSource() { - this.connection - .prepareStatement( - """ - truncate event_stream; - """.trimIndent(), - ).use { - it.execute() - } + this.connection.use { + it + .prepareStatement( + """ + truncate event_stream; + """.trimIndent(), + ).execute() + } } fun UnifiedJedis.cleanProjections() { diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt index f2d9743..c279134 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt @@ -24,6 +24,7 @@ import eventDemo.business.notification.WelcomeToTheGameNotification import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.testKoinApplicationWithConfig import io.kotest.assertions.nondeterministic.until +import io.kotest.core.NamedTag import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual @@ -43,6 +44,8 @@ import kotlin.time.Duration.Companion.seconds @DelicateCoroutinesApi class GameSimulationTest : FunSpec({ + tags(NamedTag("postgresql")) + test("Simulation of a game") { withTimeout(2.seconds) { disableShuffleDeck() @@ -59,120 +62,120 @@ class GameSimulationTest : var player1HasJoin = false - val player1Job = - launch { - IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player1)).also { sendCommand -> - channelCommand1.send(sendCommand) - channelNotification1.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } - } - - player1HasJoin = true - - channelNotification1.receive().let { - assertIs(it).players shouldBeEqual setOf(player1) - } - channelNotification1.receive().let { - assertIs(it).player shouldBeEqual player2 - } - IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player1)).also { sendCommand -> - channelCommand1.send(sendCommand) - channelNotification1.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } - } - val player1Hand = - channelNotification1.receive().let { - assertIs(it).hand shouldHaveSize 7 - } - playedCard1 = player1Hand.first() - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - } - } - channelNotification1.receive().let { - assertIs(it).player shouldBeEqual player2 - } - - IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player1, player1Hand.first())).also { sendCommand -> - channelCommand1.send(sendCommand) - channelNotification1.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } - } - - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - } - } - - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - card shouldBeEqual assertNotNull(playedCard2) - } - } - } - - val player2Job = - launch { - until(1.seconds) { player1HasJoin } - IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player2)).also { sendCommand -> - channelCommand2.send(sendCommand) - channelNotification2.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } - } - - channelNotification2.receive().let { - assertIs(it).players shouldBeEqual setOf(player1, player2) - } - channelNotification2.receive().let { - assertIs(it).player shouldBeEqual player1 - } - - IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player2)).also { sendCommand -> - channelCommand2.send(sendCommand) - channelNotification2.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } - } - - val player2Hand = - channelNotification2.receive().let { - assertIs(it).hand shouldHaveSize 7 - } - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - } - } - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - card shouldBeEqual assertNotNull(playedCard1) - } - } - playedCard2 = player2Hand.first() - - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - } - } - - IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player2, player2Hand.first())).also { sendCommand -> - channelCommand2.send(sendCommand) - channelNotification2.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } - } - } - testKoinApplicationWithConfig { + val player1Job = + launch { + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player1)).also { sendCommand -> + channelCommand1.send(sendCommand) + channelNotification1.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + + player1HasJoin = true + + channelNotification1.receive().let { + assertIs(it).players shouldBeEqual setOf(player1) + } + channelNotification1.receive().let { + assertIs(it).player shouldBeEqual player2 + } + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player1)).also { sendCommand -> + channelCommand1.send(sendCommand) + channelNotification1.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + val player1Hand = + channelNotification1.receive().let { + assertIs(it).hand shouldHaveSize 7 + } + playedCard1 = player1Hand.first() + channelNotification1.receive().let { + assertIs(it).apply { + player shouldBeEqual player1 + } + } + channelNotification1.receive().let { + assertIs(it).player shouldBeEqual player2 + } + + IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player1, player1Hand.first())).also { sendCommand -> + channelCommand1.send(sendCommand) + channelNotification1.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + + channelNotification1.receive().let { + assertIs(it).apply { + player shouldBeEqual player2 + } + } + + channelNotification1.receive().let { + assertIs(it).apply { + player shouldBeEqual player2 + card shouldBeEqual assertNotNull(playedCard2) + } + } + } + + val player2Job = + launch { + until(1.seconds) { player1HasJoin } + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player2)).also { sendCommand -> + channelCommand2.send(sendCommand) + channelNotification2.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + + channelNotification2.receive().let { + assertIs(it).players shouldBeEqual setOf(player1, player2) + } + channelNotification2.receive().let { + assertIs(it).player shouldBeEqual player1 + } + + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player2)).also { sendCommand -> + channelCommand2.send(sendCommand) + channelNotification2.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + + val player2Hand = + channelNotification2.receive().let { + assertIs(it).hand shouldHaveSize 7 + } + channelNotification2.receive().let { + assertIs(it).apply { + player shouldBeEqual player1 + } + } + channelNotification2.receive().let { + assertIs(it).apply { + player shouldBeEqual player1 + card shouldBeEqual assertNotNull(playedCard1) + } + } + playedCard2 = player2Hand.first() + + channelNotification2.receive().let { + assertIs(it).apply { + player shouldBeEqual player2 + } + } + + IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player2, player2Hand.first())).also { sendCommand -> + channelCommand2.send(sendCommand) + channelNotification2.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + } + val commandHandler by inject() val eventStore by inject() val playerNotificationListener by inject() diff --git a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt index c27141e..ab75650 100644 --- a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt @@ -9,6 +9,7 @@ import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.Notification import eventDemo.business.notification.WelcomeToTheGameNotification import eventDemo.testKoinApplicationWithConfig +import io.kotest.core.NamedTag import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain import io.kotest.matchers.equals.shouldBeEqual @@ -24,6 +25,8 @@ import kotlin.time.Duration.Companion.seconds @OptIn(DelicateCoroutinesApi::class) class GameCommandHandlerTest : FunSpec({ + tags(NamedTag("postgresql")) + test("handle a command should execute the command") { withTimeout(5.seconds) { testKoinApplicationWithConfig { diff --git a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt index beaadbe..cd72c4e 100644 --- a/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt +++ b/src/test/kotlin/eventDemo/business/event/projection/GameStateRepositoryTest.kt @@ -9,6 +9,7 @@ import eventDemo.business.event.projection.gameState.GameStateRepository import eventDemo.testKoinApplicationWithConfig import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.eventuallyConfig +import io.kotest.core.NamedTag import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual @@ -23,6 +24,8 @@ import kotlin.time.Duration.Companion.seconds @OptIn(DelicateCoroutinesApi::class) class GameStateRepositoryTest : FunSpec({ + tags(NamedTag("postgresql")) + val player1 = Player("Tesla") val player2 = Player(name = "Einstein") diff --git a/src/test/kotlin/eventDemo/adapter/infrastructureLayer/PostgresqlTest.kt b/src/test/kotlin/eventDemo/externalServices/PostgresqlTest.kt similarity index 93% rename from src/test/kotlin/eventDemo/adapter/infrastructureLayer/PostgresqlTest.kt rename to src/test/kotlin/eventDemo/externalServices/PostgresqlTest.kt index 531df77..aac27a5 100644 --- a/src/test/kotlin/eventDemo/adapter/infrastructureLayer/PostgresqlTest.kt +++ b/src/test/kotlin/eventDemo/externalServices/PostgresqlTest.kt @@ -1,4 +1,4 @@ -package eventDemo.adapter.infrastructureLayer +package eventDemo.externalServices import eventDemo.testKoinApplicationWithConfig import io.kotest.core.NamedTag diff --git a/src/test/kotlin/eventDemo/adapter/infrastructureLayer/RedisTest.kt b/src/test/kotlin/eventDemo/externalServices/RedisTest.kt similarity index 90% rename from src/test/kotlin/eventDemo/adapter/infrastructureLayer/RedisTest.kt rename to src/test/kotlin/eventDemo/externalServices/RedisTest.kt index 53bace6..7d69304 100644 --- a/src/test/kotlin/eventDemo/adapter/infrastructureLayer/RedisTest.kt +++ b/src/test/kotlin/eventDemo/externalServices/RedisTest.kt @@ -1,4 +1,4 @@ -package eventDemo.adapter.infrastructureLayer +package eventDemo.externalServices import io.kotest.core.NamedTag import io.kotest.core.spec.style.FunSpec diff --git a/src/test/kotlin/eventDemo/libs/bus/BusTest.kt b/src/test/kotlin/eventDemo/libs/bus/BusTest.kt index fd4c597..608fddc 100644 --- a/src/test/kotlin/eventDemo/libs/bus/BusTest.kt +++ b/src/test/kotlin/eventDemo/libs/bus/BusTest.kt @@ -1,10 +1,13 @@ package eventDemo.libs.bus import com.rabbitmq.client.ConnectionFactory -import io.kotest.assertions.nondeterministic.until +import io.kotest.assertions.nondeterministic.eventually import io.kotest.core.spec.style.FunSpec import io.kotest.datatest.withData -import io.kotest.matchers.equals.shouldBeEqual +import io.kotest.matchers.string.shouldStartWith +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify import kotlin.random.Random import kotlin.time.Duration.Companion.seconds @@ -19,7 +22,6 @@ class BusTest : ConnectionFactory().apply { host = "localhost" port = 5672 - virtualHost = virtualHost username = "event-demo" password = "changeit" } @@ -29,24 +31,24 @@ class BusTest : BusInRabbitMQ::class.java.simpleName to BusInRabbitMQ( factory, - "testQueue", + "testExchange", { it.value }, { ObjTest(it) }, ), ) withData(list) { bus -> - val value = "hello${Random.nextInt()}" - var isCalled = false + val spy = spyk(mockk<() -> Unit>()) bus.subscribe { obj -> - isCalled = true - obj.value shouldBeEqual value + spy() + obj.value shouldStartWith "testMessage" } - bus.publish(ObjTest(value)) + bus.publish(ObjTest("testMessage${Random.nextInt()}")) + bus.publish(ObjTest("testMessage${Random.nextInt()}")) - until(3.seconds) { - isCalled shouldBeEqual true + eventually(1.seconds) { + verify(exactly = 2) { spy() } } } } diff --git a/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt b/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt index 2cbccfb..c38009b 100644 --- a/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt +++ b/src/test/kotlin/eventDemo/libs/event/EventStreamTest.kt @@ -1,6 +1,7 @@ package eventDemo.libs.event import eventDemo.testKoinApplicationWithConfig +import io.kotest.core.NamedTag import io.kotest.core.spec.style.FunSpec import io.kotest.datatest.withData import io.kotest.matchers.collections.shouldHaveSize @@ -12,11 +13,14 @@ import kotlinx.coroutines.launch import kotlinx.serialization.json.Json import org.junit.jupiter.api.assertNull import org.junit.jupiter.api.assertThrows +import org.koin.core.Koin import kotlin.test.assertNotNull @DelicateCoroutinesApi class EventStreamTest : FunSpec({ + tags(NamedTag("postgresql")) + fun EventStream.with3Events(block: EventStream.(id: IdTest) -> Unit) = also { publish(EventXTest(aggregateId = aggregateId, version = 1, num = 1)) @@ -25,37 +29,39 @@ class EventStreamTest : block(aggregateId) } - suspend fun eventStreams(): List> = - testKoinApplicationWithConfig { - listOf( - EventStreamInMemory(IdTest()), - EventStreamInPostgresql( - IdTest(), - dataSource = get(), - objectToString = { Json.encodeToString(it) }, - stringToObject = { Json.decodeFromString(it) }, - ), - ) - } + fun Koin.eventStreams(): List> = + listOf( + EventStreamInMemory(IdTest()), + EventStreamInPostgresql( + IdTest(), + dataSource = get(), + objectToString = { Json.encodeToString(it) }, + stringToObject = { Json.decodeFromString(it) }, + ), + ) context("readVersionBetween should only return the event of aggregate") { - withData(eventStreams()) { stream -> - stream.with3Events { - readVersionBetween(1..2) shouldHaveSize 2 - readVersionBetween(1..1) shouldHaveSize 1 - readVersionBetween(2..20) shouldHaveSize 2 - readVersionBetween(4..20) shouldHaveSize 0 + testKoinApplicationWithConfig { + withData(eventStreams()) { stream -> + stream.with3Events { + readVersionBetween(1..2) shouldHaveSize 2 + readVersionBetween(1..1) shouldHaveSize 1 + readVersionBetween(2..20) shouldHaveSize 2 + readVersionBetween(4..20) shouldHaveSize 0 + } } } } context("readAll should only return the event of aggregate") { - withData(eventStreams()) { stream -> - stream.with3Events { - readAll() shouldHaveSize 3 - readAll().also { - it.forEachIndexed { i, event -> - event.version shouldBeEqual i + 1 + testKoinApplicationWithConfig { + withData(eventStreams()) { stream -> + stream.with3Events { + readAll() shouldHaveSize 3 + readAll().also { + it.forEachIndexed { i, event -> + event.version shouldBeEqual i + 1 + } } } } @@ -63,50 +69,58 @@ class EventStreamTest : } context("getByVersion should only return the event with this version") { - withData(eventStreams()) { stream -> - stream.with3Events { - assertNotNull(getByVersion(1)).version shouldBeEqual 1 - assertNotNull(getByVersion(2)).version shouldBeEqual 2 - assertNotNull(getByVersion(3)).version shouldBeEqual 3 - assertNull(getByVersion(4)) + testKoinApplicationWithConfig { + withData(eventStreams()) { stream -> + stream.with3Events { + assertNotNull(getByVersion(1)).version shouldBeEqual 1 + assertNotNull(getByVersion(2)).version shouldBeEqual 2 + assertNotNull(getByVersion(3)).version shouldBeEqual 3 + assertNull(getByVersion(4)) + } } } } context("readGreaterOfVersion should only return the events with greater version") { - withData(eventStreams()) { - it.with3Events { - assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2 - assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1 - assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0 - assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0 + testKoinApplicationWithConfig { + withData(eventStreams()) { + it.with3Events { + assertNotNull(readGreaterOfVersion(1)) shouldHaveSize 2 + assertNotNull(readGreaterOfVersion(2)) shouldHaveSize 1 + assertNotNull(readGreaterOfVersion(3)) shouldHaveSize 0 + assertNotNull(readGreaterOfVersion(30)) shouldHaveSize 0 + } } } } context("publish should be throw error when publish another aggregate event") { - withData(eventStreams()) { - assertThrows { it.publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) } + testKoinApplicationWithConfig { + withData(eventStreams()) { + assertThrows { it.publish(EventXTest(aggregateId = IdTest(), version = 1, num = 1)) } + } } } context("publish should be concurrently secure") { - withData(eventStreams()) { stream -> - (0..9) - .map { i1 -> - GlobalScope.launch { - (1..10).forEach { i2 -> - stream.publish( - EventXTest( - aggregateId = stream.aggregateId, - version = (i1 * 10) + i2, - num = (i1 * 10) + i2, - ), - ) + testKoinApplicationWithConfig { + withData(eventStreams()) { stream -> + (0..9) + .map { i1 -> + GlobalScope.launch { + (1..10).forEach { i2 -> + stream.publish( + EventXTest( + aggregateId = stream.aggregateId, + version = (i1 * 10) + i2, + num = (i1 * 10) + i2, + ), + ) + } } - } - }.joinAll() - stream.readAll() shouldHaveSize 100 + }.joinAll() + stream.readAll() shouldHaveSize 100 + } } } })