diff --git a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/GameEventBusInMemory.kt b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/GameEventBusInMemory.kt index 49ab033..814a3a1 100644 --- a/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/GameEventBusInMemory.kt +++ b/src/main/kotlin/eventDemo/adapter/infrastructureLayer/event/GameEventBusInMemory.kt @@ -7,5 +7,5 @@ import eventDemo.libs.event.EventBus import eventDemo.libs.event.EventBusInMemory class GameEventBusInMemory : - GameEventBus, - EventBus by EventBusInMemory() + GameEventBus(), + EventBus by EventBusInMemory() diff --git a/src/main/kotlin/eventDemo/adapter/interfaceLayer/GameCommandRouteWebSocket.kt b/src/main/kotlin/eventDemo/adapter/interfaceLayer/GameCommandRouteWebSocket.kt index be3899c..4e5b683 100644 --- a/src/main/kotlin/eventDemo/adapter/interfaceLayer/GameCommandRouteWebSocket.kt +++ b/src/main/kotlin/eventDemo/adapter/interfaceLayer/GameCommandRouteWebSocket.kt @@ -1,12 +1,16 @@ package eventDemo.adapter.interfaceLayer import eventDemo.business.command.GameCommandHandler +import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.eventListener.PlayerNotificationEventListener import eventDemo.business.notification.Notification +import eventDemo.configuration.ktor.BadRequestException +import eventDemo.configuration.ktor.HttpErrorBadRequest import eventDemo.libs.fromFrameChannel import eventDemo.libs.toObjectChannel import io.github.oshai.kotlinlogging.withLoggingContext +import io.ktor.http.parameters import io.ktor.server.application.ApplicationCall import io.ktor.server.auth.authenticate import io.ktor.server.auth.jwt.JWTPrincipal @@ -18,6 +22,7 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.launch +import java.util.UUID @DelicateCoroutinesApi fun Route.gameWebSocket( @@ -25,18 +30,27 @@ fun Route.gameWebSocket( commandHandler: GameCommandHandler, ) { authenticate { - webSocket("/game") { + webSocket("/game/{id}") { val currentPlayer = call.getPlayer() + val gameId = + call.parameters["id"]?.let { GameId(UUID.fromString(it)) } + ?: throw BadRequestException(HttpErrorBadRequest("No ID fore the game")) val outgoingFrameChannel: SendChannel = fromFrameChannel(outgoing) withLoggingContext("currentPlayer" to currentPlayer.toString()) { GlobalScope.launch { commandHandler.handle( currentPlayer, + gameId, toObjectChannel(incoming), outgoingFrameChannel, ) } - playerNotificationListener.startListening({ outgoingFrameChannel.trySendBlocking(it) }, currentPlayer) + + playerNotificationListener.startListening( + { outgoingFrameChannel.trySendBlocking(it) }, + currentPlayer, + gameId, + ) } } } diff --git a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt index 4ae4b15..9449112 100644 --- a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt @@ -1,6 +1,7 @@ package eventDemo.business.command import eventDemo.business.command.command.GameCommand +import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.GameEventBus import eventDemo.business.event.GameEventHandler @@ -55,27 +56,35 @@ class GameCommandHandler( */ suspend fun handle( player: Player, + gameId: GameId, incomingCommandChannel: ReceiveChannel, channelNotification: SendChannel, ) { commandStreamChannel.process(incomingCommandChannel) { command -> withLoggingContext("command" to command.toString()) { + if (command.payload.aggregateId.id != gameId.id) { + logger.warn { "Handle command Refuse, the gameId of the command is not the same" } + channelNotification.sendError(command)("The gameId in the command does not match with your game") + return@process + } + if (command.payload.player.id != player.id) { logger.warn { "Handle command Refuse, the player of the command is not the same" } channelNotification.sendError(command)("You are not the author of this command") - } else { - logger.info { "Handle command" } - try { - val eventBuilder = runner.run(command) + return@process + } - eventHandler.handle(command.payload.aggregateId) { version -> - eventBuilder(version) - .also { eventCommandMap.set(it.eventId, channelNotification, command.id) } - } - } catch (e: CommandException) { - logger.warn(e) { e.message } - channelNotification.sendError(command)(e.message) + logger.info { "Handle command" } + try { + val eventBuilder = runner.run(command) + + eventHandler.handle(command.payload.aggregateId) { version -> + eventBuilder(version) + .also { eventCommandMap.set(it.eventId, channelNotification, command.id) } } + } catch (e: CommandException) { + logger.warn(e) { e.message } + channelNotification.sendError(command)(e.message) } } } diff --git a/src/main/kotlin/eventDemo/business/event/GameEventBus.kt b/src/main/kotlin/eventDemo/business/event/GameEventBus.kt index a9e010e..6d6d4d0 100644 --- a/src/main/kotlin/eventDemo/business/event/GameEventBus.kt +++ b/src/main/kotlin/eventDemo/business/event/GameEventBus.kt @@ -3,5 +3,13 @@ package eventDemo.business.event import eventDemo.business.entity.GameId import eventDemo.business.event.event.GameEvent import eventDemo.libs.event.EventBus +import java.util.UUID -interface GameEventBus : EventBus +abstract class GameEventBus : + EventBus, + Comparable { + private val instanceId: UUID = UUID.randomUUID() + + override fun compareTo(other: GameEventBus): Int = + compareValues(instanceId, other.instanceId) +} diff --git a/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt b/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt index 1dd2f81..83f26e4 100644 --- a/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt +++ b/src/main/kotlin/eventDemo/business/event/event/GameStartedEvent.kt @@ -24,8 +24,8 @@ data class GameStartedEvent( fun new( id: GameId, players: Set, - shuffleIsDisabled: Boolean = isDisabled, version: Int, + shuffleIsDisabled: Boolean = isDisabled, ): GameStartedEvent = GameStartedEvent( aggregateId = id, diff --git a/src/main/kotlin/eventDemo/business/event/eventListener/PlayerNotificationEventListener.kt b/src/main/kotlin/eventDemo/business/event/eventListener/PlayerNotificationEventListener.kt index d76ebbf..a0b231c 100644 --- a/src/main/kotlin/eventDemo/business/event/eventListener/PlayerNotificationEventListener.kt +++ b/src/main/kotlin/eventDemo/business/event/eventListener/PlayerNotificationEventListener.kt @@ -1,6 +1,7 @@ package eventDemo.business.event.eventListener import eventDemo.business.entity.Card +import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.GameEventBus import eventDemo.business.event.event.CardIsPlayedEvent @@ -35,9 +36,14 @@ class PlayerNotificationEventListener( fun startListening( outgoingNotification: (Notification) -> Unit, currentPlayer: Player, + gameId: GameId, ) { eventBus.subscribe { event: GameEvent -> withLoggingContext("event" to event.toString()) { + if (event.aggregateId != gameId) { + return@subscribe + } + val currentState = gameStateRepository.getUntil(event) fun Notification.send() { diff --git a/src/main/kotlin/eventDemo/business/event/eventListener/ReactionEventListener.kt b/src/main/kotlin/eventDemo/business/event/eventListener/ReactionEventListener.kt index a564015..2e54069 100644 --- a/src/main/kotlin/eventDemo/business/event/eventListener/ReactionEventListener.kt +++ b/src/main/kotlin/eventDemo/business/event/eventListener/ReactionEventListener.kt @@ -10,6 +10,7 @@ import eventDemo.business.event.projection.GameState import eventDemo.business.event.projection.GameStateRepository import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.withLoggingContext +import java.util.concurrent.ConcurrentSkipListSet class ReactionEventListener( private val eventBus: GameEventBus, @@ -19,17 +20,22 @@ class ReactionEventListener( ) { companion object Config { const val DEFAULT_PRIORITY = -1000 + val registeredListeners = ConcurrentSkipListSet() } private val logger = KotlinLogging.logger { } fun init() { - eventBus.subscribe(priority) { event: GameEvent -> - withLoggingContext("event" to event.toString()) { - val state = gameStateRepository.getUntil(event) - sendStartGameEvent(state, event) - sendWinnerEvent(state) + if (registeredListeners.add(eventBus)) { + eventBus.subscribe(priority) { event: GameEvent -> + withLoggingContext("event" to event.toString()) { + val state = gameStateRepository.getUntil(event) + sendStartGameEvent(state, event) + sendWinnerEvent(state) + } } + } else { + logger.error { "${this::class.java.simpleName} is already init for this bus" } } } diff --git a/src/main/kotlin/eventDemo/configuration/ktor/ConfigureHttp.kt b/src/main/kotlin/eventDemo/configuration/ktor/ConfigureHttp.kt index 5d68bff..91c3c12 100644 --- a/src/main/kotlin/eventDemo/configuration/ktor/ConfigureHttp.kt +++ b/src/main/kotlin/eventDemo/configuration/ktor/ConfigureHttp.kt @@ -10,6 +10,7 @@ import io.ktor.server.plugins.cors.routing.CORS import io.ktor.server.plugins.statuspages.StatusPages import io.ktor.server.resources.Resources import io.ktor.server.response.respondText +import kotlinx.serialization.Serializable fun Application.configureHttpRouting() { install(CORS) { @@ -38,13 +39,14 @@ class BadRequestException( val httpError: HttpErrorBadRequest, ) : Exception() +@Serializable class HttpErrorBadRequest( - statusCode: HttpStatusCode, - val title: String = statusCode.description, - val invalidParams: List, + val title: String = HttpStatusCode.BadRequest.description, + val invalidParams: List = emptyList(), ) { - val statusCode: Int = statusCode.value + val statusCode: Int = HttpStatusCode.BadRequest.value + @Serializable data class InvalidParam( val name: String, val reason: String, diff --git a/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt index f65b1ea..3027947 100644 --- a/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt @@ -34,10 +34,19 @@ class GameCommandHandlerTest : val channelCommand = Channel(Channel.BUFFERED) val channelNotification = Channel(Channel.BUFFERED) ReactionEventListener(get(), get(), get()).init() - notificationListener.startListening({ channelNotification.trySendBlocking(it) }, player) + notificationListener.startListening( + { channelNotification.trySendBlocking(it) }, + player, + gameId, + ) GlobalScope.launch { - commandHandler.handle(player, channelCommand, channelNotification) + commandHandler.handle( + player, + gameId, + channelCommand, + channelNotification, + ) } IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand -> diff --git a/src/test/kotlin/eventDemo/app/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/app/query/GameSimulationTest.kt index bc749ca..85a7d7f 100644 --- a/src/test/kotlin/eventDemo/app/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameSimulationTest.kt @@ -48,7 +48,7 @@ class GameSimulationTest : test("Simulation of a game") { withTimeout(2.seconds) { disableShuffleDeck() - val id = GameId() + val gameId = GameId() val player1 = Player(name = "Nikola") val player2 = Player(name = "Einstein") val channelCommand1 = Channel(Channel.BUFFERED) @@ -61,7 +61,7 @@ class GameSimulationTest : val player1Job = launch { - IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).also { sendCommand -> + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player1)).also { sendCommand -> channelCommand1.send(sendCommand) channelNotification1.receive().let { assertIs(it).commandId shouldBeEqual sendCommand.id @@ -74,7 +74,7 @@ class GameSimulationTest : channelNotification1.receive().let { assertIs(it).player shouldBeEqual player2 } - IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).also { sendCommand -> + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player1)).also { sendCommand -> channelCommand1.send(sendCommand) channelNotification1.receive().let { assertIs(it).commandId shouldBeEqual sendCommand.id @@ -94,7 +94,7 @@ class GameSimulationTest : } } - IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).also { sendCommand -> + IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player1, player1Hand.first())).also { sendCommand -> channelCommand1.send(sendCommand) channelNotification1.receive().let { assertIs(it).commandId shouldBeEqual sendCommand.id @@ -118,7 +118,7 @@ class GameSimulationTest : val player2Job = launch { delay(100) - IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).also { sendCommand -> + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player2)).also { sendCommand -> channelCommand2.send(sendCommand) channelNotification2.receive().let { assertIs(it).commandId shouldBeEqual sendCommand.id @@ -132,7 +132,7 @@ class GameSimulationTest : assertIs(it).player shouldBeEqual player1 } - IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).also { sendCommand -> + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player2)).also { sendCommand -> channelCommand2.send(sendCommand) channelNotification2.receive().let { assertIs(it).commandId shouldBeEqual sendCommand.id @@ -162,7 +162,7 @@ class GameSimulationTest : } } - IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).also { sendCommand -> + IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player2, player2Hand.first())).also { sendCommand -> channelCommand2.send(sendCommand) channelNotification2.receive().let { assertIs(it).commandId shouldBeEqual sendCommand.id @@ -175,14 +175,14 @@ class GameSimulationTest : val eventStore by inject() val playerNotificationListener by inject() ReactionEventListener(get(), get(), get()).init() - playerNotificationListener.startListening({ channelNotification1.trySendBlocking(it) }, player1) - playerNotificationListener.startListening({ channelNotification2.trySendBlocking(it) }, player2) + playerNotificationListener.startListening({ channelNotification1.trySendBlocking(it) }, player1, gameId) + playerNotificationListener.startListening({ channelNotification2.trySendBlocking(it) }, player2, gameId) GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player1, channelCommand1, channelNotification1) + commandHandler.handle(player1, gameId, channelCommand1, channelNotification1) } GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player2, channelCommand2, channelNotification2) + commandHandler.handle(player2, gameId, channelCommand2, channelNotification2) } joinAll(player1Job, player2Job) @@ -192,9 +192,9 @@ class GameSimulationTest : eventStore = eventStore, initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, applyToProjection = GameState::apply, - ).getLast(id) + ).getLast(gameId) - state.aggregateId shouldBeEqual id + state.aggregateId shouldBeEqual gameId assertTrue(state.isStarted) state.players shouldBeEqual setOf(player1, player2) state.readyPlayers shouldBeEqual setOf(player1, player2) diff --git a/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt index 3299094..a73d468 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateRouteTest.kt @@ -79,8 +79,8 @@ class GameStateRouteTest : GameStartedEvent.new( gameId, setOf(player1, player2), - shuffleIsDisabled = true, it, + shuffleIsDisabled = true, ) } delay(100)