diff --git a/src/main/kotlin/eventDemo/app/command/ErrorNotifier.kt b/src/main/kotlin/eventDemo/app/command/ErrorNotifier.kt deleted file mode 100644 index 44d062b..0000000 --- a/src/main/kotlin/eventDemo/app/command/ErrorNotifier.kt +++ /dev/null @@ -1,29 +0,0 @@ -package eventDemo.app.command - -import eventDemo.app.command.command.GameCommand -import eventDemo.app.notification.ErrorNotification -import eventDemo.app.notification.Notification -import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.channels.SendChannel - -typealias ErrorNotifier = suspend (String) -> Unit - -fun errorNotifier( - command: GameCommand, - channel: SendChannel, -): ErrorNotifier = - { - val logger = KotlinLogging.logger { } - ErrorNotification(message = it, command = command) - .let { notification -> - logger.atWarn { - message = "Notification ERROR sent: ${notification.message}" - payload = - mapOf( - "notification" to notification, - "command" to command, - ) - } - channel.send(notification) - } - } diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandActionRunner.kt b/src/main/kotlin/eventDemo/app/command/GameCommandActionRunner.kt index b6a4fa3..a82e08b 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandActionRunner.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandActionRunner.kt @@ -1,34 +1,27 @@ package eventDemo.app.command +import eventDemo.app.command.action.ICantPlay +import eventDemo.app.command.action.IWantToJoinTheGame +import eventDemo.app.command.action.IWantToPlayCard +import eventDemo.app.command.action.IamReadyToPlay import eventDemo.app.command.command.GameCommand import eventDemo.app.command.command.ICantPlayCommand import eventDemo.app.command.command.IWantToJoinTheGameCommand import eventDemo.app.command.command.IWantToPlayCardCommand import eventDemo.app.command.command.IamReadyToPlayCommand -import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.projection.GameStateRepository -import eventDemo.app.notification.Notification -import kotlinx.coroutines.channels.SendChannel +import eventDemo.app.event.event.GameEvent class GameCommandActionRunner( - private val eventHandler: GameEventHandler, - private val gameStateRepository: GameStateRepository, + private val iWantToPlayCard: IWantToPlayCard, + private val iamReadyToPlay: IamReadyToPlay, + private val iWantToJoinTheGame: IWantToJoinTheGame, + private val iCantPlay: ICantPlay, ) { - suspend fun run( - command: GameCommand, - outgoingErrorChannelNotification: SendChannel, - ) { - val gameState = gameStateRepository.getLast(command.payload.aggregateId) - - try { - when (command) { - is IWantToPlayCardCommand -> command.run(gameState, this.eventHandler) - is IamReadyToPlayCommand -> command.run(gameState, this.eventHandler) - is IWantToJoinTheGameCommand -> command.run(gameState, this.eventHandler) - is ICantPlayCommand -> command.run(gameState, this.eventHandler) - } - } catch (e: CommandException) { - errorNotifier(command, outgoingErrorChannelNotification)(e.message) + fun run(command: GameCommand): (Int) -> GameEvent = + when (command) { + is IWantToPlayCardCommand -> iWantToPlayCard.run(command) + is IamReadyToPlayCommand -> iamReadyToPlay.run(command) + is IWantToJoinTheGameCommand -> iWantToJoinTheGame.run(command) + is ICantPlayCommand -> iCantPlay.run(command) } - } } diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt index 195322b..c60209d 100644 --- a/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/app/command/GameCommandHandler.kt @@ -2,46 +2,144 @@ package eventDemo.app.command import eventDemo.app.command.command.GameCommand import eventDemo.app.entity.Player +import eventDemo.app.event.GameEventBus +import eventDemo.app.event.GameEventHandler import eventDemo.app.event.event.GameEvent +import eventDemo.app.notification.CommandErrorNotification +import eventDemo.app.notification.CommandSuccessNotification import eventDemo.app.notification.Notification -import eventDemo.libs.command.CommandStreamChannelBuilder +import eventDemo.libs.command.CommandId +import eventDemo.libs.command.CommandStreamChannel import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap /** - * Listen [GameCommand] on [GameCommandStream], check the validity and execute an action. + * Listen [GameCommand] on [CommandStreamChannel], check the validity and execute an action. * * This action can be executing an action and produce a new [GameEvent] after verification. */ class GameCommandHandler( - private val commandStreamChannel: CommandStreamChannelBuilder, + private val commandStreamChannel: CommandStreamChannel, + private val eventHandler: GameEventHandler, private val runner: GameCommandActionRunner, + eventBus: GameEventBus, + listenerPriority: Int = DEFAULT_PRIORITY, ) { private val logger = KotlinLogging.logger { } + private val eventCommandMap = EventCommandMap() + + companion object Config { + const val DEFAULT_PRIORITY = 1000 + } + + // subscribe to the event bus to send success notification after save the event. + init { + eventBus.subscribe(listenerPriority) { event: GameEvent -> + eventCommandMap[event.eventId]?.apply { + channel.sendSuccess(commandId)() + } ?: logger.warn { "No Notification for event: $event" } + } + } /** - * Init the handler + * Run a command and publish the event. + * + * It restricts to run only once a command. + * + * If the command fail, send an [error notification][CommandErrorNotification], + * if success, send a [success notification][CommandSuccessNotification] */ suspend fun handle( player: Player, incomingCommandChannel: ReceiveChannel, - outgoingErrorChannelNotification: SendChannel, + channelNotification: SendChannel, ) = - commandStreamChannel(incomingCommandChannel) - .process { command -> - if (command.payload.player.id != player.id) { + commandStreamChannel.process(incomingCommandChannel) { command -> + if (command.payload.player.id != player.id) { + logger.atWarn { + message = "Handle command Refuse, the player of the command is not the same: $command" + payload = mapOf("command" to command) + } + channelNotification.sendError(command)("You are not the author of this command\n") + } else { + logger.atInfo { + message = "Handle command: $command" + payload = mapOf("command" to command) + } + try { + val eventBuilder = runner.run(command) + + eventHandler.handle(command.payload.aggregateId) { version -> + eventBuilder(version) + .also { eventCommandMap.set(it.eventId, channelNotification, command.id) } + } + } catch (e: CommandException) { logger.atWarn { - message = "Handle command Refuse, the player of the command is not the same: $command" + message = e.message payload = mapOf("command" to command) } - nack() - } else { - logger.atInfo { - message = "Handle command: $command" - payload = mapOf("command" to command) - } - runner.run(command, outgoingErrorChannelNotification) + channelNotification.sendError(command)(e.message) } } + } +} + +private fun SendChannel.sendSuccess(commandId: CommandId): suspend () -> Unit = + { + val logger = KotlinLogging.logger { } + CommandSuccessNotification(commandId = commandId) + .also { notification -> + logger.atDebug { + message = "Notification SUCCESS sent" + payload = + mapOf( + "notification" to notification, + "commandId" to commandId, + ) + } + send(notification) + } + } + +private fun SendChannel.sendError(command: GameCommand): suspend (String) -> Unit = + { + val logger = KotlinLogging.logger { } + CommandErrorNotification(message = it, command = command) + .also { notification -> + logger.atWarn { + message = "Notification ERROR sent: ${notification.message}" + payload = + mapOf( + "notification" to notification, + "command" to command, + ) + } + send(notification) + } + } + +/** + * Map to record the command that triggered the event. + */ +private class EventCommandMap { + val map = ConcurrentHashMap() + + fun set( + eventId: UUID, + channel: SendChannel, + commandId: CommandId, + ) { + map[eventId] = Output(channel, commandId) + } + + operator fun get(eventId: UUID): Output? = + map[eventId] + + data class Output( + val channel: SendChannel, + val commandId: CommandId, + ) } diff --git a/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt b/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt deleted file mode 100644 index 60a11d9..0000000 --- a/src/main/kotlin/eventDemo/app/command/GameCommandStream.kt +++ /dev/null @@ -1,13 +0,0 @@ -package eventDemo.app.command - -import eventDemo.app.command.command.GameCommand -import eventDemo.libs.command.CommandStream -import eventDemo.libs.command.CommandStreamChannel -import kotlinx.coroutines.channels.ReceiveChannel - -/** - * A stream to publish and read the game command. - */ -class GameCommandStream( - incoming: ReceiveChannel, -) : CommandStream by CommandStreamChannel(incoming) diff --git a/src/main/kotlin/eventDemo/app/command/action/CommandAction.kt b/src/main/kotlin/eventDemo/app/command/action/CommandAction.kt new file mode 100644 index 0000000..847a40a --- /dev/null +++ b/src/main/kotlin/eventDemo/app/command/action/CommandAction.kt @@ -0,0 +1,8 @@ +package eventDemo.app.command.action + +import eventDemo.libs.command.Command +import eventDemo.libs.event.Event + +sealed interface CommandAction> { + fun run(command: C): (Int) -> E +} diff --git a/src/main/kotlin/eventDemo/app/command/action/ICantPlay.kt b/src/main/kotlin/eventDemo/app/command/action/ICantPlay.kt new file mode 100644 index 0000000..e83f989 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/command/action/ICantPlay.kt @@ -0,0 +1,36 @@ +package eventDemo.app.command.action + +import eventDemo.app.command.CommandException +import eventDemo.app.command.command.ICantPlayCommand +import eventDemo.app.event.event.PlayerHavePassEvent +import eventDemo.app.event.projection.GameStateRepository + +/** + * A command to perform an action to play a new card + */ +data class ICantPlay( + private val gameStateRepository: GameStateRepository, +) : CommandAction { + override fun run(command: ICantPlayCommand): (Int) -> PlayerHavePassEvent { + val state = gameStateRepository.getLast(command.payload.aggregateId) + + if (state.currentPlayerTurn != command.payload.player) { + throw CommandException("Its not your turn!") + } + + val playableCards = state.playableCards(command.payload.player) + if (playableCards.isNotEmpty()) { + throw CommandException("You can and must play one card, like ${playableCards.first()::class.simpleName}") + } + + val takenCard = state.deck.stack.first() + return { version -> + PlayerHavePassEvent( + aggregateId = command.payload.aggregateId, + player = command.payload.player, + takenCard = takenCard, + version = version, + ) + } + } +} diff --git a/src/main/kotlin/eventDemo/app/command/action/IWantToJoinTheGame.kt b/src/main/kotlin/eventDemo/app/command/action/IWantToJoinTheGame.kt new file mode 100644 index 0000000..69fb04e --- /dev/null +++ b/src/main/kotlin/eventDemo/app/command/action/IWantToJoinTheGame.kt @@ -0,0 +1,28 @@ +package eventDemo.app.command.action + +import eventDemo.app.command.CommandException +import eventDemo.app.command.command.IWantToJoinTheGameCommand +import eventDemo.app.event.event.NewPlayerEvent +import eventDemo.app.event.projection.GameStateRepository + +/** + * A command to perform an action to play a new card + */ +data class IWantToJoinTheGame( + private val gameStateRepository: GameStateRepository, +) : CommandAction { + override fun run(command: IWantToJoinTheGameCommand): (Int) -> NewPlayerEvent { + val state = gameStateRepository.getLast(command.payload.aggregateId) + if (!state.isStarted) { + return { + NewPlayerEvent( + aggregateId = command.payload.aggregateId, + player = command.payload.player, + version = it, + ) + } + } else { + throw CommandException("The game is already started") + } + } +} diff --git a/src/main/kotlin/eventDemo/app/command/action/IWantToPlayCard.kt b/src/main/kotlin/eventDemo/app/command/action/IWantToPlayCard.kt new file mode 100644 index 0000000..e93efe2 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/command/action/IWantToPlayCard.kt @@ -0,0 +1,36 @@ +package eventDemo.app.command.action + +import eventDemo.app.command.CommandException +import eventDemo.app.command.command.IWantToPlayCardCommand +import eventDemo.app.event.event.CardIsPlayedEvent +import eventDemo.app.event.projection.GameStateRepository + +/** + * A command to perform an action to play a new card + */ +data class IWantToPlayCard( + private val gameStateRepository: GameStateRepository, +) : CommandAction { + override fun run(command: IWantToPlayCardCommand): (Int) -> CardIsPlayedEvent { + val state = gameStateRepository.getLast(command.payload.aggregateId) + + if (!state.isStarted) { + throw CommandException("The game is Not started") + } + if (state.currentPlayerTurn != command.payload.player) { + throw CommandException("Its not your turn!") + } + if (!state.canBePlayThisCard(command.payload.player, command.payload.card)) { + throw CommandException("You cannot play this card") + } + + return { version -> + CardIsPlayedEvent( + aggregateId = command.payload.aggregateId, + card = command.payload.card, + player = command.payload.player, + version = version, + ) + } + } +} diff --git a/src/main/kotlin/eventDemo/app/command/action/IamReadyToPlay.kt b/src/main/kotlin/eventDemo/app/command/action/IamReadyToPlay.kt new file mode 100644 index 0000000..6998503 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/command/action/IamReadyToPlay.kt @@ -0,0 +1,36 @@ +package eventDemo.app.command.action + +import eventDemo.app.command.CommandException +import eventDemo.app.command.command.IamReadyToPlayCommand +import eventDemo.app.event.event.PlayerReadyEvent +import eventDemo.app.event.projection.GameStateRepository + +/** + * A command to set as ready to play + */ +class IamReadyToPlay( + private val gameStateRepository: GameStateRepository, +) : CommandAction { + @Throws(CommandException::class) + override fun run(command: IamReadyToPlayCommand): (Int) -> PlayerReadyEvent { + val state = gameStateRepository.getLast(command.payload.aggregateId) + val playerExist: Boolean = state.players.contains(command.payload.player) + val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player) + + if (state.isStarted) { + throw CommandException("The game is already started") + } else if (!playerExist) { + throw CommandException("You are not in the game") + } else if (playerIsAlreadyReady) { + throw CommandException("You are already ready") + } else { + return { version: Int -> + PlayerReadyEvent( + aggregateId = command.payload.aggregateId, + player = command.payload.player, + version = version, + ) + } + } + } +} diff --git a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt index 719510f..58bb28a 100644 --- a/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/ICantPlayCommand.kt @@ -1,11 +1,7 @@ package eventDemo.app.command.command -import eventDemo.app.command.CommandException import eventDemo.app.entity.GameId import eventDemo.app.entity.Player -import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.event.PlayerHavePassEvent -import eventDemo.app.event.projection.GameState import eventDemo.libs.command.CommandId import kotlinx.serialization.Serializable @@ -23,28 +19,4 @@ data class ICantPlayCommand( override val aggregateId: GameId, override val player: Player, ) : GameCommand.Payload - - suspend fun run( - state: GameState, - eventHandler: GameEventHandler, - ) { - if (state.currentPlayerTurn != payload.player) { - throw CommandException("Its not your turn!") - } - val playableCards = state.playableCards(payload.player) - if (playableCards.isEmpty()) { - val takenCard = state.deck.stack.first() - - eventHandler.handle(payload.aggregateId) { - PlayerHavePassEvent( - aggregateId = payload.aggregateId, - player = payload.player, - takenCard = takenCard, - version = it, - ) - } - } else { - throw CommandException("You can and must play one card, like ${playableCards.first()::class.simpleName}") - } - } } diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt index 4b90615..399215c 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToJoinTheGameCommand.kt @@ -1,11 +1,7 @@ package eventDemo.app.command.command -import eventDemo.app.command.CommandException import eventDemo.app.entity.GameId import eventDemo.app.entity.Player -import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.event.NewPlayerEvent -import eventDemo.app.event.projection.GameState import eventDemo.libs.command.CommandId import kotlinx.serialization.Serializable @@ -23,21 +19,4 @@ data class IWantToJoinTheGameCommand( override val aggregateId: GameId, override val player: Player, ) : GameCommand.Payload - - suspend fun run( - state: GameState, - eventHandler: GameEventHandler, - ) { - if (!state.isStarted) { - eventHandler.handle(payload.aggregateId) { - NewPlayerEvent( - aggregateId = payload.aggregateId, - player = payload.player, - version = it, - ) - } - } else { - throw CommandException("The game is already started") - } - } } diff --git a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt index 5cb4e8e..f146e36 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IWantToPlayCardCommand.kt @@ -1,12 +1,8 @@ package eventDemo.app.command.command -import eventDemo.app.command.CommandException import eventDemo.app.entity.Card import eventDemo.app.entity.GameId import eventDemo.app.entity.Player -import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.event.CardIsPlayedEvent -import eventDemo.app.event.projection.GameState import eventDemo.libs.command.CommandId import kotlinx.serialization.Serializable @@ -25,29 +21,4 @@ data class IWantToPlayCardCommand( override val player: Player, val card: Card, ) : GameCommand.Payload - - suspend fun run( - state: GameState, - eventHandler: GameEventHandler, - ) { - if (!state.isStarted) { - throw CommandException("The game is Not started") - } - if (state.currentPlayerTurn != payload.player) { - throw CommandException("Its not your turn!") - } - - if (state.canBePlayThisCard(payload.player, payload.card)) { - eventHandler.handle(payload.aggregateId) { - CardIsPlayedEvent( - aggregateId = payload.aggregateId, - card = payload.card, - player = payload.player, - version = it, - ) - } - } else { - throw CommandException("You cannot play this card") - } - } } diff --git a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt index b71d5c2..a2596b9 100644 --- a/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt +++ b/src/main/kotlin/eventDemo/app/command/command/IamReadyToPlayCommand.kt @@ -1,11 +1,7 @@ package eventDemo.app.command.command -import eventDemo.app.command.CommandException import eventDemo.app.entity.GameId import eventDemo.app.entity.Player -import eventDemo.app.event.GameEventHandler -import eventDemo.app.event.event.PlayerReadyEvent -import eventDemo.app.event.projection.GameState import eventDemo.libs.command.CommandId import kotlinx.serialization.Serializable @@ -23,29 +19,4 @@ data class IamReadyToPlayCommand( override val aggregateId: GameId, override val player: Player, ) : GameCommand.Payload - - @Throws(CommandException::class) - suspend fun run( - state: GameState, - eventHandler: GameEventHandler, - ) { - val playerExist: Boolean = state.players.contains(payload.player) - val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(payload.player) - - if (state.isStarted) { - throw CommandException("The game is already started") - } else if (!playerExist) { - throw CommandException("You are not in the game") - } else if (playerIsAlreadyReady) { - throw CommandException("You are already ready") - } else { - eventHandler.handle(payload.aggregateId) { - PlayerReadyEvent( - aggregateId = payload.aggregateId, - player = payload.player, - version = it, - ) - } - } - } } diff --git a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt index f39a5b0..1c94828 100644 --- a/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt +++ b/src/main/kotlin/eventDemo/app/event/GameEventHandler.kt @@ -9,7 +9,7 @@ import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock /** - * A stream to publish and read the played card event. + * Handle the event to dispatch it to store, bus and projections builders */ class GameEventHandler( private val eventBus: GameEventBus, @@ -23,17 +23,26 @@ class GameEventHandler( projectionsBuilders.add(builder) } + /** + * Build Event, and send it to the event store and bus. + * Build also the projections. + */ override fun handle( aggregateId: GameId, buildEvent: (version: Int) -> GameEvent, ): GameEvent = locks + // Get lock for the aggregate .computeIfAbsent(aggregateId) { ReentrantLock() } .withLock { + // Build event with the version buildEvent(versionBuilder.buildNextVersion(aggregateId)) + // then publish it to the event store .also { eventStore.publish(it) } }.also { event -> + // Build the projections projectionsBuilders.forEach { it(event) } + // Publish to the bus eventBus.publish(event) } } diff --git a/src/main/kotlin/eventDemo/app/eventListener/SuccessNotificationEventListener.kt b/src/main/kotlin/eventDemo/app/eventListener/SuccessNotificationEventListener.kt new file mode 100644 index 0000000..518ed71 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/eventListener/SuccessNotificationEventListener.kt @@ -0,0 +1,24 @@ +package eventDemo.app.eventListener + +import eventDemo.app.notification.CommandSuccessNotification +import eventDemo.app.notification.Notification +import eventDemo.libs.command.CommandId +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.channels.SendChannel + +private fun SendChannel.successNotifier(commandId: CommandId): suspend () -> Unit = + { + val logger = KotlinLogging.logger { } + CommandSuccessNotification(commandId = commandId) + .let { notification -> + logger.atDebug { + message = "Notification SUCCESS sent" + payload = + mapOf( + "notification" to notification, + "commandId" to commandId, + ) + } + send(notification) + } + } diff --git a/src/main/kotlin/eventDemo/app/notification/ErrorNotification.kt b/src/main/kotlin/eventDemo/app/notification/CommandErrorNotification.kt similarity index 75% rename from src/main/kotlin/eventDemo/app/notification/ErrorNotification.kt rename to src/main/kotlin/eventDemo/app/notification/CommandErrorNotification.kt index 582e711..fd0ba31 100644 --- a/src/main/kotlin/eventDemo/app/notification/ErrorNotification.kt +++ b/src/main/kotlin/eventDemo/app/notification/CommandErrorNotification.kt @@ -5,13 +5,11 @@ import eventDemo.libs.command.Command import kotlinx.serialization.Serializable import java.util.UUID -sealed interface CommandStateNotification : Notification - @Serializable -data class ErrorNotification( +data class CommandErrorNotification( @Serializable(with = UUIDSerializer::class) override val id: UUID = UUID.randomUUID(), val message: String, val command: Command, ) : Notification, - CommandStateNotification + CommandNotification diff --git a/src/main/kotlin/eventDemo/app/notification/CommandNotification.kt b/src/main/kotlin/eventDemo/app/notification/CommandNotification.kt new file mode 100644 index 0000000..9d5b166 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/notification/CommandNotification.kt @@ -0,0 +1,3 @@ +package eventDemo.app.notification + +sealed interface CommandNotification : Notification diff --git a/src/main/kotlin/eventDemo/app/notification/CommandSuccessNotification.kt b/src/main/kotlin/eventDemo/app/notification/CommandSuccessNotification.kt new file mode 100644 index 0000000..5dd92d4 --- /dev/null +++ b/src/main/kotlin/eventDemo/app/notification/CommandSuccessNotification.kt @@ -0,0 +1,14 @@ +package eventDemo.app.notification + +import eventDemo.configuration.UUIDSerializer +import eventDemo.libs.command.CommandId +import kotlinx.serialization.Serializable +import java.util.UUID + +@Serializable +data class CommandSuccessNotification( + @Serializable(with = UUIDSerializer::class) + override val id: UUID = UUID.randomUUID(), + val commandId: CommandId, +) : Notification, + CommandNotification diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt index e1eb53f..b3a29a5 100644 --- a/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt +++ b/src/main/kotlin/eventDemo/configuration/ConfigureDI.kt @@ -9,7 +9,8 @@ import eventDemo.app.event.GameEventStore import eventDemo.app.event.projection.GameStateRepository import eventDemo.app.event.projection.SnapshotConfig import eventDemo.app.eventListener.PlayerNotificationEventListener -import eventDemo.libs.command.CommandStreamChannelBuilder +import eventDemo.libs.command.CommandRunnerController +import eventDemo.libs.command.CommandStreamChannel import eventDemo.libs.event.EventBusInMemory import eventDemo.libs.event.EventStoreInMemory import eventDemo.libs.event.VersionBuilder @@ -41,12 +42,20 @@ val appKoinModule = GameStateRepository(get(), get(), snapshotConfig = SnapshotConfig()) } single { - CommandStreamChannelBuilder() + CommandStreamChannel(get()) + } + single { + CommandRunnerController() + } + single { + GameCommandHandler(get(), get(), get(), get()) } singleOf(::VersionBuilderLocal) bind VersionBuilder::class singleOf(::GameEventHandler) singleOf(::GameCommandActionRunner) - singleOf(::GameCommandHandler) singleOf(::PlayerNotificationEventListener) + + // Actions + configureActions() } diff --git a/src/main/kotlin/eventDemo/configuration/ConfigureDIAction.kt b/src/main/kotlin/eventDemo/configuration/ConfigureDIAction.kt new file mode 100644 index 0000000..f877a8e --- /dev/null +++ b/src/main/kotlin/eventDemo/configuration/ConfigureDIAction.kt @@ -0,0 +1,18 @@ +package eventDemo.configuration + +import eventDemo.app.command.action.ICantPlay +import eventDemo.app.command.action.IWantToJoinTheGame +import eventDemo.app.command.action.IWantToPlayCard +import eventDemo.app.command.action.IamReadyToPlay +import org.koin.core.module.Module +import org.koin.core.module.dsl.singleOf + +/** + * Configure all actions + */ +fun Module.configureActions() { + singleOf(::IWantToPlayCard) + singleOf(::IamReadyToPlay) + singleOf(::IWantToJoinTheGame) + singleOf(::ICantPlay) +} diff --git a/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt b/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt new file mode 100644 index 0000000..697cd39 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/command/CommandRunnerController.kt @@ -0,0 +1,51 @@ +package eventDemo.libs.command + +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import java.util.concurrent.ConcurrentHashMap +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes + +/** + * Controls the execution of a command to prevent it from being executed more than once. + */ +class CommandRunnerController( + private val maxCacheTime: Duration = 10.minutes, +) { + private val executedCommand: ConcurrentHashMap> = ConcurrentHashMap() + + suspend fun runOnlyOnce( + command: C, + action: CommandBlock, + ) { + if (!isAlreadyExecuted(command)) { + action(command) + setAsExecuted(command) + removeOldCache() + } else { + throw Exception("Command already executed", command) + } + } + + private fun setAsExecuted(command: C) { + executedCommand.computeIfAbsent(command.id) { Pair(false, Clock.System.now()) } + } + + private fun removeOldCache() { + executedCommand + .filterValues { (_, date) -> + (date + maxCacheTime) > Clock.System.now() + }.keys + .forEach { + executedCommand.remove(it) + } + } + + private fun isAlreadyExecuted(command: C): Boolean = + executedCommand[command.id]?.first ?: false + + class Exception( + override val message: String, + val command: Command, + ) : kotlin.Exception(message) +} diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt b/src/main/kotlin/eventDemo/libs/command/CommandStream.kt deleted file mode 100644 index 30cb605..0000000 --- a/src/main/kotlin/eventDemo/libs/command/CommandStream.kt +++ /dev/null @@ -1,35 +0,0 @@ -package eventDemo.libs.command - -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch - -/** - * Represent a Command stream. - * - * The stream contains a list of all actions yet to be executed. - */ -interface CommandStream { - /** - * A class to implement success/failed action. - */ - interface ComputeStatus { - suspend fun ack() - - suspend fun nack() - } - - /** - * Apply an action to all command income in the stream. - */ - suspend fun process(action: CommandBlock) - - @OptIn(DelicateCoroutinesApi::class) - fun blockAndProcess(action: CommandBlock) { - GlobalScope.launch { - process(action) - } - } -} - -typealias CommandBlock = suspend CommandStream.ComputeStatus.(C) -> Unit diff --git a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt index 5ccc6f8..bc5963c 100644 --- a/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt +++ b/src/main/kotlin/eventDemo/libs/command/CommandStreamChannel.kt @@ -2,95 +2,56 @@ package eventDemo.libs.command import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant -import java.util.concurrent.ConcurrentHashMap -import kotlin.time.Duration -import kotlin.time.Duration.Companion.minutes - -class CommandStreamChannelBuilder( - private val maxCacheTime: Duration = 10.minutes, -) { - operator fun invoke(incoming: ReceiveChannel): CommandStreamChannel = - CommandStreamChannel(incoming, maxCacheTime) -} /** - * Manage [Command]'s with kotlin Channel + * Manage [Command]'s with kotlin Channel. + * + * Use [CommandRunnerController] to prevent multiple executions. + * + * Add logs when command success or failed */ class CommandStreamChannel( - private val incoming: ReceiveChannel, - private val maxCacheTime: Duration = 10.minutes, -) : CommandStream { + private val controller: CommandRunnerController, +) { private val logger = KotlinLogging.logger {} - private val executedCommand: ConcurrentHashMap> = ConcurrentHashMap() - override suspend fun process(action: CommandBlock) { + suspend fun process( + incoming: ReceiveChannel, + action: CommandBlock, + ) { for (command in incoming) { - val now = Clock.System.now() - val (status, _) = executedCommand.computeIfAbsent(command.id) { Pair(false, now) } - - if (status) { + try { + controller.runOnlyOnce(command) { + // Wrap action to add logs + runAndLogStatus(command, action) + } + } catch (e: CommandRunnerController.Exception) { logger.atWarn { - message = "Command already executed: $command" + message = e.message payload = mapOf("command" to command) } - } else { - compute(command, action) } - executedCommand - .filterValues { (_, date) -> - (date + maxCacheTime) > now - }.keys - .forEach { - executedCommand.remove(it) - } } } - private suspend fun compute( + private suspend fun runAndLogStatus( command: C, action: CommandBlock, ) { - val status = - object : CommandStream.ComputeStatus { - var isSet: Boolean = false - - override suspend fun ack() { - if (!isSet) markAsSuccess(command) else error("Already NACK") - isSet = true - } - - override suspend fun nack() { - if (!isSet) markAsFailed(command) else error("Already ACK") - isSet = true - } - } - - val actionResult = runCatching { status.action(command) } + val actionResult = runCatching { action(command) } if (actionResult.isFailure) { - logger.atInfo { - message = "Error on compute the Command: $command" + logger.atWarn { + message = "Compute command FAILED: $command" payload = mapOf("command" to command) cause = actionResult.exceptionOrNull() } - markAsFailed(command) - } else if (!status.isSet) { - status.ack() - } - } - - private suspend fun markAsSuccess(command: C) { - logger.atInfo { - message = "Compute command SUCCESS: $command" - payload = mapOf("command" to command) - } - } - - private suspend fun markAsFailed(command: C) { - logger.atWarn { - message = "Compute command FAILED: $command" - payload = mapOf("command" to command) + } else if (actionResult.isSuccess) { + logger.atInfo { + message = "Compute command SUCCESS: $command" + payload = mapOf("command" to command) + } } } } + +typealias CommandBlock = suspend (C) -> Unit diff --git a/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt index c0b6cb5..bab1496 100644 --- a/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/app/command/GameCommandHandlerTest.kt @@ -6,11 +6,13 @@ import eventDemo.app.entity.GameId import eventDemo.app.entity.Player import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.app.eventListener.ReactionEventListener +import eventDemo.app.notification.CommandSuccessNotification import eventDemo.app.notification.Notification import eventDemo.app.notification.WelcomeToTheGameNotification import eventDemo.configuration.appKoinModule import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContain +import io.kotest.matchers.equals.shouldBeEqual import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel @@ -36,7 +38,12 @@ class GameCommandHandlerTest : commandHandler.handle(player, channelCommand, channelNotification) } - channelCommand.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player))) + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand -> + channelCommand.send(sendCommand) + channelNotification.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } assertIs(channelNotification.receive()).let { it.players shouldContain player } diff --git a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt index 2bed51c..948a6e2 100644 --- a/src/test/kotlin/eventDemo/app/query/GameStateTest.kt +++ b/src/test/kotlin/eventDemo/app/query/GameStateTest.kt @@ -15,6 +15,7 @@ import eventDemo.app.event.projection.ProjectionSnapshotRepositoryInMemory import eventDemo.app.event.projection.apply import eventDemo.app.eventListener.PlayerNotificationEventListener import eventDemo.app.eventListener.ReactionEventListener +import eventDemo.app.notification.CommandSuccessNotification import eventDemo.app.notification.ItsTheTurnOfNotification import eventDemo.app.notification.Notification import eventDemo.app.notification.PlayerAsJoinTheGameNotification @@ -59,14 +60,25 @@ class GameStateTest : val player1Job = launch { - channelCommand1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1))) + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).also { sendCommand -> + channelCommand1.send(sendCommand) + channelNotification1.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + channelNotification1.receive().let { assertIs(it).players shouldBeEqual setOf(player1) } channelNotification1.receive().let { assertIs(it).player shouldBeEqual player2 } - channelCommand1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1))) + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).also { sendCommand -> + channelCommand1.send(sendCommand) + channelNotification1.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } channelNotification1.receive().let { assertIs(it).player shouldBeEqual player2 } @@ -80,7 +92,13 @@ class GameStateTest : player shouldBeEqual player1 } } - channelCommand1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first()))) + + IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).also { sendCommand -> + channelCommand1.send(sendCommand) + channelNotification1.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } channelNotification1.receive().let { assertIs(it).apply { @@ -99,14 +117,27 @@ class GameStateTest : val player2Job = launch { delay(100) - channelCommand2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2))) + IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).also { sendCommand -> + channelCommand2.send(sendCommand) + channelNotification2.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + channelNotification2.receive().let { assertIs(it).players shouldBeEqual setOf(player1, player2) } channelNotification2.receive().let { assertIs(it).player shouldBeEqual player1 } - channelCommand2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2))) + + IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).also { sendCommand -> + channelCommand2.send(sendCommand) + channelNotification2.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } + val player2Hand = channelNotification2.receive().let { assertIs(it).hand shouldHaveSize 7 @@ -129,7 +160,13 @@ class GameStateTest : player shouldBeEqual player2 } } - channelCommand2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first()))) + + IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).also { sendCommand -> + channelCommand2.send(sendCommand) + channelNotification2.receive().let { + assertIs(it).commandId shouldBeEqual sendCommand.id + } + } } koinApplication { modules(appKoinModule) }.koin.apply { diff --git a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt index 5fae3cb..3afc896 100644 --- a/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt +++ b/src/test/kotlin/eventDemo/libs/command/CommandStreamChannelTest.kt @@ -3,7 +3,10 @@ package eventDemo.libs.command import io.kotest.core.spec.style.FunSpec import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch import kotlinx.serialization.Serializable @Serializable @@ -11,6 +14,7 @@ class CommandTest( override val id: CommandId, ) : Command +@OptIn(DelicateCoroutinesApi::class) class CommandStreamChannelTest : FunSpec({ @@ -18,15 +22,17 @@ class CommandStreamChannelTest : val command = CommandTest(CommandId()) val channel = Channel() - val stream = - CommandStreamChannel(channel) + val stream = CommandStreamChannel(CommandRunnerController()) val spyCall: () -> Unit = mockk(relaxed = true) - stream.blockAndProcess { - println("In action ${it.id}") - spyCall() + GlobalScope.launch { + stream.process(channel) { + println("In action ${it.id}") + spyCall() + } } + channel.send(command) verify(exactly = 1) { spyCall() } }