refactor: move CommandHandler to libs

This commit is contained in:
2025-04-16 00:42:59 +02:00
parent 1f995b6ed9
commit f03265292a
7 changed files with 189 additions and 93 deletions

View File

@@ -56,7 +56,7 @@ private fun DefaultWebSocketServerSession.runWebSocket(
// TODO change GlobalScope
GlobalScope.launch {
commandHandler.handle(
commandHandler.handleIncomingPlayerCommands(
currentPlayer,
gameId,
toObjectChannel(incoming),

View File

@@ -4,48 +4,82 @@ import eventDemo.business.command.command.GameCommand
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventHandler
import eventDemo.business.event.GameEventStore
import eventDemo.business.event.event.GameEvent
import eventDemo.business.notification.CommandErrorNotification
import eventDemo.business.notification.CommandSuccessNotification
import eventDemo.business.notification.Notification
import eventDemo.libs.command.CommandId
import eventDemo.libs.command.CommandStreamChannel
import eventDemo.libs.command.CommandHandler
import eventDemo.libs.command.CommandRunnerController
import eventDemo.libs.event.EventHandlerImpl
import eventDemo.libs.event.VersionBuilder
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
/**
* Listen [GameCommand] on [CommandStreamChannel], check the validity and execute an action.
* Listen [GameCommand] on [GameEventBus], check the validity and execute an action.
*
* This action can be executing an action and produce a new [GameEvent] after verification.
*/
class GameCommandHandler(
private val commandStreamChannel: CommandStreamChannel<GameCommand>,
private val eventHandler: GameEventHandler,
private val runner: GameCommandActionRunner,
eventBus: GameEventBus,
eventStore: GameEventStore,
versionBuilder: VersionBuilder,
runner: GameCommandActionRunner,
) {
private val logger = KotlinLogging.logger { }
private val eventCommandMap = EventCommandMap()
// subscribe to the event bus to send success notification after save the event.
fun subscribeToBus(eventBus: GameEventBus) {
eventBus.subscribe { event: GameEvent ->
eventCommandMap[event.eventId]?.apply {
channel.sendSuccess(commandId)()
} ?: logger.warn { "No Notification for event: $event" }
private val eventHandler =
EventHandlerImpl(
eventBus,
eventStore,
versionBuilder,
)
private val commandHandler =
CommandHandler(
CommandRunnerController<GameCommand>(),
eventHandler,
) {
runner.run(it)
}
/**
* Subscribe to the [event bus][GameEventBus]
* to send success [notification][Notification] after save the [event][GameEvent].
*/
fun subscribeToBus(eventBus: GameEventBus) =
commandHandler.subscribeToBus(eventBus)
/**
* Lisent incoming [command][GameCommand] from the [channel][ReceiveChannel],
* run the command and publish the generated [event][GameEvent] to the bus.
*
* It restricts to run only once a command.
*
* If the command fail, send an [error notification][CommandErrorNotification],
* if success, send a [success notification][CommandSuccessNotification]
*/
suspend fun handleIncomingPlayerCommands(
player: Player,
gameId: GameId,
incomingCommandChannel: ReceiveChannel<GameCommand>,
channelNotification: SendChannel<Notification>,
) {
for (command in incomingCommandChannel) {
handle(
player,
gameId,
command,
channelNotification.sendSuccess(command),
channelNotification.sendError(command),
)
}
}
/**
* Run a command and publish the event.
* Run the [command] and publish the generated [event][GameEvent] to the bus.
*
* It restricts to run only once a command.
*
@@ -55,46 +89,37 @@ class GameCommandHandler(
suspend fun handle(
player: Player,
gameId: GameId,
incomingCommandChannel: ReceiveChannel<GameCommand>,
channelNotification: SendChannel<Notification>,
command: GameCommand,
sendSuccess: suspend () -> Unit,
sendError: suspend (message: String) -> Unit,
) {
commandStreamChannel.process(incomingCommandChannel) { command ->
withLoggingContext("command" to command.toString()) {
if (command.payload.aggregateId.id != gameId.id) {
logger.warn { "Handle command Refuse, the gameId of the command is not the same" }
channelNotification.sendError(command)("The gameId in the command does not match with your game")
return@process
}
if (command.payload.aggregateId.id != gameId.id) {
logger.warn { "Handle command Refuse, the gameId of the command is not the same" }
sendError("The gameId in the command does not match with your game")
return
}
if (command.payload.player.id != player.id) {
logger.warn { "Handle command Refuse, the player of the command is not the same" }
sendError("You are not the author of this command")
return
}
if (command.payload.player.id != player.id) {
logger.warn { "Handle command Refuse, the player of the command is not the same" }
channelNotification.sendError(command)("You are not the author of this command")
return@process
}
logger.info { "Handle command" }
try {
val eventBuilder = runner.run(command)
eventHandler.handle(command.payload.aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(it.eventId, channelNotification, command.id) }
}
} catch (e: CommandException) {
logger.warn(e) { e.message }
channelNotification.sendError(command)(e.message)
}
commandHandler.handle(gameId, command) { _, error ->
if (error != null) {
sendError(error.message) // Business
} else {
sendSuccess()
}
}
}
}
private fun SendChannel<Notification>.sendSuccess(commandId: CommandId): suspend () -> Unit =
private fun SendChannel<Notification>.sendSuccess(command: GameCommand): suspend () -> Unit =
{
val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = commandId)
CommandSuccessNotification(commandId = command.id)
.also { notification ->
withLoggingContext("notification" to notification.toString(), "commandId" to commandId.toString()) {
withLoggingContext("notification" to notification.toString(), "commandId" to command.id.toString()) {
logger.debug { "Notification SUCCESS sent" }
send(notification)
}
@@ -112,34 +137,3 @@ private fun SendChannel<Notification>.sendError(command: GameCommand): suspend (
}
}
}
/**
* Map to record the command that triggered the event.
*/
private class EventCommandMap(
val retention: Duration = 10.minutes,
) {
val map = ConcurrentHashMap<UUID, Output>()
fun set(
eventId: UUID,
channel: SendChannel<Notification>,
commandId: CommandId,
) {
map[eventId] = Output(channel, commandId, Clock.System.now())
map
.filterValues { it.date < (Clock.System.now() - retention) }
.keys
.forEach(map::remove)
}
operator fun get(eventId: UUID): Output? =
map[eventId]
data class Output(
val channel: SendChannel<Notification>,
val commandId: CommandId,
val date: Instant,
)
}

View File

@@ -10,7 +10,7 @@ import org.koin.core.module.dsl.singleOf
fun Module.configureDIBusiness() {
single {
GameCommandHandler(get(), get(), get())
GameCommandHandler(get(), get(), get(), get())
}
singleOf(::GameEventHandler)
singleOf(::GameCommandActionRunner)

View File

@@ -1,8 +1,5 @@
package eventDemo.configuration.injection
import eventDemo.business.command.command.GameCommand
import eventDemo.libs.command.CommandRunnerController
import eventDemo.libs.command.CommandStreamChannel
import eventDemo.libs.event.VersionBuilder
import eventDemo.libs.event.VersionBuilderLocal
import org.koin.core.module.Module
@@ -10,8 +7,5 @@ import org.koin.core.module.dsl.singleOf
import org.koin.dsl.bind
fun Module.configureDILibs() {
single {
CommandStreamChannel<GameCommand>(CommandRunnerController())
}
singleOf(::VersionBuilderLocal) bind VersionBuilder::class
}

View File

@@ -0,0 +1,108 @@
package eventDemo.libs.command
import eventDemo.business.command.CommandException
import eventDemo.business.command.command.GameCommand
import eventDemo.business.event.event.GameEvent
import eventDemo.libs.bus.Bus
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import eventDemo.libs.event.EventHandler
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
/**
* Listen [GameCommand] on [CommandStreamChannel], check the validity and execute an action.
*
* This action can be executing an action and produce a new [GameEvent] after verification.
*/
class CommandHandler<B : Bus<E>, E : Event<ID>, ID : AggregateId, C : Command>(
private val controller: CommandRunnerController<C>,
private val eventHandler: EventHandler<E, ID>,
private val runner: (command: C) -> (version: Int) -> E,
) {
private val logger = KotlinLogging.logger { }
private val eventCommandMap = EventCommandMap<C, E>()
/** subscribe to the event bus to run callback after event was saved. */
fun subscribeToBus(eventBus: B) {
eventBus.subscribe { event: E ->
eventCommandMap[event.eventId]?.invoke()
?: logger.debug { "No Notification for event: $event" }
}
}
/**
* Run the [command] and publish generated [event][Event].
*
* The [callback] is call after execute the [command]
*
* It restricts to run only once the [command].
*/
suspend fun handle(
aggregateId: ID,
command: C,
callback: CommandCallback<C>,
) {
controller.runOnlyOnce(command) {
withLoggingContext("command" to command.toString()) {
logger.info { "Handle command" }
try {
val eventBuilder = runner(command)
eventHandler.handle(aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(callback, it, command) }
}
} catch (e: CommandException) {
logger.warn(e) { e.message }
callback(command, e)
}
}
}
}
}
/**
* Map to record the command that triggered the event.
*/
private class EventCommandMap<C : Command, E : Event<*>>(
val retention: Duration = 10.minutes,
) {
val map = ConcurrentHashMap<UUID, Callback<C, E>>()
fun set(
callback: CommandCallback<C>,
event: E,
command: C,
) {
map[event.eventId] = Callback(callback, command, event, Clock.System.now())
// Remove older
map
.filterValues { it.date < (Clock.System.now() - retention) }
.keys
.forEach(map::remove)
}
operator fun get(eventId: UUID): Callback<C, E>? =
map[eventId]
data class Callback<C : Command, E : Event<*>>(
val callback: CommandCallback<C>,
val command: C,
val event: E,
val date: Instant,
) {
suspend operator fun invoke(error: CommandException? = null) {
callback(command, error)
}
}
}
typealias CommandCallback<C> = suspend (command: C, error: CommandException?) -> Unit