Improve concurrency

Fix GameState.currentPlayerTurn and nextPlayer
Add ItsTheTurnOfNotification
Improve test
This commit is contained in:
2025-03-10 22:18:06 +01:00
parent 3a685496fd
commit 6028846828
23 changed files with 344 additions and 236 deletions

View File

@@ -15,7 +15,6 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.websocket.Frame import io.ktor.websocket.Frame
import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.trySendBlocking
/** /**
* Listen [GameCommand] on [GameCommandStream], check the validity and execute an action. * Listen [GameCommand] on [GameCommandStream], check the validity and execute an action.
@@ -33,29 +32,20 @@ class GameCommandHandler(
*/ */
suspend fun handle( suspend fun handle(
player: Player, player: Player,
incoming: ReceiveChannel<Frame>, incomingCommandChannel: ReceiveChannel<Frame>,
outgoing: SendChannel<Frame>, outgoingErrorChannelNotification: SendChannel<Frame>,
) { ) = GameCommandStream(incomingCommandChannel).process { command ->
val commandStream = GameCommandStream(incoming) if (command.payload.player.id != player.id) {
val playerErrorNotifier: (String) -> Unit = { nack()
}
val playerErrorNotifier: suspend (String) -> Unit = {
val notification = ErrorNotification(message = it) val notification = ErrorNotification(message = it)
logger.atInfo { logger.atWarn {
message = "Notification send ERROR: ${notification.message}" message = "Notification send ERROR: ${notification.message}"
payload = mapOf("notification" to notification) payload = mapOf("notification" to notification)
} }
outgoing.trySendBlocking(notification.toFrame()) outgoingErrorChannelNotification.send(notification.toFrame())
}
return init(player, commandStream, playerErrorNotifier)
}
private suspend fun init(
player: Player,
commandStream: GameCommandStream,
playerErrorNotifier: (String) -> Unit,
) {
commandStream.process { command ->
if (command.payload.player.id != player.id) {
nack()
} }
val gameState = gameStateRepository.get(command.payload.gameId) val gameState = gameStateRepository.get(command.payload.gameId)
@@ -68,4 +58,3 @@ class GameCommandHandler(
} }
} }
} }
}

View File

@@ -23,11 +23,15 @@ data class ICantPlayCommand(
override val player: Player, override val player: Player,
) : GameCommand.Payload ) : GameCommand.Payload
fun run( suspend fun run(
state: GameState, state: GameState,
playerErrorNotifier: (String) -> Unit, playerErrorNotifier: suspend (String) -> Unit,
eventHandler: GameEventHandler, eventHandler: GameEventHandler,
) { ) {
if (state.currentPlayerTurn != payload.player) {
playerErrorNotifier("Its not your turn!")
return
}
val playableCards = state.playableCards(payload.player) val playableCards = state.playableCards(payload.player)
if (playableCards.isEmpty()) { if (playableCards.isEmpty()) {
val takenCard = state.deck.stack.first() val takenCard = state.deck.stack.first()

View File

@@ -24,9 +24,9 @@ data class IWantToJoinTheGameCommand(
override val player: Player, override val player: Player,
) : GameCommand.Payload ) : GameCommand.Payload
fun run( suspend fun run(
state: GameState, state: GameState,
playerErrorNotifier: (String) -> Unit, playerErrorNotifier: suspend (String) -> Unit,
eventHandler: GameEventHandler, eventHandler: GameEventHandler,
) { ) {
val logger = KotlinLogging.logger {} val logger = KotlinLogging.logger {}

View File

@@ -25,15 +25,19 @@ data class IWantToPlayCardCommand(
val card: Card, val card: Card,
) : GameCommand.Payload ) : GameCommand.Payload
fun run( suspend fun run(
state: GameState, state: GameState,
playerErrorNotifier: (String) -> Unit, playerErrorNotifier: suspend (String) -> Unit,
eventHandler: GameEventHandler, eventHandler: GameEventHandler,
) { ) {
if (!state.isStarted) { if (!state.isStarted) {
playerErrorNotifier("The game is Not started") playerErrorNotifier("The game is Not started")
return return
} }
if (state.currentPlayerTurn != payload.player) {
playerErrorNotifier("Its not your turn!")
return
}
if (state.canBePlayThisCard(payload.player, payload.card)) { if (state.canBePlayThisCard(payload.player, payload.card)) {
eventHandler.handle( eventHandler.handle(

View File

@@ -23,9 +23,9 @@ data class IamReadyToPlayCommand(
override val player: Player, override val player: Player,
) : GameCommand.Payload ) : GameCommand.Payload
fun run( suspend fun run(
state: GameState, state: GameState,
playerErrorNotifier: (String) -> Unit, playerErrorNotifier: suspend (String) -> Unit,
eventHandler: GameEventHandler, eventHandler: GameEventHandler,
) { ) {
val playerExist: Boolean = state.players.contains(payload.player) val playerExist: Boolean = state.players.contains(payload.player)

View File

@@ -11,6 +11,7 @@ import java.util.UUID
data class CardIsPlayedEvent( data class CardIsPlayedEvent(
override val gameId: GameId, override val gameId: GameId,
val card: Card, val card: Card,
val player: Player, override val player: Player,
override val eventId: UUID = UUID.randomUUID(), override val eventId: UUID = UUID.randomUUID(),
) : GameEvent ) : GameEvent,
PlayerActionEvent

View File

@@ -0,0 +1,7 @@
package eventDemo.app.event.event
import eventDemo.app.entity.Player
sealed interface PlayerActionEvent {
val player: Player
}

View File

@@ -10,8 +10,9 @@ import java.util.UUID
*/ */
data class PlayerChoseColorEvent( data class PlayerChoseColorEvent(
override val gameId: GameId, override val gameId: GameId,
val player: Player, override val player: Player,
val color: Card.Color, val color: Card.Color,
) : GameEvent { ) : GameEvent,
PlayerActionEvent {
override val eventId: UUID = UUID.randomUUID() override val eventId: UUID = UUID.randomUUID()
} }

View File

@@ -10,8 +10,9 @@ import java.util.UUID
*/ */
data class PlayerHavePassEvent( data class PlayerHavePassEvent(
override val gameId: GameId, override val gameId: GameId,
val player: Player, override val player: Player,
val takenCard: Card, val takenCard: Card,
) : GameEvent { ) : GameEvent,
PlayerActionEvent {
override val eventId: UUID = UUID.randomUUID() override val eventId: UUID = UUID.randomUUID()
} }

View File

@@ -10,9 +10,9 @@ import kotlinx.serialization.Serializable
data class GameState( data class GameState(
val gameId: GameId, val gameId: GameId,
val players: Set<Player> = emptySet(), val players: Set<Player> = emptySet(),
val lastPlayer: Player? = null, val currentPlayerTurn: Player? = null,
val lastCard: LastCard? = null, val cardOnCurrentStack: LastCard? = null,
val lastColor: Card.Color? = null, val colorOnCurrentStack: Card.Color? = null,
val direction: Direction = Direction.CLOCKWISE, val direction: Direction = Direction.CLOCKWISE,
val readyPlayers: Set<Player> = emptySet(), val readyPlayers: Set<Player> = emptySet(),
val deck: Deck = Deck(players), val deck: Deck = Deck(players),
@@ -42,8 +42,8 @@ data class GameState(
return players.size == readyPlayers.size && players.all { readyPlayers.contains(it) } return players.size == readyPlayers.size && players.all { readyPlayers.contains(it) }
} }
private val lastPlayerIndex: Int? get() { private val currentPlayerIndex: Int? get() {
val i = players.indexOf(lastPlayer) val i = players.indexOf(currentPlayerTurn)
return if (i == -1) { return if (i == -1) {
null null
} else { } else {
@@ -51,28 +51,42 @@ data class GameState(
} }
} }
private val nextPlayerIndex: Int get() { private fun nextPlayerIndex(direction: Direction): Int {
if (players.size == 0) return 0 if (players.isEmpty()) return 0
val y = return if (direction == Direction.CLOCKWISE) {
if (direction == Direction.CLOCKWISE) { sidePlayerIndexClockwise
+1
} else { } else {
-1 sidePlayerIndexCounterClockwise
}
} }
return ((lastPlayerIndex ?: 0) + y) % players.size fun nextPlayer(direction: Direction): Player = players.elementAt(nextPlayerIndex(direction))
private val sidePlayerIndexClockwise: Int by lazy {
if (players.isEmpty()) {
0
} else {
((currentPlayerIndex ?: 0) + 1) % players.size
}
}
private val sidePlayerIndexCounterClockwise: Int by lazy {
if (players.isEmpty()) {
0
} else {
((currentPlayerIndex ?: 0) - 1) % players.size
}
} }
val nextPlayer: Player? by lazy { val nextPlayerTurn: Player? by lazy {
if (players.isEmpty()) { if (players.isEmpty()) {
null null
} else { } else {
players.elementAt(nextPlayerIndex) nextPlayer(direction)
} }
} }
val Player.currentIndex: Int get() = players.indexOf(this) private val Player.currentIndex: Int get() = players.indexOf(this)
fun Player.playerDiffIndex(nextPlayer: Player): Int = fun Player.playerDiffIndex(nextPlayer: Player): Int =
if (direction == Direction.CLOCKWISE) { if (direction == Direction.CLOCKWISE) {
@@ -82,8 +96,8 @@ data class GameState(
}.let { it % players.size } }.let { it % players.size }
val Player.cardOnBoardIsForYou: Boolean get() { val Player.cardOnBoardIsForYou: Boolean get() {
if (lastCard == null) error("No card") if (cardOnCurrentStack == null) error("No card")
return this.playerDiffIndex(lastCard.player) == 1 return this.playerDiffIndex(cardOnCurrentStack.player) == 1
} }
fun playableCards(player: Player): List<Card> = fun playableCards(player: Player): List<Card> =
@@ -102,7 +116,7 @@ data class GameState(
player: Player, player: Player,
card: Card, card: Card,
): Boolean { ): Boolean {
val cardOnBoard = lastCard?.card ?: return false val cardOnBoard = cardOnCurrentStack?.card ?: return false
return when (cardOnBoard) { return when (cardOnBoard) {
is Card.NumericCard -> { is Card.NumericCard -> {
when (card) { when (card) {
@@ -134,7 +148,7 @@ data class GameState(
is Card.ChangeColorCard -> { is Card.ChangeColorCard -> {
when (card) { when (card) {
is Card.AllColorCard -> true is Card.AllColorCard -> true
is Card.ColorCard -> card.color == lastColor is Card.ColorCard -> card.color == colorOnCurrentStack
} }
} }
@@ -156,7 +170,7 @@ data class GameState(
} else { } else {
when (card) { when (card) {
is Card.AllColorCard -> true is Card.AllColorCard -> true
is Card.ColorCard -> card.color == lastColor is Card.ColorCard -> card.color == colorOnCurrentStack
} }
} }
} }

View File

@@ -7,18 +7,22 @@ import eventDemo.app.event.event.CardIsPlayedEvent
import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameEvent
import eventDemo.app.event.event.GameStartedEvent import eventDemo.app.event.event.GameStartedEvent
import eventDemo.app.event.event.NewPlayerEvent import eventDemo.app.event.event.NewPlayerEvent
import eventDemo.app.event.event.PlayerActionEvent
import eventDemo.app.event.event.PlayerChoseColorEvent import eventDemo.app.event.event.PlayerChoseColorEvent
import eventDemo.app.event.event.PlayerHavePassEvent import eventDemo.app.event.event.PlayerHavePassEvent
import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.PlayerReadyEvent
import eventDemo.app.event.event.PlayerWinEvent import eventDemo.app.event.event.PlayerWinEvent
import io.github.oshai.kotlinlogging.KotlinLogging
fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState { fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState {
val events = eventStream.readAll(this) val events = eventStream.readAll(this)
if (events.isEmpty()) return GameState(this) if (events.isEmpty()) return GameState(this)
return events.buildStateFromEvents() return events.buildStateFromEvents().also {
KotlinLogging.logger {}.warn { "state is build from scratch for game: $this " }
}
} }
fun List<GameEvent>.buildStateFromEvents(): GameState { fun Collection<GameEvent>.buildStateFromEvents(): GameState {
val gameId = this.firstOrNull()?.gameId ?: error("Cannot build GameState from an empty list") val gameId = this.firstOrNull()?.gameId ?: error("Cannot build GameState from an empty list")
return fold(GameState(gameId)) { state, event -> return fold(GameState(gameId)) { state, event ->
state.apply(event) state.apply(event)
@@ -27,9 +31,14 @@ fun List<GameEvent>.buildStateFromEvents(): GameState {
fun GameState.apply(event: GameEvent): GameState = fun GameState.apply(event: GameEvent): GameState =
let { state -> let { state ->
if (event is PlayerActionEvent) {
if (state.currentPlayerTurn != event.player) {
error("inconsistent player turn. currentPlayerTurn: $currentPlayerTurn | player: ${event.player}")
}
}
when (event) { when (event) {
is CardIsPlayedEvent -> { is CardIsPlayedEvent -> {
val direction = val nextDirectionAfterPlay =
when (event.card) { when (event.card) {
is Card.ReverseCard -> state.direction.revert() is Card.ReverseCard -> state.direction.revert()
else -> state.direction else -> state.direction
@@ -38,14 +47,21 @@ fun GameState.apply(event: GameEvent): GameState =
val color = val color =
when (event.card) { when (event.card) {
is Card.ColorCard -> event.card.color is Card.ColorCard -> event.card.color
else -> state.lastColor is Card.AllColorCard -> null
}
val currentPlayerAfterThePlay =
if (event.card is Card.AllColorCard) {
currentPlayerTurn
} else {
nextPlayer(nextDirectionAfterPlay)
} }
state.copy( state.copy(
lastPlayer = event.player, currentPlayerTurn = currentPlayerAfterThePlay,
direction = direction, direction = nextDirectionAfterPlay,
lastColor = color, colorOnCurrentStack = color,
lastCard = GameState.LastCard(event.card, event.player), cardOnCurrentStack = GameState.LastCard(event.card, event.player),
deck = state.deck.putOneCardFromHand(event.player, event.card), deck = state.deck.putOneCardFromHand(event.player, event.card),
) )
} }
@@ -59,6 +75,7 @@ fun GameState.apply(event: GameEvent): GameState =
} }
is PlayerReadyEvent -> { is PlayerReadyEvent -> {
if (state.isStarted) error("The game is already started")
state.copy( state.copy(
readyPlayers = state.readyPlayers + event.player, readyPlayers = state.readyPlayers + event.player,
) )
@@ -67,22 +84,23 @@ fun GameState.apply(event: GameEvent): GameState =
is PlayerHavePassEvent -> { is PlayerHavePassEvent -> {
if (event.takenCard != state.deck.stack.first()) error("taken card is not ot top of the stack") if (event.takenCard != state.deck.stack.first()) error("taken card is not ot top of the stack")
state.copy( state.copy(
lastPlayer = event.player, currentPlayerTurn = nextPlayerTurn,
deck = state.deck.takeOneCardFromStackTo(event.player), deck = state.deck.takeOneCardFromStackTo(event.player),
) )
} }
is PlayerChoseColorEvent -> { is PlayerChoseColorEvent -> {
state.copy( state.copy(
lastColor = event.color, currentPlayerTurn = nextPlayerTurn,
colorOnCurrentStack = event.color,
) )
} }
is GameStartedEvent -> { is GameStartedEvent -> {
state.copy( state.copy(
lastColor = (event.deck.discard.first() as? Card.ColorCard)?.color ?: state.lastColor, colorOnCurrentStack = (event.deck.discard.first() as? Card.ColorCard)?.color ?: state.colorOnCurrentStack,
lastCard = GameState.LastCard(event.deck.discard.first(), event.firstPlayer), cardOnCurrentStack = GameState.LastCard(event.deck.discard.first(), event.firstPlayer),
lastPlayer = event.firstPlayer, currentPlayerTurn = event.firstPlayer,
deck = event.deck, deck = event.deck,
isStarted = true, isStarted = true,
) )

View File

@@ -22,8 +22,7 @@ class GameStateRepository(
val projection = projections[event.gameId] val projection = projections[event.gameId]
if (projection == null) { if (projection == null) {
event event
.gameId .buildStateFromEventStreamTo(eventStream)
.buildStateFromEventStream(eventStream)
.update() .update()
} else { } else {
projection projection

View File

@@ -1,5 +1,6 @@
package eventDemo.app.eventListener package eventDemo.app.eventListener
import eventDemo.app.entity.Card
import eventDemo.app.entity.Player import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventBus import eventDemo.app.event.GameEventBus
import eventDemo.app.event.event.CardIsPlayedEvent import eventDemo.app.event.event.CardIsPlayedEvent
@@ -11,6 +12,8 @@ import eventDemo.app.event.event.PlayerHavePassEvent
import eventDemo.app.event.event.PlayerReadyEvent import eventDemo.app.event.event.PlayerReadyEvent
import eventDemo.app.event.event.PlayerWinEvent import eventDemo.app.event.event.PlayerWinEvent
import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.event.projection.GameStateRepository
import eventDemo.app.notification.ItsTheTurnOfNotification
import eventDemo.app.notification.Notification
import eventDemo.app.notification.PlayerAsJoinTheGameNotification import eventDemo.app.notification.PlayerAsJoinTheGameNotification
import eventDemo.app.notification.PlayerAsPlayACardNotification import eventDemo.app.notification.PlayerAsPlayACardNotification
import eventDemo.app.notification.PlayerHavePassNotification import eventDemo.app.notification.PlayerHavePassNotification
@@ -33,22 +36,45 @@ class GameEventPlayerNotificationListener(
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
fun startListening( fun startListening(
outgoing: SendChannel<Frame>, outgoingNotificationChannel: SendChannel<Frame>,
currentPlayer: Player, currentPlayer: Player,
) { ) {
eventBus.subscribe { event: GameEvent -> eventBus.subscribe { event: GameEvent ->
val currentState = gameStateRepository.getUntil(event) val currentState = gameStateRepository.getUntil(event)
val notification =
fun Notification.send() {
if (currentState.players.contains(currentPlayer)) {
// Only notify players who have already joined the game.
outgoingNotificationChannel.trySendBlocking(toFrame())
logger.atInfo {
message = "Notification for player ${currentPlayer.name} was SEND: ${this@send}"
payload = mapOf("notification" to this@send, "event" to event)
}
} else {
// Rare use case, when a connexion is created with the channel,
// but the player was not already join in the game
logger.atWarn {
message = "Notification for player ${currentPlayer.name} was SKIP, No player on the game: ${this@send}"
payload = mapOf("notification" to this@send, "event" to event)
}
}
}
fun sendNextTurnNotif() =
ItsTheTurnOfNotification(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
when (event) { when (event) {
is NewPlayerEvent -> { is NewPlayerEvent -> {
if (currentPlayer != event.player) { if (currentPlayer != event.player) {
PlayerAsJoinTheGameNotification( PlayerAsJoinTheGameNotification(
player = event.player, player = event.player,
) ).send()
} else { } else {
WelcomeToTheGameNotification( WelcomeToTheGameNotification(
players = currentState.players, players = currentState.players,
) ).send()
} }
} }
@@ -57,9 +83,13 @@ class GameEventPlayerNotificationListener(
PlayerAsPlayACardNotification( PlayerAsPlayACardNotification(
player = event.player, player = event.player,
card = event.card, card = event.card,
) ).send()
} else { }
null
if (event.card !is Card.AllColorCard) {
ItsTheTurnOfNotification(
player = currentState.currentPlayerTurn ?: error("No player turn defined"),
).send()
} }
} }
@@ -68,7 +98,9 @@ class GameEventPlayerNotificationListener(
hand = hand =
event.deck.playersHands.getHand(currentPlayer) event.deck.playersHands.getHand(currentPlayer)
?: error("You are not in the game"), ?: error("You are not in the game"),
) ).send()
sendNextTurnNotif()
} }
is PlayerChoseColorEvent -> { is PlayerChoseColorEvent -> {
@@ -76,57 +108,38 @@ class GameEventPlayerNotificationListener(
PlayerWasChoseTheCardColorNotification( PlayerWasChoseTheCardColorNotification(
player = event.player, player = event.player,
color = event.color, color = event.color,
) ).send()
} else {
null
} }
sendNextTurnNotif()
} }
is PlayerHavePassEvent -> { is PlayerHavePassEvent -> {
if (currentPlayer == event.player) { if (currentPlayer == event.player) {
YourNewCardNotification( YourNewCardNotification(
card = event.takenCard, card = event.takenCard,
) ).send()
} else { } else {
PlayerHavePassNotification( PlayerHavePassNotification(
player = event.player, player = event.player,
) ).send()
} }
sendNextTurnNotif()
} }
is PlayerReadyEvent -> { is PlayerReadyEvent -> {
if (currentPlayer != event.player) { if (currentPlayer != event.player) {
PlayerWasReadyNotification( PlayerWasReadyNotification(
player = event.player, player = event.player,
) ).send()
} else {
null
} }
} }
is PlayerWinEvent -> { is PlayerWinEvent -> {
PlayerWinNotification( PlayerWinNotification(
player = event.player, player = event.player,
) ).send()
}
}
if (notification == null) {
logger.atInfo {
message = "Notification Ignore: $event"
payload = mapOf("event" to event)
}
} else if (currentState.players.contains(currentPlayer)) {
// Only notify players who have already joined the game.
outgoing.trySendBlocking(notification.toFrame())
logger.atInfo {
message = "Notification SEND: $notification"
payload = mapOf("notification" to notification, "event" to event)
}
} else {
logger.atInfo {
message = "Notification SKIP: $notification"
payload = mapOf("notification" to notification, "event" to event)
} }
} }
} }

View File

@@ -4,6 +4,7 @@ import eventDemo.app.event.GameEventBus
import eventDemo.app.event.GameEventHandler import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.GameEvent import eventDemo.app.event.event.GameEvent
import eventDemo.app.event.event.GameStartedEvent import eventDemo.app.event.event.GameStartedEvent
import eventDemo.app.event.event.PlayerReadyEvent
import eventDemo.app.event.event.PlayerWinEvent import eventDemo.app.event.event.PlayerWinEvent
import eventDemo.app.event.projection.GameState import eventDemo.app.event.projection.GameState
import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.event.projection.GameStateRepository
@@ -29,7 +30,7 @@ class GameEventReactionListener(
} }
} }
private fun sendStartGameEvent( private suspend fun sendStartGameEvent(
state: GameState, state: GameState,
event: GameEvent, event: GameEvent,
) { ) {
@@ -40,7 +41,7 @@ class GameEventReactionListener(
state.players, state.players,
) )
logger.atInfo { logger.atInfo {
message = "Event Send on reaction of: $event" message = "Reaction event was Send $reactionEvent on reaction of: $event"
payload = payload =
mapOf( mapOf(
"event" to event, "event" to event,
@@ -48,6 +49,10 @@ class GameEventReactionListener(
) )
} }
eventHandler.handle(reactionEvent) eventHandler.handle(reactionEvent)
} else {
if (event is PlayerReadyEvent) {
logger.info { "All players was not ready ${state.readyPlayers}" }
}
} }
} }
@@ -63,7 +68,7 @@ class GameEventReactionListener(
winner, winner,
) )
logger.atInfo { logger.atInfo {
message = "Event Send on reaction of: $event" message = "Reaction event was Send $reactionEvent on reaction of: $event"
payload = payload =
mapOf( mapOf(
"event" to event, "event" to event,

View File

@@ -0,0 +1,13 @@
package eventDemo.app.notification
import eventDemo.app.entity.Player
import eventDemo.shared.UUIDSerializer
import kotlinx.serialization.Serializable
import java.util.UUID
@Serializable
data class ItsTheTurnOfNotification(
@Serializable(with = UUIDSerializer::class)
override val id: UUID = UUID.randomUUID(),
val player: Player,
) : Notification

View File

@@ -40,7 +40,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
get<Game.Card> { body -> get<Game.Card> { body ->
gameStateRepository gameStateRepository
.get(body.game.id) .get(body.game.id)
.lastCard .cardOnCurrentStack
?.card ?.card
?.let { call.respond(it) } ?.let { call.respond(it) }
?: call.response.status(HttpStatusCode.BadRequest) ?: call.response.status(HttpStatusCode.BadRequest)

View File

@@ -49,7 +49,7 @@ class CommandStreamChannel<C : Command>(
val actionResult = runCatching { status.action(command) } val actionResult = runCatching { status.action(command) }
if (actionResult.isFailure) { if (actionResult.isFailure) {
logger.atInfo { logger.atInfo {
message = "Error on compute the Command" message = "Error on compute the Command: $command"
payload = mapOf("command" to command) payload = mapOf("command" to command)
cause = actionResult.exceptionOrNull() cause = actionResult.exceptionOrNull()
} }

View File

@@ -8,6 +8,6 @@ interface EventBus<E : Event<ID>, ID : AggregateId> {
*/ */
fun subscribe( fun subscribe(
priority: Int = 0, priority: Int = 0,
block: (E) -> Unit, block: suspend (E) -> Unit,
) )
} }

View File

@@ -1,19 +1,23 @@
package eventDemo.libs.event package eventDemo.libs.event
import kotlinx.coroutines.runBlocking
class EventBusInMemory<E : Event<ID>, ID : AggregateId> : EventBus<E, ID> { class EventBusInMemory<E : Event<ID>, ID : AggregateId> : EventBus<E, ID> {
private val subscribers: MutableList<Pair<Int, (E) -> Unit>> = mutableListOf() private val subscribers: MutableList<Pair<Int, suspend (E) -> Unit>> = mutableListOf()
override fun publish(event: E) { override fun publish(event: E) {
subscribers subscribers
.sortedByDescending { (priority, block) -> priority } .sortedByDescending { (priority, _) -> priority }
.forEach { (_, block) -> .forEach { (_, block) ->
runBlocking {
block(event) block(event)
} }
} }
}
override fun subscribe( override fun subscribe(
priority: Int, priority: Int,
block: (E) -> Unit, block: suspend (E) -> Unit,
) { ) {
subscribers.add(priority to block) subscribers.add(priority to block)
} }

View File

@@ -22,5 +22,5 @@ interface EventStream<E : Event<ID>, ID : AggregateId> {
): R? ): R?
/** Reads all events associated with a given aggregate ID */ /** Reads all events associated with a given aggregate ID */
fun readAll(aggregateId: ID): List<E> fun readAll(aggregateId: ID): Set<E>
} }

View File

@@ -1,6 +1,8 @@
package eventDemo.libs.event package eventDemo.libs.event
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
@@ -10,7 +12,7 @@ import kotlin.reflect.KClass
*/ */
class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID> { class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID> {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private val events: MutableList<E> = mutableListOf() private val events: Queue<E> = ConcurrentLinkedQueue()
override fun publish(event: E) { override fun publish(event: E) {
events.add(event) events.add(event)
@@ -34,7 +36,7 @@ class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID>
.filterIsInstance(eventType.java) .filterIsInstance(eventType.java)
.lastOrNull { it.gameId == aggregateId } .lastOrNull { it.gameId == aggregateId }
override fun readAll(aggregateId: ID): List<E> = events override fun readAll(aggregateId: ID): Set<E> = events.toSet()
} }
inline fun <reified R : E, E : Event<ID>, ID : AggregateId> EventStream<E, ID>.readLastOf(aggregateId: ID): R? = inline fun <reified R : E, E : Event<ID>, ID : AggregateId> EventStream<E, ID>.readLastOf(aggregateId: ID): R? =

View File

@@ -69,7 +69,7 @@ class GameStateBuilderTest :
val event = CardIsPlayedEvent(gameId, playedCard, player1) val event = CardIsPlayedEvent(gameId, playedCard, player1)
apply(event).also { state -> apply(event).also { state ->
state.gameId shouldBeEqual gameId state.gameId shouldBeEqual gameId
assertNotNull(state.lastCard).card shouldBeEqual playedCard assertNotNull(state.cardOnCurrentStack).card shouldBeEqual playedCard
assertIs<Card.NumericCard>(playedCard).let { assertIs<Card.NumericCard>(playedCard).let {
it.number shouldBeEqual 0 it.number shouldBeEqual 0
it.color shouldBeEqual Card.Color.Red it.color shouldBeEqual Card.Color.Red
@@ -80,7 +80,7 @@ class GameStateBuilderTest :
val event = CardIsPlayedEvent(gameId, playedCard, player2) val event = CardIsPlayedEvent(gameId, playedCard, player2)
apply(event).also { state -> apply(event).also { state ->
state.gameId shouldBeEqual gameId state.gameId shouldBeEqual gameId
assertNotNull(state.lastCard).card shouldBeEqual playedCard assertNotNull(state.cardOnCurrentStack).card shouldBeEqual playedCard
assertIs<Card.NumericCard>(playedCard).let { assertIs<Card.NumericCard>(playedCard).let {
it.number shouldBeEqual 7 it.number shouldBeEqual 7
it.color shouldBeEqual Card.Color.Red it.color shouldBeEqual Card.Color.Red

View File

@@ -4,6 +4,7 @@ import eventDemo.app.command.GameCommandHandler
import eventDemo.app.command.command.IWantToJoinTheGameCommand import eventDemo.app.command.command.IWantToJoinTheGameCommand
import eventDemo.app.command.command.IWantToPlayCardCommand import eventDemo.app.command.command.IWantToPlayCardCommand
import eventDemo.app.command.command.IamReadyToPlayCommand import eventDemo.app.command.command.IamReadyToPlayCommand
import eventDemo.app.entity.Card
import eventDemo.app.entity.GameId import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventStream import eventDemo.app.event.GameEventStream
@@ -12,6 +13,7 @@ import eventDemo.app.event.projection.GameState
import eventDemo.app.event.projection.buildStateFromEventStream import eventDemo.app.event.projection.buildStateFromEventStream
import eventDemo.app.eventListener.GameEventPlayerNotificationListener import eventDemo.app.eventListener.GameEventPlayerNotificationListener
import eventDemo.app.eventListener.GameEventReactionListener import eventDemo.app.eventListener.GameEventReactionListener
import eventDemo.app.notification.ItsTheTurnOfNotification
import eventDemo.app.notification.PlayerAsJoinTheGameNotification import eventDemo.app.notification.PlayerAsJoinTheGameNotification
import eventDemo.app.notification.PlayerAsPlayACardNotification import eventDemo.app.notification.PlayerAsPlayACardNotification
import eventDemo.app.notification.PlayerWasReadyNotification import eventDemo.app.notification.PlayerWasReadyNotification
@@ -29,6 +31,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import org.koin.dsl.koinApplication import org.koin.dsl.koinApplication
import kotlin.test.assertIs import kotlin.test.assertIs
@@ -43,75 +46,105 @@ class GameStateTest :
val id = GameId() val id = GameId()
val player1 = Player(name = "Nikola") val player1 = Player(name = "Nikola")
val player2 = Player(name = "Einstein") val player2 = Player(name = "Einstein")
val channelIn1 = Channel<Frame>() val channelCommand1 = Channel<Frame>(Channel.BUFFERED)
val channelIn2 = Channel<Frame>() val channelCommand2 = Channel<Frame>(Channel.BUFFERED)
val channelOut1 = Channel<Frame>(Channel.BUFFERED) val channelNotification1 = Channel<Frame>(Channel.BUFFERED)
val channelOut2 = Channel<Frame>(Channel.BUFFERED) val channelNotification2 = Channel<Frame>(Channel.BUFFERED)
var playedCard1: Card? = null
var playedCard2: Card? = null
val player1Job =
launch {
channelCommand1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).toFrame())
channelNotification1.receive().toNotification().let {
assertIs<WelcomeToTheGameNotification>(it).players shouldBeEqual setOf(player1)
}
channelNotification1.receive().toNotification().let {
assertIs<PlayerAsJoinTheGameNotification>(it).player shouldBeEqual player2
}
channelCommand1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).toFrame())
channelNotification1.receive().toNotification().let {
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player2
}
val player1Hand =
channelNotification1.receive().toNotification().let {
assertIs<TheGameWasStartedNotification>(it).hand shouldHaveSize 7
}
playedCard1 = player1Hand.first()
channelNotification1.receive().toNotification().let {
assertIs<ItsTheTurnOfNotification>(it).apply {
player shouldBeEqual player1
}
}
channelCommand1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).toFrame())
channelNotification1.receive().toNotification().let {
assertIs<ItsTheTurnOfNotification>(it).apply {
player shouldBeEqual player2
}
}
channelNotification1.receive().toNotification().let {
assertIs<PlayerAsPlayACardNotification>(it).apply {
player shouldBeEqual player2
card shouldBeEqual assertNotNull(playedCard2)
}
}
}
val player2Job =
launch {
delay(100)
channelCommand2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).toFrame())
channelNotification2.receive().toNotification().let {
assertIs<WelcomeToTheGameNotification>(it).players shouldBeEqual setOf(player1, player2)
}
channelNotification2.receive().toNotification().let {
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player1
}
channelCommand2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).toFrame())
val player2Hand =
channelNotification2.receive().toNotification().let {
assertIs<TheGameWasStartedNotification>(it).hand shouldHaveSize 7
}
channelNotification2.receive().toNotification().let {
assertIs<ItsTheTurnOfNotification>(it).apply {
player shouldBeEqual player1
}
}
channelNotification2.receive().toNotification().let {
assertIs<PlayerAsPlayACardNotification>(it).apply {
player shouldBeEqual player1
card shouldBeEqual assertNotNull(playedCard1)
}
}
playedCard2 = player2Hand.first()
channelNotification2.receive().toNotification().let {
assertIs<ItsTheTurnOfNotification>(it).apply {
player shouldBeEqual player2
}
}
channelCommand2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).toFrame())
}
koinApplication { modules(appKoinModule) }.koin.apply { koinApplication { modules(appKoinModule) }.koin.apply {
val commandHandler by inject<GameCommandHandler>() val commandHandler by inject<GameCommandHandler>()
val playerNotificationListener by inject<GameEventPlayerNotificationListener>()
val eventStream by inject<GameEventStream>() val eventStream by inject<GameEventStream>()
val playerNotificationListener by inject<GameEventPlayerNotificationListener>()
GameEventReactionListener(get(), get(), get()).init() GameEventReactionListener(get(), get(), get()).init()
playerNotificationListener.startListening(channelOut1, player1) playerNotificationListener.startListening(channelNotification1, player1)
playerNotificationListener.startListening(channelOut2, player2) playerNotificationListener.startListening(channelNotification2, player2)
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {
commandHandler.handle(player1, channelIn1, channelOut1) commandHandler.handle(player1, channelCommand1, channelNotification1)
} }
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {
commandHandler.handle(player2, channelIn2, channelOut2) commandHandler.handle(player2, channelCommand2, channelNotification2)
} }
channelIn1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).toFrame()) joinAll(player1Job, player2Job)
delay(50)
channelIn2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).toFrame())
delay(50)
channelOut1.receive().toNotification().let {
assertIs<WelcomeToTheGameNotification>(it).players shouldBeEqual setOf(player1)
}
channelOut2.receive().toNotification().let {
assertIs<WelcomeToTheGameNotification>(it).players shouldBeEqual setOf(player1, player2)
}
channelOut1.receive().toNotification().let {
assertIs<PlayerAsJoinTheGameNotification>(it).player shouldBeEqual player2
}
channelIn1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).toFrame())
delay(50)
channelIn2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).toFrame())
delay(50)
channelOut1.receive().toNotification().let {
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player2
}
channelOut2.receive().toNotification().let {
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player1
}
val player1Hand =
channelOut1.receive().toNotification().let {
assertIs<TheGameWasStartedNotification>(it).hand shouldHaveSize 7
}
val player2Hand =
channelOut2.receive().toNotification().let {
assertIs<TheGameWasStartedNotification>(it).hand shouldHaveSize 7
}
channelIn1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).toFrame())
delay(50)
channelOut2.receive().toNotification().let {
assertIs<PlayerAsPlayACardNotification>(it).player shouldBeEqual player1
assertIs<PlayerAsPlayACardNotification>(it).card shouldBeEqual player1Hand.first()
}
channelIn2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).toFrame())
delay(50)
channelOut1.receive().toNotification().let {
assertIs<PlayerAsPlayACardNotification>(it).player shouldBeEqual player2
assertIs<PlayerAsPlayACardNotification>(it).card shouldBeEqual player2Hand.first()
}
val state = id.buildStateFromEventStream(eventStream) val state = id.buildStateFromEventStream(eventStream)
@@ -120,7 +153,7 @@ class GameStateTest :
state.players shouldBeEqual setOf(player1, player2) state.players shouldBeEqual setOf(player1, player2)
state.readyPlayers shouldBeEqual setOf(player1, player2) state.readyPlayers shouldBeEqual setOf(player1, player2)
state.direction shouldBeEqual GameState.Direction.CLOCKWISE state.direction shouldBeEqual GameState.Direction.CLOCKWISE
assertNotNull(state.lastCard) shouldBeEqual GameState.LastCard(player2Hand.first(), player2) assertNotNull(state.cardOnCurrentStack) shouldBeEqual GameState.LastCard(assertNotNull(playedCard2), player2)
} }
} }
}) })