From f03265292a27bdfa64b7b2a1d79e5380a63219c3 Mon Sep 17 00:00:00 2001 From: Fabrice Lecomte Date: Wed, 16 Apr 2025 00:42:59 +0200 Subject: [PATCH] refactor: move CommandHandler to libs --- .../query/GameCommandRouteWebSocket.kt | 2 +- .../business/command/GameCommandHandler.kt | 158 +++++++++--------- .../injection/ConfigureDIBusiness.kt | 2 +- .../injection/ConfigureDILibs.kt | 6 - .../eventDemo/libs/command/CommandHandler.kt | 108 ++++++++++++ .../query/GameSimulationTest.kt | 4 +- .../command/GameCommandHandlerTest.kt | 2 +- 7 files changed, 189 insertions(+), 93 deletions(-) create mode 100644 src/main/kotlin/eventDemo/libs/command/CommandHandler.kt diff --git a/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt b/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt index cb481ec..b1aa063 100644 --- a/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt +++ b/src/main/kotlin/eventDemo/adapter/interfaceLayer/query/GameCommandRouteWebSocket.kt @@ -56,7 +56,7 @@ private fun DefaultWebSocketServerSession.runWebSocket( // TODO change GlobalScope GlobalScope.launch { - commandHandler.handle( + commandHandler.handleIncomingPlayerCommands( currentPlayer, gameId, toObjectChannel(incoming), diff --git a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt index 0f858a4..56d4734 100644 --- a/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt +++ b/src/main/kotlin/eventDemo/business/command/GameCommandHandler.kt @@ -4,48 +4,82 @@ import eventDemo.business.command.command.GameCommand import eventDemo.business.entity.GameId import eventDemo.business.entity.Player import eventDemo.business.event.GameEventBus -import eventDemo.business.event.GameEventHandler +import eventDemo.business.event.GameEventStore import eventDemo.business.event.event.GameEvent import eventDemo.business.notification.CommandErrorNotification import eventDemo.business.notification.CommandSuccessNotification import eventDemo.business.notification.Notification -import eventDemo.libs.command.CommandId -import eventDemo.libs.command.CommandStreamChannel +import eventDemo.libs.command.CommandHandler +import eventDemo.libs.command.CommandRunnerController +import eventDemo.libs.event.EventHandlerImpl +import eventDemo.libs.event.VersionBuilder import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.withLoggingContext import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap -import kotlin.time.Duration -import kotlin.time.Duration.Companion.minutes /** - * Listen [GameCommand] on [CommandStreamChannel], check the validity and execute an action. + * Listen [GameCommand] on [GameEventBus], check the validity and execute an action. * * This action can be executing an action and produce a new [GameEvent] after verification. */ class GameCommandHandler( - private val commandStreamChannel: CommandStreamChannel, - private val eventHandler: GameEventHandler, - private val runner: GameCommandActionRunner, + eventBus: GameEventBus, + eventStore: GameEventStore, + versionBuilder: VersionBuilder, + runner: GameCommandActionRunner, ) { private val logger = KotlinLogging.logger { } - private val eventCommandMap = EventCommandMap() - // subscribe to the event bus to send success notification after save the event. - fun subscribeToBus(eventBus: GameEventBus) { - eventBus.subscribe { event: GameEvent -> - eventCommandMap[event.eventId]?.apply { - channel.sendSuccess(commandId)() - } ?: logger.warn { "No Notification for event: $event" } + private val eventHandler = + EventHandlerImpl( + eventBus, + eventStore, + versionBuilder, + ) + private val commandHandler = + CommandHandler( + CommandRunnerController(), + eventHandler, + ) { + runner.run(it) + } + + /** + * Subscribe to the [event bus][GameEventBus] + * to send success [notification][Notification] after save the [event][GameEvent]. + */ + fun subscribeToBus(eventBus: GameEventBus) = + commandHandler.subscribeToBus(eventBus) + + /** + * Lisent incoming [command][GameCommand] from the [channel][ReceiveChannel], + * run the command and publish the generated [event][GameEvent] to the bus. + * + * It restricts to run only once a command. + * + * If the command fail, send an [error notification][CommandErrorNotification], + * if success, send a [success notification][CommandSuccessNotification] + */ + suspend fun handleIncomingPlayerCommands( + player: Player, + gameId: GameId, + incomingCommandChannel: ReceiveChannel, + channelNotification: SendChannel, + ) { + for (command in incomingCommandChannel) { + handle( + player, + gameId, + command, + channelNotification.sendSuccess(command), + channelNotification.sendError(command), + ) } } /** - * Run a command and publish the event. + * Run the [command] and publish the generated [event][GameEvent] to the bus. * * It restricts to run only once a command. * @@ -55,46 +89,37 @@ class GameCommandHandler( suspend fun handle( player: Player, gameId: GameId, - incomingCommandChannel: ReceiveChannel, - channelNotification: SendChannel, + command: GameCommand, + sendSuccess: suspend () -> Unit, + sendError: suspend (message: String) -> Unit, ) { - commandStreamChannel.process(incomingCommandChannel) { command -> - withLoggingContext("command" to command.toString()) { - if (command.payload.aggregateId.id != gameId.id) { - logger.warn { "Handle command Refuse, the gameId of the command is not the same" } - channelNotification.sendError(command)("The gameId in the command does not match with your game") - return@process - } + if (command.payload.aggregateId.id != gameId.id) { + logger.warn { "Handle command Refuse, the gameId of the command is not the same" } + sendError("The gameId in the command does not match with your game") + return + } + if (command.payload.player.id != player.id) { + logger.warn { "Handle command Refuse, the player of the command is not the same" } + sendError("You are not the author of this command") + return + } - if (command.payload.player.id != player.id) { - logger.warn { "Handle command Refuse, the player of the command is not the same" } - channelNotification.sendError(command)("You are not the author of this command") - return@process - } - - logger.info { "Handle command" } - try { - val eventBuilder = runner.run(command) - - eventHandler.handle(command.payload.aggregateId) { version -> - eventBuilder(version) - .also { eventCommandMap.set(it.eventId, channelNotification, command.id) } - } - } catch (e: CommandException) { - logger.warn(e) { e.message } - channelNotification.sendError(command)(e.message) - } + commandHandler.handle(gameId, command) { _, error -> + if (error != null) { + sendError(error.message) // Business + } else { + sendSuccess() } } } } -private fun SendChannel.sendSuccess(commandId: CommandId): suspend () -> Unit = +private fun SendChannel.sendSuccess(command: GameCommand): suspend () -> Unit = { val logger = KotlinLogging.logger { } - CommandSuccessNotification(commandId = commandId) + CommandSuccessNotification(commandId = command.id) .also { notification -> - withLoggingContext("notification" to notification.toString(), "commandId" to commandId.toString()) { + withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) { logger.debug { "Notification SUCCESS sent" } send(notification) } @@ -112,34 +137,3 @@ private fun SendChannel.sendError(command: GameCommand): suspend ( } } } - -/** - * Map to record the command that triggered the event. - */ -private class EventCommandMap( - val retention: Duration = 10.minutes, -) { - val map = ConcurrentHashMap() - - fun set( - eventId: UUID, - channel: SendChannel, - commandId: CommandId, - ) { - map[eventId] = Output(channel, commandId, Clock.System.now()) - - map - .filterValues { it.date < (Clock.System.now() - retention) } - .keys - .forEach(map::remove) - } - - operator fun get(eventId: UUID): Output? = - map[eventId] - - data class Output( - val channel: SendChannel, - val commandId: CommandId, - val date: Instant, - ) -} diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIBusiness.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIBusiness.kt index 96b0306..c41885b 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIBusiness.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDIBusiness.kt @@ -10,7 +10,7 @@ import org.koin.core.module.dsl.singleOf fun Module.configureDIBusiness() { single { - GameCommandHandler(get(), get(), get()) + GameCommandHandler(get(), get(), get(), get()) } singleOf(::GameEventHandler) singleOf(::GameCommandActionRunner) diff --git a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDILibs.kt b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDILibs.kt index 034b9e2..d13a1f8 100644 --- a/src/main/kotlin/eventDemo/configuration/injection/ConfigureDILibs.kt +++ b/src/main/kotlin/eventDemo/configuration/injection/ConfigureDILibs.kt @@ -1,8 +1,5 @@ package eventDemo.configuration.injection -import eventDemo.business.command.command.GameCommand -import eventDemo.libs.command.CommandRunnerController -import eventDemo.libs.command.CommandStreamChannel import eventDemo.libs.event.VersionBuilder import eventDemo.libs.event.VersionBuilderLocal import org.koin.core.module.Module @@ -10,8 +7,5 @@ import org.koin.core.module.dsl.singleOf import org.koin.dsl.bind fun Module.configureDILibs() { - single { - CommandStreamChannel(CommandRunnerController()) - } singleOf(::VersionBuilderLocal) bind VersionBuilder::class } diff --git a/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt b/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt new file mode 100644 index 0000000..61ded43 --- /dev/null +++ b/src/main/kotlin/eventDemo/libs/command/CommandHandler.kt @@ -0,0 +1,108 @@ +package eventDemo.libs.command + +import eventDemo.business.command.CommandException +import eventDemo.business.command.command.GameCommand +import eventDemo.business.event.event.GameEvent +import eventDemo.libs.bus.Bus +import eventDemo.libs.event.AggregateId +import eventDemo.libs.event.Event +import eventDemo.libs.event.EventHandler +import io.github.oshai.kotlinlogging.KotlinLogging +import io.github.oshai.kotlinlogging.withLoggingContext +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes + +/** + * Listen [GameCommand] on [CommandStreamChannel], check the validity and execute an action. + * + * This action can be executing an action and produce a new [GameEvent] after verification. + */ +class CommandHandler, E : Event, ID : AggregateId, C : Command>( + private val controller: CommandRunnerController, + private val eventHandler: EventHandler, + private val runner: (command: C) -> (version: Int) -> E, +) { + private val logger = KotlinLogging.logger { } + private val eventCommandMap = EventCommandMap() + + /** subscribe to the event bus to run callback after event was saved. */ + fun subscribeToBus(eventBus: B) { + eventBus.subscribe { event: E -> + eventCommandMap[event.eventId]?.invoke() + ?: logger.debug { "No Notification for event: $event" } + } + } + + /** + * Run the [command] and publish generated [event][Event]. + * + * The [callback] is call after execute the [command] + * + * It restricts to run only once the [command]. + */ + suspend fun handle( + aggregateId: ID, + command: C, + callback: CommandCallback, + ) { + controller.runOnlyOnce(command) { + withLoggingContext("command" to command.toString()) { + logger.info { "Handle command" } + try { + val eventBuilder = runner(command) + + eventHandler.handle(aggregateId) { version -> + eventBuilder(version) + .also { eventCommandMap.set(callback, it, command) } + } + } catch (e: CommandException) { + logger.warn(e) { e.message } + callback(command, e) + } + } + } + } +} + +/** + * Map to record the command that triggered the event. + */ +private class EventCommandMap>( + val retention: Duration = 10.minutes, +) { + val map = ConcurrentHashMap>() + + fun set( + callback: CommandCallback, + event: E, + command: C, + ) { + map[event.eventId] = Callback(callback, command, event, Clock.System.now()) + + // Remove older + map + .filterValues { it.date < (Clock.System.now() - retention) } + .keys + .forEach(map::remove) + } + + operator fun get(eventId: UUID): Callback? = + map[eventId] + + data class Callback>( + val callback: CommandCallback, + val command: C, + val event: E, + val date: Instant, + ) { + suspend operator fun invoke(error: CommandException? = null) { + callback(command, error) + } + } +} + +typealias CommandCallback = suspend (command: C, error: CommandException?) -> Unit diff --git a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt index b6904d8..5a691d1 100644 --- a/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt +++ b/src/test/kotlin/eventDemo/adapter/interfaceLayer/query/GameSimulationTest.kt @@ -69,10 +69,10 @@ class GameSimulationTest : // In the normal process, these handlers is invoque players connect to the websocket run { GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player1, gameId, channelCommand1, channelNotification1) + commandHandler.handleIncomingPlayerCommands(player1, gameId, channelCommand1, channelNotification1) } GlobalScope.launch(Dispatchers.IO) { - commandHandler.handle(player2, gameId, channelCommand2, channelNotification2) + commandHandler.handleIncomingPlayerCommands(player2, gameId, channelCommand2, channelNotification2) } } diff --git a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt index 4121d6b..8dab3cc 100644 --- a/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt +++ b/src/test/kotlin/eventDemo/business/command/GameCommandHandlerTest.kt @@ -42,7 +42,7 @@ class GameCommandHandlerTest : ) { channelNotification.trySendBlocking(it) } GlobalScope.launch { - commandHandler.handle( + commandHandler.handleIncomingPlayerCommands( player, gameId, channelCommand,