diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt index 2434371..dd9fecc 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt @@ -10,9 +10,8 @@ import eventDemo.app.event.GameEventHandler import eventDemo.app.event.event.GameEvent import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.notification.ErrorNotification -import eventDemo.shared.toFrame +import eventDemo.app.notification.Notification import io.github.oshai.kotlinlogging.KotlinLogging -import io.ktor.websocket.Frame import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel @@ -32,8 +31,8 @@ class GameCommandHandler( */ suspend fun handle( player: Player, - incomingCommandChannel: ReceiveChannel, - outgoingErrorChannelNotification: SendChannel, + incomingCommandChannel: ReceiveChannel, + outgoingErrorChannelNotification: SendChannel, ) = GameCommandStream(incomingCommandChannel).process { command -> if (command.payload.player.id != player.id) { nack() @@ -45,7 +44,7 @@ class GameCommandHandler( message = "Notification send ERROR: ${notification.message}" payload = mapOf("notification" to notification) } - outgoingErrorChannelNotification.send(notification.toFrame()) + outgoingErrorChannelNotification.send(notification) } val gameState = gameStateRepository.get(command.payload.gameId) diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt index aaf04e3..b969191 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt @@ -2,6 +2,8 @@ package eventDemo.app.command import eventDemo.app.entity.Player import eventDemo.app.eventListener.GameEventPlayerNotificationListener +import eventDemo.libs.fromFrameChannel +import eventDemo.libs.toObjectChannel import io.ktor.server.application.ApplicationCall import io.ktor.server.auth.authenticate import io.ktor.server.auth.jwt.JWTPrincipal @@ -21,7 +23,11 @@ fun Route.gameSocket( webSocket("/game") { val currentPlayer = call.getPlayer() GlobalScope.launch { - commandHandler.handle(currentPlayer, incoming, outgoing) + commandHandler.handle( + currentPlayer, + toObjectChannel(incoming), + fromFrameChannel(outgoing), + ) } playerNotificationListener.startListening(outgoing, currentPlayer) } diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt b/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt index 3ba5929..89ee42a 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt @@ -3,22 +3,11 @@ package eventDemo.app.command import eventDemo.app.command.command.GameCommand import eventDemo.libs.command.CommandStream import eventDemo.libs.command.CommandStreamChannel -import eventDemo.libs.command.CommandStreamInMemory -import io.ktor.websocket.Frame import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.serialization.json.Json - -/** - * A stream to publish and read the game command. - */ -class GameCommandStreamInMemory : CommandStreamInMemory() /** * A stream to publish and read the game command. */ class GameCommandStream( - incoming: ReceiveChannel, -) : CommandStream by CommandStreamChannel( - incoming, - { Json.decodeFromString(GameCommand.serializer(), it) }, - ) + incoming: ReceiveChannel, +) : CommandStream by CommandStreamChannel(incoming) diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureSerialization.kt b/src/main/kotlin/eventDemo/configuration/ConfigureSerialization.kt index 757e5f6..622b7c3 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureSerialization.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureSerialization.kt @@ -20,17 +20,22 @@ import java.util.UUID fun Application.configureSerialization() { install(ContentNegotiation) { json( - Json { - serializersModule = - SerializersModule { - contextual(UUID::class) { UUIDSerializer } - contextual(GameId::class) { GameIdSerializer } - } - }, + defaultJsonSerializer(), ) } } +fun defaultJsonSerializer(): Json = + Json { + serializersModule = + SerializersModule { + contextual(UUID::class) { UUIDSerializer } + contextual(GameId::class) { GameIdSerializer } + contextual(CommandId::class) { CommandIdSerializer } + contextual(Player.PlayerId::class) { PlayerIdSerializer } + } + } + object CommandIdSerializer : KSerializer { override fun deserialize(decoder: Decoder): CommandId = CommandId(decoder.decodeString()) diff --git a/src/main/kotlin/eventDemo/libs/FrameChannelConverter.kt b/src/main/kotlin/eventDemo/libs/FrameChannelConverter.kt new file mode 100644 index 0000000..db08d46 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/FrameChannelConverter.kt @@ -0,0 +1,43 @@ +package eventDemo.libs + +import io.github.oshai.kotlinlogging.KotlinLogging +import io.ktor.websocket.Frame +import io.ktor.websocket.readText +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.launch +import kotlinx.serialization.json.Json + +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +inline fun CoroutineScope.toObjectChannel( + frames: ReceiveChannel, + bufferSize: Int = 0, +): ReceiveChannel { + val logger = KotlinLogging.logger { } + return produce(capacity = bufferSize) { + frames.consumeEach { frame -> + if (frame is Frame.Text) { + logger.debug { "Conversion of the Frame: ${frame.readText()}" } + send(Json.decodeFromString(frame.readText())) + } else { + logger.warn { "The frame is not a text frame" } + } + } + } +} + +inline fun CoroutineScope.fromFrameChannel(frames: SendChannel): SendChannel { + val channel = Channel() + launch { + channel.consumeEach { obj -> + frames.send(Frame.Text(Json.encodeToString(obj))) + } + } + return channel +} diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt index 34bd9cf..8f69967 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt @@ -31,3 +31,5 @@ interface CommandStream { } } } + +typealias CommandBlock = suspend CommandStream.ComputeStatus.(C) -> Unit diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt index bb31b57..271c371 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt @@ -1,29 +1,19 @@ package eventDemo.libs.command import io.github.oshai.kotlinlogging.KotlinLogging -import io.ktor.websocket.Frame -import io.ktor.websocket.readText import kotlinx.coroutines.channels.ReceiveChannel /** * Manage [Command]'s with kotlin Channel */ class CommandStreamChannel( - private val incoming: ReceiveChannel, - private val deserializer: (String) -> C, + private val incoming: ReceiveChannel, ) : CommandStream { private val logger = KotlinLogging.logger {} override suspend fun process(action: CommandBlock) { -// incoming.consumeEach { commandAsFrame -> -// if (commandAsFrame is Frame.Text) { -// compute(deserializer(commandAsFrame.readText()), action) -// } -// } for (command in incoming) { - if (command is Frame.Text) { - compute(deserializer(command.readText()), action) - } + compute(command, action) } } diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt deleted file mode 100644 index 7b2985e..0000000 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt +++ /dev/null @@ -1,69 +0,0 @@ -package eventDemo.libs.command - -import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.consumeEach - -typealias CommandBlock = suspend CommandStream.ComputeStatus.(C) -> Unit - -/** - * Manage [Command]'s - * - * It stores the new [Command] in memory. - */ -abstract class CommandStreamInMemory : CommandStream { - private val logger = KotlinLogging.logger {} - private val queue: Channel = - Channel(onUndeliveredElement = { - logger.atWarn { "${it::class.simpleName} command not send" } - }) - - override suspend fun process(action: CommandBlock) { - queue.consumeEach { command -> - compute(command, action) - } - for (command in queue) { - compute(command, action) - } - } - - private suspend fun compute( - command: C, - action: CommandBlock, - ) { - val status = - object : CommandStream.ComputeStatus { - var isSet: Boolean = false - - override suspend fun ack() { - if (!isSet) markAsSuccess(command) else error("Already NACK") - isSet = true - } - - override suspend fun nack() { - if (!isSet) markAsFailed(command) else error("Already ACK") - isSet = true - } - } - - if (runCatching { status.action(command) }.isFailure) { - markAsFailed(command) - } else if (!status.isSet) { - status.ack() - } - } - - private fun markAsSuccess(command: C) { - logger.atInfo { - message = "Compute command SUCCESS : $command" - payload = mapOf("command" to command) - } - } - - private fun markAsFailed(command: C) { - logger.atWarn { - message = "Compute command FAILED : $command" - payload = mapOf("command" to command) - } - } -} diff --git a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt index e107637..8bd8af3 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt @@ -20,6 +20,8 @@ import eventDemo.app.notification.PlayerWasReadyNotification import eventDemo.app.notification.TheGameWasStartedNotification import eventDemo.app.notification.WelcomeToTheGameNotification import eventDemo.configuration.appKoinModule +import eventDemo.libs.fromFrameChannel +import eventDemo.libs.toObjectChannel import eventDemo.shared.toFrame import eventDemo.shared.toNotification import io.kotest.core.spec.style.FunSpec @@ -138,10 +140,10 @@ class GameStateTest : playerNotificationListener.startListening(channelNotification2, player2) GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player1, channelCommand1, channelNotification1) + commandHandler.handle(player1, toObjectChannel(channelCommand1), fromFrameChannel(channelNotification1)) } GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player2, channelCommand2, channelNotification2) + commandHandler.handle(player2, toObjectChannel(channelCommand2), fromFrameChannel(channelNotification2)) } joinAll(player1Job, player2Job) diff --git a/src/test/kotlin/eventDemo/app/query/TestHttpClient.kt b/src/test/kotlin/eventDemo/app/query/TestHttpClient.kt index 9d85fb2..bea81bb 100644 --- a/src/test/kotlin/eventDemo/app/query/TestHttpClient.kt +++ b/src/test/kotlin/eventDemo/app/query/TestHttpClient.kt @@ -1,26 +1,15 @@ package eventDemo.app.query -import eventDemo.app.entity.GameId -import eventDemo.configuration.GameIdSerializer -import eventDemo.configuration.UUIDSerializer +import eventDemo.configuration.defaultJsonSerializer import io.ktor.client.HttpClient import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.serialization.kotlinx.json.json import io.ktor.server.testing.ApplicationTestBuilder -import kotlinx.serialization.json.Json -import kotlinx.serialization.modules.SerializersModule -import java.util.UUID fun ApplicationTestBuilder.httpClient(): HttpClient = createClient { install(ContentNegotiation) { json( - Json { - serializersModule = - SerializersModule { - contextual(UUID::class) { UUIDSerializer } - contextual(GameId::class) { GameIdSerializer } - } - }, + defaultJsonSerializer(), ) } } diff --git a/src/test/kotlin/eventDemo/libs/FrameChannelConverterTest.kt b/src/test/kotlin/eventDemo/libs/FrameChannelConverterTest.kt new file mode 100644 index 0000000..caa43a0 --- /dev/null +++ b/src/test/kotlin/eventDemo/libs/FrameChannelConverterTest.kt @@ -0,0 +1,55 @@ +package eventDemo.libs + +import eventDemo.libs.command.Command +import eventDemo.libs.command.CommandId +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.equals.shouldBeEqual +import io.ktor.websocket.Frame +import io.ktor.websocket.readText +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.serialization.Serializable +import java.util.UUID +import kotlin.test.assertIs + +@Serializable +data class CommandTest( + override val id: CommandId, +) : Command + +class FrameChannelConverterTest : + FunSpec({ + + test("toObjectChannel") { + val uuid = "d737c631-76af-406e-bc29-f3e5b97226a5" + val id = CommandId(UUID.fromString(uuid)) + val jsonCommand = """{"id":"$uuid"}""" + + val channel = Channel() + + launch { + val commandChannel = toObjectChannel(channel) + commandChannel.receive().id shouldBeEqual id + channel.close() + } + + channel.send(Frame.Text(jsonCommand)) + } + + test("fromFrameChannel") { + val uuid = "d737c631-76af-406e-bc29-f3e5b97226a5" + val id = CommandId(UUID.fromString(uuid)) + val command = CommandTest(id) + val jsonCommand = """{"id":"$uuid"}""" + + val channel = Channel() + + launch { + val commandChannel = fromFrameChannel(channel) + commandChannel.send(command) + commandChannel.close() + } + + assertIs(channel.receive()).readText() shouldBeEqual jsonCommand + } + }) diff --git a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt index 7f5ce9a..7abc13b 100644 --- a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt +++ b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt @@ -1,12 +1,10 @@ package eventDemo.libs.command import io.kotest.core.spec.style.FunSpec -import io.ktor.websocket.Frame import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.channels.Channel import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json @Serializable class CommandTest( @@ -19,12 +17,9 @@ class CommandStreamChannelTest : test("send and receive") { val command = CommandTest(CommandId()) - val channel = Channel() + val channel = Channel() val stream = - CommandStreamChannel( - incoming = channel, - deserializer = { Json.decodeFromString(it) }, - ) + CommandStreamChannel(channel) val spyCall: () -> Unit = mockk(relaxed = true) @@ -32,7 +27,7 @@ class CommandStreamChannelTest : println("In action ${it.id}") spyCall() } - channel.send(Frame.Text(Json.encodeToString(command))) + channel.send(command) verify(exactly = 1) { spyCall() } } })