create a FrameChannelConverter

This commit is contained in:
2025-03-11 19:05:03 +01:00
parent c84562afe6
commit 0fbea7903a
12 changed files with 136 additions and 130 deletions

View File

@@ -10,9 +10,8 @@ import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameEvent
import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.event.projection.GameStateRepository
import eventDemo.app.notification.ErrorNotification import eventDemo.app.notification.ErrorNotification
import eventDemo.shared.toFrame import eventDemo.app.notification.Notification
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.websocket.Frame
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
@@ -32,8 +31,8 @@ class GameCommandHandler(
*/ */
suspend fun handle( suspend fun handle(
player: Player, player: Player,
incomingCommandChannel: ReceiveChannel<Frame>, incomingCommandChannel: ReceiveChannel<GameCommand>,
outgoingErrorChannelNotification: SendChannel<Frame>, outgoingErrorChannelNotification: SendChannel<Notification>,
) = GameCommandStream(incomingCommandChannel).process { command -> ) = GameCommandStream(incomingCommandChannel).process { command ->
if (command.payload.player.id != player.id) { if (command.payload.player.id != player.id) {
nack() nack()
@@ -45,7 +44,7 @@ class GameCommandHandler(
message = "Notification send ERROR: ${notification.message}" message = "Notification send ERROR: ${notification.message}"
payload = mapOf("notification" to notification) payload = mapOf("notification" to notification)
} }
outgoingErrorChannelNotification.send(notification.toFrame()) outgoingErrorChannelNotification.send(notification)
} }
val gameState = gameStateRepository.get(command.payload.gameId) val gameState = gameStateRepository.get(command.payload.gameId)

View File

@@ -2,6 +2,8 @@ package eventDemo.app.command
import eventDemo.app.entity.Player import eventDemo.app.entity.Player
import eventDemo.app.eventListener.GameEventPlayerNotificationListener import eventDemo.app.eventListener.GameEventPlayerNotificationListener
import eventDemo.libs.fromFrameChannel
import eventDemo.libs.toObjectChannel
import io.ktor.server.application.ApplicationCall import io.ktor.server.application.ApplicationCall
import io.ktor.server.auth.authenticate import io.ktor.server.auth.authenticate
import io.ktor.server.auth.jwt.JWTPrincipal import io.ktor.server.auth.jwt.JWTPrincipal
@@ -21,7 +23,11 @@ fun Route.gameSocket(
webSocket("/game") { webSocket("/game") {
val currentPlayer = call.getPlayer() val currentPlayer = call.getPlayer()
GlobalScope.launch { GlobalScope.launch {
commandHandler.handle(currentPlayer, incoming, outgoing) commandHandler.handle(
currentPlayer,
toObjectChannel(incoming),
fromFrameChannel(outgoing),
)
} }
playerNotificationListener.startListening(outgoing, currentPlayer) playerNotificationListener.startListening(outgoing, currentPlayer)
} }

View File

@@ -3,22 +3,11 @@ package eventDemo.app.command
import eventDemo.app.command.command.GameCommand import eventDemo.app.command.command.GameCommand
import eventDemo.libs.command.CommandStream import eventDemo.libs.command.CommandStream
import eventDemo.libs.command.CommandStreamChannel import eventDemo.libs.command.CommandStreamChannel
import eventDemo.libs.command.CommandStreamInMemory
import io.ktor.websocket.Frame
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.serialization.json.Json
/**
* A stream to publish and read the game command.
*/
class GameCommandStreamInMemory : CommandStreamInMemory<GameCommand>()
/** /**
* A stream to publish and read the game command. * A stream to publish and read the game command.
*/ */
class GameCommandStream( class GameCommandStream(
incoming: ReceiveChannel<Frame>, incoming: ReceiveChannel<GameCommand>,
) : CommandStream<GameCommand> by CommandStreamChannel( ) : CommandStream<GameCommand> by CommandStreamChannel(incoming)
incoming,
{ Json.decodeFromString(GameCommand.serializer(), it) },
)

View File

@@ -20,14 +20,19 @@ import java.util.UUID
fun Application.configureSerialization() { fun Application.configureSerialization() {
install(ContentNegotiation) { install(ContentNegotiation) {
json( json(
defaultJsonSerializer(),
)
}
}
fun defaultJsonSerializer(): Json =
Json { Json {
serializersModule = serializersModule =
SerializersModule { SerializersModule {
contextual(UUID::class) { UUIDSerializer } contextual(UUID::class) { UUIDSerializer }
contextual(GameId::class) { GameIdSerializer } contextual(GameId::class) { GameIdSerializer }
} contextual(CommandId::class) { CommandIdSerializer }
}, contextual(Player.PlayerId::class) { PlayerIdSerializer }
)
} }
} }

View File

@@ -0,0 +1,43 @@
package eventDemo.libs
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
inline fun <reified T> CoroutineScope.toObjectChannel(
frames: ReceiveChannel<Frame>,
bufferSize: Int = 0,
): ReceiveChannel<T> {
val logger = KotlinLogging.logger { }
return produce(capacity = bufferSize) {
frames.consumeEach { frame ->
if (frame is Frame.Text) {
logger.debug { "Conversion of the Frame: ${frame.readText()}" }
send(Json.decodeFromString(frame.readText()))
} else {
logger.warn { "The frame is not a text frame" }
}
}
}
}
inline fun <reified T> CoroutineScope.fromFrameChannel(frames: SendChannel<Frame>): SendChannel<T> {
val channel = Channel<T>()
launch {
channel.consumeEach { obj ->
frames.send(Frame.Text(Json.encodeToString(obj)))
}
}
return channel
}

View File

@@ -31,3 +31,5 @@ interface CommandStream<C : Command> {
} }
} }
} }
typealias CommandBlock<C> = suspend CommandStream.ComputeStatus.(C) -> Unit

View File

@@ -1,29 +1,19 @@
package eventDemo.libs.command package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
/** /**
* Manage [Command]'s with kotlin Channel * Manage [Command]'s with kotlin Channel
*/ */
class CommandStreamChannel<C : Command>( class CommandStreamChannel<C : Command>(
private val incoming: ReceiveChannel<Frame>, private val incoming: ReceiveChannel<C>,
private val deserializer: (String) -> C,
) : CommandStream<C> { ) : CommandStream<C> {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
override suspend fun process(action: CommandBlock<C>) { override suspend fun process(action: CommandBlock<C>) {
// incoming.consumeEach { commandAsFrame ->
// if (commandAsFrame is Frame.Text) {
// compute(deserializer(commandAsFrame.readText()), action)
// }
// }
for (command in incoming) { for (command in incoming) {
if (command is Frame.Text) { compute(command, action)
compute(deserializer(command.readText()), action)
}
} }
} }

View File

@@ -1,69 +0,0 @@
package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
typealias CommandBlock<C> = suspend CommandStream.ComputeStatus.(C) -> Unit
/**
* Manage [Command]'s
*
* It stores the new [Command] in memory.
*/
abstract class CommandStreamInMemory<C : Command> : CommandStream<C> {
private val logger = KotlinLogging.logger {}
private val queue: Channel<C> =
Channel(onUndeliveredElement = {
logger.atWarn { "${it::class.simpleName} command not send" }
})
override suspend fun process(action: CommandBlock<C>) {
queue.consumeEach { command ->
compute(command, action)
}
for (command in queue) {
compute(command, action)
}
}
private suspend fun compute(
command: C,
action: CommandBlock<C>,
) {
val status =
object : CommandStream.ComputeStatus {
var isSet: Boolean = false
override suspend fun ack() {
if (!isSet) markAsSuccess(command) else error("Already NACK")
isSet = true
}
override suspend fun nack() {
if (!isSet) markAsFailed(command) else error("Already ACK")
isSet = true
}
}
if (runCatching { status.action(command) }.isFailure) {
markAsFailed(command)
} else if (!status.isSet) {
status.ack()
}
}
private fun <C : Command> markAsSuccess(command: C) {
logger.atInfo {
message = "Compute command SUCCESS : $command"
payload = mapOf("command" to command)
}
}
private fun <C : Command> markAsFailed(command: C) {
logger.atWarn {
message = "Compute command FAILED : $command"
payload = mapOf("command" to command)
}
}
}

View File

@@ -20,6 +20,8 @@ import eventDemo.app.notification.PlayerWasReadyNotification
import eventDemo.app.notification.TheGameWasStartedNotification import eventDemo.app.notification.TheGameWasStartedNotification
import eventDemo.app.notification.WelcomeToTheGameNotification import eventDemo.app.notification.WelcomeToTheGameNotification
import eventDemo.configuration.appKoinModule import eventDemo.configuration.appKoinModule
import eventDemo.libs.fromFrameChannel
import eventDemo.libs.toObjectChannel
import eventDemo.shared.toFrame import eventDemo.shared.toFrame
import eventDemo.shared.toNotification import eventDemo.shared.toNotification
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
@@ -138,10 +140,10 @@ class GameStateTest :
playerNotificationListener.startListening(channelNotification2, player2) playerNotificationListener.startListening(channelNotification2, player2)
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {
commandHandler.handle(player1, channelCommand1, channelNotification1) commandHandler.handle(player1, toObjectChannel(channelCommand1), fromFrameChannel(channelNotification1))
} }
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {
commandHandler.handle(player2, channelCommand2, channelNotification2) commandHandler.handle(player2, toObjectChannel(channelCommand2), fromFrameChannel(channelNotification2))
} }
joinAll(player1Job, player2Job) joinAll(player1Job, player2Job)

View File

@@ -1,26 +1,15 @@
package eventDemo.app.query package eventDemo.app.query
import eventDemo.app.entity.GameId import eventDemo.configuration.defaultJsonSerializer
import eventDemo.configuration.GameIdSerializer
import eventDemo.configuration.UUIDSerializer
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.serialization.kotlinx.json.json import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.testing.ApplicationTestBuilder import io.ktor.server.testing.ApplicationTestBuilder
import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule
import java.util.UUID
fun ApplicationTestBuilder.httpClient(): HttpClient = fun ApplicationTestBuilder.httpClient(): HttpClient =
createClient { createClient {
install(ContentNegotiation) { install(ContentNegotiation) {
json( json(
Json { defaultJsonSerializer(),
serializersModule =
SerializersModule {
contextual(UUID::class) { UUIDSerializer }
contextual(GameId::class) { GameIdSerializer }
}
},
) )
} }
} }

View File

@@ -0,0 +1,55 @@
package eventDemo.libs
import eventDemo.libs.command.Command
import eventDemo.libs.command.CommandId
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.equals.shouldBeEqual
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import java.util.UUID
import kotlin.test.assertIs
@Serializable
data class CommandTest(
override val id: CommandId,
) : Command
class FrameChannelConverterTest :
FunSpec({
test("toObjectChannel") {
val uuid = "d737c631-76af-406e-bc29-f3e5b97226a5"
val id = CommandId(UUID.fromString(uuid))
val jsonCommand = """{"id":"$uuid"}"""
val channel = Channel<Frame>()
launch {
val commandChannel = toObjectChannel<CommandTest>(channel)
commandChannel.receive().id shouldBeEqual id
channel.close()
}
channel.send(Frame.Text(jsonCommand))
}
test("fromFrameChannel") {
val uuid = "d737c631-76af-406e-bc29-f3e5b97226a5"
val id = CommandId(UUID.fromString(uuid))
val command = CommandTest(id)
val jsonCommand = """{"id":"$uuid"}"""
val channel = Channel<Frame>()
launch {
val commandChannel = fromFrameChannel<CommandTest>(channel)
commandChannel.send(command)
commandChannel.close()
}
assertIs<Frame.Text>(channel.receive()).readText() shouldBeEqual jsonCommand
}
})

View File

@@ -1,12 +1,10 @@
package eventDemo.libs.command package eventDemo.libs.command
import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.FunSpec
import io.ktor.websocket.Frame
import io.mockk.mockk import io.mockk.mockk
import io.mockk.verify import io.mockk.verify
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
@Serializable @Serializable
class CommandTest( class CommandTest(
@@ -19,12 +17,9 @@ class CommandStreamChannelTest :
test("send and receive") { test("send and receive") {
val command = CommandTest(CommandId()) val command = CommandTest(CommandId())
val channel = Channel<Frame>() val channel = Channel<CommandTest>()
val stream = val stream =
CommandStreamChannel<CommandTest>( CommandStreamChannel(channel)
incoming = channel,
deserializer = { Json.decodeFromString(it) },
)
val spyCall: () -> Unit = mockk(relaxed = true) val spyCall: () -> Unit = mockk(relaxed = true)
@@ -32,7 +27,7 @@ class CommandStreamChannelTest :
println("In action ${it.id}") println("In action ${it.id}")
spyCall() spyCall()
} }
channel.send(Frame.Text(Json.encodeToString(command))) channel.send(command)
verify(exactly = 1) { spyCall() } verify(exactly = 1) { spyCall() }
} }
}) })