From 99f0760d3ce36f2083bd9ea2d18242a44da0aa06 Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Wed, 5 Mar 2025 22:11:24 +0100 Subject: [PATCH] Clean and fix --- .../app/command/GameCommandHandler.kt | 37 +++++++++----- .../eventDemo/app/command/GameCommandRoute.kt | 13 ++--- .../eventDemo/app/event/GameEventStream.kt | 9 ++-- .../GameEventPlayerNotificationListener.kt | 14 ++---- .../GameEventReactionListener.kt | 9 ++-- .../eventDemo/configuration/ConfigureDI.kt | 10 +++- .../ConfigureWebSocketsGameRoute.kt | 10 ++-- .../eventDemo/libs/command/CommandStream.kt | 14 +++++- .../libs/command/CommandStreamChannel.kt | 49 ++++++++++--------- .../libs/command/CommandStreamInMemory.kt | 15 +++--- .../libs/command/CommandStreamChannelTest.kt | 22 +++------ 11 files changed, 110 insertions(+), 92 deletions(-) diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt index 8f9e9d5..3c90ad9 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt @@ -10,10 +10,15 @@ import eventDemo.app.entity.Player import eventDemo.app.event.GameEventStream import eventDemo.app.event.buildStateFromEventStream import eventDemo.app.event.event.GameEvent +import eventDemo.libs.command.CommandBlock import io.ktor.websocket.Frame +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.launch /** * Listen [GameCommand] on [GameCommandStream], check the validity and execute an action. @@ -22,21 +27,31 @@ import kotlinx.coroutines.runBlocking */ class GameCommandHandler( private val eventStream: GameEventStream, - incoming: ReceiveChannel, - outgoing: SendChannel, ) { - private val commandStream = GameCommandStream(incoming, outgoing) - private val playerNotifier: (String) -> Unit = { runBlocking { outgoing.send(Frame.Text(it)) } } - /** * Init the handler */ - suspend fun init(player: Player) { + @OptIn(DelicateCoroutinesApi::class) + fun handle( + player: Player, + incoming: ReceiveChannel, + outgoing: SendChannel, + ): Job { + val commandStream = GameCommandStream(incoming, outgoing) + val playerNotifier: (String) -> Unit = { outgoing.trySendBlocking(Frame.Text(it)) } + return GlobalScope.launch { + init(player, commandStream, playerNotifier) + } + } + + private suspend fun init( + player: Player, + commandStream: GameCommandStream, + playerNotifier: (String) -> Unit, + ) { commandStream.process { command -> if (command.payload.player.id != player.id) { - runBlocking { - nack() - } + nack() } val gameState = command.buildGameState() @@ -47,7 +62,7 @@ class GameCommandHandler( is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream) is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream) } - } + } as CommandBlock } private fun GameCommand.buildGameState(): GameState = payload.gameId.buildStateFromEventStream(eventStream) diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt index bbe0f77..768883b 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt @@ -1,8 +1,6 @@ package eventDemo.app.command import eventDemo.app.entity.Player -import eventDemo.app.event.GameEventBus -import eventDemo.app.event.GameEventStream import eventDemo.app.eventListener.GameEventPlayerNotificationListener import io.ktor.server.application.ApplicationCall import io.ktor.server.auth.authenticate @@ -10,18 +8,15 @@ import io.ktor.server.auth.jwt.JWTPrincipal import io.ktor.server.auth.principal import io.ktor.server.routing.Route import io.ktor.server.websocket.webSocket -import kotlinx.coroutines.launch fun Route.gameSocket( - eventStream: GameEventStream, - eventBus: GameEventBus, + playerNotificationListener: GameEventPlayerNotificationListener, + commandHandler: GameCommandHandler, ) { authenticate { webSocket("/game") { - launch { - GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer()) - } - GameEventPlayerNotificationListener(eventBus, outgoing).init() + commandHandler.handle(call.getPlayer(), incoming, outgoing) + playerNotificationListener.startListening(outgoing) } } } diff --git a/src/main/kotlin/eventDemo/app/event/GameEventStream.kt b/src/main/kotlin/eventDemo/app/event/GameEventStream.kt index 2c7d31f..317f275 100644 --- a/src/main/kotlin/eventDemo/app/event/GameEventStream.kt +++ b/src/main/kotlin/eventDemo/app/event/GameEventStream.kt @@ -2,18 +2,17 @@ package eventDemo.app.event import eventDemo.app.entity.GameId import eventDemo.app.event.event.GameEvent -import eventDemo.libs.event.EventBus import eventDemo.libs.event.EventStream /** * A stream to publish and read the played card event. */ class GameEventStream( - private val eventBus: EventBus, - private val m: EventStream, -) : EventStream by m { + private val eventBus: GameEventBus, + private val eventStream: EventStream, +) : EventStream by eventStream { override fun publish(event: GameEvent) { - m.publish(event) + eventStream.publish(event) eventBus.publish(event) } } diff --git a/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt b/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt index 8a604a5..057fed1 100644 --- a/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt +++ b/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt @@ -1,22 +1,18 @@ package eventDemo.app.eventListener -import eventDemo.app.entity.GameId +import eventDemo.app.event.GameEventBus import eventDemo.app.event.event.GameEvent -import eventDemo.libs.event.EventBus import eventDemo.shared.toFrame import io.ktor.websocket.Frame import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.channels.trySendBlocking class GameEventPlayerNotificationListener( - private val eventBus: EventBus, - private val outgoing: SendChannel, + private val eventBus: GameEventBus, ) { - fun init() { + fun startListening(outgoing: SendChannel) { eventBus.subscribe { event: GameEvent -> - runBlocking { - outgoing.send(event.toFrame()) - } + outgoing.trySendBlocking(event.toFrame()) } } } diff --git a/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt b/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt index 7de3dc8..0a15a53 100644 --- a/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt +++ b/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt @@ -1,15 +1,14 @@ package eventDemo.app.eventListener -import eventDemo.app.entity.GameId +import eventDemo.app.event.GameEventBus +import eventDemo.app.event.GameEventStream import eventDemo.app.event.buildStateFromEventStream import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameStartedEvent -import eventDemo.libs.event.EventBus -import eventDemo.libs.event.EventStream class GameEventReactionListener( - private val eventBus: EventBus, - private val eventStream: EventStream, + private val eventBus: GameEventBus, + private val eventStream: GameEventStream, ) { fun init() { eventBus.subscribe { event: GameEvent -> diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt index 82c28db..80ef6cc 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt @@ -1,11 +1,14 @@ package eventDemo.configuration +import eventDemo.app.command.GameCommandHandler import eventDemo.app.event.GameEventBus import eventDemo.app.event.GameEventStream +import eventDemo.app.eventListener.GameEventPlayerNotificationListener import eventDemo.libs.event.EventBusInMemory import eventDemo.libs.event.EventStreamInMemory import io.ktor.server.application.Application import io.ktor.server.application.install +import org.koin.core.module.dsl.singleOf import org.koin.dsl.module import org.koin.ktor.plugin.Koin import org.koin.logger.slf4jLogger @@ -19,10 +22,15 @@ fun Application.configureKoin() { val appKoinModule = module { + single { + GameEventBus(EventBusInMemory()) + } single { GameEventStream(get(), EventStreamInMemory()) } single { - GameEventBus(EventBusInMemory()) + GameCommandHandler(get()) } + + singleOf(::GameEventPlayerNotificationListener) } diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt b/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt index 18ab2fd..2c637ab 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt @@ -1,16 +1,16 @@ package eventDemo.configuration +import eventDemo.app.command.GameCommandHandler import eventDemo.app.command.gameSocket -import eventDemo.app.event.GameEventBus -import eventDemo.app.event.GameEventStream +import eventDemo.app.eventListener.GameEventPlayerNotificationListener import io.ktor.server.application.Application import io.ktor.server.routing.routing fun Application.declareWebSocketsGameRoute( - eventStream: GameEventStream, - eventBus: GameEventBus, + playerNotificationListener: GameEventPlayerNotificationListener, + commandHandler: GameCommandHandler, ) { routing { - gameSocket(eventStream, eventBus) + gameSocket(playerNotificationListener, commandHandler) } } diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt index 28f46f5..a3acdae 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt @@ -1,5 +1,8 @@ package eventDemo.libs.command +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import kotlin.reflect.KClass /** @@ -11,7 +14,7 @@ interface CommandStream { /** * Send a new [Command] to the queue. */ - suspend fun send( + fun send( type: KClass, command: C, ) @@ -19,7 +22,7 @@ interface CommandStream { /** * Send multiple [Command] to the queue. */ - suspend fun send( + fun send( type: KClass, vararg commands: C, ) { @@ -39,6 +42,13 @@ interface CommandStream { * Apply an action to all command income in the stream. */ suspend fun process(action: CommandBlock) + + @OptIn(DelicateCoroutinesApi::class) + fun blockAndProcess(action: CommandBlock) { + GlobalScope.launch { + process(action) + } + } } suspend inline fun CommandStream.send(vararg command: C) = send(C::class, *command) diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt index 5ea7526..cacce8a 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt @@ -3,10 +3,11 @@ package eventDemo.libs.command import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.websocket.Frame import io.ktor.websocket.readText -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.launch +import kotlinx.coroutines.channels.onFailure +import kotlinx.coroutines.channels.onSuccess +import kotlinx.coroutines.channels.trySendBlocking import kotlin.reflect.KClass /** @@ -19,23 +20,30 @@ class CommandStreamChannel( private val deserializer: (String) -> C, ) : CommandStream { private val logger = KotlinLogging.logger {} - private val failedCommand = mutableListOf() /** * Send a new [Command] to the queue. */ - override suspend fun send( + override fun send( type: KClass, command: C, ) { - outgoing.send(Frame.Text(serializer(command))) - logger.atInfo { - message = "Command published: $command" - payload = mapOf("command" to command) - } + outgoing + .trySendBlocking(Frame.Text(serializer(command))) + .onSuccess { + logger.atInfo { + message = "Command published: $command" + payload = mapOf("command" to command) + } + }.onFailure { + logger.atError { + message = "Command FAILED: $command" + payload = mapOf("command" to command) + } + } } - override suspend fun process(action: CommandStream.ComputeStatus.(C) -> Unit) { + override suspend fun process(action: CommandBlock) { // incoming.consumeEach { commandAsFrame -> // if (commandAsFrame is Frame.Text) { // compute(deserializer(commandAsFrame.readText()), action) @@ -50,7 +58,7 @@ class CommandStreamChannel( private suspend fun compute( command: C, - action: CommandStream.ComputeStatus.(C) -> Unit, + action: CommandBlock, ) { val status = object : CommandStream.ComputeStatus { @@ -67,12 +75,12 @@ class CommandStreamChannel( } } - val action = runCatching { status.action(command) } - if (action.isFailure) { + val actionResult = runCatching { status.action(command) } + if (actionResult.isFailure) { logger.atInfo { - message = "Error" + message = "Error on compute the Command" payload = mapOf("command" to command) - cause = action.exceptionOrNull() + cause = actionResult.exceptionOrNull() } markAsFailed(command) } else if (!status.isSet) { @@ -82,20 +90,17 @@ class CommandStreamChannel( private suspend fun markAsSuccess(command: C) { logger.atInfo { - message = "Compute command SUCCESS and it removed of the stack : $command" + message = "Compute command SUCCESS and it removed of the stack" payload = mapOf("command" to command) } - GlobalScope.launch { -// outgoing.send(Frame.Text("Command executed successfully")) - } +// outgoing.trySendBlocking(Frame.Text("Command executed successfully")) } private suspend fun markAsFailed(command: C) { - failedCommand.add(command) logger.atWarn { - message = "Compute command FAILED and it put it ot the top of the stack : $command" + message = "Compute command FAILED" payload = mapOf("command" to command) } - outgoing.send(Frame.Text("Command execution failed")) +// outgoing.trySendBlocking(Frame.Text("Command execution failed")) } } diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt index 8c5a2e9..2379745 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt @@ -3,9 +3,10 @@ package eventDemo.libs.command import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.channels.trySendBlocking import kotlin.reflect.KClass -typealias CommandBlock = CommandStream.ComputeStatus.(C) -> Unit +typealias CommandBlock = suspend CommandStream.ComputeStatus.(C) -> Unit /** * Manage [Command]'s @@ -14,7 +15,6 @@ typealias CommandBlock = CommandStream.ComputeStatus.(C) -> Unit */ abstract class CommandStreamInMemory : CommandStream { private val logger = KotlinLogging.logger {} - private val failedCommand = mutableListOf() private val queue: Channel = Channel(onUndeliveredElement = { logger.atWarn { "${it::class.simpleName} command not send" } @@ -23,7 +23,7 @@ abstract class CommandStreamInMemory : CommandStream { /** * Send a new [Command] to the queue. */ - override suspend fun send( + override fun send( type: KClass, command: C, ) { @@ -31,7 +31,7 @@ abstract class CommandStreamInMemory : CommandStream { message = "Command published: $command" payload = mapOf("command" to command) } - queue.send(command) + queue.trySendBlocking(command) } override suspend fun process(action: CommandBlock) { @@ -43,7 +43,7 @@ abstract class CommandStreamInMemory : CommandStream { } } - private fun compute( + private suspend fun compute( command: C, action: CommandBlock, ) { @@ -51,12 +51,12 @@ abstract class CommandStreamInMemory : CommandStream { object : CommandStream.ComputeStatus { var isSet: Boolean = false - override fun ack() { + override suspend fun ack() { if (!isSet) markAsSuccess(command) else error("Already NACK") isSet = true } - override fun nack() { + override suspend fun nack() { if (!isSet) markAsFailed(command) else error("Already ACK") isSet = true } @@ -77,7 +77,6 @@ abstract class CommandStreamInMemory : CommandStream { } private fun markAsFailed(command: C) { - failedCommand.add(command) logger.atWarn { message = "Compute command FAILED and it put it ot the top of the stack : $command" payload = mapOf("command" to command) diff --git a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt index 4b9fa81..d301c71 100644 --- a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt +++ b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt @@ -5,8 +5,6 @@ import io.ktor.websocket.Frame import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking class CommandTest( override val id: CommandId, @@ -30,19 +28,13 @@ class CommandStreamChannelTest : ) val spyCall: () -> Unit = mockk(relaxed = true) - runBlocking { - launch { - stream.process { - println("In action ${it.id}") - spyCall() - } - } - launch { - stream.send(command, command2) - stream.send(command3) - channel.close() - }.join() - verify(exactly = 3) { spyCall() } + + stream.blockAndProcess { + println("In action ${it.id}") + spyCall() } + stream.send(command, command2) + stream.send(command3) + verify(exactly = 3) { spyCall() } } })