diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt
index 8f9e9d5..3c90ad9 100644
--- a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt
+++ b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt
@@ -10,10 +10,15 @@ import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventStream
import eventDemo.app.event.buildStateFromEventStream
import eventDemo.app.event.event.GameEvent
+import eventDemo.libs.command.CommandBlock
import io.ktor.websocket.Frame
+import kotlinx.coroutines.DelicateCoroutinesApi
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.channels.trySendBlocking
+import kotlinx.coroutines.launch
/**
* Listen [GameCommand] on [GameCommandStream], check the validity and execute an action.
@@ -22,21 +27,31 @@ import kotlinx.coroutines.runBlocking
*/
class GameCommandHandler(
private val eventStream: GameEventStream,
- incoming: ReceiveChannel,
- outgoing: SendChannel,
) {
- private val commandStream = GameCommandStream(incoming, outgoing)
- private val playerNotifier: (String) -> Unit = { runBlocking { outgoing.send(Frame.Text(it)) } }
-
/**
* Init the handler
*/
- suspend fun init(player: Player) {
+ @OptIn(DelicateCoroutinesApi::class)
+ fun handle(
+ player: Player,
+ incoming: ReceiveChannel,
+ outgoing: SendChannel,
+ ): Job {
+ val commandStream = GameCommandStream(incoming, outgoing)
+ val playerNotifier: (String) -> Unit = { outgoing.trySendBlocking(Frame.Text(it)) }
+ return GlobalScope.launch {
+ init(player, commandStream, playerNotifier)
+ }
+ }
+
+ private suspend fun init(
+ player: Player,
+ commandStream: GameCommandStream,
+ playerNotifier: (String) -> Unit,
+ ) {
commandStream.process { command ->
if (command.payload.player.id != player.id) {
- runBlocking {
- nack()
- }
+ nack()
}
val gameState = command.buildGameState()
@@ -47,7 +62,7 @@ class GameCommandHandler(
is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream)
is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream)
}
- }
+ } as CommandBlock
}
private fun GameCommand.buildGameState(): GameState = payload.gameId.buildStateFromEventStream(eventStream)
diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt
index bbe0f77..768883b 100644
--- a/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt
+++ b/src/main/kotlin/eventDemo/app/command/GameCommandRoute.kt
@@ -1,8 +1,6 @@
package eventDemo.app.command
import eventDemo.app.entity.Player
-import eventDemo.app.event.GameEventBus
-import eventDemo.app.event.GameEventStream
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
import io.ktor.server.application.ApplicationCall
import io.ktor.server.auth.authenticate
@@ -10,18 +8,15 @@ import io.ktor.server.auth.jwt.JWTPrincipal
import io.ktor.server.auth.principal
import io.ktor.server.routing.Route
import io.ktor.server.websocket.webSocket
-import kotlinx.coroutines.launch
fun Route.gameSocket(
- eventStream: GameEventStream,
- eventBus: GameEventBus,
+ playerNotificationListener: GameEventPlayerNotificationListener,
+ commandHandler: GameCommandHandler,
) {
authenticate {
webSocket("/game") {
- launch {
- GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer())
- }
- GameEventPlayerNotificationListener(eventBus, outgoing).init()
+ commandHandler.handle(call.getPlayer(), incoming, outgoing)
+ playerNotificationListener.startListening(outgoing)
}
}
}
diff --git a/src/main/kotlin/eventDemo/app/event/GameEventStream.kt b/src/main/kotlin/eventDemo/app/event/GameEventStream.kt
index 2c7d31f..317f275 100644
--- a/src/main/kotlin/eventDemo/app/event/GameEventStream.kt
+++ b/src/main/kotlin/eventDemo/app/event/GameEventStream.kt
@@ -2,18 +2,17 @@ package eventDemo.app.event
import eventDemo.app.entity.GameId
import eventDemo.app.event.event.GameEvent
-import eventDemo.libs.event.EventBus
import eventDemo.libs.event.EventStream
/**
* A stream to publish and read the played card event.
*/
class GameEventStream(
- private val eventBus: EventBus,
- private val m: EventStream,
-) : EventStream by m {
+ private val eventBus: GameEventBus,
+ private val eventStream: EventStream,
+) : EventStream by eventStream {
override fun publish(event: GameEvent) {
- m.publish(event)
+ eventStream.publish(event)
eventBus.publish(event)
}
}
diff --git a/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt b/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt
index 8a604a5..057fed1 100644
--- a/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt
+++ b/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt
@@ -1,22 +1,18 @@
package eventDemo.app.eventListener
-import eventDemo.app.entity.GameId
+import eventDemo.app.event.GameEventBus
import eventDemo.app.event.event.GameEvent
-import eventDemo.libs.event.EventBus
import eventDemo.shared.toFrame
import io.ktor.websocket.Frame
import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.channels.trySendBlocking
class GameEventPlayerNotificationListener(
- private val eventBus: EventBus,
- private val outgoing: SendChannel,
+ private val eventBus: GameEventBus,
) {
- fun init() {
+ fun startListening(outgoing: SendChannel) {
eventBus.subscribe { event: GameEvent ->
- runBlocking {
- outgoing.send(event.toFrame())
- }
+ outgoing.trySendBlocking(event.toFrame())
}
}
}
diff --git a/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt b/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt
index 7de3dc8..0a15a53 100644
--- a/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt
+++ b/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt
@@ -1,15 +1,14 @@
package eventDemo.app.eventListener
-import eventDemo.app.entity.GameId
+import eventDemo.app.event.GameEventBus
+import eventDemo.app.event.GameEventStream
import eventDemo.app.event.buildStateFromEventStream
import eventDemo.app.event.event.GameEvent
import eventDemo.app.event.event.GameStartedEvent
-import eventDemo.libs.event.EventBus
-import eventDemo.libs.event.EventStream
class GameEventReactionListener(
- private val eventBus: EventBus,
- private val eventStream: EventStream,
+ private val eventBus: GameEventBus,
+ private val eventStream: GameEventStream,
) {
fun init() {
eventBus.subscribe { event: GameEvent ->
diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt
index 82c28db..80ef6cc 100644
--- a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt
+++ b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt
@@ -1,11 +1,14 @@
package eventDemo.configuration
+import eventDemo.app.command.GameCommandHandler
import eventDemo.app.event.GameEventBus
import eventDemo.app.event.GameEventStream
+import eventDemo.app.eventListener.GameEventPlayerNotificationListener
import eventDemo.libs.event.EventBusInMemory
import eventDemo.libs.event.EventStreamInMemory
import io.ktor.server.application.Application
import io.ktor.server.application.install
+import org.koin.core.module.dsl.singleOf
import org.koin.dsl.module
import org.koin.ktor.plugin.Koin
import org.koin.logger.slf4jLogger
@@ -19,10 +22,15 @@ fun Application.configureKoin() {
val appKoinModule =
module {
+ single {
+ GameEventBus(EventBusInMemory())
+ }
single {
GameEventStream(get(), EventStreamInMemory())
}
single {
- GameEventBus(EventBusInMemory())
+ GameCommandHandler(get())
}
+
+ singleOf(::GameEventPlayerNotificationListener)
}
diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt b/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt
index 18ab2fd..2c637ab 100644
--- a/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt
+++ b/src/main/kotlin/eventDemo/configuration/ConfigureWebSocketsGameRoute.kt
@@ -1,16 +1,16 @@
package eventDemo.configuration
+import eventDemo.app.command.GameCommandHandler
import eventDemo.app.command.gameSocket
-import eventDemo.app.event.GameEventBus
-import eventDemo.app.event.GameEventStream
+import eventDemo.app.eventListener.GameEventPlayerNotificationListener
import io.ktor.server.application.Application
import io.ktor.server.routing.routing
fun Application.declareWebSocketsGameRoute(
- eventStream: GameEventStream,
- eventBus: GameEventBus,
+ playerNotificationListener: GameEventPlayerNotificationListener,
+ commandHandler: GameCommandHandler,
) {
routing {
- gameSocket(eventStream, eventBus)
+ gameSocket(playerNotificationListener, commandHandler)
}
}
diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt
index 28f46f5..a3acdae 100644
--- a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt
+++ b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt
@@ -1,5 +1,8 @@
package eventDemo.libs.command
+import kotlinx.coroutines.DelicateCoroutinesApi
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.launch
import kotlin.reflect.KClass
/**
@@ -11,7 +14,7 @@ interface CommandStream {
/**
* Send a new [Command] to the queue.
*/
- suspend fun send(
+ fun send(
type: KClass,
command: C,
)
@@ -19,7 +22,7 @@ interface CommandStream {
/**
* Send multiple [Command] to the queue.
*/
- suspend fun send(
+ fun send(
type: KClass,
vararg commands: C,
) {
@@ -39,6 +42,13 @@ interface CommandStream {
* Apply an action to all command income in the stream.
*/
suspend fun process(action: CommandBlock)
+
+ @OptIn(DelicateCoroutinesApi::class)
+ fun blockAndProcess(action: CommandBlock) {
+ GlobalScope.launch {
+ process(action)
+ }
+ }
}
suspend inline fun CommandStream.send(vararg command: C) = send(C::class, *command)
diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt
index 5ea7526..cacce8a 100644
--- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt
+++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt
@@ -3,10 +3,11 @@ package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
-import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.channels.onFailure
+import kotlinx.coroutines.channels.onSuccess
+import kotlinx.coroutines.channels.trySendBlocking
import kotlin.reflect.KClass
/**
@@ -19,23 +20,30 @@ class CommandStreamChannel(
private val deserializer: (String) -> C,
) : CommandStream {
private val logger = KotlinLogging.logger {}
- private val failedCommand = mutableListOf()
/**
* Send a new [Command] to the queue.
*/
- override suspend fun send(
+ override fun send(
type: KClass,
command: C,
) {
- outgoing.send(Frame.Text(serializer(command)))
- logger.atInfo {
- message = "Command published: $command"
- payload = mapOf("command" to command)
- }
+ outgoing
+ .trySendBlocking(Frame.Text(serializer(command)))
+ .onSuccess {
+ logger.atInfo {
+ message = "Command published: $command"
+ payload = mapOf("command" to command)
+ }
+ }.onFailure {
+ logger.atError {
+ message = "Command FAILED: $command"
+ payload = mapOf("command" to command)
+ }
+ }
}
- override suspend fun process(action: CommandStream.ComputeStatus.(C) -> Unit) {
+ override suspend fun process(action: CommandBlock) {
// incoming.consumeEach { commandAsFrame ->
// if (commandAsFrame is Frame.Text) {
// compute(deserializer(commandAsFrame.readText()), action)
@@ -50,7 +58,7 @@ class CommandStreamChannel(
private suspend fun compute(
command: C,
- action: CommandStream.ComputeStatus.(C) -> Unit,
+ action: CommandBlock,
) {
val status =
object : CommandStream.ComputeStatus {
@@ -67,12 +75,12 @@ class CommandStreamChannel(
}
}
- val action = runCatching { status.action(command) }
- if (action.isFailure) {
+ val actionResult = runCatching { status.action(command) }
+ if (actionResult.isFailure) {
logger.atInfo {
- message = "Error"
+ message = "Error on compute the Command"
payload = mapOf("command" to command)
- cause = action.exceptionOrNull()
+ cause = actionResult.exceptionOrNull()
}
markAsFailed(command)
} else if (!status.isSet) {
@@ -82,20 +90,17 @@ class CommandStreamChannel(
private suspend fun markAsSuccess(command: C) {
logger.atInfo {
- message = "Compute command SUCCESS and it removed of the stack : $command"
+ message = "Compute command SUCCESS and it removed of the stack"
payload = mapOf("command" to command)
}
- GlobalScope.launch {
-// outgoing.send(Frame.Text("Command executed successfully"))
- }
+// outgoing.trySendBlocking(Frame.Text("Command executed successfully"))
}
private suspend fun markAsFailed(command: C) {
- failedCommand.add(command)
logger.atWarn {
- message = "Compute command FAILED and it put it ot the top of the stack : $command"
+ message = "Compute command FAILED"
payload = mapOf("command" to command)
}
- outgoing.send(Frame.Text("Command execution failed"))
+// outgoing.trySendBlocking(Frame.Text("Command execution failed"))
}
}
diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt
index 8c5a2e9..2379745 100644
--- a/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt
+++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamInMemory.kt
@@ -3,9 +3,10 @@ package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.channels.trySendBlocking
import kotlin.reflect.KClass
-typealias CommandBlock = CommandStream.ComputeStatus.(C) -> Unit
+typealias CommandBlock = suspend CommandStream.ComputeStatus.(C) -> Unit
/**
* Manage [Command]'s
@@ -14,7 +15,6 @@ typealias CommandBlock = CommandStream.ComputeStatus.(C) -> Unit
*/
abstract class CommandStreamInMemory : CommandStream {
private val logger = KotlinLogging.logger {}
- private val failedCommand = mutableListOf()
private val queue: Channel =
Channel(onUndeliveredElement = {
logger.atWarn { "${it::class.simpleName} command not send" }
@@ -23,7 +23,7 @@ abstract class CommandStreamInMemory : CommandStream {
/**
* Send a new [Command] to the queue.
*/
- override suspend fun send(
+ override fun send(
type: KClass,
command: C,
) {
@@ -31,7 +31,7 @@ abstract class CommandStreamInMemory : CommandStream {
message = "Command published: $command"
payload = mapOf("command" to command)
}
- queue.send(command)
+ queue.trySendBlocking(command)
}
override suspend fun process(action: CommandBlock) {
@@ -43,7 +43,7 @@ abstract class CommandStreamInMemory : CommandStream {
}
}
- private fun compute(
+ private suspend fun compute(
command: C,
action: CommandBlock,
) {
@@ -51,12 +51,12 @@ abstract class CommandStreamInMemory : CommandStream {
object : CommandStream.ComputeStatus {
var isSet: Boolean = false
- override fun ack() {
+ override suspend fun ack() {
if (!isSet) markAsSuccess(command) else error("Already NACK")
isSet = true
}
- override fun nack() {
+ override suspend fun nack() {
if (!isSet) markAsFailed(command) else error("Already ACK")
isSet = true
}
@@ -77,7 +77,6 @@ abstract class CommandStreamInMemory : CommandStream {
}
private fun markAsFailed(command: C) {
- failedCommand.add(command)
logger.atWarn {
message = "Compute command FAILED and it put it ot the top of the stack : $command"
payload = mapOf("command" to command)
diff --git a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt
index 4b9fa81..d301c71 100644
--- a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt
+++ b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt
@@ -5,8 +5,6 @@ import io.ktor.websocket.Frame
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
class CommandTest(
override val id: CommandId,
@@ -30,19 +28,13 @@ class CommandStreamChannelTest :
)
val spyCall: () -> Unit = mockk(relaxed = true)
- runBlocking {
- launch {
- stream.process {
- println("In action ${it.id}")
- spyCall()
- }
- }
- launch {
- stream.send(command, command2)
- stream.send(command3)
- channel.close()
- }.join()
- verify(exactly = 3) { spyCall() }
+
+ stream.blockAndProcess {
+ println("In action ${it.id}")
+ spyCall()
}
+ stream.send(command, command2)
+ stream.send(command3)
+ verify(exactly = 3) { spyCall() }
}
})