extract projection snapshot logic

implement GameStateRepositoryTest
add lambda to the GameEventHandler.handle{} to set the version

add VersionBuilder
add version to the events
add creation date to the events
rename gameId to aggregateId
add EventHandler interface
This commit is contained in:
2025-03-13 00:27:44 +01:00
parent d5b033e731
commit 286dedac76
36 changed files with 684 additions and 266 deletions

View File

@@ -18,7 +18,7 @@ class GameCommandRunner(
command: GameCommand,
outgoingErrorChannelNotification: SendChannel<Notification>,
) {
val gameState = gameStateRepository.get(command.payload.gameId)
val gameState = gameStateRepository.getLast(command.payload.aggregateId)
val errorNotifier = errorNotifier(command, outgoingErrorChannelNotification)
when (command) {

View File

@@ -11,7 +11,7 @@ sealed interface GameCommand : Command {
@Serializable
sealed interface Payload {
val gameId: GameId
val aggregateId: GameId
val player: Player
}
}

View File

@@ -20,7 +20,7 @@ data class ICantPlayCommand(
@Serializable
data class Payload(
override val gameId: GameId,
override val aggregateId: GameId,
override val player: Player,
) : GameCommand.Payload
@@ -37,13 +37,14 @@ data class ICantPlayCommand(
if (playableCards.isEmpty()) {
val takenCard = state.deck.stack.first()
eventHandler.handle(
eventHandler.handle {
PlayerHavePassEvent(
gameId = payload.gameId,
aggregateId = payload.aggregateId,
player = payload.player,
takenCard = takenCard,
),
)
version = it,
)
}
} else {
playerErrorNotifier("You can and must play one card, like ${playableCards.first()::class.simpleName}")
}

View File

@@ -20,7 +20,7 @@ data class IWantToJoinTheGameCommand(
@Serializable
data class Payload(
override val gameId: GameId,
override val aggregateId: GameId,
override val player: Player,
) : GameCommand.Payload
@@ -30,12 +30,13 @@ data class IWantToJoinTheGameCommand(
eventHandler: GameEventHandler,
) {
if (!state.isStarted) {
eventHandler.handle(
eventHandler.handle {
NewPlayerEvent(
payload.gameId,
payload.player,
),
)
aggregateId = payload.aggregateId,
player = payload.player,
version = it,
)
}
} else {
playerErrorNotifier("The game is already started")
}

View File

@@ -21,7 +21,7 @@ data class IWantToPlayCardCommand(
@Serializable
data class Payload(
override val gameId: GameId,
override val aggregateId: GameId,
override val player: Player,
val card: Card,
) : GameCommand.Payload
@@ -41,13 +41,14 @@ data class IWantToPlayCardCommand(
}
if (state.canBePlayThisCard(payload.player, payload.card)) {
eventHandler.handle(
eventHandler.handle {
CardIsPlayedEvent(
payload.gameId,
payload.card,
payload.player,
),
)
aggregateId = payload.aggregateId,
card = payload.card,
player = payload.player,
version = it,
)
}
} else {
playerErrorNotifier("You cannot play this card")
}

View File

@@ -20,7 +20,7 @@ data class IamReadyToPlayCommand(
@Serializable
data class Payload(
override val gameId: GameId,
override val aggregateId: GameId,
override val player: Player,
) : GameCommand.Payload
@@ -39,12 +39,13 @@ data class IamReadyToPlayCommand(
} else if (playerIsAlreadyReady) {
playerErrorNotifier("You are already ready")
} else {
eventHandler.handle(
eventHandler.handle {
PlayerReadyEvent(
payload.gameId,
payload.player,
),
)
aggregateId = payload.aggregateId,
player = payload.player,
version = it,
)
}
}
}
}

View File

@@ -0,0 +1,13 @@
package eventDemo.app.event
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
/**
* A stream to publish and read the played card event.
*/
interface EventHandler<E : Event<ID>, ID : AggregateId> {
fun registerProjectionBuilder(builder: (E) -> Unit)
fun handle(buildEvent: (version: Int) -> E): E
}

View File

@@ -1,6 +1,8 @@
package eventDemo.app.event
import eventDemo.app.entity.GameId
import eventDemo.app.event.event.GameEvent
import eventDemo.libs.event.VersionBuilder
/**
* A stream to publish and read the played card event.
@@ -8,20 +10,20 @@ import eventDemo.app.event.event.GameEvent
class GameEventHandler(
private val eventBus: GameEventBus,
private val eventStream: GameEventStream,
) {
private val versionBuilder: VersionBuilder,
) : EventHandler<GameEvent, GameId> {
private val projectionsBuilders: MutableList<(GameEvent) -> Unit> = mutableListOf()
fun registerProjectionBuilder(builder: GameProjectionBuilder) {
override fun registerProjectionBuilder(builder: GameProjectionBuilder) {
projectionsBuilders.add(builder)
}
fun handle(vararg events: GameEvent) {
events.forEach { event ->
override fun handle(buildEvent: (version: Int) -> GameEvent): GameEvent =
buildEvent(versionBuilder.buildNextVersion()).also { event ->
eventStream.publish(event)
projectionsBuilders.forEach { it(event) }
eventBus.publish(event)
}
}
}
typealias GameProjectionBuilder = (GameEvent) -> Unit

View File

@@ -3,15 +3,20 @@ package eventDemo.app.event.event
import eventDemo.app.entity.Card
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* An [GameEvent] to represent a played card.
*/
data class CardIsPlayedEvent(
override val gameId: GameId,
override val aggregateId: GameId,
val card: Card,
override val player: Player,
override val eventId: UUID = UUID.randomUUID(),
override val version: Int,
) : GameEvent,
PlayerActionEvent
PlayerActionEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
}

View File

@@ -11,5 +11,6 @@ import java.util.UUID
@Serializable
sealed interface GameEvent : Event<GameId> {
override val eventId: UUID
override val gameId: GameId
override val aggregateId: GameId
override val version: Int
}

View File

@@ -4,26 +4,31 @@ import eventDemo.app.entity.Deck
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import eventDemo.app.entity.initHands
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* This [GameEvent] is sent when all players are ready.
*/
data class GameStartedEvent(
override val gameId: GameId,
override val aggregateId: GameId,
val firstPlayer: Player,
val deck: Deck,
override val version: Int,
) : GameEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
companion object {
fun new(
id: GameId,
players: Set<Player>,
shuffleIsDisabled: Boolean = isDisabled,
version: Int,
): GameStartedEvent =
GameStartedEvent(
gameId = id,
aggregateId = id,
firstPlayer = if (shuffleIsDisabled) players.first() else players.random(),
deck =
Deck
@@ -31,6 +36,7 @@ data class GameStartedEvent(
.let { if (shuffleIsDisabled) it else it.shuffle() }
.initHands(players)
.placeFirstCardOnDiscard(),
version = version,
)
}
}

View File

@@ -2,14 +2,18 @@ package eventDemo.app.event.event
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* An [GameEvent] to represent a new player joining the game.
*/
data class NewPlayerEvent(
override val gameId: GameId,
override val aggregateId: GameId,
val player: Player,
override val version: Int,
) : GameEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
}

View File

@@ -2,6 +2,6 @@ package eventDemo.app.event.event
import eventDemo.app.entity.Player
sealed interface PlayerActionEvent {
sealed interface PlayerActionEvent : GameEvent {
val player: Player
}

View File

@@ -3,16 +3,20 @@ package eventDemo.app.event.event
import eventDemo.app.entity.Card
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* This [GameEvent] is sent when a player chose a color.
*/
data class PlayerChoseColorEvent(
override val gameId: GameId,
override val aggregateId: GameId,
override val player: Player,
val color: Card.Color,
override val version: Int,
) : GameEvent,
PlayerActionEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
}

View File

@@ -3,16 +3,20 @@ package eventDemo.app.event.event
import eventDemo.app.entity.Card
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* This [GameEvent] is sent when a player can play.
*/
data class PlayerHavePassEvent(
override val gameId: GameId,
override val aggregateId: GameId,
override val player: Player,
val takenCard: Card,
override val version: Int,
) : GameEvent,
PlayerActionEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
}

View File

@@ -2,14 +2,18 @@ package eventDemo.app.event.event
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* This [GameEvent] is sent when a player is ready.
*/
data class PlayerReadyEvent(
override val gameId: GameId,
override val aggregateId: GameId,
val player: Player,
override val version: Int,
) : GameEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
}

View File

@@ -2,14 +2,18 @@ package eventDemo.app.event.event
import eventDemo.app.entity.GameId
import eventDemo.app.entity.Player
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.util.UUID
/**
* This [GameEvent] is sent when a player is ready.
*/
data class PlayerWinEvent(
override val gameId: GameId,
override val aggregateId: GameId,
val player: Player,
override val version: Int,
) : GameEvent {
override val eventId: UUID = UUID.randomUUID()
override val createdAt: Instant = Clock.System.now()
}

View File

@@ -8,7 +8,8 @@ import kotlinx.serialization.Serializable
@Serializable
data class GameState(
val gameId: GameId,
override val aggregateId: GameId,
override val lastEventVersion: Int = 0,
val players: Set<Player> = emptySet(),
val currentPlayerTurn: Player? = null,
val cardOnCurrentStack: LastCard? = null,
@@ -18,7 +19,7 @@ data class GameState(
val deck: Deck = Deck(players),
val isStarted: Boolean = false,
val playerWins: Set<Player> = emptySet(),
) {
) : Projection<GameId> {
@Serializable
data class LastCard(
val card: Card,

View File

@@ -23,27 +23,28 @@ fun GameId.buildStateFromEventStream(eventStream: GameEventStream): GameState {
}
fun Collection<GameEvent>.buildStateFromEvents(): GameState {
val gameId = this.firstOrNull()?.gameId ?: error("Cannot build GameState from an empty list")
val gameId = this.firstOrNull()?.aggregateId ?: error("Cannot build GameState from an empty list")
return fold(GameState(gameId)) { state, event ->
state.apply(event)
}
}
fun GameState.apply(event: GameEvent): GameState =
let { state ->
fun GameState?.apply(event: GameEvent): GameState =
(this ?: GameState(event.aggregateId)).let { state ->
val logger = KotlinLogging.logger { }
if (event is PlayerActionEvent) {
if (state.currentPlayerTurn != event.player) {
logger.atError {
message = "Inconsistent player turn. CurrentPlayerTurn: $currentPlayerTurn | Player: ${event.player}"
message = "Inconsistent player turn. CurrentPlayerTurn: $state.currentPlayerTurn | Player: ${event.player}"
payload =
mapOf(
"CurrentPlayerTurn" to (currentPlayerTurn ?: "No currentPlayerTurn"),
"CurrentPlayerTurn" to (state.currentPlayerTurn ?: "No currentPlayerTurn"),
"Player" to event.player,
)
}
}
}
when (event) {
is CardIsPlayedEvent -> {
val nextDirectionAfterPlay =
@@ -60,9 +61,9 @@ fun GameState.apply(event: GameEvent): GameState =
val currentPlayerAfterThePlay =
if (event.card is Card.AllColorCard) {
currentPlayerTurn
state.currentPlayerTurn
} else {
nextPlayer(nextDirectionAfterPlay)
state.nextPlayer(nextDirectionAfterPlay)
}
state.copy(
@@ -98,14 +99,14 @@ fun GameState.apply(event: GameEvent): GameState =
logger.error { "taken card is not ot top of the stack: ${event.takenCard}" }
}
state.copy(
currentPlayerTurn = nextPlayerTurn,
currentPlayerTurn = state.nextPlayerTurn,
deck = state.deck.takeOneCardFromStackTo(event.player),
)
}
is PlayerChoseColorEvent -> {
state.copy(
currentPlayerTurn = nextPlayerTurn,
currentPlayerTurn = state.nextPlayerTurn,
colorOnCurrentStack = event.color,
)
}
@@ -121,9 +122,11 @@ fun GameState.apply(event: GameEvent): GameState =
}
is PlayerWinEvent -> {
copy(
playerWins = playerWins + event.player,
state.copy(
playerWins = state.playerWins + event.player,
)
}
}
}.copy(
lastEventVersion = event.version,
)
}

View File

@@ -4,73 +4,32 @@ import eventDemo.app.entity.GameId
import eventDemo.app.event.GameEventHandler
import eventDemo.app.event.GameEventStream
import eventDemo.app.event.event.GameEvent
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
class GameStateRepository(
private val eventStream: GameEventStream,
eventHandler: GameEventHandler,
private val maxSnapshotCacheSize: Int = 20,
maxSnapshotCacheSize: Int = 20,
) {
private val projections: ConcurrentHashMap<GameId, GameState> = ConcurrentHashMap()
private val version: AtomicInteger = AtomicInteger(0)
private val projectionsSnapshot: ConcurrentHashMap<GameEvent, GameState> = ConcurrentHashMap()
private val sortedSnapshotByVersion: ConcurrentHashMap<GameEvent, Int> = ConcurrentHashMap()
private val projectionsSnapshot =
ProjectionSnapshotRepositoryInMemory(
applyToProjection = GameState?::apply,
maxSnapshotCacheSize = maxSnapshotCacheSize,
)
init {
eventHandler.registerProjectionBuilder { event ->
val projection = projections[event.gameId]
if (projection == null) {
event
.buildStateFromEventStreamTo(eventStream)
.update()
} else {
projection
.apply(event)
.also { projections[it.gameId] = it }
.also { state ->
val newVersion = version.addAndGet(1)
saveSnapshot(event, state, newVersion)
removeOldSnapshot()
}
}
projectionsSnapshot.applyAndPutToCache(event)
}
}
private fun removeOldSnapshot() {
if (projectionsSnapshot.size > maxSnapshotCacheSize) {
val numberToRemove = projectionsSnapshot.size - maxSnapshotCacheSize
sortedSnapshotByVersion
.toList()
.sortedBy { it.second }
.take(numberToRemove)
.toMap()
.keys
.forEach { event ->
sortedSnapshotByVersion.remove(event)
projectionsSnapshot.remove(event)
}
}
}
private fun saveSnapshot(
event: GameEvent,
state: GameState,
newVersion: Int,
) {
projectionsSnapshot[event] = state
sortedSnapshotByVersion[event] = newVersion
}
/**
* Get the last version of the [GameState] from the all eventStream.
*
* It fetches it from the local cache if possible, otherwise it builds it.
*/
fun get(gameId: GameId): GameState =
projections.computeIfAbsent(gameId) {
gameId.buildStateFromEventStream(eventStream)
}
fun getLast(gameId: GameId): GameState =
projectionsSnapshot.getLast(gameId)
?: gameId.buildStateFromEventStream(eventStream)
/**
* Get the [GameState] to the specific [event][GameEvent].
@@ -79,17 +38,7 @@ class GameStateRepository(
* It fetches it from the local cache if possible, otherwise it builds it.
*/
fun getUntil(event: GameEvent): GameState =
projectionsSnapshot.computeIfAbsent(event) {
event.buildStateFromEventStreamTo(eventStream)
}
private fun GameState.update() {
projections[gameId] = this
}
/**
* Build the state to the specific event
*/
private fun GameEvent.buildStateFromEventStreamTo(eventStream: GameEventStream): GameState =
run { eventStream.readAll(gameId).takeWhile { it != this } + this }.buildStateFromEvents()
projectionsSnapshot.getUntil(event)
?: (eventStream.readAll(event.aggregateId).takeWhile { it != event } + event)
.buildStateFromEvents()
}

View File

@@ -0,0 +1,8 @@
package eventDemo.app.event.projection
import eventDemo.libs.event.AggregateId
interface Projection<ID : AggregateId> {
val aggregateId: ID
val lastEventVersion: Int
}

View File

@@ -0,0 +1,54 @@
package eventDemo.app.event.projection
import eventDemo.libs.event.AggregateId
import eventDemo.libs.event.Event
import java.util.concurrent.ConcurrentHashMap
class ProjectionSnapshotRepositoryInMemory<E : Event<ID>, P : Projection<ID>, ID : AggregateId>(
private val maxSnapshotCacheSize: Int = 20,
private val applyToProjection: P?.(event: E) -> P,
) {
private val projectionsSnapshot: ConcurrentHashMap<E, P> = ConcurrentHashMap()
fun applyAndPutToCache(event: E): P {
// lock here
return projectionsSnapshot
.filterKeys { it.aggregateId == event.aggregateId }
.toList()
.find { (e, _) -> e.version == (event.version - 1) }
?.second
.applyToProjection(event)
.also { projectionsSnapshot.put(event, it) }
.also { removeOldSnapshot() }
// Unlock here
}
private fun removeOldSnapshot() {
if (projectionsSnapshot.size > maxSnapshotCacheSize) {
val numberToRemove = projectionsSnapshot.size - maxSnapshotCacheSize
projectionsSnapshot
.keys
.sortedBy { it.version }
.take(numberToRemove)
.forEach { event ->
projectionsSnapshot.remove(event)
}
}
}
/**
* Get the last version of the [Projection] from the cache.
*/
fun getLast(aggregateId: ID): P? =
projectionsSnapshot
.filter { it.key.aggregateId == aggregateId }
.maxByOrNull { (event, _) -> event.version }
?.value
/**
* Get the [Projection] to the specific [event][Event].
* It does not contain the [events][Event] it after this one.
*/
fun getUntil(event: E): P? = projectionsSnapshot.get(event)
}

View File

@@ -36,10 +36,13 @@ class ReactionEventListener(
) {
if (state.isReady && !state.isStarted) {
val reactionEvent =
GameStartedEvent.new(
state.gameId,
state.players,
)
eventHandler.handle {
GameStartedEvent.new(
id = state.aggregateId,
players = state.players,
version = it,
)
}
logger.atInfo {
message = "Reaction event was Send $reactionEvent on reaction of: $event"
payload =
@@ -48,7 +51,6 @@ class ReactionEventListener(
"reactionEvent" to reactionEvent,
)
}
eventHandler.handle(reactionEvent)
} else {
if (event is PlayerReadyEvent) {
logger.info { "All players was not ready ${state.readyPlayers}" }
@@ -63,10 +65,14 @@ class ReactionEventListener(
val winner = state.playerHasNoCardLeft().firstOrNull()
if (winner != null) {
val reactionEvent =
PlayerWinEvent(
state.gameId,
winner,
)
eventHandler.handle {
PlayerWinEvent(
aggregateId = state.aggregateId,
player = winner,
version = it,
)
}
logger.atInfo {
message = "Reaction event was Send $reactionEvent on reaction of: $event"
payload =
@@ -75,7 +81,6 @@ class ReactionEventListener(
"reactionEvent" to reactionEvent,
)
}
eventHandler.handle(reactionEvent)
}
}
}

View File

@@ -39,7 +39,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
// Read the last played card on the game.
get<Game.Card> { body ->
gameStateRepository
.get(body.game.id)
.getLast(body.game.id)
.cardOnCurrentStack
?.card
?.let { call.respond(it) }
@@ -48,7 +48,7 @@ fun Route.readTheGameState(gameStateRepository: GameStateRepository) {
// Read the last played card on the game.
get<Game.State> { body ->
val state = gameStateRepository.get(body.game.id)
val state = gameStateRepository.getLast(body.game.id)
call.respond(state)
}
}

View File

@@ -11,9 +11,12 @@ import eventDemo.app.eventListener.PlayerNotificationEventListener
import eventDemo.libs.command.CommandStreamChannelBuilder
import eventDemo.libs.event.EventBusInMemory
import eventDemo.libs.event.EventStreamInMemory
import eventDemo.libs.event.VersionBuilder
import eventDemo.libs.event.VersionBuilderLocal
import io.ktor.server.application.Application
import io.ktor.server.application.install
import org.koin.core.module.dsl.singleOf
import org.koin.dsl.bind
import org.koin.dsl.module
import org.koin.ktor.plugin.Koin
import org.koin.logger.slf4jLogger
@@ -40,6 +43,7 @@ val appKoinModule =
CommandStreamChannelBuilder<GameCommand>()
}
singleOf(::VersionBuilderLocal) bind VersionBuilder::class
singleOf(::GameEventHandler)
singleOf(::GameCommandRunner)
singleOf(::GameCommandHandler)

View File

@@ -1,5 +1,6 @@
package eventDemo.libs.event
import kotlinx.datetime.Instant
import java.util.UUID
/**
@@ -16,5 +17,7 @@ interface AggregateId {
*/
interface Event<ID : AggregateId> {
val eventId: UUID
val gameId: ID
val aggregateId: ID
val createdAt: Instant
val version: Int
}

View File

@@ -34,11 +34,11 @@ class EventStreamInMemory<E : Event<ID>, ID : AggregateId> : EventStream<E, ID>
): R? =
events
.filterIsInstance(eventType.java)
.lastOrNull { it.gameId == aggregateId }
.lastOrNull { it.aggregateId == aggregateId }
override fun readAll(aggregateId: ID): Set<E> =
events
.filter { it.gameId == aggregateId }
.filter { it.aggregateId == aggregateId }
.toSet()
}

View File

@@ -0,0 +1,7 @@
package eventDemo.libs.event
interface VersionBuilder {
fun buildNextVersion(): Int
fun getLastVersion(): Int
}

View File

@@ -0,0 +1,11 @@
package eventDemo.libs.event
import java.util.concurrent.atomic.AtomicInteger
class VersionBuilderLocal : VersionBuilder {
private val version: AtomicInteger = AtomicInteger(0)
override fun buildNextVersion(): Int = version.addAndGet(1)
override fun getLastVersion(): Int = version.toInt()
}