diff --git a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt index 86de600..a12b98c 100644 --- a/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt +++ b/src/main/kotlin/eventDemo/business/event/projection/projectionListener/PlayerNotificationListener.kt @@ -31,6 +31,9 @@ class PlayerNotificationListener( ) { private val logger = KotlinLogging.logger {} + /** + * Forward projection from [bus][GameProjectionBus] to the player [notification][outgoingNotification] + */ fun startListening( currentPlayer: Player, gameId: GameId, @@ -39,7 +42,7 @@ class PlayerNotificationListener( return projectionBus.subscribe { currentState -> if (currentState !is GameState) return@subscribe if (currentState.aggregateId != gameId) return@subscribe - withLoggingContext("projection" to currentState.toString()) { + withLoggingContext("currentPlayer" to currentPlayer.toString(), "projection" to currentState.toString()) { fun Notification.send() { withLoggingContext("notification" to this.toString()) { if (currentState.players.contains(currentPlayer)) { diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt index e712770..d6ee097 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt @@ -1,6 +1,7 @@ package eventDemo.adapter.interfaceLayer.query import eventDemo.Tag +import eventDemo.adapter.infrastructureLayer.event.projection.GameStateRepositoryInMemory import eventDemo.business.command.GameCommandHandler import eventDemo.business.command.command.GameCommand import eventDemo.business.command.command.IWantToJoinTheGameCommand @@ -12,7 +13,6 @@ import eventDemo.business.entity.Player import eventDemo.business.event.GameEventStore import eventDemo.business.event.event.disableShuffleDeck import eventDemo.business.event.projection.gameState.GameState -import eventDemo.business.event.projection.gameState.apply import eventDemo.business.event.projection.projectionListener.PlayerNotificationListener import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.ItsTheTurnOfNotification @@ -22,11 +22,10 @@ import eventDemo.business.notification.PlayerAsPlayACardNotification import eventDemo.business.notification.PlayerWasReadyNotification import eventDemo.business.notification.TheGameWasStartedNotification import eventDemo.business.notification.WelcomeToTheGameNotification -import eventDemo.libs.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.testKoinApplicationWithConfig +import io.kotest.assertions.nondeterministic.eventually import io.kotest.assertions.nondeterministic.until import io.kotest.core.spec.style.FunSpec -import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.equals.shouldBeEqual import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers @@ -36,7 +35,6 @@ import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout -import kotlin.test.assertIs import kotlin.test.assertNotNull import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds @@ -63,141 +61,124 @@ class GameSimulationTest : var player1HasJoin = false testKoinApplicationWithConfig { + val commandHandler by inject() + val eventStore by inject() + val playerNotificationListener by inject() + + // Run command handler + // In the normal process, these handlers is invoque players connect to the websocket + run { + GlobalScope.launch(Dispatchers.IO) { + commandHandler.handle(player1, gameId, channelCommand1, channelNotification1) + } + GlobalScope.launch(Dispatchers.IO) { + commandHandler.handle(player2, gameId, channelCommand2, channelNotification2) + } + } + + // Consume etch notification of players, and put theses in list. + // is used later to control when other players can be executing the next action + val player1Notifications = mutableListOf() + val player2Notifications = mutableListOf() + run { + GlobalScope.launch { + for (notification in channelNotification1) { + player1Notifications.add(notification) + } + } + + GlobalScope.launch { + for (notification in channelNotification2) { + player2Notifications.add(notification) + } + } + } + + // The player 1 actions val player1Job = launch { + playerNotificationListener.startListening(player1, gameId) { + channelNotification1.trySendBlocking(it) + } IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player1)).also { sendCommand -> channelCommand1.send(sendCommand) - channelNotification1.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } + player1Notifications.waitNotification { commandId == sendCommand.id } } player1HasJoin = true - channelNotification1.receive().let { - assertIs(it).players shouldBeEqual setOf(player1) - } - channelNotification1.receive().let { - assertIs(it).player shouldBeEqual player2 - } + player1Notifications.waitNotification { players == setOf(player1) } + player1Notifications.waitNotification { player == player2 } + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player1)).also { sendCommand -> channelCommand1.send(sendCommand) - channelNotification1.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } + player1Notifications.waitNotification { commandId == sendCommand.id } } - channelNotification1.receive().let { - assertIs(it).player shouldBeEqual player2 - } - val player1Hand = - channelNotification1.receive().let { - assertIs(it).hand shouldHaveSize 7 - } + player1Notifications.waitNotification { player == player2 } + val player1Hand = player1Notifications.waitNotification { hand.size == 7 }.hand + playedCard1 = player1Hand.first() - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - } - } + player1Notifications.waitNotification { player == player1 } IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player1, player1Hand.first())).also { sendCommand -> channelCommand1.send(sendCommand) - channelNotification1.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } + player1Notifications.waitNotification { commandId == sendCommand.id } } - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - } - } + player1Notifications.waitNotification { player == player2 } - channelNotification1.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - card shouldBeEqual assertNotNull(playedCard2) - } + player1Notifications.waitNotification { + player == player2 && card == playedCard2 } } + // The player 2 actions val player2Job = launch { + // wait the player 1 has join the game until(1.seconds) { player1HasJoin } - IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player2)).also { sendCommand -> - channelCommand2.send(sendCommand) - channelNotification2.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } + + playerNotificationListener.startListening(player2, gameId) { + channelNotification2.trySendBlocking(it) } - channelNotification2.receive().let { - assertIs(it).players shouldBeEqual setOf(player1, player2) - } - channelNotification2.receive().let { - assertIs(it).player shouldBeEqual player1 + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player2)).also { sendCommand -> + channelCommand2.send(sendCommand) + player2Notifications.waitNotification { commandId == sendCommand.id } } + player2Notifications.waitNotification { players == setOf(player1, player2) } + player2Notifications.waitNotification { player == player1 } + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player2)).also { sendCommand -> channelCommand2.send(sendCommand) - channelNotification2.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } + player2Notifications.waitNotification { commandId == sendCommand.id } } val player2Hand = - channelNotification2.receive().let { - assertIs(it).hand shouldHaveSize 7 - } - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - } - } - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player1 - card shouldBeEqual assertNotNull(playedCard1) - } + player2Notifications.waitNotification { hand.size == 7 }.hand + + player2Notifications.waitNotification { player == player1 } + player2Notifications.waitNotification { + player == player1 && card == playedCard1 } playedCard2 = player2Hand.first() - channelNotification2.receive().let { - assertIs(it).apply { - player shouldBeEqual player2 - } - } + player2Notifications.waitNotification { player == player2 } IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player2, player2Hand.first())).also { sendCommand -> channelCommand2.send(sendCommand) - channelNotification2.receive().let { - assertIs(it).commandId shouldBeEqual sendCommand.id - } + player2Notifications.waitNotification { commandId == sendCommand.id } } } - val commandHandler by inject() - val eventStore by inject() - val playerNotificationListener by inject() - playerNotificationListener.startListening(player1, gameId) { channelNotification1.trySendBlocking(it) } - playerNotificationListener.startListening(player2, gameId) { channelNotification2.trySendBlocking(it) } - - GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player1, gameId, channelCommand1, channelNotification1) - } - GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player2, gameId, channelCommand2, channelNotification2) - } - + // Wait the end of the game joinAll(player1Job, player2Job) - val state = - ProjectionSnapshotRepositoryInMemory( - eventStore = eventStore, - initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) }, - applyToProjection = GameState::apply, - ).getLast(gameId) + // Build the last state from the event store + val state = GameStateRepositoryInMemory(eventStore = eventStore).getLast(gameId) + // Check if the state is correct state.aggregateId shouldBeEqual gameId assertTrue(state.isStarted) state.players shouldBeEqual setOf(player1, player2) @@ -209,3 +190,8 @@ class GameSimulationTest : } } }) + +private suspend inline fun MutableList.waitNotification(crossinline block: T.() -> Boolean): T = + eventually(1.seconds) { + filterIsInstance().first { block(it) } + }