From 60288468284f3fa217e9e0684342b475ae02198f Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Mon, 10 Mar 2025 22:18:06 +0100 Subject: [PATCH] Improve concurrency Fix GameState.currentPlayerTurn and nextPlayer Add ItsTheTurnOfNotification Improve test --- .../app/command/GameCommandHandler.kt | 43 ++--- .../app/command/command/ICantPlayCommand.kt | 8 +- .../command/IWantToJoinTheGameCommand.kt | 4 +- .../command/command/IWantToPlayCardCommand.kt | 8 +- .../command/command/IamReadyToPlayCommand.kt | 4 +- .../app/event/event/CardIsPlayedEvent.kt | 5 +- .../app/event/event/PlayerActionEvent.kt | 7 + .../app/event/event/PlayerChoseColorEvent.kt | 5 +- .../app/event/event/PlayerHavePassEvent.kt | 5 +- .../app/event/projection/GameState.kt | 68 ++++--- .../app/event/projection/GameStateBuilder.kt | 44 +++-- .../event/projection/GameStateRepository.kt | 3 +- .../GameEventPlayerNotificationListener.kt | 171 ++++++++++-------- .../GameEventReactionListener.kt | 11 +- .../notification/ItsTheTurnOfNotification.kt | 13 ++ .../eventDemo/app/query/ReadTheGameState.kt | 2 +- .../libs/command/CommandStreamChannel.kt | 2 +- .../kotlin/eventDemo/libs/event/EventBus.kt | 2 +- .../eventDemo/libs/event/EventBusInMemory.kt | 12 +- .../eventDemo/libs/event/EventStream.kt | 2 +- .../libs/event/EventStreamInMemory.kt | 6 +- .../event/projection/GameStateBuilderTest.kt | 4 +- .../eventDemo/app/query/GameStateTest.kt | 151 ++++++++++------ 23 files changed, 344 insertions(+), 236 deletions(-) create mode 100644 src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt create mode 100644 src/main/kotlin/eventDemo/app/notification/ItsTheTurnOfNotification.kt diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt index f2ac542..2434371 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt @@ -15,7 +15,6 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.websocket.Frame import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.channels.trySendBlocking /** * Listen [GameCommand] on [GameCommandStream], check the validity and execute an action. @@ -33,39 +32,29 @@ class GameCommandHandler( */ suspend fun handle( player: Player, - incoming: ReceiveChannel, - outgoing: SendChannel, - ) { - val commandStream = GameCommandStream(incoming) - val playerErrorNotifier: (String) -> Unit = { + incomingCommandChannel: ReceiveChannel, + outgoingErrorChannelNotification: SendChannel, + ) = GameCommandStream(incomingCommandChannel).process { command -> + if (command.payload.player.id != player.id) { + nack() + } + + val playerErrorNotifier: suspend (String) -> Unit = { val notification = ErrorNotification(message = it) - logger.atInfo { + logger.atWarn { message = "Notification send ERROR: ${notification.message}" payload = mapOf("notification" to notification) } - outgoing.trySendBlocking(notification.toFrame()) + outgoingErrorChannelNotification.send(notification.toFrame()) } - return init(player, commandStream, playerErrorNotifier) - } - private suspend fun init( - player: Player, - commandStream: GameCommandStream, - playerErrorNotifier: (String) -> Unit, - ) { - commandStream.process { command -> - if (command.payload.player.id != player.id) { - nack() - } + val gameState = gameStateRepository.get(command.payload.gameId) - val gameState = gameStateRepository.get(command.payload.gameId) - - when (command) { - is IWantToPlayCardCommand -> command.run(gameState, playerErrorNotifier, eventHandler) - is IamReadyToPlayCommand -> command.run(gameState, playerErrorNotifier, eventHandler) - is IWantToJoinTheGameCommand -> command.run(gameState, playerErrorNotifier, eventHandler) - is ICantPlayCommand -> command.run(gameState, playerErrorNotifier, eventHandler) - } + when (command) { + is IWantToPlayCardCommand -> command.run(gameState, playerErrorNotifier, eventHandler) + is IamReadyToPlayCommand -> command.run(gameState, playerErrorNotifier, eventHandler) + is IWantToJoinTheGameCommand -> command.run(gameState, playerErrorNotifier, eventHandler) + is ICantPlayCommand -> command.run(gameState, playerErrorNotifier, eventHandler) } } } diff --git a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt index cfc8590..1150ca0 100644 --- a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt @@ -23,11 +23,15 @@ data class ICantPlayCommand( override val player: Player, ) : GameCommand.Payload - fun run( + suspend fun run( state: GameState, - playerErrorNotifier: (String) -> Unit, + playerErrorNotifier: suspend (String) -> Unit, eventHandler: GameEventHandler, ) { + if (state.currentPlayerTurn != payload.player) { + playerErrorNotifier("Its not your turn!") + return + } val playableCards = state.playableCards(payload.player) if (playableCards.isEmpty()) { val takenCard = state.deck.stack.first() diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt index bcd4695..fe974ff 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt @@ -24,9 +24,9 @@ data class IWantToJoinTheGameCommand( override val player: Player, ) : GameCommand.Payload - fun run( + suspend fun run( state: GameState, - playerErrorNotifier: (String) -> Unit, + playerErrorNotifier: suspend (String) -> Unit, eventHandler: GameEventHandler, ) { val logger = KotlinLogging.logger {} diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt index 2b2e664..a728369 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt @@ -25,15 +25,19 @@ data class IWantToPlayCardCommand( val card: Card, ) : GameCommand.Payload - fun run( + suspend fun run( state: GameState, - playerErrorNotifier: (String) -> Unit, + playerErrorNotifier: suspend (String) -> Unit, eventHandler: GameEventHandler, ) { if (!state.isStarted) { playerErrorNotifier("The game is Not started") return } + if (state.currentPlayerTurn != payload.player) { + playerErrorNotifier("Its not your turn!") + return + } if (state.canBePlayThisCard(payload.player, payload.card)) { eventHandler.handle( diff --git a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt index 0883032..e8b0dd8 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt @@ -23,9 +23,9 @@ data class IamReadyToPlayCommand( override val player: Player, ) : GameCommand.Payload - fun run( + suspend fun run( state: GameState, - playerErrorNotifier: (String) -> Unit, + playerErrorNotifier: suspend (String) -> Unit, eventHandler: GameEventHandler, ) { val playerExist: Boolean = state.players.contains(payload.player) diff --git a/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt b/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt index dc79777..6b775c5 100644 --- a/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/CardIsPlayedEvent.kt @@ -11,6 +11,7 @@ import java.util.UUID data class CardIsPlayedEvent( override val gameId: GameId, val card: Card, - val player: Player, + override val player: Player, override val eventId: UUID = UUID.randomUUID(), -) : GameEvent +) : GameEvent, + PlayerActionEvent diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt new file mode 100644 index 0000000..8c993d6 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerActionEvent.kt @@ -0,0 +1,7 @@ +package eventDemo.app.event.event + +import eventDemo.app.entity.Player + +sealed interface PlayerActionEvent { + val player: Player +} diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt index 5efd879..b192bf3 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerChoseColorEvent.kt @@ -10,8 +10,9 @@ import java.util.UUID */ data class PlayerChoseColorEvent( override val gameId: GameId, - val player: Player, + override val player: Player, val color: Card.Color, -) : GameEvent { +) : GameEvent, + PlayerActionEvent { override val eventId: UUID = UUID.randomUUID() } diff --git a/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt b/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt index b854eb9..1396170 100644 --- a/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt +++ b/src/main/kotlin/eventDemo/app/event/event/PlayerHavePassEvent.kt @@ -10,8 +10,9 @@ import java.util.UUID */ data class PlayerHavePassEvent( override val gameId: GameId, - val player: Player, + override val player: Player, val takenCard: Card, -) : GameEvent { +) : GameEvent, + PlayerActionEvent { override val eventId: UUID = UUID.randomUUID() } diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameState.kt b/src/main/kotlin/eventDemo/app/event/projection/GameState.kt index 070f5f6..b0f2d33 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameState.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameState.kt @@ -10,9 +10,9 @@ import kotlinx.serialization.Serializable data class GameState( val gameId: GameId, val players: Set = emptySet(), - val lastPlayer: Player? = null, - val lastCard: LastCard? = null, - val lastColor: Card.Color? = null, + val currentPlayerTurn: Player? = null, + val cardOnCurrentStack: LastCard? = null, + val colorOnCurrentStack: Card.Color? = null, val direction: Direction = Direction.CLOCKWISE, val readyPlayers: Set = emptySet(), val deck: Deck = Deck(players), @@ -42,8 +42,8 @@ data class GameState( return players.size == readyPlayers.size && players.all { readyPlayers.contains(it) } } - private val lastPlayerIndex: Int? get() { - val i = players.indexOf(lastPlayer) + private val currentPlayerIndex: Int? get() { + val i = players.indexOf(currentPlayerTurn) return if (i == -1) { null } else { @@ -51,28 +51,42 @@ data class GameState( } } - private val nextPlayerIndex: Int get() { - if (players.size == 0) return 0 + private fun nextPlayerIndex(direction: Direction): Int { + if (players.isEmpty()) return 0 - val y = - if (direction == Direction.CLOCKWISE) { - +1 - } else { - -1 - } - - return ((lastPlayerIndex ?: 0) + y) % players.size - } - - val nextPlayer: Player? by lazy { - if (players.isEmpty()) { - null + return if (direction == Direction.CLOCKWISE) { + sidePlayerIndexClockwise } else { - players.elementAt(nextPlayerIndex) + sidePlayerIndexCounterClockwise } } - val Player.currentIndex: Int get() = players.indexOf(this) + fun nextPlayer(direction: Direction): Player = players.elementAt(nextPlayerIndex(direction)) + + private val sidePlayerIndexClockwise: Int by lazy { + if (players.isEmpty()) { + 0 + } else { + ((currentPlayerIndex ?: 0) + 1) % players.size + } + } + private val sidePlayerIndexCounterClockwise: Int by lazy { + if (players.isEmpty()) { + 0 + } else { + ((currentPlayerIndex ?: 0) - 1) % players.size + } + } + + val nextPlayerTurn: Player? by lazy { + if (players.isEmpty()) { + null + } else { + nextPlayer(direction) + } + } + + private val Player.currentIndex: Int get() = players.indexOf(this) fun Player.playerDiffIndex(nextPlayer: Player): Int = if (direction == Direction.CLOCKWISE) { @@ -82,8 +96,8 @@ data class GameState( }.let { it % players.size } val Player.cardOnBoardIsForYou: Boolean get() { - if (lastCard == null) error("No card") - return this.playerDiffIndex(lastCard.player) == 1 + if (cardOnCurrentStack == null) error("No card") + return this.playerDiffIndex(cardOnCurrentStack.player) == 1 } fun playableCards(player: Player): List = @@ -102,7 +116,7 @@ data class GameState( player: Player, card: Card, ): Boolean { - val cardOnBoard = lastCard?.card ?: return false + val cardOnBoard = cardOnCurrentStack?.card ?: return false return when (cardOnBoard) { is Card.NumericCard -> { when (card) { @@ -134,7 +148,7 @@ data class GameState( is Card.ChangeColorCard -> { when (card) { is Card.AllColorCard -> true - is Card.ColorCard -> card.color == lastColor + is Card.ColorCard -> card.color == colorOnCurrentStack } } @@ -156,7 +170,7 @@ data class GameState( } else { when (card) { is Card.AllColorCard -> true - is Card.ColorCard -> card.color == lastColor + is Card.ColorCard -> card.color == colorOnCurrentStack } } } diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt index 2680761..7dcb532 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateBuilder.kt @@ -7,18 +7,22 @@ import eventDemo.app.event.event.CardIsPlayedEvent import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameStartedEvent import eventDemo.app.event.event.NewPlayerEvent +import eventDemo.app.event.event.PlayerActionEvent import eventDemo.app.event.event.PlayerChoseColorEvent import eventDemo.app.event.event.PlayerHavePassEvent import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.PlayerWinEvent +import io.github.oshai.kotlinlogging.KotlinLogging fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState { val events = eventStream.readAll(this) if (events.isEmpty()) return GameState(this) - return events.buildStateFromEvents() + return events.buildStateFromEvents().also { + KotlinLogging.logger {}.warn { "state is build from scratch for game: $this " } + } } -fun List.buildStateFromEvents(): GameState { +fun Collection.buildStateFromEvents(): GameState { val gameId = this.firstOrNull()?.gameId ?: error("Cannot build GameState from an empty list") return fold(GameState(gameId)) { state, event -> state.apply(event) @@ -27,9 +31,14 @@ fun List.buildStateFromEvents(): GameState { fun GameState.apply(event: GameEvent): GameState = let { state -> + if (event is PlayerActionEvent) { + if (state.currentPlayerTurn != event.player) { + error("inconsistent player turn. currentPlayerTurn: $currentPlayerTurn | player: ${event.player}") + } + } when (event) { is CardIsPlayedEvent -> { - val direction = + val nextDirectionAfterPlay = when (event.card) { is Card.ReverseCard -> state.direction.revert() else -> state.direction @@ -38,14 +47,21 @@ fun GameState.apply(event: GameEvent): GameState = val color = when (event.card) { is Card.ColorCard -> event.card.color - else -> state.lastColor + is Card.AllColorCard -> null + } + + val currentPlayerAfterThePlay = + if (event.card is Card.AllColorCard) { + currentPlayerTurn + } else { + nextPlayer(nextDirectionAfterPlay) } state.copy( - lastPlayer = event.player, - direction = direction, - lastColor = color, - lastCard = GameState.LastCard(event.card, event.player), + currentPlayerTurn = currentPlayerAfterThePlay, + direction = nextDirectionAfterPlay, + colorOnCurrentStack = color, + cardOnCurrentStack = GameState.LastCard(event.card, event.player), deck = state.deck.putOneCardFromHand(event.player, event.card), ) } @@ -59,6 +75,7 @@ fun GameState.apply(event: GameEvent): GameState = } is PlayerReadyEvent -> { + if (state.isStarted) error("The game is already started") state.copy( readyPlayers = state.readyPlayers + event.player, ) @@ -67,22 +84,23 @@ fun GameState.apply(event: GameEvent): GameState = is PlayerHavePassEvent -> { if (event.takenCard != state.deck.stack.first()) error("taken card is not ot top of the stack") state.copy( - lastPlayer = event.player, + currentPlayerTurn = nextPlayerTurn, deck = state.deck.takeOneCardFromStackTo(event.player), ) } is PlayerChoseColorEvent -> { state.copy( - lastColor = event.color, + currentPlayerTurn = nextPlayerTurn, + colorOnCurrentStack = event.color, ) } is GameStartedEvent -> { state.copy( - lastColor = (event.deck.discard.first() as? Card.ColorCard)?.color ?: state.lastColor, - lastCard = GameState.LastCard(event.deck.discard.first(), event.firstPlayer), - lastPlayer = event.firstPlayer, + colorOnCurrentStack = (event.deck.discard.first() as? Card.ColorCard)?.color ?: state.colorOnCurrentStack, + cardOnCurrentStack = GameState.LastCard(event.deck.discard.first(), event.firstPlayer), + currentPlayerTurn = event.firstPlayer, deck = event.deck, isStarted = true, ) diff --git a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt index fc111b5..e262e07 100644 --- a/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt +++ b/src/main/kotlin/eventDemo/app/event/projection/GameStateRepository.kt @@ -22,8 +22,7 @@ class GameStateRepository( val projection = projections[event.gameId] if (projection == null) { event - .gameId - .buildStateFromEventStream(eventStream) + .buildStateFromEventStreamTo(eventStream) .update() } else { projection diff --git a/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt b/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt index b4f43e0..2808d40 100644 --- a/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt +++ b/src/main/kotlin/eventDemo/app/eventListener/GameEventPlayerNotificationListener.kt @@ -1,5 +1,6 @@ package eventDemo.app.eventListener +import eventDemo.app.entity.Card import eventDemo.app.entity.Player import eventDemo.app.event.GameEventBus import eventDemo.app.event.event.CardIsPlayedEvent @@ -11,6 +12,8 @@ import eventDemo.app.event.event.PlayerHavePassEvent import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.PlayerWinEvent import eventDemo.app.event.projection.GameStateRepository +import eventDemo.app.notification.ItsTheTurnOfNotification +import eventDemo.app.notification.Notification import eventDemo.app.notification.PlayerAsJoinTheGameNotification import eventDemo.app.notification.PlayerAsPlayACardNotification import eventDemo.app.notification.PlayerHavePassNotification @@ -33,100 +36,110 @@ class GameEventPlayerNotificationListener( private val logger = KotlinLogging.logger {} fun startListening( - outgoing: SendChannel, + outgoingNotificationChannel: SendChannel, currentPlayer: Player, ) { eventBus.subscribe { event: GameEvent -> val currentState = gameStateRepository.getUntil(event) - val notification = - when (event) { - is NewPlayerEvent -> { - if (currentPlayer != event.player) { - PlayerAsJoinTheGameNotification( - player = event.player, - ) - } else { - WelcomeToTheGameNotification( - players = currentState.players, - ) - } - } - is CardIsPlayedEvent -> { - if (currentPlayer != event.player) { - PlayerAsPlayACardNotification( - player = event.player, - card = event.card, - ) - } else { - null - } + fun Notification.send() { + if (currentState.players.contains(currentPlayer)) { + // Only notify players who have already joined the game. + outgoingNotificationChannel.trySendBlocking(toFrame()) + logger.atInfo { + message = "Notification for player ${currentPlayer.name} was SEND: ${this@send}" + payload = mapOf("notification" to this@send, "event" to event) } - - is GameStartedEvent -> { - TheGameWasStartedNotification( - hand = - event.deck.playersHands.getHand(currentPlayer) - ?: error("You are not in the game"), - ) + } else { + // Rare use case, when a connexion is created with the channel, + // but the player was not already join in the game + logger.atWarn { + message = "Notification for player ${currentPlayer.name} was SKIP, No player on the game: ${this@send}" + payload = mapOf("notification" to this@send, "event" to event) } + } + } - is PlayerChoseColorEvent -> { - if (currentPlayer != event.player) { - PlayerWasChoseTheCardColorNotification( - player = event.player, - color = event.color, - ) - } else { - null - } - } + fun sendNextTurnNotif() = + ItsTheTurnOfNotification( + player = currentState.currentPlayerTurn ?: error("No player turn defined"), + ).send() - is PlayerHavePassEvent -> { - if (currentPlayer == event.player) { - YourNewCardNotification( - card = event.takenCard, - ) - } else { - PlayerHavePassNotification( - player = event.player, - ) - } - } - - is PlayerReadyEvent -> { - if (currentPlayer != event.player) { - PlayerWasReadyNotification( - player = event.player, - ) - } else { - null - } - } - - is PlayerWinEvent -> { - PlayerWinNotification( + when (event) { + is NewPlayerEvent -> { + if (currentPlayer != event.player) { + PlayerAsJoinTheGameNotification( player = event.player, - ) + ).send() + } else { + WelcomeToTheGameNotification( + players = currentState.players, + ).send() } } - if (notification == null) { - logger.atInfo { - message = "Notification Ignore: $event" - payload = mapOf("event" to event) + is CardIsPlayedEvent -> { + if (currentPlayer != event.player) { + PlayerAsPlayACardNotification( + player = event.player, + card = event.card, + ).send() + } + + if (event.card !is Card.AllColorCard) { + ItsTheTurnOfNotification( + player = currentState.currentPlayerTurn ?: error("No player turn defined"), + ).send() + } } - } else if (currentState.players.contains(currentPlayer)) { - // Only notify players who have already joined the game. - outgoing.trySendBlocking(notification.toFrame()) - logger.atInfo { - message = "Notification SEND: $notification" - payload = mapOf("notification" to notification, "event" to event) + + is GameStartedEvent -> { + TheGameWasStartedNotification( + hand = + event.deck.playersHands.getHand(currentPlayer) + ?: error("You are not in the game"), + ).send() + + sendNextTurnNotif() } - } else { - logger.atInfo { - message = "Notification SKIP: $notification" - payload = mapOf("notification" to notification, "event" to event) + + is PlayerChoseColorEvent -> { + if (currentPlayer != event.player) { + PlayerWasChoseTheCardColorNotification( + player = event.player, + color = event.color, + ).send() + } + + sendNextTurnNotif() + } + + is PlayerHavePassEvent -> { + if (currentPlayer == event.player) { + YourNewCardNotification( + card = event.takenCard, + ).send() + } else { + PlayerHavePassNotification( + player = event.player, + ).send() + } + + sendNextTurnNotif() + } + + is PlayerReadyEvent -> { + if (currentPlayer != event.player) { + PlayerWasReadyNotification( + player = event.player, + ).send() + } + } + + is PlayerWinEvent -> { + PlayerWinNotification( + player = event.player, + ).send() } } } diff --git a/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt b/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt index 4f0118e..9443dbe 100644 --- a/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt +++ b/src/main/kotlin/eventDemo/app/eventListener/GameEventReactionListener.kt @@ -4,6 +4,7 @@ import eventDemo.app.event.GameEventBus import eventDemo.app.event.GameEventHandler import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameStartedEvent +import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.PlayerWinEvent import eventDemo.app.event.projection.GameState import eventDemo.app.event.projection.GameStateRepository @@ -29,7 +30,7 @@ class GameEventReactionListener( } } - private fun sendStartGameEvent( + private suspend fun sendStartGameEvent( state: GameState, event: GameEvent, ) { @@ -40,7 +41,7 @@ class GameEventReactionListener( state.players, ) logger.atInfo { - message = "Event Send on reaction of: $event" + message = "Reaction event was Send $reactionEvent on reaction of: $event" payload = mapOf( "event" to event, @@ -48,6 +49,10 @@ class GameEventReactionListener( ) } eventHandler.handle(reactionEvent) + } else { + if (event is PlayerReadyEvent) { + logger.info { "All players was not ready ${state.readyPlayers}" } + } } } @@ -63,7 +68,7 @@ class GameEventReactionListener( winner, ) logger.atInfo { - message = "Event Send on reaction of: $event" + message = "Reaction event was Send $reactionEvent on reaction of: $event" payload = mapOf( "event" to event, diff --git a/src/main/kotlin/eventDemo/app/notification/ItsTheTurnOfNotification.kt b/src/main/kotlin/eventDemo/app/notification/ItsTheTurnOfNotification.kt new file mode 100644 index 0000000..f3043d8 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/notification/ItsTheTurnOfNotification.kt @@ -0,0 +1,13 @@ +package eventDemo.app.notification + +import eventDemo.app.entity.Player +import eventDemo.shared.UUIDSerializer +import kotlinx.serialization.Serializable +import java.util.UUID + +@Serializable +data class ItsTheTurnOfNotification( + @Serializable(with = UUIDSerializer::class) + override val id: UUID = UUID.randomUUID(), + val player: Player, +) : Notification diff --git a/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt b/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt index a361d80..451f19f 100644 --- a/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt +++ b/src/main/kotlin/eventDemo/app/query/ReadTheGameState.kt @@ -40,7 +40,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) { get { body -> gameStateRepository .get(body.game.id) - .lastCard + .cardOnCurrentStack ?.card ?.let { call.respond(it) } ?: call.response.status(HttpStatusCode.BadRequest) diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt index 821256a..bb31b57 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt @@ -49,7 +49,7 @@ class CommandStreamChannel( val actionResult = runCatching { status.action(command) } if (actionResult.isFailure) { logger.atInfo { - message = "Error on compute the Command" + message = "Error on compute the Command: $command" payload = mapOf("command" to command) cause = actionResult.exceptionOrNull() } diff --git a/src/main/kotlin/eventDemo/libs/event/EventBus.kt b/src/main/kotlin/eventDemo/libs/event/EventBus.kt index 78ead60..c89ceda 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventBus.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventBus.kt @@ -8,6 +8,6 @@ interface EventBus, ID : AggregateId> { */ fun subscribe( priority: Int = 0, - block: (E) -> Unit, + block: suspend (E) -> Unit, ) } diff --git a/src/main/kotlin/eventDemo/libs/event/EventBusInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventBusInMemory.kt index c6ceece..130f3ae 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventBusInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventBusInMemory.kt @@ -1,19 +1,23 @@ package eventDemo.libs.event +import kotlinx.coroutines.runBlocking + class EventBusInMemory, ID : AggregateId> : EventBus { - private val subscribers: MutableList Unit>> = mutableListOf() + private val subscribers: MutableList Unit>> = mutableListOf() override fun publish(event: E) { subscribers - .sortedByDescending { (priority, block) -> priority } + .sortedByDescending { (priority, _) -> priority } .forEach { (_, block) -> - block(event) + runBlocking { + block(event) + } } } override fun subscribe( priority: Int, - block: (E) -> Unit, + block: suspend (E) -> Unit, ) { subscribers.add(priority to block) } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStream.kt b/src/main/kotlin/eventDemo/libs/event/EventStream.kt index 25b9c6a..897baad 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStream.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStream.kt @@ -22,5 +22,5 @@ interface EventStream, ID : AggregateId> { ): R? /** Reads all events associated with a given aggregate ID */ - fun readAll(aggregateId: ID): List + fun readAll(aggregateId: ID): Set } diff --git a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt index ac1754c..7edc751 100644 --- a/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt +++ b/src/main/kotlin/eventDemo/libs/event/EventStreamInMemory.kt @@ -1,6 +1,8 @@ package eventDemo.libs.event import io.github.oshai.kotlinlogging.KotlinLogging +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue import kotlin.reflect.KClass /** @@ -10,7 +12,7 @@ import kotlin.reflect.KClass */ class EventStreamInMemory, ID : AggregateId> : EventStream { private val logger = KotlinLogging.logger {} - private val events: MutableList = mutableListOf() + private val events: Queue = ConcurrentLinkedQueue() override fun publish(event: E) { events.add(event) @@ -34,7 +36,7 @@ class EventStreamInMemory, ID : AggregateId> : EventStream .filterIsInstance(eventType.java) .lastOrNull { it.gameId == aggregateId } - override fun readAll(aggregateId: ID): List = events + override fun readAll(aggregateId: ID): Set = events.toSet() } inline fun , ID : AggregateId> EventStream.readLastOf(aggregateId: ID): R? = diff --git a/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt b/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt index 816f2d0..38dbf35 100644 --- a/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt +++ b/src/test/kotlin/eventDemo/app/event/projection/GameStateBuilderTest.kt @@ -69,7 +69,7 @@ class GameStateBuilderTest : val event = CardIsPlayedEvent(gameId, playedCard, player1) apply(event).also { state -> state.gameId shouldBeEqual gameId - assertNotNull(state.lastCard).card shouldBeEqual playedCard + assertNotNull(state.cardOnCurrentStack).card shouldBeEqual playedCard assertIs(playedCard).let { it.number shouldBeEqual 0 it.color shouldBeEqual Card.Color.Red @@ -80,7 +80,7 @@ class GameStateBuilderTest : val event = CardIsPlayedEvent(gameId, playedCard, player2) apply(event).also { state -> state.gameId shouldBeEqual gameId - assertNotNull(state.lastCard).card shouldBeEqual playedCard + assertNotNull(state.cardOnCurrentStack).card shouldBeEqual playedCard assertIs(playedCard).let { it.number shouldBeEqual 7 it.color shouldBeEqual Card.Color.Red diff --git a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt index ca3c081..e107637 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt @@ -4,6 +4,7 @@ import eventDemo.app.command.GameCommandHandler import eventDemo.app.command.command.IWantToJoinTheGameCommand import eventDemo.app.command.command.IWantToPlayCardCommand import eventDemo.app.command.command.IamReadyToPlayCommand +import eventDemo.app.entity.Card import eventDemo.app.entity.GameId import eventDemo.app.entity.Player import eventDemo.app.event.GameEventStream @@ -12,6 +13,7 @@ import eventDemo.app.event.projection.GameState import eventDemo.app.event.projection.buildStateFromEventStream import eventDemo.app.eventListener.GameEventPlayerNotificationListener import eventDemo.app.eventListener.GameEventReactionListener +import eventDemo.app.notification.ItsTheTurnOfNotification import eventDemo.app.notification.PlayerAsJoinTheGameNotification import eventDemo.app.notification.PlayerAsPlayACardNotification import eventDemo.app.notification.PlayerWasReadyNotification @@ -29,6 +31,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import org.koin.dsl.koinApplication import kotlin.test.assertIs @@ -43,75 +46,105 @@ class GameStateTest : val id = GameId() val player1 = Player(name = "Nikola") val player2 = Player(name = "Einstein") - val channelIn1 = Channel() - val channelIn2 = Channel() - val channelOut1 = Channel(Channel.BUFFERED) - val channelOut2 = Channel(Channel.BUFFERED) + val channelCommand1 = Channel(Channel.BUFFERED) + val channelCommand2 = Channel(Channel.BUFFERED) + val channelNotification1 = Channel(Channel.BUFFERED) + val channelNotification2 = Channel(Channel.BUFFERED) + + var playedCard1: Card? = null + var playedCard2: Card? = null + + val player1Job = + launch { + channelCommand1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).toFrame()) + channelNotification1.receive().toNotification().let { + assertIs(it).players shouldBeEqual setOf(player1) + } + channelNotification1.receive().toNotification().let { + assertIs(it).player shouldBeEqual player2 + } + channelCommand1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).toFrame()) + channelNotification1.receive().toNotification().let { + assertIs(it).player shouldBeEqual player2 + } + val player1Hand = + channelNotification1.receive().toNotification().let { + assertIs(it).hand shouldHaveSize 7 + } + playedCard1 = player1Hand.first() + channelNotification1.receive().toNotification().let { + assertIs(it).apply { + player shouldBeEqual player1 + } + } + channelCommand1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).toFrame()) + + channelNotification1.receive().toNotification().let { + assertIs(it).apply { + player shouldBeEqual player2 + } + } + + channelNotification1.receive().toNotification().let { + assertIs(it).apply { + player shouldBeEqual player2 + card shouldBeEqual assertNotNull(playedCard2) + } + } + } + + val player2Job = + launch { + delay(100) + channelCommand2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).toFrame()) + channelNotification2.receive().toNotification().let { + assertIs(it).players shouldBeEqual setOf(player1, player2) + } + channelNotification2.receive().toNotification().let { + assertIs(it).player shouldBeEqual player1 + } + channelCommand2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).toFrame()) + val player2Hand = + channelNotification2.receive().toNotification().let { + assertIs(it).hand shouldHaveSize 7 + } + channelNotification2.receive().toNotification().let { + assertIs(it).apply { + player shouldBeEqual player1 + } + } + channelNotification2.receive().toNotification().let { + assertIs(it).apply { + player shouldBeEqual player1 + card shouldBeEqual assertNotNull(playedCard1) + } + } + playedCard2 = player2Hand.first() + + channelNotification2.receive().toNotification().let { + assertIs(it).apply { + player shouldBeEqual player2 + } + } + channelCommand2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).toFrame()) + } koinApplication { modules(appKoinModule) }.koin.apply { val commandHandler by inject() - val playerNotificationListener by inject() val eventStream by inject() + val playerNotificationListener by inject() GameEventReactionListener(get(), get(), get()).init() - playerNotificationListener.startListening(channelOut1, player1) - playerNotificationListener.startListening(channelOut2, player2) + playerNotificationListener.startListening(channelNotification1, player1) + playerNotificationListener.startListening(channelNotification2, player2) GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player1, channelIn1, channelOut1) + commandHandler.handle(player1, channelCommand1, channelNotification1) } GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player2, channelIn2, channelOut2) + commandHandler.handle(player2, channelCommand2, channelNotification2) } - channelIn1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).toFrame()) - delay(50) - channelIn2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).toFrame()) - delay(50) - channelOut1.receive().toNotification().let { - assertIs(it).players shouldBeEqual setOf(player1) - } - - channelOut2.receive().toNotification().let { - assertIs(it).players shouldBeEqual setOf(player1, player2) - } - channelOut1.receive().toNotification().let { - assertIs(it).player shouldBeEqual player2 - } - - channelIn1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).toFrame()) - delay(50) - channelIn2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).toFrame()) - delay(50) - - channelOut1.receive().toNotification().let { - assertIs(it).player shouldBeEqual player2 - } - channelOut2.receive().toNotification().let { - assertIs(it).player shouldBeEqual player1 - } - - val player1Hand = - channelOut1.receive().toNotification().let { - assertIs(it).hand shouldHaveSize 7 - } - val player2Hand = - channelOut2.receive().toNotification().let { - assertIs(it).hand shouldHaveSize 7 - } - - channelIn1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).toFrame()) - delay(50) - channelOut2.receive().toNotification().let { - assertIs(it).player shouldBeEqual player1 - assertIs(it).card shouldBeEqual player1Hand.first() - } - - channelIn2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).toFrame()) - delay(50) - channelOut1.receive().toNotification().let { - assertIs(it).player shouldBeEqual player2 - assertIs(it).card shouldBeEqual player2Hand.first() - } + joinAll(player1Job, player2Job) val state = id.buildStateFromEventStream(eventStream) @@ -120,7 +153,7 @@ class GameStateTest : state.players shouldBeEqual setOf(player1, player2) state.readyPlayers shouldBeEqual setOf(player1, player2) state.direction shouldBeEqual GameState.Direction.CLOCKWISE - assertNotNull(state.lastCard) shouldBeEqual GameState.LastCard(player2Hand.first(), player2) + assertNotNull(state.cardOnCurrentStack) shouldBeEqual GameState.LastCard(assertNotNull(playedCard2), player2) } } })