Send Success notification when Command is executed

extract Action of the Commands
simplify somme classes
This commit is contained in:
2025-03-16 00:17:06 +01:00
parent a2f93d4edd
commit ca95344ca9
26 changed files with 498 additions and 310 deletions

View File

@@ -1,29 +0,0 @@
package eventDemo.app.command
import eventDemo.app.command.command.GameCommand
import eventDemo.app.notification.ErrorNotification
import eventDemo.app.notification.Notification
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.SendChannel
typealias ErrorNotifier = suspend (String) -> Unit
fun errorNotifier(
command: GameCommand,
channel: SendChannel<Notification>,
): ErrorNotifier =
{
val logger = KotlinLogging.logger { }
ErrorNotification(message = it, command = command)
.let { notification ->
logger.atWarn {
message = "Notification ERROR sent: ${notification.message}"
payload =
mapOf(
"notification" to notification,
"command" to command,
)
}
channel.send(notification)
}
}

View File

@@ -1,34 +1,27 @@
package eventDemo.app.command
import eventDemo.app.command.action.ICantPlay
import eventDemo.app.command.action.IWantToJoinTheGame
import eventDemo.app.command.action.IWantToPlayCard
import eventDemo.app.command.action.IamReadyToPlay
import eventDemo.app.command.command.GameCommand
import eventDemo.app.command.command.ICantPlayCommand
import eventDemo.app.command.command.IWantToJoinTheGameCommand
import eventDemo.app.command.command.IWantToPlayCardCommand
import eventDemo.app.command.command.IamReadyToPlayCommand
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.projection.GameStateRepository
import eventDemo.app.notification.Notification
import kotlinx.coroutines.channels.SendChannel
import eventDemo.app.event.event.GameEvent
class GameCommandActionRunner(
private val eventHandler: GameEventHandler,
private val gameStateRepository: GameStateRepository,
private val iWantToPlayCard: IWantToPlayCard,
private val iamReadyToPlay: IamReadyToPlay,
private val iWantToJoinTheGame: IWantToJoinTheGame,
private val iCantPlay: ICantPlay,
) {
suspend fun run(
command: GameCommand,
outgoingErrorChannelNotification: SendChannel<Notification>,
) {
val gameState = gameStateRepository.getLast(command.payload.aggregateId)
try {
fun run(command: GameCommand): (Int) -> GameEvent =
when (command) {
is IWantToPlayCardCommand -> command.run(gameState, this.eventHandler)
is IamReadyToPlayCommand -> command.run(gameState, this.eventHandler)
is IWantToJoinTheGameCommand -> command.run(gameState, this.eventHandler)
is ICantPlayCommand -> command.run(gameState, this.eventHandler)
}
} catch (e: CommandException) {
errorNotifier(command, outgoingErrorChannelNotification)(e.message)
}
is IWantToPlayCardCommand -> iWantToPlayCard.run(command)
is IamReadyToPlayCommand -> iamReadyToPlay.run(command)
is IWantToJoinTheGameCommand -> iWantToJoinTheGame.run(command)
is ICantPlayCommand -> iCantPlay.run(command)
}
}

View File

@@ -2,46 +2,144 @@ package eventDemo.app.command
import eventDemo.app.command.command.GameCommand
import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventBus
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.GameEvent
import eventDemo.app.notification.CommandErrorNotification
import eventDemo.app.notification.CommandSuccessNotification
import eventDemo.app.notification.Notification
import eventDemo.libs.command.CommandStreamChannelBuilder
import eventDemo.libs.command.CommandId
import eventDemo.libs.command.CommandStreamChannel
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
/**
* Listen [GameCommand] on [GameCommandStream], check the validity and execute an action.
* Listen [GameCommand] on [CommandStreamChannel], check the validity and execute an action.
*
* This action can be executing an action and produce a new [GameEvent] after verification.
*/
class GameCommandHandler(
private val commandStreamChannel: CommandStreamChannelBuilder<GameCommand>,
private val commandStreamChannel: CommandStreamChannel<GameCommand>,
private val eventHandler: GameEventHandler,
private val runner: GameCommandActionRunner,
eventBus: GameEventBus,
listenerPriority: Int = DEFAULT_PRIORITY,
) {
private val logger = KotlinLogging.logger { }
private val eventCommandMap = EventCommandMap()
companion object Config {
const val DEFAULT_PRIORITY = 1000
}
// subscribe to the event bus to send success notification after save the event.
init {
eventBus.subscribe(listenerPriority) { event: GameEvent ->
eventCommandMap[event.eventId]?.apply {
channel.sendSuccess(commandId)()
} ?: logger.warn { "No Notification for event: $event" }
}
}
/**
* Init the handler
* Run a command and publish the event.
*
* It restricts to run only once a command.
*
* If the command fail, send an [error notification][CommandErrorNotification],
* if success, send a [success notification][CommandSuccessNotification]
*/
suspend fun handle(
player: Player,
incomingCommandChannel: ReceiveChannel<GameCommand>,
outgoingErrorChannelNotification: SendChannel<Notification>,
channelNotification: SendChannel<Notification>,
) =
commandStreamChannel(incomingCommandChannel)
.process { command ->
commandStreamChannel.process(incomingCommandChannel) { command ->
if (command.payload.player.id != player.id) {
logger.atWarn {
message = "Handle command Refuse, the player of the command is not the same: $command"
payload = mapOf("command" to command)
}
nack()
channelNotification.sendError(command)("You are not the author of this command\n")
} else {
logger.atInfo {
message = "Handle command: $command"
payload = mapOf("command" to command)
}
runner.run(command, outgoingErrorChannelNotification)
try {
val eventBuilder = runner.run(command)
eventHandler.handle(command.payload.aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(it.eventId, channelNotification, command.id) }
}
} catch (e: CommandException) {
logger.atWarn {
message = e.message
payload = mapOf("command" to command)
}
channelNotification.sendError(command)(e.message)
}
}
}
}
private fun SendChannel<Notification>.sendSuccess(commandId: CommandId): suspend () -> Unit =
{
val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = commandId)
.also { notification ->
logger.atDebug {
message = "Notification SUCCESS sent"
payload =
mapOf(
"notification" to notification,
"commandId" to commandId,
)
}
send(notification)
}
}
private fun SendChannel<Notification>.sendError(command: GameCommand): suspend (String) -> Unit =
{
val logger = KotlinLogging.logger { }
CommandErrorNotification(message = it, command = command)
.also { notification ->
logger.atWarn {
message = "Notification ERROR sent: ${notification.message}"
payload =
mapOf(
"notification" to notification,
"command" to command,
)
}
send(notification)
}
}
/**
* Map to record the command that triggered the event.
*/
private class EventCommandMap {
val map = ConcurrentHashMap<UUID, Output>()
fun set(
eventId: UUID,
channel: SendChannel<Notification>,
commandId: CommandId,
) {
map[eventId] = Output(channel, commandId)
}
operator fun get(eventId: UUID): Output? =
map[eventId]
data class Output(
val channel: SendChannel<Notification>,
val commandId: CommandId,
)
}

View File

@@ -1,13 +0,0 @@
package eventDemo.app.command
import eventDemo.app.command.command.GameCommand
import eventDemo.libs.command.CommandStream
import eventDemo.libs.command.CommandStreamChannel
import kotlinx.coroutines.channels.ReceiveChannel
/**
* A stream to publish and read the game command.
*/
class GameCommandStream(
incoming: ReceiveChannel<GameCommand>,
) : CommandStream<GameCommand> by CommandStreamChannel(incoming)

View File

@@ -0,0 +1,8 @@
package eventDemo.app.command.action
import eventDemo.libs.command.Command
import eventDemo.libs.event.Event
sealed interface CommandAction<C : Command, E : Event<*>> {
fun run(command: C): (Int) -> E
}

View File

@@ -0,0 +1,36 @@
package eventDemo.app.command.action
import eventDemo.app.command.CommandException
import eventDemo.app.command.command.ICantPlayCommand
import eventDemo.app.event.event.PlayerHavePassEvent
import eventDemo.app.event.projection.GameStateRepository
/**
* A command to perform an action to play a new card
*/
data class ICantPlay(
private val gameStateRepository: GameStateRepository,
) : CommandAction<ICantPlayCommand, PlayerHavePassEvent> {
override fun run(command: ICantPlayCommand): (Int) -> PlayerHavePassEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId)
if (state.currentPlayerTurn != command.payload.player) {
throw CommandException("Its not your turn!")
}
val playableCards = state.playableCards(command.payload.player)
if (playableCards.isNotEmpty()) {
throw CommandException("You can and must play one card, like ${playableCards.first()::class.simpleName}")
}
val takenCard = state.deck.stack.first()
return { version ->
PlayerHavePassEvent(
aggregateId = command.payload.aggregateId,
player = command.payload.player,
takenCard = takenCard,
version = version,
)
}
}
}

View File

@@ -0,0 +1,28 @@
package eventDemo.app.command.action
import eventDemo.app.command.CommandException
import eventDemo.app.command.command.IWantToJoinTheGameCommand
import eventDemo.app.event.event.NewPlayerEvent
import eventDemo.app.event.projection.GameStateRepository
/**
* A command to perform an action to play a new card
*/
data class IWantToJoinTheGame(
private val gameStateRepository: GameStateRepository,
) : CommandAction<IWantToJoinTheGameCommand, NewPlayerEvent> {
override fun run(command: IWantToJoinTheGameCommand): (Int) -> NewPlayerEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId)
if (!state.isStarted) {
return {
NewPlayerEvent(
aggregateId = command.payload.aggregateId,
player = command.payload.player,
version = it,
)
}
} else {
throw CommandException("The game is already started")
}
}
}

View File

@@ -0,0 +1,36 @@
package eventDemo.app.command.action
import eventDemo.app.command.CommandException
import eventDemo.app.command.command.IWantToPlayCardCommand
import eventDemo.app.event.event.CardIsPlayedEvent
import eventDemo.app.event.projection.GameStateRepository
/**
* A command to perform an action to play a new card
*/
data class IWantToPlayCard(
private val gameStateRepository: GameStateRepository,
) : CommandAction<IWantToPlayCardCommand, CardIsPlayedEvent> {
override fun run(command: IWantToPlayCardCommand): (Int) -> CardIsPlayedEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId)
if (!state.isStarted) {
throw CommandException("The game is Not started")
}
if (state.currentPlayerTurn != command.payload.player) {
throw CommandException("Its not your turn!")
}
if (!state.canBePlayThisCard(command.payload.player, command.payload.card)) {
throw CommandException("You cannot play this card")
}
return { version ->
CardIsPlayedEvent(
aggregateId = command.payload.aggregateId,
card = command.payload.card,
player = command.payload.player,
version = version,
)
}
}
}

View File

@@ -0,0 +1,36 @@
package eventDemo.app.command.action
import eventDemo.app.command.CommandException
import eventDemo.app.command.command.IamReadyToPlayCommand
import eventDemo.app.event.event.PlayerReadyEvent
import eventDemo.app.event.projection.GameStateRepository
/**
* A command to set as ready to play
*/
class IamReadyToPlay(
private val gameStateRepository: GameStateRepository,
) : CommandAction<IamReadyToPlayCommand, PlayerReadyEvent> {
@Throws(CommandException::class)
override fun run(command: IamReadyToPlayCommand): (Int) -> PlayerReadyEvent {
val state = gameStateRepository.getLast(command.payload.aggregateId)
val playerExist: Boolean = state.players.contains(command.payload.player)
val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(command.payload.player)
if (state.isStarted) {
throw CommandException("The game is already started")
} else if (!playerExist) {
throw CommandException("You are not in the game")
} else if (playerIsAlreadyReady) {
throw CommandException("You are already ready")
} else {
return { version: Int ->
PlayerReadyEvent(
aggregateId = command.payload.aggregateId,
player = command.payload.player,
version = version,
)
}
}
}
}

View File

@@ -1,11 +1,7 @@
package eventDemo.app.command.command
import eventDemo.app.command.CommandException
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.PlayerHavePassEvent
import eventDemo.app.event.projection.GameState
import eventDemo.libs.command.CommandId
import kotlinx.serialization.Serializable
@@ -23,28 +19,4 @@ data class ICantPlayCommand(
override val aggregateId: GameId,
override val player: Player,
) : GameCommand.Payload
suspend fun run(
state: GameState,
eventHandler: GameEventHandler,
) {
if (state.currentPlayerTurn != payload.player) {
throw CommandException("Its not your turn!")
}
val playableCards = state.playableCards(payload.player)
if (playableCards.isEmpty()) {
val takenCard = state.deck.stack.first()
eventHandler.handle(payload.aggregateId) {
PlayerHavePassEvent(
aggregateId = payload.aggregateId,
player = payload.player,
takenCard = takenCard,
version = it,
)
}
} else {
throw CommandException("You can and must play one card, like ${playableCards.first()::class.simpleName}")
}
}
}

View File

@@ -1,11 +1,7 @@
package eventDemo.app.command.command
import eventDemo.app.command.CommandException
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.NewPlayerEvent
import eventDemo.app.event.projection.GameState
import eventDemo.libs.command.CommandId
import kotlinx.serialization.Serializable
@@ -23,21 +19,4 @@ data class IWantToJoinTheGameCommand(
override val aggregateId: GameId,
override val player: Player,
) : GameCommand.Payload
suspend fun run(
state: GameState,
eventHandler: GameEventHandler,
) {
if (!state.isStarted) {
eventHandler.handle(payload.aggregateId) {
NewPlayerEvent(
aggregateId = payload.aggregateId,
player = payload.player,
version = it,
)
}
} else {
throw CommandException("The game is already started")
}
}
}

View File

@@ -1,12 +1,8 @@
package eventDemo.app.command.command
import eventDemo.app.command.CommandException
import eventDemo.app.entity.Card
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.CardIsPlayedEvent
import eventDemo.app.event.projection.GameState
import eventDemo.libs.command.CommandId
import kotlinx.serialization.Serializable
@@ -25,29 +21,4 @@ data class IWantToPlayCardCommand(
override val player: Player,
val card: Card,
) : GameCommand.Payload
suspend fun run(
state: GameState,
eventHandler: GameEventHandler,
) {
if (!state.isStarted) {
throw CommandException("The game is Not started")
}
if (state.currentPlayerTurn != payload.player) {
throw CommandException("Its not your turn!")
}
if (state.canBePlayThisCard(payload.player, payload.card)) {
eventHandler.handle(payload.aggregateId) {
CardIsPlayedEvent(
aggregateId = payload.aggregateId,
card = payload.card,
player = payload.player,
version = it,
)
}
} else {
throw CommandException("You cannot play this card")
}
}
}

View File

@@ -1,11 +1,7 @@
package eventDemo.app.command.command
import eventDemo.app.command.CommandException
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.event.PlayerReadyEvent
import eventDemo.app.event.projection.GameState
import eventDemo.libs.command.CommandId
import kotlinx.serialization.Serializable
@@ -23,29 +19,4 @@ data class IamReadyToPlayCommand(
override val aggregateId: GameId,
override val player: Player,
) : GameCommand.Payload
@Throws(CommandException::class)
suspend fun run(
state: GameState,
eventHandler: GameEventHandler,
) {
val playerExist: Boolean = state.players.contains(payload.player)
val playerIsAlreadyReady: Boolean = state.readyPlayers.contains(payload.player)
if (state.isStarted) {
throw CommandException("The game is already started")
} else if (!playerExist) {
throw CommandException("You are not in the game")
} else if (playerIsAlreadyReady) {
throw CommandException("You are already ready")
} else {
eventHandler.handle(payload.aggregateId) {
PlayerReadyEvent(
aggregateId = payload.aggregateId,
player = payload.player,
version = it,
)
}
}
}
}

View File

@@ -9,7 +9,7 @@ import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* A stream to publish and read the played card event.
* Handle the event to dispatch it to store, bus and projections builders
*/
class GameEventHandler(
private val eventBus: GameEventBus,
@@ -23,17 +23,26 @@ class GameEventHandler(
projectionsBuilders.add(builder)
}
/**
* Build Event, and send it to the event store and bus.
* Build also the projections.
*/
override fun handle(
aggregateId: GameId,
buildEvent: (version: Int) -> GameEvent,
): GameEvent =
locks
// Get lock for the aggregate
.computeIfAbsent(aggregateId) { ReentrantLock() }
.withLock {
// Build event with the version
buildEvent(versionBuilder.buildNextVersion(aggregateId))
// then publish it to the event store
.also { eventStore.publish(it) }
}.also { event ->
// Build the projections
projectionsBuilders.forEach { it(event) }
// Publish to the bus
eventBus.publish(event)
}
}

View File

@@ -0,0 +1,24 @@
package eventDemo.app.eventListener
import eventDemo.app.notification.CommandSuccessNotification
import eventDemo.app.notification.Notification
import eventDemo.libs.command.CommandId
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.SendChannel
private fun SendChannel<Notification>.successNotifier(commandId: CommandId): suspend () -> Unit =
{
val logger = KotlinLogging.logger { }
CommandSuccessNotification(commandId = commandId)
.let { notification ->
logger.atDebug {
message = "Notification SUCCESS sent"
payload =
mapOf(
"notification" to notification,
"commandId" to commandId,
)
}
send(notification)
}
}

View File

@@ -5,13 +5,11 @@ import eventDemo.libs.command.Command
import kotlinx.serialization.Serializable
import java.util.UUID
sealed interface CommandStateNotification : Notification
@Serializable
data class ErrorNotification(
data class CommandErrorNotification(
@Serializable(with = UUIDSerializer::class)
override val id: UUID = UUID.randomUUID(),
val message: String,
val command: Command,
) : Notification,
CommandStateNotification
CommandNotification

View File

@@ -0,0 +1,3 @@
package eventDemo.app.notification
sealed interface CommandNotification : Notification

View File

@@ -0,0 +1,14 @@
package eventDemo.app.notification
import eventDemo.configuration.UUIDSerializer
import eventDemo.libs.command.CommandId
import kotlinx.serialization.Serializable
import java.util.UUID
@Serializable
data class CommandSuccessNotification(
@Serializable(with = UUIDSerializer::class)
override val id: UUID = UUID.randomUUID(),
val commandId: CommandId,
) : Notification,
CommandNotification

View File

@@ -9,7 +9,8 @@ import eventDemo.app.event.GameEventStore
import eventDemo.app.event.projection.GameStateRepository
import eventDemo.app.event.projection.SnapshotConfig
import eventDemo.app.eventListener.PlayerNotificationEventListener
import eventDemo.libs.command.CommandStreamChannelBuilder
import eventDemo.libs.command.CommandRunnerController
import eventDemo.libs.command.CommandStreamChannel
import eventDemo.libs.event.EventBusInMemory
import eventDemo.libs.event.EventStoreInMemory
import eventDemo.libs.event.VersionBuilder
@@ -41,12 +42,20 @@ val appKoinModule =
GameStateRepository(get(), get(), snapshotConfig = SnapshotConfig())
}
single {
CommandStreamChannelBuilder<GameCommand>()
CommandStreamChannel<GameCommand>(get())
}
single {
CommandRunnerController<GameCommand>()
}
single {
GameCommandHandler(get(), get(), get(), get())
}
singleOf(::VersionBuilderLocal) bind VersionBuilder::class
singleOf(::GameEventHandler)
singleOf(::GameCommandActionRunner)
singleOf(::GameCommandHandler)
singleOf(::PlayerNotificationEventListener)
// Actions
configureActions()
}

View File

@@ -0,0 +1,18 @@
package eventDemo.configuration
import eventDemo.app.command.action.ICantPlay
import eventDemo.app.command.action.IWantToJoinTheGame
import eventDemo.app.command.action.IWantToPlayCard
import eventDemo.app.command.action.IamReadyToPlay
import org.koin.core.module.Module
import org.koin.core.module.dsl.singleOf
/**
* Configure all actions
*/
fun Module.configureActions() {
singleOf(::IWantToPlayCard)
singleOf(::IamReadyToPlay)
singleOf(::IWantToJoinTheGame)
singleOf(::ICantPlay)
}

View File

@@ -0,0 +1,51 @@
package eventDemo.libs.command
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
/**
* Controls the execution of a command to prevent it from being executed more than once.
*/
class CommandRunnerController<C : Command>(
private val maxCacheTime: Duration = 10.minutes,
) {
private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap()
suspend fun runOnlyOnce(
command: C,
action: CommandBlock<C>,
) {
if (!isAlreadyExecuted(command)) {
action(command)
setAsExecuted(command)
removeOldCache()
} else {
throw Exception("Command already executed", command)
}
}
private fun setAsExecuted(command: C) {
executedCommand.computeIfAbsent(command.id) { Pair(false, Clock.System.now()) }
}
private fun removeOldCache() {
executedCommand
.filterValues { (_, date) ->
(date + maxCacheTime) > Clock.System.now()
}.keys
.forEach {
executedCommand.remove(it)
}
}
private fun isAlreadyExecuted(command: C): Boolean =
executedCommand[command.id]?.first ?: false
class Exception(
override val message: String,
val command: Command,
) : kotlin.Exception(message)
}

View File

@@ -1,35 +0,0 @@
package eventDemo.libs.command
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
/**
* Represent a Command stream.
*
* The stream contains a list of all actions yet to be executed.
*/
interface CommandStream<C : Command> {
/**
* A class to implement success/failed action.
*/
interface ComputeStatus {
suspend fun ack()
suspend fun nack()
}
/**
* Apply an action to all command income in the stream.
*/
suspend fun process(action: CommandBlock<C>)
@OptIn(DelicateCoroutinesApi::class)
fun blockAndProcess(action: CommandBlock<C>) {
GlobalScope.launch {
process(action)
}
}
}
typealias CommandBlock<C> = suspend CommandStream.ComputeStatus.(C) -> Unit

View File

@@ -2,95 +2,56 @@ package eventDemo.libs.command
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
class CommandStreamChannelBuilder<C : Command>(
private val maxCacheTime: Duration = 10.minutes,
) {
operator fun invoke(incoming: ReceiveChannel<C>): CommandStreamChannel<C> =
CommandStreamChannel(incoming, maxCacheTime)
}
/**
* Manage [Command]'s with kotlin Channel
* Manage [Command]'s with kotlin Channel.
*
* Use [CommandRunnerController] to prevent multiple executions.
*
* Add logs when command success or failed
*/
class CommandStreamChannel<C : Command>(
private val incoming: ReceiveChannel<C>,
private val maxCacheTime: Duration = 10.minutes,
) : CommandStream<C> {
private val controller: CommandRunnerController<C>,
) {
private val logger = KotlinLogging.logger {}
private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap()
override suspend fun process(action: CommandBlock<C>) {
suspend fun process(
incoming: ReceiveChannel<C>,
action: CommandBlock<C>,
) {
for (command in incoming) {
val now = Clock.System.now()
val (status, _) = executedCommand.computeIfAbsent(command.id) { Pair(false, now) }
if (status) {
try {
controller.runOnlyOnce(command) {
// Wrap action to add logs
runAndLogStatus(command, action)
}
} catch (e: CommandRunnerController.Exception) {
logger.atWarn {
message = "Command already executed: $command"
message = e.message
payload = mapOf("command" to command)
}
} else {
compute(command, action)
}
executedCommand
.filterValues { (_, date) ->
(date + maxCacheTime) > now
}.keys
.forEach {
executedCommand.remove(it)
}
}
}
private suspend fun compute(
private suspend fun runAndLogStatus(
command: C,
action: CommandBlock<C>,
) {
val status =
object : CommandStream.ComputeStatus {
var isSet: Boolean = false
override suspend fun ack() {
if (!isSet) markAsSuccess(command) else error("Already NACK")
isSet = true
}
override suspend fun nack() {
if (!isSet) markAsFailed(command) else error("Already ACK")
isSet = true
}
}
val actionResult = runCatching { status.action(command) }
val actionResult = runCatching { action(command) }
if (actionResult.isFailure) {
logger.atInfo {
message = "Error on compute the Command: $command"
logger.atWarn {
message = "Compute command FAILED: $command"
payload = mapOf("command" to command)
cause = actionResult.exceptionOrNull()
}
markAsFailed(command)
} else if (!status.isSet) {
status.ack()
}
}
private suspend fun markAsSuccess(command: C) {
} else if (actionResult.isSuccess) {
logger.atInfo {
message = "Compute command SUCCESS: $command"
payload = mapOf("command" to command)
}
}
}
}
private suspend fun markAsFailed(command: C) {
logger.atWarn {
message = "Compute command FAILED: $command"
payload = mapOf("command" to command)
}
}
}
typealias CommandBlock<C> = suspend (C) -> Unit

View File

@@ -6,11 +6,13 @@ import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import eventDemo.app.eventListener.PlayerNotificationEventListener
import eventDemo.app.eventListener.ReactionEventListener
import eventDemo.app.notification.CommandSuccessNotification
import eventDemo.app.notification.Notification
import eventDemo.app.notification.WelcomeToTheGameNotification
import eventDemo.configuration.appKoinModule
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.collections.shouldContain
import io.kotest.matchers.equals.shouldBeEqual
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
@@ -36,7 +38,12 @@ class GameCommandHandlerTest :
commandHandler.handle(player, channelCommand, channelNotification)
}
channelCommand.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)))
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand ->
channelCommand.send(sendCommand)
channelNotification.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
assertIs<WelcomeToTheGameNotification>(channelNotification.receive()).let {
it.players shouldContain player
}

View File

@@ -15,6 +15,7 @@ import eventDemo.app.event.projection.ProjectionSnapshotRepositoryInMemory
import eventDemo.app.event.projection.apply
import eventDemo.app.eventListener.PlayerNotificationEventListener
import eventDemo.app.eventListener.ReactionEventListener
import eventDemo.app.notification.CommandSuccessNotification
import eventDemo.app.notification.ItsTheTurnOfNotification
import eventDemo.app.notification.Notification
import eventDemo.app.notification.PlayerAsJoinTheGameNotification
@@ -59,14 +60,25 @@ class GameStateTest :
val player1Job =
launch {
channelCommand1.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)))
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).also { sendCommand ->
channelCommand1.send(sendCommand)
channelNotification1.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
channelNotification1.receive().let {
assertIs<WelcomeToTheGameNotification>(it).players shouldBeEqual setOf(player1)
}
channelNotification1.receive().let {
assertIs<PlayerAsJoinTheGameNotification>(it).player shouldBeEqual player2
}
channelCommand1.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)))
IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).also { sendCommand ->
channelCommand1.send(sendCommand)
channelNotification1.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
channelNotification1.receive().let {
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player2
}
@@ -80,7 +92,13 @@ class GameStateTest :
player shouldBeEqual player1
}
}
channelCommand1.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())))
IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).also { sendCommand ->
channelCommand1.send(sendCommand)
channelNotification1.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
channelNotification1.receive().let {
assertIs<ItsTheTurnOfNotification>(it).apply {
@@ -99,14 +117,27 @@ class GameStateTest :
val player2Job =
launch {
delay(100)
channelCommand2.send(IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)))
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).also { sendCommand ->
channelCommand2.send(sendCommand)
channelNotification2.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
channelNotification2.receive().let {
assertIs<WelcomeToTheGameNotification>(it).players shouldBeEqual setOf(player1, player2)
}
channelNotification2.receive().let {
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player1
}
channelCommand2.send(IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)))
IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).also { sendCommand ->
channelCommand2.send(sendCommand)
channelNotification2.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
val player2Hand =
channelNotification2.receive().let {
assertIs<TheGameWasStartedNotification>(it).hand shouldHaveSize 7
@@ -129,7 +160,13 @@ class GameStateTest :
player shouldBeEqual player2
}
}
channelCommand2.send(IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())))
IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).also { sendCommand ->
channelCommand2.send(sendCommand)
channelNotification2.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
}
}
}
koinApplication { modules(appKoinModule) }.koin.apply {

View File

@@ -3,7 +3,10 @@ package eventDemo.libs.command
import io.kotest.core.spec.style.FunSpec
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
@Serializable
@@ -11,6 +14,7 @@ class CommandTest(
override val id: CommandId,
) : Command
@OptIn(DelicateCoroutinesApi::class)
class CommandStreamChannelTest :
FunSpec({
@@ -18,15 +22,17 @@ class CommandStreamChannelTest :
val command = CommandTest(CommandId())
val channel = Channel<CommandTest>()
val stream =
CommandStreamChannel(channel)
val stream = CommandStreamChannel(CommandRunnerController())
val spyCall: () -> Unit = mockk(relaxed = true)
stream.blockAndProcess {
GlobalScope.launch {
stream.process(channel) {
println("In action ${it.id}")
spyCall()
}
}
channel.send(command)
verify(exactly = 1) { spyCall() }
}