diff --git a/build.gradle.kts b/build.gradle.kts index 27b9ece..6a7fd72 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -63,4 +63,5 @@ dependencies { testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version") testImplementation("io.ktor:ktor-server-test-host-jvm:2.3.11") testImplementation("io.kotest:kotest-runner-junit5:$kotest_version") + testImplementation("io.mockk:mockk:1.13.17") } diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt index e7064a8..8f9e9d5 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt @@ -11,11 +11,8 @@ import eventDemo.app.event.GameEventStream import eventDemo.app.event.buildStateFromEventStream import eventDemo.app.event.event.GameEvent import io.ktor.websocket.Frame -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking /** @@ -34,21 +31,21 @@ class GameCommandHandler( /** * Init the handler */ - fun init(player: Player) { - CoroutineScope(Dispatchers.IO).launch { - commandStream.process { command -> - if (command.payload.player.id != player.id) { + suspend fun init(player: Player) { + commandStream.process { command -> + if (command.payload.player.id != player.id) { + runBlocking { nack() } + } - val gameState = command.buildGameState() + val gameState = command.buildGameState() - when (command) { - is IWantToPlayCardCommand -> command.run(gameState, playerNotifier, eventStream) - is IamReadyToPlayCommand -> command.run(gameState, playerNotifier, eventStream) - is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream) - is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream) - } + when (command) { + is IWantToPlayCardCommand -> command.run(gameState, playerNotifier, eventStream) + is IamReadyToPlayCommand -> command.run(gameState, playerNotifier, eventStream) + is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream) + is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream) } } } diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt index 13ac544..bbe0f77 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt @@ -10,6 +10,7 @@ import io.ktor.server.auth.jwt.JWTPrincipal import io.ktor.server.auth.principal import io.ktor.server.routing.Route import io.ktor.server.websocket.webSocket +import kotlinx.coroutines.launch fun Route.gameSocket( eventStream: GameEventStream, @@ -17,7 +18,9 @@ fun Route.gameSocket( ) { authenticate { webSocket("/game") { - GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer()) + launch { + GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer()) + } GameEventPlayerNotificationListener(eventBus, outgoing).init() } } diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt index 7cbb23f..28f46f5 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt @@ -30,9 +30,9 @@ interface CommandStream { * A class to implement success/failed action. */ interface ComputeStatus { - fun ack() + suspend fun ack() - fun nack() + suspend fun nack() } /** diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt index f89661f..5ea7526 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt @@ -3,9 +3,10 @@ package eventDemo.libs.command import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.websocket.Frame import io.ktor.websocket.readText +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.launch import kotlin.reflect.KClass /** @@ -27,12 +28,11 @@ class CommandStreamChannel( type: KClass, command: C, ) { + outgoing.send(Frame.Text(serializer(command))) logger.atInfo { message = "Command published: $command" payload = mapOf("command" to command) } - - outgoing.send(Frame.Text(serializer(command))) } override suspend fun process(action: CommandStream.ComputeStatus.(C) -> Unit) { @@ -48,7 +48,7 @@ class CommandStreamChannel( } } - private fun compute( + private suspend fun compute( command: C, action: CommandStream.ComputeStatus.(C) -> Unit, ) { @@ -56,42 +56,46 @@ class CommandStreamChannel( object : CommandStream.ComputeStatus { var isSet: Boolean = false - override fun ack() { + override suspend fun ack() { if (!isSet) markAsSuccess(command) else error("Already NACK") isSet = true } - override fun nack() { + override suspend fun nack() { if (!isSet) markAsFailed(command) else error("Already ACK") isSet = true } } - if (runCatching { status.action(command) }.isFailure) { + val action = runCatching { status.action(command) } + if (action.isFailure) { + logger.atInfo { + message = "Error" + payload = mapOf("command" to command) + cause = action.exceptionOrNull() + } markAsFailed(command) } else if (!status.isSet) { status.ack() } } - private fun markAsSuccess(command: C) { + private suspend fun markAsSuccess(command: C) { logger.atInfo { message = "Compute command SUCCESS and it removed of the stack : $command" payload = mapOf("command" to command) } - runBlocking { - outgoing.send(Frame.Text("Command executed successfully")) + GlobalScope.launch { +// outgoing.send(Frame.Text("Command executed successfully")) } } - private fun markAsFailed(command: C) { + private suspend fun markAsFailed(command: C) { failedCommand.add(command) logger.atWarn { message = "Compute command FAILED and it put it ot the top of the stack : $command" payload = mapOf("command" to command) } - runBlocking { - outgoing.send(Frame.Text("Command execution failed")) - } + outgoing.send(Frame.Text("Command execution failed")) } } diff --git a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt new file mode 100644 index 0000000..4b9fa81 --- /dev/null +++ b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt @@ -0,0 +1,48 @@ +package eventDemo.libs.command + +import io.kotest.core.spec.style.FunSpec +import io.ktor.websocket.Frame +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +class CommandTest( + override val id: CommandId, +) : Command + +class CommandStreamChannelTest : + FunSpec({ + + test("send and receive") { + val command = CommandTest(CommandId()) + val command2 = CommandTest(CommandId()) + val command3 = CommandTest(CommandId()) + + val channel = Channel() + val stream = + CommandStreamChannel( + incoming = channel, + outgoing = channel, + serializer = { it.id.toString() }, + deserializer = { CommandTest(CommandId(it)) }, + ) + + val spyCall: () -> Unit = mockk(relaxed = true) + runBlocking { + launch { + stream.process { + println("In action ${it.id}") + spyCall() + } + } + launch { + stream.send(command, command2) + stream.send(command3) + channel.close() + }.join() + verify(exactly = 3) { spyCall() } + } + } + })