CommandStreamChannelTest

This commit is contained in:
2025-03-05 20:14:33 +01:00
parent c50127ba1b
commit d84e8359c9
6 changed files with 84 additions and 31 deletions

View File

@@ -63,4 +63,5 @@ dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version") testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version")
testImplementation("io.ktor:ktor-server-test-host-jvm:2.3.11") testImplementation("io.ktor:ktor-server-test-host-jvm:2.3.11")
testImplementation("io.kotest:kotest-runner-junit5:$kotest_version") testImplementation("io.kotest:kotest-runner-junit5:$kotest_version")
testImplementation("io.mockk:mockk:1.13.17")
} }

View File

@@ -11,11 +11,8 @@ import eventDemo.app.event.GameEventStream
import eventDemo.app.event.buildStateFromEventStream import eventDemo.app.event.buildStateFromEventStream
import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameEvent
import io.ktor.websocket.Frame import io.ktor.websocket.Frame
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
/** /**
@@ -34,21 +31,21 @@ class GameCommandHandler(
/** /**
* Init the handler * Init the handler
*/ */
fun init(player: Player) { suspend fun init(player: Player) {
CoroutineScope(Dispatchers.IO).launch { commandStream.process { command ->
commandStream.process { command -> if (command.payload.player.id != player.id) {
if (command.payload.player.id != player.id) { runBlocking {
nack() nack()
} }
}
val gameState = command.buildGameState() val gameState = command.buildGameState()
when (command) { when (command) {
is IWantToPlayCardCommand -> command.run(gameState, playerNotifier, eventStream) is IWantToPlayCardCommand -> command.run(gameState, playerNotifier, eventStream)
is IamReadyToPlayCommand -> command.run(gameState, playerNotifier, eventStream) is IamReadyToPlayCommand -> command.run(gameState, playerNotifier, eventStream)
is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream) is IWantToJoinTheGameCommand -> command.run(gameState, playerNotifier, eventStream)
is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream) is ICantPlayCommand -> command.run(gameState, playerNotifier, eventStream)
}
} }
} }
} }

View File

@@ -10,6 +10,7 @@ import io.ktor.server.auth.jwt.JWTPrincipal
import io.ktor.server.auth.principal import io.ktor.server.auth.principal
import io.ktor.server.routing.Route import io.ktor.server.routing.Route
import io.ktor.server.websocket.webSocket import io.ktor.server.websocket.webSocket
import kotlinx.coroutines.launch
fun Route.gameSocket( fun Route.gameSocket(
eventStream: GameEventStream, eventStream: GameEventStream,
@@ -17,7 +18,9 @@ fun Route.gameSocket(
) { ) {
authenticate { authenticate {
webSocket("/game") { webSocket("/game") {
GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer()) launch {
GameCommandHandler(eventStream, incoming, outgoing).init(call.getPlayer())
}
GameEventPlayerNotificationListener(eventBus, outgoing).init() GameEventPlayerNotificationListener(eventBus, outgoing).init()
} }
} }

View File

@@ -30,9 +30,9 @@ interface CommandStream<C : Command> {
* A class to implement success/failed action. * A class to implement success/failed action.
*/ */
interface ComputeStatus { interface ComputeStatus {
fun ack() suspend fun ack()
fun nack() suspend fun nack()
} }
/** /**

View File

@@ -3,9 +3,10 @@ package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.websocket.Frame import io.ktor.websocket.Frame
import io.ktor.websocket.readText import io.ktor.websocket.readText
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.launch
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
@@ -27,12 +28,11 @@ class CommandStreamChannel<C : Command>(
type: KClass<C>, type: KClass<C>,
command: C, command: C,
) { ) {
outgoing.send(Frame.Text(serializer(command)))
logger.atInfo { logger.atInfo {
message = "Command published: $command" message = "Command published: $command"
payload = mapOf("command" to command) payload = mapOf("command" to command)
} }
outgoing.send(Frame.Text(serializer(command)))
} }
override suspend fun process(action: CommandStream.ComputeStatus.(C) -> Unit) { override suspend fun process(action: CommandStream.ComputeStatus.(C) -> Unit) {
@@ -48,7 +48,7 @@ class CommandStreamChannel<C : Command>(
} }
} }
private fun compute( private suspend fun compute(
command: C, command: C,
action: CommandStream.ComputeStatus.(C) -> Unit, action: CommandStream.ComputeStatus.(C) -> Unit,
) { ) {
@@ -56,42 +56,46 @@ class CommandStreamChannel<C : Command>(
object : CommandStream.ComputeStatus { object : CommandStream.ComputeStatus {
var isSet: Boolean = false var isSet: Boolean = false
override fun ack() { override suspend fun ack() {
if (!isSet) markAsSuccess(command) else error("Already NACK") if (!isSet) markAsSuccess(command) else error("Already NACK")
isSet = true isSet = true
} }
override fun nack() { override suspend fun nack() {
if (!isSet) markAsFailed(command) else error("Already ACK") if (!isSet) markAsFailed(command) else error("Already ACK")
isSet = true isSet = true
} }
} }
if (runCatching { status.action(command) }.isFailure) { val action = runCatching { status.action(command) }
if (action.isFailure) {
logger.atInfo {
message = "Error"
payload = mapOf("command" to command)
cause = action.exceptionOrNull()
}
markAsFailed(command) markAsFailed(command)
} else if (!status.isSet) { } else if (!status.isSet) {
status.ack() status.ack()
} }
} }
private fun markAsSuccess(command: C) { private suspend fun markAsSuccess(command: C) {
logger.atInfo { logger.atInfo {
message = "Compute command SUCCESS and it removed of the stack : $command" message = "Compute command SUCCESS and it removed of the stack : $command"
payload = mapOf("command" to command) payload = mapOf("command" to command)
} }
runBlocking { GlobalScope.launch {
outgoing.send(Frame.Text("Command executed successfully")) // outgoing.send(Frame.Text("Command executed successfully"))
} }
} }
private fun markAsFailed(command: C) { private suspend fun markAsFailed(command: C) {
failedCommand.add(command) failedCommand.add(command)
logger.atWarn { logger.atWarn {
message = "Compute command FAILED and it put it ot the top of the stack : $command" message = "Compute command FAILED and it put it ot the top of the stack : $command"
payload = mapOf("command" to command) payload = mapOf("command" to command)
} }
runBlocking { outgoing.send(Frame.Text("Command execution failed"))
outgoing.send(Frame.Text("Command execution failed"))
}
} }
} }

View File

@@ -0,0 +1,48 @@
package eventDemo.libs.command
import io.kotest.core.spec.style.FunSpec
import io.ktor.websocket.Frame
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class CommandTest(
override val id: CommandId,
) : Command
class CommandStreamChannelTest :
FunSpec({
test("send and receive") {
val command = CommandTest(CommandId())
val command2 = CommandTest(CommandId())
val command3 = CommandTest(CommandId())
val channel = Channel<Frame>()
val stream =
CommandStreamChannel<CommandTest>(
incoming = channel,
outgoing = channel,
serializer = { it.id.toString() },
deserializer = { CommandTest(CommandId(it)) },
)
val spyCall: () -> Unit = mockk(relaxed = true)
runBlocking {
launch {
stream.process {
println("In action ${it.id}")
spyCall()
}
}
launch {
stream.send(command, command2)
stream.send(command3)
channel.close()
}.join()
verify(exactly = 3) { spyCall() }
}
}
})