CommandStreamChannel block the duplicate call
add GameCommandHandlerTest Create a CommandStreamChannelBuilder to inject maxCacheTime Add missing empty disabled test fix EventStreamInMemory.readAll
This commit is contained in:
@@ -11,6 +11,7 @@ import eventDemo.app.event.event.GameEvent
|
||||
import eventDemo.app.event.projection.GameStateRepository
|
||||
import eventDemo.app.notification.ErrorNotification
|
||||
import eventDemo.app.notification.Notification
|
||||
import eventDemo.libs.command.CommandStreamChannelBuilder
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
import kotlinx.coroutines.channels.SendChannel
|
||||
@@ -23,6 +24,7 @@ import kotlinx.coroutines.channels.SendChannel
|
||||
class GameCommandHandler(
|
||||
private val eventHandler: GameEventHandler,
|
||||
private val gameStateRepository: GameStateRepository,
|
||||
private val commandStreamChannel: CommandStreamChannelBuilder<GameCommand>,
|
||||
) {
|
||||
private val logger = KotlinLogging.logger { }
|
||||
|
||||
@@ -33,27 +35,43 @@ class GameCommandHandler(
|
||||
player: Player,
|
||||
incomingCommandChannel: ReceiveChannel<GameCommand>,
|
||||
outgoingErrorChannelNotification: SendChannel<Notification>,
|
||||
) = GameCommandStream(incomingCommandChannel).process { command ->
|
||||
if (command.payload.player.id != player.id) {
|
||||
nack()
|
||||
}
|
||||
|
||||
val playerErrorNotifier: suspend (String) -> Unit = {
|
||||
val notification = ErrorNotification(message = it)
|
||||
logger.atWarn {
|
||||
message = "Notification send ERROR: ${notification.message}"
|
||||
payload = mapOf("notification" to notification)
|
||||
) = commandStreamChannel(incomingCommandChannel)
|
||||
.process { command ->
|
||||
if (command.payload.player.id != player.id) {
|
||||
logger.atWarn {
|
||||
message = "Handle command Refuse, the player of the command is not the same: $command"
|
||||
payload = mapOf("command" to command)
|
||||
}
|
||||
nack()
|
||||
} else {
|
||||
logger.atInfo {
|
||||
message = "Handle command: $command"
|
||||
payload = mapOf("command" to command)
|
||||
}
|
||||
command.run(outgoingErrorChannelNotification)
|
||||
}
|
||||
outgoingErrorChannelNotification.send(notification)
|
||||
}
|
||||
|
||||
val gameState = gameStateRepository.get(command.payload.gameId)
|
||||
private suspend fun GameCommand.run(outgoingErrorChannelNotification: SendChannel<Notification>) {
|
||||
val gameState = gameStateRepository.get(payload.gameId)
|
||||
val playerErrorNotifier = errorNotifier(outgoingErrorChannelNotification)
|
||||
|
||||
when (command) {
|
||||
is IWantToPlayCardCommand -> command.run(gameState, playerErrorNotifier, eventHandler)
|
||||
is IamReadyToPlayCommand -> command.run(gameState, playerErrorNotifier, eventHandler)
|
||||
is IWantToJoinTheGameCommand -> command.run(gameState, playerErrorNotifier, eventHandler)
|
||||
is ICantPlayCommand -> command.run(gameState, playerErrorNotifier, eventHandler)
|
||||
when (this) {
|
||||
is IWantToPlayCardCommand -> run(gameState, playerErrorNotifier, eventHandler)
|
||||
is IamReadyToPlayCommand -> run(gameState, playerErrorNotifier, eventHandler)
|
||||
is IWantToJoinTheGameCommand -> run(gameState, playerErrorNotifier, eventHandler)
|
||||
is ICantPlayCommand -> run(gameState, playerErrorNotifier, eventHandler)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun errorNotifier(channel: SendChannel<Notification>): suspend (String) -> Unit =
|
||||
{
|
||||
val logger = KotlinLogging.logger { }
|
||||
val notification = ErrorNotification(message = it)
|
||||
logger.atWarn {
|
||||
message = "Notification send ERROR: ${notification.message}"
|
||||
payload = mapOf("notification" to notification)
|
||||
}
|
||||
channel.send(notification)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package eventDemo.app.command
|
||||
|
||||
import eventDemo.app.entity.Player
|
||||
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
|
||||
import eventDemo.app.notification.Notification
|
||||
import eventDemo.libs.fromFrameChannel
|
||||
import eventDemo.libs.toObjectChannel
|
||||
import io.ktor.server.application.ApplicationCall
|
||||
@@ -12,6 +13,7 @@ import io.ktor.server.routing.Route
|
||||
import io.ktor.server.websocket.webSocket
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.channels.SendChannel
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
@DelicateCoroutinesApi
|
||||
@@ -22,14 +24,15 @@ fun Route.gameSocket(
|
||||
authenticate {
|
||||
webSocket("/game") {
|
||||
val currentPlayer = call.getPlayer()
|
||||
val outgoingFrameChannel: SendChannel<Notification> = fromFrameChannel(outgoing)
|
||||
GlobalScope.launch {
|
||||
commandHandler.handle(
|
||||
currentPlayer,
|
||||
toObjectChannel(incoming),
|
||||
fromFrameChannel(outgoing),
|
||||
outgoingFrameChannel,
|
||||
)
|
||||
}
|
||||
playerNotificationListener.startListening(outgoing, currentPlayer)
|
||||
playerNotificationListener.startListening(outgoingFrameChannel, currentPlayer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,10 @@ class GameStateRepository(
|
||||
eventHandler: GameEventHandler,
|
||||
private val maxSnapshotCacheSize: Int = 20,
|
||||
) {
|
||||
private val projections: MutableMap<GameId, GameState> = ConcurrentHashMap()
|
||||
private val projections: ConcurrentHashMap<GameId, GameState> = ConcurrentHashMap()
|
||||
private val version: AtomicInteger = AtomicInteger(0)
|
||||
private val projectionsSnapshot: MutableMap<GameEvent, GameState> = ConcurrentHashMap()
|
||||
private val sortedSnapshotByVersion: MutableMap<GameEvent, Int> = ConcurrentHashMap()
|
||||
private val projectionsSnapshot: ConcurrentHashMap<GameEvent, GameState> = ConcurrentHashMap()
|
||||
private val sortedSnapshotByVersion: ConcurrentHashMap<GameEvent, Int> = ConcurrentHashMap()
|
||||
|
||||
init {
|
||||
eventHandler.registerProjectionBuilder { event ->
|
||||
@@ -68,8 +68,9 @@ class GameStateRepository(
|
||||
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||
*/
|
||||
fun get(gameId: GameId): GameState =
|
||||
projections[gameId]
|
||||
?: gameId.buildStateFromEventStream(eventStream)
|
||||
projections.computeIfAbsent(gameId) {
|
||||
gameId.buildStateFromEventStream(eventStream)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the [GameState] to the specific [event][GameEvent].
|
||||
@@ -78,8 +79,9 @@ class GameStateRepository(
|
||||
* It fetches it from the local cache if possible, otherwise it builds it.
|
||||
*/
|
||||
fun getUntil(event: GameEvent): GameState =
|
||||
projectionsSnapshot[event]
|
||||
?: event.buildStateFromEventStreamTo(eventStream)
|
||||
projectionsSnapshot.computeIfAbsent(event) {
|
||||
event.buildStateFromEventStreamTo(eventStream)
|
||||
}
|
||||
|
||||
private fun GameState.update() {
|
||||
projections[gameId] = this
|
||||
|
||||
@@ -23,9 +23,7 @@ import eventDemo.app.notification.PlayerWinNotification
|
||||
import eventDemo.app.notification.TheGameWasStartedNotification
|
||||
import eventDemo.app.notification.WelcomeToTheGameNotification
|
||||
import eventDemo.app.notification.YourNewCardNotification
|
||||
import eventDemo.shared.toFrame
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import io.ktor.websocket.Frame
|
||||
import kotlinx.coroutines.channels.SendChannel
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
|
||||
@@ -36,7 +34,7 @@ class GameEventPlayerNotificationListener(
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
fun startListening(
|
||||
outgoingNotificationChannel: SendChannel<Frame>,
|
||||
outgoingNotificationChannel: SendChannel<Notification>,
|
||||
currentPlayer: Player,
|
||||
) {
|
||||
eventBus.subscribe { event: GameEvent ->
|
||||
@@ -45,7 +43,7 @@ class GameEventPlayerNotificationListener(
|
||||
fun Notification.send() {
|
||||
if (currentState.players.contains(currentPlayer)) {
|
||||
// Only notify players who have already joined the game.
|
||||
outgoingNotificationChannel.trySendBlocking(toFrame())
|
||||
outgoingNotificationChannel.trySendBlocking(this)
|
||||
logger.atInfo {
|
||||
message = "Notification for player ${currentPlayer.name} was SEND: ${this@send}"
|
||||
payload = mapOf("notification" to this@send, "event" to event)
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package eventDemo.configuration
|
||||
|
||||
import eventDemo.app.command.GameCommandHandler
|
||||
import eventDemo.app.command.command.GameCommand
|
||||
import eventDemo.app.event.GameEventBus
|
||||
import eventDemo.app.event.GameEventHandler
|
||||
import eventDemo.app.event.GameEventStream
|
||||
import eventDemo.app.event.projection.GameStateRepository
|
||||
import eventDemo.app.eventListener.GameEventPlayerNotificationListener
|
||||
import eventDemo.libs.command.CommandStreamChannelBuilder
|
||||
import eventDemo.libs.event.EventBusInMemory
|
||||
import eventDemo.libs.event.EventStreamInMemory
|
||||
import io.ktor.server.application.Application
|
||||
@@ -33,6 +35,9 @@ val appKoinModule =
|
||||
single {
|
||||
GameStateRepository(get(), get())
|
||||
}
|
||||
single {
|
||||
CommandStreamChannelBuilder<GameCommand>()
|
||||
}
|
||||
|
||||
singleOf(::GameEventHandler)
|
||||
singleOf(::GameCommandHandler)
|
||||
|
||||
@@ -2,18 +2,48 @@ package eventDemo.libs.command
|
||||
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
class CommandStreamChannelBuilder<C : Command>(
|
||||
private val maxCacheTime: Duration = 10.minutes,
|
||||
) {
|
||||
operator fun invoke(incoming: ReceiveChannel<C>): CommandStreamChannel<C> = CommandStreamChannel(incoming, maxCacheTime)
|
||||
}
|
||||
|
||||
/**
|
||||
* Manage [Command]'s with kotlin Channel
|
||||
*/
|
||||
class CommandStreamChannel<C : Command>(
|
||||
private val incoming: ReceiveChannel<C>,
|
||||
private val maxCacheTime: Duration = 10.minutes,
|
||||
) : CommandStream<C> {
|
||||
private val logger = KotlinLogging.logger {}
|
||||
private val executedCommand: ConcurrentHashMap<CommandId, Pair<Boolean, Instant>> = ConcurrentHashMap()
|
||||
|
||||
override suspend fun process(action: CommandBlock<C>) {
|
||||
for (command in incoming) {
|
||||
compute(command, action)
|
||||
val now = Clock.System.now()
|
||||
val (status, _) = executedCommand.computeIfAbsent(command.id) { Pair(false, now) }
|
||||
|
||||
if (status) {
|
||||
logger.atWarn {
|
||||
message = "Command already executed: $command"
|
||||
payload = mapOf("command" to command)
|
||||
}
|
||||
} else {
|
||||
compute(command, action)
|
||||
}
|
||||
executedCommand
|
||||
.filterValues { (_, date) ->
|
||||
(date + maxCacheTime) > now
|
||||
}.keys
|
||||
.forEach {
|
||||
executedCommand.remove(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,10 @@ class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID>
|
||||
.filterIsInstance(eventType.java)
|
||||
.lastOrNull { it.gameId == aggregateId }
|
||||
|
||||
override fun readAll(aggregateId: ID): Set<E> = events.toSet()
|
||||
override fun readAll(aggregateId: ID): Set<E> =
|
||||
events
|
||||
.filter { it.gameId == aggregateId }
|
||||
.toSet()
|
||||
}
|
||||
|
||||
inline fun <reified R : E, E : Event<ID>, ID : AggregateId> EventStream<E, ID>.readLastOf(aggregateId: ID): R? =
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
package eventDemo.shared
|
||||
|
||||
import eventDemo.app.command.command.GameCommand
|
||||
import eventDemo.app.event.event.GameEvent
|
||||
import eventDemo.app.notification.Notification
|
||||
import io.ktor.websocket.Frame
|
||||
import io.ktor.websocket.readText
|
||||
import kotlinx.serialization.json.Json
|
||||
|
||||
fun Frame.Text.toEvent(): GameEvent = Json.decodeFromString(GameEvent.serializer(), readText())
|
||||
|
||||
fun GameEvent.toFrame(): Frame.Text = Frame.Text(Json.encodeToString(GameEvent.serializer(), this))
|
||||
|
||||
fun Frame.Text.toCommand(): GameCommand = Json.decodeFromString(GameCommand.serializer(), readText())
|
||||
|
||||
fun GameCommand.toFrame(): Frame.Text = Frame.Text(Json.encodeToString(GameCommand.serializer(), this))
|
||||
|
||||
fun Frame.toNotification(): Notification =
|
||||
Json.decodeFromString(
|
||||
Notification.serializer(),
|
||||
(this as Frame.Text).readText(),
|
||||
)
|
||||
|
||||
fun Notification.toFrame(): Frame.Text = Frame.Text(Json.encodeToString(Notification.serializer(), this))
|
||||
Reference in New Issue
Block a user