Add AggregateID for the PlayerNotificationEventListener and CommandHandler

This commit is contained in:
2025-03-17 19:41:47 +01:00
parent 0374712824
commit 1a96c68521
11 changed files with 96 additions and 42 deletions

View File

@@ -7,5 +7,5 @@ import eventDemo.libs.event.EventBus
import eventDemo.libs.event.EventBusInMemory
class GameEventBusInMemory :
GameEventBus,
EventBus<GameEvent, GameId> by EventBusInMemory<GameEvent, GameId>()
GameEventBus(),
EventBus<GameEvent, GameId> by EventBusInMemory()

View File

@@ -1,12 +1,16 @@
package eventDemo.adapter.interfaceLayer
import eventDemo.business.command.GameCommandHandler
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.eventListener.PlayerNotificationEventListener
import eventDemo.business.notification.Notification
import eventDemo.configuration.ktor.BadRequestException
import eventDemo.configuration.ktor.HttpErrorBadRequest
import eventDemo.libs.fromFrameChannel
import eventDemo.libs.toObjectChannel
import io.github.oshai.kotlinlogging.withLoggingContext
import io.ktor.http.parameters
import io.ktor.server.application.ApplicationCall
import io.ktor.server.auth.authenticate
import io.ktor.server.auth.jwt.JWTPrincipal
@@ -18,6 +22,7 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.launch
import java.util.UUID
@DelicateCoroutinesApi
fun Route.gameWebSocket(
@@ -25,18 +30,27 @@ fun Route.gameWebSocket(
commandHandler: GameCommandHandler,
) {
authenticate {
webSocket("/game") {
webSocket("/game/{id}") {
val currentPlayer = call.getPlayer()
val gameId =
call.parameters["id"]?.let { GameId(UUID.fromString(it)) }
?: throw BadRequestException(HttpErrorBadRequest("No ID fore the game"))
val outgoingFrameChannel: SendChannel<Notification> = fromFrameChannel(outgoing)
withLoggingContext("currentPlayer" to currentPlayer.toString()) {
GlobalScope.launch {
commandHandler.handle(
currentPlayer,
gameId,
toObjectChannel(incoming),
outgoingFrameChannel,
)
}
playerNotificationListener.startListening({ outgoingFrameChannel.trySendBlocking(it) }, currentPlayer)
playerNotificationListener.startListening(
{ outgoingFrameChannel.trySendBlocking(it) },
currentPlayer,
gameId,
)
}
}
}

View File

@@ -1,6 +1,7 @@
package eventDemo.business.command
import eventDemo.business.command.command.GameCommand
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.GameEventHandler
@@ -55,27 +56,35 @@ class GameCommandHandler(
*/
suspend fun handle(
player: Player,
gameId: GameId,
incomingCommandChannel: ReceiveChannel<GameCommand>,
channelNotification: SendChannel<Notification>,
) {
commandStreamChannel.process(incomingCommandChannel) { command ->
withLoggingContext("command" to command.toString()) {
if (command.payload.aggregateId.id != gameId.id) {
logger.warn { "Handle command Refuse, the gameId of the command is not the same" }
channelNotification.sendError(command)("The gameId in the command does not match with your game")
return@process
}
if (command.payload.player.id != player.id) {
logger.warn { "Handle command Refuse, the player of the command is not the same" }
channelNotification.sendError(command)("You are not the author of this command")
} else {
logger.info { "Handle command" }
try {
val eventBuilder = runner.run(command)
return@process
}
eventHandler.handle(command.payload.aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(it.eventId, channelNotification, command.id) }
}
} catch (e: CommandException) {
logger.warn(e) { e.message }
channelNotification.sendError(command)(e.message)
logger.info { "Handle command" }
try {
val eventBuilder = runner.run(command)
eventHandler.handle(command.payload.aggregateId) { version ->
eventBuilder(version)
.also { eventCommandMap.set(it.eventId, channelNotification, command.id) }
}
} catch (e: CommandException) {
logger.warn(e) { e.message }
channelNotification.sendError(command)(e.message)
}
}
}

View File

@@ -3,5 +3,13 @@ package eventDemo.business.event
import eventDemo.business.entity.GameId
import eventDemo.business.event.event.GameEvent
import eventDemo.libs.event.EventBus
import java.util.UUID
interface GameEventBus : EventBus<GameEvent, GameId>
abstract class GameEventBus :
EventBus<GameEvent, GameId>,
Comparable<GameEventBus> {
private val instanceId: UUID = UUID.randomUUID()
override fun compareTo(other: GameEventBus): Int =
compareValues(instanceId, other.instanceId)
}

View File

@@ -24,8 +24,8 @@ data class GameStartedEvent(
fun new(
id: GameId,
players: Set<Player>,
shuffleIsDisabled: Boolean = isDisabled,
version: Int,
shuffleIsDisabled: Boolean = isDisabled,
): GameStartedEvent =
GameStartedEvent(
aggregateId = id,

View File

@@ -1,6 +1,7 @@
package eventDemo.business.event.eventListener
import eventDemo.business.entity.Card
import eventDemo.business.entity.GameId
import eventDemo.business.entity.Player
import eventDemo.business.event.GameEventBus
import eventDemo.business.event.event.CardIsPlayedEvent
@@ -35,9 +36,14 @@ class PlayerNotificationEventListener(
fun startListening(
outgoingNotification: (Notification) -> Unit,
currentPlayer: Player,
gameId: GameId,
) {
eventBus.subscribe { event: GameEvent ->
withLoggingContext("event" to event.toString()) {
if (event.aggregateId != gameId) {
return@subscribe
}
val currentState = gameStateRepository.getUntil(event)
fun Notification.send() {

View File

@@ -10,6 +10,7 @@ import eventDemo.business.event.projection.GameState
import eventDemo.business.event.projection.GameStateRepository
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import java.util.concurrent.ConcurrentSkipListSet
class ReactionEventListener(
private val eventBus: GameEventBus,
@@ -19,17 +20,22 @@ class ReactionEventListener(
) {
companion object Config {
const val DEFAULT_PRIORITY = -1000
val registeredListeners = ConcurrentSkipListSet<GameEventBus>()
}
private val logger = KotlinLogging.logger { }
fun init() {
eventBus.subscribe(priority) { event: GameEvent ->
withLoggingContext("event" to event.toString()) {
val state = gameStateRepository.getUntil(event)
sendStartGameEvent(state, event)
sendWinnerEvent(state)
if (registeredListeners.add(eventBus)) {
eventBus.subscribe(priority) { event: GameEvent ->
withLoggingContext("event" to event.toString()) {
val state = gameStateRepository.getUntil(event)
sendStartGameEvent(state, event)
sendWinnerEvent(state)
}
}
} else {
logger.error { "${this::class.java.simpleName} is already init for this bus" }
}
}

View File

@@ -10,6 +10,7 @@ import io.ktor.server.plugins.cors.routing.CORS
import io.ktor.server.plugins.statuspages.StatusPages
import io.ktor.server.resources.Resources
import io.ktor.server.response.respondText
import kotlinx.serialization.Serializable
fun Application.configureHttpRouting() {
install(CORS) {
@@ -38,13 +39,14 @@ class BadRequestException(
val httpError: HttpErrorBadRequest,
) : Exception()
@Serializable
class HttpErrorBadRequest(
statusCode: HttpStatusCode,
val title: String = statusCode.description,
val invalidParams: List<InvalidParam>,
val title: String = HttpStatusCode.BadRequest.description,
val invalidParams: List<InvalidParam> = emptyList(),
) {
val statusCode: Int = statusCode.value
val statusCode: Int = HttpStatusCode.BadRequest.value
@Serializable
data class InvalidParam(
val name: String,
val reason: String,

View File

@@ -34,10 +34,19 @@ class GameCommandHandlerTest :
val channelCommand = Channel<GameCommand>(Channel.BUFFERED)
val channelNotification = Channel<Notification>(Channel.BUFFERED)
ReactionEventListener(get(), get(), get()).init()
notificationListener.startListening({ channelNotification.trySendBlocking(it) }, player)
notificationListener.startListening(
{ channelNotification.trySendBlocking(it) },
player,
gameId,
)
GlobalScope.launch {
commandHandler.handle(player, channelCommand, channelNotification)
commandHandler.handle(
player,
gameId,
channelCommand,
channelNotification,
)
}
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player)).also { sendCommand ->

View File

@@ -48,7 +48,7 @@ class GameSimulationTest :
test("Simulation of a game") {
withTimeout(2.seconds) {
disableShuffleDeck()
val id = GameId()
val gameId = GameId()
val player1 = Player(name = "Nikola")
val player2 = Player(name = "Einstein")
val channelCommand1 = Channel<GameCommand>(Channel.BUFFERED)
@@ -61,7 +61,7 @@ class GameSimulationTest :
val player1Job =
launch {
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player1)).also { sendCommand ->
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player1)).also { sendCommand ->
channelCommand1.send(sendCommand)
channelNotification1.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
@@ -74,7 +74,7 @@ class GameSimulationTest :
channelNotification1.receive().let {
assertIs<PlayerAsJoinTheGameNotification>(it).player shouldBeEqual player2
}
IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player1)).also { sendCommand ->
IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player1)).also { sendCommand ->
channelCommand1.send(sendCommand)
channelNotification1.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
@@ -94,7 +94,7 @@ class GameSimulationTest :
}
}
IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player1, player1Hand.first())).also { sendCommand ->
IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player1, player1Hand.first())).also { sendCommand ->
channelCommand1.send(sendCommand)
channelNotification1.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
@@ -118,7 +118,7 @@ class GameSimulationTest :
val player2Job =
launch {
delay(100)
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(id, player2)).also { sendCommand ->
IWantToJoinTheGameCommand(IWantToJoinTheGameCommand.Payload(gameId, player2)).also { sendCommand ->
channelCommand2.send(sendCommand)
channelNotification2.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
@@ -132,7 +132,7 @@ class GameSimulationTest :
assertIs<PlayerWasReadyNotification>(it).player shouldBeEqual player1
}
IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(id, player2)).also { sendCommand ->
IamReadyToPlayCommand(IamReadyToPlayCommand.Payload(gameId, player2)).also { sendCommand ->
channelCommand2.send(sendCommand)
channelNotification2.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
@@ -162,7 +162,7 @@ class GameSimulationTest :
}
}
IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(id, player2, player2Hand.first())).also { sendCommand ->
IWantToPlayCardCommand(IWantToPlayCardCommand.Payload(gameId, player2, player2Hand.first())).also { sendCommand ->
channelCommand2.send(sendCommand)
channelNotification2.receive().let {
assertIs<CommandSuccessNotification>(it).commandId shouldBeEqual sendCommand.id
@@ -175,14 +175,14 @@ class GameSimulationTest :
val eventStore by inject<GameEventStore>()
val playerNotificationListener by inject<PlayerNotificationEventListener>()
ReactionEventListener(get(), get(), get()).init()
playerNotificationListener.startListening({ channelNotification1.trySendBlocking(it) }, player1)
playerNotificationListener.startListening({ channelNotification2.trySendBlocking(it) }, player2)
playerNotificationListener.startListening({ channelNotification1.trySendBlocking(it) }, player1, gameId)
playerNotificationListener.startListening({ channelNotification2.trySendBlocking(it) }, player2, gameId)
GlobalScope.launch(Dispatchers.IO) {
commandHandler.handle(player1, channelCommand1, channelNotification1)
commandHandler.handle(player1, gameId, channelCommand1, channelNotification1)
}
GlobalScope.launch(Dispatchers.IO) {
commandHandler.handle(player2, channelCommand2, channelNotification2)
commandHandler.handle(player2, gameId, channelCommand2, channelNotification2)
}
joinAll(player1Job, player2Job)
@@ -192,9 +192,9 @@ class GameSimulationTest :
eventStore = eventStore,
initialStateBuilder = { aggregateId: GameId -> GameState(aggregateId) },
applyToProjection = GameState::apply,
).getLast(id)
).getLast(gameId)
state.aggregateId shouldBeEqual id
state.aggregateId shouldBeEqual gameId
assertTrue(state.isStarted)
state.players shouldBeEqual setOf(player1, player2)
state.readyPlayers shouldBeEqual setOf(player1, player2)

View File

@@ -79,8 +79,8 @@ class GameStateRouteTest :
GameStartedEvent.new(
gameId,
setOf(player1, player2),
shuffleIsDisabled = true,
it,
shuffleIsDisabled = true,
)
}
delay(100)